I'm experimenting with core.async
on Clojure and ClojureScript, to try and understand how merge
works. In particular, whether merge
makes any values put on input channels available to take immediately on the merged channel.
I have the following code:
(ns async-merge-example.core
(:require
#?(:clj [clojure.core.async :as async] :cljs [cljs.core.async :as async])
[async-merge-example.exec :as exec]))
(defn async-fn-timeout
[v]
(async/go
(async/<! (async/timeout (rand-int 5000)))
v))
(defn async-fn-exec
[v]
(exec/exec "sh" "-c" (str "sleep " (rand-int 5) "; echo " v ";")))
(defn merge-and-print-results
[seq async-fn]
(let [chans (async/merge (map async-fn seq))]
(async/go
(while (when-let [v (async/<! chans)]
(prn v)
v)))))
When I try async-fn-timeout
with a large-ish seq
:
(merge-and-print-results (range 20) async-fn-timeout)
For both Clojure and ClojureScript I get the result I expect, as in, results start getting printed pretty much immediately, with the expected delays.
However, when I try async-fn-exec
with the same seq
:
(merge-and-print-results (range 20) async-fn-exec)
For ClojureScript, I get the result I expect, as in results start getting printed pretty much immediately, with the expected delays. However for Clojure even though the sh
processes are executed concurrently (subject to the size of the core.async
thread pool), the results appear to be initially delayed, then mostly printed all at once! I can make this difference more obvious by increasing the size of the seq e.g. (range 40)
Since the results for async-fn-timeout
are as expected on both Clojure and ClojureScript, the finger is pointed at the differences between the Clojure and ClojureScript implementation for exec
..
But I don't know why this difference would cause this issue?
Notes:
- These observations were made in WSL on Windows 10
- The source code for
async-merge-example.exec
is below - In
exec
, the implementation differs for Clojure and ClojureScript due to differences between Clojure/Java and ClojureScript/NodeJS.
(ns async-merge-example.exec
(:require
#?(:clj [clojure.core.async :as async] :cljs [cljs.core.async :as async])))
; cljs implementation based on https://gist.github.com/frankhenderson/d60471e64faec9e2158c
; clj implementation based on https://stackguides.com/questions/45292625/how-to-perform-non-blocking-reading-stdout-from-a-subprocess-in-clojure
#?(:cljs (def spawn (.-spawn (js/require "child_process"))))
#?(:cljs
(defn exec-chan
"spawns a child process for cmd with args. routes stdout, stderr, and
the exit code to a channel. returns the channel immediately."
[cmd args]
(let [c (async/chan), p (spawn cmd (if args (clj->js args) (clj->js [])))]
(.on (.-stdout p) "data" #(async/put! c [:out (str %)]))
(.on (.-stderr p) "data" #(async/put! c [:err (str %)]))
(.on p "close" #(async/put! c [:exit (str %)]))
c)))
#?(:clj
(defn exec-chan
"spawns a child process for cmd with args. routes stdout, stderr, and
the exit code to a channel. returns the channel immediately."
[cmd args]
(let [c (async/chan)]
(async/go
(let [builder (ProcessBuilder. (into-array String (cons cmd (map str args))))
process (.start builder)]
(with-open [reader (clojure.java.io/reader (.getInputStream process))
err-reader (clojure.java.io/reader (.getErrorStream process))]
(loop []
(let [line (.readLine ^java.io.BufferedReader reader)
err (.readLine ^java.io.BufferedReader err-reader)]
(if (or line err)
(do (when line (async/>! c [:out line]))
(when err (async/>! c [:err err]))
(recur))
(do
(.waitFor process)
(async/>! c [:exit (.exitValue process)]))))))))
c)))
(defn exec
"executes cmd with args. returns a channel immediately which
will eventually receive a result map of
{:out [stdout-lines] :err [stderr-lines] :exit [exit-code]}"
[cmd & args]
(let [c (exec-chan cmd args)]
(async/go (loop [output (async/<! c) result {}]
(if (= :exit (first output))
(assoc result :exit (second output))
(recur (async/<! c) (update result (first output) #(conj (or % []) (second output)))))))))