Loading the DWH staging layer. Part 1

Hello. My name is Ivan Klimenko, and I am a data flow developer at Ascona. In this series of articles, I will share our experience of adopting Apache NiFi to build a DWH.

This article covers the first phase of the Apache NiFi rollout: the initial load flows, the implementation of incremental loads, and a description of the current architecture.

Goals and objectives

I am a member of the data processing group in the Data Platform division. I joined the team at the beginning of last year with the task of organizing data collection from the sources and populating the DWH using Apache NiFi. Since a certain structure already existed, two designations emerged:

  1. Old architecture – data is collected from the sources once a day, at night; the tool is SSIS on MS SQL Server (yes, MS SQL is used as the storage, but we plan to migrate to Greenplum); everything is kicked off by jobs via SQL Server Agent.

  2. New architecture – data is collected from the sources once a day using Apache NiFi, the staging tables are filled, and then everything proceeds as before – jobs, packages, SSIS…

The difficulties with the old architecture were:

  1. The load ran once a day, and with a large volume of data SSIS could not cope with extracting it from the source, so the job failed.

  2. As the volume of data and the number of extracted entities grew, there was not enough time to load the data and rebuild the data marts.

  3. There was no incremental loading.

Tasks of the new architecture:

  1. Ensure that data is loaded as it changes at the source.

  2. Scale the load across a large number of tables.

  3. Build the data mart layer from the loaded increment.

  4. Separate the loading process from the mart recalculation process.

  5. Make migration to Greenplum easy.

ETL flow

The main data source is the ERP “Galaktika”, running on an Oracle DBMS. Our team has access to the source and can create views on it. Since NiFi has a built-in mechanism for tracking an incremental field in a query (a numeric or date-time field), we decided to create two views for each source table – a historical one and an incremental one. Their structure is identical; the only difference is that the incremental view reads data from logs, which are kept for two weeks to one month. Each table has two numeric fields – the date and the time the record was last modified. Since NiFi can only track monotonically increasing fields, and the time value is cyclical, we decided to build the incremental key by combining the modification date and time into a single numeric field.
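
As a rough illustration of the idea (all object and column names below are hypothetical, and the exact formula for combining date and time is an assumption), the incremental view might look like this:

-- Hypothetical sketch of an incremental view over the change log.
-- D_MODIFY and T_MODIFY are the numeric modification date and time fields;
-- DT_KEY combines them into a single, ever-increasing key that NiFi
-- can track as a maximum-value column.
CREATE OR REPLACE VIEW V_CUSTOMER_ORDERS_INC AS
SELECT o.*,
       o.D_MODIFY * 1000000 + o.T_MODIFY AS DT_KEY
  FROM LOG_CUSTOMER_ORDERS o;  -- log table, retained for 2 weeks to 1 month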

A data loading flow was developed: it extracts data from a source table, performs a series of transformations, and loads the result into a target table. I should note that the flow was mostly developed by my colleague Farhad; I joined in after the infrastructure issues were solved.

Fig. 1. General view of the flow. It gives an idea of the direction of data movement.

Let’s take a closer look, from launch to completion.

At the initial stage, data is retrieved using the QueryDatabaseTable processor.

Fig. 2. The QueryDatabaseTable processor – initiates the flow and executes a query against the source.
Fig. 3. QueryDatabaseTable settings

In the settings we specify the connection, schema, and table, list the fields (or leave that property empty, in which case all fields are selected), and put the field name into the Maximum-value Columns property. On each query, NiFi computes the maximum value of that field and stores it in its state. When building the next query, NiFi adds a condition like WHERE DT_KEY > 1923929292929, i.e. it substitutes both the field name and its previously stored value. This is how incremental extraction is implemented.
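
For reference, the query that QueryDatabaseTable ends up issuing on a subsequent run looks roughly like this (the schema, view, and field names are made-up examples; the value after the comparison is the maximum stored in the processor state from the previous run):

-- Roughly what NiFi generates on a subsequent run (names are hypothetical)
SELECT ID, CUSTOMER_ID, AMOUNT, D_MODIFY, T_MODIFY, DT_KEY
  FROM GAL.V_CUSTOMER_ORDERS_INC
 WHERE DT_KEY > 1923929292929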

Next, a number of transformations must be performed. Since we may receive data that has already been loaded into the system (for example, after a record was edited), we need to understand whether the data has actually changed. To track this, we compute a hash of each record; if it differs from the stored one, the data in the mart layer must be updated.

Fig. 4. Updating data and computing the hash

First comes data preparation. We build a new field with UpdateRecord and compute its hash with the third-party HashColumn processor (thanks to the author for the work – the source code is on GitHub). In later development I decided to rely on stock processors as much as possible; in the next part I will show exactly how I replaced this pair.

Fig. 5. UpdateRecord settings

In the UpdateRecord settings we specify the name of the new field, HASH_DIFF, and the rule for building it: a concatenation of the significant fields. As a result, a new field is created that holds a string containing all those fields. The HashColumn processor then computes a hash with the chosen algorithm and writes the value into the same field.
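
Purely for illustration (this is not the actual mart-update code, and the table names are hypothetical), the stored HASH_DIFF lets the target side detect new and changed records with a simple comparison:

-- Illustration only: detect new and changed records by comparing hashes.
-- stg.customer_orders and dwh.customer_orders are hypothetical tables.
SELECT s.*
  FROM stg.customer_orders AS s
  LEFT JOIN dwh.customer_orders AS d
    ON d.ID = s.ID
 WHERE d.ID IS NULL                 -- new record
    OR d.HASH_DIFF <> s.HASH_DIFF;  -- record changed since the last load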

Fig. 6. Steps for converting to CSV

First, the attributes are set: the filename attribute is assigned the current file name with the “.csv” extension. This attribute will be needed when the file is saved. Then meta-information is added – the extraction time is written into each record. Finally, the records are converted to CSV using the ConvertRecord processor: the content is converted from the Avro format to CSV, and escaping is handled according to the settings of the record writer service.

Fig. 7. Writing the file to a network share and clearing the content

Using PutSmbFile, the resulting file is saved to a network folder on the server that hosts the target MS SQL, and the FlowFile content is then cleared. This step is optional, but during operation we noticed that if the content is kept, the subsequent query runs very slowly.

After the file is written, a query is run on the server to insert the data into the table, using the ExecuteSQL processor.

Fig. 8. Sequence of actions for inserting the data and deleting the file on the server

The query itself looks like this:

BULK INSERT #{tgt.sql.schema}.#{tgt.sql.table.name.customer_orders}
FROM '#{local.folder.Bulk.Insert}${filename}'
WITH(FIRSTROW = 2, FIELDTERMINATOR = '~',ROWTERMINATOR = '0x0a',
     CODEPAGE = 65001, TABLOCK )

As you can see, the file name is passed to the server so the data is loaded into the specified table; the schema and table name are defined in the process group parameters, and the file name is taken from the attribute.
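
After parameter and attribute substitution, the statement that actually reaches the server looks roughly like this (the folder, schema, table, and file name below are made-up examples):

-- Hypothetical result of substituting the group parameters and the filename attribute
BULK INSERT stg.customer_orders
FROM 'D:\BulkInsert\customer_orders.csv'
WITH(FIRSTROW = 2, FIELDTERMINATOR = '~', ROWTERMINATOR = '0x0a',
     CODEPAGE = 65001, TABLOCK)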

If a failure occurs, the FlowFile is routed to a retry, and after three unsuccessful attempts an error message is generated.

On success, a query is executed that deletes the uploaded file via xp_cmdshell:

DECLARE @deletefile NVARCHAR(MAX), @cmd NVARCHAR(MAX);

SET @deletefile = '${filename}';
SET @cmd = 'xp_cmdshell ''del "#{local.folder.Bulk.Insert}' + @deletefile + '"''';
EXEC (@cmd);

The file could also be deleted by NiFi itself, for example via GetSmbFile, but in that case the content would be transferred back over the network. An SSH or PowerShell script could be invoked instead, but that would require granting additional permissions to the NiFi user.

The remaining steps are housekeeping – generating and writing a log, sending notifications, and so on.

Conclusion

This flow allowed our team to test incremental loading, stop extracting data from the source with SSIS, and separate the data loading process from the mart calculation process.

Advantages:

  1. A simple flow that reflects all the ETL stages – extract, transform, load.

  2. Easy to scale – by saving the flow as a template or putting it into the Registry, changing the variables and the list of fields, you can easily add a new table.

  3. Easy to monitor – you can see which flow has failed and where the error is.

Flaws:

  1. Once the number of tables exceeded 20, the solution became inconvenient to maintain.

  2. All flows started at roughly the same time, putting a heavy load on the source.

  3. If a modification was needed, the same change had to be made in every flow.

In the next parts I will talk about how the extraction flow evolved, how it was parameterized, how we solved the problem of reporting load completion and notifying about errors, and what we ended up with.
