How to write your own processor, or extend functionality in NiFi
Apache NiFi has over 300 processors in its base package.
A NiFi processor is the main building block for creating dataflows in the NiFi ecosystem. Processors provide the interface through which NiFi gives access to a flowfile, its attributes, and its content. A custom processor saves computing resources, time, and user attention: instead of many simple processors, only one is displayed in the interface and only one is executed (or as many as you write). Like standard processors, a custom processor can perform various operations and process the content of a flowfile. Today we'll talk about the standard tools for extending functionality.
ExecuteScript
ExecuteScript is a universal processor designed to implement business logic in a scripting language (Groovy, Jython, JavaScript, JRuby). This approach lets you get the desired functionality quickly. The script can access NiFi components through the following variables:
session: a variable of type org.apache.nifi.processor.ProcessSession. It lets you perform operations on flowfiles, such as create(), putAttribute() and transfer(), as well as read() and write().
context: org.apache.nifi.processor.ProcessContext. It can be used to get processor properties, relationships, controller services, and the StateManager.
log: the processor's logger, of type org.apache.nifi.logging.ComponentLog.
REL_SUCCESS: the success relationship.
REL_FAILURE: the failure relationship.
Dynamic properties: dynamic properties defined on ExecuteScript are passed to the script engine as variables of type PropertyValue. This lets you get the property's value and convert it to the appropriate data type, for example boolean or integer.
To use it, select the Script Engine and either set Script File to the location of the file with the script or paste the script itself into Script Body.
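Here is one way a script might consume a dynamic property. This is a minimal sketch that assumes a dynamic property named batchSize (a hypothetical name) has been added to ExecuteScript. It relies only on PropertyValue.evaluateAttributeExpressions(FlowFile) and asInteger(). The helper name intProperty is made up. The parameters are untyped so the helper can also be exercised outside NiFi with stand-in objects.

```groovy
// Hypothetical helper for ExecuteScript: reads an integer from a dynamic
// property, evaluating Expression Language against the given FlowFile.
// `prop` is the PropertyValue variable ExecuteScript binds for that property.
Integer intProperty(prop, flowFile, Integer defaultValue) {
    // evaluateAttributeExpressions(FlowFile) resolves ${...} references
    def evaluated = prop.evaluateAttributeExpressions(flowFile)
    Integer value = evaluated.asInteger()   // null when the property is unset
    value != null ? value : defaultValue
}

// Usage in the script body, assuming a dynamic property named "batchSize":
// def flowFile = session.get()
// if (!flowFile) return
// int size = intProperty(batchSize, flowFile, 100)
```

A non-numeric value still makes asInteger() throw, which ExecuteScript reports as a script failure.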
Let's look at a few examples:
Get a single flowfile from the queue
flowFile = session.get()
if(!flowFile) return
Generate a new FlowFile
flowFile = session.create()
// Additional processing here
Add an attribute to a FlowFile
flowFile = session.get()
if(!flowFile) return
flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
Extract and process all attributes.
flowFile = session.get()
if(!flowFile) return
flowFile.getAttributes().each { key,value ->
// Do something with the key/value pair
}
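A common use of attribute iteration is copying a group of attributes under new names. The sketch below needs nothing beyond plain Groovy maps. The helper renamePrefixed and the prefixes in the usage comment are hypothetical. The result is meant to be applied with session.putAllAttributes(), which takes a FlowFile and a Map<String, String> and returns the updated FlowFile.

```groovy
// Hypothetical helper: selects the attributes whose names start with
// `fromPrefix` and returns them renamed to start with `toPrefix`.
Map<String, String> renamePrefixed(Map<String, String> attributes, String fromPrefix, String toPrefix) {
    def selected = attributes.findAll { key, value -> key.startsWith(fromPrefix) }
    selected.collectEntries { key, value -> [(toPrefix + key.substring(fromPrefix.length())): value] }
}

// Usage in ExecuteScript (the prefixes are only an example):
// flowFile = session.get()
// if (!flowFile) return
// flowFile = session.putAllAttributes(flowFile,
//         renamePrefixed(flowFile.getAttributes(), 'http.headers.', 'header.'))
// session.transfer(flowFile, REL_SUCCESS)
```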
Logger
log.info('Found these things: {} {} {}', ['Hello',1,true] as Object[])
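The snippets above never route a flowfile anywhere. However, every flowfile taken from the queue must be transferred (or removed) before the session is committed. The following minimal sketch combines get(), putAttribute() and transfer() and assumes a hypothetical required attribute customer.id. The function receives the bound ExecuteScript variables as parameters, which keeps it testable outside NiFi.

```groovy
// Hypothetical ExecuteScript body (Groovy engine). It uses only the variables
// ExecuteScript binds (session, REL_SUCCESS, REL_FAILURE, log), passed in as
// arguments. The attribute name 'customer.id' is an example, not a NiFi standard.
def handle(session, relSuccess, relFailure, log) {
    def flowFile = session.get()
    if (!flowFile) return null                   // nothing queued on this trigger
    def customerId = flowFile.getAttribute('customer.id')
    if (customerId?.trim()) {
        // putAttribute returns the updated FlowFile; keep using the returned reference
        flowFile = session.putAttribute(flowFile, 'customer.id', customerId.trim().toUpperCase())
        session.transfer(flowFile, relSuccess)
        return relSuccess
    }
    log.warn('FlowFile {} has no customer.id attribute', [flowFile] as Object[])
    // a FlowFile taken from the queue must be transferred (or removed)
    session.transfer(flowFile, relFailure)
    return relFailure
}

// In ExecuteScript, finish the script with:
// handle(session, REL_SUCCESS, REL_FAILURE, log)
```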
ExecuteScript supports more advanced features as well; see the article ExecuteScript Cookbook for details.
ExecuteGroovyScript
ExecuteGroovyScript has the same functionality as ExecuteScript, but instead of a zoo of supported languages it supports only one: Groovy. Its main advantage is more convenient access to controller services. In addition to the standard set of variables (session, context, etc.), you can define dynamic properties with the CTL and SQL prefixes. Starting with version 1.11, RecordReader and RecordWriter are supported as well. Each of these is exposed to the script as a HashMap that uses the service name as the key, and the value is a specific object that depends on the prefix:
RecordWriter HashMap
RecordReader HashMap
SQL HashMap
CTL HashMap
This already makes life easier: once we know the class, we can look into the sources or find the documentation for it.
Working with the database
If we define a dynamic property SQL.DB and bind a DBCPService to it, the script can use it directly, for example SQL.DB.rows('select * from table').
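Here is a slightly fuller sketch of SQL.DB usage. It assumes, per the ExecuteGroovyScript documentation, that the bound object is a groovy.sql.Sql. The table customers, its columns and the helper name are hypothetical. The key detail is the positional ? parameter: Sql.rows(String, List) binds values through the driver instead of concatenating them into the query text.

```groovy
import groovy.json.JsonOutput

// Hypothetical helper for ExecuteGroovyScript. `sql` is expected to be SQL.DB,
// a groovy.sql.Sql instance. The table `customers` and its columns are made up.
String customersAsJson(sql, String region) {
    // The '?' placeholder is bound as a parameter by the JDBC driver, so the
    // region value is never concatenated into the SQL text.
    List rows = sql.rows('select id, name from customers where region = ?', [region])
    JsonOutput.toJson(rows)
}

// Usage in the script body:
// def json = customersAsJson(SQL.DB, flowFile.getAttribute('region'))
```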
The processor automatically obtains a connection from the DBCP service before execution and manages the transaction: it is rolled back automatically if an error occurs and committed on success. In ExecuteGroovyScript, you can also intercept start and stop events by implementing the corresponding static methods:
import org.apache.nifi.processor.ProcessContext
...
static onStart(ProcessContext context){
// your code
}
static onStop(ProcessContext context){
// your code
}
Transferring a flowfile also has a shorthand form:
REL_SUCCESS << flowFile
InvokeScriptedProcessor
Another interesting processor. To use it, declare a class that implements the Processor interface and assign an instance of it to a variable named processor. You can define any PropertyDescriptor or Relationship, access the parent ComponentLog, and define the methods void onScheduled(ProcessContext context) and void onStopped(ProcessContext context). These methods are called when NiFi schedules the processor to start (onScheduled) and when it stops (onStopped).
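import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.*
import org.apache.nifi.processor.exception.ProcessException
class GroovyProcessor implements Processor {
    ComponentLog log
    @Override
    void initialize(ProcessorInitializationContext context) {
        log = context.getLogger()
    }
    @Override
    Set<Relationship> getRelationships() { Collections.emptySet() }
    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        // processing logic (see below)
    }
    @Override
    Collection<ValidationResult> validate(ValidationContext context) { [] }
    @Override
    PropertyDescriptor getPropertyDescriptor(String name) { null }
    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    @Override
    List<PropertyDescriptor> getPropertyDescriptors() { [] }
    @Override
    String getIdentifier() { null }
}
processor = new GroovyProcessor()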
The processing logic goes into void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory):
@Override
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
    final ProcessSession session = sessionFactory.createSession()
    try {
        def flowFile = session.create()   // create() never returns null, so no null check is needed
        // your code: the FlowFile must be transferred or removed before commit()
        session.commit()
    } catch (final Throwable t) {
        session.rollback(true)   // undo all changes and penalize the FlowFiles
        throw t
    }
}
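The try/commit/rollback pattern in this onTrigger can also be factored into a helper instead of a base class. In this minimal sketch, the class Sessions and its method withSession are hypothetical names. The helper is duck-typed, so it accepts NiFi's ProcessSessionFactory as well as test doubles. rollback(true) penalizes the flowfiles that are returned to their queues.

```groovy
// Hypothetical helper; duck-typed so it works with NiFi's ProcessSessionFactory
// and ProcessSession as well as with test doubles.
class Sessions {
    static void withSession(sessionFactory, Closure work) {
        def session = sessionFactory.createSession()
        try {
            work(session)            // the caller's processing logic
            session.commit()         // persist changes only if the work succeeded
        } catch (Throwable t) {
            session.rollback(true)   // undo changes; true = penalize the FlowFiles
            throw t                  // let NiFi see (and log) the original error
        }
    }
}

// Possible use inside the scripted processor class:
// void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) {
//     Sessions.withSession(sessionFactory) { session -> /* your code */ }
// }
```

The helper is a static method of a class rather than a top-level script method. This matters because classes declared in a script, such as the processor class, cannot call the script's own top-level methods.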
Implementing every interface method in each script is tedious, so let's move the boilerplate into an abstract class that declares the following method:
abstract void executeScript(ProcessContext context, ProcessSession session)
We will call this method from void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory):
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
abstract class BaseGroovyProcessor implements Processor {
    public ComponentLog log
    @Override
    void initialize(ProcessorInitializationContext context) {
        log = context.getLogger()
    }
    // Opens a session, delegates to executeScript() and commits; on any error
    // the session is rolled back (penalizing its FlowFiles) and the error rethrown.
    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        final ProcessSession session = sessionFactory.createSession()
        try {
            executeScript(context, session)
            session.commit()
        } catch (final Throwable t) {
            session.rollback(true)
            throw t
        }
    }
    // The only method a concrete processor has to implement
    abstract void executeScript(ProcessContext context, ProcessSession session) throws ProcessException
    // Subclasses override this to expose their relationships
    @Override
    Set<Relationship> getRelationships() { Collections.emptySet() }
    @Override
    Collection<ValidationResult> validate(ValidationContext context) { [] }
    @Override
    PropertyDescriptor getPropertyDescriptor(String name) { null }
    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    @Override
    List<PropertyDescriptor> getPropertyDescriptors() { [] }
    @Override
    String getIdentifier() { null }
}
Now declare a subclass of BaseGroovyProcessor, implement executeScript, and add the REL_SUCCESS and REL_FAILURE relationships.
import org.apache.nifi.expression.ExpressionLanguageScope
import org.apache.nifi.processor.util.StandardValidators
import ru.rt.nifi.common.BaseGroovyProcessor
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.dbcp.DBCPService
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.SQLException
import java.sql.Statement
class InvokeScripted extends BaseGroovyProcessor {
    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFiles that were processed successfully are routed to this relationship")
            .build()
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("FlowFiles that could not be processed are routed to this relationship")
            .build()
    // Expose both relationships so they appear in the processor configuration
    @Override
    Set<Relationship> getRelationships() { [REL_SUCCESS, REL_FAILURE] as Set }
    @Override
    void executeScript(ProcessContext context, ProcessSession session) throws ProcessException {
        def flowFile = session.create()   // create() never returns null
        try {
            // your code
            session.transfer(flowFile, REL_SUCCESS)
        } catch (ProcessException | SQLException e) {
            log.error("Unable to execute SQL select query due to {}; routing {} to failure", [e, flowFile] as Object[])
            session.transfer(flowFile, REL_FAILURE)
        }
    }
}
Finally, add this line at the end of the script so that InvokeScriptedProcessor can pick up the instance:
processor = new InvokeScripted()
This approach is very close to writing a full-fledged custom processor.
Conclusion
Creating a custom processor is not the easiest thing: the first time, you will have to work hard to figure it out, but the benefits are undeniable.
Post prepared by Rostelecom Data Management Team