0
votes

I need to send my messages to Dead letter queue from azure topic subscription incase of any error while reading and processing the message from topic. So I tried testing pushing message directly to DLQ.

My sample code will be like

static void sendMessage()
{
    // create a Service Bus Sender client for the queue 
    ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
            .connectionString(connectionString)
            .sender()
            .topicName(topicName)
            
            .buildClient();

    // send one message to the topic
    
    
    senderClient.sendMessage(new ServiceBusMessage("Hello, World!"));    


}
static void resceiveAsync() {
    ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
            .connectionString(connectionString)
            .receiver()
            .topicName(topicName)
            .subscriptionName(subName)
            .buildAsyncClient();

        // receive() operation continuously fetches messages until the subscription is disposed.
        // The stream is infinite, and completes when the subscription or receiver is closed.
        Disposable subscription = receiver.receiveMessages().subscribe(message -> {

            System.out.printf("Id: %s%n", message.getMessageId());
            System.out.printf("Contents: %s%n", message.getBody().toString());
        }, error -> {
                System.err.println("Error occurred while receiving messages: " + error);
            }, () -> {
                System.out.println("Finished receiving messages.");
            });

        // Continue application processing. When you are finished receiving messages, dispose of the subscription.
        subscription.dispose();

        // When you are done using the receiver, dispose of it.
        receiver.close();
    
    
    
}

I tried getting the deadletter queue path

    String dlq = EntityNameHelper.formatDeadLetterPath(topicName);

I got path of dead letter queue like = "mytopic/$deadletterqueue"

But It's not working while passing path as topic name. It throwing a Entity topic not found exception.

Any one can you please advise me on this

Reference : How to move error message to Azure dead letter queue using Java?

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues#moving-messages-to-the-dlq

How to push the failure messages to Azure service bus Dead Letter Queue in Spring Boot Java?

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-java-how-to-use-topics-subscriptions-legacy#receive-messages-from-a-subscription

1

1 Answers

1
votes

You probably know that a message will be automatically moved to the deadletter queue if you throw exceptions during processing, and the maximum delievery count is exceeded. If you want to explicitly move the message to the DLQ, you can do so as well. A common case for this is if you know that the message can never succeed because of its contents.

You cannot send new messages directly to the DLQ, because then you would have two messages in the system. You need to call a special operation on the parent entity. Also, <topic path>/$deadletterqueue does not work, because this would be the DLQ of all subscriptions. The correct entity path is built like this:

<queue path>/$deadletterqueue
<topic path>/Subscriptions/<subscription path>/$deadletterqueue

https://github.com/Azure/azure-service-bus/blob/master/samples/Java/azure-servicebus/DeadletterQueue/src/main/java/com/microsoft/azure/servicebus/samples/deadletterqueue/DeadletterQueue.java

This sample code is for queues, but you should be able to adapt it to topics quite easily:

    // register the RegisterMessageHandler callback
    receiver.registerMessageHandler(
            new IMessageHandler() {
                // callback invoked when the message handler loop has obtained a message
                public CompletableFuture<Void> onMessageAsync(IMessage message) {
                    // receives message is passed to callback
                    if (message.getLabel() != null &&
                            message.getContentType() != null &&
                            message.getLabel().contentEquals("Scientist") &&
                            message.getContentType().contentEquals("application/json")) {

                        // ...
                    } else {
                        return receiver.deadLetterAsync(message.getLockToken());
                    }
                    return receiver.completeAsync(message.getLockToken());
                }

                // callback invoked when the message handler has an exception to report
                public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
                    System.out.printf(exceptionPhase + "-" + throwable.getMessage());
                }
            },
            // 1 concurrent call, messages are auto-completed, auto-renew duration
            new MessageHandlerOptions(1, false, Duration.ofMinutes(1)),
            executorService);