I’ve recently been playing with Azure Event Hub. It’s basically a way of transmitting large amounts* of data between systems. In a later post, I may try to test its limits by designing some kind of game based on it.
As a quick disclaimer: I am only playing with this technology, and much of the content here comes from the links at the bottom of this post. You won’t find anything original here, just a record of my findings. You may find more (and more accurate) information in those links.
Event Hub Namespace
The first step, as with many Azure services, is to create a namespace:
For a healthy amount of data transfer, you’ll pay around £10 per month.
Next, we’ll create an event hub within the namespace:
When you create the event hub, it asks how many partitions you need. Partitions split the message delivery between listeners, and the split is balanced for you. For example, with three partitions and two listeners, one listener ends up with two partitions and the other with one:
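The arithmetic behind that split can be sketched in a few lines. This is only an illustration of an even split, assuming each listener gets an equal share and any remainder is handed out one partition at a time; it is not the library's actual balancing code, and the names PartitionSplitSketch and Distribute are made up for this example.

```csharp
using System;

class PartitionSplitSketch
{
    // Hypothetical helper: returns how many partitions each listener would own
    // under a simple even split (not the real lease-balancing algorithm).
    public static int[] Distribute(int partitions, int listeners)
    {
        var shares = new int[listeners];
        int baseShare = partitions / listeners;  // every listener gets at least this many
        int remainder = partitions % listeners;  // left-over partitions, handed out one each

        for (int i = 0; i < listeners; i++)
        {
            shares[i] = baseShare + (i < remainder ? 1 : 0);
        }

        return shares;
    }

    static void Main()
    {
        int[] shares = Distribute(3, 2);

        for (int i = 0; i < shares.Length; i++)
        {
            Console.WriteLine($"Listener {i}: {shares[i]} partition(s)");
        }
    }
}
```

With three partitions and two listeners this gives one listener two partitions and the other one, which matches the behaviour described above.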
We’ll need an access policy so that we have permission to send (the producer code below uses one named Publisher):
New Console Apps
We’ll need to create two applications: a producer and a consumer.
Let’s start with a producer. Create a new console app and add the Microsoft.Azure.EventHubs NuGet library.
Here’s the code:
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;

class Program
{
    private static EventHubClient eventHubClient;
    private const string EhConnectionString = "Endpoint=sb://pcm-testeventhub.servicebus.windows.net/;SharedAccessKeyName=Publisher;SharedAccessKey=key;EntityPath=pcm-eventhub1";
    private const string EhEntityPath = "pcm-eventhub1";

    public static async Task Main(string[] args)
    {
        // Build the connection string, making sure it targets our event hub
        EventHubsConnectionStringBuilder connectionStringBuilder = new EventHubsConnectionStringBuilder(EhConnectionString)
        {
            EntityPath = EhEntityPath
        };

        eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());

        // Send each line typed at the console; a blank line ends the loop
        while (true)
        {
            Console.Write("Please enter message to send: ");
            string message = Console.ReadLine();
            if (string.IsNullOrWhiteSpace(message)) break;

            await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(message)));
        }

        await eventHubClient.CloseAsync();

        Console.WriteLine("Press ENTER to exit.");
        Console.ReadLine();
    }
}
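The connection string in the producer bundles several settings into one value. As a rough sketch of what is inside it, the snippet below splits it into its named parts using plain string handling. The class ConnectionStringSketch and its Parse method are hypothetical names for this illustration, not part of the Event Hubs library; in the real code, EventHubsConnectionStringBuilder does this job.

```csharp
using System;
using System.Collections.Generic;

class ConnectionStringSketch
{
    // Hypothetical helper: splits "Name=Value;Name=Value" into a dictionary.
    public static Dictionary<string, string> Parse(string connectionString)
    {
        var settings = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);

        foreach (var part in connectionString.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries))
        {
            // Split on the first '=' only, because a real key may itself contain '='
            var pair = part.Split(new[] { '=' }, 2);
            if (pair.Length == 2)
            {
                settings[pair[0].Trim()] = pair[1].Trim();
            }
        }

        return settings;
    }

    static void Main()
    {
        var settings = Parse("Endpoint=sb://pcm-testeventhub.servicebus.windows.net/;SharedAccessKeyName=Publisher;SharedAccessKey=key;EntityPath=pcm-eventhub1");

        foreach (var setting in settings)
        {
            Console.WriteLine($"{setting.Key}: {setting.Value}");
        }
    }
}
```

The SharedAccessKeyName part is the name of the access policy, and EntityPath is the event hub inside the namespace.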
Consumer
Next we’ll create a consumer. The first thing we’ll need is an access policy that grants permission to listen:
We’ll create a second new console application with the same Microsoft.Azure.EventHubs library, plus the Microsoft.Azure.EventHubs.Processor library.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.EventHubs.Processor;

class Program
{
    private const string EhConnectionString = "Endpoint=sb://pcm-testeventhub.servicebus.windows.net/;SharedAccessKeyName=Listener;SharedAccessKey=key;EntityPath=pcm-eventhub1";
    private const string EhEntityPath = "pcm-eventhub1";
    private const string StorageContainerName = "eventhub";
    private const string StorageAccountName = "pcmeventhubstorage";
    private const string StorageAccountKey = "key";

    private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", StorageAccountName, StorageAccountKey);

    static async Task Main(string[] args)
    {
        Console.WriteLine("Registering EventProcessor...");

        var eventProcessorHost = new EventProcessorHost(
            EhEntityPath,
            PartitionReceiver.DefaultConsumerGroupName,
            EhConnectionString,
            StorageConnectionString,
            StorageContainerName);

        // Registers the Event Processor Host and starts receiving messages
        await eventProcessorHost.RegisterEventProcessorAsync<EventsProcessor>();

        Console.WriteLine("Receiving. Press ENTER to stop worker.");
        Console.ReadLine();

        // Disposes of the Event Processor Host
        await eventProcessorHost.UnregisterEventProcessorAsync();
    }
}

class EventsProcessor : IEventProcessor
{
    public Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
        return Task.CompletedTask;
    }

    public Task OpenAsync(PartitionContext context)
    {
        Console.WriteLine($"SimpleEventProcessor initialized. Partition: '{context.PartitionId}'");
        return Task.CompletedTask;
    }

    public Task ProcessErrorAsync(PartitionContext context, Exception error)
    {
        Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
        return Task.CompletedTask;
    }

    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (var eventData in messages)
        {
            // The body is an ArraySegment<byte>, so decode only the bytes it covers
            var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
            Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
        }

        // Record our position so that processing can resume from here
        return context.CheckpointAsync();
    }
}
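One detail in ProcessEventsAsync that is easy to miss is why the body is decoded using Array, Offset and Count rather than just the array. The body is an ArraySegment<byte>, which can be a window onto a larger buffer. The sketch below shows the difference using only the base class library; the buffer contents and the BodyDecodingSketch class are invented for the illustration.

```csharp
using System;
using System.Text;

class BodyDecodingSketch
{
    // Decodes only the bytes that the segment covers, as the consumer does
    public static string Decode(ArraySegment<byte> body)
    {
        return Encoding.UTF8.GetString(body.Array, body.Offset, body.Count);
    }

    static void Main()
    {
        // A made-up buffer in which the message sits between some padding
        byte[] buffer = Encoding.UTF8.GetBytes("##hello##");
        var body = new ArraySegment<byte>(buffer, 2, 5);

        Console.WriteLine(Decode(body));                    // hello
        Console.WriteLine(Encoding.UTF8.GetString(buffer)); // ##hello##
    }
}
```

Decoding the whole underlying array would include whatever else happens to be in the buffer, so honouring the offset and count is the safer habit.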
As you can see, we can now transmit data through the Event Hub into client applications:
Footnotes
*Large, in terms of frequency, rather than volume – for example, transmitting a small message twice a second, rather than uploading a petabyte of data
References
https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-dotnet-standard-getstarted-send
https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-dotnet-standard-getstarted-receive-eph