
Reading an Azure Dead Letter Queue with Azure.Messaging.ServiceBus

Some time ago, I wrote about how you can read the dead letter queue. I also wrote this post on the evolution of Azure Service Bus libraries.

By way of a recap, a Dead Letter Queue is a sub-queue that holds messages that would otherwise clog up a queue (for example, because they can’t be processed for some reason), so that they are removed from normal processing.

In this post, I’ll cover how you can read a dead letter queue using the new Azure.Messaging.ServiceBus library.

Force a Dead Letter Message

There are basically two ways that a message can end up in a dead letter queue: either it breaks the rules (too many retries, too many redirects, etc.), or it is explicitly placed in a dead letter queue. To do the latter, the process is as follows:

            var serviceBusClient = new ServiceBusClient(connectionString);
            var messageReceiver = serviceBusClient.CreateReceiver(QUEUE_NAME);
            var message = await messageReceiver.ReceiveMessageAsync();

            string messageBody = Encoding.UTF8.GetString(message.Body);

            await messageReceiver.DeadLetterMessageAsync(message, "Really bad message");

The main difference here (other than that the previous method was DeadLetterAsync) is that you pass the entire message, rather than just the lock token.
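
In practice, you would probably only dead letter a message when processing fails. The following is a minimal sketch of that pattern, assuming the Azure.Messaging.ServiceBus 7.x package; the class name, the processMessage callback and the "ProcessingFailed" reason text are all made up for illustration:

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class DeadLetterExample
{
    // processMessage is a hypothetical callback standing in for your own handling logic.
    public static async Task ReceiveAndProcessAsync(
        ServiceBusReceiver receiver,
        Func<ServiceBusReceivedMessage, Task> processMessage)
    {
        ServiceBusReceivedMessage message = await receiver.ReceiveMessageAsync();
        if (message == null) return; // nothing arrived within the wait time

        try
        {
            await processMessage(message);

            // Only remove the message from the queue once it has been handled
            await receiver.CompleteMessageAsync(message);
        }
        catch (Exception ex)
        {
            // Second argument is the reason, third is the error description
            await receiver.DeadLetterMessageAsync(message, "ProcessingFailed", ex.Message);
        }
    }
}
```

The reason and description passed here are what later surface as DeadLetterReason and DeadLetterErrorDescription on the dead lettered message.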

Reading a Dead Letter Message

There are a few quirks here. Firstly, the dead letter reason, delivery count, etc. were previously held in property collections such as SystemProperties, whereas they are now just properties on the message, which makes them far more accessible and discoverable. Here’s the code to read the dead letter queue:

            var serviceBusClient = new ServiceBusClient(connectionString);
            var deadLetterReceiver = serviceBusClient.CreateReceiver(FormatDeadLetterPath());
            
            var message = await deadLetterReceiver.ReceiveMessageAsync();

            string messageBody = Encoding.UTF8.GetString(message.Body);

            Console.WriteLine("Message received: {0}", messageBody);

            // Previous versions held these in a property collection rather than as typed properties
            // https://www.pmichaels.net/2021/01/23/read-the-dead-letter-queue/
            if (!string.IsNullOrWhiteSpace(message.DeadLetterReason))
            {
                Console.WriteLine("Reason: {0} ", message.DeadLetterReason);
            }
            if (!string.IsNullOrWhiteSpace(message.DeadLetterErrorDescription))
            {
                Console.WriteLine("Description: {0} ", message.DeadLetterErrorDescription);
            }

            Console.WriteLine($"Message {message.MessageId} ({messageBody}) had a delivery count of {message.DeliveryCount}");

Again, most of the changes are simply naming. It’s worth mentioning the FormatDeadLetterPath() function. This was previously part of a static helper class EntityNameHelper; here, I’ve tried to replicate that behaviour locally (as it seems to have been removed):

        private static string QUEUE_NAME = "dead-letter-demo";
        private static string DEAD_LETTER_PATH = "$deadletterqueue";
        
        static string FormatDeadLetterPath() =>
            $"{QUEUE_NAME}/{DEAD_LETTER_PATH}";
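
Although EntityNameHelper is gone, the new library does appear to cover the same ground through the receiver options. This is a minimal sketch, assuming Azure.Messaging.ServiceBus 7.x; the DeadLetterReceiverFactory class and its method names are hypothetical, but SubQueue.DeadLetter and ServiceBusReceiverOptions are part of the library:

```csharp
using Azure.Messaging.ServiceBus;

public static class DeadLetterReceiverFactory
{
    // Options that point a receiver at the dead letter sub-queue of an entity
    public static ServiceBusReceiverOptions DeadLetterOptions() =>
        new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter };

    // Same intent as CreateReceiver(FormatDeadLetterPath()), without building the path by hand
    public static ServiceBusReceiver Create(ServiceBusClient client, string queueName) =>
        client.CreateReceiver(queueName, DeadLetterOptions());
}
```

Either approach should end up reading from the same sub-queue; the options version simply avoids hard-coding the path suffix.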

Resubmitting a Dead Letter Message

This is something that I covered in my original post on this. It’s not inbuilt behaviour – but you basically copy the message and re-submit. In fact, this is much, much easier now:

            var serviceBusClient = new ServiceBusClient(connectionString);

            var deadLetterReceiver = serviceBusClient.CreateReceiver(FormatDeadLetterPath());
            var sender = serviceBusClient.CreateSender(QUEUE_NAME);

            var deadLetterMessage = await deadLetterReceiver.ReceiveMessageAsync();            

            using var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);

            var resubmitMessage = new ServiceBusMessage(deadLetterMessage);
            await sender.SendMessageAsync(resubmitMessage);
            //throw new Exception("aa"); // - to prove the transaction
            await deadLetterReceiver.CompleteMessageAsync(deadLetterMessage);

            scope.Complete();            

Most of what’s here has previously been covered; the old Message.Clone is now much neater (but slightly less obvious) in that you simply pass the old message in as a constructor parameter. Because the Dead Letter Reason, et al, are now properties, there’s no longer a need to manually deal with them not getting copied across.

The transaction simply ensures that either the dead letter message is correctly re-submitted, or it remains a dead letter.

Summary

The new library makes the code much more concise and discoverable. We’ve seen how to force a dead letter, how to receive and view the contents of the Dead Letter Queue, and finally, how to resubmit a dead-lettered message.

Azure Service Bus – Send an e-mail on Message Timeout

A message queue has, in its architecture, two main points of failure. The first is the situation where a message is added to a queue but never read (or at least not read within a specified period of time); this is called a Dead Letter, and it is the subject of this post. The second is where the message is corrupt, or breaks the reading logic in some way; that is known as a Poison Message.

There are a number of reasons that a message might not get read in the specified time: the service reading and processing the messages might not be keeping up with the supply, it might have crashed, or the network connection might have failed.

One possible thing to do at this stage is to have a process that automatically notifies someone that a message has ended up in the dead letter queue.

Step One – specify a timeout

Here’s how you would specify a timeout on an individual message:

            BrokeredMessage message = new BrokeredMessage(messageBody)
            {
                MessageId = id,
                TimeToLive = new TimeSpan(0, 5, 0)
            };

Or, you can create a default on the queue from the QueueDescription (typically this would be done when you initially create the queue):

                QueueDescription qd = new QueueDescription("TestQueue")
                {
                    DefaultMessageTimeToLive = new TimeSpan(0, 5, 0)
                };
                nm.CreateQueue(qd);

Should these values differ, the shortest time will be taken.
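
To make that rule concrete, here is a tiny illustrative helper. Service Bus applies the rule itself, so you would never need this in production; the TimeToLiveRules class is purely hypothetical and only exists to show the arithmetic:

```csharp
using System;

public static class TimeToLiveRules
{
    // Returns the TTL that applies when both a message TTL and a queue default are set:
    // whichever of the two is shorter.
    public static TimeSpan Effective(TimeSpan messageTtl, TimeSpan queueDefaultTtl) =>
        messageTtl < queueDefaultTtl ? messageTtl : queueDefaultTtl;
}
```

So a message sent with a ten minute TimeToLive to a queue with a five minute default would expire after five minutes, and a one minute message on the same queue would expire after one.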

What happens to the message by default?

I’ve added a message to the queue using the default timeout of 5 minutes; here it is happily sitting in the queue:

Looking at the properties of the queue, we can determine that the “TimeToLive” is, indeed, 5 minutes:

In addition, you can see that, by default, the flag telling Service Bus to move the message to a dead letter queue is not checked. This means that the message will not be moved to the dead letter queue.

5 Minutes later:

Nothing has happened to this queue, except time passing. The message has now been discarded. It seems an odd behaviour; however, as with the ReceiveAndDelete mode, there may be reasons that this behaviour is required.

Step Two – Dead Letters

If you want to actually do something with the expired message, the key is a concept called “Dead Lettering”. The following code will direct the Service Bus to put the offending message into the “Dead Letter Queue”:

                QueueDescription qd = new QueueDescription("TestQueue")
                {
                    DefaultMessageTimeToLive = new TimeSpan(0, 5, 0),
                    EnableDeadLetteringOnMessageExpiration = true
                };
                nm.CreateQueue(qd);
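
For anyone on the newer Azure.Messaging.ServiceBus library, the same settings seem to be available through its administration client. This is a sketch under that assumption, not something taken from the original test; the QueueSetup class and its method names are hypothetical:

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;

public static class QueueSetup
{
    // Builds queue options with a 5 minute default TTL and dead lettering on expiry
    public static CreateQueueOptions BuildOptions(string queueName) =>
        new CreateQueueOptions(queueName)
        {
            DefaultMessageTimeToLive = TimeSpan.FromMinutes(5),
            DeadLetteringOnMessageExpiration = true
        };

    // Creates the queue; connectionString is assumed to carry Manage rights on the namespace
    public static async Task CreateAsync(string connectionString, string queueName)
    {
        var adminClient = new ServiceBusAdministrationClient(connectionString);
        await adminClient.CreateQueueAsync(BuildOptions(queueName));
    }
}
```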

Here’s the result for the same test:

Step Three – Doing something with this…

Okay – so the message hasn’t been processed, and it’s now sat in a queue specially designed for that kind of thing, so what can we do with it? One possible thing is to create a piece of software that monitors this queue. This is an adaptation of the code that I originally created here:

        static void Main(string[] args)
        {
            System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
            sw.Start();

            if (!InitialiseClient())
            {
                Console.WriteLine("Unable to initialise client");
            }
            else
            {
                while (true)
                {
                    string message = ReadMessage("TestQueue/$DeadLetterQueue");

                    if (string.IsNullOrWhiteSpace(message)) break;
                    Console.WriteLine($"{DateTime.Now}: Message received: {message}");
                }
            }

            sw.Stop();
            Console.WriteLine($"Done ({sw.Elapsed.TotalSeconds}) seconds");
            Console.ReadLine();
        }

        private static bool InitialiseClient()
        {
            Uri uri = ServiceManagementHelper.GetServiceUri();
            TokenProvider tokenProvider = ServiceManagementHelper.GetTokenProvider(uri);

            NamespaceManager nm = new NamespaceManager(uri, tokenProvider);
            return nm.QueueExists("TestQueue");
        }

        private static string ReadMessage(string queueName)
        {
            QueueClient client = QueueManagementHelper.GetQueueClient(queueName, true);

            BrokeredMessage message = client.Receive();
            if (message == null) return string.Empty;
            string messageBody = message.GetBody<string>();

            //message.Complete();

            return messageBody;
        }

If this was all that we had to monitor the queue, then somebody’s job would need to be to watch this application. That may make sense, depending on the nature of the business; however, we could simply notify the person in question that there’s a problem. Now, if only the internet had a concept of an offline messaging facility that works something akin to the postal service, only faster…

        static void Main(string[] args)
        {
            System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
            sw.Start();

            if (!InitialiseClient())
            {
                Console.WriteLine("Unable to initialise client");
            }
            else
            {
                while (true)
                {
                    string message = ReadMessage("TestQueue/$DeadLetterQueue");

                    if (string.IsNullOrWhiteSpace(message)) break;
                    Console.WriteLine($"{DateTime.Now}: Message received: {message}");

                    Console.WriteLine($"{DateTime.Now}: Send e-mail");
                    SendEmail(message);
                }
            }

            sw.Stop();
            Console.WriteLine($"Done ({sw.Elapsed.TotalSeconds}) seconds");
            Console.ReadLine();
        }

        private static void SendEmail(string messageText)
        {
            System.Net.Mail.MailMessage message = new System.Net.Mail.MailMessage();
            // Placeholder addresses; substitute your own
            message.To.Add("recipient@example.com");
            message.Subject = "Message in queue has expired";
            message.From = new System.Net.Mail.MailAddress("sender@example.com");
            message.Body = messageText;
            System.Net.Mail.SmtpClient smtp = new System.Net.Mail.SmtpClient("smtp.live.com");
            smtp.Port = 587;
            smtp.UseDefaultCredentials = false;
            smtp.Credentials = new System.Net.NetworkCredential("sender@example.com", "passw0rd");
            smtp.EnableSsl = true;
            smtp.Send(message);
        }

In order to prevent a torrent of mails, you might want to put a delay in this code, or even maintain some kind of list so that you only send one mail per day.
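
As a rough illustration of the "one mail per day" idea, here is a minimal in-memory sketch. The DailyNotificationThrottle class is hypothetical, and it assumes the monitoring process stays running (the state is lost on restart):

```csharp
using System;

// Allows at most one notification per calendar day of the timestamps passed in
public class DailyNotificationThrottle
{
    private DateTime? _lastSentDate;

    // Returns true if a mail may be sent at 'now', and records that it was
    public bool TrySend(DateTime now)
    {
        DateTime today = now.Date;
        if (_lastSentDate == today) return false; // already sent one today

        _lastSentDate = today;
        return true;
    }
}
```

In the loop above, the call to SendEmail would then be guarded with something like `if (throttle.TrySend(DateTime.Now)) SendEmail(message);`.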

References

https://docs.microsoft.com/en-us/dotnet/api/microsoft.servicebus.messaging.queuedescription.enabledeadletteringonmessageexpiration?view=azureservicebus-4.0.0#Microsoft_ServiceBus_Messaging_QueueDescription_EnableDeadLetteringOnMessageExpiration

https://www.codit.eu/blog/2015/01/automatically-expire-messages-in-azure-service-bus-how-it-works/

https://stackoverflow.com/questions/9851319/how-to-add-smtp-hotmail-account-to-send-mail

Acknowledging a Message in Active MQ

Following on from my previous post on Active MQ, I’m now going to explore creating a mechanism whereby the message can fail.

The main issue with the trial project was that it used an auto acknowledge:

using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))

There are a number of considerations here: firstly, what if processing the message that you read errors? We want to retry. Secondly, what happens if the message repeatedly errors (this type of message is known as a poison message)?

The Problem

Here’s the send code again from the last post:

string queueName = "TextQueue";

Console.WriteLine($"Adding message to queue topic: {queueName}");

string brokerUri = $"activemq:tcp://localhost:61616";  // Default port
NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);

using (IConnection connection = factory.CreateConnection())
{
    connection.Start();

    using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
    using (IDestination dest = session.GetQueue(queueName))
    using (IMessageProducer producer = session.CreateProducer(dest))
    {                     
        producer.DeliveryMode = MsgDeliveryMode.Persistent;

        // 'text' is assumed to hold the message body (it is not declared in this snippet)
        ITextMessage msg = session.CreateTextMessage(text);
        producer.Send(msg);
                                                
        Console.WriteLine($"Sent message: {text}");
        
    }
}            

Other than splitting the message out, I haven’t changed anything. Okay, so let’s run that and check the queue:


Now, I’m going to change the receive code slightly:

    string queueName = "TextQueue";
 
    string brokerUri = $"activemq:tcp://localhost:61616";  // Default port
    NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);
 
    using (IConnection connection = factory.CreateConnection())
    {
        connection.Start();
        using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
        using (IDestination dest = session.GetQueue(queueName))
        using (IMessageConsumer consumer = session.CreateConsumer(dest))
        {
            IMessage msg = consumer.Receive();
            if (msg is ITextMessage)
            {
                ITextMessage txtMsg = msg as ITextMessage;                        
 
                Console.WriteLine($"Received message: {txtMsg.Text}");
 
                message = txtMsg.Text;
 
                throw new Exception("Test"); // <-- May cause problems
                
 
                return true;
            }
            else
            {
                Console.WriteLine("Unexpected message type: " + msg.GetType().Name);
            }
        }                
    }

As you can see, there is now an issue in the code; for some reason, it is repeatedly throwing an error entitled “Test”. I can’t work out why (maybe I’ll post a question on StackOverflow later), but when I run that, despite crashing, the message is read, and the queue is now empty.

Obviously, this is an issue: if that message was “DebitBankAccountWith200000” then someone is going to wish that the person that wrote this code hadn’t automatically acknowledged it.

Firstly, how do we stop the auto acknowledge?

There are basically two alternatives to auto acknowledge (there are more, but we’ll only look at two here): client acknowledge, and transactional acknowledgement. I’ll leave transactional acknowledgement for another day.

Client Acknowledge

This method is basically the manual version: you’re telling ActiveMQ that you will decide for yourself whether, and when, to acknowledge the message. Now, let’s alter the receive code slightly:

using (ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge))
using (IDestination dest = session.GetQueue(queueName))
using (IMessageConsumer consumer = session.CreateConsumer(dest))
{
    IMessage msg = consumer.Receive();
    if (msg is ITextMessage)
    {
        ITextMessage txtMsg = msg as ITextMessage;                        

        Console.WriteLine($"Received message: {txtMsg.Text}");

        message = txtMsg.Text;

        throw new Exception("Test");
        msg.Acknowledge();
        

        return true;
    }
    else
    {
        Console.WriteLine("Unexpected message type: " + msg.GetType().Name);
    }
}                

As you can see, I’ve changed two things here: the first is that I’ve changed the AcknowledgementMode to ClientAcknowledge, and the second is that I’ve added a call to the Acknowledge method on the message.

Now let’s re-run the send and receive and see what happens to the queue.


Unfortunately, I still haven’t worked out why it’s crashing, but here’s the queue; still safely with the message:


We had an error, it crashed, but because it was never acknowledged, it’s still safe and sound in the queue. When we run the receive again, hopefully the bug will have magically disappeared and the message will successfully process.

Poison Messages

The concept of a poison message is one where the problem resides in the message itself. The situation described above is not a poison message, because the message is fine but the code is erroring. Once the code above is fixed, the message can be processed; however, let’s have a look at a different error scenario. Here’s some new receive code:

string queueName = "TextQueue";
 
string brokerUri = $"activemq:tcp://localhost:61616";  // Default port
NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);
 
using (IConnection connection = factory.CreateConnection())
{
    connection.Start();
    using (ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge))
    using (IDestination dest = session.GetQueue(queueName))
    using (IMessageConsumer consumer = session.CreateConsumer(dest))
    {
        IMessage msg = consumer.Receive();
        if (msg is ITextMessage)
        {
            ITextMessage txtMsg = msg as ITextMessage;                        
 
            Console.WriteLine($"Received message: {txtMsg.Text}");
 
            message = txtMsg.Text;
 
            if (message.First() != 't')
                throw new Exception("Message is invalid");
            msg.Acknowledge();
            
 
            return true;
        }
        else
        {
            Console.WriteLine("Unexpected message type: " + msg.GetType().Name);
        }
    }                
}

This time, we have some code that actually processes the message and, based on the contents, does something; in this case, it throws an error where the message doesn’t start with ‘t’. So, the rules are simple; messages start with ‘t’. Let’s run the send code again and try some messages:


And now let’s receive these messages (incidentally, while testing this, my notes on starting two projects might be useful):


Okay – so we’ve come across a message that we can’t process. This has yet to be acknowledged, so it’s still safe and sound in the queue. We’ll simply restart the listener and pick it up:


Ah – okay. So, we have a problem. “nexttest” is causing an error with the queue, but if we don’t acknowledge it, we’re going to keep picking it up and erroring.

The Antidote

Once we know that the message is causing a problem, we can send it to a special queue; here’s the code to capture the error:

ITextMessage txtMsg = msg as ITextMessage;
 
try
{
    Console.WriteLine($"Received message: {txtMsg.Text}");
 
    message = txtMsg.Text;
 
    if (message != null && message.First() != 't')
    {
        // The message has a problem, and so we need to file it away without losing it
 
        throw new Exception("Message is invalid");
    }
    msg.Acknowledge();
 
    return true;
}
catch
{
    ResendMsg(session, msg);
    msg.Acknowledge(); // Acknowledge the message from the original queue
}

And here’s the new method, ResendMsg:

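private static void ResendMsg(ISession session, IMessage msg)
{
    // ActiveMQ.DLQ is the broker's default dead letter queue name
    var deadLetterQueue = new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("ActiveMQ.DLQ");

    // Dispose the producer once the message has been forwarded
    using (IMessageProducer producer = session.CreateProducer(deadLetterQueue))
    {
        producer.Send(msg);
    }
}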

The first time this executes, it will throw and catch the error, and then resend to a dead letter queue:


Subsequent runs can proceed past the problem message, and the message itself remains intact:

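
The code above moves a message to the dead letter queue on its very first failure. If you wanted to allow a few retries first (so that a temporary fault in the code doesn’t condemn a perfectly good message), one option is to count failures per message. This is a minimal in-memory sketch; the PoisonMessageTracker class is hypothetical, it assumes the listener stays running between attempts, and how and when the broker redelivers the message is outside its scope:

```csharp
using System.Collections.Generic;

// Counts failures per message ID so that only repeat offenders are treated as poison
public class PoisonMessageTracker
{
    private readonly Dictionary<string, int> _failures = new Dictionary<string, int>();
    private readonly int _maxAttempts;

    public PoisonMessageTracker(int maxAttempts)
    {
        _maxAttempts = maxAttempts;
    }

    // Records a failed attempt; returns true once the message should be dead lettered
    public bool RecordFailure(string messageId)
    {
        _failures.TryGetValue(messageId, out int count); // count is 0 if not yet seen
        count++;
        _failures[messageId] = count;
        return count >= _maxAttempts;
    }

    // Forgets a message once it has been processed successfully
    public void RecordSuccess(string messageId) => _failures.Remove(messageId);
}
```

In the catch block, the call to ResendMsg (and the acknowledge that follows it) would then only happen when `tracker.RecordFailure(msg.NMSMessageId)` returns true.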