2
votes

I'm using core.async in ClojureScript to avoid Node.js callbacks. The problem is that I'm hitting the limit of 1024 pending puts on a single channel.

To avoid it, I would need to send all messages to the channel from inside the same go block. But this is not really possible in core.async, because the go macro does not rewrite code across function boundaries: a >! inside an anonymous function is no longer inside the go block. So I can't do this:

(go
  (. socket on "data" #(>! chan %)))

So, is there a way to get around this limitation?

2
I just added an appropriate language hint. When tagging with both clojurescript and node.js or javascript please use the hint to ensure the syntax highlighter works correctly. - Jared Smith

2 Answers

0
votes

I am not sure I understand why it has to be in the same go block. Normally, you would just do:

(. socket on "data" #(go (put! chan %)))

and then process the channel in another go block.
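A minimal sketch of that split, assuming `socket` is any Node.js object that emits "data" events. The names `consume-socket` and `handle` are hypothetical, chosen only for illustration. Note that put! is already asynchronous, so this sketch calls it directly from the callback without wrapping it in go:

```clojure
(ns example.socket-consumer
  (:require [cljs.core.async :as async :refer [<!]])
  (:require-macros [cljs.core.async.macros :refer [go-loop]]))

;; Hypothetical helper: registers a "data" listener on `socket` and
;; calls `handle` (a function of one message) for each message received.
(defn consume-socket [socket handle]
  (let [ch (async/chan)]
    ;; put! can be called from a plain callback, outside any go block.
    (.on socket "data" #(async/put! ch %))
    ;; A separate go-loop takes messages off the channel one at a time.
    (go-loop []
      (when-some [msg (<! ch)]
        (handle msg)
        (recur)))
    ch))
```

This only moves the work off the callback. If events arrive faster than the go-loop can take them, puts still queue up on the channel.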

0
votes

To reproduce the error, we can fake a callback registration:

(let [callback (atom nil)
      on-data (fn [fun]
                (reset! callback fun))
      chan (async/chan)]
  ;; body: the snippets below use on-data and chan
  )

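If we register a callback with (on-data #(async/go (async/put! chan %))), every call queues another pending put on chan, so a burst of events that nothing consumes fast enough blows the 1024 limit. Also, because each put happens inside its own async/go, the messages can end up in the channel out of order.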
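The limit itself can be seen with a short sketch like the one below. It assumes a default build where asserts are enabled (with `:elide-asserts true` the check may not fire), and `try-put!` is a hypothetical helper introduced only for this example:

```clojure
(ns example.pending-puts
  (:require [cljs.core.async :as async]))

(defn try-put!
  "Returns :ok if put! was accepted, or the error message if it threw."
  [ch v]
  (try
    (async/put! ch v)
    :ok
    (catch :default e
      (.-message e))))

;; Unbuffered channel with no consumer: every put stays pending.
(def ch (async/chan))

;; Attempt one more put than the limit of 1024 pending puts.
(def results
  (mapv #(try-put! ch %) (range 1025)))

;; How many puts were accepted, and what happened to the last one.
(def accepted (count (filter #{:ok} results)))
(def last-result (last results))
```

With asserts on, `accepted` should come out as 1024 and `last-result` should hold the "No more than 1024 pending puts" message rather than `:ok`.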

The only fix I found is to keep an infinite lazy list of promise-chans inside an atom. Every callback takes the first element, puts its message on it, and removes that element from the list. Each promise-chan accepts its single value immediately, so no puts pile up on one channel. A doseq inside a go block then walks the same list in order and publishes the messages to chan for us:

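(let [inf-list (atom (map (fn [_] (async/promise-chan)) (range)))]
  ;; Capture the head of the list before any callback advances the atom,
  ;; so the go block and the callbacks walk the same promise-chans in order.
  (let [lst @inf-list]
    (async/go
     (doseq [c lst]
       (async/>! chan (async/<! c)))))

  ;; Each callback fills the next promise-chan, then advances the atom.
  (on-data #(do
              (async/put! (first @inf-list) %)
              (swap! inf-list rest))))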
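For reference, here is the same idea assembled into one self-contained sketch. The helper name `ordered-handler` and the `out` channel are hypothetical, and the "data" events are simulated by calling the handler directly:

```clojure
(ns example.ordered-callbacks
  (:require [cljs.core.async :as async :refer [<! >!]])
  (:require-macros [cljs.core.async.macros :refer [go go-loop]]))

;; Hypothetical helper: returns a callback that forwards each message
;; it receives to `out`, in the order the callback was called.
(defn ordered-handler [out]
  (let [inf-list (atom (map (fn [_] (async/promise-chan)) (range)))
        lst      @inf-list]
    ;; Forwarder: waits on each promise-chan in turn and relays its value.
    (go
      (doseq [c lst]
        (>! out (<! c))))
    (fn [msg]
      (async/put! (first @inf-list) msg)
      (swap! inf-list rest)
      nil)))                       ; avoid returning the infinite seq

(def out (async/chan))
(def handler (ordered-handler out))

;; Simulate three events firing back to back.
(doseq [msg ["a" "b" "c"]]
  (handler msg))

;; Consumer: log messages as they come out of `out`.
(go-loop []
  (when-some [msg (<! out)]
    (js/console.log msg)
    (recur)))
```

Because the forwarder performs one >! at a time, `out` never holds more than one pending put from this code, and the consumer is expected to see the messages in call order.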