Optimized outbox pattern implementation with EF Core and Channel class

The outbox pattern is very common in microservice communication. Its goal is to keep an API request from depending directly on broker availability: the message to be published is saved to a database table in the same transaction as the business code, and it is only published after that transaction succeeds.

This guarantees that if the business transaction fails, the message is neither saved nor published. It also covers the case where the business code succeeds but publishing fails, since failed publishes can be retried in the background.

After being saved, the message is normally fetched from the database by a background process/thread. This background worker is responsible for publishing and saving its status back on the database.

Here’s a simple diagram representing the outbox pattern.

[Figure: Outbox pattern diagram]

The problem

The problem with the outbox pattern is that it needs a process/thread that actively queries the database for pending/faulted messages. Querying the database every 30 seconds or every minute can harm its performance.

Therefore, in this post I’ll propose a simple way of getting the messages (events) that were just saved without querying the database. Eventually we’ll still need to query the database to recover messages (events) that failed to publish, but this can be done at a longer interval without harming the database’s performance.

The proposed solution

After a state change is made by calling DbContext.SaveChanges, if the call was successful (i.e., it didn’t throw an exception), the messages saved in the outbox are sent to an in-memory queue, the Outbox Event Queue, without querying the database.

This queue is built on top of .NET channels (System.Threading.Channels), which makes it asynchronous. One class, the Outbox Event Publisher, listens to this in-memory queue and publishes the events to the RabbitMQ broker.

After an event is published or an error is received, it is sent to another in-memory queue, the Outbox Event Save Queue. The Outbox Event Saver Service listens to this queue and persists each event’s status to the database. Statuses are persisted in batches, or after one minute if the batch size has not been reached.

To retry failed messages (events), the Outbox Event Reviver queries the database every five minutes. It looks for messages (events) that are still unpublished, or that failed and have not been retried in the last five minutes.

[Figure: Proposed solution diagram]

The code for this post is available at xilapa/OutboxPattern, on the simple_version branch. For a more complete implementation, with a separate retry service, more care to avoid duplicates, better error handling, and more fault tolerance, see the main branch: xilapa/OutboxPattern.

Implementing the solution

Creating the Outbox table

Assuming you already have a working application that uses Entity Framework Core and generates events that need to be published, let’s create the OutboxEvent class to handle these events. This class uses the event type as the routing key and has some properties to keep track of the message content and its status.

The OutboxEvent class receives an object in its constructor because it doesn’t need to know anything about our domain besides the event type, which becomes the routing key.

public sealed class OutboxEvent
{
    // Ctor for EF Core
    private OutboxEvent()
    { }

    public OutboxEvent(object @event)
    {
        Id = Guid.NewGuid();
        EventKey = @event.GetType().Name;
        EventData = JsonSerializer.Serialize(@event);
        EventDate = DateTime.UtcNow;
        PublishingDate = null;
        LastRetryDate = null;
        Status = PublishingStatus.NotPublished;
        Retries = 0;
        ExpirationDate = null;
    }

    public Guid Id { get; private set; }
    public string EventKey { get; private set; }
    public string EventData { get; private set; }
    public DateTime EventDate { get; private set; }
    public DateTime? PublishingDate { get; private set; }
    public DateTime? LastRetryDate { get; private set; }
    public PublishingStatus Status { get; private set; }
    public int Retries { get; private set; }
    public DateTime? ExpirationDate { get; set; }

    public void SetPublishedStatus()
    {
        Status = PublishingStatus.Published;
    }

    public void SetErrOnPublishStatus()
    {
        Status = PublishingStatus.ErrorOnPublish;
    }
}

public enum PublishingStatus
{
    NotPublished = 1,
    Published,
    ErrorOnPublish
}

In our DbContext class, we need to add the OutboxEvents property to interact with the database table.

public sealed class Context : DbContext
{
    // Code omitted for brevity
    public DbSet<OutboxEvent> OutboxEvents { get; set; }
}

Creating the in memory queues

We need two in-memory queues, the Outbox Event Queue and the Outbox Event Save Queue. They behave identically and differ only in name and configuration, so a BaseQueue class will centralize this behavior.

The BaseQueue uses the Channel class to get its queue behavior. A bounded channel lets us choose what happens to a writer when the channel is full: among other options, it can wait for room to write, or continue without writing anything to the queue. For the BaseQueue class, the default is to wait for room.
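To make the difference between the two full modes concrete, here is a minimal sketch that is not part of the original project. The FullModeDemo class and its Fill method are hypothetical names used only for illustration. It fills a small bounded channel with TryWrite, which never blocks, and reports how many writes were accepted and how many items were actually stored.

```csharp
using System.Threading.Channels;

// Hypothetical helper, for illustration only
public static class FullModeDemo
{
    // Tries to write `attempts` integers into a bounded channel and returns
    // how many writes reported success and how many items the channel holds.
    public static (int Accepted, int Stored) Fill(
        BoundedChannelFullMode mode, int capacity, int attempts)
    {
        var channel = Channel.CreateBounded<int>(
            new BoundedChannelOptions(capacity) { FullMode = mode });

        var accepted = 0;
        for (var i = 0; i < attempts; i++)
        {
            // TryWrite never waits, so this loop cannot hang in Wait mode
            if (channel.Writer.TryWrite(i)) accepted++;
        }

        return (accepted, channel.Reader.Count);
    }
}
```

With a capacity of 2 and 5 attempts, Wait mode accepts 2 writes and rejects the rest, while DropWrite reports all 5 writes as successful but still stores only 2 items. In other words, with DropWrite the writer is not told that an item was dropped, so something else has to recover it later.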

This class also has a method to enqueue items, used by the DbContext and the Outbox Event Reviver, and a method to read the queue as an IAsyncEnumerable, used by the Outbox Event Publisher and the Outbox Event Saver Service.

public abstract class BaseQueue : IBaseQueue
{
    private readonly Channel<OutboxEvent> _queue;

    protected BaseQueue(
        int capacity, 
        BoundedChannelFullMode channelMode = BoundedChannelFullMode.Wait
    )
    {
        var boundedChannelOptions = new BoundedChannelOptions(capacity)
        {
            SingleReader = true,
            FullMode = channelMode
        };
        _queue = Channel.CreateBounded<OutboxEvent>(boundedChannelOptions);
    }

    public async ValueTask Enqueue(OutboxEvent @event, CancellationToken cancellationToken) =>
        await _queue.Writer.WriteAsync(@event, cancellationToken);

    public IAsyncEnumerable<OutboxEvent> GetAllAsync(CancellationToken cancellationToken) =>
        _queue.Reader.ReadAllAsync(cancellationToken);
}

public interface IBaseQueue
{
    ValueTask Enqueue(OutboxEvent @event, CancellationToken cancellationToken);
    IAsyncEnumerable<OutboxEvent> GetAllAsync(CancellationToken cancellationToken);
}

Setting SingleReader to true lets the channel apply some internal performance optimizations.

The Outbox Event Queue inherits from BaseQueue and uses the DropWrite channel mode, which simply drops the item being written when the channel is full. This is not a problem, because the event is already stored in the database and will be revived and written to the queue some time later. As this queue is the entry point for events coming from DbContext.SaveChanges, this has the advantage of not blocking our event generator (in an ASP.NET project, this could be a request) while it waits for room in the internal queue.

public sealed class OutboxEventQueue : BaseQueue, IOutboxEventQueue
{
    public OutboxEventQueue() : base(10_000, BoundedChannelFullMode.DropWrite)
    { }
}

public interface IOutboxEventQueue : IBaseQueue
{ }

The Outbox Event Save Queue will maintain the default channel behavior to avoid losing any status that needs to be saved.

public sealed class OutboxEventSaveQueue : BaseQueue, IOutboxEventSaveQueue
{
    public OutboxEventSaveQueue() : base(20_000)
    { }
}

public interface IOutboxEventSaveQueue : IBaseQueue
{ }

As you may have noticed, each queue has its own interface. This is needed so that the two BaseQueue implementations can be registered and resolved separately in the dependency injection container.

Sending the events from DbContext to Outbox Event Queue

In the DbContext class, the SaveChangesAsync method is overridden to create the OutboxEvents before saving the changes and to send these events to the OutboxEventQueue afterwards.

In this example, all domain classes derive from a base class called BaseEntity, which has a property called DomainEvents that holds the events that need publishing.
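The BaseEntity class itself is not shown in this post. As a rough sketch of what such a base class could look like, assuming events are stored as plain objects, consider the code below. The AddDomainEvent and ClearDomainEvents methods, the Order entity, and the OrderPlaced event are all hypothetical names introduced only for illustration.

```csharp
using System;
using System.Collections.Generic;

// Sketch of a base class exposing the DomainEvents property
public abstract class BaseEntity
{
    private readonly List<object> _domainEvents = new();

    // Read-only view consumed when the outbox events are created
    public IReadOnlyCollection<object> DomainEvents => _domainEvents;

    // Hypothetical helper: entities call this when something relevant happens
    protected void AddDomainEvent(object domainEvent) => _domainEvents.Add(domainEvent);

    // Hypothetical helper: lets the caller reset the list after a successful save
    public void ClearDomainEvents() => _domainEvents.Clear();
}

// Hypothetical event and entity
public sealed record OrderPlaced(Guid OrderId);

public sealed class Order : BaseEntity
{
    public Guid Id { get; } = Guid.NewGuid();

    public void Place() => AddDomainEvent(new OrderPlaced(Id));
}
```

One design point worth considering: if the same entity instance can be saved more than once, clearing its events after a successful save avoids creating duplicate outbox rows for the same event.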

public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
{
    // Create the outbox events
    var outboxEvents = ChangeTracker
        .Entries<BaseEntity>()
        .Where(e => e.Entity.DomainEvents.Count != 0)
        .SelectMany(e => e.Entity.DomainEvents)
        .Select(domainEvent => new OutboxEvent(domainEvent))
        .ToArray();

    // Add them to the current transaction
    foreach (var outboxEvent in outboxEvents)
        Add(outboxEvent);

    // Call the base Save changes
    var result = await base.SaveChangesAsync(cancellationToken);

    // If the SaveChanges doesn't throw an exception this line will be called
    // And will send the outbox events to be published without hitting the database again
    foreach (var outboxEvent in outboxEvents)
        await _outboxQueue.Enqueue(outboxEvent, cancellationToken);

    return result;
}

Creating the publisher

The OutboxEventPublisher derives from Microsoft’s BackgroundService so it can be used as a hosted service in the application.

This class needs an OutboxEventQueue instance to listen for incoming events, and an OutboxEventSaveQueue instance to send events to have their status persisted on the database.

The OutboxEventPublisher class looks like the code below. The constructor code and some properties are omitted for brevity. The full code can be seen here: OutboxEventPublisher.

public sealed class OutboxEventPublisher : BackgroundService
{
    // The RabbitMQ channel
    private readonly IModel _channel;

    // The structure to keep track of events pending confirmations from the broker
    private record PublishingKey(int ChannelId, ulong MessageId);
    private ConcurrentDictionary<PublishingKey, OutboxEvent> _eventsPendingConfirmation;

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        return PublishEventsFromQueue(stoppingToken);
    }

    private async Task PublishEventsFromQueue(CancellationToken cancellationToken)
    {
        await foreach(var @event in _eventsQueue.GetAllAsync(cancellationToken))
        {
            // If the channel is closed, send the event to have its status saved
            // on the database and skip publishing; the client is auto-recovering
            if (_channel.IsClosed)
            {
                await _saveQueue.Enqueue(@event, cancellationToken);
                // Give some time to the auto-recovery
                await Task.Delay(300, cancellationToken);
                continue;
            }

            try
            {
                var publishingKey = new PublishingKey(_channel.ChannelNumber,
                     _channel.NextPublishSeqNo);
                _eventsPendingConfirmation.TryAdd(publishingKey, @event);

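                // EventData is already a JSON string, so encode it as UTF-8
                // (System.Text.Encoding) instead of serializing it a second time
                var body = Encoding.UTF8.GetBytes(@event.EventData);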
                _channel!.BasicPublish("", @event.EventKey, _defaultMessageProperties, body);
            }
            catch (Exception e)
            {
                _logger.LogError(e, "Error publishing outbox event {EventId}", @event.Id);
                await _saveQueue.Enqueue(@event, cancellationToken);
            }
        }
    }



    private void HandleAcks(object? channel, BasicAckEventArgs eventArgs)
    {
        if (channel is not IModel channelCasted) throw new Exception("It should be a channel");
        var key = new PublishingKey(channelCasted.ChannelNumber, eventArgs.DeliveryTag);
        var eventFound = _eventsPendingConfirmation.TryRemove(key, out var @event);
        if (!eventFound) return;
        @event!.SetPublishedStatus();
        _saveQueue.Enqueue(@event, CancellationToken.None).AsTask().Wait();
    }

    private void HandleNacks(object? channel, BasicNackEventArgs eventArgs)
    {
        if (channel is not IModel channelCasted) throw new Exception("It should be a channel");
        var key = new PublishingKey(channelCasted.ChannelNumber, eventArgs.DeliveryTag);
        var eventFound = _eventsPendingConfirmation.TryRemove(key, out var @event);
        if (!eventFound || @event is null) return;
        _saveQueue.Enqueue(@event, CancellationToken.None).AsTask().Wait();
    }
}

Before trying to publish, it’s necessary to check whether the channel is closed. If it is, the message is sent to have its status saved on the database. A closed channel means that the connection with the broker was lost. By default, any connection created using the official RabbitMQ client has the AutomaticRecoveryEnabled property set to true, which means it will recover from failures automatically.

A ConcurrentDictionary is used to keep track of events pending confirmation from the broker. The key of this dictionary is a record called PublishingKey, holding the channel number and the message’s publish sequence number. If an exception occurs while publishing, the message will have its status saved on the database.
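A record works well as the dictionary key here because records compare by value: the key built when publishing and the key rebuilt later from the broker’s confirmation are different instances, yet they are treated as equal. The following sketch illustrates this in isolation. The PendingConfirmationDemo class and the sample values are hypothetical, and a string stands in for the OutboxEvent.

```csharp
using System.Collections.Concurrent;

// Hypothetical demo class, for illustration only
public static class PendingConfirmationDemo
{
    private sealed record PublishingKey(int ChannelId, ulong MessageId);

    // Adds an entry under one key instance and removes it with another
    // instance that carries the same values.
    public static bool ConfirmWithNewKeyInstance()
    {
        var pending = new ConcurrentDictionary<PublishingKey, string>();

        // Key created at publish time
        pending.TryAdd(new PublishingKey(1, 42UL), "OrderPlaced");

        // Key rebuilt when the confirmation arrives: a different instance,
        // but equal by value, so the lookup succeeds
        return pending.TryRemove(new PublishingKey(1, 42UL), out _);
    }
}
```

If PublishingKey were a plain class without overridden equality members, the second key would not match the first and the removal would fail.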

The HandleAcks and HandleNacks methods handle the acknowledgments. They remove the event from the _eventsPendingConfirmation dictionary and send it to have its status saved on the database. The only difference between the two methods is that the positive acknowledgment sets the message status to published.

Creating the event saver

The OutboxEventSaverService, which is also a BackgroundService, listens to the OutboxEventSaveQueue and saves each event’s status on the database. If an event doesn’t have the published status, it is saved as a failure.

The events’ statuses are saved to the database in batches of 10,000 events, or once a minute if any events are still waiting to be saved. To achieve this, the RecurringSaver method checks every minute whether there are events to save. Because two asynchronous methods interact with the same lists, a lock object is used to synchronize access.
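The batching idea can be shown on its own, separate from the database code. The sketch below is not the author’s implementation. BatchBuffer is a hypothetical class that keeps items behind a lock, tells the caller when the batch size has been reached, and hands over everything buffered so far in a single atomic step.

```csharp
using System.Collections.Generic;

// Hypothetical helper, for illustration only
public sealed class BatchBuffer<T>
{
    private readonly List<T> _items = new();
    private readonly object _lock = new();
    private readonly int _batchSize;

    public BatchBuffer(int batchSize) => _batchSize = batchSize;

    // Adds an item and returns true once the batch size has been reached
    public bool Add(T item)
    {
        lock (_lock)
        {
            _items.Add(item);
            return _items.Count >= _batchSize;
        }
    }

    // Takes everything buffered so far and empties the buffer in one step,
    // so the listener and the timer can never take the same item twice
    public T[] Drain()
    {
        lock (_lock)
        {
            var batch = _items.ToArray();
            _items.Clear();
            return batch;
        }
    }
}
```

In this sketch, the queue listener would call Drain when Add returns true, and the one-minute timer would call Drain unconditionally and skip the save when the result is empty.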

The OutboxEventSaverService class looks like the code below. The constructor code, some properties, and logs are omitted for brevity. The full code can be seen here: OutboxEventSaverService.

public sealed class OutboxEventSaverService : BackgroundService
{
    private readonly List<OutboxEvent> _publishedEvents;
    private readonly List<OutboxEvent> _errorOnPublishingEvents;
    private readonly object _outboxEventsLock;

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        return Task.WhenAny(StartListeningAsync(stoppingToken), RecurringSaver(stoppingToken));
    }

    private async Task StartListeningAsync(CancellationToken cancellationToken)
    {
        await foreach (var @event in _saveQueue.GetAllAsync(cancellationToken))
        {
            lock (_outboxEventsLock)
            {
                if (PublishingStatus.Published.Equals(@event.Status))
                {
                    _publishedEvents.Add(@event);
                }
                else
                {
                    @event.SetErrOnPublishStatus();
                    _errorOnPublishingEvents.Add(@event);
                }
            }

            if (_publishedEvents.Count + _errorOnPublishingEvents.Count >= 10_000)
                await SaveToDatabase();
        }
    }

    private async Task RecurringSaver(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            await Task.Delay(TimeSpan.FromSeconds(60), cancellationToken);
            await SaveToDatabase();
        }
    }

    private async Task SaveToDatabase()
    {
        OutboxEvent[] publishedEventsToSave;
        OutboxEvent[] errorOnPublishingEventsToSave;

        // Get the events from the synchronized list
        lock (_outboxEventsLock)
        {
            if (_publishedEvents.Count == 0 && _errorOnPublishingEvents.Count == 0) return;
            publishedEventsToSave = _publishedEvents.ToArray();
            errorOnPublishingEventsToSave = _errorOnPublishingEvents.ToArray();
            _publishedEvents.Clear();
            _errorOnPublishingEvents.Clear();
        }

        var commandParams = new
        {
            Date = DateTime.UtcNow,
            ExpireDateSuccess = DateTime.UtcNow.AddHours(3),
            ExpireDateError = DateTime.UtcNow.AddDays(2)
        };

        var connectionString = _senderSettings.DatabaseConnectionString;
        await using var connection = new NpgsqlConnection(connectionString);

        if (publishedEventsToSave.Length > 0)
        {
            await connection.ExecuteAsync($@"UPDATE ""OutboxEvents"" 
            SET 
            ""Status"" = 2,
            ""Retries"" = ""Retries"" + 1,
            ""LastRetryDate"" = @Date,
            ""PublishingDate"" = @Date,
            ""ExpirationDate"" = @ExpireDateSuccess
            WHERE ""Id"" IN (
            {Utils.ConcatGuidsToQueryString(publishedEventsToSave.Select(_ => _.Id))});",
                commandParams);
        }

        if (errorOnPublishingEventsToSave.Length > 0)
        {
            await connection.ExecuteAsync($@"
            UPDATE ""OutboxEvents"" 
            SET 
                ""Status"" = 3,
                ""Retries"" = ""Retries"" + 1,
                ""LastRetryDate"" = @Date,
                ""ExpirationDate"" = @ExpireDateError
            WHERE ""Id"" IN (
            {Utils.ConcatGuidsToQueryString(errorOnPublishingEventsToSave.Select(_ => _.Id))});",
                commandParams);
        }
    }
}

Dapper is used to save the status on the database, avoiding the DbContext overhead for a simple UPDATE statement.
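The Utils.ConcatGuidsToQueryString helper used in the queries above is not shown in this post. As an assumption about what such a helper might do, the sketch below formats each Guid as a quoted literal and joins them with commas. The GuidSqlList class and its Concat method are hypothetical names, not the project’s actual code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the helper used in the UPDATE statements
public static class GuidSqlList
{
    // Produces a comma-separated list of quoted Guid literals,
    // suitable for the inside of an IN (...) clause
    public static string Concat(IEnumerable<Guid> ids) =>
        string.Join(", ", ids.Select(id => $"'{id:D}'"));
}
```

Building the list by string concatenation is tolerable here only because a Guid always formats to a fixed set of hexadecimal digits and hyphens. An empty sequence yields an empty string, which would produce an invalid IN () clause, so the length checks before each UPDATE matter. With Npgsql, passing the ids as an array parameter and filtering with = ANY(@Ids) might be an alternative worth testing.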

Creating the event reviver

To recover the failed events, we need the OutboxEventReviver, another BackgroundService that queries the database for unpublished events or publish failures. It only looks for events that were created, or last retried, more than five minutes ago, to avoid re-publishing messages that are still in memory. It fetches events from the database in batches of 10,000 and keeps fetching until there are no more events left to retry.

The OutboxEventReviver class looks like the code below. The constructor code, some properties, and logs are omitted for brevity. The full code can be seen here: OutboxEventReviver.

public sealed class OutboxEventReviver : BackgroundService
{
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        return CheckTheDatabaseForEventsToPublish(stoppingToken);
    }

    private async Task CheckTheDatabaseForEventsToPublish(CancellationToken ctx)
    {
        while (!ctx.IsCancellationRequested)
        {
            await _timerToCheckDatabase.WaitForNextTickAsync(ctx);
            await ReEnqueueEvents(ctx);
        }
    }

    private async Task ReEnqueueEvents(CancellationToken ctx)
    {
        var totalEventsRevived = 0;
        var getMore = false;
        var offset = 0;
        do
        {
            var parameters = new
            {
                MaxEventsFetched = 10_000,
                OffSet = offset,
                MinRecoverDate = DateTime.UtcNow.AddMinutes(-5)
            };

            var query = @" 
            SELECT 
                ""Id"", ""EventKey"", ""EventData""
            FROM 
                ""OutboxEvents""
            WHERE 
            (
              (""Status"" = 1 AND ""EventDate"" < @MinRecoverDate)
            OR
              (""Status"" = 3 AND ""Retries"" < 15 AND ""LastRetryDate"" < @MinRecoverDate) 
            )
            ORDER BY ""EventDate"" ASC
                LIMIT @MaxEventsFetched
            OFFSET @OffSet";
            var command = new CommandDefinition(query, parameters, cancellationToken: ctx);
            var connectionString = _senderSettings.DatabaseConnectionString;
            await using var connection = new NpgsqlConnection(connectionString);
            // Materialize the results so they can be counted and enumerated once
            var currentEventsRevived =
                (await connection.QueryAsync<OutboxEvent>(command)).AsList();

            foreach (var @event in currentEventsRevived)
                await _eventQueue.Enqueue(@event, ctx);

            totalEventsRevived += currentEventsRevived.Count;
            offset += 10_000;

            // A full page means there may be more events left to fetch
            getMore = currentEventsRevived.Count == 10_000;
        } while (getMore);
    }
}

Adding the Outbox structure to the DI Container

Finally, these BackgroundServices and in-memory queues need to be added to the dependency injection container.

In the ConfigureServices method, the queues are added as singletons and the BackgroundServices are added as hosted services.

    ConfigureServices((hostContext, serviceCollection) =>
        serviceCollection
            .AddSingleton<IOutboxEventQueue, OutboxEventQueue>()
            .AddSingleton<IOutboxEventSaveQueue, OutboxEventSaveQueue>()
            .AddHostedService<OutboxEventPublisher>()
            .AddHostedService<OutboxEventSaverService>()
            .AddHostedService<OutboxEventReviver>()
    )

To see this working, fork xilapa/OutboxPattern at simple_version, which I made as an example. Then go to the src\Sender folder and edit the appsettings.json values for the database connection string and the RabbitMQ connection properties. To run it, open a terminal in the main folder and run the following command:

dotnet run --project "src\Sender"

Try restarting the RabbitMQ instance while the application is running to see the failure handling.

Conclusion

To optimize the outbox pattern, we need to avoid unnecessary database calls. In this post, the events just saved by the DbContext are sent to in-memory queues backed by the Channel class from .NET. The Channel class provides in-memory support for the producer/consumer pattern without blocking any thread. As stated at the beginning of this post, the full code is at xilapa/OutboxPattern, on the simple_version branch. For a more complete implementation, with a separate retry service, more care to avoid duplicates, better error handling, and more fault tolerance, see the main branch: xilapa/OutboxPattern.

This post is licensed under CC BY 4.0 by the author.
