Tag Archives: BasicPublish

Using Pub/Sub (or the Fanout Pattern) in Rabbit MQ in .Net 6

I’ve previously spoken and written quite extensively about the pub/sub pattern using message brokers such as GCP and Azure. I’ve also posted a few articles about Rabbit MQ.

In this post, I’d like to cover the Rabbit MQ concept of pub/sub.

The Concept

Most message brokers broadly support two types of message exchange. The first type is a queue: that is, a single, persistent list of messages that can be read by one, or multiple consumers. The use case I usually use for this is sending e-mails: imagine you have a massive amount of e-mails to send: write them all to a queue, and then set 3 or 4 consumers reading the queue and sending the mails.

The second type is publish / subscribe, or pub/sub. This is, essentially, the concept that each consumer has its own private queue. Imagine that you want to notify all the applications in your system that a sales order has been raised: each interested party would register itself as a consumer and, when a message is sent, they would all receive that message. This pattern works well for distributed systems.

As I said, most message brokers broadly support these two concepts, although annoyingly, in different ways and with different labels. Here, we’ll show how RabbitMQ deals with this.

Setting up RabbitMQ

Technology has moved on since the last time I wrote about installing and running it. The following docker command should have you set-up in a couple of seconds:

docker run --rm -it --hostname my-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

Once it’s running, you can view the dashboard here. If you haven’t changed anything, the default username / password is guest / guest.

Receiver

Before we get into any actual code, you’ll need to install the Rabbiq MQ Client NuGet Package.

For pub/sub, the first task is to set-up a receiver. The following code should do that for you:

var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.ExchangeDeclare("SalesOrder", ExchangeType.Fanout);

var result = channel.QueueDeclare("OrderRaised", false, false, false, null);
string queueName = result.QueueName;
channel.QueueBind(queueName, "SalesOrder", "");

Console.WriteLine(result);
  
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += Consumer_Received;
  
channel.BasicConsume(queueName, true, consumer);


Console.WriteLine("Receiving...");
Console.ReadLine();

static void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
    var body = e.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);

    Console.WriteLine(message);
}

In the code above, you’ll see that we first set-up an exchange called SalesOrder, and we tell that exchange that it’s a Fanout exchange.

We then declare a queue, and bind it to the exchange – that is, it will receive messages sent to that exchange. Notice that we receive from the queue.

Finally, we set-up the consumer, and tell it what to do when a message is received (in this case, just output to the console window).

Sender

For the sender, the code is much simpler:

static void SendNewMessage(string message)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

    channel.ExchangeDeclare("SalesOrder", ExchangeType.Fanout);

    channel.BasicPublish("SalesOrder", "", false, null, Encoding.UTF8.GetBytes(message));
}

Notice that we don’t have any concept of the queue here, we simply publish to the exchange – what happens after that is no concern of the publisher.

Summary

I keep coming back to Rabbit – especially for demos and concepts, as it runs locally easily, and has many more options than the main cloud providers – at least in terms of actual messaging capability. If you’re just learning about message brokers, Rabbit is definitely a good place to start.

Message Persistence in RabbitMQ and BenchMarkDotNet

(Note: if you want to follow the code on this, you might find it easier if you start from the project that I create here.)

A queue in a message broker can be persistent, which means that, should you have a power failure (or just shut down the server), when it comes back, the queue is still there.

So, we can create a durable (persistent) queue, like this:

var result = channel.QueueDeclare("NewQueue", true, false, false, args);

The second parameter indicates that the queue is durable. Let’s send it some messages:

static void Main(string[] args)
{            
    for (int i = 1; i <= 100; i++)
    {
        string msg = $"test{i}";
 
        SendNewMessage(msg);
    } 
    
}
private static void SendNewMessage(string message)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        Dictionary<string, object> args = 
            DeadLetterHelper.CreateDeadLetterQueue(channel,
            "dl.exchange", "dead-letter", "DeadLetterQueue");
 
        var result = channel.QueueDeclare("NewQueue", true, false, false, args);
        Console.WriteLine(result);
 
        channel.BasicPublish("", "NewQueue", null, Encoding.UTF8.GetBytes(message));                
 
    }
}

Now we have 100 messages:

persist1

Let’s simulate a server reboot:

parsist2

Following the reboot, it’s gone:

persist3

Admittedly, that doesn’t sound very durable!

Why?

The reason for this, is that the durability of the queue doesn’t affect the durability of the message. At least, if the queue is durable, it doesn’t make the message so.

How can it be made persistent?

Let’s change our send code a little:

private static void SendNewMessage(string message)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        Dictionary<string, object> args = 
            DeadLetterHelper.CreateDeadLetterQueue(channel,
            "dl.exchange", "dead-letter", "DeadLetterQueue");
 
        var result = channel.QueueDeclare("NewQueue", true, false, false, args);
        Console.WriteLine(result);
 
        IBasicProperties prop = channel.CreateBasicProperties();
        prop.Persistent = true;
 
        channel.BasicPublish("", "NewQueue", prop, Encoding.UTF8.GetBytes(message));                
 
    }
}

The only difference is the addition of the IBasicProperties parameter. The Persistent flag is set. Now we’ll re run the same test; here’s the messages:

persist4

And after restarting the service:

persist5

As you can see, the messages are still there, and you can see the time difference where they’re been restored to the queue after a failure.

Speed – Introducing BenchmarkDotNet

I suppose the main question here is what price do you pay for durability. This gives me a chance to play with a new tool that I heard about a little while ago: BenchmarkDotNet.

It’s quite straightforward to use, just add the NuGet package:

Install-Package BenchmarkDotNet

There’s a bit of refactoring; I effectively ripped out the send and called it from a separate class:

class Program
{        
    static void Main(string[] args)
    {
        BenchmarkRunner.Run<SpeedTest>();
    }
}
 
public class SpeedTest
{
    [Benchmark]
    public void SendNewMessagePersist()
    {
        MessageHelper helper = new MessageHelper();
        helper.SendStringMessage("Test", "NewQueue", true);
    }
 
    [Benchmark]
    public void SendNewMessageNonPersist()
    {
        MessageHelper helper = new MessageHelper();
        helper.SendStringMessage("Test", "NewQueue", false);
    }
 
 
}

I then ran this:

persist6

And it produced this:

persist7

So, it is a bit slower to persist the message. I’m not sure how helpful this information is: I probably could have guessed that persisting the message would have been slower beforehand. Having said that, I am quite impressed with BenchMarkDotNet!

A C# Programmer’s Guide to Installing, Running and Messaging with RabbitMQ

Following this year’s trip to DDD North I got speaking to someone about Rabbit MQ. I’d previously done some research on Active MQ, however, given that there are, realistically, only two options in this market, I thought I should have a look at the competition.

Install from here.

You also need to install Erlang.

After the install… nothing happens. No matter how long you wait, assuming it’s just your slow laptop (don’t want to talk about that anymore). Navigate to:

rabbitmq1

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.0\sbin

In a DOS prompt, type:

rabbitmq-plugins enable rabbitmq_management

rabbitmq2

Rabbit will now run as a service, which also feels much cleaner that ActiveMQs console app style interface.

Now, log-in to http://localhost:15672/, and log-in as guest/guest:

rabbitmq3

Code

Just like with ActiveMQ, there is a NuGet package that you can use for interacting with the exchange:

https://www.nuget.org/packages/RabbitMQ.Client. Your project needs .Net 4.5.1 or higher; otherwise, you’ll see this:

rabbitmq4

If it’s not, upgrade your project to 4.5.1:

rabbitmq5

Okay, so now we’re up and running, here’s the code for the send:

static void Main(string[] args)
{            
    while (true)
    {
        string msg = Console.ReadLine();
        if (string.IsNullOrWhiteSpace(msg))
        {
            break;
        }
 
        SendNewMessage(msg);
    } 
    
}
private static void SendNewMessage(string message)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        var result = channel.QueueDeclare("NewQueue", true, false, false, null);
        Console.WriteLine(result);
 
        channel.BasicPublish("", "NewQueue", null, Encoding.UTF8.GetBytes(message));                
 
    }
}

Just like with ActiveMQ, you can now just look at the queue:

rabbitmq6

A quick caveat here:

var result = channel.QueueDeclare("NewQueue");

Doesn’t work because exclusive defaults to ‘true’, meaning that only your session can see it.

Receiving a Message

static void Main(string[] args)
{
    while (true)
    {
        ReceiveNextMessage();
 
        string exit = Console.ReadLine();
        if (!string.IsNullOrWhiteSpace(exit)) break;
    }
 
}
 
private static void ReceiveNextMessage()
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        var result = channel.QueueDeclare("NewQueue", true, false, false, null);
        Console.WriteLine(result);
 
        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
        consumer.Received += Consumer_Received;
 
        channel.BasicConsume("NewQueue", true, consumer);
 
        System.Threading.Thread.Sleep(1000);
 
    }
}
 
private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
    var body = e.Body;
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(message);
}

rabbitmq7

Why the Thread.Sleep? The reason is that, because the message consumer is event based, you need to stop tidying up the objects until you’re done. Obviously, sleeping for 1 second has its issues. I suspect this could be better achieved using a TaskCompletionSource, of even just a Console.ReadLine(), but for the purposes of this post, it suffices.

Topics

The concept of topics seems to be the same as ActiveMQ; however, the implementation is different, and there were a few dead-ends to go down first.

The following code, for example:

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare("NewTopic", "testTopic");
    
    channel.BasicPublish("NewTopic", "test", null, Encoding.UTF8.GetBytes(message));
 
}

Gives the error: COMMAND_INVALID – unknown exchange type ‘testTopic’

rabbitmq8

RabbitMQ seems to have some magic strings that determine the message type. This works:

 
private static void SendNewTopic(string message)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare("NewTopic", "topic"); // topic is a magic string
    
    channel.BasicPublish("NewTopic", "test", null, Encoding.UTF8.GetBytes(message));
 
}

The “test” is a routing code. These seem to be quite an involved system; however, it basically tells the exchange where the message is going.

rabbitmq9

Receive

private static void ReceiveNextTopic()
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        channel.ExchangeDeclare("NewTopic", "topic");
        var queueName = channel.QueueDeclare().QueueName;
 
        channel.QueueBind(queue: queueName,
                        exchange: "NewTopic",
                        routingKey: "test");
 
        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
        consumer.Received += Consumer_Received;
 
        channel.BasicConsume(queueName, true, consumer);
 
        Console.ReadLine();
    }
}

private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
    var body = e.Body;
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(message);
}

Notice the routing key; this (broadly speaking) needs to match the sender’s routing key. The purpose seems to be to allow a complex mapping between publisher and subscriber.

Conclusion

In comparison to ActiveMQ, RabbitMQ seems to be a more actively maintained code base; a quick comparison of the GitHub repo of each reveals more activity in Rabbit. Rabbit also seems a little cleaner in the way it’s run and installed; however, it also seems to have more moving parts.

There are a number of companies that offer support for ActiveMQ commercially; however, the company that wrote RabbitMQ offer commercial support, should you need it.

Can you run Active and RabbitMQ together?

No. No, you can’t. They both seem to want to use the same resources, and so if you want to use them both on the same machine, you can, but only one at a time.

Acknowledgements

I used the following websites extensively in this article:

http://arcware.net/installing-rabbitmq-on-windows/

https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html