How to Build an Asynchronous Python Application to Send Event Notifications

In this article, we will look at creating an asynchronous Python application using the httpx library to send notifications to users about upcoming events they have registered for.

The application will be launched once a day by the Cron scheduler on a Linux server. To send SMS notifications, we will use the MTS Exolve platform.

Why is this necessary?

Sending notifications to users is an important task in many services. It lets you inform users about updates, news, or other events of interest to them. SMS suits this task well: messages travel over the mobile network, so they reach users even when their Internet connection is off, and they tend to have a high open rate.

An asynchronous Python application using the httpx library allows you to effectively implement such mailing, ensuring high performance and responsiveness of the system.

Why asynchronous

Sending an SMS is an I/O-bound operation: most of the time is spent waiting for the server's response. With an asynchronous approach the application does not block while it waits, so it can handle a large number of notification requests efficiently and with minimal delay.
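
To make the benefit concrete, here is a minimal sketch that compares sequential and concurrent execution of I/O-bound calls. The function fake_send and its 0.1 s delay are assumptions that stand in for a real network request; nothing is actually sent.

```python
import asyncio
import time


async def fake_send(phone: str) -> str:
    # Hypothetical stand-in for a network call: it only waits.
    await asyncio.sleep(0.1)
    return phone


async def send_sequentially(phones):
    # Each call starts only after the previous one has finished.
    return [await fake_send(p) for p in phones]


async def send_concurrently(phones):
    # All calls wait at the same time; gather keeps the input order.
    return await asyncio.gather(*(fake_send(p) for p in phones))


def timed(coro):
    # Run a coroutine to completion and measure how long it took.
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start


if __name__ == "__main__":
    phones = ["70000000001", "70000000002", "70000000003"]
    _, t_seq = timed(send_sequentially(phones))
    _, t_con = timed(send_concurrently(phones))
    print(f"sequential: {t_seq:.2f}s, concurrent: {t_con:.2f}s")
```

The concurrent version finishes in roughly the time of a single call, because the waiting periods overlap.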

Example of asynchronous client implementation on httpx

Let's look at an example of implementing SMS notifications in a Python application. First, make sure you have all the necessary dependencies installed: httpx for HTTP requests, plus python-dotenv and PyYAML for configuration. The asyncio library used alongside httpx is part of the standard library.

The project will have the following structure:

aclient_exmp/
    /venv
    /example_db
        __init__.py
        handle_data.py
        info_db.py
    /apimodul
        __init__.py
        mtt_client.py
    /helper
        __init__.py
        decorators.py
    logging.yaml
    config.py
    main.py
    dev.env

The dev.env file stores the environment variables: the API key, the URL for requests to MTS Exolve, and the phone number from which SMS messages are sent.

In the config.py file we will get data from environment variables.

from dotenv import dotenv_values

info_env = dotenv_values('dev.env')

API_KEY = info_env.get('API_KEY')
PHONE_SEND = info_env.get('PHONE_SEND')
BASE_URL = info_env.get('BASE_URL')
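
A variable missing from dev.env would otherwise surface only when the first request fails. As a minimal sketch, a small check can fail fast at startup. The helper name require_settings is hypothetical and not part of python-dotenv; it accepts any mapping, such as the one returned by dotenv_values.

```python
from typing import Dict, Iterable, Optional


def require_settings(values: Dict[str, Optional[str]],
                     keys: Iterable[str]) -> Dict[str, str]:
    """Return the requested settings or raise if any is missing or empty."""
    keys = list(keys)
    missing = [key for key in keys if not values.get(key)]
    if missing:
        raise RuntimeError(f"Missing settings: {', '.join(missing)}")
    return {key: values[key] for key in keys}


if __name__ == "__main__":
    # Placeholder values for illustration only.
    sample = {"API_KEY": "secret", "PHONE_SEND": "70000000000", "BASE_URL": None}
    try:
        require_settings(sample, ["API_KEY", "PHONE_SEND", "BASE_URL"])
    except RuntimeError as err:
        print(err)
```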

Logging

Let's add logging to our project. The logging.yaml file is a configuration file that defines the formatting, handlers, and logging levels for the various loggers in the project. In this example logging.yaml file, we define the formatting for the logs, create a handler to output the logs to the console, and configure the root logger for the DEBUG level.

version: 1

formatters:
  info:
    format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

handlers:
  console_handler:
    class: logging.StreamHandler
    level: INFO
    formatter: info
    stream: ext://sys.stdout

root:
  level: DEBUG
  handlers: [console_handler]

Let's add some code to the config.py file, which will load the logging configuration from the logging.yaml file and configure the loggers in the project.

import logging.config
import os
import sys

import yaml
from dotenv import dotenv_values

info_env = dotenv_values('dev.env')

API_KEY = info_env.get('API_KEY')
PHONE_SEND = info_env.get('PHONE_SEND')
BASE_URL = info_env.get('BASE_URL')

CURRENT_FILE_PATH = os.path.abspath(__file__)
BASE_DIR = os.path.dirname(CURRENT_FILE_PATH)
LOGGING_CONF = os.path.join(BASE_DIR, 'logging.yaml')

if os.path.isfile(LOGGING_CONF) and os.access(LOGGING_CONF, os.R_OK):
    # Read the YAML file and apply it as the logging configuration.
    with open(LOGGING_CONF, 'r') as _lc_stream:
        _lc_conf = yaml.safe_load(_lc_stream)
    logging.config.dictConfig(_lc_conf)
else:
    print(
        "ERROR: logger config file '%s' does not exist or is not readable\n" %
        LOGGING_CONF)
    sys.exit(1)

In this code example, we check if the logging.yaml file exists and is readable. We then open the file, load its contents into the _lc_conf variable using the yaml module, and close the file. Next, we use the dictConfig method from the logging.config module to apply the loaded configuration to the loggers in the project.

If the logging.yaml file does not exist or is not readable, an error message is printed and the program exits with status 1.
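
One detail of this configuration is easy to miss: the root logger accepts DEBUG records, but console_handler only emits INFO and above, so DEBUG messages never reach the console. The sketch below reproduces the same settings as an inline dictionary and writes to an in-memory stream instead of stdout; both choices are assumptions made so that the example needs no YAML file, and the format string is shortened for readability.

```python
import io
import logging
import logging.config

stream = io.StringIO()  # stands in for sys.stdout in this example

config = {
    "version": 1,
    "formatters": {"info": {"format": "%(levelname)s - %(message)s"}},
    "handlers": {
        "console_handler": {
            "class": "logging.StreamHandler",
            "level": "INFO",
            "formatter": "info",
            "stream": stream,
        }
    },
    "root": {"level": "DEBUG", "handlers": ["console_handler"]},
}
logging.config.dictConfig(config)

logger = logging.getLogger("demo")
logger.debug("hidden: below the handler level")
logger.info("shown: at the handler level")
print(stream.getvalue())
```

To see DEBUG messages as well, the handler level would have to be lowered to DEBUG too.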

Receiving and processing data

For a simple example, a list of events will serve as our database:

event_list = [
    {
        'name': 'Music Show',
        'date': '2023:12:12',
        'time': '17:00',
        'mentor': 'Jazz Band',
        'guests': [
            {'name': 'Ivan Gubov', 'phone': '79007771101'},
            # ...
            {'name': 'Mansur Berdiev', 'phone': '79800002001'}]
    },
    # ...
    {
        'name': 'Music Show',
        'date': '2023:11:14',
        'time': '20:00',
        'mentor': 'Jazz Band',
        'guests': [{'name': 'Olga Lomova', 'phone': '79055551101'}]
    }
]

The guests key holds the list of guests who registered for the event, together with their phone numbers. We will place this data in the info_db.py file.

Next, we'll go to the handle_data.py file and write a simple function that returns a list of events that occur on the current date.

import datetime
from example_db.info_db import event_list

def get_event_today(event_list):
    # Dates in info_db are stored as 'YYYY:MM:DD' strings.
    current_date = datetime.date.today().strftime("%Y:%m:%d")
    result_list = []
    for event in event_list:
        if event['date'] == current_date:
            result_list.append(event)
    return result_list

In a real project, getting the events for the current date may require more complex database or API queries. For example, you might need to query the database for events that match several conditions, the current date among them. For our simple example, however, the get_event_today function is enough to show the basic mechanism of event selection.
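
As an illustration of what such a query might look like, here is a minimal sketch that uses the standard-library sqlite3 module with an in-memory database. The table name events, its columns, and the ISO date format are assumptions for this example; a real schema will differ.

```python
import datetime
import sqlite3


def get_events_on(conn: sqlite3.Connection, day: datetime.date) -> list:
    """Return (name, time) pairs for events that take place on the given day."""
    rows = conn.execute(
        "SELECT name, time FROM events WHERE date = ? ORDER BY time",
        (day.isoformat(),),  # parameterized query; dates stored as YYYY-MM-DD
    )
    return rows.fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (name TEXT, date TEXT, time TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?)",
    [
        ("Music Show", "2023-12-12", "17:00"),
        ("Music Show", "2023-11-14", "20:00"),
    ],
)
print(get_events_on(conn, datetime.date(2023, 12, 12)))
```

Passing the date as a parameter, rather than calling today() inside the function, also makes the selection easy to test for any day.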

Limiting the number of requests per unit of time and error control

In order to control the number of requests per unit of time and handle errors related to server overload (for example, error 500), we will create a decorator function rate_limit.

The rate_limit decorator has two parameters: limit and interval. The limit parameter defines the maximum number of requests that can be executed within interval seconds. Inside the decorator, the wrapper function wraps the original asynchronous function; it controls the request rate and handles errors.

import asyncio
import functools
import logging
from datetime import datetime, timedelta

import httpx

logger = logging.getLogger(__name__)


def rate_limit(limit: int, interval: int):
    # Space calls evenly: consecutive calls start at least
    # interval / limit seconds apart.
    min_gap = timedelta(seconds=interval / limit)

    def decorator(func):
        last_call = datetime.min
        retries = 0  # shared by all calls of the decorated function

        @functools.wraps(func)
        async def wrapper(self, *args, **kwargs):
            nonlocal last_call, retries
            elapsed = datetime.now() - last_call
            if elapsed < min_gap:
                await asyncio.sleep((min_gap - elapsed).total_seconds())
            last_call = datetime.now()

            try:
                return await func(self, *args, **kwargs)
            except httpx.HTTPStatusError as http_err:
                # Only HTTPStatusError carries a response with a status code.
                if http_err.response.status_code == 500 and retries <= 5:
                    retries += 1
                    await asyncio.sleep(3)
                    logger.info(f"HTTP error in {func.__name__}. Retry attempt: {retries}")
                    return await wrapper(self, *args, **kwargs)
                logger.error(f"{http_err}. Status code: {http_err.response.status_code}")
                raise
            except Exception as e:
                logger.error(f"Application error: {e} in function: {func.__name__}")
                raise

        return wrapper

    return decorator

If an HTTP error occurs while executing a request, wrapper checks the status code of the response. If the code is 500 and the number of retry attempts is less than or equal to 5, wrapper pauses for 3 seconds using await asyncio.sleep(3) and then calls itself to execute the request again:

return await wrapper(self, *args, **kwargs)

If the status code is not 500 or the number of retry attempts exceeds 5, the wrapper logs the error and re-raises the exception so that it can be handled in the calling code.
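
The same retry policy can also be written as a loop, which avoids recursion and keeps the attempt counter local to each call. This is a sketch under assumptions, not the code used in the project: ServerError is a hypothetical exception that stands in for an HTTP 500 response, call_with_retries is an illustrative name, and the delay is shortened so that the example runs quickly.

```python
import asyncio


class ServerError(Exception):
    """Hypothetical stand-in for an HTTP 500 response."""


async def call_with_retries(func, max_retries: int = 5, delay: float = 0.01):
    # Try once, then retry up to max_retries times on ServerError.
    for attempt in range(max_retries + 1):
        try:
            return await func()
        except ServerError:
            if attempt == max_retries:
                raise  # retries exhausted: let the caller handle it
            await asyncio.sleep(delay)


async def demo():
    calls = 0

    async def flaky():
        # Fails twice, then succeeds.
        nonlocal calls
        calls += 1
        if calls < 3:
            raise ServerError("temporary failure")
        return "ok"

    result = await call_with_retries(flaky)
    return result, calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```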

Limiting the number of requests per unit of time helps reduce the load on the server. Handling the 500 error and retrying the request improves the stability of the application, especially when the server is temporarily unavailable or overloaded.

Using an asynchronous decorator allows you to efficiently manage asynchronous requests and ensures that your application remains responsive even when executing a large number of requests.
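
When limit is greater than one, a sliding window is one common way to allow up to limit calls in any window of interval seconds. The sketch below is an alternative shown for illustration, not the decorator from this project; the class name SlidingWindowLimiter is hypothetical.

```python
import asyncio
import time
from collections import deque


class SlidingWindowLimiter:
    """Allow at most `limit` acquisitions in any `interval`-second window."""

    def __init__(self, limit: int, interval: float):
        self.limit = limit
        self.interval = interval
        self._calls = deque()  # monotonic timestamps of recent acquisitions

    async def acquire(self) -> None:
        while True:
            now = time.monotonic()
            # Forget acquisitions that have left the window.
            while self._calls and now - self._calls[0] >= self.interval:
                self._calls.popleft()
            if len(self._calls) < self.limit:
                self._calls.append(now)
                return
            # Sleep until the oldest acquisition leaves the window.
            await asyncio.sleep(self.interval - (now - self._calls[0]))


async def demo(n: int = 3) -> float:
    limiter = SlidingWindowLimiter(limit=2, interval=0.2)
    start = time.monotonic()
    for _ in range(n):
        await limiter.acquire()
    return time.monotonic() - start


if __name__ == "__main__":
    print(f"3 calls took {asyncio.run(demo()):.2f}s")
```

With limit=2 the first two calls pass immediately, and the third waits until the first one is older than the interval.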

Client classes for working with MTS Exolve

The mtt_client.py module below contains two classes: MTSClient and SendingSMS. Both are designed to work with the MTS Exolve API and provide messaging functionality.

import asyncio
import logging
import typing

import httpx

from config import API_KEY, PHONE_SEND, BASE_URL
from helper.decorators import rate_limit

logger = logging.getLogger(__name__)


class MTSClient:
    """Client that makes requests to the MTS API."""

    def __init__(
        self,
        base_url: str,
        token: str,
        aclient: typing.Optional[httpx.AsyncClient] = None
    ):
        """
        :param base_url: API URL
        :param token: Token
        :param aclient: Asynchronous client
        """
        self.base_url = base_url
        self.token = token
        self.aclient = aclient or httpx.AsyncClient()
        self.headers = {"Content-Type": "application/json",
                        "Authorization": f"Bearer {self.token}"}

    @rate_limit(limit=1, interval=1)
    async def send_message(self, body) -> dict:
        """
        Send an SMS with an event reminder.

        :param body: Request body
        :return: JSON response body containing message_id
        """
        url = f"{self.base_url}"
        response = await self.aclient.post(url=url, json=body, headers=self.headers, timeout=5)
        response.raise_for_status()
        decoded_response = response.json()
        if response.status_code == 200:
            logger.info(f"Message id: {decoded_response}")
            logger.info(f"Event reminder successfully sent to the user with number: {body['destination']}")
        return decoded_response


class SendingSMS:
    def __init__(self, client: MTSClient):
        self.client = client

    async def send_all_guests(self, event_today: list) -> bool:
        """
        Send a reminder to every guest on the list.

        :param event_today: List of today's events
        :return: True on success
        """
        for event in event_today:
            guests_data = event["guests"]
            for element in guests_data:
                sms_data = {
                    "number": PHONE_SEND,
                    "destination": element["phone"],
                    "text": f"We look forward to seeing you today at {event['time']} at the event {event['name']}"}

                message_info = await self.client.send_message(body=sms_data)

        return True

MTSClient class

The MTSClient class is a client for working with the MTS Exolve API. It contains methods and attributes for sending requests and processing responses from the server.

The following attributes are initialized in the MTSClient class constructor:

  • base_url: MTS Exolve API URL.

  • token: Token for authentication when sending requests.

  • aclient: asynchronous client httpx.AsyncClient or None (default).

The MTSClient class also contains the send_message method, which asynchronously sends an SMS message with a reminder about the event. The method accepts the body of the request and returns a dictionary with information about the message. We placed the previously mentioned rate_limit decorator on this method to control errors and the frequency of requests.

The purpose of this class is to encapsulate the logic of sending requests and processing responses, which makes it easy to use in different parts of the application. In case of changes to the MTS Exolve API, it is enough to make changes only inside the MTSClient class, without affecting other parts of the code.

SendingSMS class

The SendingSMS class is a wrapper over the MTSClient client.

In the SendingSMS class constructor, the client attribute is initialized with an instance of the MTSClient class, which lets SendingSMS use the MTSClient functionality to send messages.

The SendingSMS class contains a send_all_guests method that asynchronously sends an event reminder to all guests in the list. The method takes the event_today list of events and returns True if all messages are sent successfully.

This class provides a convenient interface for sending SMS messages to event guests. It hides the details of interaction with the MTS Exolve API and allows you to focus on the business logic of the application.
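
Separating message construction from sending makes the payloads easy to check without touching the network. The following is a minimal sketch that assumes the same payload keys as in send_all_guests; the function name build_sms_payloads, the message wording, and the sender number are hypothetical.

```python
from typing import Dict, Iterator, List


def build_sms_payloads(events: List[dict], sender: str) -> Iterator[Dict[str, str]]:
    """Yield one request body per guest of every event."""
    for event in events:
        for guest in event["guests"]:
            yield {
                "number": sender,
                "destination": guest["phone"],
                "text": f"We look forward to seeing you today at {event['time']} "
                        f"at the event {event['name']}",
            }


events = [{
    "name": "Music Show",
    "time": "17:00",
    "guests": [
        {"name": "Ivan Gubov", "phone": "79007771101"},
        {"name": "Mansur Berdiev", "phone": "79800002001"},
    ],
}]
for payload in build_sms_payloads(events, sender="70000000000"):
    print(payload["destination"], "->", payload["text"])
```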

amain function

Let's create a function amain, which will be the asynchronous entry point to the application.

async def amain(event_today):
    async with httpx.AsyncClient() as aclient:
        mtt_client = MTSClient(
            base_url=BASE_URL,
            token=API_KEY,
            aclient=aclient)
        sms_sender = SendingSMS(client=mtt_client)
        tasks = [sms_sender.send_all_guests(event_today)]
        result_work_send = await asyncio.gather(*tasks)

Inside the function, instances of the MTSClient and SendingSMS classes are created, with the asynchronous client httpx.AsyncClient passed in. Then a task that sends messages to the event guests is created and awaited with asyncio.gather.
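
With a single coroutine in tasks, gather has nothing to run in parallel. If reminders for different events should proceed concurrently, one task per event could be created. The sketch below illustrates the idea under assumptions: notify_event is a hypothetical stand-in for the sending coroutine, and return_exceptions=True is used so that a failure in one event is returned alongside the results of the others instead of being raised.

```python
import asyncio


async def notify_event(event: dict) -> int:
    # Hypothetical stand-in for sending reminders to every guest of one event.
    if not event["guests"]:
        raise ValueError(f"no guests for {event['name']}")
    await asyncio.sleep(0.01)  # simulated network I/O
    return len(event["guests"])


async def notify_all(events: list) -> list:
    tasks = [notify_event(event) for event in events]
    # Exceptions are returned in place of results instead of being raised.
    return await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    events = [
        {"name": "Music Show", "guests": ["79007771101", "79800002001"]},
        {"name": "Empty Show", "guests": []},
    ]
    for event, outcome in zip(events, asyncio.run(notify_all(events))):
        print(event["name"], "->", repr(outcome))
```

Any rate limit applied to the underlying requests would still need to be respected when tasks run concurrently.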

The amain function can be called in the main application file using asyncio.run, which allows you to perform message sending tasks asynchronously.

In the mtt_client.py module, the presented classes and functions provide convenience, flexibility and extensibility when working with the MTS Exolve API for sending SMS messages.

Main application file

Let's create the main application file, main.py:

import sys
import logging
import asyncio
import apimodul.mtt_client as mtt
from example_db import handle_data, info_db

logger = logging.getLogger(__name__)

if __name__ == "__main__":
    while True:
        try:
            event_today = handle_data.get_event_today(info_db.event_list)
            if event_today:
                asyncio.run(mtt.amain(event_today))
            break
        except AssertionError as e:
            logger.error(e, exc_info=True)
            logger.error("An error occurred while working with the data.")
        except KeyboardInterrupt:
            logger.error("The program was interrupted from the keyboard.")
            sys.exit(1)
        except Exception as e:
            logger.error(f"Program execution error: {e}")
            sys.exit(1)
    sys.exit(0)

Inside the while True loop, the get_event_today function from the handle_data module is called, which gets a list of events scheduled for the current day. If the list contains events, the asynchronous amain function from the mtt_client.py module is called, which processes these events.

This structure of the main project file ensures control of program execution and handling of possible errors. The while True loop repeats execution until break is reached; after an AssertionError the loop body runs again.

The try-except block handles exceptions that may occur during code execution. Different types of exceptions are handled differently:

  • AssertionError: raised if an assertion fails inside the get_event_today function. In this case, the error and the exception traceback are written to the log, and the loop repeats.

  • KeyboardInterrupt: raised if the user interrupts the program from the keyboard (presses Ctrl+C). In this case, a message about the interruption is written to the log and the program exits with code 1.

  • Exception: any other exception that does not fall into the previous categories. In this case, a message about the execution error is written to the log and the program exits with code 1.

If execution is successful, the program terminates by calling sys.exit(0); if an error occurred, it calls sys.exit(1), where:

  • 0 – a signal to the operating system that the program completed successfully; it can be taken into account when subsequently launching other applications.

  • 1 – a signal to the operating system that an error occurred.
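
The exit-code convention above can be isolated in a small function, which keeps the entry point easy to test. This is a sketch, assuming a hypothetical run callable that performs the work; the names exit_code_for and failing_job are illustrative, and it is not the project's main.py.

```python
import logging

logger = logging.getLogger(__name__)


def exit_code_for(run) -> int:
    """Call run() and translate the outcome into a process exit code."""
    try:
        run()
    except KeyboardInterrupt:
        logger.error("Interrupted from the keyboard.")
        return 1
    except Exception as exc:
        logger.error("Execution error: %s", exc)
        return 1
    return 0


def failing_job():
    raise RuntimeError("boom")


if __name__ == "__main__":
    print(exit_code_for(lambda: None))  # successful run
    print(exit_code_for(failing_job))   # failed run
    # A real entry point would pass the result to sys.exit().
```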

Scheduled launch

Our mailing program is ready. Now we need to think about automatic launch at a certain time. For this, we will use the Cron task scheduler. Open the Linux terminal and enter the command:

crontab -e

This opens a file with launch schedules. The time and date inside the file are set using a special syntax. Let's look at the layout of one Cron task:

   *       *      *      *          *       cd /home/projects/program
[minute] [hour] [day] [month] [day of week] [command(s)]

In our case, we'll run the script once a day at 11:15 a.m. Then the final entry in the cron file will look like this:

15 11 * * * cd PycharmProjects/aclient_exmp && venv/bin/python3.8 main.py

That is, first we set the time, then change to the project folder and run the script with the interpreter from the virtual environment. Since the project is in the PycharmProjects folder inside my home directory, and I run the scheduler as a regular user rather than the superuser, there is no need to write the full path to the project folder: Cron starts the job in the user's home directory.
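
To make the field layout explicit, the following sketch splits a crontab entry into its five schedule fields and the command. It is an illustration only: the helper parse_cron_line is hypothetical and handles just the plain five-field form, not special strings such as @daily or environment lines.

```python
from typing import NamedTuple


class CronEntry(NamedTuple):
    minute: str
    hour: str
    day_of_month: str
    month: str
    day_of_week: str
    command: str


def parse_cron_line(line: str) -> CronEntry:
    # The first five whitespace-separated fields are the schedule;
    # everything after them is the command.
    parts = line.split(None, 5)
    if len(parts) != 6:
        raise ValueError("expected five schedule fields followed by a command")
    return CronEntry(*parts)


entry = parse_cron_line("15 11 * * * cd /home/projects/program")
print(f"runs at {entry.hour}:{entry.minute} ->", entry.command)
```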

Now the script will send an SMS to guests every day at 11:15 with a reminder about the event.

You can read more about the scheduler in this article. Among the advantages of Cron are its simplicity, reliability, and availability on most Linux and Unix systems, so you can use it on a wide range of platforms.

Conclusion

This project is an example of how to use httpx and the Cron scheduler to create an SMS notification system. The client can serve as a starting point for developing more complex applications for event reminders. With this simple example, we were able to show that Cron provides a simple and clear way to schedule tasks. We can easily set them to run on a specific schedule, specifying the time and date of their execution.
