How we save hardware for ML computing

Hello! My name is Oleg Bugrimov, and I lead development in the Data Science SWAT team at Avito. We do engineering for machine learning, and one of our areas is optimizing production inference: our job is to make models work fast without consuming an insane amount of resources. We optimized to the point where we built a tool that saves about 30% of hardware. Below is a real GPU load graph:

As you can see, after moving to our solution the GPU load dropped by a third, while the traffic to the service itself, of course, did not change. Below I will tell you how we achieved this.

Why Python and not Go

Although the main development language at Avito is Go, we write our ML services in Python. We do have ML solutions in Go in our company, but it is faster and cheaper to roll out a model in Python right after it comes off the training pipeline. Later, if the model proves useful, it can be rewritten in Go, but with our aqueduct there is no urgent need for that.

Theoretical part

Problem statement

We write services in Python using asyncio. There were implementations with Flask on threads, but after a series of experiments we concluded that asyncio is still better. When working with a model in an asyncio service there is one difficulty: if the ML model runs for a significant time, say tens of milliseconds or more, the ioloop gets blocked and the service becomes noticeably slower. We could have limited ourselves to solving just that problem, but we decided we could also increase model throughput. In the animation you can see how blocking happens: an object waits until the model finishes processing the previous object, becomes free, and takes the next one. The ioloop blocking itself is harder to draw, so let's settle on the fact that the object simply waits until it is picked up for processing.

Animated picture where the main process with the model is blocked
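To make the blocking concrete, here is a minimal sketch with plain asyncio (not aqueduct code; model_inference is an illustrative stub): a synchronous model call inside a coroutine freezes the whole event loop for the duration of the call.

import asyncio
import time

def model_inference(param):
    time.sleep(0.05)  # a model that needs ~50 ms of pure CPU time
    return param

async def handler(request):
    # this call runs right in the event loop thread, so for those 50 ms
    # the loop cannot accept or advance any other request
    return model_inference(request)

async def main():
    started = time.monotonic()
    await asyncio.gather(*(handler(i) for i in range(10)))
    # ~0.5 s in total: the "concurrent" requests were in fact handled one by one
    print(f"{time.monotonic() - started:.2f} s")

asyncio.run(main())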

Of course, there is the simplest and most obvious way to get rid of this waiting: add more servers and roll out more copies of the model on them, so that requests land on free capacity instead of waiting. But even when idle, ML models occupy at least RAM, and often CPU and GPU memory as well. So, tempting as this solution is, it is worth reaching for it only after trying to optimize consumption, and only then, with a clear conscience and the feeling that everything possible has already been squeezed out, buy more servers.

Transferring the model to a subprocess

We solve the ioloop blocking problem by moving the model's work into a child subprocess. The main process is busy accepting requests and performing IO-bound operations such as downloading an image, then passes the image to the child process, where the model runs inference on this data and returns the result back to the main process.

Animated picture where the first child subprocess appears

How to do this with aqueduct is shown below in the practical section.
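For intuition, the same idea can also be sketched with nothing but the standard library: run the blocking call in a process pool via run_in_executor so the event loop stays free. This is only an illustration of the principle, not how aqueduct is implemented; model_inference is the same hypothetical stub as above.

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def model_inference(param):
    time.sleep(0.05)  # blocking CPU-bound work, now isolated in a child process
    return param

async def handler(pool, request):
    loop = asyncio.get_running_loop()
    # the event loop only awaits the result; the heavy work happens in the pool
    return await loop.run_in_executor(pool, model_inference, request)

async def main():
    with ProcessPoolExecutor(max_workers=1) as pool:
        results = await asyncio.gather(*(handler(pool, i) for i in range(10)))
        print(results)

if __name__ == "__main__":
    asyncio.run(main())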

Splitting calculations into stages

To move on, I'll reveal a few details about how an ML model works. It is a set of algorithms that receive input data, perform manipulations or calculations on that data, and return a result. As a rule, the model's work can be divided into several stages. The classic case has three: preparing the data, running the computation on the prepared data, and post-processing the results. For example, if the input is an image, it must first be brought to the required size and color palette and converted to a numpy array so the model can work with it. Then, say, a neural network processes the converted data and produces some conclusion from it; that is the second stage. At the third stage the original picture is modified based on the neural network's output, for example a plate is drawn over the license number to hide it.

Stages of the model's work
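A minimal sketch of such a three-stage split for an image model; the function names, sizes, and the trivial stand-in for the network are illustrative, not part of aqueduct.

import io

import numpy as np
from PIL import Image

def preprocess(image_bytes: bytes) -> np.ndarray:
    # stage 1: bring the image to the required size and color palette
    # and convert it into a numpy array the model can work with
    img = Image.open(io.BytesIO(image_bytes)).convert("RGB").resize((224, 224))
    return np.asarray(img, dtype=np.float32) / 255.0

def run_model(batch: np.ndarray) -> np.ndarray:
    # stage 2: the model itself; here a trivial stand-in returning one score per image
    return batch.mean(axis=(1, 2, 3))

def postprocess(scores: np.ndarray) -> list:
    # stage 3: turn the raw model output into the service's response format
    return [{"score": float(s)} for s in scores]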

If you can break your model's work into stages, you can optimize inference with aqueduct. To begin with, we simply move each stage into its own subprocess. Thus, while the stage with the model is working, the data preparation stage has already received new data and is processing it, and the post-processing stage is meanwhile working on data the model has already handled. As a result, your service can process three requests at once instead of one.

Animated picture with three subprocesses running simultaneously

But I think it's obvious that the stages take different amounts of time. Also, quite often there are models that can process several objects at once almost as quickly as one; this is so-called batching, i.e. processing a pack of objects in one go.

Batching

Batching, i.e. processing several objects at once, is a common engineering practice that very often gives a speedup. It is especially noticeable in ML inference, where the gain comes from fewer I/O and memory operations, and it is even more significant if the model runs on a GPU. For example, your model processes one object in 10 ms but 10 such objects together in 15 ms. Clearly it is better to process them as a batch than to spend a total of 100 ms handling each object in turn. All that remains is to implement the mechanics that accumulate objects into batches and send a batch to the model in one go. Aqueduct supports two batching modes out of the box: the first collects a batch with a timeout, the second works without a timeout, so-called dynamic batching.

Animated picture of how batches are assembled at the entrance to the stage with the model
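The accumulation logic itself is not magic; here is a rough sketch of the timeout-based mode (not aqueduct's internals): wait for the first object, then keep collecting until either the batch is full or the timeout counted from the first object expires.

import asyncio

async def collect_batch(queue: asyncio.Queue, batch_size: int, timeout: float) -> list:
    # wait for the first object, then give the rest at most `timeout` seconds to arrive
    batch = [await queue.get()]
    deadline = asyncio.get_running_loop().time() + timeout
    while len(batch) < batch_size:
        remaining = deadline - asyncio.get_running_loop().time()
        if remaining <= 0:
            break
        try:
            batch.append(await asyncio.wait_for(queue.get(), remaining))
        except asyncio.TimeoutError:
            break
    return batch  # between 1 and batch_size objects, ready to go to the model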

Increasing the number of processes

So, we have learned to collect objects into a batch and process them quickly in the model. But that is inside the model itself, and before the model, as you remember, there is a preprocessing stage, for example converting an image to numpy. The problem is that batching will not help at this stage: there are no libraries or mechanisms that convert a batch of images faster than converting them one by one. Of course, you could use threads here, provided your logic releases the GIL. But because of the GIL not everything in Python gets faster that way, and even where there is a speedup you are still limited by a single core, since the interpreter runs on one core: you cannot run more parallel work than that core allows. This is where multiprocessing comes to the rescue.

In aqueduct, each stage is a separate subprocess, and with a small flick of the wrist you can increase the number of processes at any stage. We do not recommend increasing the number of processes for the stage with the model; that is effectively horizontal scaling of the model under load and is done by other means, in our case Kubernetes. But for the stages before and after the model, go ahead. For example, the model processes 10 objects in 15 ms and the preprocessing stage prepares one object in 20 ms, so collecting a batch of 10 objects takes 200 ms and the total flow time is 215 ms. If you run 10 preprocessing processes in parallel, all 10 objects are ready in 20 ms and the total flow time drops to 35 ms. Do not forget that you can also increase the number of processes in the stages after the model. This is where the full power of aqueduct is finally revealed.

Animated picture about the scaled number of processes
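The arithmetic above can be reproduced with a toy sketch on a standard process pool (again, just an illustration, not aqueduct's mechanics); note that the parallel run also pays a one-off process start-up cost, so the measured time will be somewhat above the ideal 20 ms.

import time
from concurrent.futures import ProcessPoolExecutor

def preprocess(item):
    time.sleep(0.02)  # ~20 ms of preparation per object
    return item

if __name__ == "__main__":
    items = list(range(10))

    started = time.monotonic()
    [preprocess(item) for item in items]  # one process: ~200 ms for 10 objects
    print(f"sequential: {time.monotonic() - started:.3f} s")

    started = time.monotonic()
    with ProcessPoolExecutor(max_workers=10) as pool:  # ten processes working at once
        list(pool.map(preprocess, items))
    print(f"parallel:   {time.monotonic() - started:.3f} s")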

Shared memory

We are always thinking about how else to speed up inference in production. One thing bothered us: aqueduct uses inter-process queues to transfer data between processes; they are based on pipes and work quite fast. But if you transfer large volumes of data, or rather large objects, this starts to take noticeable time. For example, a large picture must first be serialized, i.e. turned into bytes ready for transmission, then pushed through the processes' input/output buffers, then deserialized back into an object ready for work. For large objects, on the order of megabytes, this can take a few, and sometimes tens of, milliseconds.

We can speed this up with shared memory: the data is placed in a memory area accessible to all processes, and only identifiers are passed through the queues. Making such a mechanism reliable is an art of its own, but that topic deserves a separate article.
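The principle can be shown with the standard library's multiprocessing.shared_memory (aqueduct's own implementation is more involved): the array's bytes live in a shared segment, and only its name and shape need to travel between processes.

import numpy as np
from multiprocessing import shared_memory

# producer side: place a large array into shared memory and pass only its name onward
frame = np.zeros((1080, 1920, 3), dtype=np.uint8)
shm = shared_memory.SharedMemory(create=True, size=frame.nbytes)
shared = np.ndarray(frame.shape, dtype=frame.dtype, buffer=shm.buf)
shared[:] = frame  # a single copy instead of pickling megabytes on every hop

# consumer side (normally another process): attach by name, no serialization at all
view = shared_memory.SharedMemory(name=shm.name)
received = np.ndarray(frame.shape, dtype=frame.dtype, buffer=view.buf)
print(received.shape)

# cleanup: close in every process, unlink once in the owning process
view.close()
shm.close()
shm.unlink()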

Picture from Wikipedia

Metrics

So, above you have learned the theory of how aqueduct works. We launch the vast majority of models in the company via aqueduct, which saves us a huge number of servers. But when your model is running in production it is very important to monitor whether everything is in order with it: what the response time is and how many requests have accumulated in the queue. For better optimization it is also important to know how long each stage takes and how much time is spent transferring data between stages. For this, aqueduct supports recording the vital metrics right out of the box.

The processing time of each stage lets you see which stages are slow and whether it is worth increasing their number of processes:

Operating time of the stages

The queue size metric shows whether you should add more processes to the stage in front of which a large queue is building up, or add batching if it is a GPU model:

Queue sizes

There is also the total number of tasks: how many succeeded and how many ended with a timeout error, for example because they could not be processed in the allotted time.

Practical part

Now for some real code. To keep things simple, let's assume all the code lives in one file.

Creating a model and a handler

Let's imagine you have an ML model that blocks the ioloop; this can easily be represented by an ordinary synchronous sleep() in Python.

# import right away the components we will need later
from time import sleep

from aqueduct import BaseTask, BaseTaskHandler, Flow, FlowStep


class MyModel:
    def __init__(self):
        """At startup you usually need to load the model
        and initialize it in memory; we skip that step here,
        but I kept the construct for clarity.
        """
        self.model = None

    def process(self, param):
        sleep(1)  # the promised 1-second sleep
        return len(param)  # a dummy result standing in for real inference

To communicate with each other, the workers and the main process exchange tasks; a task is just a class with properties in which you can store anything:

class Task(BaseTask):
    """Task container used to pass parameters between stages"""
    def __init__(self, param):
        super().__init__()
        self.param = param
        self.sum = 0  # the result will be stored in this field

The workers themselves are just handlers with two methods: one that initializes the model and one that is called when a task is received:

class SumHandler(BaseTaskHandler):
    """We initialize the instance here, but do not load the model yet,
    because __init__ is still executed in the parent process.
    This is done to make it convenient to pass parameters through."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """And here we load the model into memory:
        this method is already called in the child process"""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """Task processing: we receive a list of tasks and handle them"""
        for task in tasks:
            task.sum = self._model.process(task.param)

Creating a flow with one worker

Now we need aqueduct to accept a task and pass it along our flow. We define the flow in a function so that it is convenient to use wherever needed.

def get_flow() -> Flow:
    return Flow(
        FlowStep(SumHandler()),
    )

This is what the simplest flow looks like: a single stage, the model's work. If we want to add more stages before or after the model, we simply list them in the desired order, for example like this:

def get_flow() -> Flow:
    return Flow(
        FlowStep(PreprocessingHandler()),   # PreprocessingHandler runs first
        FlowStep(SumHandler()),             # then SumHandler does its work
        FlowStep(PostprocessingHandler()),  # and PostprocessingHandler runs at the very end
    )

But that's not all. Suppose our model can process a batch of objects at once; then we specify batch_size so that aqueduct collects objects into batches and sends them to the model. And for preprocessing we add parallel processes so that the data is prepared in several processes simultaneously:

def get_flow() -> Flow:
    return Flow(
        # 12 processes, each preparing one object at a time, working simultaneously
        FlowStep(PreprocessingHandler(), nproc=12),
        FlowStep(SumHandler(), batch_size=12),  # now at most 12 objects arrive at once
        FlowStep(PostprocessingHandler()),
    )

Thus the model immediately receives a batch of objects prepared by the parallel processes of the previous stage and handles them in one go, which is what saves time. Note that since we specified batch_size=12, the handler now receives not one object but several, at most 12; that is why the SumHandler.handle signature accounts for receiving both one and several objects.

Integration into an HTTP server

Aqueduct can be used with any Python HTTP framework. Let's take the trendy FastAPI as an example:

from typing import Annotated

from fastapi import Depends, FastAPI

app = FastAPI()


class MyFlow:
    """A small wrapper that creates and starts the flow.
    In a real service you would want a single instance, started once at application startup."""
    def __init__(self):
        self.my_flow = get_flow()
        self.my_flow.start()


@app.get("/score/")
async def read_items(mymodel: Annotated[MyFlow, Depends(MyFlow)]):
    task = Task(param="my_param")
    await mymodel.my_flow.process(task)
    return {"result": task.sum}

This is how you get an answer from a model running under aqueduct.

Using shared memory

To transfer data through shared memory, you simply need to mark the field as shared:

@app.get("/score/")
async def read_items(mymodel: Annotated[MyFlow, Depends(MyFlow)]):  
    task = Task(param="my_param")
    task.share_value("param")  # теперь поле param хранится в общей памяти
    await flow.my_flow(task)
    return {"result": task.sum}

Other solutions

It would be wrong not to describe the alternatives.

Airflow / Celery / Kubeflow

Airflow and the like are excellent at organizing pipelines for batch models, when you have a lot of data and need to run models over all of it. Aqueduct is about something else: it is needed when you must get an answer right here and now, as fast as possible. For real-time inference Airflow would only slow your calculations down and aqueduct is the right tool; conversely, for batch inference Airflow is the better fit. So this is not even a question of alternatives, these are solutions to different problems.

Nvidia Triton

A similar inference solution from Nvidia. It allows you to save GPU memory by reusing one model on one server. Feel free to try it and compare it with aqueduct. You can even use them together: aqueduct to build the overall calculation flow and Triton to organize the work of the step with the model.

Mpipe

https://vmlaker.github.io/mpipe/

Technically, a solution very similar to aqueduct. In places it is even very fast, because the processes are connected by pipes instead of queues. Add asynchrony, quick results and a number of other features, and you would get aqueduct. When we were building aqueduct we did not yet know about this library; if we had, maybe aqueduct would just say import mpipe and reuse some of its functionality. But a lot would still have had to be added, so it does not change the essence.

Summary

We use aqueduct almost everywhere we have real-time ML inference, and it gives us significant hardware savings. You can try aqueduct in your own projects too.

As you have already understood, aqueduct itself lives on GitHub; you can download it, look around and tinker with it. Pull requests are welcome: https://github.com/avito-tech/aqueduct
