Propan – Python framework for writing microservices using message brokers
It so happened historically that for the last 5 years of my product development I have been working with microservices around message brokers (mainly RabbitMQ and Kafka).
And all this time I had a feeling of some dissatisfaction and inferiority of the tools that were available to me.
Coming from the world of HTTP frameworks, you feel like you are on crutches – neither hotreload‘a, which is available in almost any wsgi-asgi server, if you want to test – raise environment containers or mock dependencies (especially convenient in CI, yeah), don’t forget about reconnects, logging, tracing, etc., etc.
And so, dragging a bunch of all these problems from service to service (and the code that solves these problems), I got
ingenious idea: to arrange all the code of the same type, common for all services, into a single package!
This is how the framework was born. Propan.
Why You Should Use Propan
The framework is molded in the image and likeness FastAPIbut for working with message brokers and taking into account my personal pains that arose when working with this HTTP framework. Propan is more open to extension and doesn’t dictate how you use it.
From FastAPI we have:
Validation and type casting of incoming messages using Pydantic
dependency injection system
The most simple and understandable way for everyone to write an application
Features that stand out Propan from a number of other frameworks and native libraries for working with message brokers:
Broker Independence – The code you write is independent of the broker you are using. You can easily migrate from RabbitMQ on Kafka or Nats with increasing load.
RPC over MQ – you don’t need to think about how to turn Messaging V RPC. If the request is waiting for a response, the framework will do everything for you.
Testability – the framework allows you to emulate the behavior of brokers and test your services without the need to connect to external dependencies.
Own CLI – allows you to manage application settings, the number of running instances, generates project templates, and also reloads your project when the code changes (I really missed this when developing locally).
At the moment, the framework supports working with RabbitMQ, Redis Pub/Sub, Nats. Support Kafka expected in the next month, then – work on the generation of the circuit in accordance with AsyncAPI.
Let’s drop all these dry words a little and move on to a practical example.
For example, here is the “app” code to work with RabbitMQ with use aio-pika
import asyncio import aio_pika async def main(): connection = await aio_pika.connect_robust("amqp://guest:firstname.lastname@example.org/") async with connection: channel = await connection.channel() queue = await channel.declare_queue("test_queue") async with queue.iterator() as queue_iter: async for message in queue_iter: async with message.process(): print(message.body) asyncio.run(main())
And here is the same example using Propan
from propan import PropanApp, RabbitBroker broker = RabbitBroker("amqp://guest:guest@localhost:5672/") app = PropanApp(broker) @broker.handle("test_queue") async def base_handler(body: str): # тело сообщения будет приведено к `str` print(body)
In it, you do not need to declare all connections, queues, or process messages yourself: you just write code, and the framework does everything for you. However, if necessary, you still have the option to manually control this behavior.
However, an example using Propan gives you all the benefits described in the previous section.
Example using Redis or Nats identical up to function arguments.
from propan import PropanApp from propan import RabbitBroker # from propan import RedisBroker # from propan import NatsBroker broker = RabbitBroker("amqp://guest:guest@localhost:5672/") # broker = NatsBroker("nats://localhost:4222") # broker = RedisBroker("redis://localhost:6379") app = PropanApp(broker) @broker.handle("test") async def base_handler(body: str): print(body)
Usage in HTTP services
Very often we need to use the logic of working with message brokers within applications that also process HTTP requests.
No problem! Just start and stop Propan along with your application.
Example with aiohttp
from aiohttp import web from propan import RabbitBroker broker = RabbitBroker("amqp://guest:guest@localhost:5672/") @broker.handle("test") async def base_handler(body): print(body) async def start_broker(app): await broker.start() async def stop_broker(app): await broker.close() app = web.Application() app.on_startup.append(start_broker) # запускаем Propan при старте app.on_cleanup.append(stop_broker) # и останавливаем вместе с приложением web.run_app(app)
In addition, if you are using FastAPIyou can use Propan directly – as part of your application FastAPI.
from fastapi import FastAPI from pydantic import BaseModel from propan.fastapi import RabbitRouter app = FastAPI() router = RabbitRouter("amqp://guest:guest@localhost:5672") class Incoming(BaseModel): ... @router.event("test") async def hello(body: Incoming): print(body) app.include_router(router)
I don’t see the point in retelling the entire framework documentation, you can find it Here. If you have any questions while reading the article, you may also find the answers there.
Now the framework is actively developing only by my efforts. If you are interested in its further development, I will be extremely grateful for any help: pointing out inaccuracies / incomprehensible points in the documentation, writing code, testing.
In any case, I will be glad to any feedback: criticism, suggestions for implementation. If you decide to use the framework in your projects, I will be glad to hear about your experience.