3 models for events: pull (blocking), poll (nonblocking), and push (registered event handlers). Poll isn't really a model; it always ends up being pull somewhere. Event handlers usually end up being dead-ends – once a handler is triggered, the event stops.
Iteration has three operations: 1) is there more stuff? 2) get the stuff; and 3) move. In Clojure, these correspond to 'seq', 'first', and 'rest'. ('next' is derived as the 'seq' of 'rest'.) Java's iterators and .NET's enumerators each conflate 2 out of the 3, but in different combinations. What are the corresponding operations for asynchronous events?
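As a minimal sketch of those three operations, the loop below walks a collection using only 'seq', 'first', and 'rest'. The function name walk is made up for illustration and is not part of any library.

```clojure
;; Walk a collection using only the three primitive operations.
(defn walk [coll]
  (loop [s   (seq coll)   ; 1) is there more stuff? (seq returns nil when empty)
         acc []]
    (if s
      (recur (seq (rest s))         ; 3) move; (seq (rest s)) is what 'next' does
             (conj acc (first s)))  ; 2) get the stuff
      acc)))

(walk '(1 2 3)) ;;=> [1 2 3]
```

Calling (first s) repeatedly returns the same value, because getting the value and moving are separate steps.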
In .NET, subscribing to an IObservable returns an IDisposable, which connects to the rest of the .NET resource management infrastructure. In Java, the closest analog is java.io.Closeable, so subscribing to an observable object should return a Closeable.
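One way this could look from Clojure is sketched below. It assumes observers are plain functions kept in a set inside an atom; subscribe! and publish! are hypothetical names, not an existing API. Returning a Closeable lets the subscription work with with-open.

```clojure
(import 'java.io.Closeable)

;; Hypothetical: registers f in the atom `observers` and returns a
;; Closeable whose close method removes it again.
(defn subscribe! [observers f]
  (swap! observers conj f)
  (reify Closeable
    (close [_] (swap! observers disj f))))

;; Hypothetical: calls every registered observer function with the event.
(defn publish! [observers event]
  (doseq [f @observers]
    (f event)))

(def observers (atom #{}))
(def seen (atom []))

;; with-open closes the subscription when the body exits.
(with-open [subscription (subscribe! observers #(swap! seen conj %))]
  (publish! observers :a))

(publish! observers :b)  ; nobody is subscribed any more

@seen ;;=> [:a]
```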
Event handlers can be invoked on the thread that generated the event or dispatched to another thread (such as the Agent thread pool). However, if the framework forces dispatch to another thread, you have no choice. If execution stays on the thread that generated the event, you always have the option of dispatching to another thread.
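The asymmetry can be sketched as follows, assuming handlers are plain functions. fire! and dispatching are hypothetical helpers: the source calls the handler on its own thread, and a consumer that wants asynchronous handling wraps its handler so each event is sent to an agent.

```clojure
;; Hypothetical helper: invoke the handler directly, so it runs on
;; whatever thread generated the event.
(defn fire! [handler event]
  (handler event))

;; Hypothetical wrapper: the consumer opts in to dispatching by sending
;; each event to an agent, which runs the handler on the agent thread pool.
(defn dispatching [handler]
  (let [a (agent nil)]
    (fn [event]
      (send a (fn [_] (handler event)))
      nil)))

(def direct-thread (promise))
(def pooled-thread (promise))

(fire! (fn [_] (deliver direct-thread (Thread/currentThread))) :e1)
(fire! (dispatching (fn [_] (deliver pooled-thread (Thread/currentThread)))) :e2)

(= (Thread/currentThread) @direct-thread) ;;=> true
(= (Thread/currentThread) @pooled-thread) ;;=> false
```

In a standalone script, call (shutdown-agents) before exiting so the agent thread pool does not keep the JVM alive.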
Erik Meijer's talk on Rx emphasizes that IObserver/IObservable is a dual to Enumerator/Enumerable. What is the dual to Clojure seqs? Seqs are nice because they're simple: the body of a seq function boils down to one block of code, a closure. Can asynchronous events be made equally simple? Can you merge the "on event," "on completed," and "on error" operations of an observer into one block of code?
The "pull" model implies ordering; you only deal with one thing at a time. In the "push" model, do you still have to provide that, or can you send many events at once? Do event handlers need to be reentrant? (The Rx Design Guidelines, sec. 6.7, say that calls to IObserver methods should be serialized so that consumers do not need to deal with concurrency.)
In the "push" model, the order in which events are received and dispatched is non-deterministic. One problem people have with Rx occurs when you receive an event and start a process based on it, assuming you're finished. But then you receive another event from the same source.
Reactive Extensions' IObservable isn't really a dual of IEnumerator. The arguments don't line up, and the fact that Subscribe returns an IDisposable complicates the model. In the Observable model, the "move" operation is hidden.
.NET's IEnumerator is a closer analog to Clojure sequences than Java's Iterator, because it separates the "get current" operation from the "move" operation. So you can call "get current" multiple times with the same result, like Clojure's "first".
Can we create a purely-functional observer? An observer could return a modified version of itself, which the Observable would use to process the next event. This implies that events must be sequentialized, but that's what people expect anyway. It also implies that the observable needs to keep track of that return value. This limits the stateful parts of the model to sources of events (and possibly the final sinks to which they go). Just because sources and sinks are mutable doesn't mean that everything on the chain between them has to be mutable.
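A minimal sketch of that idea, under assumed names: PureObserver, on-event, Summing, make-source, and emit! are all hypothetical. The observer is an immutable record whose on-event returns the next observer, and the only mutable piece is the atom inside the source.

```clojure
(defprotocol PureObserver
  (on-event [this event]
    "Returns the observer that should handle the next event."))

;; An immutable observer that keeps a running total of the events it has seen.
(defrecord Summing [total]
  PureObserver
  (on-event [this event]
    (assoc this :total (+ total event))))

;; The source is the stateful part: it keeps track of the latest observer.
(defn make-source [observer]
  (atom observer))

;; swap! applies events one after another; on-event is pure, so a retry
;; under contention is harmless.
(defn emit! [source event]
  (swap! source on-event event))

(def source (make-source (->Summing 0)))
(doseq [e [1 2 3]]
  (emit! source e))

(:total @source) ;;=> 6
```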
Haskell's Iteratee is an attempt to solve the same resource-management problems of lazy seqs, by putting the collection in charge of the iteration.
Clojure has the IReduce interface for "internal" reduce operations. Instead of realizing a sequence and reducing over it, you pass a function to a collection and tell it to reduce over itself, which is more efficient. But it can't short-circuit; there's no way for the function to signal to the collection that it is not interested in receiving any more values. If we had a purely-functional observer, we might be able to unify it with internal reduce.
Expressed in this source file
Start by calling (observe observable observer), which returns immediately. In another thread, the observable will call (on-next observer event), where event is a view of the observable supporting the method current, which either returns the latest value of the observable or throws an exception. A nil event signals to the observer that the observable is finished generating events. The on-next method returns an updated observer to the observable. An observer supports a more? method to signal to the observable whether or not it wants to receive more events. When more? returns false, the observer must also support a result method. The result method returns a final value computed by this observer (such as an aggregation or reduce), or throws an exception.
All the methods are pure except for observe, which may initiate a computation in another thread. Combinators such as filter are applied to observers instead of observables.
The methods can be summarized as follows:
(observe observable observer) ;;=> updated observable
(current observable) ;;=> current value or throws exception
(on-next observer observable) ;;=> updated observer
(result observer) ;;=> final value or throws exception
(more? observer) ;;=> boolean
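To make the summary concrete, here is a rough sketch of how these methods might fit together. It is not the code from the source file. The record names Event, TakeN, and Filtering, the helper take-n, and the driver observe-seq are all hypothetical, and observe-seq is a synchronous, single-threaded stand-in for observe that feeds values from a seq and returns the observer's result directly.

```clojure
(defprotocol Observable
  (current [this] "Returns the latest value or throws an exception."))

(defprotocol Observer
  (on-next [this event] "Returns an updated observer; a nil event means finished.")
  (more?   [this]       "True if this observer wants more events.")
  (result  [this]       "Final value, valid once more? returns false."))

;; Hypothetical event: a view of the observable holding one value.
(defrecord Event [value]
  Observable
  (current [_] value))

;; Hypothetical observer that collects at most n values, then stops.
(defrecord TakeN [n acc finished]
  Observer
  (on-next [this event]
    (if (nil? event)
      (assoc this :finished true)
      (let [new-acc (conj acc (current event))]
        (assoc this :acc new-acc :finished (>= (count new-acc) n)))))
  (more? [_] (not finished))
  (result [_] acc))

(defn take-n [n]
  (->TakeN n [] (zero? n)))

;; A combinator applied to an observer rather than an observable:
;; only events whose value satisfies pred reach the inner observer.
(defrecord Filtering [pred inner]
  Observer
  (on-next [this event]
    (if (or (nil? event) (pred (current event)))
      (assoc this :inner (on-next inner event))
      this))
  (more? [_] (more? inner))
  (result [_] (result inner)))

;; Hypothetical synchronous stand-in for observe.
(defn observe-seq [values observer]
  (loop [vs (seq values), obs observer]
    (cond
      (not (more? obs)) (result obs)
      vs                (recur (next vs) (on-next obs (->Event (first vs))))
      :else             (result (on-next obs nil)))))

(observe-seq (range) (take-n 3))                     ;;=> [0 1 2]
(observe-seq [1 2] (take-n 5))                       ;;=> [1 2]
(observe-seq (range) (->Filtering even? (take-n 3))) ;;=> [0 2 4]
```

Because the driver checks more? before each event, an observer can stop an infinite source early.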
The methods roughly correspond to lazy sequences as follows:
Where I'm currently stuck is on the return value of observe. Logically, it should return another Observable, containing some reference to the result of the passed observer. I think it should return an "Observable Future," which can be dereferenced to get the result value of the observer, or observed again to subscribe another observer to that result when it becomes available.