Monthly Archives: May 2018

Playing with Azure Event Hub

I’ve recently been playing with the Azure Event Hub. This is basically a way of transmitting large amounts* of data between systems. In a later post, I may try and test these limits by designing some kind of game based on this.

As a quick disclaimer, it’s worth bearing in mind that I am playing with this technology, and so much of the content of this post can be found in the links at the bottom of this post – you won’t find anything original here – just a record of my findings. You may find more (and more accurate) information in those.

Event Hub Namespace

The first step, as with many Azure services, is to create a namespace:

For a healthy amount of data transference, you’ll pay around £10 per month.

Finally, we’ll create event hub within the namespace:

When you create the event hub, it asks how many partitions you need. This basically splits the message delivery; and it’s clever enough to work out, if you have 3 partitions and two listeners that one should have two slots, and one, one slot:

We’ll need an access policy so that we have permission to listen:

New Console Apps

We’ll need to create two applications: a producer and a consumer.

Let’s start with a producer. Create a new console app and add this NuGet library.

Here’s the code:

class Program
{
    private static EventHubClient eventHubClient;
    private const string EhConnectionString = "Endpoint=sb://pcm-testeventhub.servicebus.windows.net/;SharedAccessKeyName=Publisher;SharedAccessKey=key;EntityPath=pcm-eventhub1";
    private const string EhEntityPath = "pcm-eventhub1";
 
    public static async Task Main(string[] args)
    {
        EventHubsConnectionStringBuilder connectionStringBuilder = new EventHubsConnectionStringBuilder(EhConnectionString)
        {
            EntityPath = EhEntityPath
        };
 
        eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
 
        while (true)
        {
            Console.Write("Please enter message to send: ");
            string message = Console.ReadLine();
            if (string.IsNullOrWhiteSpace(message)) break;
 
            await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(message)));
        }
 
        await eventHubClient.CloseAsync();
 
        Console.WriteLine("Press ENTER to exit.");
        Console.ReadLine();
    }
}

Consumer

Next we’ll create a consumer; so the first thing we’ll need is to grant permissions for listening:

We’ll create a second new console application with this same library and the processor library, too.

class Program
{
    private const string EhConnectionString = "Endpoint=sb://pcm-testeventhub.servicebus.windows.net/;SharedAccessKeyName=Listener;SharedAccessKey=key;EntityPath=pcm-eventhub1";
    private const string EhEntityPath = "pcm-eventhub1";
    private const string StorageContainerName = "eventhub";
    private const string StorageAccountName = "pcmeventhubstorage";
    private const string StorageAccountKey = "key";
 
    private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", StorageAccountName, StorageAccountKey);
 
    static async Task Main(string[] args)
    {
        Console.WriteLine("Registering EventProcessor...");
 
        var eventProcessorHost = new EventProcessorHost(
            EhEntityPath,
            PartitionReceiver.DefaultConsumerGroupName,
            EhConnectionString,
            StorageConnectionString,
            StorageContainerName);
 
        // Registers the Event Processor Host and starts receiving messages
        await eventProcessorHost.RegisterEventProcessorAsync<EventsProcessor>();
 
        Console.WriteLine("Receiving. Press ENTER to stop worker.");
        Console.ReadLine();
 
        // Disposes of the Event Processor Host
        await eventProcessorHost.UnregisterEventProcessorAsync();
    }
}

class EventsProcessor : IEventProcessor
{
    public Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
        return Task.CompletedTask;
    }
 
    public Task OpenAsync(PartitionContext context)
    {
        Console.WriteLine($"SimpleEventProcessor initialized. Partition: '{context.PartitionId}'");
        return Task.CompletedTask;
    }
 
    public Task ProcessErrorAsync(PartitionContext context, Exception error)
    {
        Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
        return Task.CompletedTask;
    }
 
    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (var eventData in messages)
        {
            var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
            Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
        }
 
        return context.CheckpointAsync();
    }
}

As you can see, we can now transmit data through the Event Hub into client applications:

Footnotes

*Large, in terms of frequency, rather than volume – for example, transmitting a small message twice a second, rather than uploading a petabyte of data

References

https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-dotnet-standard-getstarted-send

https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-dotnet-standard-getstarted-receive-eph

What can you do with a logic app? Part three – Creating a Logic App Client

One of the things that are missing from Azure Logic apps is the ability to integrate human interaction. Microsoft do have their own version of an interactive workflow (PowerApps), which is (obviously) far better than what you can produce by following this post.

In this post, we’ll create a very basic client for a logic app. Obviously, with some thought, this could easily be extended to allow a fully functional, interactive, workflow system.

Basic Logic App

Let’s start by designing our logic app. The app in question is going to be a very simple one. It’s format is going to be that it will add a message to a logging queue (just so it has something to do), then we’ll ask the user a question; and we’ll do this by putting a message onto a topic: left or right. Based on the user’s response, we’ll either write a message to the queue saying left, or right. Let’s have a look at our Logic App design:

It’s worth pointing out a few things about this design:
1. The condition uses the expression base64ToString() to convert the encoded message into plain text.
2. Where the workflow picks up, it uses a peek-lock, and then completes the message at the end. It looks like it’s a ‘feature’ of logic apps that an automatic complete on this trigger will not actually complete the message (plus, this is actually a better design).

Queues and Topics

The “Log to message queue” action above is putting an entry into a queue; so a quick note about why we’re using a queue for logging, and a topic for the interaction with the user. In a real life version of this system, we might have many users, but they might all want to perform the same action. Let’s say that they all are part of a sales process, and the actions are actually actions along that process; adding these to a queue maintains their sequence. Here’s the queue and topic layout that I’m using for this post:

Multiple Triggers

As you can see, we actually have two triggers in this workflow. The first starts the workflow (so we’ll drop a message into the topic to start it), and the second waits for a second message to go into the topic.

To add a trigger part way through the workflow, simply add an action, search and select “Triggers”:

Because we have a trigger part way through the workflow, what we have effectively issued here is an await statement. Once a message appears in the subscription, the workflow will continue where it left off:

As soon as a message is posted, the workflow carries on:

Client Application

For the client application, we could simply use the Service Bus Explorer (in fact, the screenshots above were taken from using this to simulate messages in the topic). However, the point of this post is to create a client, and so we will… although we’ll just create a basic console app for now.

We need the client to do two things: read from a topic subscription, and write to a topic. I haven’t exactly been here before, but I will be heavily plagiarising from here, here, and here.

Let’s create a console application:

Once that’s done, we’ll need the service bus client library: Install it from here.

The code is generally quite straight-forward, and looks a lot like the code to read and write to queues. The big difference is that you don’t read from a topic, but from a subscription to a topic (a topic can have many subscriptions):

class Program
{
    
    static async Task Main(string[] args)
    {
        MessageHandler messageHandler = new MessageHandler();
        messageHandler.RegisterToRead("secondstage", "sub1");
 
        await WaitForever();
    }
 
    private static async Task WaitForever()
    {
        while (true) await Task.Delay(5000);
    }
}
public class MessageHandler
{
    private string _connectionString = "service bus connection string details";
    private ISubscriptionClient _subscriptionClient;
    public void RegisterToRead(string topicName, string subscriptionName)
    {            
        _subscriptionClient = new SubscriptionClient(_connectionString, topicName, subscriptionName);
 
        MessageHandlerOptions messageHandlerOptions = new MessageHandlerOptions(ExceptionReceived)
        {
            AutoComplete = false,
            MaxAutoRenewDuration = new TimeSpan(1, 0, 0)
        };
 
        _subscriptionClient.RegisterMessageHandler(ProcessMessage, messageHandlerOptions);
 
    }
 
    private async Task ProcessMessage(Message message, CancellationToken cancellationToken)
    {
        string messageText = Encoding.UTF8.GetString(message.Body);
 
        Console.WriteLine(messageText);
        string leftOrRight = Console.ReadLine();
 
        await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
 
        await SendResponse(leftOrRight, "userinput");
    }
 
    private async Task SendResponse(string leftOrRight, string topicName)
    {
        TopicClient topicClient = new TopicClient(_connectionString, topicName);
        Message message = new Message(Encoding.UTF8.GetBytes(leftOrRight));
        await topicClient.SendAsync(message);
    }
 
    private Task ExceptionReceived(ExceptionReceivedEventArgs arg)
    {
        Console.WriteLine(arg.Exception.ToString());
        return Task.CompletedTask;
    }
}

If we run it, then when the logic app reaches the second trigger, we’ll get a message from the subscription and ask directions:

Based on the response, the logic app will execute either the right or left branch of code.

Summary

Having worked with workflow systems in the past, one recurring feature of them is that they start to get used for things that don’t fit into a workflow, resulting in a needlessly over-complex system. I imagine that Logic Apps are no exception to this rule, and in 10 years time, people will roll their eyes at how Logic Apps have been used where a simple web service would have done the whole job.

The saving grace here is source control. The workflow inside a Logic App is simply a JSON file, and so it can be source controlled, added to a CI pipeline, and all the good things that you might expect. Whether or not a more refined version of what I have described here makes any sense is another question.

There are many downsides to this approach: firstly, you are fighting against the Service Bus by asking it to wait for input (that part is a very fixable problem with a bit of an adjustment to the messages); secondly, you would presumably need some form of timeout (again, a fixable problem that will probably feature in a future post). The biggest issue here is that you are likely introducing complex conditional logic with no way to unit test; this isn’t, per se, fixable; however, you can introduce some canary logic (again, this will probably be the feature of a future post).

References

https://docs.microsoft.com/en-us/azure/logic-apps/logic-apps-limits-and-config

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-how-to-use-topics-subscriptions

https://stackoverflow.com/questions/28127001/the-lock-supplied-is-invalid-either-the-lock-expired-or-the-message-has-alread