Category Archives: Azure Service Bus

Service Bus Batching

One annoying little issue that you come across when dealing with batch sending for Azure Service Bus is that the total size of the batched message is restricted to the same size as a single message. At the time of writing, this was 256KB for standard tier. What this means is that, if you have a message that’s, say, 500 bytes, you’re restricted to a batch of roughly 500 messages before you exceed the send limit.
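As a rough sanity check on that figure, the arithmetic can be sketched as below. This is only an upper bound: it ignores the headers and properties that travel with each message, so the real ceiling will be lower. The variable names are purely illustrative.

```csharp
using System;

// Assumption: the 256KB standard tier limit quoted above
const int maxBatchSizeBytes = 256 * 1024;
const int messageSizeBytes = 500;

// Upper bound only: per-message overhead eats into this in practice
int maxMessagesPerBatch = maxBatchSizeBytes / messageSizeBytes;

Console.WriteLine($"At most {maxMessagesPerBatch} messages of {messageSizeBytes} bytes fit in one batch");
```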

You can batch these; and, in fact, Azure Service Bus has a little feature that allows you to do this: ServiceBusMessageBatch.

Let’s take the following code:

await using var serviceBusClient = new ServiceBusClient(connectionString);
var sender = serviceBusClient.CreateSender(topicname);
List<ServiceBusMessage> serviceBusMessages = new();    

string messageText = new string('a', 500);

for (int i = 1; i <= 1000; i++)
{
    serviceBusMessages.Add(
        new ServiceBusMessage(messageText));
}
await sender.SendMessagesAsync(serviceBusMessages);

This code would throw an exception, as the combined size of the messages exceeds the message limit. However, the following code has the same effect, but groups the messages into batches no larger than 256KB and sends each batch as it fills up:

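var serviceBusMessageBatch = await sender.CreateMessageBatchAsync();

for (int i = 1; i <= count; i++)
{
    var message = new ServiceBusMessage(messageText);
    if (!serviceBusMessageBatch.TryAddMessage(message))
    {
        // The batch is full: send it, then start a new one
        await sender.SendMessagesAsync(serviceBusMessageBatch);
        serviceBusMessageBatch.Dispose();
        serviceBusMessageBatch = await sender.CreateMessageBatchAsync();

        // Add the message that didn't fit to the new batch, so it isn't lost
        if (!serviceBusMessageBatch.TryAddMessage(message))
        {
            throw new InvalidOperationException("Message is too large to fit in an empty batch.");
        }
    }
}

// Send whatever is left in the final batch
await sender.SendMessagesAsync(serviceBusMessageBatch);
serviceBusMessageBatch.Dispose();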

References

https://weblogs.asp.net/sfeldman/sb-safebatching

Sending Messages at Scale – Cannot allocate more handles

When sending many messages (thousands or tens of thousands) you may find that you run into errors that you may not see while testing with lower numbers of messages. One such error that I’ve encountered recently is this:

Azure.Messaging.ServiceBus.ServiceBusException: ‘Cannot allocate more handles. The maximum number of handles is 4999. (QuotaExceeded)’

Some code to replicate the issue:

for (int i = 1; i <= 10000; i++)
{
    var sender = serviceBusClient.CreateSender("topic-name");

    string messageText = $"Test{i}--{DateTime.Now.ToString("yy-MM-dd")}";
    var msg = new ServiceBusMessage(Encoding.UTF8.GetBytes(messageText));
    await sender.SendMessageAsync(msg);
}    

There is a limit on the number of handles that can be open on a connection to the service bus, set at 4,999 at the time of writing; that is, you cannot have 5,000 open at the same time. Every call to CreateSender creates a new sender, and each sender takes up a handle on the connection. Because this is a tight loop, the senders cannot be cleaned up in time, and so the quota is exhausted.

There are three mechanisms for avoiding this. By far the best and easiest is to send the messages as a batch.

Sending Messages as a Batch

List<ServiceBusMessage> serviceBusMessages = new();    
for (int i = 1; i <= 10000; i++)
{
    string messageText = $"Test{i}--{DateTime.Now.ToString("yy-MM-dd")}";
    serviceBusMessages.Add(
       new ServiceBusMessage(Encoding.UTF8.GetBytes(messageText)));        
}
await sender.SendMessagesAsync(serviceBusMessages);

This is far faster, and avoids 10,000 calls to the service. The caveat here is the size of the batch: if the 10,000 messages together exceed the maximum size of a single message, then it may be necessary to split them into groups of 1,000, or even 100, depending on the size of each message.
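Splitting into fixed-size groups could look something like the sketch below. To keep it runnable without a namespace, I've used plain strings as stand-ins for the messages and a counter in place of the send; the group size of 1,000 is just an assumption, and Enumerable.Chunk needs .NET 6 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in payloads; in real code these would be ServiceBusMessage instances
List<string> payloads = Enumerable.Range(1, 10000)
    .Select(i => $"Test{i}")
    .ToList();

// Hypothetical group size: tune it so that one group stays under the size limit
const int groupSize = 1000;
int groupsSent = 0;

// Enumerable.Chunk (.NET 6+) yields arrays of at most groupSize items
foreach (string[] group in payloads.Chunk(groupSize))
{
    // Real code would send here, e.g. await sender.SendMessagesAsync(...)
    groupsSent++;
}

Console.WriteLine($"Sent {groupsSent} groups");
```

A fixed group size only works when you know roughly how big each message is; where the sizes vary, letting ServiceBusMessageBatch decide when a batch is full is the safer option.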

Cleaning up the Client

This mechanism is to explicitly close the sender at the end of each iteration of the loop:

for (int i = 1; i <= count; i++)
{
    var sender = serviceBusClient.CreateSender("topic-name");

    string messageText = $"Test{i}--{DateTime.Now.ToString("yy-MM-dd")}";
    var msg = new ServiceBusMessage(Encoding.UTF8.GetBytes(messageText));
    await sender.SendMessageAsync(msg);

    await sender.CloseAsync();
}    

Change the Scope of the Message Factory

The final method is to simply move the creation of the sender outside the loop, so that it is created once and reused:

    var sender = serviceBusClient.CreateSender("topic-name");
    for (int i = 1; i <= count; i++)
    {
        string messageText = $"Test{i}--{DateTime.Now.ToString("yy-MM-dd")}";
        var msg = new ServiceBusMessage(Encoding.UTF8.GetBytes(messageText));
        await sender.SendMessageAsync(msg);

    }    

Summary

If you’re dealing with large quantities of messages, then the batch send is by far the best option, as it not only avoids the issue with too many handles, but speeds up the whole process.

ServiceBusAdministrationClient Update Lock Duration Not Working

When you start to experiment with very large messages, or very large quantities of messages, there are times when the default lock duration of 30 seconds for Azure Service Bus can cause issues. In this post, I’ll show how this can be changed at a subscription or queue level using the Azure.Messaging.ServiceBus client.

Caveat

As with previous articles that I’ve written on Service Bus, I’m not advocating this as a desirable way to deal with such cases; however, times do arise when it makes sense to adjust this value.

The Code

The key here is the ServiceBusAdministrationClient. It allows you to get a reference to the subscription or queue:

    var serviceBusAdministrationClient = new ServiceBusAdministrationClient(connectionString);
    var sub = await serviceBusAdministrationClient.GetSubscriptionAsync("topic", "sub1");

You can then change something on that object; for example:

sub.Value.LockDuration = TimeSpan.FromSeconds(newDuration);

But it’s not updating

Somewhat counterintuitively, the final step is to update with the new object:

await serviceBusAdministrationClient.UpdateSubscriptionAsync(sub);

The subscription / queue should then update fine.
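For a queue, the same get, change, update pattern applies. The following is a minimal sketch rather than a definitive implementation: the connection string and the queue name "my-queue" are placeholders, and it needs a real namespace to run against.

```csharp
using System;
using Azure.Messaging.ServiceBus.Administration;

// Placeholder: substitute your own connection string
string connectionString = "<your-connection-string>";
var adminClient = new ServiceBusAdministrationClient(connectionString);

// Fetch the current properties of the (hypothetical) queue
QueueProperties queue = await adminClient.GetQueueAsync("my-queue");

// Changing the property only alters the local object...
queue.LockDuration = TimeSpan.FromMinutes(2);

// ...so the change has to be pushed back to the service explicitly
await adminClient.UpdateQueueAsync(queue);
```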

Summary

In this post, we’ve discussed how you can change the lock duration using the Azure Service Bus SDK. We’ve also shown how easy it can be to not realise that you need to explicitly update (ask me how I know!)

Compressing a Service Bus Message Using Gzip

I’ve recently been playing around at the edges of Azure Service Bus. One of the things that occurred to me was that, for a large message, you could improve speed and save money by simply compressing the message before sending.

The obvious downside to this approach is that your consumer needs to be aware that the message is compressed. The other issue is that there is an overhead to compression and decompression, both in size and in speed, so compressing a small message is pointless (or worse – it may actually increase the size and time to transmit).
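To illustrate that last point, here is a small sketch that compares the size of a tiny payload and a large, repetitive one before and after Gzip. The GzipLength helper and the sample payloads are mine, purely for illustration; real messages will compress less dramatically than a string of identical characters.

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Text;

// Gzip the given bytes and return the compressed size
static int GzipLength(byte[] input)
{
    using var output = new MemoryStream();
    using (var gzip = new GZipStream(output, CompressionLevel.Optimal))
    {
        gzip.Write(input, 0, input.Length);
    }
    // ToArray still works after the GZipStream has closed the MemoryStream
    return output.ToArray().Length;
}

byte[] small = Encoding.UTF8.GetBytes("hi");
byte[] large = Encoding.UTF8.GetBytes(new string('a', 100_000));

int smallCompressed = GzipLength(small);
int largeCompressed = GzipLength(large);

Console.WriteLine($"small: {small.Length} -> {smallCompressed} bytes");
Console.WriteLine($"large: {large.Length} -> {largeCompressed} bytes");
```

The small payload comes out bigger than it went in, because the Gzip header and trailer alone outweigh two bytes of content.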

That’s the why and why not; let’s move on to the how.

How to Send a Compressed Message

Let’s start with a re-cap on how to send a message without it being compressed:

async Task SendMessage(string connectionString, string topicName, string messageText)
{
    await using var serviceBusClient = new ServiceBusClient(connectionString);
    var sender = serviceBusClient.CreateSender(topicName);
    
    var message = new ServiceBusMessage(Encoding.UTF8.GetBytes(messageText));

    await sender.SendMessageAsync(message);
}

Since this sends a message, which is just a stream of text, all we need to do now is compress that message. An easy way is to use GZipStream:

byte[] CompressBytes(byte[] bytes)
{    
    using var memoryStream = new MemoryStream();
    using var gzipStream = new GZipStream(memoryStream, CompressionLevel.SmallestSize);
    gzipStream.Write(bytes, 0, bytes.Length);
    gzipStream.Close();

    return memoryStream.ToArray();
}

This method takes an array of bytes, and runs them through the GZipStream to apply the Gzip compression algorithm. All we now need to do is to convert our string to a byte array; we can use something like the following, which also converts the compressed bytes to Base64, meaning that we can transport the result as a string (bear in mind that Base64 adds around a third to the size):

string CompressMessage(string toCompress)
{
    var bytes = Encoding.UTF8.GetBytes(toCompress);

    var returnBytes = CompressBytes(bytes);

    string returnValue = Convert.ToBase64String(returnBytes);
    return returnValue;
}

We now have all the building blocks, so we can simply assemble them in the correct order:

var compressed = CompressMessage(largeMessage);
await SendMessage(connectionString, "compressed-message", compressed);

All that’s left is to receive the message.

How to Receive a Compressed Message

Again, let’s start with a basic method to receive the next message on the service bus:

async Task<string> ConsumeNextMessage(string topic)
{
    await using var serviceBusClient = new ServiceBusClient(connectionString);
    var receiver = serviceBusClient.CreateReceiver(topic, "sub-1");
    var message = await receiver.ReceiveMessageAsync();
    return message.Body.ToString();
}

This will simply read the next message for the given subscription and return its contents. As with the send, we simply need the other building blocks; we’ll start with decompressing the bytes:

byte[] DecompressBytes(byte[] bytes)
{    
    using var memoryStream = new MemoryStream(bytes);
    using var outputStream = new MemoryStream();
    using var decompressStream = new GZipStream(memoryStream, CompressionMode.Decompress);
    decompressStream.CopyTo(outputStream);
    var returnBytes = outputStream.ToArray();
    return returnBytes;
}

Again, we’re using GZipStream to return a decompressed version of the data. We’ll call that in the same manner as above:

string DecompressMessage(string compressed)
{
    var bytes = Convert.FromBase64String(compressed);
    
    var returnBytes = DecompressBytes(bytes);
    return Encoding.UTF8.GetString(returnBytes);
}

It’s worth remembering that we’re transporting the compressed data using Base64, so we need to convert it from that first. Finally, we can assemble the methods:

var message = await ConsumeNextMessage("compressed-message");
var decompressed = DecompressMessage(message);
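If you want to convince yourself that the two halves match before involving Service Bus at all, a round trip through compress and decompress can be checked locally. This is a condensed sketch of the same approach as the methods above, folded into two helpers (Compress and Decompress are my names, not part of any library):

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Text;

// Gzip the text, then Base64 encode the compressed bytes
static string Compress(string text)
{
    using var output = new MemoryStream();
    using (var gzip = new GZipStream(output, CompressionLevel.Optimal))
    {
        byte[] bytes = Encoding.UTF8.GetBytes(text);
        gzip.Write(bytes, 0, bytes.Length);
    }
    return Convert.ToBase64String(output.ToArray());
}

// Reverse the steps: Base64 decode, then gunzip back to text
static string Decompress(string base64)
{
    using var input = new MemoryStream(Convert.FromBase64String(base64));
    using var gzip = new GZipStream(input, CompressionMode.Decompress);
    using var output = new MemoryStream();
    gzip.CopyTo(output);
    return Encoding.UTF8.GetString(output.ToArray());
}

string original = new string('x', 50_000) + " end of message";
string roundTripped = Decompress(Compress(original));

Console.WriteLine(roundTripped == original ? "Round trip OK" : "Round trip FAILED");
```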

What difference does it make?

Is it really worth all this messing around? The answer is probably an emphatic no. However, in a very small subset of cases, this might mean that you can continue on the standard tier, rather than upgrade to premium.

This also helps with speed – sending a 70MB message takes around 50 – 55 seconds in my tests. The same message compressed was around 5MB and took only 5 seconds to send.

References

https://docs.microsoft.com/en-us/dotnet/api/system.io.compression.gzipstream?view=net-6.0

https://www.infoworld.com/article/3660629/how-to-compress-and-decompress-strings-in-c-sharp.html

Transmitting an Image via Azure Service Bus

This post is a continuation of this earlier post around serialising an image. Once you’re able to serialise an image, then you can transmit that image. In this post, we’ll see how we could do that using Azure Service Bus.

A Recap on Image Serialisation

In the previous (linked) post, we saw how we can turn an image into a string of text, and back again. Essentially, what we did was use a BinaryWriter to write the file as a binary stream, and a BinaryReader to turn it back to an image.

The plan here is to do exactly the same thing, but to simply write the stream to an Azure Service Bus Message, and read from it at the other side.

Size Matters

One thing that you’re going to need to be aware of here is the size of the image. The standard tier of Service Bus limits you to 256KB. Serialising an image to a stream can come in under that but, unless you know otherwise, you should assume that it will be bigger. Even after you sign up for the much more expensive premium tier, when you set up the topic or queue, you’ll need to specify a maximum size. Images can be very big!

Alternatives

To be clear: the purpose of this post is to demonstrate that you can transmit an image via Service Bus – not that you should. There are other ways to do this: for example, you could upload the image to blob storage, or an S3 bucket, and then just point to that in the message. The one advantage transmitting the image with the message does give you, is that the binary data lives and dies with the message itself – so depending on what you do with the message after receipt, that may be an advantage.

Transmitting the Message

The following is a simple helper method that will send the message for us:

async Task SendMessage(string connectionString, string topicName, string messageText)
{    
    // Measured with the same encoding that is used for the message body below
    int byteCount = Encoding.UTF8.GetByteCount(messageText);

    await using var serviceBusClient = new ServiceBusClient(connectionString);
    var sender = serviceBusClient.CreateSender(topicName);
    
    var message = new ServiceBusMessage(Encoding.UTF8.GetBytes(messageText));

    await sender.SendMessageAsync(message);
}

byteCount tells you how big the message is going to be. Useful when you’re writing a blog post about such things, but you may find it useful for debugging.

The rest is pretty basic Azure Service Bus SDK stuff. I’ve written about this previously in more depth.

We can then combine this with the code from the last post:

        var serialisedToSend = SerialiseImageToBinary(
            @"c:\tmp\myimage.png");
        await SendMessage(connectionString, "image-message", serialisedToSend);

The next step is to receive the message at the other side.

Consuming the Message

As with the send, we have a helper method here:

async Task<string> ConsumeNextMessage(string topic)
{
    var serviceBusClient = new ServiceBusClient(connectionString);
    var receiver = serviceBusClient.CreateReceiver(topic, "sub-1");
    var message = await receiver.ReceiveMessageAsync();
    return message.Body.ToString();
}

Again, I’ve written about receiving a message using the SDK before.

Here, we do the reverse of the send:

        var serialisedToReceive = await ConsumeNextMessage("image-message");

        DeserialiseImageFromBinary(
            serialisedToReceive,
            @$"c:\tmp\newimg{DateTime.Now.ToString("yy-MM-dd-HH-mm-ss")}.png");

I’ve used the date so that I could test this multiple times – other than that, we’re receiving the serialised image, and then writing it to disk.

Summary

I’ll re-iterate the sentiment that I’m not advocating this as a good or preferable way to transmit images, simply highlighting that it is possible with relatively little work.

Listing all topics and subscriptions in an Azure Service Bus Namespace

Anyone who follows this blog will have noticed that Azure Service Bus is one of my favourite topics. In this post, we’re going to see how you can list all subscriptions to all topics within a namespace.

var serviceBusClient = new ServiceBusClient(connectionString);

var serviceBusAdminClient = new ServiceBusAdministrationClient(connectionString);
var topics = serviceBusAdminClient.GetTopicsAsync();
await foreach (var topic in topics)
{
    var subs = serviceBusAdminClient.GetSubscriptionsAsync(topic.Name);
    await foreach (var sub in subs)
    {
        Console.WriteLine($"{sub.TopicName}: {sub.SubscriptionName}");
    }
}

We’re using the latest service bus library, and the ServiceBusAdministrationClient, which lets us traverse the topics, and the subscriptions within them.

References

https://www.pmichaels.net/2021/06/26/receiving-a-message-using-azure-messaging-servicebus/

Reading an Azure Dead Letter Queue with Azure.Messaging.ServiceBus

Some time ago, I wrote about how you can read the dead letter queue. I also wrote this post on the evolution of Azure Service Bus libraries.

By way of a recap, a Dead Letter Queue is a sub queue that allows for a message that would otherwise clog up a queue (for example, it can’t be processed for some reason) to go to the dead letter queue and be removed from processing.

In this post, I’ll cover how you can read a dead letter queue using the new Azure.Messaging.ServiceBus library.

Force a Dead Letter Message

There are basically two ways that a message can end up in a dead letter queue: either it breaks the rules (too many retries, too many redirects, etc.), or it is explicitly placed in a dead letter queue. To do the latter, the process is as follows:

            var serviceBusClient = new ServiceBusClient(connectionString);
            var messageReceiver = serviceBusClient.CreateReceiver(QUEUE_NAME);
            var message = await messageReceiver.ReceiveMessageAsync();

            string messageBody = Encoding.UTF8.GetString(message.Body);

            await messageReceiver.DeadLetterMessageAsync(message, "Really bad message");

The main difference here (other than that the previous method was DeadLetterAsync) is that you pass the entire message, rather than just the lock token.

Reading a Dead Letter Message

There are a few quirks here. Firstly, the dead letter reason, delivery count, etc. were part of an array known as SystemProperties, whereas they are now just properties, which does make them far more accessible and discoverable. Here’s the code to read the dead letter queue:

            var serviceBusClient = new ServiceBusClient(connectionString);
            var deadLetterReceiver = serviceBusClient.CreateReceiver(FormatDeadLetterPath());
            
            var message = await deadLetterReceiver.ReceiveMessageAsync();

            string messageBody = Encoding.UTF8.GetString(message.Body);

            Console.WriteLine("Message received: {0}", messageBody);

            // Previous versions exposed these through a properties collection, rather than as direct properties
            // https://www.pmichaels.net/2021/01/23/read-the-dead-letter-queue/
            if (!string.IsNullOrWhiteSpace(message.DeadLetterReason))
            {
                Console.WriteLine("Reason: {0} ", message.DeadLetterReason);
            }
            if (!string.IsNullOrWhiteSpace(message.DeadLetterErrorDescription))
            {
                Console.WriteLine("Description: {0} ", message.DeadLetterErrorDescription);
            }

            Console.WriteLine($"Message {message.MessageId} ({messageBody}) had a delivery count of {message.DeliveryCount}");

Again, most of the changes are simply naming. It’s worth mentioning the FormatDeadLetterPath() function. This was previously part of a static helper class EntityNameHelper; here, I’ve tried to replicate that behaviour locally (as it seems to have been removed):

        private static string QUEUE_NAME = "dead-letter-demo";
        private static string DEAD_LETTER_PATH = "$deadletterqueue";
        
        static string FormatDeadLetterPath() =>
            $"{QUEUE_NAME}/{DEAD_LETTER_PATH}";
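As an alternative to building the path by hand, the current library lets you ask for the dead letter sub-queue through the receiver options. A minimal sketch, assuming the same "dead-letter-demo" queue and a placeholder connection string (it needs a real namespace to run against):

```csharp
using System;
using Azure.Messaging.ServiceBus;

// Placeholder: substitute your own connection string
string connectionString = "<your-connection-string>";
await using var client = new ServiceBusClient(connectionString);

// SubQueue.DeadLetter targets the dead letter queue of the named entity,
// so there is no need to format the path manually
ServiceBusReceiver deadLetterReceiver = client.CreateReceiver(
    "dead-letter-demo",
    new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter });

var message = await deadLetterReceiver.ReceiveMessageAsync();
Console.WriteLine($"Dead letter reason: {message?.DeadLetterReason}");
```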

Resubmitting a Dead Letter Message

This is something that I covered in my original post on this. It’s not inbuilt behaviour – but you basically copy the message and re-submit. In fact, this is much, much easier now:

            var serviceBusClient = new ServiceBusClient(connectionString);

            var deadLetterReceiver = serviceBusClient.CreateReceiver(FormatDeadLetterPath());
            var sender = serviceBusClient.CreateSender(QUEUE_NAME);

            var deadLetterMessage = await deadLetterReceiver.ReceiveMessageAsync();            

            using var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);

            var resubmitMessage = new ServiceBusMessage(deadLetterMessage);
            await sender.SendMessageAsync(resubmitMessage);
            //throw new Exception("aa"); // - to prove the transaction
            await deadLetterReceiver.CompleteMessageAsync(deadLetterMessage);

            scope.Complete();            

Most of what’s here has previously been covered; the old Message.Clone is now much neater (but slightly less obvious) in that you simply pass the old message in as a constructor parameter. Because the Dead Letter Reason, et al, are now properties, there’s no longer a need to manually deal with them not getting copied across.

The transaction simply ensures that either the dead letter message is correctly re-submitted, or it remains a dead letter.

Summary

The new library makes the code much more concise and discoverable. We’ve seen how to force a dead letter; how to receive and view the contents of the Dead Letter Queue, and finally, how to resubmit a dead lettered message.

Receiving a Message Using Azure.Messaging.ServiceBus

Azure.Messaging.ServiceBus is the latest SDK library that allows you to interface with Azure Service Bus.

In this post I wrote about receiving a message in Azure Service Bus using the Microsoft.Azure.ServiceBus library. Here, I’ll cover the method of receiving a message using Azure.Messaging.ServiceBus.

The first step is to create a ServiceBusClient instance:

_serviceBusClient = new ServiceBusClient(connectionString);

Once you’ve created this, the subsequent classes are created from there. This library draws a distinction between a message receiver and a message processor – the latter being event driven.

Receiving a Message

To receive a message:

            var messageReceiver = _serviceBusClient.CreateReceiver(QUEUE_NAME);            
            var message = await messageReceiver.ReceiveMessageAsync();

            //string messageBody = Encoding.UTF8.GetString(message.Body);
            string messageBody = message.Body.ToString();

It’s worth noting here that it is no longer necessary to decode the message body explicitly.

Processing a Message

This is the new version of registering a handler for the event, and it has a few additional features. Let’s see the code:

            var processor = _serviceBusClient.CreateProcessor(QUEUE_NAME);
            processor.ProcessMessageAsync += handleMessage;
            processor.ProcessErrorAsync += ExceptionHandler;

            await processor.StartProcessingAsync();                        

            await Task.Delay(2000);
            await processor.StopProcessingAsync();

We won’t worry too much about the handlers themselves for now, but the important methods are StartProcessingAsync and StopProcessingAsync. Note that here we have a two second delay, which means that we will receive messages for two seconds, and then stop; obviously the start and stop don’t need to be in the same method.
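For completeness, the two handlers registered above might look something like the following. This is only a sketch: the handler names and bodies are my own, the connection string and queue name are placeholders, and it needs a real namespace to run against.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Placeholders: substitute your own connection string and queue name
string connectionString = "<your-connection-string>";
string queueName = "<your-queue>";

await using var client = new ServiceBusClient(connectionString);
await using ServiceBusProcessor processor = client.CreateProcessor(queueName);

processor.ProcessMessageAsync += HandleMessage;
processor.ProcessErrorAsync += HandleError;

await processor.StartProcessingAsync();
await Task.Delay(2000);
await processor.StopProcessingAsync();

// Called once for each message that arrives while the processor is running
static Task HandleMessage(ProcessMessageEventArgs args)
{
    Console.WriteLine($"Received: {args.Message.Body}");
    // With the default options, the processor completes the message
    // when this handler returns without throwing
    return Task.CompletedTask;
}

// Called when receiving or handling a message fails
static Task HandleError(ProcessErrorEventArgs args)
{
    Console.WriteLine($"Error from {args.ErrorSource}: {args.Exception.Message}");
    return Task.CompletedTask;
}
```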

References

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues

Azure Service Bus SDK Libraries

I’ve written pretty extensively on the Microsoft.Azure.ServiceBus SDK. In this post, I’m just covering the fact that this library is on its way to deprecation (don’t panic, its predecessor has been hanging around since 2011!)

Let’s see what these libraries are and some links.

WindowsAzure.ServiceBus

This library does look like it’s on its way to being deprecated. It supports .NET Framework only.

The NuGet package is here, but it’s closed source:

https://www.nuget.org/packages/WindowsAzure.ServiceBus

Microsoft.Azure.ServiceBus

This library was introduced to support .NET Core.

The NuGet package is here:

https://www.nuget.org/packages/Microsoft.Azure.ServiceBus

The code for this is open source:

https://github.com/Azure/azure-service-bus-dotnet

Azure.Messaging.ServiceBus

If you read Sean Feldman’s article here (which this was heavily based on), you’ll see that this seems to be due to some restructuring of teams. The code has changed, and MS say it’s more consistent (although what with, I’m unsure).

The NuGet Package is here:

https://www.nuget.org/packages/Azure.Messaging.ServiceBus

The source code for this is here:

https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/servicebus

References

https://markheath.net/post/migrating-to-new-servicebus-sdk

https://weblogs.asp.net/sfeldman/the-future-of-asb-dotnet-sdk