I'm having difficulty understanding what I thought was a pretty simple concept in Clojure's core.async library. I'm essentially creating two channels connected by a pipeline, where the output channel is created by calling take on the input channel.

From my understanding, the purpose of take is to limit the number of items that a channel will receive before it closes itself (if the input channel has not already closed by this time). However, the code examples I've been playing with aren't producing the results I expected.

Take the following code for example:

(require '[clojure.core.async :as async
           :refer [chan go go-loop >! <! pipeline]])

(def in (chan 1))
(def out (async/take 5 in 1))

(doseq [i (range 10)]
  (go (>! in i)))

(pipeline 4 out (filter even?) in)

(go-loop []
  (when-some [val (<! out)]
    (println val)
    (recur)))

I expected the pipeline to filter out the odd numbers and pass only even numbers to the out channel, which would close once it had received 5 even numbers. Instead, both odd and even numbers were printed to the REPL, something like the following:

2 7 4 0 8 6

At this point the out channel still hadn't closed and running the doseq a second time would print some other value before finally closing.

I'm perplexed as to what's going on here. It works like a charm when using take without the pipeline, and it also works when using the pipeline without take, but combining the two is a whole different story. Am I missing something obvious? Apologies if this is a simple mistake; this is my first (albeit naive) attempt at using core.async.

1 Answer

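You have placed take and pipeline in competition. Both of them take items from in and put them on out, but only the pipeline applies the filter, so any item that take grabs first reaches out unfiltered. That is why odd numbers show up. Replace the definition of out: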

(def out (async/chan 3))

for example, and you get the expected result:

0
2
4
6
8
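
For reference, here is a minimal self-contained sketch of that fix. It assumes a single producer that closes in when it is done (the question's version spawns one go block per item and never closes in), so that the ordering is deterministic and the pipeline closes out once the input is exhausted. The name result is only there for illustration.

```clojure
(require '[clojure.core.async :as async
           :refer [chan go <!! >! close! pipeline]])

(def in (chan 1))
(def out (chan 3))   ; plain channel: only the pipeline writes to it

;; The pipeline is the sole consumer of `in`, so every item passes the filter.
(pipeline 4 out (filter even?) in)

;; Assumption: one producer, which closes `in` when done.
;; By default, pipeline closes `out` when `in` closes.
(go
  (doseq [i (range 10)]
    (>! in i))
  (close! in))

;; Collect everything from `out` until it closes.
(def result (<!! (async/into [] out)))
(println result)   ; prints [0 2 4 6 8]
```

Note that nothing here limits out to a fixed number of items; it simply receives every even number until in is closed.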

If you really want to use async/take, you could do it like so:

(def in-ch (async/chan 1))
(def evens-ch (async/chan 3))
(pipeline 4 evens-ch (filter even?) in-ch)  ; only the pipeline reads in-ch
(def out-ch (async/take 3 evens-ch))        ; take reads the filtered channel

(defn run []
  (go
    (doseq [i (range 10)]
      (>! in-ch i)))
  (go (loop []
        (when-some [val (<! out-ch)]
          (println val)
          (recur)))))

Calling (run) then prints:

0
2
4
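
To see why the original version misbehaved, it helps to look at async/take in isolation: it starts its own consumer on the channel it is given and copies items to the channel it returns. The sketch below is separate from the code above; the buffer sizes are an assumption chosen only so that nothing blocks, and result is an illustrative name.

```clojure
(require '[clojure.core.async :as async :refer [chan <!! >!! close!]])

(def in (chan 10))
(def out (async/take 5 in 10))   ; starts a go block that reads from `in`

(doseq [i (range 10)]
  (>!! in i))
(close! in)

;; No pipeline and no filter are involved: `take` alone moved the first
;; five items from `in` to `out` and then closed `out`.
(def result (<!! (async/into [] out)))
(println result)   ; prints [0 1 2 3 4]
```

With a pipeline also reading from the same in, each item goes to whichever consumer gets it first, which matches the mix of odd and even numbers seen in the question.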