Is FastStream the New Celery Killer?

FastStream is a relatively new shiny toy in the hands of Python enthusiasts that is designed specifically for working with message brokers.

There is a strong belief in the Python community that if we work with MQ, we need Celery, even though it is slightly outdated. This is why people keep trying to throw out the “grandfather” and drag in any promising new MQ tool instead. Moreover, the cult of Celery is so strong in people's minds that almost every new library for working with MQ tries to become its “killer” and replacement.

However, this is not entirely true. There is a huge layer of projects that do not need a task-management framework, only the “bare” functionality of Kafka/RabbitMQ/NATS/whatever for inter-service communication. All these projects are forced to make do with “raw” Python clients for their brokers and to write all the wiring around those clients themselves. FastStream aims precisely at this niche.

In this article, I want to convince you that we do not live by Celery alone, and that there is a place in the sun for alternative tools. We’ll also look at the features that FastStream brings to the stagnant world of MQ tools.

Who am I?

Hello! I am Nikita, the creator of Propan and now a core developer of FastStream. You may have already read my previous articles about the development of the project or even seen my talk at Podlodka Python Crew #3.

The last time we saw each other was almost a year ago, when FastStream had just come out. So now I would like to summarize, in a single list, the advantages that should convince you to try FastStream. The project does not stand still, and I have something to tell you about.

What kind of framework is FastStream?

FastStream is a Python framework for working with message brokers. It was created to make developing event-driven systems as simple as possible.

Simply put, it is a very thick client for brokers that lets you write less infrastructure code and focus on the business logic of your applications. It is therefore much more appropriate to compare it with aiokafka/aio-pika or Kombu (whose logical continuation FastStream is). However, we also provide functionality that is not directly related to the code but is essential for modern systems: automatic documentation of the project, ease of testing, and observability out of the box.

Let's finally figure out why you need FastStream.

Basic API

First of all, it has a very simple API. For now, FastStream supports Kafka, RabbitMQ, NATS, and Redis as message brokers (the list is constantly growing; at the moment we have 8 issues from the community asking for support of new brokers).

The framework provides both a unified way to interact with brokers and methods specific to each of them. A typical message endpoint written with FastStream looks as follows:

from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

@broker.subscriber("test-queue")  # RMQ queue name
async def handle(msg: str):
    print(msg)

Getting started is also extremely simple:

faststream run main:app

This syntax works for all supported brokers and differs only in imports.

The main advantage of FastStream over “raw” clients in this respect is its declarative syntax. We move from writing imperative steps (“connect to X,” “create a queue,” “create a consumer,” “bind a function to a consumer”) to a declarative “I need a subscriber for X.” FastStream also takes over all message serialization and deserialization: it casts the message body to the type requested in the annotation using pydantic.

As you can see, creating consumers is greatly simplified compared with the usual MQ libraries and is reminiscent of another very popular Python framework.
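
To make the declarative idea concrete, here is a toy sketch in plain Python of how a decorator can register a subscriber and how a type annotation can drive deserialization. It is only an illustration: `ToyBroker`, `Order`, and the JSON wire format are assumptions made for this example, and FastStream's real implementation (which relies on pydantic) is considerably more involved.

```python
import inspect
import json
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass
class Order:
    # Hypothetical message schema used only for this illustration.
    id: int
    item: str


class ToyBroker:
    """In-memory stand-in for a broker; NOT FastStream's implementation."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[[Any], Any]] = {}

    def subscriber(self, queue: str) -> Callable:
        # Declarative part: the decorator records "I need a subscriber for X".
        def register(func: Callable) -> Callable:
            self._handlers[queue] = func
            return func

        return register

    def publish(self, raw: bytes, queue: str) -> Any:
        handler = self._handlers[queue]
        # Read the type annotation of the handler's first parameter.
        param = next(iter(inspect.signature(handler).parameters.values()))
        payload = json.loads(raw)
        # Cast the decoded body to the annotated type.
        if isinstance(payload, dict):
            message = param.annotation(**payload)
        else:
            message = param.annotation(payload)
        return handler(message)


broker = ToyBroker()


@broker.subscriber("orders")
def handle(msg: Order) -> str:
    # The handler receives a typed object, not raw bytes.
    return f"order {msg.id}: {msg.item}"
```

Calling `broker.publish(b'{"id": 1, "item": "book"}', "orders")` hands the handler an `Order` instance, so the handler body never touches connection setup or parsing.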

Next, let's look at what benefits we get beyond “sugar over sugar.”

Inspiration for FastStream

As I already said, FastStream does not strive to be a copy of or a replacement for Celery, and apart from Celery we only had “raw” clients. So we could not find inspiration among the MQ tools.

But we found it among the popular HTTP tools! Namely – FastAPI. This framework at one time showed what requirements a modern tool should meet. Let's list them:

  • Intuitive API

  • Data serialization based on type annotation

  • Automatic documentation

  • Convenient testing

We didn't have any of this in the MQ world before. Now we do!

We have already dealt with the first two points, so let's look at the remaining features.

Automatic documentation

Documentation is extremely important for a project these days. How will you communicate the contracts your service expects to a neighboring team? How will you interact with testers? What will a new team member look at during onboarding?

For HTTP services, a solution has existed for a long time: OpenAPI + SwaggerUI. But what about asynchronous services? Of course, you can write the documentation yourself in one form or another, but then how do you keep it in sync with the code?

FastStream solves this problem in a completely transparent and familiar way: it generates an AsyncAPI specification from your service code. You can send out the resulting file, post it on your wiki, or simply host its HTML representation right next to the service.

So, for the following service with one subscriber and one publisher:

from pydantic import BaseModel
from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

class InputData(BaseModel):
    data: bytes

class Prediction(BaseModel):
    result: float

@broker.subscriber("test-subject")
@broker.publisher("out-subject")
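async def handle_prediction(
    msg: InputData
) -> Prediction:
    # `model` is assumed to be defined elsewhere (e.g. a loaded ML model)
    # whose predict() returns a Prediction
    predict = model.predict(msg)
    return predict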

We will receive the following page, where we can see:

  • the address of the broker the service works with

  • the input queue and the expected data format

  • the output queue and the format of the data produced

AsyncAPI service schema

Service testing

Testing asynchronous services is a particularly acute problem. Of course, we can test the business logic (if we have decomposed the code well enough), but what about contracts? Do we spin up a broker in CI every time and test against a real external dependency?

With HTTP clients everything is simple: we mock the transport and send an “in-memory” emulation instead of a real request. Why not do the same for message brokers? After all, Kombu already had this functionality. It's a pity that everyone forgot about the old man.

So, tests for the service from the first example look like this:

import pytest
from pydantic import ValidationError
from faststream.rabbit import TestRabbitBroker

from main import broker  # the broker from the first example (main.py)

@pytest.mark.asyncio()
async def test_correct():
    async with TestRabbitBroker(broker) as br:
        await br.publish("Hi!", "test-queue")

@pytest.mark.asyncio()
async def test_invalid():
    async with TestRabbitBroker(broker) as br:
        with pytest.raises(ValidationError):
            await br.publish(1, "test-queue")

In this case, all interactions will occur “in memory”, which ensures simplicity and reproducibility of testing in CI.

But if you want to reuse the same tests against a real broker, nothing could be simpler:

async with TestRabbitBroker(broker, with_real=True) as br:
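
As a rough illustration of what “in memory” means here, the sketch below replaces the network transport with a test double that delivers messages synchronously inside the process. The names `InMemoryTransport` and `build_service` are hypothetical and exist only for this example; this is the general technique, not how FastStream's test brokers are implemented.

```python
from typing import Callable, Dict, List, Tuple


class InMemoryTransport:
    """Hypothetical test double: delivers messages without a network."""

    def __init__(self) -> None:
        self.handlers: Dict[str, Callable[[str], None]] = {}
        self.sent: List[Tuple[str, str]] = []  # (queue, body) log for assertions

    def subscribe(self, queue: str, handler: Callable[[str], None]) -> None:
        self.handlers[queue] = handler

    def publish(self, body: str, queue: str) -> None:
        self.sent.append((queue, body))
        handler = self.handlers.get(queue)
        if handler is not None:
            handler(body)  # synchronous, in-process delivery


def build_service(transport: InMemoryTransport) -> List[str]:
    """Wire a tiny service: consume 'in', publish upper-cased text to 'out'."""
    processed: List[str] = []

    def on_message(body: str) -> None:
        processed.append(body)
        transport.publish(body.upper(), "out")

    transport.subscribe("in", on_message)
    return processed


def test_roundtrip() -> None:
    transport = InMemoryTransport()
    processed = build_service(transport)
    transport.publish("hi", "in")
    # Both the consumed message and the produced one can be checked.
    assert processed == ["hi"]
    assert transport.sent == [("in", "hi"), ("out", "HI")]
```

Because delivery is synchronous and local, such a test needs no broker in CI and gives the same result on every run.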

Observability

Monitoring is an extremely important aspect of operating your services. To sleep peacefully while the code is running, you need:

  • Logs

  • Metrics

  • Traces

  • Healthcheck

At the moment, FastStream has built-in functionality for integrating with any logging system, as well as a ready-made OpenTelemetry solution with which you can build end-to-end traces across your whole system.
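
End-to-end traces across a broker depend on passing trace context along with each message. The sketch below shows that idea in plain Python, assuming W3C Trace Context style `traceparent` headers (the default propagation format in OpenTelemetry). The helper names are hypothetical, and this is not FastStream's OpenTelemetry middleware, only an illustration of the propagation step.

```python
import secrets
from typing import Dict, Optional


def new_traceparent(parent: Optional[str] = None) -> str:
    """Return a W3C Trace Context style header: version-traceid-spanid-flags."""
    # Reuse the parent's trace id if there is one, otherwise start a new trace.
    trace_id = parent.split("-")[1] if parent else secrets.token_hex(16)
    span_id = secrets.token_hex(8)  # every hop gets its own span id
    return f"00-{trace_id}-{span_id}-01"


def outgoing_headers(incoming: Dict[str, str]) -> Dict[str, str]:
    """Headers for a message published while handling `incoming` headers."""
    return {"traceparent": new_traceparent(incoming.get("traceparent"))}


# Service A starts a trace; service B continues it when publishing downstream.
headers_a = outgoing_headers({})
headers_b = outgoing_headers(headers_a)
```

Both messages carry the same trace id but different span ids, which is what lets a tracing backend stitch the hops into a single trace.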

You can read about both logging and tracing in our documentation. I also highly recommend taking a look at this repository (thanks to our contributor Roman), which contains a ready-made example of services and infrastructure that produces a clear picture of your traces.

Work on Prometheus metrics support and a convenient API for k8s probes (healthchecks) is already underway and is now a priority, so the Observability saga should soon come to an end.

Other features

I don’t want to retell all the documentation, but I’ll briefly outline the features that we have:

  • Application decomposition via Routers

  • Flexible Middleware system

  • Its own dependency injection system based on Depends and Context

  • CLI with hot reload (how else to develop?)

  • Ability to disable pydantic and use a custom message format (msgpack, protobuf, avro, etc.)

  • Tight integration with FastAPI

  • Integration with taskiq for scheduled publication of messages

In general, we have everything you need for comfortable development.

Conclusion

In conclusion, I would like to say that most of the problems currently solved with Celery can be solved (and even more effectively) with FastStream. That is not because we are a “Celery killer”, but simply because you have stuck Celery where you don't need it.

If you need interaction between services written in different languages and technologies on top of brokers, FastStream is for you.

If you need asynchronous “tasks” within a single code base, with all their “statuses”, monitoring via Flower, and so on, then Celery and its analogues are the place for you.

That's the simple conclusion. I apologize for the clickbait title 🙂

Afterword

Despite its relatively young age (it was released in September 2023), the project has already gained some popularity. Everyone who had been eyeing it has by now taken a closer look and started actively adopting it. I can't speak on behalf of companies and say that they use our tool, but the community's interest is confirmed by the following facts:

  • 2,000 stars on GitHub since release (in less than a year)

  • More than 100k installations per month

  • Inclusion in the Top-100 list of OpenSource Achievements of 2023

  • Support for the development of the framework from the RabbitMQ, NATS, and AsyncAPI teams

  • A constant stream of requests for new features from the community

FastStream has shown what a modern MQ tool should look like, and we hope other libraries will follow us. For example, the creator of pydantic has already announced a roadmap for arq, in which he wants to bring in the features that we showed.

You can also support our project by:

  • [ ] putting a star on GitHub

  • [ ] telling your colleagues and friends about it

  • [ ] joining our RU Telegram community, where you can directly participate in the discussion and design of new framework functionality

Also, if you are interested in the future of the project, you can check out our Roadmap at this link:

https://github.com/airtai/faststream/issues/1510
