In this post, you’ll learn how to consume multiple messages on a single channel using the official RabbitMQ.Client for dotnet, with the proper care to avoid concurrency problems or freezing the consumer.
We’ll create a consumer that receives strings and prints them to the console n times, once per second. The messages will look like message 5, meaning the word message will be printed out five times with a one-second interval.
I’ll assume that you have a RabbitMQ instance running locally and some basic knowledge of it.
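If you don’t have an instance at hand, one quick way to get one (assuming Docker is installed) is the official image with the management plugin enabled:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management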
Creating the producer
First of all, we need to create a console app and set up a producer to send those messages.
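If you’re starting the project from scratch, something along these lines should do it (the project name is just an example):
dotnet new console -n Producer
cd Producer
dotnet add package RabbitMQ.Client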
Our producer will send the messages to the queue multiple_consumers_single_channel using the default exchange.
Here’s the producer code:
using System.Text.Json;
using RabbitMQ.Client;

// creating the connection and getting a channel
var connectionFactory = new ConnectionFactory {HostName = "localhost"};
using var connection = connectionFactory.CreateConnection();
using var channel = connection.CreateModel();

// reusing the message properties
var defaultProps = channel.CreateBasicProperties();

while (true)
{
    var message = Console.ReadLine();
    if ("exit".Equals(message))
        break;

    var body = JsonSerializer.SerializeToUtf8Bytes(message);
    channel.BasicPublish(
        exchange: "",
        routingKey: "multiple_consumers_single_channel",
        defaultProps, body
    );
}
First, we establish a connection with the local instance of RabbitMQ. If the UserName and Password properties of the connection factory aren’t set, the default values guest and guest will be used.
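If your instance uses different credentials, they can be set explicitly on the factory; the values below are just placeholders:
var connectionFactory = new ConnectionFactory
{
    HostName = "localhost",
    UserName = "my-user",      // placeholder, replace with your user
    Password = "my-password"   // placeholder, replace with your password
};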
As you may have noticed, the queue is not declared here, because it’ll be declared by the consumer.
Then we save the message properties outside the loop to avoid unnecessary allocations every time we type a new message to send.
Finally, we have an always-running loop that only exits if you type exit. Inside this loop, the message is serialized to a UTF-8 byte array and sent using the BasicPublish method of the channel.
Creating the consumer
With the producer code done, let’s start the consumer.
Establishing the connection
After creating a new console app, we need to do the same as we have done at the producer. Get a connection to the RabbitMQ and a channel to listen to the incoming messages.
Let’s start with the following code:
var connectionFactory = new ConnectionFactory
{HostName = "localhost", DispatchConsumersAsync = true};
using var connection = connectionFactory.CreateConnection();
using var channel = connection.CreateModel();
The main difference from the producer code is DispatchConsumersAsync = true. It’s needed because every new message will start a new Task running on the ThreadPool (we’ll code this in the next steps). The default synchronous consumer is based on void events (events without a return value), while the asynchronous consumer is based on Task events (events that return a Task), as seen in the image below taken from the official client. Using it prevents the bad usage of async void described in the Async Guidance written by David Fowler.
RabbitMQ Official Client Async Event Handler
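To make the difference concrete, the two handler shapes look roughly like this (a simplified sketch, not the exact declarations from the client):
// EventingBasicConsumer (synchronous): the Received handler returns void
void SyncHandler(object sender, BasicDeliverEventArgs ea) { /* ... */ }

// AsyncEventingBasicConsumer: the Received handler returns a Task,
// so no async void method is needed
Task AsyncHandler(object sender, BasicDeliverEventArgs ea) { /* ... */ return Task.CompletedTask; }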
Setting up the queue
With the channel, we can then declare our queue and use the BasicQos method to set the prefetchCount to the number of messages we want to process in parallel at the consumer.
Let’s do it by adding the following code:
channel.QueueDeclare("multiple_consumers_single_channel");
channel.BasicQos(prefetchSize: 0, prefetchCount: 2, global: false);
The prefetchCount is basically how many messages a consumer can hold without acknowledging them. In our case, we defined the value 2, meaning we’ll get at most two messages at a time to process. The value of prefetchSize should be 0 because it’s not implemented in the dotnet client.
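If you wanted to process, say, five messages in parallel instead, only the prefetchCount would change:
channel.BasicQos(prefetchSize: 0, prefetchCount: 5, global: false);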
Listening to the messages
Now with the following code, the async consumer is created and subscribed to the queue.
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += (_, ea) => HandleMessage(ea);
channel.BasicConsume("multiple_consumers_single_channel", autoAck: false, consumer);
Console.WriteLine("Listening for new messages");
Console.ReadKey();
The HandleMessage method that handles the incoming messages will be created in the next step. So let’s talk about BasicConsume: the key point is passing autoAck: false so we can apply backpressure and process only two messages at a time, as defined previously with BasicQos.
After starting to consume messages, the application should keep running; for this reason, Console.ReadKey() is placed at the end of the code.
Processing messages in parallel
Finally, we have arrived at parallel message processing.
Let’s first take a look at HandleMessage and then I’ll explain it.
Task HandleMessage(BasicDeliverEventArgs ea)
{
    // copying the message body before passing it to the task
    var msgBody = ea.Body.Span.ToArray();
    // starting the task without awaiting it
    Task.Run(async () =>
    {
        var body = JsonSerializer.Deserialize<string>(msgBody);
        var splittedMsg = body.Split(" ");
        var msg = splittedMsg[0];
        var timesToRepeat = int.Parse(splittedMsg[1]);
        for (int i = 0; i < timesToRepeat; i++)
        {
            Console.WriteLine($"{i} - {DateTime.Now} - {msg}");
            await Task.Delay(1000);
        }
        channel.BasicAck(ea.DeliveryTag, false);
    });
    return Task.CompletedTask;
}
The above code isn’t thread-safe; keep reading to learn how to fix it.
First, we copy the message body from the event args with the ToArray method before the handler returns. As stated in the official dotnet API guide, this has to be done because the message body can be deallocated at any moment after the handler returns.
A new background Task is started without waiting for its completion, and then we return from HandleMessage.
Inside the Task, the message is acknowledged with the BasicAck method. The problem is that the channel can be accessed simultaneously by different threads, leading to errors when processing messages.
channel.BasicAck(ea.DeliveryTag, false);
Each channel must be accessed by only one thread at a time. To guarantee this, we’ll use a lock around the BasicAck call. Since the channel object is publicly exposed by the library, we shouldn’t lock on it directly; instead, we create a dedicated object just to control this lock, which is a best practice when using locks.
Now the code, with the lock around the channel access, looks like this:
Task HandleMessage(BasicDeliverEventArgs ea, object channelLockObject)
{
    var msgBody = ea.Body.Span.ToArray();
    Task.Run(async () =>
    {
        var body = JsonSerializer.Deserialize<string>(msgBody);
        var splittedMsg = body.Split(" ");
        var msg = splittedMsg[0];
        var timesToRepeat = int.Parse(splittedMsg[1]);
        for (int i = 0; i < timesToRepeat; i++)
        {
            Console.WriteLine($"{i} - {DateTime.Now} - {msg}");
            await Task.Delay(1000);
        }
        lock (channelLockObject)
        {
            channel.BasicAck(ea.DeliveryTag, false);
        }
    });
    return Task.CompletedTask;
}
Avoiding consumer freezing
The code looks almost done, but we have one last problem. Let’s run our publisher and consumer and send some messages. I started by sending two valid messages, and everything worked fine. But when I sent a message with an invalid value, nothing happened; then I sent another invalid message, and again nothing happened. When I tried to send valid messages again, still nothing happened.
Frozen consumer
In the RabbitMQ management interface, there were two unacked messages and two messages ready to be consumed. The two unacked messages are the invalid ones: when the consumer tries to parse invalidValue to an integer, an exception is thrown and the Task finishes without giving a response to the queue. This leaves the consumer frozen with unacked messages.
Unacked messages at the RabbitMQ management interface
To fix this we could wrap the Task code with a try-catch block, and inside the catch we could nack the message, log the error, write it to a database, send it to another queue, or something else. In this example, we’ll only log the error and nack the message. We’ll go with an unusual approach: a Task continuation that runs only if the Task finishes in a faulted state, which I find more elegant than a try-catch. At the end of the Task.Run call, append the following code:
.ContinueWith(faultedTask =>
{
    var msgs = faultedTask.Exception!
        .InnerExceptions.Select(e => e.Message);
    Console.WriteLine(string.Join(", ", msgs));
    lock (channelLockObject)
    {
        channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false);
    }
}, TaskContinuationOptions.OnlyOnFaulted);
In this continuation, the inner exception messages are aggregated into one string and logged to the console. Then the message is nacked, inside a lock to avoid concurrent access to the channel. Also notice that the requeue parameter is set to false, to avoid retrying the invalid message indefinitely.
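For comparison, the try-catch alternative mentioned above would look roughly like this (a sketch, not the approach used in the rest of the post):
Task.Run(async () =>
{
    try
    {
        var body = JsonSerializer.Deserialize<string>(msgBody);
        var splittedMsg = body.Split(" ");
        var msg = splittedMsg[0];
        var timesToRepeat = int.Parse(splittedMsg[1]);
        for (int i = 0; i < timesToRepeat; i++)
        {
            Console.WriteLine($"{i} - {DateTime.Now} - {msg}");
            await Task.Delay(1000);
        }
        lock (channelLockObject)
        {
            channel.BasicAck(ea.DeliveryTag, false);
        }
    }
    catch (Exception exception)
    {
        // log the error and nack without requeueing, like the continuation does
        Console.WriteLine(exception.Message);
        lock (channelLockObject)
        {
            channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false);
        }
    }
});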
Summing up
The final consumer code:
using System.Linq;
using System.Text.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

var connectionFactory = new ConnectionFactory
    {HostName = "localhost", DispatchConsumersAsync = true};
using var connection = connectionFactory.CreateConnection();
using var channel = connection.CreateModel();

channel.QueueDeclare("multiple_consumers_single_channel");
channel.BasicQos(prefetchSize: 0, prefetchCount: 2, global: false);

var consumer = new AsyncEventingBasicConsumer(channel);
var channelLock = new object();
consumer.Received += (_, ea) => HandleMessage(ea, channelLock);
channel.BasicConsume("multiple_consumers_single_channel", autoAck: false, consumer);

Console.WriteLine("Listening for new messages");
Console.ReadKey();

Task HandleMessage(BasicDeliverEventArgs ea, object channelLockObject)
{
    var msgBody = ea.Body.Span.ToArray();
    Task.Run(async () =>
    {
        var body = JsonSerializer.Deserialize<string>(msgBody);
        var splittedMsg = body.Split(" ");
        var msg = splittedMsg[0];
        var timesToRepeat = int.Parse(splittedMsg[1]);
        for (int i = 0; i < timesToRepeat; i++)
        {
            Console.WriteLine($"{i} - {DateTime.Now} - {msg}");
            await Task.Delay(1000);
        }
        lock (channelLockObject)
        {
            channel.BasicAck(ea.DeliveryTag, false);
        }
    }).ContinueWith(faultedTask =>
    {
        var msgs = faultedTask.Exception!
            .InnerExceptions.Select(e => e.Message);
        Console.WriteLine(string.Join(", ", msgs));
        lock (channelLockObject)
        {
            channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false);
        }
    }, TaskContinuationOptions.OnlyOnFaulted);
    return Task.CompletedTask;
}
Conclusion
The key points of processing multiple messages in parallel using one channel are:
- Handle the concurrent access to the channel;
- Handle what happens when the consumer tries to process an invalid message.
In this post, background Tasks were spawned directly from the Received event. Another approach would be to use the Received event only to send the message body to a dotnet Channel, which works like an in-memory queue, and then process the messages in parallel with consumers listening to this in-memory channel.
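A minimal sketch of that alternative could look like this, reusing the same queue setup and message handling shown above (the bounded capacity and worker count are arbitrary choices for illustration):
using System.Threading.Channels;

// in-memory queue holding the copied body and the delivery tag needed for the ack
var inMemoryQueue = Channel.CreateBounded<(byte[] Body, ulong DeliveryTag)>(capacity: 100);

consumer.Received += async (_, ea) =>
{
    // copy the body before the handler returns, then hand the work off to the workers
    await inMemoryQueue.Writer.WriteAsync((ea.Body.ToArray(), ea.DeliveryTag));
};

// two workers, matching the prefetchCount, read from the in-memory channel
var workers = Enumerable.Range(0, 2)
    .Select(_ => Task.Run(async () =>
    {
        await foreach (var (body, deliveryTag) in inMemoryQueue.Reader.ReadAllAsync())
        {
            var text = JsonSerializer.Deserialize<string>(body);
            // ... process the message ...
            lock (channelLock)
            {
                channel.BasicAck(deliveryTag, multiple: false);
            }
        }
    }))
    .ToArray();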