Category Archives: Azure Service Bus

Service Bus Batching and Pre-Fetch

Azure Service Bus is often used as a way to link services (as the name suggests). This typically means that the throughput is large, but not so large that anyone worries about the speed. However, in this post, I’m going to cover some techniques to speed up service bus messaging.

A quick disclaimer here, though: it might be that if you’re reading this then you’re not using the best tool for the job, and perhaps something more lightweight might be a better fit.

Essentially, there are two inbuilt methods in Service Bus to do this: Batching, and Prefetch; let’s look at Batching first.

Batching

The principle behind batching is very simple: instead of making multiple calls to Azure Service Bus with multiple messages, you make one call with all the messages that you wish to send.

Let’s see some code that sends 1000 messages:

var stopwatch = new Stopwatch();
stopwatch.Start();

var queueClient = new QueueClient(connectionString, QUEUE_NAME);

for (int i = 0; i < 1000; i++)
{
    string messageBody = $"{DateTime.Now}: {messageText} ({Guid.NewGuid()})";
    var message = new Message(Encoding.UTF8.GetBytes(messageBody));

    await queueClient.SendAsync(message);
}
await queueClient.CloseAsync();

stopwatch.Stop();
Console.WriteLine($"Send messages took {stopwatch.ElapsedMilliseconds}");

The timings in this post are not a benchmark, just what my tests recorded at the time. Your mileage may vary a lot, although I would say they are a fair rough estimate. As you’ll see later on, the differences are very stark.

In my tests, sending 1000 messages took around 30 seconds (which is good enough for most scenarios, as they are unlikely to be sent in a batch such as this). Let’s see how we can batch the messages to speed things up:

var stopwatch = new Stopwatch();
stopwatch.Start();

var queueClient = new QueueClient(connectionString, QUEUE_NAME);
var messages = new List<Message>();

for (int i = 0; i < 1000; i++)
{
    string messageBody = $"{DateTime.Now}: {messageText} ({Guid.NewGuid()})";
    var message = new Message(Encoding.UTF8.GetBytes(messageBody));                

    messages.Add(message);
}
await queueClient.SendAsync(messages);
await queueClient.CloseAsync();

stopwatch.Stop();
Console.WriteLine($"Send messages took {stopwatch.ElapsedMilliseconds}");

As you can see, there’s not a huge amount of change to the code. It’s worth noting that in the first example, if the code crashes halfway through the send, half the messages would be sent, whereas with the batch, it’s all or nothing. In my tests, the batched send came in at under a second.
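One thing to watch with a single large send is that Service Bus limits how big a batch can be, so a very large list may be rejected. Below is a minimal sketch of splitting a list into size-bounded chunks before sending. BatchSplitter, the sizeOf callback and the byte budget are hypothetical names and values for illustration; none of them are part of the SDK.

```csharp
using System;
using System.Collections.Generic;

public static class BatchSplitter
{
    // Splits items into consecutive chunks whose total size stays within maxBytes.
    // An item that is larger than maxBytes on its own ends up in a chunk by itself.
    public static List<List<T>> Split<T>(IEnumerable<T> items, Func<T, long> sizeOf, long maxBytes)
    {
        var chunks = new List<List<T>>();
        var current = new List<T>();
        long currentBytes = 0;

        foreach (var item in items)
        {
            long size = sizeOf(item);

            // Start a new chunk if adding this item would blow the budget.
            if (current.Count > 0 && currentBytes + size > maxBytes)
            {
                chunks.Add(current);
                current = new List<T>();
                currentBytes = 0;
            }

            current.Add(item);
            currentBytes += size;
        }

        if (current.Count > 0) chunks.Add(current);
        return chunks;
    }
}
```

With the list built in the example above, each chunk returned by `BatchSplitter.Split(messages, m => (long)m.Body.Length, 200_000)` could be passed to `queueClient.SendAsync(chunk)` as its own all-or-nothing batch. The body length ignores header and property overhead, so the budget should sit comfortably below the real limit for your tier.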

On the receive side, we have a similar thing.

Batch Receive

Here, we can receive messages in chunks, instead of one at a time. Again, let’s see how we might receive 1000 messages (there are multiple ways to do this):

var stopwatch = new Stopwatch();
stopwatch.Start();

var messageReceiver = new MessageReceiver(connectionString, QUEUE_NAME);
for (int i = 0; i < 1000; i++)
{
    var message = await messageReceiver.ReceiveAsync();
    string messageBody = Encoding.UTF8.GetString(message.Body);
    Console.WriteLine($"Message received: {messageBody}");

    await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
}

stopwatch.Stop();
Console.WriteLine($"Receive messages took {stopwatch.ElapsedMilliseconds}");

The execution here takes around 127 seconds (over 2 minutes) in my tests.

The same is true for batch receipt as for batch send; with a slight caveat:

var stopwatch = new Stopwatch();
stopwatch.Start();
int count = 1000;
int remainingCount = count;
var messageReceiver = new MessageReceiver(connectionString, QUEUE_NAME);

while (remainingCount > 0)
{
    var messages = await messageReceiver.ReceiveAsync(remainingCount);
    if (messages == null) break; // nothing more arrived within the timeout

    foreach (var message in messages)
    {
        string messageBody = Encoding.UTF8.GetString(message.Body);
        Console.WriteLine($"Message received: {messageBody}");
        remainingCount--;
    }

    await messageReceiver.CompleteAsync(messages.Select(a => a.SystemProperties.LockToken));
}

stopwatch.Stop();
Console.WriteLine($"Receive messages took {stopwatch.ElapsedMilliseconds}");
Console.WriteLine($"Remaining count: {remainingCount}");

Note that the CompleteAsync can also be called in batch.

You may be wondering here what the while loop is all about. In fact, it’s because batch receive isn’t guaranteed to return the exact number of messages that you request. However, we still brought the receive time down to around 10 seconds.
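The shape of that loop can be pulled out into a small helper, which also makes it easy to try out without a real queue. This is only a sketch: BatchDrain and its delegate parameters are hypothetical, with receiveBatch standing in for a call such as messageReceiver.ReceiveAsync(n), and a null or empty batch treated as "nothing left to read".

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BatchDrain
{
    // Keeps asking for batches until 'target' items have been handled, or the
    // source returns null/empty. Each request asks only for what is still missing.
    public static async Task<int> DrainAsync<T>(
        Func<int, Task<IList<T>>> receiveBatch,
        Func<IList<T>, Task> handleBatch,
        int target)
    {
        int remaining = target;
        while (remaining > 0)
        {
            IList<T> batch = await receiveBatch(remaining);
            if (batch == null || batch.Count == 0) break;

            await handleBatch(batch);
            remaining -= batch.Count;
        }

        return target - remaining; // number of items actually handled
    }
}
```

The handleBatch callback is where the processing and the batched CompleteAsync call would go.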

A Note on Batching and Timeouts

It’s worth bearing in mind that when you retrieve a batch of messages, you’re doing just that: retrieving them. In a PeekLock scenario, they are now locked; and, if you don’t complete or abandon them, their locks will time out like any other message’s. The lock duration itself is a property of the queue, but if you have a large number of messages, you may also need to give the receive call longer to wait for them to arrive; for example:

var messages = await messageReceiver.ReceiveAsync(remainingCount, TimeSpan.FromSeconds(20));

In the next section, we’ll discuss the second technique, of allowing the service bus to “run ahead” and get messages before you actually request them.

Prefetch

Prefetch speeds up the retrieval of messages by getting Azure Service Bus to return messages ahead of them being needed. This presents a problem (similar to receiving in batch), which is that the system is actually retrieving messages on your behalf before you ask for them. In this example, we’ve been using PeekLock – that is, the message is left on the queue until we explicitly complete it. However, once you Peek the message, it’s locked. That means that with the code above, we can easily trip ourselves up.

int count = 1000;
var stopwatch = new Stopwatch();
stopwatch.Start();

var messageReceiver = new MessageReceiver(connectionString, QUEUE_NAME);
messageReceiver.PrefetchCount = prefetchCount;
for (int i = 0; i < count; i++)
{
    var message = await messageReceiver.ReceiveAsync(TimeSpan.FromSeconds(60));
    string messageBody = Encoding.UTF8.GetString(message.Body);
    Console.WriteLine($"Message received: {messageBody}");

    await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
}

stopwatch.Stop();
Console.WriteLine($"Receive messages took {stopwatch.ElapsedMilliseconds}");

Note the extended timeout on the Receive allows for the prefetched messages to complete.

In my tests, Prefetch was slightly quicker than processing the messages one at a time, but much slower than a batch. The main reason is that completing each message individually takes the bulk of the time.

Remember that with Prefetch, if you’re using PeekLock, once you’ve prefetched a message, the timeout on the lock starts. This means that if your lock is for 5 seconds, and you’ve prefetched 500 records, you need to be sure that you’ll get around to them in time.
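To put rough numbers on that: a 5 second lock spread over 500 prefetched messages leaves about 10 milliseconds for each one. The sketch below turns that arithmetic around and estimates the largest prefetch count that fits inside one lock window. PrefetchSizing is a hypothetical helper, and it assumes every message takes roughly the same time to process, so treat the result as a rule of thumb rather than a guarantee.

```csharp
using System;

public static class PrefetchSizing
{
    // Largest prefetch count for which every prefetched message, plus the one
    // currently being processed, can be handled before its lock expires,
    // assuming each message takes roughly perMessage to process.
    public static int MaxSafePrefetch(TimeSpan lockDuration, TimeSpan perMessage)
    {
        if (perMessage <= TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(perMessage));

        // How many messages can be processed back to back within one lock window.
        long fits = lockDuration.Ticks / perMessage.Ticks;

        // Leave room for the message in hand, and never go below zero.
        return (int)Math.Max(0, Math.Min(int.MaxValue, fits - 1));
    }
}
```

With a 5 second lock and around 100 milliseconds per message, `MaxSafePrefetch(TimeSpan.FromSeconds(5), TimeSpan.FromMilliseconds(100))` returns 49, which is a long way short of 500.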

ReceiveAndDelete

Whilst the Prefetch messages timing out may be bad, with ReceiveAndDelete it’s worse: they are taken off the queue as soon as they are prefetched, which means that you can consume the messages without ever actually seeing them!

Prefetch with Batch

Here, we can try to use the prefetch and batch combined:

int count = 1000;
var stopwatch = new Stopwatch();
stopwatch.Start();
int remainingCount = count;

var messageReceiver = new MessageReceiver(connectionString, QUEUE_NAME);
messageReceiver.PrefetchCount = prefetchCount;

while (remainingCount > 0)
{
    var messages = await messageReceiver.ReceiveAsync(remainingCount);
    if (messages == null) break;

    foreach (var message in messages)
    {
        string messageBody = Encoding.UTF8.GetString(message.Body);
        Console.WriteLine($"Message received: {messageBody}");
        remainingCount--;
    }

    await messageReceiver.CompleteAsync(messages.Select(a => a.SystemProperties.LockToken));
}

stopwatch.Stop();
Console.WriteLine($"Receive messages took {stopwatch.ElapsedMilliseconds}");
Console.WriteLine($"Remaining count: {remainingCount}");

In fact, in my tests, the timing for this was around the same as a batch receipt.

There may be some advantages with much higher numbers, but generally, combining the two in this manner doesn’t seem to provide much benefit.

References

https://www.planetgeek.ch/2020/04/27/azure-service-bus-net-sdk-deep-dive-sender-side-batching/

https://markheath.net/post/speed-up-azure-service-bus-with-batching

https://github.com/Azure/azure-service-bus-dotnet/issues/441

https://markheath.net/post/migrating-to-new-servicebus-sdk

https://weblogs.asp.net/sfeldman/understanding-Azure-service-bus-prefetch

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-performance-improvements?tabs=net-standard-sdk-2

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-prefetch

Azure Service Bus – Auto Delete on Idle

In the past few months, I’ve written and spoken quite a lot about Azure Service Bus, especially the new features that are now available out of the box. I’ve previously written about auto-forwarding, and in this post, I’m going to cover auto-delete; I think the two subjects go hand-in-hand.

The idea behind auto-delete is that if a set of messages is time sensitive, then the message queue can be flagged for auto-deletion. This is not the same as “Time To Live” for a message, because the entire queue is deleted after a period of inactivity!

Let’s see how we can set up a new queue, in code (see the referenced post above for the NuGet packages that you’ll need for this):

        private static async Task CreateAutoDeleteQueue(string connectionString, string queueName)
        {
            // Create authorisation rules
            var authorisationRule = new SharedAccessAuthorizationRule(
                "manage", new[] { AccessRights.Manage, AccessRights.Listen, AccessRights.Send });

            var serviceBusAdministrationClient = new ServiceBusAdministrationClient(connectionString);
          
            var options = new CreateQueueOptions(queueName)
            {
                AutoDeleteOnIdle = TimeSpan.FromMinutes(5)
            };
            options.AuthorizationRules.Add(authorisationRule);

            var queue = await serviceBusAdministrationClient.CreateQueueAsync(options);

        }

We can now post a number of messages to that queue.

Then, 5 minutes after the last message has been posted (or any other activity has taken place), the queue is deleted.

References

https://www.serverless360.com/blog/hidden-gems-azure-service-bus

Service Bus Management And Auto Forwarding

In preparation for a talk I’m giving, I started looking into ways that you can create Azure Service Bus queues programmatically (if you want to see some of the other posts that came out of the research for this talk, they’re here and here).

In fact, there are a few different ways, and it’s very difficult to cut through the confusion to work out which method is the new way. In this article, Microsoft recommend the method that I cover in this post.

In this post, I’ll cover creating a new message queue in code, and configuring it to auto-forward to a second queue.

The Old Way

If you go back to some of my earlier posts on Azure Service Bus, you’ll see that an object called the QueueDescription was used extensively (here and here). In the newer version of the SDK that’s no longer used, and management has been separated from the usage of the service bus. This seems to make sense, when you consider that most people using the Service Bus will not need to change the structure of it; however, there were times when having the ability to create a queue that didn’t exist would make for more resilient software.

The New Way(s)

Microsoft have replaced this with, at the time of writing, three separate alternatives. This has made the whole process very confusing – especially since some of the Microsoft documentation still talks about using QueueDescription! They, however, recommend using the Azure.Messaging.ServiceBus package.

At the time of writing, that was 7.0.1:

<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.0.1" />

Creating a Queue

One of the advantages here is that you get very granular control over exactly what you’re creating; but that also means that you’re responsible for creating things such as access policies. Let’s try the following code:

var authorisationRule = new SharedAccessAuthorizationRule(
    "manage", new[] { AccessRights.Manage });

var options = new CreateQueueOptions(source);
options.AuthorizationRules.Add(authorisationRule);
            
var serviceBusAdministrationClient = new ServiceBusAdministrationClient(connectionString);
var queue = await serviceBusAdministrationClient.CreateQueueAsync(options);

In fact, this causes the following error on run:

System.ArgumentException: ‘Manage permission should also include Send and Listen’

We’re now in a world of open source, which means we can see why this doesn’t work by looking at the code. In fact, if you specify Manage privilege, it expects three permissions (that is, you must also allow Listen and Send). The working code looks like this:

var authorisationRule = new SharedAccessAuthorizationRule(
                "manage", new[] { AccessRights.Manage, AccessRights.Listen, AccessRights.Send });

var options = new CreateQueueOptions(source);
options.AuthorizationRules.Add(authorisationRule);
            
var serviceBusAdministrationClient = new ServiceBusAdministrationClient(connectionString);
var queue = await serviceBusAdministrationClient.CreateQueueAsync(options);

In fact, the reason that I started to look into queue management, was that I wanted to see if I could configure a queue with auto forwarding, without using the Azure Portal or the Service Bus Explorer.

Auto Forwarding

In fact, it’s actually quite easy. Once you start playing with the SDK, you’ll see that you can use CreateQueueOptions to specify most queue features:

var authorisationRule = new SharedAccessAuthorizationRule(
                "manage", new[] { AccessRights.Manage, AccessRights.Listen, AccessRights.Send });

var serviceBusAdministrationClient = new ServiceBusAdministrationClient(connectionString);

var optionsDest = new CreateQueueOptions(destination);
optionsDest.AuthorizationRules.Add(authorisationRule);
var queueDest = await serviceBusAdministrationClient.CreateQueueAsync(optionsDest);

var options = new CreateQueueOptions(source)
{
    ForwardTo = destination                
};
options.AuthorizationRules.Add(authorisationRule);
            
var queue = await serviceBusAdministrationClient.CreateQueueAsync(options);

Here, we’re setting up two queues, referred to in the string variables source and destination. The destination queue is created first (you can only forward to a queue that exists), then we simply set the ForwardTo property on the source queue.

References

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-management-libraries

https://www.serverless360.com/blog/auto-forwarding-a-hidden-gem-of-service-bus

Deferred Messages in Azure Service Bus

In Azure Service Bus, you can schedule a message to deliver at a later time, but you can also defer a message until a later time.

Scheduled Versus Deferred Messages

The difference here is subtle, but important: when you schedule a message, you’re telling the Service Bus to deliver that message at a time of your choosing; when you defer a message, you’re telling the Service Bus to hang onto a message that has been sent, until such time as you’re ready to receive it.

Why Would you Defer a Message?

The idea here is that you are not ready for the message – but you don’t want to hold up the queue. In this respect, it’s a little like the dead letter concept; that is, there is a message that’s essentially holding up the queue – however, in this case, there’s nothing wrong with the message itself.

Let’s imagine that we receive a message that a sales order has been created – we go to get the customer information for the sales order, and we find that the customer has yet to be created (such things are possible when you start engaging in eventually consistent systems): in this case, you could defer the message, and come back to it when the customer has been created.

Some Code – How to Defer a Message

Deferring a message is actually very simple:

var messageReceiver = new MessageReceiver(connectionString, QUEUE_NAME, ReceiveMode.PeekLock);
var message = await messageReceiver.ReceiveAsync();

var sequenceNumber = message.SystemProperties.SequenceNumber;
await messageReceiver.DeferAsync(message.SystemProperties.LockToken);

There are three important concepts here:
1. The sequence number is very important: without it, the message is effectively lost; that’s because of (2)
2. You can receive a message after this, and you will never see the deferred message again until you purposely receive it, which brings us to (3)
3. To retrieve this message, you must explicitly ask for it.

To receive the deferred message you simply pass in the sequence number:

var messageReceiver = new MessageReceiver(connectionString, QUEUE_NAME, ReceiveMode.PeekLock);            
var message = await messageReceiver.ReceiveDeferredMessageAsync(sequenceNumber);

await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);

The deferred message will never time out. Messages have a “Time to Live”, after which they are removed from the queue (or moved to the Dead Letter Queue, if dead-lettering on expiry is enabled); but once a message is deferred, it will live forever, and must be received to remove it.
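Because the sequence number is the only way back to a deferred message, it needs to be kept somewhere. In the sales order scenario above, one option is to remember sequence numbers against the thing you’re waiting for (the customer ID, say). The class below is a minimal in-memory sketch of that idea; DeferredMessageIndex is a hypothetical name, and a real system would most likely keep this in durable storage so that it survives a restart.

```csharp
using System.Collections.Generic;

// Remembers which deferred messages are waiting on which key (for example,
// a customer ID), so they can be fetched by sequence number later.
public class DeferredMessageIndex
{
    private readonly Dictionary<string, List<long>> _waiting =
        new Dictionary<string, List<long>>();

    // Call after DeferAsync succeeds, passing message.SystemProperties.SequenceNumber.
    public void Remember(string key, long sequenceNumber)
    {
        if (!_waiting.TryGetValue(key, out var list))
        {
            list = new List<long>();
            _waiting[key] = list;
        }

        list.Add(sequenceNumber);
    }

    // Returns (and forgets) the sequence numbers waiting on this key.
    public IReadOnlyList<long> Take(string key)
    {
        if (!_waiting.TryGetValue(key, out var list)) return new List<long>();

        _waiting.Remove(key);
        return list;
    }
}
```

When the customer finally turns up, each number returned by Take can be passed to ReceiveDeferredMessageAsync, as in the code above.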

References

https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-deferral

Read the Dead Letter Queue

I’ve been writing about and speaking about Azure Service Bus a lot recently.

In this post, I’m going to focus on the Dead Letter Queue in more detail.

What is the Dead Letter Queue, and what has it ever done for me?

To describe what the dead letter queue does, I invite you to think about an assembly line for a car. The car in question has just come through to have a bonnet fitted (hood for any American readers). However, the guy that’s fitting the bonnet can’t get it to sit right in the hinges; he tries and tries, but it won’t fit. After a while, he goes to get his supervisor, and they both try. They draft the workers in from all over the plant, but can’t get the bonnet fitted.

Meanwhile, the entire assembly line has stopped. The person that fits the steering wheel is behind the bonnet fitter, and there’s no space for him to move the car that he’s just fitted the wheel to; the dashboard fitter can’t pass onto the steering wheel, and so on.

(I have no knowledge of what a car assembly line looks like, outside of the film Christine, so apologies if this is incorrect).

A message that can’t be processed is often called a poison message, and it causes exactly this problem. The Service Bus can’t deliver any messages until this message has gone, and this message can’t go, because there’s something wrong with it. The solution is to have a dedicated queue that holds these messages: it’s called a Dead Letter Queue – it’s kind of like a holding bay for the car.

Why would a message be “poison”

There are a few reasons that a message can be considered “poison” and dead lettered; some of the most common are:

– The message has exceeded the queue’s maximum delivery count; that is, we’ve tried too many times to process it
– The message can be explicitly marked as bad by the client
– The size of the message is bigger than the allocated maximum size
– The message has been “auto-forwarded” too many times

Essentially, the system tries to work out whether this message is staying around too long and causing issues with the system. It’s important to know, though, that the dead letter queue is just another queue. The message isn’t lost – just side-lined.

Dead Lettering

Let’s see how we can force a message into a dead letter queue. The easiest way to do this is to explicitly just Dead Letter the message; for example:

            var messageReceiver = new MessageReceiver(connectionString, QUEUE_NAME);
            var message = await messageReceiver.ReceiveAsync();

            await messageReceiver.DeadLetterAsync(message.SystemProperties.LockToken, "Really bad message");

Here, we’ve read the message, and then told Service Bus to just Dead Letter it. In real life, you may choose to do this on rare occasions, but I imagine its main use is for testing.

Abandon the Message

Another way to cause a message to be dead lettered is to exceed the Max Delivery Count. You can do this by “abandoning” the message multiple times; for example:

var messageReceiver = new MessageReceiver(connectionString, QUEUE_NAME);
var message = await messageReceiver.ReceiveAsync();

string messageBody = Encoding.UTF8.GetString(message.Body);

Console.WriteLine($"Message {message.MessageId} ({messageBody}) had a delivery count of {message.SystemProperties.DeliveryCount}");
await messageReceiver.AbandonAsync(message.SystemProperties.LockToken);

Here, we’re reading the message, and rather than completing it, we’re abandoning it. Each time that happens, the delivery count goes up, and once the Max Delivery Count is exceeded, the message is dead lettered. The same thing happens when you read a message and just implicitly abandon it (i.e., you read it on a PeekLock and then do nothing): the AbandonAsync method doesn’t actually change the functionality of the code above, although it does change the speed, because you’re not waiting for the lock to expire.

Reading The Dead Letter Queue

Now that we’ve dead-lettered a message, we can read the Dead Letter Queue.

            var deadletterPath = EntityNameHelper.FormatDeadLetterPath(QUEUE_NAME);
            var deadLetterReceiver = new MessageReceiver(connectionString, deadletterPath, ReceiveMode.PeekLock);
            
            var message = await deadLetterReceiver.ReceiveAsync();

            string messageBody = Encoding.UTF8.GetString(message.Body);

            Console.WriteLine("Message received: {0}", messageBody);
            if (message.UserProperties.ContainsKey("DeadLetterReason"))
            {
                Console.WriteLine("Reason: {0} ", message.UserProperties["DeadLetterReason"]);
            }
            if (message.UserProperties.ContainsKey("DeadLetterErrorDescription"))
            {
                Console.WriteLine("Description: {0} ", message.UserProperties["DeadLetterErrorDescription"]);
            }

The code above sets up a MessageReceiver for the dead letter queue. The delivery count inside the dead letter queue does not increase, but it does retain the number that it had from the original queue. Effectively, all you can do with a Dead Letter message is to complete it.

DeadLetterReason

When a message is dead lettered, the properties DeadLetterReason and DeadLetterErrorDescription may get added to the message. If you forcibly dead letter the message then you have the option to add this: if you choose not to then it will not be present (hence the checks around the properties), but mostly, these will be available.
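Since either property may be missing, the checks can also be written with TryGetValue and a fallback, which avoids looking each key up twice. This is just a sketch: DeadLetterInfo is a hypothetical helper, and it takes the same IDictionary<string, object> shape that message.UserProperties exposes.

```csharp
using System.Collections.Generic;

public static class DeadLetterInfo
{
    // Builds a one-line summary from a message's user properties, falling back
    // to a placeholder when a property was never set.
    public static string Describe(IDictionary<string, object> userProperties)
    {
        string reason = Read(userProperties, "DeadLetterReason");
        string description = Read(userProperties, "DeadLetterErrorDescription");
        return $"Reason: {reason}; Description: {description}";
    }

    private static string Read(IDictionary<string, object> properties, string key)
    {
        return properties.TryGetValue(key, out var value) && value != null
            ? value.ToString()
            : "(not set)";
    }
}
```

Calling `DeadLetterInfo.Describe(message.UserProperties)` on a message from the dead letter receiver would then give a single line suitable for logging.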

Re-submitting a Message and Transactions

We’ve now seen how to cause a message to Dead Letter, and read the Dead Letter queue; next we’re going to investigate re-submitting the message.

As a quick side note: you can’t really re-submit a message. As you’ll see, what we actually do is to complete the dead letter message, and send a copy back to the queue.

            var serviceBusConnection = new ServiceBusConnection(connectionString);

            var deadletterPath = EntityNameHelper.FormatDeadLetterPath(QUEUE_NAME);
            var deadLetterReceiver = new MessageReceiver(serviceBusConnection, deadletterPath, ReceiveMode.PeekLock);
            
            var queueClient = new QueueClient(serviceBusConnection, QUEUE_NAME, ReceiveMode.PeekLock, RetryPolicy.Default);

            var deadLetterMessage = await deadLetterReceiver.ReceiveAsync();

            using var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);

            var resubmitMessage = deadLetterMessage.Clone();

            resubmitMessage.UserProperties.Remove("DeadLetterReason");
            resubmitMessage.UserProperties.Remove("DeadLetterErrorDescription");
            
            await queueClient.SendAsync(resubmitMessage);
            await deadLetterReceiver.CompleteAsync(deadLetterMessage.SystemProperties.LockToken);            

            scope.Complete();            

There are a few points to note in the above code:

FormatDeadLetterPath

FormatDeadLetterPath gives you the entity path for the dead letter queue, based on an entity.

Transaction Scope

The scope ensures that everything between its creation and completion happens as a single transaction. That is, if part of that fails, the whole thing fails. For example, you could add a throw new exception between the send and the complete, and the new message will not send.

We’re using the C# 8 using declaration here; that is, the scope applies to everything between the declaration and the end of the enclosing block (in this case, the method).

ServiceBusConnection

There are several overloads for most of these methods, and typically, you can pass a connection string into the constructor – for example, MessageReceiver could be called like this:

new MessageReceiver(connectionString, QUEUE_NAME);

Typically, you can use this and it works exactly the same as if you established your own connection and passed that through; however, with a transaction, everything needs to share a connection. If they do not, then you may see an error such as this:

Transaction hasn’t been declared yet, or has already been discharged

Hence we’re creating the connection upfront.

References

https://blogs.infosupport.com/implementing-a-retry-pattern-for-azure-service-bus-with-topic-filters/

https://stackoverflow.com/questions/38784331/how-to-peek-the-deadletter-messages

https://github.com/Azure/azure-service-bus/pull/91

Azure Service Bus – Scheduled Message Delivery

Azure Service Bus sets itself apart from other message brokers by the dizzying array of additional and useful features that it provides out of the box. This particular one is really useful for things like scheduled e-mails. Let’s say, for example, that you’re an event organiser, and you want to notify people a few days before the event. This feature enables you to tell Service Bus to simply send a message at that time (you could have a simple Azure function that then picked up the message and sent an e-mail).

If you’re new to Service Bus, or using it with .Net, then start here.

NuGet

The basic NuGet package you’ll need is here:

Microsoft.Azure.ServiceBus

Reading the Service Bus Message

For the purpose of this post, we’ll just set up a basic console application that sends and receives the message; let’s start with the read:

private static Task ReadMessageEvent(string connectionString)
{
    var queueClient = new QueueClient(connectionString, QUEUE_NAME);

    var messageHandlerOptions = new MessageHandlerOptions(ExceptionHandler);
    queueClient.RegisterMessageHandler(handleMessage, messageHandlerOptions);

    return Task.CompletedTask;
}

private static Task ExceptionHandler(ExceptionReceivedEventArgs arg)
{
    Console.WriteLine("Something bad happened!");
    return Task.CompletedTask;
}

private static Task handleMessage(Message message, CancellationToken cancellation)
{
    string messageBody = Encoding.UTF8.GetString(message.Body);
    Console.WriteLine("Message received: {0}", messageBody);

    return Task.CompletedTask;
}

There’s not much to say here – this event will simply print a message to the console when it’s received.

Schedule the Service Bus Message

Now that we’ve set up a method to receive the messages, let’s send one. You could add this to the same console app (obviously it would have to occur after the Read!)

var queueClient = new QueueClient(connectionString, QUEUE_NAME);

string messageBody = $"{DateTime.Now}: Happy New Year! ({Guid.NewGuid()}) You won't get this until {dateTime}";
var message = new Message(Encoding.UTF8.GetBytes(messageBody));

long sequenceNumber = await queueClient.ScheduleMessageAsync(message, dateTime);
//await queueClient.CancelScheduledMessageAsync(sequenceNumber);

await queueClient.CloseAsync();

dateTime is simply the time that you wish to send the message; for example:

var dateTime = DateTime.UtcNow.AddSeconds(10);

This will send the message in 10 seconds.

The commented line above will then cancel the message from being sent – you only need to provide the sequence number (which you get from setting up the schedule in the first place).
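For the event organiser example at the start of this post, the only real logic is working out when the reminder should go out. Here is a small sketch of that calculation, assuming that a reminder whose time has already passed should simply go out straight away. ReminderSchedule is a hypothetical helper, not something from the SDK.

```csharp
using System;

public static class ReminderSchedule
{
    // Time at which to enqueue a reminder: leadTime before the event, but
    // never earlier than 'now' (a late reminder goes out immediately).
    public static DateTimeOffset ReminderTime(
        DateTimeOffset eventTime, TimeSpan leadTime, DateTimeOffset now)
    {
        DateTimeOffset target = eventTime - leadTime;
        return target < now ? now : target;
    }
}
```

The result can be passed as the second argument to ScheduleMessageAsync in place of dateTime.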

References and A GitHub Example

For a working sample of this, please see here.

https://stackoverflow.com/questions/60437666/how-to-defer-a-azure-service-bus-message

Receiving Messages in Azure Service Bus

In a previous post, I covered the basics of setting up a queue and sending a message to it. Here, I’m going to cover the options around receiving that message.

Essentially, there are two possibilities here: you can either set up an event listener, or you can poll the queue directly, and receive the messages one at a time.

Option 1 – Events

The events option seems to be the one that Microsoft now prefer – essentially, you register a handler and then as the messages come in, you simply handle them inside an event. The code here looks something like this:

            var queueClient = new QueueClient(connectionString, "test-queue");

            var messageHandlerOptions = new MessageHandlerOptions(ExceptionHandler);
            queueClient.RegisterMessageHandler(handleMessage, messageHandlerOptions);

The event handlers:

        private static Task ExceptionHandler(ExceptionReceivedEventArgs arg)
        {
            Console.WriteLine("Something bad happened!");
            return Task.CompletedTask;
        }

        private static Task handleMessage(Message message, CancellationToken cancellation)
        {
            string messageBody = Encoding.UTF8.GetString(message.Body);
            Console.WriteLine("Message received: {0}", messageBody);

            return Task.CompletedTask;
        }

Option 2 – Polling

With this option, you simply ask for a message. You’ll need to use this approach for things like deferred messages (which I hope to cover in a future post):

            var messageReceiver = new MessageReceiver(connectionString, "test-queue", ReceiveMode.ReceiveAndDelete);            
            var message = await messageReceiver.ReceiveAsync();

            string messageBody = Encoding.UTF8.GetString(message.Body);            
            Console.WriteLine("Message received: {0}", messageBody);

Option 3 – Option 1, but cruelly force it into option 2

I thought I’d include this, although I would strongly advise against using it in most cases. If you wish, you can register an event, but force the event into a procedural call, so that you can await it finishing. You can do this by using the TaskCompletionSource. First, declare a TaskCompletionSource in your code (somewhere accessible):

private static TaskCompletionSource<bool> _taskCompletionSource;

Then, in handleMessage (see above), when you’ve received the message you want, set the result:

            if (message.CorrelationId == correlationId)
            {
                await client.CompleteAsync(message.SystemProperties.LockToken);

                _taskCompletionSource.SetResult(true);
            }

Finally, after you’ve registered the message handler, just await this task:

queueClient.RegisterMessageHandler(
                (message, cancellationToken) => handleMessage(correlationId, queueClient, message, cancellationToken), 
                messageHandlerOptions);

await _taskCompletionSource.Task;
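The same pattern can be wrapped up so that the TaskCompletionSource isn’t a static field shared by everything. The class below is a sketch of that, with two small hedges built in: it uses TrySetResult, because SetResult throws if a second matching message arrives after the task has completed, and it asks for continuations to run asynchronously so that the awaiting code doesn’t run inside the handler. FirstMatchWaiter is a hypothetical name for illustration.

```csharp
using System;
using System.Threading.Tasks;

public class FirstMatchWaiter<T>
{
    // RunContinuationsAsynchronously stops the awaiting code from running
    // inline on the handler's thread when the result is set.
    private readonly TaskCompletionSource<T> _tcs =
        new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);

    private readonly Func<T, bool> _isMatch;

    public FirstMatchWaiter(Func<T, bool> isMatch) => _isMatch = isMatch;

    // Await this after registering the handler.
    public Task<T> Completion => _tcs.Task;

    // Call from the message handler for every item that arrives.
    // Only the first match completes the task; later matches are ignored.
    public void OnItem(T item)
    {
        if (_isMatch(item)) _tcs.TrySetResult(item);
    }
}
```

In the handler above, the check on CorrelationId would become the isMatch predicate, and the handler would call OnItem(message) after completing the message.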

References

Advanced Features with Azure Service Bus

Sending a Service Bus Message Failed

While playing around with Azure Service Bus in .Net Core, I came across this error, and it had me stumped for a while:

Microsoft.Azure.ServiceBus.UnauthorizedException: ‘Put token failed. status-code: 401, status-description: InvalidSignature: The token has an invalid signature..’

Having had a look on the internet, everyone seemed to be of the opinion that such errors were caused by an invalid connection string. I checked my connection string a number of times and it seemed correct.

However, the sentiment is, in fact, the correct one. The actual cause of this was that I was using the connection string from a specific queue, while trying to connect to a topic. I’d copied the queue connection code, and for some reason, the connection string being different didn’t twig with me. Connection strings work in a hierarchical fashion. For example:

You can use the connection string from a namespace for all the queues and topics within that namespace, but you can use the connection string for a queue only for that queue.
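
One way to catch this mistake before it reaches the service is to look at the connection string itself. The following is a rough sketch that assumes an entity-level connection string carries an EntityPath=... segment (as the ones copied from the portal for a specific queue typically do), while a namespace-level one does not. ConnectionStringScope, GetEntityPath and CanBeUsedFor are names made up for this example, and the endpoint and key are placeholders. It is only a local sanity check; the service still makes the real authorisation decision.

```csharp
using System;
using System.Linq;

public static class ConnectionStringScope
{
    // Returns the EntityPath value, or null if the string has none
    // (assumed here to mean a namespace-level connection string).
    public static string GetEntityPath(string connectionString)
    {
        return connectionString
            .Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries)
            // Split on the first '=' only, because key values can contain '='.
            .Select(part => part.Split(new[] { '=' }, 2))
            .Where(kv => kv.Length == 2 &&
                         kv[0].Trim().Equals("EntityPath", StringComparison.OrdinalIgnoreCase))
            .Select(kv => kv[1].Trim())
            .FirstOrDefault();
    }

    // A namespace-level string is usable for any entity;
    // an entity-level string only for the entity it names.
    public static bool CanBeUsedFor(string connectionString, string entityName)
    {
        string entityPath = GetEntityPath(connectionString);
        return entityPath == null ||
               entityPath.Equals(entityName, StringComparison.OrdinalIgnoreCase);
    }

    public static void Main()
    {
        // Placeholder values, not a real namespace or key.
        string queueScoped =
            "Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=send;" +
            "SharedAccessKey=placeholder;EntityPath=test-queue";

        Console.WriteLine(CanBeUsedFor(queueScoped, "test-queue")); // True
        Console.WriteLine(CanBeUsedFor(queueScoped, "my-topic"));   // False
    }
}
```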

What can you do with a logic app? Part three – Creating a Logic App Client

One of the things missing from Azure Logic Apps is the ability to integrate human interaction. Microsoft do have their own version of an interactive workflow (PowerApps), which is (obviously) far better than what you can produce by following this post.

In this post, we’ll create a very basic client for a logic app. Obviously, with some thought, this could easily be extended to allow a fully functional, interactive, workflow system.

Basic Logic App

Let’s start by designing our logic app. The app in question is going to be a very simple one. It will add a message to a logging queue (just so it has something to do), then ask the user a question by putting a message onto a topic: left or right. Based on the user’s response, we’ll write a message to the queue saying either left or right. Let’s have a look at our Logic App design:

It’s worth pointing out a few things about this design:
1. The condition uses the expression base64ToString() to convert the encoded message into plain text.
2. Where the workflow picks up, it uses a peek-lock, and then completes the message at the end. It looks like it’s a ‘feature’ of logic apps that an automatic complete on this trigger will not actually complete the message (plus, this is actually a better design).
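
For anyone more at home in C# than in the Logic Apps expression language, the decoding in point 1 is roughly equivalent to the following. This is only an illustration of the idea, assuming the message body is UTF-8 text; Base64Demo and its method are names invented for the example.

```csharp
using System;
using System.Text;

public static class Base64Demo
{
    // Rough C# equivalent of the Logic Apps base64ToString() expression,
    // assuming the encoded content is UTF-8 text.
    public static string Base64ToString(string encoded)
    {
        return Encoding.UTF8.GetString(Convert.FromBase64String(encoded));
    }

    public static void Main()
    {
        // "bGVmdA==" is the base64 encoding of the text "left".
        Console.WriteLine(Base64ToString("bGVmdA==")); // left
    }
}
```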

Queues and Topics

The “Log to message queue” action above is putting an entry into a queue; so a quick note about why we’re using a queue for logging, and a topic for the interaction with the user. In a real life version of this system, we might have many users, but they might all want to perform the same action. Let’s say that they all are part of a sales process, and the actions are actually actions along that process; adding these to a queue maintains their sequence. Here’s the queue and topic layout that I’m using for this post:

Multiple Triggers

As you can see, we actually have two triggers in this workflow. The first starts the workflow (so we’ll drop a message into the topic to start it), and the second waits for a second message to go into the topic.

To add a trigger part way through the workflow, simply add an action, search and select “Triggers”:

Because we have a trigger part way through the workflow, what we have effectively issued here is an await statement. Once a message appears in the subscription, the workflow will continue where it left off:

As soon as a message is posted, the workflow carries on:

Client Application

For the client application, we could simply use the Service Bus Explorer (in fact, the screenshots above were taken from using this to simulate messages in the topic). However, the point of this post is to create a client, and so we will… although we’ll just create a basic console app for now.

We need the client to do two things: read from a topic subscription, and write to a topic. I haven’t exactly been here before, but I will be heavily plagiarising from here, here, and here.

Let’s create a console application:

Once that’s done, we’ll need the service bus client library: Install it from here.

The code is generally quite straight-forward, and looks a lot like the code to read and write to queues. The big difference is that you don’t read from a topic, but from a subscription to a topic (a topic can have many subscriptions):

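using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
 
class Program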
{
    
    static async Task Main(string[] args)
    {
        MessageHandler messageHandler = new MessageHandler();
        messageHandler.RegisterToRead("secondstage", "sub1");
 
        await WaitForever();
    }
 
    private static async Task WaitForever()
    {
        while (true) await Task.Delay(5000);
    }
}
public class MessageHandler
{
    private string _connectionString = "service bus connection string details";
    private ISubscriptionClient _subscriptionClient;
    public void RegisterToRead(string topicName, string subscriptionName)
    {            
        _subscriptionClient = new SubscriptionClient(_connectionString, topicName, subscriptionName);
 
        MessageHandlerOptions messageHandlerOptions = new MessageHandlerOptions(ExceptionReceived)
        {
            AutoComplete = false,
            MaxAutoRenewDuration = new TimeSpan(1, 0, 0)
        };
 
        _subscriptionClient.RegisterMessageHandler(ProcessMessage, messageHandlerOptions);
 
    }
 
    private async Task ProcessMessage(Message message, CancellationToken cancellationToken)
    {
        string messageText = Encoding.UTF8.GetString(message.Body);
 
        Console.WriteLine(messageText);
        string leftOrRight = Console.ReadLine();
 
        await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
 
        await SendResponse(leftOrRight, "userinput");
    }
 
    private async Task SendResponse(string leftOrRight, string topicName)
    {
        TopicClient topicClient = new TopicClient(_connectionString, topicName);
        Message message = new Message(Encoding.UTF8.GetBytes(leftOrRight));
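        await topicClient.SendAsync(message);
        // Close the client once the response has been sent, so the connection is released
        await topicClient.CloseAsync();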
    }
 
    private Task ExceptionReceived(ExceptionReceivedEventArgs arg)
    {
        Console.WriteLine(arg.Exception.ToString());
        return Task.CompletedTask;
    }
}

If we run it, then when the logic app reaches the second trigger, we’ll get a message from the subscription and ask directions:

Based on the response, the logic app will execute either the right or left branch of code.

Summary

Having worked with workflow systems in the past, one recurring feature of them is that they start to get used for things that don’t fit into a workflow, resulting in a needlessly over-complex system. I imagine that Logic Apps are no exception to this rule, and in 10 years’ time, people will roll their eyes at how Logic Apps have been used where a simple web service would have done the whole job.

The saving grace here is source control. The workflow inside a Logic App is simply a JSON file, and so it can be source controlled, added to a CI pipeline, and all the good things that you might expect. Whether or not a more refined version of what I have described here makes any sense is another question.

There are many downsides to this approach: firstly, you are fighting against the Service Bus by asking it to wait for input (that part is a very fixable problem with a bit of an adjustment to the messages); secondly, you would presumably need some form of timeout (again, a fixable problem that will probably feature in a future post). The biggest issue here is that you are likely introducing complex conditional logic with no way to unit test; this isn’t, per se, fixable; however, you can introduce some canary logic (again, this will probably be the feature of a future post).

References

https://docs.microsoft.com/en-us/azure/logic-apps/logic-apps-limits-and-config

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-how-to-use-topics-subscriptions

https://stackoverflow.com/questions/28127001/the-lock-supplied-is-invalid-either-the-lock-expired-or-the-message-has-alread