I hope to quickly learn Spring AI + ollama + deepseek r1 (local learning and deployment) through the official documentation, but when I call the interface /ai/generate based on the official documentation, I get the following error:

Image

Image

Image

When I configure the following code, it can work normally.

@Configuration
public class Myconfig {
    @Bean
    public RestClient.Builder builder() {
        return RestClient.builder().requestFactory(new SimpleClientHttpRequestFactory());
    }
}

Image

Environment

ollama version is 0.5.12
Java version: zulu 21
    <dependencies>
        <dependency>
            <groupId>org.springframework.ai</groupId>
            <artifactId>spring-ai-ollama-spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
            <version>3.4.2</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-resolver-dns-native-macos</artifactId>
            <version>4.1.116.Final</version>
            <classifier>osx-aarch_64</classifier>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.ai</groupId>
                <artifactId>spring-ai-bom</artifactId>
                <version>1.0.0-M6</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

It might be more appropriate to add RestClient configuration near the specific content of the document, which can help beginners get feedback quickly.

Comment From: heleihelei

same issue,add RestClient.Builder work

Comment From: Vevvev

Ran into a similar issue in OpenAi's API. Was running an Embedding example from https://docs.spring.io/spring-ai/reference/api/embeddings/openai-embeddings.html , and it was that example code that produced the following error.

org.springframework.web.client.ResourceAccessException: I/O error on POST request for "https://api.openai.com/v1/embeddings": block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3 at org.springframework.web.client.DefaultRestClient$DefaultRequestBodyUriSpec.createResourceAccessException(DefaultRestClient.java:692) ~[spring-web-6.2.5.jar:6.2.5] at org.springframework.web.client.DefaultRestClient$DefaultRequestBodyUriSpec.exchangeInternal(DefaultRestClient.java:577) ~[spring-web-6.2.5.jar:6.2.5] at org.springframework.web.client.DefaultRestClient$DefaultRequestBodyUriSpec.exchange(DefaultRestClient.java:535) ~[spring-web-6.2.5.jar:6.2.5] at org.springframework.web.client.RestClient$RequestHeadersSpec.exchange(RestClient.java:677) ~[spring-web-6.2.5.jar:6.2.5] at org.springframework.web.client.DefaultRestClient$DefaultResponseSpec.executeAndExtract(DefaultRestClient.java:809) ~[spring-web-6.2.5.jar:6.2.5] at org.springframework.web.client.DefaultRestClient$DefaultResponseSpec.toEntityInternal(DefaultRestClient.java:769) ~[spring-web-6.2.5.jar:6.2.5] at org.springframework.web.client.DefaultRestClient$DefaultResponseSpec.toEntity(DefaultRestClient.java:765) ~[spring-web-6.2.5.jar:6.2.5] at org.springframework.ai.openai.api.OpenAiApi.embeddings(OpenAiApi.java:364) ~[spring-ai-openai-1.0.0-M6.jar:1.0.0-M6] at org.springframework.ai.openai.OpenAiEmbeddingModel.lambda$call$1(OpenAiEmbeddingModel.java:163) ~[spring-ai-openai-1.0.0-M6.jar:1.0.0-M6]

Comment From: youcangetme

bump

Comment From: markpollack

Thanks for providing how you got this to work.

@Configuration
public class Myconfig {
    @Bean
    public RestClient.Builder builder() {
        return RestClient.builder().requestFactory(new SimpleClientHttpRequestFactory());
    }
}

We will investigate.

@youcangetme @yukiofyume Can you confirm if this is only with the deepseek model as we have not had these type of reports for other ollama hosted model. Note: there are been reports of deepseek not performing correctly when hosted by vllm - see https://github.com/spring-projects/spring-ai/issues/2427

Comment From: yukiofyume

@markpollack @youcangetme When I only import spring-ai-ollama-spring-boot-starter:

@SpringBootApplication
public class SpringAIApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringAIApplication.class, args);
    }
}

Image

Therefore, I add:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
            <version>3.4.2</version>
        </dependency>

Image

When I use qwen2.5:1.5b or gemma3:1b, call the interface /ai/generate, I get the same problem and it looks different #2427 . When I debug the RestClient initialize process and use example:

Image

Image

Image

Image result: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3

After adding the following configuration, RestClient initialize as follows:

@Configuration
public class Myconfig {
    @Bean
    public RestClient.Builder builder() {
        return RestClient.builder().requestFactory(new SimpleClientHttpRequestFactory());
    }
}

Image

Image

Image result: {"generation":"I’m Gemma, a large language model created by the Gemma team at Google DeepMind. I’m openly available, which means you can use my weights. \n\n<0x0D>\n<0x0D>Do you have any specific questions you’d like me to answer?"}

I think it may be because RestClient exposing a fluent, synchronous API over underlying HTTP client, the default is to use ReactorClientHttpRequestFactory initialize ClientHttpRequestFactory, resulting in the above error

Comment From: making

spring.http.client.factory=simple

should also work

Comment From: Vevvev

Thanks for providing how you got this to work.

@Configuration public class Myconfig { @Bean public RestClient.Builder builder() { return RestClient.builder().requestFactory(new SimpleClientHttpRequestFactory()); } }

We will investigate.

@youcangetme @yukiofyume Can you confirm if this is only with the deepseek model as we have not had these type of reports for other ollama hosted model. Note: there are been reports of deepseek not performing correctly when hosted by vllm - see #2427

I was having it happen with OpenAI, not Deepseek.

Here's the line that was throwing out the blocking

Image

and the imports I have in my pom file.

Image

I did update the spring-ai.version in the Pom to 1.0.0-M7 over 1.0.0-M6, but ran into the same blocking problem.

Comment From: WindowsXP-XP

I tried to configure RestClient as described above, but it fail to resolve this error. I have a question: why use .block() in an Async Function? In reactive programming (e.g., WebFlux), invoking .block() on a reactor-http-nio thread will result in this error. I think this is why the program shows the error 'block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3'. Image

Comment From: shaohzhangebay

I tried to configure RestClient as described above, but it fail to resolve this error. I have a question: why use .block() in an Async Function? In reactive programming (e.g., WebFlux), invoking .block() on a reactor-http-nio thread will result in this error. I think this is why the program shows the error 'block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3'. Image

I encounter same issue during a streaming chat with gpt enabled with tools wrapped in AsyncMcpToolCallbackProvider.

I think during chat streaming we are not able to use SyncMcpToolCallbackProvider since it invokes McpSyncClient and then invokes block() as well. public McpSchema.CallToolResult callTool(McpSchema.CallToolRequest callToolRequest) { return this.delegate.callTool(callToolRequest).block(); } And current AsyncMcpToolCallbackProvider seems need to be redesigned.

Comment From: shaohzhangebay

I tried to configure RestClient as described above, but it fail to resolve this error. I have a question: why use .block() in an Async Function? In reactive programming (e.g., WebFlux), invoking .block() on a reactor-http-nio thread will result in this error. I think this is why the program shows the error 'block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3'. Image

Found this https://github.com/spring-projects/spring-ai/issues/2341 and tested it is working for me.

Comment From: WindowsXP-XP

I tried to configure RestClient as described above, but it fail to resolve this error. I have a question: why use .block() in an Async Function? In reactive programming (e.g., WebFlux), invoking .block() on a reactor-http-nio thread will result in this error. I think this is why the program shows the error 'block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3'. Image

Found this #2341 and tested it is working for me.

Thank you. Based on your reference plan, I drafted a provisional solution:

  1. Create a custom AsyncMcpToolCallbackProvider and AsyncMcpToolCallback. AsyncMcpToolCallbackProvider:
public class MyAsyncMcpToolCallbackProvider extends AsyncMcpToolCallbackProvider {

    private final List<McpAsyncClient> mcpClients;

    private final BiPredicate<McpAsyncClient, McpSchema.Tool> toolFilter;

    public MyAsyncMcpToolCallbackProvider(BiPredicate<McpAsyncClient, McpSchema.Tool> toolFilter, List<McpAsyncClient> mcpClients) {
        Assert.notNull(mcpClients, "MCP clients must not be null");
        Assert.notNull(toolFilter, "Tool filter must not be null");
        this.mcpClients = mcpClients;
        this.toolFilter = toolFilter;
    }

    public MyAsyncMcpToolCallbackProvider(List<McpAsyncClient> mcpClients) {
        this((mcpClient, tool) -> true, mcpClients);
    }

    public MyAsyncMcpToolCallbackProvider(BiPredicate<McpAsyncClient, McpSchema.Tool> toolFilter, McpAsyncClient... mcpClients) {
        this(toolFilter, List.of(mcpClients));
    }

    public MyAsyncMcpToolCallbackProvider(McpAsyncClient... mcpClients) {
        this(List.of(mcpClients));
    }

    @Override
    public ToolCallback[] getToolCallbacks() {

        CountDownLatch latch = new CountDownLatch(1);
        CopyOnWriteArrayList<ToolCallback> toolCallbackList = new CopyOnWriteArrayList<>();
        AtomicReference<Throwable> errorRef = new AtomicReference<>();

        this.myAsyncToolCallbacks(this.mcpClients).subscribe(toolCallback -> {
            toolCallbackList.add(toolCallback);
        }, error -> {
            errorRef.set(error);
            latch.countDown();
        }, () -> {
            latch.countDown();
        });

        try {
            latch.await();
            if (errorRef.get() != null) {
                throw (errorRef.get() instanceof RuntimeException runtimeException) ? runtimeException
                        : new RuntimeException("Error during tool execution", errorRef.get());
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Tool execution was interrupted", e);
        }

        validateToolCallbacks(toolCallbackList.toArray(new ToolCallback[0]));

        return toolCallbackList.toArray(new ToolCallback[0]);
    }

    private void validateToolCallbacks(ToolCallback[] toolCallbacks) {
        List<String> duplicateToolNames = ToolUtils.getDuplicateToolNames(toolCallbacks);
        if (!duplicateToolNames.isEmpty()) {
            throw new IllegalStateException(
                    "Multiple tools with the same name (%s)".formatted(String.join(", ", duplicateToolNames)));
        }
    }

    private static Flux<ToolCallback> myAsyncToolCallbacks(List<McpAsyncClient> mcpClients) {
        if (CollectionUtils.isEmpty(mcpClients)) {
            return Flux.empty();
        }

        return Flux.fromIterable(mcpClients).flatMap(mcpClient -> {
            // Check if client is initialized and initialize if needed
            Mono<McpAsyncClient> clientMono = mcpClient.isInitialized() ? Mono.just(mcpClient)
                    : mcpClient.initialize().thenReturn(mcpClient);

            return clientMono.flatMap(client -> client.listTools())
                    .flatMapIterable(response -> response.tools())
                    .map(tool -> new MyAsyncMcpToolCallback(mcpClient, tool));
        });
    }

}

AsyncMcpToolCallback:

public class MyAsyncMcpToolCallback extends AsyncMcpToolCallback {

    private final McpAsyncClient asyncMcpClient;

    private final McpSchema.Tool tool;

    public MyAsyncMcpToolCallback(McpAsyncClient asyncMcpClient, McpSchema.Tool tool) {
        super(asyncMcpClient, tool);
        this.asyncMcpClient = asyncMcpClient;
        this.tool = tool;
    }

    @Override
    public ToolDefinition getToolDefinition() {
        return ToolDefinition.builder()
                .name(this.tool.name())
                .description(this.tool.description())
                .inputSchema(ModelOptionsUtils.toJsonString(this.tool.inputSchema()))
                .build();
    }

    @Override
    public String call(String functionInput) {
        Map<String, Object> arguments = ModelOptionsUtils.jsonToMap(functionInput);
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<String> result = new AtomicReference<>();
        AtomicReference<Throwable> error = new AtomicReference<>();

        this.asyncMcpClient.callTool(new McpSchema.CallToolRequest(this.getToolDefinition().name(), arguments))
                .map(response -> ModelOptionsUtils.toJsonString(response.content()))
                .subscribe(value -> {
                    result.set(value);
                    latch.countDown();
                }, throwable -> {
                    error.set(throwable);
                    latch.countDown();
                });

        try {
            latch.await();
            if (error.get() != null) {
                if (error.get() instanceof RuntimeException) {
                    throw (RuntimeException) error.get();
                } else {
                    throw new RuntimeException("Error during tool execution", error.get());
                }
            }
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Tool execution was interrupted", e);
        }
    }
}
  1. Inject the custom AsyncMcpToolCallbackProvider into the Spring container.
@AutoConfiguration
public class MyAiAutoConfiguration {
    @Bean
    @ConditionalOnProperty(prefix = McpClientCommonProperties.CONFIG_PREFIX, name = "type", havingValue = "ASYNC")
    public ToolCallbackProvider myMcpAsyncToolCallbacks(ObjectProvider<List<McpAsyncClient>> mcpClientsProvider) {
        List<McpAsyncClient> mcpClients = mcpClientsProvider.stream().flatMap(List::stream).toList();
        return new MyAsyncMcpToolCallbackProvider(mcpClients);
    }
}
  1. Pass the custom AsyncMcpToolCallback as a parameter during chat conversations.
    public static ChatOptions buildChatOptions(AiPlatformEnum platform, String model, Double temperature, Integer maxTokens,
                                               Set<String> toolNames) {
        FunctionCallback[] mcpToolCallbacks = SpringUtils.getBean(MyAsyncMcpToolCallbackProvider.class).getToolCallbacks();
        List<FunctionCallback> toolCallbackList = List.of(mcpToolCallbacks);
        return OpenAiChatOptions.builder().model(model).temperature(temperature).maxTokens(maxTokens)
                        .toolNames(toolNames).functionCallbacks(toolCallbackList).build()
    }

Comment From: markpollack

@WindowsXP-XP

Found this https://github.com/spring-projects/spring-ai/issues/2341 and tested it is working for me.

and then you reply

Thank you. Based on your reference plan, I drafted a provisional solution:

So the #2341 didn't fix the issue for you?

There is another thing going on here wrt to the comment spring.http.client.factory=simple should also work.

unfortunately there seems to be a bug in boot wrt to picking a the httpclient when reactor is on the classpath for nonreactive RestClient.

Whenever there are reactive dependencies in the classpath (which is always the case in spring ai), Spring Boot uses Reactor to implement RestClient instead of standard JDK HttpClient, even if the application is not a reactive application.

setting spring.http.client.factory=jdk also should help.

However, this may not be the issue you are reporting, I'm not clear on what problem the solution you are providing is trying to fix.

Comment From: WindowsXP-XP

@WindowsXP-XP

Found this #2341 and tested it is working for me.

and then you reply

Thank you. Based on your reference plan, I drafted a provisional solution:

So the #2341 didn't fix the issue for you?

There is another thing going on here wrt to the comment should also work.spring.http.client.factory=simple

unfortunately there seems to be a bug in boot wrt to picking a the httpclient when reactor is on the classpath for nonreactive RestClient.

Whenever there are reactive dependencies in the classpath (which is always the case in spring ai), Spring Boot uses Reactor to implement RestClient instead of standard JDK HttpClient, even if the application is not a reactive application.

setting also should help.spring.http.client.factory=jdk

However, this may not be the issue you are reporting, I'm not clear on what problem the solution you are providing is trying to fix.

Neither setting spring.http.client.factory=simple nor spring.http.client.factory=jdk resolved my issue.

This is only a temporary solution. I haven't tried the approach in #2341 yet. I noticed he resolved it by modifying the ChatModels of different AI platforms, but I believe implementing unified handling when calling ChatModels would be preferable.

My provisional solution is based on Came up with some (ugly) workaround to overwrite/replace AsyncMcpToolCallbackProvider and wrap AsyncMcpToolCallbacks with a custom ToolCallback implementation that uses a ThreadPoolTaskExecuto in #2341

The Spring AI version referenced in #2341 has not been published to Maven Central, and third-party component support for this unreleased version remains limited. Therefore, I conclude that customizing AsyncMcpToolCallbackProvider and AsyncMcpToolCallbacks within our project currently represents:

  • The least invasive approach

  • The minimal code change requirement

  • The lowest risk of dependency conflicts

This why I choose this temporary solution

Comment From: zy1260957619

I tried to configure RestClient as described above, but it fail to resolve this error. I have a question: why use .block() in an Async Function? In reactive programming (e.g., WebFlux), invoking .block() on a reactor-http-nio thread will result in this error. I think this is why the program shows the error 'block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3'. Image

Found this #2341 and tested it is working for me.

Thank you. Based on your reference plan, I drafted a provisional solution:

  1. Create a custom AsyncMcpToolCallbackProvider and AsyncMcpToolCallback. AsyncMcpToolCallbackProvider:

public class MyAsyncMcpToolCallbackProvider extends AsyncMcpToolCallbackProvider {

private final List<McpAsyncClient> mcpClients;

private final BiPredicate<McpAsyncClient, McpSchema.Tool> toolFilter;

public MyAsyncMcpToolCallbackProvider(BiPredicate<McpAsyncClient, McpSchema.Tool> toolFilter, List<McpAsyncClient> mcpClients) {
    Assert.notNull(mcpClients, "MCP clients must not be null");
    Assert.notNull(toolFilter, "Tool filter must not be null");
    this.mcpClients = mcpClients;
    this.toolFilter = toolFilter;
}

public MyAsyncMcpToolCallbackProvider(List<McpAsyncClient> mcpClients) {
    this((mcpClient, tool) -> true, mcpClients);
}

public MyAsyncMcpToolCallbackProvider(BiPredicate<McpAsyncClient, McpSchema.Tool> toolFilter, McpAsyncClient... mcpClients) {
    this(toolFilter, List.of(mcpClients));
}

public MyAsyncMcpToolCallbackProvider(McpAsyncClient... mcpClients) {
    this(List.of(mcpClients));
}

@Override
public ToolCallback[] getToolCallbacks() {

    CountDownLatch latch = new CountDownLatch(1);
    CopyOnWriteArrayList<ToolCallback> toolCallbackList = new CopyOnWriteArrayList<>();
    AtomicReference<Throwable> errorRef = new AtomicReference<>();

    this.myAsyncToolCallbacks(this.mcpClients).subscribe(toolCallback -> {
        toolCallbackList.add(toolCallback);
    }, error -> {
        errorRef.set(error);
        latch.countDown();
    }, () -> {
        latch.countDown();
    });

    try {
        latch.await();
        if (errorRef.get() != null) {
            throw (errorRef.get() instanceof RuntimeException runtimeException) ? runtimeException
                    : new RuntimeException("Error during tool execution", errorRef.get());
        }
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException("Tool execution was interrupted", e);
    }

    validateToolCallbacks(toolCallbackList.toArray(new ToolCallback[0]));

    return toolCallbackList.toArray(new ToolCallback[0]);
}

private void validateToolCallbacks(ToolCallback[] toolCallbacks) {
    List<String> duplicateToolNames = ToolUtils.getDuplicateToolNames(toolCallbacks);
    if (!duplicateToolNames.isEmpty()) {
        throw new IllegalStateException(
                "Multiple tools with the same name (%s)".formatted(String.join(", ", duplicateToolNames)));
    }
}

private static Flux<ToolCallback> myAsyncToolCallbacks(List<McpAsyncClient> mcpClients) {
    if (CollectionUtils.isEmpty(mcpClients)) {
        return Flux.empty();
    }

    return Flux.fromIterable(mcpClients).flatMap(mcpClient -> {
        // Check if client is initialized and initialize if needed
        Mono<McpAsyncClient> clientMono = mcpClient.isInitialized() ? Mono.just(mcpClient)
                : mcpClient.initialize().thenReturn(mcpClient);

        return clientMono.flatMap(client -> client.listTools())
                .flatMapIterable(response -> response.tools())
                .map(tool -> new MyAsyncMcpToolCallback(mcpClient, tool));
    });
}

} AsyncMcpToolCallback:

public class MyAsyncMcpToolCallback extends AsyncMcpToolCallback {

private final McpAsyncClient asyncMcpClient;

private final McpSchema.Tool tool;

public MyAsyncMcpToolCallback(McpAsyncClient asyncMcpClient, McpSchema.Tool tool) {
    super(asyncMcpClient, tool);
    this.asyncMcpClient = asyncMcpClient;
    this.tool = tool;
}

@Override
public ToolDefinition getToolDefinition() {
    return ToolDefinition.builder()
            .name(this.tool.name())
            .description(this.tool.description())
            .inputSchema(ModelOptionsUtils.toJsonString(this.tool.inputSchema()))
            .build();
}

@Override
public String call(String functionInput) {
    Map<String, Object> arguments = ModelOptionsUtils.jsonToMap(functionInput);
    CountDownLatch latch = new CountDownLatch(1);
    AtomicReference<String> result = new AtomicReference<>();
    AtomicReference<Throwable> error = new AtomicReference<>();

    this.asyncMcpClient.callTool(new McpSchema.CallToolRequest(this.getToolDefinition().name(), arguments))
            .map(response -> ModelOptionsUtils.toJsonString(response.content()))
            .subscribe(value -> {
                result.set(value);
                latch.countDown();
            }, throwable -> {
                error.set(throwable);
                latch.countDown();
            });

    try {
        latch.await();
        if (error.get() != null) {
            if (error.get() instanceof RuntimeException) {
                throw (RuntimeException) error.get();
            } else {
                throw new RuntimeException("Error during tool execution", error.get());
            }
        }
        return result.get();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException("Tool execution was interrupted", e);
    }
}

} 2. Inject the custom AsyncMcpToolCallbackProvider into the Spring container.

@AutoConfiguration public class MyAiAutoConfiguration { @Bean @ConditionalOnProperty(prefix = McpClientCommonProperties.CONFIG_PREFIX, name = "type", havingValue = "ASYNC") public ToolCallbackProvider myMcpAsyncToolCallbacks(ObjectProvider> mcpClientsProvider) { List mcpClients = mcpClientsProvider.stream().flatMap(List::stream).toList(); return new MyAsyncMcpToolCallbackProvider(mcpClients); } } 3. Pass the custom AsyncMcpToolCallback as a parameter during chat conversations.

public static ChatOptions buildChatOptions(AiPlatformEnum platform, String model, Double temperature, Integer maxTokens,
                                           Set<String> toolNames) {
    FunctionCallback[] mcpToolCallbacks = SpringUtils.getBean(MyAsyncMcpToolCallbackProvider.class).getToolCallbacks();
    List<FunctionCallback> toolCallbackList = List.of(mcpToolCallbacks);
    return OpenAiChatOptions.builder().model(model).temperature(temperature).maxTokens(maxTokens)
                    .toolNames(toolNames).functionCallbacks(toolCallbackList).build()
}

what is the version of your springai,my McpAsyncClient do not have .isInitialized() method.

Comment From: WindowsXP-XP

I tried to configure RestClient as described above, but it fail to resolve this error. I have a question: why use .block() in an Async Function? In reactive programming (e.g., WebFlux), invoking .block() on a reactor-http-nio thread will result in this error. I think this is why the program shows the error 'block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3'. Image

Found this #2341 and tested it is working for me.

Thank you. Based on your reference plan, I drafted a provisional solution:

  1. Create a custom AsyncMcpToolCallbackProvider and AsyncMcpToolCallback. AsyncMcpToolCallbackProvider:

public class MyAsyncMcpToolCallbackProvider extends AsyncMcpToolCallbackProvider { ``` private final List mcpClients;

private final BiPredicate toolFilter;

public MyAsyncMcpToolCallbackProvider(BiPredicate toolFilter, List mcpClients) { Assert.notNull(mcpClients, "MCP clients must not be null"); Assert.notNull(toolFilter, "Tool filter must not be null"); this.mcpClients = mcpClients; this.toolFilter = toolFilter; }

public MyAsyncMcpToolCallbackProvider(List mcpClients) { this((mcpClient, tool) -> true, mcpClients); }

public MyAsyncMcpToolCallbackProvider(BiPredicate toolFilter, McpAsyncClient... mcpClients) { this(toolFilter, List.of(mcpClients)); }

public MyAsyncMcpToolCallbackProvider(McpAsyncClient... mcpClients) { this(List.of(mcpClients)); }

@Override public ToolCallback[] getToolCallbacks() {

CountDownLatch latch = new CountDownLatch(1);
CopyOnWriteArrayList<ToolCallback> toolCallbackList = new CopyOnWriteArrayList<>();
AtomicReference<Throwable> errorRef = new AtomicReference<>();

this.myAsyncToolCallbacks(this.mcpClients).subscribe(toolCallback -> {
    toolCallbackList.add(toolCallback);
}, error -> {
    errorRef.set(error);
    latch.countDown();
}, () -> {
    latch.countDown();
});

try {
    latch.await();
    if (errorRef.get() != null) {
        throw (errorRef.get() instanceof RuntimeException runtimeException) ? runtimeException
                : new RuntimeException("Error during tool execution", errorRef.get());
    }
}
catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new RuntimeException("Tool execution was interrupted", e);
}

validateToolCallbacks(toolCallbackList.toArray(new ToolCallback[0]));

return toolCallbackList.toArray(new ToolCallback[0]);

}

private void validateToolCallbacks(ToolCallback[] toolCallbacks) { List duplicateToolNames = ToolUtils.getDuplicateToolNames(toolCallbacks); if (!duplicateToolNames.isEmpty()) { throw new IllegalStateException( "Multiple tools with the same name (%s)".formatted(String.join(", ", duplicateToolNames))); } }

private static Flux myAsyncToolCallbacks(List mcpClients) { if (CollectionUtils.isEmpty(mcpClients)) { return Flux.empty(); }

return Flux.fromIterable(mcpClients).flatMap(mcpClient -> {
    // Check if client is initialized and initialize if needed
    Mono<McpAsyncClient> clientMono = mcpClient.isInitialized() ? Mono.just(mcpClient)
            : mcpClient.initialize().thenReturn(mcpClient);

    return clientMono.flatMap(client -> client.listTools())
            .flatMapIterable(response -> response.tools())
            .map(tool -> new MyAsyncMcpToolCallback(mcpClient, tool));
});

} ```

} AsyncMcpToolCallback: public class MyAsyncMcpToolCallback extends AsyncMcpToolCallback { ``` private final McpAsyncClient asyncMcpClient;

private final McpSchema.Tool tool;

public MyAsyncMcpToolCallback(McpAsyncClient asyncMcpClient, McpSchema.Tool tool) { super(asyncMcpClient, tool); this.asyncMcpClient = asyncMcpClient; this.tool = tool; }

@Override public ToolDefinition getToolDefinition() { return ToolDefinition.builder() .name(this.tool.name()) .description(this.tool.description()) .inputSchema(ModelOptionsUtils.toJsonString(this.tool.inputSchema())) .build(); }

@Override public String call(String functionInput) { Map arguments = ModelOptionsUtils.jsonToMap(functionInput); CountDownLatch latch = new CountDownLatch(1); AtomicReference result = new AtomicReference<>(); AtomicReference error = new AtomicReference<>();

this.asyncMcpClient.callTool(new McpSchema.CallToolRequest(this.getToolDefinition().name(), arguments))
        .map(response -> ModelOptionsUtils.toJsonString(response.content()))
        .subscribe(value -> {
            result.set(value);
            latch.countDown();
        }, throwable -> {
            error.set(throwable);
            latch.countDown();
        });

try {
    latch.await();
    if (error.get() != null) {
        if (error.get() instanceof RuntimeException) {
            throw (RuntimeException) error.get();
        } else {
            throw new RuntimeException("Error during tool execution", error.get());
        }
    }
    return result.get();
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new RuntimeException("Tool execution was interrupted", e);
}

} ```

} 2. Inject the custom AsyncMcpToolCallbackProvider into the Spring container. @autoConfiguration public class MyAiAutoConfiguration { @bean @ConditionalOnProperty(prefix = McpClientCommonProperties.CONFIG_PREFIX, name = "type", havingValue = "ASYNC") public ToolCallbackProvider myMcpAsyncToolCallbacks(ObjectProvider mcpClientsProvider) { List mcpClients = mcpClientsProvider.stream().flatMap(List::stream).toList(); return new MyAsyncMcpToolCallbackProvider(mcpClients); } } 3. Pass the custom AsyncMcpToolCallback as a parameter during chat conversations. public static ChatOptions buildChatOptions(AiPlatformEnum platform, String model, Double temperature, Integer maxTokens, Set<String> toolNames) { FunctionCallback[] mcpToolCallbacks = SpringUtils.getBean(MyAsyncMcpToolCallbackProvider.class).getToolCallbacks(); List<FunctionCallback> toolCallbackList = List.of(mcpToolCallbacks); return OpenAiChatOptions.builder().model(model).temperature(temperature).maxTokens(maxTokens) .toolNames(toolNames).functionCallbacks(toolCallbackList).build() }

what is the version of your springai,my McpAsyncClient do not have .isInitialized() method.

old version 1.0.0-M6

Comment From: zy1260957619

what is the version of your springai,my McpAsyncClient do not have .isInitialized() method.

old version 1.0.0-M6

Thank you,this solution worked for me