Propan – Python framework for writing microservices using message brokers

For the last five years of product development, I have been working with microservices built around message brokers (mainly RabbitMQ and Kafka).

All this time I have felt dissatisfied with the tools available to me: they always seemed a step behind.

Coming from the world of HTTP frameworks, you feel like you are on crutches. There is no hot reload, which almost any WSGI or ASGI server provides. To test anything, you have to spin up containers for the environment or mock dependencies (especially convenient in CI, yeah). And you must not forget about reconnects, logging, tracing, and so on.

So, after dragging all these problems (and the code that solves them) from service to service, I had an ingenious idea: collect all the boilerplate code common to every service into a single package!

This is how the Propan framework was born.

Why You Should Use Propan

The framework is modeled on FastAPI, but built for working with message brokers and shaped by the pain points I ran into while using that HTTP framework. Propan is more open to extension and doesn’t dictate how you use it.

From FastAPI we have:

  • Validation and type casting of incoming messages using Pydantic

  • A dependency injection system

  • The simplest and most understandable way for anyone to write an application
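To make the first point more concrete, here is a minimal sketch of annotation-driven casting. It is not how Propan does it (Propan relies on Pydantic); the `cast_body` helper and both handlers are hypothetical names, and only the standard library is used to show the idea of reading the handler's type annotation and converting the raw message accordingly.

```python
import inspect
import json
from typing import Any, Callable, get_type_hints


def cast_body(handler: Callable[..., Any], raw: bytes) -> Any:
    """Cast a raw message to the type annotated on the handler's first parameter.

    Hypothetical helper for illustration only; Propan itself uses Pydantic.
    """
    first_param = list(inspect.signature(handler).parameters)[0]
    target = get_type_hints(handler).get(first_param, bytes)

    if target is bytes:
        return raw  # no annotation (or bytes): pass the payload through untouched
    text = raw.decode("utf-8")
    if target is str:
        return text
    if target in (int, float):
        return target(text)
    if target in (dict, list):
        value = json.loads(text)
        if not isinstance(value, target):
            raise TypeError(f"expected {target.__name__}, got {type(value).__name__}")
        return value
    raise TypeError(f"unsupported annotation: {target!r}")


def on_number(body: int) -> int:
    return body + 1


def on_event(body: dict) -> str:
    return body["name"]


# The same raw-bytes input is converted differently depending on the annotation.
result_number = on_number(cast_body(on_number, b"41"))
result_event = on_event(cast_body(on_event, b'{"name": "created"}'))
```

The handler only declares what it expects; the conversion (and the error on a mismatch) happens before the handler body runs.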

Features that make Propan stand out from other frameworks and native libraries for working with message brokers:

  • Broker Independence – The code you write is independent of the broker you are using. You can easily migrate from RabbitMQ to Kafka or Nats as your load grows.

  • RPC over MQ – you don’t need to think about how to turn messaging into RPC. If a request is waiting for a response, the framework will do everything for you.

  • Testability – the framework allows you to emulate the behavior of brokers and test your services without the need to connect to external dependencies.

  • Own CLI – allows you to manage application settings, the number of running instances, generates project templates, and also reloads your project when the code changes (I really missed this when developing locally).
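For readers who have not built RPC on top of messaging before, here is a rough sketch of the usual pattern: every request carries a correlation id, and the caller waits for a reply tagged with the same id. This is an illustration on plain `asyncio.Queue` objects, not Propan's implementation; `RpcClient`, `rpc_server`, and the upper-casing "service" are all made up for the example.

```python
from __future__ import annotations

import asyncio
import uuid


async def rpc_server(requests: asyncio.Queue, replies: asyncio.Queue) -> None:
    """Hypothetical service: answers each request, echoing its correlation id."""
    while True:
        correlation_id, body = await requests.get()
        await replies.put((correlation_id, body.upper()))


class RpcClient:
    """Hypothetical client that matches replies to pending calls by correlation id."""

    def __init__(self, requests: asyncio.Queue, replies: asyncio.Queue) -> None:
        self._requests = requests
        self._replies = replies
        self._pending: dict[str, asyncio.Future] = {}

    async def listen(self) -> None:
        # Dispatch every incoming reply to the call that is waiting for it.
        while True:
            correlation_id, body = await self._replies.get()
            future = self._pending.pop(correlation_id, None)
            if future is not None and not future.done():
                future.set_result(body)

    async def call(self, body: str, timeout: float = 1.0) -> str:
        correlation_id = uuid.uuid4().hex
        future = asyncio.get_running_loop().create_future()
        self._pending[correlation_id] = future
        await self._requests.put((correlation_id, body))
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            self._pending.pop(correlation_id, None)  # clean up on timeout too


async def main() -> list[str]:
    requests: asyncio.Queue = asyncio.Queue()
    replies: asyncio.Queue = asyncio.Queue()
    client = RpcClient(requests, replies)
    tasks = [
        asyncio.create_task(rpc_server(requests, replies)),
        asyncio.create_task(client.listen()),
    ]
    try:
        # Two concurrent calls; each one gets its own reply back.
        return list(await asyncio.gather(client.call("ping"), client.call("hello")))
    finally:
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


results = asyncio.run(main())
```

This bookkeeping (ids, pending futures, timeouts) is exactly the kind of code that tends to get copied from service to service, which is why it belongs in the framework.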

At the moment, the framework supports RabbitMQ, Redis Pub/Sub, and Nats. Kafka support is expected within the next month; after that comes work on generating a schema in accordance with AsyncAPI.

Example

Let’s set these dry words aside for a moment and move on to a practical example.

Here is the code of an “app” that works with RabbitMQ using aio-pika:

import asyncio
import aio_pika

async def main():
	connection = await aio_pika.connect_robust("amqp://guest:guest@127.0.0.1/")

	async with connection:
		channel = await connection.channel()

		queue = await channel.declare_queue("test_queue")
		async with queue.iterator() as queue_iter:
			async for message in queue_iter:
				async with message.process():
					print(message.body)

asyncio.run(main())

And here is the same example using Propan:

from propan import PropanApp, RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = PropanApp(broker)

@broker.handle("test_queue")
async def base_handler(body: str):  # the message body will be cast to `str`
	print(body)

Here you do not need to declare connections and queues or process messages yourself: you just write code, and the framework does everything for you. If necessary, you still have the option to control this behavior manually.

On top of that, the Propan version gives you all the benefits described in the previous section.

The example for Redis or Nats is identical except for the broker class and its connection arguments.
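If you are curious what a decorator like `@broker.handle(...)` does conceptually, and why it makes testing without a real broker possible, here is a small sketch. `InMemoryBroker` is a hypothetical stand-in written for this article, not Propan's class: it only stores handlers by queue name and calls them in-process.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

Handler = Callable[[str], Awaitable[object]]


class InMemoryBroker:
    """Hypothetical stand-in for a real broker: routes messages to handlers in-process."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}

    def handle(self, queue: str) -> Callable[[Handler], Handler]:
        # The decorator only registers the function; it returns it unchanged,
        # so the handler can still be called directly in tests.
        def decorator(func: Handler) -> Handler:
            self._handlers[queue] = func
            return func

        return decorator

    async def publish(self, body: str, queue: str) -> object:
        # No network involved: look up the handler and await it immediately.
        return await self._handlers[queue](body)


broker = InMemoryBroker()
received: List[str] = []


@broker.handle("test_queue")
async def base_handler(body: str) -> str:
    received.append(body)
    return "ok"


reply = asyncio.run(broker.publish("hello", "test_queue"))
```

Because the handler is an ordinary coroutine function registered under a queue name, swapping the transport for an in-memory one leaves the service code untouched.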

from propan import PropanApp
from propan import RabbitBroker
# from propan import RedisBroker
# from propan import NatsBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
# broker = NatsBroker("nats://localhost:4222")
# broker = RedisBroker("redis://localhost:6379")

app = PropanApp(broker)

@broker.handle("test")
async def base_handler(body: str):
	print(body)

Usage in HTTP services

Very often we need to work with message brokers inside applications that also process HTTP requests.

No problem! Just start and stop Propan along with your application.

Example with aiohttp:

from aiohttp import web
from propan import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")

@broker.handle("test")
async def base_handler(body):
	print(body)

async def start_broker(app):
	await broker.start()

async def stop_broker(app):
	await broker.close()

app = web.Application()
app.on_startup.append(start_broker)  # start Propan on application startup
app.on_cleanup.append(stop_broker)   # and stop it together with the application
web.run_app(app)

In addition, if you are using FastAPI, you can use Propan directly as part of your FastAPI application.

from fastapi import FastAPI
from pydantic import BaseModel
from propan.fastapi import RabbitRouter

app = FastAPI()
router = RabbitRouter("amqp://guest:guest@localhost:5672")

class Incoming(BaseModel):
	...
  
@router.event("test")
async def hello(body: Incoming):
	print(body)

app.include_router(router)

Conclusion

I don’t see the point in retelling the entire framework documentation; you can read it online. If you have any questions while reading the article, you may also find the answers there.

Right now the framework is being actively developed by me alone. If you are interested in its further development, I will be extremely grateful for any help: pointing out inaccuracies or unclear points in the documentation, writing code, testing.

In any case, I will be glad to receive any feedback: criticism or suggestions for new features. If you decide to use the framework in your projects, I will be glad to hear about your experience.
