Making a Pocket Data Analyst with OpenAI Assistants API and Code Interpreter in Telegram

This is a continuation of the article:
Making an AI Waiter with OpenAI Assistants API and Vector Store in Telegram

This time we'll use another tool from OpenAI's Assistants API called Code Interpreter.

What is Code Interpreter?

A language model generates text. It is not designed to perform complex mathematical calculations or data analysis on its own. However, it can generate code, and does so very well. What if we give the model a task, let it write program code for that task, execute the code in an isolated environment, and then let the model use the result to compose its response? This is exactly what Code Interpreter does.
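The loop described above can be illustrated with a toy sketch. This is not how OpenAI implements Code Interpreter: the `generated_code` string is a hypothetical stand-in for model output, and a plain subprocess stands in for the sandbox (it is not actually isolated).

```python
import subprocess
import sys

# Hypothetical stand-in for code that the model would generate for the task.
generated_code = "print(sum([1837.00, 2849.00, 2999.00]))"


def run_generated_code(code: str, timeout: float = 10.0) -> str:
    """Run the code in a separate Python process and return its stdout."""
    completed = subprocess.run(
        [sys.executable, "-c", code],
        capture_output=True,
        text=True,
        timeout=timeout,
        check=True,  # raise if the generated code exits with an error
    )
    return completed.stdout.strip()


result = run_generated_code(generated_code)
# The model would receive `result` and phrase its final answer around it.
answer = f"The total is {result}."
```

The important point is the division of labor: the model writes the code and words the answer, while the actual arithmetic happens in an interpreter.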

Task

In this article, we will look at the following example: I have a simple CSV file with all payments for a fake online store. The data format is as follows:

"id","user_id","amount","created_at"
"4675","2251032","1837.00","2024-05-22 07:10:02"
"4676","7472836","2849.00","2024-05-22 07:27:45"
"4677","6271037","2999.00","2024-05-22 07:33:12"
"4678","6815010","2877.00","2024-05-22 08:01:58"
"4679","2565937","5000.00","2024-05-22 08:13:17"
"4680","7074300","299.00","2024-05-22 08:16:49"
"4681","7028029","5770.00","2024-05-22 08:33:42"

I want to ask questions that require analysis of this data. For example: how much did the store earn in a given month? Does buyer behavior differ by day of the week? I want to receive the results as text or, where applicable, as graphs.
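To make the task concrete, here is a rough sketch of the kind of code the model might generate for such questions. It is only an illustration using the standard library and the sample rows shown above; the code Code Interpreter actually writes will differ, and the `summarize` function is a made-up name.

```python
import csv
import io
from collections import Counter, defaultdict
from datetime import datetime
from decimal import Decimal

# The sample rows from the article, used here as test data.
SAMPLE_CSV = '''\
"id","user_id","amount","created_at"
"4675","2251032","1837.00","2024-05-22 07:10:02"
"4676","7472836","2849.00","2024-05-22 07:27:45"
"4677","6271037","2999.00","2024-05-22 07:33:12"
"4678","6815010","2877.00","2024-05-22 08:01:58"
"4679","2565937","5000.00","2024-05-22 08:13:17"
"4680","7074300","299.00","2024-05-22 08:16:49"
"4681","7028029","5770.00","2024-05-22 08:33:42"
'''

WEEKDAYS = [
    "Monday", "Tuesday", "Wednesday", "Thursday",
    "Friday", "Saturday", "Sunday",
]


def summarize(csv_text: str):
    """Return (revenue per month, number of payments per weekday)."""
    revenue_by_month = defaultdict(Decimal)
    payments_by_weekday = Counter()
    for row in csv.DictReader(io.StringIO(csv_text)):
        created = datetime.strptime(row["created_at"], "%Y-%m-%d %H:%M:%S")
        revenue_by_month[created.strftime("%Y-%m")] += Decimal(row["amount"])
        payments_by_weekday[WEEKDAYS[created.weekday()]] += 1
    return dict(revenue_by_month), dict(payments_by_weekday)


revenue_by_month, payments_by_weekday = summarize(SAMPLE_CSV)
```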

Preparation

We repeat all the steps from the original article. The only difference will be in the code for the cloud function.

requirements.txt

In the editor, first create a new file named requirements.txt and put the following lines in it:

openai~=1.33.0
boto3~=1.34.122
pyTelegramBotAPI~=4.19.1

This is a list of Python dependencies that the cloud will automatically download during build so that we can use them in the function code itself.

config.py

Now let's create a file that will contain the code responsible for the project configuration. Let's call it config.py and write the following code:

import json
import os

import boto3
import openai

YANDEX_KEY_ID = os.environ.get("YANDEX_KEY_ID")
YANDEX_KEY_SECRET = os.environ.get("YANDEX_KEY_SECRET")
YANDEX_BUCKET = os.environ.get("YANDEX_BUCKET")
PROXY_API_KEY = os.environ.get("PROXY_API_KEY")
ASSISTANT_MODEL = os.environ.get("ASSISTANT_MODEL")
TG_BOT_TOKEN = os.environ.get("TG_BOT_TOKEN")
TG_BOT_ADMIN = os.environ.get("TG_BOT_ADMIN")


def get_s3_client():
    session = boto3.session.Session(
        aws_access_key_id=YANDEX_KEY_ID, aws_secret_access_key=YANDEX_KEY_SECRET
    )
    return session.client(
        service_name="s3", endpoint_url="https://storage.yandexcloud.net"
    )


def get_config() -> dict:
    s3client = get_s3_client()
    try:
        response = s3client.get_object(Bucket=YANDEX_BUCKET, Key="config.json")
        return json.loads(response["Body"].read())
    except Exception:
        # Fall back to an empty config, e.g. when config.json does not exist yet
        return {}


def save_config(new_config: dict):
    s3client = get_s3_client()
    s3client.put_object(
        Bucket=YANDEX_BUCKET, Key="config.json", Body=json.dumps(new_config)
    )


proxy_client = openai.Client(
    api_key=PROXY_API_KEY,
    base_url="https://api.proxyapi.ru/openai/v1",
)

Here we read all the necessary parameters from environment variables and define functions for reading and saving additional (dynamic) parameters in the Yandex Cloud storage bucket. We also initialize the client for ProxyAPI according to its documentation, that is, we override the base URL of the API.
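Since every parameter comes from the environment, a missing variable only shows up later as a confusing error. A small check like the one below could report the problem early. It is an optional sketch, not part of the original project; `missing_env_vars` is a hypothetical helper, and the variable names are taken from config.py.

```python
import os

# Names of the environment variables that config.py reads.
REQUIRED_VARS = (
    "YANDEX_KEY_ID",
    "YANDEX_KEY_SECRET",
    "YANDEX_BUCKET",
    "PROXY_API_KEY",
    "ASSISTANT_MODEL",
    "TG_BOT_TOKEN",
    "TG_BOT_ADMIN",
)


def missing_env_vars(environ=None) -> list:
    """Return the required variables that are unset or empty."""
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not env.get(name)]


# Example with an explicit mapping instead of the real environment.
example_missing = missing_env_vars({"YANDEX_KEY_ID": "key", "TG_BOT_TOKEN": "token"})
```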

admin.py

Next, we create a file named admin.py and put there all the code responsible for the project's administrative tasks.

from config import ASSISTANT_MODEL, get_config, proxy_client, save_config


def create_assistant(name, instructions):
    assistant_id = get_assistant_id()
    if not assistant_id:
        new_assistant = proxy_client.beta.assistants.create(
            model=ASSISTANT_MODEL,
            instructions=instructions,
            name=name,
            tools=[
                {
                    "type": "code_interpreter",
                }
            ],
        )
        config = get_config()
        config["assistant_id"] = new_assistant.id
        save_config(config)
    else:
        proxy_client.beta.assistants.update(
            assistant_id=assistant_id, instructions=instructions
        )


def get_assistant_id():
    config = get_config()
    return config["assistant_id"] if "assistant_id" in config else None


def upload_file(chat_id, filename, file):
    config = get_config()
    file_object = proxy_client.files.create(file=(filename, file), purpose="assistants")
    thread = proxy_client.beta.threads.create(
        messages=[
            {
                "role": "user",
                "content": "For all the questions I ask, use this data for the analysis",
                "attachments": [
                    {"file_id": file_object.id, "tools": [{"type": "code_interpreter"}]}
                ],
            }
        ]
    )
    if "threads" not in config:
        config["threads"] = {}

    # JSON object keys are always strings, so store the chat ID as a string
    config["threads"][str(chat_id)] = thread.id
    save_config(config)


def get_thread_id(chat_id: str):
    config = get_config()

    if "threads" in config and chat_id in config["threads"]:
        return config["threads"][chat_id]

    return None

create_assistant

Creates the assistant with the given name and instructions or, if it already exists, updates its instructions. When creating the assistant, we enable the Code Interpreter tool for it and save the assistant ID in the configuration.

get_assistant_id

Returns the assistant ID from the configuration.

upload_file

A method that uploads a file to the API's file (not vector!) storage and initiates a new thread for that chat. The idea here is this: for each new uploaded file, we start a new thread so that the user can query the model based on the data in that file. The old thread is “forgotten”.

get_thread_id

Returns the current thread ID if at least one file has already been uploaded in this chat, otherwise None.
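One detail worth keeping in mind with this chat-to-thread mapping: the configuration is stored as JSON, and JSON object keys are always strings. The sketch below shows the effect with made-up IDs (`asst_example`, `thread_example` and the chat ID are placeholders), which is why the lookup has to use `str(chat_id)`.

```python
import json

# Placeholder values; real IDs come from the API and from Telegram.
config = {"assistant_id": "asst_example", "threads": {}}
chat_id = 123456789

# Storing under an integer key...
config["threads"][chat_id] = "thread_example"

# ...and round-tripping through JSON turns the key into a string.
restored = json.loads(json.dumps(config))

int_key_found = chat_id in restored["threads"]
str_key_found = str(chat_id) in restored["threads"]
```

Here `int_key_found` is `False` and `str_key_found` is `True`, so converting the chat ID to a string both when saving and when reading keeps the two sides consistent.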

chat.py

In the chat.py file (we will also create it) we will store methods for working with user messages.

import os

from admin import get_assistant_id, get_thread_id
from config import proxy_client


def process_message(chat_id: str, message: str) -> list[dict]:
    assistant_id = get_assistant_id()
    thread_id = get_thread_id(chat_id)
    if not thread_id:
        raise ValueError("Thread not found")

    proxy_client.beta.threads.messages.create(
        thread_id=thread_id, content=message, role="user"
    )

    run = proxy_client.beta.threads.runs.create_and_poll(
        thread_id=thread_id,
        assistant_id=assistant_id,
    )

    answer = []

    if run.status == "completed":
        messages = proxy_client.beta.threads.messages.list(
            thread_id=thread_id, run_id=run.id
        )
        for message in messages:
            if message.role == "assistant":
                for block in message.content:
                    if block.type == "text":
                        answer.insert(0, {"type": "text", "text": block.text.value})
                        if block.text.annotations:
                            for annotation in block.text.annotations:
                                if annotation.type == "file_path":
                                    answer.insert(
                                        0,
                                        {
                                            "type": "file",
                                            "file": download_file(
                                                annotation.file_path.file_id
                                            ),
                                            "filename": os.path.basename(
                                                annotation.text.split(":")[-1]
                                            ),
                                        },
                                    )
                    elif block.type == "image_file":
                        answer.insert(
                            0,
                            {
                                "type": "image",
                                "file": download_file(block.image_file.file_id),
                            },
                        )

    return answer


def download_file(file_id: str) -> str:
    file_content = proxy_client.files.content(file_id)
    content = file_content.read()
    with open(f"/tmp/{file_id}", "wb") as f:
        f.write(content)
    return f"/tmp/{file_id}"

process_message

This function accepts a user message and sends it via ProxyAPI for processing in the Assistants API. The Assistants API works a little differently from the regular API: after adding a message to a thread, we do not immediately receive a response from the model. We must additionally start processing of the entire thread by creating a run. That is what we do here, and once the run completes we read the response, which may consist of several messages.
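Note that the function above returns an empty list when the run ends in any status other than "completed". If you want to tell the user what happened, a helper along these lines could be a starting point. It is a sketch: `describe_run` is a hypothetical name, and it assumes the run object exposes `status` and an optional `last_error` with `code` and `message` fields, which is worth checking against the API reference. The `SimpleNamespace` objects only imitate real runs.

```python
from types import SimpleNamespace


def describe_run(run) -> str:
    """Return a short human-readable summary of how a run ended."""
    if run.status == "completed":
        return "completed"
    # Assumption: failed runs carry a last_error object with code and message.
    last_error = getattr(run, "last_error", None)
    if last_error is not None:
        return f"{run.status}: {last_error.code}: {last_error.message}"
    return run.status


# Stand-ins for run objects, for illustration only.
ok_run = SimpleNamespace(status="completed", last_error=None)
failed_run = SimpleNamespace(
    status="failed",
    last_error=SimpleNamespace(code="server_error", message="Something went wrong"),
)
expired_run = SimpleNamespace(status="expired", last_error=None)

summaries = [describe_run(r) for r in (ok_run, failed_run, expired_run)]
```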

Code Interpreter can generate other files and images (e.g. graphs) in addition to text. So we additionally recognize such cases using block.type and annotations, load the contents of such files and add them to the array of messages returned by the assistant.
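The file name handling in process_message is easier to follow in isolation. The code takes the annotation text, keeps the part after the last colon, and then takes the base name of that path. The sketch below assumes the annotation text looks like `sandbox:/mnt/data/<name>`, which is what the original code appears to expect; the path used here is made up, and `filename_from_annotation` is a hypothetical helper.

```python
import os


def filename_from_annotation(annotation_text: str) -> str:
    """Extract the file name from a file_path annotation's text."""
    # Drop everything up to the last colon, then keep only the base name.
    return os.path.basename(annotation_text.split(":")[-1])


# Made-up example of an annotation text.
name = filename_from_annotation("sandbox:/mnt/data/monthly_revenue.csv")
```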

download_file

Downloads the file generated by the assistant into the local temporary directory. It will be deleted automatically when the cloud function finishes working.

index.py

import json
import logging
import threading
import time

import telebot
from admin import create_assistant, get_assistant_id, get_thread_id, upload_file
from chat import process_message
from config import TG_BOT_ADMIN, TG_BOT_TOKEN
from telebot.types import InputFile

logger = telebot.logger
telebot.logger.setLevel(logging.INFO)

bot = telebot.TeleBot(TG_BOT_TOKEN, threaded=False)


is_typing = False


def start_typing(chat_id):
    global is_typing
    is_typing = True
    typing_thread = threading.Thread(target=typing, args=(chat_id,))
    typing_thread.start()


def typing(chat_id):
    global is_typing
    while is_typing:
        bot.send_chat_action(chat_id, "typing")
        time.sleep(4)


def stop_typing():
    global is_typing
    is_typing = False


def check_setup(message):
    if not get_assistant_id():
        if message.from_user.username != TG_BOT_ADMIN:
            bot.send_message(
                message.chat.id, "The bot is not set up yet. Please contact the administrator."
            )
        else:
            bot.send_message(
                message.chat.id,
                "The bot is not set up yet. Use the /create command to create the assistant.",
            )
        return False
    return True


def check_admin(message):
    if message.from_user.username != TG_BOT_ADMIN:
        bot.send_message(message.chat.id, "Access denied")
        return False
    return True


@bot.message_handler(commands=["help", "start"])
def send_welcome(message):
    if not check_setup(message):
        return

    bot.send_message(
        message.chat.id,
        (
            "Hi! I'm your pocket data analyst. Upload a file and ask questions about it. I can analyze data and build graphs."
        ),
    )


@bot.message_handler(commands=["create"])
def create_assistant_command(message):
    if not check_admin(message):
        return

    instructions = message.text.split("/create")[1].strip()
    if len(instructions) == 0:
        bot.send_message(
            message.chat.id,
            """
Enter detailed instructions for the assistant after the /create command and a space.

For example:
/create You are a data analyst. The user uploads a file for analysis, and you answer questions using your tools and, if needed, build graphs.

If the assistant has already been created, its instructions will be updated.
            """,
        )
        return

    name = bot.get_me().full_name
    create_assistant(name, instructions)

    bot.send_message(
        message.chat.id,
        "The assistant has been set up. You can now upload a data file for analysis with the /upload command.",
    )


@bot.message_handler(commands=["upload"])
def upload_file_command(message):
    if not check_setup(message):
        return

    return bot.send_message(message.chat.id, "Upload a data file for analysis.")


@bot.message_handler(content_types=["document"])
def upload_file_handler(message):
    if not check_setup(message):
        return

    file_info = bot.get_file(message.document.file_id)
    downloaded_file = bot.download_file(file_info.file_path)

    try:
        upload_file(str(message.chat.id), message.document.file_name, downloaded_file)
    except Exception as e:
        return bot.send_message(message.chat.id, f"Error while uploading the file: {e}")

    return bot.send_message(
        message.chat.id,
        "The file has been uploaded and a new analysis session has started. Ask your questions.",
    )


@bot.message_handler(content_types=["text"])
def handle_message(message):
    if not check_setup(message):
        return

    if not get_thread_id(str(message.chat.id)):
        return bot.send_message(
            message.chat.id,
            "To get started, upload a data file for analysis using the /upload command.",
        )

    start_typing(message.chat.id)

    try:
        answers = process_message(str(message.chat.id), message.text)
    except Exception as e:
        bot.send_message(message.chat.id, f"Error while processing the message: {e}")
        return
    finally:
        # Always stop the typing indicator, even if processing failed
        stop_typing()

    for answer in answers:
        if answer["type"] == "text":
            bot.send_message(message.chat.id, answer["text"])
        elif answer["type"] == "image":
            bot.send_photo(message.chat.id, InputFile(answer["file"]))
        elif answer["type"] == "file":
            bot.send_document(
                message.chat.id,
                InputFile(answer["file"]),
                visible_file_name=answer["filename"],
            )


def handler(event, context):
    message = json.loads(event["body"])
    update = telebot.types.Update.de_json(message)

    if update.message is not None:
        try:
            bot.process_new_updates([update])
        except Exception as e:
            print(e)

    return {
        "statusCode": 200,
        "body": "ok",
    }

typing

A helper function that sends the “typing…” status so that our users don't get bored while the model prepares a response.
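The same idea can be packaged so that the indicator always stops, even when an exception is raised. The sketch below is an alternative design, not the article's code: `typing_indicator` is a hypothetical context manager that uses `threading.Event` instead of a global flag. In the bot, `send_action` would be something like `lambda: bot.send_chat_action(chat_id, "typing")`; here a list stands in for Telegram.

```python
import threading
import time
from contextlib import contextmanager


@contextmanager
def typing_indicator(send_action, interval: float = 4.0):
    """Call send_action repeatedly in the background until the block exits."""
    stop = threading.Event()

    def loop():
        while not stop.is_set():
            send_action()
            # Sleeps up to `interval` seconds, but wakes up early on stop.set().
            stop.wait(interval)

    worker = threading.Thread(target=loop, daemon=True)
    worker.start()
    try:
        yield
    finally:
        stop.set()
        worker.join()


# Demo with a fake action that records each "typing" notification.
calls = []
with typing_indicator(lambda: calls.append("typing"), interval=0.01):
    time.sleep(0.05)  # stand-in for the slow call to the model
```

Because the event wakes the worker as soon as it is set, the background thread ends promptly instead of sleeping out the rest of its interval.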

check_setup

A helper function that returns an error if the bot has not yet been configured by the administrator.

check_admin

Helper function that checks if the author of a message is a bot administrator.

send_welcome

We send a greeting message after the /start command.

create_assistant_command

The /create command is available only to the administrator and creates or updates instructions for the assistant.
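The handler extracts the instructions with `message.text.split("/create")[1]`, which cuts the text short if the instructions themselves mention "/create". A slightly more forgiving variant is sketched below; `extract_instructions` is a hypothetical helper, not part of the original code.

```python
def extract_instructions(text: str) -> str:
    """Return everything after the first word (the command), stripped."""
    # Split once on whitespace: the first part is the command itself,
    # for example "/create" or "/create@my_bot".
    parts = text.split(maxsplit=1)
    return parts[1].strip() if len(parts) > 1 else ""


with_mention = extract_instructions(
    "/create You are a data analyst. Use /create wisely."
)
empty = extract_instructions("/create")
```

An empty result still means "no instructions given", so the existing help message branch can stay as it is.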

upload_file_command

The /upload command simply informs the user that a file needs to be uploaded to begin a data analysis session.

upload_file_handler

When a file is uploaded, we save it on the API side and inform the user that a new session has been started.

handle_message

The actual handler of incoming messages from users. This is where communication with the assistant occurs.

handler

The entry point for the entire cloud function. All commands and messages from Telegram will come here.
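To see what the entry point actually receives, it can help to look at a simplified event. In the sketch below, the `body` field holding the Telegram update as a JSON string follows the handler code above; all values are made up, other fields of a real gateway event are omitted, and `extract_chat_and_text` is a hypothetical helper that does not use telebot.

```python
import json

# Simplified, made-up event: the Telegram update arrives as a JSON string.
sample_event = {
    "body": json.dumps(
        {
            "update_id": 1,
            "message": {
                "message_id": 10,
                "chat": {"id": 123456789, "type": "private"},
                "text": "How much did the store earn in May?",
            },
        }
    )
}


def extract_chat_and_text(event: dict):
    """Return (chat_id, text) for message updates, or None for other updates."""
    update = json.loads(event["body"])
    message = update.get("message")
    if message is None:
        return None
    return message["chat"]["id"], message.get("text")


parsed = extract_chat_and_text(sample_event)
```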


For convenience, I have published the entire source code of the function on GitLab:

https://gitlab.com/evrovas/data-analyst-telegram-bot

In the future, if there are any updates, they will be in the repository.


Go back to the original article and complete the remaining steps, starting from the part “Set the entry point equal to index.handler” through to the end, including filling in the environment variables, creating an API gateway, and setting the Telegram webhook.

Test

Let's start by setting up our assistant. To do this, I'll run the command /create on behalf of the administrator:

Great, the assistant is set up. Now I'll upload my CSV file with sales data.

Now, finally, I will try to find out the information I need.

Hooray! Everything works!
