Setting Up Docker and Creating a Custom Reporter

Remark

Please note that the methods and approaches described in this article are one way to solve the problems encountered and may not be the best or recommended option for all situations. The author does not advocate following the steps described without considering possible alternatives and does not guarantee that this method will work for all users or cases. Keep in mind that using non-standard solutions may entail risks or require additional efforts for support and updates. It is recommended to always research and evaluate possible solutions in the context of your project and requirements.

How to Run Apache Atlas in Docker

The first thing I encountered is that most (maybe not all, I haven't tested of course) Apache Atlas images don't work out of the box.

Each of them crashed with the following error:

14: curl#6 - "Could not resolve host: mirrorlist.centos.org; Unknown error"


 One of the configured repositories failed (Unknown),
 and yum doesn't have enough cached data to continue. At this point the only
 safe thing yum can do is fail. There are a few ways to work "fix" this:

     1. Contact the upstream for the repository and get them to fix the problem.

     2. Reconfigure the baseurl/etc. for the repository, to point to a working
        upstream. This is most often useful if you are using a newer
        distribution release than is supported by the repository (and the
        packages for the previous distribution release still work).

     3. Run the command with the repository temporarily disabled
            yum --disablerepo=<repoid> ...

     4. Disable the repository permanently, so yum won't use it by default. Yum
        will then just ignore the repository until you permanently enable it
        again or use --enablerepo for temporary usage:

            yum-config-manager --disable <repoid>
        or
            subscription-manager repos --disable=<repoid>

     5. Configure the failing repository to be skipped, if it is unavailable.
        Note that yum will try to contact the repo. when it runs most commands,
        so will have to try and fail each time (and thus. yum will be be much
        slower). If it is a very temporary problem though, this is often a nice
        compromise:

            yum-config-manager --save --setopt=<repoid>.skip_if_unavailable=true

More about this error later.

I started looking for ready-made Docker files on GitHub.

However, they also encountered the same error.

The error is due to Docker not being able to find or resolve the host for the repository. mirrorlist.centos.org. This caused the command to fail. yumused to install packages inside a Docker container. Errors like these can occur if the repository is no longer maintained or is temporarily unavailable.

All that remains is to write your Docker files. I took as a basis files from this repository (let me clarify that I did not use the latest version, but one commit back from the version on 08/27/2024, since the latest version could not be launched in principle), since it was referred to in this article.

The first thing we do is remove all unnecessary things from docker-compose.yml.

In the original, in addition to Atlas and the services required for its operation, Spark, Hive, Hadoop (DataNode, NameNode) were also raised. All this turned out to be unnecessary, so we simply cross it out of the file, and also delete the Spark and Hive folders. We leave only what is necessary: ​​Apache Atlas itself, Apache Kafka and Apache Zookeeper.

So we get:

version: "2"

services:

  atlas-server:
    build: ./atlas
    image: pico/apache-atlas
    volumes:
      - ./atlas/resources/1000-Hadoop:/opt/atlas/models/1000-Hadoop
    ports:
      - "21000:21000"
    depends_on:
      - "zookeeper"
      - "kafka" 

  zookeeper:
    image: wurstmeister/zookeeper
    hostname: zookeeper
    ports:
      - "2181:2181"  

  kafka:
    container_name: kafka
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    hostname: kafka
    environment:
      KAFKA_CREATE_TOPICS: "create_events:1:1,delete_events:1:1,ATLAS_HOOK:1:1"
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    depends_on:
      - zookeeper  

Next we move on to Dockerfilewhich builds Apache Atlas in a container. To fix the error, simply specify the archive mirror for CentOS immediately after the line FROM centos:7.

FROM centos:7

COPY --from=stage-atlas /apache-atlas.tar.gz /apache-atlas.tar.gz

#новые строчки с архивным зеркалом centos
RUN sed -i s/mirror.centos.org/vault.centos.org/g /etc/yum.repos.d/*.repo \
	&& sed -i s/^#.*baseurl=http/baseurl=http/g /etc/yum.repos.d/*.repo \
	&& sed -i s/^mirrorlist=http/#mirrorlist=http/g /etc/yum.repos.d/*.repo

And you can launch it docker-compose up -dHowever, the following error occurs:

Command “/usr/bin/python3 -u -c “import setuptools, tokenize;__file__='/tmp/pip-build-f8uu6b5l/typed-ast/setup.py';f=getattr(tokenize, 'open', open )(__file__);code=f.read().replace('\r\n', '\n');f.close();exec(compile(code, __file__, 'exec'))” install – -record /tmp/pip-artr5er7-record/install-record.txt –single-version-externally-managed –compile” failed with error code 1 in /tmp/pip-build-f8uu6b5l/typed-ast/ ERROR: Service 'atlas-server' failed to build: The command '/bin/sh -c pip3 install amundsenatlastypes==1.1.0' returned a non-zero code: 1

The error is related to a failed installation of a Python package. amundsenatlastypes. The solution is even simpler: in Dockerfile in line RUN pip3 install amundsenatlastypes==1.1.0 change version to 1.2.2and run it again docker-compose up -d. This time everything will work. You can go to the address http://localhost:21000/ and open the Apache Atlas web interface. Note that it may take a few minutes to start; the first time it took me about 10 minutes, but subsequent starts are much faster.

Apache Atlas Web UI

Apache Atlas Web UI

Apache Atlas is up and running, ready-to-run files are available here

Raises Apache NI-Fi

There is no problem here. Create a file docker-compose.yml with the following content and run it with the command docker-compose up -d:

version: '3'

services:

  nifi:
    image: apache/nifi:1.20.0
    container_name: nifi
    ports:
      - "8080:8080"
      - "8443:8443"
      - "8081:8081"
    volumes:
      - nifi-data:/opt/nifi/nifi-current/logs
    environment:
      - NIFI_WEB_HTTP_PORT=8080
      - NIFI_WEB_HTTPS_PORT=8443
      - NIFI_WEB_HTTP_HOST=0.0.0.0
      - NIFI_WEB_PROXY_HOST=localhost
    restart: unless-stopped

volumes:
  nifi-data:
    driver: local

After that, just go to the address http://localhost:8080and the web interface should open NiFi. Please note that it may take several minutes to start.

Apache NI-Fi WebUI

Apache NI-Fi WebUI

Statement of the problem

The challenge is to create a reporter between NiFi and Atlas. There is a standard reporter for this task called ReportLineageToAtlas (the default Docker image does not have this plugin; there will be a short paragraph at the end about how to add plugins). Let's create in NiFi small DAG and set up a reporter in Atlas.

dag

dag

tuned reporter

a tuned reporter

result in apache atlas

result in apache atlas

As we can see, the DAG information is displayed in Atlas, but the problem is that all the information is displayed. I need to hide certain stages of the DAG so that they are not displayed in Atlas. So my task is to create a reporter that will separate what should be displayed and what should not be displayed in Atlas. We will introduce a rule that only those stages will be displayed that have a – at the end of their names. _to_atlas.

Creating a plugin

Let's create a file pom.xml with the following contents:

    <groupId>Ni-Fi</groupId>
    <artifactId>pack</artifactId>
    <!-- Тип пакета для данного артефакта (в данном случае "pom" указывает на то, что это основной POM для сборки) -->
    <packaging>pom</packaging>
    <version>1.20.0</version>

    <!-- Свойства, используемые в этом POM-файле -->
    <properties>
        <!-- Версия Apache NiFi, которая будет использоваться в модулях -->
        <nifi.version>1.20.0</nifi.version>

        <!-- Версия InfluxDB, которая будет использоваться в модулях -->
        <influxdb.version>2.9</influxdb.version>
    </properties>


    <!-- Определение модулей, которые являются частью этого Maven-проекта -->
    <modules>
        <!-- Модуль, отвечающий за реализацию задачи отчетности в Apache Atlas непосредственно с кодом  -->
        <module>reporting-task-atlas</module>

        <!-- NAR (NiFi Archive) модуль для задачи отчетности в Apache Atlas, нужен только чтоб паковать в .nsr архив проет-->
        <module>reporting-task-atlas-nar</module>
    </modules>

Let's define dependencies

    <!-- Раздел для управления зависимостями Maven. 
         В этом разделе определяются версии зависимостей, используемых в проекте. 
         Эти зависимости могут быть включены в модули без явного указания их версий.
    -->
    <dependencyManagement>
        <dependencies>
            <!-- Зависимость для использования SLF4J API (Simple Logging Facade for Java) в проекте.
                 SLF4J предоставляет абстракцию для различных логгеров, таких как Log4j и Logback.
            -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.12</version>
            </dependency>

            <!-- Зависимость для использования API Apache NiFi.
                 Эта зависимость предоставляет доступ к API Apache NiFi, необходимому для разработки NiFi компонентов.
                 Поскольку зависимость имеет scope 'provided', она требуется только во время компиляции и не будет включена в конечный артефакт.
            -->
            <dependency>
                <groupId>org.apache.nifi</groupId>
                <artifactId>nifi-api</artifactId>
                <version>${nifi.version}</version>
                <scope>provided</scope>
            </dependency>

            <!-- Зависимость для использования утилит процессоров Apache NiFi.
                 Эти утилиты могут использоваться для создания пользовательских процессоров в NiFi.
            -->
            <dependency>
                <groupId>org.apache.nifi</groupId>
                <artifactId>nifi-processor-utils</artifactId>
                <version>1.15.3</version>
            </dependency>

            <!-- Зависимость для использования утилит отчетности Apache NiFi.
                 Эти утилиты предоставляют инструменты для разработки компонентов отчетности в NiFi.
            -->
            <dependency>
                <groupId>org.apache.nifi</groupId>
                <artifactId>nifi-reporting-utils</artifactId>
                <version>${nifi.version}</version>
            </dependency>

            <!-- Зависимость для использования Apache Commons IO.
                 Commons IO предоставляет утилиты для ввода-вывода (I/O), такие как операции с файлами и потоками.
            -->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-io</artifactId>
                <version>1.3.2</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

Components Apache NiFi should be packed in .nar archives, and for this you need to connect several plugins.

    <!-- Раздел сборки, где определяются плагины, используемые для процесса сборки проекта -->
    <build>
        <plugins>
            <!-- Плагин NiFi NAR Maven Plugin используется для создания NAR (NiFi Archive) файлов.
                 NAR файлы — это специальные архивы, используемые в Apache NiFi для упаковки расширений (например, процессоров, контроллеров и других компонентов).
                 Плагин необходим для построения и упаковки пользовательских компонентов NiFi в формате NAR.
            -->
            <plugin>
                <groupId>org.apache.nifi</groupId>
                <artifactId>nifi-nar-maven-plugin</artifactId>
                <version>1.5.0</version>
                <!-- Плагин используется как расширение Maven, добавляя функциональность для сборки NAR файлов. -->
                <extensions>true</extensions>
            </plugin>

            <!-- Плагин Maven Surefire Plugin используется для запуска юнит-тестов в фазе тестирования.
                 Этот плагин выполняет тесты, написанные с использованием фреймворков, таких как JUnit или TestNG, и генерирует отчет о тестах.
            -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.15</version>
            </plugin>

            <!-- Плагин Maven Compiler Plugin используется для компиляции исходного кода проекта.
                 В данном случае он настроен для использования версии Java 17 в качестве исходного и целевого уровня компиляции.
            -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <!-- Версия Java, используемая для компиляции исходного кода -->
                    <source>17</source>
                    <!-- Версия Java, на которую нацелен скомпилированный код -->
                    <target>17</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

Now you can assemble .nar archives. Go to the module pico-reporting-task-atlas-nar and create a file there pom.xml with the following contents:


<!-- Определение родительского POM для данного модуля.
     Родительский POM содержит общие настройки и зависимости, которые наследуются дочерними модулями.
     В данном случае родительский артефакт 'PicoReportLineageToAtlas' с группой 'pico.habr' и версией '1.20.0'.
-->
<parent>
    <groupId>pico.habr</groupId>
    <artifactId>PicoReportLineageToAtlas</artifactId>
    <version>1.20.0</version>
</parent>

<!-- Уникальный идентификатор артефакта для данного модуля в Maven.
     'artifactId' должен быть уникальным в рамках группы ('groupId') и представляет конкретный проект или модуль.
-->
<artifactId>pico-reporting-task-atlas-nar</artifactId>

<!-- Тип упаковки артефакта, который указывает на формат выходного файла.
     В данном случае используется 'nar' (NiFi Archive), специальный формат для расширений Apache NiFi.
-->
<packaging>nar</packaging>

<!-- Человекочитаемое имя модуля, используемое для идентификации проекта -->
<name>reporting-task-atlas-nar</name>

<!-- Определение свойств для проекта.
     Здесь задается кодировка исходных файлов проекта для обеспечения корректной обработки файлов с различными кодировками.
-->
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<!-- Определение зависимостей для данного модуля -->
<dependencies>
    <!-- Зависимость от другого модуля внутри той же группы 'pico.habr'.
         Этот модуль предоставляет основные компоненты и классы для задачи отчетности в Atlas.
         Данная зависимость используется на этапе компиляции ('scope' compile).
    -->
    <dependency>
        <groupId>pico.habr</groupId>
        <artifactId>reporting-task-atlas</artifactId>
        <version>1.20.0</version>
        <scope>compile</scope>
    </dependency>
    
    <!-- Зависимость от стандартных API сервисов Apache NiFi.
         Эта зависимость предоставляет NAR-файл, который содержит стандартные API сервисов, используемых в NiFi.
         Указан тип зависимости 'nar', поскольку это специфичный для NiFi формат архива.
    -->
    <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-standard-services-api-nar</artifactId>
        <version>1.20.0</version>
        <type>nar</type>
    </dependency>
</dependencies>

There is no need to touch anything else in this module, and when you start building the project, a directory will appear .nar archive.

Let's go to the module reporting-task-atlas and create a file there pom.xml with the following contents:

<!-- Определение родительского POM для данного модуля.
     Родительский POM содержит общие настройки и зависимости, которые наследуются дочерними модулями.
     В данном случае родительский артефакт 'PicoReportLineageToAtlas' с группой 'pico.habr' и версией '1.20.0'.
-->
<parent>
    <groupId>pico.habr</groupId>
    <artifactId>PicoReportLineageToAtlas</artifactId>
    <version>1.20.0</version>
</parent>

<!-- Уникальный идентификатор артефакта для данного модуля в Maven.
     'artifactId' должен быть уникальным в рамках группы ('groupId') и представляет конкретный проект или модуль.
-->
<artifactId>reporting-task-atlas</artifactId>

<!-- Указание версии модуля. Обычно совпадает с версией родительского POM. -->
<version>1.20.0</version>

<!-- Тип упаковки артефакта. 'jar' означает, что выходной файл будет в формате JAR (Java ARchive). -->
<packaging>jar</packaging>

<!-- Определение зависимостей, необходимых для компиляции и выполнения проекта. -->
<dependencies>

    <!-- Зависимость от утилит для процессоров Apache NiFi.
         Содержит вспомогательные классы и методы, необходимые для создания пользовательских процессоров в NiFi.
    -->
    <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-processor-utils</artifactId>
    </dependency>

    <!-- Зависимость от SLF4J API (Simple Logging Facade for Java).
         Предоставляет абстракцию для различных логгеров, таких как Log4j и Logback.
    -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>

    <!-- Зависимость от API Apache NiFi.
         Предоставляет основные интерфейсы и классы для разработки компонентов NiFi.
    -->
    <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-api</artifactId>
    </dependency>

    <!-- Зависимость от утилит отчетности Apache NiFi.
         Предоставляет классы и методы для разработки компонентов отчетности в NiFi.
    -->
    <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-reporting-utils</artifactId>
    </dependency>

    <!-- Зависимость от компонента задачи отчетности в Atlas для NiFi.
         Позволяет интегрировать Apache NiFi с Apache Atlas для отправки метаданных о lineage.
         Версия зависит от свойства 'nifi.version'.
    -->
    <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-atlas-reporting-task</artifactId>
        <version>${nifi.version}</version>
    </dependency>

    <!-- Зависимость от API сервиса SSL контекста NiFi.
         Предоставляет API для работы с SSL контекстами, используемыми в NiFi.
         Версия зависит от свойства 'nifi.version'.
    -->
    <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-ssl-context-service-api</artifactId>
        <version>${nifi.version}</version>
    </dependency>

    <!-- Зависимость от API сервиса Kerberos для NiFi.
         Предоставляет API для управления учетными данными Kerberos в NiFi.
         Используется на этапе компиляции ('scope' compile).
    -->
    <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-kerberos-credentials-service-api</artifactId>
        <version>${nifi.version}</version>
        <scope>compile</scope>
    </dependency>

    <!-- Зависимость от репозитория постоянного происхождения NiFi.
         Используется для сохранения данных lineage в формате, который может быть восстановлен при перезапуске NiFi.
    -->
    <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-persistent-provenance-repository</artifactId>
        <version>${nifi.version}</version>
    </dependency>

    <!-- Зависимость от библиотеки JetBrains Annotations.
         Предоставляет аннотации, используемые для улучшения статического анализа и других задач компиляции.
         Используется на этапе компиляции ('scope' compile).
    -->
    <dependency>
        <groupId>org.jetbrains</groupId>
        <artifactId>annotations</artifactId>
        <version>RELEASE</version>
        <scope>compile</scope>
    </dependency>

    <!-- Зависимость от библиотеки Lombok.
         Позволяет упростить написание Java кода, автоматически генерируя геттеры, сеттеры и другие методы во время компиляции.
         Используется только на этапе компиляции и не включается в финальный артефакт ('scope' provided).
    -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.26</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

All pom.xml The files are ready, and all that remains is to write the reporter code.

I am creating a class PicoReportLineageToAtlas and copy the class code into it org.apache.nifi.atlas.reporting.ReportLineageToAtlas.

I change in it:

  • package org.apache.nifi.atlas.reporting on package pico.habr.nifi.atlas.reporting

  • The name of the reporter himself ReportLineageToAtlas on PicoReportLineageToAtlas

  • I'm updating the tags a bit.

// было 
@Tags({"atlas", "lineage"})
@CapabilityDescription("Report NiFi flow data set level lineage to Apache Atlas." +
        " End-to-end lineages across NiFi environments and other systems can be reported if those are" +
        " connected by different protocols and data set, such as NiFi Site-to-Site, Kafka topic or Hive tables ... etc." +
        " Atlas lineage reported by this reporting task can be useful to grasp the high level relationships between processes and data sets," +
        " in addition to NiFi provenance events providing detailed event level lineage." +
        " See 'Additional Details' for further description and limitations.")
@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last event Id so that on restart the task knows where it left off.")
@DynamicProperty(name = "hostnamePattern.<namespace>", value = "hostname Regex patterns",
                 description = RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX_DESC, expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
// In order for each reporting task instance to have its own static objects such as KafkaNotification.
@RequiresInstanceClassLoading
public class ReportLineageToAtlas extends AbstractReportingTask {

  //...

}

// стало
@Tags({"pico", "atlas", "lineage"})
@CapabilityDescription("Report NiFi flow data set level lineage to Apache Atlas." +
        " End-to-end lineages across NiFi environments and other systems can be reported if those are" +
        " connected by different protocols and data set, such as NiFi Site-to-Site, Kafka topic or Hive tables ... etc." +
        " Atlas lineage reported by this reporting task can be useful to grasp the high level relationships between processes and data sets," +
        " in addition to NiFi provenance events providing detailed event level lineage." +
        " See 'Additional Details' for further description and limitations.")
@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last event Id so that on restart the task knows where it left off.")
@DynamicProperty(name = "hostnamePattern.<namespace>", value = "hostname Regex patterns",
                 description = RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX_DESC, expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
// In order for each reporting task instance to have its own static objects such as KafkaNotification.
@RequiresInstanceClassLoading
public class PicoReportLineageToAtlas extends AbstractReportingTask {

  //...

}

The next task is to find where in the code the filtering by the name of the DAG components is placed.

As it turns out, another class is responsible for this, namely org.apache.nifi.atlas.NiFiFlowAnalyzer. Therefore, we create another class. PicoNiFiFlowAnalyzer and similarly copy the code into it. After that, make a few edits.

//было
package org.apache.nifi.atlas;

import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class NiFiFlowAnalyzer { 
  //...
  }

//меняем на 
package pico.habr.nifi.atlas; //замена групы

import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.nifi.atlas.NiFiFlow; //добовляю недостающию зависемость
import org.apache.nifi.atlas.NiFiFlowPath; //добовляю недостающию зависемость
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class NiFiFlowAnalyzer { //меняю название класса
  //...
  }

We find the method in the code analyzeProcessGroup and make edits.

// было
private void analyzeProcessGroup(final ProcessGroupStatus processGroupStatus, final NiFiFlow nifiFlow) {

        processGroupStatus.getConnectionStatus().forEach(c -> nifiFlow.addConnection(c));
        processGroupStatus.getProcessorStatus().forEach(p -> nifiFlow.addProcessor(p));
        processGroupStatus.getRemoteProcessGroupStatus().forEach(r -> nifiFlow.addRemoteProcessGroup(r));
        processGroupStatus.getInputPortStatus().forEach(p -> nifiFlow.addInputPort(p));
        processGroupStatus.getOutputPortStatus().forEach(p -> nifiFlow.addOutputPort(p));

        // Analyze child ProcessGroups recursively.
        for (ProcessGroupStatus child : processGroupStatus.getProcessGroupStatus()) {
            analyzeProcessGroup(child, nifiFlow);
        }

    }

// правки
private void analyzeProcessGroup(final ProcessGroupStatus processGroupStatus, final NiFiFlow nifiFlow) {

        processGroupStatus.getConnectionStatus().forEach(c -> nifiFlow.addConnection(c));
        processGroupStatus.getProcessorStatus()
                .stream().filter(c -> c.getName().toLowerCase().endsWith("_to_atlas")) // фильтрация по суффиксу
                .forEach(p -> nifiFlow.addProcessor(p));
        processGroupStatus.getRemoteProcessGroupStatus().forEach(r -> nifiFlow.addRemoteProcessGroup(r));
        processGroupStatus.getInputPortStatus()
                .stream().filter(c -> c.getName().toLowerCase().endsWith("_to_atlas")) // фильтрация по суффиксу
                .forEach(p -> nifiFlow.addInputPort(p));
        processGroupStatus.getOutputPortStatus()
                .stream().filter(c -> c.getName().toLowerCase().endsWith("_to_atlas")) // фильтрация по суффиксу
                .forEach(p -> nifiFlow.addOutputPort(p));

        // Analyze child ProcessGroups recursively.
        for (ProcessGroupStatus child : processGroupStatus.getProcessGroupStatus()) {
            analyzeProcessGroup(child, nifiFlow);
        }

    }

There is only one thing left PicoReportLineageToAtlas replace one class with another.

import org.apache.nifi.atlas.NiFiFlowAnalyzer
//меняем на 
import pico.habr.nifi.atlas.PicoNiFiFlowAnalyzer;

//и

final NiFiFlowAnalyzer flowAnalyzer = new NiFiFlowAnalyzer();
//меняем на 
final PicoNiFiFlowAnalyzer flowAnalyzer = new PicoNiFiFlowAnalyzer();

There is only one thing left resources create file org.apache.nifi.reporting.ReportingTask with the following contents:

pico.habr.nifi.atlas.reporting.PicoReportLineageToAtlas

It contains only the path to the reporter class. This file is used to register custom implementations of the interface. ReportingTask V NiFiIt follows the Java Service Provider Interface (SPI) specification, which allows implementations of interfaces or abstract classes to be dynamically discovered and loaded at runtime.

All that remains is to assemble the project as a team mvn clean install.

And then in the module pico-reporting-task-atlas-nar will appear .nar archive pico-reporting-task-atlas-nar-1.20.0.nar.

As a result, we have the following project structure:

ls -R
.:
pom.xml  reporting-task-atlas  reporting-task-atlas-nar

./reporting-task-atlas:
pom.xml  src

./reporting-task-atlas/src:
main

./reporting-task-atlas/src/main:
java  resources

./reporting-task-atlas/src/main/java:
pico

./reporting-task-atlas/src/main/java/pico:
habr

./reporting-task-atlas/src/main/java/pico/habr:
nifi

./reporting-task-atlas/src/main/java/pico/habr/nifi:
atlas

./reporting-task-atlas/src/main/java/pico/habr/nifi/atlas:
PicoNiFiFlowAnalyzer.java  reporting

./reporting-task-atlas/src/main/java/pico/habr/nifi/atlas/reporting:
PicoReportLineageToAtlas.java

./reporting-task-atlas/src/main/resources:
META-INF

./reporting-task-atlas/src/main/resources/META-INF:
services

./reporting-task-atlas/src/main/resources/META-INF/services:
org.apache.nifi.reporting.ReportingTask

./reporting-task-atlas-nar:
pom.xml

how to penetrate .nar into NIFI

To connect .nar archive in NIFIjust drop it into the folder libwhich should be located in the root folder NIFI. In the case of Docker, you can use the command docker cpAfter that, just reboot. NIFI or the entire container.

Result

go to the control panel NIFI and find a custom reporter there. Set it up according to the requirements. (set up the same way as the original)

As an example, I will create the following dag

And let's see what's displayed in Atlas

And as we see in the Processor Atlas D did not appear because it does not have the required suffix

P.S. You can look at the code here

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *