Category Archives: Azure Service Bus

Azure Service Bus SDK Libraries

I’ve written pretty extensively on the Microsoft.Azure.ServiceBus SDK. In this post, I’m just covering the fact that this library is on its way to deprecation (don’t panic, its predecessor has been hanging around since 2011!)

Let’s see what these libraries are and some links.

WindowsAzure.ServiceBus

This library does look like it’s on its way to being deprecated. It supports .Net Framework only.

The NuGet package is here, but it’s closed source:

https://www.nuget.org/packages/WindowsAzure.ServiceBus

Microsoft.Azure.ServiceBus

This library was introduced to support .Net Core.

The NuGet package is here:

https://www.nuget.org/packages/Microsoft.Azure.ServiceBus

The code for this is open source:

https://github.com/Azure/azure-service-bus-dotnet

Azure.Messaging.ServiceBus

If you read Sean Feldman’s article here (which this was heavily based on), you’ll see that this seems to be due to some restructuring of teams. The code has changed, and MS say it’s more consistent (although what with, I’m unsure).

The NuGet Package is here:

https://www.nuget.org/packages/Azure.Messaging.ServiceBus

The source code for this is here:

https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/servicebus

References

https://markheath.net/post/migrating-to-new-servicebus-sdk

https://weblogs.asp.net/sfeldman/the-future-of-asb-dotnet-sdk

Service Bus Batching and Pre-Fetch

Azure Service Bus is often used as a way to link services (as the name suggests). This typically means that the throughput is large, but not so large that anyone worries about the speed. However, in this post, I’m going to cover some techniques to speed up service bus messaging.

A quick disclaimer here, though: it might be that if you’re reading this then you’re not using the best tool for the job, and perhaps something more lightweight might be a better fit.

Essentially, there are two inbuilt methods in Service Bus to do this: Batching, and Prefetch; let’s look at Batching first.

Batching

The principle behind batching is very simple: instead of making multiple calls to Azure Service Bus with multiple messages, you make one call with all the messages that you wish to send.

Let’s see some code that sends 1000 messages:

var stopwatch = new Stopwatch();
stopwatch.Start();

var queueClient = new QueueClient(connectionString, QUEUE_NAME);

for (int i = 0; i < 1000; i++)
{
    string messageBody = $"{DateTime.Now}: {messageText} ({Guid.NewGuid()})";
    var message = new Message(Encoding.UTF8.GetBytes(messageBody));

    await queueClient.SendAsync(message);
}
await queueClient.CloseAsync();

stopwatch.Stop();
Console.WriteLine($"Send messages took {stopwatch.ElapsedMilliseconds}");

The timing for these posts is not a benchmark test, but whatever my test recorded at the time – your mileage may vary a lot, although I would say they are a fair rough estimation. As you’ll see later on, the differences are very stark.

In my tests, sending 1000 messages took around 30 seconds (which is good enough for most scenarios, as they are unlikely to be sent in a batch such as this). Let’s see how we can batch the messages to speed things up:

var stopwatch = new Stopwatch();
stopwatch.Start();

var queueClient = new QueueClient(connectionString, QUEUE_NAME);
var messages = new List<Message>();

for (int i = 0; i < 1000; i++)
{
    string messageBody = $"{DateTime.Now}: {messageText} ({Guid.NewGuid()})";
    var message = new Message(Encoding.UTF8.GetBytes(messageBody));                

    messages.Add(message);
}
await queueClient.SendAsync(messages);
await queueClient.CloseAsync();

stopwatch.Stop();
Console.WriteLine($"Send messages took {stopwatch.ElapsedMilliseconds}");

As you can see, there’s not a huge amount of change to the code. It’s worth noting that in the first example, if the code crashes half way through the send, half the messages would be sent – whereas with the batch, it’s all or nothing. However, this came in at under a second:

On the receive side, we have a similar thing.

Batch Receive

Here, we can receive messages in chunks, instead of one at a time. Again, let’s see how we might receive 1000 messages (there are multiple ways to do this):

var stopwatch = new Stopwatch();
stopwatch.Start();

var messageReceiver = new MessageReceiver(connectionString, QUEUE_NAME);
for (int i = 0; i < 1000; i++)
{
    var message = await messageReceiver.ReceiveAsync();
    string messageBody = Encoding.UTF8.GetString(message.Body);
    Console.WriteLine($"Message received: {messageBody}");

    await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
}

stopwatch.Stop();
Console.WriteLine($"Receive messages took {stopwatch.ElapsedMilliseconds}");

The execution here takes around 127 seconds (over 2 minutes) in my tests:

The same is true for batch receipt as for batch send; with a slight caveat:

var stopwatch = new Stopwatch();
stopwatch.Start();
Int count = 1000;
int remainingCount = count;

while (remainingCount > 0)
{
    var messageReceiver = new MessageReceiver(connectionString, QUEUE_NAME);
    var messages = await messageReceiver.ReceiveAsync(remainingCount);

    foreach (var message in messages)
    {
        string messageBody = Encoding.UTF8.GetString(message.Body);
        Console.WriteLine($"Message received: {messageBody}");
        remainingCount--;
    }

    await messageReceiver.CompleteAsync(messages.Select(a => a.SystemProperties.LockToken));
}

stopwatch.Stop();
Console.WriteLine($"Receive messages took {stopwatch.ElapsedMilliseconds}");
Console.WriteLine($"Remaining count: {remainingCount}");

Note that the CompleteAsync can also be called in batch.

You may be wondering here what the while loop is all about. In fact, it’s because batch receive isn’t guaranteed to return the exact number of messages that you request. However, we still brought the receive time down to around 10 seconds:

A Note on Batching and Timeouts

It’s worth bearing in mind that when you retrieve a batch of messages, you’re doing just that – retrieving them. In a PeekLock scenario, they are now locked; and, if you don’t complete or abandon them, they will time out like any other message. If you have a large number of messages, you may need to extend the timeout; for example:

var messages = await messageReceiver.ReceiveAsync(remainingCount, TimeSpan.FromSeconds(20));

In the next section, we’ll discuss the second technique, of allowing the service bus to “run ahead” and get messages before you actually request them.

Prefetch

Prefetch speeds up the retrieval of messages by getting Azure Service Bus to return messages ahead of them being needed. This presents a problem (similar to receiving in batch), which is that the system is actually retrieving messages on your behalf before you ask for them. In this example, we’ve been using PeekLock – that is, the message is left on the queue until we explicitly complete it. However, once you Peek the message, it’s locked. That means that with the code above, we can easily trip ourselves up.

int count = 1000;
var stopwatch = new Stopwatch();
stopwatch.Start();

var messageReceiver = new MessageReceiver(connectionString, QUEUE_NAME);
messageReceiver.PrefetchCount = prefetchCount;
for (int i = 0; i < count; i++)
{
    var message = await messageReceiver.ReceiveAsync(TimeSpan.FromSeconds(60));
    string messageBody = Encoding.UTF8.GetString(message.Body);
    Console.WriteLine($"Message received: {messageBody}");

    await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
}

stopwatch.Stop();
Console.WriteLine($"Receive messages took {stopwatch.ElapsedMilliseconds}");

Note the extended timeout on the Receive allows for the prefetched messages to complete.

Here’s the timing for Prefetch:

This is slightly quicker than processing the messages one at a time, but much slower than a batch. The main reason being that the complete takes the bulk of the time.

Remember that with Prefetch, if you’re using PeekLock, once you’ve pre-fetched a message, the timeout on the lock starts – this means that if you’re lock is for 5 seconds, and you’ve prefetched 500 records – you need to be sure that you’ll get around to them in time.

ReceiveAndDelete

Whilst the Prefetch messages timing out may be bad, with ReceiveAndDelete, they are taken off the queue, this means that you can consume the messages without ever actually seeing them!

Prefetch with Batch

Here, we can try to use the prefetch and batch combined:

int count = 1000
var stopwatch = new Stopwatch();
stopwatch.Start();
int remainingCount = count;

while (remainingCount > 0)
{
    var messageReceiver = new MessageReceiver(connectionString, QUEUE_NAME);
    messageReceiver.PrefetchCount = prefetchCount;
    var messages = await messageReceiver.ReceiveAsync(remainingCount);
    if (messages == null) break;

    foreach (var message in messages)
    {
        string messageBody = Encoding.UTF8.GetString(message.Body);
        Console.WriteLine($"Message received: {messageBody}");
        remainingCount--;
    }

    await messageReceiver.CompleteAsync(messages.Select(a => a.SystemProperties.LockToken));
}

stopwatch.Stop();
Console.WriteLine($"Receive messages took {stopwatch.ElapsedMilliseconds}");
Console.WriteLine($"Remaining count: {remainingCount}");

In fact, in my tests, the timing for this was around the same as a batch receipt:

There may be some advantages with much higher numbers, but generally, combining the two in this manner doesn’t seem to provide much benefit.

References

https://www.planetgeek.ch/2020/04/27/azure-service-bus-net-sdk-deep-dive-sender-side-batching/

https://markheath.net/post/speed-up-azure-service-bus-with-batching

https://github.com/Azure/azure-service-bus-dotnet/issues/441

https://markheath.net/post/migrating-to-new-servicebus-sdk

https://weblogs.asp.net/sfeldman/understanding-Azure-service-bus-prefetch

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-performance-improvements?tabs=net-standard-sdk-2

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-prefetch

Azure Service Bus – Auto Delete on Idle

In the past few months, I’ve written, and spoken quite a lot about Azure Service Bus – especially the new features that are now available out of the box. here I wrote about Auto-forwarding, and in this post, I’m going to cover auto-delete; I think the two subjects go hand-in-hand.

The idea behind auto-delete is that if a set of messages are time sensitive, then the message queue can be flagged for auto-deletion. This is not the same as “Time To Live” for a message, because the entire queue is deleted after a period of inactivity!

Let’s see how we can set-up a new queue, in code (see the referenced post above for the NuGet packages that you’ll need for this):

        private static async Task CreateAutoDeleteQueue(string connectionString, string queueName)
        {
            // Create authorisation rules
            var authorisationRule = new SharedAccessAuthorizationRule(
                "manage", new[] { AccessRights.Manage, AccessRights.Listen, AccessRights.Send });

            var serviceBusAdministrationClient = new ServiceBusAdministrationClient(connectionString);
          
            var options = new CreateQueueOptions(queueName)
            {
                AutoDeleteOnIdle = TimeSpan.FromMinutes(5)
            };
            options.AuthorizationRules.Add(authorisationRule);

            var queue = await serviceBusAdministrationClient.CreateQueueAsync(options);

        }

We can now post a number of messages to that queue:

Then, 5 minutes after the last message have been posted (or any activity has taken place), the queue is deleted:

References

https://www.serverless360.com/blog/hidden-gems-azure-service-bus

Service Bus Management And Auto Forwarding

In preparation for a talk I’m giving, I started looking into ways that you can create Azure Service Bus queues programatically (If you want to see some of the other posts that came out of the research for this talk, they’re here and here).

In fact, there are a few different ways, and it’s very difficult to cut through the confusion to work out which method is the new way. In this article, Microsoft recommend the method that I cover in this post.

In this post, I’ll cover creating a new message queue in code, and configuring it to auto-forward to a second queue.

The Old Way

If you go back to some of my earlier posts on Azure Service Bus, you’ll see that an object called the QueueDescription was used excessively (here and here). In the newer version of the SDK that’s no longer used, and management has been separated from the usage of the service bus. This seems to make sense, when you consider that most people using the Service Bus will not need to change the structure of it; however, there were times when having the ability to create a queue that didn’t exist would make for more resilient software.

The New Way(s)

Microsoft have replaced this with, at the time of writing, three separate alternatives. This has made the whole process very confusing – especially since some of the Microsoft documentation still talks about using QueueDescription! They, however, recommend using the Azure.Messaging.ServiceBus package.

At the time of writing, that was 7.0.1:

<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.0.1" />

Creating a Queue

One of the advantages here is that you get very granular control over exactly what you’re creating; but that also means that you’re responsible for creating things such as access policies. Let’s try the following code:

var authorisationRule = new SharedAccessAuthorizationRule(
    "manage", new[] { AccessRights.Manage });

var options = new CreateQueueOptions(source);
options.AuthorizationRules.Add(authorisationRule);
            
var serviceBusAdministrationClient = new ServiceBusAdministrationClient(connectionString);
var queue = await serviceBusAdministrationClient.CreateQueueAsync(options);

In fact, this causes the following error on run:

System.ArgumentException: ‘Manage permission should also include Send and Listen’

We’re now in a world of open source, which means we can see why this doesn’t work by looking at the code. In fact, if you specify Manage privilege, it expects three permissions (that is, you must also allow Listen and Send). The working code looks like this:

var authorisationRule = new SharedAccessAuthorizationRule(
                "manage", new[] { AccessRights.Manage, AccessRights.Listen, AccessRights.Send });

var options = new CreateQueueOptions(source);
options.AuthorizationRules.Add(authorisationRule);
            
var serviceBusAdministrationClient = new ServiceBusAdministrationClient(connectionString);
var queue = await serviceBusAdministrationClient.CreateQueueAsync(options);

In fact, the reason that I started to look into queue management, was that I wanted to see if I could configure a queue with auto forwarding, without using the Azure Portal or the Service Bus Explorer.

Auto Forwarding

In fact, it’s actually quite easy. Once you start playing with the SDK, you’ll see that you can use CreateQueueOptions to specify most queue features:

var authorisationRule = new SharedAccessAuthorizationRule(
                "manage", new[] { AccessRights.Manage, AccessRights.Listen, AccessRights.Send });

var serviceBusAdministrationClient = new ServiceBusAdministrationClient(connectionString);

var optionsDest = new CreateQueueOptions(destination);
optionsDest.AuthorizationRules.Add(authorisationRule);
var queueDest = await serviceBusAdministrationClient.CreateQueueAsync(optionsDest);

var options = new CreateQueueOptions(source)
{
    ForwardTo = destination                
};
options.AuthorizationRules.Add(authorisationRule);
            
var queue = await serviceBusAdministrationClient.CreateQueueAsync(options);

Here, we’re setting up two queues (you can only forward to a queue that exists), referred to in the string variables source and destination, then we simply set the ForwardTo property of the second queue.

References

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-management-libraries

https://www.serverless360.com/blog/auto-forwarding-a-hidden-gem-of-service-bus

Deferred Messages in Azure Service Bus

In Azure Service Bus, you can schedule a message to deliver at a later time, but you can also defer a message until a later time.

Scheduled Versus Deferred Messages

The difference here is subtle, but important: when you schedule a message, you’re telling the Service Bus to deliver that message at a time of your choosing, when you defer a message, you telling the Service Bus to hang onto a message that has been sent, until such time as you’re ready to receive it.

Why Would you Defer a Message?

The idea here is that you are not ready for the message – but you don’t want to hold up the queue. In this respect, it’s a little like the dead letter concept; that is, there is a message that’s essentially holding up the queue – however, in this case, there’s nothing wrong with the message itself.

Let’s imagine that we receive a message that a sales order has been created – we go to get the customer information for the sales order, and we find that the customer has yet to be created (such things are possible when you start engaging in eventually consistent systems): in this case, you could defer the message, and come back to it when the customer has been created.

Some Code – How to Defer a Message

Deferring a message is actually very simple:

var messageReceiver = new MessageReceiver(connectionString, QUEUE_NAME, ReceiveMode.PeekLock);
var message = await messageReceiver.ReceiveAsync();

var sequenceNumber = message.SystemProperties.SequenceNumber;
await messageReceiver.DeferAsync(message.SystemProperties.LockToken);

There’s three important concepts here:
1. The sequence number is very important: without it, the message is effectively lost; that’s because of (2)
2. You can receive a message after this, and you will never see the deferred message again until you purposely receive it, which brings us to (3)
3. To retrieve this message, you must explicitly ask for it.

To receive the deferred message you simply pass in the sequence number:

var messageReceiver = new MessageReceiver(connectionString, QUEUE_NAME, ReceiveMode.PeekLock);            
var message = await messageReceiver.ReceiveDeferredMessageAsync(sequenceNumber);

await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);

The deferred message will never time out. Messages have a “Time to Live”, after which they get moved to the Dead Letter Queue; but once a message is deferred, it will live forever, and must be received to remove it.

References

https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-deferral

Read the Dead Letter Queue

I’ve been writing about and speaking about Azure Service Bus a lot recently.

In this post, I’m going to focus on the Dead Letter Queue in more detail.

What is the Dead Letter Queue, and what has it ever done for me?

To describe what the dead letter queue does, I invite you to think about an assembly line for a car. The car in question has just come through to have a bonnet fitted (hood for any American readers). However, the guy that’s fitting the bonnet can’t get it to sit right in the hinges; he tries and tries, but it won’t fit. After a while, he goes to get his superviser, and they both try. They draft the workers in from all over the plant, but can’t get the bonnet fitted.

Meanwhile, the entire assembly line has stopped. The person that fits the steering wheel is behind the bonnet fitter, and there’s no space for him to move the car that he’s just fitted the wheel to; the dashboard fitter can’t pass onto the steering wheel, and so on.

(I have no knowledge of what a car assembly line looks like, outside of the film Christine, so apologies if this is incorrect).

A message that can’t be processed is often called a poison message, and it causes exactly this problem. The Service Bus can’t deliver any messages until this message has gone, and this message can’t go, because there’s something wrong with it. The solution is to have a dedicated queue that holds these messages: it’s called a Dead Letter Queue – it’s kind of like a holding bay for the car.

Why would a message be “poison”

There are a few reasons that a message can be considered “poison” and dead lettered; some of the most common are:

– Each queue has a maximum delivery count, if it’s exceeded – that is, we’ve tried too many times to process it
– The message can be explicitly marked as bad by the client
– The size of the message is bigger than the allocated maximum size
– The message has been “auto-forwarded” too many times

Essentially, the system tries to work out whether this message is staying around too long and causing issues with the system. It’s important to know, though, that the dead letter queue is just another queue. The message isn’t lost – just side-lined.

Dead Lettering

Let’s see how we can force a message into a dead letter queue. The easiest way to do this is to explicitly just Dead Letter the message; for example:

            var messageReceiver = new MessageReceiver(connectionString, QUEUE_NAME);
            var message = await messageReceiver.ReceiveAsync();

            await messageReceiver.DeadLetterAsync(message.SystemProperties.LockToken, "Really bad message");

Here, we’ve read the message, and then told Service Bus to just Dead Letter it. In real life, you may choose to do this on rare occasions, but I imagine its main use is for testing.

Abandon the Message

Another way to cause a message to be dead lettered is to exceed the Max Delivery Count. You can do this by “abandoning” the message multiple times; for example:

var messageReceiver = new MessageReceiver(connectionString, QUEUE_NAME);
var message = await messageReceiver.ReceiveAsync();

string messageBody = Encoding.UTF8.GetString(message.Body);

Console.WriteLine($"Message {message.MessageId} ({messageBody}) had a delivery count of {message.SystemProperties.DeliveryCount}");
await messageReceiver.AbandonAsync(message.SystemProperties.LockToken);

Here, we’re reading the message, and rather than completing it, we’re abandoning it. It’s worth bearing in mind that this is what happens when you abandon a message. It’s also what happens when you read a message and just implicitly abandon it (i.e., you read it on a PeekLock and then do nothing): the AbandonAsync method doesn’t actually change the functionality of the code above – it does change the speed, though.

Reading The Dead Letter Queue

Now that we’ve dead-lettered a message, we can read the Dead Letter Queue.

            var deadletterPath = EntityNameHelper.FormatDeadLetterPath(QUEUE_NAME);
            var deadLetterReceiver = new MessageReceiver(connectionString, deadletterPath, ReceiveMode.PeekLock);
            
            var message = await deadLetterReceiver.ReceiveAsync();

            string messageBody = Encoding.UTF8.GetString(message.Body);

            Console.WriteLine("Message received: {0}", messageBody);
            if (message.UserProperties.ContainsKey("DeadLetterReason"))
            {
                Console.WriteLine("Reason: {0} ", message.UserProperties["DeadLetterReason"]);
            }
            if (message.UserProperties.ContainsKey("DeadLetterErrorDescription"))
            {
                Console.WriteLine("Description: {0} ", message.UserProperties["DeadLetterErrorDescription"]);
            }

The code above sets up a MessageReceiver for the dead letter queue. The delivery count inside the dead letter queue does not increase, but it does retain the number that it had from the original queue. Effectively, all you can do with a Dead Letter message is to complete it.

DeadLetterReason

When a message is dead lettered, the properties DeadLetterReason and DeadLetterErrorDescription may get added to the message. If you forcibly dead letter the message then you have the option to add this: if you choose not to then it will not be present (hence the checks around the properties), but mostly, these will be available.

Re-submitting a Message and Transactions

We’ve now seen how to cause a message to Dead Letter, and read the Dead Letter queue; next we’re going to investigate re-submitting the message.

As a quick side not – you can’t really re-submit a message – as you’ll see, what we actually do is to complete the dead letter message, and send a copy back to the queue.

            var serviceBusConnection = new ServiceBusConnection(connectionString);

            var deadletterPath = EntityNameHelper.FormatDeadLetterPath(QUEUE_NAME);
            var deadLetterReceiver = new MessageReceiver(serviceBusConnection, deadletterPath, ReceiveMode.PeekLock);
            
            var queueClient = new QueueClient(serviceBusConnection, QUEUE_NAME, ReceiveMode.PeekLock, RetryPolicy.Default);

            var deadLetterMessage = await deadLetterReceiver.ReceiveAsync();

            using var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);

            var resubmitMessage = deadLetterMessage.Clone();

            resubmitMessage.UserProperties.Remove("DeadLetterReason");
            resubmitMessage.UserProperties.Remove("DeadLetterErrorDescription");
            
            await queueClient.SendAsync(resubmitMessage);
            await deadLetterReceiver.CompleteAsync(deadLetterMessage.SystemProperties.LockToken);            

            scope.Complete();            

There’s a few points to note in the above code:

FormatDeadLetterPath

FormatDeadLetterPath gives you the entity path for the dead letter queue, based on an entity.

Transaction Scope

The scope ensures that everything between its creation and completion happens as a single transaction. That is, if part of that fails, the whole thing fails. For example, you could add a throw new exception between the send and the complete, and the new message will not send.

We’re using the new C# 8 using statement – that is, it will apply to everything between it, and the end of the method.

ServiceBusConnection

There are several overloads for most of these methods, and typically, you can pass a connection string into the constructor – for example, MessageReceiver could be called like this:

new MessageReceiver(connectionString, QUEUE_NAME);

Typically, you can use this and it works exactly the same as if you established your own connection and passed that through; however, with a transaction, everything needs to share a connection. If they do not, then you may see an error such as this:

Transaction hasn’t been declared yet, or has already been discharged

Hence we’re creating the connection upfront.

References

https://blogs.infosupport.com/implementing-a-retry-pattern-for-azure-service-bus-with-topic-filters/

https://stackoverflow.com/questions/38784331/how-to-peek-the-deadletter-messages

https://github.com/Azure/azure-service-bus/pull/91

Azure Service Bus – Scheduled Message Delivery

Azure Service Bus sets itself apart from other message brokers by the dizzying array of additional and useful features that it provides out of the box. This particular one is really useful for things like scheduled e-mails. Let’s say, for example, that you’re an event organiser, and you want to notify people a few days before the event. This feature enables you to tell Service Bus to simply send a message at that time (you could have a simple Azure function that then picked up the message and sent an e-mail).

If you’re new to Service Bus, or using it with .Net, then start here.

NuGet

The basic NuGet package you’ll need is here:

Microsoft.Azure.ServiceBus

Reading the Service Bus Message

For the purpose of this post, we’ll just set-up a basic console application that sends and receives the message; let’s start with the read:

private static Task ReadMessageEvent(string connectionString)
{
    var queueClient = new QueueClient(connectionString, QUEUE_NAME);

    var messageHandlerOptions = new MessageHandlerOptions(ExceptionHandler);
    queueClient.RegisterMessageHandler(handleMessage, messageHandlerOptions);

    return Task.CompletedTask;
}

private static Task ExceptionHandler(ExceptionReceivedEventArgs arg)
{
    Console.WriteLine("Something bad happened!");
    return Task.CompletedTask;
}

private static Task handleMessage(Message message, CancellationToken cancellation)
{
    string messageBody = Encoding.UTF8.GetString(message.Body);
    Console.WriteLine("Message received: {0}", messageBody);

    return Task.CompletedTask;
}

There’s not much to say here – this event will simply print a message to the console when it’s received.

Schedule the Service Bus Message

Now that we’ve set up a method to receive the messages, let’s send one. You could add this to the same console app (obviously it would have to occur after the Read!)

var queueClient = new QueueClient(connectionString, QUEUE_NAME);

string messageBody = $"{DateTime.Now}: Happy New Year! ({Guid.NewGuid()}) You won't get this until {dateTime}";
var message = new Message(Encoding.UTF8.GetBytes(messageBody));

long sequenceNumber = await queueClient.ScheduleMessageAsync(message, dateTime);
//await queueClient.CancelScheduledMessageAsync(sequenceNumber);

await queueClient.CloseAsync();

dateTime is simply the time that you wish to send the message; for example:

var dateTime = DateTime.UtcNow.AddSeconds(10)

Will send the message in 10 seconds.

The commented line above will then cancel the message from being sent – you only need to provide the sequence number (which you get from setting up the schedule in the first place).

References and A GitHub Example

For a working sample of this, please see here.

https://stackoverflow.com/questions/60437666/how-to-defer-a-azure-service-bus-message

Receiving Messages in Azure Service Bus

In this post I covered the basics of setting up a queue and sending a message to it. Here, I’m going to cover the options around receiving that message.

Essentially, there are two possibilities here: you can either set-up an event listener, or you can poll the queue directly, and receive the messages one at a time.

Option 1 – Events

The events option seems to be the one that Microsoft now prefer – essentially, you register a handler and then as the messages come in, you simply handle them inside an event. The code here looks something like this:

            var queueClient = new QueueClient(connectionString, "test-queue");

            var messageHandlerOptions = new MessageHandlerOptions(ExceptionHandler);
            queueClient.RegisterMessageHandler(handleMessage, messageHandlerOptions);

The event handlers:

        private static Task ExceptionHandler(ExceptionReceivedEventArgs arg)
        {
            Console.WriteLine("Something bad happened!");
            return Task.CompletedTask;
        }

        private static Task handleMessage(Message message, CancellationToken cancellation)
        {
            string messageBody = Encoding.UTF8.GetString(message.Body);
            Console.WriteLine("Message received: {0}", messageBody);

            return Task.CompletedTask;
        }

Option 2 – Polling

With this option, you simply ask for a message. You’ll need to use the approach for things like deferred messages (which I hope to cover in a future post):

            var messageReceiver = new MessageReceiver(connectionString, "test-queue", ReceiveMode.ReceiveAndDelete);            
            var message = await messageReceiver.ReceiveAsync();

            string messageBody = Encoding.UTF8.GetString(message.Body);            
            Console.WriteLine("Message received: {0}", messageBody);

Option 3 – Option 1, but cruelly force it into option 2

I thought I’d include this, although I would strongly advise against using it in most cases. If you wish, you can register an event, but force the event into a procedural call, so that you can await it finishing. You can do this by using the TaskCompletionSource. First, declare a TaskCompletionSource in your code (somewhere accessible):

private static TaskCompletionSource<bool> _taskCompletionSource;

Then, in handleMessage (see above), when you’ve received the message you want, set the result:

            if (message.CorrelationId == correlationId)
            {
                await client.CompleteAsync(message.SystemProperties.LockToken);

                _taskCompletionSource.SetResult(true);
            }

Finally, after you’ve registered the message handler, just await this task:

queueClient.RegisterMessageHandler(
                (message, cancellationToken) => handleMessage(correlationId, queueClient, message, cancellationToken), 
                messageHandlerOptions);

await _taskCompletionSource.Task;

References

Advanced Features with Azure Service Bus

Sending a Service Bus Message Failed

While playing around with Azure Service Bus in .Net Core, I came across this error, and it had me stumped for a while:

Microsoft.Azure.ServiceBus.UnauthorizedException: ‘Put token failed. status-code: 401, status-description: InvalidSignature: The token has an invalid signature..’

Having had a look on the internet, everyone seemed to be of the opinion that such errors were caused by an invalid connection string. I checked my connection string a number of times and it seemed correct.

However, the sentiment is, in fact, the correct one. The actual cause of this was that I was using the connection string from a specific queue, while trying to connect to a topic. I’d copied the queue connection code, and for some reason, the connection string being different didn’t twig with me. Connection strings work in a hierarchical fashion. For example:

You can use the connection string from a namespace for all the queues and topics within that namespace; but you can only use the connection string for a queue only for that queue.