** Problem **

With multiple servers that each run multiple consumers on the same queue, I would like messages to be distributed evenly across the servers (not across the individual consumers). That is, no server should consume more than one message while at least one other server is idle. This relaxes RAM requirements under light load and spreads the CPU load across the servers.

Expected Behavior

Currently I'm using one SMLC with setConcurrentConsumers() (it would be the same with DMLC and setConsumersPerQueue()) set to a value > 1. I think my problem would be solved if I could give each consumer a consumer priority equal to its index. This way, on every server there would be one priority N thread, one priority N-1 thread, and so on down to priority 1.

Current Behavior

Currently, the container's consumer parameters are applied identically to all of its consumers.

** alternatives **

Are there better solutions than having different consumer priorities within the same SMLC / DMLC? I can think of the following, but maybe they are not even feasible:

- Using maxConcurrency: the number of consumers would scale with the load, so it would presumably be one per server under low load. But I see issues saying that it is hard to control scaling down after a high-load burst: #2305
- Manually using N SMLC/DMLC, each having only 1 consumer?
- Something else?

Context

I'm using Spring Cloud Stream, which uses SMLC or DMLC directly.

Comment From: artembilan

I don't fully understand what your request is about, but let's try to clear up the points I have doubts about.

I think by server you mean an instance of the application. For me, a server is really the message broker (RabbitMQ in our case), the one that delivers messages to our application.

Then next one:

no server should consume more than one message if at least another server is idle

In my words that would be:

no application should consume more than one message if at least another application is idle

I don't think that is possible without RabbitMQ interaction. I'm not enough of a RabbitMQ expert to say whether it has an option to control the workload.

I see there is something like Consumer Priority: https://www.rabbitmq.com/docs/consumer-priority.

However, in the Spring AMQP logic we do indeed propagate the same set of consumer arguments to all consumer instances:

        consumer = new BlockingQueueConsumer(getConnectionFactory(), getMessagePropertiesConverter(),
                this.cancellationLock, getAcknowledgeMode(), isChannelTransacted(), actualPrefetchCount,
                isDefaultRequeueRejected(), getConsumerArguments(), isNoLocal(), isExclusive(), queues);

Currently maxConcurrency does not take other application instances into account (can it at all?). It works like this: if I've just consumed a message and can start a new consumer (see consecutiveMessages), I do that to improve my own performance.

Not setting maxConcurrency and having several SMLC/DMLC instances in the same application (probably with different x-priority settings) is the way to go.

I don't see how a single SMLC with maxConcurrency can be improved.

What is your proposal, please? Can you share your idea as a Pull Request?

Comment From: jonenst

Hi! Thanks a lot for answering and for taking the time to clear up misunderstandings. Yes, you are right, I meant "no application instance should consume more than one message if at least another application instance is idle". In fact this was a simplification: more generally I would like messages to be processed evenly, so that all application instances ideally process the same number of messages at any given time, with a difference of at most 1. An approximation of this (for example a maximum difference of 10%, or some fixed number a little greater than 1) would also be good and might even lead to better performance. As a reminder, during my tests with 4 application instances, each running 4 consumers, I would regularly see 4 0 0 0 or 3 1 0 0. I would like 1 1 1 1, i.e. each application instance with 1 processing consumer and 3 idle consumers.

"I don't think that is possible without RabbitMQ interaction. I see there is something like Consumer Priority: https://www.rabbitmq.com/docs/consumer-priority."

Yes, this is what I found as a possible solution to my problem. But I lack knowledge of best practices around spring-amqp, and I was hoping that this general goal of evenly distributing messages over application instances, each having multiple consumers in a single SMLC or DMLC, was maybe a known pattern.

"What is your proposal, please?"

My first idea was to allow setting the x-priority of each consumer of the SMLC/DMLC of each application instance to an increasing value starting from 1. With this, when all instances have the same number of consumers, they would all have 1 consumer of priority 1, 1 of priority 2, and so on. RabbitMQ should then distribute messages evenly across application instances.
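For illustration, the expected effect can be sketched with a toy model of the dispatch rule described in the consumer priority docs: a message goes to the highest-priority consumer that is not blocked, and a consumer with a prefetch of 1 is blocked while it holds one unacknowledged message. This is not RabbitMQ or Spring AMQP code and it was not run against a real broker. The class and method names are hypothetical, and it assumes that the instances register their consumers one after another and that nothing is acknowledged during the burst.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Toy model of priority-based dispatch; hypothetical names, not RabbitMQ or Spring AMQP code. */
final class PriorityDispatchSketch {

    /** A consumer with prefetch 1: busy as soon as it holds one unacknowledged message. */
    private static final class Consumer {
        final int instance;
        final int priority;
        boolean busy;

        Consumer(int instance, int priority) {
            this.instance = instance;
            this.priority = priority;
        }
    }

    /**
     * Returns how many messages each application instance holds after a burst,
     * assuming no message is acknowledged during the burst.
     */
    static int[] distribute(int instances, int consumersPerInstance, int messages, boolean indexAsPriority) {
        List<Consumer> consumers = new ArrayList<>();
        // Assumption: instances register their consumers one instance after another.
        for (int i = 0; i < instances; i++) {
            for (int index = 1; index <= consumersPerInstance; index++) {
                consumers.add(new Consumer(i, indexAsPriority ? index : 0));
            }
        }
        // Highest priority first; List.sort is stable, so registration order is kept within a priority.
        consumers.sort(Comparator.comparingInt((Consumer c) -> -c.priority));

        int[] perInstance = new int[instances];
        for (int m = 0; m < messages; m++) {
            for (Consumer c : consumers) {
                if (!c.busy) {
                    c.busy = true;
                    perInstance[c.instance]++;
                    break;
                }
            }
            // If every consumer is busy, the message simply stays in the queue.
        }
        return perInstance;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(distribute(4, 4, 4, false))); // [4, 0, 0, 0]
        System.out.println(Arrays.toString(distribute(4, 4, 4, true)));  // [1, 1, 1, 1]
    }
}
```

Under these assumptions, identical consumers let the first registered instance take the whole burst, while index-based priorities put one message on each instance. A real broker interleaves registrations and acknowledgements, so this only shows the intent of the idea.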

I had a second idea about maxConcurrency, but I don't think it works. The hope was that, with a maxConcurrency of N, all application instances start with 1 consumer and, when consuming the first messages, don't create new consumers because other application instances consume messages before they can create more. But as you said, maxConcurrency doesn't take other application instances into account, so consumers may be created before the messages are evenly distributed. I don't know enough to say for sure what is guaranteed to happen in this case, or whether this rules it out as a solution, but it doesn't look good. I was asking about it with low hope.

My third idea was to manually create many containers, but this sounds like it would just manually duplicate the functionality of having multiple consumers per container, so I would like to avoid that. Even if copy-pasting the queue name, for example, could be avoided, it sounds like having all these containers would need more resources?

"Can you share your idea as a Pull Request?"

I'm sorry, I haven't really tried to think of a generic API that would allow this. Maybe the easiest is to add a new optional lambda ConsumerArgsStrategy (like ConsumerTagStrategy)? I don't know the design and the code much; I only skimmed through it when exploring possibilities.

Thanks a lot for your time

Comment From: artembilan

Thank you for the clarification! I still don't see how consumer priority can help here, but you are probably missing this option:

    /**
     * Tell the broker how many messages to send to each consumer in a single request.
     * Often this can be set quite high to improve throughput.
     * @param prefetchCount the prefetch count
     * @see com.rabbitmq.client.Channel#basicQos(int, boolean)
     */
    public void setPrefetchCount(int prefetchCount) {

It is 250 by default and it applies to all consumers on the channel. Therefore, every new consumer would park that number of messages from the queue, not letting others consume them.

I think the RabbitMQ broker is smart enough to distribute messages evenly between the consumers on the queue, but it really depends on the prefetch settings: https://www.cloudamqp.com/blog/how-to-optimize-the-rabbitmq-prefetch-count.html.

Please let me know what you think about this option, and please elaborate more on your idea with consumer priority. It sounds more like the "original" consumers (this.consumers = new HashSet<>(this.concurrentConsumers);) should have a higher priority, and every consumer added later (considerAddingAConsumer()) should get a priority lower by one each time we add the next one.

But I'm still not sure how that is going to help you if it feels like the root cause is exactly that prefetch.

Comment From: jonenst

Hi, thanks for your answer.

" I still don't how consumer priority can help here, but probably you are missing this option: setPrefetchCount"

No, I'm already using a prefetchCount of 1 (which is the default in Spring Cloud Stream). Here are the logs I see in each of my application instances. You can see the single SMLC with 4 consumers, each in its own thread, calling basicQos(prefetchCount=1) and basicConsume(consumerArgs = {}) at startup:

    [myConsumerGroup-3] channel.basicQos([1, false])
    [myConsumerGroup-4] channel.basicQos([1, false])
    [myConsumerGroup-2] channel.basicQos([1, false])
    [myConsumerGroup-1] channel.basicQos([1, false])
    [myConsumerGroup-3] channel.basicConsume([myGroup, false, , false, false, {}, InternalConsumer{queue='myGroup', consumerTag='null'}])
    [myConsumerGroup-4] channel.basicConsume([myGroup, false, , false, false, {}, InternalConsumer{queue='myGroup', consumerTag='null'}])
    [myConsumerGroup-1] channel.basicConsume([myGroup, false, , false, false, {}, InternalConsumer{queue='myGroup', consumerTag='null'}])
    [myConsumerGroup-2] channel.basicConsume([myGroup, false, , false, false, {}, InternalConsumer{queue='myGroup', consumerTag='null'}])

Later, when 4 messages are published, all 4 messages are consumed by a single application instance in its 4 threads, even though 3 other application instances went through the exact same setup sequence.

However, if each thread in each application instance had a priority in its consumer arguments in the basicConsume call, RabbitMQ should deliver the messages to the consumers with the highest priority first. So if you have the same setup on all application instances, with threads numbered 1..N, then thread #N of each application instance is balanced only against thread #N of the other application instances, not against threads with lower priority in other application instances.

I haven't written any code or tested this, I have just read the docs, so I may be wrong. I could hack it and rebuild the Spring AMQP code locally if you want, if you think this is the way to go. What do you think?

Thanks again, jon

Comment From: artembilan

Well, I think when we talk about concurrency in the listener container, it means that all the consumers initiated (at least from the start) are identical. That is really the purpose of the container and its concurrency option. And for RabbitMQ it does not matter whether those consumers are connected from the same application: all of them, from all your application instances, are identical. That is how it is supposed to be, given the purpose of concurrency. So, just don't use concurrency in your application, and let those several instances provide such concurrency naturally.

If you'd like to have different priorities for several consumers in the same application, then have them in separate containers, even if they consume from the same queue. I know that is a bit of duplication, but logically it is not OK to break a single container whose design is to treat all the consumers from its concurrency as identical.
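A minimal sketch of the argument side of that separate-containers setup, assuming one single-consumer container per former consumer and the container index used as the x-priority value. The helper class and its names are hypothetical. The commented lines show how each map might be applied with spring-rabbit on the classpath, reusing the queue name from the logs above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: one consumer-argument map per single-consumer container. */
final class PerContainerConsumerArgs {

    /** RabbitMQ consumer argument that carries the consumer priority. */
    static final String PRIORITY_ARG = "x-priority";

    /** Builds the argument maps for containers 1..containers, with priority equal to the index. */
    static List<Map<String, Object>> forContainers(int containers) {
        List<Map<String, Object>> result = new ArrayList<>();
        for (int index = 1; index <= containers; index++) {
            Map<String, Object> args = new HashMap<>();
            args.put(PRIORITY_ARG, index);
            result.add(args);
        }
        return result;
    }

    public static void main(String[] unused) {
        for (Map<String, Object> args : forContainers(4)) {
            // With spring-rabbit available, each map would configure its own container, e.g.:
            //   SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            //   container.setQueueNames("myGroup");
            //   container.setConcurrentConsumers(1);
            //   container.setPrefetchCount(1);
            //   container.setConsumerArguments(args);
            System.out.println(args);
        }
    }
}
```

Every application instance would build the same set of containers, so each priority level ends up with exactly one consumer per instance.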

We may look into making a lower priority for those volatile consumers created from considerAddingAConsumer() (for maxConcurrency), but that is probably not what you are looking for, according to your comment about Spring Cloud Stream.

Does that make sense?

Comment From: jonenst

Hi, yes, it makes sense. Thanks for taking the time to explain the design philosophy. I will use different containers then, thanks.

Comment From: artembilan

Closed as Works as Designed.