Prostore – simple Event sourcing + CQRS framework

At the moment, there are few Event sourcing + CQRS frameworks on the market. And those that exist are unpopular and have little support, so many create their own in-house solutions. In this article, I’ll talk about the Prostore project done by the Programming Store team, which can serve as an example when creating your own solution.

Event Sourcing

Event Sourcing is an architectural pattern, an approach for storing data as events. In the traditional approach, we store the final state. Event Sourcing is a radically different approach. Instead of storing the final state, we store all intermediate states as events. The final state is obtained by successive application of all intermediate states. The data store is implemented as an append-only immutable log, and is typically referred to separately as the Event Store.

CQRS

CQRS stands for Command and Query Responsibility Segregation. It is a template that separates read (Query) and update (Command) operations for a data store. By separating operations into Command and Query, we can maximize the performance, scalability, and security of our services. It also increases the flexibility of the entire system. For Command and Query services, we can choose different frameworks, programming languages, databases. For example, we could store/update data in a relational database and make queries in a graph database.

Event Sourcing + CQRS

As described above, Event Sourcing uses a log structure as its data store. We can only add events to the end, but we cannot change or delete events. It is important to emphasize here that we can store only events in the Event Store, other structures are not provided by the template. Event Sourcing events are domain events that have already happened at some point in the past. For example:

class OrderItemAddedEvent {
    String orderId;
    String userId;
    String itemId;
    long timestamp;
}

There can be many such events in a complex subject area. This imposes limits on the storage structure and performance. To solve this problem, Event Sourcing is used in conjunction with CQRS. The final solution might look like this:

Here:

  • Client – the client of our services (user interface, external / internal service).

  • Commands – commands in the CQRS definition, operations whose result leads to the creation of events in the Event Store.

  • Command Service A is a service or group of services that serve commands of certain types. For example, order service.

  • Command Service B is another command service.

  • Event Store – event store. For example, MongoDB, PostgreSQL, Apache Cassandra, Redis. The main characteristic when choosing an Event Store is the performance of adding an entry.

  • Events – events.

  • Event Bus – event bus. For example, Apache Kafka, Redis, RabbitMQ.

  • Query – a command – a request for obtaining aggregated data.

  • Query Service A is a service or group of services that serve certain types of queries. For example, order history.

  • Query Service B is another query service.

  • Query Storage is a storage of aggregated data that is optimal for queries. Here, one Query Storage is used for two services, but you can use your own storage for each query service. For example, for one Mongodb base, for another PostgreSQL.

This is only one of the implementation options, and in my opinion, the closest to the “canonical”. For simplicity of the scheme, I did not add Service Discovery here, although it is mandatory in practice (the client must know which command / query services are available to him).

Benefits and Applications

A simple log structure gives us some advantages:

  • Low requirements for transactional logic. Base operations are atomic and immutable (adding to the end).

  • High vertical performance.

  • Simple scaling (sharding).

  • Large selection of databases (key-value, NoSQL, SQL).

  • The ability to restore the state from any point in history.

  • Quick changes to the logic associated with the base (no schema and no need to do migrations).

Since we use events, we also get the benefits of an event-driven architecture:

Possible applications:

  • Auditing, logging.

  • Logistics.

  • Cash and other transactions with a high requirement for data consistency.

  • Storage method for “state machine”.

  • Systems designed with a focus on weak connectivity.

For these cases, the Event Sourcing approach works well in practice (Netflix, Walmart), especially when we are dealing with big data.

Flaws

Since we need to store all events in the domain, Event Sourcing requires more disk space than traditional approaches.

As mentioned above, other disadvantages of Event Sourcing are solved by using CQRS. CQRS architecturally solves the problem of scalability and performance of complex queries. Unfortunately, the requirements for the amount of stored data cannot be solved only by an architectural approach. This narrows the applicability of this pattern. When choosing Event Sourcing for your project, you should study this drawback, make calculations. Also, I would not recommend using this approach as a direct replacement for the traditional approach, since it is not. I saw a project where this approach was applied to the entire project, for CRUD logic, and it was terrible. Since Event Sourcing requires storing all domain scope events, there have been events like UserEmailChanged. And so practically for each field.

The most important disadvantages I would include niche, and as a result, unpopularity. There are not many ready-made solutions on the market. Paid solutions can be expensive, and open source solutions have a small community. And as a result, in-house solutions are popular. You will have to make a choice: pay dearly by choosing a paid solution, or pay time by choosing an open source solution, or still make your own decision. Of course, without knowing your case, it is difficult for me to give any practical advice. Next, I would like to show an example of how you can make your own solution.

Solution

Link to solution code https://github.com/ProgrammingStore/prostore. The project is implemented on Spring Boot 2.7.5. Architecturally, the solution looks like the previously proposed scheme.

Requirements:

  • Java >= 17

  • Apache maven >= 3.8

  • Mongodb >= 6.0.1

  • Kafka >= 2.13-3.3.1

Project composition:

  • prostore-core – core, basic abstractions;

  • prostore-eureka – plugin, CommandBus/QueryBus implementation based on Spring Cloud Eureka;

  • prostore-mongo – plugin, implementation of EventStore on Mongodb;

  • prostore-kafka – plugin, implementation of EventBus based on Apache Kafka;

  • prostore-test-common – common library for test projects;

  • prostore-test-service – test project “service”. This is a combined service for commands and requests;

  • prostore-test-client – test project “client”;

  • spring-boot-prostore-starter – the main starter;

  • spring-boot-prostore-eureka-starter – starter for eureka;

  • spring-boot-prostore-mongo-starter – starter for mongo;

  • spring-boot-prostore-kafka-starter – starter for kafka;

  • prostore-eureka-server – eureka server.

When creating your own event sourcing solution, it is important to pay attention to flexibility. Don’t inject hard dependencies. Perhaps for some cases you will need to replace the database, for example with SQL.

In this project, Mongodb was chosen as the most optimal for storing events. Other possible options:

  • Apache Cassandra;

  • Redis;

  • PostgreSQL/MySQL.

The main characteristic when choosing an Event Store is the performance of adding an entry, as well as the ease of sharding.

Apache Kafka has been selected as the Event Bus. Event Bus requirements depend on the chosen architecture and implementation. Also, in some implementations, the Event Store can serve as an Event Bus. In the current implementation, the task of the Event Bus is the guaranteed delivery of events.

Spring Cloud Eureka is used as a CommandBus/QueryBus implementation.

Definitions of the main abstractions:

  • Aggregate – an aggregate of states. This is the same final state that we obtain by applying successively all the states. Has a unique id. For example:

@Data
class Order implements Aggregate {
    String id;
    String userId;
    List<OrderItem> orderItems;
}
  • AggregateEvent – An event that occurred in the context of an aggregate. For example, the OrderItemAddedEvent event occurs in the context of the Order aggregate.

  • Cache – for caching aggregates. Constantly building aggregates applying all events is expensive, so we use a cache.

  • Command – command, marker interface. Example:

public class CreateOrderCommand implements Command {
    private String userId;
    private String storeId;
}
  • CommandBus is a bus for sending commands.

  • CommandHandler – command handler.

  • Event – event, marker interface.

  • EventBus is a bus for sending events.

  • EventHandler is an event handler.

  • EventStore – event store.

  • Query – query, marker interface. Example:

public class GetOrderByIdQuery implements Query {
    private String aggregateId;
}

It is important to understand the difference between CommandHandler and EventHandler. CommandHandler is a command handler. The CommandBus sends a command to only one instance. If the send attempt fails, the client must resend the command. In the handler, having received the command, we must generate the event or events corresponding to the command, store them in the EventStore and send them to the EventBus. Example:

@Component
class CreateShipmentCommandHandler implements CommandHandler<CreateShipmentCommand> {
    private final Logger logger = LoggerFactory.getLogger(CreateShipmentCommandHandler.class);

    private final EventStore eventStore;
    private final EventBus eventBus;

    private CreateShipmentCommandHandler(EventStore eventStore, EventBus eventBus) {
        this.eventStore = eventStore;
        this.eventBus = eventBus;
    }

    @Override
    public String handle(CreateShipmentCommand command) {
        logger.debug("command = {}", command);
        ShipmentCreatedEvent event = new ShipmentCreatedEvent(
            UUID.randomUUID().toString(), command.getDestination(), command.getLocation()
        );
        eventStore.save(event);
        eventBus.publish(event);
        return String.format("created: %s", event.getAggregateId());
    }
}

EventHandler is an event handler. EventBus sends events to all instances. Resending events after an unsuccessful attempt should be handled by the EventBus. In the EventHandler handler, we can update the state of the aggregate (if it is an AggregateEvent), send a new command. Example:

@Component
class ShipmentMovedEventHandler implements EventHandler<ShipmentMovedEvent> {
    private final Logger logger = LoggerFactory.getLogger(ShipmentMovedEventHandler.class);

    private final EventStore eventStore;

    private ShipmentMovedEventHandler(EventStore eventStore) {
        this.eventStore = eventStore;
    }

    @Override
    public void handle(ShipmentMovedEvent event) {
        logger.debug("Got event: {}", event);
        Shipment shipment = eventStore.get(event.getAggregateId());
        shipment.setLocation(event.getLocation());
    }
}

Running a test project:

  • Run mongodb locally;

  • Run kafka locally;

  • mvn clean install;

  • mvn spring-boot:run -f prostore-eureka-server;

  • PORT=9000 mvn spring-boot:run -f prostore-test-client;

  • PORT=9001 mvn spring-boot:run -f prostore-test-service;

  • PORT=9002 mvn spring-boot:run -f prostore-test-service.

Here we launched the Eureka server, a test client on port 9000, two instances of test services on ports 9001 and 9002.

Now we can start testing. We test by sending requests to the test service:

curl -v -H "Content-Type: application/json" \
     -d '{"destination":"Moscow", "location": "Almaty"}' \
     http://localhost:9000/shipment
curl -v -X GET -H "Content-Type: application/json" \
     -d '{"aggregateId": "AGGREGATE_ID"}' \
     http://localhost:9000/shipment

Change shipment (instead of AGGREGATE_ID, insert the aggregateId received by the create command):

curl -v -H "Content-Type: application/json" \
     -d '{"aggregateId":"AGGREGATE_ID", "location": "Sydney"}' \
     http://localhost:9000/shipment/move
  • If multiple `prostore-test-service` instances are running, requests will be round-robin balanced.

  • Try restarting services. When the service starts, replay events will be launched to restore the aggregates.

All events will be added to the MongoEvent collection (test base by default):

{
    "_id" : ObjectId("63a3f2d70ac51a617297cda5"),
    "aggregateId" : "7a97ca5a-3375-4be1-bbe2-83ae7b40614d",
    "eventType" : "ru.programstore.prostore.test.common.event.ShipmentCreatedEvent",
    "eventJson" : "{\"aggregateId\":\"7a97ca5a-3375-4be1-bbe2-83ae7b40614d\",\"destination\":\"Moscow\",\"location\":\"Almaty\"}",
    "timestamp" : NumberLong(2075100266365871),
    "_class" : "ru.programstore.prostore.mongo.MongoEvent"
}

MongoEvent is provided by the mongodb module and does not require any action from the developer.

Here, when compared with the traditional approach, there are no separate collections / tables for domain area entities. Structurally, this can be represented as a stream of data that is divided by aggregateId. In the traditional approach, we would store the Shipment entity in a table/collection and then change the state of the Shipment by id. In Event Sourcing, we save the ShipmentCreatedEvent event with some initial state, set / get aggregateId. And then we can generate any events for this aggregateId. For example, ShipmentMovedEvent. When adding/changing fields, we don’t need to change the schema. We change event fields or add new types of events.

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *