Event Sourcing and Saga with Marten and Wolverine in C# and a bit of a modular monolith

In this article:

We will abandon controllers, MediatR and MassTransit, all of the above will be replaced by Wolverine. We will cast a modular monolith in granite, implement event registration using Marten. An example of all this disgrace is located here.

Let's start with Wolverine

According to its creators, Wolwerine is a new generation of implementation of the Mediator and Message Bus patterns, a library “with batteries included”.

Messages: they can be processed, they can be sent, they can be scheduled, they can be expected.

Messages: they can be processed, they can be sent, they can be scheduled, they can be expected.

Wolverine allows:

  • Handle HTTP requests using Wolverine handlers, including those found in libraries.

  • Send messages (commands and requests) to handlers and receive the execution result.

  • Send messages (commands and requests) to handlers located in other applications, in this case we will need transport. Out of the box, Wolverine works with the following types of transport: RabbitMQ, Azure Service Bus, Amazon SQS, TCP, Sql Server, PostgreSQL, MQTT, Kafka. In the example, I use Kafka as a transport for some messages, Wolverine allows you to choose which message will be sent by which transport.

  • Implement the Saga pattern. Wolverine's Saga supports a type of message called “timeout messages”, which allows “forgotten” sagas to be completed “automatically”.

  • Send messages in “Ping – Pong” mode, in the documentation it is called “cascading messages”.

Wolverine has out-of-the-box integration with Marten, which makes it possible, for example, to send all commands and requests directly to Kafka.

Martin

Marten is a .Net library that aims to eliminate the “boilerplate” and allow you to focus on delivering business value, according to its creators. Marten is an add-on to PostgreSQL that allows you to use this database as an analogue of MongoDB or EventStoreDB. The manufacturer also promises Strong Consistency (but there are nuances), flexible indexing strategies, improved linq queries, built-in implementation of the Outbox/Inbox pattern “out of the box”, Multi-tenancy support, integration with Asp.Net Core and other words in English that are not clear to everyone:

https://martendb.io/introduction.html.

The example implements event registration on Marten, and if you compare the EventStoreDB plus MongoDB bundle (a good example is here), then yes, in comparison with them, Marten will require a minimum of code, since the read part is provided “out of the box”.

Modular monolith

A modular monolith is a type of architecture that involves dividing an application into modules, each of which can be converted into a separate micro-service as the application scales, and, as you might guess, a module is an implementation of a bounded context in code. Modules “know” only each other’s contracts and interact in the same way as micro-services would, plus interaction using one or another implementation of the Mediator pattern.

Pros: We save on writing infrastructure code required to create a separate application for each module, which we would have to do in the case of a microservice architecture, at the same time, there is a chance to avoid spaghetti code. We save on writing integration code, since the modules are in the same process. Saving data from different modules within a single transaction is also not a problem, since all modules work with the same connection to the database. It is much easier to create a new module than a new microservice.

Cons: It is a monolith, the module is not a deployment unit.

A racially accurate podcast about Modular Monolith for .Net can be found here and there too example.

Well, or within the framework of import substitution, conversation And example from DevBrothers, also quite a bit.

Subject area

It is simple and brief, like Buldakov's toast. There is a “Person”, the person has “Accounts”, “Payments” come to the accounts. Each action: creating a person, adding an account, adding a payment must be confirmed or rejected. At the output, we must receive a list of persons and the balance for each of them, with the history of how the current state was achieved.

Adding new data to the database – only at the end of the saga:

In Swagger it looks like this:

In point 1 you need to take the saga ID and within 3 minutes send a request with it in point 2.

In point 1 you need to take the saga ID and within 3 minutes send a request with it in point 2.

The solution has an API project from which the Controllers folder is separated, it is not needed, since all HTTP calls should be handled by modules.

We have three modules: Persona module (PersonModule), Saga Approvement Module and Saga Rejection Module.

Wolverine HTTP EndPoints

Let's start with SagaApprovementModule: it contains the SagaApprovement.Contracts library with the module's contracts and the SagaApprovement library, which contains Wolverine's endpoints:

// <summary>
/// EndPoint завершения саги добавления счёта, добавления платежа, создания персоны.
/// Во всех трёх обработчиках один из параметров - впрыск ссылки на объект шины сообщений.
/// </summary>
public static class ApproveEndPoints
{
    /// <summary>
    /// Отправляем в сагу добавления счёта сообщение разешающее добавление счёта.
    /// </summary>
    /// <param name="command"></param>
    /// <param name="bus"></param>
    /// <returns></returns>
    [WolverinePost("approve-add-account-saga")]
    public static ValueTask Handle(ApproveAccountCommand command, IMessageBus bus) => bus.PublishAsync(new AccountApproved(command.SagaId));

    /// <summary>
    /// Отправляем в сагу добавления платежа сообщение разешающее добавление платежа.
    /// </summary>
    /// <param name="command"></param>
    /// <param name="bus"></param>
    /// <returns></returns>
    [WolverinePost("approve-add-payment-saga")]
    public static ValueTask Handle(ApprovePaymentCommand command, IMessageBus bus) => bus.PublishAsync(new PaymentApproved(command.SagaId));
    

    /// <summary>
    /// Отправляем в сагу создания персоны сообщение разешающее создание персоны.
    /// </summary>
    /// <param name="command"></param>
    /// <param name="bus"></param>
    /// <returns></returns>
    [WolverinePost("approve-person-creation-saga")]
    public static ValueTask Handle(ApprovePersonCreationCommand command, IMessageBus bus) => bus.PublishAsync(new PersonApproved(command.SagaId));
}

The class contains three endpoints with addresses: “approve-add-account-saga”, “approve-add-payment-saga”, “approve-person-creation-saga”. For all this to work, you need a class that has “EndPoint” or “EndPoints” in its name and contains methods with attributes WolverinePost, WolverineGet, etc. Nothing else is needed.

The Wolverine manual states that method injection is preferred. In the line above, we inject the Wolverine message bus interface. We can then choose one of three options:

  • bus.PublishAsync — publishes a message to the bus. Even if there are no handlers, the method will complete successfully; if there are handlers, the method will not wait for their execution results.

  • bus.SendAsync – publishes a message to the bus, but waits for there to be handlers for this message, if there are any, it does not wait for them to complete their execution.

  • bus.InvokeAsync — publishes a message, waits for a handler to exist, returns the result of the handler execution to the sender.

In the example above, I publish the AccountApproved message to the bus. The handler for this message is in the AddAccountSaga saga. In order for the saga to accept the message, you need to define a Handle method in which one of the parameters is the message class. The handler looks like this:

/// <summary>
/// Успешное завершение саги, добавляем аккаунт.
/// </summary>
/// <param name="_"></param>
/// <param name="addAccountService">сервис добавления счёта.</param>
public async void Handle(AccountApproved _, IAddAccountService addAccountService)
{
    // Обращаемся к сервису добавления аккаунта,
    // отправляя туда данные из состояния саги.
    await addAccountService.CreateAccount(PersonId, AccountName);

    // Завершаем сагу.
    MarkCompleted();
}

Saga on Wolverine

/// <summary>
/// Сага добавления аккаунта. 
/// </summary>
public class AddAccountSaga : Saga
{
    /// <summary>
    /// Идентификатор саги.
    /// </summary>
    public string? Id { get; set; }

    /// <summary>
    /// Идентификатор персоны.
    /// </summary>
    public string PersonId { get; set; }

    /// <summary>
    /// Наименование аккаунта.
    /// </summary>
    public string AccountName { get; set; }

    /// <summary>
    /// Обработчик старта саги. 
    /// Название Start зарезервировано Wolverine. 
    /// Сообщение принимаемое в этом методе в качетсве первого параметра - будет считаться стартовым
    /// сообщением саги.
    /// Стартовый обработчик должен вернуть сагу. 
    /// В нашем случае сагу и сообщение завершающее сагу по таймауту.
    /// </summary>
    /// <param name="addAccountSagaStarted">Стартовое сообщение саги</param>
    /// <returns></returns>
    public static (AddAccountSaga, AddAccountTimeoutExpired) Start(AddAccountSagaStarted addAccountSagaStarted) => (new AddAccountSaga
    {
        //заполняем состояние саги данными.
        Id = addAccountSagaStarted.AddAccountSagaId,
        PersonId = addAccountSagaStarted.PersonId,
        AccountName = addAccountSagaStarted.AccountName
    },
    new AddAccountTimeoutExpired(addAccountSagaStarted.AddAccountSagaId));

    /// <summary>
    /// Успешное завершение саги, добавляем аккаунт.
    /// </summary>
    /// <param name="_"></param>
    /// <param name="addAccountService">сервис добавления счёта.</param>
    public async void Handle(AccountApproved _, IAddAccountService addAccountService)
    {
        // Обращаемся к сервису добавления аккаунта,
        // отправляя туда данные из состояния саги.
        await addAccountService.CreateAccount(PersonId, AccountName);

        // Завершаем сагу.
        MarkCompleted();
    }

    /// <summary>
    /// Хэндлер отрицательного завершения саги.
    /// </summary>
    /// <param name="_"></param>
    public void Handle(AccountRejected _) => MarkCompleted();

    /// <summary>
    /// Хэндлер завершения саги по таймауту.
    /// MarkCompleted - закрывает сагу.
    /// </summary>
    /// <param name="_"></param>
    public void Handle(AddAccountTimeoutExpired _) => MarkCompleted();
}

Creating sagas with Wolverine is easy:

  • We inherit the class from the Saga class.

  • Define the fields that are the state of the saga. The state will be available in all message handlers of the saga. (p. 6–20).

  • We define the handler of the saga start message. It should be named Start, and the first parameter will be considered the message with which the saga begins. (p.31).

  • We define the remaining handlers of the saga. The first parameters in the handlers are the messages that the saga should process.

The Start method must return a saga instance. In our case, it returns a tuple of the saga instance and a message that will close the saga on timeout if it is “forgotten”. The saga state will be saved in the database, and then retrieved from it during the next triggering described in the handler saga.

We inject all the necessary services with the second and following parameters in the handlers. As done in line #45.

In the settings, when adding Wolverine, you can specify which transport to send messages:

//Будем публиковать в кафке ниже приведённые события.
opts.PublishMessage<PersonApproved>().ToKafkaTopic("CreatePersonUseCase.PersonApproved");
opts.PublishMessage<PersonRejected>().ToKafkaTopic("CreatePersonUseCase.PersonRejected");

And specify where the leading integration messages can come from:

//Будем получать из топиков кафки следующие события.
opts.ListenToKafkaTopic("CreatePersonUseCase.PersonApproved");
opts.ListenToKafkaTopic("CreatePersonUseCase.PersonRejected");

Actually, that's all that's needed to create a saga.

Marten and Event Sourcing

What Event Sourcing is can be read here, a good example is attached to the article, looking at which you can appreciate how much Marten simplifies the implementation of event registration.

Marten has a daemon and a projection mechanism implemented out of the box. Projections are updated when new events are saved. Projections can be accessed using Linq queries in much the same way as if we were working with relational databases. Yes, we don’t need Mongo or another database, we don’t need to implement a daemon and subscriptions to get snapshots. Marten has already done all this for us. You just need to create a projection and connect it to Marten, and it’s not difficult.

Let's move on to the hardware. In the example, we will find the Repository.cs file – this is the implementation of the repository that saves events in aggregate streams in the DB and restores the states of the aggregates based on events from the DB.

public sealed class Repository(IDocumentStore store) : IRepository
{
    //Marten document store
    private readonly IDocumentStore store = store;

    /// Получаем несохранённые события из агрегата и сохраняем их.
    public async Task StoreAsync(Aggregate aggregate, CancellationToken ct = default)
    {
        // получаем сессию для работы с событиями.
        await using var session = await store.LightweightSerializableSessionAsync(token: ct);
        // получаем список несохранённых событий из агрегата
        var events = aggregate.GetUncommittedEvents().ToArray();
        // добавляем события в стрим с идентификатором aggregate.Id
        session.Events.Append(aggregate.Id, aggregate.Version, events);

        // сохраняем изменения.
        await session.SaveChangesAsync(ct);
        // очищаем список несохранённых событий.
        aggregate.ClearUncommittedEvents();
    }

    /// Восстанавливаем состояние агрегата по событиям.
    public async Task<T> LoadAsync<T>(
        string id,
        int? version = null,
        CancellationToken ct = default
    ) where T : Aggregate
    {
        // получаем сессию для работы с событиями.
        await using var session = await store.LightweightSerializableSessionAsync(token: ct);
        // восстанавливаем состояние агрегата, читая из бд события стрима агрегата.
        // при этом Marten вызовет методы Apply для каждого из сохранённых событий.

        var stream = await session.Events.FetchForWriting<T>(id, ct);

        return stream.Aggregate;        
    }
}

This is all the code that is needed for the repository of the write part of the module. In the read part, you can do without a repository at all.

/// <summary>
/// Endpoint получения данных о персонах.
/// </summary>
public static class GetPersonWithSumEndPoint
{
    /// <summary>
    /// Получаем персону по её идентификатору.
    /// В метод впрыскиваем сессию для получения read-модели.
    /// </summary>
    /// <param name="getPersonsWithSumCommand"></param>
    /// <param name="session"></param>
    /// <returns></returns>
    /// <exception cref="Exception"></exception>
    [WolverineGet("person/person")]
    public static async Task<string> Handle(GetPersonWithSumQuery getPersonsWithSumCommand, IQuerySession session)
    {
        var person = await session
            .Query<PersonWithSum>()
            .FirstOrDefaultAsync(c => c.Id == getPersonsWithSumCommand.PersonId) ?? throw new Exception($"Person not found.");

        return JsonConvert.SerializeObject(person, Formatting.Indented);
    }

    /// <summary>
    /// Получаем список всех персон (IRL это плохо, но для примера можно кмк.)
    /// Впрыскиваем в метод сессию для получения списка read-моделей.
    /// </summary>
    /// <param name="getPersonsWithSumCommand"></param>
    /// <param name="session"></param>
    /// <returns>Список персон с сальдо.</returns>
    /// <exception cref="Exception"></exception>
    [WolverineGet("person/persons")]
    public static async Task<string> Handle(GetPersonsWithSumQuery getPersonsWithSumCommand, IQuerySession session)
    {
        var persons = await session
            .Query<PersonWithSum>().ToListAsync() ?? throw new Exception($"Persons not found.");

        return JsonConvert.SerializeObject(persons, Formatting.Indented);
    }
}

The GetPersonWithSumEndPoint class, as you might guess, is an HTTP EndPoint. To get data from projections, we just need to execute a LINQ query.

await session.Query<PersonWithSum>().ToListAsync()

We get all the data from the projection, but you can add Where, Take, Skip, etc. to limit the selection. No difference with the selection from relational DBs.

Projections

Projections in Marten can be of the following types:

  • Aggregate Projection: Live — projection on the fly, only a model is needed, no need to create a projection class; Multi-Stream — projections with the ability to group events, slice events, split by tenantId, etc.; Custom — even more extensive capabilities than in the previous Multi-Stream.

  • Event Projections: Allow you to explicitly define document creation operations from individual events.

  • Custom projections: projections that inherit from IProjection, you do everything yourself from scratch. The other types of projections listed in this list are inherited from certain classes and already have some functionality.

  • Inline projections: events are projected into a single transaction with event persistence.

  • Flat Table Projection: Allows you to create an ADO.NET table, add columns to it using AddColumn, and project data directly into it.

In the example you can find a projection of aggregates based on SingleStreamProjection.

The projection model looks like this:

/// <summary>
/// Модель персоны, используется в проекции PersonWithSumProjection. В модель добавлено поле Saldo.
/// </summary>
public class PersonWithSum
{
    /// <summary>
    /// Идентификатор персоны.
    /// </summary>
    public string Id { get; set; }

    /// <summary>
    /// ФИО
    /// </summary>
    public string Name { get; set; }

    /// <summary>
    /// ИНН
    /// </summary>
    public string Inn { get; set; }

    /// <summary>
    /// Сальдо.
    /// </summary>
    public decimal Saldo { get; set; }

    public long Version { get; private set; }

    /// <summary>
    /// Счета.
    /// </summary>
    public List<Account> Accounts = new List<Account>();

    /// <summary>
    /// Методы Apply будут вызваны Marten при построении проекции.
    /// </summary>
    /// <param name="event"></param>
    public void Apply(PersonCreated @event)
    {
        Id = @event.Id;
        Name = @event.Name;
        Inn = @event.Inn;
        Version++;
    }

    public void Apply(PersonNameChanged @event)
    {
        Name = @event.NewName;
        Version++;
    }

    public void Apply(PersonInnChanged @event)
    {
        Inn = @event.NewInn;
        Version++;
    }

    public void Apply(AccountCreated @event)
    {
        var account = new Account(@event.AccountId, @event.Name, new List<Payment>());
        Accounts.Add(account);
        Version++;
    }

    public void Apply(PaymentCreated @event)
    {
        var payment = new Payment(@event.Id, @event.Sum, @event.PaymentType);
        var account = Accounts.FirstOrDefault(x => x.Id == @event.AccountId) ?? throw new ArgumentNullException($"Счёт не найден с ид {@event.AccountId}");
        account.Payments.Add(payment);

        Saldo = @event.PaymentType == (int)PaymentTypeEnum.Credit ? Saldo + @event.Sum : Saldo - @event.Sum;

        Version++;
    }
}

Apply methods will be called in the projection.

The projection class looks like this:

/// <summary>
/// Проекция событий агрегата PersonAggreate. Проекция вычисляет сальдо по каждому из агрегатов.
/// </summary>
public class PersonWithSumProjection : SingleStreamProjection<PersonWithSum>
{
    public PersonWithSumProjection()
    {
        // Вызываются методы Apply модели PersonWithSum
        ProjectEvent<PersonCreated>((item, @event) => item.Apply(@event));
        ProjectEvent<PersonInnChanged>((item, @event) => item.Apply(@event));
        ProjectEvent<PersonNameChanged>((item, @event) => item.Apply(@event));
        ProjectEvent<AccountCreated>((item, @event) => item.Apply(@event));
        // В этом Apply вычисляется сальдо.
        ProjectEvent<PaymentCreated>((item, @event) => item.Apply(@event));
    }
}

In the projection we indicate which events it will process and by which model methods.

Then the projection needs to be added to Marten.

options.Projections.Add<PersonWithSumProjection>(ProjectionLifecycle.Async);

LifeTime is specified as asynchronous, i.e. the projection is built asynchronously, after saving events. You can specify Inline, then the projection will be built within one transaction together with saving events.

Units

Marten does not provide base classes for writing aggregates. Therefore, we write them ourselves: we expose properties for reading. The state is changed only through the aggregate methods. If an error occurs, we write the corresponding event to the list of events to be saved, instead of generating an exception.

Marten expects your aggregate to implement Apply methods for each of the domain events, if so, Marten will be able to restore the aggregate state even if the Apply methods are private. For example, the aggregate defines the following Apply methods:

protected void Apply(PersonNameChanged @event)
{
    Name = @event.NewName;
    Version++;
}

protected void Apply(PersonInnChanged @event)
{
    Inn = @event.NewInn;
    Version++;
}

protected void Apply(AccountCreated @event)
{
    var account = Account.Create(@event.AccountId, @event.Name);
    _accounts.Add(account);
    Version++;
}

When we read an aggregate from the database using the repository's LoadAsync method,

/// Восстанавливаем состояние агрегата по событиям.
public async Task<T> LoadAsync<T>(
    string id,
    int? version = null,
    CancellationToken ct = default
) where T : Aggregate
{
    // получаем сессию для работы с событиями.
    await using var session = await store.LightweightSerializableSessionAsync(token: ct);
    // восстанавливаем состояние агрегата, читая из бд события стрима агрегата.
    // при этом Marten вызовет методы Apply для каждого из сохранённых событий.

    var stream = await session.Events.FetchForWriting<T>(id, ct);

    return stream.Aggregate;        
}

in line 15 we access the stream.Aggregate property, when accessed Marten will call the above mentioned Apply methods and apply the events.

An example of an aggregate is located in the file: PersonAggregate.cs.

Subscriptions

Subscriptions are also available in Marten.

/// <summary>
/// Подписка на события типа PersonApproved.
/// </summary>
public class PersonApprovedToKafkaSubscription : SubscriptionBase
{
    private readonly IServiceProvider _serviceProvider;

    public PersonApprovedToKafkaSubscription(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;

        SubscriptionName = nameof(PersonApprovedToKafkaSubscription);

        // Подписываемся только на события типа PersonApproved
        IncludeType<PersonApproved>();

        // настраиваем сколько событий демон будет извлекать за раз
        // и сколько будет держать в памяти.
        Options.BatchSize = 1000;
        Options.MaximumHopperSize = 10000;

        // Позиция с которой читаем события (с текущего события)
        Options.SubscribeFromPresent();
    }

    /// <summary>
    /// Обрабатываем события.
    /// </summary>
    public override async Task<IChangeListener> ProcessEventsAsync(
        EventRange page,
        ISubscriptionController controller,
        IDocumentOperations operations,
        CancellationToken cancellationToken)
    {
        // с помощью Woverine будем отправлять интеграционные события в кафку.
        var messageBus = _serviceProvider.GetService<IMessageBus>() ?? throw new ArgumentNullException("Шина событий не зарегистрирована в IoC");

        foreach (var @event in page.Events)
        {
            await messageBus.PublishAsync(
                new PersonApprovedIntegrationEvent(@event.Data.GetType().Name, JsonConvert.SerializeObject(@event.Data)));
        }

        return NullChangeListener.Instance;
    }
}

This is a subscription to one type of events. After receiving the domain event PersonApproved, an integration event PersonApprovedIntegrationEvent is generated based on it, which is sent to Kafka, as an example.

Replay of events

Sometimes you need to rebuild the projection, you can do this in Marten too.

[HttpGet]
[Route("replay")]
public async Task Replay(
    [FromServices] IDocumentStore store, CancellationToken cancellation)
{
    using var daemon = await store.BuildProjectionDaemonAsync();

    // Fire up everything!
    await daemon.StartAllAsync();

    // or instead, rebuild a single projection
    //await daemon.RebuildProjectionAsync("a projection name", 5.Minutes(), cancellation);

    // or a single projection by its type
    await daemon.RebuildProjectionAsync<PersonWithSumProjection>(cancellation);

    // Be careful with this. Wait until the async daemon has completely
    // caught up with the currently known high water mark
    await daemon.WaitForNonStaleData(1.Minutes());

    // Start a single projection shard
    //await daemon.StartAgentAsync("shard name", cancellation);

    // Or change your mind and stop the shard you just started
    //await daemon.StopAgentAsync("shard name");

    // No, shut them all down!
    await daemon.StopAllAsync();
}

Total

Overall, Wolverine+Marten can significantly reduce the amount of boilerplate code due to a wide range of out-of-the-box solutions. However, there is a caveat – if you are aiming for Strong Consistency – Marten-based solutions may have performance issues, as indicated by warnings on the official website.

Good luck!

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *