3
votes

I'm sort of a newbie to Clojure. I have multiple threads trying to write to an output stream and If I'm not mistaken sockets and their streams are not thread safe meaning bits can be mixed up if I write to them simultaneously. One of the main benefits of clojure is inbuilt concurrency handling of race conditions. How can I utilize this for my scenario?

I tried looking into atoms, refs and so on. I initially thought declaring the output stream as an atom would work but I'm not too sure, as it seems it avoids changing the atom state simultaneously (using swap!) however i think you can dereference an atom from multiple threads meaning multiple threads will deref the atom holding my output stream and write to it concurrently.

Any advise will be most helpful.

thanks in advance

(defn send-my-data [output data-bytes]
  (try 
    (.write output)
    (.flush output)
    (catch Exception excp
       (println (format "issue %s" (.printStackTrace excp))))

Now all my threads call this function anytime they want to write data to the output stream

3

3 Answers

5
votes

Agents are often considered the correct tool for this sort of task. They take a queue of tasks to run on their internal state and run them in the order they where received. They also play nicely with the rest of Clojure's STM. For instance messages sent to an agent form within a transaction are sent exactly once and only when the transaction commits.

user> (let [output-agent (agent "")] 
        (dotimes [x 10] 
          (send output-agent (fn [_] (println "hello" x)))))
nil
hello 0
hello 1
hello 2
hello 3
hello 4
hello 5
hello 6
hello 7
hello 8
hello 9

In this example the action to be taken is an anonymous function that ignores it's input and just prints something.

1
votes

You can use locking if you need to ensure that no other thread is using an object (and want to wait at that point in your code and not do anything in that particular thread until that object is unlocked so you can lock it).

user> (dotimes [i 10] (future (println \h \e \l \l \o)))
hhh h e
nil
 eh  le  l lo 
h e lh he  e ll h  el  ll  lo 
e  e l l oh
l l oo
l  ol

l lo o

 e

o
 l l o

user> (dotimes [i 10] (future (locking *out* (println \h \e \l \l \o))))
h
nil
 e l l o
h e l l o
h e l l o
h e l l o
h e l l o
h e l l o
h e l l o
h e l l o
h e l l o
h e l l o

user> 
0
votes

I eventually implemented the agent method as I thought it was more idiomatic and for some other benefits of agents.

(let [wtr (agent (.getOutputStream mysocket) "agent.log")]
    (defn log [msg]
      (letfn [(write [out msg]
             (.write out msg)
             (.flush out)
                out)]
      (send-off wtr write msg)))
  (defn close []
        (send wtr #(.close %))))

Adapted from here

Remember to return the output stream as agents take on the returned value. A common mistake.

Thank you Arthur Ulfeldt