In what version(s) of Spring AMQP are you seeing this issue?
For example:
3.2.6
Describe the bug
For sync Message handlers, the error handler (RabbitListenerErrorHandler
) can throw an AmqpRejectAndDontRequeueException
, the message is then deadlettered (asuming the in-queue has a deadletter-exchange configured).
For an async listeners (methods annotated with @RabbitListener
end returning Mono
or Future
or being a Kotlin suspend
functions) that is not the case. When the Error Hander throws any exception (including an AmqpRejectAndDontRequeueException
), the message will be nacked with requeue=true resulting an an endless loop. When Error handler returns a value, the messase is not acked either.
The logged error message is:
16:47:50.649 [DefaultDispatcher-worker-2 @coroutine#3288] ERROR o.s.a.r.l.a.MessagingMessageListenerAdapter - Future, Mono, or suspend function was completed with an exception for (Body:'{"this":"is not a valid message"}' MessageProperties [headers={x-delivery-count=3135, traceparent=00-68a5e0018bfa76845f473c7ef2501588-192f6d5acaefac54-00}, type=someEvent, correlationId=123-abc-456, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=..., deliveryTag=63, consumerTag=amq.ctag-FVilYqFsNGCOikPuqNJU6w, consumerQueue=...])
To Reproduce have a Listener
@Component
class SomeListener {
@RabbitListener(..., errorHandler = "errorHandler")
fun doSomething(message: Message) = Mono.error (
throw IllegalStateException("Oops, I cannot process any message")
)
}
@Component
class ErrorHandler: RabbitListenerErrorHandler {
override fun handleError(
amqpMessage: Message,
channel: Channel,
message: org.springframework.messaging.Message<*>?,
exception: ListenerExecutionFailedException,
): Unit? {
// just deadletter everything
throw AmqpRejectAndDontRequeueException(...)
}
Expected behavior
The message is deadlettered.
** Workaround **
Let the RabbitListenerErrorHandler
for these async rabbit listeners ack/nack the message directly insted of returning null
or throwing an exception. NOTE: you cannot reuse that errorhandler for sync rabbit Listeners then.
Comment From: artembilan
The logic there is like this:
protected void asyncFailure(org.springframework.amqp.core.Message request, @Nullable Channel channel, Throwable t,
@Nullable Object source) {
try {
handleException(request, channel, (Message<?>) source,
new ListenerExecutionFailedException("Async Fail", t, request));
return;
}
catch (Exception ex) {
// Ignore
}
super.asyncFailure(request, channel, t, source);
}
So, your AmqpRejectAndDontRequeueException
from the RabbitListenerErrorHandler
is just ignored.
We do say this in the docs:
Also starting with version 3.0.5, if a
RabbitListenerErrorHandler
is configured on a listener with an async return type (including Kotlin suspend functions), the error handler is invoked after a failure. See Handling Exceptions for more information about this error handler and its purpose.
So, feel like indeed a bug and we need to override that t
after the handleException()
failure before calling super.asyncFailure()
.
This way such an AmqpRejectAndDontRequeueException
from the RabbitListenerErrorHandler
would be handled properly.
For now, just don't use RabbitListenerErrorHandler
and throw that AmqpRejectAndDontRequeueException
directly from your listener method. Well, as a Mono.error
.