Using Pub/Sub (or the Fanout Pattern) in RabbitMQ in .NET 6

I’ve previously spoken and written quite extensively about the pub/sub pattern using the message brokers offered by GCP and Azure. I’ve also posted a few articles about RabbitMQ.

In this post, I’d like to cover how RabbitMQ implements pub/sub.

The Concept

Most message brokers broadly support two types of message exchange. The first type is a queue: that is, a single, persistent list of messages that can be read by one or more consumers, with each message delivered to only one of them. The example I usually use for this is sending e-mails: imagine you have a massive number of e-mails to send. Write them all to a queue, and then set 3 or 4 consumers reading the queue and sending the mails.

The second type is publish / subscribe, or pub/sub. Essentially, each consumer has its own private queue, and every queue gets its own copy of each message. Imagine that you want to notify all the applications in your system that a sales order has been raised: each interested party would register itself as a consumer and, when a message is sent, they would all receive that message. This pattern works well for distributed systems.
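To make the difference concrete, here is a minimal in-memory sketch of the two styles. It doesn't use a broker at all: the worker and subscriber names are invented for illustration, and the round-robin hand-out is just one simple way to show that each queued message goes to a single consumer.

```csharp
using System;
using System.Collections.Generic;

// Work queue: one shared queue; each message is handled by exactly one worker.
var workQueue = new Queue<string>(new[] { "mail-1", "mail-2", "mail-3", "mail-4" });
string[] workers = { "worker-A", "worker-B" };
var handledBy = new Dictionary<string, string>();
int turn = 0;
while (workQueue.Count > 0)
{
    string message = workQueue.Dequeue();
    handledBy[message] = workers[turn++ % workers.Length];
}

// Pub/sub: every subscriber owns a private queue, and publishing copies
// the message into all of them.
var subscriberQueues = new Dictionary<string, Queue<string>>
{
    ["billing"] = new Queue<string>(),
    ["warehouse"] = new Queue<string>(),
};

void Publish(string message)
{
    foreach (var queue in subscriberQueues.Values)
    {
        queue.Enqueue(message);
    }
}

Publish("order-42-raised");

foreach (var (message, worker) in handledBy)
{
    Console.WriteLine($"{message} handled once, by {worker}");
}
foreach (var (name, queue) in subscriberQueues)
{
    Console.WriteLine($"{name} received {queue.Count} message(s)");
}
```

The four mails are split between the two workers, whereas the single published order lands in both subscriber queues.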

As I said, most message brokers broadly support these two concepts, although annoyingly, in different ways and with different labels. Here, we’ll show how RabbitMQ deals with this.

Setting up RabbitMQ

Technology has moved on since the last time I wrote about installing and running it. The following Docker command should have you up and running in a couple of seconds:

docker run --rm -it --hostname my-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

Once it’s running, you can view the management dashboard at http://localhost:15672 (the port mapped in the command above). If you haven’t changed anything, the default username / password is guest / guest.

Receiver

Before we get into any actual code, you’ll need to install the RabbitMQ.Client NuGet package.

For pub/sub, the first task is to set up a receiver. The following code should do that for you:

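// Written against the synchronous RabbitMQ.Client API (CreateModel, EventingBasicConsumer)
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// A fanout exchange copies each message to every queue bound to it
channel.ExchangeDeclare("SalesOrder", ExchangeType.Fanout);

// Declare this subscriber's queue (not durable, not exclusive, not auto-deleted).
// Receivers that use the same queue name share that queue and split its messages,
// so give each subscriber its own queue name if every one should get a copy.
var result = channel.QueueDeclare("OrderRaised", false, false, false, null);
string queueName = result.QueueName;
channel.QueueBind(queueName, "SalesOrder", "");

Console.WriteLine($"Bound queue {queueName} to exchange SalesOrder");

EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += Consumer_Received;

// autoAck = true: messages are acknowledged as soon as they are delivered
channel.BasicConsume(queueName, true, consumer);

Console.WriteLine("Receiving...");
Console.ReadLine();

static void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
    // Copy the message body into an array before decoding it
    var body = e.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);

    Console.WriteLine(message);
}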

In the code above, you’ll see that we first set up an exchange called SalesOrder, and we tell that exchange that it’s a Fanout exchange.

We then declare a queue, and bind it to the exchange – that is, it will receive messages sent to that exchange. Notice that we consume from the queue, not from the exchange.

Finally, we set up the consumer, and tell it what to do when a message is received (in this case, just output to the console window).
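If you want to run several receivers and have each one get its own copy of every message, each needs its own queue. One way to do that is to let the broker name the queue. The following is a sketch rather than the only way to do it: it assumes the same synchronous RabbitMQ.Client API as above and a broker on localhost, and it relies on the parameterless QueueDeclare() overload, which asks for a server-named, exclusive, auto-delete queue.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.ExchangeDeclare("SalesOrder", ExchangeType.Fanout);

// No arguments: the broker generates a unique queue name. The queue is
// exclusive to this connection and is removed when the connection closes.
string queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queueName, "SalesOrder", "");

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (_, e) =>
    Console.WriteLine($"[{queueName}] {Encoding.UTF8.GetString(e.Body.ToArray())}");

// autoAck = true, as in the receiver above
channel.BasicConsume(queueName, true, consumer);

Console.WriteLine($"Receiving on {queueName}...");
Console.ReadLine();
```

Start two copies of this and publish one message: both windows should print it, each under a different generated queue name. The trade-off is that the queue disappears with the connection, so anything published while a receiver is offline is not kept for it.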

Sender

For the sender, the code is much simpler:

static void SendNewMessage(string message)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

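    // Declaring an exchange that already exists with the same settings is harmless
    channel.ExchangeDeclare("SalesOrder", ExchangeType.Fanout);

    // A fanout exchange ignores the routing key, so it is left empty
    channel.BasicPublish("SalesOrder", "", false, null, Encoding.UTF8.GetBytes(message));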
}

Notice that we don’t have any concept of the queue here: we simply publish to the exchange, and what happens after that is no concern of the publisher.
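To try the sender out, something needs to call it. Here is a minimal console wrapper, assuming a broker on localhost and the same synchronous RabbitMQ.Client API; the prompt text and the "empty line to quit" convention are my own additions for the sketch.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

Console.WriteLine("Type a message and press Enter (empty line to quit):");

while (true)
{
    var line = Console.ReadLine();
    if (string.IsNullOrEmpty(line)) break;

    SendNewMessage(line);
}

static void SendNewMessage(string message)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

    channel.ExchangeDeclare("SalesOrder", ExchangeType.Fanout);

    // Publish to the exchange only; the sender never names a queue
    channel.BasicPublish("SalesOrder", "", false, null, Encoding.UTF8.GetBytes(message));
}
```

Opening a connection per message keeps the demo simple; for anything busier than a demo you would probably want to create the connection once and reuse it.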

Summary

I keep coming back to Rabbit – especially for demos and concepts, as it’s easy to run locally, and has many more options than the main cloud providers – at least in terms of actual messaging capability. If you’re just learning about message brokers, Rabbit is definitely a good place to start.

Asynchronous Behaviour in Applications using Message Queues

This was one of the features that interested me most in message queues. Basically, you have an application, and you want to communicate with it; that is, Joe Bloggs the user is sat there, tapping away at his keyboard, and I want to send him a message that interrupts him. There are dozens of use cases for this: the user has entered an order and there’s a problem with it, the user’s account has been locked and they need to be logged out, or we want to alert them that there’s a data change so that they can refresh their data.

The relevant part of message queuing here is a topic, which allows me to send an alert to one or more listeners.
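With a topic exchange, a listener binds with a pattern and receives any message whose routing key matches it. Routing keys are words separated by dots; in a binding pattern, * stands for exactly one word and # for zero or more words. The following is a small in-memory sketch of that matching rule, not RabbitMQ’s own implementation; Matches and MatchWords are names I’ve made up for it.

```csharp
using System;

// True when a dot-separated routing key satisfies a binding pattern.
bool Matches(string pattern, string routingKey)
{
    return MatchWords(pattern.Split('.'), 0, routingKey.Split('.'), 0);
}

bool MatchWords(string[] pattern, int p, string[] key, int k)
{
    // Pattern used up: it is a match only if the key is used up as well.
    if (p == pattern.Length) return k == key.Length;

    if (pattern[p] == "#")
    {
        // Let '#' absorb 0, 1, 2, ... of the remaining words.
        for (int next = k; next <= key.Length; next++)
        {
            if (MatchWords(pattern, p + 1, key, next)) return true;
        }
        return false;
    }

    // Any other pattern word needs one key word to compare against.
    if (k == key.Length) return false;

    bool wordMatches = pattern[p] == "*" || pattern[p] == key[k];
    return wordMatches && MatchWords(pattern, p + 1, key, k + 1);
}

Console.WriteLine(Matches("thisApp", "thisApp"));           // True
Console.WriteLine(Matches("alerts.*", "alerts.orders"));    // True
Console.WriteLine(Matches("alerts.*", "alerts.orders.eu")); // False
Console.WriteLine(Matches("alerts.#", "alerts.orders.eu")); // True
Console.WriteLine(Matches("alerts.#", "alerts"));           // True
```

A binding key with no wildcards, which is what I use below, behaves like an exact match.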

To this end, I’ve extended my helper class to include a TopicHelper:

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class TopicHelper : IDisposable
{
    IReceiveTopicBehaviour _receiveBehaviour = null;
    ConnectionFactory _factory = new ConnectionFactory() { HostName = "localhost" };
    IConnection _connection = null;
    IModel _channel = null;
    EventingBasicConsumer _consumer = null;
 
    private void SetupChannel()
    {
        if (_connection == null)
        {
            _connection = _factory.CreateConnection();
            _channel = _connection.CreateModel();
        }
    }
 
    private void SetupConsumer()
    {
        if (_channel == null) throw new Exception("Channel is not set-up");
        if (_connection == null) throw new Exception("Connection is not set-up");
 
        if (_consumer == null)
            _consumer = new EventingBasicConsumer(_channel);
    }
 
    public void ReceiveTopic(string topicName, string key, IReceiveTopicBehaviour behaviour)
    {
        _receiveBehaviour = behaviour;
 
        SetupChannel();
        SetupConsumer();
 
        _channel.ExchangeDeclare(topicName, "topic");
        var queue = _channel.QueueDeclare();
        var queueName = queue.QueueName;
 
        _channel.QueueBind(queue: queueName,
        exchange: topicName,
        routingKey: key);
 
        _consumer.Received += Consumer_Received;
 
        _channel.BasicConsume(queueName, true, _consumer);
 
    }
 
    private void Consumer_Received(object sender, BasicDeliverEventArgs e)
    {
        // Copy the message body into a byte array before decoding it
        var body = e.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        Console.WriteLine(message);
 
        _receiveBehaviour.OnReceive(message);
    }
 
    public void SendTopic(string topicName, string key, string data)
    {
        SetupChannel();
 
        _channel.ExchangeDeclare(topicName, "topic");
        _channel.BasicPublish(topicName, key, null, Encoding.UTF8.GetBytes(data));            
 
    }
 
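    public void Dispose()
    {
        // Dispose the channel before the connection that owns it; either
        // may still be null if the helper was never used
        _channel?.Dispose();
        _connection?.Dispose();
    }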
}

There’s a lot of code here, but basically there are only two methods of note: SendTopic() and ReceiveTopic(). I’ve also made it a disposable class. The next thing we want is a listener; for this, I’ve used a WPF app, but any application should be able to do this:

[Image: rabbitalert1]

The call to set up the alert is this:

public partial class MainWindow : Window
{
    private TopicHelper _topicHelper = new TopicHelper();
    public MainWindow()
    {
        InitializeComponent();
 
        Task.Run(() =>
        {
            ReceiveTopic recTopic = new ReceiveTopic();
            _topicHelper.ReceiveTopic("Alerts", "thisApp", recTopic);
        });
    }
}

I’ve used the code-behind because I’m just proving a point. Obviously, in real life, this would be some abstraction in the business layer. The main thing to note is the ReceiveTopic class that is instantiated and passed through; here’s its implementation:

public class ReceiveTopic : IReceiveTopicBehaviour
{
    public void OnReceive(string message)
    {
        System.Windows.MessageBox.Show(message);
    }
}

This, effectively, allows me to provide custom functionality on the receive. If you find that you’re using this to pass in the same one-liner, you could adapt this to simply pass in an action.
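As a rough sketch of that idea: the IReceiveTopicBehaviour interface isn’t shown in this post, so the declaration below is inferred from how TopicHelper calls it, and ActionReceiveBehaviour is a hypothetical adapter name of my own. It wraps an Action<string> so that a lambda can stand in for a dedicated one-method class.

```csharp
using System;
using System.Collections.Generic;

var received = new List<string>();

// The lambda (here, List.Add) replaces a dedicated class such as ReceiveTopic.
IReceiveTopicBehaviour behaviour = new ActionReceiveBehaviour(received.Add);
behaviour.OnReceive("Order 42 has a problem");

Console.WriteLine(received[0]); // Order 42 has a problem

// Inferred from usage in TopicHelper; the original declaration is not shown.
public interface IReceiveTopicBehaviour
{
    void OnReceive(string message);
}

// Hypothetical adapter: forwards each received message to the supplied action.
public class ActionReceiveBehaviour : IReceiveTopicBehaviour
{
    private readonly Action<string> _onReceive;

    public ActionReceiveBehaviour(Action<string> onReceive)
    {
        _onReceive = onReceive ?? throw new ArgumentNullException(nameof(onReceive));
    }

    public void OnReceive(string message) => _onReceive(message);
}
```

In the WPF listener, that would let the call become something like _topicHelper.ReceiveTopic("Alerts", "thisApp", new ActionReceiveBehaviour(m => MessageBox.Show(m))), without changing TopicHelper itself.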

The final piece of the puzzle is sending the alert; in my case this is done from a console app:

[Image: rabbitalert2]

Here’s the code for the main function:

static void Main(string[] args)
{
    Console.WriteLine("Alert: ");
    string alertText = Console.ReadLine();
 
    using (TopicHelper helper = new TopicHelper())
    {
        helper.SendTopic("Alerts", "thisApp", alertText);
    }
 
    Console.WriteLine("Finished");
    Console.ReadLine();
}

Does it work?

Yes, of course.

[Image: rabbitalert3]