Creating a Chat-Ops bot in Mattermost in python

In our team, which deals with the electronic document management system in terms of operational processes, often during voice meetings and during discussions in the Mattermost messenger there was a need to throw drafts of tasks into the task tracker in order to fill them out later. This need overlapped with my desire to try to write something in python related to Chat-Ops.

In the process of writing such a bot, I came across a number of poorly described aspects, which I wanted to talk about in the article.

general description

Colleagues from another team suggested that you can listen to messages in Mattermost via websocket. The python language was chosen because it is used in some of our team’s solutions; the availability of a ready-made library for creating websocket services also played a role.

I made the bot quite simple, without saving the context of previous messages, interactive buttons, to which I would have to write a Web API – in fact, I would like to create a flexible prototype to which all this can be relatively painlessly attached in the future. The scheme turned out like this:

Receiving messages via websocket – prototype

To begin with, I learned how to read messages via websocket using the websocket-client library (https://pypi.org/project/websocket-client/).

From the example presented at the link above and the Mattermost documentation regarding websocket, I created something like this:

import logging
import logging.config
import websocket

class WebSocketMattermostApp:
    """
    Приложение, слушающее Mattermost через WebSocket
    """

    mm_ws_headers = dict()
    """
    Словарь заголовков для WebSocket-запросов к API Mattermost
    """

    connection: websocket.WebSocketApp
    """
    Соединение с WebSocket
    """

    def connect():
        """
        Подключается к WebSocket
        """

        # Заголовки для подключения к Mattermost через WebSocket
        WebSocketMattermostApp.mm_ws_headers["Authorization"] = "Bearer ****************"

        WebSocketMattermostApp.connection = websocket.WebSocketApp("wss://*************/api/v4/websocket",
                                    header=WebSocketMattermostApp.mm_ws_headers,
                                    on_open=WebSocketMattermostApp.ws_on_open,
                                    on_message=WebSocketMattermostApp.ws_on_message,
                                    on_error=WebSocketMattermostApp.ws_on_error,
                                    on_close=WebSocketMattermostApp.ws_on_close)
        WebSocketMattermostApp.connection.run_forever(reconnect=5)

    def disconnect():
        """
        Отключается от WebSocket
        """
        WebSocketMattermostApp.connection.close()

    def ws_on_message(ws, message):
        """
        Обрабатывает поступающие сообщения
        """
        print(message)
    def ws_on_error(ws, error):
        """
        Выполняет действия при ошибке
        """
        logging.error(f"Error: {error}")

    def ws_on_close(ws, close_status_code, close_msg):
        """
        Выполняет действия при закрытии соединения
        """
        logging.info(
            f"Connection closed {close_status_code} | {close_msg}")

    def ws_on_open(ws):
        """
        Выполняет действия при открытии соединения
        """
        logging.info("Connection opened")

# При запуске файла напрямую соединяется автоматически 
if __name__ == "__main__":
    WebSocketMattermostApp.connect()

The authorization header specifies the Mattermost bot account token, and the URI for setting up a websocket connection specifies the host of the Mattermost instance.

After running the file above, if all the settings are correct, the application will start and start writing to the console all messages from all channels that our bot account can read.

Entities received via websocket are different. We are interested in the posted type (i.e. message), an example of which is given below:

{
    "event": "posted",
    "data": {
        "channel_display_name": "my_mm_channel",
        "channel_name": "my_mm_channel",
        "channel_type": "P",
        "post":"{\"id\":\"1qaz2wsx3edc4rfv\",\"create_at\":123456789,\"update_at\":123456789,\"edit_at\":0,\"delete_at\":0,\"is_pinned\":false,\"user_id\":\"1qaz2wsx3edc\",\"channel_id\":\"1qaz2wsx3edc\",\"root_id\":\"\",\"original_id\":\"\",\"message\":\"Test message\",\"type\":\"slack_attachment\",\"props\":{\"attachments\":[{\"id\":0,\"fallback\":\"\",\"color\":\"#CF0A2C\",\"pretext\":\"\",\"author_name\":\"\",\"author_link\":\"\",\"author_icon\":\"\",\"title\":\"Message attachment title\",\"title_link\":\"\",\"text\":\"\",\"fields\":null,\"image_url\":\"\",\"thumb_url\":\"\",\"footer\":\"message footer\",\"footer_icon\":\"\",\"ts\":null}],\"from_bot\":\"true\",\"from_webhook\":\"true\",\"override_icon_emoji\":\":love_letter:\",\"override_icon_url\":\"/static/emoji/1f48c.png\",\"override_username\":\"my_mm_bot\",\"webhook_display_name\":\"my_mm_bot\"},\"hashtags\":\"\",\"pending_post_id\":\"\",\"reply_count\":0,\"last_reply_at\":0,\"participants\":null,\"metadata\":{\"embeds\":[{\"type\":\"message_attachment\"}]}}",
        "sender_name": "my_mm_bot",
        "set_online": false,
        "team_id": "1qaz2wsx3edc4rfv"
    },
    "broadcast": {
        "omit_users": null,
        "user_id": "",
        "channel_id": "1qaz2wsx3edc4rfv",
        "team_id": "",
        "connection_id": "",
        "omit_connection_id": ""
    },
    "seq": 6
}

As we can see, in the data.post field of the incoming JSON there is a second, nested JSON; it is in it that the information we are interested in is located: the text of the message, attachments to it, ID of the author of the message, etc.:

{
    "id": "1qaz2wsx3edc4rfv",
    "create_at": 123456789,
    "update_at": 123456789,
    "edit_at": 0,
    "delete_at": 0,
    "is_pinned": false,
    "user_id": "1qaz2wsx3edc",
    "channel_id": "1qaz2wsx3edc",
    "root_id": "",
    "original_id": "",
    "message": "Test message",
    "type": "slack_attachment",
    "props": {
        "attachments": [
            {
                "id": 0,
                "fallback": "",
                "color": "#CF0A2C",
                "pretext": "",
                "author_name": "",
                "author_link": "",
                "author_icon": "",
                "title": "Message attachment title",
                "title_link": "",
                "text": "",
                "fields": null,
                "image_url": "",
                "thumb_url": "",
                "footer": "message footer",
                "footer_icon": "",
                "ts": null
            }
        ],
        "from_bot": "true",
        "from_webhook": "true",
        "override_icon_emoji": ":love_letter:",
        "override_icon_url": "/static/emoji/1f48c.png",
        "override_username": "my_mm_bot",
        "webhook_display_name": "my_mm_bot"
    },
    "hashtags": "",
    "pending_post_id": "",
    "reply_count": 0,
    "last_reply_at": 0,
    "participants": null,
    "metadata": {
        "embeds": [
            {
                "type": "message_attachment"
            }
        ]
    }
}

Functionality diversity

Let’s separate the functionality of working with Kaiten, Mattermost (for sending a response) and processing a message into different files.

The logic for processing a message in Kaiten is quite simple – we parse the message text using a regular expression, extract the text in quotes – this will be the wording for the title of the card created in Kaiten. We indicate the board, track and type of task being created. We return the numeric ID of the created card to Kaiten. The headers and client class variables will be populated by the caller. It turns out something like this:

import http.client
import json
import re

class KaitenHelper:

    """
    Помощник для обращения к API Kaiten
    """

    headers = dict()
    """
    Словарь заголовков для запросов к API Kaiten
    """

    client: http.client.HTTPSConnection = None
    """
    Клиент к API Kaiten
    """

    def create_kaiten_card(message, creator_id):
        """
        Создает карточку в Kaiten
        """

        # Параметры по умолчанию
        board_id = 1  # ИД Доски
        lane_id = 2  # ИД Дорожки на Доске
        type_id = 26  # ИД типа карточки
        properties = {}

        # Текст задачи
        double_quotes_title_regex = re.compile(
            r'"(.+)"',
            flags=re.I | re.M)
        single_quotes_title_regex = re.compile(
            r'\'(.+)\'',
            flags=re.I | re.M)
        task_title_regex = re.compile(
            r'(задач|таск|kaiten|кайтен).*? (.+)',
            flags=re.I | re.M)
        title_search = double_quotes_title_regex.search(message)
        if title_search == None:
            title_search = single_quotes_title_regex.search(message)
        if title_search == None:
            title_search = task_title_regex.search(message)
        if title_search == None or len(title_search.groups()) == 0:
            return None
        title = title_search.groups()[-1]

        # Меняем часть значений для техдолговых задач
        tech_debt_regex = re.compile(
            r'(в тех.*долг|тех.*долг.+(задач|таск))',
            flags=re.I | re.M)
        if tech_debt_regex.search(message):
            # Параметры для задач Технического долга
            board_id = 4  # Доска: Технический долг
            lane_id = 2  # Дорожка: Важные
            type_id = 7  # Тип для технического долга

        body = json.dumps({
            "title": title,
            "board_id": board_id,
            "lane_id": lane_id,
            "owner_id": creator_id,
            "type_id": type_id,
            "properties": {}
        })

        KaitenHelper.client.request("POST", "/api/latest/cards",
                              body, KaitenHelper.headers)
        response = KaitenHelper.client.getresponse()
        response_obj = json.loads(response.read().decode())
        KaitenHelper.client.close()
        return response_obj["id"]

Similarly, we create a second utility to respond to our user via the Mattermost API:

import http.client
import json

class MmHelper:
    """
    Помощник для обращения к API Mattermost
    """

    headers = dict()
    """
    Словарь заголовков для запросов к API Mattermost
    """

    client: http.client.HTTPSConnection = None
    """
    Клиент к API Mattermost
    """

    def post_to_mm(message, channel_id, root_id="", props={}):
        """
        Отправляет в Mattermost сообщение
        """

        body = json.dumps({
            "channel_id": channel_id,
            "root_id": root_id,
            "message": message,
            "props": props
        })
        MmHelper.client.request("POST", "/api/v4/posts", body, MmHelper.headers)
        MmHelper.client.close()

The message handler will tie these entities together:

import re

from utils.kaiten_helper import KaitenHelper
from utils.mm_helper import MmHelper

bot_mm_tag = "@bot"
"""
Тэг бота в Mattermost
"""

create_kaiten_card_regex = re.compile(
    r'(созда|завед|нов[ау]|полож).+(задач|таск|kaiten|кайтен)', flags=re.I | re.M)
"""
Регулярное выражение для идентификации сценария создания карточки в Kaiten
"""

class IncomingPostHandler:
    """
    Обработчик входящих сообщений
    """

    users_of_bot = {}
    """
    Пользователи бота Mattermost
    """

    def handle(post_obj):
        """
        Обрабатывает входящее сообщение
        """
        reply = ""
        if create_kaiten_card_regex.search(post_obj['message']):
            reply = IncomingPostHandler.handle_create_kaiten_card(post_obj)
        else:
            reply = IncomingPostHandler.handle_help()

        mm_root_id = post_obj["id"] if post_obj["root_id"] == "" else post_obj["root_id"]
        MmHelper.post_to_mm(reply, post_obj["channel_id"], mm_root_id)

    def handle_create_kaiten_card(post_obj):
        """
        Обрабатывает сценарий создания карточки в Kaiten
        """
        kaiten_creator_id: int = next(
            (user["kaiten_id"]
             for user in IncomingPostHandler.users_of_bot if user["id"] == post_obj["user_id"]),
            None)
        created_id = KaitenHelper.create_kaiten_card(
            post_obj['message'], kaiten_creator_id)
        if created_id == None:
            return ":( Не смог определить текст задачи"
        else:
            return f":white_check_mark: Создал задачу {created_id}\n:earth_africa: [Открыть в Kaiten](https://kaiten.mycompany.com/space/36/card/{created_id})"

    def handle_help():
        """
        Обрабатывает сценарий приветствия / просьбы о помощи
        """
        return "Привет! Не смог распознать вашу команду"

The handler (again, using regular expressions) determines that with this message the user wants to create a task card in Kaiten:

create_kaiten_card_regex = re.compile(
    r'(созда|завед|нов[ау]|полож).+(задач|таск|kaiten|кайтен)', flags=re.I | re.M)

If the card is successfully created, we save the root post for which we create a new message in the thread in the mm_root_id variable. If a request from a user has already come from a thread, specify the root post of this thread as the root post:

        mm_root_id = post_obj["id"] if post_obj["root_id"] == "" else post_obj["root_id"]
        MmHelper.post_to_mm(reply, post_obj["channel_id"], mm_root_id)

It remains to configure these entities correctly, so let’s modify the WebSocketMattermostApp class by adding the configure() method to it:

    def configure():
        """
        Конфигурирует приложение из файла config.json
        """

        # Настраиваем путь к конфигам аналогично данному файлу
        os.chdir(os.path.dirname(os.path.abspath(__file__)))

        # Добавляем логирование на основе файла
        with open('logging_config.json') as file:
            logging.config.dictConfig(json.load(file))

        with open('config.json') as file:
            global config
            config = json.load(file)
        logging.info("Configuration files loaded")

        # Клиент к API Mattermost
        MmHelper.client = http.client.HTTPSConnection(config["mattermost"]["host"])
        mm_auth = f"Bearer {config['mattermost']['token']}"
        MmHelper.headers["Authorization"] = mm_auth
        MmHelper.headers["Content-Type"] = "application/json"

        # Заголовки для подключения к Mattermost через WebSocket
        WebSocketMattermostApp.mm_ws_headers["Authorization"] = mm_auth

        # Адрес для подключения к Mattermost через WebSocket
        global mm_ws_url
        mm_ws_url = f"wss://{config['mattermost']['host']}/api/v4/websocket"

        # Клиент к API Kaiten
        KaitenHelper.client = http.client.HTTPSConnection(config["kaiten"]["host"])

        KaitenHelper.headers["Authorization"] = f"Bearer {config['kaiten']['token']}"
        KaitenHelper.headers["Content-Type"] = "application/json"

        IncomingPostHandler.users_of_bot = config['mattermost_allowed_users']

        logging.info("Configuration completed")

The configuration file looks like this:

{
    "mattermost_allowed_users": [
        {
            "id": "1f84d516a1494c1b9057f89fb2eab2d0",
            "name": "ivanovii",
            "kaiten_id": 123
        },
        {
            "id": "287422ffa5984bfd8fc720c0e1960fef",
            "name": "petrovapp",
            "kaiten_id": 456
        },
    ],
    "mattermost": {
        "host": "mattermost.mycompany.com",
        "token": "4256f55502e64dd59471e424653b1d4d"
    },
    "kaiten": {
        "host": "kaiten.mycompany.com",
        "token": "e90c03b3-cddb-4053-a7ba-201b0e581e08"
    }
}

I’ll say right away that I haven’t bothered with the models of the objects that I get from the configuration yet, since the solution given is just a prototype.

The mattermost_allowed_users field in the config above contains an array of objects representing the Mattermost user, for which the user ID from Kaiten is attached.

Actually, here is the piece where we configure both MmHelper and the Websocket connection:

        # Клиент к API Mattermost
        MmHelper.client = http.client.HTTPSConnection(config["mattermost"]["host"])
        mm_auth = f"Bearer {config['mattermost']['token']}"
        MmHelper.headers["Authorization"] = mm_auth
        MmHelper.headers["Content-Type"] = "application/json"

        # Заголовки для подключения к Mattermost через WebSocket
        WebSocketMattermostApp.mm_ws_headers["Authorization"] = mm_auth

Then the connect() method will use what came from the config:

    def connect():
        """
        Подключается к WebSocket
        """

        WebSocketMattermostApp.configure()
        WebSocketMattermostApp.connection = websocket.WebSocketApp(mm_ws_url,
                                    header=WebSocketMattermostApp.mm_ws_headers,
                                    on_open=WebSocketMattermostApp.ws_on_open,
                                    on_message=WebSocketMattermostApp.ws_on_message,
                                    on_error=WebSocketMattermostApp.ws_on_error,
                                    on_close=WebSocketMattermostApp.ws_on_close)
        WebSocketMattermostApp.connection.run_forever(reconnect=5)

KaitenHelper is configured similarly:

        # Клиент к API Kaiten
        KaitenHelper.client = http.client.HTTPSConnection(config["kaiten"]["host"])

        KaitenHelper.headers["Authorization"] = f"Bearer {config['kaiten']['token']}"
        KaitenHelper.headers["Content-Type"] = "application/json"

And finally, we pass the list of users to the IncomingPostHandler, this is necessary to match the user who sent the message with the creator of the card in Kaiten:

        IncomingPostHandler.users_of_bot = config['mattermost_allowed_users']

Let’s return to the changes in WebSocketMattermostApp. Let’s change the message processing method:

    def ws_on_message(ws, message):
        """
        Обрабатывает поступающие сообщения
        """
        msg_obj = json.loads(message)
        
        # Отбираем входящие сообщения с упоминанием my_tag
        if msg_obj["event"] == "posted" and bot_mm_tag in msg_obj["data"]["post"]:
            post_obj = json.loads(msg_obj["data"]["post"])
            
            # Ищем пользователя в списке разрешенных
            found_user = next(
                (mm_user for mm_user in config["mattermost_allowed_users"] if mm_user["id"] == post_obj["user_id"]),
                None)
            if found_user != None:
                IncomingPostHandler.handle(post_obj)

Let’s analyze this piece in more detail. There are three checks here:

  1. We filter only the event type posted (message)

        if msg_obj["event"] == "posted" 
  1. We check that our bot was clearly tagged in the message

       and bot_mm_tag in msg_obj["data"]["post"]:
  1. We check that the user is in the allowed list (mattermost_allowed_users in the config).

            if found_user != None:

If all checks have passed, we send the IncomingPostHandler message:

                IncomingPostHandler.handle(post_obj)

Let’s pay attention to the last block of code in the WebSocketMattermostApp class:

# При запуске файла напрямую соединяется автоматически 
if __name__ == "__main__":
    WebSocketMattermostApp.connect()

If we run this py file in the console, it will automatically configure other classes and initiate the connection. This method is convenient for debugging.

Deployment

To deploy the bot, one of the corporate Kubernetes clusters was selected; therefore, in order for the application to successfully report its state, it had to be wrapped in Flask. Accordingly, I implemented basic tests, moved the logging configuration to this file, and launched WebSocketMattermopstApp in a separate thread:

import json
import logging
import os
from threading import Thread
from flask import Flask
from ws_mm_app import WebSocketMattermostApp

app = Flask('mattermost-chat-ops-bot')

@app.route('/liveness')
@app.route('/readyness')
def getProbe():
    """
    Возвращает результат пробы для k8s
    """
    return "Healthy"

with app.app_context():
    # Настраиваем рабочую папку
    os.chdir(os.path.dirname(os.path.abspath(__file__)))

    # Добавляем логирование на основе файла
    with open('logging_config.json') as file:
        logging.config.dictConfig(json.load(file))
    logging.info("Logging configuration completed")

    # Стартуем WebSocket-приложение в отдельном потоке
    ws_thread = Thread(target=WebSocketMattermostApp.connect)
    ws_thread.start()

# Запускаем Flask-приложение при прямом вызове файла
if __name__ == '__main__':
    app.run(host="0.0.0.0", port=5000)

To ensure that config files are always picked up correctly, use the command

        os.chdir(os.path.dirname(os.path.abspath(__file__)))

The working directory changes to the one where the file itself is located.

The logging configuration is placed in a separate json file, which has the following structure:

{
    "version": 1,
    "formatters": {
        "default": {
            "format": "%(asctime)s|%(levelname)s|%(module)s|%(message)s"
        }
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "stream": "ext://sys.stdout",
            "formatter": "default"
        }
    },
    "root": {
        "level": "INFO",
        "handlers": [
            "console"
        ]
    }
}

In the code, we simply read this file, deserialize it, and pass the object to the standard logging configuration method:

        with open('logging_config.json') as file:
            logging.config.dictConfig(json.load(file))

results

Interaction with the bot looks something like this:

The bot allows you to quickly add tasks to the backlog. At the same time, as new ideas and needs arise, new functionality can be easily added to the bot.

In the end, I managed to dilute the usual stack a little and learn something new.

In the future, we need to add Kaiten variables, bot tag and other values ​​from the code to the config.

Also, there are plans to modify the code so that if Mattermost is disconnected, the bot can try to connect again after some time. I deliberately did not reflect the state of the connection to the websocket in the tests, because if Mattermost does not work, the kubernetes cluster will clearly not be able to solve this problem, and users will still not be able to write to the bot. Overall, there is still room for improvement in terms of sustainability.

Links

https://github.com/AlfaInsurance/mattermost_chatops – Repository with code from the article

https://developers.kaiten.ru/ – Kaiten API

https://api.mattermost.com/ – API Mattermost (including about WebSocket)

https://pypi.org/project/websocket-client/ – Description of the websocket-client library

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *