Tag Archives: Service Bus

Transmitting an Image via Azure Service Bus

This post is a continuation of this earlier post on serialising an image. Once you're able to serialise an image, then you can transmit that image. In this post, we'll see how we could do that using Azure Service Bus.

A Recap on Image Serialisation

In the previous (linked) post, we saw how we can turn an image into a string of text, and back again. Essentially, what we did was use a BinaryWriter to write the file as a binary stream, and a BinaryReader to turn it back to an image.
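For reference, those helpers look something like this (a sketch only – the full implementation is in the linked post; Base64 is used so the binary data survives as a string):

```csharp
// Sketch of the serialisation helpers from the earlier post
string SerialiseImageToBinary(string path)
{
    using var memoryStream = new MemoryStream();
    using (var binaryWriter = new BinaryWriter(memoryStream))
    {
        // Write the raw file bytes into the stream
        binaryWriter.Write(File.ReadAllBytes(path));
    }
    // Base64 gives us a string representation of the binary data
    return Convert.ToBase64String(memoryStream.ToArray());
}

void DeserialiseImageFromBinary(string serialised, string path)
{
    byte[] bytes = Convert.FromBase64String(serialised);
    using var memoryStream = new MemoryStream(bytes);
    using var binaryReader = new BinaryReader(memoryStream);
    // Read the bytes back and write them out as a file
    File.WriteAllBytes(path, binaryReader.ReadBytes(bytes.Length));
}
```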

The plan here is to do exactly the same thing, but to simply write the stream to an Azure Service Bus Message, and read from it at the other side.

Size Matters

One thing that you're going to need to be aware of here is the size of the image. The basic and standard tiers of Service Bus limit you to 256KB per message. Serialising an image to a stream can come in under that, but unless you know otherwise, you should assume that it will be bigger. Even after you sign up for the much more expensive premium tier, when you set up the topic or queue, you'll need to specify a maximum message size. Images can be very big!
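If you want to guard against oversized payloads, you can check the byte count before sending. Something like the following sketch would work (serialisedImage is the string produced by the serialisation step; note that headers count towards the limit, so the usable payload is actually slightly less):

```csharp
// Maximum message size on the basic / standard tiers
const int MaxMessageBytes = 256 * 1024;

int byteCount = Encoding.UTF8.GetByteCount(serialisedImage);
if (byteCount > MaxMessageBytes)
{
    throw new InvalidOperationException(
        $"Serialised image is {byteCount} bytes; the limit is {MaxMessageBytes}");
}
```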

Alternatives

To be clear: the purpose of this post is to demonstrate that you can transmit an image via Service Bus – not that you should. There are other ways to do this: for example, you could upload the image to blob storage, or an S3 bucket, and then just point to that in the message. The one advantage that transmitting the image with the message does give you is that the binary data lives and dies with the message itself – so, depending on what you do with the message after receipt, that may be an advantage.
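For completeness, the blob storage approach (sometimes called the claim-check pattern) might look something like this – a sketch using the Azure.Storage.Blobs package, where storageConnectionString, the container name, and the blob name are all illustrative, and sender is a ServiceBusSender as created in the helper below:

```csharp
// Upload the image to blob storage...
var containerClient = new BlobContainerClient(storageConnectionString, "images");
var blobClient = containerClient.GetBlobClient("myimage.png");
await blobClient.UploadAsync(@"c:\tmp\myimage.png", overwrite: true);

// ...then the message just carries a pointer to the blob
var message = new ServiceBusMessage(blobClient.Uri.ToString());
await sender.SendMessageAsync(message);
```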

Transmitting the Message

The following is a simple helper method that will send the message for us:

async Task SendMessage(string connectionString, string topicName, string messageText)
{
    // How many bytes we're about to send (UTF8, to match the encoding used below)
    int byteCount = Encoding.UTF8.GetByteCount(messageText);

    await using var serviceBusClient = new ServiceBusClient(connectionString);
    await using var sender = serviceBusClient.CreateSender(topicName);

    var message = new ServiceBusMessage(Encoding.UTF8.GetBytes(messageText));

    await sender.SendMessageAsync(message);
}

byteCount tells you how big the message is going to be. That's handy when you're writing a blog post about such things, but you may also find it useful for debugging.

The rest is pretty basic Azure Service Bus SDK stuff. I’ve written about this previously in more depth.

We can then combine this with the code from the last post:

var serialisedToSend = SerialiseImageToBinary(
    @"c:\tmp\myimage.png");
await SendMessage(connectionString, "image-message", serialisedToSend);

The next step is to receive the message at the other side.

Consuming the Message

As with the send, we have a helper method here:

async Task<string> ConsumeNextMessage(string topic)
{
    // connectionString is captured from the enclosing scope
    await using var serviceBusClient = new ServiceBusClient(connectionString);
    var receiver = serviceBusClient.CreateReceiver(topic, "sub-1");
    var message = await receiver.ReceiveMessageAsync();
    return message.Body.ToString();
}

Again, I’ve written about receiving a message using the SDK before.

Here, we do the reverse of the send:

var serialisedToReceive = await ConsumeNextMessage("image-message");

DeserialiseImageFromBinary(
    serialisedToReceive,
    @$"c:\tmp\newimg{DateTime.Now:yy-MM-dd-HH-mm-ss}.png");

I’ve used the date so that I could test this multiple times – other than that, we’re receiving the serialised image, and then writing it to disk.

Summary

I’ll re-iterate the sentiment that I’m not advocating this as a good or preferable way to transmit images, simply highlighting that it is possible with relatively little work.

Listing all topics and subscriptions in an Azure Service Bus Namespace

Anyone who follows this blog will notice that Azure Service Bus is one of my favourite topics. In this post, we're going to see how you can list all subscriptions to all topics within a namespace.

// The administration client is all we need to enumerate topics and subscriptions
var serviceBusAdminClient = new ServiceBusAdministrationClient(connectionString);
var topics = serviceBusAdminClient.GetTopicsAsync();
await foreach (var topic in topics)
{
    var subs = serviceBusAdminClient.GetSubscriptionsAsync(topic.Name);
    await foreach (var sub in subs)
    {
        Console.WriteLine($"{sub.TopicName}: {sub.SubscriptionName}");
    }
}

We're using the latest service bus library (Azure.Messaging.ServiceBus) and its ServiceBusAdministrationClient, which lets us traverse the topics and the subscriptions within them.

References

https://www.pmichaels.net/2021/06/26/receiving-a-message-using-azure-messaging-servicebus/

Azure Service Bus SDK Libraries

I've written pretty extensively on the Microsoft.Azure.ServiceBus SDK. In this post, I'm just covering the fact that this library is on its way to deprecation (don't panic – its predecessor has been hanging around since 2011!).

Let’s see what these libraries are and some links.

WindowsAzure.ServiceBus

This library does look like it’s on its way to being deprecated. It supports .Net Framework only.

The NuGet package is here, but it’s closed source:

https://www.nuget.org/packages/WindowsAzure.ServiceBus

Microsoft.Azure.ServiceBus

This library was introduced to support .Net Core.

The NuGet package is here:

https://www.nuget.org/packages/Microsoft.Azure.ServiceBus

The code for this is open source:

https://github.com/Azure/azure-service-bus-dotnet

Azure.Messaging.ServiceBus

If you read Sean Feldman’s article here (which this was heavily based on), you’ll see that this seems to be due to some restructuring of teams. The code has changed, and MS say it’s more consistent (although what with, I’m unsure).

The NuGet Package is here:

https://www.nuget.org/packages/Azure.Messaging.ServiceBus

The source code for this is here:

https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/servicebus

References

https://markheath.net/post/migrating-to-new-servicebus-sdk

https://weblogs.asp.net/sfeldman/the-future-of-asb-dotnet-sdk

Azure Service Bus – Auto Delete on Idle

In the past few months, I've written, and spoken, quite a lot about Azure Service Bus – especially the new features that are now available out of the box. Here I wrote about auto-forwarding, and in this post, I'm going to cover auto-delete; I think the two subjects go hand-in-hand.

The idea behind auto-delete is that if a set of messages is time-sensitive, then the queue can be flagged for auto-deletion. This is not the same as "Time To Live" for a message, because the entire queue is deleted after a period of inactivity!

Let's see how we can set up a new queue, in code (see the referenced post above for the NuGet packages that you'll need for this):

private static async Task CreateAutoDeleteQueue(string connectionString, string queueName)
{
    // Create authorisation rules
    var authorisationRule = new SharedAccessAuthorizationRule(
        "manage", new[] { AccessRights.Manage, AccessRights.Listen, AccessRights.Send });

    var serviceBusAdministrationClient = new ServiceBusAdministrationClient(connectionString);

    var options = new CreateQueueOptions(queueName)
    {
        // Delete the queue after 5 minutes of inactivity
        AutoDeleteOnIdle = TimeSpan.FromMinutes(5)
    };
    options.AuthorizationRules.Add(authorisationRule);

    await serviceBusAdministrationClient.CreateQueueAsync(options);
}

We can now post a number of messages to that queue:
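A simple loop along these lines will do for posting the test messages (connectionString and queueName being the same values passed to the creation method above):

```csharp
// Post a handful of test messages to the auto-delete queue
await using var serviceBusClient = new ServiceBusClient(connectionString);
var sender = serviceBusClient.CreateSender(queueName);

for (int i = 1; i <= 5; i++)
{
    await sender.SendMessageAsync(new ServiceBusMessage($"Test message {i}"));
}
```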

Then, 5 minutes after the last message has been posted (or any other activity has taken place), the queue is deleted:

References

https://www.serverless360.com/blog/hidden-gems-azure-service-bus

Deferred Messages in Azure Service Bus

In Azure Service Bus, you can schedule a message to deliver at a later time, but you can also defer a message until a later time.

Scheduled Versus Deferred Messages

The difference here is subtle, but important: when you schedule a message, you're telling the Service Bus to deliver that message at a time of your choosing; when you defer a message, you're telling the Service Bus to hang onto a message that has already been sent, until such time as you're ready to receive it.

Why Would you Defer a Message?

The idea here is that you are not ready for the message – but you don’t want to hold up the queue. In this respect, it’s a little like the dead letter concept; that is, there is a message that’s essentially holding up the queue – however, in this case, there’s nothing wrong with the message itself.

Let’s imagine that we receive a message that a sales order has been created – we go to get the customer information for the sales order, and we find that the customer has yet to be created (such things are possible when you start engaging in eventually consistent systems): in this case, you could defer the message, and come back to it when the customer has been created.

Some Code – How to Defer a Message

Deferring a message is actually very simple:

var messageReceiver = new MessageReceiver(connectionString, QUEUE_NAME, ReceiveMode.PeekLock);
var message = await messageReceiver.ReceiveAsync();

var sequenceNumber = message.SystemProperties.SequenceNumber;
await messageReceiver.DeferAsync(message.SystemProperties.LockToken);

There are three important concepts here:
1. The sequence number is very important: without it, the message is effectively lost; that’s because of (2)
2. You can receive a message after this, and you will never see the deferred message again until you purposely receive it, which brings us to (3)
3. To retrieve this message, you must explicitly ask for it.

To receive the deferred message you simply pass in the sequence number:

var messageReceiver = new MessageReceiver(connectionString, QUEUE_NAME, ReceiveMode.PeekLock);            
var message = await messageReceiver.ReceiveDeferredMessageAsync(sequenceNumber);

await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);

The deferred message will never time out. Messages have a “Time to Live”, after which they get moved to the Dead Letter Queue; but once a message is deferred, it will live forever, and must be received to remove it.
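Putting this together with the sales order scenario from earlier, the receive side might look something like the following sketch – SalesOrder, customerRepository, and deferredSequenceNumbers are all hypothetical names for illustration:

```csharp
var message = await messageReceiver.ReceiveAsync();
var order = JsonConvert.DeserializeObject<SalesOrder>(
    Encoding.UTF8.GetString(message.Body));

if (!await customerRepository.ExistsAsync(order.CustomerId))
{
    // The customer hasn't been created yet - park the message, and keep the
    // sequence number, because it's the only way to get the message back
    deferredSequenceNumbers.Add(message.SystemProperties.SequenceNumber);
    await messageReceiver.DeferAsync(message.SystemProperties.LockToken);
}
else
{
    await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
}
```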

References

https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-deferral

Azure Service Bus – Scheduled Message Delivery

Azure Service Bus sets itself apart from other message brokers by the dizzying array of additional and useful features that it provides out of the box. This particular one is really useful for things like scheduled e-mails. Let’s say, for example, that you’re an event organiser, and you want to notify people a few days before the event. This feature enables you to tell Service Bus to simply send a message at that time (you could have a simple Azure function that then picked up the message and sent an e-mail).

If you’re new to Service Bus, or using it with .Net, then start here.

NuGet

The basic NuGet package you’ll need is here:

Microsoft.Azure.ServiceBus

Reading the Service Bus Message

For the purpose of this post, we'll just set up a basic console application that sends and receives the message; let's start with the read:

private static Task ReadMessageEvent(string connectionString)
{
    var queueClient = new QueueClient(connectionString, QUEUE_NAME);

    var messageHandlerOptions = new MessageHandlerOptions(ExceptionHandler);
    queueClient.RegisterMessageHandler(handleMessage, messageHandlerOptions);

    return Task.CompletedTask;
}

private static Task ExceptionHandler(ExceptionReceivedEventArgs arg)
{
    Console.WriteLine("Something bad happened!");
    return Task.CompletedTask;
}

private static Task handleMessage(Message message, CancellationToken cancellation)
{
    string messageBody = Encoding.UTF8.GetString(message.Body);
    Console.WriteLine("Message received: {0}", messageBody);

    return Task.CompletedTask;
}

There’s not much to say here – this event will simply print a message to the console when it’s received.

Schedule the Service Bus Message

Now that we’ve set up a method to receive the messages, let’s send one. You could add this to the same console app (obviously it would have to occur after the Read!)

var queueClient = new QueueClient(connectionString, QUEUE_NAME);

string messageBody = $"{DateTime.Now}: Happy New Year! ({Guid.NewGuid()}) You won't get this until {dateTime}";
var message = new Message(Encoding.UTF8.GetBytes(messageBody));

long sequenceNumber = await queueClient.ScheduleMessageAsync(message, dateTime);
//await queueClient.CancelScheduledMessageAsync(sequenceNumber);

await queueClient.CloseAsync();

dateTime is simply the time that you wish to send the message; for example:

var dateTime = DateTime.UtcNow.AddSeconds(10);

Will send the message in 10 seconds.

The commented line above will then cancel the message from being sent – you only need to provide the sequence number (which you get from setting up the schedule in the first place).

References and A GitHub Example

For a working sample of this, please see here.

https://stackoverflow.com/questions/60437666/how-to-defer-a-azure-service-bus-message

Receiving Messages in Azure Service Bus

In this post I covered the basics of setting up a queue and sending a message to it. Here, I’m going to cover the options around receiving that message.

Essentially, there are two possibilities here: you can either set up an event listener, or you can poll the queue directly and receive the messages one at a time.

Option 1 – Events

The events option seems to be the one that Microsoft now prefer – essentially, you register a handler and then as the messages come in, you simply handle them inside an event. The code here looks something like this:

            var queueClient = new QueueClient(connectionString, "test-queue");

            var messageHandlerOptions = new MessageHandlerOptions(ExceptionHandler);
            queueClient.RegisterMessageHandler(handleMessage, messageHandlerOptions);

The event handlers:

        private static Task ExceptionHandler(ExceptionReceivedEventArgs arg)
        {
            Console.WriteLine("Something bad happened!");
            return Task.CompletedTask;
        }

        private static Task handleMessage(Message message, CancellationToken cancellation)
        {
            string messageBody = Encoding.UTF8.GetString(message.Body);
            Console.WriteLine("Message received: {0}", messageBody);

            return Task.CompletedTask;
        }

Option 2 – Polling

With this option, you simply ask for a message. You'll need to use this approach for things like deferred messages (which I hope to cover in a future post):

            var messageReceiver = new MessageReceiver(connectionString, "test-queue", ReceiveMode.ReceiveAndDelete);            
            var message = await messageReceiver.ReceiveAsync();

            string messageBody = Encoding.UTF8.GetString(message.Body);            
            Console.WriteLine("Message received: {0}", messageBody);

Option 3 – Option 1, but cruelly force it into option 2

I thought I’d include this, although I would strongly advise against using it in most cases. If you wish, you can register an event, but force the event into a procedural call, so that you can await it finishing. You can do this by using the TaskCompletionSource. First, declare a TaskCompletionSource in your code (somewhere accessible):

private static TaskCompletionSource<bool> _taskCompletionSource = new TaskCompletionSource<bool>();

Then, in handleMessage (see above), when you’ve received the message you want, set the result:

            if (message.CorrelationId == correlationId)
            {
                await client.CompleteAsync(message.SystemProperties.LockToken);

                _taskCompletionSource.SetResult(true);
            }

Finally, after you’ve registered the message handler, just await this task:

queueClient.RegisterMessageHandler(
                (message, cancellationToken) => handleMessage(correlationId, queueClient, message, cancellationToken), 
                messageHandlerOptions);

await _taskCompletionSource.Task;

References

Advanced Features with Azure Service Bus

Setting up an e-mail Notification System using Logic Apps

One of the newer features of Microsoft's Azure offering is Logic Apps: these are basically a workflow system, not totally dissimilar to Windows Workflow (WF, so as not to get sued by panda bears). I've worked with a number of workflow systems in the past, from standard offerings to completely bespoke versions. The problem always seems to be that, once people start using them, they become the first thing you reach for to solve every problem. That's not to say that you can't solve every problem using a workflow (obviously, it depends which workflow, and what you're doing), but they are not always the best solution. In fact, they tend to be at their best when they are small and simple.

With that in mind, I thought I'd start with a very straightforward e-mail alert system. In fact, all this is going to do is read a service bus queue and send an e-mail. I discussed a similar idea here, but that was using a custom written function.

Create a Logic App

The first step is to create a new Logic App project:

There are three options here: create a blank logic app, choose from a template (for example, process a service bus message), or define your own with a given trigger. We’ll start from a blank app:

Trigger

Obviously, for a workflow to make sense, it has to start on an event or a schedule. In our case, we are going to run from a service bus entry, so let’s pick that from the menu that appears:

In this case, we’ll choose Peek-Lock, so that we don’t lose our message if something fails. I can now provide the connection details, or simply pick the service bus from a list that it already knows about:

It’s not immediately obvious, but you have to provide a connection name here:

If you choose Peek-Lock, you’ll be presented with an explanation of what that means, and then a screen such as the following:

In addition to picking the queue name, you can also choose the queue type (as well as listening to the queue itself, you can run your workflow from the dead-letter queue – which is very useful in its own right, and may even be a better use case for this type of workflow). Finally, you can choose how frequently to poll the queue.

If you now pick “New step”, you should be faced with an option:

In our case, let’s provide a condition (so that only queue messages with “e-mail” in the message result in an e-mail):

Before progressing to the next stage – let’s have a look at the output of this (you can do this by running the workflow and viewing the “raw output”):

Clearly the content data here is not what was entered. A quick search revealed that the data is Base64 encoded, so we have to make a small tweak in advanced mode:
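For reference, the tweak is to wrap the trigger content in the base64ToString workflow function, along these lines (ContentData being the property the Service Bus trigger exposes):

```
@{base64ToString(triggerBody()?['ContentData'])}
```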

Okay – finally, we can add the step that actually sends the e-mail. In this instance, I simply picked Outlook.com, and allowed Azure access to my account:

The last step is to complete the message. Because we only took a “peek-lock”, we now need to manually complete the message. In the designer, we just need to add an action:

Then tell it that we want to use the service bus again. As you can see – that’s one of the options in the list:

Finally, it wants the name of the queue, and asks for the lock token – which it helpfully offers from dynamic content:

Testing

To test this, we can add a message to our test queue using the Service Bus Explorer:

I won’t bother with a screenshot of the e-mail, but I will show this:

Which provides a detailed overview of exactly what has happened in the run.

Summary

Having a workflow system built into Azure seems like a double-edged sword. On the one hand, you could potentially use it to easily augment functionality and quickly plug holes; on the other hand, you might find very complex workflows popping up all over the system, creating an indecipherable architecture.

Working with Multiple Cloud Providers – Part 3 – Linking Azure and GCP

This is the third and final post in a short series on linking up Azure with GCP (for Christmas). In the first post, I set up a basic Azure function that updated some data in table storage, and then in the second post, I configured the GCP link from PubSub into BigQuery.

In this post, we'll square this off by adapting the Azure function to post a message directly to PubSub; then, we'll call the Azure function with Santa's data, and watch that appear in BigQuery. At least, that was my plan – but Microsoft had other ideas.

It turns out that Azure functions have a dependency on Newtonsoft.Json 9.0.1, and the GCP client libraries require 10+. So instead of being a 10 minute job on Boxing Day to link the two, it turned into a mammoth task. Obviously, I spent the first few hours searching for a way around this – surely other people have faced this, and there's a redirect, setting, or way of banging the keyboard that makes it work? Turns out not.

The next idea was to experiment with contacting the Google server directly, as is described here. Unfortunately, you still need the Auth libraries.

Finally, I swapped out the function for a WebJob. WebJobs give you a little more flexibility, and have no hard dependencies. So, on with the show (albeit a little more involved than expected).

WebJob

In this post I described how to create a basic WebJob. Here, we’re going to do something similar. In our case, we’re going to listen for an Azure Service Bus Message, and then update the Azure Storage table (as described in the previous post), and call out to GCP to publish a message to PubSub.

Handling a Service Bus Message

We weren't originally going to take this approach, but I found that WebJobs play much nicer with a Service Bus message than with trying to get them to fire on a specific endpoint. In terms of scalability, adding a queue in the middle can only be a good thing. We'll square off the contactable endpoint at the end with a function that simply converts the endpoint call into a message on the queue. Here's what the WebJob Program looks like:

public static void ProcessQueueMessage(
    [ServiceBusTrigger("localsantaqueue")] string message,
    TextWriter log,
    [Table("Delivery")] ICollector<TableItem> outputTable)
{
    log.WriteLine(message);

    // Parse the incoming message into the table model
    TableItem item = Newtonsoft.Json.JsonConvert.DeserializeObject<TableItem>(message);
    if (string.IsNullOrWhiteSpace(item.PartitionKey)) item.PartitionKey = item.childName.First().ToString();
    if (string.IsNullOrWhiteSpace(item.RowKey)) item.RowKey = item.childName;

    // Update Azure table storage...
    outputTable.Add(item);

    // ...and publish to GCP PubSub
    GCPHelper.AddMessageToPubSub(item).GetAwaiter().GetResult();

    log.WriteLine("DeliveryComplete Finished");
}

Effectively, this is the same logic as the function (obviously, we now have the GCPHelper, and we'll come to that in a minute). First, here's the code for the TableItem model:

[JsonObject(MemberSerialization.OptIn)]
public class TableItem : TableEntity
{
    [JsonProperty]
    public string childName { get; set; }
 
    [JsonProperty]
    public string present { get; set; }
}

As you can see, we need to decorate the members with specific serialisation instructions. The reason being that this model is being used by both GCP (which only needs what you see on the screen) and Azure (which needs the inherited properties).

GCPHelper

As described here, you’ll need to install the client package for GCP into the Azure Function App that we created in post one of this series (referenced above):

Install-Package Google.Cloud.PubSub.V1 -Pre

Here’s the helper code that I mentioned:

public static class GCPHelper
{
    public static async Task AddMessageToPubSub(TableItem toSend)
    {
        string jsonMsg = Newtonsoft.Json.JsonConvert.SerializeObject(toSend);

        // Point the GCP client libraries at the service account credentials file
        Environment.SetEnvironmentVariable(
            "GOOGLE_APPLICATION_CREDENTIALS",
            Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Test-Project-8d8d83hs4hd.json"));
        GrpcEnvironment.SetLogger(new ConsoleLogger());

        string projectId = "test-project-123456";
        TopicName topicName = new TopicName(projectId, "test");
        SimplePublisher simplePublisher =
            await SimplePublisher.CreateAsync(topicName);
        string messageId =
            await simplePublisher.PublishAsync(jsonMsg);
        await simplePublisher.ShutdownAsync(TimeSpan.FromSeconds(15));
    }
}

I detailed in this post how to create a credentials file; you’ll need to do that to allow the WebJob to be authorised. The Json file referenced above was created using that process.

Azure Config

You’ll need to create an Azure message queue (I’ve called mine localsantaqueue):
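If you prefer the command line to the portal, the queue can also be created with the Azure CLI; something like the following, where the resource group and namespace names are illustrative:

```
az servicebus queue create \
  --resource-group santa-rg \
  --namespace-name santa-namespace \
  --name localsantaqueue
```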

I would also download the Service Bus Explorer (I’ll be using it later for testing).

GCP Config

We already have a DataFlow, a PubSub Topic, and a BigQuery Database, so GCP should require no further configuration, except to ensure the permissions are correct.

The Service Account user (which I give more details of here) needs to have PubSub permissions. For now, we'll make them an editor, although in this instance, they probably only need publish:

Test

We can do a quick test using the Service Bus Explorer and publish a message to the queue:

The ultimate test is that we can then see this in the BigQuery Table:

Lastly, the Function

This won't be a completely function-free post. The last step is to create a function that adds a message to the queue:

[FunctionName("Function1")]
public static HttpResponseMessage Run(
    [HttpTrigger(AuthorizationLevel.Function, "post")]HttpRequestMessage req,             
    TraceWriter log,
    [ServiceBus("localsantaqueue")] ICollector<string> queue)
{
    log.Info("C# HTTP trigger function processed a request.");
    var parameters = req.GetQueryNameValuePairs();
    string childName = parameters.First(a => a.Key == "childName").Value;
    string present = parameters.First(a => a.Key == "present").Value;
    // Note the $ prefix - without it, the placeholders would be sent literally
    string json = $"{{ 'childName': '{childName}', 'present': '{present}' }}";
    queue.Add(json);

    return req.CreateResponse(HttpStatusCode.OK);
}

So now we have an endpoint for our imaginary Xamarin app to call into.

Summary

Both GCP and Azure are relatively immature platforms for this kind of interaction. The GCP client libraries seem to be missing functionality (and GCP is still heavily weighted away from .Net). The Azure libraries (especially functions) seem to be in a pickle too – with strange dependencies that make it very difficult to communicate outside of Azure. As a result, this task (which should have taken an hour or so) took a great deal of time, completely unnecessarily.

Having said that, it is clearly possible to link the two systems, if a little long-winded.

References

https://blog.falafel.com/rest-google-cloud-pubsub-with-oauth/

https://github.com/Azure/azure-functions-vs-build-sdk/issues/107

https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus

https://stackoverflow.com/questions/48092003/adding-to-a-queue-using-an-azure-function-in-c-sharp/48092276#48092276