How to cook delicious “Celery”
What I appreciate most about the Python community is the small spread of approaches and a fairly stable set of architectural patterns. In practice, we often encounter Django projects with a fairly typical architecture, where Celery is used as the tool for background task processing.
You may or may not love Celery, but it is almost impossible to avoid working with it. However, not every engineer thinks about what actually happens when a Celery task is called. In this article, I want to explain exactly how the call and message publishing work, what can happen if you ignore the fact that Celery has a transport layer underneath it, and what can happen when that transport starts to fail.
The basics
We will not focus on building everything from scratch: there are already plenty of good materials about Celery itself and how to write tasks, and the official website offers very good documentation with code examples.
So, let's say we have a Django web application and we use RabbitMQ as the message broker. RabbitMQ is chosen here only as an example; in general, any message broker will do.
Let's assume there is an API method (it creates some entity), and during its execution we need to collect additional analytics, measure the method's execution time, and so on. Third-party services are used for these secondary tasks, and their execution time, success, or failure should not affect the API method itself. Accordingly, we decide to implement these secondary tasks with Celery.
Example API method:
# assumes Django REST Framework; simple_task is the shared task shown below
from celery.result import AsyncResult
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView


class CreateTaskSimpleView(APIView):
    def post(self, request: Request) -> Response:
        # publish the secondary task and return its id to the client
        result: AsyncResult = simple_task.delay(
            request.data.get("a", 1),
            request.data.get("b", 2)
        )
        return Response(
            data={"task_id": result.task_id if result else None}
        )
Where simple_task is a secondary task that may fail:
from celery import Task, shared_task
from django.conf import settings

@shared_task(**settings.CELERY_DEFAULT_BIND_TASK_CONF)
def simple_task(self: Task, a: int, b: int):
    return a / b  # fails with ZeroDivisionError when b == 0
settings.CELERY_DEFAULT_BIND_TASK_CONF contains the default task settings:
CELERY_DEFAULT_BIND_TASK_CONF = {
    'bind': True,
    'queue': 'default'
}
The process of sending messages to the broker queue
In the example above, a simple Celery call is made via the delay method. Here we call the task "directly", without handling any possible exceptions.
In practice, you may never encounter any other way of calling Celery. And this is exactly what I mentioned at the beginning: not everyone realizes that publishing a message to the broker involves much more than just calling one method, and that something may break at this stage.
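For reference, delay is simply shorthand for apply_async, which exposes the publishing options explicitly (the countdown value below is purely illustrative):

simple_task.delay(1, 2)                                              # shorthand
simple_task.apply_async(args=(1, 2), queue="default", countdown=5)   # explicit options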
Let's look at the process of sending messages:
The process involves kombu and pyamqp, which are part of the Celery ecosystem and are developed and maintained by the Celery project.
Kombu is the transport layer; this is where the broker client is chosen.
pyamqp is the client that talks to the message broker directly.
As you can see in the diagram, after calling delay (or apply_async), the following happens:
1. The connection manager is called to get or create a connection to the broker; the manager stores and manages the connection pool.
2. The message is packed (serialized) according to the broker protocol.
3. The broker client method is called directly to write the message to the RabbitMQ queue.
That is, quite a lot of work is being done, even at a superficial glance.
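For a rough illustration, here is a hedged sketch (not Celery's actual code path) of what these steps look like if you use kombu directly: acquire a connection, serialize the payload, and publish it to the queue. The broker URL and payload shape are assumptions for the example.

from kombu import Connection, Exchange, Queue

broker_url = "amqp://guest:guest@localhost:5672//"  # assumed local RabbitMQ
default_queue = Queue("default", Exchange("default"), routing_key="default")

with Connection(broker_url) as conn:              # 1. connection (Celery pools these)
    producer = conn.Producer(serializer="json")   # 2. serializer chosen at this level
    producer.publish(                             # 3. write the message to the broker queue
        {"task": "simple_task", "args": [1, 2], "kwargs": {}},
        exchange=default_queue.exchange,
        routing_key=default_queue.routing_key,
        declare=[default_queue],
        retry=True,  # kombu-level publish retries (the transport retries mentioned below)
    )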
After the message is written, a free worker (celery worker) will eventually pick it up from the queue and start processing it.
Is Celery very complex inside?
While studying how Celery works, I drew up this diagram; unfortunately, the history and comments for all the arrows were lost, but I hope the general idea is clear. Everything inside is quite complicated.
Don't fail right away
Now let's go back to our example. Suppose the network "blinks": the Celery task call will then end with an exception, and our API method will return 500, 502, or 504 (depending on the environment configuration). These are exactly the situations we want to avoid, since the tasks are secondary and should not affect the main business process.
retry
The first thing that comes to mind is that the retry option will save us. However, if you read the official documentation more closely, it becomes clear that this option applies to the worker (celery worker) at the moment of task execution: through retry we specify how many attempts, and with what pauses, the worker may make to complete the task. But here we are talking about the moment the message is put into the queue, and the retry option does not protect us at that stage at all.
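For clarity, here is a sketch of what the retry machinery actually covers: retries performed by the worker while executing the task, after the message has already reached the broker (flaky_task and the retry values are illustrative, not from the project):

from celery import shared_task

@shared_task(bind=True, queue="default", max_retries=3, default_retry_delay=5)
def flaky_task(self, a: int, b: int):
    try:
        return a / b
    except ZeroDivisionError as exc:
        # re-queues the task for another attempt; irrelevant if publishing itself failed
        raise self.retry(exc=exc)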
Celery does have retry options for publishing messages, but we have little control over them, and they are configured at the transport level. By default, the broker client makes 100 connection attempts, yet they may not be enough, and we will still get an exception if the broker is completely unavailable.
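These publish-retry knobs are global, transport-level configuration rather than something you choose at the call site. A rough sketch of where they live (app here is your Celery application instance; setting names per Celery 5.x, verify against your version):

app.conf.update(
    broker_connection_retry=True,        # retry establishing the broker connection
    broker_connection_max_retries=100,   # default: up to 100 attempts
    task_publish_retry=True,             # retry publishing a task message
    task_publish_retry_policy={
        "max_retries": 3,
        "interval_start": 0.2,
        "interval_step": 0.5,
        "interval_max": 3.0,
    },
)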
It turns out that there is no explicit retry mechanism in the delay method itself.
try/except
So the first working option for protection is to wrap the Celery task call in try/except.
For example, like this:
logger = logging.getLogger(__name__)  # assumes `import logging` at module level


def wrapped_simple_task(a: int, b: int) -> AsyncResult | None:
    try:
        return simple_task.delay(a, b)
    except Exception as ex:
        # if publishing to the broker failed, log it and implicitly return None
        logger.error("error while calling celery.delay: %s", ex)
And now the task can be called using the wrapper:
class CreateTaskWrappedView(APIView):
    def post(self, request: Request) -> Response:
        result: AsyncResult = wrapped_simple_task(
            request.data.get("a", 1),
            request.data.get("b", 2)
        )
        return Response(
            data={"task_id": result.task_id if result else None}
        )
We could stop at this point. However, on a project with a lot of legacy code, changing every Celery task call would be labor-intensive, even with a decorator.
But there is a more elegant solution. Celery has a great API, and we can override the base task class and specify it at application startup.
For example:
celery_task_cls = os.environ.get('CELERY_TASK_CLS', 'celery.app.task.PatchedTask')
app = Celery('example', task_cls=celery_task_cls)
Here our class celery.app.task.PatchedTask overrides the apply_async method (which delay calls under the hood), so we won't need to change all the places where Celery tasks are called.
celery.app.task.PatchedTask
import logging
from contextlib import contextmanager

import celery.app.task
from django.db.transaction import get_connection

# `transport_errors` is assumed to be a tuple of broker/transport exceptions
# (e.g. kombu.exceptions.OperationalError) and `CeleryTaskApplyException`
# a project-level exception; both are defined elsewhere in the project.

logger = logging.getLogger(__name__)


class PatchedTask(celery.app.task.Task):
    try_apply_async = True      # wrap into try/except by default
    propagate_exception = True  # propagate the exception by default

    @contextmanager
    def wrap_connection_exceptions(self):
        connection_succeed = True
        try:
            yield
        except transport_errors as exc:
            connection_succeed = False
            raise exc
        finally:
            logger.debug("celery.task.connection.succeed | %s", connection_succeed)

    @contextmanager
    def wrap_apply_async_exceptions(self):
        apply_succeed = True
        try:
            with self.wrap_connection_exceptions():
                yield
        except Exception as e:
            apply_succeed = False
            logger.error("celery.task.apply_async.failed | %s", self.name)
            if self.propagate_exception:
                raise CeleryTaskApplyException(e)
        finally:
            logger.debug("celery.task.apply_succeed | %s", apply_succeed)

    def apply_async(
        self,
        args=None,
        kwargs=None,
        task_id=None,
        producer=None,
        link=None,
        link_error=None,
        shadow=None,
        **options,
    ):
        logger.debug("%s called by apply_async", self.name)
        # warn if the task is being published inside an open DB transaction
        if get_connection().in_atomic_block:
            logger.warning("celery.task.apply_async.in_atomic_block | %s", self.name)
        if not self.try_apply_async:
            return super().apply_async(
                args, kwargs, task_id, producer, link, link_error, shadow, **options
            )
        with self.wrap_apply_async_exceptions():
            return super().apply_async(
                args, kwargs, task_id, producer, link, link_error, shadow, **options
            )
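As a usage note: assuming the app is created with task_cls pointing at this class before tasks are registered (as in the snippet above), existing task definitions keep working unchanged. Since keyword arguments passed to the task decorator become attributes of the generated task class, an individual task could also opt out of the wrapping, roughly like this (critical_task is hypothetical):

# assumes PatchedTask is the app's base task class (see task_cls above);
# decorator keyword arguments are set as attributes on the task class
@shared_task(bind=True, queue="default", try_apply_async=False)
def critical_task(self, a: int, b: int):
    # for this task, publish errors are not swallowed by the wrapper
    return a / b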
Better ask DevOps
In the modern world, applications and web services are not launched with a single python command. A WSGI server (for example, uWSGI or gunicorn) is used to run the application, which may live in its own Docker container. Nginx will most likely be used to serve static files and act as a reverse proxy, and it may also run in its own container. In turn, all of this can run on a separate pod or virtual machine (I think the burger analogy is appropriate here).
Accordingly, depending on the number of “ingredients” and the load on the application, we can get different scenarios in the event of a sudden disaster.
Let's consider what happens if we call Celery tasks inside a database transaction and run into problems sending messages to the broker, and how this can negatively affect the DBMS and the application as a whole.
Namely:
Taking into account the number of attempts to send a message, the try/except-wrapped call itself can take a significant amount of time. In this case, it is easy to exceed statement_timeout (in the case of PostgreSQL), which will lead to a 500 Internal Server Error.
How much is significant?
By default the broker client makes 100 attempts to establish a connection, and the timeout grows with each attempt, so a single try/except-wrapped call can take on the order of 100 * X ms, where X is the average time per attempt. Say the API method makes N such calls and its own work takes M ms; in total we get roughly M + N * (X * 100) ms. For example, with M = 200 ms, N = 2 and X = 50 ms, that is already 200 + 2 * (50 * 100) = 10 200 ms.
Also, if uWSGI is used and its harakiri timeout fires earlier, we will get a 502 Bad Gateway.
Or, if Nginx is used and proxy_connect_timeout (or proxy_read_timeout, proxy_send_timeout) fires earlier, we will get a 504 Gateway Timeout.
Also, depending on the environment and healthcheck settings (if a message broker availability check is added to the healthcheck method), an emergency reboot of the POD or Virtual Machine (VM) may be triggered.
In the worst case, due to incorrect process termination (pod/VM shutdown), we may end up with hung connections to the DBMS. This increases the load on the DBMS itself and on the connection pooler (for example, PgBouncer), and, if the DB is the main data source, the application may become completely unavailable.
Also, the pod/VM may fail to start again, because broker_connection_retry_on_startup = True by default: if the broker is unavailable, startup is delayed, the readiness probe may time out in the case of Kubernetes, a new cycle of attempts begins, and as a result we see CrashLoopBackOff.
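For reference, the startup behaviour described here is governed by settings along these lines (names per Celery 5.3+, defaults shown in the comments; verify them for your version):

app.conf.update(
    broker_connection_retry_on_startup=True,  # default: keep retrying at startup
    broker_connection_max_retries=100,        # default: up to 100 attempts
    broker_connection_timeout=4,              # default: seconds per connection attempt
)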
To sum up
Broker unavailability may cause increased load on the DB, which may trip the healthcheck, which may be followed by cyclic pod/VM restarts and, as a result, complete application unavailability. In other words, an avalanche-like shutdown of the system may occur. And the hardest part is that the SRE team will struggle to determine the root cause, because there will be many affected nodes.
How to avoid all this and minimize risks
As always, there is no silver bullet. Wrapping Celery task calls in try/except will not save us completely unless we take the following into account:
Don't forget the "fast transaction rule": access to the DB, and especially a transaction, should be as fast as possible. Do not perform long computations inside a transaction, and especially do not call external services from it; a Celery call is a call to the broker, which many forget.
In the example above, in the patched Task class, I added a check and logging that help identify all the places where Celery is called inside a transaction. After that, you need to move those calls outside the transaction or use transaction.on_commit(), as shown in the sketch below.
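A minimal sketch of the transaction.on_commit() option (create_entity_and_report and Entity are hypothetical; simple_task is the task from the example above):

from django.db import transaction

def create_entity_and_report(data: dict) -> None:
    with transaction.atomic():
        entity = Entity.objects.create(**data)  # Entity is a hypothetical model
        # the task is published only after the transaction commits successfully
        transaction.on_commit(lambda: simple_task.delay(entity.id, 2))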
It also makes sense to review whether the healthcheck really needs to verify that Celery works. Ideally, the application should not depend on the message broker being up; it is better to add logging and alerts and implement a fallback pattern for the failure of any transport or system node. If the main functionality still depends on Celery, it is worth considering the Outbox pattern (for example, there is an article about it on Habr).
If the healthcheck does include a broker availability check, it is worth paying attention to how it is implemented. This logic sometimes relies on inspecting the connection manager's pool, which is incorrect: the pool may be non-empty while all of its connections are "dead" and have not yet been evicted; they will only be invalidated on the first attempt to call a Celery task. As a result, the healthcheck may respond 200 OK because the pool is not empty, while the application no longer works.
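As an illustration, a hedged sketch of a broker check that actually touches the broker instead of inspecting the pool (the URL, timeout, and function name are assumptions):

from kombu import Connection

def broker_is_alive(broker_url: str = "amqp://guest:guest@rabbitmq:5672//") -> bool:
    try:
        # open a fresh connection and fail fast instead of relying on pooled state
        with Connection(broker_url, connect_timeout=2) as conn:
            conn.ensure_connection(max_retries=1)
        return True
    except Exception:
        return False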
Bonus
Also, for a visual demonstration, I have prepared a small project on GitHub with all the examples above.
How to use:
Download
Launch
make docker-up-all
To create a “catastrophe”
make disaster
Make sure the method without the wrapper responds with 500
make call-task-simple
Make sure the method with the wrapper responds with 200
make call-task-wrapped
"Heal" everything
make heal
Clean everything
make docker-down
Thank you
I would like to say a special thank you to my wife and editor-in-chief @avasileva86 for her support and invaluable help while I was writing this article.