Unit testing of Flink operators, Job: Flink MiniCluster

In this article, we will write tests for an entire job using a Flink mini-cluster and a JUnit extension. We will also start extracting convenient helper abstractions for tests that will be needed later.


All the source code discussed here can be found in the repository AlexanderBobryakov/flink-spring. The master branch contains the final project for the entire series of articles. This article corresponds to the release branch named release/5_flinkcluster_job_deduplicator_test.


Flink MiniCluster

The documentation on writing tests suggests using Flink's MiniClusterWithClientResource mini-cluster abstraction for local testing of complete jobs. The reason is that regular unit tests (even with TestHarness) cannot fully reproduce how Flink behaves, given concurrency and other internal processes. A mini-cluster makes this possible.

After the mini-cluster starts, the universal method for obtaining the environment, StreamExecutionEnvironment.getExecutionEnvironment(), automatically connects to it, and all our jobs run on it. The documentation suggests this usage scenario:

public class ExampleIntegrationTest {

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(2)
                            .setNumberTaskManagers(1)
                            .build());

    @Test
    public void someTest() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // ...
    }
}

But in this case, you need to declare a @ClassRule in each test class or use inheritance in tests. I suggest a slightly different, more convenient way.

So, what are our requirements?

First, I would like to avoid dragging unnecessary dependencies into tests that use Flink.

Second, we should be able to enable a mini-cluster in any test as simply as possible and, most importantly, only once, so that the cluster is started before all test classes and shut down after all tests.

What is the solution? What comes to mind is our own @WithFlinkCluster annotation, which provides a Flink mini-cluster to the test class it is placed on. Let's look at the implementation:

@Retention(RUNTIME)
@Inherited
@ExtendWith({FlinkClusterExtension.class})
public @interface WithFlinkCluster {
}

There's nothing special about it. The main logic is inside FlinkClusterExtension, which is a JUnit extension. In short, extensions change the behavior of tests by hooking into test lifecycle events. My FlinkClusterExtension implementation looks like this:

@Slf4j
@SuppressWarnings({"PMD.AvoidUsingVolatile"})
public class FlinkClusterExtension implements BeforeAllCallback, ExtensionContext.Store.CloseableResource {
   private static final MiniClusterWithClientResource FLINK_CLUSTER;
   private static final Lock LOCK = new ReentrantLock();
   private static volatile boolean started;

   static {
       final var configuration = new Configuration();
       configuration.set(CoreOptions.DEFAULT_PARALLELISM, 2);
       FLINK_CLUSTER = new MiniClusterWithClientResource(
           new MiniClusterResourceConfiguration.Builder()
               .setConfiguration(configuration)
               .setNumberSlotsPerTaskManager(2)
               .setNumberTaskManagers(1)
               .build());
   }

   @Override
   public void beforeAll(ExtensionContext context) throws Exception {
       LOCK.lock();
       try {
           if (!started) {
               log.info("Start Flink MiniCluster");
               started = true;
               FLINK_CLUSTER.before();
               context.getRoot().getStore(GLOBAL).put("Flink Cluster", this);
           }
       } finally {
           LOCK.unlock();
       }
   }

   @Override
   public void close() {
       log.info("Close Flink MiniCluster");
       FLINK_CLUSTER.after();
       started = false;
   }
}

Pay attention to the two implemented interfaces: BeforeAllCallback and ExtensionContext.Store.CloseableResource. The first provides the beforeAll method, which runs before all tests of each test class annotated with @ExtendWith({FlinkClusterExtension.class}). The second is a callback that closes resources after all test classes have been processed; it fires that late because beforeAll registers the extension in the store of the root context. In the static block, we initialize the Flink mini-cluster, passing it various settings:

  • number of slots per TaskManager

  • number of TaskManagers

  • default parallelism: it's better to set it above 1 to catch non-obvious bugs in your pipelines

The beforeAll method starts the cluster through its before() method. FlinkClusterExtension synchronizes on a ReentrantLock so that the cluster is not started again when tests run in parallel. The close method shuts the mini-cluster down once, after all tests that use this JUnit extension have run.
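To make the start-once behaviour easier to see in isolation, here is a minimal plain-Java sketch of the same guard. It does not use Flink or JUnit: FakeCluster and StartOnceGuardSketch are hypothetical stand-ins for the mini-cluster and the extension, and the thread pool only imitates test classes starting in parallel.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Plain-Java model of the "start once" guard; not the real extension. */
class StartOnceGuardSketch {

    /** Hypothetical stand-in for an expensive shared resource such as the mini-cluster. */
    static final class FakeCluster {
        final AtomicInteger starts = new AtomicInteger();

        void start() {
            starts.incrementAndGet();
        }
    }

    private final Lock lock = new ReentrantLock();
    private final FakeCluster cluster;
    private boolean started; // read and written only while holding the lock

    StartOnceGuardSketch(FakeCluster cluster) {
        this.cluster = cluster;
    }

    /** Plays the role of beforeAll(): called once per test class, possibly concurrently. */
    void beforeAll() {
        lock.lock();
        try {
            if (!started) {
                cluster.start();
                started = true; // assumption of this sketch: flag is set only after a successful start
            }
        } finally {
            lock.unlock();
        }
    }

    /** Imitates testClasses classes starting in parallel; returns how many times the cluster started. */
    static int simulate(int testClasses) {
        FakeCluster cluster = new FakeCluster();
        StartOnceGuardSketch guard = new StartOnceGuardSketch(cluster);
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch done = new CountDownLatch(testClasses);
        for (int i = 0; i < testClasses; i++) {
            pool.submit(() -> {
                guard.beforeAll();
                done.countDown();
            });
        }
        try {
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("simulation timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return cluster.starts.get();
    }

    public static void main(String[] args) {
        System.out.println("cluster starts: " + simulate(8));
    }
}
```

One design difference is deliberate: the sketch sets the flag after the start call, whereas the extension above sets started = true before calling before(). Setting it afterwards means a failed start can be retried by the next test class; whether that is desirable for the real extension is a judgment call.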

As a result, each test where a Flink cluster is needed can be written like this:

@WithFlinkCluster
class DeduplicatorUnitTest_byFlinkCluster {
    @Test
    void test() {
        final var env = StreamExecutionEnvironment.getExecutionEnvironment();
        // ...
    }
}

Abstraction for Sink testing via Flink MiniCluster

Before writing the test itself, we need to think about how we will verify our jobs. The expected result of running a job or an operator is the presence of events at its output.

The problem is that each operator has its own parallelism. Flink deploys as many parallel instances of an operator as its parallelism, distributing them across the TaskManagers' slots (one per TaskManager if each has a single slot). Each of these parallel instances can write to its own parallel instance of the Sink operator, and it would be convenient for us to collect their output in one place.

This is also mentioned in a block of comments at the end of the documentation, where the authors suggest creating your own CollectSink. They use a static collection, which does not suit us well: tests can run in parallel and independently, and a separate class with a single static collection would be accessed from all test classes at the same time. This can cause tests to influence each other.

As a solution, in the Flink source code you can find the singleton class org.apache.flink.test.streaming.runtime.util.TestListWrapper, which offers a more suitable option:

private List<List<? extends Comparable>> lists;

Then, in each individual test, creating a list through TestListWrapper adds an internal List to the lists field above. The id of this list is returned to the caller, and you can then write your own writer that writes specifically to this List, requesting it from TestListWrapper by that id. It sounds confusing, so here is code that illustrates the idea:

@SuppressWarnings("PMD.TestClassWithoutTestCases")
public class TestListSink<T> implements Sink<T> {
   private static final long serialVersionUID = 1L;

   private final ListWriter writer = new ListWriter();
   private final int resultListId;

   public TestListSink() {
       this.resultListId = TestListWrapper.getInstance().createList();
   }

   @Override
   public SinkWriter<T> createWriter(InitContext context) {
       return writer;
   }

   public List<T> getHandledEvents() {
       return new ArrayList<>(resultList());
   }

   @SuppressWarnings("unchecked")
   private List<T> resultList() {
       synchronized (TestListWrapper.getInstance()) {
           return (List<T>) TestListWrapper.getInstance().getList(resultListId);
       }
   }

   private class ListWriter implements SinkWriter<T>, Serializable {
       private static final long serialVersionUID = 1L;

       @Override
       public void write(T element, Context context) {
           resultList().add(element);
       }

       @Override
       public void flush(boolean endOfInput) {
           // no op
       }

       @Override
       public void close() {
           // no op
       }
   }
}

When a TestListSink is created (in each individual test), a new List is initialized and its id is stored: TestListWrapper.getInstance().createList(). We also have our own SinkWriter implementation, which, on receiving an event in the write method, writes it to that List by its id. So even with high parallelism of the output operators, we get a single List into which each parallel instance writes. It is also useful to define a helper method, getHandledEvents, which returns all events recorded by all parallel instances of the sink operator after the test has run.
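The "registry of lists addressed by id" idea can be modelled without Flink at all. The sketch below is a simplified plain-Java illustration, not Flink's TestListWrapper: ListRegistrySketch, Writer and collect are hypothetical names, and a parallel stream stands in for the parallel sink instances.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

/** Plain-Java model of a registry of result lists; not Flink's TestListWrapper. */
class ListRegistrySketch {
    private static final ListRegistrySketch INSTANCE = new ListRegistrySketch();

    private final List<List<Object>> lists = new ArrayList<>();

    static ListRegistrySketch getInstance() {
        return INSTANCE;
    }

    /** Registers a new thread-safe result list and returns its id. */
    synchronized int createList() {
        lists.add(Collections.synchronizedList(new ArrayList<>()));
        return lists.size() - 1;
    }

    /** Looks up a previously registered list by its id. */
    synchronized List<Object> getList(int id) {
        return lists.get(id);
    }

    /** Stand-in for one parallel sink writer: it holds only the id, never the list itself. */
    static final class Writer {
        private final int listId;

        Writer(int listId) {
            this.listId = listId;
        }

        void write(Object element) {
            getInstance().getList(listId).add(element);
        }
    }

    /** Runs several writers in parallel against one list and returns the collected size. */
    static int collect(int writers, int perWriter) {
        int id = getInstance().createList();
        IntStream.range(0, writers).parallel().forEach(w -> {
            Writer writer = new Writer(id);
            for (int i = 0; i < perWriter; i++) {
                writer.write("w" + w + "-" + i);
            }
        });
        return getInstance().getList(id).size();
    }

    public static void main(String[] args) {
        System.out.println("collected: " + collect(4, 100));
    }
}
```

Because every test asks the registry for its own list, two tests never share results even though the registry itself is a singleton. The approach relies on the writers and the test living in the same JVM, which holds for a mini-cluster but not for a remote one.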

Testing a deduplicator using Flink MiniCluster

In the previous article, we wrote a test for the deduplicator using the TestHarness abstractions. As an additional example, we can rewrite the same test using a mini-cluster. To do this, we will:

  1. Put our new annotation on the new test class.

  2. Rewrite the test itself slightly, directly defining a minimal processing pipeline that uses the deduplicator.

Let's do this in a new test class:

@WithFlinkCluster
public class DeduplicatorUnitTest_byFlinkCluster {
   private final Time TTL = Time.milliseconds(100);

   @SneakyThrows
   @ParameterizedTest
   @MethodSource("provideUniqueEvents")
   void shouldDeduplicateMessagesByTtl(List<String> events) {
       final var sourceEvents = new ArrayList<String>();
       sourceEvents.addAll(events);
       sourceEvents.addAll(events);
       final var env = StreamExecutionEnvironment.getExecutionEnvironment();
       final var sink = new TestListSink<String>();
       env.fromCollection(sourceEvents)
           .keyBy(value -> value)
           .flatMap(new Deduplicator<>(TTL))
           .sinkTo(sink);

       env.execute();

       final var outputEvents = sink.getHandledEvents();
       assertEquals(events.size(), outputEvents.size(),
           format("Unexpected number of events after deduplication. Output events: %s", outputEvents));
       assertEquals(events, new ArrayList<>(outputEvents), "Unexpected events after deduplication");
   }

   private static Stream<Arguments> provideUniqueEvents() {
       return Stream.of(arguments(List.of("key_1", "key_2")));
   }
}

At its core, the test is very similar to the previous ones: we receive several String events as input and define the fromCollection() data source, passing the input data to it twice. Then we define the deduplicator itself and the output Sink using our universal TestListSink. After running the pipeline, we check that the resulting Sink contains exactly as many events as there were unique messages.

The important point is the use of the MiniCluster. It happens under the hood during the call to StreamExecutionEnvironment.getExecutionEnvironment(). Since our FlinkClusterExtension runs before the test starts, a mini-cluster already exists on the local machine by the time the test executes, and getExecutionEnvironment() detects it and returns an environment connected to it.
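The expectation checked by this test can be illustrated with a small plain-Java model of TTL-based deduplication. This is only a sketch under assumptions: TtlDedupSketch is a hypothetical class, not the repository's Deduplicator, timestamps are passed in explicitly, and the TTL is counted from the moment a key was emitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of TTL-based deduplication; not the repository's Deduplicator. */
class TtlDedupSketch {
    private final long ttlMillis;
    private final Map<String, Long> emittedAt = new HashMap<>(); // key -> time it was last emitted

    TtlDedupSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Returns true if the event should be emitted, false if it repeats a key within the TTL. */
    boolean accept(String key, long nowMillis) {
        Long previous = emittedAt.get(key);
        if (previous != null && nowMillis - previous < ttlMillis) {
            return false; // duplicate inside the TTL window
        }
        emittedAt.put(key, nowMillis);
        return true;
    }

    /** Mirrors the test input: the same events are fed twice at the same instant. */
    static List<String> runTwice(List<String> events, long ttlMillis) {
        TtlDedupSketch dedup = new TtlDedupSketch(ttlMillis);
        List<String> source = new ArrayList<>(events);
        source.addAll(events);
        List<String> out = new ArrayList<>();
        for (String event : source) {
            if (dedup.accept(event, 0L)) {
                out.add(event);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runTwice(List.of("key_1", "key_2"), 100L));
    }
}
```

Unlike this single-threaded model, the real pipeline runs with a parallelism of 2, so the order in which different keys reach the sink is not something the model says anything about.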

Testing the entire Job using Flink MiniCluster

Now we can move on to the first test of the entire Flink job. As a reminder, our job filters the ClickMessage input stream, passing only events with the platform type WEB or APP. Next, APP events undergo deduplication, and then APP and WEB events are written to the Kafka output topic, which is determined dynamically from the ClickMessage.productTopic field.

The pipeline looks like this:

[Figure: job pipeline diagram]

The test will involve many Spring components, so let's introduce a new annotation for testing a Flink job:

@Retention(RUNTIME)
@SpringBootTest(
   webEnvironment = NONE,
   classes = {
       PropertiesConfig.class,
       FlinkConfig.class,
   })
@ActiveProfiles({"test"})
@WithFlinkCluster
public @interface FlinkJobTest {
}

Since we don't want to load the entire context by default, we specify only two configurations in the @SpringBootTest annotation:

  1. PropertiesConfig, which adds to the context all our property classes that are bound to application.yml.

  2. FlinkConfig, in which the StreamExecutionEnvironment bean is registered and configured.

Additionally, @FlinkJobTest carries the @WithFlinkCluster annotation created earlier, which starts the mini-cluster.

We are testing the job logic itself, so it doesn't matter what exactly the source and sink are. We will substitute real implementations (Kafka) in E2E tests.

The first test will verify that APP events are deduplicated:

@FlinkJobTest
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
class ClickToProductJobUnitTest {
   @Autowired
   private StreamExecutionEnvironment environment;
   @Autowired
   private ClickToProductJobProperties properties;

   @ParameterizedTest
   @EnumSource(value = Platform.Enum.class, names = {"APP"})
   @SneakyThrows
   void shouldDeduplicateClickMessages(Platform platform) {
       final var message = aClickMessage().withPlatform(platform).build();
       final var sink = new TestListSink<WrappedSinkMessage<ProductMessage>>();
       final var job = new ClickToProductJob(
           properties,
           env -> env.fromElements(message, message, message).uid("test_source"),
           () -> sink
       );

       job.registerJob(environment);
       environment.execute();

       final var out = sink.getHandledEvents();
       assertEquals(1, out.size(), format("Unexpected message count in sink: %s", out));
   }

   // ...
}

At the top of the class, we use our @FlinkJobTest annotation. In the test class, we can inject the main beans:

  1. StreamExecutionEnvironment, which points to the mini-cluster started in FlinkClusterExtension.

  2. ClickToProductJobProperties, which holds all the settings configured in application-test.yml.

In the test itself, we create a ClickMessage with the APP platform passed in as the parameter of the parameterized test. Next, we define the TestListSink described in the previous section and manually create the ClickToProductJob under test. We pass the properties, the data source, and the sink as job arguments.

Note: the data source contains three identical instances of the input message, which the job must deduplicate.

After this, the job is registered in the environment of the mini-cluster and launched synchronously. Since our fromElements data source has three events, the job completes automatically once they are processed. This is quite an important point, because with unbounded data sources (for example, Kafka) the job would run forever until we stop it. Such jobs need an asynchronous launch, which we will consider in the following articles. At the output, we check that the Sink contains only one event and that the rest have been deduplicated.

In the same way, you can write tests that check that WEB events are processed without deduplication and that events with any other platform type are not processed at all. An example test can be found in the project repository mentioned at the beginning of the article.
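The three scenarios (APP, WEB, any other platform) can be summarised with a small plain-Java model of the job's rules. It is only a sketch: the Click class, the Platform enum with an OTHER value, and deduplication by id are hypothetical simplifications, not the project's ClickMessage or ClickToProductJob.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Plain-Java model of the job's filtering and deduplication rules; not the real job. */
class JobRulesSketch {
    enum Platform { WEB, APP, OTHER }

    /** Hypothetical, simplified stand-in for ClickMessage. */
    static final class Click {
        final String id;
        final Platform platform;

        Click(String id, Platform platform) {
            this.id = id;
            this.platform = platform;
        }
    }

    /** Keeps WEB and APP only; APP events are deduplicated by id, WEB events pass through. */
    static List<Click> process(List<Click> input) {
        Set<String> seenAppIds = new HashSet<>();
        List<Click> out = new ArrayList<>();
        for (Click click : input) {
            if (click.platform == Platform.WEB) {
                out.add(click);
            } else if (click.platform == Platform.APP) {
                if (seenAppIds.add(click.id)) { // add() returns false for an id seen before
                    out.add(click);
                }
            }
            // any other platform is filtered out
        }
        return out;
    }

    /** The scenario used in the tests: the same message sent three times. */
    static int outputCountForTriple(Platform platform) {
        Click click = new Click("1", platform);
        return process(List.of(click, click, click)).size();
    }

    public static void main(String[] args) {
        for (Platform platform : Platform.values()) {
            System.out.println(platform + " -> " + outputCountForTriple(platform));
        }
    }
}
```

Each branch of the model corresponds to one parameterized test against the real job: one output event for APP, three for WEB, and none for any other platform.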

As a result, the test structure looks like this:

[Figure: test structure]

Conclusion

We looked at creating a unit test for a complete Flink job and for individual stateful operators using a mini-cluster. We also learned how to start a mini-cluster once before all test classes that need it. In addition, we created helper abstractions and annotations that clearly separate responsibilities in tests and simplify writing new ones.

Kafka has remained behind the scenes so far, even though our job reads from and writes to its topics. In the next part, I will show how to write an E2E test for the full Kafka + Flink job integration.
