Creating Terabyte-Sized Low Latency Queues

Queues are often fundamental components in software design patterns.

But what if millions of messages arrive every second, and consumers in multiple processes need to be able to read a complete log of all messages?

Java can only hold a limited amount of data before the heap becomes the limiting factor. At that point garbage collection has a big impact, potentially preventing us from meeting our target SLAs (Service Level Agreements) or even stalling the JVM for seconds or minutes at a time.

This article shows how to create huge persistent queues while maintaining predictable and stable low latency using the open-source Chronicle Queue library.

Application

In this article, the task is to maintain a queue of objects from market data streams (for example, the latest price of a security traded on an exchange). Other business areas, such as touch input from IoT devices or traffic accident data in the automotive industry, could also have been chosen; the principle is the same.

First, a class containing market data is defined:

public class MarketData extends SelfDescribingMarshallable {
    int securityId;
    long time;
    float last;
    float high;
    float low;

    // Getters and setters are not shown for brevity.
}

Note: in real-life scenarios, you need to be very careful when using float and double to store monetary values, as this can lead to rounding problems [Bloch18, Item 60]. However, in this introductory article, I want to keep things simple.
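
As a side note, here is a minimal sketch of one common alternative (not used in the rest of this article): storing prices as whole numbers of minor currency units. The class and field names below are hypothetical.

// Hypothetical alternative, not used in this article: store prices as a
// long count of minor units (for example, hundredths of a cent) so that
// no floating-point rounding is involved.
public class ExactMarketData extends SelfDescribingMarshallable {
    int securityId;
    long time;
    long lastMinorUnits;
    long highMinorUnits;
    long lowMinorUnits;
}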

There is also a small utility method MarketDataUtil::create that creates and returns a new random MarketData object each time it is called:

static MarketData create() {
    MarketData marketData = new MarketData();
    int id = ThreadLocalRandom.current().nextInt(1000);
    marketData.setSecurityId(id);
    float nextFloat = ThreadLocalRandom.current().nextFloat();
    float last = 20 + 100 * nextFloat;

    marketData.setLast(last);
    marketData.setHigh(last * 1.1f);
    marketData.setLow(last * 0.9f);
    marketData.setTime(System.currentTimeMillis());

    return marketData;
}

The challenge now is to create a queue that is durable, concurrent, low latency, accessible from multiple processes, and capable of storing billions of objects.

Naive approach

Armed with these classes, a naive approach using a ConcurrentLinkedQueue can be explored:

public static void main(String[] args) {
    final Queue<MarketData> queue = new ConcurrentLinkedQueue<>();
    for (long i = 0; i < 1e9; i++) {
        queue.add(MarketDataUtil.create());
    }
}

This will fail for several reasons:

  1. ConcurrentLinkedQueue will create a wrapper node for each element added to the queue. This will effectively double the number of objects created.

  2. Objects are allocated on the Java heap, which leads to heap exhaustion and garbage collection problems (a rough back-of-envelope estimate follows this list). On my machine, this made the entire JVM unresponsive, and the only way out was to force-quit it with “kill -9”.

  3. The queue cannot be read from other processes (i.e. other JVMs).

  4. As soon as the JVM terminates, the contents of the queue are lost. Therefore, the queue is not durable.
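
To get a feel for why the heap becomes the limiting factor, here is a rough back-of-envelope sketch; the per-object sizes are assumptions (they depend on the JVM and object layout), not measurements:

public static void main(String[] args) {
    // Assumed sizes: roughly 32 bytes per MarketData instance (header,
    // fields and padding) plus roughly 40 bytes per ConcurrentLinkedQueue node.
    long messages = 1_000_000_000L;
    long bytesPerMessage = 32 + 40;
    System.out.printf("~%.0f GB of heap%n", messages * bytesPerMessage / 1e9);
    // Prints a figure in the region of 72 GB, far beyond a typical heap size.
}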

Looking at various other standard Java classes, one can conclude that there is no support for large persistent queues.

Using Chronicle Queue

Chronicle Queue is an open source library designed with the requirements outlined above in mind. Here is one way to set it up and use it:

public static void main(String[] args) {
    final MarketData marketData = new MarketData();
    final ChronicleQueue q = ChronicleQueue
            .single("market-data");
    final ExcerptAppender appender = q.acquireAppender();

    for (long i = 0; i < 1e9; i++) {
        try (final DocumentContext document =
                     appender.acquireWritingDocument(false)) {
             document
                    .wire()
                    .bytes()
                    .writeObject(MarketData.class, 
                            MarketDataUtil.recycle(marketData));
        }
    }
}

Using a 2019 MacBook Pro with a 2.3GHz 8-core Intel Core i9 processor, over 3,000,000 messages per second could be inserted using just one thread.

The queue is persisted via memory-mapped files in the specified “market-data” directory. One would expect each MarketData object to occupy at least 4 (int securityId) + 8 (long time) + 3 × 4 (float last, high, and low) = 24 bytes.

In the example above, 1 billion objects were added, and the mapped files took up 30,148,657,152 bytes, which works out to approximately 30 bytes per message. In my opinion, that is very efficient.

As you can see, the same MarketData instance can be reused over and over again, because Chronicle Queue flattens the contents of the current object onto a memory-mapped file, so no new objects need to be allocated. This further reduces memory pressure. Here is how the reuse method works:

static MarketData recycle(MarketData marketData) {
    final int id = ThreadLocalRandom.current().nextInt(1000);
    marketData.setSecurityId(id);
    final float nextFloat = ThreadLocalRandom.current().nextFloat();
    final float last = 20 + 100 * nextFloat;

    marketData.setLast(last);
    marketData.setHigh(last * 1.1f);
    marketData.setLow(last * 0.9f);
    marketData.setTime(System.currentTimeMillis());

    return marketData;
}

Reading from Chronicle Queue

Reading from the Chronicle Queue is easy. Continuing the example above, the following shows how the first two MarketData objects can be read from the queue:

public static void main(String[] args) {
    final ChronicleQueue q = ChronicleQueue
            .single("market-data");
    final ExcerptTailer tailer = q.createTailer();

    for (long i = 0; i < 2; i++) {
        try (final DocumentContext document =
                     tailer.readingDocument()) {
            MarketData marketData = document
                    .wire()
                    .bytes()
                    .readObject(MarketData.class);
            System.out.println(marketData);
        }
    }
}

This might produce output like the following:

!software.chronicle.sandbox.queuedemo.MarketData {
  securityId: 202,
  time: 1634646488837,
  last: 45.8673,
  high: 50.454,
  low: 41.2806
}

!software.chronicle.sandbox.queuedemo.MarketData {
  securityId: 117,
  time: 1634646488842,
  last: 34.7567,
  high: 38.2323,
  low: 31.281
}

There are also facilities for efficiently positioning the tailer, for example, at the end of the queue or at a specific index.
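
As a minimal sketch (assuming the same “market-data” queue as above), the tailer can be repositioned with methods such as toEnd(), index(), and moveToIndex():

public static void main(String[] args) {
    final ChronicleQueue q = ChronicleQueue.single("market-data");
    final ExcerptTailer tailer = q.createTailer();

    // Skip everything already in the queue and read only new messages.
    tailer.toEnd();

    // Remember the current position and jump back to it later.
    final long index = tailer.index();
    tailer.moveToIndex(index);
}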

What’s next?

There are many other features that are beyond the scope of this article.

For example, queue files can be set to roll over at specific intervals (such as every day, hour, or minute), effectively partitioning the data so that old data can be cleaned up over time.
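
As a sketch (not taken from the article's code, and the hourly interval is just an assumption), the roll cycle can be configured via the queue builder:

// Sketch: roll over to a new queue file every hour so that old files can
// be archived or deleted independently.
final ChronicleQueue q = ChronicleQueue
        .singleBuilder("market-data")
        .rollCycle(RollCycles.HOURLY)
        .build();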

It also provides the ability to isolate CPUs and bind Java threads to those isolated CPUs, greatly reducing application jitter.
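
A minimal sketch of thread pinning, assuming the separate open-source Java-Thread-Affinity library from the same OpenHFT family is used (this particular snippet is my assumption, not code from the article):

// Sketch: pin the latency-sensitive appender thread to a reserved CPU.
final AffinityLock lock = AffinityLock.acquireLock();
try {
    // Run the appender loop here on the pinned CPU.
} finally {
    lock.release();
}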

Finally, there is an enterprise version with queue replication between server clusters, which paves the way for high availability and improved performance in distributed architectures.

The enterprise version also includes many other features, such as encryption, time-zone rolling, and asynchronous appends.

Resources

Open-source Chronicle Queue

Chronicle Queue Enterprise
