Creating background tasks in .NET with a request for the status of a running task

Let’s say you need to run a lot of background tasks in .Net with the ability to later access their state. Accessing the members of the running task class can be useful if you need to unambiguously determine the current state of the object at the time of execution.

One of the particular solutions is the actor model

The actor model allows you to run background tasks and access their current state.

Interfaces

Define an interface that will describe the work to be done − IJobwith obligatory passing to the method DoAsyncCancellationToken token.

public interface IJob<in TIn, out TOut>
    where TIn : IJobInput
    where TOut : IJobResult  
{
    Task<bool> DoAsync(TIn input, CancellationToken token);

    /// <summary>
    /// Describes the state of the fields of the IJob class that are changed by the DoAsync method.
    /// </summary>
    TOut GetCurrentState(Guid jobId);
}

Job interface – IJob is defined, but calling its implementation directly will not give the desired results, so it is necessary to determine the context for executing jobs IJobContext, which will unite a group of performed tasks of the same type. Here it is necessary to provide the opportunity not only to run background tasks CreateJobAsyncbut also expect the complete completion of the task DoJobAsync.

public interface IJobContext<in TIn, TOut>
    where TIn : IJobInput
    where TOut : IJobResult
{

    /// <summary>
    /// Create a background job
    /// </summary>
    /// <returns>Job Id</returns>
    Task<JobCreatedCommandResult> CreateJobAsync(TIn input,
        int? maxNrOfRetries = null,
        TimeSpan? minBackoff = null,
        TimeSpan? maxBackoff = null,  
        Guid? jobId = null,
        TimeSpan? timeout = null);

    /// <summary>
    /// Waiting for a response about the completion of the job
    /// </summary>
    Task<JobDoneCommandResult> DoJobAsync(TIn input,
        int? maxNrOfRetries = null, TimeSpan? minBackoff = null, TimeSpan? maxBackoff = null,  Guid? jobId = null, TimeSpan? timeout = null);

    Task<StopJobCommandResult> StopJobAsync(Guid jobId, TimeSpan? timeout = null);

    Task<IDictionary<Guid, ReplyWorkerInfo<TOut>>> GetAllJobsCurrentStatesAsync(long requestId, TimeSpan? timeout = null);
}

Implementation

The main idea of ​​the actor model is to isolate the state of each model object from external interference, which should guarantee a predictable change in its state. Communication between actors occurs through message queues, in which for each actor all incoming requests are ordered, which gives the sequence of command execution and also avoids side effects of affecting members of the actor class.

The general query scheme would look like MasterActorGroupActorManagerActorWorkerActor

MasterActor

Let’s define the entry point to the system of actors – it will be MasterActor.

Its tasks are to deliver messages to its child actors and determine the strategy for restarting subordinate elements in case of an error.

To send messages from external sources that are not in the ActorSystem (here it is JobContext) the command for waiting for an answer – Ask is used.

await _masterActor.Ask<JobCreatedCommandResult>(command, currentTimeout);

Ask is waiting for a response from the actor model JobCreatedCommandResult, for the given time span currentTimeout. Where command is of type DoJobCommand and accepted MasterActor method Receiveset in the constructor.

  • DoJobCommandHandler – a method that will pass a message to an existing group actor or create a new instance of it if it does not exist, while maintaining an IActorRef reference to it.

  • StopJobCommandHandler – the method called at the time of the request to stop the worker.

  • RequestAllWorkersInfoQueryHandler – handler for requesting the state of a group of actors of a certain type IJob

internal class MasterActor<TIn, TOut> : ReceiveActor
    where TIn : IJobInput
    where TOut : IJobResult
{

    private readonly Dictionary<string, IActorRef> _groupIdToActor = new();
    private readonly Dictionary<IActorRef, string> _actorToGroupId = new();

    public MasterActor()
    {
        //Commands
        Receive<DoJobCommand<TIn>>(DoJobCommandHandler);
        Receive<StopJobCommand>(StopJobCommandHandler);

        //Queries
        Receive<RequestAllWorkersInfo>(RequestAllWorkersInfoQueryHandler);

    }


    private void DoJobCommandHandler(DoJobCommand<TIn> doJobCommand)
    {

        if (_groupIdToActor.TryGetValue(doJobCommand.GroupName, out var actorRef))
        {
            actorRef.Forward(doJobCommand);
            return;
        }

        var groupActorProps = DependencyResolver
            .For(Context.System)
            .Props<GroupActor<TIn,TOut>>();

        var groupActor = Context
            .ActorOf(groupActorProps, $"group-{doJobCommand.GroupName}");

        Context.Watch(groupActor);

        groupActor.Forward(doJobCommand);

        _groupIdToActor.Add(doJobCommand.GroupName, groupActor);
        _actorToGroupId.Add(groupActor, doJobCommand.GroupName);

    }

    private void StopJobCommandHandler(StopJobCommand command)
    {
        if (!_groupIdToActor.ContainsKey(command.GroupName))
        {
            Sender.Tell(new StopJobCommandResult(false, $"Group Actor list does not contain {command.GroupName}"));
            return;
        }

        _groupIdToActor[command.GroupName].Forward(command);
    }
...

GroupActor

DoJobCommandHandler must create a child actor GroupActorA that describes the group of tasks to run. Its function is simple, it should relay messages to workers’ managers and store references to child actors to provide a way to refer to their state later.

  • TrySaveWorkerActorRefCommand – the message that is sent from the workerActor to store the IActorRef in the group actor.

  • ManagerActorTerminatedHandler – called after a stop ManagerActor and remove references to objects IActorRef For ManagerActor And WorkerActor.

internal class GroupActor<TIn, TOut> : ReceiveActor
    where TIn : IJobInput
    where TOut : IJobResult
{

    private string? _groupId;

    private readonly Dictionary<Guid, IActorRef> _idToManagerActor = new();
    private readonly Dictionary<IActorRef, Guid> _managerActorToId = new();

    private readonly Dictionary<IActorRef, Guid> _workerActorToId = new();
    private readonly Dictionary<Guid, IActorRef> _idToWorkerActor = new();

 
    public GroupActor()
    {
        //Commands
        Receive<DoJobCommand<TIn>>(DoJobCommandHandler);
        Receive<StopJobCommand>(StopJobCommandHandler);

        //Queries
        Receive<RequestAllWorkersInfo>(RequestAllWorkersInfoQueryHandler);

        //Internal
        Receive<TrySaveWorkerActorRefCommand>(TrySaveWorkerActorRefCommandHandler);
        Receive<Terminated>(ManagerActorTerminatedHandler);

    }

    private void DoJobCommandHandler(DoJobCommand<TIn> doJobCommand)
    {
        if (_groupId != null && doJobCommand.GroupName != _groupId)
        {
            var message = "Ignoring Create Worker Actor";
            Sender.Tell(doJobCommand.IsCreateCommand
                    ? new JobCreatedCommandResult(false, message, doJobCommand.JobId)
                    : new JobDoneCommandResult(false, message, doJobCommand.JobId));
            return;
        }

        if (_idToManagerActor.ContainsKey(doJobCommand.JobId))
        {
            var message = $"{doJobCommand.JobId} Actor Exists.";
            Sender.Tell(doJobCommand.IsCreateCommand
                ? new JobCreatedCommandResult(false, message, doJobCommand.JobId)
                : new JobDoneCommandResult(false, message, doJobCommand.JobId));
            return;
        }

        _groupId ??= doJobCommand.GroupName;

        var managerActorProps = DependencyResolver
            .For(Context.System)
            .Props<ManagerActor<TIn,TOut>>();
        
        var managerActor = Context.ActorOf(managerActorProps,
            $"manager-{doJobCommand.JobId}");

        Context.Watch(managerActor);

        _idToManagerActor.Add(doJobCommand.JobId, managerActor);
        _managerActorToId.Add(managerActor, doJobCommand.JobId);
        managerActor.Forward(doJobCommand);
    }

    private void ManagerActorTerminatedHandler(Terminated t)
    {
        var workerId = _managerActorToId[t.ActorRef];
        _managerActorToId.Remove(t.ActorRef);
        _idToManagerActor.Remove(workerId);

        if (!_idToWorkerActor.TryGetValue(workerId, out var workerActorRef))
            return;
        _workerActorToId.Remove(workerActorRef);
        _idToWorkerActor.Remove(workerId);
    }
...

ManagerActor

With behavior ManagerActor things are a little more complicated, he needs to determine a strategy for behavior when errors occur in the worker WorkerActorand keep the initial query “WorkerDoJobCommand‘ to resend this request at restart times.

internal class ManagerActor<TIn, TOut> : ReceiveActor
    where TIn : IJobInput
    where TOut : IJobResult

{
    //Параметры стратегии перезапуска
    private int _currentNrOfRetries;
    private int _maxNrOfRetries;
    private TimeSpan _minBackoff;
    private TimeSpan _maxBackoff;

    //Ссылка на отправитель команды на выполнение работы
    private IActorRef? _doJobCommandSender;
    //BackoffSupervisor
    private IActorRef? _workerSupervisorActor;
    //Команда для дочернего актора для повторной отправки в момент рестартов
    private WorkerDoJobCommand<TIn>? _doJobCommand;

    private Guid _jobId;
    private bool _startedFlag;

    //StopJobCommandHandler переданный от JobContext вызовет отмену задачи
    private readonly CancellationTokenSource _cancellationTokenSource = new ();

    private readonly ILogger<ManagerActor<TIn, TOut>> _logger;

    public ManagerActor(ILogger<ManagerActor<TIn, TOut>> logger)
    {
        _logger = logger;

        //Commands

        Receive<DoJobCommand<TIn>>(DoJobCommandHandler);
        Receive<StopJobCommand>(StopJobCommandHandler);

        //Queries

        Receive<ReadWorkerInfoCommand>(ReadWorkerInfoCommandHandler);

        //Internal
        Receive<TrySaveWorkerActorRefCommand>(TrySaveWorkerActorRefCommandHandler);
        Receive<GiveMeWorkerDoJobCommand>(GiveMeWorkerDoJobCommandHandler);
        Receive<Terminated>(WorkerActorTerminatedHandler);

    }

 ...
  • DoJobCommandHandler – only creates a worker without sending a message to execute the command, and sets it a restart strategy, which is defined by “BackoffSupervisor» at the time of creation WorkerActor. The worker will be restarted if an error occurs in the worker itself and the number equal to maxNrOfRetries times in the interval from minBackoff before maxBackoff. The interval is necessary so that repeated attempts to perform work in the case of connection to external resources evenly distribute requests in a given period of time.

  • GiveMeWorkerDoJobCommandHandler – message handler GiveMeWorkerDoJobCommandwhich is sent by the child worker actor at the time of initialization of its state – this is necessary for restarts, since if you send a message in the DoJobCommandHandlerwhich is executed once, then at the time of the fall of the worker actor, its work will not be started again.

    private void DoJobCommandHandler(DoJobCommand<TIn> doJobCommand)
    {
        if (_workerSupervisorActor != null)
        {
            var message = "Ignoring Create Worker Actor";
            Sender.Tell(doJobCommand.IsCreateCommand
                ? new JobCreatedCommandResult(false, message, doJobCommand.JobId)
                : new JobDoneCommandResult(false, message, doJobCommand.JobId));
            return;
        }


        _jobId = doJobCommand.JobId;
        _maxNrOfRetries = doJobCommand.MaxNrOfRetries;
        _minBackoff = doJobCommand.MinBackoff;
        _maxBackoff = doJobCommand.MaxBackoff;
        _doJobCommandSender = Sender;

        var workerActorProps = DependencyResolver
            .For(Context.System)
            .Props<WorkerActor<TIn,TOut>>();

        var supervisorOfWorkerActorProps = BackoffSupervisor.Props(
            Backoff.OnFailure(
                    workerActorProps,
                    childName: $"worker-{doJobCommand.JobId}",
                    minBackoff: _minBackoff,
                    maxBackoff: _maxBackoff,
                    randomFactor: 0.2,
                    maxNrOfRetries: _maxNrOfRetries)
                .WithSupervisorStrategy(new OneForOneStrategy(exception =>
                {
                    if (exception is TaskCanceledException  
                        || exception.InnerException is TaskCanceledException
                        || _currentNrOfRetries >= _maxNrOfRetries)
                    {
                        var text = $"BackoffSupervisor: jobId: {_jobId}" +
                                   $" {exception?.Message}" +
                                   $" InnerException: {exception?.InnerException?.Message}";
                        _logger.LogError(text);
                        _doJobCommandSender.Tell(new JobDoneCommandResult(false, text, _jobId));
                        return Directive.Stop;
                    }
                  
                    _currentNrOfRetries += 1;
                    return Directive.Restart;
                })));

        _workerSupervisorActor = Context
            .ActorOf(supervisorOfWorkerActorProps, $"supervisor-of-worker-{doJobCommand.JobId}");

        Context.Watch(_workerSupervisorActor);
      
        _doJobCommand = new WorkerDoJobCommand<TIn>(
            doJobCommand.JobInput,
            _doJobCommandSender,  
            doJobCommand.JobId,
            _cancellationTokenSource,
            doJobCommand.IsCreateCommand);
    }
...

WorkerActor

Next, the execution command gets to the worker WorkerActorwhich will send job creation messages JobCreatedCommandResult or about its completion JobDoneCommandResult depending on the initially chosen method in IJobContext CreateJobAsync or DoJobAsync respectively.

Upon completion of work WorkerActor put a death pill on his cheek _self.Tell(PoisonPill.Instance)which will cause it to stop and clean up resources.

internal class WorkerActor<TIn, TOut> : ReceiveActor
    where TIn : IJobInput
    where TOut : IJobResult
{
    private Guid _jobId;

    private readonly IServiceScope _scope;
    private readonly IActorRef _self;
    private IJob<TIn, TOut> _job;

 
    public WorkerActor(IServiceProvider serviceProvider)
    {
        _self = Self;
        _scope = serviceProvider.CreateScope();

        //Commands
        Receive<WorkerDoJobCommand<TIn>>((msg) =>
        {
            WorkerDoJobCommandHandlerAsync(msg).PipeTo(_self);
        });

        //Queries
        Receive<ReadWorkerInfoCommand>(ReadWorkerInfoCommandHandler);

        //Internal
        Receive<Status.Failure>(Failed);
        Context.Parent.Tell(new GiveMeWorkerDoJobCommand());

    }

    private async Task WorkerDoJobCommandHandlerAsync(WorkerDoJobCommand<TIn> command)
    {
        _jobId = command.JobId;

        Context.Parent.Tell(new TrySaveWorkerActorRefCommand(_self, _jobId, command.DoJobCommandSender));

        if(command.IsCreateCommand)
            command.DoJobCommandSender.Tell(new JobCreatedCommandResult(true, "", _jobId));

        var token = command.CancellationTokenSource.Token;
        var jobResult = await _job.DoAsync(command.JobInput, token);

        if(token.IsCancellationRequested)
        {
            if(!command.IsCreateCommand)
                command.DoJobCommandSender.Tell(new JobDoneCommandResult(false,  
                    "Job was cancelled.",  
                    command.JobId));
            return;
        }

        if(!command.IsCreateCommand)
            command.DoJobCommandSender.Tell(new JobDoneCommandResult(jobResult, "Ok", command.JobId));

        _self.Tell(PoisonPill.Instance);
    }

Ability to cancel tasks

It is possible to cancel a task. ManagerActor at the time of receiving the command StopJobCommand to stop, causes the cancellation of its _cancellationTokenSource.Cancel(), whose cancellation token was thrown into the child WorkerActorand sending him a death pill _workerSupervisorActor.Tell(PoisonPill.Instance).

   private void StopJobCommandHandler(StopJobCommand _)
    {
        if (!_cancellationTokenSource.IsCancellationRequested)
        {
            _cancellationTokenSource.Cancel();
            _workerSupervisorActor.Tell(PoisonPill.Instance);
            Sender.Tell(new StopJobCommandResult(true, "Ok"));
            return;
        }
        Sender.Tell(new StopJobCommandResult(false, "Cancellation Requested Already."));
    }

DI – Microsoft.Extensions.DependencyInjection

IN WorkerActor interface injected IServiceProvider serviceProviderwhich allows you to control the lifetime of the scope _scope from the moment the worker is initialized to the moment the actor stops and resources are cleared. For a user’s job, the scope is everything inside the method IJob.DoAsync. Call _scope.Dispose() will also call methods Disposeif in the implementation IJob will also be inherited IDispoosable interface.

    protected override void PreStart()
    {
        _job = _scope.ServiceProvider.GetService<IJob<TIn, TOut>>();
    }
    protected override void PostStop()
    {
        _scope.Dispose();
    }

Requests

JobContext allows you to query the state of actors that were created for a particular type IJob.


    public async Task<IDictionary<Guid, ReplyWorkerInfo<TOut>>> GetAllJobsCurrentStatesAsync(long requestId,
        TimeSpan? timeout = null)
    {
        var currentTimeout = timeout ?? _defaultTimeout;
        var query = new RequestAllWorkersInfo(requestId, GetGroupName(), currentTimeout);
        RespondAllWorkersInfo<TOut> info = await _masterActor
            .Ask<RespondAllWorkersInfo<TOut>>(query, currentTimeout);
        return info.WorkersData;
    }

Usage

To register, you need to call methods at the start of the project

//Job library registration
builder.Services.AddScoped<IJob<ForEachJobInput, ForEachJobResult>, ForEachJob>();
builder.Services.AddJobContext();

Where ForEachJob is a simple test problem that iterates over values ​​from 0 to Count c wait seconds between steps. In this test job ForEachJob separately inherited interface IDisposablewhich allows you to additionally determine the mechanism for cleaning up resources at the moment the task is stopped or after its regular completion.

public class ForEachJob : IJob<ForEachJobInput, ForEachJobResult>, IDisposable
{
    private int _currentState;

    private readonly ILogger<ForEachJob> _logger;
    //Здесь возможна инъекция любого зарегестрированного в приложении интерфейса 
    public ForEachJob(ILogger<ForEachJob> logger)
    {
        _logger = logger;
    }

    public async Task<bool> DoAsync(ForEachJobInput input, CancellationToken token)
    {
        foreach (var item in Enumerable.Range(0, input.Count))
        {
            if (token.IsCancellationRequested)
                return false;

            _currentState = item;
            _logger.LogInformation(item.ToString());
            await Task.Delay(1000, token);
        }

        return true;
    }

    public ForEachJobResult GetCurrentState(Guid jobId)
    {
        return new ForEachJobResult
        {
            Id = jobId,
            Data = _currentState
        };
    }

    public void Dispose()
    {
        _logger.LogInformation("Dispose.");
    }

}

A call, for example, from a controller would look like this:

public class ForEachJobController : ControllerBase
{
    private readonly IJobContext<ForEachJobInput, ForEachJobResult> _jobContext;
    
    public ForEachJobController(
      IJobContext<ForEachJobInput, ForEachJobResult> jobContext)
    {
        _jobContext = jobContext;
    }
    
    [HttpPost]
    [Route(nameof(CreateJob))]
    public async Task<JobCreatedCommandResult> CreateJob([FromBody] ForEachJobInput input)
    {
        return await _jobContext.CreateJobAsync(input);
    }
    
    [HttpPost]
    [Route(nameof(DoJob))]
    public async Task<JobDoneCommandResult> DoJob([FromBody] ForEachJobInput input)
    {
        return await _jobContext.DoJobAsync(input);
    }
    
    [HttpPost]
    [Route(nameof(StopJob))]
    public async Task<StopJobCommandResult> StopJob([FromBody] Guid jobId)
    {
        return await _jobContext.StopJobAsync(jobId);
    }
    
    [HttpGet]
    [Route(nameof(GetAllJobs))]
    public async Task<ICollection<ForEachJobResult?>> GetAllJobs(
      [FromQuery] int requestId)
    {
        var result = await _jobContext
            .GetAllJobsCurrentStatesAsync(requestId);
        return result.Values.Select(x => x.Result).ToList();
    }
}

Outcome

Akka one of the possible ways to run deferred tasks with the potential for clustering the solution.

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *