1
votes

I'm experimenting with core.async on Clojure and ClojureScript, to try and understand how merge works. In particular, whether merge makes any values put on input channels available to take immediately on the merged channel.

I have the following code:

(ns async-merge-example.core
  (:require
   #?(:clj [clojure.core.async :as async] :cljs [cljs.core.async :as async])
   [async-merge-example.exec :as exec]))

(defn async-fn-timeout
  [v]
  (async/go
    (async/<! (async/timeout (rand-int 5000)))
    v))

(defn async-fn-exec
  [v]
  (exec/exec "sh" "-c" (str "sleep " (rand-int 5) "; echo " v ";")))

(defn merge-and-print-results
  [seq async-fn]
  (let [chans (async/merge (map async-fn seq))]
    (async/go
      (while (when-let [v (async/<! chans)]
               (prn v)
               v)))))

When I try async-fn-timeout with a large-ish seq:

(merge-and-print-results (range 20) async-fn-timeout)

For both Clojure and ClojureScript I get the result I expect, as in, results start getting printed pretty much immediately, with the expected delays.

However, when I try async-fn-exec with the same seq:

(merge-and-print-results (range 20) async-fn-exec)

For ClojureScript, I get the result I expect, as in results start getting printed pretty much immediately, with the expected delays. However for Clojure even though the sh processes are executed concurrently (subject to the size of the core.async thread pool), the results appear to be initially delayed, then mostly printed all at once! I can make this difference more obvious by increasing the size of the seq e.g. (range 40)

Since the results for async-fn-timeout are as expected on both Clojure and ClojureScript, the finger is pointed at the differences between the Clojure and ClojureScript implementation for exec..

But I don't know why this difference would cause this issue?

Notes:

  • These observations were made in WSL on Windows 10
  • The source code for async-merge-example.exec is below
  • In exec, the implementation differs for Clojure and ClojureScript due to differences between Clojure/Java and ClojureScript/NodeJS.
(ns async-merge-example.exec
  (:require
   #?(:clj [clojure.core.async :as async] :cljs [cljs.core.async :as async])))

; cljs implementation based on https://gist.github.com/frankhenderson/d60471e64faec9e2158c

; clj implementation based on https://stackguides.com/questions/45292625/how-to-perform-non-blocking-reading-stdout-from-a-subprocess-in-clojure

#?(:cljs (def spawn (.-spawn (js/require "child_process"))))

#?(:cljs
   (defn exec-chan
     "spawns a child process for cmd with args. routes stdout, stderr, and
      the exit code to a channel. returns the channel immediately."
     [cmd args]
     (let [c (async/chan), p (spawn cmd (if args (clj->js args) (clj->js [])))]
       (.on (.-stdout p) "data"  #(async/put! c [:out  (str %)]))
       (.on (.-stderr p) "data"  #(async/put! c [:err  (str %)]))
       (.on p            "close" #(async/put! c [:exit (str %)]))
       c)))

#?(:clj
   (defn exec-chan
     "spawns a child process for cmd with args. routes stdout, stderr, and
      the exit code to a channel. returns the channel immediately."
     [cmd args]
     (let [c (async/chan)]
       (async/go
         (let [builder (ProcessBuilder. (into-array String (cons cmd (map str args))))
               process (.start builder)]
           (with-open [reader (clojure.java.io/reader (.getInputStream process))
                       err-reader (clojure.java.io/reader (.getErrorStream process))]
             (loop []
               (let [line (.readLine ^java.io.BufferedReader reader)
                     err (.readLine ^java.io.BufferedReader err-reader)]
                 (if (or line err)
                   (do (when line (async/>! c [:out line]))
                       (when err (async/>! c [:err err]))
                       (recur))
                   (do
                     (.waitFor process)
                     (async/>! c [:exit (.exitValue process)]))))))))
       c)))

(defn exec
  "executes cmd with args. returns a channel immediately which
   will eventually receive a result map of 
   {:out [stdout-lines] :err [stderr-lines] :exit [exit-code]}"
  [cmd & args]
  (let [c (exec-chan cmd args)]
    (async/go (loop [output (async/<! c) result {}]
                (if (= :exit (first output))
                  (assoc result :exit (second output))
                  (recur (async/<! c) (update result (first output) #(conj (or % []) (second output)))))))))
1

1 Answers

2
votes

Your Clojure implementation uses blocking IO in a single thread. You are first reading from stdout and then stderr in a loop. Both do a blocking readLine so they will only return once they actually finished reading a line. So unless your process creates the same amount of output to stdout and stderr one stream will end up blocking the other one.

Once the process is finished the readLine will no longer block and just return nil once the buffer is empty. So the loop just finishes reading the buffered output and then finally completes explaining the "all at once" messages.

You'll probably want to start a second thread that deals reading from stderr.

node does not do blocking IO so everything happens async by default and one stream doesn't block the other.