Cause: For an unbuffered channel, a take will try to pair up with a put, in which case the two are satisfied directly without ever involving the buffer (since there isn't one). Because of this, add! is never called and the xf is never applied. The example below uses an expanding transducer, but the problem exists for any transducer and could perhaps happen even on buffered channels?
Approach: One option would be to not satisfy the put unless there are enough pending takes to receive all of the transduced values, which seems consistent with the semantics of an unbuffered channel.
Found this while testing some stuff on ASYNC-124.
(require '[clojure.core.async :as a :refer (>!! <!! chan onto-chan)])

;; Buffered channel: the transducer is applied.
(def c (chan 1 (mapcat #(range %))))
(onto-chan c [1 2 3])
(<!! (a/reduce conj [] c))
;; [0 0 1 0 1 2]

;; Unbuffered channel: the transducer is not applied.
(def c (chan 0 (mapcat #(range %))))
(onto-chan c [1 2 3])
(<!! (a/reduce conj [] c))
;; [1 2 3]
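For comparison, here is a minimal sketch of what the transducer from the example produces when applied outside of any channel, using only clojure.core. The name expected-output is introduced here for illustration and is not part of the original report. A channel that applies the xf correctly would presumably yield these same values regardless of its buffer size.

```clojure
;; Apply the transducer from the example directly to the input collection,
;; with no channel involved.
(def expected-output
  (into [] (mapcat #(range %)) [1 2 3]))

expected-output
;; => [0 0 1 0 1 2]
```

The buffered case matches this result, while the unbuffered case returns the untransformed input.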