Property of type Controller Service in a custom NiFi processor

Preamble

In one of my custom Apache NiFi processors I needed to work with certificate files. Since the project already had certificate services configured (such as StandardRestrictedSSLContextService), I decided to give the processor a property of type SSLContextService, so that controller services already configured in NiFi could be plugged in and the certificate data taken from there. I read the documentation and found nothing complicated. I added the property to the processor and dropped the processor onto the flow. But the processor did not see my StandardRestrictedSSLContextService. I read and re-read many articles, including those by the respected Pierre Villard, to no avail, until I came across an implementation of a similar case on GitHub.

This may be a trivial problem, but it gave me a fair amount of trouble, so I decided to write about it. Below I give code examples and also show how to plug your own stub into a processor integration test (which also turned out to be not entirely obvious).

Adding a property to the processor

We add a property to the processor through which a standard service that provides access to certificates can be specified.

public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
    .name("SSL Context Service")
    .description("SSL Context Service provides trusted certificates and client certificates for TLS communication.")
    .required(false)
    .identifiesControllerService(SSLContextService.class)
    .build();

In the processor's onTrigger method, the value of the property is obtained like this (the result is null when this optional property is not set):

SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
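
Once the service is in hand, its getters (getKeyStoreFile(), getKeyStoreType(), getKeyStorePassword()) give the processor what it needs to open the store with plain JDK classes. Below is a minimal sketch, assuming the processor wants a java.security.KeyStore. The class name KeyStoreFromService and the helper writeEmptyStore are hypothetical and exist only so that the example runs without NiFi on the classpath.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;

public class KeyStoreFromService {

    /**
     * Loads a key store from the three values the service exposes:
     * getKeyStoreFile(), getKeyStoreType() and getKeyStorePassword().
     */
    static KeyStore load(String file, String type, String password) {
        try (InputStream in = Files.newInputStream(Path.of(file))) {
            KeyStore store = KeyStore.getInstance(type);
            store.load(in, password.toCharArray());
            return store;
        } catch (IOException | GeneralSecurityException e) {
            throw new IllegalStateException("Cannot load key store " + file, e);
        }
    }

    /** Demo scaffolding only (hypothetical helper): writes an empty store to a temp file. */
    static String writeEmptyStore(String type, String password) {
        try {
            KeyStore store = KeyStore.getInstance(type);
            store.load(null, null); // initialise an empty in-memory store
            Path file = Files.createTempFile("demo-keystore", ".jks");
            try (OutputStream out = Files.newOutputStream(file)) {
                store.store(out, password.toCharArray());
            }
            return file.toString();
        } catch (IOException | GeneralSecurityException e) {
            throw new IllegalStateException("Cannot write demo key store", e);
        }
    }

    public static void main(String[] args) {
        String file = writeEmptyStore("JKS", "changeit");
        KeyStore store = load(file, "JKS", "changeit");
        System.out.println("Loaded store of type " + store.getType());
    }
}
```

In a processor, the three arguments would come from sslService.getKeyStoreFile(), sslService.getKeyStoreType() and sslService.getKeyStorePassword(), after checking that sslService is not null.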

Now for what is perhaps the most important part of this short article, the reason it was written: adding the dependencies correctly.

1) In the pom.xml of the main processor module, add two dependencies:

<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-ssl-context-service-api</artifactId>
    <version>${nifi.version}</version>
</dependency>
 
<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-standard-services-api-nar</artifactId>
    <version>${nifi.version}</version>
    <type>nar</type>
</dependency>

2) In the pom.xml of the nar module, add one dependency:

<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-standard-services-api-nar</artifactId>
    <version>${nifi.version}</version>
    <type>nar</type>
</dependency>

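nifi-ssl-context-service-api: this dependency in the processor module lets us use the SSLContextService interface in the code of the main processor module.

nifi-standard-services-api-nar: this dependency in both modules allows the processor to use the standard NiFi services. As far as I understand, it makes the processor NAR share the service API classes with the standard services, which is why the processor starts to see controller services such as StandardRestrictedSSLContextService.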

Actually, that’s the whole point.

If you need to add a property whose type is different from SSLContextService, studying the NiFi sources can help you work out which dependency to use.

Testing a processor that uses a service as one of its properties

A small complication is that, to test a processor that has a Controller Service as one of its properties, we need to create a stub for that service and then set it as the value of the property.

Let’s prepare auxiliary test data:

DummySSLContextService.class: a stub for the service

import javax.net.ssl.SSLContext;
import javax.net.ssl.X509TrustManager;
import lombok.Builder;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.ssl.SSLContextService;

@Builder
public class DummySSLContextService extends AbstractControllerService implements SSLContextService {
    private final String keyStoreFile;
    private final String keyStorePassword;
    private final String keyStoreType;
    private final String trustStoreFile;
    private final String trustStorePassword;
    private final String trustStoreType;

    @Override
    public TlsConfiguration createTlsConfiguration() {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public SSLContext createContext() {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public SSLContext createSSLContext(org.apache.nifi.security.util.ClientAuth clientAuth) throws ProcessException {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public SSLContext createSSLContext(ClientAuth clientAuth) throws ProcessException {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public X509TrustManager createTrustManager() {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public String getTrustStoreFile() {
        return trustStoreFile;
    }

    @Override
    public String getTrustStoreType() {
        return trustStoreType;
    }

    @Override
    public String getTrustStorePassword() {
        return trustStorePassword;
    }

    @Override
    public boolean isTrustStoreConfigured() {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public String getKeyStoreFile() {
        return keyStoreFile;
    }

    @Override
    public String getKeyStoreType() {
        return keyStoreType;
    }

    @Override
    public String getKeyStorePassword() {
        return keyStorePassword;
    }

    @Override
    public String getKeyPassword() {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public boolean isKeyStoreConfigured() {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public String getSslAlgorithm() {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public void onPropertyModified(PropertyDescriptor propertyDescriptor, String s, String s1) {
        // do nothing
    }
}

TestData.class: constants and a stub service instance for tests

import java.nio.file.Path;

import org.apache.nifi.ssl.SSLContextService;

public class TestData {
    static final String CLIENT_CERT_FILE_NAME = Path.of("src/test/resources/client-identity.jks").toAbsolutePath().toString();
    static final String CLIENT_TRUST_FILE_NAME = Path.of("src/test/resources/truststore.jks").toAbsolutePath().toString();
    public static final String KEYSTORE_COMMON_PASSWORD = "changeit";
    public static final String KEYSTORE_COMMON_TYPE = "JKS";

    public static final SSLContextService TEST_SSL_CONTEXT_SERVICE = DummySSLContextService.builder()
            .keyStoreFile(CLIENT_CERT_FILE_NAME)
            .keyStorePassword(KEYSTORE_COMMON_PASSWORD)
            .keyStoreType(KEYSTORE_COMMON_TYPE)
            .trustStoreFile(CLIENT_TRUST_FILE_NAME)
            .trustStorePassword(KEYSTORE_COMMON_PASSWORD)
            .trustStoreType(KEYSTORE_COMMON_TYPE)
            .build();
}

And the actual test (here SSL_CONTEXT_SERVICE is a processor property descriptor):

    @Test
    void should_get_credentials() throws InitializationException {
        final TestRunner testRunner = TestRunners.newTestRunner(NsmSecmanProcessor.class);

        // set a value for one of the properties of the processor under test
        testRunner.setProperty(SOME_PROPERTY_NAME, SOME_PROPERTY_VALUE);

        // set the property that refers to the Controller Service;
        // this takes three steps
        testRunner.addControllerService("TestSSLContextService", TEST_SSL_CONTEXT_SERVICE);
        testRunner.setProperty(SSL_CONTEXT_SERVICE, "TestSSLContextService");
        testRunner.enableControllerService(TEST_SSL_CONTEXT_SERVICE);

        // create a set of attributes for the test flow file
        Map<String, String> attributes = new HashMap<>();
        attributes.put(SOME_ATTR_NAME, SOME_ATTR_VALUE);

        // add the content and attributes to the test flow file;
        // a stream can be passed here instead of a string
        testRunner.enqueue("Flowfile content", attributes);

        // When
        testRunner.run();

        // Then
        List<MockFlowFile> originalFlowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS);
        List<MockFlowFile> failureFlowFiles = testRunner.getFlowFilesForRelationship(REL_FAILURE);

        assertThat(originalFlowFiles.size()).isEqualTo(1);
        assertThat(failureFlowFiles.size()).isZero();

        Map<String, String> actualAttributes = originalFlowFiles.get(0).getAttributes();
        assertThat(actualAttributes)
                .isNotNull()
                .containsEntry(SOME_ATTR_NAME, SOME_ATTR_VALUE);
    }
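
The stub's createContext() simply throws, which is fine as long as the processor only reads paths and passwords. If the processor under test needs a real SSLContext, one option is to assemble it from the loaded stores with plain JDK classes. This is a rough sketch under that assumption, not the way StandardRestrictedSSLContextService does it. The names SslContextSketch, build and emptyStore are hypothetical, and the empty stores only stand in for real key and trust stores.

```java
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

public class SslContextSketch {

    /** Builds a TLS context from already loaded key and trust stores. */
    static SSLContext build(KeyStore keyStore, char[] keyPassword, KeyStore trustStore) {
        try {
            // key managers supply the client certificate and its private key
            KeyManagerFactory kmf =
                    KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            kmf.init(keyStore, keyPassword);

            // trust managers decide which server certificates are accepted
            TrustManagerFactory tmf =
                    TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            tmf.init(trustStore);

            SSLContext context = SSLContext.getInstance("TLS");
            context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
            return context;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Cannot build SSLContext", e);
        }
    }

    /** Demo scaffolding only (hypothetical helper): an empty in-memory store. */
    static KeyStore emptyStore() {
        try {
            KeyStore store = KeyStore.getInstance(KeyStore.getDefaultType());
            store.load(null, null);
            return store;
        } catch (IOException | GeneralSecurityException e) {
            throw new IllegalStateException("Cannot create empty key store", e);
        }
    }

    public static void main(String[] args) {
        SSLContext context = build(emptyStore(), "changeit".toCharArray(), emptyStore());
        System.out.println(context.getProtocol());
    }
}
```

A context built from empty stores is only useful for checking that the wiring works; a real handshake needs stores that actually contain the client identity and the trusted certificates.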

Conclusion

That’s all. I hope this short article is helpful. Comments and pointers to any inaccuracies are very welcome.

Thank you for reading to the end.
