Operators

Even though the Observable is the foundation, reactive extensions are mostly useful because of their operators. Operators are the essential pieces that allow complex asynchronous code to be composed easily in a declarative manner.

What are operators?

There are two kinds of operators:

Pipeable operators

Pipeable Operators are the kind that can be piped to Observables using the syntax source |> operator(). These include the filter() and map() operators. When called, operators do not change the existing Observable instance. Instead, they return a new Observable, whose subscription logic is based on the first Observable.

Note

A Pipeable Operator is a function that takes an Observable as its input and returns another Observable. It is a pure operation: the previous Observable remains unmodified.

More precisely, a Pipeable Operator is a pure callable object: it accepts one Observable as input and returns another Observable as output. Subscribing to the output Observable also subscribes to the input Observable.

For example, the operator called map() is analogous to Julia's map function for arrays. Just as map((d) -> d ^ 2, [ 1, 2, 3 ]) yields [ 1, 4, 9 ], the Observable below emits 1, 4, 9:

using Rocket

source = from([ 1, 2, 3 ])
# map(Int, f): Int is the type of the values emitted by the mapped Observable
subscribe!(source |> map(Int, (d) -> d ^ 2), lambda(
    on_next = (d) -> println(d)
))

# Logs:
# 1
# 4
# 9
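The claim that a pipeable operator leaves its input Observable unmodified can be checked directly. The following is a minimal sketch that uses only calls already shown in this section (from, map in the two-argument form map(Int, f) used in the piping examples, subscribe! and lambda). The names squared, from_source and from_squared are chosen here for illustration.

```julia
using Rocket

source  = from([ 1, 2, 3 ])
squared = source |> map(Int, (d) -> d ^ 2)  # a new Observable; `source` is untouched

from_source  = Int[]
from_squared = Int[]

# Subscribing to `squared` also subscribes to `source` under the hood.
subscribe!(squared, lambda(on_next = (d) -> push!(from_squared, d)))

# `source` still emits the original, unmapped values.
subscribe!(source, lambda(on_next = (d) -> push!(from_source, d)))

println(from_squared)  # [1, 4, 9]
println(from_source)   # [1, 2, 3]
```

Both subscriptions complete synchronously here because from() emits the contents of the array as soon as an actor subscribes.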

Another useful operator is first():

source = from([ 1, 2, 3 ])
subscribe!(source |> first(), lambda(
    on_next     = (d) -> println(d),
    on_complete = ()  -> println("Completed")
))

# Logs:
# 1
# Completed

Note that map() is constructed on the fly, since it must be given the mapping function. By contrast, first() could be a constant, but it is nonetheless constructed on the fly. In general, all operators are constructed, whether they need arguments or not.

Performance tip

Do not use lambda-based operators in real Julia code, as they perform poorly. Either use macro helpers to generate efficient versions of operators (such as @CreateMapOperator() and @CreateFilterOperator()) or implement your own operators without using lambda functions.

Creation operators

Distinct from pipeable operators, creation operators are functions that can be used to create an Observable with some common predefined behavior or by joining other Observables. For example: from([ 1, 2, 3 ]) creates an observable that will sequentially emit 1, 2, and 3.

source = from([ 1, 2, 3 ])
subscribe!(source, lambda(
    on_next     = (d) -> println("Value: $d"),
    on_error    = (e) -> println("Oh no, error: $e"),
    on_complete = ()  -> println("Completed")
))

# Logs:
# Value: 1
# Value: 2
# Value: 3
# Completed

Operators piping

Pipeable operators are special objects that can be used like ordinary functions with on_call!(operator, source). In practice, however, such calls tend to nest and quickly become unreadable: on_call!(operator1, on_call!(operator2, on_call!(operator3, source))). Therefore, Rocket.jl overloads |> for operators and Observables:

using Rocket

source = from([ i for i in 1:100 ]) |> filter((d) -> d % 2 === 0) |> map(Int, (d) -> d ^ 2) |> sum()

subscribe!(source, logger())

# Logs:
# [LogActor] Data: 171700
# [LogActor] Completed
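To make the readability argument concrete, here is a library-free sketch that contrasts nested calls with piped calls. ToyFilter, ToyMap and ToySum are hypothetical stand-ins invented for this illustration, not Rocket.jl types: a "source" is just a vector, and each toy operator is a callable struct that returns a new vector. Because the toys are callable, Julia's built-in |> is enough here, whereas Rocket.jl provides its own overload for operators and Observables.

```julia
# Hypothetical stand-ins, NOT Rocket.jl types.
struct ToyFilter{F}
    predicate::F
end
struct ToyMap{F}
    mapping::F
end
struct ToySum end

# Each toy operator is callable and returns a new vector; the input is not mutated.
(op::ToyFilter)(source) = [ d for d in source if op.predicate(d) ]
(op::ToyMap)(source)    = [ op.mapping(d) for d in source ]
(op::ToySum)(source)    = [ sum(source) ]

source = collect(1:100)

# Nested calls read inside-out: the last step is written first.
nested = ToySum()(ToyMap(d -> d ^ 2)(ToyFilter(d -> d % 2 == 0)(source)))

# Piped calls read left to right, in the order the data flows.
piped = source |> ToyFilter(d -> d % 2 == 0) |> ToyMap(d -> d ^ 2) |> ToySum()

println(nested == piped)  # true
println(piped)            # [171700]
```

The two forms compute the same result; only the reading order differs.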

It is also possible to compose operators. This is useful for creating an alias for a frequently used operator chain:

using Rocket

mapAndFilter = map(Int, d -> d ^ 2) + filter(d -> d % 2 == 0)

source = from(1:5) |> mapAndFilter

subscribe!(source, logger())

# Logs:
# [LogActor] Data: 4
# [LogActor] Data: 16
# [LogActor] Completed

mapAndFilterAndSum = mapAndFilter + sum()

source = from(1:5) |> mapAndFilterAndSum

subscribe!(source, logger())

# Logs:
# [LogActor] Data: 20
# [LogActor] Completed
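One way to picture what operator composition with + does is the sketch below. It is an assumption-laden toy model, not Rocket.jl's implementation: ToyOperator, ToyMap, ToyFilter and ToyChain are hypothetical types made up for this example, and a "source" is a plain vector. The point it illustrates is that + only builds a reusable chain object, and no work happens until a source is piped into it.

```julia
# Hypothetical toy types, NOT Rocket.jl's implementation.
abstract type ToyOperator end

struct ToyMap{F} <: ToyOperator
    mapping::F
end
struct ToyFilter{F} <: ToyOperator
    predicate::F
end
struct ToyChain{T <: Tuple} <: ToyOperator
    operators::T
end

(op::ToyMap)(source)    = [ op.mapping(d) for d in source ]
(op::ToyFilter)(source) = [ d for d in source if op.predicate(d) ]
# A chain applies its operators left to right.
(op::ToyChain)(source)  = foldl((acc, o) -> o(acc), op.operators; init = source)

# `+` builds a reusable chain; nothing runs until a source is piped in.
Base.:+(a::ToyOperator, b::ToyOperator) = ToyChain((a, b))

square_then_even = ToyMap(d -> d ^ 2) + ToyFilter(d -> d % 2 == 0)

println(collect(1:5) |> square_then_even)  # [4, 16]
println(collect(6:8) |> square_then_even)  # [36, 64]
```

Since a chain is itself an operator in this model, it can be combined with further operators in the same way, which mirrors how mapAndFilter + sum() is used above.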

For stylistic reasons, on_call!(operator, source) is never used in practice - even if there is only one operator. Instead, source |> operator() is generally preferred.