Category Archives: Azure Event Hub

Using Event Hubs to Create a Multi-player Game

In this previous post, I introduced Azure Event Hubs (to myself mostly). In this post, I’m going to attempt to create a very basic multi-player game, using Event Hubs to manage the multi-player part.

I’ve previously written extensively on MonoGame, although I haven’t done anything with it for a while.

Getting started with MonoGame

The first step is to install the MonoGame framework from here. At the time of writing, 3.6 was the latest version. This was released over a year ago, so maybe that says something about the direction this game platform is going. I’ve heard a lot of rumours about Microsoft buying Unity (which I believe is ultimately based on MonoGame) so it’s odd that they wouldn’t want to take this over (especially given that it’s a fork of XNA).

Once you’ve installed MonoGame, in VS create a new project; you’ll be beset by MonoGame project types:

For now, we’ll create a Windows MonoGame Project. For the purpose of this example, you’ll need to install the MonoGame.Extended NuGet package.

Let’s start off with a really simple game – this basically gives you a red dot on the screen, and you can move it around:

public class Game1 : Game
{
    GraphicsDeviceManager graphics;
    SpriteBatch spriteBatch;
    Vector2 _playerPosition;
    Random _random = new Random();
    private float _momentumUp = 0;
    private float _momentumRight = 0;
 
    public Game1()
    {
        graphics = new GraphicsDeviceManager(this);
        Content.RootDirectory = "Content";
    }
 
    protected override void Initialize()
    {
        _playerPosition = GetRandomVector();
        base.Initialize();
    }
 
    private Vector2 GetRandomVector()
    {
        Vector2 vector2 = new Vector2(_random.Next(GraphicsDevice.Viewport.Width), _random.Next(GraphicsDevice.Viewport.Height));
        return vector2;
    }
 
    protected override void LoadContent()
    {
        spriteBatch = new SpriteBatch(GraphicsDevice);
    }
 
    protected override void Update(GameTime gameTime)
    {
        if (GamePad.GetState(PlayerIndex.One).Buttons.Back == ButtonState.Pressed || Keyboard.GetState().IsKeyDown(Keys.Escape))
            Exit();
 
        if (Keyboard.GetState().IsKeyDown(Keys.A) || Keyboard.GetState().IsKeyDown(Keys.Left))
            _momentumRight -= 0.1f;
        else if (Keyboard.GetState().IsKeyDown(Keys.D) || Keyboard.GetState().IsKeyDown(Keys.Right))
            _momentumRight += 0.1f;
        else if (Keyboard.GetState().IsKeyDown(Keys.W) || Keyboard.GetState().IsKeyDown(Keys.Up))
            _momentumUp -= 0.1f;
        else if (Keyboard.GetState().IsKeyDown(Keys.X) || Keyboard.GetState().IsKeyDown(Keys.Down))
            _momentumUp += 0.1f;
 
        _playerPosition.X += (float)_momentumRight;
        _playerPosition.Y += (float)_momentumUp;
 
        base.Update(gameTime);
    }
 
    protected override void Draw(GameTime gameTime)
    {
        GraphicsDevice.Clear(Color.CornflowerBlue);
 
        spriteBatch.Begin();            
        spriteBatch.DrawCircle(_playerPosition, 10f, 30, new Color(255, 0, 0), 10);
        spriteBatch.End();
 
        base.Draw(gameTime);
    }
 
}

As you can see from the above code, we’re just moving a ball around the screen. If you installed this on a second machine, they would also be able to move that ball as well; at this stage, neither screen would be aware in any sense of the other screen. The next step is that we want to see the ball from the first screen on the second. We won’t mess about with collision, as this isn’t about the game, it’s about the Event Hub traffic.

The EventHub client library in the post mentioned above is for .Net Standard only, so we’ll need to use the Service Bus Client Library.

The basis of what we’ll do next is to simply take the current player position and send it to Event Hub; however, to cater for the fact that more than one player will have a position, we’ll need to identify it. Let’s start by creating a class to encompass this information:

public class PlayerData
{
    public float PlayerPositionX { get; set; }
    public float PlayerPositionY { get; set; }
 
    public Vector2 GetPlayerPosition()
    {
        return new Vector2(PlayerPositionX, PlayerPositionY);
    }
    public Guid PlayerId { get; set; } = Guid.NewGuid();
}

I’ve taken this opportunity to replace the Vector2, which is a struct, with class members. Changing the data in structs can end up with strange behaviour… and in this particular case, it point blank refuses to let you, but gives a message that says it can’t recognise the variable.

Now we’ll need to re-jig the calling code a little to reference this new class instead of the player position:

protected override void Initialize()
{
    _playerData = new PlayerData()
    {
        PlayerPositionX = _random.Next(GraphicsDevice.Viewport.Width),
        PlayerPositionY = _random.Next(GraphicsDevice.Viewport.Height)
    };
    base.Initialize();

EventHub

Let’s add the event hub code here. In this article, I discussed how you might deal with a situation where you’re trying to send non-critical data in rapid succession, using a locking mechanism. Let’s start by writing the code to send the message:

private static object _lock = new object();
private async Task SendPlayerInformation()
{
    if (Monitor.TryEnter(_lock))
    {
        string data = JsonConvert.SerializeObject(_playerData);
        await _eventHubSender.SendData(data);
    }
}

This will need to be called inside the Update method:

protected override void Update(GameTime gameTime)
{
    if (GamePad.GetState(PlayerIndex.One).Buttons.Back == ButtonState.Pressed || Keyboard.GetState().IsKeyDown(Keys.Escape))
        Exit();
 
    if (Keyboard.GetState().IsKeyDown(Keys.A) || Keyboard.GetState().IsKeyDown(Keys.Left))
        _momentumRight -= 0.1f;
    else if (Keyboard.GetState().IsKeyDown(Keys.D) || Keyboard.GetState().IsKeyDown(Keys.Right))
        _momentumRight += 0.1f;
    else if (Keyboard.GetState().IsKeyDown(Keys.W) || Keyboard.GetState().IsKeyDown(Keys.Up))
        _momentumUp -= 0.1f;
    else if (Keyboard.GetState().IsKeyDown(Keys.X) || Keyboard.GetState().IsKeyDown(Keys.Down))
        _momentumUp += 0.1f;
 
    if (_momentumRight != 0 || _momentumUp != 0)
    {
        _playerData.PlayerPositionX += (float)_momentumRight;
        _playerData.PlayerPositionY += (float)_momentumUp;
        SendPlayerInformation(); // Do not wait for this to finish
    }
 
    base.Update(gameTime);
}

SendPlayerInformation is a fire and forget method. Generally, using await statements inside MonoGame can cause some very strange, and difficult to diagnose issues. The reason being that the premise of a game loop is that it executes repeatedly – that’s how the game keeps updating: the Update method allows you to rapidly change the state of the game, and the Draw method allows that state to be rendered to the screen. However, if you stick an await in there, you’ve returned control to the caller, and because none of the MonoGame stack works with the async / await paradigm, the result is that the entire loop waits for your task. In the Draw method, this will result in an obvious blip in the rendering; but in the Update method, the effect will be more subtle.

Other Players

The next step is to implement the listener. Again, this is slightly different when using the service bus libraries:

public class EventHubListener : IEventProcessor
{
 
    public async Task Start(Guid hostId)
    {
        _eventHubClient = EventHubClient.CreateFromConnectionString(EhConnectionString);
        EventHubConsumerGroup defaultConsumerGroup = _eventHubClient.GetDefaultConsumerGroup();            
        EventHubDescription eventHub = NamespaceManager.CreateFromConnectionString(EhConnectionStringNoPath).GetEventHub(EhEntityPath);
 
        foreach (string partitionId in eventHub.PartitionIds)
        {
            defaultConsumerGroup.RegisterProcessor<EventHubListener>(new Lease
            {
                PartitionId = partitionId
            }, new EventProcessorCheckpointManager());
 
            Console.WriteLine("Processing : " + partitionId);
        }
        
    }
 
    public Task OpenAsync(PartitionContext context)
    {
        return Task.FromResult<object>(null);
    }
 
    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        if (_startDate == null)
            _startDate = DateTime.UtcNow;
 
        System.Diagnostics.Debug.WriteLine("ProcessEventsAsync Start");
        System.Diagnostics.Debug.WriteLine(context.Lease.PartitionId);
 
        var filteredMessages =
            messages.Where(a => a.EnqueuedTimeUtc >= _startDate)
            .OrderBy(a => a.EnqueuedTimeUtc);
        foreach (EventData eventData in filteredMessages)
        {
            
            System.Diagnostics.Debug.WriteLine(context.ConsumerGroupName);
            System.Diagnostics.Debug.WriteLine(eventData.PartitionKey);                                
 
            string bytes = Encoding.UTF8.GetString(eventData.GetBytes());
            PlayerData data = JsonConvert.DeserializeObject<PlayerData>(bytes);

            // If we're processing this player then stop
            if (Game1.ThisPlayerGuid == data.PlayerId)
            {
                _eventHubClient = EventHubClient.CreateFromConnectionString(EhConnectionString);
                EventHubConsumerGroup defaultConsumerGroup = _eventHubClient.GetDefaultConsumerGroup();
 
                defaultConsumerGroup.UnregisterProcessor(context.Lease, CloseReason.Shutdown);
                System.Diagnostics.Debug.WriteLine("Unregistering listener...");
                return;
            }
 
            if (Game1.AllPlayerData.ContainsKey(data.PlayerId))
            {
                System.Diagnostics.Debug.WriteLine(data.PlayerId);
 
                var playerData = Game1.AllPlayerData[data.PlayerId];
                playerData.PlayerPositionX = data.PlayerPositionX;
                playerData.PlayerPositionY = data.PlayerPositionY;
            }
            else
            {
                Game1.AllPlayerData.Add(data.PlayerId, data);
            } 
        }
 
        await context.CheckpointAsync();
        System.Diagnostics.Debug.WriteLine("ProcessEventsAsync End");
    }

As you can see, there’s a bit of faffing to get this going. The code does seem a little convoluted, but essentially all we’re doing is finding any messages that were queued after we started processing (this is a very crude checkpoint), and then look for that Player Id; if we find it then we move it, and if we don’t then we create it. There’s also a little check in there to make sure we’re not processing echos!

As it stands, this only registers the clients on start-up, meaning that, whilst client 2 can see client 1, client 1 cannot see client 2.

As you can see, the player on instance one is replicated on instance two. Remember that it only starts broadcasting after movement, so the player will need to move to appear in the second instance.

In the next post, I’ll investigate the possibility of using a game controller to allow both clients to interact simultaneously (or at least be simultaneously visible).

References

https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-dotnet-framework-getstarted-send

http://www.bloggedbychris.com/2014/08/16/exploring-azure-event-hubs-code-implementation/

https://alexandrebrisebois.wordpress.com/2014/07/18/getting-acquainted-with-azure-service-bus-event-hubs/

https://mikhail.io/2017/05/reliable-consumer-of-azure-event-hubs/

Short Walks – Object Locking in C#

While playing with Azure Event Hubs, I decided that I wanted to implement a thread locking mechanism that didn’t queue. That is, I want to try and get a lock on the resource, and if it’s currently in use, just forget it and move on. The default behaviour in C# is to wait for the resource. For example, consider my method:

static async Task MyProcedure()
{
    Console.WriteLine($"Test1 {DateTime.Now}");
    await Task.Delay(5000);
    Console.WriteLine($"Test2 {DateTime.Now}");
}

I could execute this 5 times like so:

static async Task Main(string[] args)
{
    Parallel.For(1, 5, (a) =>
    {
        MyProcedure();
    });
 
    Console.ReadLine();
}

If I wanted to lock this (just bear with me and assume that makes sense for a minute), I might do this:

private static object _lock = new object();        
 
static async Task Main(string[] args)
{
    Parallel.For(1, 5, (a) =>
    {
        //MyProcedure();
        Lock();
    });
 
    Console.ReadLine();
}
 
static void Lock()
{
    Task.Run(() =>
    {
        lock (_lock)
        {
            MyProcedure().GetAwaiter().GetResult();
        }
    });
}

I re-jigged the code a bit, because you can’t await inside a lock statement, and obviously, just making the method call synchronous would not be locking the asynchronous call.

So now, I’ve successfully made my asynchronous method synchronous. Each execution of `MyProcedure` will happen sequentially, and that’s because `lock` queues the locking calls behind one another.

However, imagine the Event Hub scenario that’s referenced in the post above. I have, for example, a game, and it’s sending a large volume of telemetry up to the cloud. In my particular case, I’m sending a player’s current position. If I have a locking mechanism whereby the locks are queued then I could potentially get behind; and if that happens then, at best, the data sent to the cloud will be outdated and, at worse, it will use up game resources, potentially causing a lag.

After a bit of research, I found an alterntive:

private static object _lock = new object();        
 
static async Task Main(string[] args)
{
    Parallel.For(1, 5, (a) =>
    {
        //MyProcedure();
        //Lock();
        TestTryEnter();
    });
 
    Console.ReadLine();
}

static async Task TestTryEnter()
{
    bool lockTaken = false;
 
    try
    {
        Monitor.TryEnter(_lock, 0, ref lockTaken);
 
        if (lockTaken)
        {
            await MyProcedure();                                        
        }
        else
        {
            Console.WriteLine("Could not get lock");
        }
    }
    finally
    {
        if (lockTaken)
        {
            Monitor.Exit(_lock);
        }
    }
}

So here, I try to get the lock, and if the resource is already locked, I simply give up and go home. There are obviously a very limited number of uses for this; however, my Event Hub scenario, described above, is one of them. Depending on the type of data that you’re transmitting, it may make much more sense to have a go, and if you’re in the middle of another call, simply abandon the current one.

Playing with Azure Event Hub

I’ve recently been playing with the Azure Event Hub. This is basically a way of transmitting large amounts* of data between systems. In a later post, I may try and test these limits by designing some kind of game based on this.

As a quick disclaimer, it’s worth bearing in mind that I am playing with this technology, and so much of the content of this post can be found in the links at the bottom of this post – you won’t find anything original here – just a record of my findings. You may find more (and more accurate) information in those.

Event Hub Namespace

The first step, as with many Azure services, is to create a namespace:

For a healthy amount of data transference, you’ll pay around £10 per month.

Finally, we’ll create event hub within the namespace:

When you create the event hub, it asks how many partitions you need. This basically splits the message delivery; and it’s clever enough to work out, if you have 3 partitions and two listeners that one should have two slots, and one, one slot:

We’ll need an access policy so that we have permission to listen:

New Console Apps

We’ll need to create two applications: a producer and a consumer.

Let’s start with a producer. Create a new console app and add this NuGet library.

Here’s the code:

class Program
{
    private static EventHubClient eventHubClient;
    private const string EhConnectionString = "Endpoint=sb://pcm-testeventhub.servicebus.windows.net/;SharedAccessKeyName=Publisher;SharedAccessKey=key;EntityPath=pcm-eventhub1";
    private const string EhEntityPath = "pcm-eventhub1";
 
    public static async Task Main(string[] args)
    {
        EventHubsConnectionStringBuilder connectionStringBuilder = new EventHubsConnectionStringBuilder(EhConnectionString)
        {
            EntityPath = EhEntityPath
        };
 
        eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
 
        while (true)
        {
            Console.Write("Please enter message to send: ");
            string message = Console.ReadLine();
            if (string.IsNullOrWhiteSpace(message)) break;
 
            await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(message)));
        }
 
        await eventHubClient.CloseAsync();
 
        Console.WriteLine("Press ENTER to exit.");
        Console.ReadLine();
    }
}

Consumer

Next we’ll create a consumer; so the first thing we’ll need is to grant permissions for listening:

We’ll create a second new console application with this same library and the processor library, too.

class Program
{
    private const string EhConnectionString = "Endpoint=sb://pcm-testeventhub.servicebus.windows.net/;SharedAccessKeyName=Listener;SharedAccessKey=key;EntityPath=pcm-eventhub1";
    private const string EhEntityPath = "pcm-eventhub1";
    private const string StorageContainerName = "eventhub";
    private const string StorageAccountName = "pcmeventhubstorage";
    private const string StorageAccountKey = "key";
 
    private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", StorageAccountName, StorageAccountKey);
 
    static async Task Main(string[] args)
    {
        Console.WriteLine("Registering EventProcessor...");
 
        var eventProcessorHost = new EventProcessorHost(
            EhEntityPath,
            PartitionReceiver.DefaultConsumerGroupName,
            EhConnectionString,
            StorageConnectionString,
            StorageContainerName);
 
        // Registers the Event Processor Host and starts receiving messages
        await eventProcessorHost.RegisterEventProcessorAsync<EventsProcessor>();
 
        Console.WriteLine("Receiving. Press ENTER to stop worker.");
        Console.ReadLine();
 
        // Disposes of the Event Processor Host
        await eventProcessorHost.UnregisterEventProcessorAsync();
    }
}

class EventsProcessor : IEventProcessor
{
    public Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
        return Task.CompletedTask;
    }
 
    public Task OpenAsync(PartitionContext context)
    {
        Console.WriteLine($"SimpleEventProcessor initialized. Partition: '{context.PartitionId}'");
        return Task.CompletedTask;
    }
 
    public Task ProcessErrorAsync(PartitionContext context, Exception error)
    {
        Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
        return Task.CompletedTask;
    }
 
    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (var eventData in messages)
        {
            var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
            Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
        }
 
        return context.CheckpointAsync();
    }
}

As you can see, we can now transmit data through the Event Hub into client applications:

Footnotes

*Large, in terms of frequency, rather than volume – for example, transmitting a small message twice a second, rather than uploading a petabyte of data

References

https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-dotnet-standard-getstarted-send

https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-dotnet-standard-getstarted-receive-eph