Even though the Observable is the foundation, reactive extensions are mostly useful because of their operators. Operators are the essential pieces that allow complex asynchronous code to be composed easily and declaratively.
There are two kinds of operators:
Pipeable Operators are the kind that can be piped to Observables using the syntax source |> operator(). These include operators such as map() and filter(). When called, operators do not change the existing Observable instance. Instead, they return a new Observable whose subscription logic is based on the first Observable.
A Pipeable Operator is essentially a pure callable object that accepts one Observable as input and returns another Observable as output. It is a pure operation: the input Observable remains unmodified. Subscribing to the output Observable will also subscribe to the input Observable.
For example, the operator called map() is analogous to the Array method of the same name. Just like the array method map((d) -> d ^ 2, [ 1, 2, 3 ]) yields [ 1, 4, 9 ], the Observable below emits 1, 4, 9:

    using Rocket

    source = from([ 1, 2, 3 ])
    subscribe!(source |> map(Int, (d) -> d ^ 2), lambda(
        on_next = (d) -> println(d)
    ))

    # Logs:
    # 1
    # 4
    # 9
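The claim that operators leave the input Observable unmodified can be checked with a small sketch. It assumes Rocket.jl's keep actor, which is not introduced in this section and is used here only to collect emitted values into its values field; the variable names are illustrative.

```julia
using Rocket

source  = from([ 1, 2, 3 ])
squared = source |> map(Int, (d) -> d ^ 2)  # a new Observable derived from source

# Assumption: keep(Int) creates an actor that stores every received value in `.values`
original_values = keep(Int)
squared_values  = keep(Int)

subscribe!(source, original_values)   # subscribe to the untouched source
subscribe!(squared, squared_values)   # subscribe to the derived Observable

println(original_values.values)  # values emitted by the original source
println(squared_values.values)   # values emitted after the mapping
```

If the operator is pure, the first actor should collect the original values and the second the squared ones.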
Another useful operator is first():

    using Rocket

    source = from([ 1, 2, 3 ])
    subscribe!(source |> first(), lambda(
        on_next     = (d) -> println(d),
        on_complete = ()  -> println("Completed")
    ))

    # Logs:
    # 1
    # Completed
Note that map() is constructed on the fly, since it must be given the mapping function. By contrast, first() could be a constant, but it is nonetheless constructed on the fly. In general, all operators are constructed, whether they need arguments or not.
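Because an operator is constructed like any other value, it can be built once, bound to a name, and piped onto several sources. The sketch below illustrates this under the assumption that Rocket.jl's keep actor is available for collecting values; the names square, a, and b are illustrative only.

```julia
using Rocket

# Construct the operator once; it is an ordinary value until it is piped onto a source
square = map(Int, (d) -> d ^ 2)

# Assumption: keep(Int) creates an actor that stores received values in `.values`
a = keep(Int)
b = keep(Int)

# The same operator value is applied to two independent sources
subscribe!(from([ 1, 2, 3 ]) |> square, a)
subscribe!(from([ 10, 20 ]) |> square, b)

println(a.values)
println(b.values)
```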
Distinct from pipeable operators, creation operators are functions that can be used to create an Observable with some common predefined behavior or by joining other Observables. For example:
from([ 1, 2, 3 ]) creates an observable that will sequentially emit 1, 2, and 3.

    using Rocket

    source = from([ 1, 2, 3 ])
    subscribe!(source, lambda(
        on_next     = (d) -> println("Value: $d"),
        on_error    = (e) -> println("Oh no, error: $e"),
        on_complete = ()  -> println("Completed")
    ))

    # Logs:
    # Value: 1
    # Value: 2
    # Value: 3
    # Completed
Pipeable operators are special objects that can be used like ordinary functions with on_call!(operator, source). In practice, however, such calls tend to accumulate and quickly grow unreadable: on_call!(operator1, on_call!(operator2, on_call!(operator3, source))). Therefore, Rocket.jl overloads |> for operators and Observables:

    using Rocket

    source = from([ i for i in 1:100 ]) |>
        filter((d) -> d % 2 === 0) |>
        map(Int, (d) -> d ^ 2) |>
        sum()

    subscribe!(source, logger())

    # Logs
    # [LogActor] Data: 171700
    # [LogActor] Completed
It is also possible to compose operators. This is useful for creating an alias for a frequently used operator chain:

    using Rocket

    mapAndFilter = map(Int, d -> d ^ 2) + filter(d -> d % 2 == 0)

    source = from(1:5) |> mapAndFilter

    subscribe!(source, logger())

    # Logs
    # [LogActor] Data: 4
    # [LogActor] Data: 16
    # [LogActor] Completed

    mapAndFilterAndSum = mapAndFilter + sum()

    source = from(1:5) |> mapAndFilterAndSum

    subscribe!(source, logger())

    # Logs
    # [LogActor] Data: 20
    # [LogActor] Completed
For stylistic reasons, on_call!(operator, source) is never used in practice, even if there is only one operator. Instead, source |> operator() is generally preferred.