Making a multi-threaded pipeline

Recently, I have come across quite often the optimization of the loading and processing of information. From time to time I see solutions that are not the most optimal, and even harmful to performance, and from middle and senior level developers. Therefore, I would like to describe in more detail the general approach to writing correct and fast multithreaded architectures with the maximum utilization of resources, but at the same time with the minimum possible number of threads in each case.

What can actually be accelerated by multithreading?

First of all, you need to generally understand whether it is possible to accelerate using Thread Pool that particular piece of code or system with which you are working.

The answer is “no” if:

  • The process is already running microseconds / milliseconds and is running faster than required by requirements / terms of reference

  • Utilization of the main resource on which the algorithm relies has already reached constant 100%

Now in more detail on each item “no”.

The algorithm already fits into the requirements

By itself, the use of Thread Pools leads to an increase in memory and resource consumption, so if your algorithm already fits into the SLA, then it makes no sense to complicate everything. Adding threads by itself makes it harder to understand the code, and also immediately adds a bunch of non-obvious potential problems and bugs. The principles of KISS (Keep It Simple, Stupid) and YAGNI (You aren’t Gonna Need It) are applied more than ever.

Utilization of one of the resources is already constant 100%

What resources are we talking about? First of all – CPU, Disk IO, Network. These are exactly the resources that can be clogged by increasing the number of threads in your process. Thus, if one of the resources has already been utilized to the maximum, then we will not be able to speed up anything. Another thing is that modern hardware is difficult to score 100% in one thread. If this happens, then you should first pay attention to how you use them: it may be worth considering, for example, the possibility of replacing random access to the disk with a sequential one. Next, we will talk about a system with a CPU with 8 cores, for example. So the question. Why are we talking about any other resources if the threads themselves run on cores? The fact is that when working with all other resources, except for the CPU, the threads themselves are very often in a state of waiting for new data, whether it is reading or saving data to disk or downloading something over the network. Thus, the operating system’s task manager has enough CPU time to run hundreds of threads on our 8 cores.

Now let’s go back to resources and see an example. Suppose our channel is 1 Gbps, and we are parsing data from remote servers, the average speed of which is 10 Mbps.

Single thread solution

// single thread

while (isNextDownloadTaskAvailable()) {

  DownloadProcessor downloadProcesor = downloadService.createProcessor(nextTask());

  Data data = downloadProcessor.getData();

  process(data);

}

To fill such a channel to the maximum, you will need to launch 100 streams. 100 * 10 Mbps = 1 Gbps.

100 thread solution

//100 threads

private static final ExecutorService executorService = Executors.fixedThreadPool(100);

while (isNextDownloadTaskAvailable()) {

  DownloadProcessor downloadProcesor = downloadService.createProcessor(nextTask());

  executorService.submit(() -> {

    Data data = downloadProcessor.getData();

    process(data);

  }

}

The channel was hammered. Mission Complete. Now let’s take a closer look. The solution has disadvantages.

In general, any task requires different resources at different stages. First, you need to load something from somewhere (load the network or disk), then process it (which is usually done on the CPU), and then save (again, the network or disk).

For example, parsing download results takes 1 second of CPU time. In our case, the following will happen: in the first part of the program, the entire executorService will be clogged with a certain number of download tasks. If they all take approximately the same time to work, then after a certain period of time we will get 100 threads, in each of which the algorithm will reach the execution of the process. Total: 100 threads that want 1 second of CPU time, these 100 tasks will be executed on 100 threads after the data is loaded, 100/8 ~ = 13 seconds. And all these 13 seconds the network will be idle, since all the working threads will be engaged in processing the downloaded data. In a more complex case, the tasks themselves will require different amounts of resources. Some tasks can be loaded for 100 seconds and processed in 10 ms, the other part of tasks can be loaded in 10 ms and require 100 seconds of kernel work.

Of course, in any case, the performance gain over a conventional single-threaded application will be significant, but resources will be clogged unevenly and, on average, not by 100%.

What can be done? Let’s split our task into 2 subtasks that utilize different resources, and add another thread pool that will separately utilize the CPU, without interfering with the threads that are loading data. For 8 cores, we need exactly 8 CPU-heavy threads to utilize them by 100%.

Conveyor 100 + 8 threads

//100+8 threads

private static final ExecutorService downloadExecutorService = Executors.fixedThreadPool(100);

private static final ExecutorService processExecutorService = Executors.fixedThreadPool(8);

List<Future<Data>> futureDataResults = new ArrayList<>();

while (isNextDownloadTaskAvailable()) {

  DownloadProcessor downloadProcesor = downloadService.createProcessor(nextTask());

  futureDataResults.add(downloadExecutorService.submit(() -> downloadProcessor.getData()));

}

for (Future<Data> futureData : futureDataResults) {

  processExecutorService.submit(() -> process(futureData.get()));

}

It looks better, although still not perfect. This solution has one main drawback (although sometimes it may have an advantage): we cannot start processing the result of the third load until we run the result of the second, and if the second load takes an hour to load, then the second pool will be idle for a very long time without work. From the pros: a simple, almost sequential algorithm that does not require special knowledge of how to organize multi-threaded work and is very easy to read (again, almost sequential). And it will be even better to read if, for example, you need to load data from different places, and then process them together. Then it will be possible to go over the two arrays and take the elements at the same index. There are several options here to do this at least so that the blood does not go out of the eyes. Using queues, CompletableFuture, or using the Observer pattern, when some tasks will notify others that the results can be taken for processing. In any case, these are all subspecies of callbacks that we can give to our task so that it executes the piece of code passed to it, to which it will transfer the execution result. This piece of code just needs to add the next task stage to the next Thread Pool.

Conveyor 100 + 8 threads with callbacks

while (isNextDownloadTaskAvailable()) {

   DownloadProcessor downloadProcessor = downloadService.createProcessor();

   DownloadTask downloadTask = new DownloadTask(downloadTask, data -> {

       CalculateProcessor calculateProcessor = calculateService.createProcessor(data);

       CalculateTask calculateTask = new CalculateTask(calculateProcessor, results::add);

       calculateExecutorService.submit(new CalculateTaskCallbackProxy(calculateTask));

   });

   downloadExecutorService.submit(downloadTaskCallbackProxy);

}

Pipeline 100 + 8 threads on CompletableFuture

while (isNextDownloadTaskAvailable()) {

   DownloadProcessor downloadProcessor = downloadService.createProcessor();

   DownloadTask downloadTask = new DownloadTask(downloadProcessor);

   CompletableFuture<DownloadData> downloadResultFuture = CompletableFuture

           .supplyAsync(downloadTask::call, downloadExecutorService);

   CompletableFuture<Long> calcResultFuture = downloadResultFuture

           .thenApplyAsync(data -> {

               CalculateProcessor calculateProcessor = calculateService.createProcessor(data);

               CalculateTask calculateTask = new CalculateTask(calculateProcessor);

               return calculateTask.call();

           }, calculateExecutorService);

}

All runnable examples to play with are available in the repository

I periodically write down any such notes. in my telegram channel

https://t.me/developers_mind

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *