Tag Archives: topic

Using Pub/Sub (or the Fanout Pattern) in RabbitMQ in .NET 6

I’ve previously spoken and written quite extensively about the pub/sub pattern using the message brokers offered by GCP and Azure. I’ve also posted a few articles about RabbitMQ.

In this post, I’d like to cover the RabbitMQ concept of pub/sub.

The Concept

Most message brokers broadly support two types of message exchange. The first type is a queue: that is, a single, persistent list of messages that can be read by one or more consumers, with each message handled by only one of them. My usual example is sending e-mails: imagine you have a massive number of e-mails to send; write them all to a queue, and then set 3 or 4 consumers reading the queue and sending the mails.

The second type is publish / subscribe, or pub/sub. This is, essentially, the concept that each consumer has its own private queue. Imagine that you want to notify all the applications in your system that a sales order has been raised: each interested party would register itself as a consumer and, when a message is sent, they would all receive that message. This pattern works well for distributed systems.

As I said, most message brokers broadly support these two concepts, although annoyingly, in different ways and with different labels. Here, we’ll show how RabbitMQ deals with this.
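To make the difference between the two shapes concrete, here is a toy in-memory model. It is only a sketch and not RabbitMQ code: ToyFanoutExchange, BindNewQueue and ShareQueue are hypothetical names invented for illustration, and the round-robin split merely stands in for "whichever consumer is free next".

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model of pub/sub (not RabbitMQ code; names are hypothetical).
public class ToyFanoutExchange
{
    private readonly List<Queue<string>> _boundQueues = new List<Queue<string>>();

    // Each subscriber binds its own private queue to the exchange.
    public Queue<string> BindNewQueue()
    {
        var queue = new Queue<string>();
        _boundQueues.Add(queue);
        return queue;
    }

    // Publishing copies the message into every bound queue.
    public void Publish(string message)
    {
        foreach (var queue in _boundQueues)
        {
            queue.Enqueue(message);
        }
    }
}

public static class MessagingShapes
{
    // Work queue: one shared queue, each message goes to exactly one consumer.
    public static (List<string> first, List<string> second) ShareQueue(IEnumerable<string> messages)
    {
        var shared = new Queue<string>(messages);
        var first = new List<string>();
        var second = new List<string>();
        var turn = 0;
        while (shared.Count > 0)
        {
            // Round-robin stands in for "whichever consumer is free next".
            (turn++ % 2 == 0 ? first : second).Add(shared.Dequeue());
        }
        return (first, second);
    }
}
```

Binding two queues and publishing once leaves a copy of the message in each queue, whereas ShareQueue divides its messages between the two consumers so that no message is seen twice.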

Setting up RabbitMQ

Technology has moved on since the last time I wrote about installing and running it. The following docker command should have you set up in a couple of seconds:

docker run --rm -it --hostname my-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

Once it’s running, you can view the management dashboard at http://localhost:15672 (the port mapped in the command above). If you haven’t changed anything, the default username / password is guest / guest.


Before we get into any actual code, you’ll need to install the RabbitMQ Client NuGet package (RabbitMQ.Client).

For pub/sub, the first task is to set up a receiver. The following code should do that for you:

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// Declare the exchange; Fanout means every bound queue gets a copy
channel.ExchangeDeclare("SalesOrder", ExchangeType.Fanout);

// Declare the queue and bind it to the exchange (the routing key is ignored by fanout)
var result = channel.QueueDeclare("OrderRaised", false, false, false, null);
string queueName = result.QueueName;
channel.QueueBind(queueName, "SalesOrder", "");

EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += Consumer_Received;
channel.BasicConsume(queueName, true, consumer);

// Keep the process alive while messages arrive
Console.ReadLine();

static void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
    var body = e.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(message);
}


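In the code above, you’ll see that we first set up an exchange called SalesOrder, and we tell that exchange that it’s a Fanout exchange.

We then declare a queue, and bind it to the exchange; that is, it will receive a copy of every message sent to that exchange. Notice that we receive from the queue, not from the exchange. Bear in mind that receivers declaring the same queue name share that one queue and compete for its messages, so for each receiver to get every message, each needs a queue of its own.

Finally, we set up the consumer, and tell it what to do when a message is received (in this case, just output to the console window).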


For the sender, the code is much simpler:

static void SendNewMessage(string message)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

    channel.ExchangeDeclare("SalesOrder", ExchangeType.Fanout);

    channel.BasicPublish("SalesOrder", "", false, null, Encoding.UTF8.GetBytes(message));
}

Notice that we don’t have any concept of the queue here: we simply publish to the exchange, and what happens after that is no concern of the publisher.


I keep coming back to Rabbit – especially for demos and concepts, as it runs locally easily, and has many more options than the main cloud providers – at least in terms of actual messaging capability. If you’re just learning about message brokers, Rabbit is definitely a good place to start.

Reading an Azure Dead Letter Queue with Azure.Messaging.ServiceBus

Some time ago, I wrote about how you can read the dead letter queue. I also wrote this post on the evolution of Azure Service Bus libraries.

By way of a recap, a Dead Letter Queue is a sub-queue: a message that would otherwise clog up a queue (for example, because it can’t be processed for some reason) is moved to the dead letter queue and so removed from normal processing.

In this post, I’ll cover how you can read a dead letter queue using the new Azure.Messaging.ServiceBus library.

Force a Dead Letter Message

There are basically two ways that a message can end up in a dead letter queue: either it breaks the rules (too many retries, too many redirects, etc.), or it is explicitly placed in a dead letter queue. To do the latter, the process is as follows:

            var serviceBusClient = new ServiceBusClient(connectionString);
            var messageReceiver = serviceBusClient.CreateReceiver(QUEUE_NAME);
            var message = await messageReceiver.ReceiveMessageAsync();

            string messageBody = Encoding.UTF8.GetString(message.Body);

            await messageReceiver.DeadLetterMessageAsync(message, "Really bad message");

The main difference here (other than that the previous method was DeadLetterAsync) is that you pass the entire message, rather than just the lock token.

Reading a Dead Letter Message

There are a few quirks here. Firstly, the dead letter reason, delivery count, etc. were part of an array known as SystemProperties, whereas they are now just properties, which does make them far more accessible and discoverable. Here’s the code to read the dead letter queue:

            var serviceBusClient = new ServiceBusClient(connectionString);
            var deadLetterReceiver = serviceBusClient.CreateReceiver(FormatDeadLetterPath());
            var message = await deadLetterReceiver.ReceiveMessageAsync();

            string messageBody = Encoding.UTF8.GetString(message.Body);

            Console.WriteLine("Message received: {0}", messageBody);

            // Previous versions had these as properties
            // https://www.pmichaels.net/2021/01/23/read-the-dead-letter-queue/
            if (!string.IsNullOrWhiteSpace(message.DeadLetterReason))
                Console.WriteLine("Reason: {0} ", message.DeadLetterReason);
            if (!string.IsNullOrWhiteSpace(message.DeadLetterErrorDescription))
                Console.WriteLine("Description: {0} ", message.DeadLetterErrorDescription);

            Console.WriteLine($"Message {message.MessageId} ({messageBody}) had a delivery count of {message.DeliveryCount}");

Again, most of the changes are simply naming. It’s worth mentioning the FormatDeadLetterPath() function. This was previously part of a static helper class EntityNameHelper; here, I’ve tried to replicate that behaviour locally (as it seems to have been removed):

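        private static string QUEUE_NAME = "dead-letter-demo";
        private static string DEAD_LETTER_PATH = "$deadletterqueue";
        // Service Bus addresses the sub-queue as <queue name>/$deadletterqueue
        static string FormatDeadLetterPath() =>
            $"{QUEUE_NAME}/{DEAD_LETTER_PATH}";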

Resubmitting a Dead Letter Message

This is something that I covered in my original post on this. It’s not inbuilt behaviour – but you basically copy the message and re-submit. In fact, this is much, much easier now:

            var serviceBusClient = new ServiceBusClient(connectionString);

            var deadLetterReceiver = serviceBusClient.CreateReceiver(FormatDeadLetterPath());
            var sender = serviceBusClient.CreateSender(QUEUE_NAME);

            var deadLetterMessage = await deadLetterReceiver.ReceiveMessageAsync();            

            using var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);

            var resubmitMessage = new ServiceBusMessage(deadLetterMessage);
            await sender.SendMessageAsync(resubmitMessage);
            //throw new Exception("aa"); // - to prove the transaction
            await deadLetterReceiver.CompleteMessageAsync(deadLetterMessage);

            // Without Complete(), disposing the scope rolls the transaction back
            scope.Complete();


Most of what’s here has previously been covered; the old Message.Clone is now much neater (but slightly less obvious) in that you simply pass the old message in as a constructor parameter. Because the Dead Letter Reason, et al, are now properties, there’s no longer a need to manually deal with them not getting copied across.

The transaction simply ensures that either the dead letter message is correctly re-submitted, or it remains a dead letter.


The new library makes the code much more concise and discoverable. We’ve seen how to force a dead letter, how to receive and view the contents of the Dead Letter Queue and, finally, how to resubmit a dead lettered message.

Receiving Messages in Azure Service Bus

In this post I covered the basics of setting up a queue and sending a message to it. Here, I’m going to cover the options around receiving that message.

Essentially, there are two possibilities here: you can either set up an event listener, or you can poll the queue directly, and receive the messages one at a time.

Option 1 – Events

The events option seems to be the one that Microsoft now prefer – essentially, you register a handler and then as the messages come in, you simply handle them inside an event. The code here looks something like this:

            var queueClient = new QueueClient(connectionString, "test-queue");

            var messageHandlerOptions = new MessageHandlerOptions(ExceptionHandler);
            queueClient.RegisterMessageHandler(handleMessage, messageHandlerOptions);

The event handlers:

        private static Task ExceptionHandler(ExceptionReceivedEventArgs arg)
        {
            Console.WriteLine("Something bad happened!");
            return Task.CompletedTask;
        }

        private static Task handleMessage(Message message, CancellationToken cancellation)
        {
            string messageBody = Encoding.UTF8.GetString(message.Body);
            Console.WriteLine("Message received: {0}", messageBody);

            return Task.CompletedTask;
        }

Option 2 – Polling

With this option, you simply ask for a message. You’ll need to use this approach for things like deferred messages (which I hope to cover in a future post):

            var messageReceiver = new MessageReceiver(connectionString, "test-queue", ReceiveMode.ReceiveAndDelete);            
            var message = await messageReceiver.ReceiveAsync();

            string messageBody = Encoding.UTF8.GetString(message.Body);            
            Console.WriteLine("Message received: {0}", messageBody);

Option 3 – Option 1, but cruelly force it into option 2

I thought I’d include this, although I would strongly advise against using it in most cases. If you wish, you can register an event, but force the event into a procedural call, so that you can await it finishing. You can do this by using the TaskCompletionSource. First, declare a TaskCompletionSource in your code (somewhere accessible):

private static TaskCompletionSource<bool> _taskCompletionSource;

Then, in handleMessage (see above), when you’ve received the message you want, set the result:

            if (message.CorrelationId == correlationId)
            {
                await client.CompleteAsync(message.SystemProperties.LockToken);
                _taskCompletionSource.SetResult(true);
            }

Finally, after you’ve registered the message handler, just await this task:

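            _taskCompletionSource = new TaskCompletionSource<bool>();
            queueClient.RegisterMessageHandler(
                (message, cancellationToken) => handleMessage(correlationId, queueClient, message, cancellationToken),
                messageHandlerOptions);

            await _taskCompletionSource.Task;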
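The pattern is perhaps easier to see without Service Bus in the way. What follows is a minimal sketch rather than the code from this post: ToyMessagePump is a hypothetical stand-in for the queue client, and WaitForAsync is an invented helper name. It shows the same idea: an event handler that completes a TaskCompletionSource so the caller can await the arrival of one specific message.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a client that raises an event per message.
public class ToyMessagePump
{
    public event Action<string> MessageReceived;

    public void Deliver(string correlationId) => MessageReceived?.Invoke(correlationId);
}

public static class AwaitableReceive
{
    // Returns a task that completes when a message with the wanted correlation ID arrives.
    public static Task<bool> WaitForAsync(ToyMessagePump pump, string wantedCorrelationId)
    {
        var completion = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        void Handler(string correlationId)
        {
            if (correlationId != wantedCorrelationId) return;

            pump.MessageReceived -= Handler;  // stop listening once satisfied
            completion.TrySetResult(true);    // releases whoever awaits the task
        }

        pump.MessageReceived += Handler;
        return completion.Task;
    }
}
```

A caller would write something like await AwaitableReceive.WaitForAsync(pump, "abc"); messages with any other correlation ID are ignored and the await keeps waiting.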



What can you do with a logic app? Part three – Creating a Logic App Client

One of the things missing from Azure Logic Apps is the ability to integrate human interaction. Microsoft do have their own version of an interactive workflow (PowerApps), which is (obviously) far better than what you can produce by following this post.

In this post, we’ll create a very basic client for a logic app. Obviously, with some thought, this could easily be extended to allow a fully functional, interactive, workflow system.

Basic Logic App

Let’s start by designing our logic app. The app in question is going to be a very simple one. It will add a message to a logging queue (just so it has something to do), then we’ll ask the user a question: left or right? We’ll do this by putting a message onto a topic. Based on the user’s response, we’ll write a message to the queue saying either left or right. Let’s have a look at our Logic App design:

It’s worth pointing out a few things about this design:
1. The condition uses the expression base64ToString() to convert the encoded message into plain text.
2. Where the workflow picks up, it uses a peek-lock, and then completes the message at the end. It looks like it’s a ‘feature’ of logic apps that an automatic complete on this trigger will not actually complete the message (plus, this is actually a better design).
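For anyone testing the condition from the client side, the decoding in point 1 can be mimicked in C#. This is only a sketch: the class and method names are hypothetical, and it assumes the message body is UTF-8 text that has been Base64-encoded.

```csharp
using System;
using System.Text;

// Hypothetical helper mirroring the Logic App expression for local testing.
public static class LogicAppEncoding
{
    // Base64 text -> bytes -> UTF-8 string (assumes the payload is UTF-8).
    public static string Base64ToString(string base64) =>
        Encoding.UTF8.GetString(Convert.FromBase64String(base64));

    // The reverse direction, handy for building test payloads.
    public static string StringToBase64(string text) =>
        Convert.ToBase64String(Encoding.UTF8.GetBytes(text));
}
```

For example, LogicAppEncoding.Base64ToString("bGVmdA==") returns "left", which is the kind of value the condition compares against.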

Queues and Topics

The “Log to message queue” action above is putting an entry into a queue; so a quick note about why we’re using a queue for logging, and a topic for the interaction with the user. In a real life version of this system, we might have many users, but they might all want to perform the same action. Let’s say that they all are part of a sales process, and the actions are actually actions along that process; adding these to a queue maintains their sequence. Here’s the queue and topic layout that I’m using for this post:

Multiple Triggers

As you can see, we actually have two triggers in this workflow. The first starts the workflow (so we’ll drop a message into the topic to start it), and the second waits for a second message to go into the topic.

To add a trigger part way through the workflow, simply add an action, search and select “Triggers”:

Because we have a trigger part way through the workflow, what we have effectively issued here is an await statement. Once a message appears in the subscription, the workflow will continue where it left off:

As soon as a message is posted, the workflow carries on:

Client Application

For the client application, we could simply use the Service Bus Explorer (in fact, the screenshots above were taken from using this to simulate messages in the topic). However, the point of this post is to create a client, and so we will… although we’ll just create a basic console app for now.

We need the client to do two things: read from a topic subscription, and write to a topic. I haven’t exactly been here before, but I will be heavily plagiarising from here, here, and here.

Let’s create a console application:

Once that’s done, we’ll need the service bus client library: Install it from here.

The code is generally quite straight-forward, and looks a lot like the code to read and write to queues. The big difference is that you don’t read from a topic, but from a subscription to a topic (a topic can have many subscriptions):

class Program
{
    static async Task Main(string[] args)
    {
        MessageHandler messageHandler = new MessageHandler();
        messageHandler.RegisterToRead("secondstage", "sub1");
        await WaitForever();
    }

    private static async Task WaitForever()
    {
        while (true) await Task.Delay(5000);
    }
}

public class MessageHandler
{
    private string _connectionString = "service bus connection string details";
    private ISubscriptionClient _subscriptionClient;

    public void RegisterToRead(string topicName, string subscriptionName)
    {
        _subscriptionClient = new SubscriptionClient(_connectionString, topicName, subscriptionName);
        MessageHandlerOptions messageHandlerOptions = new MessageHandlerOptions(ExceptionReceived)
        {
            AutoComplete = false,
            MaxAutoRenewDuration = new TimeSpan(1, 0, 0)
        };
        _subscriptionClient.RegisterMessageHandler(ProcessMessage, messageHandlerOptions);
    }

    private async Task ProcessMessage(Message message, CancellationToken cancellationToken)
    {
        string messageText = Encoding.UTF8.GetString(message.Body);
        string leftOrRight = Console.ReadLine();
        await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
        await SendResponse(leftOrRight, "userinput");
    }

    private async Task SendResponse(string leftOrRight, string topicName)
    {
        TopicClient topicClient = new TopicClient(_connectionString, topicName);
        Message message = new Message(Encoding.UTF8.GetBytes(leftOrRight));
        await topicClient.SendAsync(message);
    }

    private Task ExceptionReceived(ExceptionReceivedEventArgs arg)
    {
        return Task.CompletedTask;
    }
}

If we run it, then when the logic app reaches the second trigger, we’ll get a message from the subscription and ask directions:

Based on the response, the logic app will execute either the right or left branch of code.


Having worked with workflow systems in the past, one recurring feature of them is that they start to get used for things that don’t fit into a workflow, resulting in a needlessly over-complex system. I imagine that Logic Apps are no exception to this rule, and in 10 years’ time, people will roll their eyes at how Logic Apps have been used where a simple web service would have done the whole job.

The saving grace here is source control. The workflow inside a Logic App is simply a JSON file, and so it can be source controlled, added to a CI pipeline, and all the good things that you might expect. Whether or not a more refined version of what I have described here makes any sense is another question.

There are many downsides to this approach: firstly, you are fighting against the Service Bus by asking it to wait for input (that part is a very fixable problem with a bit of an adjustment to the messages); secondly, you would presumably need some form of timeout (again, a fixable problem that will probably feature in a future post). The biggest issue here is that you are likely introducing complex conditional logic with no way to unit test; this isn’t, per se, fixable; however, you can introduce some canary logic (again, this will probably be the feature of a future post).





Asynchronous Behaviour in Applications using Message Queues

This was one of the features that interested me most in message queues. Basically, you have an application, and you want to communicate with it; that is, Joe Bloggs the user is sat there, tapping away at his keyboard, and I want to send him a message that interrupts him. There are dozens of use cases for this: the user has entered an order and there’s a problem with it; the user’s account has been locked and they need to be logged out; or we want to alert them that there’s a data change so that they can refresh their data.

The relevant part of message queuing here is a topic, which allows me to send an alert to one or more listeners.

To this end, I’ve extended my helper class to include a TopicHelper:

public class TopicHelper : IDisposable
{
    IReceiveTopicBehaviour _receiveBehaviour = null;
    ConnectionFactory _factory = new ConnectionFactory() { HostName = "localhost" };
    IConnection _connection = null;
    IModel _channel = null;
    EventingBasicConsumer _consumer = null;

    private void SetupChannel()
    {
        if (_connection == null)
        {
            _connection = _factory.CreateConnection();
            _channel = _connection.CreateModel();
        }
    }

    private void SetupConsumer()
    {
        if (_channel == null) throw new Exception("Channel is not set-up");
        if (_connection == null) throw new Exception("Connection is not set-up");
        if (_consumer == null)
        {
            _consumer = new EventingBasicConsumer(_channel);
        }
    }

    public void ReceiveTopic(string topicName, string key, IReceiveTopicBehaviour behaviour)
    {
        _receiveBehaviour = behaviour;
        // The channel and consumer must exist before they are used below
        SetupChannel();
        SetupConsumer();
        _channel.ExchangeDeclare(topicName, "topic");
        var queue = _channel.QueueDeclare();
        var queueName = queue.QueueName;
        _channel.QueueBind(queue: queueName,
            exchange: topicName,
            routingKey: key);
        _consumer.Received += Consumer_Received;
        _channel.BasicConsume(queueName, true, _consumer);
    }

    private void Consumer_Received(object sender, BasicDeliverEventArgs e)
    {
        var body = e.Body;
        var message = Encoding.UTF8.GetString(body);
        // Hand the decoded message to the behaviour supplied by the caller
        _receiveBehaviour.OnReceive(message);
    }

    public void SendTopic(string topicName, string key, string data)
    {
        SetupChannel();
        _channel.ExchangeDeclare(topicName, "topic");
        _channel.BasicPublish(topicName, key, null, Encoding.UTF8.GetBytes(data));
    }

    public void Dispose()
    {
        _channel?.Dispose();
        _connection?.Dispose();
    }
}

There’s a lot of code here, but basically there are only two methods of note: SendTopic() and ReceiveTopic(). I’ve also made it a disposable class. The next thing we want is a listener; for this, I’ve used a WPF app, but any application should be able to do this:


The call to set-up the alert is this:

public partial class MainWindow : Window
{
    private TopicHelper _topicHelper = new TopicHelper();
    public MainWindow()
    {
        Task.Run(() =>
        {
            ReceiveTopic recTopic = new ReceiveTopic();
            _topicHelper.ReceiveTopic("Alerts", "thisApp", recTopic);
        });
    }
}

I’ve used the code behind because I’m just proving a point. Obviously, in real life, this would be some abstraction in the business layer. The main thing to note is the ReceiveTopic class that is instantiated and passed through; here’s its implementation:

public class ReceiveTopic : IReceiveTopicBehaviour
{
    public void OnReceive(string message) { /* react to the alert here */ }
}

This, effectively, allows me to provide custom functionality on the receive. If you find that you’re using this to pass in the same one-liner, you could adapt this to simply pass in an action.
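One way that adaptation might look is sketched below. ActionReceiveBehaviour is a hypothetical name, and the interface is redeclared here only so that the snippet compiles on its own; it is assumed to match the one the helper already uses.

```csharp
using System;

// Redeclared so the sketch stands alone; assumed to match the helper's interface.
public interface IReceiveTopicBehaviour
{
    void OnReceive(string message);
}

// Hypothetical adapter: wraps any Action<string> as a receive behaviour.
public class ActionReceiveBehaviour : IReceiveTopicBehaviour
{
    private readonly Action<string> _onReceive;

    public ActionReceiveBehaviour(Action<string> onReceive)
    {
        _onReceive = onReceive ?? throw new ArgumentNullException(nameof(onReceive));
    }

    // Simply forwards the message to the wrapped delegate.
    public void OnReceive(string message) => _onReceive(message);
}
```

With this in place, the listener could pass new ActionReceiveBehaviour(m => Console.WriteLine(m)) instead of writing a dedicated class for each one-liner, while TopicHelper itself stays unchanged.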

The final piece of the puzzle is the send alert; in my case this is a console app:


Here’s the code for the main function:

static void Main(string[] args)
{
    Console.WriteLine("Alert: ");
    string alertText = Console.ReadLine();
    using (TopicHelper helper = new TopicHelper())
    {
        helper.SendTopic("Alerts", "thisApp", alertText);
    }
}

Does it work?

Yes, of course.