<< Back to previous view

[ASYNC-103] promise-chan Created: 05/Nov/14  Updated: 28/Aug/15

Status: Reopened
Project: core.async
Component/s: None
Affects Version/s: None
Fix Version/s: None

Type: Enhancement Priority: Major
Reporter: Stuart Halloway Assignee: Alex Miller
Resolution: Unresolved Votes: 3
Labels: None

Attachments: Text File async-103-2.patch     Text File async-103-3.patch     Text File async-103-4.patch     Text File async-103-5.patch     Text File async-103-6.patch     Text File async-103.patch    
Patch: Code and Test
Approval: Screened
Waiting On: Alex Miller

 Description   

A promise-chan is a new kind of core.async channel that gives promise semantics to a channel.

  • Takes block prior to put or close!
  • After a put, all pending takes are notified, and any subsequent takes return the put value immediately
  • After a close (and no put), all pending takes are notified with nil, and any subsequent takes return nil immediately

In practice, a promise-chan is a normal channel that uses a promise-buf:

  • buf can only take on one value (or nil if closed) ever
  • buf is never emptied: N consumers can all get the same, one and only value by reading from the channel

In order to make close! sensible with this new kind of buf

  • all buffers need to be closeable
  • closing a channel closes its buffer
  • all basic buffer types do nothing when closed
  • a promise-buf make a one-time transition to having no value when closed (if not previously delivered)
  • closing a buffer is implementation detail not exposed to end users

Approach:

  • Buffer protocol now has a close-buf! function (close! would have overlapped the Channel function close!). close-buf! is invoked on a buffer when the channel is closed, allowing it to update itself if necessary.
  • Existing buffers implement close-buf! and do nothing (buffer still available for taking)
  • New promise-buffer implementation. Makes a one-time transition when value is supplied or buffer is closed. value is held in an unsynchronized-mutable deftype field - updates via add!* or close-buf! always happen under the containing channel mutex.
  • New api functions: promise-chan creates a channel with a promise-buffer and promise-buffer.

Patch: async-103-6.patch



 Comments   
Comment by Ghadi Shayban [ 07/Nov/14 4:06 PM ]

My initial gut reaction was that this is more related to semantics of the channel, not the buffer, and I'm wary of significant changes to esp. the impl protocols. But after seeing an impl, it looks straightforward and the changes aren't too significant. (Another possibility is to make another simpler implementation of a channel, with just slots for the value, lock and pending takers before the value is delivered. No slots for xfn or putters or buffer handling would be needed.)

Note an atom is not needed in the PromiseBuffer, just a set! on a mutable field to be inline with the other buffer flavors. If the patch continues using an atom, maybe guard val to not swap unnecessarily.

Comment by Alex Miller [ 07/Nov/14 11:09 PM ]

Good call on the atom - backed off to just using existing clojure.lang.Box. If that's too weird, will just deftype it.

Comment by Alex Miller [ 07/Nov/14 11:18 PM ]

Dur, just need the val field itself.

Comment by Fogus [ 09/Jan/15 4:27 PM ]

I'd really love to see the reason for the current impl over a more pointed promise channel (perhaps as described by Ghadi). This is a clear implementation, but with the addition of close-buf! some complexity is added for implementations in the future. Specifically, I'd like to at least see a dcostring in the protocol describing the meaning and desired semantics around close-buf! and what it should return and when. Like I said, this is a quality patch, but I'm just concerned that there are unstated assumptions around close-buf! that escape me.

Comment by Alex Miller [ 28/Jan/15 12:43 PM ]

Fogus, I looked into implementing the promise-chan a bit today but it requires replicating a significant portion of the existing chan implementation. I believe that's the primary reason to do it this way, just leveraging the majority chan code.

New -5 patch has two changes - I commented the Buffer protocol fns, and I removed promise-buffer from the api, as I think users should only use promise-chan.

Comment by Fogus [ 30/Jan/15 10:06 AM ]

The patch looks good. My only (minor) reservation is that the Buffer docstrings are written under the assumption that people will use instances only in the context of a guarded channel. I understand why of course, so I think I might be too pedantic here. Because of this I see no reason not to +1 this patch.

Comment by Alex Miller [ 23/Feb/15 8:32 AM ]

Applied patch.

Comment by Alex Miller [ 03/Apr/15 3:57 PM ]

Reopening (and likely reverting commit).

This example demonstrates that only one of the waiting blocks will be notified, instead of all, as we'd like:

(let [c (promise-chan)] 
  ;; takers
  (dotimes [_ 3] 
    (go (println (<! c)))) 

  ;; put, should get to all takers
  (go (>! c 1)))
Comment by Jozef Wagner [ 30/May/15 10:35 AM ]

ASYNC-124 fixes the issue mentioned by Alex

Comment by Leon Grapenthin [ 11/Jun/15 11:44 AM ]

Quote: "buf is never emptied: N consumers can all get the same, one and only value by reading from the channel"

This is not true with the new closing semantics introduced here, correct? If I deref a Clojure promise, I get the same value everytime. But what is the purpose in closing promise-chans after the promise has been delivered?

When is a good time to close a promise-chan? If I close after the put, which is what many core.async processes do, takers must have taken before the close which means it will often be a race condition unless explicit care is taken.

Is there a benefit or am I simply missing something here?

Comment by Herwig Hochleitner [ 08/Jul/15 2:23 PM ]

Here is my cljs implementation of a promise channel: https://github.com/webnf/webnf/blob/master/cljs/src/webnf/promise.cljs
I'm the sole author and you are free to use any of it.

Comment by Alex Miller [ 12/Aug/15 3:00 PM ]

I believe that this patch can be re-evaluated after ASYNC-124 is applied as that patch addresses the reason this was retracted.

Comment by Leon Grapenthin [ 23/Aug/15 6:46 AM ]

It seems that there are two different approaches to how a promise-chan is supposed to or can be used.

1. All consumers are supposed to be takers before the promise is delivered. Taking after delivery is not intended, hence producers should close the channel after delivery.
2. " can take at any time and either have to wait for the promise (take blocks) or they get its value right away.

(2) models the behavior of real promises. (1) rather models that of a mutable reference.

It seems that the current patch explicitly makes room for (1) by giving producers the possibility to close.

Now if you want to use a promise-chan like (1) as a producer you can't create and return a promise-chan. Instead you need to be passed a promise-chan which has (a) taker(s) already. On the consumer side, it seems like an annoying quirk to enqueue takes before being able to pass the promise-chan to the producer (since he could close the promise immediately). I can't think of a scenario where one would want to do such thing. So my new question is:

Given the lengths taken here to enable (1), are there practical usecases for that? Why is closing a promise-chan not just going to be a no op?

Comment by Alex Miller [ 24/Aug/15 12:56 PM ]

Taking after delivery is intended. It is hard sometimes to know whether a take will happen before or after delivery in asynchronous systems so both must work. close! will deliver nil to all pending or future takers - in this case (as with all channels) nil indicates a drained channel, meaning that no delivery will ever happen.

However, one good question here is - what happens when you close a promise-chan that has already been delivered?

Currently:

  • (def pc (promise-chan))
  • take from promise-chan (blocks)
  • (>!! pc 5) - delivers 5 to blocked taker(s)
  • take from promise-chan (immediately returns 5) ;; for any number of takes
  • (close! pc 5)
  • take from promise-chan (immediately returns nil) ;; ???

I could easily be persuaded that the last answer is wrong and that it should always deliver 5 regardless of whether the pc gets closed (that is closing has no effect on delivered promise-chans).

Comment by Alex Miller [ 24/Aug/15 1:03 PM ]

I just confirmed with Rich that once delivered, a promise-chan should always deliver that value to takers, regardless of whether it is subsequently closed.

Comment by Stuart Sierra [ 28/Aug/15 4:02 PM ]

Screened. Clojure version good, assuming ASYNC-124 is applied first. ClojureScript version may or may not need additional work.

I agree that the addition of close-buf! and the no-op implementations feel like working around some missing abstraction. (What does it even mean to "close" a buffer?) But the Buffer protocol is not part of the public API, it's buried down in clojure.core.async.impl.protocols. The Channel and Buffer implementations are already tightly coupled, so adding another point of collaboration does not seem likely to cause problems. Adding close-buf! is certainly preferable to duplicating the implementation of ManyToManyChannel.





[ASYNC-124] put! into channel that has mapcat transducer fails to dispatch more than one waiting takes Created: 30/May/15  Updated: 28/Aug/15

Status: Open
Project: core.async
Component/s: None
Affects Version/s: None
Fix Version/s: None

Type: Defect Priority: Critical
Reporter: Jozef Wagner Assignee: Alex Miller
Resolution: Unresolved Votes: 1
Labels: None
Environment:

Clojure 1.7.0-RC1, latest core.async head (May 2015)


Attachments: Text File async-124-2.patch     Text File async-124-3.patch     Text File async-124-4.patch     Text File async-124-5.patch     Text File async-124-6.patch     Text File async-124-7.patch     Text File async-124-8.patch     Text File async-124.patch    
Patch: Code and Test
Approval: Screened

 Description   

This issue is similar to one that has caused the revert of ASYNC-103. When channel has multiple pending takes, one put will dispatch only one take, even if transducer attached to the channel produces multiple values.

(require '[clojure.core.async :refer (chan put! take!)])
(let [xf (mapcat #(range % (+ % 5)))
        c (chan 10 xf)]
    (dotimes [x 5]
      (take! c #(println (str x %))))
    (put! c 10)
    (put! c 15))

The output of the above code is

0 10
1 11

but it should be

0 10
1 11
2 12
3 13
4 14

Note that if we change the order and execute puts before takes, the result will be as expected.

Cause: When executing a write on a channel, no more than one pending take will be satisfied. Other pending takes will wait until the next put. When a put could release at most one take, this was sufficient but with expanding transducers, a put can potentially satisfy many pending takers.

Approach: Once a put succeeds into the buffer (through the transducer), instead of finding a single taker, do the prior work in a loop, pairing up callbacks with values removed from the buffer. Once the loop is complete, release the mutex and dispatch all paired takes.

Patch: async-124-8.patch

Screened by:



 Comments   
Comment by Jozef Wagner [ 30/May/15 10:30 AM ]

patch with test

Comment by Jozef Wagner [ 30/May/15 10:32 AM ]

Note that with attached patch the actual dispatch/run on takes happens before (.unlock mutex) and before (when done? (abort this)) are called.

Also note that added test only works with 1.7.0 version of clojure

Comment by Jozef Wagner [ 30/May/15 12:46 PM ]

Added new patch async-124-2.patch that dispatches after unlocking and aborting. Test was also changed as it had race condition.

Comment by Alex Miller [ 06/Jul/15 9:23 AM ]

We can't add a test that is 1.7-only. See https://github.com/clojure/core.async/blob/master/src/test/clojure/clojure/core/pipeline_test.clj#L7 for an example of how to patch in a transducer impl that will work in pre-1.7.

Comment by Jozef Wagner [ 06/Jul/15 10:41 AM ]

Added async-124-3.patch that works with Clojure <1.7

Comment by Alex Miller [ 10/Aug/15 3:53 PM ]

Jozef, I don't see any reason why the independent callbacks in your test will necessarily conj in order as they contend for the atom. When I run the test I usually get a failure. If you agree, could wrap expected and actual in a set. Or if that seems wrong, then let's decide why.

Comment by Jozef Wagner [ 11/Aug/15 10:49 AM ]

Attached async-124-4.patch that uses unordered comparison in test

Comment by Alex Miller [ 14/Aug/15 1:01 PM ]

Added async-124-5.patch which should be semantically identical as -4 but retains the structure of a single dispatch point.

Comment by Alex Miller [ 14/Aug/15 3:03 PM ]

New -6 patch tries to retain more of the original algorithm and leverages the original loop instead of adding a new one.

Comment by Alex Miller [ 18/Aug/15 8:25 AM ]

New -7 patch addresses some bugs in the prior version and cleans up the loop code, also adds better tests.

Comment by Alex Miller [ 28/Aug/15 11:53 AM ]

test tweaks added in -8, per Stuart Sierra

Comment by Stuart Sierra [ 28/Aug/15 12:01 PM ]

Screened. Clojure version good. Still needs corresponding fix to ClojureScript version.





[ASYNC-144] pipeline-async docstring correction Created: 27/Aug/15  Updated: 27/Aug/15

Status: Open
Project: core.async
Component/s: None
Affects Version/s: None
Fix Version/s: None

Type: Defect Priority: Minor
Reporter: Derek Troy-West Assignee: Unassigned
Resolution: Unresolved Votes: 0
Labels: None


 Description   

Currently says:

"af must close! the channel before returning."

more correctly:

"af is responsible for closing the channel.", or similar.

af is asynchronous so should return immediately, the thread or go-block it spawns should close the channel later.






[ASYNC-137] Make (<! (timeout 0)) be closer to 0 Created: 03/Aug/15  Updated: 25/Aug/15

Status: Open
Project: core.async
Component/s: None
Affects Version/s: None
Fix Version/s: None

Type: Enhancement Priority: Major
Reporter: Mike Thompson Assignee: Unassigned
Resolution: Unresolved Votes: 4
Labels: None


 Description   

Currently `(<! (timeout 0))` will take about 4ms on a modern browser because it ends up as `(js/setTimeout f 0)`. On a modern browser, setTimeout won't come back in under 4ms, even if given a 0. And 4ms is too long if you are just wanting to hand back control to the browser for the minimum about of time.

See:
https://github.com/clojure/core.async/blob/d073896192fa55fab992eb4c9ea57b86ec5cf076/src/main/clojure/cljs/core/async/impl/timers.cljs#L161-L164

followed by:
https://github.com/clojure/core.async/blob/d073896192fa55fab992eb4c9ea57b86ec5cf076/src/main/clojure/cljs/core/async/impl/dispatch.cljs#L35-L36

I was imagining this tiny rewrite:

(defn queue-delay [f delay]
(if (= 0 delay)
(goog.async.nextTick f) ;; <---- new path for 0
(js/setTimeout f delay)) ;; <---- existing path



 Comments   
Comment by Patrick O'Brien [ 03/Aug/15 10:31 PM ]

I think the use of goog.Timer.callOnce should work for any amount of time and would still solve the problem posed by the use of js/setTimeout.

Comment by Mike Thompson [ 04/Aug/15 2:38 AM ]

From what I can see, goog.Timer.callOnce use setTimeout. But that's specifically what we're trying to avoid in the case of a 0ms duration. So I don't see callOnce as a solution.

Comment by Patrick O'Brien [ 04/Aug/15 7:27 AM ]

I think you are right, Mike. Hard as it is to believe, since obviously Google knows about the problem. But now that I'm rereading the goog.Timer code I can't see that they've fixed it here. Looks like goog.async.nextTick is the only solution.

Comment by Patrick O'Brien [ 04/Aug/15 7:33 AM ]

I just wanted to mention that this issue is for the ClojureScript version of core.async, and that it appears to be fairly idiomatic to use (<! (timeout 0)) as a way to temporarily pass control back to the js event processing. So I think it is very important to make this efficient.

Comment by Ghadi Shayban [ 06/Aug/15 1:56 PM ]

In reading this "bug", it seems that the actual problem is having more varied goblock dispatch in CLJS. It could/should be improved, but making (<! (timeout 0)) – non-sensical code – behave differently is not addressing the problem in a meaningful way.

We should eliminate the need for this hack at the root, rather than make undefined behavior more specific. ASYNC-131 and ASYNC-43 are related and probably better starting points.

Comment by Mike Thompson [ 06/Aug/15 10:32 PM ]

@Ghadi

  1. I can't see how issue 131 is closely enough related to act as a "better starting point". It has a different thrust, represents different needs, and will likely have a different solution (as evidenced by the current suggestions).
  1. It almost looks to me as if Issue 43 could be closed with the switch to Google Closure dispatch noted by David Nolen. BTW, you'll note that my suggestion is to indeed use goog.async.nextTick (so I'm adhering to the use Google Closure approach).

Also, I would disagree with your assertion that (<! (timeout 0)) is "non-sensical code". In a single threaded environment, like the browser, there is sometimes a need to explicitly "hand back control" to the browser from within go loops (if that go loop is, for a time, hogging the CPU). At the moment (<! (timeout 0)) is the only way I know to do that. It could reworked to be (<! (yield-cpu)) in order to make the intent clearer, and that might be nice, but what I care about is having a way of yielding which doesn't always cost 4ms. I believe this need is genuine, and very "sensical"

Comment by Mike Thompson [ 06/Aug/15 11:06 PM ]

@Ghadi
Just to be clear ... you refer to this as a "bug" in double quotes. Almost as if it isn't a bug.

But for me this is a sufficiently real "bug" that I'll have to either work with a fork OR move away from using core.async entirely.

I need (<! (timeout 0)) to mean 0, or as close to it as possible. I need it to work as advertised. The current implementation does not deliver on 0 (or as close to it as possible), and instead delivers the same as (<! (timeout 4)) and 4ms is FOREVER compared to 0ms (or close to it). At 4ms each time around, a goloop can only iterate 250 times a second.

Anyone who has a goloop listening to the output of a bursty websocket will have this problem. They need to process like crazy (no 4ms delays) BUT also hand back control so the browser can do what it needs to, every now and again. All of this get exacerbated when a browser page loses "focus", goes into the background, and the js gets throttled, animation frames slow down, etc.

Comment by Patrick O'Brien [ 15/Aug/15 9:59 PM ]

In case it helps anyone who might be reading this, I have begun using the following as a temporary solution:

(defn yield []
  (let [ch (chan)]
    (goog.async.nextTick #(close! ch))
    ch))

(go
  ; Some code...
  (<! (yield))  ;; Instead of (<! (timeout 0))
  ...)
Comment by Mike Thompson [ 25/Aug/15 5:04 AM ]


That's nice! Certainly provides a placeholder solution.





Generated at Fri Aug 28 19:32:28 CDT 2015 using JIRA 4.4#649-r158309.