Migrating data from Oracle to PostgreSQL: how we use Airflow and NiFi

I work as a lead software developer at STM Labs, and I want to tell you how we migrate data from Oracle to PostgreSQL using the open-source solutions Airflow and NiFi. This article is based on my talk at the “World of Open Source: Cases of Real Application” meetup.

Many Russian companies have faced the need to move data off Oracle: in July 2022 the corporation behind this product left the Russian market due to sanctions, like many other foreign IT companies. Users in our country no longer receive vendor support, which means that over time the system may stop working correctly.

The Oracle DBMS was very popular in Russia: even public-sector companies stored and processed data in it. So everyone had to decide quickly where and how to move huge amounts of valuable information without losing anything along the way.

The problem statement

For further data storage, our client chose a PostgreSQL-based system. It was important to find a DBMS that would run stably and have developer support, and PostgreSQL fits these criteria well.

But how do you transfer the data without loss? How do you implement reverse synchronization for systems that have not yet moved? After analyzing various options and proposals, we settled on Airflow and NiFi; using the latter was a mandatory requirement of the customer.

We also received a whole list of requirements from the future users of the system:

• full-table transfer (for dictionaries and small tables);
• delta transfer based on a timestamp column (for large tables);
• delta transfer by transaction when there is no timestamp column;
• when tables are transferred, the relationships between them must be preserved, even if they are not linked by foreign keys – a logical or business-level relationship is enough;
• transfer of master-detail tables: there is a main table with a key and detail tables that supplement its records with data, and the update date is always written to the master table;
• and most importantly: if an error occurs while transferring related tables, no partial data may reach the destination. If an error does occur, the data must be rolled back to its original state, and the system must notify the user about the problem.

What does our data transfer system consist of?

As a result of our work on this task, we developed a system that covers all the required functions. It consists of the following components:

• Configurator,
• Snapshot Manager,
• Airflow,
• NiFi,
• NiFi Task Manager,
• PostgreSQL storage,
• Redis.

All these components are connected to each other as follows:

Let's now look at how each component works individually.

Configurator

The Configurator is a web application that creates configurations for Airflow, NiFi and other related systems based on user settings.

  1. First, in the Configurator we register the source and destination databases. For each one you need to specify:

    • subsystem,
    • who owns the source,
    • source system code,
    • name,
    • database connection settings.

  2. Then, in the schemas section, you register the schemas from which data can be transferred. This step is needed so that the entire database does not have to be scanned. The system scans the source and pulls in the names of all tables and fields along with their data types.

  3. Having added all the required database connections, we move on to the flow settings. A flow contains the settings for transferring several tables and lets you specify the order in which the data should be migrated.

For each table or query from the source, you need to configure the destination table:

  1. Specify the key in the source and the type of replication needed for this table. For example, if we choose replication by time, we need to specify which column holds the record's modification date. The destination does not have to contain the same column:

  2. On the second tab, configure the destination and the field mapping. If fields in the source and destination have the same names, they are mapped automatically:

Once the model is created and activated, the Configurator automatically configures the related systems: it creates temporary tables in the destination, places a new DAG file for Airflow and, if necessary, sets up a new database connection in NiFi.
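To make this more concrete, here is a hypothetical example of the kind of settings the Configurator collects for a single table within a flow; the field names and structure are purely illustrative, not our real data model.

```python
# Hypothetical per-table flow settings; names and structure are illustrative only.
table_transfer_config = {
    "source": {
        "connection": "oracle_billing",   # a database registered at step 1
        "schema": "BILLING",              # a schema registered at step 2
        "table": "ORDERS",
        "key": "ORDER_ID",                # the key column in the source
    },
    "replication": {
        "type": "delta_by_time",          # full_table | delta_by_time | delta_by_transaction
        "cdc_column": "UPDATED_AT",       # column holding the record's modification date
    },
    "target": {
        "connection": "pg_billing",
        "schema": "billing",
        "table": "orders",
        # columns with matching names are mapped automatically;
        # the rest are listed explicitly
        "field_mapping": {"ORDER_ID": "order_id", "UPDATED_AT": "updated_at"},
    },
}
```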

The user can then track the progress of their tasks directly in the interface, which shows both successful tasks and any errors that occur during the data migration process.

Snapshot Manager

A snapshot in PostgreSQL captures the state of the database at a certain point in time and, if necessary, lets other connections read the data as of that same point. This mechanism guarantees data consistency.

Oracle has similar functionality called Oracle Flashback Query. It also lets you return to a previous state of the data, but it works differently: there is no need to keep a transaction open, and we can even retrieve data that has since been deleted as of a certain point in time.

Our service works with both databases: for PostgreSQL it opens and maintains a connection, while for Oracle it requests the number of the latest transaction.
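As a rough illustration, here is a minimal sketch of both approaches, assuming psycopg2 and python-oracledb as client libraries and assuming the "latest transaction number" is Oracle's SCN; this is not our actual service code.

```python
# A minimal, illustrative sketch of the two snapshot strategies.
import oracledb   # python-oracledb client
import psycopg2   # PostgreSQL client


def open_pg_snapshot(dsn: str):
    """Open a REPEATABLE READ transaction and export its snapshot id.

    The returned connection must be kept open: the exported snapshot is only
    valid while the exporting transaction is alive. Readers attach to the same
    point in time with: SET TRANSACTION SNAPSHOT '<snapshot_id>'.
    """
    conn = psycopg2.connect(dsn)
    conn.set_session(isolation_level="REPEATABLE READ", autocommit=False)
    cur = conn.cursor()
    cur.execute("SELECT pg_export_snapshot()")
    snapshot_id = cur.fetchone()[0]
    return conn, snapshot_id


def get_oracle_scn(user: str, password: str, dsn: str) -> int:
    """Ask Oracle for the current SCN; no transaction needs to be held open.

    Readers can later query the past state with Flashback Query, e.g.
    SELECT * FROM orders AS OF SCN :scn.
    """
    with oracledb.connect(user=user, password=password, dsn=dsn) as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT current_scn FROM v$database")
            return cur.fetchone()[0]
```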

Airflow

Airflow is a workflow automation platform. It allows you to create and execute workflows consisting of various tasks and steps.

Airflow features:

• Task orchestration. Airflow can run tasks in a specific order, taking the dependencies between them into account, and can limit the load on the source or split it into categories using virtual pools.
• Monitoring and alerting. Airflow provides tools for monitoring workflows and alerts you when something goes wrong.
• Scalability. Airflow scales easily to handle a large number of tasks.
• Flexibility. Airflow is very flexible when it comes to customizing workflows and integrating with other systems. It lets you run one-off containers as pods in Kubernetes, create virtual pools, prioritize queues, limit the number of simultaneous runs, and much more. If you wish, you can write tasks of any complexity in Python when building the configuration for your DAGs.
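As a small illustration of these knobs, here is a hedged sketch assuming a recent Airflow 2.x and an already-created pool named oracle_source; all names are illustrative.

```python
# Illustrative only: limiting load on the source with a pool, prioritizing a task,
# and capping simultaneous DAG runs.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="pool_priority_example",
    start_date=datetime(2023, 1, 1),
    schedule=None,          # triggered manually or by an external caller
    catchup=False,
    max_active_runs=1,      # limit the number of simultaneous runs of this DAG
) as dag:
    extract = PythonOperator(
        task_id="extract_orders",
        python_callable=lambda: print("extracting..."),
        pool="oracle_source",   # tasks in this pool share a limited number of slots
        priority_weight=10,     # higher-priority tasks get pool slots first
    )
```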

How we use Airflow in our system
Our tasks are currently all quite similar. Let's walk through the diagram of one of them (a simplified code sketch of such a DAG appears at the end of this walkthrough):

  1. The first block is INIT. Its job is to notify the Configurator that the task has started.

  2. Next comes a group of parallel blocks that clean up the temporary tables in the destination.

  3. They are followed by the snapshot_create block, which is responsible for creating a snapshot and obtaining an identifier for readers.

  4. The snapshot block is linked by a dotted line to its closing counterpart. Airflow has a mechanism for releasing resources, and the closing block runs even if the task fails.

  5. Next comes a group in which transfer blocks also run in parallel, one for each table being migrated. Each transfer block consists of several steps:

    • getting the last CDC value that has already been transferred, or the initial default value if there is none;
    • sending a request to NiFi through the NiFi Task Manager microservice to start the transfer;
    • waiting for completion: this block polls the NiFi Task Manager to find out what stage the task has reached in NiFi;
    • after completion, getting the new CDC value, which will be recorded for future runs.

  6. Once all the transfer groups have completed, the data merge block starts. It executes update or insert queries against the tables specified as destinations (see the upsert sketch after this list).

  7. After a successful merge, we record new CDC values for the next runs.

  8. At the end of the process, the DAG reports successful completion to the Configurator.
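The data merge can be pictured as an upsert from a temporary table into the target table, executed in a single transaction. Below is a hedged sketch of such a query using psycopg2; the table and column names are hypothetical, and the real queries are generated by the Configurator.

```python
# Illustrative upsert from a temporary table into the destination table.
import psycopg2

MERGE_SQL = """
    INSERT INTO billing.orders AS t (order_id, customer_id, updated_at)
    SELECT order_id, customer_id, updated_at
    FROM billing.tmp_orders
    ON CONFLICT (order_id) DO UPDATE
    SET customer_id = EXCLUDED.customer_id,
        updated_at  = EXCLUDED.updated_at;
"""


def merge_tmp_into_target(dsn: str) -> None:
    """Run the merge in a single transaction so that either all rows land in the
    target or none do (order_id is assumed to be the primary key)."""
    with psycopg2.connect(dsn) as conn:        # commits on success, rolls back on error
        with conn.cursor() as cur:
            cur.execute(MERGE_SQL)
    conn.close()
```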

If any error occurs while a task is running, the error handler attached to the DAG notifies the Configurator about it.
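Putting the pieces together, here is a simplified, hedged sketch of what such a DAG might look like. It assumes a recent Airflow 2.x; the table names, stub callables and the DAG-level failure callback are illustrative, not the code our Configurator actually generates.

```python
# A simplified, illustrative sketch of the DAG structure described above.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule

TABLES = ["orders", "order_items"]  # hypothetical tables of one flow


def notify_configurator(status: str, **context) -> None:
    """Stub: report start / success / failure back to the Configurator."""
    print(f"configurator <- {status}")


def on_dag_failure(context) -> None:
    """DAG-level error handler: tell the Configurator that the run failed."""
    notify_configurator("failed", **context)


with DAG(
    dag_id="oracle_to_pg_flow_example",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    on_failure_callback=on_dag_failure,
) as dag:
    init = PythonOperator(
        task_id="init",
        python_callable=notify_configurator,
        op_kwargs={"status": "started"},
    )

    with TaskGroup(group_id="cleanup_tmp") as cleanup_tmp:
        for table in TABLES:
            PythonOperator(
                task_id=f"truncate_tmp_{table}",
                python_callable=lambda t=table: print(f"TRUNCATE tmp_{t}"),
            )

    snapshot_create = PythonOperator(
        task_id="snapshot_create",
        python_callable=lambda: print("create snapshot, hand its id to readers"),
    )

    # Releases the snapshot; ALL_DONE makes it run even if something upstream failed.
    snapshot_close = PythonOperator(
        task_id="snapshot_close",
        python_callable=lambda: print("close snapshot"),
        trigger_rule=TriggerRule.ALL_DONE,
    )

    with TaskGroup(group_id="transfer") as transfer:
        for table in TABLES:
            PythonOperator(
                task_id=f"transfer_{table}",
                # get last CDC -> ask NiFi Task Manager -> wait -> fetch new CDC
                python_callable=lambda t=table: print(f"transfer {t} via NiFi"),
            )

    merge = PythonOperator(
        task_id="merge_data",
        python_callable=lambda: print("upsert temporary tables into the targets"),
    )

    save_cdc = PythonOperator(
        task_id="save_cdc",
        python_callable=lambda: print("record new CDC values"),
    )

    report_success = PythonOperator(
        task_id="report_success",
        python_callable=notify_configurator,
        op_kwargs={"status": "finished"},
    )

    init >> cleanup_tmp >> snapshot_create >> transfer >> merge >> save_cdc >> report_success
    transfer >> snapshot_close
```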

Downsides and problems

At the moment, our only problems with Airflow arise when synchronizing statuses with the Configurator. Even though we have handlers for everything, Airflow can simply interrupt a task by killing it. This can happen if the flow is switched off, the configuration file is accidentally deleted, or someone manually completes the whole task in Airflow by forcing a status on it.

NiFi

NiFi is an ETL tool for managing data flows; it consists of many small queues and processors that work through those queues.

NiFi Features:
• NiFi allows you to manage data flows, performing tasks such as format conversion, compression and decompression, as well as moving data between systems;
• NiFi is easily scalable and can process large amounts of data;
• NiFi is easy to integrate into an existing infrastructure;
• and if the necessary tools are missing, you can always write your own Java components.

Difficulties we encountered
NiFi's unit of work is the flowfile, which stores both the data itself and its attributes. Processors take such files as input and, on output, either update the attributes or create a new flowfile with new data. For one input file the system can create hundreds of output files – for example, when splitting the result of an SQL query into batches – and we need to know when all of those batches have been loaded into the destination.

Other disadvantages of NiFi:
• it is difficult to debug,
• flows can only be deployed through the UI,
• it is almost impossible to update part of a flow while it is running.

How we work with NiFi
Support for a specific database is implemented by adding the appropriate JDBC driver. We send requests to the NiFi API to create new connections in the system. The flow itself is launched by an HTTP request that contains all the settings.
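As an illustration, a launch request from the caller's side might look roughly like this, assuming the flow begins with an HTTP listener processor; the endpoint URL and payload fields below are hypothetical.

```python
# Hypothetical example of launching the transfer flow with an HTTP request.
import requests

task_settings = {
    "task_id": "orders-2024-01-15-001",
    "source_connection": "oracle_billing",
    "target_connection": "pg_billing",
    "source_query": "SELECT * FROM orders WHERE updated_at > :last_cdc",
    "target_table": "tmp_orders",
    "batch_size": 10_000,
}

response = requests.post(
    "http://nifi-node-1:9999/start-transfer",  # hypothetical listener endpoint
    json=task_settings,
    timeout=30,
)
response.raise_for_status()
# The flow replies synchronously that the task was accepted and then continues
# the transfer asynchronously on one of the NiFi nodes.
print(response.json())
```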

Any simple action that would normally take a couple of lines of code looks like this in NiFi:

The basic implementation consists of three blocks:

The first block validates the incoming task configuration. It consists of simple standard processors that check the parameters and write them to attributes.

The next block creates an entry in the Redis cache. Based on this entry, the task can be interrupted if an error occurs somewhere or a signal arrives from outside. The cache also stores the number of records read and written.

Next, the flow splits into two branches: the first (going to the left) sends a success response confirming that the task was created, while the second (going down) performs the transfer asynchronously, handing it off to one of the NiFi nodes.

Finally, the third block does the data transfer itself and writes the information to the cache.

The execution of a task consists of two blocks – reading and writing:

The first thing both the read block and the write block do is check the implementation: the parameter passed in routes the flow to the required implementation.

For reading, we wrote our own processor based on the standard one. It is modified to check the task status in the cache, and if the task is interrupted at some point, the processor terminates with an error:

Next comes a block that writes the number of batches read to the cache. This is where reading ends:

The write block looks almost the same: it also has its own processor, which checks the task status in the cache before writing. After writing, it sends the number of written batches to the cache:

This is where NiFi's work ends. Its main job is to push data through until it is interrupted.

NiFi Task Manager

The last component of the system is a microservice that synchronizes the work of Airflow and NiFi. Its main job is to track the progress of tasks through the Redis cache: it reads the number of batches read and written and uses that to decide when a task has completed. Tasks are also launched and cancelled through it.
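As a hedged illustration of the idea, here is what such a completion check might look like with redis-py; the key names and field layout are hypothetical, not the real cache schema.

```python
# Illustrative completion check against the Redis cache (hypothetical key layout).
import redis

r = redis.Redis(host="redis", port=6379, decode_responses=True)


def is_task_finished(task_id: str) -> bool:
    """A task counts as finished when the number of batches written matches the
    total number of batches announced by the read block."""
    state = r.hgetall(f"nifi:task:{task_id}")
    if not state:
        return False
    if state.get("status") == "cancelled":
        return True  # the NiFi processors stop the flow when they see this flag
    total = state.get("batches_read_total")   # known only once reading has finished
    written = int(state.get("batches_written", 0))
    return total is not None and written >= int(total)


def cancel_task(task_id: str) -> None:
    """Mark the task as cancelled; the custom read/write processors check this
    flag before each batch and fail the flow if it is set."""
    r.hset(f"nifi:task:{task_id}", "status", "cancelled")
```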

So, while looking for ways to transfer data from Oracle to PostgreSQL, we ended up with a fairly powerful and flexible suite of tools, which we continue to refine and improve. In the future, we plan to extend its functionality: for example, beyond databases, it will be possible to add new types of sources and destinations if the customer and the system's users need them. Either way, this system already makes it possible to transfer data from one system to another without loss.
