The tale of how a duckling found shelter in a Persistent Volume

Hello, dear friend. My name is Maxim, and I am a data engineer in one of the teams of the excellent AI division at Domklik. A huge zoo of tools helps us move bits correctly from place to place, and it seems we have tamed another pet.

In this article I want to share a scenario that, in my opinion, perfectly illustrates a use case for the compact embedded database DuckDB. Perhaps the obvious things described here will strike you as ingenious, or perhaps you will simply pet the duckling and try it in action.

The colleague who implemented this functionality did not want to tell the story of how the duckling solved some of the problems we ran into himself, but I came to an agreement with him, and below we describe how the duckling lives now.

Introduction

In the previous article, I mentioned that the main consumers of data in our processes are models. Data scientists train models in Airflow DAGs and often process considerable volumes of data, and they like to run a lot of experiments. This is especially noticeable in recommender systems. We have many models, and each one consumes its share of server resources: CPU, GPU and RAM, which, of course, are not elastic. We would like to use them optimally and not get in the way of other teams training their models. In one of our cases, we managed to do this with the help of that same pet.

I won't go into what DuckDB is, what it has under the hood, which SQL dialect is used to work with it, or whether it has a convenient API. The tool has very good documentation and a huge community; I'll leave useful links below. The stars on GitHub speak for themselves, so it's only worth mentioning that Mark Raasveldt and Hannes Mühleisen announced that the duckling has become quite grown up and published a stable release, which is currently at version 1.1.0.

So, let's move on to what pains we felt and how we tried to solve them.

Problems that were solved

Before the duckling came to us, we pulled the clickstream data into RAM and kept it there inside the DAGs. This data from ClickHouse is needed in several DAGs, and yes, you understood correctly: each DAG shovelled the same volume, unloading it into RAM every single day. Over time we realized that this approach takes a lot of time to load the data before training. Moreover, the models' requests for data volumes kept growing and showed no sign of stopping. The DAGs became heavier and heavier and began to fail with OOM from time to time, yet the tools did not change, so we went off to think about this problem.

Solution options

What we looked at and are still looking at:

S3 and Parquet: storing the necessary data separately in Parquet format in S3.

Advantages:

Disadvantages:

PV and DuckDB: working with the data via DuckDB on a Persistent Volume (PV).

Advantages:

Disadvantages:

Compute in the DB: for example, connecting external sources to ClickHouse and preparing the data right in the database.

Advantages:

Disadvantages:

Iceberg and Trino (and maybe DuckDB): computing and storing all the data in Iceberg.

This option is being considered as part of a future study, since the work on deploying the infrastructure is still underway.

...in progress

PV and DuckDB

Now let's see what the duckling's house looks like and how the duckling works when it comes out of it. We use a Persistent Volume Claim, the k8s entity for managing cluster storage. In the DAG settings we mount the existing Persistent Volume (for the corresponding namespace) and use the path of the mounted volume in our tasks to work with the DuckDB files.

Let's look at an example of the logic for working with data from ClickHouse:

import datetime as dt
from pathlib import Path

import duckdb
import pandas as pd

from dag.config import DATE_FORMAT


class DuckDB:
    """A thin wrapper around a DuckDB file that lives on the mounted Persistent Volume."""

    DB_FILE_NAME = Path("clickhouse_actions")
    ACTIONS_TABLE_NAME = "actions"

    def __init__(self, db_directory: Path):
        self.db_file_path = db_directory / self.DB_FILE_NAME

    def query(self, query: str) -> duckdb.DuckDBPyRelation:
        # A short-lived connection per call: the write lock on the file is held
        # only while the statement runs. Used for statements that take effect
        # immediately (INSERT, DELETE); SELECT results are read via fetchone below.
        with duckdb.connect(str(self.db_file_path)) as con:
            return con.sql(query)

    def fetchone(self, query: str) -> tuple | None:
        # Materialize the first row before the connection is closed.
        with duckdb.connect(str(self.db_file_path)) as con:
            return con.sql(query).fetchone()

    def insert_data(self, data: pd.DataFrame) -> duckdb.DuckDBPyRelation:
        # DuckDB's replacement scan picks up the `data` DataFrame by its Python
        # variable name, so it can be referenced directly in the SQL text.
        return self.query(
            f"""INSERT INTO {self.ACTIONS_TABLE_NAME} SELECT * FROM data"""
        )

    def remove_old_actions(self, depth_dt: dt.datetime):
        # Drop rows that fall outside the retention window.
        return self.query(
            f"""
            DELETE FROM {self.ACTIONS_TABLE_NAME}
            WHERE timestamp <= '{depth_dt.strftime(DATE_FORMAT)}'
            """
        )

    def fetch_latest_action_dt(self) -> dt.datetime:
        # The freshest event timestamp in the table.
        result = self.fetchone(
            f"""SELECT timestamp FROM {self.ACTIONS_TABLE_NAME}
            ORDER BY timestamp DESC LIMIT 1"""
        )
        if not result:
            raise ValueError("Empty table.")
        return result[0].astimezone()

    def remove_db_file(self) -> None:
        # Delete the whole database file from the volume.
        self.db_file_path.unlink(missing_ok=True)

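To make the picture concrete, here is a rough usage sketch; it is not our production code. The two-column schema, the tiny DataFrame and the 90-day retention window are made up for the illustration (the DataFrame stands in for a real extract from ClickHouse), and the mount path matches the VOLUME_MOUNT_PATH used in the DAG settings below.

import datetime as dt
from pathlib import Path

import pandas as pd

MOUNT_PATH = Path("/app/mount")  # the mounted Persistent Volume path
db = DuckDB(db_directory=MOUNT_PATH)

# In this sketch the table is created on the fly; in reality it only needs
# to exist before the first INSERT (the schema here is illustrative).
db.query('CREATE TABLE IF NOT EXISTS actions (user_id BIGINT, "timestamp" TIMESTAMPTZ)')

# A stand-in for a real extract from ClickHouse.
actions_df = pd.DataFrame(
    {"user_id": [1, 2], "timestamp": [dt.datetime.now(dt.timezone.utc)] * 2}
)
db.insert_data(actions_df)

# Keep only the last 90 days (an arbitrary window) and check the freshest event.
db.remove_old_actions(dt.datetime.now(dt.timezone.utc) - dt.timedelta(days=90))
print(db.fetch_latest_action_dt())
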
After loading the data into both of the duckling's houses, you can easily connect to S3 and the PV to work with the fresh data from ClickHouse. But keep in mind that if one connection to the DuckDB file is open for writing, no one else will be able to connect to it. So if you don't need to write, it is better to open concurrent connections in read_only=True mode.
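
For example, a purely reading task could open the same file like this (a minimal sketch; the seven-day filter is just an illustration):

import duckdb

# Several readers can open the file at once, as long as nobody holds a write connection.
with duckdb.connect("/app/mount/clickhouse_actions", read_only=True) as con:
    recent = con.sql(
        "SELECT * FROM actions WHERE timestamp >= now() - INTERVAL 7 DAY"
    ).df()  # materialize into a pandas DataFrame for the model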

Yes, you read that correctly: there are two houses. If the duckling sees that something has happened to its Persistent Volume, or it simply needs to move to another one, it knows that all of its data is additionally stored in S3 for analytics and test training of models. By the way, the main house on the Persistent Volume looks like this in the DAGs:

from airflow import DAG
from kubernetes.client import models as k8s

NAMESPACE = "неймспейс"
NAME_VOLUME = "домик_утёнка"
VOLUME_MOUNT_PATH = "/app/mount"

VOLUMES = [
    k8s.V1Volume(
        name=NAME_VOLUME,
        persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
            claim_name=NAME_VOLUME
        ),
    ),
]
VOLUME_MOUNTS = [
    k8s.V1VolumeMount(
        mount_path=VOLUME_MOUNT_PATH,
        name=NAME_VOLUME,
        sub_path=None,
        read_only=False,
    ),
]

dag = DAG(
    # ...
)

# KubeOpFactory is our internal factory for building Kubernetes pod operators.
kof = KubeOpFactory.open(
    dag=dag,
    namespace=NAMESPACE,
    default_image="IMG-URL",
    common_params={
        "volume_mount_path": VOLUME_MOUNT_PATH,
    },
    # ...
)

extract_all_actions = kof.get_operator(
    task_id="extract_all_actions",
    volumes=VOLUMES,
    volume_mounts=VOLUME_MOUNTS,
)

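As for the spare house: one possible way to keep that copy in S3 (not necessarily the one we use) is to write it straight from DuckDB. A minimal sketch, assuming the httpfs extension is available, S3 credentials are configured separately, and with a made-up bucket name:

import duckdb

with duckdb.connect("/app/mount/clickhouse_actions") as con:
    # httpfs lets DuckDB write directly to S3; credentials are assumed
    # to be configured elsewhere (bucket name below is illustrative).
    con.sql("INSTALL httpfs")
    con.sql("LOAD httpfs")
    con.sql(
        "COPY actions TO 's3://example-bucket/actions.parquet' (FORMAT PARQUET)"
    )
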
Looks like we got a cache on steroids! If we compare the time the DAGs previously spent exporting the data, roughly 30 minutes, with loading the DuckDB file from the Persistent Volume with the data already prepared and filtered, just a couple of minutes, then with DuckDB we use resources efficiently in terms of time and compete for them less with other teams.

Conclusion

So we have come to the end of the tale. It turned out small compared to the volumes that DuckDB processes. As a result, we got an unusual solution that lets us use our infrastructure resources without waste, speeding up the loading and retrieval of data. Reusability made it convenient to run tests and collect datasets for the models.

Of course, nothing is perfect: we have not yet solved a number of other problems related to the amount of RAM needed to train the models at a given moment. We continue to study this question and, perhaps, will tell you something even more interesting in the future.

Thank you everyone for your time! I will be glad to discuss this topic in the comments. All the best!

By the way, DuckDB got its name because Hannes, the project lead, had a pet duck named Wilbur. The developers also believe that ducks are hardy and can live off anything, much like how they envisioned their database operating.

Useful links
