Tag Archives: DLQ

Azure Service Bus – Send an e-mail on Message Timeout

A message queue has two main points of failure in its architecture. The first is where a message is added to a queue but never read (or at least not read within a specified period of time); such a message is called a Dead Letter, and it is the subject of this post. The second is where the message is corrupt, or breaks the reading logic in some way; that is known as a Poison Message.

There are a number of reasons that a message might not get read in the specified time: the service reading and processing the messages might not be keeping up with the supply, it might have crashed, or the network connection might have failed.

One option at this stage is to have a process that automatically notifies someone when a message ends up in the dead letter queue.

Step One – specify a timeout

Here’s how you would specify a timeout on the message specifically:

            BrokeredMessage message = new BrokeredMessage(messageBody)
            {
                MessageId = id,
                TimeToLive = new TimeSpan(0, 5, 0)
            };

Or, you can create a default on the queue from the QueueDescription (typically this would be done when you initially create the queue):

                QueueDescription qd = new QueueDescription("TestQueue")
                {
                    DefaultMessageTimeToLive = new TimeSpan(0, 5, 0)
                };
                nm.CreateQueue(qd);

Should these values differ, the shortest time will be taken.
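
As a rough illustration of that rule, the effective expiry can be thought of as the smaller of the two values. The helper below is a hypothetical sketch (TtlRules and EffectiveTimeToLive are names I've made up; they are not part of the Service Bus API), and it only models the comparison:

```csharp
using System;

public static class TtlRules
{
    // Hypothetical helper: models the rule that the shorter of the
    // message-level TTL and the queue-level default is the one applied.
    public static TimeSpan EffectiveTimeToLive(TimeSpan messageTtl, TimeSpan queueDefaultTtl)
    {
        return messageTtl < queueDefaultTtl ? messageTtl : queueDefaultTtl;
    }
}
```

So, with a message TTL of 5 minutes and a queue default of 10 minutes, EffectiveTimeToLive returns 5 minutes; swap the two arguments and the answer is the same.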

What happens to the message by default?

I’ve added a message to the queue using the default timeout of 5 minutes; here it is happily sitting in the queue:

Looking at the properties of the queue, we can determine that the “TimeToLive” is, indeed, 5 minutes:

In addition, you can see that, by default, the flag telling Service Bus to move expired messages to a dead letter queue is not checked, so the message will not be moved there when it expires.

5 Minutes later:

Nothing has happened to this queue, except time passing. The message has now been discarded. It seems an odd behaviour; however, as with the ReceiveAndDelete receive mode, there may be reasons that this behaviour is required.

Step Two – Dead Letters

If you want to actually do something with the expired message, the key is a concept called “Dead Lettering”. The following code will direct the Service Bus to put the offending message into the “Dead Letter Queue”:

                QueueDescription qd = new QueueDescription("TestQueue")
                {
                    DefaultMessageTimeToLive = new TimeSpan(0, 5, 0),
                    EnableDeadLetteringOnMessageExpiration = true
                };
                nm.CreateQueue(qd);
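
To make the difference between the two settings concrete, here is a toy in-memory model of message expiry. It is only a sketch under my own assumptions: ToyQueue and its members are hypothetical names, nothing here talks to Service Bus, and the real broker's timing will not match this exactly.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model (not the Service Bus API) of what happens when a
// message outlives its time-to-live.
public class ToyQueue
{
    private readonly bool deadLetterOnExpiry;
    private readonly List<Tuple<string, DateTime>> messages = new List<Tuple<string, DateTime>>();

    public List<string> DeadLetters { get; } = new List<string>();

    public ToyQueue(bool deadLetterOnExpiry)
    {
        this.deadLetterOnExpiry = deadLetterOnExpiry;
    }

    public int Count { get { return messages.Count; } }

    // Stores the body together with the time at which it expires.
    public void Send(string body, DateTime now, TimeSpan timeToLive)
    {
        messages.Add(Tuple.Create(body, now + timeToLive));
    }

    // Removes every message whose expiry time has been reached; expired
    // messages are either dropped or added to DeadLetters, depending on the flag.
    public void ExpireMessages(DateTime now)
    {
        for (int i = messages.Count - 1; i >= 0; i--)
        {
            if (messages[i].Item2 > now) continue;
            if (deadLetterOnExpiry) DeadLetters.Add(messages[i].Item1);
            messages.RemoveAt(i);
        }
    }
}
```

With the flag set to false, an expired message simply vanishes from the model; with it set to true, the same message ends up in DeadLetters instead.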

Here’s the result for the same test:

Step Three – Doing something with this…

Okay – so the message hasn’t been processed, and it’s now sat in a queue specially designed for that kind of thing, so what can we do with it? One possible thing is to create a piece of software that monitors this queue. This is an adaptation of the code that I originally created here:

        static void Main(string[] args)
        {
            System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
            sw.Start();

            if (!InitialiseClient())
            {
                Console.WriteLine("Unable to initialise client");
            }
            else
            {
                while (true)
                {
                    string message = ReadMessage("TestQueue/$DeadLetterQueue");

                    if (string.IsNullOrWhiteSpace(message)) break;
                    Console.WriteLine($"{DateTime.Now}: Message received: {message}");
                }
            }

            sw.Stop();
            Console.WriteLine($"Done ({sw.Elapsed.TotalSeconds}) seconds");
            Console.ReadLine();
        }

        private static bool InitialiseClient()
        {
            Uri uri = ServiceManagementHelper.GetServiceUri();
            TokenProvider tokenProvider = ServiceManagementHelper.GetTokenProvider(uri);

            NamespaceManager nm = new NamespaceManager(uri, tokenProvider);
            return nm.QueueExists("TestQueue");
        }

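        private static string ReadMessage(string queueName)
        {
            QueueClient client = QueueManagementHelper.GetQueueClient(queueName, true);

            // Receive() returns null if no message arrives before the timeout
            BrokeredMessage message = client.Receive();
            if (message == null) return string.Empty;
            string messageBody = message.GetBody<string>();

            // Without Complete(), a peek-locked message stays in the dead letter queue;
            // uncomment this to remove the message once it has been handled
            //message.Complete();

            return messageBody;
        }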

If this was all that we had to monitor the queue, then somebody’s job would need to be to watch this application. That may make sense, depending on the nature of the business; however, we could simply notify the person in question that there’s a problem. Now, if only the internet had a concept of an offline messaging facility that works something akin to the postal service, only faster…

        static void Main(string[] args)
        {
            System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
            sw.Start();

            if (!InitialiseClient())
            {
                Console.WriteLine("Unable to initialise client");
            }
            else
            {
                while (true)
                {
                    string message = ReadMessage("TestQueue/$DeadLetterQueue");

                    if (string.IsNullOrWhiteSpace(message)) break;
                    Console.WriteLine($"{DateTime.Now}: Message received: {message}");

                    Console.WriteLine($"{DateTime.Now}: Send e-mail");
                    SendEmail(message);
                }
            }

            sw.Stop();
            Console.WriteLine($"Done ({sw.Elapsed.TotalSeconds}) seconds");
            Console.ReadLine();
        }

        private static void SendEmail(string messageText)
        {
            // Replace the addresses, SMTP host and credentials with your own
            using (var message = new System.Net.Mail.MailMessage())
            using (var smtp = new System.Net.Mail.SmtpClient("smtp.live.com"))
            {
                message.To.Add("[email protected]");
                message.Subject = "Message in queue has expired";
                message.From = new System.Net.Mail.MailAddress("[email protected]");
                message.Body = messageText;

                smtp.Port = 587;
                smtp.UseDefaultCredentials = false;
                smtp.Credentials = new System.Net.NetworkCredential("[email protected]", "passw0rd");
                smtp.EnableSsl = true;
                smtp.Send(message);
            }
        }

In order to prevent a torrent of mails, you might want to put a delay in this code, or even maintain some kind of list so that you only send one mail per day.
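
As a minimal sketch of that idea, the class below allows at most one notification per interval. NotificationThrottle and ShouldSend are hypothetical names of my own, and the state is held in memory only, so it resets whenever the monitoring application restarts:

```csharp
using System;

// Hypothetical helper: allows at most one notification per interval.
public class NotificationThrottle
{
    private readonly TimeSpan minimumInterval;
    private DateTime? lastSent;

    public NotificationThrottle(TimeSpan minimumInterval)
    {
        this.minimumInterval = minimumInterval;
    }

    // Returns true (and records the time) if enough time has passed since
    // the last notification; otherwise returns false.
    public bool ShouldSend(DateTime now)
    {
        if (lastSent.HasValue && now - lastSent.Value < minimumInterval) return false;
        lastSent = now;
        return true;
    }
}
```

In the loop above, you could create a throttle with TimeSpan.FromDays(1) and only call SendEmail(message) when ShouldSend(DateTime.UtcNow) returns true.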

References

https://docs.microsoft.com/en-us/dotnet/api/microsoft.servicebus.messaging.queuedescription.enabledeadletteringonmessageexpiration?view=azureservicebus-4.0.0#Microsoft_ServiceBus_Messaging_QueueDescription_EnableDeadLetteringOnMessageExpiration

https://www.codit.eu/blog/2015/01/automatically-expire-messages-in-azure-service-bus-how-it-works/

https://stackoverflow.com/questions/9851319/how-to-add-smtp-hotmail-account-to-send-mail

Acknowledging a Message in ActiveMQ

Following on from my previous post on ActiveMQ, I’m now going to explore creating a mechanism whereby the message can fail.

The main issue with the trial project was that it used an auto acknowledge:

using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))

There are a number of considerations here. Firstly, what if the message that you read errors? We want to retry. Secondly, what happens if the message repeatedly errors? (This type of message is known as a poison message.)

The Problem

Here’s the send code again from the last post:

string queueName = "TextQueue";

Console.WriteLine($"Adding message to queue topic: {queueName}");

string brokerUri = "activemq:tcp://localhost:61616";  // Default port
NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);

using (IConnection connection = factory.CreateConnection())
{
    connection.Start();

    using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
    using (IDestination dest = session.GetQueue(queueName))
    using (IMessageProducer producer = session.CreateProducer(dest))
    {
        producer.DeliveryMode = MsgDeliveryMode.Persistent;

        // text is assumed to hold the message body (e.g. passed in as a parameter)
        var msg = session.CreateTextMessage(text);
        producer.Send(msg);

        Console.WriteLine($"Sent message: {text}");
    }
}

Other than splitting the message out, I haven’t changed anything. Okay, so let’s run that and check the queue:

[Image: msgrec1]

[Image: msgrec2]

Now, I’m going to change the receive code slightly:

    string queueName = "TextQueue";
 
    string brokerUri = $"activemq:tcp://localhost:61616";  // Default port
    NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);
 
    using (IConnection connection = factory.CreateConnection())
    {
        connection.Start();
        using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
        using (IDestination dest = session.GetQueue(queueName))
        using (IMessageConsumer consumer = session.CreateConsumer(dest))
        {
            IMessage msg = consumer.Receive();
            if (msg is ITextMessage)
            {
                ITextMessage txtMsg = msg as ITextMessage;                        
 
                Console.WriteLine($"Received message: {txtMsg.Text}");
 
                message = txtMsg.Text;
 
                throw new Exception("Test"); // <-- May cause problems
                
 
                return true;
            }
            else
            {
                Console.WriteLine("Unexpected message type: " + msg.GetType().Name);
            }
        }                
    }

As you can see, there is now an issue in the code; for some reason, it is repeatedly throwing an error entitled “Test”. I can’t work out why (maybe I’ll post a question on StackOverflow later), but when I run that, despite crashing, the message is read, and the queue is now empty.

Obviously, this is an issue: if that message was “DebitBankAccountWith200000” then someone is going to wish that the person that wrote this code hadn’t automatically acknowledged it.

Firstly, how do we stop the auto acknowledge?

There are two main alternatives to auto acknowledge (there are more, but we’ll only consider these two here): client acknowledge and transactional acknowledgement. This post covers client acknowledge; I’ll leave transactional acknowledgement for another day.

Client Acknowledge

This method is basically the manual version. You’re telling ActiveMQ that you will decide for yourself whether or not to acknowledge the message. Now, let’s alter the receive code slightly:

using (ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge))
using (IDestination dest = session.GetQueue(queueName))
using (IMessageConsumer consumer = session.CreateConsumer(dest))
{
    IMessage msg = consumer.Receive();
    if (msg is ITextMessage)
    {
        ITextMessage txtMsg = msg as ITextMessage;                        

        Console.WriteLine($"Received message: {txtMsg.Text}");

        message = txtMsg.Text;

        throw new Exception("Test");
        msg.Acknowledge();
        

        return true;
    }
    else
    {
        Console.WriteLine("Unexpected message type: " + msg.GetType().Name);
    }
}                

As you can see, I’ve changed two things here: I’ve changed the AcknowledgementMode to ClientAcknowledge, and I’ve added a call to the Acknowledge method on the message.

Now let’s re-run the send and receive and see what happens to the queue.

[Image: msgrec3]

Unfortunately, I still haven’t worked out why it’s crashing, but here’s the queue; still safely with the message:

[Image: msgrec4]

We had an error, it crashed, but because it was never acknowledged, it’s still safe and sound in the queue. When we run the receive again, hopefully the bug will have magically disappeared and the message will successfully process.
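
The difference between the two modes can be modelled without a broker at all. The sketch below is a toy in-memory version under my own assumptions (AckModel and its methods are hypothetical names, not part of the NMS API): in the first method the message is gone the moment it is delivered, while in the second it is only removed once the handler has returned without throwing.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model (not the NMS API) contrasting the two modes.
public static class AckModel
{
    // Auto acknowledge: the message is removed as soon as it is delivered,
    // so a failing handler loses it.
    public static void ReceiveAutoAck(Queue<string> queue, Action<string> handler)
    {
        string msg = queue.Dequeue();
        handler(msg);
    }

    // Client acknowledge: the message is only removed after the handler
    // returns without throwing.
    public static void ReceiveClientAck(Queue<string> queue, Action<string> handler)
    {
        string msg = queue.Peek();
        handler(msg);
        queue.Dequeue(); // the "acknowledge" step
    }
}
```

If the handler throws in ReceiveAutoAck, the queue is left empty; if it throws in ReceiveClientAck, the message is still at the front of the queue for the next attempt.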

Poison Messages

A poison message is one where the problem resides in the message itself. The situation described above is not a poison message, because the message is fine and it is the code that is erroring; once the code above is fixed, the message can be processed. However, let’s have a look at a different error scenario; here’s some new receive code:

string queueName = "TextQueue";
 
string brokerUri = $"activemq:tcp://localhost:61616";  // Default port
NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);
 
using (IConnection connection = factory.CreateConnection())
{
    connection.Start();
    using (ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge))
    using (IDestination dest = session.GetQueue(queueName))
    using (IMessageConsumer consumer = session.CreateConsumer(dest))
    {
        IMessage msg = consumer.Receive();
        if (msg is ITextMessage)
        {
            ITextMessage txtMsg = msg as ITextMessage;                        
 
            Console.WriteLine($"Received message: {txtMsg.Text}");
 
            message = txtMsg.Text;
 
            if (message.First() != 't')
                throw new Exception("Message is invalid");
            msg.Acknowledge();
            
 
            return true;
        }
        else
        {
            Console.WriteLine("Unexpected message type: " + msg.GetType().Name);
        }
    }                
}

This time, we have some code that actually processes the message and, based on the contents, does something; in this case, it throws an error where the message doesn’t start with ‘t’. So, the rules are simple; messages start with ‘t’. Let’s run the send code again and try some messages:

[Image: msgrec5]

And now let’s receive these messages (incidentally, while testing this, my notes on starting two projects might be useful):

[Image: msgrec6]

Okay – so we’ve come across a message that we can’t process. This has yet to be acknowledged, so it’s still safe and sound in the queue. We’ll simply restart the listener and pick it up:

[Image: msgrec7]

Ah – okay. So, we have a problem. “nexttest” is causing an error with the queue, but if we don’t acknowledge it, we’re going to keep picking it up and erroring.

The Antidote

Once we know that the message is causing a problem, we can send it to a special queue; here’s the code to capture the error:

ITextMessage txtMsg = msg as ITextMessage;
 
try
{
    Console.WriteLine($"Received message: {txtMsg.Text}");
 
    message = txtMsg.Text;
 
    if (message != null && message.First() != 't')
    {
        // The message has a problem, and so we need to file it away without losing it
 
        throw new Exception("Message is invalid");
    }
    msg.Acknowledge();
 
    return true;
}
catch
{
    ResendMsg(session, msg);
    msg.Acknowledge(); // Acknowledge the message from the original queue
}

And here’s the new method, ResendMsg:

private static void ResendMsg(ISession session, IMessage msg)
{
    var deadLetterQueue = new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("ActiveMQ.DLQ");

    // Dispose the producer once the message has been forwarded
    using (IMessageProducer producer = session.CreateProducer(deadLetterQueue))
    {
        producer.Send(msg);
    }
}

The first time this executes, it will throw and catch the error, and then resend to a dead letter queue:

[Image: msgrec8]

Subsequent runs can proceed past the problem message, and the message itself remains intact:

[Image: msgrec9]
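
The catch block above files the message away on its first failure. A possible refinement is to allow a few attempts first, so that a temporary fault in the code doesn't send a perfectly good message to the dead letter queue. Here is a minimal sketch of the counting part only; FailureTracker and RecordFailure are hypothetical names of my own, and the counts are held in memory, so they are lost when the listener restarts:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: counts failures per message id and reports when a
// message has failed often enough to be treated as poison.
public class FailureTracker
{
    private readonly int maxAttempts;
    private readonly Dictionary<string, int> failures = new Dictionary<string, int>();

    public FailureTracker(int maxAttempts)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        this.maxAttempts = maxAttempts;
    }

    // Records one failed attempt; returns true once the message has
    // reached the limit and should go to the dead letter queue.
    public bool RecordFailure(string messageId)
    {
        int count;
        failures.TryGetValue(messageId, out count);
        count++;
        failures[messageId] = count;
        return count >= maxAttempts;
    }
}
```

In the catch block, you might call RecordFailure with the message's NMSMessageId, and only call ResendMsg and Acknowledge when it returns true. Because the counts reset on restart, this suits a long-running listener better than one that is restarted after every error.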