Testing integration with message queues correctly

Surely your project uses a message queue – it doesn't matter whether it is Kafka, Pulsar or some kind of bunny (RabbitMQ). The real problem is testing in detail how your system works with it. Let's look at the options and see what the author has up his sleeve.

Street lectures

How is it being resolved now?

Obviously, the simplest option for testing integration with message queues is not testing it at all, or rather shifting the responsibility to QA engineers. Perhaps someone on your project even practices this, but serious developers do not.

A serious senior who does test message queues to the fullest is that Soviet-style character so beloved by liberals – Baba Zina from the corner store, with her catchphrase:

Justification for why a supercomputer is needed to test integrations

Baba Zina will not say a bad word: she will spin up a cluster, configure it somehow and declare that the integration works.

However, this approach has many problems, namely:

  1. There is no control over data flows: it is impossible to assert that all messages have been processed, or that the messages themselves are correct;

  2. The approach is not idiomatic for tests: with a single message you can still put up with it, but with a complex message tree (branches, side messages being produced) it is impossible to claim you have tested everything you need, even if the coverage report somehow shows 100%;

  3. The configuration used in testing differs from the one in production, so you cannot claim it will behave 100% the same;

  4. You need a supercomputer for testing so that the whole team can keep working in peace;

  5. (Just don't tell Baba Zina) Testing takes much more time – the developer wastes time waiting for tests to finish, and since his time is valuable, money is spent twice: on the hardware and on the developer. So either Baba Zina is stealing or she is mistaken.

Based on the above, it is clear that a different solution is required – preferably one that reduces the load on the hardware while giving full control during test runs.

The ideal option for testing integrations would be the ability to directly control the flow of messages and pass them on only after you have checked their content yourself. You also need to be able to spam your consumer with the same message to make sure your locking algorithms work.

Solution

In the mireapay project, a library called message-queue was developed specifically for working with message queues. Its peculiarity is that it not only lets you swap the application's message queue (for example, Pulsar for Kafka) without rewriting everything, but also lets you emulate the environment in tests as closely as possible, providing extra capabilities that real message queues cannot offer. Keep in mind that you still will not bring up a cluster with the required configuration, partitions, etc. – otherwise your tests will cost as much to run as a golden toilet.

This library is based on two interfaces:

  1. EventConsumer – an implementation of this interface lets you listen to a topic and receive messages one at a time. There is no batch consumer; the author did not need one, and the reader can implement it independently;

  2. EventProducer – a bean with this interface is created for each topic; wire it into your component and send messages to the topic. If sending fails, catch the exception. A minimal sketch of both interfaces is shown right after this list.
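
For orientation, here is a minimal sketch of what these two interfaces could look like; the method names and signatures are assumptions, since the article does not show the library's source:

    // Sketch only: names and signatures are assumptions, not the library's actual API.
    public interface EventConsumer<T> {
        // invoked for every message read from the topic, one message at a time
        void consume(T event);
    }

    public interface EventProducer<T> {
        // publishes a single event to the topic bound to this producer bean;
        // expected to throw on failure, so the caller catches the exception
        void send(T event);
    }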

It is often necessary to route the same event to different topics; for this purpose the MessageQueueId annotation was created – put it on your consumer. The default ID for all consumers is default, but you can set your own. For producers you need to use the full bean name, for example defaultSimpleEventProducer or richSimpleEventProducer. Topics are registered (so that the bean generator can create them) in the project configuration. Let's look at an example from the contract service:

event:
  provider: "pulsar"
  consumer:
    - event-class: "com.lastrix.mps.node.model.event.contract.external.ExternalContractEvent"
  producer:
    - event-class: "com.lastrix.mps.node.model.event.contract.external.ExternalContractEvent"
      message-queue-id: "out"
      topic-group: "out"

The service listens for the ExternalContractEvent event on the default topic but writes to the topic in the out group; the group is used when generating the topic name, while message-queue-id is needed to generate beans and keep them apart. Because the events live in different topics, our service will not read its own messages just to inflate the number of processed messages per second. Yes, Baba Zina, you will not be able to tell everyone that your service processes 300k messages per second when 299,999 of them are your own messages acknowledged back as processed.
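
To make the wiring concrete, a consumer and producer for this configuration might look roughly like the sketch below. The class name, the producer bean name and the method signatures follow the conventions described above but are assumptions rather than the library's documented API, and the message-queue library imports are omitted because their packages are not shown in the article:

    // Sketch only: bean naming and signatures here are assumptions.
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.stereotype.Component;

    @Component
    // @MessageQueueId could be placed on a consumer to bind it to a non-default
    // message queue; without it the "default" ID is used, as in this service
    public class ExternalContractEventHandler implements EventConsumer<ExternalContractEvent> {

        // the producer bean name is prefixed with its message-queue-id ("out"),
        // by analogy with defaultSimpleEventProducer / richSimpleEventProducer
        private final EventProducer<ExternalContractEvent> producer;

        public ExternalContractEventHandler(
                @Qualifier("outExternalContractEventProducer") EventProducer<ExternalContractEvent> producer) {
            this.producer = producer;
        }

        @Override
        public void consume(ExternalContractEvent event) {
            // handle the incoming event from the default topic,
            // then publish the result to the "out" topic group
            producer.send(event);
        }
    }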

Now, in order to test this integration, we need a test configuration:

event:
  provider: "test"
  consumer:
    - event-class: "com.lastrix.mps.node.model.event.contract.external.ExternalContractEvent"
  producer:
    - event-class: "com.lastrix.mps.node.model.event.contract.external.ExternalContractEvent"
      message-queue-id: "out"

It looks almost identical; the only difference is the provider. The reader may be tempted to say that all of this should be declared in an annotation. Don't rush. The same service uses another event that it both sends and reads – including its own messages:

event:
  provider: "test"
  consumer:
    - event-class: "com.lastrix.mps.node.model.event.contract.lifecycle.ContractLifecycleEvent"
  producer:
    - event-class: "com.lastrix.mps.node.model.event.contract.lifecycle.ContractLifecycleEvent"
      # this will allow us to manually control event processing
      message-queue-id: "test"

Because we created a separate message queue for the ContractLifecycleEvent producer, messages sent by the service will not automatically arrive at its input. Processing stops until we read and forward the message ourselves. This is exactly the "data flow control" we needed.
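
In a test, this hand-off looks roughly like the following sketch. It assumes that TestMessageQueue (shown below) offers poll() for reading and some push-style method for forwarding a message back to the service; the name of the forwarding method is an assumption:

    // Sketch only: the forwarding call is an assumption about the test API.
    var event = testContractLifecycleEventMessageQueue.poll(); // intercept what the service produced
    assertNotNull(event);                                      // inspect and validate the content here
    defaultContractLifecycleEventMessageQueue.push(event);     // only now let the service consume it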

Now we can start writing tests. To simplify the work, it is recommended to separate code from declarations using OOP:

An abstract class lets us declare the topics once, for example:

    // all message queues in one list; TestMessageQueue manages
    // both the producer and the consumer
    @Autowired
    List<TestMessageQueue<?>> messageQueues;

    @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
    @Autowired
    TestMessageQueue<ExternalContractEvent> defaultExternalContractEventMessageQueue;
    // note the prefix: it is determined by the message-queue-id, not by the topic group
    @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
    @Autowired
    TestMessageQueue<ExternalContractEvent> outExternalContractEventMessageQueue;

Since the contract service can implement many schemes, an abstract class with the test-management tools is created for each of them. Of course, all this code could instead be written separately in each test, spending a week of copy-pasting to cover the large number of processing combinations.
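
For illustration, a concrete scenario test then only extends such a base class and composes its helpers; the class names and test annotations below are invented for the example, not taken from the project:

    // Sketch only: class names and annotations are assumptions for illustration.
    import org.junit.jupiter.api.Test;
    import org.springframework.boot.test.context.SpringBootTest;

    @SpringBootTest
    class WalletToWalletTransferTest extends AbstractContractScenarioTest {

        @Test
        void successfulTransfer() {
            // the body is a sequence of the reusable assert*/respond* helpers
            // declared in the abstract class, as in the real test shown below
        }
    }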

Let's consider one of the methods:

    protected void assertEventStatus(Contract contract, ContractStatus status, NodeId expectedNodeId) {
        var event = testContractLifecycleEventMessageQueue.poll();
        assertNotNull(event);
        assertTrue(event instanceof FactCreatedContractLifecycleEvent);
        var factEvent = (FactCreatedContractLifecycleEvent) event;
        assertEquals(contract.getId(), factEvent.getContractId());
        var fact = defaultContractFactMapper.toModel(factEvent.getContractFact());
        assertEquals(expectedNodeId, fact.getNodeId());
        assertEquals(StatusContractFactDetails.TYPE, fact.getDetails().getType());
        var details = (StatusContractFactDetails) fact.getDetails();
        assertEquals(status, details.getStatus());

        assertAck(defaultContractLifecycleEventMessageQueue, event);
    }

This method reads from our special test topic, where the service writes messages during testing. The data is validated and, if successful, forwarded to the main topic so that the service can read and process it. All the other helper methods are built on this principle with minor variations.
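
For reference, assertAck is one of those helpers. Here is a minimal sketch of what it might do, assuming TestMessageQueue exposes some method for handing a message over to the consumer (the method name is an assumption):

    // Sketch only: the forwarding method name on TestMessageQueue is an assumption.
    protected <T> void assertAck(TestMessageQueue<T> queue, T event) {
        assertNotNull(event);
        // forward the validated event to the queue the service actually reads from,
        // resuming the processing that was paused for inspection
        queue.push(event);
    }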

Now, to test the code, it is enough to write something that is no longer integration testing in the full sense of the word, but is definitely not unit testing either. The author calls such tests scenario tests, because they must run through a certain scenario. Let's take one of them as an example:

    @Test
    void test() {
        var nonLocalWalletId = Instancio.create(QWalletId.class);
        var contract = createContract(localWalletId, contractExternalId, generatePaymentTemplates(nonLocalWalletId));


        pushContractCreatedEvent(contract);
        assertSlaveReceiveContractCreated(contract);

        assertCurrentStatus(contract.getId(), ContractStatus.INIT);

        responseSlaveInitialized(contract, nonLocalWalletId.nodeId(), List.of(
                statusFact(contract.getId(), nonLocalWalletId.nodeId(), ContractStatus.INIT),
                statusFact(contract.getId(), nonLocalWalletId.nodeId(), ContractStatus.ACTIVE)
        ));

        assertEventStatus(contract, ContractStatus.ACTIVE);
        assertEventLocalStatus(contract, ContractStatus.ACTIVE);
        assertExternalEventStatusSent(contract, ContractStatus.ACTIVE);

        assertCreatedHistoryEvent(contract);

//        assertApprovalRequested(contract, localWalletId, getPaymentTemplates(contract));

//        approve(contract, localWalletId, ApprovalType.WITHDRAW);
        assertAutoApproved(contract, localWalletId, ApprovalType.WITHDRAW);

        var fromWithdrawTransactionEvent = expectCreateTransactionEvent(contract, localWalletId, ApprovalType.WITHDRAW);

        respondCreateTransactionSuccess(fromWithdrawTransactionEvent);

        assertPaymentHoldEvent(contract);
        var details = assertExternalPaymentHoldEvent(contract);

        respondExternalPaymentHold(contract, nonLocalWalletId, buildDepositDetails(nonLocalWalletId, details));
        assertPaymentHoldEvent(contract);

        respondExternalEvent(contract, statusFact(contract.getId(), nonLocalWalletId.nodeId(), ContractStatus.COMPLETING));
        assertEventStatus(contract, ContractStatus.COMPLETING, nonLocalWalletId.nodeId());
        assertEventStatus(contract, ContractStatus.COMPLETING);
        assertExternalEventStatusSent(contract, ContractStatus.COMPLETING);
        assertEventLocalStatus(contract, ContractStatus.COMPLETING);

        assertTransactionConfirmed(fromWithdrawTransactionEvent.getPaymentInfo());

        respondExternalEvent(contract, statusFact(contract.getId(), nonLocalWalletId.nodeId(), ContractStatus.COMPLETED));
        assertEventStatus(contract, ContractStatus.COMPLETED, nonLocalWalletId.nodeId());

        assertEventStatus(contract, ContractStatus.COMPLETED);
        assertExternalEventStatusSent(contract, ContractStatus.COMPLETED);
        assertEventLocalStatus(contract, ContractStatus.COMPLETED);

        assertContractStatusNotification(contract, localWalletId, ContractStatus.COMPLETED);

        assertCompleteHistoryEvent(contract);

        assertMessageQueuesAreEmpty();
    }

This test verifies a successful wallet-to-wallet transfer with two nodes involved in the processing. Such tests can be written in batches and take almost no time to implement. If the author had written this test the Baba Zina way, it would have taken more than 2,000 lines!!!

Conclusion

This article proposes an approach to testing message-queue integrations based on a dedicated library and scenario testing.

The advantage of a scenario test is that even if many scenarios break at once after a change, you can debug them one by one, and fixing one will most likely fix many others. A scenario test is almost 100% the application's business logic, which an analyst with basic programming skills can read if desired.

A side benefit of this approach is a huge reduction in refactoring time. No matter what changes you make to your code, the scenario itself remains unchanged, because it has almost no overlap with the implementation. Changes to the verification methods are minimal and easy to review.

During code review, the logic of the scenario and the logic of the test methods are immediately visible, and it becomes possible to delegate their development (for example, the scenario is written, the methods are stubbed out with a description of what they should do, and a junior learns to develop by implementing them). The scenario and the validation methods can be taken apart and discussed at different levels. Methods developed for one scenario are almost guaranteed to be reused in others, which further reduces the time needed to develop tests. When there is a lot of variability (a complex graph of system states), this approach is irreplaceable.

For employers

Don't forget to take a Java cat onto your team! He doesn't catch mice, but he writes Java code and does a bit of design!

https://hh.ru/resume/b33504daff020c31070039ed1f77794a774336
