Description

Hi folks, thanks heaps for this library. I appreciate your work! I wanted to sense-check whether the following behavior is expected.

We have a flow where we use multipart form requests to send data to a rendering server and stream the response back. The first part of the request is quickPart. It is available almost immediately and gets sent to the service, which starts streaming the response back. The second part is slowPart. It becomes ready after a couple of hundred milliseconds. Once it resolves, it gets sent to the service, and the service streams the rest of the data.

We use MultipartBodyBuilder and attach parts with part and asyncPart. See the example below.

Rough diagram below:

sequenceDiagram
    participant Client
    participant Server

    Client->>Server: send quickPart
    Server->>Client: start streaming html
    Note over Client,Server: a few hundred ms later
    Client->>Server: send slowPart
    Server->>Client: stream rest of html

So I've been testing what happens if the publisher of the slow part errors out. The following happens:
- The exception gets thrown
- The client closes the connection
- The client completes the response body stream normally

Here is the log output from the response body flux:

18:35:33.281 [reactor-http-nio-2] INFO reactor.Flux.OnErrorResume.1 -- onSubscribe(FluxOnErrorResume.ResumeSubscriber)
18:35:33.284 [reactor-http-nio-2] INFO reactor.Flux.OnErrorResume.1 -- request(unbounded)
18:35:33.285 [reactor-http-nio-2] INFO reactor.Flux.OnErrorResume.1 -- onNext(PooledSlicedByteBuf(ridx: 0, widx: 19, cap: 19/19, unwrapped: PooledUnsafeDirectByteBuf(ridx: 25, widx: 25, cap: 2048)))
18:35:38.170 [reactor-http-nio-2] INFO reactor.Flux.OnErrorResume.1 -- onComplete()

My question is: should the response body stream be terminated with an error here? Otherwise, consumers of the body stream will think that the stream completed successfully. I haven't been able to find out whether this is the expected behavior for this scenario, so I wanted to get some help from you.
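Until the library behavior is settled, one application-level option is to record how the slow part's publisher terminated and append that outcome to the response body, so a request-side failure surfaces as an error after the last chunk. This is a minimal sketch, assuming the client owns the slow part's publisher; the helper failIfRequestFailed is hypothetical (not a Spring or Reactor API), and the request and response are simulated with plain Reactor publishers so the example runs without a server.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

import java.time.Duration;

public class RequestOutcomeCheck {

    // Hypothetical helper: relays the response body, then waits for the request-side
    // outcome and signals its error (if any) instead of completing normally.
    static <T> Flux<T> failIfRequestFailed(Flux<T> responseBody, Mono<Void> requestOutcome) {
        return responseBody.concatWith(requestOutcome.then(Mono.<T>empty()));
    }

    public static void main(String[] args) {
        // Records how the slow part's publisher terminated: empty on success, error on failure.
        // Sinks.empty() replays its terminal signal to late subscribers.
        Sinks.Empty<Void> outcome = Sinks.empty();

        // Same failing publisher as in the repro, with a shorter delay.
        Mono<String> slowPart = Mono.delay(Duration.ofMillis(100))
            .then(Mono.<String>error(new RuntimeException("ruh roh")))
            .doOnSuccess(value -> outcome.tryEmitEmpty())
            .doOnError(outcome::tryEmitError);

        // In the real client this publisher would be registered with
        // data.asyncPart("slowPart", slowPart, String.class); here we subscribe
        // directly to stand in for the request writer.
        slowPart.subscribe(value -> { }, error -> { });

        // Stand-in for the response body: one chunk, then a normal completion.
        Flux<String> responseBody = Flux.just("quickPart: success\n")
            .delayElements(Duration.ofMillis(200));

        try {
            failIfRequestFailed(responseBody, outcome.asMono())
                .doOnNext(System.out::print)
                .blockLast();
        } catch (RuntimeException e) {
            System.out.println("Request failed: " + e.getMessage());
        }
    }
}
```

One caveat with this design: if the slow part is never subscribed (for example, because the connection fails earlier), the sink never terminates, so a real client would likely want a timeout on the recorded outcome.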

I've attached a way to reproduce this. It can be run with the following version:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <version>3.5.6</version>
</dependency>

How to reproduce

Client code

import io.netty.buffer.ByteBuf;
import org.springframework.http.client.MultipartBodyBuilder;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

import java.time.Duration;

public class MultipartClient {

    public static void main(String[] args) {
        var data = new MultipartBodyBuilder();
        data.part("quickPart", "success");
        data.asyncPart("slowPart",
            Mono.delay(Duration.ofSeconds(5)).then(Mono.error(new RuntimeException("ruh roh"))),
            String.class);

        WebClient.create("http://localhost:8080")
            .post().uri("/stream")
            .body(BodyInserters.fromMultipartData(data.build()))
            .retrieve()
            .toEntityFlux(ByteBuf.class).block().getBody()
            .log()
            .blockLast();
    }
}

Server code


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;

@SpringBootApplication
public class MultipartEchoServer {

    private static final Logger log = LoggerFactory.getLogger(MultipartEchoServer.class);

    public static void main(String[] args) {
        SpringApplication.run(MultipartEchoServer.class, args);
    }

    @Bean
    public RouterFunction<ServerResponse> routes() {
        return RouterFunctions.route()
            .POST("/stream", req -> ServerResponse.ok()
                .contentType(MediaType.TEXT_PLAIN)
                .body(req.body(BodyExtractors.toParts())
                        .concatMap(part -> DataBufferUtils.join(part.content())
                            .map(buf -> {
                                byte[] bytes = new byte[buf.readableByteCount()];
                                buf.read(bytes);
                                DataBufferUtils.release(buf);
                                return new String(bytes, StandardCharsets.UTF_8);
                            })
                            .doOnNext(value -> log.info("📥 Received part: {} = {}", part.name(), value))
                            .map(value -> part.name() + ": " + value + "\n")
                        )
                        .concatWith(Mono.just("complete\n"))
                        .doOnCancel(() -> log.info("❌ Client closed connection"))
                        .doFinally(signal -> log.info("✅ Stream finished with signal: {}", signal)),
                    String.class)
            )
            .build();
    }
}

Comment From: bclozel

Thanks for the detailed example, @sshkel.

I managed to reproduce this under some conditions, but couldn't reproduce it at all in an integration test in Spring Framework. I believe this is due to the nature of the Reactor Netty client; let me elaborate.

the repro project

There are three elements in the repro project that are essential.

  1. It uses the WebClient (backed by the Reactor Netty client) to send a multipart request. The key element here is not the multipart request itself, but rather the fact that the request first sends a valid part and then waits a bit before failing to write the second part, producing an error signal.
  2. On the server side, the server echoes the received request body back to the client. When the first part is received, its entire content is streamed back to the client.
  3. Finally, on the client side we don't receive the response as a high-level object but rather as a stream of byte buffers.

What is happening here is essentially a race between the error signal for the request body and the connection being closed. In this case, we don't get an error signal: the client has already received byte buffers, and it voluntarily closes the connection because of the error while sending the request.

workarounds

I have found two ways to work around this problem.

First, the JdkClientHttpConnector does not behave like this: with it, we consistently get an error signal on the client. Second, the parts can be delayed slightly on the server side with .body(req.body(BodyExtractors.toParts()).delayElements(Duration.ofMillis(200)) (depending on the delay value, the client either receives the error or considers the exchange complete).
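For the first workaround, a minimal sketch of the repro client switched to the JDK HttpClient might look like the following. It assumes the echo server above is running on localhost:8080; the helper name jdkWebClient is hypothetical, and the body is read as DataBuffer (released after logging) rather than Netty's ByteBuf, since the JDK connector does not use Netty buffers.

```java
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.client.MultipartBodyBuilder;
import org.springframework.http.client.reactive.JdkClientHttpConnector;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

import java.time.Duration;

public class JdkMultipartClient {

    // Hypothetical helper: builds a WebClient on the JDK HttpClient instead of Reactor Netty.
    static WebClient jdkWebClient(String baseUrl) {
        return WebClient.builder()
            .clientConnector(new JdkClientHttpConnector())
            .baseUrl(baseUrl)
            .build();
    }

    public static void main(String[] args) {
        var data = new MultipartBodyBuilder();
        data.part("quickPart", "success");
        data.asyncPart("slowPart",
            Mono.delay(Duration.ofSeconds(5)).then(Mono.<String>error(new RuntimeException("ruh roh"))),
            String.class);

        jdkWebClient("http://localhost:8080")
            .post().uri("/stream")
            .body(BodyInserters.fromMultipartData(data.build()))
            .retrieve()
            .toEntityFlux(DataBuffer.class).block().getBody()
            .log()
            .doOnNext(DataBufferUtils::release) // release each buffer after it has been logged
            .blockLast(); // per the comment above, expected to end with an error rather than onComplete()
    }
}
```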

is this behavior valid?

In summary, I don't think we can change this behavior without breaking other, more common use cases where the client cancels the request on purpose. This use case is unusual in several ways: it uses text/plain as a streaming media type, and it never uses higher-level objects, only raw byte buffers. Changing some of these conditions would make it easier to detect that the request or the response is incomplete.
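As one illustration of detecting an incomplete response at the application level, the client could check for the "complete\n" trailer that the echo server appends, and turn a trailer-less completion into an error. This is a sketch under assumptions: requireTrailer is a hypothetical helper, and it expects the body as a Flux<String> whose chunks preserve the original text. If the body is decoded in a way that strips line delimiters, the trailer to check for would be "complete" instead.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class TrailerCheck {

    // Hypothetical helper: passes chunks through unchanged, but errors on completion
    // unless the concatenated text ends with the expected trailer. Chunk boundaries
    // are arbitrary, so it keeps a rolling tail of the most recent characters.
    static Flux<String> requireTrailer(Flux<String> body, String trailer) {
        // defer() gives each subscription its own tail buffer.
        return Flux.defer(() -> {
            StringBuilder tail = new StringBuilder();
            return body
                .doOnNext(chunk -> {
                    tail.append(chunk);
                    if (tail.length() > trailer.length()) {
                        tail.delete(0, tail.length() - trailer.length());
                    }
                })
                // Evaluated only after the body completes normally.
                .concatWith(Mono.defer(() -> tail.toString().equals(trailer)
                    ? Mono.<String>empty()
                    : Mono.<String>error(new IllegalStateException(
                        "Response ended without trailer: " + trailer.strip()))));
        });
    }
}
```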

I have tested locally that the expected error message does flow from the multipart codec/HTTP writer to the client stream, and that this really is a race condition that isn't strictly invalid. I'm closing this issue for now, but we can reopen it if we receive new use cases where the behavior is clearly invalid.

Thanks!