Description
Hi folks, Thanks heaps for this library. Appreciate your work! I wanted to sense check if the following behavior is expected.
We have a flow where we use multi-part forms to send the request to a rendering server and stream the response.
The first part of the request is quickPart. This is available almost immediately and gets sent to the service. The service starts streaming the response back.
The second part is slowPart. It becomes ready after a couple of hundred milliseconds. Once resolved, it gets sent to the service, the service streams the rest of the data.
We use MultipartBodyBuilder() and attach parts with syncPart and asyncPart. See example below.
Rough diagram below:
sequenceDiagram
participant Client
participant Server
Client->>Server: send quickPart
Server->>Client: start streaming html
Note over Client,Server: a few hundred ms later
Client->>Server: send slowPart
Server->>Client: stream rest of html
So I've been testing what happens if the publisher of the slow part errors out. The following happens: - Exception gets thrown - Client closes the connection - Client completes the response body stream normally
Here is a log on the body flux.
18:35:33.281 [reactor-http-nio-2] INFO reactor.Flux.OnErrorResume.1 -- onSubscribe(FluxOnErrorResume.ResumeSubscriber)
18:35:33.284 [reactor-http-nio-2] INFO reactor.Flux.OnErrorResume.1 -- request(unbounded)
18:35:33.285 [reactor-http-nio-2] INFO reactor.Flux.OnErrorResume.1 -- onNext(PooledSlicedByteBuf(ridx: 0, widx: 19, cap: 19/19, unwrapped: PooledUnsafeDirectByteBuf(ridx: 25, widx: 25, cap: 2048)))
18:35:38.170 [reactor-http-nio-2] INFO reactor.Flux.OnErrorResume.1 -- onComplete()
My question is: Should the response body stream be closed with an error here? Otherwise, the consumers of the body stream would think that the stream has completed successfully. I haven't been able to find if this is expected behavior for this scenario, so I wanted to get some help from you.
I've attached a way to reproduce this. Can be booted with the following version.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>3.5.6</version>
</dependency>
How to reproduce
Client code
import io.netty.buffer.ByteBuf;
import org.springframework.http.client.MultipartBodyBuilder;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import java.time.Duration;
public class MultipartClient {
public static void main(String[] args) {
var data = new MultipartBodyBuilder();
data.part("quickPart", "success");
data.asyncPart("slowPart",
Mono.delay(Duration.ofSeconds(5)).then(Mono.error(new RuntimeException("ruh roh"))),
String.class);
WebClient.create("http://localhost:8080")
.post().uri("/stream")
.body(BodyInserters.fromMultipartData(data.build()))
.retrieve()
.toEntityFlux(ByteBuf.class).block().getBody()
.log()
.blockLast();
}
}
Server code
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
import java.nio.charset.StandardCharsets;
@SpringBootApplication
public class MultipartEchoServer {
private static final Logger log = LoggerFactory.getLogger(MultipartEchoServer.class);
public static void main(String[] args) {
SpringApplication.run(MultipartEchoServer.class, args);
}
@Bean
public RouterFunction<ServerResponse> routes() {
return RouterFunctions.route()
.POST("/stream", req -> ServerResponse.ok()
.contentType(MediaType.TEXT_PLAIN)
.body(req.body(BodyExtractors.toParts())
.concatMap(part -> DataBufferUtils.join(part.content())
.map(buf -> {
byte[] bytes = new byte[buf.readableByteCount()];
buf.read(bytes);
DataBufferUtils.release(buf);
return new String(bytes, StandardCharsets.UTF_8);
})
.doOnNext(value -> log.info("📥 Received part: {} = {}", part.name(), value))
.map(value -> part.name() + ": " + value + "\n")
)
.concatWith(Mono.just("complete\n"))
.doOnCancel(() -> log.info("❌ Client closed connection"))
.doFinally(signal -> log.info("✅ Stream finished with signal: {}", signal)),
String.class)
)
.build();
}
}