core.async

Race condition when closing mults

Details

  • Type: Defect
  • Status: Open
  • Priority: Major
  • Resolution: Unresolved
  • Affects Version/s: None
  • Fix Version/s: None
  • Component/s: None
  • Labels:
  • Approval:
    Triaged

Description

When a mult is tapped at around the same time as the source channel is closed, the tapped channel may not be closed.

(require '[clojure.core.async :refer (chan mult tap close!)]
         '[clojure.core.async.impl.protocols :as impl])
(let [s (chan)
      m (mult s)
      c (chan)]
  (tap m c)
  (close! s)
  (impl/closed? c))

The above code will sometimes return true, and sometimes return false.

Cause: When the source channel closes, the go-loop in the mult function dereferences cs once and closes only the channels in that snapshot:

(if (nil? val)
  (doseq [[c close?] @cs]
    (when close? (close! c)))

Any channels tapped after cs is dereferenced will not be closed.
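
The snapshot problem can be modelled without any channels at all. The sketch below is only an illustration: the "channels" are plain keywords, closing one means adding it to a set, and the names cs-model and closed are hypothetical rather than part of core.async.

```clojure
;; Illustrative model only, not core.async code.
(def cs-model (atom {:a true}))   ; tapped "channel" -> close? flag
(def closed   (atom #{}))         ; "channels" that have been closed

(let [snapshot @cs-model]                 ; the deref that happens on source close
  (swap! cs-model assoc :late true)       ; a tap that arrives after the deref
  (doseq [[c close?] snapshot]            ; only the snapshot is iterated
    (when close? (swap! closed conj c))))

@closed   ; contains :a but not :late
```

The late tap is registered in cs-model, but nothing ever revisits it, which is the behaviour described above.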

Approach: One possible solution is to always close channels that are tapped to an already-closed source, for example:

(let [s (chan)
      m (mult s)
      c (chan)]
  (close! s)
  (tap m c))  ;; will always close c

This could be achieved by adding a flag to the cs atom to denote whether the mult is open or closed. If it's closed, any tapped channel is closed automatically.
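
As a rough sketch of that idea, the state can be kept as a pair [open? taps] inside a single atom so that registering a tap and reading the open flag happen in one atomic step. The helper names add-tap and close-mult are hypothetical, and keywords stand in for channels so the example runs with clojure.core alone.

```clojure
;; State is [open? taps]; taps maps a "channel" to its close? flag.
;; add-tap and close-mult are hypothetical names used for illustration.
(defn add-tap [[open? taps] ch close?]
  ;; a tap is only recorded while the mult is still open
  [open? (if open? (assoc taps ch close?) taps)])

(defn close-mult [[_ taps]]
  ;; flip the flag; the existing taps are returned so they can be closed
  [false taps])

(def state (atom [true {}]))

(swap! state add-tap :a true)   ; accepted while open
(swap! state close-mult)        ; source channel has closed

(def late-result
  (let [[open? taps] (swap! state add-tap :late true)]
    ;; open? is false here, so the caller would close :late itself
    [open? (contains? taps :late)]))
```

Because the flag and the tap map change together in one swap!, a tap either lands in the map before the close is recorded or sees open? as false, so there is no window in which it is silently dropped.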

Activity

James Reeves added a comment -

For reference, below is the custom fix for mult I'm using:

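;; Written to replace mult inside clojure.core.async, where Mux, Mult,
;; chan, put!, close!, <! and go-loop are already in scope.
(defn mult [ch]
  (let [;; state is [open? taps]; taps maps each tapped channel to its close? flag
        state (atom [true {}])
        m (reify
            Mux
            (muxch* [_] ch)
            Mult
            (tap* [_ ch close?]
              ;; register the tap only while the mult is still open
              (let [add-ch    (fn [[o? cs]] [o? (if o? (assoc cs ch close?) cs)])
                    [open? _] (swap! state add-ch)]
                ;; a tap on an already-closed mult is closed immediately
                (when-not open? (close! ch))
                nil))
            (untap* [_ ch]
              (swap! state (fn [[open? cs]] [open? (dissoc cs ch)]))
              nil)
            (untap-all* [_]
              (swap! state (fn [[open? _]] [open? {}]))
              nil))
        dchan (chan 1)
        dctr (atom nil)
        ;; counts down pending puts and signals dchan when all have completed
        done (fn [_] (when (zero? (swap! dctr dec))
                       (put! dchan true)))]
    (go-loop []
      (let [val (<! ch)]
        (if (nil? val)
          ;; source closed: mark the mult closed and read the taps atomically
          (let [[_ cs] (swap! state (fn [[_ cs]] [false cs]))]
            (doseq [[c close?] cs]
              (when close? (close! c))))
          (let [chs (keys (second @state))]
            (reset! dctr (count chs))
            (doseq [c chs]
              ;; put! returns false for a closed tap, so drop it from the mult
              (when-not (put! c val done)
                (swap! dctr dec)
                (untap* m c)))
            ;; wait until every tap has accepted the value
            (when (seq chs)
              (<! dchan))
            (recur)))))
    m))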
David Nolen added a comment -

Is this also fixed in master? Thanks.

Ghadi Shayban added a comment -

I understand the scenario, but honestly I'm not sure this is a bug in mult or the usage. A channel shouldn't be expected to always yield a take. The consumer of the "late tap" can guard against it with alts or some other mechanism, and also you can enforce a no-late-taps through a policy on the "production" side of things.

Rich Hickey can you weigh in?

James Reeves added a comment - edited

The "tap" function currently has an explicit "close?" flag, and if a tapped channel isn't guaranteed to close when the source channel closes, that argument probably shouldn't exist. Also, if auto-closing taps is taken out, should we remove the "close?" argument on "sub" as well?

Ghadi Shayban added a comment -

It's more than respecting the flag. Related to the close behavior, channels can tap and untap without receiving anything while the mult process happily distributes a value to another set of channels (like the ABA problem). Could also make it an error to tap after the close is distributed to the last deref'ed set of channels. That is different than the familiar permanent nil receive, but mults already differ from simple channels.

