Error formatting macro: pagetree: java.lang.NullPointerException
Skip to end of metadata
Go to start of metadata
You are viewing an old version of this page. View the current version. Compare with Current  |   View Page History

Problems

  • If the generating function of a lazy sequence blocks (e.g. for I/O), it blocks all consuming threads
  • Clojure's sequence functions cannot be applied to asynchronous events without converting events to blocking sequences

Proposed Solution

  • Protocols for generators and consumers of asynchronous events
  • Functions which mirror the Clojure sequence API (e.g. map, filter) for "push" events
  • Integrate with Scheduled Events

References

Experiments

  • Cljque by Stuart Sierra
    • Protocols "Observable" and "Observer" very similar to Rx
    • "Push" sequence API with fns that look like the sequence API, with the same names
    • Incomplete, currently-broken integration with Netty
  • Lamina by Zach Tellman
    • Foundation for Aleph
    • No direct analogue for first/rest, but event stream equivalents for map/filter/reduce/take
    • Based on Channels (previous discussion here)
      • Queues to which "handler" functions can subscribe
      • Permits both "push" and "pull" on the same data

Design Discussion: April 1, 2011

Models of Event Handling

3 models for events: pull (blocking), poll (nonblocking), and push (registered event handlers). Poll isn't really a model; it always ends up being pull somewhere. Event handlers usually end up being dead-ends – once a handler is triggered, the event stops.

Models of Iteration

Iteration has three operations: 1) is there more stuff? 2) get the stuff; and 3) move. In Clojure, these correspond to 'seq', 'first', and 'rest'. ('next' is derived as the 'seq' of 'rest'.) Java's iterators and .NET's enumerators each conflate 2 out of the 3, but in different combinations. What are the corresponding operations for asynchronous events?

Unsubscribing

In .NET, subscribing to an IObservable returns an IDisposable, which connects to the rest of the .NET resource management infrastructure. In Java, the closest analog is java.io.Closeable, so subscribing to an observable object should return a Closeable.

Which Thread to Run On

Event handlers can be invoked on the thread that generated the event or dispatched to another thread (such as the Agent thread pool). However, if the framework forces dispatch to another thread, you have no choice. If execution stays on the thread that generated the event, you always have the option of dispatching to another thread.

Duality and Naming

Eric Meyer's talk on Rx emphasizes that IObserver/IObservable is a dual to Enumerator/Enumerable. What is the dual to Clojure seqs? Seqs are nice because they're simple: the body of a seq function boils down to one block of code, a closure. Can asynchronous events be made equally simple? Can you merge the "on event," "on completed," and "on error" operations of an observer into one block of code?

Ordering

The "pull" model implies ordering; you only deal with one thing at at a time. In the "push" model, do you still have to provide that, or can you send many events at once? Do event handlers need to be reentrant? (The Rx Design Guidelines, sec. 6.7, say that calls to IObserver methods should be serialized so that consumers do not need to deal with concurrency.)

In the "push" model, the order in which events are received and dispatched is non-deterministic. One problem people have with Rx occurs when you receive an event and start a process based on it, assuming you're finished. But then you receive another event from the same source.

Design Discussion: April 8, 2011

Reactive Extensions' IObservable isn't really a dual of IEnumerator. The arguments don't line up, and the fact that Subscribe returns an IDisposable complicates the model. In the Observable model, the "move" operation is hidden.

.NET's IEnumerator is a closer analog to Clojure sequences than Java's Iterator, because it separates the "get current" operation from the "move" operation. So you can call "get current" multiple times with the same result, like Clojure's "first".

Can we create a purely-functional observer? An observer could return a modified version of itself, which the Observable would use to process the next event. This implies that events must be sequentialized, but that's what people expect anyway. It also implies that the observable needs to keep track of that return value. This limits the stateful parts of the model to sources of events (and possibly the final sinks to which they go). Just because sources and sinks are mutable doesn't mean that everything on the chain between them has to be mutable.

Haskell's Iteratee is an attempt to solve the same resource-management problems of lazy seqs, by putting the collection in charge of the iteration.

Clojure has the IReduce interface for "internal" reduce operations. Instead of realizing a sequence and reducing over it, you pass a function to a collection and tell it to reduce over itself, which is more efficient. But it can't short-circuit; there's no way for the function to signal to the collection that it is not interested in receiving any more values. If we had a purely-functional observer, we might be able to unify it with internal reduce.

Ideas: April 23, 2011

Expressed in this source file

Start by calling (observe observable observer), which returns immediately. In another thread, the observable will call (on-next observer event) where event is a view of the observable supporting the method current, which either returns the latest value of the observable or throws an exception. An event of nil signals to the observer that the observable is finished generating events.

The observer's on-next method returns an updated observer to the observable. An observer supports a more? method to signal to the observable whether or not it wants to receive more events. When more? returns false, the observer must also support a result method. The result method returns a final value computed by this observer (such as an aggregation or reduce), or throws an exception.

All the methods are pure except for observe, which may initiate a computation in another thread. Combinators such as map, reduce, and filter are applied to observers instead of observables.

The methods can be summarized as follows:

  • Observable
    • (observe observable observer) ;;=> updated observable
    • (current observable) ;;=> current value or throws exception
  • Observer
    • (on-next observer observable) ;;=> updated observer
    • (result observer) ;;=> final value or throws exception
    • (more? observer) ;;=> boolean

The methods roughly correspond to lazy sequences as follows:

Observable

Observer

Seq

observe

on-next

seq

current

result

first

not nil?

more?

rest

Where I'm currently stuck is on the return value of observe. Logically, it should return another Observable, containing some reference to the result of the passed observer. I think it should return an "Observable Future," which can be dereferenced to get the result value of the observer, or observed again to subscribe another observer to that result when it becomes available.

Ideas: July 9, 2011: "Future Seqs"

  • Add callbacks to promises: invoke a fn when the promise gets a value
  • Construct a "future seq" where each Cons cell is backed by a promise
  • Example implementation here
Labels: