I've created a simple Spring Boot project using spring-boot-starter-webflux
to upload files in a streaming fashion using the multipart support.
The sample code is here on GitHub and uses Spring Boot version 3.5.4.
This is my very simple controller:
@RestController
@Slf4j
public class UploadStream {

    @PostMapping("/stream")
    public ResponseEntity<Flux<String>> handlePartsEvents(@RequestBody Flux<PartEvent> allPartsEvents, @RequestHeader HttpHeaders headers) {
        Accumulator acc = new Accumulator();
        long length = headers.getContentLength();
        var result = allPartsEvents.map(pe -> {
                    if (pe instanceof FilePartEvent fileEvent) {
                        DataBuffer content = fileEvent.content();
                        // Track the bytes seen so far, the number of events, and how often isLast() was true
                        acc.byteCount += content.readableByteCount();
                        acc.count++;
                        acc.trueCount = pe.isLast() ? acc.trueCount + 1 : acc.trueCount;
                        log.info("Part event name:{} last:{} diff:{} acc:{} content:{} partEvent:{}", pe.name(), pe.isLast(), length - acc.byteCount, acc, content, pe);
                        return content;
                    }
                    throw new RuntimeException("Unexpected event: " + pe);
                })
                // Stream the size of each buffer back to the client
                .map(t -> "" + t.readableByteCount() + " - ");
        return ok().body(result);
    }
}
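The Accumulator class is not shown above. A minimal sketch that is consistent with the logged output, assuming plain mutable fields and a Lombok-style toString (the real class in the sample project may differ), could look like this:

```java
// Hypothetical reconstruction: the field names are taken from the log output,
// everything else is an assumption.
final class Accumulator {
    long byteCount; // total readable bytes seen across all FilePartEvent buffers
    int count;      // number of part events observed
    int trueCount;  // number of events for which isLast() returned true

    @Override
    public String toString() {
        return "Accumulator(byteCount=" + byteCount
                + ", count=" + count
                + ", trueCount=" + trueCount + ")";
    }
}
```

It is only a holder for counters that the lambda in the controller can mutate.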
The code simply logs some information about the request and the PartEvents.
When called with a 20 MB file, a DecodingException is thrown:
% curl -v -F file1=@20m.pdf http://localhost:8080/stream
> POST /stream HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/8.7.1
> Accept: */*
> Content-Length: 20702499
> Content-Type: multipart/form-data; boundary=------------------------4xJmRasabztDAlyGtCN0JM
> Expect: 100-continue
>
< HTTP/1.1 100 Continue
<
* upload completely sent off: 20702499 bytes
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/plain;charset=UTF-8
<
1664 - 8192 - 8192 - 8192 - 8192 - 8192 - 8192 - [...] 8192 - 8192 - 8192 -* transfer closed with outstanding read data remaining
* Closing connection
curl: (18) transfer closed with outstanding read data remaining
8192 - 8192 - 8192 - 8192
And the log is:
Part event name:file1 last:false diff:20700835 acc:Accumulator(byteCount=1664, count=1, trueCount=0) content:AbstractPooledDerivedByteBuf$PooledNonRetainedSlicedByteBuf(ridx: 0, widx: 1664, cap: 1664/1664, unwrapped: PooledUnsafeDirectByteBuf(ridx: 2048, widx: 2048, cap: 2048))
[...]
Part event name:file1 last:false diff:19398307 acc:Accumulator(byteCount=1304192, count=160, trueCount=0) content:PooledSlicedByteBuf(ridx: 0, widx: 8192, cap: 8192/8192, unwrapped: PooledUnsafeDirectByteBuf(ridx: 32768, widx: 65536, cap: 65536))
Part event name:file1 last:true diff:19391717 acc:Accumulator(byteCount=1310782, count=161, trueCount=1) content:PooledSlicedByteBuf(ridx: 0, widx: 6590, cap: 6590/6590, unwrapped: PooledUnsafeDirectByteBuf(ridx: 32768, widx: 65536, cap: 65536))
Part event name:file1 last:false diff:19390277 acc:Accumulator(byteCount=1312222, count=162, trueCount=1) content:AbstractPooledDerivedByteBuf$PooledNonRetainedSlicedByteBuf(ridx: 0, widx: 1440, cap: 1440/1440, unwrapped: PooledUnsafeDirectByteBuf(ridx: 40960, widx: 65536, cap: 65536))
[...]
Part event name:file1 last:false diff:18448197 acc:Accumulator(byteCount=2254302, count=277, trueCount=1) content:PooledSlicedByteBuf(ridx: 0, widx: 8192, cap: 8192/8192, unwrapped: PooledUnsafeDirectByteBuf(ridx: 65536, widx: 65536, cap: 65536))
Part event name:file1 last:true diff:18441383 acc:Accumulator(byteCount=2261116, count=278, trueCount=2) content:PooledSlicedByteBuf(ridx: 0, widx: 6814, cap: 6814/6814, unwrapped: PooledUnsafeDirectByteBuf(ridx: 65536, widx: 65536, cap: 65536))
Part event name:file1 last:false diff:18440167 acc:Accumulator(byteCount=2262332, count=279, trueCount=2) content:AbstractPooledDerivedByteBuf$PooledNonRetainedSlicedByteBuf(ridx: 0, widx: 1216, cap: 1216/1216, unwrapped: PooledUnsafeDirectByteBuf(ridx: 65536, widx: 65536, cap: 65536))
[...]
Part event name:file1 last:false diff:12541 acc:Accumulator(byteCount=20689958, count=2555, trueCount=29) content:PooledSlicedByteBuf(ridx: 0, widx: 8192, cap: 8192/8192, unwrapped: PooledUnsafeDirectByteBuf(ridx: 23843, widx: 65536, cap: 65536))
Operator called default onErrorDropped
org.springframework.core.codec.DecodingException: Could not find end of body (␍␊--------------------------4xJmRasabztDAlyGtCN0JM)
at org.springframework.http.codec.multipart.MultipartParser$BodyState.onComplete(MultipartParser.java:613) ~[spring-web-6.2.8.jar:6.2.8]
at org.springframework.http.codec.multipart.MultipartParser.hookOnComplete(MultipartParser.java:128) ~[spring-web-6.2.8.jar:6.2.8]
at reactor.core.publisher.BaseSubscriber.onComplete(BaseSubscriber.java:197) ~[reactor-core-3.7.7.jar:3.7.7]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.7.7.jar:3.7.7]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.7.7.jar:3.7.7]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.7.7.jar:3.7.7]
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:209) ~[reactor-core-3.7.7.jar:3.7.7]
at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:481) ~[reactor-netty-core-1.2.7.jar:1.2.7]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:273) ~[reactor-netty-core-1.2.7.jar:1.2.7]
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:417) ~[reactor-netty-core-1.2.7.jar:1.2.7]
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:455) ~[reactor-netty-core-1.2.7.jar:1.2.7]
at reactor.netty.http.server.HttpServerOperations.handleLastHttpContent(HttpServerOperations.java:897) ~[reactor-netty-http-1.2.7.jar:1.2.7]
at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:806) ~[reactor-netty-http-1.2.7.jar:1.2.7]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115) ~[reactor-netty-core-1.2.7.jar:1.2.7]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.122.Final.jar:4.1.122.Final]
at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:321) ~[reactor-netty-http-1.2.7.jar:1.2.7]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.122.Final.jar:4.1.122.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333) ~[netty-codec-4.1.122.Final.jar:4.1.122.Final]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455) ~[netty-codec-4.1.122.Final.jar:4.1.122.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[netty-codec-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357) ~[netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868) ~[netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:796) ~[netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:732) ~[netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:658) ~[netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.122.Final.jar:4.1.122.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998) ~[netty-common-4.1.122.Final.jar:4.1.122.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.122.Final.jar:4.1.122.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.122.Final.jar:4.1.122.Final]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
pe.isLast() is sometimes true (trueCount=29 in total), but not in the last log line before the exception. It seems that the last bytes of the request (those that contain the end boundary) are missing or never evaluated.
Is it possible for isLast() to be true in the middle of an HTTP part?
Why is the final isLast() false, and why is the end boundary missing? Could the last callback with isLast()=true be missing?
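To make explicit what "end of body" means in the exception message: the delimiter being searched for is CRLF, two dashes, and the boundary from the Content-Type header. The following is a rough, self-contained sketch of that search over a complete and a truncated body. It is not Spring's MultipartParser; the class and method names are made up for illustration, and it ignores the case where the delimiter is split across buffers.

```java
import java.nio.charset.StandardCharsets;

// Illustrative only: BoundarySearch is a hypothetical helper, not a Spring class.
final class BoundarySearch {

    /** Delimiter that terminates a part body: CRLF + "--" + boundary. */
    static byte[] delimiter(String boundary) {
        return ("\r\n--" + boundary).getBytes(StandardCharsets.ISO_8859_1);
    }

    /** Naive search: index of the first occurrence of needle in haystack, or -1. */
    static int indexOf(byte[] haystack, byte[] needle) {
        outer:
        for (int i = 0; i + needle.length <= haystack.length; i++) {
            for (int j = 0; j < needle.length; j++) {
                if (haystack[i + j] != needle[j]) {
                    continue outer;
                }
            }
            return i;
        }
        return -1;
    }

    public static void main(String[] args) {
        String boundary = "------------------------4xJmRasabztDAlyGtCN0JM";
        // A body followed by the closing delimiter ("--" suffix marks the last boundary)
        byte[] complete = ("file bytes\r\n--" + boundary + "--\r\n")
                .getBytes(StandardCharsets.ISO_8859_1);
        // The same body cut off before the delimiter arrives
        byte[] truncated = "file bytes".getBytes(StandardCharsets.ISO_8859_1);

        System.out.println(indexOf(complete, delimiter(boundary)));  // 10
        System.out.println(indexOf(truncated, delimiter(boundary))); // -1
    }
}
```

If the input completes while the search is still returning -1, a parser has no choice but to report that the end of the body was not found, which matches the message above.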
When called with a smaller file (500 KB), it works about 99% of the time, but occasionally the same exception occurs. For example, this is a "good" attempt:
% curl -v -F file1=@500k.pdf http://localhost:8080/stream
> POST /stream HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/8.7.1
> Content-Length: 595708
> Content-Type: multipart/form-data; boundary=------------------------37AbFXpiOxdvYnCig5L6yT
>
* upload completely sent off: 595708 bytes
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/plain;charset=UTF-8
<
* Connection #0 to host localhost left intact
1701 - 8192 - 8192 - 8192 - 8192 - 8192 - 8192 - [...] 8192 - 8192 - 8192 - 3982 -
And the log is:
Part event name:file1 last:false diff:594007 acc:Accumulator(byteCount=1701, count=1, trueCount=0) content:AbstractPooledDerivedByteBuf$PooledNonRetainedSlicedByteBuf(ridx: 0, widx: 1701, cap: 1701/1701, unwrapped: PooledUnsafeDirectByteBuf(ridx: 2048, widx: 2048, cap: 2048))
Part event name:file1 last:false diff:585815 acc:Accumulator(byteCount=9893, count=2, trueCount=0) content:PooledSlicedByteBuf(ridx: 0, widx: 8192, cap: 8192/8192, unwrapped: PooledUnsafeDirectByteBuf(ridx: 16384, widx: 32768, cap: 32768))
[...]
Part event name:file1 last:false diff:12375 acc:Accumulator(byteCount=583333, count=72, trueCount=0) content:PooledSlicedByteBuf(ridx: 0, widx: 8192, cap: 8192/8192, unwrapped: PooledUnsafeDirectByteBuf(ridx: 32768, widx: 36804, cap: 65536))
Part event name:file1 last:false diff:4183 acc:Accumulator(byteCount=591525, count=73, trueCount=0) content:PooledSlicedByteBuf(ridx: 0, widx: 8192, cap: 8192/8192, unwrapped: PooledUnsafeDirectByteBuf(ridx: 36804, widx: 36804, cap: 65536))
Part event name:file1 last:true diff:201 acc:Accumulator(byteCount=595507, count=74, trueCount=1) content:PooledSlicedByteBuf(ridx: 0, widx: 3982, cap: 3982/3982, unwrapped: PooledUnsafeDirectByteBuf(ridx: 36804, widx: 36804, cap: 65536))
Here pe.isLast() is true only in the last callback (the 74th), so trueCount=1.
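The diff values in the two logs give a rough idea of how much data never reached the handler in the failing run. This is only an estimate: it assumes a single part and that the multipart overhead (boundaries plus part headers) of the 20 MB request is about the same as in the successful 500 KB request. The class name is made up for illustration.

```java
// Illustrative arithmetic on the numbers from the logs above; RemainingBytes is a hypothetical name.
final class RemainingBytes {

    /** Bytes of the request not yet seen as file content: Content-Length minus accumulated byteCount. */
    static long remaining(long contentLength, long byteCount) {
        return contentLength - byteCount;
    }

    public static void main(String[] args) {
        // Successful run: what is left after the last event is pure multipart overhead
        long overhead = remaining(595_708L, 595_507L);
        // Failing run: what is left after the last logged event before the exception
        long failingRemaining = remaining(20_702_499L, 20_689_958L);
        // Assumption: overhead is roughly the same in both requests
        long estimatedMissingFileBytes = failingRemaining - overhead;

        System.out.println(overhead);                  // 201
        System.out.println(failingRemaining);          // 12541
        System.out.println(estimatedMissingFileBytes); // 12340
    }
}
```

Under those assumptions, roughly 12 KB of file content plus the closing boundary were never delivered as part events before the input completed.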
I tried to debug the Spring and Netty code to see what was happening, but without success.
Is there something I don't understand, or could this be a bug?
I also tried older Spring Boot 3.x versions, and there the code didn't work at all (for example, the response froze). This reinforces the idea of a bug, but it's only a suspicion.
I have already opened a Stack Overflow question, but with no luck.
Thank you for any help.