Apache NiFi: Launching a Pipeline When You Need It

Good afternoon. My name is Ivan, I’m a data engineer; my main focus is delivering data from source systems to target DWH systems. Most often I work with Apache NiFi.

In the NiFi Telegram community, the question often comes up: how do I start my flow exactly when I need it? I decided to systematize my own experience along with input from colleagues and members of our great community on starting processing flows. This article will be useful primarily to beginners.

In Apache NiFi, a processing flow means forming a FlowFile and passing it along the entire processing path (the pipeline). Thus, the task of starting a processing flow essentially comes down to generating the initial FlowFile at the right time.

Apache NiFi processors can be split into generators and processing processors. By generators I mean processors whose work produces a new FlowFile; examples are GenerateFlowFile, ConsumeKafkaRecord, and ListFile. Processing processors require an incoming queue and perform their function based on the attributes and content of the incoming FlowFile.

This division is conditional, since some processors belong to both groups, for example ExecuteSQL: such processors can generate data on their own in the absence of an incoming connection, and can also process an input queue.

Let me describe an idealized task: you have a data processing flow with an input port, and the arrival of a FlowFile at that port initializes a certain ETL process. Let’s place an abstract group next to it, label it “Schedulers”, define an output port, and connect it to the processing flow. Now we have the flow itself and a separate system for its initialization.

An example of a launch group separated from the main flow

Let’s consider the options for starting the flow.

Interval

Every processor can be scheduled to run at an interval: the next run starts the specified time after the previous run of the processor. The interval can be set in seconds, minutes, days, and even years (if you have a very slow data source).

Startup interval – every five minutes

The advantage of this method is obvious: the pipeline runs at the given time interval.

The disadvantages are also clear:

  • If you choose a small interval, trigger FlowFiles may accumulate in your flow. This is especially true when there is a heavy query to the source: it may return no data, yet still take a long time to complete. As a result, the queue grows and the pipeline runs constantly.

  • If you choose a large interval, many changes may accumulate at the source, and your pipeline will spend a lot of resources processing them, resulting in periodic peak loads on the server.

  • The customers come to you and say: we want to see the data every three hours, but not on Saturdays, not at night, and on the 1st of each month the load must start at 8 in the morning. The first thing this leads us to is adding a trigger with a cron schedule.

Cron

Each processor can also be scheduled to run by cron: a control trigger is generated at the specified times.

Start on Saturdays, 8:00
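
For reference, NiFi’s CRON driven scheduling strategy uses Quartz-style cron expressions with a leading seconds field. Expressions covering the requirements above might look like this (a sketch; double-check the syntax against your NiFi version’s documentation):

# fields: second minute hour day-of-month month day-of-week
0 0 8 ? * SAT      # every Saturday at 08:00
0 0 0/3 * * ?      # every three hours
0 0 8 1 * ?        # at 08:00 on the 1st of every month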

Advantages:

  • the trigger fires exactly at the scheduled moments, which gives precise control over when the pipeline starts.

Disadvantages:

  • no matter how complex your cron expression is, chances are you will eventually need to extend it.

It would seem that interval and cron together should cover every need. However, your initializer will eventually start to look something like this:

Generators for one task (interval only)

In principle, there are no other options for generating a FlowFile, so a convenient initialization system has to be built out of these. Next, let’s talk about organizing the launch system and initializing the flow.

Combination of Cron and intervals

For this case it is convenient to combine the generators into groups and define an attribute that reflects the schedule, for example schedule. Each generator writes its own value to this attribute:

Example of schedules in the configuration database

It is convenient to store flow configurations in a database or another external service; the figure above shows what the list of schedules looks like in the database. Separately, I have a GenerateFlowFile running at a 1-minute interval that writes the current time to the attribute in HH:mm format. This lets you start flows at any convenient time, and at several different times rather than just one.
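
To make this concrete, here is a hypothetical layout of such a configuration table. Only the schedule, priority, and active columns are implied by the query below; the table name and the remaining columns are my assumptions:

-- Hypothetical configuration table, referenced as ${paramtable} in the query
CREATE TABLE flow_config (
    id        INT IDENTITY(1,1) PRIMARY KEY,
    flow_name VARCHAR(100) NOT NULL,     -- which pipeline this configuration starts
    schedule  VARCHAR(200) NOT NULL,     -- semicolon-separated start times, e.g. '08:00;12:30;21:00'
    priority  INT NOT NULL DEFAULT 100,  -- processing order
    active    BIT NOT NULL DEFAULT 1     -- soft on/off switch for the flow
);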

Flow configurations are selected from the database with the query:

SELECT *
FROM ${paramtable}
WHERE active = 1
  AND '${schedule}' IN (
      SELECT value
      FROM string_split(schedule, ';')
  )
ORDER BY priority

The query selects all active configurations whose schedule matches the incoming one. Next, you need to translate the configuration into attributes and apply them in the processing flow.

External control event

By this term I mean a control trigger formed outside of NiFi; inside NiFi you will need to build a mechanism for receiving this event. Let’s consider the simplest examples.

Control via HTTP/HTTPS

We listen for HTTP/HTTPS requests on a specific port, parse the content, and pass it to the pipeline.
HTTPS listener with two branches – config and schedule

Upon receipt, we distinguish two cases: either a schedule has arrived, or a ready-made configuration.

Translation of content into attributes

I use this approach in an Airflow + NiFi bundle. There is a source for which the increment cannot be determined by a simple condition; a complex query is required to calculate the new slice of data. A DAG was developed that determines whether new data needs to be fetched, calculates boundary values from previously received data, and forms the specific query and a ready-made configuration for the load flow. The JSON configuration is then passed to NiFi.
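
As a minimal sketch of the final DAG step, the configuration can be pushed to a ListenHTTP processor with a plain HTTP POST. The listener URL, port, and payload fields here are my assumptions, not the article’s:

import json

import requests

# Sketch: POST a ready-made load configuration to a ListenHTTP processor.
# Host, port, path, and payload fields are assumptions.
NIFI_LISTENER_URL = "http://nifi-host:8081/contentListener"

def trigger_nifi_load(config: dict) -> None:
    response = requests.post(
        NIFI_LISTENER_URL,
        data=json.dumps(config),
        headers={"Content-Type": "application/json"},
        timeout=30,
    )
    response.raise_for_status()  # fail the Airflow task if NiFi rejected the request

trigger_nifi_load({
    "flow_name": "sales_increment",                # hypothetical flow identifier
    "query": "SELECT * FROM sales WHERE id > 42",  # boundaries computed by the DAG
})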

Consuming data

Getting data from Kafka

We listen to Kafka topics. Based on the topic and the incoming data, we determine what to do with the message. If it is a control message (containing either a schedule or a ready-made configuration), it is passed to the initialization group. If it is a regular data message, it is passed to the pipeline for processing.
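
For illustration, here is how an external system might publish such a control message. This is a sketch using the kafka-python library; the broker address, topic name, and message layout are assumptions:

import json

from kafka import KafkaProducer  # pip install kafka-python

# Sketch: send a control message to the topic that the NiFi consumer listens to.
producer = KafkaProducer(
    bootstrap_servers="kafka-host:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# A schedule-type control message; NiFi routes it to the initialization group.
producer.send("etl-control", {"type": "schedule", "schedule": "08:00"})
producer.flush()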

Similar examples exist for other systems: an incoming email, an event in RabbitMQ… NiFi has a fairly long list of systems from which it can receive data.

Example of consumers in NiFi

Starting a processor via the API

Apache NiFi has a REST API, described in the documentation. There is a great Python implementation of the API – NiPyApi; see its documentation for details. You will need the processor ID and the schedule_processor method.
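
A minimal sketch with NiPyApi (the NiFi URL and processor name are assumptions):

import nipyapi

# Point the client at your NiFi instance (URL is an assumption).
nipyapi.config.nifi_config.host = "http://nifi-host:8080/nifi-api"

# Look the processor up by name; identifier_type='id' also works if you have the ID.
proc = nipyapi.canvas.get_processor("GenerateFlowFile_sales", identifier_type="name")

# scheduled=True starts the processor; scheduled=False stops it again.
nipyapi.canvas.schedule_processor(proc, scheduled=True)

A common pattern is to start a generator this way, let it emit a FlowFile, and then stop it again with scheduled=False.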

Conclusion

Of course, these are not all the possible ways to initialize a pipeline. You can also react to the creation of a file in the file system, the appearance of a row in a database table, or a cache entry. I did not cover the Wait/Notify processors, which let you wait for an event inside NiFi itself, nor receiving data from another NiFi/MiNiFi instance. It all depends on your environment and your needs.

Apache NiFi is a self-sufficient tool with its own orchestration capabilities. At the same time, it integrates easily with other systems from which control events can be received, and it should be considered as part of a broader set of data integration services.
