Observables are lazy Push collections of multiple values. They fill the missing spot in the following table:

| |Single|Multiple|
|---|---|---|
|Pull|Function|Iterator|
|Push|Promise|Observable|
For example, the following code specifies an Observable that pushes the values 1, 2, 3 immediately (synchronously) when subscribed to, and the value 4 after one second has passed since subscription.

    using Rocket

    source = make(Int) do actor
        next!(actor, 1)
        next!(actor, 2)
        next!(actor, 3)
        setTimeout(1000) do
            next!(actor, 4)
            complete!(actor)
        end
    end
To invoke the Observable and inspect these values, we need to subscribe to it.

    using Rocket

    source = make(Int) do actor
        next!(actor, 1)
        next!(actor, 2)
        next!(actor, 3)
        setTimeout(1000) do
            next!(actor, 4)
            complete!(actor)
        end
    end

    println("Just before subscribe")
    subscribe!(source, lambda(
        on_next     = (d) -> println(d),
        on_complete = ()  -> println("Completed")
    ))
    println("Just after subscribe")

    # Logs
    # Just before subscribe
    # 1
    # 2
    # 3
    # Just after subscribe
    # 4
    # Completed
Pull and Push are two different protocols that describe how a data Producer communicates with a data Consumer.
In a Pull system, the Consumer determines when it receives data from the Producer. The Producer itself is unaware of when the data are delivered to the Consumer.
Every Julia Function is a Pull system. The function is a Producer of data, and the code that calls the function is consuming data by "pulling" a return value from the call.
| |Producer|Consumer|
|---|---|---|
|Pull|Passive: produces data when requested.|Active: decides when data is requested.|
|Push|Active: produces data at its own pace.|Passive: reacts to received data.|
In Push systems, the Producer determines when to send data to the Consumer. The Consumer is unaware of when it will receive that data.
Futures and promises are the most common type of Push system today. A Promise (the Producer) delivers a resolved value to registered callbacks (the Consumers). Unlike a function, it is the Promise that determines precisely when a value is "pushed" to the callbacks.
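The difference between the two protocols can be sketched in plain Julia, without any library. This is only an illustration: `produce_one` and `push_values` are hypothetical names introduced here, and the callback-based producer is a stand-in for a real Push system, not how Rocket.jl is implemented.

```julia
# Pull: the consumer decides when to ask for data.
produce_one() = 42                  # a function yields one value per call
squares = (x^2 for x in 1:3)        # a lazy generator yields values on iteration

pulled = Int[]
push!(pulled, produce_one())        # the consumer pulls a single value
for value in squares                # the consumer pulls each value in turn
    push!(pulled, value)
end

# Push: the producer decides when to deliver data to a registered callback.
# `push_values` is a hypothetical helper used only for this sketch.
function push_values(callback)
    for value in (10, 20, 30)
        callback(value)             # the producer pushes, the consumer only reacts
    end
end

pushed = Int[]
push_values(value -> push!(pushed, value))

println(pulled)  # [42, 1, 4, 9]
println(pushed)  # [10, 20, 30]
```

In the Pull half the calling code drives every step, whereas in the Push half the calling code only registers a callback and then waits to be called.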
Rocket.jl introduces Observables, a new Push system for Julia. An Observable is a Producer of multiple values, "pushing" them to Observers (Consumers or Actors).
- A Function is a lazily evaluated computation that synchronously returns a single value on invocation.
- A Generator is a lazily evaluated computation that synchronously returns zero to (potentially) infinite values on iteration.
- A Promise is a computation that may (or may not) eventually return a single value.
- An Observable is a lazily evaluated computation that can synchronously or asynchronously return zero to (potentially) infinite values from the time it's invoked.
In contrast to functions, Observables can "return" multiple values over time. For example, functions can't do this:

    function foo()
        println("Hello!")
        return 0
        return 1 # Dead code, will never happen
    end
Observables, however, can do this:

    using Rocket

    foo = make(Int) do actor
        next!(actor, 0)
        next!(actor, 1)
        complete!(actor)
    end
Observables can also "return" values asynchronously:

    using Rocket

    foo = make(Int) do actor
        setTimeout(1000) do
            next!(actor, 0)
            complete!(actor)
        end
    end
- func() means "give me one value synchronously"
- subscribe!(observable, ...) means "give me any amount of values, either synchronously or asynchronously"
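The contrast can be made concrete with a short sketch. It assumes that Rocket.jl's `keep(Int)` actor stores every received value in its `values` field. The function `func` is a hypothetical stand-in for any ordinary Julia function.

```julia
using Rocket

# A function call pulls exactly one value.
func() = 0
single = func()

# An Observable may deliver any number of values to its Actor.
foo = make(Int) do actor
    next!(actor, 0)
    next!(actor, 1)
    complete!(actor)
end

# Assumption: keep(Int) creates an actor that collects values in `.values`.
collector = keep(Int)
subscribe!(foo, collector)

println(single)            # 0
println(collector.values)  # [0, 1]
```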
Observables are (1) created using creation operators (it is also possible to build an Observable from scratch with custom logic); (2) subscribed to with an Actor; (3) executed to deliver next!, error!, and complete! notifications to the Actor; and (4) their execution may be disposed. These four aspects are all encoded in an Observable instance, but some of them involve other types, such as the Subscription returned by subscribe!.
The core responsibilities of an Observable are:
- Creating Observables
- Subscribing to Observables
- Executing the Observable
- Disposing Observables
You can create an Observable in various ways using Creation operators. You can also build an Observable from scratch. To see how you can build an Observable with custom logic, consult the API Section.
The Observable source in the example above can be subscribed to, like this:

    using Rocket

    subscribe!(source, lambda(
        on_next = (d) -> println(d)
    ))
This example shows that subscribe! calls are not shared among multiple Actors of the same Observable. When subscribe! is called with an Actor, the on_subscribe! function attached to this particular Observable is executed for that Actor. Each call to subscribe! triggers its own independent setup for the given Actor.
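A minimal sketch of this independence follows. It assumes, as in the sketches elsewhere in this section, that `keep(Int)` collects values in `.values`. The counter `setup_runs` is a hypothetical name used only to make each run of the subscription logic visible.

```julia
using Rocket

# Counts how many times the subscription logic has been executed.
setup_runs = Ref(0)

source = make(Int) do actor
    setup_runs[] += 1               # runs once per subscribe! call
    next!(actor, setup_runs[])
    complete!(actor)
end

first_actor  = keep(Int)
second_actor = keep(Int)

subscribe!(source, first_actor)
subscribe!(source, second_actor)

println(setup_runs[])          # 2
println(first_actor.values)    # [1]
println(second_actor.values)   # [2]
```

Each Actor sees a different value because the body passed to `make` is executed again for every subscription.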
Subscribing to an Observable is like calling a function, providing callbacks where the data will be delivered to.
A subscribe! call simply delivers initial values or events to an Actor.
The execution produces multiple values over time, either synchronously or asynchronously.
An Observable Execution can deliver three types of notifications:
- Next: sends a value, such as an Int, String, Dict, etc.;
- Error: sends any error as a value;
- Complete: does not send a value.
"Next" notifications are the most important and most common type: they represent actual data being delivered to a subscriber. "Error" and "Complete" notifications terminate the Observable Execution.
In an Observable Execution, any number of Next notifications may be delivered. However, once a single Error or Complete notification is delivered, nothing else can be delivered afterwards.
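The rule can be illustrated with a small guard written in plain Julia. This is a sketch of the contract only, not Rocket.jl's implementation. `GuardedSink`, `deliver_next!`, and `deliver_complete!` are hypothetical names introduced for this example. An Error notification would be guarded in the same way as Complete.

```julia
# Hypothetical sink that enforces "nothing after a terminal notification".
mutable struct GuardedSink
    received::Vector{Any}
    terminated::Bool
end
GuardedSink() = GuardedSink(Any[], false)

# Record a Next notification unless the execution has already terminated.
function deliver_next!(sink::GuardedSink, value)
    sink.terminated || push!(sink.received, value)
    return nothing
end

# Record a Complete notification once, then mark the execution as terminated.
function deliver_complete!(sink::GuardedSink)
    if !sink.terminated
        push!(sink.received, :complete)
        sink.terminated = true
    end
    return nothing
end

sink = GuardedSink()
deliver_next!(sink, 1)
deliver_next!(sink, 2)
deliver_complete!(sink)
deliver_next!(sink, 3)      # ignored: the execution has already completed

println(sink.received)  # Any[1, 2, :complete]
```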
The following is an example of an Observable execution that delivers three Next notifications and subsequently completes:

    using Rocket

    source = make(Int) do actor
        next!(actor, 1)
        next!(actor, 2)
        next!(actor, 3)
        complete!(actor)
    end

    # or the same with creation operator
    source = from([ 1, 2, 3 ])
It is advisable to wrap the subscription logic in a try/catch block that delivers an Error notification if an exception is thrown:

    using Rocket

    source = make(Int) do actor
        try
            next!(actor, 1)
            next!(actor, 2)
            next!(actor, 3)
            complete!(actor)
        catch e
            error!(actor, e)
        end
    end
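To see the Error notification arrive at the Actor, the following sketch throws a simulated exception partway through the execution. It assumes that `lambda` accepts an `on_error` keyword alongside `on_next` and `on_complete`. The `events` vector is a hypothetical helper that records what the Actor receives.

```julia
using Rocket

events = Any[]   # records every notification the Actor receives

source = make(Int) do actor
    try
        next!(actor, 1)
        throw(ErrorException("something went wrong"))  # simulated failure
        next!(actor, 2)                                # never reached
        complete!(actor)
    catch e
        error!(actor, e)   # forward the exception as an Error notification
    end
end

subscribe!(source, lambda(
    on_next     = (d) -> push!(events, d),
    on_error    = (e) -> push!(events, e),
    on_complete = ()  -> push!(events, :complete)
))

println(length(events))                # 2
println(events[2] isa ErrorException)  # true
```

The Actor receives the value 1 followed by the exception. No Complete notification is delivered, because the Error notification has already terminated the execution.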
It is common for an Actor to abort an Observable Execution. Once the Actor is done receiving values, it may stop the execution in order to free computation power or memory resources.
When subscribe! is called, the Actor gets attached to the newly created Observable execution. This call also returns an object, the Subscription:

    subscription = subscribe!(source, actor)
The Subscription represents the ongoing execution, and has a minimal API that allows you to cancel the execution. See the documentation of the Subscription type for details.
With unsubscribe!(subscription) you can cancel the ongoing execution.
subscribe! returns a Subscription that represents the ongoing execution. Simply call unsubscribe! on the Subscription to cancel the execution.
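A short sketch of cancellation follows. It uses Rocket.jl's `Subject`, which is not discussed in this section, purely as a convenient source whose values can be pushed by hand. The sketch assumes that a `Subject(Int)` delivers values synchronously by default and that `keep(Int)` collects values in `.values`.

```julia
using Rocket

# Assumption: a Subject forwards each value to its current subscribers
# synchronously when next! is called on it.
subject   = Subject(Int)
collector = keep(Int)

subscription = subscribe!(subject, collector)

next!(subject, 1)
next!(subject, 2)

unsubscribe!(subscription)   # cancel the ongoing execution

next!(subject, 3)            # no longer delivered to `collector`

println(collector.values)  # [1, 2]
```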