Mela, an asynchronous Python framework for services working with RabbitMQ

WARNING: long introduction. If you want to get straight to the point, scroll down to Getting Started.

Introduction

In 2023, writing services that communicate with each other via RabbitMQ is still unreasonably difficult. It gets even harder when you need to test their business logic, coordinate contracts between them, or organize them in a monorepo.

I started writing a lightweight framework at Alem Research back in 2019. The plan was for it to be something like Flask or FastAPI, but for working with RabbitMQ.

Did it succeed? I think so. With this framework, our data scientists were able to write services on their own and package them into Docker images, which could then be plugged into a pipeline with little effort and a couple of rebindings.

However, the first version was built on kombu. Later the core was rewritten on top of aio-pika, and the framework became asynchronous. What does asynchrony give you? For example, you can write an HTTP page fetcher that, with a larger prefetch_count, fetches a huge number of pages concurrently and can even saturate the network almost completely, which would be quite difficult to achieve without asynchrony.
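To make the prefetch_count idea concrete, here is a minimal sketch that uses neither Mela nor RabbitMQ. It assumes that prefetch_count behaves like a cap on the number of messages being handled at once, which is modelled here with an asyncio.Semaphore. The names fake_fetch, fetch_all and timed are invented for this illustration, and the "network call" is just a short sleep.

```python
import asyncio
import time


async def fake_fetch(url: str) -> str:
    # Stand-in for a real HTTP request: it only waits, like a slow network call.
    await asyncio.sleep(0.05)
    return f"body of {url}"


async def fetch_all(urls: list[str], prefetch_count: int) -> list[str]:
    # The semaphore plays the role of prefetch_count (an assumption of this
    # sketch): at most that many "messages" are in flight at any moment.
    limit = asyncio.Semaphore(prefetch_count)

    async def handle(url: str) -> str:
        async with limit:
            return await fake_fetch(url)

    return await asyncio.gather(*(handle(url) for url in urls))


async def timed(prefetch_count: int) -> float:
    # Process ten fake pages and report how long it took.
    urls = [f"https://example.com/{i}" for i in range(10)]
    started = time.perf_counter()
    await fetch_all(urls, prefetch_count)
    return time.perf_counter() - started


if __name__ == "__main__":
    print(f"prefetch_count=1:  {asyncio.run(timed(1)):.2f}s")
    print(f"prefetch_count=10: {asyncio.run(timed(10)):.2f}s")
```

With a cap of 1 the waits add up one after another, while with a cap of 10 they overlap, so the second run should finish noticeably faster.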

In addition, asynchrony makes it easy to implement the RPC pattern over RabbitMQ within a single process, which is quite convenient for testing. Integration with FastAPI becomes very convenient too. And the processing speed grows tenfold.

In 2020, together with the founder and then-CEO, we decided that the framework could safely be released as open source. All in all, it would seem that everything is fine.

However, there are also problems. The main one is the lack of active users and contributors. I left Alem Research, and at the moment there is no one in the company to continue working on the framework. Sometimes someone from the company sends me requests for the framework, but I’m too lazy to roll out updates for a single company: it turns out to be a kind of work for an employer who doesn’t pay you anymore 🙂

I have long wanted to write a pitch article about Mela to attract new users, and maybe even contributors, but I did not dare, because the code was very far from ideal. Yes, I was simply consumed by perfectionism and by shame about releasing something not very beautiful. But today I realized that I have nothing to be ashamed of: yes, the core architecture is not ideal, but over the past couple of months, while I was out of work, I have thought it through from all sides, and I know what to do next. In short, a beautiful version is not ready, but I have a plan.

And I decided that it would be wrong (and slow) to carry out my plan alone; it is better to try to attract potential contributors right away. So, if you are interested in what is described below, welcome aboard!

Sorry for the long introduction. Now, finally, to the point.

Getting started

Of course, it all starts with:

pip install mela==1.1.1

I pinned the version so that the article remains relevant even after updates.

Let’s write a service that receives a message from a queue, prints its body parsed from JSON, and publishes the message back.

# app.py

from mela import Mela

app = Mela(__name__)


@app.service("printer")
def printer(body, message):
    print(body)
    return body


if __name__ == '__main__':
    app.run()

As you can guess, this is not everything needed to run the application: there is no information about queues or the connection here. That is because it is stored in the application.yml file.

# application.yml
connections:
  default:
    host: localhost
    port: 5672
    username: user
    password: bitnami

services:
  printer:
    consumer:
      exchange: general-sentiment-x
      routing_key: general-sentiment-q
      queue: general-sentiment-q
    publisher:
      exchange: general-sentiment-x
      routing_key: general-sentiment-q

That’s all.

This example introduces three of Mela’s most basic high-level concepts at once: Publisher, Consumer, and Service.

The publisher and consumer are probably self-explanatory. A Service is a pipeline element that combines the two: it consumes messages from a certain queue, processes them, and publishes the results to the specified exchange. This composition is also reflected in the YAML file.

The function under the @app.service(...) decorator is, as you might guess, the consumer callback, and whatever it returns is sent to the publisher associated with the service.
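The consume, process, publish cycle can be modelled in a few lines of plain asyncio. This is only a mental model and not Mela's implementation: run_service, demo and the in-memory queue are invented for this sketch, the handler takes only the body, and the None sentinel exists solely so that the demo terminates.

```python
import asyncio
import json


async def run_service(handler, incoming: asyncio.Queue, publish) -> None:
    # Consume until a None sentinel arrives (a real consumer runs forever).
    while (raw := await incoming.get()) is not None:
        body = json.loads(raw)             # consumer side: decode the message
        result = handler(body)             # the user callback
        await publish(json.dumps(result))  # publisher side: send the result on


def printer(body):
    print(body)
    return body


async def demo() -> list[str]:
    incoming: asyncio.Queue = asyncio.Queue()
    published: list[str] = []

    async def publish(raw: str) -> None:
        # Stand-in for publishing to an exchange: just remember the payload.
        published.append(raw)

    await incoming.put('{"text": "hello"}')
    await incoming.put(None)
    await run_service(printer, incoming, publish)
    return published


if __name__ == "__main__":
    print(asyncio.run(demo()))
```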

Pydantic

What is a modern framework without Pydantic, right? Mela has it too.

# app.py
from pydantic import BaseModel
from datetime import datetime

from mela import Mela


app = Mela(__name__)


class Document(BaseModel):
    text: str
    url: str
    date: datetime


@app.service('validator')
def validator(body: Document) -> Document:
    if '#' in body.url:
        body.url = body.url.split('#')[0]
    return body


if __name__ == '__main__':
    app.run()

In this case we use the new-style handler signature, so we do not need to explicitly declare the second argument, which received the Message object in the previous example.

The body is first parsed as JSON, and the result is then fed into the Document class.

On validation errors, the errors are printed to the console and the message is returned to the queue. That is, of course, provided the consumer has requeue_broken_messages=True (the default) and has no dead letter exchange. The same happens if any other exception is raised in the callback.
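The decision described above can be sketched without RabbitMQ. This is my reading of the rules, not Mela's code: a dataclass stands in for the Pydantic model, outcome_for and parse_document are invented names, and two branches are assumptions of the sketch. It assumes that a consumer with a dead letter exchange sends broken messages there, and it relies on the fact that RabbitMQ discards a message rejected without requeue when no dead letter exchange is set.

```python
import json
from dataclasses import dataclass
from datetime import datetime


@dataclass
class Document:
    # Stdlib stand-in for the Pydantic model used in the article.
    text: str
    url: str
    date: datetime


def parse_document(raw: bytes) -> Document:
    data = json.loads(raw)  # raises ValueError on malformed JSON
    return Document(
        text=str(data["text"]),  # raises KeyError if the field is missing
        url=str(data["url"]),
        date=datetime.fromisoformat(data["date"]),
    )


def outcome_for(raw: bytes, requeue_broken_messages: bool = True,
                has_dead_letter_exchange: bool = False) -> str:
    try:
        parse_document(raw)
    except (ValueError, KeyError, TypeError) as error:
        print(f"validation failed: {error!r}")
        if has_dead_letter_exchange:
            return "dead-lettered"  # assumption: DLX takes priority
        return "requeued" if requeue_broken_messages else "dropped"
    return "processed"
```

For example, outcome_for(b"not json") falls into the requeue branch with the default settings, which is why a permanently broken message can circulate forever unless a dead letter exchange is configured.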

ack/nack control

There are several ways to control acknowledgements. Let’s look at all of them at once.

from datetime import datetime
from datetime import timedelta
from pydantic import BaseModel

from mela import IncomingMessage
from mela import Mela
from mela.components.exceptions import NackMessageError

app = Mela(__name__)


class Document(BaseModel):
    text: str
    url: str
    date: datetime


@app.service("filter")
async def filter_(body: Document, message: IncomingMessage):
    if body.date > datetime.utcnow():
        # First way: we can raise special exception with some `requeue` value
        raise NackMessageError("We are not working with time travellers", requeue=False)
    elif body.date < datetime.utcnow() - timedelta(days=365):
        # Second way: we can manually nack message via IncomingMessage object
        # As you can see, in this case we can't attach any message explaining the reason.
        # But it is still useful if you need to silently send message to DLX
        await message.nack(requeue=False)  # Go to archive, dude

    if body.url == '':
        # Third way: we can raise almost any exception. Whether the message is
        # requeued depends on the `requeue_broken_messages` value
        raise AssertionError("Message without url is not acceptable")

    return body


if __name__ == '__main__':
    app.run()

Pure consumer

A consumer is created in the same way as a service, but with a different decorator.

from pydantic import BaseModel
from pydantic import EmailStr
from mela import Mela

app = Mela(__name__)


class EmailNotification(BaseModel):
    template_name: str
    vars: dict
    receiver: EmailStr


@app.consumer("email-sender")
def send_email(body: EmailNotification):
    # Some Jinja2 and SMTP integration
    pass


if __name__ == '__main__':
    app.run()

If no error occurs while processing the message, the message is acknowledged automatically and removed from the queue. This behavior can be changed, although most of the time it suits everyone. In any case, you can acknowledge the message manually; I think you have already guessed how.

It is also important to show application.yml for the consumer:

connections:
  default:
    host: localhost
    port: 5672
    username: user
    password: bitnami

consumers:
  email-sender:
    exchange: notifications-x
    exchange_type: topic
    routing_key: "email.#"
    queue: email-sender-q

That is really all: just a different block name. And, incidentally, an example of working with other exchange types 🙂
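For readers less familiar with topic exchanges, this is how a binding key such as "email.#" is matched against routing keys. In AMQP topic matching, words are separated by dots, "*" stands for exactly one word, and "#" stands for zero or more words. The function below is a small illustrative reimplementation of that rule; topic_matches is a name made up for this sketch, and the broker does the real matching.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """AMQP topic match: '*' is exactly one word, '#' is zero or more words."""

    def match(p: list[str], k: list[str]) -> bool:
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may swallow any number of words, including none.
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        return (head == "*" or head == k[0]) and match(rest, k[1:])

    return match(pattern.split("."), routing_key.split("."))


if __name__ == "__main__":
    for key in ("email.welcome", "email.billing.invoice", "sms.welcome"):
        print(key, topic_matches("email.#", key))
```

So the email-sender queue above receives "email.welcome" and "email.billing.invoice", but not "sms.welcome".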

Pure publisher and FastAPI integration

from datetime import datetime
from uuid import uuid4

from fastapi import FastAPI
from mela import Mela
from mela.settings import Settings
from pydantic import BaseModel

app = FastAPI()

mela_app = Mela(__name__)
mela_app.settings = Settings()


class ReportRequest(BaseModel):
    start_date: datetime
    end_date: datetime
    user_id: str
    report_id: str | None = None


@app.post("/report")
async def read_root(report_request: ReportRequest):
    if report_request.report_id is None:
        report_request.report_id = str(uuid4())
    # some DB writing
    publisher = await mela_app.publisher_instance('report-generator')
    await publisher.publish(report_request)
    return report_request

application.yml:

connections:
  default:
    host: localhost
    port: 5672
    username: user
    password: bitnami

publishers:
  report-generator:
    exchange: report-x
    routing_key: new-report

There is not much to comment on here. I would only add that in the future I would like to provide a standalone decorator for injecting publishers and RPC clients into third-party functions, in the same way as in the following example. But that belongs to the plans for the future. For now, let’s move on to the next example.

Injections of additional publishers

Sometimes we need to implement a splitter, or just publish something somewhere while processing a message. You can, of course, do it as in the previous section, but there is a neater way:

from datetime import datetime

from pydantic import BaseModel

from mela import Mela
from mela.components import Publisher

app = Mela(__name__)


class Document(BaseModel):
    text: str
    url: str
    date: datetime
    has_images: bool = False


@app.service('archiver')
async def archiver(document: Document, images_downloader: Publisher="images-downloader") -> Document:
    # archiving document
    
    if document.has_images:
        await images_downloader.publish(document)
    
    return document


if __name__ == '__main__':
    app.run()

application.yml:

connections:
  default:
    host: localhost
    port: 5672
    username: admin
    password: admin

services:
  archiver:
    consumer:
      exchange: archiver-x
      routing_key: archiver-q
      queue: archiver-q
    publisher:
      exchange: notify-archived-x
      exchange_type: topic
      routing_key: document.archived

publishers:
  images-downloader:
    exchange: images-downloader-x
    routing_key: images-downloader-q

At the moment, injections are implemented incorrectly in terms of typing: the parameter is annotated as Publisher, but its default value is a string. There is a plan to rewrite them using Annotated. But the main thing is that it works.
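As an illustration of what an Annotated-based injection could look like, here is one possible shape. It is purely hypothetical and not Mela's API: the Inject marker, resolve_injections and the local Publisher class are all invented for this sketch. The idea is that the type stays Publisher while the component name travels in the annotation metadata, so no string default is needed.

```python
import inspect
from typing import Annotated, get_args, get_origin, get_type_hints


class Publisher:
    """Stand-in for mela.components.Publisher, only for this sketch."""

    def __init__(self, name: str) -> None:
        self.name = name


class Inject:
    """Hypothetical marker carrying the name of the component to inject."""

    def __init__(self, name: str) -> None:
        self.name = name


def resolve_injections(handler) -> dict[str, Publisher]:
    # include_extras=True keeps the Annotated metadata instead of stripping it.
    hints = get_type_hints(handler, include_extras=True)
    injected = {}
    for param in inspect.signature(handler).parameters:
        hint = hints.get(param)
        if get_origin(hint) is Annotated:
            base, *metadata = get_args(hint)
            for meta in metadata:
                if base is Publisher and isinstance(meta, Inject):
                    injected[param] = Publisher(meta.name)
    return injected


async def archiver(
    document: dict,
    images_downloader: Annotated[Publisher, Inject("images-downloader")],
) -> dict:
    return document


if __name__ == "__main__":
    print({name: pub.name for name, pub in resolve_injections(archiver).items()})
```

A type checker then sees images_downloader as a plain Publisher, which is exactly what the current string-default approach cannot offer.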

DLX

Dead Letter Exchange is supported for any consumer.

Setting it up is very simple:

connections:
  default:
    host: localhost
    port: 5672
    username: admin
    password: admin

services:
  service_with_dlx:
    consumer:
      exchange: dlx-test-x
      routing_key: dlx-test-k
      queue: dlx-test-q
      dead_letter_exchange: dlx-test-dead-letter-x
      dead_letter_routing_key: dlx-test-dead-letter-k
    publisher:
      exchange: test-x
      routing_key: test_queue

But you will have to make sure yourself that there is a queue bound to the dead letter exchange to collect the broken messages.
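To show what has to exist on the broker side, the sketch below spells out the topology as plain data. The queue arguments x-dead-letter-exchange and x-dead-letter-routing-key are the standard RabbitMQ ones; I assume, without having checked, that Mela's dead_letter_exchange and dead_letter_routing_key options map onto them. The function name, the "direct" exchange type and the dead letter queue name dlx-test-dead-letter-q are made up for the example.

```python
def dead_letter_topology(queue: str, dead_letter_exchange: str,
                         dead_letter_routing_key: str,
                         dead_letter_queue: str) -> dict:
    return {
        # The consumed queue points at the dead letter exchange via arguments.
        "queue": {
            "name": queue,
            "arguments": {
                "x-dead-letter-exchange": dead_letter_exchange,
                "x-dead-letter-routing-key": dead_letter_routing_key,
            },
        },
        # The exchange itself must exist (type is an assumption of this sketch).
        "dead_letter_exchange": {"name": dead_letter_exchange, "type": "direct"},
        # And some queue must be bound to it, or rejected messages are lost.
        "dead_letter_queue": {
            "name": dead_letter_queue,
            "bound_to": dead_letter_exchange,
            "routing_key": dead_letter_routing_key,
        },
    }


if __name__ == "__main__":
    from pprint import pprint

    pprint(dead_letter_topology(
        "dlx-test-q", "dlx-test-dead-letter-x",
        "dlx-test-dead-letter-k", "dlx-test-dead-letter-q",
    ))
```

The last entry is the part you have to take care of yourself: without a bound queue, the dead letter exchange has nowhere to route the messages.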

RPC

# server.py
import asyncio
import aio_pika

from mela import Mela

app = Mela(__name__)


async def fetch(url):
    # asynchronously fetching url here and return its body
    await asyncio.sleep(1)
    return url


@app.rpc_service("fetcher")
async def fetcher(url: str):
    return {"fetched": await fetch(url)}


bots = {}


def create_bot(bot_id, bot_username, bot_password):
    bots[bot_id] = {'username': bot_username, 'password': bot_password}


def get_bot(bot_id):
    return bots[bot_id]


@app.rpc_service("bot_manager")
async def bot_manager(body, message: aio_pika.Message):
    if message.headers['method'] == 'create_bot':
        create_bot(**body)
        return {'result': None, 'status': "OK"}
    elif message.headers['method'] == 'get_bot':
        return {'result': get_bot(**body), 'status': "OK"}
    else:
        return {'result': None, 'status': "ERROR_UNKNOWN_METHOD"}

if __name__ == '__main__':
    app.run()

# client.py
import asyncio

from mela import Mela

app = Mela(__name__)


async def main():
    # RPC calls over RabbitMQ have never been simpler!

    fetcher = await app.rpc_client_instance("fetcher")

    bot_manager = await app.rpc_client_instance("bot_manager")

    res = await fetcher.call({'url': "test"})
    print(res)

    # we can even gather call results!
    g = await asyncio.gather(fetcher.call({'url': url1}), fetcher.call({'url': url2}))
    print(g)

    create_bot_result = await bot_manager.call({
        'bot_id': 1,
        'bot_username': "LalkaPalka",
        'bot_password': "supersecret",
    },
        headers={'method': 'create_bot'},
    )
    print(f"create_bot result {create_bot_result}")

    get_bot_result = await bot_manager.call({'bot_id': 1}, headers={'method': 'get_bot'})
    print(f"get_bot_result {get_bot_result}")

    unknown_method_result = await bot_manager.call({'bot_id': 4}, headers={'method': 'getBot'})
    print(f"unknown method result: {unknown_method_result}")


if __name__ == '__main__':
    url1 = (
        'https://tengrinews.kz/kazakhstan_news/vorvalis-dom-izbili-'
        'almatinka-rasskazala-zaderjanii-supruga-459127/'
    )
    url2 = (
        'https://www.inform.kz/ru/skol-ko-lichnyh-podsobnyh-'
        'hozyaystv-naschityvaetsya-v-kazahstane_a3896073'
    )
    app.run(main())

# application.yml
connections:
  default:
    host: localhost
    port: 5672
    username: user
    password: bitnami

rpc-services:
  fetcher:
    exchange: fetcher-x
    routing_key: fetcher-k
    queue: fetcher-q
    response_exchange: fetching-result-x
  bot_manager:
    exchange: botmanager-x
    routing_key: botmanager-k
    queue: botmanager-q
    response_exchange: botmanager-result-x

This example shows two RPC services that coexist in one process without conflicting with each other.

Here, too, I see no need for further explanation, but if you have questions, I will be happy to answer.
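For those who have not met the RPC-over-queues pattern before, the essence is a correlation id: the client tags each request with a unique id and waits on a future, and the reply carrying the same id resolves that future. The sketch below models this in memory with asyncio. It is not how Mela or aio-pika implement it; InMemoryRpc and everything inside it are invented for this illustration, and this toy server handles requests one at a time.

```python
import asyncio
import uuid


class InMemoryRpc:
    def __init__(self, handler) -> None:
        self.handler = handler
        self.requests: asyncio.Queue = asyncio.Queue()  # plays the request queue
        self.pending: dict[str, asyncio.Future] = {}    # correlation id -> waiter

    async def serve(self) -> None:
        # Server side: take a request, run the handler, route the reply back
        # to whoever is waiting on this correlation id.
        while True:
            correlation_id, body = await self.requests.get()
            result = await self.handler(**body)
            self.pending.pop(correlation_id).set_result(result)

    async def call(self, body: dict):
        # Client side: register a future under a fresh id, send, then wait.
        correlation_id = str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        self.pending[correlation_id] = future
        await self.requests.put((correlation_id, body))
        return await future


async def fetcher(url: str) -> dict:
    await asyncio.sleep(0.01)  # pretend to fetch the page
    return {"fetched": url}


async def demo() -> list:
    rpc = InMemoryRpc(fetcher)
    server = asyncio.create_task(rpc.serve())
    try:
        return await asyncio.gather(rpc.call({"url": "a"}), rpc.call({"url": "b"}))
    finally:
        server.cancel()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because each reply is matched by id rather than by arrival order, several calls can be in flight at once, which is what makes the asyncio.gather usage in client.py possible.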

Multiple Connections and Environment Variables

A very common case is when we need to transfer data from one RabbitMQ cluster to another. We will not discuss here whether this is good practice; we will simply show how it can be done easily and naturally.

# application.yml
connections:
  input_connection:
    host: $RABBIT_INPUT_HOST
    port: ${RABBIT_INPUT_PORT|5672}
    username: ${RABBIT_INPUT_USERNAME|rabbitmq-bridge}
    password: ${RABBIT_INPUT_PASSWORD|rabbitmq-bridge}
  output_connection:
    host: $RABBIT_OUTPUT_HOST
    port: ${RABBIT_OUTPUT_PORT|5672}
    username: ${RABBIT_OUTPUT_USERNAME|rabbitmq-bridge}
    password: ${RABBIT_OUTPUT_PASSWORD|rabbitmq-bridge}

services:
  bridge:
    consumer:
      connection: input_connection
      prefetch_count: ${RABBIT_INPUT_PREFETCH_COUNT|1}
      routing_key: ${RABBIT_INPUT_ROUTING_KEY}
      exchange: ${RABBIT_INPUT_EXCHANGE}
      queue: ${RABBIT_INPUT_QUEUE}
    publisher:
      connection: output_connection
      routing_key: ${RABBIT_OUTPUT_ROUTING_KEY}
      exchange: ${RABBIT_OUTPUT_EXCHANGE}

# app.py
from mela import Mela

app = Mela(__name__)


@app.service("bridge")
async def serve(body, message):
    return body


if __name__ == '__main__':
    app.run()

As you can see, the service code is very simple. But there is something new in the configuration file. The first thing that catches the eye is the environment variables. Yes, they are very easy to plug in here. By the way, here is an example .env file:

RABBIT_INPUT_HOST=localhost
RABBIT_INPUT_ROUTING_KEY=routing-key
RABBIT_INPUT_EXCHANGE=exchange
RABBIT_INPUT_QUEUE=queue
RABBIT_OUTPUT_HOST=localhost
RABBIT_OUTPUT_PORT=5673
RABBIT_OUTPUT_ROUTING_KEY=routing-key
RABBIT_OUTPUT_EXCHANGE=exchange

Everything is very simple and straightforward, right?
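The placeholder syntax in the config reads as $NAME, ${NAME} and ${NAME|default}. A resolver for it could look roughly like the sketch below. It is written from the syntax alone, not from Mela's source, so the expand function, the regular expression and especially the choice to raise KeyError for a missing variable without a default are assumptions.

```python
import os
import re

# Matches ${NAME}, ${NAME|default} or a bare $NAME.
_VARIABLE = re.compile(
    r"\$\{(?P<braced>\w+)(?:\|(?P<default>[^}]*))?\}|\$(?P<bare>\w+)"
)


def expand(value: str, env=None) -> str:
    env = os.environ if env is None else env

    def replace(match: re.Match) -> str:
        name = match.group("braced") or match.group("bare")
        default = match.group("default")
        if name in env:
            return env[name]
        if default is not None:
            return default
        # Assumption of this sketch: a missing variable without a default is an error.
        raise KeyError(f"environment variable {name} is not set and has no default")

    return _VARIABLE.sub(replace, value)


if __name__ == "__main__":
    env = {"RABBIT_INPUT_HOST": "localhost", "RABBIT_OUTPUT_PORT": "5673"}
    print(expand("$RABBIT_INPUT_HOST", env))
    print(expand("${RABBIT_INPUT_PORT|5672}", env))
    print(expand("${RABBIT_OUTPUT_PORT|5672}", env))
```

With the example values, the input port falls back to its default because RABBIT_INPUT_PORT is not set, while the output port is taken from the environment.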

In fact, this can already be packaged into a Docker image and run through an orchestrator.

The second thing to note in this section’s config file is that there are two connections, and the consumer and the publisher use different ones.

That concludes my use cases. Let’s move on to the next section.

Performance

The purpose of this article is not to brag, and besides, the framework’s performance is entirely the merit of the aio-pika authors, not mine. Detailed benchmarks are a topic for a separate article, but for now I’ll just say that on my far-from-powerful laptop, a simple bridge between two RabbitMQ instances processes about 500 messages per second. The best I could get out of plain pika was 80-100 messages per second. In the page-fetching case, for obvious reasons, non-asynchronous pika could not show any adequate result at all.

Conclusion

It seems to me that I managed to make a good public API, with the exception of injections, but I wrote about those above. Inside, it is a mess. Everything works, of course, but it will be difficult to maintain and develop the framework further. I have already begun rewriting the core cleanly, but I lack motivation.

That is why I wrote this article. If the article gets a response, if this framework is of interest to someone, if there are people who will use it and maybe even contribute, then I will definitely continue working on it. You can even just give it a star; that will also give me at least some feedback 🙂

Finally, I repeat the link to GitHub

Write your questions in the comments. I will add answers to the most important ones to the article and answer the rest in place.
