core.async

implement IDeref, IBlockingDeref for channels

Details

  • Type: Enhancement
  • Status: Open
  • Priority: Major
  • Resolution: Unresolved
  • Affects Version/s: None
  • Fix Version/s: None
  • Component/s: None
  • Labels: None
  • Patch: Code and Test
  • Approval: Vetted

Description

  • each deref will take a value from the channel
  • deref from a closed channel returns nil
  • deref with timeout must either time out or take a value from the channel (not both!). Might just want to call some variant of alt!! through the var
  • use internals efficiently
  • do NOT implement IPending/realized, neither interpretation is that great
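
The semantics listed above can be sketched outside the channel type using only the public API. This is a minimal sketch under assumptions, not the patch itself: `deref-chan` is a hypothetical helper name, and clojure.core.async is assumed to be on the classpath.

```clojure
(require '[clojure.core.async :as a])

(defn deref-chan
  "Hypothetical helper (not part of core.async) mirroring the proposed deref semantics."
  ([ch]
   ;; Blocking take; returns nil once the channel is closed and drained.
   (a/<!! ch))
  ([ch ms timeout-val]
   ;; alts!! commits to exactly one operation, so this call either takes a
   ;; value from ch or times out, never both.
   (let [t        (a/timeout ms)
         [v port] (a/alts!! [ch t])]
     (if (identical? port t) timeout-val v))))

;; Usage
(def c (a/chan 1))
(a/>!! c :v)
(deref-chan c 100 :timed-out) ; takes :v
(deref-chan c 20 :timed-out)  ; channel is empty, so the timeout value is returned
(a/close! c)
(deref-chan c)                ; nil, the channel is closed
```

Because every call takes a value, two successive derefs can return different results, which is the main difference from deref on a promise or delay.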

Patch: async-102-2.patch

The implementation of deref and timed deref is conceptually straightforward, but is difficult to implement due to circular namespace dependencies. Suggestions welcome.

  1. async-102.patch
    06/Nov/14 12:56 PM
    4 kB
    Alex Miller
  2. async-102-2.patch
    06/Jan/15 11:42 AM
    3 kB
    Alex Miller

Activity

Stuart Halloway added a comment - edited

The async-102.patch does not match the description "a channel becomes realized when it is closed" – instead, a channel becomes realized when a value is currently available. Both of these interpretations seem problematic to me, and I think the definition of realized? needs clarification to support this new case. The other uses of realized? in Clojure all guarantee that subsequent derefs will be filled immediately, but that will not be true for channels if another consumer takes the value.

Once that is solved, I need better doc strings for the core.async implementation protocols before I can screen this. What guarantees are made about use of threads? Where does the work enqueue, and what happens if the queue is full?
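
For comparison, the guarantee that existing uses of realized? provide can be seen with a promise and a delay. This sketch uses only clojure.core and is meant as an illustration of the point above, not as part of the patch.

```clojure
;; A promise is unrealized until a value is delivered.
(def p (promise))
(realized? p)   ; false
(deliver p 42)
(realized? p)   ; true
@p              ; 42, returned immediately on this and every later deref

;; A delay is unrealized until its body has run once.
(def d (delay (+ 1 2)))
(realized? d)   ; false
@d              ; 3
(realized? d)   ; true
```

In both cases a true result stays true and the value cannot be taken away by anyone else. A channel cannot offer that, since another consumer may take the value between the realized? check and the deref.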

Alex Miller added a comment -

My implementation of realized? is based on Rich's response to your question (from internal chat): "open+empty = false, else true" and I believe that is what's implemented and reflected in the tests. This also makes more sense to me than only being realized on close.

Your comment vs other cases of realized? seems accurate, so I agree that's a question.

Can you be more specific on which implementation protocol? I'm guessing you specifically mean Channel and Buffer. Any thread could be calling into the Channel. The M2MC protects its internal state with a mutex and will forward calls down to the Buffer. All calls into the buffer are protected by the channel mutex. M2MC enqueues pending puts and takes in the puts and takes lists. Those are bounded by the fixed limit clojure.core.async.impl.protocols/MAX-QUEUE-SIZE (1024), at which point an exception is thrown.
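
The queue limit described above can be observed from the public API. This is a rough sketch, assuming clojure.core.async is on the classpath; it relies only on the behavior stated in the comment (an exception once the pending-puts list is full), and the exact exception type may differ between versions.

```clojure
(require '[clojure.core.async :as a]
         '[clojure.core.async.impl.protocols :as impl])

;; Unbuffered channel with no takers: every put! stays pending.
(def c (a/chan))

(def result
  (try
    ;; One more pending put than the limit allows.
    (dotimes [i (inc impl/MAX-QUEUE-SIZE)]
      (a/put! c i))
    :no-error
    (catch Throwable t
      ;; Keep the exception so its type and message can be inspected.
      t)))

(instance? Throwable result) ; true once the limit is exceeded
```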

Alex Miller added a comment - edited

The conceptual implementation of IDeref and IBlockingDeref on ManyToManyChannel using existing async constructs is relatively straightforward:

(deftype ManyToManyChannel
  #_existing_code...
  IDeref
  (deref [this] (<!! this))

  IBlockingDeref
  (deref [this ms timeoutValue]
    (alt!!
      this ([val _] val)
      (timeout ms) timeoutValue)))

However, M2MC is defined in clojure.core.async.impl.channels. <!!, alt!!, and timeout are all defined in clojure.core.async, which depends on clojure.core.async.impl.channels, so there is a cyclic dependency problem here. <!! and timeout are pretty easy to deal with by looking up the var behind a delay, like this:

(def ^:private <!!' (delay (find-var 'clojure.core.async/<!!)))

;; then:
  IDeref
  (deref [this] (@<!!' this))

However, I'm a little stumped on how to do the equivalent with alt!!, which is a macro around do-alt and alts!!.
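
The delayed var lookup can be tried in isolation. In this sketch clojure.string stands in for the namespace that cannot be required at load time, and `upper-case'` and `shout` are hypothetical names used only for illustration.

```clojure
;; Defer the var lookup until first use instead of requiring the namespace here.
(def ^:private upper-case' (delay (find-var 'clojure.string/upper-case)))

(defn shout [s]
  ;; The delay is forced on the first call; a var is invokable, so it can be
  ;; called like the function it holds.
  (@upper-case' s))

;; The target namespace only has to be loaded before the first call.
(require 'clojure.string)
(shout "deref") ; "DEREF"
```

This works for functions because the call is resolved at run time. A macro such as alt!! is expanded at compile time, so there is no var to invoke later, which is why the same trick does not carry over directly.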

Ghadi Shayban added a comment -

Don't rely on resolving the circular dependency. Instead, make a shared alt-flag and enqueue the two handlers [1]. The fret callback will deliver a promise; then deref the promise.

[1] https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async.clj#L230-L231

Alex Miller added a comment -

New patch goes back to prior implementation of blocking deref (basically guts of <!!) and adds new alts-based version of timed deref.
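
The promise-based blocking take can be approximated with the public take! function. This is only a sketch of the idea, assuming clojure.core.async is on the classpath; the patch works against the internal handler protocol instead, and `blocking-take` is a hypothetical name.

```clojure
(require '[clojure.core.async :as a])

(defn blocking-take
  "Hypothetical helper: register a take whose callback delivers a promise,
  then block on the promise."
  [ch]
  (let [p (promise)]
    ;; The callback receives the taken value, or nil if the channel is closed.
    (a/take! ch (fn [v] (deliver p v)))
    @p))

;; Usage
(def c (a/chan 1))
(a/>!! c :v)
(blocking-take c) ; :v
(a/close! c)
(blocking-take c) ; nil
```

Adding a timeout by calling `(deref p ms timeout-val)` on this promise would not meet the requirement in the description: the take would stay registered and could still consume a value after the timeout had been returned. Sharing one flag between the take handler and the timeout handler is what makes the two outcomes mutually exclusive.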

Fogus added a comment -

> However, I'm a little stumped on how to do the equivalent with
> alt!!, which is a macro around do-alt and alts!!.

I wonder if the following will solve the alts!! problem:

(def ^:private ^:macro alts!!' (deref (find-var 'clojure.core.async/alts!!)))

And then using it in the deref impl behind IBlockingDeref. As it stands it's difficult to grok the code as written given that it's using some core.async implementation details (indeed, the same code). Whereas your implementation of IBlockingDeref.deref is crystal clear, the implementation prompted by the cyclic dependency is... less so. Not to belabor the point (which I seem to be), but alts!! is a clear implementation for this, but using the guts muddies the water. If I had my druthers, I'd prefer clarity.

That said, being a human macro-expander I was able to reason through the implementation eventually.

Alex Miller added a comment -

That doesn't seem to work for me. If you have some formulation that would, that would be great though!

