This was one of the features that interested me most in message queues. Basically, you have an application, and you want to communicate with it; that is, Joe Bloggs the user is sat there, tapping away at his keyboard, and I want to send him a message that interrupts him. There are dozens of use cases for this: the user has entered an order and there’s a problem with it; the user’s account has been locked and they need to be logged out; or we want to alert them to a data change so that they can refresh their data.
The relevant part of message queuing here is a topic, which allows me to send an alert to one or more listeners.
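As a rough illustration of how a topic decides which listeners get a message: in RabbitMQ, a topic exchange compares the routing key of each published message with the binding key of each queue, where keys are dot-separated words, `*` stands for exactly one word and `#` for zero or more. The sketch below mimics that matching in plain C#. It is not RabbitMQ's own implementation, `TopicMatcher` is a hypothetical name, and edge cases such as empty keys are ignored.

```csharp
using System;

// Hypothetical helper, for illustration only: mimics topic-exchange key matching.
public static class TopicMatcher
{
    // Returns true if a message published with routingKey would reach
    // a queue bound with bindingKey.
    public static bool Matches(string bindingKey, string routingKey)
    {
        return Match(bindingKey.Split('.'), 0, routingKey.Split('.'), 0);
    }

    private static bool Match(string[] pattern, int pi, string[] words, int wi)
    {
        // Pattern exhausted: a match only if every word was consumed too.
        if (pi == pattern.Length) return wi == words.Length;

        if (pattern[pi] == "#")
        {
            // '#' may absorb zero or more words; try each possibility.
            for (int next = wi; next <= words.Length; next++)
            {
                if (Match(pattern, pi + 1, words, next)) return true;
            }
            return false;
        }

        // Any other pattern word needs a word to compare against.
        if (wi == words.Length) return false;

        // '*' matches any single word; otherwise the words must be identical.
        if (pattern[pi] == "*" || pattern[pi] == words[wi])
        {
            return Match(pattern, pi + 1, words, wi + 1);
        }
        return false;
    }
}
```

With a plain key such as "thisApp" on both sides, as used later in this post, the match is simply an exact comparison.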
To this end, I’ve extended my helper class to include a TopicHelper:
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class TopicHelper : IDisposable
{
    IReceiveTopicBehaviour _receiveBehaviour = null;
    ConnectionFactory _factory = new ConnectionFactory() { HostName = "localhost" };
    IConnection _connection = null;
    IModel _channel = null;
    EventingBasicConsumer _consumer = null;

    // Lazily create the connection and channel on first use
    private void SetupChannel()
    {
        if (_connection == null)
        {
            _connection = _factory.CreateConnection();
            _channel = _connection.CreateModel();
        }
    }

    private void SetupConsumer()
    {
        if (_channel == null) throw new Exception("Channel is not set-up");
        if (_connection == null) throw new Exception("Connection is not set-up");

        if (_consumer == null)
            _consumer = new EventingBasicConsumer(_channel);
    }

    public void ReceiveTopic(string topicName, string key, IReceiveTopicBehaviour behaviour)
    {
        _receiveBehaviour = behaviour;

        SetupChannel();
        SetupConsumer();

        _channel.ExchangeDeclare(topicName, "topic");

        // Server-named queue, bound to the topic exchange with the given key
        var queue = _channel.QueueDeclare();
        var queueName = queue.QueueName;
        _channel.QueueBind(queue: queueName, exchange: topicName, routingKey: key);

        _consumer.Received += Consumer_Received;
        _channel.BasicConsume(queueName, true, _consumer);
    }

    private void Consumer_Received(object sender, BasicDeliverEventArgs e)
    {
        // Note: from RabbitMQ.Client 6.0, e.Body is ReadOnlyMemory<byte>; use e.Body.ToArray() there
        var body = e.Body;
        var message = Encoding.UTF8.GetString(body);
        Console.WriteLine(message);

        _receiveBehaviour.OnReceive(message);
    }

    public void SendTopic(string topicName, string key, string data)
    {
        SetupChannel();

        _channel.ExchangeDeclare(topicName, "topic");
        _channel.BasicPublish(topicName, key, null, Encoding.UTF8.GetBytes(data));
    }

    public void Dispose()
    {
        // Dispose the channel before the connection that owns it;
        // either may be null if nothing was ever sent or received
        _channel?.Dispose();
        _connection?.Dispose();
    }
}
There’s a lot of code here, but basically there are only two methods of note: SendTopic() and ReceiveTopic(). I’ve also made it a disposable class. The next thing we want is a listener; for this, I’ve used a WPF app, but any application should be able to do this.
The call to set up the alert is this:
public partial class MainWindow : Window
{
    private TopicHelper _topicHelper = new TopicHelper();

    public MainWindow()
    {
        InitializeComponent();

        Task.Run(() =>
        {
            ReceiveTopic recTopic = new ReceiveTopic();
            _topicHelper.ReceiveTopic("Alerts", "thisApp", recTopic);
        });
    }
}
I’ve used the code-behind because I’m just proving a point. Obviously, in real life, this would be some abstraction in the business layer. The main thing to note is the ReceiveTopic class that is instantiated and passed through; here’s its implementation:
public class ReceiveTopic : IReceiveTopicBehaviour
{
    public void OnReceive(string message)
    {
        System.Windows.MessageBox.Show(message);
    }
}
This, effectively, allows me to provide custom functionality on the receive. If you find that you’re using this to pass in the same one-liner, you could adapt this to simply pass in an action.
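One way to sketch the "pass in an action" idea without changing TopicHelper at all is a small adapter that wraps a delegate. This is only an illustration: the interface definition is not shown in this post, so its shape here is assumed from how ReceiveTopic implements it, and `ActionReceiveBehaviour` is a hypothetical name.

```csharp
using System;

// Assumed shape of the interface (inferred from the ReceiveTopic class above).
public interface IReceiveTopicBehaviour
{
    void OnReceive(string message);
}

// Hypothetical adapter: lets a caller supply a lambda instead of
// writing a new class for every one-line handler.
public sealed class ActionReceiveBehaviour : IReceiveTopicBehaviour
{
    private readonly Action<string> _onReceive;

    public ActionReceiveBehaviour(Action<string> onReceive)
    {
        // Fail early rather than when the first message arrives
        _onReceive = onReceive ?? throw new ArgumentNullException(nameof(onReceive));
    }

    // Forward each received message to the wrapped delegate
    public void OnReceive(string message) => _onReceive(message);
}
```

The listener could then, for example, call `_topicHelper.ReceiveTopic("Alerts", "thisApp", new ActionReceiveBehaviour(m => System.Windows.MessageBox.Show(m)));` and the separate ReceiveTopic class would no longer be needed.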
The final piece of the puzzle is sending the alert; in my case, this is done from a console app.
Here’s the code for the main function:
static void Main(string[] args)
{
    Console.WriteLine("Alert: ");
    string alertText = Console.ReadLine();

    using (TopicHelper helper = new TopicHelper())
    {
        helper.SendTopic("Alerts", "thisApp", alertText);
    }

    Console.WriteLine("Finished");
    Console.ReadLine();
}
Does it work?
Yes, of course.