Service Bus Batching and Pre-Fetch

Azure Service Bus is often used as a way to link services (as the name suggests). This typically means that the throughput is large, but not so large that anyone worries about the speed. However, in this post, I’m going to cover some techniques to speed up service bus messaging.

A quick disclaimer here, though: it might be that if you’re reading this then you’re not using the best tool for the job, and perhaps something more lightweight might be a better fit.

Essentially, there are two inbuilt methods in Service Bus to do this: Batching, and Prefetch; let’s look at Batching first.

Batching

The principle behind batching is very simple: instead of making multiple calls to Azure Service Bus with multiple messages, you make one call with all the messages that you wish to send.

Let’s see some code that sends 1000 messages:

var stopwatch = new Stopwatch();
stopwatch.Start();

var queueClient = new QueueClient(connectionString, QUEUE_NAME);

for (int i = 0; i < 1000; i++)
{
    string messageBody = $"{DateTime.Now}: {messageText} ({Guid.NewGuid()})";
    var message = new Message(Encoding.UTF8.GetBytes(messageBody));

    await queueClient.SendAsync(message);
}
await queueClient.CloseAsync();

stopwatch.Stop();
Console.WriteLine($"Send messages took {stopwatch.ElapsedMilliseconds}");

The timings in this post are not a benchmark, just whatever my tests recorded at the time. Your mileage may vary a lot, although I would say they are a fair rough estimate. As you’ll see later on, the differences are very stark.

In my tests, sending 1000 messages took around 30 seconds (which is good enough for most scenarios, as they are unlikely to be sent in a batch such as this). Let’s see how we can batch the messages to speed things up:

var stopwatch = new Stopwatch();
stopwatch.Start();

var queueClient = new QueueClient(connectionString, QUEUE_NAME);
var messages = new List<Message>();

for (int i = 0; i < 1000; i++)
{
    string messageBody = $"{DateTime.Now}: {messageText} ({Guid.NewGuid()})";
    var message = new Message(Encoding.UTF8.GetBytes(messageBody));                

    messages.Add(message);
}
await queueClient.SendAsync(messages);
await queueClient.CloseAsync();

stopwatch.Stop();
Console.WriteLine($"Send messages took {stopwatch.ElapsedMilliseconds}");

As you can see, there’s not a huge amount of change to the code. It’s worth noting that in the first example, if the code crashes halfway through the send, half the messages will already have been sent, whereas with the batch it’s all or nothing. The batch version came in at under a second.
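One thing to watch with a single large batch is its size: as far as I’m aware, the whole batch has to fit within the maximum message size for your tier, so a big enough list will be rejected. The following is a minimal sketch of splitting message bodies into chunks under a byte limit, so that each chunk can be sent in its own call. ChunkBySize and the 256 KB figure are my assumptions for illustration, and the sketch only counts body bytes (not header overhead), so check the real limit for your namespace.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Assumed limit for illustration only; check the real limit for your tier.
const int MaxBatchBytes = 256 * 1024;

// Hypothetical helper: splits message bodies into chunks whose combined
// body size stays within maxBytes. Header overhead is not counted.
static List<List<byte[]>> ChunkBySize(IEnumerable<byte[]> bodies, int maxBytes)
{
    var chunks = new List<List<byte[]>>();
    var current = new List<byte[]>();
    int currentBytes = 0;

    foreach (var body in bodies)
    {
        if (body.Length > maxBytes)
            throw new ArgumentException("A single message body exceeds the limit.");

        // Start a new chunk if adding this body would exceed the limit.
        if (currentBytes + body.Length > maxBytes && current.Count > 0)
        {
            chunks.Add(current);
            current = new List<byte[]>();
            currentBytes = 0;
        }

        current.Add(body);
        currentBytes += body.Length;
    }

    if (current.Count > 0) chunks.Add(current);
    return chunks;
}

// Usage: 1000 bodies of 1 KB each.
var bodies = Enumerable.Range(0, 1000).Select(_ => new byte[1024]).ToList();
var result = ChunkBySize(bodies, MaxBatchBytes);
Console.WriteLine($"{result.Count} chunks");
```

Each chunk would then be wrapped in Message objects and passed to SendAsync in its own call. The trade-off is that the send becomes all or nothing per chunk, rather than for the whole set.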

On the receive side, we have a similar thing.

Batch Receive

Here, we can receive messages in chunks, instead of one at a time. Again, let’s see how we might receive 1000 messages (there are multiple ways to do this):

var stopwatch = new Stopwatch();
stopwatch.Start();

var messageReceiver = new MessageReceiver(connectionString, QUEUE_NAME);
for (int i = 0; i < 1000; i++)
{
    var message = await messageReceiver.ReceiveAsync();
    string messageBody = Encoding.UTF8.GetString(message.Body);
    Console.WriteLine($"Message received: {messageBody}");

    await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
}

stopwatch.Stop();
Console.WriteLine($"Receive messages took {stopwatch.ElapsedMilliseconds}");

The execution here takes around 127 seconds (over 2 minutes) in my tests.

The same is true for batch receipt as for batch send; with a slight caveat:

var stopwatch = new Stopwatch();
stopwatch.Start();
int count = 1000;
int remainingCount = count;

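// Create the receiver once and reuse it for every batch
var messageReceiver = new MessageReceiver(connectionString, QUEUE_NAME);

while (remainingCount > 0)
{
    var messages = await messageReceiver.ReceiveAsync(remainingCount);
    if (messages == null) break; // nothing arrived before the timeout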

    foreach (var message in messages)
    {
        string messageBody = Encoding.UTF8.GetString(message.Body);
        Console.WriteLine($"Message received: {messageBody}");
        remainingCount--;
    }

    await messageReceiver.CompleteAsync(messages.Select(a => a.SystemProperties.LockToken));
}

stopwatch.Stop();
Console.WriteLine($"Receive messages took {stopwatch.ElapsedMilliseconds}");
Console.WriteLine($"Remaining count: {remainingCount}");

Note that the CompleteAsync can also be called in batch.

You may be wondering what the while loop is all about: batch receive isn’t guaranteed to return the exact number of messages that you request, so we keep asking until we have them all. Even so, we still brought the receive time down to around 10 seconds.
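To make the reason for the loop concrete, here is a small, self-contained sketch that swaps the real receiver for an in-memory stand-in that hands back at most 300 items per call, however many are requested. FakeReceiveAsync and the 300-item cap are invented purely for illustration; with the real service, how many messages come back on each call is not something you control.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// In-memory stand-in for the queue (not the Service Bus API).
var queue = new Queue<string>(Enumerable.Range(1, 1000).Select(i => $"message {i}"));

// Hypothetical receiver: returns at most 300 items per call, or null when empty.
Task<IList<string>> FakeReceiveAsync(int maxCount)
{
    int take = Math.Min(Math.Min(maxCount, 300), queue.Count);
    if (take == 0) return Task.FromResult<IList<string>>(null);

    IList<string> batch = new List<string>();
    for (int i = 0; i < take; i++) batch.Add(queue.Dequeue());
    return Task.FromResult(batch);
}

int remainingCount = 1000;
int calls = 0;

while (remainingCount > 0)
{
    var messages = await FakeReceiveAsync(remainingCount);
    if (messages == null) break; // nothing left to receive

    calls++;
    remainingCount -= messages.Count;
    Console.WriteLine($"Call {calls}: got {messages.Count}, {remainingCount} remaining");
}
```

With this stand-in, a single request for 1000 messages takes four calls to satisfy, which is exactly the situation the loop is there to handle.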

A Note on Batching and Timeouts

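It’s worth bearing in mind that when you retrieve a batch of messages, you’re doing just that: retrieving them. In a PeekLock scenario, they are now locked and, if you don’t complete or abandon them before the lock expires, they become available on the queue again like any other message. With a large number of messages, you may need a longer lock duration on the queue, or to renew the locks as you go. You can also extend how long the receive call itself waits for messages to arrive; for example: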

var messages = await messageReceiver.ReceiveAsync(remainingCount, TimeSpan.FromSeconds(20));

In the next section, we’ll discuss the second technique, of allowing the service bus to “run ahead” and get messages before you actually request them.

Prefetch

Prefetch speeds up the retrieval of messages by getting Azure Service Bus to return messages ahead of them being needed. This presents a problem (similar to receiving in batch), which is that the system is actually retrieving messages on your behalf before you ask for them. In this example, we’ve been using PeekLock; that is, the message is left on the queue until we explicitly complete it. However, once a message has been received it’s locked, and that includes messages that have only been prefetched. That means that with code like the following, we can easily trip ourselves up.

int count = 1000;
var stopwatch = new Stopwatch();
stopwatch.Start();

var messageReceiver = new MessageReceiver(connectionString, QUEUE_NAME);
messageReceiver.PrefetchCount = prefetchCount;
for (int i = 0; i < count; i++)
{
    var message = await messageReceiver.ReceiveAsync(TimeSpan.FromSeconds(60));
    string messageBody = Encoding.UTF8.GetString(message.Body);
    Console.WriteLine($"Message received: {messageBody}");

    await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
}

stopwatch.Stop();
Console.WriteLine($"Receive messages took {stopwatch.ElapsedMilliseconds}");

Note the extended timeout on the Receive allows for the prefetched messages to complete.

In my tests, Prefetch was slightly quicker than processing the messages one at a time, but much slower than a batch. The main reason is that the complete takes the bulk of the time.

Remember that with Prefetch, if you’re using PeekLock, the timeout on the lock starts as soon as a message has been prefetched. This means that if your lock is for 5 seconds and you’ve prefetched 500 records, you need to be sure that you’ll get around to them in time.
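As a rough way to sanity-check this, you can compare the lock duration with how long it takes to work through the prefetched messages one after another. The sketch below is simple arithmetic, not anything from the SDK: MaxSafePrefetch is a hypothetical helper, and the 20 ms per-message figure is an assumed number for illustration.

```csharp
using System;

// Hypothetical helper: the largest number of prefetched messages that can be
// processed sequentially before a lock that started at fetch time expires.
static int MaxSafePrefetch(TimeSpan lockDuration, TimeSpan perMessage)
{
    if (perMessage <= TimeSpan.Zero)
        throw new ArgumentOutOfRangeException(nameof(perMessage));
    return (int)(lockDuration.Ticks / perMessage.Ticks);
}

var lockDuration = TimeSpan.FromSeconds(5);      // the 5 second lock from the text
var perMessage = TimeSpan.FromMilliseconds(20);  // assumed processing time per message

int safe = MaxSafePrefetch(lockDuration, perMessage);
bool fiveHundredIsSafe = 500 <= safe;

Console.WriteLine($"Safe prefetch count: {safe}");
Console.WriteLine($"500 prefetched is safe: {fiveHundredIsSafe}");
```

Under those assumed numbers only 250 messages fit inside the lock, so a prefetch of 500 would leave the second half expiring before they are reached.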

ReceiveAndDelete

Whilst prefetched messages timing out may be bad, with ReceiveAndDelete it’s worse: the messages are taken off the queue as soon as they are fetched. This means that you can consume messages without ever actually seeing them!
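A toy model of why this happens: in ReceiveAndDelete mode the messages leave the queue when they are fetched into the client’s local buffer, not when your code reads them. The sketch below uses plain in-memory collections to stand in for the queue and the prefetch buffer; none of these names are SDK types, and the counts are made up for illustration.

```csharp
using System;
using System.Collections.Generic;

// Stand-ins for the broker-side queue and the client-side prefetch buffer.
var serverQueue = new Queue<string>();
for (int i = 1; i <= 10; i++) serverQueue.Enqueue($"message {i}");
var prefetchBuffer = new Queue<string>();

// Prefetch 5 messages: in this mode they are removed from the queue immediately.
int prefetchCount = 5;
while (prefetchBuffer.Count < prefetchCount && serverQueue.Count > 0)
    prefetchBuffer.Enqueue(serverQueue.Dequeue());

// The application only gets around to processing 2 of them...
int processed = 0;
for (int i = 0; i < 2; i++)
{
    prefetchBuffer.Dequeue();
    processed++;
}

// ...and then the process stops, so the buffer is simply discarded.
int lost = prefetchBuffer.Count;
prefetchBuffer.Clear();

Console.WriteLine($"Processed: {processed}, left on queue: {serverQueue.Count}, lost: {lost}");
```

In this model three messages are gone for good: they are no longer on the queue, and the application never saw them.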

Prefetch with Batch

Here, we can try to use the prefetch and batch combined:

int count = 1000;
var stopwatch = new Stopwatch();
stopwatch.Start();
int remainingCount = count;

// Create the receiver once; a new receiver per loop would prefetch and then abandon locked messages
var messageReceiver = new MessageReceiver(connectionString, QUEUE_NAME);
messageReceiver.PrefetchCount = prefetchCount;

while (remainingCount > 0)
{
    var messages = await messageReceiver.ReceiveAsync(remainingCount);
    if (messages == null) break;

    foreach (var message in messages)
    {
        string messageBody = Encoding.UTF8.GetString(message.Body);
        Console.WriteLine($"Message received: {messageBody}");
        remainingCount--;
    }

    await messageReceiver.CompleteAsync(messages.Select(a => a.SystemProperties.LockToken));
}

stopwatch.Stop();
Console.WriteLine($"Receive messages took {stopwatch.ElapsedMilliseconds}");
Console.WriteLine($"Remaining count: {remainingCount}");

In fact, in my tests, the timing for this was around the same as a batch receipt.

There may be some advantages with much higher numbers, but generally, combining the two in this manner doesn’t seem to provide much benefit.
