When a channel is closed, pending takes are committed, but pending puts are not, stranding handlers
Description
Environment
org.clojure:clojure:1.7.0-alpha5
org.clojure:core.async:0.1.346.0-17112a-alpha
Attachments
- 12 Aug 2017, 09:08 PM
Activity
Alex Miller January 8, 2022 at 9:58 PM
Declining this...
Per the `close!` docstring, "Logically closing happens after all puts have been delivered. Therefore, any
blocked or parked puts will remain blocked/parked until a taker releases them." Thus, we can't decline pending puts on close - they are logically ahead of the close and can be satisfied by a later take on the closed channel.
So, this is not a bug and is the intended behavior.
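A minimal sketch of the docstring's point, assuming core.async is on the classpath. It uses `put!` with a callback rather than a blocked thread so the ordering is deterministic; the names `ch`, `put-result`, `before-take`, `taken`, `after-take`, and `at-end` are only illustrative.

```clojure
(require '[clojure.core.async :as a])

(def ch (a/chan))           ; unbuffered, so the put below has to wait for a taker
(def put-result (promise))  ; receives whatever the put callback is called with

;; Enqueue the put first, then close: the put is logically ahead of the close.
(a/put! ch :hi (fn [ok?] (deliver put-result ok?)))
(a/close! ch)

;; The callback has not run yet: closing did not release the pending put.
(def before-take (realized? put-result))

;; A later take on the closed channel still receives the pending value...
(def taken (a/<!! ch))

;; ...which releases the put with true (the timeout only guards the sketch).
(def after-take (deref put-result 1000 :timeout))

;; Only now is the channel drained; further takes return nil.
(def at-end (a/<!! ch))
```

Here `before-take` should be false, `taken` should be `:hi`, `after-take` should be true, and `at-end` should be nil.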
andrewhr August 12, 2017 at 9:08 PM
As far as I can tell, the `close!` implementation walks all pending `take`s and runs their respective callbacks to release them.
My approach does the same for `put`s, so that any previously parked `put` is guaranteed to be released, as demonstrated by the general case of this error.
In the same patch, I added a single test asserting this behavior and ported the same code to the ClojureScript implementation. I could split the two implementations into separate patches, but I judged that keeping everything together would ease screening.
import June 8, 2015 at 12:22 AM
Comment made by: klauswuestefeld
A more general case:
Doing this:
(go (println (>! c 42)))
and then closing c will cause the >! to block, instead of returning false.
If c is closed before that, >! will return false.
Is this race condition the intended behavior?
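The two orderings can be compared side by side with a small sketch, assuming core.async is on the classpath. `put-outcome` is a hypothetical helper, not part of core.async, and it uses `put!` with a callback instead of a `go` block so that the order of put and close is fixed rather than left to thread scheduling.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical helper: put 42 with a callback, closing the channel either
;; before or after the put, and report what the callback received within
;; 200 ms, or :still-pending if it never ran.
(defn put-outcome [ch close-first?]
  (let [result (promise)]
    (when close-first? (a/close! ch))
    (a/put! ch 42 (fn [ok?] (deliver result ok?)))
    (when-not close-first? (a/close! ch))
    (deref result 200 :still-pending)))

;; Closed before the put: the put is rejected right away.
(def closed-then-put (put-outcome (a/chan) true))

;; Put before the close: the put stays pending, since no taker ever arrives.
(def put-then-closed (put-outcome (a/chan) false))
```

Here `closed-then-put` should be false and `put-then-closed` should be `:still-pending`, matching the two cases described above.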
When a channel is closed, there may be both pending takes and/or pending puts. Pending takes are all committed, either with values from the buffer or nil if empty. Pending puts are left as is. Inactive puts may be removed during a later cleanup (if triggered), but active puts on a closed channel will be stranded when they could be declined.
Pending take is delivered:
(require '[clojure.core.async :as a])
(def c (a/chan))
(future (println "take" (a/<!! c)))  ;; take
(a/close! c)
;;prints: take nil
Pending put is not:
(def c (a/chan))
(future (println "put" (a/>!! c :hi)))  ;; put :hi
(a/close! c)
;; NOTE: put does not complete or print here, it's blocked
(a/<!! c)
;;prints: put true
Proposed: When a channel is closed, we currently commit all pending take handlers - some of those may satisfy pending puts. If the takes run out of buffer vals (commit with nil val), then we can also close pending puts (the “end” of the channel has been reached and no new puts will ever succeed). At that point, commit and release all remaining put handlers.
Patch:
ASYNC-125.patch