- If the generating function of a lazy sequence blocks (e.g. for I/O), it blocks all consuming threads
- Clojure's sequence functions cannot be applied to asynchronous events without converting events to blocking sequences
- Protocols for generators and consumers of asynchronous events
- Functions which mirror the Clojure sequence API (e.g. map, filter) for "push" events
- Integrate with Scheduled Events
- Cljque by Stuart Sierra
- Protocols "Observable" and "Observer" very similar to Rx
- "Push" sequence API with fns that look like the sequence API, with the same names
- Incomplete, currently-broken integration with Netty
- Lamina by Zach Tellman
Design Discussion: April 1, 2011
Models of Event Handling
There are three models for events: pull (blocking), poll (nonblocking), and push (registered event handlers). Poll isn't really a model; it always ends up being pull somewhere. Event handlers usually end up being dead ends: once a handler is triggered, the event goes no further.
Models of Iteration
Iteration has three operations: 1) is there more stuff? 2) get the stuff; and 3) move. In Clojure, these correspond to 'seq', 'first', and 'rest'. ('next' is derived as the 'seq' of 'rest'.) Java's iterators and .NET's enumerators each conflate 2 out of the 3, but in different combinations. What are the corresponding operations for asynchronous events?
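As a rough illustration of the conflation point, the sketch below contrasts java.util.Iterator, whose next() both gets and moves, with a seq-like view that keeps the operations apart. ListSeq and its method names are hypothetical, invented for this example; they are not part of Clojure or the JDK.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical seq-like view over a list (illustrative only).
// A null result plays the role of Clojure's nil: "there is no more stuff".
final class ListSeq<T> {
    private final List<T> items;
    private final int index;

    private ListSeq(List<T> items, int index) {
        this.items = items;
        this.index = index;
    }

    // "Is there more stuff?" Returns null when the list is empty.
    static <T> ListSeq<T> seq(List<T> items) {
        return items.isEmpty() ? null : new ListSeq<>(items, 0);
    }

    // "Get the stuff." Repeatable; does not advance.
    T first() {
        return items.get(index);
    }

    // "Move", folded together with the emptiness check, as in Clojure's next.
    // Returns a new position and leaves this one unchanged.
    ListSeq<T> next() {
        return index + 1 < items.size() ? new ListSeq<>(items, index + 1) : null;
    }
}

final class IterationOpsDemo {
    public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "b");

        // Iterator.next() returns the current item AND advances.
        Iterator<String> it = items.iterator();
        System.out.println(it.next());
        System.out.println(it.next());

        // With the seq-like view, first() can be called repeatedly with the same result.
        ListSeq<String> s = ListSeq.seq(items);
        System.out.println(s.first());
        System.out.println(s.first());
        System.out.println(s.next().first());
    }
}
```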
In .NET, subscribing to an IObservable returns an IDisposable, which connects to the rest of the .NET resource management infrastructure. In Java, the closest analog is java.io.Closeable, so subscribing to an observable object should return a Closeable.
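A minimal sketch of what "subscribe returns a Closeable" could look like on the JVM. EventSource, subscribe, and emit are hypothetical names chosen for this example, not an existing API; the only real types are java.io.Closeable and the java.util classes.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical event source (illustrative only).
final class EventSource<T> {
    private final List<Consumer<T>> handlers = new CopyOnWriteArrayList<>();

    // Registers a handler and returns a Closeable that unregisters it.
    Closeable subscribe(Consumer<T> handler) {
        handlers.add(handler);
        return () -> handlers.remove(handler);
    }

    // Pushes an event to every currently registered handler.
    void emit(T event) {
        for (Consumer<T> handler : handlers) {
            handler.accept(event);
        }
    }
}

final class SubscribeDemo {
    public static void main(String[] args) throws IOException {
        EventSource<String> source = new EventSource<>();
        List<String> seen = new ArrayList<>();

        // try-with-resources closes the subscription when the block exits.
        try (Closeable subscription = source.subscribe(seen::add)) {
            source.emit("first");
        }
        source.emit("second"); // no handler is registered any more

        System.out.println(seen);
    }
}
```

Returning a Closeable lets the subscription take part in try-with-resources, which is roughly the role IDisposable plays with `using` in .NET.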
Which Thread to Run On
Event handlers can be invoked on the thread that generated the event or dispatched to another thread (such as the Agent thread pool). If the framework forces dispatch to another thread, you have no choice. If execution stays on the thread that generated the event, you always have the option of dispatching to another thread yourself.
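To make the asymmetry concrete: if the source calls handlers on its own thread, a handler that wants to run elsewhere can wrap itself. The sketch below uses a plain java.util.concurrent executor as a stand-in for the Agent thread pool; DispatchSketch and dispatchTo are hypothetical names for this example.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class DispatchSketch {
    // Wraps a handler so that it runs on the given executor
    // instead of on the thread that delivered the event.
    static <T> Consumer<T> dispatchTo(Executor executor, Consumer<T> handler) {
        return event -> executor.execute(() -> handler.accept(event));
    }

    public static void main(String[] args) throws InterruptedException {
        Consumer<String> handler = event ->
            System.out.println(Thread.currentThread().getName() + " handled " + event);

        // Invoked directly: runs on the calling (event-generating) thread.
        handler.accept("inline");

        // Opting in to dispatch: the same handler now runs on a pool thread.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        dispatchTo(pool, handler).accept("dispatched");

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

The reverse wrapper cannot be written: once a framework has already moved the call to another thread, a handler has no way to get back onto the originating thread.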
Duality and Naming
Erik Meijer's talk on Rx emphasizes that IObserver/IObservable is a dual to IEnumerator/IEnumerable. What is the dual to Clojure seqs? Seqs are nice because they're simple: the body of a seq function boils down to one block of code, a closure. Can asynchronous events be made equally simple? Can you merge the "on event," "on completed," and "on error" operations of an observer into one block of code?
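One possible answer to the last question, sketched under assumptions: fold the three callbacks into a single value that says which of the three things happened, so that an observer is one function of that value. Signal and its factory methods are hypothetical names for this example; this is not how Rx or any existing library defines its observers.

```java
// Hypothetical signal type: one value covering "on event",
// "on completed", and "on error" (illustrative only).
final class Signal<T> {
    enum Kind { EVENT, COMPLETED, ERROR }

    final Kind kind;
    final T value;          // set only when kind is EVENT
    final Throwable error;  // set only when kind is ERROR

    private Signal(Kind kind, T value, Throwable error) {
        this.kind = kind;
        this.value = value;
        this.error = error;
    }

    static <T> Signal<T> event(T value) {
        return new Signal<>(Kind.EVENT, value, null);
    }

    static <T> Signal<T> completed() {
        return new Signal<>(Kind.COMPLETED, null, null);
    }

    static <T> Signal<T> error(Throwable error) {
        return new Signal<>(Kind.ERROR, null, error);
    }
}

final class SingleHandlerDemo {
    // The whole observer is this one block of code.
    static String describe(Signal<Integer> signal) {
        switch (signal.kind) {
            case EVENT:
                return "got " + signal.value;
            case COMPLETED:
                return "done";
            default:
                return "failed: " + signal.error.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(Signal.event(42)));
        System.out.println(describe(Signal.<Integer>completed()));
        System.out.println(describe(Signal.<Integer>error(new RuntimeException("boom"))));
    }
}
```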
The "pull" model implies ordering; you only deal with one thing at a time. In the "push" model, do you still have to provide that, or can you send many events at once? Do event handlers need to be reentrant? (The Rx Design Guidelines, sec. 6.7, say that calls to IObserver methods should be serialized so that consumers do not need to deal with concurrency.)
In the "push" model, the order in which events are received and dispatched is non-deterministic. One problem people have with Rx occurs when you receive an event and start a process based on it, assuming you're finished, but then receive another event from the same source.
Design Discussion: April 8, 2011
Reactive Extensions' IObservable isn't really a dual of IEnumerator. The arguments don't line up, and the fact that Subscribe returns an IDisposable complicates the model. In the Observable model, the "move" operation is hidden.
.NET's IEnumerator is a closer analog to Clojure sequences than Java's Iterator, because it separates the "get current" operation from the "move" operation. So you can call "get current" multiple times with the same result, like Clojure's "first".
Can we create a purely-functional observer? An observer could return a modified version of itself, which the Observable would use to process the next event. This implies that events must be sequentialized, but that's what people expect anyway. It also implies that the observable needs to keep track of that return value. This limits the stateful parts of the model to sources of events (and possibly the final sinks to which they go). Just because sources and sinks are mutable doesn't mean that everything on the chain between them has to be mutable.
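A minimal sketch of that idea, assuming hypothetical names (FnObserver, SumObserver, MutableSource): the observer is immutable and returns its successor, and the source is the only mutable piece because it has to remember that return value.

```java
// Hypothetical purely functional observer: handling an event
// returns the observer to use for the next event.
interface FnObserver<T> {
    FnObserver<T> onEvent(T event);
}

// Immutable observer that carries a running total.
final class SumObserver implements FnObserver<Integer> {
    final int total;

    SumObserver(int total) {
        this.total = total;
    }

    @Override
    public SumObserver onEvent(Integer event) {
        return new SumObserver(total + event);
    }
}

// The stateful part: the source keeps track of the latest observer.
final class MutableSource<T> {
    private FnObserver<T> current;

    MutableSource(FnObserver<T> initial) {
        this.current = initial;
    }

    // synchronized so that events are handled one at a time.
    synchronized void emit(T event) {
        current = current.onEvent(event);
    }

    synchronized FnObserver<T> current() {
        return current;
    }
}

final class FnObserverDemo {
    public static void main(String[] args) {
        SumObserver start = new SumObserver(0);
        MutableSource<Integer> source = new MutableSource<>(start);
        source.emit(1);
        source.emit(2);
        source.emit(3);

        SumObserver latest = (SumObserver) source.current();
        System.out.println(latest.total); // 6
        System.out.println(start.total);  // 0, the original observer is unchanged
    }
}
```

Because each observer is replaced by its own return value, the source must finish one call before starting the next, which is where the sequential ordering comes from.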
Haskell's Iteratee is an attempt to solve the same resource-management problems that lazy seqs have, by putting the collection in charge of the iteration.
Clojure has the IReduce interface for "internal" reduce operations. Instead of realizing a sequence and reducing over it, you pass a function to a collection and tell it to reduce over itself, which is more efficient. But it can't short-circuit; there's no way for the function to signal to the collection that it is not interested in receiving any more values. If we had a purely-functional observer, we might be able to unify it with internal reduce.
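To show what such a signal might look like, here is a sketch of an internal reduce in which the reducing function returns its accumulated value together with a "stop" flag. Step, InternalReduce, and their methods are hypothetical names for this example and do not describe Clojure's IReduce.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical result wrapper: the accumulated value plus a flag
// meaning "I am not interested in any more values".
final class Step<A> {
    final A value;
    final boolean done;

    private Step(A value, boolean done) {
        this.value = value;
        this.done = done;
    }

    static <A> Step<A> more(A value) {
        return new Step<>(value, false);
    }

    static <A> Step<A> stop(A value) {
        return new Step<>(value, true);
    }
}

final class InternalReduce {
    // The collection side drives the iteration and stops
    // as soon as the reducing function signals that it is done.
    static <A, T> A reduce(Iterable<T> coll, A init, BiFunction<A, T, Step<A>> f) {
        A acc = init;
        for (T item : coll) {
            Step<A> step = f.apply(acc, item);
            acc = step.value;
            if (step.done) {
                break;
            }
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

        // Sum until the total exceeds 5, then ask for no more values.
        Integer result = reduce(numbers, 0, (acc, x) -> {
            int sum = acc + x;
            if (sum > 5) {
                return Step.stop(sum);
            }
            return Step.more(sum);
        });

        System.out.println(result); // 6
    }
}
```

The returned Step has the same shape as a functional observer's return value: the next state, plus whether to keep sending.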