Tag Archives: TaskCompletionSource

Receiving Messages in Azure Service Bus

In this post I covered the basics of setting up a queue and sending a message to it. Here, I’m going to cover the options around receiving that message.

Essentially, there are two possibilities here: you can either set-up an event listener, or you can poll the queue directly, and receive the messages one at a time.

Option 1 – Events

The events option seems to be the one that Microsoft now prefer – essentially, you register a handler and then as the messages come in, you simply handle them inside an event. The code here looks something like this:

            var queueClient = new QueueClient(connectionString, "test-queue");

            var messageHandlerOptions = new MessageHandlerOptions(ExceptionHandler);
            queueClient.RegisterMessageHandler(handleMessage, messageHandlerOptions);

The event handlers:

        private static Task ExceptionHandler(ExceptionReceivedEventArgs arg)
        {
            Console.WriteLine("Something bad happened!");
            return Task.CompletedTask;
        }

        private static Task handleMessage(Message message, CancellationToken cancellation)
        {
            string messageBody = Encoding.UTF8.GetString(message.Body);
            Console.WriteLine("Message received: {0}", messageBody);

            return Task.CompletedTask;
        }

Option 2 – Polling

With this option, you simply ask for a message. You’ll need to use the approach for things like deferred messages (which I hope to cover in a future post):

            var messageReceiver = new MessageReceiver(connectionString, "test-queue", ReceiveMode.ReceiveAndDelete);            
            var message = await messageReceiver.ReceiveAsync();

            string messageBody = Encoding.UTF8.GetString(message.Body);            
            Console.WriteLine("Message received: {0}", messageBody);

Option 3 – Option 1, but cruelly force it into option 2

I thought I’d include this, although I would strongly advise against using it in most cases. If you wish, you can register an event, but force the event into a procedural call, so that you can await it finishing. You can do this by using the TaskCompletionSource. First, declare a TaskCompletionSource in your code (somewhere accessible):

private static TaskCompletionSource<bool> _taskCompletionSource;

Then, in handleMessage (see above), when you’ve received the message you want, set the result:

            if (message.CorrelationId == correlationId)
            {
                await client.CompleteAsync(message.SystemProperties.LockToken);

                _taskCompletionSource.SetResult(true);
            }

Finally, after you’ve registered the message handler, just await this task:

queueClient.RegisterMessageHandler(
                (message, cancellationToken) => handleMessage(correlationId, queueClient, message, cancellationToken), 
                messageHandlerOptions);

await _taskCompletionSource.Task;

References

Advanced Features with Azure Service Bus

Handling Events Inside an Azure Function

While writing an Azure function, I established a simple repository pattern using the standard IoC for Functions.

However, when I ran the project up, I started getting the following error:

System.ObjectDisposedException: ‘Cannot access a disposed object. A common cause of this error is disposing a context that was resolved from dependency injection and then later trying to use the same context instance elsewhere in your application. This may occur if you are calling Dispose() on the context, or wrapping the context in a using statement. If you are using dependency injection, you should let the dependency injection container take care of disposing context instances.

Odd!

This was very unexpected. The path of code that led to this was a very basic Add, so I was confident that the object had not been intentionally disposed.

If a write caused this, then what about a read?

I tried reading from the context at the same point, and saw the same effect. Finally, I injected the DB Context into the function itself and called it from the first line of the function call, and it worked fine.

In fact, the issue was that I was trying to read from an Azure Service Bus here:

var client = new QueueClient(connectionStringBuilder, ReceiveMode.PeekLock); 
client.RegisterMessageHandler(handler, new MessageHandlerOptions(onException));

Before we continue, I realise that someone reading this will probably announce loudly: “You’re doing it wrong! You should use the Function Binding to read from a Service Bus Queue!” To that person, I reply loudly, and proudly: “Yes, you’re right – I’m doing it wrong!”

Back to the story. So, after I left, I saw … oops – wrong story.

The issue here is that the event is losing the context of the function, because the function can complete before the event fires.

However, it seems that I have solved this problem before. To fix this, you can simply use a TaskCompletionSource to set something up similar to the following:

private static TaskCompletionSource<bool> _tcs;

[FunctionName("MyFunction")]
public async Task Run([TimerTrigger("0 */1 * * * *")]TimerInfo myTimer)
{
    _tcs = new TaskCompletionSource<bool>();

    var client = new QueueClient(connectionStringBuilder, ReceiveMode.PeekLock); 

    client.RegisterMessageHandler(handler, new MessageHandlerOptions(onException));

    await _tcs.Task;
}

private Task handler(Message message, CancellationToken cancellationToken)
{            
    _myService.AddData(someData);

    _tcs.SetResult(true);

    . . . 
}

Summary

As stated, if your problem is that you’re trying to use the service bus client, then you are probably doing it wrong. It’s something of a shame that the new Service Bus SDK doesn’t support a GetNextMessage() style syntax, but it doesn’t, and so you get forced into the Function Bindings (not that I have any specific issue with them).

However, there are many reasons why you may need or choose to use an event inside a function, and if you do, this is one possible way to do so.

It’s worth pointing out that the above solution will only work for the first call of the event. If you have multiple calls then you’ll need to somehow determine when the last one has occurred.

References

https://www.pmichaels.net/2020/09/19/creating-and-using-an-azure-service-bus-in-net-core/

TaskCompletionSource

I’ve had a couple of problems recently, where I’ve had tasks or asynchronous methods and they don’t quote fit into the architecture that I find myself in. I’d come across the TaskCompletionSource before, but hadn’t realised how useful it was. Basically, a TaskCompletionSource allows you to control when a task finishes; and allows you to do so in a synchronous, or asynchronous fashion. What this gives you is precise control over when an awaited task finishes.

UWP

Consider the following code in UWP. Basically, what this does is execute an anonymous function on the UI thread:

await CoreApplication.MainView.CoreWindow.Dispatcher.RunAsync(Windows.UI.Core.CoreDispatcherPriority.High, async () => 
{
    await MyAyncFunc();
}
System.Diagnostics.Debug.WriteLine("After MyAsyncFunc");

The problem here is that executing an anonymous async function in the above scenario doesn’t work. However, using the TaskCompletionSource, we can bypass that whole conversation:

TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();

await CoreApplication.MainView.CoreWindow.Dispatcher.RunAsync(Windows.UI.Core.CoreDispatcherPriority.High, async () => 
{
    await MyAyncFunc();
    System.Diagnostics.Debug.WriteLine("After MyAsyncFunc");

    tcs.SetResult(true);
});
await tcs.Task;

Now the function will return when the the TaskCompletionSource.SetResult has been called.

Event based

The second scenario where this is useful is where you are trying to use an event based architecture within an async / await scenario. The following example is a little contrived, but it does illustrate the point:

    class Program
    {
        private static Timer _tmr = new Timer();
        private static TaskCompletionSource<bool> _tcs;

        static void Main(string[] args)
        {
            var tmr = StartTimer();

            Console.WriteLine("Before wait...");
            tmr.Wait();

            Console.WriteLine("After wait...");
        }        

        private static async Task StartTimer()
        {            

            _tmr.Interval = 3000;
            _tmr.Elapsed += _tmr_Elapsed;
            _tmr.Start();

            _tcs = new TaskCompletionSource<bool>();
            await _tcs.Task;
        }

        private static void _tmr_Elapsed(object sender, ElapsedEventArgs e)
        {
            _tcs.SetResult(true);
        }
    }

Potentially, a more real world example of this is when you might want to wrap an API in an async/await.

Control over exactly when a task finishes, and the ability to await async void methods

The final scenario where this can be useful is where you either want to await an `async void` method, or where you have a specific part of a method or process that you want to await.

The following code illustrates how to effectively await an async void method:

    class Program
    {        
        private static TaskCompletionSource<bool> _tcs;

        static void Main(string[] args)
        {
            _tcs = new TaskCompletionSource<bool>();
            BackgroundFunction();

            _tcs.Task.Wait();

            Console.WriteLine("Done");
        }        

        private static async void BackgroundFunction()
        {
            for (int i = 1; i <= 10; i++)
            {
                Console.WriteLine($"Processing: {i}");
                await DoStuff();
            }
            _tcs.SetResult(true);
        }

        private static async Task DoStuff()
        {
            await Task.Delay(500);
            
        }

    }

Finally, here is a parallel for loop:

        static void Main(string[] args)
        {
            Parallel.For(1, 3, (i) =>
            {
                BackgroundFunction();
            });

            Console.WriteLine("Done");
        }        

Imagine that BackgroundFunction is performing a long running task where a specific condition needs to return control. There are obviously combinations of functions in the TPL (WaitAll, WhenAll, WhenAny and WhenAll), however, these rely on the whole task, or a set of tasks, completing. Again, the below example is contrived, but it illustrates the granular control over the task that you have.

        static void Main(string[] args)
        {
            _tcs = new TaskCompletionSource<bool>();

            for (int i = 1; i <= 2; i++)
            {
                BackgroundFunction();
            }

            _tcs.Task.Wait();            

            Console.WriteLine("Done");
        }        

        private static async void BackgroundFunction()
        {
            for (int i = 1; i <= 10; i++)
            {
                Console.WriteLine($"Processing: {i}");
                await DoStuff();

                if (i == 7)
                {
                    _tcs.TrySetResult(true);
                    return;                    
                }
            }            
        }

I will re-iterate again, I realise that in the above example, there are better ways to achieve this, and the example is purely for illustration.

Conclusion

Generally speaking, the simplest and most robust code comes from using the task architecture in the way it was designed: that is, use async / await inside a method that returns a Task. I’m not suggesting in this post that the methods I’ve described should replace that; but there are situations where that might not fit.

Aknowledgements

I used the following posts heavily while writing this:

Awaiting the CoreDispatcher
The Nature of TaskCompletionSource
Real life scenarios for using TaskCompletionSource?
Task Parallelism