Monthly Archives: December 2014

Quick Overview: State, Transactions and Concurrency Primitives in Clojure

“What was the difference between delays and futures again…?” If you’ve ever found yourself asking that, you are in the right place! In this blog post I will give a short overview of the language constructs that Clojure offers for dealing with mutable state and concurrency. It is not meant to be exhaustive; my main goal was to keep it short and concise.

Vars

To get a complete overview let’s start with vars:

(def somevar "some content")

Vars hold mutable state and belong to a namespace. Vars that are declared dynamic can be rebound locally on a per-thread basis. I wrote about this in detail here. Changing a var’s root binding is not a good idea when you are running multiple threads. You probably already know about vars anyway, so let’s skip ahead to the more interesting parts…
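
As a minimal sketch of per-thread rebinding (the names *greeting* and greet are made up for illustration): the var is declared ^:dynamic so that binding can rebind it, and a plain Java thread started inside the binding still sees the root value.

```clojure
;; Hypothetical names, for illustration only.
(def ^:dynamic *greeting* "hello")

(defn greet []
  (str *greeting* ", world"))

;; Inside `binding`, the current thread sees the new value.
(def inside (binding [*greeting* "hi"] (greet)))

;; A plain Java thread does not inherit the binding; it sees the root value.
(def from-other-thread (promise))
(binding [*greeting* "hi"]
  (doto (Thread. #(deliver from-other-thread (greet)))
    .start
    .join))

;; Outside `binding`, the root value is in effect again.
(def outside (greet))

inside              ;; => "hi, world"
@from-other-thread  ;; => "hello, world"
outside             ;; => "hello, world"
```

Note that futures and agent actions behave differently from the raw thread used here: they carry the caller's bindings over to the worker thread.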

Refs and Transactions

Refs are references to encapsulated data whose modification is restricted: every change to a ref has to be made inside a transaction.

Define a ref:

> (def bowloffruits (ref ["apple"]))

To obtain the content of a ref, you have to dereference it. Dereferencing always returns a snapshot of the ref’s state at that moment:

> @bowloffruits ;; or use the long version: (deref bowloffruits)
["apple"]

Now to modify the ref we need to run a transaction. We do this with the dosync macro, which runs its body inside a transaction:

> (def emptybowl (ref []))
> (dosync (alter bowloffruits conj "banana"))
["apple" "banana"]
> (dosync 
    (alter emptybowl conj (first @bowloffruits)) 
    (ref-set bowloffruits (vec (rest @bowloffruits))))
> @emptybowl
["apple"]
> @bowloffruits
["banana"]

Using a transaction to modify refs ensures that either all changes within the dosync block take effect or none do. Furthermore, if two transactions conflict, one of them is automatically retried. Because of these retries, the body of a dosync block should be free of side effects.
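
To see the all-or-nothing behaviour under contention, here is a small sketch with two hypothetical account refs (checking and savings are names invented for this example). Fifty transfers run concurrently, and no money is lost or created along the way.

```clojure
;; Hypothetical accounts, for illustration only.
(def checking (ref 100))
(def savings  (ref 0))

(defn transfer! [from to amount]
  ;; Both alters happen together or not at all.
  (dosync
    (alter from - amount)
    (alter to + amount)))

;; Fifty transfers of 1, each running in its own future.
(def transfers
  (doall (repeatedly 50 #(future (transfer! checking savings 1)))))

;; Wait for all of them to finish.
(run! deref transfers)

;; Read both refs in one transaction to get a consistent snapshot.
(def balances (dosync [@checking @savings]))

balances ;; => [50 50]
```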

Atoms

Atoms are encapsulated pieces of data just like refs, except that you don’t need a transaction to modify them. Use an atom if you want thread safety and atomic state changes for a single entity, but don’t need coordination with other entities. If two conflicting changes are made to an atom at the same time, the first one to finish succeeds and the other one is retried with the new value.

> (def food (atom "apples"))
> @food
"apples"
> (reset! food "carrots")
> (swap! food str " and peas")
> @food
"carrots and peas"

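The retry behaviour of atoms can be made visible with a small sketch. The names hits, attempts and counted-inc are invented for this example, and the side effect inside the update function is there only to count how often it runs; normally an update function should be pure, exactly because it may run more than once.

```clojure
(def hits (atom 0))
(def attempts (atom 0)) ; counts how often the update function actually ran

(defn counted-inc [n]
  (swap! attempts inc) ; side effect, only here to make retries visible
  (inc n))

;; Ten futures, each incrementing the same atom 1000 times.
(def workers
  (doall (for [_ (range 10)]
           (future (dotimes [_ 1000] (swap! hits counted-inc))))))

;; Wait for all workers to finish.
(run! deref workers)

@hits     ;; => 10000, no update is lost
@attempts ;; at least 10000; anything above that was a retry
```
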
Agents

Agents are encapsulated pieces of data that are modified by sending them a function. Like atoms, changes to an agent’s state are always uncoordinated and happen independently of other agents (no transactions). Unlike atoms, though, the modifications are applied asynchronously in another thread.

> (def counter (agent 0))
> (send counter inc)
> @counter
1

Apart from send, there is another function for modifying an agent’s state: send-off. The only difference between the two is that send works with a fixed-size thread pool, while send-off uses a pool that grows as needed. That means you shouldn’t run blocking operations with send, because they tie up the pool’s threads and other actions may have to wait for them to finish before they can start. Before exiting a program, you have to call the function shutdown-agents: it shuts down the agents’ thread pools, whose threads would otherwise keep the JVM from exiting promptly.
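
A short sketch of how the two functions might be combined, using a hypothetical agent named tally: quick actions go through send, a slow (here: sleeping) action goes through send-off, and await blocks until everything sent so far from this thread has been applied.

```clojure
;; Hypothetical agent, for illustration only.
(def tally (agent 0))

;; Cheap, CPU-bound actions: fine for the fixed-size pool behind `send`.
(dotimes [_ 5] (send tally inc))

;; A potentially blocking action: use `send-off` instead.
(send-off tally (fn [n]
                  (Thread/sleep 50) ; stands in for slow I/O
                  (+ n 10)))

;; Block until all actions dispatched so far from this thread are done.
(await tally)

(def final-tally @tally)

final-tally ;; => 15

;; In a real program, call (shutdown-agents) once, right before exiting.
```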

Futures

A future takes one or more expressions, evaluates them in another thread and yields the value of the last one. Creating a future returns immediately, so the calling thread is not blocked. When the future is dereferenced, though, the thread will block until the value is available.

> (def waitabit (future (Thread/sleep 5000) "That was worth waiting for!"))
> @waitabit
"That was worth waiting for!"

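If blocking forever is not acceptable, deref also accepts a timeout in milliseconds and a fallback value. A minimal sketch (the name answer and the timings are arbitrary choices for this example):

```clojure
;; The future needs about 300 ms to produce its value.
(def answer (future (Thread/sleep 300) 42))

;; Wait at most 20 ms, then give up and return the fallback value.
(def early (deref answer 20 :not-ready))

;; A plain deref blocks until the value is there.
(def value @answer)

(def done? (realized? answer))

early ;; => :not-ready
value ;; => 42
done? ;; => true
```
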
Delays

A delay suspends a body of code until someone asks for its value. When the delay is dereferenced for the first time, the code is executed once and the result is saved; every later dereference returns that same result without executing the code again.

> (def delayed-slurp (delay (slurp "http://www.peterfessel.com")))
> @delayed-slurp
...
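
That the body really runs only once can be checked with a counter. This is a sketch with made-up names (runs, expensive); the summation just stands in for some costly computation.

```clojure
(def runs (atom 0)) ; how often the delayed body has been executed

(def expensive
  (delay
    (swap! runs inc)
    (reduce + (range 1000))))

(def before (realized? expensive)) ; nothing has run yet

(def first-read  @expensive) ; runs the body
(def second-read @expensive) ; returns the cached result

before      ;; => false
first-read  ;; => 499500
second-read ;; => 499500
@runs       ;; => 1
```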

Promises

Promises have characteristics similar to those of delays and futures. When dereferenced, promises will block the current thread until they are fulfilled and have some data to deliver. The difference from the constructs above is that promises are not initialized with a body of code that will eventually deliver the data. Instead it is the user’s responsibility to deliver the data to the promise once it’s available:

> (def p (promise))
> (realized? p)
false
> (deliver p "keeping my promise")
> (realized? p)
true
> @p
"keeping my promise"

(Note that the realized? function can also be used with delays and futures.)
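
Promises become more interesting once the value is delivered from another thread. In this sketch (result is a made-up name) a future delivers the value after a short pause while the main thread blocks on the promise; a second deliver has no effect.

```clojure
(def result (promise))

;; Some other thread delivers the value a little later.
(future
  (Thread/sleep 100)
  (deliver result :first))

;; Blocks until the future above has delivered.
(def received @result)

;; A promise can only be delivered once; later attempts are ignored.
(def second-attempt (deliver result :second))

received       ;; => :first
second-attempt ;; => nil
@result        ;; => :first
```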

Core.async channels

Clojure has some extra nice asynchronous features in the library core.async, which you need to add as a separate dependency if you want to use it. Core.async is mostly about channels. Channels are similar to promises: you put something in on one side and it comes out on the other. Yet channels are far more sophisticated:

> (require '[clojure.core.async :refer :all])
> (def jackie (chan))
> (thread (println "Some" (<!! jackie) "are what I was waiting for.")) 
> (>!! jackie "carrots")
Some carrots are what I was waiting for.

Here we define a channel. We start a thread and let it listen on one end of the channel (<!! reads from the channel and blocks until there is something to read). Once we put something on the channel via >!!, we get the output from the thread. The thread macro here works much like a future, with the difference that it returns a channel that will receive the result.

This is cool, but not very efficient, since we keep a whole thread blocked until there is something to read from the channel. That’s why core.async offers a more lightweight way to do this with go blocks:

> (def jackie (chan 2))
> (go (loop [food (<! jackie)]
    (when food
      (println "Some" food "is what I was waiting for.")
      ;; park on a timeout channel; Thread/sleep would block a go pool thread
      (<! (timeout 1000))
      (recur (<! jackie)))))
> (doseq [food ["carrots" "peas"]]
    (println "deliver" food)
    (>!! jackie food)
    (Thread/sleep 1000))
deliver carrots
Some carrots is what I was waiting for.
deliver peas
Some peas is what I was waiting for.

The go macro runs its body on a pool of threads and parks it whenever a channel operation cannot proceed, so that no thread is blocked while waiting. Inside go blocks you use the operations <! and >! to read from and write to a channel. In this case we make the channel buffered via (chan 2), because in our code two foods may be delivered to the channel before either of them is taken out of it. So we create a channel with a fixed buffer size of 2.
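
A go loop like this keeps waiting until the channel is closed: a closed, drained channel yields nil. The following sketch assumes core.async is on the classpath and uses invented names (foods, eaten, consumer). It closes the channel and then waits for the go block to finish by reading from the channel that go-loop returns.

```clojure
(require '[clojure.core.async :refer [chan go-loop <! >!! <!! close!]])

(def foods (chan 2))
(def eaten (atom []))

;; go-loop returns a channel that receives the loop's final value.
(def consumer
  (go-loop []
    (let [food (<! foods)]
      (if (some? food)
        (do (swap! eaten conj food)
            (recur))
        :finished)))) ; nil means the channel is closed and empty

(>!! foods "carrots")
(>!! foods "peas")
(close! foods) ; buffered values can still be taken after closing

(def outcome (<!! consumer)) ; blocks until the go-loop has ended

outcome ;; => :finished
@eaten  ;; => ["carrots" "peas"]
```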

This only scratches the surface of what is possible. For a more detailed yet easy-to-follow introduction to core.async and channels, I recommend this blog article.

Java Threads

… and then it’s still possible to use Java’s Threads, if you must :-)

> (.start (Thread.
    (fn []
      (dotimes [i 5]
        (println i)))))
0
1
2
3
4
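
A raw thread has no return value of its own, so if you need a result you have to hand it over yourself. One way to do that, sketched here with made-up names (total, worker), is to combine the thread with a promise and join:

```clojure
(def total (promise))

;; Clojure functions implement Runnable, so they can be passed to Thread.
(def worker
  (Thread. (fn [] (deliver total (reduce + (range 5))))))

(.start worker)
(.join worker) ; wait for the thread to terminate

@total ;; => 10
```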

Summary

We can divide the constructs from this post into two groups. First we have vars, refs, atoms and agents, which are entities holding mutable state. We usually modify them by passing a function (vars are the exception; they are normally just defined or rebound). Vars take a special position here, because usually we just use them as a container to bind the other entities to a namespace. For refs all changes happen in a coordinated way through transactions, while atoms are changed atomically but without coordination. With agents the modifications happen in another thread and are put in order through an unbounded queue. You can listen for changes in all of these entities via watches.
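
Watches have not been shown so far, so here is a minimal sketch on an atom (stock, changes and the key :audit are names chosen for this example). A watch function receives the key it was registered under, the reference itself, the old value and the new value.

```clojure
(def stock (atom 3))
(def changes (atom [])) ; log of [old new] pairs

(add-watch stock :audit
           (fn [_key _ref old new]
             (swap! changes conj [old new])))

(swap! stock dec)
(reset! stock 10)

@changes ;; => [[3 2] [2 10]]

;; Watches are removed by the key they were registered under.
(remove-watch stock :audit)
(swap! stock inc)

@changes ;; still [[3 2] [2 10]]
```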

The rest of the constructs (futures, delays, promises and channels) we can use to manage the control flow of our code: to defer the execution of a body of code or to let it be executed in another thread. Futures execute a body of code in a separate thread, while delays just defer the execution of the code. Through a promise, a single user can deliver a value once, which can then be seen by multiple consumers. Channels, on the other hand, allow multiple users to deliver any number of values through a bounded queue, but each value can only be taken by a single consumer.

It may take some time to really understand the principles behind the primitives described here, but I hope this little overview helps you decide which primitives to use in which situation. Writing it certainly helped me understand the differences between all the described features.