How a Data Engineer Watched Over Data

Hello, Habr! I want to tell you how we wrote and implemented a service for monitoring data quality. We have many sources of data: data from financial markets, trading activity of our customers, quotes and much more. All this generates billions of records per day in our processes. The completeness and consistency of trade data is a critical component of Exness’s business.

If data quality problems are familiar to you and you want to see how we solved them in-house, read on.

My name is Dmitry. I work in the team responsible for storing raw data and for transforming, aggregating and delivering processed data to all departments of the company. Our data is consumed by many teams within the company, such as Business Intelligence, Anti-Fraud and Finance, and we also provide it to our B2B partners.

Working with data is a responsible and difficult mission, because stopping one ETL process can lead to paralysis of part of Exness’s business.

To solve ETL problems, we use a variety of tools:

The challenges we face every day:

  • Tens of millions of transaction records daily;
  • Billions of market data records daily (quotes, etc.);
  • The heterogeneity of data sources (such as external sources of Market Data, different trading platforms);
  • Providing exactly-once semantics for important data (financial transactions);
  • Ensuring the integrity and completeness of the data;
  • Guaranteeing that a transaction reaches all the necessary tables and aggregates within the stipulated time.

In order to provide such guarantees, it was necessary to learn to track, measure and proactively respond to deviations in the quality of the data themselves.

Given the complexity of our data collection and processing, and how quickly our ETL processes are developed and changed, we need to monitor data quality at the final destination, which for us is usually a Clickhouse or PostgreSQL database. A metric like the following tells us how quickly our processes deliver data:

SELECT server, 
       avg(updated - close_time) 
FROM   trades 
WHERE  close_time > subtractHours(Now(), 2) 
GROUP  BY server

Other metrics help to find duplicates in the data (Clickhouse has no UNIQUE constraint):

SELECT SUM(count) FROM (
   SELECT
      COUNT(*) AS count
   FROM trades
   GROUP BY order_id
   HAVING count > 1
)

You can come up with a ton of queries (many of which we already use) that help monitor the quality of the data: comparing the number of rows in the source table and the destination table, the time of the last insertion in the table, comparing the contents of two queries and much more.
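As a rough illustration of the row-count comparison and the duplicate check mentioned above, here is a minimal Python sketch. It uses an in-memory SQLite database as a stand-in for a real source and destination; the table names source_trades and dest_trades and both helper functions are hypothetical and are not taken from our pipeline.

```python
import sqlite3


def row_count_diff(conn, source_table, dest_table):
    """Return rows in source minus rows in destination (0 means they match)."""
    # Table names are interpolated directly, so they must come from trusted config.
    src = conn.execute(f"SELECT COUNT(*) FROM {source_table}").fetchone()[0]
    dst = conn.execute(f"SELECT COUNT(*) FROM {dest_table}").fetchone()[0]
    return src - dst


def duplicate_rows(conn, table, key):
    """Count rows whose key value occurs more than once (0 if there are none)."""
    query = (
        f"SELECT COALESCE(SUM(cnt), 0) FROM ("
        f"SELECT COUNT(*) AS cnt FROM {table} GROUP BY {key} HAVING COUNT(*) > 1)"
    )
    return conn.execute(query).fetchone()[0]


def demo():
    """Build made-up sample tables and return (row count diff, duplicate rows)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE source_trades (order_id INTEGER)")
    conn.execute("CREATE TABLE dest_trades (order_id INTEGER)")
    conn.executemany(
        "INSERT INTO source_trades VALUES (?)", [(1,), (2,), (3,), (4,)]
    )
    # The destination is missing orders 3 and 4 and holds order 2 twice.
    conn.executemany("INSERT INTO dest_trades VALUES (?)", [(1,), (2,), (2,)])
    return (
        row_count_diff(conn, "source_trades", "dest_trades"),
        duplicate_rows(conn, "dest_trades", "order_id"),
    )
```

Both numbers are symptoms in the sense described below: a non-zero value says that something is wrong, not what exactly went wrong.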

These metrics are symptoms. By themselves they do not point to the cause of a problem, but they show that a problem exists. That is the trigger for an engineer to look into it and identify the root cause. An analogy: if a person has a fever, something in the body is not working properly. A fever is a sufficient symptom to start investigating and looking for the cause.

We looked for a ready-made solution that could collect such symptom metrics for us. Our requirements:

  • Support for various data sources (databases, queues, HTTP requests);
  • Flexible scheduling of how often each query runs;
  • Monitoring of the queries themselves (runtime, failures);
  • Ease of adding new queries.

At the beginning of the article, I gave a list of technologies that we use in ETL. As you can see, we are supporters of open-source solutions! One example: we use the column-oriented Clickhouse database as our main data warehouse, and our team has contributed several changes to the Clickhouse source code (mainly bug fixes). For metrics and time series we use the InfluxDB ecosystem, Prometheus, VictoriaMetrics and Zabbix.

To our surprise, it turned out that there is no ready-made, convenient tool for monitoring data quality that fits into the technologies we have chosen. Or did we not look hard enough?

Yes, Zabbix can run custom scripts, and Telegraf can be taught to run SQL queries and turn their results into metrics. But both required serious rework and did not work out of the box the way we wanted. Therefore, we wrote our own service (daemon) to monitor data quality. Meet nerve!

Nerve features

Ideologically, nerve can be described with the following phrase:

This is a service that runs scheduled, heterogeneous, customized tasks for collecting numerical values, and presents the results as metrics for different metric collection systems.

Key features of the program:

  • Support for different types of tasks: Query, CompareQueries, etc.;
  • Ability to write your own task types in Python as runtime plugins;
  • Work with different types of resources: Clickhouse, Postgres, etc.;
  • The same metrics data model as in Prometheus:
    metric_name{label="value"} 123.3;
  • Currently, the Prometheus pull model of metric collection is supported;
  • Task launch schedule: period or crontab-style;
  • Web UI for analyzing task performance;
  • The configuration of tasks can be divided into many yaml files;
  • Following Twelve-Factor App.
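To make the metrics data model from the list above concrete, here is a minimal Python sketch that renders one sample in the Prometheus text format. The function format_sample is hypothetical and is not nerve's actual code; it only shows how a name, a set of labels and a numeric value map to a line like the one in the list.

```python
def format_sample(name, value, labels=None):
    """Render one sample as name{label="value"} 123.3 (labels are optional)."""
    if not labels:
        return f"{name} {value}"
    parts = []
    for key in sorted(labels):
        # Escape backslash, double quote and newline inside label values.
        escaped = (
            str(labels[key])
            .replace("\\", "\\\\")
            .replace('"', '\\"')
            .replace("\n", "\\n")
        )
        parts.append(f'{key}="{escaped}"')
    return f"{name}{{{','.join(parts)}}} {value}"
```

For example, format_sample("metric_name", 123.3, {"label": "value"}) produces exactly the line shown in the list.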

Task and Resource are the basic entities for configuring and working with nerve. Task – a typed periodic action, as a result of which we obtain metrics. Resource – an object that contains configuration and logic specific for working with a specific data source. Let’s see how nerve works with an example.

We have three tasks. Two of them are of type Query, that is, an SQL query. One is of type Garcon, a custom task that calls one of our services. How often a task runs can be set as a time period: for example, 10m means once every ten minutes. Alternatively, it can be set crontab-style: “*/5 * * * *” means every five minutes, on the minute. Tasks TaskA and TaskC are associated with the DbCon1 resource, which is of type Clickhouse. Let’s see how the config will look:

tasks:
  - name: TaskA
    type: Query
    resources: DbCon1
    period: 1m
    config:
      query: SELECT COUNT(*) FROM ticks
      gauge: metric_count{table="ticks"}

  - name: TaskB
    type: Garcon
    period: 10m
    config:
      url: "http://hostname:9003/api/v1/orders/backups/"
      gauge: backup_ago

  - name: TaskC
    type: Query
    period: "*/5 * * * *"
    resources: DbCon1
    config:
      query: SELECT now() - toDateTime(time_msc/1000)
        FROM deals WHERE trade_server = 'Real'
        ORDER BY deal DESC LIMIT 1
      gauge: orders_lag

resources:
  - name: DbCon1
    type: Clickhouse
    config:
      host: clickhouse.env
      port: 9000
      user: readonly
      password: "***"
      database: data

results:
  common_labels:
    env="prod"
task_types_paths:
  - "./tasks"

The “./tasks” path is where custom task types live. In particular, the Garcon task type is defined there. I will not cover how to create custom task types in this article.
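To make the two schedule formats from the config above more concrete, here is a minimal Python sketch of how such values could be told apart and converted. This is not nerve's actual code: the function names are hypothetical, and only the m suffix appears in the config above, so the s, h and d suffixes are assumptions.

```python
import re

# Assumed unit suffixes; only "m" is confirmed by the config above.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}
_PERIOD_RE = re.compile(r"^(\d+)([smhd])$")


def parse_period(text):
    """Convert a period such as '10m' into seconds; raise ValueError otherwise."""
    match = _PERIOD_RE.match(text.strip())
    if match is None:
        raise ValueError(f"not a simple period: {text!r}")
    amount, unit = match.groups()
    seconds = int(amount) * _UNIT_SECONDS[unit]
    if seconds == 0:
        # A zero period would make a scheduler spin, so reject it early.
        raise ValueError("period must be positive")
    return seconds


def is_crontab(text):
    """Treat any value with five whitespace-separated fields as crontab-style."""
    return len(text.split()) == 5
```

A scheduler could check is_crontab first and fall back to parse_period, failing fast on anything it cannot interpret.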

Once the nerve service is started with this config, the Web UI shows how the tasks are running:

And the metrics are available for collection at /metrics:

The Query task type is the one our team uses most often. Therefore, we extended it to work with GROUP BY and templates. These mechanisms make it possible to collect a lot of information about the data with a single query:

The TradesLag task collects, every five minutes and for each trading server, the maximum delay with which a closed order reaches the trades table, taking into account only orders closed in the last two hours.
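The general idea behind combining GROUP BY with templates can be sketched in Python: every row of a grouped result becomes one sample, and its label values are filled in from the row. This is only an illustration under my own assumptions; the {server} placeholder syntax, the function rows_to_samples, the metric name trades_lag and the sample rows are all made up and do not reproduce nerve's configuration format or real data.

```python
def rows_to_samples(metric, label_template, rows):
    """Turn each result row (a dict) into a (name, labels, value) sample."""
    samples = []
    for row in rows:
        # Fill each label template, e.g. "{server}", from the row's columns.
        labels = {key: tpl.format(**row) for key, tpl in label_template.items()}
        samples.append((metric, labels, float(row["value"])))
    return samples


# Made-up rows, shaped like the result of "SELECT server, max(...) AS value
# FROM trades ... GROUP BY server".
rows = [
    {"server": "Real", "value": 12},
    {"server": "Demo", "value": 3.5},
]
samples = rows_to_samples("trades_lag", {"server": "{server}"}, rows)
```

One query therefore yields one metric per group, here one per trading server, instead of requiring a separate task for every server.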

A few words about the implementation. Nerve is a multi-threaded Python 3 application of about 3k lines of code that is easy to run in Docker: you only need to supply it with a task configuration.

What we got

With nerve, we got what we wanted. At the moment, in addition to our team, other teams in Exness have shown interest in it. It runs about 40 tasks with periods ranging from 30 seconds to a day, and collects about 500 metrics about our data. Adding a new metric takes 5-10 minutes. The full flow of working with metrics looks like this: nerve → Prometheus → VictoriaMetrics → Grafana dashboards → alerts in PagerDuty.
With nerve, we also started collecting business metrics: we periodically sample raw events in the trading system to evaluate trading conditions.

Thank you, Habr readers, for reading my article to the end. I foresee your question: where is the link to GitHub? The answer is: we have not released nerve as open source yet. This requires additional work on our part to improve the documentation and finish a couple of features. If this article is well received by the community, it will give us an additional incentive to share our development with you!

All the best!
