
What can you do with a logic app? Part three – Creating a Logic App Client

One of the things missing from Azure Logic Apps is the ability to integrate human interaction. Microsoft do have their own version of an interactive workflow (PowerApps), which is (obviously) far better than what you can produce by following this post.

In this post, we’ll create a very basic client for a logic app. Obviously, with some thought, this could easily be extended to allow a fully functional, interactive, workflow system.

Basic Logic App

Let’s start by designing our logic app. The app in question is going to be a very simple one: it will add a message to a logging queue (just so it has something to do), and then ask the user a question by putting a message onto a topic: left or right? Based on the user’s response, we’ll write a message to the queue saying either left or right. Let’s have a look at our Logic App design:

It’s worth pointing out a few things about this design:
1. The condition uses the expression base64ToString() to convert the encoded message into plain text (see the example expression just below this list).
2. Where the workflow picks up, it uses a peek-lock, and then completes the message at the end. It looks like it’s a ‘feature’ of Logic Apps that an automatic complete on this trigger will not actually complete the message (plus, this is actually a better design).
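For reference, the condition expression would look something like the following, assuming the Service Bus trigger surfaces its (base64-encoded) payload in the ContentData property:

@equals(base64ToString(triggerBody()?['ContentData']), 'left')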

Queues and Topics

The “Log to message queue” action above is putting an entry into a queue, so a quick note on why we’re using a queue for logging, but a topic for the interaction with the user. In a real-life version of this system, we might have many users, all wanting to perform the same action. Let’s say they’re all part of a sales process, and the actions are steps along that process; adding these to a queue maintains their sequence. Here’s the queue and topic layout that I’m using for this post:

Multiple Triggers

As you can see, we actually have two triggers in this workflow. The first starts the workflow (so we’ll drop a message into the topic to start it), and the second waits for a second message to go into the topic.

To add a trigger part way through the workflow, simply add an action, search and select “Triggers”:

Because we have a trigger part way through the workflow, what we have effectively issued here is an await statement. Once a message appears in the subscription, the workflow will continue where it left off:
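Conceptually, the workflow is now doing something like the following sketch (purely illustrative: the delegate stands in for a message arriving on the subscription, and none of these names exist in the real app):

using System;
using System.Threading.Tasks;

class WorkflowSketch
{
    // Hypothetical stand-in for the Logic App: the mid-workflow trigger is, in effect, an await
    static async Task RunWorkflowAsync(Func<Task<string>> waitForUserMessage)
    {
        Console.WriteLine("Log to message queue");          // first action
        Console.WriteLine("Ask the user: left or right?");  // post the question onto the topic
        string answer = await waitForUserMessage();         // second trigger: the workflow parks here
        Console.WriteLine($"User said: {answer}");          // the left or right branch then runs
    }
}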

As soon as a message is posted, the workflow carries on:

Client Application

For the client application, we could simply use the Service Bus Explorer (in fact, the screenshots above were taken using it to simulate messages in the topic). However, the point of this post is to create a client, and so we will… although we’ll just create a basic console app for now.

We need the client to do two things: read from a topic subscription, and write to a topic. I haven’t done exactly this before, but I will be heavily plagiarising from here, here, and here (see the references at the end).

Let’s create a console application:

Once that’s done, we’ll need the Service Bus client library; install it from here.
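The code below is written against the Microsoft.Azure.ServiceBus NuGet package, so assuming that’s the library in question:

Install-Package Microsoft.Azure.ServiceBus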

The code is generally quite straightforward, and looks a lot like the code to read and write to queues. The big difference is that you don’t read from the topic itself, but from a subscription to the topic (a topic can have many subscriptions):

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

class Program
{
    static async Task Main(string[] args)
    {
        MessageHandler messageHandler = new MessageHandler();
        messageHandler.RegisterToRead("secondstage", "sub1");

        await WaitForever();
    }

    private static async Task WaitForever()
    {
        // Keep the process alive so the message handler can keep pumping
        while (true) await Task.Delay(5000);
    }
}

public class MessageHandler
{
    private string _connectionString = "service bus connection string details";
    private ISubscriptionClient _subscriptionClient;

    public void RegisterToRead(string topicName, string subscriptionName)
    {
        _subscriptionClient = new SubscriptionClient(_connectionString, topicName, subscriptionName);

        MessageHandlerOptions messageHandlerOptions = new MessageHandlerOptions(ExceptionReceived)
        {
            // We'll complete the message ourselves once we've dealt with it
            AutoComplete = false,
            MaxAutoRenewDuration = new TimeSpan(1, 0, 0)
        };

        _subscriptionClient.RegisterMessageHandler(ProcessMessage, messageHandlerOptions);
    }

    private async Task ProcessMessage(Message message, CancellationToken cancellationToken)
    {
        string messageText = Encoding.UTF8.GetString(message.Body);

        // Show the question from the workflow, and wait for the user's answer
        Console.WriteLine(messageText);
        string leftOrRight = Console.ReadLine();

        await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);

        await SendResponse(leftOrRight, "userinput");
    }

    private async Task SendResponse(string leftOrRight, string topicName)
    {
        // The workflow's second trigger is watching this topic
        TopicClient topicClient = new TopicClient(_connectionString, topicName);
        Message message = new Message(Encoding.UTF8.GetBytes(leftOrRight));
        await topicClient.SendAsync(message);
    }

    private Task ExceptionReceived(ExceptionReceivedEventArgs arg)
    {
        Console.WriteLine(arg.Exception.ToString());
        return Task.CompletedTask;
    }
}

If we run it, then when the logic app reaches the second trigger, we’ll get the message from the subscription, and the app will ask for directions:

Based on the response, the logic app will execute either the right or left branch of code.

Summary

Having worked with workflow systems in the past, I’ve noticed one recurring feature: they start to get used for things that don’t fit into a workflow, resulting in a needlessly over-complex system. I imagine that Logic Apps are no exception to this rule, and in 10 years’ time, people will roll their eyes at how Logic Apps have been used where a simple web service would have done the whole job.

The saving grace here is source control. The workflow inside a Logic App is simply a JSON file, and so it can be source controlled, added to a CI pipeline, and all the good things that you might expect. Whether or not a more refined version of what I have described here makes any sense is another question.

There are many downsides to this approach: firstly, you are fighting against the Service Bus by asking it to wait for input (that part is a very fixable problem with a bit of an adjustment to the messages); secondly, you would presumably need some form of timeout (again, a fixable problem that will probably feature in a future post). The biggest issue here is that you are likely introducing complex conditional logic with no way to unit test; this isn’t, per se, fixable; however, you can introduce some canary logic (again, this will probably be the feature of a future post).

References

https://docs.microsoft.com/en-us/azure/logic-apps/logic-apps-limits-and-config

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-how-to-use-topics-subscriptions

https://stackoverflow.com/questions/28127001/the-lock-supplied-is-invalid-either-the-lock-expired-or-the-message-has-alread

Working with Multiple Cloud Providers – Part 3 – Linking Azure and GCP

This is the third and final post in a short series on linking up Azure with GCP (for Christmas). In the first post, I set up a basic Azure function that updated some data in table storage, and in the second post, I configured the GCP link from PubSub into BigQuery.

In this post, we’ll square this off by adapting the Azure function to post a message directly to PubSub; then we’ll call the Azure function with Santa’s data, and watch it appear in BigQuery. At least, that was my plan – but Microsoft had other ideas.

It turns out that Azure Functions have a dependency on Newtonsoft.Json 9.0.1, and the GCP client libraries require 10+. So instead of being a 10-minute job on Boxing Day to link the two, it turned into a mammoth task. Obviously, I spent the first few hours searching for a way around this – surely other people have faced this, and there’s a redirect, setting, or way of banging the keyboard that makes it work? It turns out not.

The next idea was to experiment with contacting the Google server directly, as is described here. Unfortunately, you still need the Auth libraries.

Finally, I swapped out the function for a WebJob. WebJobs give you a little more flexibility, and have no hard dependencies. So, on with the show (albeit a little more involved than expected).

WebJob

In this post I described how to create a basic WebJob. Here, we’re going to do something similar: we’re going to listen for an Azure Service Bus message, then update the Azure Storage table (as described in the previous post) and call out to GCP to publish a message to PubSub.

Handling a Service Bus Message

We weren’t originally going to take this approach, but I found that WebJobs play much nicer with a Service Bus message than with trying to get them to fire on a specific endpoint. In terms of scalability, adding a queue in the middle can only be a good thing. We’ll square off the contactable endpoint at the end with a function that simply converts the incoming call into a message on the queue. Here’s what the WebJob code looks like:

using System;
using System.IO;
using System.Linq;
using Microsoft.Azure.WebJobs;
using Newtonsoft.Json;

public class Functions
{
    public static void ProcessQueueMessage(
        [ServiceBusTrigger("localsantaqueue")] string message,
        TextWriter log,
        [Table("Delivery")] ICollector<TableItem> outputTable)
    {
        log.WriteLine(message);

        // Deserialise the incoming message, defaulting the table keys if they're missing
        TableItem item = JsonConvert.DeserializeObject<TableItem>(message);
        if (string.IsNullOrWhiteSpace(item.PartitionKey)) item.PartitionKey = item.childName.First().ToString();
        if (string.IsNullOrWhiteSpace(item.RowKey)) item.RowKey = item.childName;

        // Update the Azure Storage table...
        outputTable.Add(item);

        // ...and then push the same item out to GCP
        GCPHelper.AddMessageToPubSub(item).GetAwaiter().GetResult();

        log.WriteLine("DeliveryComplete Finished");
    }
}
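A quick note on the .GetAwaiter().GetResult() call: the method here is synchronous, so we block on the async GCP call. If your version of the WebJobs SDK supports async signatures, declaring the method as async Task and awaiting AddMessageToPubSub would be the tidier option.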

Effectively, this is the same logic as the function (obviously, we now have the GCPHelper, and we’ll come to that in a minute). First, here’s the code for the TableItem model:

using Microsoft.WindowsAzure.Storage.Table;
using Newtonsoft.Json;

[JsonObject(MemberSerialization.OptIn)]
public class TableItem : TableEntity
{
    [JsonProperty]
    public string childName { get; set; }

    [JsonProperty]
    public string present { get; set; }
}

As you can see, we need to decorate the members with specific serialisation instructions. The reason is that this model is used by both GCP (which only needs the two properties you can see here) and Azure (which needs the inherited Table Storage properties).
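To make the opt-in behaviour concrete, here’s a quick sketch (the values are invented, and it assumes the TableItem class above):

// With MemberSerialization.OptIn, only the [JsonProperty]-decorated members are
// serialised; the inherited TableEntity properties (PartitionKey, RowKey,
// Timestamp, ETag) never make it into the JSON that we send to GCP.
var item = new TableItem { childName = "Alice", present = "Bike" };
string json = Newtonsoft.Json.JsonConvert.SerializeObject(item);
Console.WriteLine(json); // {"childName":"Alice","present":"Bike"}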

GCPHelper

As described here, you’ll need to install the client package for GCP into the WebJob project (remember that the whole reason for the WebJob is that this package won’t play nicely inside a Function App):

Install-Package Google.Cloud.PubSub.V1 -Pre

Here’s the helper code that I mentioned:

using System;
using System.IO;
using System.Threading.Tasks;
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using Grpc.Core.Logging;

public static class GCPHelper
{
    public static async Task AddMessageToPubSub(TableItem toSend)
    {
        string jsonMsg = Newtonsoft.Json.JsonConvert.SerializeObject(toSend);

        // Point the GCP libraries at the service account credentials file,
        // which is deployed alongside the WebJob
        Environment.SetEnvironmentVariable(
            "GOOGLE_APPLICATION_CREDENTIALS",
            Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Test-Project-8d8d83hs4hd.json"));
        GrpcEnvironment.SetLogger(new ConsoleLogger());

        string projectId = "test-project-123456";
        TopicName topicName = new TopicName(projectId, "test");

        // SimplePublisher handles the underlying gRPC channel and batching for us
        SimplePublisher simplePublisher =
            await SimplePublisher.CreateAsync(topicName);
        string messageId =
            await simplePublisher.PublishAsync(jsonMsg);
        await simplePublisher.ShutdownAsync(TimeSpan.FromSeconds(15));
    }
}

I detailed in this post how to create a credentials file; you’ll need to do that to allow the WebJob to be authorised. The Json file referenced above was created using that process.

Azure Config

You’ll need to create an Azure Service Bus queue (I’ve called mine localsantaqueue):

I would also download the Service Bus Explorer (I’ll be using it later for testing).

GCP Config

We already have a DataFlow, a PubSub Topic and a BigQuery Database, so GCP should require no further configuration, except to ensure that the permissions are correct.

The Service Account user (which I give more details of here) needs to have PubSub permissions. For now, we’ll make them an editor, although in this instance they probably only need publish:
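(If you want to scope this more tightly later, the Pub/Sub publisher role – roles/pubsub.publisher – covers just the publish permission.)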

Test

We can do a quick test using the Service Bus Explorer by publishing a message to the queue:

The ultimate test is that we can then see this in the BigQuery Table:

Lastly, the Function

This won’t be a completely function-free post. The last step is to create a function that adds a message to the queue:

using System.Linq;
using System.Net;
using System.Net.Http;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Host;

public static class Function1
{
    [FunctionName("Function1")]
    public static HttpResponseMessage Run(
        [HttpTrigger(AuthorizationLevel.Function, "post")]HttpRequestMessage req,
        TraceWriter log,
        [ServiceBus("localsantaqueue")] ICollector<string> queue)
    {
        log.Info("C# HTTP trigger function processed a request.");

        // Parse the query parameters
        var parameters = req.GetQueryNameValuePairs();
        string childName = parameters.First(a => a.Key == "childName").Value;
        string present = parameters.First(a => a.Key == "present").Value;

        // Note the $ prefix: without it, the placeholders are never substituted
        string json = $"{{ 'childName': '{childName}', 'present': '{present}' }}";
        queue.Add(json);

        return req.CreateResponse(HttpStatusCode.OK);
    }
}

So now we have an endpoint for our imaginary Xamarin app to call into.
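As a rough sketch of what that call might look like (the host name and function key below are placeholders, not real values):

using System;
using System.Net.Http;
using System.Threading.Tasks;

class CallTheFunction
{
    static async Task Main()
    {
        // Placeholder URL: substitute your own function app name and key
        const string url = "https://<your-function-app>.azurewebsites.net/api/Function1"
            + "?code=<function-key>&childName=Alice&present=Bike";

        using (HttpClient client = new HttpClient())
        {
            HttpResponseMessage response = await client.PostAsync(url, new StringContent(string.Empty));
            Console.WriteLine(response.StatusCode); // OK means the message made it onto the queue
        }
    }
}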

Summary

Both GCP and Azure are relatively immature platforms for this kind of interaction. The GCP client libraries seem to be missing functionality (and GCP is still heavily weighted away from .Net). The Azure libraries (especially Functions) seem to be in a pickle, too – with strange dependencies that make it very difficult to communicate outside of Azure. As a result, this task (which should have taken an hour or so) took a great deal of time, completely unnecessarily.

Having said that, it is clearly possible to link the two systems, if a little long-winded.

References

https://blog.falafel.com/rest-google-cloud-pubsub-with-oauth/

https://github.com/Azure/azure-functions-vs-build-sdk/issues/107

https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus

https://stackoverflow.com/questions/48092003/adding-to-a-queue-using-an-azure-function-in-c-sharp/48092276#48092276

Acknowledging a Message in ActiveMQ

Following on from my previous post on ActiveMQ, I’m now going to explore creating a mechanism whereby the message can fail.

The main issue with the trial project was that it used an auto acknowledge:

using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))

There are a number of considerations here: firstly, what if the message that you read errors? We want to retry. Secondly, what happens if the message repeatedly errors? (This type of message is known as a poison message.)

The Problem

Here’s the send code again from the last post:

string queueName = "TextQueue";
string text = "test message"; // (assumed) the message body that the original post split out into a variable

Console.WriteLine($"Adding message to queue topic: {queueName}");

string brokerUri = "activemq:tcp://localhost:61616";  // Default port
NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);

using (IConnection connection = factory.CreateConnection())
{
    connection.Start();

    using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
    using (IDestination dest = session.GetQueue(queueName))
    using (IMessageProducer producer = session.CreateProducer(dest))
    {
        producer.DeliveryMode = MsgDeliveryMode.Persistent;

        // Create and send the text message
        ITextMessage msg = session.CreateTextMessage(text);
        producer.Send(msg);

        Console.WriteLine($"Sent message: {text}");
    }
}

Other than splitting the message text out into a variable, I haven’t changed anything. Okay, so let’s run that and check the queue; the message shows up as expected.


Now, I’m going to change the receive code slightly:

// (This sits inside a method along the lines of: static bool ReadMessage(out string message))
string queueName = "TextQueue";

string brokerUri = "activemq:tcp://localhost:61616";  // Default port
NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);

using (IConnection connection = factory.CreateConnection())
{
    connection.Start();
    using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
    using (IDestination dest = session.GetQueue(queueName))
    using (IMessageConsumer consumer = session.CreateConsumer(dest))
    {
        IMessage msg = consumer.Receive();
        if (msg is ITextMessage)
        {
            ITextMessage txtMsg = msg as ITextMessage;

            Console.WriteLine($"Received message: {txtMsg.Text}");

            message = txtMsg.Text;

            throw new Exception("Test"); // <-- May cause problems

            return true;
        }
        else
        {
            Console.WriteLine("Unexpected message type: " + msg.GetType().Name);
        }
    }
}

As you can see, there is now an issue in the code; for some reason, it is repeatedly throwing an error entitled “Test”. I can’t work out why (maybe I’ll post a question on StackOverflow later), but when I run that, despite crashing, the message is read, and the queue is now empty.

Obviously, this is an issue: if that message was “DebitBankAccountWith200000” then someone is going to wish that the person that wrote this code hadn’t automatically acknowledged it.

Firstly, how do we stop the auto acknowledge?

There are a number of alternatives to auto acknowledge, but we’ll only look at two here: client acknowledge and transactional acknowledgement. I’ll leave transactional acknowledgement for another day.

Client Acknowledge

This method is basically the manual version: you’re telling ActiveMQ that you will, or will not, acknowledge the message yourself. Now, let’s alter the receive code slightly:

using (ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge))
using (IDestination dest = session.GetQueue(queueName))
using (IMessageConsumer consumer = session.CreateConsumer(dest))
{
    IMessage msg = consumer.Receive();
    if (msg is ITextMessage)
    {
        ITextMessage txtMsg = msg as ITextMessage;

        Console.WriteLine($"Received message: {txtMsg.Text}");

        message = txtMsg.Text;

        throw new Exception("Test");
        msg.Acknowledge(); // Never reached while the mystery "bug" above is in place

        return true;
    }
    else
    {
        Console.WriteLine("Unexpected message type: " + msg.GetType().Name);
    }
}

As you can see, I’ve changed two main parts here: the first is that I’ve changed the AcknowledgementMode to ClientAcknowledge, and the second is that I’ve added a call to the Acknowledge method on the message.

Now let’s re-run the send and receive and see what happens to the queue.

Unfortunately, I still haven’t worked out why it’s crashing, but the message is still sitting safely in the queue.

We had an error, it crashed, but because it was never acknowledged, it’s still safe and sound in the queue. When we run the receive again, hopefully the bug will have magically disappeared and the message will successfully process.

Poison Messages

A poison message is one where the issue resides in the message itself; the situation described above is not a poison message, because the message is fine: it’s the code that’s erroring. Once the code above is fixed, the message can be processed. However, let’s have a look at a different error scenario; here’s some new receive code:

// (Again, this sits inside a method along the lines of: static bool ReadMessage(out string message))
string queueName = "TextQueue";

string brokerUri = "activemq:tcp://localhost:61616";  // Default port
NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);

using (IConnection connection = factory.CreateConnection())
{
    connection.Start();
    using (ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge))
    using (IDestination dest = session.GetQueue(queueName))
    using (IMessageConsumer consumer = session.CreateConsumer(dest))
    {
        IMessage msg = consumer.Receive();
        if (msg is ITextMessage)
        {
            ITextMessage txtMsg = msg as ITextMessage;

            Console.WriteLine($"Received message: {txtMsg.Text}");

            message = txtMsg.Text;

            // Our "business rule": valid messages start with 't'
            if (message.First() != 't')
                throw new Exception("Message is invalid");
            msg.Acknowledge();

            return true;
        }
        else
        {
            Console.WriteLine("Unexpected message type: " + msg.GetType().Name);
        }
    }
}

This time, we have some code that actually processes the message and, based on the contents, does something; in this case, it throws an error if the message doesn’t start with ‘t’. So, the rules are simple: messages start with ‘t’. Let’s run the send code again, try some messages, and then receive them (incidentally, while testing this, my notes on starting two projects might be useful).

Okay – so we’ve come across a message that we can’t process. This has yet to be acknowledged, so it’s still safe and sound in the queue. We’ll simply restart the listener and pick it up again.

Ah – okay. So, we have a problem. “nexttest” is causing an error with the queue, but if we don’t acknowledge it, we’re going to keep picking it up and erroring.

The Antidote

Once we know that the message is causing a problem, we can send it to a special queue; here’s the code to capture the error:

ITextMessage txtMsg = msg as ITextMessage;

try
{
    Console.WriteLine($"Received message: {txtMsg.Text}");

    message = txtMsg.Text;

    if (message != null && message.First() != 't')
    {
        // The message has a problem, and so we need to file it away without losing it
        throw new Exception("Message is invalid");
    }
    msg.Acknowledge();

    return true;
}
catch
{
    ResendMsg(session, msg);
    msg.Acknowledge(); // Acknowledge the message from the original queue
    return false;      // (Assumed) the enclosing method reports the message as diverted, not processed
}
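Note the ordering in the catch block: the message is resent to the dead letter queue before it’s acknowledged on the original queue. If the process dies between the two calls, the worst case is a duplicate on the dead letter queue, rather than a lost message.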

And here’s the new method, ResendMsg:

private static void ResendMsg(ISession session, IMessage msg)
{
    // Divert the poison message to a dead letter queue, so the main queue can move on
    var deadLetterQueue = new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("ActiveMQ.DLQ");
    using (IMessageProducer producer = session.CreateProducer(deadLetterQueue))
    {
        producer.Send(msg);
    }
}
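As an aside, ActiveMQ.DLQ is the broker’s default dead letter queue, so the messages we divert here will sit alongside any that the broker itself dead-letters (for example, once its redelivery policy is exhausted).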

The first time this executes, it will throw and catch the error, and then resend the message to the dead letter queue. Subsequent runs can proceed past the problem message, and the message itself remains intact on the dead letter queue.