When using Flux.generate to create WebSocketMessages for sending outbound on a WebSocketSession, there doesn't seem to be a way to react to the completion of sending an individual WebSocketFrame. By completion I mean when the frame has been released and there is no memory pressure on the server for that frame.
If the client receiving the messages is slower than the Flux.generate publisher on the server, the outbound queue fills with frames and can eventually OOM the server. The same problem occurs with Flux.create.
There appears to be a boundary/disconnect between the Publisher supplied to WebSocketSession.send() and when the release of individual frames on the outbound queue actually occurs. Essentially you can only react to when the message is taken from your Flux to be sent/put on the outbound queue, but you can't react to when it has been sent+released. Perhaps this is a bug or I'm mistaken?
For example, sending each WebSocketMessage wrapped in a Mono through WebSocketSession.send() doesn't solve the issue, as the completion of the Mono occurs BEFORE the frame is released.
A mechanism to slow down the publisher when the outbound queue gets "too large" (as determined by the developer/use case) would protect the server from OOM errors caused by slow clients, i.e. clients that download frames more slowly than the server produces them.
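To make the request concrete, here is a minimal JDK-only sketch of the kind of limiter I have in mind. It assumes a per-frame release callback exists, which is exactly the hook I can't find today; the class `InFlightFrameLimiter` and its method names are hypothetical and not part of WebFlux or Reactor Netty.

```java
import java.util.concurrent.Semaphore;

/** Hypothetical helper: caps how many frames are created but not yet released. */
final class InFlightFrameLimiter {
    private final int maxInFlight;
    private final Semaphore permits;

    InFlightFrameLimiter(int maxInFlight) {
        this.maxInFlight = maxInFlight;
        this.permits = new Semaphore(maxInFlight);
    }

    /** Non-blocking: true if a new frame may be created, false if the cap is reached. */
    boolean tryReserve() {
        return permits.tryAcquire();
    }

    /** Assumed to be called exactly once per reserved frame, from a release hook. */
    void onFrameReleased() {
        permits.release();
    }

    /** Frames currently reserved and not yet released. */
    int inFlight() {
        return maxInFlight - permits.availablePermits();
    }

    public static void main(String[] args) {
        InFlightFrameLimiter limiter = new InFlightFrameLimiter(2);
        System.out.println(limiter.tryReserve()); // first frame fits under the cap
        System.out.println(limiter.tryReserve()); // second frame fits under the cap
        System.out.println(limiter.tryReserve()); // cap reached, publisher should pause
        limiter.onFrameReleased();                // one frame sent and released
        System.out.println(limiter.tryReserve()); // room for one more frame
    }
}
```

The reservation is deliberately non-blocking so it would not stall an event-loop thread; the publisher would skip or defer producing while `tryReserve()` returns false.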
I have tried many options using pure Flux-based logic, but none of them solve the issue, since they cannot trigger any logic in reaction to an individual frame being released. I've also tried wrapping the existing WebSocketSession in my own wrapper class and extending all WebSocketFrame types so that I can hook in my own onCreate/onRelease functions, but this causes memory leaks around ByteBuffers, and it feels like a hacky workaround of the Flux framework.
I'm using WebFlux v6.2.8 running on a Netty server.
Comment From: rstoyanchev
I would expect Flux#generate to be connected to backpressure from Reactor Netty, so you shouldn't have to explicitly track when messages are sent. Backpressure should ensure the generate callback is invoked only when there is demand, unless for some reason the session is requesting Integer.MAX_VALUE. You could check what demand value comes through by logging via .doOnRequest on your Flux.
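As a rough illustration of what demand-driven generation and request logging look like, here is a sketch that uses only the JDK `java.util.concurrent.Flow` interfaces rather than Reactor, so it runs without extra dependencies. The `GeneratingPublisher` class and its `onRequest` callback are hypothetical stand-ins for `Flux.generate` and `.doOnRequest`; they are single-threaded and synchronous, and they are not how Reactor implements either operator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

final class DemandLoggingDemo {

    /** Hypothetical publisher: calls the generator once per requested item. */
    static final class GeneratingPublisher<T> implements Flow.Publisher<T> {
        private final Supplier<T> generator;
        private final LongConsumer onRequest; // plays the role of a request logger

        GeneratingPublisher(Supplier<T> generator, LongConsumer onRequest) {
            this.generator = generator;
            this.onRequest = onRequest;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private long pending;
                private boolean draining;
                private boolean cancelled;

                @Override
                public void request(long n) {
                    onRequest.accept(n); // record the demand that came through
                    if (n <= 0) {
                        cancelled = true;
                        subscriber.onError(new IllegalArgumentException("non-positive request"));
                        return;
                    }
                    pending = (pending + n < 0) ? Long.MAX_VALUE : pending + n;
                    if (draining) {
                        return; // re-entrant request: the outer loop will pick it up
                    }
                    draining = true;
                    while (pending > 0 && !cancelled) {
                        pending--;
                        subscriber.onNext(generator.get()); // generate only on demand
                    }
                    draining = false;
                }

                @Override
                public void cancel() {
                    cancelled = true;
                }
            });
        }
    }

    /** Subscribes with the given batch size, stops after total items, returns logged demands. */
    static List<Long> run(int batch, int total) {
        List<Long> demands = new ArrayList<>();
        int[] counter = {0};
        GeneratingPublisher<Integer> publisher =
                new GeneratingPublisher<>(() -> counter[0]++, n -> demands.add(n));
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int received;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(batch);
            }

            @Override
            public void onNext(Integer item) {
                received++;
                if (received == total) {
                    subscription.cancel();
                } else if (received % batch == 0) {
                    subscription.request(batch); // ask for the next batch
                }
            }

            @Override
            public void onError(Throwable t) { }

            @Override
            public void onComplete() { }
        });
        return demands;
    }

    public static void main(String[] args) {
        System.out.println("bounded demand: " + run(2, 6));
        System.out.println("one large request: " + run(Integer.MAX_VALUE, 6));
    }
}
```

If the logged demand on the real Flux shows small, repeated requests, backpressure is reaching the generator; a single very large request would suggest the consumer is asking for effectively unbounded output.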
Comment From: spring-projects-issues
If you would like us to look at this issue, please provide the requested information. If the information is not provided within the next 7 days this issue will be closed.
Comment From: spring-projects-issues
Closing due to lack of requested feedback. If you would like us to look at this issue, please provide the requested information and we will re-open the issue.