I'm having difficulty understanding what I thought was a pretty simple concept in Clojure's async library. I'm essentially creating two channels with a pipeline, where the output channel is created using the take function of the input channel.
From my understanding, the purpose of take is to limit the number of items that a channel will receive before it closes itself (if the input channel has not already closed by this time). However, the code examples I've been playing with aren't producing the results I expected.
Take the following code for example:
(def in (chan 1))
(def out (async/take 5 in 1))
(doseq [i (range 10)]
(go (>! in i)))
(pipeline 4 out (filter even?) in)
(go-loop []
(when-some [val (<! out)]
(println val)
(recur))))
What I expected to happen was that the pipeline would filter out odd numbers, and only pass even numbers to the 'out' channel, when the out channel had received 5 even numbers it would close. However what I saw was both odd and even numbers printed to the REPL, something like the following:
2 7 4 0 8 6
At this point the out channel still hadn't closed and running the doseq a second time would print some other value before finally closing.
I'm incredibly perplexed as to what's going on here, it works like a charm when using take and not the pipeline and it also works when not using take but still using the pipeline, using the two in combination is a whole different story it seems. Am I missing something obvious here? Apologies if this is a simple mistake, this is my first (albeit naive) attempt at using core.async.