Microservice architecture example from Saga to MassTransit

1) To ensure the order of execution. For event based systems this is not guaranteed. In order for action B to be executed strictly after action A, you need to send response events to event A and strictly after this response event is received, start B.

2) To compensate for actions. If action B fails for some reason, then the saga can execute the compensating requests for action A.

3) To describe the process. If the process is large and complex, then it is convenient to paint it in the form of a Saga in order to see what is happening in one place. Here the Saga serves as a BPM schema describing what we do Do A then do B if B succeeds then do C otherwise do D etc.

4) For guaranteed execution of some set of actions. If we did A, then the Saga can repeat trying to perform action B until a fixed number of retries.

5) To ensure a consistent system. For example, when you need to change the state of microservice A and microservice B strictly in accordance with each other.

Here is a small digression. Usually, inside the system, one format is used between microservices (for example, AMPQ or gRPC), and outside there can be systems using SOAP, HTTP, and so on. In order to isolate internal services from the features of the external environment, I usually use two types of translator services. Gateway and Proxy. Their main task is to translate from one protocol to another. For example from HTTP to AMPQ. The difference is that the Gateway is used to translate and isolate requests from outside to inside. For example SpaGateway is used to translate requests from the front. Proxy is used for transfers from inside to outside. For example SmtpServerProxy to translate requests from us to the mail server.

If each microservice is considered as some kind of object or library, then for their architecture the principles of SOLID and others of this kind are very applicable. In general, Saga plays the role of the “Brain”, which has “Organs” – these are microservices. Well, or you can consider Saga as a boss who has subordinate microservices. In complex processes, the main saga is distinguished. The main process and child sagas are sub processes. The parent saga issues commands to child sagas (under processes), which in turn issue commands to microservices.

Our saga will be as simple as possible. No compensatory action. The essence of its logic: We have a separate item microservice from which you can take a certain number of items or add them. There is a microservice of money from where you can either take or add a certain amount of money. We will work out a simple scenario where the saga first takes a certain amount from the microservice of money (holds it) and then takes a certain number of items from the microservice of items (reduces the amount in stock) of items.

Placement diagram:

Sequence diagram:

Request Schema:

MoneyMicroservice

To get started, you can install the necessary dependencies with the commands:

Install-Package MassTransit.AspNetCore
Install-Package MassTransit.RabbitMQ
Install-Package MassTransit.EntityFramework

It is also necessary to put on RabbitMq Delayed Message Exchange

Creating request and response messages to work with the bus

//Рекомендуется для контрактов использовать интерфейсы чтобы не было возможности 
//в них прикрутить какую-то логику.
//Хотя уже с новой версией C# где можно реализации и интерфейсам прикреплять 
//подход больше не актуален
public interface IGetMoneyRequest
{
    public Guid OrderId { get; }
}

public interface IGetMoneyResponse
{
    public Guid OrderId { get; }
}

Create a handler for this message

public class GetMoneyConsumer : IConsumer<IGetMoneyRequest>
{
    public Task Consume(ConsumeContext<IGetMoneyRequest> context)
    {
        return context.RespondAsync<IGetMoneyResponse>(new { context.Message.OrderId });
    }
}

Next, add our handler to the list of handlers in the startup

builder.Services.AddMassTransit(cfg =>
{
  //Для сообщения AddMoneyRequest будет установлен адрес  add-money-request
    cfg.SetKebabCaseEndpointNameFormatter();
  //Чтобы работала обработка запросов надо поставить расширение на RabbitMq rabbitmq_delayed_message_exchange
    cfg.AddDelayedMessageScheduler();
    //Тут регистрируем наши обработчики сообщений
    cfg.AddConsumer<AddMoneyConsumer>();
    cfg.AddConsumer<GetMoneyConsumer>();
  //Настройка подлючения к RabbitMq
    cfg.UsingRabbitMq((brc, rbfc) =>
    {
      //Использовать паттерн OutBox - либо все сообщений одной пачкой сразу отправляются 
      //либо не будет отправлено ни одно из сообщений. 
      //Это нужно когда вам например нужно послать две команды сразу CreateOrder 
      // и SendEmail только при условии что отправяться оба либо ни одно из них.
        rbfc.UseInMemoryOutbox();
      //Повторные попытки обработать запрос.
        rbfc.UseMessageRetry(r =>
        {
          //Инкрементально повторять 3 раза каждый раз увеличивая между поторами 
          //интервал на 1 секунду. Начать с интервала в 1 секунду.
            r.Incremental(3, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
        });
      //Использовать отложенные сообщения в том числе с помошью них можно 
      //делать таймауты
        rbfc.UseDelayedMessageScheduler();
        rbfc.Host("localhost", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });
      //Записываем какие сообщения мы слушаем. Вызывать этот метод обязательно
      //иначе консумеры не будут реагировать на сообщения.
        rbfc.ConfigureEndpoints(brc);
    });
}) 
    .AddMassTransitHostedService();

ItemsMicroservice

Essentially the same as the previous one.

public interface IGetItemsRequest
{
    public Guid OrderId { get; }
}

public interface IGetItemsResponse
{
    public Guid OrderId { get; }
}

public class GetItemsConsumer : IConsumer<IGetItemsRequest>
{
    public Task Consume(ConsumeContext<IGetItemsRequest> context)
    {
        return context.RespondAsync<IGetItemsResponse>(new { OrderId = context.Message.OrderId });
    }
}
    
builder.Services.AddMassTransit(cfg =>
{
    cfg.SetKebabCaseEndpointNameFormatter();
    cfg.AddDelayedMessageScheduler();
    cfg.AddConsumer<AddItemsConsumer>();
    cfg.AddConsumer<GetItemsConsumer>();
    cfg.UsingRabbitMq((brc, rbfc) =>
    {
        rbfc.UseInMemoryOutbox();
        rbfc.UseMessageRetry(r =>
        {
            r.Incremental(3, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
        });
        rbfc.UseDelayedMessageScheduler();
        rbfc.Host("localhost", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });
        rbfc.ConfigureEndpoints(brc);
    });
})
    .AddMassTransitHostedService();

SagasMicroservice

Here our sagas are located and they orchestrate the process of buying items.

Request and response messages from saga:

public class BuyItemsRequest
{
    public Guid OrderId { get; set; }
}

public class BuyItemsResponse
{
    public Guid OrderId { get; set; }
    public string ErrorMessage { get; set; }
}

Saga status:

public sealed class BuyItemsSagaState : SagaStateMachineInstance
{
		//Идентификатор по которому мы отличаем один процес от другого.
    public Guid CorrelationId { get; set; }
    //Текущее состояние саги ака Failed, GetItemsPending и т.д.
    public string? CurrentState { get; set; }
    //Тут мы сохраняем идентификатор запроса что запустил нашу сагу
    //чтобы ответить на него
    public Guid? RequestId { get; set; }
    //Тут мы сохраняем адресс откуда пришел запрос который запустил нашу сагу
    //чтобы ответить на него
    public Uri? ResponseAddress { get; set; }
}

Saga for the item buying process:

public sealed class BuyItemsSaga : MassTransitStateMachine<BuyItemsSagaState>
{
    private readonly ILogger<BuyItemsSaga> _logger;

    public BuyItemsSaga(ILogger<BuyItemsSaga> logger)
    {
        _logger = logger;
        //Указываем куда будем записывать текущее состояние саги (Pending,Faulted)
        InstanceState(x => x.CurrentState);
        //Указываем что слушаем событие OrderId у которого равен нашему CorrelationId у саги
        //Либо если нет саги с таким CorrelationId то создаем его с ним.
        Event<BuyItemsRequest>(() => BuyItems, x => x.CorrelateById(y => y.Message.OrderId));
       //Узказываем какие запросы будем делать из саги
       Request(
            () => GetMoney
            );
        Request(
         () => GetItems
         );
        //Узказываем как будем реагировать на сообщения в стартовом стостоянии
        Initially(

            When(BuyItems)
            .Then(x =>
            {
            //Сохраняем идентификатор запроса и его адрес при старте саги чтобы потом на него ответить
                if (!x.TryGetPayload(out SagaConsumeContext<BuyItemsSagaState, BuyItemsRequest> payload))
                    throw new Exception("Unable to retrieve required payload for callback data.");
                x.Saga.RequestId = payload.RequestId;
                x.Saga.ResponseAddress = payload.ResponseAddress;
            })
            //Соверщаем запрос к микросевису MoneyMicroservice
            .Request(GetMoney, x => x.Init<IGetMoneyRequest>(new { OrderId = x.Data.OrderId }))
           //Переводим сагу в состояние GetMoney.Pending
           .TransitionTo(GetMoney.Pending)

            );

        //Описываем то как наша сага будет реагировать на сообщения находясь в 
        //состоянии GetMoney.Pending
        During(GetMoney.Pending,
            //Когда приходи сообщение что запрос прошел успешно делаем новый запрос
            //теперь уже в микросервис ItemsMicroservice
            When(GetMoney.Completed)
            .Request(GetItems, x => x.Init<IGetItemsRequest>(new { OrderId = x.Data.OrderId }))
            .TransitionTo(GetItems.Pending),
            //При ошибке отвечем тому кто иницировал запрос сообщением с текстом ошибки
            When(GetMoney.Faulted)
              .ThenAsync(async context =>
              { 
                //Тут можно сделать какие-то компенсирующие действия. 
               //Например вернуть деньги куда-то на счет.
                  await RespondFromSaga(context, "Faulted On Get Money " + string.Join("; ", context.Data.Exceptions.Select(x => x.Message)));
              })
            .TransitionTo(Failed),
            //При таймате отвечаем с сообщением что произошел таймаут
            When(GetMoney.TimeoutExpired)
               .ThenAsync(async context =>
               {
                   await RespondFromSaga(context, "Timeout Expired On Get Money");
               })
            .TransitionTo(Failed)

             );

        During(GetItems.Pending,
            //При успешном ответе от микросервиса предметов 
            //отвечаем без ошибки и переводим сагу в финальное состояние.
            When(GetItems.Completed)
              .ThenAsync(async context =>
              {
                  await RespondFromSaga(context, null);
              })
            .Finalize(),

            When(GetItems.Faulted)
              .ThenAsync(async context =>
              {
                   //Тут можно сделать какие-то компенсирующие действия. 
                  //Например вернуть деньги куда-то на счет.
                  await RespondFromSaga(context, "Faulted On Get Items " + string.Join("; ", context.Data.Exceptions.Select(x => x.Message)));
              })
            .TransitionTo(Failed),

            When(GetItems.TimeoutExpired)
               .ThenAsync(async context =>
               {
                   await RespondFromSaga(context, "Timeout Expired On Get Items");
               })
            .TransitionTo(Failed)

            );
    }
    //Запрос на получение денег
    public Request<BuyItemsSagaState, IGetMoneyRequest, IGetMoneyResponse> GetMoney { get; set; }
    //Запрос на получение предметов
    public Request<BuyItemsSagaState, IGetItemsRequest, IGetItemsResponse> GetItems { get; set; }
   //Событие стартующее нашу сагу.
   public Event<BuyItemsRequest> BuyItems { get; set; }
   //Одно из наших кастомных состояний в которое может перейти сага
    public State Failed { get; set; }
    //Метод для ответного сообщения
    //Тут нужно явно использовать ResponseAddress и RequestId 
    //сохраненные ранее чтобы ответить ровно тому кто сделал запрос
    private static async Task RespondFromSaga<T>(BehaviorContext<BuyItemsSagaState, T> context, string error) where T : class
    {
        var endpoint = await context.GetSendEndpoint(context.Saga.ResponseAddress);
        await endpoint.Send(new BuyItemsResponse
        {
            OrderId = context.Saga.CorrelationId,
            ErrorMessage = error
        }, r => r.RequestId = context.Saga.RequestId);
    }
}

Registering a saga in a startup

builder.Services.AddMassTransit(cfg =>
{
    cfg.SetKebabCaseEndpointNameFormatter();
    cfg.AddDelayedMessageScheduler();
    //Тут добляем сагу с указанием что будем сохранять ее в БД 
    //с помошью EF и будем использовать пессеместичный режим конкуренции за ресурсы
    cfg.AddSagaStateMachine<BuyItemsSaga, BuyItemsSagaState>()
    .EntityFrameworkRepository(r =>
    {
        r.ConcurrencyMode = ConcurrencyMode.Pessimistic;
        r.ExistingDbContext<SagasDbContext>();
        r.LockStatementProvider = new PostgresLockStatementProvider();
    });
    cfg.UsingRabbitMq((brc, rbfc) =>
    {
        rbfc.UseInMemoryOutbox();
        rbfc.UseMessageRetry(r =>
        {
            r.Incremental(3, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
        });
        rbfc.UseDelayedMessageScheduler();
        rbfc.Host("localhost", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });
        rbfc.ConfigureEndpoints(brc);
    });
});

APIGateway

Its job is simply to translate the http request into an ampq request on the bus. Wait for a response and return the response to the user (frontend)

public class BuyItemsRequstModel
{
    public Guid OrderId { get; set; }
}

[ApiController]
[Route("api/v1/items")]
public class ItemsController : ControllerBase
{
    //Интерфейс MassTransit через который идет работа с сообщениями
    private readonly IBus _bus;
    private readonly ILogger<ItemsController> logger;

    public ItemsController(IBus bus, ILogger<ItemsController> logger)
    {
        _bus = bus;
        this.logger = logger;
    }

    [HttpPost("buy")]
    public async Task<BuyItemsResponse> BuyAsync(BuyItemsRequstModel model)
    {
       //Делаем запрос в шину и ждем ответа от саги. 
       //Ответ прийдет из RabbitMq или словим ошибку таймаута запроса
        logger.LogInformation("Start!");
        var response = await _bus.Request<BuyItemsRequest, BuyItemsResponse>(model);
        logger.LogInformation("End!");
        //Возвращаем сообщение что было в ответе
        return response.Message;
    }
}

You can also do everything simply on events without Request / Response, only this will require more code and you can drive them to the front through SignalR.

Sources

Microservices With Sagas

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *