Cat Concurrency Basics with Ref and Deferred

Concurrent access and referential transparency

For prospective students on the course “Scala developer»Prepared a translation of the material.

We also invite you to a webinar on the topic “Effects in Scala”… In the lesson, we will consider the concept of effect and the complexity that may arise if they are present. We will also introduce the concept of a functional effect, consider its properties and implement our own small functional effect. Join us.


* Concurrency – concurrency, allowing the simultaneous execution of several computational processes.

Ref and Deferred are the basic building blocks in FP, used in parallel, in a concurrent manner. Especially when used with a tagless final abstraction, these two blocks, when constructing business logic, can give us both: concurrent access and referential transparencyand we can use them to build more advanced structures like counters and state machines.

Before we dive into Ref and Deferred, it is useful for us to know what concurrency in Cats is built on Java AtomicReference, and here we will begin our journey.

Atomic Reference

AtomicReference – this is one of the elements of the package java.util.concurrent.atomic… IN Oracle docs we can read that java.util.concurrent.atomic – this is:

A small toolbox of classes that support thread-safe “blockless” programming with single variables. In fact, the classes in this package extend the concept volatile values, fields and array elements to those that also provide a conditional operation atomic updates …

Instances of classes AtomicBoolean, AtomicInteger, AtomicLong, and AtomicReference provide access and update from single variables to the corresponding type (function block).

AtomicReference with us since Java 1.5 and is used to get better performance than synchronization (although this is not always the case).

When you have to share some data between threads, you must protect access to that piece of data. The simplest example would be increasing a certain amount int: i = i + 1… Our example actually consists of 3 operations, first we read the value i then add 1 to this value, and at the end we again assign the calculated value i … With regard to multi-threaded applications, we may face a situation where each thread will perform these 3 steps between the steps of another thread, and the final value i cannot be predicted.

Usually a word pops up in your head synchronised or class mechanism lockbut with atomic.* you no longer need to worry about explicit synchronization, and you can switch to the provided atomic utility types, where one-step verification is automatically enabled.

Let’s take for example AtomicInteger.incrementAndGet:

/**
     * Atomically increments by one the current value.
     *
     * @return the updated value
     */
    public final int incrementAndGet() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return next;
        }
    }

Using the operation compareAndSet we either update our data or fail, but never make the thread wait. Thus, if the operation compareAndSet in incrementAndGet fails, we just try to repeat the whole operation again, retrieving the current value of our data using the function get() at the beginning. On the other hand, when using synchronized mechanisms, there is no limit to the number of statements you want to “execute” while blocking, but this block will never fail and can cause the calling thread to wait, providing the ability to block or degrade performance.

Now that we know certain basics, let’s move on to our first concurrency mega-star.

Ref

Ref in Cats is very similar to the Java atomic reference mentioned above. The main differences are that Ref used with tagless final abstraction F … It always contains a value, and the value contained in Ref – type A, is always immutable.

abstract class Ref[F[_], A] {
  def get: F[A]
  def set(a: A): F[Unit]
  def modify[B](f: A => (A, B)): F[B]
  // ... and more
}

Ref[F[_], A] Is a functional mutable link:

  • Concurrent (competitive)

  • Lock free (“no blocks”)

  • Always contains a value

It is created by providing an initial value, and each operation is done in
F, eg, cats.effect.IO

If we take a close look at the companion object for Cats Ref, we will see that our F must meet a certain requirement, namely to be Sync

def of[F[_], A](a: A)(implicit F: Sync[F]): F[Ref[F, A]] = F.delay(unsafe(a))

The above method is just an example of many operations available on our Ref; it is used to build Ref with the original value.

Sync gives us the ability to pause any side effects using the method
delay for each operation on Ref

Ref – a fairly simple construction, we can focus mainly on it get, set and of to understand how it works.

Method get and set

Let’s say we have an object (for this blog we will call it Shared) that needs to be updated with several threads, and we use our methods for this get and set , creating a utility method that will help us further:

def modifyShared(trace: Ref[IO, Shared], msg: String): IO[Unit] = {
	for {
		sh <- trace.get()
		_ <- trace.set(Shared(sh, msg))
	} yield ()
}

Our Shared an object can be constructed by using its previous state and new value to create a new instance – Sharedwhich can actually be anything we want – a simple list, a map, or whatever we want to securely access at the same time.

I just created Shared(prev: Shared, msg: String) for this article.

In our example above F has been replaced with a specific IO from Cats Effect, but keep in mind that Ref is polymorphic in F and can be used with other libraries.

Through monadic (monadic) IO we apply the function flatMap at each step and set the value stored in our Ref to the desired value – or … wait, maybe we don’t.

With this approach, when modifyShared will be called at the same timeand we might lose updates! This happens because we may face a situation where, for example, two threads can read a value using get and each of them will perform set at the same time. Methods get and set are not called atomically together.

Atomic (atomic) update

We can of course improve the above example and use other available methods from Ref… For joint implementation get and set we can use update

def update(f: A => A): F[Unit] 

This will solve our problem with updating the value, however update has its drawbacks. If we want to refer to a variable immediately after the update, similar to how we used get and set , we can end up with outdated data, for example, our Ref will contain a link to Int:

for {
		_ <- someRef.update(_ + 1)
		curr <- someRef.get
		_ <- IO { println(s"current value is $curr")}
	} yield ()

Will save us modify

We can improve the above situation a bit by using modify which will do the same as update , but nonetheless, modify will return the updated value to us for future reference.

def modify[B](f: A => (A, B)): F[B] = {
      @tailrec
      def spin: B = {
        val c = ar.get
        val (u, b) = f(c)
        if (!ar.compareAndSet(c, u)) spin
        else b
      }
      F.delay(spin)
    }

As you can see, this is practically the same implementation as in the example with AtomicInteger.incrementAndGetwhich I showed at the beginning, but only in Scala. We can clearly see that in order to do our job Ref also works based on AtomicReference

Ref restrictions

You’ve probably already noticed that on failure to update the value, the function passed to update/ modify, should be run nondeterministically and may need to be run multiple times. The good news is that this solution in general turns out to be much faster than the standard locking and synchronization mechanism, and much safer since this solution cannot be locked.

Once we know how simple Ref, we can move on to another Cats Concurrent class: Deferred (Delayed call).

Deferred

Unlike Ref, Deferred:

  • created “empty” (deferred execution result)

  • can be done once

  • and once installed, it cannot be changed or made “empty” again.

These properties make Deferred simple and at the same time quite interesting.

abstract class Deferred[F[_], A] {
  def get: F[A]
  def complete(a: A): F[Unit]
}

Deferred used for explicit functional synchronization. When we call get in “empty” Deferred we set the lock until the value becomes available again. In accordance with documentation from the class itself:

  • The blocking is only semantic, no real threads are blocked by the implementation

The same challenge get “Nonempty” Deferred will return the stored value immediately.

Another method is complete – will fill in the value if the instance is empty and when calling “non-empty” Deferred will crash (failed IO attempt).

It is important to note here that Deferred requires that F It was Concurrent, which means it can be undone.

A good example of use Deferred is a situation where one part of your application has to wait for another.

The example below is taken from Fabio Labella’s excellent performance at Scala Italy 2019 – Composable Concurrency with Ref + Deferred available at Vimeo

def consumer(done: Deferred[IO, Unit]) = for {
	c <- Consumer.setup
	_ <- done.complete(())
	msg <- c.read
	_ <- IO(println(s"Received $msg"))
} yield ()

def producer(done: Deferred[IO, Unit]) = for {
	p <- Producer.setup()
	_ <- done.get
	msg = "Msg A"
	_ <- p.write(msg)
	_ <- IO(println(s"Sent $msg"))
} yield ()

def prog = for {
  d <- Deferred[IO, Unit]
  _ <- consumer(d).start
  _ <- producer(d).start
} yield ()

In the above example, we have a producer and a consumer, and we want the producer to wait for the consumer setup to finish before writing messages, otherwise whatever we write to the producer will be lost. … To overcome this problem, we can use a shared instance Deferred and block get until the instance is filled done Deferred from the consumer side (the value in this case is simple Unit () ).

Of course, the above solution was not without problems when consumer setup never stopped, we were stuck waiting and producer couldn’t send messages. To overcome this, we can use a timeout with get and also use Either[Throwable, Unit] or some other construction instead of a simple Unit inside our object Deferred

Deferred quite simple, but combined with Ref it can be used to build more complex data structures such as semaphores.

For more details, I recommend that you check out the Cats documentation itself, where you can learn more about Cats concurrency and the data structure it provides.


Learn more about the course “Scala developer”.

Watch an open webinar on Effects in Scala.

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *