1.0.0-M7 版本: client 采用sse 方式连接上 mcp server,可以正常运行;这个时候重启 mcp server, client 端就不会重连 mcp server,导致一直报错 reactor.core.Exceptions$ReactiveException: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 20000ms in 'source(MonoCreate)' (and no fallback has been configured) at reactor.core.Exceptions.propagate(Exceptions.java:410) at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:102)

怎么设置重连mcp server?

Comment From: Forest-Fairy

应该只能重新创建一个mcpclient对象 或者重新实现一个client可以重连的

Comment From: lyxfn

I also encountered this problem. Every time I restarted mcp-server, I needed to restart the client to recover. If I only restarted mcp-server, a 404 error would appear. I didn't see any reconnection configuration in the official documentation.

org.springframework.web.reactive.function.client.WebClientResponseException$NotFound: 404 Not Found from POST http://localhost:9080/mcp/message at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:324) ~[spring-webflux-6.1.14.jar:6.1.14] Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has been observed at the following site(s): *__checkpoint ⇢ 404 NOT_FOUND from POST http://localhost:9080/mcp/message [DefaultWebClient]

Comment From: SuperWuYou

这都不用等服务器重启,我连着高德的MCP,等个几分钟就断开了,然后他就不连了.... 怎么办???

Comment From: hymmyh

这都不用等服务器重启,我连着高德的MCP,等个几分钟就断开了,然后他就不连了.... 怎么办???

目前可以不用autoconfigure,自己在代码中实现 mcpClient 的初始化,用mcpClient.ping() 来判断是否正常,不正常重新实例化一个新的mcpClient。

Comment From: zhouwenjun-hub

这都不用等服务器重启,我连着高德的MCP,等个几分钟就断开了,然后他就不连了.... 怎么办???

目前可以不用autoconfigure,自己在代码中实现 mcpClient 的初始化,用mcpClient.ping() 来判断是否正常,不正常重新实例化一个新的mcpClient。

大佬,请问有示例代码么?我尝试这个方案还是有问题,重新实例化时,mcp server会报错: session is null

Comment From: SuperWuYou

@zhouwenjun-hub 去看看autoconfigure包是怎么写的你就会了。

Comment From: dyrnq

stare

Comment From: luojinggit

这都不用等服务器重启,我连着高德的MCP,等个几分钟就断开了,然后他就不连了.... 怎么办???

目前可以不用autoconfigure,自己在代码中实现 mcpClient 的初始化,用mcpClient.ping() 来判断是否正常,不正常重新实例化一个新的mcpClient。

麻烦您能不能贴一下关键代码,我也遇到这个问题,我重新调用mcpSyncClient.initialize(); 方法还是无法重连。

Comment From: 15185640400

这都不用等服务器重启,我连着高德的MCP,等个几分钟就断开了,然后他就不连了....怎么办???

目前可以不用autoconfigure,自己在代码中实现 mcpClient 的初始化,用mcpClient.ping() 来判断是否正常,不正常重新实例化一个新的mcpClient。

麻烦您能不能贴一下关键代码,我也遇到这个问题,我重新调用mcpSyncClient.initialize();方法还是无法重连。

您好,请问一下,您解决了吗

Comment From: hymmyh

@Configuration @EnableConfigurationProperties public class McpConfig { private static final Logger LOGGER = LoggerFactory.getLogger(McpConfig.class); private McpSyncClient mcpClient; private List toolCallbacks;

/**
 * MCP客户端的URL地址,用于建立与MCP服务器的连接。
 */
@Value("${spring.ai.mcp.client.sse.connections.server1.url}")
private String mcpServerUrl;

/**
 * 创建或重建MCP客户端。该方法会关闭旧客户端实例(如果存在),并初始化新的MCP客户端及关联的工具回调。
 * <p>
 * 同步方法确保客户端创建过程的线程安全。
 */
private synchronized void createMcpClient() {
    // 关闭旧的MCP客户端(如果存在)
    if (mcpClient != null) {
        try {
            mcpClient.close();
        } catch (final Exception e) {
            LOGGER.error("关闭旧客户端失败", e);
        }
    }

    // 创建并配置新的MCP客户端传输层
    final WebClient.Builder builder = WebClient.builder().baseUrl(mcpServerUrl);
    final WebFluxSseClientTransport transport = new WebFluxSseClientTransport(builder);
    final McpSyncClient newClient = McpClient.sync(transport).build();

    // 初始化新客户端并记录结果
    final McpSchema.InitializeResult init = newClient.initialize();
    LOGGER.info("<---------------->MCP Initialized: {}", init);

    // 更新MCP客户端引用和工具回调
    mcpClient = newClient;
    toolCallbacks = List.of(new SyncMcpToolCallbackProvider(mcpClient).getToolCallbacks());
}


/**
 * 获取工具回调列表。该方法会确保MCP客户端处于有效连接状态,若连接异常则自动重建客户端。
 * @return 已配置的工具回调列表
 */
public List<ToolCallback> getToolCallback() {
    if (mcpClient == null) {
        // 如果MCP客户端未初始化,则创建并直接返回回调列表
        createMcpClient();
        return toolCallbacks;
    }
    try {
        // 检查与服务端的连接状态
        mcpClient.ping();
    } catch (final Exception e) {
        // 捕获连接异常,记录错误日志并重建客户端
        LOGGER.error("<---------------->MCP ping error: {}", e.getLocalizedMessage());
        createMcpClient();
    }
    return toolCallbacks;
}

/**
 * 在Bean初始化时创建MCP客户端。
 */
@PostConstruct
public void init() {
    createMcpClient(); // 在Bean初始化时强制执行
}

}

Comment From: luojinggit

这个方法可行,在这个代码基础上,我把自动配置chatClient改成自己配置,并且设定一个定时任务,定时ping mcp server,断开就尝试重新初始化chatClient,主要是这个chatClient 的tools每次都是增量添加,导致我断开一次后再添加会报错 tool 重复,所以只能每次重新初始化chatClient

Comment From: 15185640400

private static final Logger LOGGER = LoggerFactory.getLogger(McpConfig.class); private McpSyncClient mcpClient; private List toolCallbacks;

666啊兄弟,收下我的膝盖

Comment From: 15185640400

这个方法可行,在这个代码基础上,我把自动配置chatClient改成自己配置,并且设定一个定时任务,定时ping mcp server,断开就尝试重新初始化chatClient,主要是这个chatClient的tools每次都是增量添加,导致我断开一次后再添加会报错 tool 重复,所以只能每次重新初始化chatClient

可以附一下代码吗兄弟?不,义父0.0

Comment From: luojinggit

这个方法可行,在这个代码基础上,我把自动配置chatClient改成自己配置,并且设定一个定时任务,定时ping mcp server,断开就尝试重新初始化chatClient,主要是这个chatClient的tools每次都是增量添加,导致我断开一次后再添加会报错 tool 重复,所以只能每次重新初始化chatClient

可以附一下代码吗兄弟?不,义父0.0

配置文件关闭client自动配置: ai: mcp: client: enabled: false

@Autowired
private OpenAiChatModel chatModel;

Image 在上面的那位兄弟的基础上添加了这个方法,定时任务应该不用贴了吧

Comment From: 15185640400

这个方法可行,在这个代码基础上,我把自动配置chatClient改成自己配置,并且设定一个定时任务,定时ping mcp server,断开就尝试重新初始化chatClient,主要是这个chatClient的tools每次都是增量添加,导致我断开一次后再添加会报错 tool 重复,所以只能每次重新初始化chatClient

可以附一下代码吗兄弟?不,义父0.0

配置文件关闭client自动配置: ai: mcp: client: enabled: false

@Autowired private OpenAiChatModel chatModel;

Image 在上面的那位兄弟的基础上添加了这个方法,定时任务应该不用贴了吧

谢谢大佬

Comment From: lyxfn

这个方法可行,在这个代码基础上,我把自动配置chatClient改成自己配置,并且设定一个定时任务,定时ping mcp server,断开就尝试重新初始化chatClient,主要是这个chatClient的tools每次都是增量添加,导致我断开一次后再添加会报错 tool 重复,所以只能每次重新初始化chatClient

可以附一下代码吗兄弟?不,义父0.0

配置文件关闭client自动配置: ai: mcp: client: enabled: false

@Autowired private OpenAiChatModel chatModel;

Image 在上面的那位兄弟的基础上添加了这个方法,定时任务应该不用贴了吧

用InMemoryChatMemory的话,重新初始化chatClient会丢失现有的用户会话状态跟记录。可以使用alibaba的RedisChatMemory实现,正好我的VectorStore也是用的Redis。https://github.com/alibaba/spring-ai-alibaba/blob/main/community/memories/spring-ai-alibaba-redis-memory/src/main/java/com/alibaba/cloud/ai/memory/redis/RedisChatMemory.java

Comment From: luojinggit

这个方法可行,在这个代码基础上,我把自动配置chatClient改成自己配置,并且设定一个定时任务,定时ping mcp server,断开就尝试重新初始化chatClient,主要是这个chatClient的tools每次都是增量添加,导致我断开一次后再添加会报错 tool 重复,所以只能每次重新初始化chatClient

可以附一下代码吗兄弟?不,义父0.0

配置文件关闭client自动配置: ai: mcp: client: enabled: false @Autowired private OpenAiChatModel chatModel;

Image 在上面的那位兄弟的基础上添加了这个方法,定时任务应该不用贴了吧

用InMemoryChatMemory的话,重新初始化chatClient会丢失现有的用户会话状态跟记录。可以使用alibaba的RedisChatMemory实现,正好我的VectorStore也是用的Redis。https://github.com/alibaba/spring-ai-alibaba/blob/main/community/memories/spring-ai-alibaba-redis-memory/src/main/java/com/alibaba/cloud/ai/memory/redis/RedisChatMemory.java

666666,我才发现

Comment From: lyxfn

贴一下代码,1.0.0-M7,不需要使用advisor的可以去掉。

@Slf4j
@Service
@EnableScheduling
@EnableConfigurationProperties({McpSseClientProperties.class})
public class CustomerSupportAssistant {

    private final McpSseClientProperties sseProperties;

    public CustomerSupportAssistant(McpSseClientProperties sseProperties) {
        this.sseProperties = sseProperties;
    }

    @Resource
    ChatModel chatModel;

    @Resource
    VectorStore vectorStore;

    @Resource
    ChatMemory chatMemory;

    @Resource
    ConversationAdvisor conversationAdvisor;

    private ChatClient chatClient;

    //more than one mcp-client
    private static final List<McpSyncClient> mcpSyncClients = new CopyOnWriteArrayList<>();

    @Scheduled(cron = "0 * * * * ?")
    public void ping() {
        if (mcpSyncClients.isEmpty()) {
            initChatClient();
        } else {
            Iterator<McpSyncClient> iterator = mcpSyncClients.iterator();
            while (iterator.hasNext()) {
                McpSyncClient mcpSyncClient = iterator.next();
                try {
                    mcpSyncClient.ping();
                    log.info("ping mcp client success:{}", mcpSyncClient.getClientInfo().name());
                } catch (Exception e) {
                    log.error("ping mcp client error:{}", mcpSyncClient.getClientInfo().name());
                    //close & remove old mcp-client
                    mcpSyncClient.close();
                    iterator.remove();
                    //re-init
                    initChatClient();
                }
            }
        }
    }

    @PostConstruct
    private void initChatClient() {
        this.chatClient = ChatClient.builder(chatModel)
                .defaultSystem(new ClassPathResource(SYSTEM_PROMPT_TEMPLATE_PATH))
                .defaultAdvisors(
                        new MessageChatMemoryAdvisor(chatMemory),
                        new QuestionAnswerAdvisor(vectorStore, SearchRequest.builder().topK(3).build()),
                        new ReReadingAdvisor(),
                        new LoggingAdvisor(),
                        conversationAdvisor
                )
                .defaultTools(
                        initToolCallbacks()
                )
                .build();
    }

    private synchronized List<ToolCallback> initToolCallbacks() {
        List<ToolCallback> toolCallbacks = new ArrayList<>();
        this.sseProperties.getConnections().forEach((key, value) -> {
            final WebClient.Builder builder = WebClient.builder().baseUrl(value.url());
            final WebFluxSseClientTransport transport = new WebFluxSseClientTransport(builder);
            final McpSyncClient mcpSyncClient = McpClient.sync(transport)
                    .clientInfo(new McpSchema.Implementation(
                            "mcp-client-".concat(value.url()),
                            "1.0.0"))
                    .build();
            try {
                McpSchema.InitializeResult initialize = mcpSyncClient.initialize();
                log.info("initialize mcp client: {} mcp server :{}", mcpSyncClient.getClientInfo().name(), initialize.serverInfo().name());
                mcpSyncClients.add(mcpSyncClient);
                toolCallbacks.addAll(List.of(new SyncMcpToolCallbackProvider(mcpSyncClient).getToolCallbacks()));
            } catch (Exception e) {
                log.error("Failed to initialize mcp client for server: {}", value.url(), e);
            }
        });
        return toolCallbacks;
    }

    //chat
    public Flux<String> chat(String content) {
        return this.chatClient.prompt()
                .advisors()
                .user(userMessageContent)
                .toolContext()
                .stream()
                .content();
    }
}

Comment From: zhihaoHappyeveryday

这个方法可行,在这个代码基础上,我把自动配置chatClient改成自己配置,并且设定一个定时任务,定时ping mcp server,断开就尝试重新初始化chatClient,主要是这个chatClient 的tools每次都是增量添加,导致我断开一次后再添加会报错 tool 重复,所以只能每次重新初始化chatClient

ChatClient.Builder builder = SpringUtil.getBean(ChatClient.Builder.class); ChatClient.Builder clone = builder.clone(); chatClient = clone .defaultTools(mcpConfig.getToolCallback().toArray(new ToolCallbackProvider[mcpConfig.getToolCallback().size()])) .build();

可以解决重复tools

Comment From: lyxfn

https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http MCP 官方已经有方案了,使用Streamable HTTP替代现在的SSE +HTTP传输模式,可以解决断线重连和恢复,SDK好像还在开发中,估计Spring AI在下个release会跟进这个。

Comment From: zhoujun134

参考各位大佬的使用,改写如下: 对于 同步调用的来说,可以尝试定义一个 McpConfig,具体代码如下:


import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.springframework.ai.mcp.SyncMcpToolCallbackProvider;
import org.springframework.ai.mcp.client.autoconfigure.properties.McpSseClientProperties;
import org.springframework.ai.mcp.client.autoconfigure.properties.McpSseClientProperties.SseParameters;
import org.springframework.ai.tool.ToolCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.zj.study.ai.utils.gson.GsonUtils;

import cn.hutool.core.map.MapUtil;
import io.modelcontextprotocol.client.McpClient;
import io.modelcontextprotocol.client.McpSyncClient;
import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport;
import io.modelcontextprotocol.spec.McpSchema;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;

/**
 * @author zhoujun134
 * Created on 2025-05-07
 */
@EnableScheduling
@Configuration
@EnableConfigurationProperties
@Slf4j
public class McpConfig {
    /**
     * MCP客户端的配置属性,用于存储MCP客户端的相关配置信息。
     * key : 服务名称-版本号-服务器地址
     */
    private final Map<String, McpSyncClient> mcpSyncClientMap = new HashMap<>();

    private final Map<String, List<ToolCallback>> toolCallbacksMap = new HashMap<>();

    @Autowired
    private McpSseClientProperties mcpSseClientProperties;

    /**
     * 创建或重建MCP客户端。该方法会关闭旧客户端实例(如果存在),并初始化新的MCP客户端及关联的工具回调。
     * <p>
     * 同步方法确保客户端创建过程的线程安全。
     */
    private synchronized void pingAndCheckMcpClient() {
        if (MapUtil.isNotEmpty(this.mcpSyncClientMap)) {
            final List<String> disableServers = new ArrayList<>();
            this.mcpSyncClientMap.forEach((serverUrl, client) -> {
                try {
                    // 检查与服务端的连接状态
                    client.ping();
                } catch (final Exception e) {
                    // 捕获连接异常,记录错误日志并重建客户端
                    log.error("<---------------->MCP ping error: {}", e.getLocalizedMessage());
                    boolean createResult = this.createClientByServerUrl(serverUrl);
                    if (!createResult) {
                        disableServers.add(serverUrl);
                    }
                }
            });
            if (CollectionUtils.isNotEmpty(disableServers)) {
                disableServers.forEach(serverUrl -> {
                    if (MapUtil.isNotEmpty(this.mcpSyncClientMap)) {
                        this.mcpSyncClientMap.remove(serverUrl);
                    }
                    if (MapUtil.isNotEmpty(this.toolCallbacksMap)) {
                        this.toolCallbacksMap.remove(serverUrl);
                    }
                });
                log.info("<---------------->MCP server 状态检测完成, 当前存活{}个MCP客户端, urls={},"
                                + " 已失效 {} 个 MCP 客户端,失效 urls={}",
                        this.mcpSyncClientMap.size(), GsonUtils.toJSONString(this.mcpSyncClientMap.keySet()),
                        disableServers.size(), GsonUtils.toJSONString(disableServers));
            }
            // 找出 mcpSyncClientMap 中不存在,配置文件中存在的连接信息,并再次重建 MCP 客户端
            Map<String, SseParameters> connections = this.mcpSseClientProperties.getConnections();
            if (MapUtil.isEmpty(connections)) {
                log.warn("<---------------->MCP 没有配置连接信息,请检查配置文件");
                return;
            }
            connections.forEach((key, sseParameter) -> {
                String serverUrl = sseParameter.url();
                if (this.mcpSyncClientMap.containsKey(serverUrl) || disableServers.contains(serverUrl)) {
                    return;
                }
                // 不在 mcpSyncClientMap 和 disableServers 中,则重建 MCP 客户端
                this.createClientByServerUrl(serverUrl);
            });
        } else {
            this.initMcpClient();
        }
    }

    private boolean createClientByServerUrl(String serverUrl) {
        if (StringUtils.isBlank(serverUrl)) {
            log.warn("serverUrl is blank, please check the configuration file.");
            return false;
        }
        try {
            final HttpClientSseClientTransport transport = HttpClientSseClientTransport.builder(serverUrl).build();
            final McpSyncClient newClient = McpClient.sync(transport).build();
            // 初始化新客户端并记录结果
            final McpSchema.InitializeResult init = newClient.initialize();
            log.info("<----------------> sync MCP Initialized 完成: {}", init.serverInfo());
            final List<ToolCallback> curToolCallbacks =
                    List.of(new SyncMcpToolCallbackProvider(newClient).getToolCallbacks());
            this.toolCallbacksMap.put(serverUrl, curToolCallbacks);
            this.mcpSyncClientMap.put(serverUrl, newClient);
        } catch (Exception ex) {
            log.error("<----------------> createClientByServerUrl 链接 MCP 客户端失败: {}, serverUrl={}",
                    ex.getLocalizedMessage(), serverUrl);
            return false;
        }
        return true;
    }

    private synchronized void initMcpClient() {
        Map<String, SseParameters> connectionsMap = mcpSseClientProperties.getConnections();
        if (MapUtil.isEmpty(connectionsMap)) {
            log.warn("initMcpClient <---------------->MCP 没有配置连接信息,请检查配置文件");
            return;
        }
        connectionsMap.forEach((key, sseParameter) -> {
            String serverUrl = sseParameter.url();
            this.createClientByServerUrl(serverUrl);
        });
    }

    /**
     * 获取工具回调列表。该方法会确保MCP客户端处于有效连接状态,若连接异常则自动重建客户端。
     *
     * @return 已配置的工具回调列表
     */
    public List<ToolCallback> getToolCallback() {
        if (MapUtil.isNotEmpty(this.toolCallbacksMap)) {
            return this.toolCallbacksMap.values().stream()
                    .flatMap(List::stream)
                    .collect(Collectors.toList());
        }
        return Collections.emptyList();
    }

    /**
     * 每秒检查一次MCP客户端的状态,并重建异常的客户端。
     */
    @Scheduled(cron = "* * * * * ?")
    public void checkMcpClient() {
        this.pingAndCheckMcpClient();
    }

    /**
     * 在Bean初始化时创建MCP客户端。
     */
    @PostConstruct
    public void init() {
        log.info("<---------------->MCP Initializing...");
        // 在Bean初始化时强制执行
        initMcpClient();
        log.info("<---------------->MCP Initialized 完成, 共初始化{}个MCP客户端, urls={}",
                this.mcpSyncClientMap.size(), GsonUtils.toJSONString(this.mcpSyncClientMap.keySet()));
    }
}

然后在使用的地方通过以下的方式使用:


    @Autowired
    private McpConfig mcpConfig;

      ChatClient chatClient = ChatClient.builder(zhiPuAiChatModel)
                .defaultTools(mcpConfig.getToolCallback())
                .build();

Comment From: zhouwenjun-hub

@hymmyh @zhoujun134 @luojinggit 各位大佬,使用上面的方案遇到:

i.m.spec.McpClientSession : Unexpected response for unkown id cd772c95-561

怎么解决呢?多个client同事启动会出现这个问题,谢谢

Comment From: fwq418233640

@zhoujun134 非常感谢,我参考了你的代码,然后重新实现了一下,因为我这边是基于Consul的微服务,所以我这边添加了基于注册中心来动态注册Mcp Server 下面代码供大家参考一下,有问题欢迎指出


package com.yuan.chat.conf;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.client.McpClient;
import io.modelcontextprotocol.client.McpSyncClient;
import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport;
import io.modelcontextprotocol.spec.McpSchema;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.mcp.SyncMcpToolCallbackProvider;
import org.springframework.ai.mcp.client.autoconfigure.NamedClientMcpTransport;
import org.springframework.ai.mcp.client.autoconfigure.configurer.McpSyncClientConfigurer;
import org.springframework.ai.mcp.client.autoconfigure.properties.McpClientCommonProperties;
import org.springframework.ai.mcp.customizer.McpSyncClientCustomizer;
import org.springframework.ai.tool.ToolCallback;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.consul.discovery.ConsulDiscoveryClient;
import org.springframework.cloud.consul.discovery.ConsulServiceInstance;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;

import java.net.http.HttpClient;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@Slf4j
@Configuration
@RequiredArgsConstructor
public class McpClientConfig {

    private final ConsulDiscoveryClient consulDiscoveryClient;
    private final ObjectProvider<ObjectMapper> objectMapperProvide;
    private final McpClientCommonProperties commonProperties;
    // 黑名单,用来剔除一些不需要注册的服务
    private final Set<String> blackList = Set.of("consul");

    private final ConcurrentMap<String, List<NamedClientMcpTransport>> sseTransportMap = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, List<McpSyncClient>> mcpSyncClientMap = new ConcurrentHashMap<>();
    private final Set<McpSyncClient> mcpSyncClients = ConcurrentHashMap.newKeySet();

    private McpSyncClientConfigurer mcpSyncClientConfigurer;

    @Getter
    @Setter
    private boolean clientValid;

    // ======== 注册自动配置器 ========
    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnProperty(prefix = McpClientCommonProperties.CONFIG_PREFIX, name = "type", havingValue = "SYNC", matchIfMissing = true)
    McpSyncClientConfigurer mcpSyncClientConfigurer(ObjectProvider<McpSyncClientCustomizer> customizerProvider) {
        this.mcpSyncClientConfigurer = new McpSyncClientConfigurer(customizerProvider.orderedStream().toList());
        return mcpSyncClientConfigurer;
    }

    // ======== 从注册中心拉取服务并创建传输连接 ========
    public void getMcpServer(ObjectProvider<ObjectMapper> objectMapperProvider) {
        ObjectMapper objectMapper = objectMapperProvider.getIfAvailable(ObjectMapper::new);
        List<String> services = consulDiscoveryClient.getServices();

        // 清空旧数据,防止重复添加
        sseTransportMap.clear();

        for (String service : services) {
            if (blackList.contains(service)) continue;

            for (ServiceInstance instance : consulDiscoveryClient.getInstances(service)) {
                String baseUrl = instance.getUri().toString();
                List<String> tags = ((ConsulServiceInstance) instance).getTags();
                if (tags.isEmpty()) continue;

                String sseEndpoint = tags.get(0).split("=")[1] + "/sse";
                var transport = HttpClientSseClientTransport.builder(baseUrl)
                        .sseEndpoint(sseEndpoint)
                        .clientBuilder(HttpClient.newBuilder())
                        .objectMapper(objectMapper)
                        .build();

                String connectedName = connectedClientName(commonProperties.getName(), instance.getInstanceId());
                sseTransportMap.computeIfAbsent(connectedName, k -> new ArrayList<>())
                        .add(new NamedClientMcpTransport(instance.getInstanceId(), transport));
            }
        }

        log.info("已加载 MCP 服务配置: {}", sseTransportMap.keySet());
    }

    // ======== 创建所有 MCP 客户端 ========
    private List<McpSyncClient> mcpClients() {
        this.getMcpServer(objectMapperProvide);
        if (sseTransportMap.isEmpty()) {
            log.warn("未发现 MCP 服务");
            return List.of();
        }

        for (String key : sseTransportMap.keySet()) {
            this.connectClient(key);
        }

        // 重新创建 ChatClient
        this.clientValid = true;

        log.info("已创建 MCP 客户端: {}", mcpSyncClients);
        return new ArrayList<>(mcpSyncClients);
    }

    // ======== 每秒检查 MCP 客户端健康状态 ========
    @Scheduled(cron = "* * * * * ?")
    public void checkMcpClient() {
        this.pingAndCheckMcpClient();
    }

    private synchronized void pingAndCheckMcpClient() {
        log.debug("检查 MCP 客户端状态...");

        if (sseTransportMap.isEmpty() || mcpSyncClientMap.isEmpty()) {
            this.mcpClients();
            return;
        }

        List<String> rebuildKeys = new ArrayList<>();

        for (var entry : mcpSyncClientMap.entrySet()) {
            List<McpSyncClient> clients = entry.getValue();
            List<McpSyncClient> offlineClients = new ArrayList<>();

            for (McpSyncClient client : clients) {
                try {
                    client.ping();
                } catch (Exception e) {
                    log.warn("客户端异常,准备重连: {}", entry.getKey());
                    offlineClients.add(client);
                }
            }

            clients.removeAll(offlineClients);
            offlineClients.forEach(mcpSyncClients::remove);

            if (clients.isEmpty()) {
                mcpSyncClientMap.remove(entry.getKey());
                rebuildKeys.add(entry.getKey());
            }
        }

        boolean rebuilt = false;
        for (String name : rebuildKeys) {
            if (this.connectClient(name)) {
                rebuilt = true;
            }
        }

        if (rebuilt) {
            // 重新创建 ChatClient
            this.clientValid = true;
        }
    }

    // ======== 创建指定连接名称的 MCP 客户端 ========
    private boolean connectClient(String key) {
        List<NamedClientMcpTransport> transports = sseTransportMap.get(key);
        if (transports == null) return false;

        boolean connected = false;

        for (NamedClientMcpTransport namedTransport : transports) {
            String clientName = connectedClientName(commonProperties.getName(), namedTransport.name());
            McpSchema.Implementation clientInfo = new McpSchema.Implementation(clientName, commonProperties.getVersion());

            McpClient.SyncSpec spec = McpClient.sync(namedTransport.transport())
                    .clientInfo(clientInfo)
                    .requestTimeout(commonProperties.getRequestTimeout());

            spec = mcpSyncClientConfigurer.configure(namedTransport.name(), spec);
            McpSyncClient client = spec.build();

            try {
                if (commonProperties.isInitialized()) {
                    client.initialize();
                }

                if (mcpSyncClients.add(client)) {
                    mcpSyncClientMap.computeIfAbsent(clientName, k -> new ArrayList<>()).add(client);
                    log.info("连接 MCP 成功: {}", clientName);
                    connected = true;
                }
            } catch (Exception e) {
                log.warn("连接 MCP 失败: {}", clientName);
            }
        }

        return connected;
    }

    private String connectedClientName(String clientName, String serverConnectionName) {
        return clientName + " - " + serverConnectionName;
    }

    // ======== MCP 客户端提供的工具回调集合 ========
    public List<ToolCallback> getToolCallback() {
        List<ToolCallback> allCallbacks = new ArrayList<>();
        for (McpSyncClient client : mcpSyncClients) {
            allCallbacks.addAll(List.of(new SyncMcpToolCallbackProvider(client).getToolCallbacks()));
        }
        return allCallbacks;
    }
}

McpClientConfig 主要负责从注册中心获取服务注册成Mcp Server 以及断线重连


import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.ollama.OllamaChatModel;
import org.springframework.ai.tool.ToolCallback;
import org.springframework.context.annotation.Configuration;

import java.util.List;

@Slf4j
@RequiredArgsConstructor
@Configuration
public class ChatClientConfig {

    private ChatClient chatClient;

    private final OllamaChatModel ollamaChatModel;

    private final McpClientConfig mcpClientConfig;

    // ======== ChatClient 构建与刷新 ========
    private synchronized void createChatClient() {
        // 仅在Mcp Server 重连之后需要重新创建ChatClient
        // 重新创建之后标记为 false,防止重复重构,
        mcpClientConfig.setClientValid(false);
        log.debug("重新创建 Chat Client");
        List<ToolCallback> toolCallbacks = mcpClientConfig.getToolCallback();
        toolCallbacks.forEach(cb -> log.info("加载 ToolCallback: {}", cb));
        this.chatClient = ChatClient.builder(ollamaChatModel)
                .defaultToolCallbacks(toolCallbacks)
                .build();
    }

    public synchronized ChatClient getChatClient() {
        if (this.chatClient == null || mcpClientConfig.isClientValid()) {
            this.createChatClient();
        }
        return this.chatClient;
    }

    public synchronized void refreshChatClient() {
        this.createChatClient();
    }
}

ChatClientConfig 主要负责提供ChatClient实例,方便其他地方调用


@Slf4j
@RestController
@RequestMapping("/api/chat")
@RequiredArgsConstructor
public class ChatController {

    private final ChatClientConfig chatClientConfig;

    /**
     * 处理聊天请求,使用AI和MCP工具进行响应
     *
     * @return 包含AI回复的响应
     */
    @GetMapping
    public Flux<String> chat(@RequestParam String message) {
        return chatClientConfig.getChatClient().prompt(message)
                .advisors(new SimpleLoggerAdvisor())
                .stream()
                .content();
    }

    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> stream(@RequestParam String message) {
        Prompt prompt = new Prompt(new UserMessage(message));
        return chatClientConfig.getChatClient().prompt(prompt)
                .advisors(new SimpleLoggerAdvisor())
                .stream()
                .content();
    }
}

ChatController 则是使用方