Expected Behavior

When the server receives SIGINT or SIGTERM, I would like the user code that is consuming the messages to be able to react. A simple way would be to set the worker threads interrupt flag, so that user code can check it (or rely on functions like Thread.sleep() to check it). This is useful for consummers that take a long time to consume each message (e.g. low bandwidth queue with big computations for each message)

Current Behavior If I understood correctly, the only possible options now are to wait for all prefeteched messages to be consummed normally, or to wait for only the current messages to be consummed normally. There are no options to interrupt the threads consumming the message. After a short time, the predestroy/shutdown hook stops waiting for consummer and returns, and so the jvm halts all running threads without any notice.

Context

I would like to be able to store in my database that a message consumption was interrupted and what my code was doing when it was interrupted (a stacktrace captures this information nicely)

I could install my own predestroy/shutdownhook to interrupt the threads, but you need deep understanding of very low level jvm apis to do it correctly, you need to get a reference to the thread. It's doable at the begining of every message consumption and store the threads in a list of things to interrupt but a bit hard to do.

Also the fact that it doesn't exist suggests that it's not really a good idea, maybe documentation could clarify the perception around this idea.

I'm using spring-cloud-stream and creating a java.util.function.Consumer bean to consume the message. With the default conf it uses container type SMLC:

spring.cloud.function.definition=consumeRun
spring.cloud.stream.bindings.consumeRun-in-0.destination=myQueue
spring.cloud.stream.bindings.consumeRun-in-0.group= myGroup
spring.cloud.stream.bindings.consumeRun-in-0.consumer.concurrency=2
@Bean public Consumer<Message<String>> consumeRun() { return mySlowComputationIntensiveConsumerBean; }

Thanks in advance

Comment From: artembilan

For better understanding that problem, I'd like to see what you want to have fixed in Spring AMQP and how that would help the target end-user consumer? Any chances elaborating more? Thanks

Comment From: madanhk18

Can I work on this issue? If it's possible please \assign-me

Comment From: artembilan

@madanhk18 ,

I said before: we cannot assign to the issue someone who is not team member. Please, stop asking about that.

As you see according to my comment it is not clear what is the problem and what should be done. If you have any idea, please, share it here.

Comment From: jonenst

Hi @artembilan, From memory, the difficulties arise when consuming a messages takes a lot of time (>10s) and you need to ack after finishing consuming (for exemple maybe because the consuming could fail). In this case, if you CTRL C the jvm while messages are consumed it takes a long time. When it timeouts and kill the jvm eventually, it is not easy to associate with the message which event lead to the failure (even though the message is correctly redelivered to another consumer). Knowing the reason why a message was redelivered is helpful for exemple because it allows to implement more precise poison message detection strategies.

I can work on a short code exemple if you want. Would that help ?

Comment From: artembilan

Knowing the reason why a message was redelivered is helpful

OK. So, you mean to do Thread.currentThread().interrupt(); in the main loop of the listener container before calling end-user listener when we are about to be stopped?

Yes, sample would be great. Thanks