Spring AI Version: 1.0.0

org.springframework.ai:spring-ai-starter-vector-store-cassandra is used for the vector store, org.springframework.ai:spring-ai-starter-model-openai for the embedding model, and org.springframework.ai:spring-ai-tika-document-reader for ETL.

I have the following ETL pipeline to persist vectorized chunks into the Cassandra vector store.

private Mono<FileMeta> vectorize(FileMeta fileMeta, long ts) {
        var documentIdConverter = new DocumentIdConverter(fileMeta.fileId(), ts);
        var filePath = fileService.getFilePath(fileMeta.fileId(), ts);

        return fileService.findResourceByPath(filePath)
                .switchIfEmpty(Mono.error(new FileNotFoundException("File not found at: " + fileService.getFilePath(fileMeta.fileId(), ts))))
                .map(TikaDocumentReader::new)
                .flatMap(Mono::fromSupplier)
                .map(tokenTextSplitter.andThen(documentIdConverter))
                .doOnSuccess(_ -> log.debug("DocumentService::vectorize is writing documents..."))
                .doOnNext(vectorstore) // VectorStore implements Consumer<List<Document>>; accept() embeds and writes synchronously on the subscribing thread
                .retryWhen(backoff(3, ofSeconds(1)))
                .doOnError(e -> log.error("DocumentService::vectorize failed to persist documents", e))
                .flatMap(_ -> fileService.saveFileMeta(fileMeta.withVectorized(true)))
                .doOnSuccess(_ -> log.debug("DocumentService::vectorize has written documents."))
                .flatMap(fm -> fileService.delete(fm, ts));
    }
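The stack trace further down bottoms out in an InterruptedException thrown from the JDK HttpClient while the blocking VectorStore.accept call was running on a boundedElastic thread. As a hedged illustration (not a claim about what interrupted the thread here), the following self-contained sketch uses plain java.util.concurrent, with the executor standing in for Reactor's boundedElastic and the sleeping task standing in for the long-running embeddings POST, to show how cancelling a task interrupts a thread that is blocked mid-call:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        CountDownLatch done = new CountDownLatch(1);

        // Stands in for the blocking vector-store write (embeddings POST).
        Future<?> write = pool.submit(() -> {
            try {
                Thread.sleep(60_000); // the long-running blocking call
            } catch (InterruptedException e) {
                sawInterrupt.set(true); // surfaces as "Request was interrupted" in the HTTP client
            } finally {
                done.countDown();
            }
        });

        Thread.sleep(100);          // let the task start
        write.cancel(true);         // stands in for the subscription being cancelled
        done.await(2, TimeUnit.SECONDS);
        System.out.println("worker interrupted: " + sawInterrupt.get()); // prints: worker interrupted: true
        pool.shutdownNow();
    }
}
```

In the reactive pipeline above, anything that cancels the subscription (a client disconnect, a timeout operator, a retry policy giving up) could plausibly interrupt the boundedElastic thread in the same way; with 143 chunks the single embed call simply runs long enough to be exposed to it.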

Here is my log:

2025-05-28 16:43:44,073 [main] INFO  o.s.b.w.e.netty.NettyWebServer - Netty started on port 7845 (http)
2025-05-28 16:43:44,080 [main] INFO  c.e.r.agentic.AgenticApplication - Started AgenticApplication in 4.882 seconds (process running for 5.739)
2025-05-28 16:43:49,499 [reactor-http-nio-3] INFO  c.e.r.a.controller.RAGController - Called is uploadFromUri with fileDto=FileDto[fileId=null, userId=814531, roleId=null, fileName=ApacheCassandraDocumentation.html, file=file:///Users/administrator/IdeaProjects/ReAIoT.Cassandra/agentic/src/test/resources/ApacheCassandraDocumentation.html]
2025-05-28 16:43:50,148 [boundedElastic-1] INFO  o.s.a.t.splitter.TextSplitter - Splitting up document into 3 chunks.
2025-05-28 16:43:50,149 [boundedElastic-1] DEBUG c.e.r.a.service.rag.DocumentService - DocumentService::vectorize is writing documents...
2025-05-28 16:43:58,707 [boundedElastic-1] DEBUG c.e.r.a.service.rag.DocumentService - DocumentService::vectorize has written documents.
2025-05-28 16:44:14,603 [reactor-http-nio-4] INFO  c.e.r.a.controller.RAGController - Called is uploadFromUri with fileDto=FileDto[fileId=null, userId=814531, roleId=null, fileName=ResearchTopics.docx, file=file:///Users/administrator/IdeaProjects/ReAIoT.Cassandra/agentic/src/test/resources/ResearchTopics.docx]
2025-05-28 16:44:15,116 [boundedElastic-2] INFO  o.s.a.t.splitter.TextSplitter - Splitting up document into 2 chunks.
2025-05-28 16:44:15,116 [boundedElastic-2] DEBUG c.e.r.a.service.rag.DocumentService - DocumentService::vectorize is writing documents...
2025-05-28 16:44:18,015 [boundedElastic-2] DEBUG c.e.r.a.service.rag.DocumentService - DocumentService::vectorize has written documents.
2025-05-28 16:45:06,815 [reactor-http-nio-5] INFO  c.e.r.a.controller.RAGController - Called is uploadFromUri with fileDto=FileDto[fileId=null, userId=814531, roleId=null, fileName=SoftSoilFoundationTreatment.pdf, file=file:///Users/administrator/IdeaProjects/ReAIoT.Cassandra/agentic/src/test/resources/SoftSoilFoundationTreatment.pdf]
2025-05-28 16:45:07,257 [boundedElastic-1] INFO  o.s.a.t.splitter.TextSplitter - Splitting up document into 7 chunks.
2025-05-28 16:45:07,257 [boundedElastic-1] DEBUG c.e.r.a.service.rag.DocumentService - DocumentService::vectorize is writing documents...
2025-05-28 16:45:12,586 [reactor-http-nio-6] INFO  c.e.r.a.controller.RAGController - Called is uploadFromUri with fileDto=FileDto[fileId=null, userId=814531, roleId=null, fileName=Requirement0528.docx, file=file:///Users/administrator/IdeaProjects/ReAIoT.Cassandra/agentic/src/test/resources/Requirement0528.docx]
2025-05-28 16:45:14,059 [boundedElastic-2] INFO  o.s.a.t.splitter.TextSplitter - Splitting up document into 44 chunks.
2025-05-28 16:45:14,060 [boundedElastic-2] DEBUG c.e.r.a.service.rag.DocumentService - DocumentService::vectorize is writing documents...
2025-05-28 16:45:21,746 [boundedElastic-1] DEBUG c.e.r.a.service.rag.DocumentService - DocumentService::vectorize has written documents.
2025-05-28 16:47:17,004 [boundedElastic-2] DEBUG c.e.r.a.service.rag.DocumentService - DocumentService::vectorize has written documents.
2025-05-28 16:50:12,753 [reactor-http-nio-7] INFO  c.e.r.a.controller.RAGController - Called is uploadFromUri with fileDto=FileDto[fileId=null, userId=814531, roleId=null, fileName=H2Document.pdf, file=file:///Users/administrator/IdeaProjects/ReAIoT.Cassandra/agentic/src/test/resources/H2Document.pdf]
2025-05-28 16:50:14,621 [boundedElastic-5] INFO  o.s.a.t.splitter.TextSplitter - Splitting up document into 143 chunks.
2025-05-28 16:50:14,626 [boundedElastic-5] DEBUG c.e.r.a.service.rag.DocumentService - DocumentService::vectorize is writing documents...
2025-05-28 16:55:12,763 [boundedElastic-5] WARN  o.s.a.r.a.SpringAiRetryAutoConfiguration - Retry error. Retry count: 1, Exception: I/O error on POST request for "http://10.0.0.66:7840/v1/embeddings": Request was interrupted: null
org.springframework.web.client.ResourceAccessException: I/O error on POST request for "http://10.0.0.66:7840/v1/embeddings": Request was interrupted: null
    at org.springframework.web.client.DefaultRestClient$DefaultRequestBodyUriSpec.createResourceAccessException(DefaultRestClient.java:586)
    at org.springframework.web.client.DefaultRestClient$DefaultRequestBodyUriSpec.exchangeInternal(DefaultRestClient.java:500)
    at org.springframework.web.client.DefaultRestClient$DefaultRequestBodyUriSpec.retrieve(DefaultRestClient.java:456)
    at org.springframework.ai.openai.api.OpenAiApi.embeddings(OpenAiApi.java:293)
    at org.springframework.ai.openai.OpenAiEmbeddingModel.lambda$call$1(OpenAiEmbeddingModel.java:168)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:357)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:230)
    at org.springframework.ai.openai.OpenAiEmbeddingModel.lambda$call$3(OpenAiEmbeddingModel.java:168)
    at io.micrometer.observation.Observation.observe(Observation.java:565)
    at org.springframework.ai.openai.OpenAiEmbeddingModel.call(OpenAiEmbeddingModel.java:166)
    at org.springframework.ai.embedding.EmbeddingModel.embed(EmbeddingModel.java:91)
    at org.springframework.ai.vectorstore.cassandra.CassandraVectorStore.doAdd(CassandraVectorStore.java:273)
    at org.springframework.ai.vectorstore.observation.AbstractObservationVectorStore.lambda$add$1(AbstractObservationVectorStore.java:85)
    at io.micrometer.observation.Observation.observe(Observation.java:499)
    at org.springframework.ai.vectorstore.observation.AbstractObservationVectorStore.add(AbstractObservationVectorStore.java:85)
    at org.springframework.ai.vectorstore.VectorStore.accept(VectorStore.java:56)
    at org.springframework.ai.vectorstore.VectorStore.accept(VectorStore.java:41)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:118)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
    at reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:252)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:317)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.io.IOException: Request was interrupted: null
    at org.springframework.http.client.JdkClientHttpRequest.executeInternal(JdkClientHttpRequest.java:112)
    at org.springframework.http.client.AbstractStreamingClientHttpRequest.executeInternal(AbstractStreamingClientHttpRequest.java:70)
    at org.springframework.http.client.AbstractClientHttpRequest.execute(AbstractClientHttpRequest.java:66)
    at org.springframework.web.client.DefaultRestClient$DefaultRequestBodyUriSpec.exchangeInternal(DefaultRestClient.java:494)
    ... 40 common frames omitted
Caused by: java.lang.InterruptedException: null
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:386)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
    at java.net.http/jdk.internal.net.http.HttpClientImpl.send(HttpClientImpl.java:931)
    at java.net.http/jdk.internal.net.http.HttpClientFacade.send(HttpClientFacade.java:133)
    at org.springframework.http.client.JdkClientHttpRequest.executeInternal(JdkClientHttpRequest.java:103)
    ... 43 common frames omitted

So, long story short: this vectorize method of DocumentService was called 4 times; the first 3 calls succeeded and the last one failed. The only notable difference: during the last attempt, the file was split into 143 chunks, far more than in the previous attempts.

The remote endpoint of the embedding model was deployed using

docker run --gpus all -p 7840:80 -v $volume:/data --pull always ghcr.io/huggingface/text-embeddings-inference:cpu-1.7 --model-id $model

following https://huggingface.co/docs/text-embeddings-inference/quick_tour, although I don't think TEI itself was the issue, since the first 3 attempts were successful.

The file of the last attempt was https://github.com/h2database/h2database/releases/download/version-2.3.232/h2.pdf

I am filing this as a bug, but I could very well be wrong. Then again, the first 3 attempts were successful, so maybe not?

Comment From: ilayaperumalg

@ZYMCao Do you still see this issue? We need a consistent/reproducible example to verify this behaviour. Thanks

Comment From: ZYMCao

> @ZYMCao Do you still see this issue? We need a consistent/reproducible example to verify this behaviour. Thanks

This issue persists, but I changed my code from

fileService.findResourceByPath(filePath)
                .switchIfEmpty(Mono.error(new FileNotFoundException("File not found at: " + fileService.getFilePath(fileMeta.fileId(), ts))))
                .map(TikaDocumentReader::new)
                .flatMap(Mono::fromSupplier)
                .map(tokenTextSplitter.andThen(documentIdConverter))
                .doOnSuccess(_ -> log.debug("DocumentService::vectorize is writing documents..."))
                .doOnNext(vectorstore)
...

to

fileService.findResourceByPath(filePath)
                .switchIfEmpty(Mono.error(new ReAIoTException("File not found at: " + fileService.getFilePath(fileMeta.pk()), NOT_FOUND)))
                .map(TikaDocumentReader::new)
                .flatMap(Mono::fromSupplier)
                .map(defaultTokenTextSplitter.andThen(documentIdConverter)) // .andThen(summaryMetadataEnricher).andThen(keywordMetadataEnricher)
                .doOnSuccess(_ -> log.debug("DocumentService::vectorize is writing documents..."))
                .flatMap(docs -> {
                    if (docs.size() > BATCH_SIZE) {
                        log.warn("Large document detected: {} chunks for {}", docs.size(), fileMeta.fileName());
                        return processInBatches(docs, BATCH_SIZE, fileMeta.fileName());
                    } else {
                        log.debug("Writing {} chunks for {}", docs.size(), fileMeta.fileName());
                        return Mono.fromRunnable(() -> documentWriter.accept(docs)).thenReturn(docs);
                    }
                })
...

where the processInBatches method uses Flux's buffer operator to write the chunks in batches. It works for me now, but it would be great if this were handled natively by Spring AI.
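The batching itself reduces to splitting the chunk list into bounded slices before each blocking write. A minimal self-contained sketch of the same partitioning that Flux.buffer(BATCH_SIZE) performs, using plain lists (BATCH_SIZE = 50 is a hypothetical value; documentWriter is assumed from the pipeline above):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchDemo {
    static final int BATCH_SIZE = 50; // hypothetical; tune to your embedding endpoint's limits

    // Plain-list equivalent of what Flux.buffer(batchSize) emits.
    static <T> List<List<T>> partition(List<T> docs, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < docs.size(); i += batchSize) {
            batches.add(new ArrayList<>(docs.subList(i, Math.min(i + batchSize, docs.size()))));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> chunks = new ArrayList<>();
        for (int i = 0; i < 143; i++) chunks.add(i); // the failing upload's chunk count
        List<List<Integer>> batches = partition(chunks, BATCH_SIZE);
        // Each batch would then be handed to documentWriter.accept(batch) in turn,
        // keeping every embeddings POST small and short-lived.
        System.out.println(batches.size() + " batches, last has " + batches.get(batches.size() - 1).size());
        // prints: 3 batches, last has 43
    }
}
```

The design point is that a bounded batch keeps each embed/write call short, so a cancellation or timeout mid-pipeline loses at most one batch instead of interrupting one very long call covering all 143 chunks.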