The documentation includes an example of a WebSocketHandler that processes inbound and outbound messages as independent streams. The example joins the two streams with Mono.zip(), but this does not work as intended. Flux.merge() should be used instead.

Mono.zip() waits for all sources to emit a value before combining them, so if one stream only receives or only sends messages, it may not emit at all. In contrast, Flux.merge() emits as soon as any source emits, making it more suitable for joining independent streams.

Comment From: rstoyanchev

We're only interested in the completion (successful or not) of the inbound and outbound streams, and are essentially joining two Mono<Void> instances. It's not clear what you mean by waiting to emit a value.

If you mean that one should continue to operate after the other has completed, then I see your point since zip is documented to cancel other sources if one completes empty.

Perhaps it would be best if you explain what you've tried and what happened.

Comment From: peter21adrian

In my case, the issue occurred when one of the Flux sources was actually empty.

Consider this example from the documentation:

Mono<Void> output = session.send(source.map(session::textMessage));

If source is Mono.never(), the zip operation doesn't behave as expected. This happened to me when there were no messages to send to the client.

You can work around this in a couple of ways:

By using:

return Flux.merge(output, input).then();

Or by modifying the output like this:

Mono<Void> output = source
        .flatMap(message -> session.send(Mono.just(message)))
        .then();

I'm not sure whether this is a limitation of session.send when the source is Mono.never(), or if it's simply a gap in the documentation.

Comment From: rstoyanchev

Did you perhaps mean Mono.empty() instead of Mono.never()? The latter never emits any signal, as the name suggests, and so runs indefinitely. In that case, though, it's not clear why source.flatMap would make a difference.

I do recognize that zip isn't the best operator to suggest here, since it will cancel all sources if one completes empty. The Mono#and operator would be a better fit.
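
To make the difference between the two operators concrete, here is a minimal sketch that uses plain Reactor types instead of a real WebSocketSession. The class name ZipVersusAnd, the fast and slow stand-ins, and the 200 ms delay are assumptions chosen for illustration and do not come from the documentation example.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.publisher.Mono;

public class ZipVersusAnd {

    // Returns true if the slow source was cancelled before it could complete.
    static boolean slowSourceCancelled(boolean useZip) {
        AtomicBoolean cancelled = new AtomicBoolean(false);

        // Hypothetical stand-in for a stream that completes empty right away.
        Mono<Void> fast = Mono.empty();

        // Hypothetical stand-in for a stream that completes after a short delay.
        Mono<Void> slow = Mono.delay(Duration.ofMillis(200))
                .then()
                .doOnCancel(() -> cancelled.set(true));

        // zip combines values; and only joins the termination signals.
        Mono<Void> joined = useZip
                ? Mono.zip(slow, fast).then()
                : slow.and(fast);

        // Wait for the joined Mono to terminate, with a generous timeout.
        joined.block(Duration.ofSeconds(5));
        return cancelled.get();
    }

    public static void main(String[] args) {
        System.out.println("zip cancelled slow source: " + slowSourceCancelled(true));
        System.out.println("and cancelled slow source: " + slowSourceCancelled(false));
    }
}
```

Given the documented behavior of zip, the first call should report that the slow source was cancelled, while the and variant should let it run to completion.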

Comment From: peter21adrian

A similar issue occurs even when the source isn't empty, but simply delayed. For example:

Mono<Void> output = session.send(Mono.just("Hello from server")
                .delayElement(Duration.ofSeconds(1))
                .map(session::textMessage));

In this case, the message is present but delayed, and the behavior is still problematic.

Using Mono#and also resolves the issue.
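
For reference, a handler that applies the Mono#and suggestion to the delayed example above might look roughly like the following. This is only a sketch: the class name GreetingWebSocketHandler and the logging of inbound messages are assumptions for illustration, not the handler from the documentation.

```java
import java.time.Duration;

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono;

public class GreetingWebSocketHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // Inbound: consume incoming messages until the inbound stream completes.
        Mono<Void> input = session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .doOnNext(text -> System.out.println("Received: " + text))
                .then();

        // Outbound: a single greeting, emitted one second after subscription.
        Mono<Void> output = session.send(
                Mono.just("Hello from server")
                        .delayElement(Duration.ofSeconds(1))
                        .map(session::textMessage));

        // Completes once both input and output have completed.
        return input.and(output);
    }
}
```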

Thanks!