Cats Concurrency Basics with Ref and Deferred
Concurrent access and referential transparency
Translation prepared for prospective students of the “Scala Developer” course.

* Concurrency: running several computations so that their execution overlaps in time.
Ref and Deferred are the basic building blocks for concurrency in functional programming. Especially when used with a tagless final abstraction, these two building blocks give us both concurrent access and referential transparency when we construct business logic, and we can use them to build more advanced structures such as counters and state machines.
Before we dive into Ref and Deferred, it is useful to know that concurrency in Cats is built on Java's AtomicReference, so that is where we begin our journey.
Atomic Reference
AtomicReference is one of the classes in the java.util.concurrent.atomic package. In the Oracle docs we can read that java.util.concurrent.atomic is:
A small toolkit of classes that support lock-free thread-safe programming on single variables. In essence, the classes in this package extend the notion of volatile values, fields, and array elements to those that also provide an atomic conditional update operation … Instances of classes AtomicBoolean, AtomicInteger, AtomicLong, and AtomicReference each provide access and updates to a single variable of the corresponding type.
AtomicReference has been with us since Java 1.5 and is used to get better performance than synchronization (although that is not always the case).
When you have to share some data between threads, you must protect access to that data. The simplest example is incrementing an int: i = i + 1. This actually consists of 3 operations: first we read the value of i, then we add 1 to it, and finally we assign the computed value back to i. In a multi-threaded application, each thread may perform these 3 steps interleaved with the steps of another thread, so the final value of i cannot be predicted.
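Here is a minimal Scala sketch of the problem. It uses only the JDK, and Scala is the language used for the rest of this article. The names CounterRace, runConcurrently, plainCounter and atomicCounter are made up for illustration. plainCounter runs i = i + 1 on a shared var from several threads, so it may end up below the expected total, although no particular run is guaranteed to lose updates. atomicCounter uses AtomicInteger and always reaches the total.

```scala
import java.util.concurrent.atomic.AtomicInteger

object CounterRace {
  // Starts `threads` threads, each calling `body` `perThread` times, then waits for all of them.
  def runConcurrently(threads: Int, perThread: Int)(body: () => Unit): Unit = {
    val workers = (1 to threads).map { _ =>
      new Thread(() => (1 to perThread).foreach(_ => body()))
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
  }

  // i = i + 1 on a shared var: read, add 1, write back. Increments can be lost.
  def plainCounter(threads: Int, perThread: Int): Int = {
    var i = 0
    runConcurrently(threads, perThread)(() => { i = i + 1 })
    i
  }

  // The same counter with AtomicInteger: each increment is a single atomic step.
  def atomicCounter(threads: Int, perThread: Int): Int = {
    val i = new AtomicInteger(0)
    runConcurrently(threads, perThread)(() => { i.incrementAndGet(); () })
    i.get()
  }
}
```

For example, atomicCounter(4, 10000) returns 40000. plainCounter(4, 10000) returns at most 40000 and can return less.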
Usually the first things that come to mind are the synchronized keyword or a Lock. With atomic.*, however, you no longer need to worry about explicit synchronization: you can switch to the provided atomic utility types, which perform the check and the update as a single atomic step.
Let’s take AtomicInteger.incrementAndGet as an example:
/**
 * Atomically increments by one the current value.
 *
 * @return the updated value
 */
public final int incrementAndGet() {
    for (;;) {
        int current = get();
        int next = current + 1;
        if (compareAndSet(current, next))
            return next;
    }
}
With compareAndSet we either update our data or fail, but we never make the thread wait. So if compareAndSet in incrementAndGet fails, we simply retry the whole operation, reading the current value of our data again with get() at the start. Synchronized blocks work differently. There is no limit to the number of statements you can execute while holding the lock, and acquiring the lock never fails. However, it can make the calling thread wait, which can lead to deadlocks or degraded performance.
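The same retry loop works for any immutable value stored in an AtomicReference. The sketch below spells it out. updateAndGet and collectFromThreads are hypothetical helpers written for this illustration; the JDK has offered its own AtomicReference.updateAndGet since Java 8. Note that f can run more than once when compareAndSet fails. This will matter again when we get to Ref.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

object CasLoop {
  // The incrementAndGet retry loop, generalised to any immutable value held in an
  // AtomicReference. `f` may be evaluated more than once under contention.
  @tailrec
  def updateAndGet[A](ref: AtomicReference[A])(f: A => A): A = {
    val current = ref.get()
    val next = f(current)
    if (ref.compareAndSet(current, next)) next // nobody changed it meanwhile: done
    else updateAndGet(ref)(f)                  // lost the race: re-read and retry
  }

  // Several threads prepend distinct numbers to a shared immutable List without locks.
  def collectFromThreads(threads: Int, perThread: Int): List[Int] = {
    val ref = new AtomicReference[List[Int]](Nil)
    val workers = (0 until threads).map { t =>
      new Thread(() => (0 until perThread).foreach { k =>
        updateAndGet(ref)(xs => (t * perThread + k) :: xs)
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    ref.get()
  }
}
```

No element is lost: collectFromThreads(4, 1000) contains each of the numbers 0 to 3999 exactly once.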
Now that we know the basics, let’s move on to our first concurrency mega-star.
Ref
Ref in Cats is very similar to the Java atomic reference mentioned above. The main differences are that Ref is used with the tagless final abstraction F, that it always contains a value, and that the value of type A it contains is always immutable.
abstract class Ref[F[_], A] {
def get: F[A]
def set(a: A): F[Unit]
def modify[B](f: A => (A, B)): F[B]
// ... and more
}
Ref[F[_], A] is a functional mutable reference with these properties:
Concurrent
Lock-free
Always contains a value
It is created by providing an initial value, and every operation on it is performed in F, e.g. cats.effect.IO.
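Below is a minimal sketch of creating and using a Ref. It assumes Cats Effect 3.x, where Ref lives in cats.effect; in Cats Effect 2.x it is cats.effect.concurrent.Ref. RefBasics is a made-up name.

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global

object RefBasics {
  // Every step is a description in IO; nothing runs until the IO is executed.
  val program: IO[(Int, Int)] =
    for {
      ref    <- Ref.of[IO, Int](10) // allocate a Ref with initial value 10
      before <- ref.get             // read: 10
      _      <- ref.set(before + 5) // replace it with a new immutable value
      after  <- ref.get             // read again: 15
    } yield (before, after)
}
```

Running program.unsafeRunSync() with cats.effect.unsafe.implicits.global in scope returns (10, 15).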
If we take a close look at the companion object of Cats' Ref, we will see that our F must meet a certain requirement, namely to be Sync:
def of[F[_], A](a: A)(implicit F: Sync[F]): F[Ref[F, A]] = F.delay(unsafe(a))
The method above is just one example of the many operations available on Ref; it is used to build a Ref with an initial value.
Sync gives us the ability to suspend side effects, using its delay method, for every operation on Ref.
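Because of this suspension, Ref.of[IO, Int](0) is only a description of allocating a Ref, not a Ref itself. Every time that IO is run or flatMapped, a brand-new Ref is created. The sketch below shows the consequence: to share state, allocate the Ref once and pass the same instance around. It assumes Cats Effect 3.x, and RefSharing and its members are made-up names.

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global

object RefSharing {
  // A description of "allocate a Ref"; nothing is allocated when this val is defined.
  val mkRef: IO[Ref[IO, Int]] = Ref.of[IO, Int](0)

  // Each flatMap through mkRef allocates a fresh Ref, so a and b are independent.
  val separate: IO[(Int, Int)] =
    for {
      a <- mkRef
      b <- mkRef
      _ <- a.update(_ + 1)
      x <- a.get
      y <- b.get
    } yield (x, y)

  // To share state, allocate once and pass the same Ref to everyone who needs it.
  val shared: IO[Int] =
    mkRef.flatMap { ref =>
      ref.update(_ + 1) *> ref.update(_ + 1) *> ref.get
    }
}
```

Here separate yields (1, 0) and shared yields 2.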
Ref is a fairly simple construct, so to understand how it works we can focus mainly on get, set and of.
The get and set methods
Let’s say we have an object (for this blog post we will call it Shared) that needs to be updated by several threads. We use get and set for this and create a utility method that will help us later:
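import cats.effect.IO
import cats.effect.concurrent.Ref // Cats Effect 2.x; in Cats Effect 3.x it is cats.effect.Ref

// The state type used in this article: each update links to the previous state
case class Shared(prev: Shared, msg: String)

def modifyShared(trace: Ref[IO, Shared], msg: String): IO[Unit] =
  for {
    sh <- trace.get                  // step 1: read the current state
    _  <- trace.set(Shared(sh, msg)) // step 2: write a new state (NOT atomic with step 1)
  } yield ()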
Our Shared object is constructed from its previous state and a new value, which produces a new instance of Shared. In practice it can be anything we want to access safely from several threads at once: a simple list, a map, or something else. I just created Shared(prev: Shared, msg: String) for this article.
In the example above, F has been replaced with the concrete IO from Cats Effect, but keep in mind that Ref is polymorphic in F and can be used with other libraries.
Since IO is monadic, we use flatMap at each step and set the value stored in our Ref to the desired value, or… wait, maybe we don’t.
With this approach, when modifyShared is called concurrently, we might lose updates! This happens because, for example, two threads can both read the value using get, and then each of them performs set at the same time. get and set are not executed atomically together.
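The sketch below makes the problem observable by running many get-then-set increments in parallel. It assumes Cats Effect 3.x and cats.syntax.all._ for parTraverse_. LostUpdates and racyIncrement are hypothetical names. The IO.cede between the two steps only makes an interleaving more likely. The final count is at most the number of fibers and, depending on scheduling, may be lower.

```scala
import cats.effect.{IO, Ref}
import cats.syntax.all._
import cats.effect.unsafe.implicits.global

object LostUpdates {
  // get followed by set: two separate steps, like modifyShared above.
  def racyIncrement(ref: Ref[IO, Int]): IO[Unit] =
    for {
      n <- ref.get
      _ <- IO.cede        // let other fibers run between the two steps
      _ <- ref.set(n + 1) // may overwrite an increment made by another fiber
    } yield ()

  // Runs `fibers` racy increments in parallel and returns the final count.
  def run(fibers: Int): IO[Int] =
    for {
      ref <- Ref.of[IO, Int](0)
      _   <- (1 to fibers).toList.parTraverse_(_ => racyIncrement(ref))
      n   <- ref.get
    } yield n
}
```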
Atomic update
We can, of course, improve the example above by using other methods available on Ref. To perform get and set together atomically, we can use update:
def update(f: A => A): F[Unit]
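Here is the same experiment with update, under the same assumptions. AtomicUpdates is a made-up name. Because the read-modify-write happens as one atomic step, no increment is lost, and run(1000) should always produce 1000.

```scala
import cats.effect.{IO, Ref}
import cats.syntax.all._
import cats.effect.unsafe.implicits.global

object AtomicUpdates {
  // update performs the read-modify-write as a single atomic step.
  def run(fibers: Int): IO[Int] =
    for {
      ref <- Ref.of[IO, Int](0)
      _   <- (1 to fibers).toList.parTraverse_(_ => ref.update(_ + 1))
      n   <- ref.get
    } yield n
}
```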
This solves our problem with updating the value; however, update has its drawbacks. If we want to refer to the value immediately after the update, similarly to how we used get and set, we can end up with outdated data. For example, suppose our Ref holds an Int:
for {
  _    <- someRef.update(_ + 1)
  curr <- someRef.get // another fiber may have changed the value in between
  _    <- IO { println(s"current value is $curr") }
} yield ()
modify to the rescue
We can improve the situation above by using modify. It does the same as update but additionally returns a value computed from the state, for example the updated value, for further use:
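// From Cats Effect's Ref implementation: `ar` is the underlying
// java.util.concurrent.atomic.AtomicReference[A] and F is the Sync[F] instance.
def modify[B](f: A => (A, B)): F[B] = {
  @tailrec // scala.annotation.tailrec
  def spin: B = {
    val c = ar.get                    // read the current value
    val (u, b) = f(c)                 // compute the new value u and the result b
    if (!ar.compareAndSet(c, u)) spin // someone else changed it first: retry
    else b                            // success: hand back b
  }
  F.delay(spin)                       // suspend the whole loop in F
}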
As you can see, this is practically the same implementation as the AtomicInteger.incrementAndGet example I showed at the beginning, only in Scala. We can clearly see that, to do its job, Ref is also built on top of AtomicReference.
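With modify, the stale read from the update example disappears, because the new value is computed and returned in the same atomic step. In the sketch below, many fibers draw ticket numbers in parallel and every fiber gets a distinct one. It assumes Cats Effect 3.x, and ModifyExample, incrementAndGet and tickets are hypothetical names. Cats Effect 3's Ref also provides updateAndGet for exactly this case.

```scala
import cats.effect.{IO, Ref}
import cats.syntax.all._
import cats.effect.unsafe.implicits.global

object ModifyExample {
  // Increment and return the new value in a single atomic step: no stale read.
  def incrementAndGet(ref: Ref[IO, Int]): IO[Int] =
    ref.modify { n =>
      val next = n + 1
      (next, next) // (new state, value handed back to the caller)
    }

  // Each of `fibers` fibers gets a distinct ticket number from 1 to fibers.
  def tickets(fibers: Int): IO[List[Int]] =
    for {
      ref <- Ref.of[IO, Int](0)
      ts  <- (1 to fibers).toList.parTraverse(_ => incrementAndGet(ref))
    } yield ts
}
```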
Ref limitations
You have probably already noticed that when updating the value fails, the function passed to update / modify is simply run again. It may therefore run multiple times and should be pure. The good news is that this approach generally turns out to be much faster than standard locking and synchronization. It is also much safer, since it cannot deadlock.
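A small experiment makes the retry behavior visible. The sketch below deliberately puts a side effect, an AtomicInteger counter, inside the function passed to update, purely to count how often it is evaluated. It assumes Cats Effect 3.x, and RetriedFunctions is a made-up name. The final value always equals the number of fibers. The evaluation count can be higher when compareAndSet fails under contention; how much higher, if at all, varies from run to run.

```scala
import java.util.concurrent.atomic.AtomicInteger
import cats.effect.{IO, Ref}
import cats.syntax.all._
import cats.effect.unsafe.implicits.global

object RetriedFunctions {
  // Returns (final value, number of times the update function was evaluated).
  def run(fibers: Int): IO[(Int, Int)] =
    for {
      ref   <- Ref.of[IO, Int](0)
      calls <- IO(new AtomicInteger(0))
      _     <- (1 to fibers).toList.parTraverse_ { _ =>
                 // Side effect inside update: for demonstration only, not a good practice.
                 ref.update { n => calls.incrementAndGet(); n + 1 }
               }
      total <- ref.get
    } yield (total, calls.get())
}
```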
Now that we know how simple Ref is, we can move on to another Cats concurrency class: Deferred.
Deferred
Unlike Ref, a Deferred:
is created “empty” (it holds a result that will be available later)
can be completed only once
cannot be changed or made “empty” again once completed.
These properties make Deferred simple and, at the same time, quite interesting.
abstract class Deferred[F[_], A] {
def get: F[A]
def complete(a: A): F[Unit]
}
Deferred is used for explicit functional synchronization. When we call get on an empty Deferred, it blocks until a value becomes available. According to the documentation of the class itself:
The blocking is only semantic, no real threads are blocked by the implementation
Calling get on a non-empty Deferred returns the stored value immediately.
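Here is a minimal sketch of this behavior. It assumes Cats Effect 3.x, where Deferred lives in cats.effect; in 2.x it is cats.effect.concurrent.Deferred. DeferredBasics is a made-up name. One fiber waits on get, the main fiber completes the Deferred after a short sleep, and a later get returns immediately. The program evaluates to 84 (42 + 42).

```scala
import cats.effect.{Deferred, IO}
import cats.effect.unsafe.implicits.global
import scala.concurrent.duration._

object DeferredBasics {
  // One fiber waits on get; the main fiber completes the Deferred later.
  val program: IO[Int] =
    for {
      d      <- Deferred[IO, Int]
      waiter <- d.get.start                          // semantically blocked; no thread is parked
      _      <- IO.sleep(100.millis) *> d.complete(42)
      value  <- waiter.joinWithNever                 // the waiting fiber now sees 42
      again  <- d.get                                // already completed: returns at once
    } yield value + again
}
```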
The other method, complete, fills in the value if the instance is empty. Calling it on a non-empty Deferred fails (the IO raises an error).
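The failure on a second complete is Cats Effect 2 behavior. In Cats Effect 3, complete returns F[Boolean] instead. It yields true if this call set the value, and false if the Deferred was already completed, in which case the stored value stays unchanged. Here is a sketch assuming Cats Effect 3.x. CompleteOnce is a made-up name, and tryGet returns the current value, if any, without waiting.

```scala
import cats.effect.{Deferred, IO}
import cats.effect.unsafe.implicits.global

object CompleteOnce {
  val program: IO[(Option[String], Boolean, Boolean, String)] =
    for {
      d      <- Deferred[IO, String]
      before <- d.tryGet             // None: still empty
      first  <- d.complete("first")  // true: the value is now set
      second <- d.complete("second") // false: already completed, value unchanged
      value  <- d.get                // "first"
    } yield (before, first, second, value)
}
```

The program evaluates to (None, true, false, "first").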
It is important to note that Deferred requires F to be Concurrent, which means, among other things, that its operations can be cancelled.
A good use case for Deferred is a situation where one part of your application has to wait for another.
The example below is taken from Fabio Labella’s excellent talk at Scala Italy 2019, “Composable Concurrency with Ref + Deferred”, available on Vimeo:
def consumer(done: Deferred[IO, Unit]) = for {
c <- Consumer.setup
_ <- done.complete(())
msg <- c.read
_ <- IO(println(s"Received $msg"))
} yield ()
def producer(done: Deferred[IO, Unit]) = for {
p <- Producer.setup()
_ <- done.get
msg = "Msg A"
_ <- p.write(msg)
_ <- IO(println(s"Sent $msg"))
} yield ()
def prog = for {
d <- Deferred[IO, Unit]
_ <- consumer(d).start
_ <- producer(d).start
} yield ()
In the example above we have a producer and a consumer. We want the producer to wait until the consumer’s setup has finished before writing messages; otherwise, whatever the producer writes will be lost. To solve this, we can share a Deferred instance, done, and block on get until the consumer side completes it (the value in this case is simply Unit, ()).
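In the talk's example, Consumer and Producer are placeholders, so here is a self-contained variant that can actually be run. It assumes Cats Effect 3.x. The slow setup is simulated with IO.sleep, a Ref holding a Vector[String] serves as an event log, and both fibers are joined so the result can be checked. ProducerConsumerSketch and the log are inventions for this illustration. The producer is started first, yet the log always reads "consumer ready" before "producer sent Msg A".

```scala
import cats.effect.{Deferred, IO, Ref}
import cats.effect.unsafe.implicits.global
import scala.concurrent.duration._

object ProducerConsumerSketch {
  // Stand-in for the consumer: a slow setup, then signal readiness via `ready`.
  def consumer(ready: Deferred[IO, Unit], log: Ref[IO, Vector[String]]): IO[Unit] =
    for {
      _ <- IO.sleep(100.millis)              // hypothetical stand-in for Consumer.setup
      _ <- log.update(_ :+ "consumer ready")
      _ <- ready.complete(())
    } yield ()

  // Stand-in for the producer: waits for the consumer before "writing".
  def producer(ready: Deferred[IO, Unit], log: Ref[IO, Vector[String]]): IO[Unit] =
    for {
      _ <- ready.get                         // semantically blocks until completed
      _ <- log.update(_ :+ "producer sent Msg A")
    } yield ()

  val prog: IO[Vector[String]] =
    for {
      ready <- Deferred[IO, Unit]
      log   <- Ref.of[IO, Vector[String]](Vector.empty)
      p     <- producer(ready, log).start    // started first, but still has to wait
      c     <- consumer(ready, log).start
      _     <- p.join
      _     <- c.join
      out   <- log.get
    } yield out
}
```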
Of course, this solution is not without problems: if the consumer’s setup never finished, we would be stuck waiting, and the producer could never send messages. To overcome this, we can use a timeout with get. We can also store an Either[Throwable, Unit] or some other construct inside our Deferred instead of a plain Unit.
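Here is a sketch of both ideas. It assumes Cats Effect 3.x, where IO#timeout fails with a java.util.concurrent.TimeoutException when the limit is exceeded. TimeoutSketch, SetupResult and awaitConsumer are hypothetical names. The consumer side would complete the Deferred with Right(()) on success or with a Left on failure.

```scala
import cats.effect.{Deferred, IO}
import cats.effect.unsafe.implicits.global
import scala.concurrent.duration._

object TimeoutSketch {
  // The consumer reports the outcome of its setup instead of a bare Unit.
  type SetupResult = Either[Throwable, Unit]

  // The producer waits at most `limit`; a timeout becomes a Left instead of a hang.
  def awaitConsumer(done: Deferred[IO, SetupResult], limit: FiniteDuration): IO[SetupResult] =
    done.get.timeout(limit).handleError(e => Left(e))

  // Setup that never finishes: nobody ever completes `done`.
  val hung: IO[SetupResult] =
    Deferred[IO, SetupResult].flatMap(done => awaitConsumer(done, 200.millis))

  // Setup that fails: the consumer completes `done` with a Left.
  val failed: IO[SetupResult] =
    for {
      done <- Deferred[IO, SetupResult]
      _    <- done.complete(Left(new RuntimeException("setup failed")))
      res  <- awaitConsumer(done, 1.second)
    } yield res

  // Setup that succeeds.
  val ok: IO[SetupResult] =
    for {
      done <- Deferred[IO, SetupResult]
      _    <- done.complete(Right(()))
      res  <- awaitConsumer(done, 1.second)
    } yield res
}
```

Here hung evaluates to a Left holding a TimeoutException after about 200 ms, failed to a Left holding the setup error, and ok to Right(()).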
Deferred is quite simple, but combined with Ref it can be used to build more complex data structures such as semaphores.
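As a smaller example of the same technique, here is a one-shot countdown latch rather than a full semaphore. A Ref[IO, Int] counts the remaining releases, and a Deferred[IO, Unit] is completed when the count reaches zero. This is a sketch under Cats Effect 3.x with hypothetical names (Latch, LatchDemo). It is not the library's own code; Cats Effect 3 ships ready-made versions as cats.effect.std.CountDownLatch and cats.effect.std.Semaphore. Note that modify returns the completion effect instead of running it, because the function passed to modify may be retried.

```scala
import cats.effect.{Deferred, IO, Ref}
import cats.syntax.all._
import cats.effect.unsafe.implicits.global

// A one-shot countdown latch: `await` blocks (semantically) until `release`
// has been called `n` times.
final class Latch private (remaining: Ref[IO, Int], done: Deferred[IO, Unit]) {
  def release: IO[Unit] =
    remaining.modify { n =>
      val next = n - 1
      // Return the effect to run rather than running it inside modify,
      // because the function passed to modify may be evaluated more than once.
      (next, if (next == 0) done.complete(()).void else IO.unit)
    }.flatten

  def await: IO[Unit] = done.get
}

object Latch {
  def apply(n: Int): IO[Latch] =
    for {
      remaining <- Ref.of[IO, Int](n)
      done      <- Deferred[IO, Unit]
    } yield new Latch(remaining, done)
}

object LatchDemo {
  // Three workers each bump a counter and then release the latch; the main fiber awaits.
  val program: IO[Int] =
    for {
      latch   <- Latch(3)
      counter <- Ref.of[IO, Int](0)
      _       <- List(1, 2, 3).traverse_(_ => (counter.update(_ + 1) *> latch.release).start)
      _       <- latch.await
      n       <- counter.get
    } yield n
}
```

LatchDemo.program evaluates to 3. The main fiber resumes only after all three workers have released the latch, and each worker bumps the counter before releasing.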
For more details, I recommend checking out the Cats Effect documentation, where you can learn more about concurrency in Cats and the data structures it provides.