Hi, I am the author of issue #3039.
I am writing again to report that we are still observing anomalies during the shutdown sequence. Specifically, we have noted that the Recover command for in-flight messages is being dispatched immediately, without respecting the configured shutdown timeout allocated for the process to finalize its operations.
Upon enabling debug logging, we observe that the "Closing Rabbit Channel" log statement within the BlockingQueueConsumer.stop() method occurs directly after the "Waiting for workers to finish" log:
2025-05-27T16:23:17.329+02:00 INFO 112517 --- [ionShutdownHook] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2025-05-27T16:23:17.330+02:00 DEBUG 112517 --- [ntContainer#0-1] o.s.a.r.listener.BlockingQueueConsumer : Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://javamt@10.122.122.10:5673/,1), conn: Proxy@5e7687f6 Dedicated Rabbit Connection: SimpleConnection@39985b7d [delegate=amqp://javamt@10.122.122.10:5673/, localPort=44504]
2025-05-27T16:23:17.330+02:00 DEBUG 112517 --- [ntContainer#0-2] o.s.a.r.listener.BlockingQueueConsumer : Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://javamt@10.122.122.10:5673/,1), conn: Proxy@4b4be8c7 Dedicated Rabbit Connection: SimpleConnection@3da25e84 [delegate=amqp://javamt@10.122.122.10:5673/, localPort=44494]
As a side effect of this behavior, we are seeing messages immediately being returned to the queues following the shutdown event. We have encountered a scenario where a message was returned to the same process that was shutting down.
TCP traffic logs reveal a rapid succession of events (the second column represents the timestamp):
While the root cause of this issue is unclear to us, we have attempted to analyze all points within the libraries where the stop() method is invoked. One potential additional point of invocation we identified is the SimpleMessageListenerContainer.killOrRestart() function, which generates the "Cancelling Consumer" log entry that we also observe in our logs:
2025-05-27T16:23:17.830+02:00 DEBUG 112517 --- [ntContainer#0-7] o.s.a.r.l.SimpleMessageListenerContainer : Cancelling Consumer@1704f67f: tags=[[amq.ctag-jkP8GAacjSlyXTCUTdaU6Q]], channel=Cached Rabbit Channel: AMQChannel(amqp://javamt@10.122.122.10:5673/,2), conn: Proxy@14439550 Dedicated Rabbit Connection: SimpleConnection@107c0387 [delegate=amqp://javamt@10.122.122.10:5673/, localPort=44536], acknowledgeMode=AUTO local queue size=0
2025-05-27T16:23:17.830+02:00 DEBUG 112517 --- [ntContainer#0-3] o.s.a.r.l.SimpleMessageListenerContainer : Cancelling Consumer@42e22a53: tags=[[amq.ctag-Lu24WkQzMZV1a0e3duvMgQ]], channel=Cached Rabbit Channel: AMQChannel(amqp://javamt@10.122.122.10:5673/,1), conn: Proxy@798c9f6d Dedicated Rabbit Connection: SimpleConnection@29ece6aa [delegate=amqp://javamt@10.122.122.10:5673/, localPort=44518], acknowledgeMode=AUTO local queue size=0
2025-05-27T16:23:17.830+02:00 DEBUG 112517 --- [ntContainer#0-4] o.s.a.r.l.SimpleMessageListenerContainer : Cancelling Consumer@6e0f7aad: tags=[[amq.ctag-Fkjxuc814WrNZWmYkuv31A]], channel=Cached Rabbit Channel: AMQChannel(amqp://javamt@10.122.122.10:5673/,1), conn: Proxy@18733805 Dedicated Rabbit Connection: SimpleConnection@32ed8046 [delegate=amqp://javamt@10.122.122.10:5673/, localPort=44522], acknowledgeMode=AUTO local queue size=0
2025-05-27T16:23:17.830+02:00 INFO 112517 --- [ionShutdownHook] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.
Regardless of the exact trigger, our expectation is that the process should await the configured shutdown timeout period before issuing the Recover command.
We have conducted our tests using the following test project: https://github.com/marcogramy/rabbitmqdemo.git, which logs any redelivered messages. You can reproduce the redeliveries by starting the application, stopping it, and then restarting it. Upon the second execution, you should find logs indicating redelivered messages.
Comment From: artembilan
Hi @marcogramy !
Thank you for the great feedback and for helping to keep this project in a robust and clean state!
So, the Basic.Recover command is issued unconditionally once our consumer is already closed and we are done with processing:
if (this.transactional) {
    /*
     * Re-queue in-flight messages if any
     * (after the consumer is cancelled to prevent the broker from simply sending them back to us).
     * Does not require a tx.commit.
     */
    this.channel.basicRecover(true);
}
This does not mean that there are any in-flight messages to roll back. The command is there just for safety.
I see about 10 seconds between "Waiting for workers to finish." and "Successfully waited for workers to finish.", and there are a lot of "Received message" logs in between.
That means we really are waiting until the internal queue is drained:
Message message = handle(timeout < 0 ? this.queue.take() : this.queue.poll(timeout, TimeUnit.MILLISECONDS));
if (message == null && this.cancelled.get()) {
    this.activeObjectCounter.release(this);
    throw new ConsumerCancelledException();
}
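To illustrate the exit condition in the snippet above, here is a minimal JDK-only sketch. It is a simplified model, not the Spring AMQP code: `DrainModel`, `CancelledSignal` and `drainAfterCancel` are hypothetical names, and messages are plain strings. It shows that buffered messages are still handled after cancellation, and that the loop ends on the first poll that times out with an empty queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified stand-in for the consumer's internal queue; not the Spring AMQP class.
final class DrainModel {

    /** Hypothetical unchecked signal: the consumer is cancelled and nothing is buffered. */
    static final class CancelledSignal extends RuntimeException { }

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final AtomicBoolean cancelled = new AtomicBoolean();

    void deliver(String body) { queue.add(body); }

    void cancel() { cancelled.set(true); }

    /** Returns a buffered message, or null on timeout; throws once cancelled and empty. */
    String nextMessage(long timeoutMillis) {
        String message;
        try {
            message = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            message = null; // treat an interrupt like an empty poll
        }
        if (message == null && cancelled.get()) {
            throw new CancelledSignal();
        }
        return message;
    }

    /** Buffers some messages, cancels, then polls until the exit condition fires. */
    static int drainAfterCancel(int buffered, long timeoutMillis) {
        DrainModel model = new DrainModel();
        for (int i = 0; i < buffered; i++) {
            model.deliver("msg-" + i);
        }
        model.cancel();
        int handled = 0;
        try {
            while (true) {
                if (model.nextMessage(timeoutMillis) != null) {
                    handled++;
                }
            }
        } catch (CancelledSignal drained) {
            return handled;
        }
    }

    public static void main(String[] args) {
        System.out.println("handled=" + drainAfterCancel(3, 50)); // prints "handled=3"
    }
}
```

In this model, a message that arrives after the empty poll has already fired is never handled by the loop, which is the window discussed in this thread.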
The message.getMessageProperties().isRedelivered() flag is documented as follows:
/**
 * Get the redelivery flag included in this parameter envelope. This is a
 * hint as to whether this message may have been delivered before (but not
 * acknowledged). If the flag is not set, the message definitely has not
 * been delivered before. If it is set, it may have been delivered before.
 *
 * @return the redelivery flag
 */
public boolean isRedeliver() {
That means a message has been handed to the consumer, but there is no guarantee that it was able to reach the mentioned internal queue.
So it is kind of a race condition between broker and consumer, but thanks to the cancel and recover commands we don't lose those messages.
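The meaning of the redelivery flag can be sketched with a toy broker. This is an illustration under stated assumptions, not RabbitMQ or Spring AMQP code: `ToyBroker`, `Delivery` and `firstAfterRecover` are hypothetical names, and `recover()` only models the "requeue unacknowledged messages" effect described above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy broker for illustration only; it is not RabbitMQ and not Spring AMQP.
final class RedeliveryModel {

    static final class Delivery {
        final String body;
        final boolean redelivered;

        Delivery(String body, boolean redelivered) {
            this.body = body;
            this.redelivered = redelivered;
        }
    }

    static final class ToyBroker {
        private final Deque<Delivery> ready = new ArrayDeque<>();
        private final List<Delivery> unacked = new ArrayList<>();

        void publish(String body) { ready.addLast(new Delivery(body, false)); }

        /** Hands the next ready message to the consumer and tracks it as unacknowledged. */
        Delivery deliver() {
            Delivery next = ready.pollFirst();
            if (next != null) {
                unacked.add(next);
            }
            return next;
        }

        void ack(Delivery delivery) { unacked.remove(delivery); }

        /** Models recover with requeue: unacked messages return to the queue, flagged as redelivered. */
        void recover() {
            for (int i = unacked.size() - 1; i >= 0; i--) {
                ready.addFirst(new Delivery(unacked.get(i).body, true));
            }
            unacked.clear();
        }
    }

    /** One message is acked; the other is handed over but never acked before recover. */
    static Delivery firstAfterRecover() {
        ToyBroker broker = new ToyBroker();
        broker.publish("a");
        broker.publish("b");
        broker.ack(broker.deliver()); // "a" is fully processed
        broker.deliver();             // "b" was handed over but never acked
        broker.recover();
        return broker.deliver();
    }

    public static void main(String[] args) {
        Delivery d = firstAfterRecover();
        System.out.println(d.body + " redelivered=" + d.redelivered); // prints "b redelivered=true"
    }
}
```

The flag is set on "b" whether or not the consumer ever started processing it, which is why it is only a hint.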
Not sure what else you'd like to see here: as long as the handleDelivery() callback does not add entries to this.queue, the nextMessage() loop exits because of the mentioned condition.
You may play with factory.setReceiveTimeout(), which is 1 second by default.
So there might be a case where this one second has elapsed during cancellation, but handleDelivery() did not make it in time.
We cannot make the container always wait for the full shutdown timeout, since there is no point in doing so when there are no in-flight messages to clean up.
From my point of view, the behavior we see so far in your application is correct.
Perhaps the impression given by isRedelivered() is just confusing...
Comment From: marcogramy
Hi Artem, thanks a lot for the quick reply!
My colleague and I took a couple of days to debug the library to provide you with the most precise indication possible of the problem we believe we've identified. Unfortunately, after two days of work, we've pinpointed a suspicious area but don't yet have a proposed solution.
What I'd really like to focus on is the immediate sequence between the Basic.Cancel command and the Basic.Recover. I saw in the commit that you removed the following section from the if condition:
if (this.transactional) {
    /*
     * Re-queue in-flight messages if any
     * (after the consumer is cancelled to prevent the broker from simply sending them back to us).
     * Does not require a tx.commit.
     */
    this.channel.basicRecover(true);
}
So now it's called unconditionally. I also noticed that a lot of messages were processed within the 10-second window, meaning your latest change did indeed have a positive effect.
What we still don't understand is why only some of the messages are processed correctly and not ALL of them. From the logs, the presence of "Closing Rabbit Channel" suggests that the stop() method (and consequently the Basic.Recover operation) is being called immediately, without waiting for the ShutdownTimeout.
From our tests, we've observed that upon receiving a shutdown command, the execution of the run() method of the class SimpleMessageListenerContainer terminates, exiting the mainLoop(), and the killOrRestart() function is called with aborted=false (indicating that no exception occurred).
What we suspect is happening, based on network traffic analysis, is that following a Basic.Deliver command from the server, the client doesn't have enough time to send the Ack because the killOrRestart() function is immediately invoked, which then calls the stop() method without delay.
We would expect that even under these conditions, after the Basic.Cancel command but before calling the stop() method, the shutdown timeout would still be honored.
We hope this analysis proves helpful in identifying any potential anomalies.
Comment From: artembilan
"So now it's called unconditionally."
I'm not sure what you mean by that.
The logic is now in BlockingQueueConsumer.stop():
if (!cancelled()) {
    basicCancel(true);
}
try {
    if (this.transactional) {
        /*
         * Re-queue in-flight messages if any
         * (after the consumer is cancelled to prevent the broker from simply sending them back to us).
         * Does not require a tx.commit.
         */
        this.channel.basicRecover(true);
    }
}
This stop() is indeed called from AsyncMessageProcessingConsumer.killOrRestart(false).
That one, in turn, is called from AsyncMessageProcessingConsumer.run() after we exit from the main loop:
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
    mainLoop();
}
Since we have just called shutdown, only one of those conditions could be true to let us loop more. And that is exactly this.consumer.hasDelivery().
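The loop guard can be reduced to a small boolean model. This is only a sketch: `LoopGuardModel` and `keepLooping` are hypothetical names, and the flag values assumed for the post-shutdown state (inactive and cancelled) follow the description in this comment rather than an inspection of the library.

```java
// Boolean model of the loop guard quoted above; the three flags are plain parameters here.
final class LoopGuardModel {

    static boolean keepLooping(boolean active, boolean hasDelivery, boolean cancelled) {
        return active || hasDelivery || !cancelled;
    }

    public static void main(String[] args) {
        // Assumed post-shutdown state: active=false, cancelled=true, so only hasDelivery matters.
        System.out.println(keepLooping(false, true, true));  // prints "true"
        System.out.println(keepLooping(false, false, true)); // prints "false"
    }
}
```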
If there are no in-flight messages in the consumer, we definitely have nothing more to do here.
And we are good to go to stop() with the respective Basic.Cancel.
Feels like we are in this situation: https://www.rabbitmq.com/docs/confirms#automatic-requeueing.
Looks like there is no way to wait for the Basic.Ack result.
We could, though, move the this.channel.basicRecover(true) call down to InternalConsumer.handleCancelOk(), essentially waiting for the broker's answer to the Basic.Cancel we have just initiated from that stop().
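The ordering proposed here, recover only after the broker confirms the cancel, can be sketched with a fake channel. Everything in this block is an assumption for illustration: `FakeChannel` is not the RabbitMQ `Channel` API, and the asynchronous cancel-ok is simulated with a `CompletableFuture` instead of a consumer callback.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Model of "recover only after cancel-ok"; not the Spring AMQP implementation.
final class CancelThenRecoverModel {

    /** Hypothetical stand-in for a broker channel; records protocol frames in order. */
    static final class FakeChannel {
        final List<String> frames = Collections.synchronizedList(new ArrayList<>());

        /** Sends the cancel; the returned future completes when the simulated cancel-ok arrives. */
        CompletableFuture<Void> basicCancel() {
            frames.add("Basic.Cancel");
            return CompletableFuture.runAsync(() -> frames.add("Basic.CancelOk"));
        }

        void basicRecover() { frames.add("Basic.Recover"); }
    }

    /** Issues the cancel and defers the recover until the cancel has been confirmed. */
    static List<String> stop(FakeChannel channel, long timeoutMillis) {
        try {
            channel.basicCancel()
                    .thenRun(channel::basicRecover)
                    .get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for cancel-ok", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("cancel-ok not confirmed in time", e);
        }
        synchronized (channel.frames) {
            return new ArrayList<>(channel.frames);
        }
    }

    public static void main(String[] args) {
        // prints "[Basic.Cancel, Basic.CancelOk, Basic.Recover]"
        System.out.println(stop(new FakeChannel(), 1000));
    }
}
```

This only fixes the order of cancel and recover; it does not by itself make the client wait for outstanding acknowledgements.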
Not sure if that helps. And I'm not sure what you mean by "shutdown timeout would still be honored". We do wait for at most that time, or exit earlier because there are no in-flight messages left to handle. What do you propose?
Thanks
Comment From: marcogramy
Hi Artem, thanks for your response. We've spent additional time investigating this by downloading and running the library code locally.
We observed that with this commit:
https://github.com/spring-projects/spring-amqp/commit/4deb40c0facb6fd11b4e068e3655314689616993
certain changes were introduced, including the addition of the !cancelled() condition at line 920.
However, upon comparing this with the latest version:
https://github.com/spring-projects/spring-amqp/compare/v3.1.1...v3.1.11
we noticed that some modifications to the BlockingQueueConsumer class were subsequently rolled back.
This led us to question whether the !cancelled() condition remains in the code by mistake (and perhaps should have been rolled back as well). Given the subsequent fixes that have been implemented, is this condition still necessary in the latest version?
Our reasoning for this question is that, after experimenting with removing it, we no longer observed redelivered messages, as all in-flight messages were successfully ACKed.
In our tests last week, by analyzing the network traffic, we consistently found that every instance of a redelivered message occurred because the ACK for that message was not sent during shutdown.
What I intended to convey when referring to "shutdown timeout" is that our expectation from the library is its ability to send all ACKs within the duration specified by the shutdownTimeout variable.
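The expectation stated above, that in-flight work may finish and be acknowledged within the shutdown timeout and only the remainder is requeued, can be sketched with a plain JDK executor. This is a simulation, not the library's mechanism: `ShutdownTimeoutModel` and its parameters are hypothetical, `Thread.sleep` stands in for listener work, and a counter increment stands in for Basic.Ack.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Simulation of "wait up to shutdownTimeout for in-flight messages"; not Spring AMQP code.
final class ShutdownTimeoutModel {

    /** Returns {acked, requeued} for the given number of in-flight messages. */
    static int[] shutdown(int inFlight, long workMillis, long shutdownTimeoutMillis) {
        AtomicInteger acked = new AtomicInteger();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        for (int i = 0; i < inFlight; i++) {
            worker.execute(() -> {
                try {
                    Thread.sleep(workMillis);  // simulated listener work
                    acked.incrementAndGet();   // simulated Basic.Ack
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // abandoned: the message stays unacked
                }
            });
        }
        worker.shutdown(); // accept nothing new, but keep processing what is already queued
        boolean finished;
        try {
            finished = worker.awaitTermination(shutdownTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            finished = false;
        }
        if (!finished) {
            worker.shutdownNow(); // timeout elapsed: give up on the remaining messages
        }
        int ackedCount = acked.get();
        return new int[] { ackedCount, inFlight - ackedCount };
    }

    public static void main(String[] args) {
        int[] inTime = shutdown(3, 10, 2000);
        System.out.println("acked=" + inTime[0] + " requeued=" + inTime[1]);
    }
}
```

With a generous timeout every simulated message is acknowledged; with a timeout shorter than the work, the unacknowledged remainder is what a recover would return to the queue.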
Comment From: artembilan
I see, @marcogramy .
Thank you for such a detailed investigation!
How about you raise a Pull Request and we go from there?
The 3.1.12 release next Monday is going to be the last Open Source release.
So your fix would be a valuable improvement for the last standing artifact.