Alerting in Apache Airflow: Telegram notifications

The typing package (imported as tp) is used for type annotations. To send notifications to Telegram, you will need to install the python-telegram-bot package.

If you are here, you probably already have a rough idea of what Apache Airflow, tasks (jobs) and DAGs are, so I won't go into detail and will get straight to the point.
To begin with, I suggest reproducing the configuration shown here (and improving on it later, if necessary!).

To check whether notifications work, let's write a simple DAG with a broken task:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def failing_task():
    raise Exception("Example error")


with DAG(
    "telegram_notification_dag",
    default_args={
        # Placeholder: something must be called here when a task fails
        "on_failure_callback": None,
    },
    description="Sending notifications via a Telegram bot",
    schedule_interval="@daily",
    start_date=datetime(2022, 1, 1),
    catchup=False,
) as dag:

    failing_task = PythonOperator(
        task_id="failing_task", python_callable=failing_task, dag=dag
    )

    (failing_task)

For those who just want notifications and don't want to dig into the details, a brief explanation: a DAG is simply a structure that defines the order of tasks and the dependencies between them. A task (job) is a step within the DAG that performs some action.

The DAG receives its identifier, a dictionary of default arguments (the full list is in the official documentation), a schedule interval, and so on.

Next, we define the tasks we need. Here we need only one, and it will be broken.

There are several operators for running tasks; in our case we use PythonOperator, which calls a Python function.

I won't walk through everything step by step; I'll just show the code and explain it. The TelegramNotification class is responsible for sending a message to the user.

import time
import typing as tp

from loguru import logger
from telegram import Bot


class TelegramNotification:
    """intervals - delays in seconds between attempts to resend the notification
    when sending fails"""

    def __init__(
        self,
        chat_id: str,
        token: str,
        message_template: str,
        responsible_users: tp.Optional[tp.List[str]] = None,
        intervals: tp.Sequence[int] = (1, 60, 600),
    ):
        self._chat_id = chat_id
        # None instead of a mutable default, so instances never share one list
        self._messageTemplate = MessageTemplate(
            message_template, responsible_users or []
        )
        self._intervals = intervals
        self._token = token

    def send_telegram_notification(self, context: tp.Dict[str, tp.Any]) -> None:

        message = self._messageTemplate.create_message_template(context)

        for interval in self._intervals:
            try:
                # Synchronous call as in python-telegram-bot 13.x; from version 20
                # the library is asyncio-based and send_message must be awaited
                bot = Bot(token=self._token)
                bot.send_message(chat_id=self._chat_id, text=message)
                break
            except Exception as e:
                logger.error(f"Error sending message to Telegram: {e}")
                time.sleep(interval)
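
The retry loop in send_telegram_notification can also be looked at in isolation. The following is only a minimal sketch of the same idea, not part of the original module: the helper name send_with_retries and its send and sleep parameters are hypothetical, introduced so the loop can be tried without Telegram or Airflow. Unlike the loop above, it does not sleep after the last failed attempt and it reports whether sending succeeded.

```python
import time
import typing as tp


def send_with_retries(
    send: tp.Callable[[], None],
    intervals: tp.Sequence[float],
    sleep: tp.Callable[[float], None] = time.sleep,
) -> bool:
    """Call send() at most len(intervals) times; return True on the first success.

    Hypothetical helper: `send` stands in for the bot.send_message call,
    `sleep` is injectable so the waiting can be replaced in experiments.
    """
    for attempt, interval in enumerate(intervals, start=1):
        try:
            send()
            return True
        except Exception as e:
            print(f"Attempt {attempt} failed: {e}")
            # Wait only if another attempt will follow
            if attempt < len(intervals):
                sleep(interval)
    return False
```

With intervals of [1, 60, 600], a send that fails twice and then succeeds would wait 1 and then 60 seconds, and the helper would return True.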

In send_telegram_notification, the message is sent inside a loop, within a try/except construct. The only Telegram-specific parts are the creation of the bot and the call to its send_message method. Why all of this is wrapped in try/except is explained at the end of the article. The message itself should depend on what the recipient needs, but building it here would contradict the purpose of this class, and each class has its own area of responsibility! That is why the module contains one more class.

class MessageTemplate:
    def __init__(self, message_template: str, responsible_users: tp.List[str]):
        self._message_template = message_template
        self._responsible_users = responsible_users

    def create_message_template(self, context: tp.Dict[str, tp.Any]) -> str:
        args = self._parse_context(context)
        message = self._message_template.format(**args)
        return message

    def _parse_context(self, context: tp.Dict[str, tp.Any]) -> tp.Dict[str, tp.Any]:
        """Arguments available in the message template
        DAG_NAME: name of the DAG
        TASK_ID: name of the task
        DATE: date and time of the task run
        TASK_LOG_URL: link to the task's execution log
        PARAMS: parameters passed to the task
        CONF: the Airflow configuration object (parameters passed when
            triggering a DAG run live in context["dag_run"].conf instead)
        PREV_EXEC_DATE: date and time of the previous run
        USERS: list of users responsible for the task
        """
        return {
            "DAG_NAME": context.get("dag").dag_id,
            "TASK_ID": context.get("task_instance").task_id,
            "DATE": self._create_formatted_date(context.get("execution_date")),
            "TASK_LOG_URL": context.get("ti").log_url,
            "PARAMS": context.get("params"),
            "CONF": context.get("conf"),
            "PREV_EXEC_DATE": self._create_formatted_date(
                context.get("prev_execution_date")
            ),
            "USERS": self._create_users_string(),
        }

    def _create_formatted_date(self, date: datetime) -> str:
        return date.strftime("%Y-%m-%d %H:%M:%S") if date else ""

    def _create_users_string(self) -> str:
        return ", ".join([f"@{user_name}" for user_name in self._responsible_users])

The part most worth mentioning in this class is the context object, which Airflow passes to the callback function when a task succeeds (you can add a method to the class yourself that sends something on success) or fails. This object is a dictionary from which you can extract almost all the information you need. The example does not use every available key: there are at least twice as many, and if you need something extra, you can easily add it here.
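
To illustrate how extra arguments could be pulled from the context, here is a small sketch that runs without Airflow. The fake_context dictionary only mimics the attributes used below with SimpleNamespace stand-ins, and the function parse_extra along with the argument names TRY_NUMBER and ERROR are hypothetical additions, not part of the original class. It assumes that the task instance exposes try_number and that the failure callback context carries the raised error under the "exception" key.

```python
from datetime import datetime
from types import SimpleNamespace

# Stand-in for the dictionary Airflow passes to a callback. A real context
# holds DAG and TaskInstance objects; these objects only mimic a few attributes.
fake_context = {
    "dag": SimpleNamespace(dag_id="telegram_notification_dag"),
    "task_instance": SimpleNamespace(task_id="failing_task", try_number=2),
    "execution_date": datetime(2022, 1, 1, 12, 30),
    "exception": ValueError("Example error"),
}


def parse_extra(context):
    """Hypothetical variant of _parse_context with two additional arguments."""
    return {
        "DAG_NAME": context["dag"].dag_id,
        "TASK_ID": context["task_instance"].task_id,
        "TRY_NUMBER": context["task_instance"].try_number,
        # Empty string if the callback was not triggered by an exception
        "ERROR": str(context.get("exception", "")),
    }


template = "DAG {DAG_NAME}, task {TASK_ID}, attempt {TRY_NUMBER}: {ERROR}"
message = template.format(**parse_extra(fake_context))
print(message)
# DAG telegram_notification_dag, task failing_task, attempt 2: Example error
```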

Now let's see how to work with this. Here is the entire code for our test DAG:

import time

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from loguru import logger

from dependencies.tg_notification import TelegramNotification, read_bot_secrets

PATH_TO_SECRETS = "path_to_secrets"
SECRETS = read_bot_secrets(PATH_TO_SECRETS)

CHAT_ID_KEY = "chat_id"
TOKEN_KEY = "token"

NOTIFY_MESSAGE = """
DAG ID: {DAG_NAME}.
Task ID: ❌{TASK_ID}❌.
Task execution date and time: {DATE}.
Responsible users: {USERS}
"""


telegram_notification = TelegramNotification(
    chat_id=SECRETS.get(CHAT_ID_KEY),
    token=SECRETS.get(TOKEN_KEY),
    message_template=NOTIFY_MESSAGE,
    responsible_users=[
        "your_username", 
    ],
    intervals=[1, 100, 1000],
)


def failing_task():
    raise Exception("Example error")


with DAG(
    "telegram_notification_dag",
    default_args={
        "on_failure_callback": telegram_notification.send_telegram_notification
    },
    description="Sending notifications via a Telegram bot",
    schedule_interval="@daily",
    start_date=datetime(2022, 1, 1),
    catchup=False,
) as dag:

    failing_task = PythonOperator(
        task_id="failing_task", python_callable=failing_task, dag=dag
    )

    (failing_task)

Now for the explanations. First, we read our secrets. The reading function (read_bot_secrets) is discussed a little further down; it plays a rather important role in making sure nothing breaks.

We also create a message template. The available arguments are listed in the MessageTemplate code above, and you can compose any message you like from them.
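
Since the template is filled in with str.format, a placeholder that _parse_context does not provide would raise KeyError, and that call happens before the retry loop, outside its try/except. A quick check like the one sketched below can catch such typos early. The function unknown_placeholders and the constant AVAILABLE_ARGS are hypothetical names; the set simply repeats the arguments listed in the MessageTemplate docstring.

```python
import typing as tp
from string import Formatter

# Argument names produced by MessageTemplate._parse_context in this article
AVAILABLE_ARGS = frozenset(
    {
        "DAG_NAME",
        "TASK_ID",
        "DATE",
        "TASK_LOG_URL",
        "PARAMS",
        "CONF",
        "PREV_EXEC_DATE",
        "USERS",
    }
)


def unknown_placeholders(template: str) -> tp.Set[str]:
    """Return placeholder names used in the template but not in AVAILABLE_ARGS."""
    names = set()
    for _, field, _, _ in Formatter().parse(template):
        if field:
            # "{PARAMS[key]}" or "{DAG_NAME.attr}" still refer to the base name
            names.add(field.split(".")[0].split("[")[0])
    return names - AVAILABLE_ARGS


print(unknown_placeholders("Task {TASK_ID} in {DAG_NAME}, owner: {OWNER}"))
# {'OWNER'}
```

An empty result means every placeholder in the template has a matching argument.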

We create a TelegramNotification object and pass it our bot's token, our chat ID, the message template, and the usernames of the people responsible for the DAG (without the @).

All the magic happens here:

"on_failure_callback": telegram_notification.send_telegram_notification

When a task fails, send_telegram_notification is called and receives the Airflow context object, which already contains all the information we need.

What happens if we fail to read the token or the chat ID, or fail to read the secrets at all? read_bot_secrets returns either an empty dictionary or a dictionary with the values it found (correct or not). Since the code obtains the values (token and chat_id) with the get method, a missing key simply yields None. The try/except inside the send_telegram_notification method of the TelegramNotification class covers the rest: if there are network problems, or the values are missing or incorrect, the error is written to the logs and we can deal with it from there.
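
The article does not show the body of read_bot_secrets, so the following is only a guess at what such a function might look like, written to match the behavior described above. It assumes the secrets are stored in a JSON file with "chat_id" and "token" keys; the file format and the use of the standard logging module are assumptions made for this sketch.

```python
import json
import logging
import typing as tp

log = logging.getLogger(__name__)


def read_bot_secrets(path: str) -> tp.Dict[str, tp.Any]:
    """Hypothetical sketch: load secrets from a JSON file.

    Returns an empty dictionary if the file is missing, unreadable,
    not valid JSON, or does not contain a JSON object.
    """
    try:
        with open(path, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are subclasses of ValueError
        log.error("Could not read bot secrets from %s: %s", path, e)
        return {}
    return data if isinstance(data, dict) else {}
```

Because the function never raises on a bad file, the DAG file still imports cleanly; SECRETS.get("token") then returns None and the problem surfaces later, in the logs of the notification callback.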

In my case, I'm running this on a working Apache Airflow installation; as far as I can tell from the information available to me, the version is 2.5.3.

Results:

That's all I have for now. Ask your questions, write comments, leave your remarks. I hope my article helped you in some way.
