Creating our own data registry based on the FSIS “Arshin”. Part 1 – loading data into PostgreSQL and reducing the database size

Hi all. This practical series of articles is aimed at beginners. I decided to share my experience of creating a data registry based on the state one. The data will be stored in a PostgreSQL database and accessed through FastAPI. In this article we will deal with loading the data into the database and reducing its size.

Introduction

We have to work with data on the results of verification of measuring instruments, which we will download from the “SI verification results” section of the FSIS ARSHIN website. In short, every measuring instrument used in a laboratory must measure with a certain accuracy. To check whether an instrument's error stays within acceptable limits, it is periodically sent to a metrological authority, which then publishes information about the device and the verification result (passed or not) through ARSHIN.

Here is an example of the data that is stored in the registry:

SI verification results

Creating and populating the database

Based on this data, I created a database “Habrdb” with a table “Info”:

CREATE TABLE IF NOT EXISTS "Info"
(
    id bigint NOT NULL GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    "org_title" TEXT,
    "mit_number" TEXT,
    "mit_title" TEXT,
    "mi_number" TEXT,
    "mit_notation" TEXT,
    "mi_modification" TEXT,
    "verification_date" date,
    "valid_date" date,
    "result_docnum" TEXT,
    "applicability" boolean,
    "vri_id" bigint
);

ARSHIN provides an API for accessing the data and also periodically publishes data dumps as zip archives. You need to download the latest poverki.snapshot…zip file, which contains csv files with verification data for the entire period of FSIS operation. The archive is nested as follows:

Archive nesting structure

The downloaded archive contains 3 folders with about 30 small archives, each of which holds one csv file with information about instruments verified on one day. It also contains a large poverki.snapshot archive, which in turn contains 3 folders, 30 small archives for the next month, and another large poverki.snapshot archive, and so on. A typical csv file contains data like this:

Data excerpt from a csv file

I’ll say right away that the data cannot be loaded without processing. Firstly, we don’t yet need the columns numbered 1, 12, 13 and 15 (the last one is not visible in the screenshot; it is usually empty or contains information we don’t need). Secondly, incorrect values were often submitted along with the verification data; such rows were later deleted or changed in the registry.

In addition, in some files the column names do not match the contents; such rows will have to be discarded. Here are examples of invalid rows (the column names are assigned for convenience, and 1 and 0 have already been replaced with boolean True and False).

Example of incorrect values

Problems:

  1. In these rows from the file, the value in the 'result_docnum' column is a dash, while the column must contain a document number made up of letters and digits.

  2. No 'verification_date' is specified here, yet 'applicability' = True: there is no verification date, but the row claims the device passed verification. Such rows are also removed from Arshin over time. Also keep in mind that the date type in PostgreSQL uses a single format, "YYYY-MM-DD", while the files may contain dates in other formats.

Thus, we will have to process the data before adding it to the database; for this we use the pandas and sqlalchemy libraries. Since there is too much data, we will load only records for 2023 and 2024.

Code for loading data into the database
import pandas as pd
import re, os, zipfile
from sqlalchemy import BigInteger, TEXT, Boolean, Date, create_engine
from sqlalchemy.orm import sessionmaker

# Create a DB session (replace USER and PASSWORD with your credentials)
engine = create_engine('postgresql://USER:PASSWORD@localhost:5432/Habrdb')
Session = sessionmaker(bind=engine)
session = Session()


class DataLoader:

    def __unzip_file(self, zip_filename, extract_to):
        '''Unpacks a zip archive and removes it'''
        with zipfile.ZipFile(zip_filename, 'r') as zip_ref:
            zip_ref.extractall(extract_to)
        # Remove the unpacked archive
        os.remove(zip_filename)


    def __add_file_to_db(self, path):
        '''Reads the file contents, validates them and writes them to the DB'''

        def convert_to_date_with_limitation(xVal):
            '''Converts a date to the YYYY-MM-DD format and keeps only the years 2023 and 2024'''
            xVals = str(xVal).split(' ')
            for x in xVals:
                if len(x) == 10:
                    x = re.sub(r"[.,:;]", "-", x)
                    year, month, day = x.split("-")
                    if int(year) == 2023 or int(year) == 2024:
                        return f"{year}-{month}-{day}"
                else:
                    return pd.NaT  # If the format is not recognized, return NaT (Not a Time)

        def convert_to_date(xVal):
            '''Converts a date to the YYYY-MM-DD format'''
            xVals = str(xVal).split(' ')
            for x in xVals:
                if len(x) == 10:
                    x = re.sub(r"[.,:;]", "-", x)
                    year, month, day = x.split("-")
                    return f"{year}-{month}-{day}"
                else:
                    return pd.NaT

        names = ['Number', 'org_title', 'mit_number', 'mit_title', 'mit_notation', 'mi_modification',
                        'mi_number', 'verification_date', 'valid_date', 'result_docnum', 'applicability',
                        'Date1', 'Date2', 'Pusto', 'vri_id', 'rabbish']
        # Read the file in chunks of 100,000 rows
        df = pd.read_csv(path, chunksize=100000, on_bad_lines="skip", delimiter=";", quotechar='"', header=None, names=names)

        # Iterate over the chunks
        for chunk in df:
            # Drop the unneeded columns (1, 12, 13 and 15 in the source file)
            chunk = chunk.drop(columns=['Number', 'Date1', 'Date2', 'Pusto', 'rabbish'])

            # Validate the dates
            chunk['verification_date'] = chunk['verification_date'].map(convert_to_date_with_limitation)
            chunk['valid_date'] = chunk['valid_date'].map(convert_to_date)

            # We will keep only these columns
            allColumns = ['org_title', 'mit_number', 'mit_title', 'mit_notation', 'mi_modification',
            'mi_number', 'verification_date', 'valid_date', 'result_docnum', 'applicability', 'vri_id']

            # Collapse double spaces in all columns
            chunk[allColumns] = chunk[allColumns].replace('  ', ' ', regex=True)

            # Parse the dates explicitly; errors="coerce" turns invalid values into NaT
            chunk['verification_date'] = pd.to_datetime(chunk['verification_date'], errors="coerce")
            chunk['valid_date'] = pd.to_datetime(chunk['valid_date'], errors="coerce")

            # Drop all rows with NaT in 'verification_date'
            chunk = chunk.dropna(subset=['verification_date'])

            # Drop rows where 'result_docnum' consists only of dashes and spaces
            chunk = chunk[~chunk['result_docnum'].str.fullmatch(r'[-\s]+', na=False)]

            # Convert applicability from 1/0 to True/False
            chunk['applicability'] = chunk['applicability'].astype('bool')
            # Convert 'vri_id' from strings to a nullable integer
            chunk['vri_id'] = pd.to_numeric(chunk['vri_id'], errors="coerce").astype('Int64')

            # Replace all NaN values with None (so they are inserted into the table as NULL)
            chunk = chunk.where(pd.notnull(chunk), None)

            # Column types for the target table
            infoTypes = {
            'mi_number': TEXT,
            'result_docnum': TEXT,
            'mit_number': TEXT,
            'mit_title': TEXT,
            'mit_notation': TEXT,
            'mi_modification': TEXT,
            'org_title': TEXT,
            'verification_date': Date,
            'valid_date': Date,
            'vri_id': BigInteger,
            'applicability': Boolean
            }
            # Append the chunk to the table
            chunk.to_sql('Info', engine, if_exists="append", index=False, dtype=infoTypes)


    def extract_and_add_to_db_old_files(self, rootPath):
        """Extracts all nested archives, adds their contents to the DB and removes the extracted files"""

        for z, dirs, files in os.walk(rootPath):
            # Go through the files in the folder
            for filename in files:
                fileSpec = os.path.join(z, filename)

                # If it is an archive
                if filename.endswith('.zip'):
                    # Check whether the file name contains 'snapshot'
                    if 'snapshot' in os.path.basename(fileSpec):
                        self.__unzip_file(fileSpec, self.rootPath)
                        # Recurse into the files extracted from the snapshot
                        if self.extract_and_add_to_db_old_files(self.rootPath) == 0:
                            # Remove empty folders
                            self.__remove_empty_dirs(self.rootPath)
                            return 0
                    else:
                        # Unpack the archive
                        self.__unzip_file(fileSpec, z)

                    # Add the data from the file to the DB
                    self.__add_file_to_db(fileSpec[:-4])
                    print(f'File {filename[:-4]} added to the DB')
                    # Remove the file
                    os.remove(fileSpec[:-4])

                # If it is an already extracted csv file
                elif filename.endswith('.csv'):
                    self.__add_file_to_db(fileSpec)
                    print(f'File {filename} added to the DB')
                    os.remove(fileSpec)
        return 0

    def __remove_empty_dirs(self, rootPath):
        """Recursively removes all empty folders, starting from the deepest one"""
        for dirpath, dirnames, filenames in os.walk(rootPath, topdown=False):  # topdown=False walks from the deepest folders up
            # Remove the folder if it contains no files or subfolders
            if not dirnames and not filenames:
                os.rmdir(dirpath)
                print(f'Removed empty folder: {dirpath}')
                # Stop when we reach the root folder that contained the others
                if dirpath == self.rootPath:
                    return 0
                self.__remove_empty_dirs(self.rootPath)

    def Main(self):
        self.rootPath = "FULL_PATH_TO_THE_EXTRACTED_FOLDER"
        self.extract_and_add_to_db_old_files(self.rootPath)

# Create an instance of the class
ekz = DataLoader()
# Call the entry-point method
ekz.Main()

Let's see how much our database weighs; I got 38 GB.

SELECT pg_size_pretty(pg_database_size('Habrdb'));
Excerpt of data from the “Info” table

Reducing Database Size

You may notice that the values in the columns “mit_title” (device type name), “org_title” (verifying organization), “mit_number” (device registration number), “mit_notation” (type designation) and “mi_modification” (type modification) can be long and are often repeated. Even without analyzing the table you can see that the values in the “mit_title” column are the longest, so let's start with it.
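
Before normalizing, you can roughly confirm how repetitive and how long these values are with a quick query like this (a sketch over the table and column created above):

-- How many distinct "mit_title" values there are and how long they are on average
SELECT count(*)                        AS total_rows,
       count(DISTINCT "mit_title")     AS unique_titles,
       round(avg(length("mit_title"))) AS avg_title_length
FROM "Info";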

Let's create a table into which we will add the non-null unique values from “mit_title”:

CREATE TABLE IF NOT EXISTS "UniqueMitTitles"
(
    id integer NOT NULL GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    "mit_title" TEXT NOT NULL
);

We add the unique values from the “mit_title” column of the “Info” table there:

INSERT INTO "UniqueMitTitles" ("mit_title")
SELECT DISTINCT "mit_title"
FROM "Info"
WHERE "mit_title" IS NOT NULL;

Create a column in “Info” that will store identifiers from the “UniqueMitTitles” table:

ALTER TABLE "Info" ADD COLUMN "mit_titleId" INT;

We fill it with the corresponding identifiers from the lookup table:

UPDATE "Info" SET "mit_titleId" = "UniqueMitTitles".id
FROM "UniqueMitTitles"
WHERE "Info"."mit_title" = "UniqueMitTitles"."mit_title";

Now remove the redundant column from the main table:

ALTER TABLE "Info" DROP COLUMN "mit_title";

I’ll say right away that “Info” contains a lot of text values of impressive size. In PostgreSQL, values of variable-length types whose compressed size exceeds about 2 KB are cut into pieces, compressed, and stored as tuples in a special TOAST table. This table is hidden from the user; it is attached to the main table and is filled automatically. What matters for us is that such tables can grow considerably: when rows in the main table are updated, new row versions are written (including to TOAST), while the old ones (dead tuples) are not removed immediately and have to be cleaned up with the VACUUM command. Note that TOAST is a storage mechanism and the data is not always compressed; see the PostgreSQL documentation on TOAST for details. It's time to see how much the “Info” table weighs and what is going on with TOAST:

SELECT l.metric, l.nr AS bytes
     , CASE WHEN is_size THEN pg_size_pretty(nr) END AS bytes_pretty
     , CASE WHEN is_size THEN nr / NULLIF(x.ct, 0) END AS bytes_per_row
FROM  (
SELECT min(tableoid)        AS tbl      -- = 'public.tbl'::regclass::oid
     , count(*)             AS ct
     , sum(length(t::text)) AS txt_len  -- length in characters
   FROM "Info" t
   ) x
CROSS  JOIN LATERAL (
   VALUES
     (true , 'core_relation_size'               , pg_relation_size(tbl))
   , (true , 'free_space_map'                   , pg_relation_size(tbl, 'fsm'))
   , (true , 'total_size_incl_toast_and_indexes', pg_total_relation_size(tbl))
   , (true , 'live_rows_in_text_representation' , txt_len)
   , (false, 'row_count'                        , ct)
   , (false, 'live_tuples'                      , pg_stat_get_live_tuples(tbl))
   , (false, 'dead_tuples'                      , pg_stat_get_dead_tuples(tbl))
   ) l(is_size, metric, nr);
Table information after UPDATE

  • core_relation_size — the size of the main table, excluding indexes and TOAST.

  • free_space_map — the size of the free space map; it tracks free space inside table pages, which appears as a result of frequent inserts, deletes and updates.

  • total_size_incl_toast_and_indexes — the total size, including TOAST and indexes. We have not created any indexes, so the difference from core_relation_size is essentially the size of TOAST.

  • live_rows_in_text_representation — the size of the “live” rows in text representation; it will gradually decrease as we replace strings with identifiers.

  • row_count — the total number of rows in the table.

  • live_tuples — the number of “live” tuples (rows) in the table.

  • dead_tuples — the number of “dead” tuples (rows) in the table; they occupy an impressive part of the space.
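
If you also want to see the size of the TOAST relation itself, you can query pg_class directly (a small sketch, assuming the table is named "Info" as above):

-- Size of the TOAST relation attached to "Info"
SELECT pg_size_pretty(pg_relation_size(reltoastrelid)) AS toast_size
FROM pg_class
WHERE relname = 'Info';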

Now the table weighs as much as 77 GB! We need to clear the dead tuples; as noted earlier, the VACUUM command is used for this. There are two options: wait a few hours until autovacuum kicks in, or run VACUUM manually. Autovacuum takes this long with the default PostgreSQL configuration, which uses thresholds and cost-based delays to reduce the intensity of cleaning and thereby the load on the database.
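
For reference, you can look at the autovacuum-related settings of the current server; this is just a way to see the thresholds and delays mentioned above, not a suggestion to change them:

-- Current autovacuum settings (thresholds, naptime, cost-based delays)
SELECT name, setting, unit
FROM pg_settings
WHERE name LIKE 'autovacuum%';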

You can monitor the cleanup using the pg_stat_progress_vacuum view; on my server the automatic cleanup kicked in within a minute:

SELECT * FROM pg_stat_progress_vacuum;
VACUUM process data

More information about the VACUUM phases and these columns can be found in the PostgreSQL documentation on pg_stat_progress_vacuum.

We can see that VACUUM is currently in the heap-scanning phase and only about 10% has been scanned so far; after that the index-vacuuming phase will begin, and only then the heap-cleaning phase (vacuuming heap). Instead of waiting for many hours or changing the configuration, we will clean up the dead tuples and the space left over after the DROP manually:

VACUUM;
As you can see, dead_tuples = 0

Since the identifiers in the “mit_titleId” column are the link between the two tables, we create a foreign key referencing the lookup table. With ON DELETE CASCADE, deleting a row from “UniqueMitTitles” will also delete the rows with that id in the dependent table “Info”.

ALTER TABLE "Info" ADD CONSTRAINT "Info_mit_titleId_fkey" FOREIGN KEY ("mit_titleId")
REFERENCES "UniqueMitTitles" ("id") ON DELETE CASCADE;

Next, we repeat the same steps for the remaining four columns “mit_notation”, “org_title”, “mit_number”, “mi_modification” (we create the tables “UniqueMitNotations”, “UniqueOrgTitles”, “UniqueMitNumbers” and “UniqueMiModifications”).
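
The pattern for each of them is the same as for “mit_title”; here is what it looks like for “org_title” (a sketch: the new column name "org_titleId" and the constraint name follow the same convention as "mit_titleId" and are my assumption, adjust to taste):

-- Lookup table for unique verifying organizations
CREATE TABLE IF NOT EXISTS "UniqueOrgTitles"
(
    id integer NOT NULL GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    "org_title" TEXT NOT NULL
);

INSERT INTO "UniqueOrgTitles" ("org_title")
SELECT DISTINCT "org_title" FROM "Info" WHERE "org_title" IS NOT NULL;

ALTER TABLE "Info" ADD COLUMN "org_titleId" INT;

UPDATE "Info" SET "org_titleId" = "UniqueOrgTitles".id
FROM "UniqueOrgTitles"
WHERE "Info"."org_title" = "UniqueOrgTitles"."org_title";

ALTER TABLE "Info" DROP COLUMN "org_title";

ALTER TABLE "Info" ADD CONSTRAINT "Info_org_titleId_fkey" FOREIGN KEY ("org_titleId")
REFERENCES "UniqueOrgTitles" ("id") ON DELETE CASCADE;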

Excerpt of data from “Info” after all queries

After all the operations we run a cleanup that rewrites the tables (VACUUM FULL). During this cleanup the old table is not deleted until the new copy is completely written, so there must be enough free disk space for it. After the full cleanup the database weighs 17 GB, 2.2 times less than the original 38 GB.
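
For completeness, the final cleanup and the size check look like this (the size query is the same one used at the beginning):

VACUUM FULL;
SELECT pg_size_pretty(pg_database_size('Habrdb'));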

Disadvantages of this downsizing approach:

  1. The need to JOIN the lookup tables to obtain the full data (see the example query after this list).

  2. Difficulty adding data to a table.

  3. Maintenance is more complex – you now have to look after six tables instead of one.

If a slight reduction in query speed is not critical for you, these disadvantages are more than compensated by the much smaller size of the database.
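
To illustrate the first point, reconstructing the original “wide” rows now requires joining the lookup tables, roughly like this (a sketch assuming the id columns of the other lookup tables follow the same naming convention as "mit_titleId"):

-- Rebuild a few original-looking rows from the normalized tables
SELECT i."mi_number",
       i."verification_date",
       i."valid_date",
       i."result_docnum",
       i."applicability",
       t."mit_title",
       o."org_title"
FROM "Info" i
LEFT JOIN "UniqueMitTitles" t ON t.id = i."mit_titleId"
LEFT JOIN "UniqueOrgTitles" o ON o.id = i."org_titleId"
LIMIT 10;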

In the following articles we will speed up table searches and create an API.
