Tag Archives: Poison Message

Read the Dead Letter Queue

I’ve been writing and speaking about Azure Service Bus a lot recently.

In this post, I’m going to focus on the Dead Letter Queue in more detail.

What is the Dead Letter Queue, and what has it ever done for me?

To describe what the dead letter queue does, I invite you to think about an assembly line for a car. The car in question has just come through to have a bonnet fitted (hood for any American readers). However, the guy that’s fitting the bonnet can’t get it to sit right in the hinges; he tries and tries, but it won’t fit. After a while, he goes to get his supervisor, and they both try. They draft the workers in from all over the plant, but can’t get the bonnet fitted.

Meanwhile, the entire assembly line has stopped. The person that fits the steering wheel is behind the bonnet fitter, and there’s no space for him to move the car that he’s just fitted the wheel to; the dashboard fitter can’t pass onto the steering wheel, and so on.

(I have no knowledge of what a car assembly line looks like, outside of the film Christine, so apologies if this is incorrect).

A message that can’t be processed is often called a poison message, and it causes exactly this problem. The Service Bus can’t deliver any messages until this message has gone, and this message can’t go, because there’s something wrong with it. The solution is to have a dedicated queue that holds these messages: it’s called a Dead Letter Queue – it’s kind of like a holding bay for the car.

Why would a message be “poison”?

There are a few reasons that a message can be considered “poison” and dead lettered; some of the most common are:

– The message has exceeded the queue’s maximum delivery count; that is, we’ve tried too many times to process it
– The message can be explicitly marked as bad by the client
– The size of the message headers exceeds the allowed quota
– The message has been “auto-forwarded” too many times

Essentially, the system tries to work out whether this message is staying around too long and causing issues with the system. It’s important to know, though, that the dead letter queue is just another queue. The message isn’t lost – just side-lined.

Dead Lettering

Let’s see how we can force a message into a dead letter queue. The easiest way to do this is to explicitly just Dead Letter the message; for example:

            var messageReceiver = new MessageReceiver(connectionString, QUEUE_NAME);
            var message = await messageReceiver.ReceiveAsync();

            await messageReceiver.DeadLetterAsync(message.SystemProperties.LockToken, "Really bad message");

Here, we’ve read the message, and then told Service Bus to just Dead Letter it. In real life, you may choose to do this on rare occasions, but I imagine its main use is for testing.

Abandon the Message

Another way to cause a message to be dead lettered is to exceed the Max Delivery Count. You can do this by “abandoning” the message multiple times; for example:

var messageReceiver = new MessageReceiver(connectionString, QUEUE_NAME);
var message = await messageReceiver.ReceiveAsync();

string messageBody = Encoding.UTF8.GetString(message.Body);

Console.WriteLine($"Message {message.MessageId} ({messageBody}) had a delivery count of {message.SystemProperties.DeliveryCount}");
await messageReceiver.AbandonAsync(message.SystemProperties.LockToken);

Here, we’re reading the message and, rather than completing it, abandoning it. Each abandoned delivery counts towards the queue’s Max Delivery Count; once that limit is exceeded, Service Bus moves the message to the dead letter queue. The same thing happens when you read a message and implicitly abandon it (i.e., you read it on a PeekLock and then do nothing), except that you have to wait for the lock to expire first. So the AbandonAsync call doesn’t actually change what the code above does; it just makes it happen faster.
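To make the delivery-count mechanism concrete, here is a small in-memory model. It is only a sketch and does not use the Service Bus SDK: TinyQueue, TinyMessage and AbandonDemo are names invented for this example, and the model simplifies things by counting a delivery on each receive and deciding whether to dead letter at the point of abandon.

```csharp
using System.Collections.Generic;

// Hypothetical in-memory model; not the Service Bus API.
public class TinyMessage
{
    public string Body { get; set; } = "";
    public int DeliveryCount { get; set; }
}

public class TinyQueue
{
    private readonly int _maxDeliveryCount;

    public TinyQueue(int maxDeliveryCount) => _maxDeliveryCount = maxDeliveryCount;

    public Queue<TinyMessage> Active { get; } = new Queue<TinyMessage>();
    public List<TinyMessage> DeadLetter { get; } = new List<TinyMessage>();

    public void Send(string body) => Active.Enqueue(new TinyMessage { Body = body });

    // Hands out the next message and counts the delivery attempt.
    // Throws InvalidOperationException if the queue is empty.
    public TinyMessage Receive()
    {
        var message = Active.Dequeue();
        message.DeliveryCount++;
        return message;
    }

    // Gives the message back; it is side-lined once it has used up its attempts.
    public void Abandon(TinyMessage message)
    {
        if (message.DeliveryCount >= _maxDeliveryCount)
            DeadLetter.Add(message);
        else
            Active.Enqueue(message);
    }
}

public static class AbandonDemo
{
    // Abandons every delivery and returns how many deliveries happened
    // before the active queue ran dry.
    public static int AbandonUntilDeadLettered(TinyQueue queue)
    {
        int deliveries = 0;
        while (queue.Active.Count > 0)
        {
            var message = queue.Receive();
            deliveries++;
            queue.Abandon(message);
        }
        return deliveries;
    }
}
```

With a maximum delivery count of 3 and a single message, the loop delivers the message three times and then finds it in DeadLetter rather than Active, which is the same shape of behaviour you see when repeatedly running the abandon code against a real queue.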

Reading The Dead Letter Queue

Now that we’ve dead-lettered a message, we can read the Dead Letter Queue.

            var deadletterPath = EntityNameHelper.FormatDeadLetterPath(QUEUE_NAME);
            var deadLetterReceiver = new MessageReceiver(connectionString, deadletterPath, ReceiveMode.PeekLock);
            
            var message = await deadLetterReceiver.ReceiveAsync();

            string messageBody = Encoding.UTF8.GetString(message.Body);

            Console.WriteLine("Message received: {0}", messageBody);
            if (message.UserProperties.ContainsKey("DeadLetterReason"))
            {
                Console.WriteLine("Reason: {0} ", message.UserProperties["DeadLetterReason"]);
            }
            if (message.UserProperties.ContainsKey("DeadLetterErrorDescription"))
            {
                Console.WriteLine("Description: {0} ", message.UserProperties["DeadLetterErrorDescription"]);
            }

The code above sets up a MessageReceiver for the dead letter queue. The delivery count inside the dead letter queue does not increase, but it does retain the number that it had from the original queue. Effectively, all you can do with a Dead Letter message is to complete it.

DeadLetterReason

When a message is dead lettered, the properties DeadLetterReason and DeadLetterErrorDescription may get added to the message. If you forcibly dead letter the message then you have the option to add this: if you choose not to then it will not be present (hence the checks around the properties), but mostly, these will be available.
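Because either property may be missing, the two ContainsKey checks can be folded into a small helper. The sketch below works on any IDictionary<string, object>, which is the type of message.UserProperties in the Microsoft.Azure.ServiceBus package; DeadLetterInfo, Describe and the "(not set)" placeholder are made up for this example.

```csharp
using System.Collections.Generic;

public static class DeadLetterInfo
{
    // Builds a one-line summary, tolerating either property being absent.
    public static string Describe(IDictionary<string, object> userProperties)
    {
        string reason = Read(userProperties, "DeadLetterReason");
        string description = Read(userProperties, "DeadLetterErrorDescription");
        return $"Reason: {reason}; Description: {description}";
    }

    // Returns the property as text, or a placeholder when it is missing or null.
    private static string Read(IDictionary<string, object> properties, string key) =>
        properties.TryGetValue(key, out var value) && value != null
            ? value.ToString() ?? "(not set)"
            : "(not set)";
}
```

In the receive code above, this would be called as Console.WriteLine(DeadLetterInfo.Describe(message.UserProperties)).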

Re-submitting a Message and Transactions

We’ve now seen how to cause a message to Dead Letter, and read the Dead Letter queue; next we’re going to investigate re-submitting the message.

As a quick side note – you can’t really re-submit a message – as you’ll see, what we actually do is to complete the dead letter message, and send a copy back to the queue.

            var serviceBusConnection = new ServiceBusConnection(connectionString);

            var deadletterPath = EntityNameHelper.FormatDeadLetterPath(QUEUE_NAME);
            var deadLetterReceiver = new MessageReceiver(serviceBusConnection, deadletterPath, ReceiveMode.PeekLock);
            
            var queueClient = new QueueClient(serviceBusConnection, QUEUE_NAME, ReceiveMode.PeekLock, RetryPolicy.Default);

            var deadLetterMessage = await deadLetterReceiver.ReceiveAsync();

            using var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);

            var resubmitMessage = deadLetterMessage.Clone();

            resubmitMessage.UserProperties.Remove("DeadLetterReason");
            resubmitMessage.UserProperties.Remove("DeadLetterErrorDescription");
            
            await queueClient.SendAsync(resubmitMessage);
            await deadLetterReceiver.CompleteAsync(deadLetterMessage.SystemProperties.LockToken);            

            scope.Complete();            

There are a few points to note in the above code:

FormatDeadLetterPath

FormatDeadLetterPath gives you the entity path for the dead letter queue, based on an entity.

Transaction Scope

The scope ensures that everything between its creation and completion happens as a single transaction. That is, if part of that fails, the whole thing fails. For example, if you add a throw new Exception() between the send and the complete, the new message will not be sent.

We’re using a C# 8 using declaration here: the scope lasts from the declaration to the end of the enclosing block, which in this case is the end of the method.
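As a stand-alone illustration of how long that scope lasts (nothing here is Service Bus specific; Tracker and UsingDeclarationDemo are invented for the example), the following records the order of events and shows that disposal only happens when the enclosing method exits:

```csharp
using System;
using System.Collections.Generic;

// Records its own creation and disposal in a shared log.
public sealed class Tracker : IDisposable
{
    private readonly List<string> _log;

    public Tracker(List<string> log)
    {
        _log = log;
        _log.Add("created");
    }

    public void Dispose() => _log.Add("disposed");
}

public static class UsingDeclarationDemo
{
    // Returns the sequence of events recorded while DoWork ran.
    public static List<string> Run()
    {
        var log = new List<string>();
        DoWork(log);
        return log;
    }

    private static void DoWork(List<string> log)
    {
        using var tracker = new Tracker(log); // no braces: the scope runs to the end of DoWork
        log.Add("working");
    } // tracker.Dispose() is called here
}
```

This matters for the re-submit code because the TransactionScope is disposed at the end of the method; scope.Complete() has to be called before that point, otherwise the transaction is rolled back on disposal.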

ServiceBusConnection

There are several overloads for most of these methods, and typically, you can pass a connection string into the constructor – for example, MessageReceiver could be called like this:

new MessageReceiver(connectionString, QUEUE_NAME);

Typically, you can use this and it works exactly the same as if you established your own connection and passed that through; however, with a transaction, everything needs to share a connection. If they do not, then you may see an error such as this:

Transaction hasn’t been declared yet, or has already been discharged

Hence we’re creating the connection upfront.

References

https://blogs.infosupport.com/implementing-a-retry-pattern-for-azure-service-bus-with-topic-filters/

https://stackoverflow.com/questions/38784331/how-to-peek-the-deadletter-messages

https://github.com/Azure/azure-service-bus/pull/91

Acknowledging a Message in ActiveMQ

Following on from my previous post on ActiveMQ, I’m now going to explore creating a mechanism whereby the message can fail.

The main issue with the trial project was that it used an auto acknowledge:

using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))

There are a number of considerations here: firstly, what if processing the message that you read fails? We’d want to retry. Secondly, what happens if the message repeatedly errors? (This type of message is known as a poison message.)

The Problem

Here’s the send code again from the last post:

string queueName = "TextQueue";

Console.WriteLine($"Adding message to queue topic: {queueName}");

string brokerUri = $"activemq:tcp://localhost:61616";  // Default port
NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);

using (IConnection connection = factory.CreateConnection())
{
    connection.Start();

    using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
    using (IDestination dest = session.GetQueue(queueName))
    using (IMessageProducer producer = session.CreateProducer(dest))
    {                     
        producer.DeliveryMode = MsgDeliveryMode.Persistent;

        // text holds the message body supplied by the caller
        ITextMessage msg = session.CreateTextMessage(text);
        producer.Send(msg);
                                                
        Console.WriteLine($"Sent message: {text}");
        
    }
}            

Other than splitting the message out, I haven’t changed anything. Okay, so let’s run that and check the queue:

[Image: msgrec1]

[Image: msgrec2]

Now, I’m going to change the receive code slightly:

    string queueName = "TextQueue";
 
    string brokerUri = $"activemq:tcp://localhost:61616";  // Default port
    NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);
 
    using (IConnection connection = factory.CreateConnection())
    {
        connection.Start();
        using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
        using (IDestination dest = session.GetQueue(queueName))
        using (IMessageConsumer consumer = session.CreateConsumer(dest))
        {
            IMessage msg = consumer.Receive();
            if (msg is ITextMessage)
            {
                ITextMessage txtMsg = msg as ITextMessage;                        
 
                Console.WriteLine($"Received message: {txtMsg.Text}");
 
                message = txtMsg.Text;
 
                throw new Exception("Test"); // <-- May cause problems
                
 
                return true;
            }
            else
            {
                Console.WriteLine("Unexpected message type: " + msg.GetType().Name);
            }
        }                
    }

As you can see, there is now an issue in the code; for some reason, it is repeatedly throwing an error entitled “Test”. I can’t work out why (maybe I’ll post a question on StackOverflow later), but when I run that, despite crashing, the message is read, and the queue is now empty.

Obviously, this is an issue: if that message was “DebitBankAccountWith200000” then someone is going to wish that the person that wrote this code hadn’t automatically acknowledged it.

Firstly, how do we stop the auto acknowledge?

There are several alternatives to auto acknowledge; the two most relevant here are client acknowledge and transactional acknowledgement. I’ll leave transactional acknowledgement for another day.

Client Acknowledge

This method is basically the manual version. You’re telling ActiveMQ that you will, or will not acknowledge the message yourself. Now, let’s alter the receive code slightly:

using (ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge))
using (IDestination dest = session.GetQueue(queueName))
using (IMessageConsumer consumer = session.CreateConsumer(dest))
{
    IMessage msg = consumer.Receive();
    if (msg is ITextMessage)
    {
        ITextMessage txtMsg = msg as ITextMessage;                        

        Console.WriteLine($"Received message: {txtMsg.Text}");

        message = txtMsg.Text;

        throw new Exception("Test");
        msg.Acknowledge();
        

        return true;
    }
    else
    {
        Console.WriteLine("Unexpected message type: " + msg.GetType().Name);
    }
}                

As you can see, I’ve changed two main parts here: the first is that I’ve changed the AcknowledgementMode to ClientAcknowledge, and the second is that I’ve added a call to the Acknowledge method on the message.

Now let’s re-run the send and receive and see what happens to the queue.

[Image: msgrec3]

Unfortunately, I still haven’t worked out why it’s crashing, but here’s the queue; still safely with the message:

[Image: msgrec4]

We had an error, it crashed, but because it was never acknowledged, it’s still safe and sound in the queue. When we run the receive again, hopefully the bug will have magically disappeared and the message will successfully process.
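The difference between the two modes can be sketched with a small in-memory model. This is not the NMS API: ToyBroker, AckMode and Deliver are hypothetical names, and the model assumes that auto acknowledge removes the message as soon as it is handed over, whereas client acknowledge removes it only after the handler has finished successfully.

```csharp
using System;
using System.Collections.Generic;

public enum AckMode { Auto, Client }

// Hypothetical in-memory stand-in for a broker queue; not the NMS API.
public class ToyBroker
{
    private readonly Queue<string> _queue = new Queue<string>();

    public int Count => _queue.Count;

    public void Send(string text) => _queue.Enqueue(text);

    // Delivers the head message to the handler under the given mode.
    // Returns true if the handler finished without throwing.
    public bool Deliver(AckMode mode, Action<string> handler)
    {
        string text = _queue.Peek();
        if (mode == AckMode.Auto)
            _queue.Dequeue();   // acknowledged on delivery, before processing

        try
        {
            handler(text);
        }
        catch (Exception)
        {
            return false;       // in Client mode the message is still queued
        }

        if (mode == AckMode.Client)
            _queue.Dequeue();   // acknowledged only after successful processing
        return true;
    }
}
```

A handler that throws under AckMode.Auto leaves the queue empty (the message is gone), while the same handler under AckMode.Client leaves the message in place for the next attempt.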

Poison Messages

A poison message is one where the problem resides in the message itself. The situation described above is not a poison message: the message is fine, but the code is erroring, and once the code is fixed, the message can be processed. However, let’s have a look at a different error scenario; here’s some new receive code:

string queueName = "TextQueue";
 
string brokerUri = $"activemq:tcp://localhost:61616";  // Default port
NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);
 
using (IConnection connection = factory.CreateConnection())
{
    connection.Start();
    using (ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge))
    using (IDestination dest = session.GetQueue(queueName))
    using (IMessageConsumer consumer = session.CreateConsumer(dest))
    {
        IMessage msg = consumer.Receive();
        if (msg is ITextMessage)
        {
            ITextMessage txtMsg = msg as ITextMessage;                        
 
            Console.WriteLine($"Received message: {txtMsg.Text}");
 
            message = txtMsg.Text;
 
            if (message.First() != 't')
                throw new Exception("Message is invalid");
            msg.Acknowledge();
            
 
            return true;
        }
        else
        {
            Console.WriteLine("Unexpected message type: " + msg.GetType().Name);
        }
    }                
}

This time, we have some code that actually processes the message and, based on the contents, does something; in this case, it throws an error when the message doesn’t start with ‘t’. So, the rules are simple: messages start with ‘t’. Let’s run the send code again and try some messages:

[Image: msgrec5]

And now let’s receive these messages (incidentally, while testing this, my notes on starting two projects might be useful):

[Image: msgrec6]

Okay – so we’ve come across a message that we can’t process. This has yet to be acknowledged, so it’s still safe and sound in the queue. We’ll simply restart the listener and pick it up:

[Image: msgrec7]

Ah – okay. So, we have a problem. “nexttest” is causing an error with the queue, but if we don’t acknowledge it, we’re going to keep picking it up and erroring.

The Antidote

Once we know that the message is causing a problem, we can send it to a special queue; here’s the code to capture the error:

ITextMessage txtMsg = msg as ITextMessage;
 
try
{
    Console.WriteLine($"Received message: {txtMsg.Text}");
 
    message = txtMsg.Text;
 
    if (message != null && message.First() != 't')
    {
        // The message has a problem, and so we need to file it away without losing it
 
        throw new Exception("Message is invalid");
    }
    msg.Acknowledge();
 
    return true;
}
catch
{
    ResendMsg(session, msg);
    msg.Acknowledge(); // Acknowledge the message from the original queue
}

And here’s the new method, ResendMsg:

private static void ResendMsg(ISession session, IMessage msg)
{
    var deadLetterQueue = new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("ActiveMQ.DLQ");
    // Dispose the producer once the message has been forwarded
    using (IMessageProducer producer = session.CreateProducer(deadLetterQueue))
    {
        producer.Send(msg);
    }
}

The first time this executes, it will throw and catch the error, and then resend to a dead letter queue:

[Image: msgrec8]

Subsequent runs can proceed past the problem message, and the message itself remains intact:

[Image: msgrec9]
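The overall pattern (try to process, and on failure side-line the message so that later ones are not blocked) can be sketched without a broker at all. The following is an in-memory illustration only: PoisonDemo and Drain are invented names, plain collections stand in for the real queues, and, unlike the receive code above, an empty message is treated as invalid.

```csharp
using System;
using System.Collections.Generic;

public static class PoisonDemo
{
    // Drains 'queue', moving any message that fails validation to 'deadLetters'
    // so that later messages are not blocked. Returns the processed messages.
    public static List<string> Drain(Queue<string> queue, List<string> deadLetters)
    {
        var processed = new List<string>();
        while (queue.Count > 0)
        {
            string message = queue.Dequeue();
            try
            {
                Validate(message);
                processed.Add(message);
            }
            catch (FormatException)
            {
                deadLetters.Add(message); // side-lined, not lost
            }
        }
        return processed;
    }

    // Same rule as the post: valid messages start with 't'.
    private static void Validate(string message)
    {
        if (message.Length == 0 || message[0] != 't')
            throw new FormatException("Message is invalid");
    }
}
```

Feeding it "test", "nexttest" and "test2" processes the first and last messages and leaves "nexttest" in the dead letter list, where it can be inspected later.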