Because a new (fill) action is created every time an item is (drain)ed, the LBQ gets loaded up with piles of EOS markers. If the output seq is consumed faster than the input sequence can produce items, then the original agent action does all of the filling in a single action, while the agent's queue continues to back up with (fill) actions. Eventually the entire sequence is consumed, and each queued action does a blocking .put of an EOS marker; once the queue has filled up, one of these actions blocks forever, since nobody will ever .take from the queue again.
The attached patch does two things:
- Make sure to put exactly one EOS marker in the stream, by setting the agent's state to nil if and only if a .offer of EOS has been accepted.
- Replace all occurrences of .put with .offer (and appropriate checking of the return value). This is necessary because if .put ever blocks, it's possible for the system to remain in that state forever (say, because the client stops consuming the queue), thus leading to the same leaked-thread scenario.
The diff is a little bit inconvenient to read because of indentation changes; https://gist.github.com/b7ecd4395a0d3d473de6 is an ignore-whitespace view of the patch for convenience.