In what version(s) of Spring AMQP are you seeing this issue?

3.2.6

Describe the bug

For sync message handlers, the error handler (RabbitListenerErrorHandler) can throw an AmqpRejectAndDontRequeueException; the message is then deadlettered (assuming the in-queue has a deadletter-exchange configured).

For async listeners (methods annotated with @RabbitListener that return a Mono or Future, or that are Kotlin suspend functions), that is not the case. When the error handler throws any exception (including an AmqpRejectAndDontRequeueException), the message is nacked with requeue=true, resulting in an endless loop. When the error handler returns a value, the message is not acked either.

The logged error message is:

16:47:50.649 [DefaultDispatcher-worker-2 @coroutine#3288] ERROR o.s.a.r.l.a.MessagingMessageListenerAdapter - Future, Mono, or suspend function was completed with an exception for (Body:'{"this":"is not a valid message"}' MessageProperties [headers={x-delivery-count=3135, traceparent=00-68a5e0018bfa76845f473c7ef2501588-192f6d5acaefac54-00}, type=someEvent, correlationId=123-abc-456, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=..., deliveryTag=63, consumerTag=amq.ctag-FVilYqFsNGCOikPuqNJU6w, consumerQueue=...])

To Reproduce

Have a listener and an error handler like these:

    @Component
    class SomeListener {
        // Return a failed Mono; a `throw` inside Mono.error(...) would fail synchronously instead
        @RabbitListener(..., errorHandler = "errorHandler")
        fun doSomething(message: Message): Mono<Void> =
            Mono.error(IllegalStateException("Oops, I cannot process any message"))
    }

    @Component
    class ErrorHandler : RabbitListenerErrorHandler {
        override fun handleError(
            amqpMessage: Message,
            channel: Channel,
            message: org.springframework.messaging.Message<*>?,
            exception: ListenerExecutionFailedException,
        ): Unit? {
            // just deadletter everything
            throw AmqpRejectAndDontRequeueException(...)
        }
    }

Expected behavior

The message is deadlettered.

Workaround

Let the RabbitListenerErrorHandler for these async Rabbit listeners ack/nack the message directly instead of returning null or throwing an exception. NOTE: you then cannot reuse that error handler for sync Rabbit listeners.
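
A minimal sketch of this workaround, assuming it is acceptable for the handler to settle the delivery itself. The class name AsyncDeadLetteringErrorHandler is hypothetical. It rejects the delivery on the channel with requeue=false and then returns null so that no reply is produced:

```kotlin
import com.rabbitmq.client.Channel
import org.springframework.amqp.core.Message
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException
import org.springframework.stereotype.Component

// Hypothetical handler name; intended for async listeners only.
@Component
class AsyncDeadLetteringErrorHandler : RabbitListenerErrorHandler {
    override fun handleError(
        amqpMessage: Message,
        channel: Channel,
        message: org.springframework.messaging.Message<*>?,
        exception: ListenerExecutionFailedException,
    ): Any? {
        // requeue = false: the broker dead-letters the message if the queue
        // has a dead-letter exchange configured, otherwise it is dropped.
        channel.basicReject(amqpMessage.messageProperties.deliveryTag, false)
        // Returning null means there is no reply to send.
        return null
    }
}
```

An async listener would reference it by bean name, for example errorHandler = "asyncDeadLetteringErrorHandler". It should not be shared with sync listeners, presumably because the container settles the delivery itself in that case.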

Comment From: artembilan

The logic there is like this:

    protected void asyncFailure(org.springframework.amqp.core.Message request, @Nullable Channel channel, Throwable t,
            @Nullable Object source) {

        try {
            handleException(request, channel, (Message<?>) source,
                    new ListenerExecutionFailedException("Async Fail", t, request));
            return;
        }
        catch (Exception ex) {
            // Ignore
        }
        super.asyncFailure(request, channel, t, source);
    }

So, your AmqpRejectAndDontRequeueException from the RabbitListenerErrorHandler is just ignored.

We do say this in the docs:

Also starting with version 3.0.5, if a RabbitListenerErrorHandler is configured on a listener with an async return type (including Kotlin suspend functions), the error handler is invoked after a failure. See Handling Exceptions for more information about this error handler and its purpose.

So, this does feel like a bug: we need to replace that t with the exception thrown by handleException() before calling super.asyncFailure(). This way such an AmqpRejectAndDontRequeueException from the RabbitListenerErrorHandler would be handled properly.

For now, just don't use a RabbitListenerErrorHandler and signal that AmqpRejectAndDontRequeueException directly from your listener method, that is, return it as a Mono.error.
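
As a rough sketch of that interim approach, the listener below has no error handler and maps any failure to an AmqpRejectAndDontRequeueException inside the returned Mono. The class name RejectingListener, the queue name "some.queue", and the process() helper are placeholders, not taken from the report:

```kotlin
import org.springframework.amqp.AmqpRejectAndDontRequeueException
import org.springframework.amqp.core.Message
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono

@Component
class RejectingListener {

    // No errorHandler attribute: the failure is signalled through the Mono itself.
    // "some.queue" is a placeholder queue name.
    @RabbitListener(queues = ["some.queue"])
    fun doSomething(message: Message): Mono<Void> =
        process(message).onErrorMap { cause ->
            // Wrap any processing failure so the message is rejected without requeue.
            AmqpRejectAndDontRequeueException("Cannot process message", cause)
        }

    // Stand-in for the real processing logic; here it always fails.
    private fun process(message: Message): Mono<Void> =
        Mono.error(IllegalStateException("Oops, I cannot process any message"))
}
```

The original exception is kept as the cause, so it still shows up in the logged stack trace.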