A C# Programmer’s Guide to Queues and Sending a Message with Azure Service Bus

I have previously written about message queue systems. The big two, as far as I can see, are ActiveMQ and RabbitMQ.

Microsoft have always had MSMQ*, but it’s not really a message broker as such (I believe that you can get similar behaviour using NServiceBus, but have never tried that myself). However, with Azure comes the Azure Service Bus.

The first thing that you need to do is set up an Azure account. Note that Microsoft offer Azure as a paid service, and so this is not free. However, they also offer free trials, and free Azure credit if you have an MSDN subscription.

Log on to:

https://portal.azure.com

Namespace

Namespaces are an important concept in Azure. A namespace allows you to split a single Azure account across many functions; in practice, that means that everything you do relates to a specific namespace.

To add one, first, pick a pricing tier:

Make sure that your namespace name isn’t taken:

You’ll then get an alert to say it worked:

If you refresh, you should now see your namespace:

Create Test Project

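I always try to start with a console app when trying new stuff. Add the NuGet reference (the types used below live in the Microsoft.ServiceBus namespace, which ships in the WindowsAzure.ServiceBus package):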

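It was originally my understanding that, as with ActiveMQ and RabbitMQ, these client libraries are an abstraction over a set of HTTP Post calls. As far as I can tell, that isn't quite right: the ActiveMQ and RabbitMQ clients speak their own binary protocols (OpenWire and AMQP respectively) over TCP. In the case of Azure, I believe that, behind the scenes, the client uses WCF to handle all this.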

Using the Namespace

With a message queue system such as RabbitMQ or ActiveMQ, you need a message queue server, and a URL that points to it. However, one of the things Azure allows you to do is to abstract that: you give it the namespace name, and it works out the address. For example:

        static void Main(string[] args)
        {
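            Console.WriteLine("Getting service bus URI...");
            // CreateAccessControlUri returns the access control (ACS) endpoint;
            // CreateServiceUri returns the address of the broker itself.
            Uri uri = ServiceBusEnvironment.CreateServiceUri("sb", "pcm-servicebustest", string.Empty);
            Console.WriteLine($"Service Bus URI: {uri}");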
            Console.ReadLine();
        }

This tells me what the URI of the message queue broker is:
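
To illustrate what such an address looks like, here is a minimal sketch that builds one by hand. It assumes the public-cloud host suffix servicebus.windows.net (the same one that appears in the error messages later in this post); ServiceBusAddressSketch and BuildQueueUri are hypothetical names for illustration, not part of the Azure SDK.

```csharp
using System;

// Hypothetical helper, for illustration only: not part of the Azure SDK.
static class ServiceBusAddressSketch
{
    // Assumption: the public-cloud host suffix seen in the error text later in this post.
    private const string HostSuffix = "servicebus.windows.net";

    // Builds sb://{namespace}.servicebus.windows.net/{queue}
    public static Uri BuildQueueUri(string serviceNamespace, string queueName)
    {
        if (string.IsNullOrWhiteSpace(serviceNamespace))
        {
            throw new ArgumentException("A namespace name is required.", nameof(serviceNamespace));
        }

        // A null queue name yields the address of the namespace root.
        string path = queueName ?? string.Empty;
        return new Uri("sb://" + serviceNamespace + "." + HostSuffix + "/" + path);
    }
}
```

Calling ServiceBusAddressSketch.BuildQueueUri("pcm-servicebustest", "TestQueue") gives a Uri with the sb scheme, the namespace as the first part of the host name, and the queue name as the path.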

Adding a message to a queue

In order to do anything with a message queue in Azure, you need a token; effectively, this provides a level of security.

Tokens

Get the key:

You can store these details in the app/web.config, or you can use them programmatically:

        private static TokenProvider GetTokenProvider(Uri uri)
        {
            Console.WriteLine("Getting token...");
            // Key name and key are hard-coded for brevity; in real code, read them from configuration.
            TokenProvider tp = TokenProvider.CreateSharedAccessSignatureTokenProvider("RootManageSharedAccessKey", "JWh82nkstIAi4w5tW6MEj7GKQfoiZlwBYjHx9wfDqdA=");

            Console.WriteLine($"Token {tp.ToString()}");
            return tp;
        }

Queues

Putting the above calls together, we can now create a queue in Azure:

        private static void CreateNewQueue(Uri uri, TokenProvider tokenProvider)
        {
            Console.WriteLine($"Creating new queue...");
            NamespaceManager nm = new NamespaceManager(uri, tokenProvider);

            Console.WriteLine($"Created namespace manager for {nm.Address}");
            if (nm.QueueExists("TestQueue"))
            {
                Console.WriteLine("Queue already exists");
            }
            else
            {
                Console.WriteLine("Creating new queue");
                QueueDescription qd = nm.CreateQueue("TestQueue");
            }
        }

Incidentally, the act of creating a queue appears to have cost £0.24 GBP. If you have MSDN, you should get £40 GBP credit each month (at the time of writing).

Now we have a queue, let’s put some messages on it.

Adding a message

        private static void AddNewMessage(string id, string messageBody, string queueName)
        {
            BrokeredMessage message = new BrokeredMessage(messageBody)
            {
                MessageId = id
            };

            string connectionString = GetConnectionString();
            
            QueueClient queueClient = QueueClient.CreateFromConnectionString(connectionString, queueName);
            queueClient.Send(message);
        }

The Connection String can be found here:

We can now see that a message has, indeed, been added to the queue:

At this time, this is about as much as you can see from this portal.

Errors

These are some errors that I encountered during the creation of this post, and their solutions.

System.UnauthorizedAccessException

System.UnauthorizedAccessException: ‘The token provider was unable to provide a security token while accessing ‘https://pcm-servicebustest-sb.accesscontrol.windows.net/WRAPv0.9/’. Token provider returned message: ‘The remote name could not be resolved: ‘pcm-servicebustest-sb.accesscontrol.windows.net”.’

The cause is not an invalid secret; an invalid secret produces a different error.

For example, this line:

TokenProvider tp = TokenProvider.CreateSharedSecretTokenProvider("RootManageSharedAccessKey", "jjdsjdsjk");

Gives the error:

System.ArgumentException: ‘The ‘issuerSecret’ is invalid.’

The fix…

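This code is littered throughout the web; it creates a shared secret token provider, which authenticates against the Access Control Service (hence the accesscontrol.windows.net address in the error above):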

TokenProvider tp = TokenProvider.CreateSharedSecretTokenProvider("RootManageSharedAccessKey", "jjdsjdsjk");

But the correct code, for a namespace that uses a shared access key, was:

TokenProvider tp = TokenProvider.CreateSharedAccessSignatureTokenProvider("RootManageSharedAccessKey", "JWh82nkstIAi4w5tW6MEj7GKQfoiZlwBYjHx9wfDqdA=");

System.ArgumentNullException: ‘Queue name should be specified as EntityPath in connectionString.’

Or: 40400: Endpoint not found.

Microsoft.ServiceBus.Messaging.MessagingEntityNotFoundException: ‘40400: Endpoint not found., Resource:sb://pcm-servicebustest.servicebus.windows.net/atestqueue. TrackingId:48de75d7-fb01-4fa9-b72e-20a5dc090a8d_G11, SystemTracker:pcm-servicebustest.servicebus.windows.net:aTestQueue, Timestamp:5/25/2017 5:23:27 PM

Either of these means (obviously) that the following call:

QueueClient.CreateFromConnectionString(connectionString, queueName);

either wasn’t given the queue name, or was given the wrong one.
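
The first error message suggests that the queue name can come either from the argument or from an EntityPath entry in the connection string. As a rough sketch of that fallback, the code below parses a connection string and decides which queue name to use. It assumes the usual key=value;key=value layout; ConnectionStringSketch and its methods are hypothetical helpers rather than SDK calls, and any connection string passed in is the caller's own.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, for illustration only: not part of the Azure SDK.
static class ConnectionStringSketch
{
    // Splits "Key1=Value1;Key2=Value2" into a case-insensitive dictionary.
    public static Dictionary<string, string> Parse(string connectionString)
    {
        var parts = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
        var segments = (connectionString ?? string.Empty)
            .Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries);

        foreach (string segment in segments)
        {
            // Split on the first '=' only: base64 keys usually end in '='.
            int equalsAt = segment.IndexOf('=');
            if (equalsAt <= 0) continue;

            string key = segment.Substring(0, equalsAt).Trim();
            string value = segment.Substring(equalsAt + 1).Trim();
            parts[key] = value;
        }

        return parts;
    }

    // Prefers an explicit queue name, then falls back to EntityPath.
    public static string ResolveQueueName(string connectionString, string queueName)
    {
        if (!string.IsNullOrWhiteSpace(queueName)) return queueName;

        string entityPath;
        if (Parse(connectionString).TryGetValue("EntityPath", out entityPath)
            && !string.IsNullOrWhiteSpace(entityPath))
        {
            return entityPath;
        }

        throw new ArgumentException("No queue name supplied and no EntityPath in the connection string.");
    }
}
```

Checking the resolved name against the name the queue was created with (TestQueue in this post) before calling CreateFromConnectionString is a cheap way to catch the 40400 case early.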

* Microsoft probably haven’t ALWAYS had MSMQ. There was probably a time in the early 90s when they didn’t have a message queue system at all.

A C# Programmer’s Guide to Installing, Running and Messaging with RabbitMQ

Following this year’s trip to DDD North, I got talking to someone about RabbitMQ. I’d previously done some research on ActiveMQ; however, given that there are, realistically, only two options in this market, I thought I should have a look at the competition.

Install from here.

You also need to install Erlang.

After the install… nothing happens, no matter how long you wait on the assumption that it’s just your slow laptop (don’t want to talk about that anymore). Navigate to:

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.0\sbin

In a command prompt, type:

rabbitmq-plugins enable rabbitmq_management

Rabbit will now run as a service, which also feels much cleaner than ActiveMQ’s console app style interface.

Now, browse to http://localhost:15672/, and log in as guest/guest:

Code

Just like with ActiveMQ, there is a NuGet package that you can use for interacting with the exchange:

https://www.nuget.org/packages/RabbitMQ.Client. Your project needs .NET 4.5.1 or higher; otherwise, you’ll see this:

If your project targets an older version, upgrade it to 4.5.1:

Okay, so now we’re up and running, here’s the code for the send:

static void Main(string[] args)
{
    // Send each line typed at the console until a blank line is entered
    while (true)
    {
        string msg = Console.ReadLine();
        if (string.IsNullOrWhiteSpace(msg))
        {
            break;
        }

        SendNewMessage(msg);
    }
}

private static void SendNewMessage(string message)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        // Arguments: queue name, durable, exclusive, autoDelete, extra arguments
        var result = channel.QueueDeclare("NewQueue", true, false, false, null);
        Console.WriteLine(result);

        // An empty exchange name means the default exchange, which routes by queue name
        channel.BasicPublish("", "NewQueue", null, Encoding.UTF8.GetBytes(message));
    }
}

Just like with ActiveMQ, you can now just look at the queue:

A quick caveat here:

var result = channel.QueueDeclare("NewQueue");

This doesn’t work, because exclusive defaults to ‘true’, meaning that only the connection that declared the queue can see it.

Receiving a Message

static void Main(string[] args)
{
    while (true)
    {
        ReceiveNextMessage();
 
        string exit = Console.ReadLine();
        if (!string.IsNullOrWhiteSpace(exit)) break;
    }
 
}
 
private static void ReceiveNextMessage()
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        var result = channel.QueueDeclare("NewQueue", true, false, false, null);
        Console.WriteLine(result);
 
        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
        consumer.Received += Consumer_Received;
 
        channel.BasicConsume("NewQueue", true, consumer);
 
        System.Threading.Thread.Sleep(1000);
 
    }
}
 
private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
    var body = e.Body;
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(message);
}

Why the Thread.Sleep? Because the message consumer is event based, you need to hold off disposing of the connection and channel until the message has arrived. Obviously, sleeping for 1 second has its issues. I suspect this could be better achieved using a TaskCompletionSource, or even just a Console.ReadLine(), but for the purposes of this post, it suffices.
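
As a rough sketch of the TaskCompletionSource idea: the handler completes a task when the first message arrives, and the caller waits on that task (with a timeout) before letting anything be disposed. FakeConsumer is a hypothetical stand-in for EventingBasicConsumer so that the example runs without a broker; with RabbitMQ, you would subscribe to consumer.Received instead, and call BasicConsume inside startConsuming. I haven't run this against a real broker.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for an event-based consumer; raises Received when Deliver is called.
class FakeConsumer
{
    public event EventHandler<string> Received;

    public void Deliver(string message)
    {
        var handler = Received;
        if (handler != null) handler(this, message);
    }
}

static class WaitForMessageSketch
{
    // Blocks until one message arrives, or throws TimeoutException.
    public static string ReceiveOne(FakeConsumer consumer, Action startConsuming, TimeSpan timeout)
    {
        var received = new TaskCompletionSource<string>();

        // TrySetResult ignores any message after the first one.
        EventHandler<string> onReceived = (sender, message) => received.TrySetResult(message);
        consumer.Received += onReceived;

        try
        {
            startConsuming();

            if (!received.Task.Wait(timeout))
            {
                throw new TimeoutException("No message arrived within " + timeout);
            }

            return received.Task.Result;
        }
        finally
        {
            // Unsubscribe so the handler does not outlive this call.
            consumer.Received -= onReceived;
        }
    }
}
```

The caller's using blocks can then wrap the call to ReceiveOne, so the connection and channel are only disposed once a message has arrived or the timeout has passed.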

Topics

The concept of topics seems to be the same as ActiveMQ; however, the implementation is different, and there were a few dead-ends to go down first.

The following code, for example:

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare("NewTopic", "testTopic");
    
    channel.BasicPublish("NewTopic", "test", null, Encoding.UTF8.GetBytes(message));
 
}

Gives the error: COMMAND_INVALID – unknown exchange type ‘testTopic’

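RabbitMQ uses a fixed set of strings to determine the exchange type (the built-in ones are direct, fanout, topic and headers), so the second argument can’t be an arbitrary name. This works: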

 
private static void SendNewTopic(string message)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        channel.ExchangeDeclare("NewTopic", "topic"); // topic is a magic string

        channel.BasicPublish("NewTopic", "test", null, Encoding.UTF8.GetBytes(message));
    }
}

The “test” is a routing key. Routing keys seem to be quite an involved system; however, the key basically tells the exchange which queues the message should go to.

Receive

private static void ReceiveNextTopic()
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        channel.ExchangeDeclare("NewTopic", "topic");
        var queueName = channel.QueueDeclare().QueueName;
 
        channel.QueueBind(queue: queueName,
                        exchange: "NewTopic",
                        routingKey: "test");
 
        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
        consumer.Received += Consumer_Received;
 
        channel.BasicConsume(queueName, true, consumer);
 
        Console.ReadLine();
    }
}

private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
    var body = e.Body;
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(message);
}

Notice the routing key passed to QueueBind; this (broadly speaking) needs to match the sender’s routing key. The purpose seems to be to allow a more complex mapping between publishers and subscribers than a simple one-to-one match.
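
To make "broadly speaking" a bit more concrete, here is a minimal sketch of how I understand topic matching to work: keys are words separated by dots, and in the key that the queue is bound with, * stands for exactly one word and # for zero or more words. This is my own illustration of that rule, not RabbitMQ's implementation; TopicMatchSketch is a hypothetical name, and the alerts.* style keys below are made up for the example.

```csharp
using System;

// Illustration of the topic matching rule; not RabbitMQ's own code.
static class TopicMatchSketch
{
    // bindingKey is the key passed to QueueBind; routingKey is the one passed to BasicPublish.
    public static bool IsMatch(string bindingKey, string routingKey)
    {
        return Match(bindingKey.Split('.'), 0, routingKey.Split('.'), 0);
    }

    private static bool Match(string[] pattern, int p, string[] words, int w)
    {
        // Pattern exhausted: a match only if every word has been consumed too.
        if (p == pattern.Length) return w == words.Length;

        if (pattern[p] == "#")
        {
            // '#' may absorb zero or more words; try each possibility.
            for (int next = w; next <= words.Length; next++)
            {
                if (Match(pattern, p + 1, words, next)) return true;
            }
            return false;
        }

        // Any other pattern word needs a word to compare against.
        if (w == words.Length) return false;

        bool same = pattern[p] == "*"
            || string.Equals(pattern[p], words[w], StringComparison.Ordinal);
        return same && Match(pattern, p + 1, words, w + 1);
    }
}
```

With this sketch, IsMatch("test", "test") is true, as in the code above; IsMatch("alerts.*", "alerts.orders") is true; IsMatch("alerts.*", "alerts.orders.eu") is false; and IsMatch("alerts.#", "alerts.orders.eu") is true.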

Conclusion

In comparison to ActiveMQ, RabbitMQ seems to be a more actively maintained code base; a quick comparison of the GitHub repo of each reveals more activity in Rabbit. Rabbit also seems a little cleaner in the way it’s run and installed; however, it also seems to have more moving parts.

A number of third-party companies offer commercial support for ActiveMQ; with RabbitMQ, the company that wrote it offers commercial support itself, should you need it.

Can you run ActiveMQ and RabbitMQ together?

No. No, you can’t. They both seem to want to use the same resources, and so if you want to use them both on the same machine, you can, but only one at a time.

Acknowledgements

I used the following websites extensively in this article:

http://arcware.net/installing-rabbitmq-on-windows/

https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

Asynchronous Behaviour in Applications using Message Queues

This was one of the features that interested me most in message queues. Basically, you have an application, and you want to communicate with it; that is, Joe Bloggs the user is sat there, tapping away at his keyboard, and I want to send him a message that interrupts him. There are dozens of use cases for this: the user has entered an order and there’s a problem with it; the user’s account has been locked and they need to be logged out; we want to alert them that there’s a data change so that they can refresh their data.

The relevant part of message queuing here is a topic, which allows me to send an alert to one or more listeners.

To this end, I’ve extended my helper class to include a TopicHelper:

public class TopicHelper : IDisposable
{
    IReceiveTopicBehaviour _receiveBehaviour = null;
    ConnectionFactory _factory = new ConnectionFactory() { HostName = "localhost" };
    IConnection _connection = null;
    IModel _channel = null;
    EventingBasicConsumer _consumer = null;
 
    private void SetupChannel()
    {
        if (_connection == null)
        {
            _connection = _factory.CreateConnection();
            _channel = _connection.CreateModel();
        }
    }
 
    private void SetupConsumer()
    {
        if (_channel == null) throw new Exception("Channel is not set-up");
        if (_connection == null) throw new Exception("Connection is not set-up");
 
        if (_consumer == null)
            _consumer = new EventingBasicConsumer(_channel);
    }
 
    public void ReceiveTopic(string topicName, string key, IReceiveTopicBehaviour behaviour)
    {
        _receiveBehaviour = behaviour;
 
        SetupChannel();
        SetupConsumer();
 
        _channel.ExchangeDeclare(topicName, "topic");
        var queue = _channel.QueueDeclare();
        var queueName = queue.QueueName;
 
        _channel.QueueBind(queue: queueName,
        exchange: topicName,
        routingKey: key);
 
        _consumer.Received += Consumer_Received;
 
        _channel.BasicConsume(queueName, true, _consumer);
 
    }
 
    private void Consumer_Received(object sender, BasicDeliverEventArgs e)
    {
        var body = e.Body;
        var message = Encoding.UTF8.GetString(body);
        Console.WriteLine(message);
 
        _receiveBehaviour.OnReceive(message);
    }
 
    public void SendTopic(string topicName, string key, string data)
    {
        SetupChannel();
 
        _channel.ExchangeDeclare(topicName, "topic");
        _channel.BasicPublish(topicName, key, null, Encoding.UTF8.GetBytes(data));            
 
    }
 
    public void Dispose()
    {
        // Close the channel before the connection that owns it; either may be null if never used
        _channel?.Dispose();
        _connection?.Dispose();
    }
}

There’s a lot of code here, but basically there are only two methods of note: SendTopic() and ReceiveTopic(). I’ve also made it a disposable class. The next thing we want is a listener; for this, I’ve used a WPF app, but any application should be able to do this:

The call to set up the alert is this:

public partial class MainWindow : Window
{
    private TopicHelper _topicHelper = new TopicHelper();
    public MainWindow()
    {
        InitializeComponent();
 
        Task.Run(() =>
        {
            ReceiveTopic recTopic = new ReceiveTopic();
            _topicHelper.ReceiveTopic("Alerts", "thisApp", recTopic);
        });
    }
}

I’ve used the code behind because I’m just proving a point. Obviously, in real life, this would be some abstraction in the business layer. The main thing to note is the ReceiveTopic class that is instantiated and passed through; here’s its implementation:

public class ReceiveTopic : IReceiveTopicBehaviour
{
    public void OnReceive(string message)
    {
        System.Windows.MessageBox.Show(message);
    }
}

This, effectively, allows me to provide custom functionality on the receive. If you find that you’re using this to pass in the same one-liner, you could adapt this to simply pass in an action.
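
A minimal sketch of that adaptation follows. The IReceiveTopicBehaviour interface isn’t shown above, so its shape here is an assumption inferred from how ReceiveTopic implements it; ActionReceiveBehaviour is a hypothetical name.

```csharp
using System;

// Assumed shape of the interface that TopicHelper expects (inferred, not shown in the post).
public interface IReceiveTopicBehaviour
{
    void OnReceive(string message);
}

// Hypothetical adapter: wraps an Action<string>, so a one-liner needs no dedicated class.
public sealed class ActionReceiveBehaviour : IReceiveTopicBehaviour
{
    private readonly Action<string> _onReceive;

    public ActionReceiveBehaviour(Action<string> onReceive)
    {
        if (onReceive == null) throw new ArgumentNullException(nameof(onReceive));
        _onReceive = onReceive;
    }

    public void OnReceive(string message)
    {
        _onReceive(message);
    }
}
```

The WPF listener could then pass new ActionReceiveBehaviour(msg => System.Windows.MessageBox.Show(msg)) instead of a ReceiveTopic instance, leaving TopicHelper itself unchanged.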

The final piece of the puzzle is the send alert; in my case this is a console app:

Here’s the code for the main function:

static void Main(string[] args)
{
    Console.WriteLine("Alert: ");
    string alertText = Console.ReadLine();
 
    using (TopicHelper helper = new TopicHelper())
    {
        helper.SendTopic("Alerts", "thisApp", alertText);
    }
 
    Console.WriteLine("Finished");
    Console.ReadLine();
}

Does it work?

Yes, of course.