Expected Behavior
When the server receives SIGINT or SIGTERM, I would like the user code that is consuming the messages to be able to react. A simple way would be to set the worker threads interrupt flag, so that user code can check it (or rely on functions like Thread.sleep() to check it). This is useful for consummers that take a long time to consume each message (e.g. low bandwidth queue with big computations for each message)
Current Behavior If I understood correctly, the only possible options now are to wait for all prefeteched messages to be consummed normally, or to wait for only the current messages to be consummed normally. There are no options to interrupt the threads consumming the message. After a short time, the predestroy/shutdown hook stops waiting for consummer and returns, and so the jvm halts all running threads without any notice.
Context
I would like to be able to store in my database that a message consumption was interrupted and what my code was doing when it was interrupted (a stacktrace captures this information nicely)
I could install my own predestroy/shutdownhook to interrupt the threads, but you need deep understanding of very low level jvm apis to do it correctly, you need to get a reference to the thread. It's doable at the begining of every message consumption and store the threads in a list of things to interrupt but a bit hard to do.
Also the fact that it doesn't exist suggests that it's not really a good idea, maybe documentation could clarify the perception around this idea.
I'm using spring-cloud-stream and creating a java.util.function.Consumer
spring.cloud.function.definition=consumeRun
spring.cloud.stream.bindings.consumeRun-in-0.destination=myQueue
spring.cloud.stream.bindings.consumeRun-in-0.group= myGroup
spring.cloud.stream.bindings.consumeRun-in-0.consumer.concurrency=2
@Bean public Consumer<Message<String>> consumeRun() { return mySlowComputationIntensiveConsumerBean; }
Thanks in advance
Comment From: artembilan
For better understanding that problem, I'd like to see what you want to have fixed in Spring AMQP and how that would help the target end-user consumer? Any chances elaborating more? Thanks
Comment From: madanhk18
Can I work on this issue? If it's possible please \assign-me
Comment From: artembilan
@madanhk18 ,
I said before: we cannot assign to the issue someone who is not team member. Please, stop asking about that.
As you see according to my comment it is not clear what is the problem and what should be done. If you have any idea, please, share it here.
Comment From: jonenst
Hi @artembilan, From memory, the difficulties arise when consuming a messages takes a lot of time (>10s) and you need to ack after finishing consuming (for exemple maybe because the consuming could fail). In this case, if you CTRL C the jvm while messages are consumed it takes a long time. When it timeouts and kill the jvm eventually, it is not easy to associate with the message which event lead to the failure (even though the message is correctly redelivered to another consumer). Knowing the reason why a message was redelivered is helpful for exemple because it allows to implement more precise poison message detection strategies.
I can work on a short code exemple if you want. Would that help ?
Comment From: artembilan
Knowing the reason why a message was redelivered is helpful
OK. So, you mean to do Thread.currentThread().interrupt();
in the main loop of the listener container before calling end-user listener when we are about to be stopped?
Yes, sample would be great. Thanks
Comment From: jonenst
hi @artembilan , using https://github.com/spring-projects/spring-amqp-samples/blob/main/helloworld/src/main/java/org/springframework/amqp/helloworld/async/HelloWorldHandler.java, add to the handler
try {
Thread.sleep(Duration.ofSeconds(20));
} catch (InterruptedException e) {
System.out.println("Interrupt detected, stopping gracefully");
Thread.currentThread().interrupt();
}
Also add code to close the context to demonstrate what happens ( for example in https://github.com/spring-projects/spring-amqp-samples/blob/main/helloworld/src/main/java/org/springframework/amqp/helloworld/async/Consumer.java add a sleep as well and call close() on the context; or add a shutdownhook calling ctx.close() like springboot does in springapplication.run and send kill -SIGINT to the process)
Running with -Dorg.apache.logging.log4j.simplelog.level=debug to see the logs even without a log4j2 implementation
DEBUG AnnotationConfigApplicationContext Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@63e2203c, started on Thu Jul 24 14:33:31 CEST 2025
DEBUG DefaultLifecycleProcessor Stopping beans in phase 2147483647
DEBUG SimpleMessageListenerContainer Shutting down Rabbit listener container
INFO SimpleMessageListenerContainer Waiting for workers to finish.
DEBUG BlockingQueueConsumer Received cancelOk for tag amq.ctag-tFjAh1b5LE8rcOwGkcNMaA (hello.world.queue); Consumer@2320fa6f: tags=[[amq.ctag-tFjAh1b5LE8rcOwGkcNMaA]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@32193bea Shared Rabbit Connection: SimpleConnection@63a270c9 [delegate=amqp://guest@127.0.0.1:5672/, localPort=36360], acknowledgeMode=AUTO local queue size=249
INFO SimpleMessageListenerContainer Workers not finished.
WARN SimpleMessageListenerContainer Closing channel for unresponsive consumer: Consumer@2320fa6f: tags=[[amq.ctag-tFjAh1b5LE8rcOwGkcNMaA]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@32193bea Shared Rabbit Connection: SimpleConnection@63a270c9 [delegate=amqp://guest@127.0.0.1:5672/, localPort=36360], acknowledgeMode=AUTO local queue size=249
Notice the long delay to close the context (which is the expected behavior of the code of course, see the log "Closing channel for unresponsive consumer" after a timeout)
The listener is called "unresponsive" but what is it supposed to have responded to ? I don't see any API for the container to notify the user handler that it should stop what it's doing.
Using thread interruption for this is a very complicated system but has the advantage to work with many jdk methods. Or alternatively a new method "close" or something of SMLC or each blockingqueue consumer or whatever can be called to signal to the usercode that it needs to stop what it's doing.
Let me know if you need more information ! Jon
Comment From: artembilan
Thanks for explanation, @jonenst !
I understand the current behavior you show, without an ability to really interrupt end-user code.
So, apparently my assumption that you want Thread.currentThread().interrupt();
in the consumer loop is correct.
Please, confirm.
This feel then like further evolution of that Waiting for workers to finish.
And before we log Workers not finished
due to shoutdown timeout, it would be great to notify those consumers to be interrupted.
I'm not sure how to do that since we are blocked by the end-user callback in that main loop.
If we are on the same page, I will look deeper what could be done.
I believe we are using some Executor
there for those consumers, so we can probably cancel(true)
their executions.
Comment From: jonenst
Hi,
So, apparently my assumption that you want Thread.currentThread().interrupt(); in the consumer loop is correct. Please, confirm.
Well mostly yes I imagine, except it should be the shutdown thread that interrupts all the consumer threads, not each thread interrupting itself with Thread.currentThread().interrupt()
I agree with you that it's an evolution of the current code that just waits a bit for consumers to finish consuming messages normaly on their own. With this evolution they can finish consuming in a different (ideally faster) manner.
Thanks
Comment From: jonenst
Hi @artembilan , this looks great ! I haven't tested yet or read deep enough into the code, but maybe more synchronisation is needed around adding/removing/traversing the arraylist (not thread safe) ? Unless the structure of the code already guarantees no concurrent access. And even if that's the case, maybe the code could be made safer against future changes that may change the structure and trigger bugs? Cheers, Jon
Comment From: artembilan
Hey, @jonenst !
Please, take a look into the latest code. That is like this now:
private final List<Thread> processorThreadsToInterrupt = Collections.synchronizedList(new ArrayList<>());
...
synchronized (this.processorThreadsToInterrupt) {
this.processorThreadsToInterrupt.forEach(Thread::interrupt);
}
The add()
and remove()
don't need to be wrapped according to Collections.synchronizedList()
Javadocs.
Feel free to share your further concerns if you find them!