Unit and E2E operator testing with timers in Apache Flink

All the source code discussed here is in the repository AlexanderBobryakov/flink-spring. The master branch contains the final project for the entire series. This article corresponds to the branch release/8_Test_for_Trigger_Flink_Job.

This is my ninth article about Apache Flink in this series.


Operator with timers

In the previous part, we created a TriggerAlertProcessor operator that is built into a Kafka-to-Kafka event-processing pipeline. It stores information in keyed state so that, once a 10-minute timer fires, it can emit events downstream based on that state. The code looks like this:

@Slf4j
public class TriggerAlertProcessor extends KeyedProcessFunction<String, TriggerMessage, AlertMessage> {
   private static final long serialVersionUID = 1L;

   private final Duration stateWaiting;
   private ValueState<AlertState> alertState;
   private ValueState<Long> timestampState;
   private transient MessageTransformer<TriggerMessage, AlertState> messageToStateTransformer;
   private transient MessageTransformer<AlertState, AlertMessage> stateToMessageTransformer;

   public TriggerAlertProcessor(@NotNull Duration stateWaiting) {
       this.stateWaiting = stateWaiting;
   }

   @Override
   public void open(Configuration parameters) {
       messageToStateTransformer = new TriggerMessageToAlertStateTransformer();
       stateToMessageTransformer = new AlertStateToAlertMessageTransformer();
        final var defaultTtlConfig = StateTtlConfig
                .newBuilder(Time.milliseconds(stateWaiting.toMillis() + 1000))
               .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
               .setStateVisibility(NeverReturnExpired)
               .cleanupFullSnapshot()
               .build();

       // alert state
       final var alertDescriptor = new ValueStateDescriptor<>("alertState", AlertState.class);
       alertDescriptor.enableTimeToLive(defaultTtlConfig);
       alertState = getRuntimeContext().getState(alertDescriptor);

       // timestamp state
       final var timestampDescriptor = new ValueStateDescriptor<>("timestampState", Types.LONG);
       timestampDescriptor.enableTimeToLive(defaultTtlConfig);
       timestampState = getRuntimeContext().getState(timestampDescriptor);
   }

   @Override
   public void processElement(TriggerMessage message,
                              KeyedProcessFunction<String, TriggerMessage, AlertMessage>.Context ctx,
                              Collector<AlertMessage> out) throws Exception {
       final var status = message.getStatus();
       if (START.equals(status)) {
           // create timer
           final var state = messageToStateTransformer.transform(message);
           alertState.update(state);
           final var invokeTimerMillis = ctx.timerService().currentProcessingTime() + stateWaiting.toMillis();
           final var previousTimestamp = timestampState.value();
           if (previousTimestamp != null) {
               ctx.timerService().deleteProcessingTimeTimer(previousTimestamp);
           }
           ctx.timerService().registerProcessingTimeTimer(invokeTimerMillis);
           timestampState.update(invokeTimerMillis);
       } else if (STOP.equals(status)) {
           // remove timer
           final var invokeTimerMillis = timestampState.value();
           if (invokeTimerMillis != null) {
               ctx.timerService().deleteProcessingTimeTimer(invokeTimerMillis);
               timestampState.clear();
           }
           alertState.clear();
       } else {
           log.debug("Unknown trigger status {} for key {}", status, ctx.getCurrentKey());
       }
   }

   @Override
   public void onTimer(long timestamp,
                       KeyedProcessFunction<String, TriggerMessage, AlertMessage>.OnTimerContext ctx,
                       Collector<AlertMessage> out) throws Exception {
       final var alertStateValue = alertState.value();
       if (alertStateValue != null) {
           final var alertMessage = stateToMessageTransformer.transform(alertStateValue);
           out.collect(alertMessage);
       }
       timestampState.clear();
       alertState.clear();
   }

   public static class TriggerAlertProcessorKeySelector implements KeySelector<TriggerMessage, String> {
       private static final long serialVersionUID = 1L;

       @Override
       public String getKey(TriggerMessage message) {
           return message.getUserId() + message.getTriggerName();
       }
   }
}

I explained it in detail in the last article. Let me remind you of the algorithm we are going to test:

  1. We receive the TriggerMessage input event.

  2. Based on the key (trigger_name + user_id), we perform the following actions:

    a) if the TriggerMessage status is START, we create a 10-minute timer; when it fires, we generate an AlertMessage based on the stored START event;

    b) if the TriggerMessage status is STOP, we delete the timer created in the step above. If no START has arrived yet (for that key), that is fine.

You can test this operator either with the TestHarness classes from the Flink developers or by spinning up a Flink MiniCluster. Let's write identical tests for both options.

Testing an operator with timers using TestHarness classes

We already met the TestHarness classes in the article How to unit test Flink operators: TestHarness. They let you test operators without any additional Flink dependencies or environment: you just configure the appropriate TestHarness class and emulate the event-processing flow.

Let's start by initializing the TestHarness class. We choose KeyedOneInputStreamOperatorTestHarness because our operator processes a KeyedStream: we applied the .keyBy method to the stream earlier, and the operator holds internal keyed state. Initialization looks like this:

class TriggerAlertProcessorUnitTest_byTestHarness {
   private final Duration stateWaiting = Duration.ofMillis(10);
   private KeyedOneInputStreamOperatorTestHarness<String, TriggerMessage, AlertMessage> testHarness;

   @BeforeEach
   @SneakyThrows
   void init() {
       final var processor = new TriggerAlertProcessor(stateWaiting);
       testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
               new KeyedProcessOperator<>(processor),
               new TriggerAlertProcessor.TriggerAlertProcessorKeySelector(),
               Types.STRING
       );
       testHarness.open();
   }

   @AfterEach
   @SneakyThrows
   void clear() {
       testHarness.close();
   }
// ...
}

At the very beginning we set stateWaiting – the delay before the timer fires; it can be any value. In the BeforeEach block we create an instance of the operator under test, TriggerAlertProcessor, and initialize KeyedOneInputStreamOperatorTestHarness. We pass to the constructor:

  • our operator, wrapped in a special Flink class KeyedProcessOperator;

  • an object on the basis of which the event key is selected in our KeyedStream stream;

  • the type of this key.

Additionally, you can pass other settings, such as parallelism (see the sketch below). Be sure to call the open method to initialize internal state and prepare the harness for use in tests. Inside AfterEach we close all resources.
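For example, a rough sketch of the extended initialization, assuming the KeyedOneInputStreamOperatorTestHarness constructor overload that additionally accepts parallelism parameters:

// A sketch only: besides the operator, key selector and key type, this
// overload takes maxParallelism, the number of parallel subtasks and the
// index of the emulated subtask.
testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
        new KeyedProcessOperator<>(processor),
        new TriggerAlertProcessor.TriggerAlertProcessorKeySelector(),
        Types.STRING,
        128, // maxParallelism
        1,   // number of parallel subtasks
        0    // subtask index
);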

Now let's move on to the tests themselves. First, we should check that the timers fire at all and that the onTimer method emits an event into the output stream. To do this, we send three START events with different userIds and repeating triggerNames to the operator's input:

private static Stream<Arguments> provideStartEvents() {
   return Stream.of(
           arguments(List.of(
                   new StreamRecord<>(aTriggerMessage().withUserId(UUID.randomUUID()).withStatus(START).withTriggerName("trigger_1").build()),
                   new StreamRecord<>(aTriggerMessage().withUserId(UUID.randomUUID()).withStatus(START).withTriggerName("trigger_1").build()),
                   new StreamRecord<>(aTriggerMessage().withUserId(UUID.randomUUID()).withStatus(START).withTriggerName("trigger_2").build())
           ))
   );
}

This example uses the static test method aTriggerMessage(), which builds a valid TriggerMessage with all fields populated. We expect a timer to be created and to fire successfully for each message, producing an event in the output stream. Let's check this with a test:

@SneakyThrows
@ParameterizedTest
@MethodSource("provideStartEvents")
void shouldProcessStartEventsWhenTimerInvoked(List<StreamRecord<TriggerMessage>> events) {
   testHarness.processElements(events);
   testHarness.setProcessingTime(stateWaiting.toMillis() + 1);

   final var outputEvents = testHarness.extractOutputStreamRecords();
   assertEquals(events.size(), outputEvents.size(), "Unexpected size of output result list");
}

The first step is to send all three events to testHarness for processing. The harness's internal processing time does not advance on its own: we have to call its dedicated time-control methods. In our case, setProcessingTime moves time forward by more than the timer duration. The operator should then react to the expired timers and call onTimer for each of them. The test verifies that testHarness.extractOutputStreamRecords() produced the same number of events as were received on input.

By the same logic, there should be no output at all if time never advances: without a setProcessingTime call the timeout never elapses and the operator's onTimer method is never called. Let's check:

@SneakyThrows
@ParameterizedTest
@MethodSource("provideStartEvents")
void shouldNotProcessStartEventsWhenTimerNotInvoked(List<StreamRecord<TriggerMessage>> events) {
   testHarness.processElements(events);

   final var outputEvents = testHarness.extractOutputStreamRecords();
   assertEquals(0, outputEvents.size(), "Unexpected size of output result list");
}

Indeed, this test passes.
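As an extra check, the keyed harness exposes the number of keyed state entries, so we can additionally assert that the operator wrote its state but has not cleared it yet. A small hedged addition:

// The timer has not fired, so the alert/timestamp entries must still exist.
assertTrue(testHarness.numKeyedStateEntries() > 0,
        "Keyed state should still hold alert and timestamp entries");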

Finally, let's reproduce the situation where a START event arrives first, followed by a STOP event with the same key (trigger_name + user_id). We generate the input events as follows:

private static Stream<Arguments> provideStartStopEvents() {
   final var userId = UUID.randomUUID();
   return Stream.of(
           arguments(List.of(
                   new StreamRecord<>(aTriggerMessage().withUserId(userId).withStatus(START).withTriggerName("trigger_3").build()),
                   new StreamRecord<>(aTriggerMessage().withUserId(userId).withStatus(STOP).withTriggerName("trigger_3").build())
           ))
   );
}

The test stays almost identical to the first one; we only change the expectation to no output events even after the timer interval has passed:

@SneakyThrows
@ParameterizedTest
@MethodSource("provideStartStopEvents")
void shouldNotProcessEventsWhenStartWithStop(List<StreamRecord<TriggerMessage>> events) {
   testHarness.processElements(events);
   testHarness.setProcessingTime(stateWaiting.toMillis() + 1);

   final var outputEvents = testHarness.extractOutputStreamRecords();
   assertEquals(0, outputEvents.size(), "Unexpected size of output result list");
}

Many more tests could be written, but let's stop here: this is enough to demonstrate testing timers with TestHarness. Now let's see what the very same tests look like with the Flink MiniCluster.

Testing an operator with timers using Flink MiniCluster

In the article Unit testing of Flink operators, Job: Flink MiniCluster, I showed how to create a convenient test annotation that automatically starts a Flink MiniCluster:

@Retention(RUNTIME)
@Inherited
@ExtendWith({FlinkClusterExtension.class})
public @interface WithFlinkCluster {
}

This annotation uses the JUnit Extension mechanism and suits our tests perfectly. Their structure should look like this:

  1. Create a Source from the input elements.

  2. Create a test Sink in which we will check for output events.

  3. Build a minimal pipeline with the operator under test.

  4. Launch the job and wait out the timer duration.

  5. Check the Sink for output events.

At first glance, the test for triggering timers will look like this:

@WithFlinkCluster
class TriggerAlertProcessorUnitTest_byFlinkCluster {
   @SneakyThrows
   @ParameterizedTest
   @MethodSource("provideStartEvents")
   void shouldProcessStartEventsWhenTimerInvoked_v1(List<TriggerMessage> events) {
       final var env = StreamExecutionEnvironment.getExecutionEnvironment();
       final var sink = new TestListSink<AlertMessage>();

       env.fromCollection(events)
           .keyBy(new TriggerAlertProcessorKeySelector())
           .process(new TriggerAlertProcessor(ofMillis(15)))
           .sinkTo(sink);
       env.execute();
       await().atMost(ofSeconds(2))
           .until(() -> sink.getHandledEvents().size() == events.size());
   }

   // ...
}

At the beginning of the test we get env and create a test TestListSink. I covered it in detail in the previous article on unit testing Flink operators, and its structure matters here: the whole point of this Sink is to capture all output events regardless of the sink operator's parallelism. Next we build the pipeline, feeding the input events via .fromCollection, applying our operator with a 15 ms timer, and finally checking in the await block that all events reached the output.
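As a reminder, here is a rough sketch of what TestListSink looks like (the real class lives in the repository; this version assumes the new Sink API and the TestListWrapper registry shown later in this article):

@SuppressWarnings("unchecked")
public class TestListSink<T> implements Sink<T> {
    private static final long serialVersionUID = 1L;
    private final int listId = TestListWrapper.getInstance().createList();

    @Override
    public SinkWriter<T> createWriter(InitContext context) {
        // Every parallel writer appends to the same registered list, so
        // getHandledEvents() sees all output regardless of sink parallelism.
        return new SinkWriter<>() {
            @Override
            public void write(T element, Context ctx) {
                ((List<T>) TestListWrapper.getInstance().getList(listId)).add(element);
            }

            @Override
            public void flush(boolean endOfInput) {
                // elements are stored immediately, nothing to flush
            }

            @Override
            public void close() {
                // no resources to release
            }
        };
    }

    public List<T> getHandledEvents() {
        final var list = (List<T>) TestListWrapper.getInstance().getList(listId);
        synchronized (list) {
            return List.copyOf(list);
        }
    }
}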

Back to the test: there are many errors in it. Let's find them. By analogy with the TestHarness version, we launch the job and expect one output event for each input message. In fact, on my machine the test fails.

The assert does not complete within two seconds: we do not receive as many output events as there were inputs. Something's wrong. Let's set breakpoints inside our operator where the timer is created and where onTimer() is called. First, we increase the Awaitility wait from 2 s to 60 s.

The debugger stops three times in the onTimer method, and events are written to the output stream without errors. And the test, surprisingly, passes.

Let's look at the problems in more detail.

Sources BOUNDED and CONTINUOUS_UNBOUNDED

The first problem is that during a normal run, once every operator has processed all input events, the job completes immediately, without taking pending timers into account.

This is due to the two kinds of sources: BOUNDED, a source with a finite number of events, and CONTINUOUS_UNBOUNDED, a source with an infinite stream of records. Creating a stream via the fromCollection method produces a BOUNDED stream, so the timers simply don't get a chance to fire. We therefore need to keep the stream alive before it terminates. There are several ways to do this, but I suggest creating our own universal UNBOUNDED test source:

@SuppressWarnings("PMD.TestClassWithoutTestCases")
public final class TestUnboundedSource<T> implements SourceFunction<T> {
   private static final long serialVersionUID = 1L;
   private final int elementsListId;

   private volatile boolean running = true;

   @SuppressWarnings("unchecked")
   public static <E> TestUnboundedSource<E> fromElements(List<E> elements) {
       final var instanceListId = TestListWrapper.getInstance().createList();
       final var list = (List<E>) TestListWrapper.getInstance().getList(instanceListId);
       list.addAll(elements);
       return new TestUnboundedSource<>(instanceListId);
   }

   private TestUnboundedSource(int elementsListId) {
       this.elementsListId = elementsListId;
   }

   @Override
   @SuppressWarnings("unchecked")
   public void run(SourceContext<T> ctx) throws Exception {
       final var elements = (List<T>) TestListWrapper.getInstance().getList(elementsListId);
       for (T element : elements) {
           ctx.collect(element);
       }
       while (running) {
           // CONTINUOUS_UNBOUNDED
           Thread.sleep(500L);
       }
   }

   @Override
   public void cancel() {
       running = false;
   }
}

Such a source is created with the static TestUnboundedSource.fromElements method by passing in a collection of arbitrary elements. Internally, a thread-safe event store is created via TestListWrapper, backed by Collections.synchronizedList. I already covered the advantages of this class in a previous article: it protects against problems when source parallelism is increased. This matters because tests may run with a default parallelism greater than 1, in which case a naive source, unlike our TestUnboundedSource, would duplicate messages. Static collections, as suggested in the documentation, would also work, but they are less convenient.
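A rough sketch of TestListWrapper itself, for completeness (the real implementation is in the repository; this version only illustrates the idea of an id-addressed registry of synchronized lists):

public final class TestListWrapper {
    private static final TestListWrapper INSTANCE = new TestListWrapper();

    // Only the int id travels with serialized sources/sinks, so all parallel
    // instances inside the one-JVM MiniCluster resolve to the same list.
    private final List<List<?>> lists = Collections.synchronizedList(new ArrayList<>());

    private TestListWrapper() {
    }

    public static TestListWrapper getInstance() {
        return INSTANCE;
    }

    public synchronized int createList() {
        lists.add(Collections.synchronizedList(new ArrayList<>()));
        return lists.size() - 1;
    }

    public List<?> getList(int id) {
        return lists.get(id);
    }
}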

TestUnboundedSource must implement the run method of the SourceFunction interface, which in our case iterates over the elements of the input collection and sends them into the data stream. The "infinite" nature of the stream is achieved by the loop inside the run method. The cancel method is what actually stops the source: per the contract in the SourceFunction javadoc, cancel must make run break out of its loop and return.

Thus, in the test you can replace env.fromCollection(events) with creating a source TestUnboundedSource.fromElements(events):

@SneakyThrows
@ParameterizedTest
@MethodSource("provideStartEvents")
void shouldProcessStartEventsWhenTimerInvoked_v2(List<TriggerMessage> events) {
   final var env = StreamExecutionEnvironment.getExecutionEnvironment();
   final var sink = new TestListSink<AlertMessage>();
   final var source = TestUnboundedSource.fromElements(events);

   env.addSource(source, TypeInformation.of(TriggerMessage.class))
           .keyBy(new TriggerAlertProcessor.TriggerAlertProcessorKeySelector())
           .process(new TriggerAlertProcessor(ofMillis(15)))
           .sinkTo(sink);
   env.execute();

   await().atMost(ofSeconds(2))
           .until(() -> sink.getHandledEvents().size() == events.size());
}

We run the corrected test and see that it hangs.

This was expected. We created an infinite source, and the env.execute() method is synchronous, so control never returns to the current thread and we have no way to cancel the job from it.

Running a Flink job asynchronously

To solve the problem, we use the env.executeAsync() method, which launches the Flink job asynchronously. Don't forget to close resources in the last line: the synchronous execute method returned a JobExecutionResult, while executeAsync returns a JobClient:

@SneakyThrows
@ParameterizedTest
@MethodSource("provideStartEvents")
void shouldProcessStartEventsWhenTimerInvoked_v3(List<TriggerMessage> events) {
   final var env = StreamExecutionEnvironment.getExecutionEnvironment();
   final var sink = new TestListSink<AlertMessage>();
   final var source = TestUnboundedSource.fromElements(events);

   env.addSource(source, TypeInformation.of(TriggerMessage.class))
           .keyBy(new TriggerAlertProcessor.TriggerAlertProcessorKeySelector())
           .process(new TriggerAlertProcessor(ofMillis(15)))
           .sinkTo(sink);
   final var jobClient = env.executeAsync();

   await().atMost(ofSeconds(2))
           .until(() -> sink.getHandledEvents().size() == events.size());
   jobClient.cancel().get(2, TimeUnit.SECONDS);
}

We run the test, and it passes.

But let's not celebrate yet: there is another hidden problem. Imagine the assert fails. Then resources are never released and nobody calls jobClient.cancel(). When subsequent tests reuse the already-running Flink MiniCluster, the current job keeps executing, and the cluster simply won't have enough resources left for a new test.

Safe completion of an asynchronous Flink job

I already wrote about this problem in the article E2E testing of Flink Job with Kafka. It can be solved with a custom AutoCloseableJobClient decorator over JobClient, which can then be wrapped in try/finally. A rough sketch of such a decorator (the real class is in the repository; the cancellation timeout is an assumption):
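// A sketch: an AutoCloseable wrapper around JobClient, so the job is always
// cancelled whether the test's assertions pass or fail. The 2-second
// cancellation timeout is an assumption for illustration.
public class AutoCloseableJobClient implements AutoCloseable {
    private final JobClient delegate;

    public AutoCloseableJobClient(JobClient delegate) {
        this.delegate = delegate;
    }

    @Override
    public void close() throws Exception {
        delegate.cancel().get(2, TimeUnit.SECONDS);
    }
}

With Lombok's @Cleanup, though, the test itself looks more elegant: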

@SneakyThrows
@ParameterizedTest
@MethodSource("provideStartEvents")
void shouldProcessStartEventsWhenTimerInvoked(List<TriggerMessage> events) {
   final var env = StreamExecutionEnvironment.getExecutionEnvironment();
   final var sink = new TestListSink<AlertMessage>();
   final var source = TestUnboundedSource.fromElements(events);

   env.addSource(source, TypeInformation.of(TriggerMessage.class))
           .keyBy(new TriggerAlertProcessor.TriggerAlertProcessorKeySelector())
           .process(new TriggerAlertProcessor(ofMillis(15)))
           .sinkTo(sink);
   @Cleanup final var jobClient = new AutoCloseableJobClient(env.executeAsync());

   await().atMost(ofSeconds(2))
           .until(() -> sink.getHandledEvents().size() == events.size());
}

In this case, Lombok itself will wrap the jobClient object in a try/finally block and always call the close method after the test, regardless of whether the assert succeeds or fails.
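In other words, @Cleanup expands to roughly this hand-written try/finally:

final var jobClient = new AutoCloseableJobClient(env.executeAsync());
try {
    await().atMost(ofSeconds(2))
            .until(() -> sink.getHandledEvents().size() == events.size());
} finally {
    jobClient.close();
}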

Using this technique, we add a new test for the timer not firing, simply by making the timer duration large, for example 1 day:

@SneakyThrows
@ParameterizedTest
@MethodSource("provideStartEvents")
void shouldNotProcessStartEventsWhenTimerNotInvoked(List<TriggerMessage> events) {
   final var env = StreamExecutionEnvironment.getExecutionEnvironment();
   final var sink = new TestListSink<AlertMessage>();
   final var source = TestUnboundedSource.fromElements(events);

   env.addSource(source, TypeInformation.of(TriggerMessage.class))
           .keyBy(new TriggerAlertProcessor.TriggerAlertProcessorKeySelector())
           .process(new TriggerAlertProcessor(ofDays(1)))
           .sinkTo(sink);
   @Cleanup final var jobClient = new AutoCloseableJobClient(env.executeAsync());

   await().during(ofSeconds(2))
           .until(() -> sink.getHandledEvents().isEmpty());
}

In a similar way, one can write a test verifying that the timer does not fire when START and STOP events arrive within the same key; a sketch follows.
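A sketch of that test under the same approach, assuming provideStartStopEvents supplies plain TriggerMessage lists (the StreamRecord wrapping from the TestHarness section is not needed here):

@SneakyThrows
@ParameterizedTest
@MethodSource("provideStartStopEvents")
void shouldNotProcessEventsWhenStartWithStop(List<TriggerMessage> events) {
    final var env = StreamExecutionEnvironment.getExecutionEnvironment();
    final var sink = new TestListSink<AlertMessage>();
    final var source = TestUnboundedSource.fromElements(events);

    env.addSource(source, TypeInformation.of(TriggerMessage.class))
            .keyBy(new TriggerAlertProcessor.TriggerAlertProcessorKeySelector())
            .process(new TriggerAlertProcessor(ofMillis(15)))
            .sinkTo(sink);
    @Cleanup final var jobClient = new AutoCloseableJobClient(env.executeAsync());

    // STOP removes the timer registered by START, so nothing must be emitted.
    await().during(ofSeconds(2))
            .until(() -> sink.getHandledEvents().isEmpty());
}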

Testing a Flink Job containing an operator with timers

The next stage of testing is to add full-fledged tests to the entire Flink job, which was implemented in the previous article according to the technical specifications. Let me remind you what it looks like:

@Component
@AllArgsConstructor
@ConditionalOnProperty("jobs.trigger-to-alert-job.enabled")
public class TriggerToAlertJob extends FlinkJob {
   private final TriggerToAlertJobProperties properties;
   private final SourceBinder<TriggerMessage> sourceBinder;
   private final SinkProvider<AlertMessage> sinkProvider;

   @Override
   public void registerJob(StreamExecutionEnvironment env) {
       sourceBinder.bindSource(env)
               .filter(new TriggerMessageByStatusAndUserFilter())
               .uid("filter_trigger_message_by_status_id").name("filter_trigger_message_by_status")
               .keyBy(new TriggerAlertProcessor.TriggerAlertProcessorKeySelector())
               .process(new TriggerAlertProcessor(properties.getStateWaiting()))
               .uid("trigger_alert_processor_id").name("trigger_alert_processor")
               .sinkTo(sinkProvider.createSink()).uid("sink_alert_message_id").name("sink_alert_message");
   }
}

For the test, we will use the @FlinkJobTest annotation, which we created for testing Flink jobs in previous articles. It allows you to create or use an existing Flink MiniCluster, and also loads the necessary Spring context:

@Retention(RUNTIME)
@SpringBootTest(
   webEnvironment = NONE,
   classes = {
       PropertiesConfig.class,
       FlinkConfig.class,
   })
@ActiveProfiles({"test"})
@WithFlinkCluster
public @interface FlinkJobTest {
}

In the new TriggerToAlertJobUnitTest we need to test two main cases:

  1. Generation of an output event by a timer when a START event is received.

  2. Absence of an output event if the START and STOP events are received at the input.

Similar to the tests above, we need to vary the duration of our timer. In the first case it can be set to 1 ms, so as not to inflate the wait for the output event. In the second, 1 s, long enough to guarantee the operator processes both messages before the timer could fire.

Since the timer duration comes from the configuration in application.yml, it is convenient to separate the two tests within one test class using JUnit's @Nested annotation, passing the corresponding timer values via Spring's @TestPropertySource annotation. The tests end up looking like this:

@FlinkJobTest
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
class TriggerToAlertJobUnitTest {
   @Autowired
   private StreamExecutionEnvironment environment;

   @Nested
   @TestPropertySource(properties = "jobs.trigger-to-alert-job.state-waiting=1ms")
   public class TriggerToAlertJobUnitTest_minWaitingTime {
       @Autowired
       private TriggerToAlertJobProperties properties;

       @Test
       @SneakyThrows
       void shouldCreateAlertMessageByStartTriggerMessage() {
           // ...
       }
   }

   @Nested
   @TestPropertySource(properties = "jobs.trigger-to-alert-job.state-waiting=1s")
   public class TriggerToAlertJobUnitTest_longWaitingTime {
       @Autowired
       private TriggerToAlertJobProperties properties;

       @Test
       @SneakyThrows
       void shouldNotCreateAlertMessageByStartWithStopTriggerMessage() {
           // ...
       }
   }
}

The first test for generating an output event by the timer when a START event is received looks like this:

@Test
@SneakyThrows
void shouldCreateAlertMessageByStartTriggerMessage() {
   final var triggerMessage = aTriggerMessage().withStatus(START).build();
   final var sink = new TestListSink<AlertMessage>();
   final var source = TestUnboundedSource.fromElements(List.of(triggerMessage));
   final var job = new TriggerToAlertJob(
           properties,
           env -> env.addSource(source, TypeInformation.of(TriggerMessage.class)).uid("source"),
           () -> sink
   );

   job.registerJob(environment);
   @Cleanup final var jobClient = new AutoCloseableJobClient(environment.executeAsync());

   await().atMost(ofSeconds(2))
           .until(() -> !sink.getHandledEvents().isEmpty());
   final var out = sink.getHandledEvents();
   assertEquals(1, out.size(), format("Unexpected message count in sink: %s", out));
   final var alertMessage = out.get(0);
   assertEquals(triggerMessage.getTriggerName(), alertMessage.getTriggerName(), "Unexpected trigger name");
   assertEquals(triggerMessage.getUserId(), alertMessage.getUserId(), "Unexpected user id");
   assertEquals(triggerMessage.getTimestamp(), alertMessage.getTimestamp(), "Unexpected timestamp");
}

The content is quite simple: we create a test Source and Sink as in the examples above, but this time we use the real TriggerToAlertJob, passing them into its constructor in place of the Spring-configured ones. After the asynchronous launch, we check that an AlertMessage with the expected field values arrives at the output.

The second test for the absence of an output event when two events START and STOP are received at the input looks like this:

@Test
@SneakyThrows
void shouldNotCreateAlertMessageByStartWithStopTriggerMessage() {
   final var userId = UUID.randomUUID();
   final var startTriggerMessage = aTriggerMessage().withStatus(START).withUserId(userId).build();
   final var stopTriggerMessage = aTriggerMessage().withStatus(STOP).withUserId(userId).build();
   final var sink = new TestListSink<AlertMessage>();
   final var source = TestUnboundedSource.fromElements(List.of(startTriggerMessage, stopTriggerMessage));
   final var job = new TriggerToAlertJob(
           properties,
           env -> {
               env.setParallelism(1);
               return env.addSource(source, TypeInformation.of(TriggerMessage.class)).uid("source");
           },
           () -> sink
   );

   job.registerJob(environment);
   @Cleanup final var jobClient = new AutoCloseableJobClient(environment.executeAsync());

    await().during(ofSeconds(2))
            .until(() -> sink.getHandledEvents().isEmpty());
}

The test is similar to the previous ones, but with one very important difference: the env.setParallelism(1) line. It is needed so that the START and STOP messages are processed in the correct order; otherwise STOP may be processed first. You can even catch such a case by rerunning this test several times with parallelism greater than 1. One could object that the KeySelector will in any case route both events, which share one key, to the same operator instance. But remember that the filter operator comes first in the job, and its parallelism is greater than 1 by default. The two events can therefore land in different parallel filter tasks and swap order before they ever reach the stateful operator. This scenario is entirely possible in production as well, and it should be kept in mind.

To make sure the entire job is working, we need to add a final E2E test.

E2E testing of a Flink Job containing an operator with timers

The E2E test should resemble the unit test of the whole job above, but real components (Kafka topics) should serve as the source and destination. The article E2E testing of Flink Job with Kafka contained an example of an E2E test for another Flink job:

@E2ETest
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
public class JobE2ETest {
   @Autowired
   private JobStarter jobStarter;
   @Autowired
   private TestKafkaFacade kafka;
   @Autowired
   private KafkaProperties kafkaProperties;

   @Test
   @SneakyThrows
   void shouldProcessClickMessageSourceToProductSink() {
       // ...
   }
}

Here the @E2ETest annotation raises the entire Spring context along with Kafka and the Flink MiniCluster. It also applies a Flink configuration that emulates a production environment (embedded RocksDB). Topics and interaction with Kafka are handled through our helper abstraction TestKafkaFacade, discussed in detail earlier.
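By analogy with @FlinkJobTest above, @E2ETest might be composed roughly as follows (a sketch; the exact composition lives in the repository, and WithKafkaContainer is a placeholder name for the Kafka-raising extension from the earlier E2E article):

@Retention(RUNTIME)
@SpringBootTest(webEnvironment = NONE)
@ActiveProfiles({"test"})
@WithKafkaContainer // placeholder: starts the Kafka test container
@WithFlinkCluster
public @interface E2ETest {
}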

For the current task, you can add the following test:

@Test
@SneakyThrows
void shouldProcessTriggerMessageSourceToAlertSink() {
   final var triggerMessage = aTriggerMessage().withStatus(START).build();
   kafka.sendMessage(kafkaProperties.getTopics().getTriggerTopic(), triggerMessage);

   @Cleanup final var jobClient = jobStarter.startJobs();

   final var alertTopic = kafkaProperties.getTopics().getAlertTopic();
   @Cleanup final var kafkaConsumer =
           kafka.createKafkaConsumer(Set.of(alertTopic));
   await().atMost(ofSeconds(5))
           .until(() -> kafkaConsumer.receiveAndGetAll(alertTopic, AlertMessage.class),
                   alertMessages -> alertMessages.size() == 1
                           && alertMessages.get(0).getUserId().equals(triggerMessage.getUserId())
           );
}

In it, a START message is sent to the trigger topic, and on the output side we check that an AlertMessage arrives in the expected alert topic specified in the application-test.yml configuration. This works because the entire context is raised, including the Source and Sink connected to Kafka. The configuration must also override the timer duration, for example to 10 ms instead of the standard 10 minutes.
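The test profile might then contain something like this (a sketch: the jobs.trigger-to-alert-job.* keys follow the properties used above, while the exact topic keys under kafka are assumptions):

# application-test.yml (sketch)
jobs:
  trigger-to-alert-job:
    enabled: true
    state-waiting: 10ms # instead of the production 10 minutes
kafka:
  topics:
    trigger-topic: trigger-topic
    alert-topic: alert-topic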

Conclusion

In this article, I showed how to test operators with timers both with the TestHarness classes and by raising a basic Flink MiniCluster. We looked at several problems and non-obvious pitfalls along the way. Finally, we wrote a full-fledged unit test for the entire Flink job, as well as a concluding E2E test that verified the integration with Kafka in a near-production environment.

In the next article, we will look at the problem of the KeyedState schema changing while we must not lose data when updating the application and restoring state on startup of the new version. I'll take up data serialization and write my own state serializer that supports schema evolution. And of course, I will test the cases where several different versions of the same class coexist in a single running distributed Flink cluster.
