How to build an MVP analytical system for convenient analyst work without Docker, Kubernetes, and Airflow

Hi all!

My name is Maxim Sheptyakov, and I have been doing product analytics for over four years. Several times I have joined companies or projects where there was (almost) no analytics before me, so I had to set up convenient work with data from scratch. Today I will show how to quickly build an MVP of an analytical system without Docker, Kubernetes, or Airflow, because analysts often do not know these tools.

Basic knowledge of Python and SQL is required to understand the article.

What the company has when you first arrive

Usually, the company you join already has a working product, and the data about it is stored in a production database with replication configured to a replica. The first analyst is given the right to connect to the replica and read data from it. But there are several caveats:

  • You can’t run queries that are too heavy, because replication is running constantly

  • You cannot modify the data schema, because this is a replica of the production database

  • Often it is impossible to connect third-party services for data visualization

  • Some data needs to be obtained not from the replica but from other sources (other databases, APIs, etc.)

You could prepare data separately for each task, but this approach gets tiresome within the first week, so you quickly start wanting a convenient analytical system of your own.

What does a convenient analytical system look like?

In my experience, for convenient operation, an analytical system must meet the following criteria:

  • All the data you need for your work is in one place and is updated automatically

  • The analyst has access to all the data necessary for work and can run heavy queries on it

  • The analyst can create intermediate tables, data marts, views, etc.

  • An analyst can quickly add a new data source or a new data mart within the system

  • The analyst can connect the visualization system to the analytical system and give access to the team

Diagram of the analytical system

It might seem that building such a system from scratch is quite difficult, especially when you are an analyst rather than a developer. But I will show that creating an MVP (Minimum Viable Product) does not take much effort, and that you can put together a working prototype in just a few hours.

In this guide, we will focus on loading and updating data in the database and on giving the analyst access to all of it.

Where will the data be stored

To store the data, you can choose a free relational database such as Postgres. Ideally, the company you joined has people responsible for infrastructure who can set up the database for you and grant you full access to it.

But if that is not the case, there are several ways to set up your own analytical database.

I will not dwell on how to set up the database itself; instead, I will describe how you can organize the data inside the analytical database.

Data schema for the analytical database

I divided the analytical database into several schemas, each of which has its own role:

  • etl – schema for loading raw data from various sources

  • arch – schema for storing old raw data that may be needed in the future

  • public – the schema for the analyst’s own work, that is, for running research, storing temporary data, and designing new data marts

  • pre_dash – schema for preprocessed data that is used to build data marts for visualization and analytics, but is not accessible from outside the database

  • dash – schema for data prepared for visualization (that is, for small data marts that can be quickly retrieved from the database)

For convenient work with such a system, we need three users:

  • loader – a system user that loads data from external sources into the analytical database and processes data inside the database

  • analyst – a user who has access to all the data and can work in the public schema

  • viewer – a system user for visualization, which only has access to the dash schema with data ready to be drawn

Code for creating the schemas, creating the users, and granting access in Postgres:

-- Create the data schemas
create schema etl;
create schema pre_dash;
create schema dash;
create schema arch;

-- Create the users
create role loader login password '*loader_password*';
create role analyst login password '*analyst_password*';
create role viewer login password '*viewer_password*';

-- Grant the users access to the schemas and tables they need.
-- After new tables are created, the table-level grants will need to be issued again.
grant usage, create on schema etl, arch, pre_dash, dash to loader;

grant usage on schema etl, arch, pre_dash, dash to analyst;
grant usage, create on schema public to analyst;
grant select on all tables in schema etl, arch, pre_dash, dash, public to analyst;

grant usage on schema dash to viewer;
grant select on all tables in schema dash to viewer;

The analytical database is ready to go, you can start filling it with data!
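
Before filling the database with data, it can be worth a quick sanity check that the three users can connect and see the schemas they are supposed to see. Below is a minimal sketch using psycopg2; the host, port, database name, and passwords in the connection strings are assumptions, so adjust them to your setup:

import psycopg2

# Connection strings for the three users; host, port, database name and
# passwords are placeholders, replace them with your own values.
conn_strings = {
    'loader': 'postgres://loader:*loader_password*@localhost:5432/postgres',
    'analyst': 'postgres://analyst:*analyst_password*@localhost:5432/postgres',
    'viewer': 'postgres://viewer:*viewer_password*@localhost:5432/postgres',
}

for user, conn_string in conn_strings.items():
    with psycopg2.connect(conn_string) as conn:
        curs = conn.cursor()
        # List the schemas the connected user has USAGE on
        curs.execute("""
            select nspname
            from pg_catalog.pg_namespace
            where has_schema_privilege(nspname, 'USAGE')
              and nspname not like 'pg_%'
              and nspname != 'information_schema';
        """)
        print(user, [row[0] for row in curs.fetchall()])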

ETL for analytical database

Now let’s move on to the tricky part: setting up ETL to automatically load and prepare data.

Let’s first define the tasks that an ETL system should perform:

  • Loading data from different data sources into the analytical database

  • Quick addition of new data sources to load from

  • Data processing inside the analytical database

  • Quick addition of new data-processing steps inside the database

  • Taking data dependencies into account during processing

  • Running on a schedule (we will assume it runs once a day)

First, let’s create a class that will be responsible for the operation of the ETL system. We immediately take into account that it will run once a day.

from datetime import date


class TransferData:
    """
    Класс, ответственный за сбор данных из внешних источников и сохранение их в аналитическую БД
    """
    def __init__(self, work_date=date.today()):
        self.work_date = work_date

All data loads and data-processing steps will be described as JSON files (jobs) that the TransferData class iterates over. Each job should contain information about:

  • The type of work being done (loading or processing data)

  • The name of the data source to load from

  • The parameters required for loading/processing the data

Let’s create the first job, which will collect data from the public API https://api.publicapis.org/entries and load it into our analytical database. To describe this process, we can create a dictionary in Python:

load_job = {
    # Job type
    'job_type': 'load', 
    # Name of the data source
    'db_from': 'open_api', 
    # DDL of the table the data will be loaded into
    'ddl': '''create table if not exists etl.open_apis
(
    api text,
    description text,
    auth text,
    https text,
    cors text,
    link text,
    category text,
    work_date date
);''',
    # Name of the table the data will be loaded into
    'product': 'etl.open_apis',
    # Name of the intermediate file through which the data will be loaded
    'file_name': 'open_apis.txt',
}

And now let’s write the code that, using this job description, will create a table in the analytical database and load the data from the source into it.

# Add these imports at the top of the file
import psycopg2
import os

# .....

# Add two functions to the TransferData class
class TransferData:

# .....
  
    def load_data(self, load_job):
        """
        Loads data from an external source and saves it to disk.
        Must produce a csv file on disk with sep='|', quotechar="^".

        :param load_job: dictionary describing the load job
        """
        if load_job.get('db_from') == 'open_api':
            # Source-specific data collection logic
            import requests
            import pandas as pd
            data = requests.get('https://api.publicapis.org/entries').json()
            data = pd.DataFrame(data['entries'])
            data['work_date'] = self.work_date
            data = data.loc[:, ['API', 'Description', 'Auth', 'HTTPS', 'Cors', 
                                'Link', 'Category', 'work_date']]
            # Save in a fixed csv format to reduce the number of load errors
            data.to_csv(load_job.get('file_name'), index=False, 
                        sep='|', quotechar="^")

    def upload_data(self, load_job):
        """
        Loads data from disk into tables in the analytical database.
        Expects a csv file with DELIMITER '|', QUOTE '^'.
        ANALYTICS_DB_CONN_STRING is an environment variable that contains the connection
        string of the loader user for our analytical database, for example
        postgres://loader:*loader_password*@localhost:5432/postgres

        :param load_job: dictionary describing the load job
        """
        with psycopg2.connect(os.environ['ANALYTICS_DB_CONN_STRING']) as conn:
            curs = conn.cursor()
            file_name = load_job.get('file_name')
            with open(file_name, 'r') as f:
                product = load_job.get('product')
                curs.execute(load_job.get('ddl'))
                curs.execute(f'truncate {product};')
                copy_query = f"""COPY {product} from STDIN with (FORMAT csv, DELIMITER '|', QUOTE '^', HEADER True)"""
                curs.copy_expert(copy_query, f)
                conn.commit()
            os.remove(file_name)

Now, by running two functions, you can pull the data from the source and load it into the analytical database:

# Make sure you are in the same directory as the simple_etl.py file
# and that all dependencies from requirements.txt are installed: pip install -r requirements.txt
import simple_etl as se

tr = se.TransferData()
tr.load_data(load_job)
tr.upload_data(load_job)
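
To make sure the data actually landed in the table, you can run a quick check as the analyst user. This is only a sketch: ANALYST_DB_CONN_STRING is an assumed environment variable holding the analyst user’s connection string, by analogy with ANALYTICS_DB_CONN_STRING above.

import os
import psycopg2

# ANALYST_DB_CONN_STRING is an assumed environment variable with the analyst user's
# connection string, e.g. postgres://analyst:*analyst_password*@localhost:5432/postgres
with psycopg2.connect(os.environ['ANALYST_DB_CONN_STRING']) as conn:
    curs = conn.cursor()
    curs.execute('select count(*) from etl.open_apis;')
    print('Rows loaded:', curs.fetchone()[0])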

Now we need to set up data processing inside the database. Let’s create a data mart that will contain the number of open APIs listed at https://api.publicapis.org/entries for each day:

process_job = {
    'job_type': 'process',
    # SQL query that builds the required data; work_date is substituted in the code:
    'query': '''create table if not exists dash.openapi_daily_count
(
    work_date date,
    public_api_count int
);

delete from dash.openapi_daily_count
where work_date = '{work_date}';

insert into dash.openapi_daily_count(work_date, public_api_count) 
select work_date, count(distinct link) public_api_count 
from etl.open_apis 
where work_date = '{work_date}'
group by 1;''',
    # Name of the table the data goes into; needed for correct dependency resolution:
    'product': 'dash.openapi_daily_count',
    # List of tables this processing step depends on
    'dependencies': ['etl.open_apis'],
}

And add the code that will process the data inside the database:

class TransferData:

# .....

    def process_data(self, process_job):
        """
        Processes data inside the database.

        :param process_job: dictionary describing the processing job
        """
        with psycopg2.connect(os.environ['ANALYTICS_DB_CONN_STRING']) as conn:
            curs = conn.cursor()
            curs.execute(process_job.get('query').format(work_date=self.work_date))
            conn.commit()

Now we can process the collected data with one command:

import simple_etl as se
tr = se.TransferData()
tr.process_data(process_job)

Let’s now meet the requirement of quickly adding new data sources and new data-processing steps. To do this, we can create a folder in which all the jobs we need are stored as JSON files.

import os
import json 

os.makedirs('etl_jobs', exist_ok=True)

with open('etl_jobs/dash_openapi_daily_count.json', 'w') as f:
    json.dump(process_job, f, indent=4)
    
with open('etl_jobs/etl_open_apis.json', 'w') as f:
    json.dump(load_job, f, indent=4)

Now we can quickly add JSON files with descriptions of the jobs that need to run during the day to this folder.
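
Since any file in this folder will be picked up by the ETL run, it can help to validate new job files before they go live. Below is a minimal sketch of a hypothetical check_jobs helper (it is not part of simple_etl) that checks every JSON file in the folder for the keys the code above expects:

import os
import json

# Keys each job type needs, based on the load and process jobs defined above
REQUIRED_KEYS = {
    'load': {'job_type', 'db_from', 'ddl', 'product', 'file_name'},
    'process': {'job_type', 'query', 'product'},
}

def check_jobs(jobs_dir='etl_jobs'):
    """Print jobs that are missing required keys or have an unknown job_type."""
    for job_path in os.listdir(jobs_dir):
        with open(os.path.join(jobs_dir, job_path), 'r') as f:
            job = json.load(f)
        required = REQUIRED_KEYS.get(job.get('job_type'))
        if required is None:
            print(f'{job_path}: unknown job_type {job.get("job_type")!r}')
        elif required - set(job):
            print(f'{job_path}: missing keys {required - set(job)}')

check_jobs()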

Run all jobs with dependencies

Now let’s create a function that will help us run all jobs at once, taking into account dependencies.

# ...
import json

class TransferData:
    # .....
    # Update the __init__ method: 
    def __init__(self, work_date=None, jobs_dir="etl_jobs"):
        self.work_date = work_date or date.today()
        self.jobs_dir = jobs_dir
        self.loaded_dependencies = set()

    # .....

    def launch_job(self, job):
        """
        Launches a job according to its type.

        :param job: a data collection or data processing job
        """
        job_type = job.get('job_type')
        if job_type == 'load':
            self.load_data(job)
            self.upload_data(job)
        elif job_type == 'process':
            self.process_data(job)
        else:
            print('Unknown job type for job', job)

    def launch_all_jobs(self):
        """
        Runs all jobs, taking their dependencies into account
        """
        # Load all jobs into memory
        job_paths = os.listdir(self.jobs_dir)
        jobs = {}
        for job_path in job_paths:
            with open(os.path.join(self.jobs_dir, job_path), 'r') as f:
                jobs[job_path] = json.load(f)
        # Iterate over the jobs, checking whether their dependencies have already been loaded
        while True:
            jobs_local = jobs.copy()  # So we can remove items from the dict while iterating over it
            for job_name, job in jobs_local.items():
                job_dependencies = job.get('dependencies')
                if job_dependencies is None or len(set(job_dependencies) - self.loaded_dependencies) == 0:
                    print(f'Started {job_name}')
                    self.launch_job(job)
                    product = job.get('product')
                    if product is not None:
                        self.loaded_dependencies.add(product)
                    jobs.pop(job_name)
                    print(f'Finished {job_name}')
            if len(jobs) == 0:
                break
            if len(jobs) == len(jobs_local):
                # Nothing was launched during a full pass: the remaining dependencies
                # cannot be satisfied, so stop instead of looping forever
                print('Unresolved dependencies left for jobs:', list(jobs))
                break
        print('All jobs processed')

Now we can run all the jobs in the etl_jobs folder with one command:

import simple_etl as se
tr = se.TransferData()
tr.launch_all_jobs()

Running data collection and processing regularly

To run the data collection and processing regularly, add the following start-up logic to the file with the TransferData class:

if __name__ == '__main__':
    """
    Run the data update once a day at 10:00
    """
    import schedule
    import time

    def scheduled_update():
        tr = TransferData(jobs_dir="path_to_jobs_dir/")
        tr.launch_all_jobs()

    schedule.every().day.at("10:00").do(scheduled_update)

    while True:
        schedule.run_pending()
        time.sleep(60)

Now we can run this file in the background on our computer (or on a server, if your infrastructure team can provide one) and enjoy data that is updated in the database every day.

How to add data collection from a new source

You can add data collection from a new source by updating the load_data function and adding a new branch to the if statement with a new db_from value. Here you can add collection logic for any source: databases, APIs, documents, online spreadsheets…

This is how you can add a new data collection branch to your code:

    def load_data(self, load_job):
        # .....
        if load_job.get('db_from') == 'open_api':

            # .....

        elif load_job.get('db_from') == 'other_db_type':
            # DO SOMETHING HERE AND SAVE THE FILE TO load_job.get('file_name')
            return
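
For example, here is a minimal sketch of such a branch that reads data from another Postgres database. The 'other_postgres' source name, the OTHER_DB_CONN_STRING environment variable, and the query are hypothetical placeholders to adapt to your own source:

        elif load_job.get('db_from') == 'other_postgres':
            # Hypothetical example: read data from another Postgres database.
            # OTHER_DB_CONN_STRING is an assumed environment variable with the
            # source connection string; the query and table name are placeholders.
            import pandas as pd
            with psycopg2.connect(os.environ['OTHER_DB_CONN_STRING']) as conn:
                data = pd.read_sql('select * from some_source_table', conn)
            data['work_date'] = self.work_date
            # Save in the same csv format that upload_data expects
            data.to_csv(load_job.get('file_name'), index=False,
                        sep='|', quotechar="^")

The only contract with the rest of the system is the intermediate csv file with sep='|' and quotechar='^', so any source that can be dumped into such a file will work.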

Link to the repository with the complete simple_etl code.

That’s all

Congratulations, you have created an MVP of an analytical system! Now you have:

  • Automatic loading and processing of data from different sources

  • Ability to work with all data in one database

  • Easy and fast addition of new data sources (or new processing of existing data) to the system

  • Ability to connect visualization to pre-prepared and updated data

Fill the system with the data you need and work with it conveniently in one database!
You can then set up automatically updated dashboards in any visualization tool you like, for example Metabase (if you are interested in seeing how easily it can be set up, write in the comments).

Afterword

Of course, this system is only a starting point; it can and should be improved and developed: add logging, error handling, parallel execution, better fault tolerance… Generally speaking, there are usually dedicated teams for that. But that is a completely different story!
