Message Persistence in RabbitMQ and BenchmarkDotNet

(Note: if you want to follow the code on this, you might find it easier if you start from the project that I create here.)

A queue in a message broker can be persistent: should you have a power failure (or just shut down the server), the queue is still there when the broker comes back.

So, we can create a durable (persistent) queue, like this:

var result = channel.QueueDeclare("NewQueue", true, false, false, args);

The second parameter indicates that the queue is durable. Let’s send it some messages:

static void Main(string[] args)
{
    // Publish 100 test messages: "test1" .. "test100".
    for (int i = 1; i <= 100; i++)
    {
        string msg = $"test{i}";

        SendNewMessage(msg);
    }
}

private static void SendNewMessage(string message)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        Dictionary<string, object> args =
            DeadLetterHelper.CreateDeadLetterQueue(channel,
            "dl.exchange", "dead-letter", "DeadLetterQueue");

        // Arguments: queue name, durable, exclusive, autoDelete, extra arguments.
        var result = channel.QueueDeclare("NewQueue", true, false, false, args);
        Console.WriteLine(result);

        // Published via the default exchange (""), so the routing key is the queue name.
        // No properties are passed, so the message is not marked persistent.
        channel.BasicPublish("", "NewQueue", null, Encoding.UTF8.GetBytes(message));
    }
}

Now we have 100 messages:

[Image: persist1]

Let’s simulate a server reboot:

[Image: persist2]

Following the reboot, the messages are gone:

[Image: persist3]

Admittedly, that doesn’t sound very durable!

Why?

The reason is that making the queue durable doesn’t make the messages in it durable. A durable queue survives a restart, but a message on it survives only if the message itself was published as persistent.

How can it be made persistent?

Let’s change our send code a little:

private static void SendNewMessage(string message)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        Dictionary<string, object> args = 
            DeadLetterHelper.CreateDeadLetterQueue(channel,
            "dl.exchange", "dead-letter", "DeadLetterQueue");
 
        var result = channel.QueueDeclare("NewQueue", true, false, false, args);
        Console.WriteLine(result);
 
        // Mark the message itself as persistent so that it is written to disk.
        IBasicProperties prop = channel.CreateBasicProperties();
        prop.Persistent = true;

        channel.BasicPublish("", "NewQueue", prop, Encoding.UTF8.GetBytes(message));
    }
}

The only difference is the addition of the IBasicProperties parameter, with its Persistent flag set. Now we’ll re-run the same test; here are the messages:

[Image: persist4]

And after restarting the service:

[Image: persist5]

As you can see, the messages are still there, and you can see the time difference where they’ve been restored to the queue after a failure.
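For reference, the Persistent flag corresponds to the AMQP delivery mode: 2 is persistent and 1 is transient. The sketch below sets the delivery mode directly instead of using the flag. The class and method names are hypothetical, it assumes a RabbitMQ.Client version that exposes IModel (as the code in this post does), and it assumes the durable queue has already been declared.

```csharp
using System.Text;
using RabbitMQ.Client;

// Hypothetical helper, not part of the original project.
public static class PersistentPublishSketch
{
    public static void Publish(IModel channel, string queueName, string message)
    {
        IBasicProperties props = channel.CreateBasicProperties();

        // Same effect as props.Persistent = true.
        // Delivery mode 2 = persistent, 1 = transient.
        props.DeliveryMode = 2;

        // Default exchange (""), so the routing key is the queue name.
        channel.BasicPublish("", queueName, props, Encoding.UTF8.GetBytes(message));
    }
}
```

Either way, the message only survives a restart if both conditions hold: the queue is durable and the message is persistent.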

Speed – Introducing BenchmarkDotNet

I suppose the main question here is what price you pay for durability. This gives me a chance to play with a new tool that I heard about a little while ago: BenchmarkDotNet.

It’s quite straightforward to use; just add the NuGet package:

Install-Package BenchmarkDotNet

There’s a bit of refactoring; I effectively ripped out the send and called it from a separate class:

class Program
{        
    static void Main(string[] args)
    {
        BenchmarkRunner.Run<SpeedTest>();
    }
}
 
public class SpeedTest
{
    [Benchmark]
    public void SendNewMessagePersist()
    {
        MessageHelper helper = new MessageHelper();
        helper.SendStringMessage("Test", "NewQueue", true);
    }
 
    [Benchmark]
    public void SendNewMessageNonPersist()
    {
        MessageHelper helper = new MessageHelper();
        helper.SendStringMessage("Test", "NewQueue", false);
    }
 
 
}

I then ran this:

[Image: persist6]

And it produced this:

[Image: persist7]

So, it is a bit slower to persist the message. I’m not sure how helpful this information is: I could probably have guessed beforehand that persisting the message would be slower. Having said that, I am quite impressed with BenchmarkDotNet!
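The MessageHelper class that the benchmark calls isn’t shown in this post. As a rough guide, here is a minimal sketch of what it might look like, assuming it simply wraps the earlier send code and takes the message, the queue name and a persist flag (inferred from the calls in SpeedTest). The body is an assumption rather than the original implementation. It also assumes the queue already exists, so it uses a passive declare instead of repeating the dead-letter set-up.

```csharp
using System.Text;
using RabbitMQ.Client;

// Hypothetical reconstruction: the real MessageHelper is not shown in the post.
public class MessageHelper
{
    private readonly ConnectionFactory _factory =
        new ConnectionFactory() { HostName = "localhost" };

    public void SendStringMessage(string message, string queueName, bool persist)
    {
        using (var connection = _factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Assumes the queue was declared earlier; a passive declare only
            // checks that it exists and does not change its settings.
            channel.QueueDeclarePassive(queueName);

            IBasicProperties props = channel.CreateBasicProperties();
            props.Persistent = persist;

            channel.BasicPublish("", queueName, props, Encoding.UTF8.GetBytes(message));
        }
    }
}
```

Note that with a helper shaped like this, every benchmark invocation also opens and closes a connection and a channel, so the measured time covers that set-up as well as the publish itself.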

Asynchronous Behaviour in Applications using Message Queues

This was one of the features that interested me most in message queues. Basically, you have an application, and you want to communicate with it; that is, Joe Bloggs the user is sat there, tapping away at his keyboard, and I want to send him a message that interrupts him. There are dozens of use cases for this: the user has entered an order and there’s a problem with it; the user’s account has been locked and they need to be logged out; we want to alert them to a data change so that they can refresh their data.

The relevant part of message queuing here is a topic, which allows me to send an alert to one or more listeners.

To this end, I’ve extended my helper class to include a TopicHelper:

public class TopicHelper : IDisposable
{
    IReceiveTopicBehaviour _receiveBehaviour = null;
    ConnectionFactory _factory = new ConnectionFactory() { HostName = "localhost" };
    IConnection _connection = null;
    IModel _channel = null;
    EventingBasicConsumer _consumer = null;
 
    private void SetupChannel()
    {
        if (_connection == null)
        {
            _connection = _factory.CreateConnection();
            _channel = _connection.CreateModel();
        }
    }
 
    private void SetupConsumer()
    {
        if (_channel == null) throw new Exception("Channel is not set-up");
        if (_connection == null) throw new Exception("Connection is not set-up");
 
        if (_consumer == null)
            _consumer = new EventingBasicConsumer(_channel);
    }
 
    public void ReceiveTopic(string topicName, string key, IReceiveTopicBehaviour behaviour)
    {
        _receiveBehaviour = behaviour;
 
        SetupChannel();
        SetupConsumer();
 
        _channel.ExchangeDeclare(topicName, "topic");
        var queue = _channel.QueueDeclare();
        var queueName = queue.QueueName;
 
        _channel.QueueBind(queue: queueName,
            exchange: topicName,
            routingKey: key);
 
        _consumer.Received += Consumer_Received;
 
        _channel.BasicConsume(queueName, true, _consumer);
 
    }
 
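    private void Consumer_Received(object sender, BasicDeliverEventArgs e)
    {
        // Body is a byte[] in the client version used here; in RabbitMQ.Client 6.x
        // and later it is a ReadOnlyMemory<byte>, so use e.Body.ToArray() instead.
        var body = e.Body;
        var message = Encoding.UTF8.GetString(body);
        Console.WriteLine(message);

        _receiveBehaviour.OnReceive(message);
    }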
 
    public void SendTopic(string topicName, string key, string data)
    {
        SetupChannel();
 
        _channel.ExchangeDeclare(topicName, "topic");
        _channel.BasicPublish(topicName, key, null, Encoding.UTF8.GetBytes(data));            
 
    }
 
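    public void Dispose()
    {
        // Close the channel before its parent connection; either may be
        // null if the helper was never used.
        _channel?.Dispose();
        _connection?.Dispose();
    }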
}

There’s a lot of code here, but basically there are only two methods of note: SendTopic() and ReceiveTopic(). I’ve also made it a disposable class. The next thing we want is a listener; for this, I’ve used a WPF app, but any application should be able to do this:

[Image: rabbitalert1]

The call to set-up the alert is this:

public partial class MainWindow : Window
{
    private TopicHelper _topicHelper = new TopicHelper();
    public MainWindow()
    {
        InitializeComponent();
 
        Task.Run(() =>
        {
            ReceiveTopic recTopic = new ReceiveTopic();
            _topicHelper.ReceiveTopic("Alerts", "thisApp", recTopic);
        });
    }
}

I’ve used the code behind because I’m just proving a point. Obviously, in real life, this would be some abstraction in the business layer. The main thing to note is the ReceiveTopic class that is instantiated and passed through; here’s its implementation:

public class ReceiveTopic : IReceiveTopicBehaviour
{
    public void OnReceive(string message)
    {
        System.Windows.MessageBox.Show(message);
    }
}

This, effectively, allows me to provide custom functionality on the receive. If you find that you’re using this to pass in the same one-liner, you could adapt this to simply pass in an action.
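The IReceiveTopicBehaviour interface isn’t shown above; from the way TopicHelper and ReceiveTopic use it, it presumably has a single OnReceive(string) method. The sketch below shows that assumed shape, along with one way the "pass in an action" idea could look. ActionReceiveBehaviour is a hypothetical name, not something from the original project.

```csharp
using System;

// Assumed shape of the interface, inferred from how it is called above.
public interface IReceiveTopicBehaviour
{
    void OnReceive(string message);
}

// Hypothetical adapter: wraps a delegate so that a one-line behaviour
// doesn't need its own class.
public class ActionReceiveBehaviour : IReceiveTopicBehaviour
{
    private readonly Action<string> _onReceive;

    public ActionReceiveBehaviour(Action<string> onReceive)
    {
        _onReceive = onReceive ?? throw new ArgumentNullException(nameof(onReceive));
    }

    // Forward each received message to the wrapped delegate.
    public void OnReceive(string message) => _onReceive(message);
}
```

With that in place, the WPF listener could call something like _topicHelper.ReceiveTopic("Alerts", "thisApp", new ActionReceiveBehaviour(msg => System.Windows.MessageBox.Show(msg))) without needing the ReceiveTopic class.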

The final piece of the puzzle is the send alert; in my case this is a console app:

[Image: rabbitalert2]

Here’s the code for the main function:

static void Main(string[] args)
{
    Console.WriteLine("Alert: ");
    string alertText = Console.ReadLine();
 
    using (TopicHelper helper = new TopicHelper())
    {
        helper.SendTopic("Alerts", "thisApp", alertText);
    }
 
    Console.WriteLine("Finished");
    Console.ReadLine();
}

Does it work?

Yes, of course.

[Image: rabbitalert3]
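The example above binds with an exact routing key ("thisApp"), but a topic exchange can also match on patterns: keys are dot-separated words, where * stands for exactly one word and # for zero or more. Here is a minimal sketch of a pattern binding, using the same client calls as TopicHelper; the class name, method name and the example keys are hypothetical.

```csharp
using RabbitMQ.Client;

// Hypothetical helper, not part of the original project.
public static class TopicBindingSketch
{
    // Declares the topic exchange, creates a server-named queue and binds it
    // to every routing key that matches the pattern. Returns the queue name.
    public static string BindToPattern(IModel channel, string exchange, string pattern)
    {
        channel.ExchangeDeclare(exchange, "topic");

        string queueName = channel.QueueDeclare().QueueName;

        channel.QueueBind(queue: queueName,
            exchange: exchange,
            routingKey: pattern);

        return queueName;
    }
}
```

For instance, a listener bound with the pattern "thisApp.*" would receive alerts sent with the key "thisApp.orders" but not "thisApp.orders.failed", whereas "thisApp.#" would receive both.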