Concurrency and Parallelism in Clojure

I was pleased to speak at the Clojure Exchange recently (at least it was recent when I started writing this blog post). I suggested to Bruce that a talk on concurrency and parallelism would be good: since most of the books were published, reducers and core.async have appeared, giving us even more options in Clojure, and we have a new runtime in ClojureScript. (I also ranted that our books get it a bit wrong.)

The goals of the talk were:

  • Give people solid working definitions of concurrency and parallelism
  • Refresher on reference types (and that you don’t automatically get parallelism with them)
  • Show some easy ways to get parallelism in your programs
  • Demo a nice pattern serialising access to something non-threadsafe with an agent
  • Explain Reducers as a way of getting parallelism (and the idea that reduce/fold itself is no good for it)
  • Introduce CSP via core.async as a way of programming concurrently (that sometimes runs in parallel)
  • Point people at people way smarter than me, talking for a bit longer than I had about just one of the things I tried to cover

Slides are here, video here, but I will expand on a few ideas here too.

Concurrency is not parallelism

It has been said better by Rob Pike in this talk but the definition is:

Concurrency is the composition of independently executing ‘processes’

Parallelism is the simultaneous execution of (possibly related) computations

So:

Concurrency is about the way we structure programs

Parallelism is about the way they run

The nice thing about this definition is that it is then easy to find examples of programs that are concurrent but not parallel (running multiple threads on a single core), parallel but not concurrent (instruction-level parallelism, data parallelism), and even cases where concurrency and parallelism are both present but to different extents (running 50 threads multiplexed across 4 cores).

Wikipedia

Is correct

Joe Armstrong in Programming Erlang

C2 Wiki

I like this one as it brings out the two main ways of doing concurrency, either by message passing with no shared state or by managing access to the shared state.

I have the three main books on Clojure so took a look at how they introduced the idea

Clojure Programming

Both definitions assume shared state, though it captures the idea that parallel means running at the same time and that to get the most from your hardware you want to avoid coordination overhead.

Joy Of Clojure

I don’t like ‘roughly the same time’ here, and again it seems to presume shared state.

Programming Clojure

I think that saying parallel programs execute concurrently is wrong (replace it with ‘at the same time’ and I think it is right), and the first bullet misuses ‘concurrent’ at least once.

This is not to say it is a massive deal, or that it stops people understanding and using the features of Clojure the books go on to explain, just that until seeing Rob’s talk I don’t think I could have given a clear definition of what the terms mean.

Reference types

Again this was just a refresher and has probably been done better elsewhere.

All of the reference types hold a value and are updated by functions with the same signature

(<changer> reference f [args*])

All can be derefed with @reference or (deref reference)

The main differences between them are whether they are synchronous or asynchronous, and coordinated or uncoordinated.

  • Synchronisation - Does the operation block?
  • Coordination - Can multiple operations be combined in a transaction?
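
To make the shared signature concrete, here is a minimal sketch that applies the same update function through each changer. The names counter-atom, counter-ref and counter-agent are made up for illustration and are not from the talk.

```clojure
;; Illustrative names only: one reference of each type, all starting at 0.
(def counter-atom  (atom 0))
(def counter-ref   (ref 0))
(def counter-agent (agent 0))

;; Same shape every time: (<changer> reference f args*)
(swap! counter-atom + 10)          ; synchronous, uncoordinated
(dosync (alter counter-ref + 10))  ; synchronous, coordinated (needs a transaction)
(send counter-agent + 10)          ; asynchronous, may return before + has run

;; Block until the agent has processed everything sent so far from this thread.
(await counter-agent)

;; All three can be read the same way.
(def snapshot [@counter-atom @counter-ref (deref counter-agent)])
```

Only the agent needs the extra await, because send is the one changer here that does not block until the update has been applied.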

Atoms (Synchronous, Uncoordinated)

swap!

(swap! atom f)
(swap! atom f x)
(swap! atom f x y & args)

compare-and-set!

(compare-and-set! atom oldval newval)

reset!

(reset! atom newval)
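
As a rough illustration of the three operations, the sketch below hammers one atom from several threads and then uses compare-and-set! and reset!. The name hits and the thread and iteration counts are arbitrary choices for the example.

```clojure
(def hits (atom 0))

;; Eight futures each increment the atom 1000 times. swap! retries on
;; contention, so no increments are lost even though nothing is locked.
(let [workers (doall (for [_ (range 8)]
                       (future (dotimes [_ 1000] (swap! hits inc)))))]
  (doseq [w workers] @w))

(def total @hits)

;; compare-and-set! only succeeds if the current value matches oldval.
(def cas-hit  (compare-and-set! hits 8000 0)) ; value was 8000, so it is set to 0
(def cas-miss (compare-and-set! hits 8000 1)) ; value is now 0, so nothing changes

;; reset! ignores the current value entirely.
(reset! hits 42)
```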

Refs (Synchronous, Coordinated)

(Must be inside a dosync transaction)

ref-set

(ref-set ref val)

alter

(alter ref fun & args)

commute

(commute ref fun & args)
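
A small sketch of what coordination buys you: two refs changed together in one transaction, so no other thread can observe one changed without the other. The account refs and the transfer! function are hypothetical names for this example.

```clojure
(def account-a (ref 100))
(def account-b (ref 0))
(def transfer-count (ref 0))

(defn transfer!
  "Moves amount from one ref to another inside a single transaction."
  [from to amount]
  (dosync
    (alter from - amount)
    (alter to + amount)
    ;; commute is fine for the counter: increments can be applied in any order.
    (commute transfer-count inc)))

(transfer! account-a account-b 30)

;; Outside a transaction the changers refuse to run.
(def outside-result
  (try
    (alter account-a inc)
    (catch IllegalStateException _ :no-transaction)))
```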

Agents (Asynchronous, Uncoordinated)

send (executed on a bounded thread pool)

(send a f & args)

send-off (executed on an unbounded thread pool)

(send-off a f & args)
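
A minimal sketch of the asynchronous behaviour, assuming a made-up agent called totals and a made-up slow-add action: the dispatch returns straight away and the state changes later, on another thread.

```clojure
(def totals (agent 0))

(defn slow-add
  "An action that blocks for a while, so it is dispatched with send-off."
  [current n]
  (Thread/sleep 50)
  (+ current n))

(send-off totals slow-add 5)

;; Reading straight away is a race: this will usually still be the old value,
;; because the action has not finished yet.
(def seen-immediately @totals)

;; await blocks until the actions sent so far from this thread have run.
(await totals)
(def after-send-off @totals)

;; A quick CPU-only action can go on the fixed-size pool with send.
(send totals * 2)
(await totals)
(def after-send @totals)
```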

Using agents to serialise access to non-threadsafe resources

If we start multiple threads that each write to the console with println, we very quickly start to get overlapping writes.

(defn start-thread
  [f]
  (.start
   (Thread. ^Runnable f)))

(defn loop-print
  [n]
  (let [line (str n ":**********")]
    (println line)
    ;; rand returns a double, Thread/sleep expects a long
    (Thread/sleep (long (rand 5)))
    (recur n)))

(defn -main []
  (dotimes [n 50]
    (start-thread #(loop-print n))))

One way around it is to wrap the contested resource in an agent

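(require '[clojure.java.io :as io])

(defn write [^java.io.Writer w ^String content]
  (doto w
    (.write content)   ; doto already supplies w as the first argument
    (.write "\n")
    .flush))

(def console (agent *out*))

(def log-file (agent (io/writer "LOG" :append true)))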

(defn loop-print
  [n]
  (let [line (str n ":**********")
        ;; rand returns a double, Thread/sleep expects a long
        sleep-time (long (rand 5))]
    (Thread/sleep sleep-time)
    (send-off console write line)
    (when (= n 50)
      ;; We have a separate file log for the 50th thread
      (send-off log-file write (str "sleeping for " sleep-time)))
    (recur n)))

(defn -main []
  (dotimes [n 100]
    (start-thread #(loop-print n))))

Here the write function takes something writeable and a string, writes the string to it with a newline at the end and then (importantly) returns the writer (this is the behaviour of doto). That makes it suitable as an agent action: the value of the agent is always the writer, and because the agent serialises application of the functions sent to it, the writer is only ever written to by one function at a time.

The guidance on whether to use send or send-off is whether the function does any IO: send-off is for IO (blocking) operations and runs on an unbounded pool, whereas send is for CPU-bound operations and runs on a fixed-size pool. Either way the agent's queue of pending actions is unbounded, so you might eventually suffer memory issues like the one below (this is one of the reasons core.async and CSP are better, as they provide back-pressure and make you think up front about buffering and queue size).

Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.RuntimeException: Agent is failed, needs restart
    at clojure.lang.Util.runtimeException(Util.java:223)
    at clojure.lang.Agent.dispatch(Agent.java:238)
    at clojure.core$send_via.doInvoke(core.clj:1915)
    at clojure.lang.RestFn.applyTo(RestFn.java:146)
    at clojure.core$apply.invoke(core.clj:623)
    at clojure.core$send_off.doInvoke(core.clj:1937)
    at clojure.lang.RestFn.invoke(RestFn.java:442)
    at clojure_exchange.agents$loop_print.invoke(agents.clj:24)
    at clojure_exchange.agents$_main$fn__34.invoke(agents.clj:31)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:724)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded

Agents and STM

Thanks to Philip Potter for reminding me to mention that agents are integrated with STM. Any dispatches made in a transaction are held until it commits, and are discarded if it is retried or aborted, so you can use them to safely trigger side-effects from within transactions. See the docs for more.
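
A minimal sketch of that integration, using hypothetical names (balance, audit-log, deposit!): the send sits inside the dosync, but the agent only receives it once the transaction has committed.

```clojure
(def balance   (ref 0))
(def audit-log (agent []))

(defn deposit!
  "Updates the ref and records the new balance via the agent."
  [amount]
  (dosync
    (let [new-balance (alter balance + amount)]
      ;; Held until the transaction commits, dropped if this attempt is retried,
      ;; so each successful deposit is logged exactly once.
      (send audit-log conj new-balance)
      new-balance)))

(deposit! 10)
(deposit! 5)
(deposit! 1)

;; Wait for the logged actions before reading the agent.
(await audit-log)
```

This sketch does not force a retry, so it only shows the commit path; the discard-on-retry behaviour is the part that makes the pattern safe under contention.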

Delays and futures

> (defn slow-inc [n]
   (Thread/sleep 1000)
   (inc n))

Delays

  • runs code on deref, blocks
> (def d (delay (slow-inc 5)))
;; Returns immediately

> @d
;; Will take a second as this is when 'slow-inc' will run
6
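
One more property worth seeing in code: a delay runs its body at most once and caches the result. This is a small sketch with made-up names (runs, cached), using a counter to show how many times the body actually executed.

```clojure
(def runs (atom 0))

(def cached
  (delay
    (swap! runs inc)      ; count how often the body runs
    (Thread/sleep 100)
    :done))

(def realized-before (realized? cached)) ; nothing has run yet

(def first-deref  @cached)  ; runs the body, takes about 100ms
(def second-deref @cached)  ; returns the cached value straight away

(def realized-after (realized? cached))
```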

Futures

  • runs code in another thread (from a pool)
  • blocks on deref (till ready)
> (def f (future (slow-inc 5)))
;; Returns immediately as delay did but is already running slow-inc

> @f
;; Depends how quickly you can type
;; but if you took longer than a second it will be ready to return,
;; if not it will block till it is ready
6

So now we have a chance at some parallelism!

We can use future as a way to just kick off new threads, so we can use future where we had start-thread. We gain a thread pool that reuses idle threads, though note it is the same unbounded pool that send-off uses, not a bounded one.
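
A rough sketch of future standing in for start-thread, plus the timeout form of deref for when you do not want to block indefinitely. The names results, workers and slow are invented for the example.

```clojure
(def results (atom []))

;; Kick off five pieces of work without managing Thread objects ourselves.
(def workers
  (doall (for [n (range 5)]
           (future
             (swap! results conj n)
             n))))

;; Unlike a bare thread, each future also hands back its return value.
(def returned (mapv deref workers))

;; deref takes an optional timeout (ms) and a value to return if it expires.
(def slow  (future (Thread/sleep 500) :finished))
(def early (deref slow 10 :still-running)) ; almost certainly :still-running
(def late  @slow)                          ; blocks until the future is done
```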

pmap and friends

;; Make a sequence of futures
> (def futures (doall (map #(future (slow-inc %)) [1 2 3 4 5])))
;; Returns immediately (we have to use doall to force evaluation as map is lazy)

;; Now to deref them
> (map deref futures)
(2 3 4 5 6)
;; Again this should return pretty rapidly, taking a second at most

pmap

Here is a version of a function called pmap that captures that idea

(defn pmap [f col]
  (let [futures (doall (map #(future (f %)) col))]
    (map deref futures)))

So we can do

> (pmap slow-inc [1 2 3 4 5])

and it will take 1s or thereabouts.
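
To check that claim, here is a sketch that times the sequential and future-based versions side by side. It renames the function to simple-pmap so it does not shadow clojure.core/pmap, and uses a shorter, parameterised sleep (slow-inc-ms); both names and the elapsed-ms helper are made up for this example.

```clojure
(defn slow-inc-ms
  "Like slow-inc, but with a configurable sleep so the example runs quickly."
  [ms n]
  (Thread/sleep (long ms))
  (inc n))

(defn simple-pmap
  "The naive version from above: start every future, then deref them all."
  [f coll]
  (let [futures (doall (map #(future (f %)) coll))]
    (map deref futures)))

(defn elapsed-ms
  "Runs thunk and returns the wall-clock time it took, in milliseconds."
  [thunk]
  (let [start (System/nanoTime)]
    (thunk)
    (/ (- (System/nanoTime) start) 1e6)))

(def work (partial slow-inc-ms 200))

;; doall forces the lazy results so the timing covers the real work.
(def sequential-ms (elapsed-ms #(doall (map work [1 2 3 4 5]))))
(def parallel-ms   (elapsed-ms #(doall (simple-pmap work [1 2 3 4 5]))))
```

On a typical machine sequential-ms comes out around five times the sleep and parallel-ms close to a single sleep, but the exact numbers depend on the scheduler.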

The actual pmap looks like this. Notice the warning that f needs to be expensive enough to justify the overhead.

(defn pmap
  "Like map, except f is applied in parallel. Semi-lazy in that the
  parallel computation stays ahead of the consumption, but doesn't
  realize the entire result unless required. Only useful for
  computationally intensive functions where the time of f dominates
  the coordination overhead."
  {:added "1.0"
   :static true}
  ([f coll]
   (let [n (+ 2 (.. Runtime getRuntime availableProcessors))
         rets (map #(future (f %)) coll)
         step (fn step [[x & xs :as vs] fs]
                (lazy-seq
                 (if-let [s (seq fs)]
                   (cons (deref x) (step xs (rest s)))
                   (map deref vs))))]
     (step rets (drop n rets))))
  ([f coll & colls]
   (let [step (fn step [cs]
                (lazy-seq
                 (let [ss (map seq cs)]
                   (when (every? identity ss)
                     (cons (map first ss) (step (map rest ss)))))))]
     (pmap #(apply f %) (step (cons coll colls))))))

It also has a few friends

(defn pcalls
  "Executes the no-arg fns in parallel, returning a lazy sequence of
  their values"
  {:added "1.0"
   :static true}
  [& fns] (pmap #(%) fns))

> (pcalls function-1 function-2 ...)

(defmacro pvalues
  "Returns a lazy sequence of the values of the exprs, which are
  evaluated in parallel"
  {:added "1.0"
   :static true}
  [& exprs]
  `(pcalls ~@(map #(list `fn [] %) exprs)))

> (pvalues (expensive-calc-1) (expensive-calc-2))
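
A concrete sketch of both friends, using a made-up slow-square function in place of the placeholder names above. Results come back in argument order, regardless of which computation finishes first.

```clojure
(defn slow-square [n]
  (Thread/sleep 100)
  (* n n))

;; pcalls takes zero-argument functions...
(def from-pcalls
  (doall (pcalls #(slow-square 2) #(slow-square 3))))

;; ...while pvalues takes the expressions themselves and wraps them for you.
(def from-pvalues
  (doall (pvalues (slow-square 4) (slow-square 5))))
```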

Waiting for futures

You need to call shutdown-agents to exit when the agents/futures are done.

(taken from ClojureDocs)

;; Note: If you leave out the call to (shutdown-agents), the program will on
;; most (all?) OS/JVM combinations "hang" for 1 minute before the process exits.
;; It is simply waiting for background threads, created by the future call, to
;; be shut down.  shutdown-agents will shut them down immediately, or
;; (System/exit <exit-status>) will exit immediately without waiting for them
;; to shut down.

;; This wait occurs even if you use futures indirectly through some other Clojure
;; functions that use them internally, such as pmap or clojure.java.shell/sh
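
In practice this usually means calling shutdown-agents at the very end of -main. A minimal sketch, assuming a hypothetical run-jobs function that uses pmap (and therefore futures) internally:

```clojure
(defn run-jobs
  "Stand-in for real work that uses futures under the hood via pmap."
  []
  (reduce + (pmap inc [1 2 3])))

(defn -main [& _]
  (try
    (println "total:" (run-jobs))
    (finally
      ;; Without this the JVM can linger until the idle pool threads time out.
      ;; After this call no further agent actions or futures can be started.
      (shutdown-agents))))
```

Putting the call in a finally block means the process still exits promptly if the work throws.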

Code

I have just pushed the bits I was playing with up to GitHub.

Next time

I hope that was interesting. I am going to stop for now as it has already taken 2 months to write this. I have already described CSP and core.async here and here, so I won’t cover that again, but I will try to describe reducers (and why reduce is considered harmful). To be honest, though, you would probably be better off just looking at my references from the talk (especially the Guy Steele talk).