Property of type Controller Service in a custom NiFi processor
Preamble
In one of my custom Apache NiFi processors I needed to work with certificate files. Since the project already had certificate services configured (such as StandardRestrictedSSLContextService), I decided to give the processor a property of type SSLContextService, so that a controller service already configured in NiFi could be selected and the certificate data taken from there. I studied the documentation, and nothing looked complicated. I added the property to the processor and dropped the processor onto the flow. But… the processor did not see my StandardRestrictedSSLContextService. I read and re-read many articles, including ones by the respected Pierre Villard, to no avail, until I came across an implementation of a similar case on GitHub.
Perhaps this is a trivial problem, but it caused me considerable difficulty, so I decided to write about it. Below I give code examples and also show how to plug your own stub into a processor integration test (which turned out to be not entirely obvious either).
Adding a property to the processor
We add a property to the processor through which a standard service providing access to certificates can be selected.
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
        .name("SSL Context Service")
        .description("SSL Context Service provides trusted certificates and client certificates for TLS communication.")
        .required(false)
        .identifiesControllerService(SSLContextService.class)
        .build();
In the processor's onTrigger method, the value of the property is obtained like this:
SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
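Because the property is declared with required(false), asControllerService returns null when no service is selected, so the processor should check for that before reading any certificate data. The sketch below illustrates the idea with plain JDK classes only. CertificateSource is a hypothetical stand-in for the three key store getters of SSLContextService (it is not a NiFi type), and KeyStoreLoader is an invented helper name.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.util.Optional;

/** Hypothetical stand-in for the key store getters of SSLContextService. */
interface CertificateSource {
    String getKeyStoreFile();
    String getKeyStoreType();
    String getKeyStorePassword();
}

/** Hypothetical helper: loads the key store described by an optional service. */
final class KeyStoreLoader {

    private KeyStoreLoader() {
    }

    /** Returns an empty Optional when no service is configured (source == null). */
    static Optional<KeyStore> load(CertificateSource source) {
        if (source == null) {
            return Optional.empty();
        }
        try (InputStream in = Files.newInputStream(Path.of(source.getKeyStoreFile()))) {
            KeyStore keyStore = KeyStore.getInstance(source.getKeyStoreType());
            keyStore.load(in, source.getKeyStorePassword().toCharArray());
            return Optional.of(keyStore);
        } catch (IOException | GeneralSecurityException e) {
            // Unchecked wrapper chosen for this sketch; a real processor would more
            // likely log the error and route the flow file to a failure relationship.
            throw new IllegalStateException("Cannot load key store", e);
        }
    }
}
```

In a real processor the same null check would be applied directly to the SSLContextService returned by asControllerService.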
Now for perhaps the most important part of this short article, the reason it was written in the first place: adding the dependencies properly.
1) In the pom.xml of the main processor module, add two dependencies:
<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-ssl-context-service-api</artifactId>
    <version>${nifi.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-standard-services-api-nar</artifactId>
    <version>${nifi.version}</version>
    <type>nar</type>
</dependency>
2) In the pom.xml of the nar module, add one dependency:
<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-standard-services-api-nar</artifactId>
    <version>${nifi.version}</version>
    <type>nar</type>
</dependency>
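nifi-ssl-context-service-api: this dependency lets the main processor module use the SSLContextService interface in its code.
nifi-standard-services-api-nar: this dependency, declared in both modules, lets the processor use the standard NiFi services. In the nar module it makes the standard services API NAR the parent of the processor's NAR, so the processor and the service implementations share the same SSLContextService class. Without it the processor does not see the services already configured in NiFi.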
Actually, that’s the whole point.
If you need a property whose type is a controller service other than SSLContextService and want to find out which dependency to use, looking through the NiFi sources can help.
Testing a processor that uses a service as one of its properties
A small complication is that, to test a processor that has a Controller Service as one of its properties, we need to create a stub for this service and then set it as the value of that property.
Let’s prepare auxiliary test data:
DummySSLContextService.class – stub for the service
import javax.net.ssl.SSLContext;
import javax.net.ssl.X509TrustManager;
import lombok.Builder;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.ssl.SSLContextService;

@Builder
public class DummySSLContextService extends AbstractControllerService implements SSLContextService {

    private final String keyStoreFile;
    private final String keyStorePassword;
    private final String keyStoreType;
    private final String trustStoreFile;
    private final String trustStorePassword;
    private final String trustStoreType;

    @Override
    public TlsConfiguration createTlsConfiguration() {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public SSLContext createContext() {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public SSLContext createSSLContext(org.apache.nifi.security.util.ClientAuth clientAuth) throws ProcessException {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public SSLContext createSSLContext(ClientAuth clientAuth) throws ProcessException {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public X509TrustManager createTrustManager() {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public String getTrustStoreFile() {
        return trustStoreFile;
    }

    @Override
    public String getTrustStoreType() {
        return trustStoreType;
    }

    @Override
    public String getTrustStorePassword() {
        return trustStorePassword;
    }

    @Override
    public boolean isTrustStoreConfigured() {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public String getKeyStoreFile() {
        return keyStoreFile;
    }

    @Override
    public String getKeyStoreType() {
        return keyStoreType;
    }

    @Override
    public String getKeyStorePassword() {
        return keyStorePassword;
    }

    @Override
    public String getKeyPassword() {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public boolean isKeyStoreConfigured() {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public String getSslAlgorithm() {
        throw new RuntimeException("Method not implemented");
    }

    @Override
    public void onPropertyModified(PropertyDescriptor propertyDescriptor, String s, String s1) {
        // do nothing
    }
}
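The stub relies on Lombok's @Builder to produce the builder() call used in the test data. For readers who do not use Lombok, a hand-written builder does the same job. The sketch below shows roughly the shape of such a builder, reduced to the three key store fields. StubSettings is a hypothetical class name, not part of the article's code, and the class deliberately does not implement SSLContextService so that it compiles without NiFi on the classpath.

```java
/** Hypothetical value holder with a hand-written builder in the style of Lombok's @Builder. */
final class StubSettings {

    private final String keyStoreFile;
    private final String keyStorePassword;
    private final String keyStoreType;

    private StubSettings(Builder builder) {
        this.keyStoreFile = builder.keyStoreFile;
        this.keyStorePassword = builder.keyStorePassword;
        this.keyStoreType = builder.keyStoreType;
    }

    /** Entry point, named like the static method Lombok generates. */
    static Builder builder() {
        return new Builder();
    }

    String getKeyStoreFile() {
        return keyStoreFile;
    }

    String getKeyStorePassword() {
        return keyStorePassword;
    }

    String getKeyStoreType() {
        return keyStoreType;
    }

    /** Fluent builder: one setter per field, each returning the builder itself. */
    static final class Builder {

        private String keyStoreFile;
        private String keyStorePassword;
        private String keyStoreType;

        Builder keyStoreFile(String value) {
            this.keyStoreFile = value;
            return this;
        }

        Builder keyStorePassword(String value) {
            this.keyStorePassword = value;
            return this;
        }

        Builder keyStoreType(String value) {
            this.keyStoreType = value;
            return this;
        }

        StubSettings build() {
            return new StubSettings(this);
        }
    }
}
```

Fields that are never set on the builder stay null, which matches how the stub behaves when a value is left out.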
TestData.class – constants and stub service instance for tests
import java.nio.file.Path;
import org.apache.nifi.ssl.SSLContextService;

public class TestData {

    static final String CLIENT_CERT_FILE_NAME = Path.of("src/test/resources/client-identity.jks").toAbsolutePath().toString();
    static final String CLIENT_TRUST_FILE_NAME = Path.of("src/test/resources/truststore.jks").toAbsolutePath().toString();
    public static final String KEYSTORE_COMMON_PASSWORD = "changeit";
    public static final String KEYSTORE_COMMON_TYPE = "JKS";

    public static final SSLContextService TEST_SSL_CONTEXT_SERVICE = DummySSLContextService.builder()
            .keyStoreFile(CLIENT_CERT_FILE_NAME)
            .keyStorePassword(KEYSTORE_COMMON_PASSWORD)
            .keyStoreType(KEYSTORE_COMMON_TYPE)
            .trustStoreFile(CLIENT_TRUST_FILE_NAME)
            .trustStorePassword(KEYSTORE_COMMON_PASSWORD)
            .trustStoreType(KEYSTORE_COMMON_TYPE)
            .build();
}
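The test data above expects the two .jks files to exist under src/test/resources. If committing binary files is inconvenient, a store can also be created from the test itself with the JDK KeyStore API. The following is a minimal sketch, assuming an empty store is enough for the test (it contains no keys or certificates, so it would not suit a test that performs a real TLS handshake). TestKeyStores and createEmpty are hypothetical names.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;

/** Hypothetical test helper: writes an empty, password-protected key store file. */
final class TestKeyStores {

    private TestKeyStores() {
    }

    /** Creates the store in the given directory and returns its absolute path. */
    static String createEmpty(Path directory, String fileName, String type, String password) {
        Path file = directory.resolve(fileName);
        char[] secret = password.toCharArray();
        try (OutputStream out = Files.newOutputStream(file)) {
            KeyStore keyStore = KeyStore.getInstance(type);
            keyStore.load(null, secret); // a null stream initialises an empty store
            keyStore.store(out, secret);
        } catch (IOException | GeneralSecurityException e) {
            throw new IllegalStateException("Cannot create test key store " + file, e);
        }
        return file.toAbsolutePath().toString();
    }
}
```

The returned path could then be passed to the stub builder in place of the constants that point at src/test/resources.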
And the actual test (here SSL_CONTEXT_SERVICE is the processor's property descriptor):
@Test
void should_get_credentials() throws InitializationException {
    // Given
    final TestRunner testRunner = TestRunners.newTestRunner(NsmSecmanProcessor.class);
    // set a value for one of the properties of the processor under test
    testRunner.setProperty(SOME_PROPERTY_NAME, SOME_PROPERTY_VALUE);
    // set the property that refers to the Controller Service;
    // this takes three steps
    testRunner.addControllerService("TestSSLContextService", TEST_SSL_CONTEXT_SERVICE);
    testRunner.setProperty(SSL_CONTEXT_SERVICE, "TestSSLContextService");
    testRunner.enableControllerService(TEST_SSL_CONTEXT_SERVICE);
    // create a set of attributes for the test flow file
    Map<String, String> attributes = new HashMap<>();
    attributes.put(SOME_ATTR_NAME, SOME_ATTR_VALUE);
    // add content and attributes to the test flow file;
    // a stream can be passed here instead of a string
    testRunner.enqueue("Flowfile content", attributes);

    // When
    testRunner.run();

    // Then
    List<MockFlowFile> originalFlowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS);
    List<MockFlowFile> failureFlowFiles = testRunner.getFlowFilesForRelationship(REL_FAILURE);
    assertThat(originalFlowFiles.size()).isEqualTo(1);
    assertThat(failureFlowFiles.size()).isZero();
    Map<String, String> actualAttributes = originalFlowFiles.get(0).getAttributes();
    assertThat(actualAttributes)
            .isNotNull()
            .containsEntry(SOME_ATTR_NAME, SOME_ATTR_VALUE);
}
Conclusion
That's all. I hope this short article is helpful. Comments and pointers to any inaccuracies are very welcome :)
Thank you for reading to the end.