1.0.0-M7 版本: client 采用sse 方式连接上 mcp server,可以正常运行;这个时候重启 mcp server, client 端就不会重连 mcp server,导致一直报错 reactor.core.Exceptions$ReactiveException: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 20000ms in 'source(MonoCreate)' (and no fallback has been configured) at reactor.core.Exceptions.propagate(Exceptions.java:410) at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:102)
怎么设置重连mcp server?
Comment From: Forest-Fairy
应该只能重新创建一个mcpclient对象 或者重新实现一个client可以重连的
Comment From: lyxfn
I also encountered this problem. Every time I restarted mcp-server, I needed to restart the client to recover. If I only restarted mcp-server, a 404 error would appear. I didn't see any reconnection configuration in the official documentation.
org.springframework.web.reactive.function.client.WebClientResponseException$NotFound: 404 Not Found from POST http://localhost:9080/mcp/message at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:324) ~[spring-webflux-6.1.14.jar:6.1.14] Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has been observed at the following site(s): *__checkpoint ⇢ 404 NOT_FOUND from POST http://localhost:9080/mcp/message [DefaultWebClient]
Comment From: SuperWuYou
这都不用等服务器重启,我连着高德的MCP,等个几分钟就断开了,然后他就不连了.... 怎么办???
Comment From: hymmyh
这都不用等服务器重启,我连着高德的MCP,等个几分钟就断开了,然后他就不连了.... 怎么办???
目前可以不用autoconfigure,自己在代码中实现 mcpClient 的初始化,用mcpClient.ping() 来判断是否正常,不正常重新实例化一个新的mcpClient。
Comment From: zhouwenjun-hub
这都不用等服务器重启,我连着高德的MCP,等个几分钟就断开了,然后他就不连了.... 怎么办???
目前可以不用autoconfigure,自己在代码中实现 mcpClient 的初始化,用mcpClient.ping() 来判断是否正常,不正常重新实例化一个新的mcpClient。
大佬,请问有示例代码么?我尝试这个方案还是有问题,重新实例化时,mcp server会报错: session is null
Comment From: SuperWuYou
@zhouwenjun-hub 去看看autoconfigure包是怎么写的你就会了。
Comment From: dyrnq
stare
Comment From: luojinggit
这都不用等服务器重启,我连着高德的MCP,等个几分钟就断开了,然后他就不连了.... 怎么办???
目前可以不用autoconfigure,自己在代码中实现 mcpClient 的初始化,用mcpClient.ping() 来判断是否正常,不正常重新实例化一个新的mcpClient。
麻烦您能不能贴一下关键代码,我也遇到这个问题,我重新调用mcpSyncClient.initialize(); 方法还是无法重连。
Comment From: 15185640400
这都不用等服务器重启,我连着高德的MCP,等个几分钟就断开了,然后他就不连了....怎么办???
目前可以不用autoconfigure,自己在代码中实现 mcpClient 的初始化,用mcpClient.ping() 来判断是否正常,不正常重新实例化一个新的mcpClient。
麻烦您能不能贴一下关键代码,我也遇到这个问题,我重新调用mcpSyncClient.initialize();方法还是无法重连。
您好,请问一下,您解决了吗
Comment From: hymmyh
@Configuration
@EnableConfigurationProperties
public class McpConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(McpConfig.class);
private McpSyncClient mcpClient;
private List
/**
* MCP客户端的URL地址,用于建立与MCP服务器的连接。
*/
@Value("${spring.ai.mcp.client.sse.connections.server1.url}")
private String mcpServerUrl;
/**
* 创建或重建MCP客户端。该方法会关闭旧客户端实例(如果存在),并初始化新的MCP客户端及关联的工具回调。
* <p>
* 同步方法确保客户端创建过程的线程安全。
*/
private synchronized void createMcpClient() {
// 关闭旧的MCP客户端(如果存在)
if (mcpClient != null) {
try {
mcpClient.close();
} catch (final Exception e) {
LOGGER.error("关闭旧客户端失败", e);
}
}
// 创建并配置新的MCP客户端传输层
final WebClient.Builder builder = WebClient.builder().baseUrl(mcpServerUrl);
final WebFluxSseClientTransport transport = new WebFluxSseClientTransport(builder);
final McpSyncClient newClient = McpClient.sync(transport).build();
// 初始化新客户端并记录结果
final McpSchema.InitializeResult init = newClient.initialize();
LOGGER.info("<---------------->MCP Initialized: {}", init);
// 更新MCP客户端引用和工具回调
mcpClient = newClient;
toolCallbacks = List.of(new SyncMcpToolCallbackProvider(mcpClient).getToolCallbacks());
}
/**
* 获取工具回调列表。该方法会确保MCP客户端处于有效连接状态,若连接异常则自动重建客户端。
* @return 已配置的工具回调列表
*/
public List<ToolCallback> getToolCallback() {
if (mcpClient == null) {
// 如果MCP客户端未初始化,则创建并直接返回回调列表
createMcpClient();
return toolCallbacks;
}
try {
// 检查与服务端的连接状态
mcpClient.ping();
} catch (final Exception e) {
// 捕获连接异常,记录错误日志并重建客户端
LOGGER.error("<---------------->MCP ping error: {}", e.getLocalizedMessage());
createMcpClient();
}
return toolCallbacks;
}
/**
* 在Bean初始化时创建MCP客户端。
*/
@PostConstruct
public void init() {
createMcpClient(); // 在Bean初始化时强制执行
}
}
Comment From: luojinggit
这个方法可行,在这个代码基础上,我把自动配置chatClient改成自己配置,并且设定一个定时任务,定时ping mcp server,断开就尝试重新初始化chatClient,主要是这个chatClient 的tools每次都是增量添加,导致我断开一次后再添加会报错 tool 重复,所以只能每次重新初始化chatClient
Comment From: 15185640400
private static final Logger LOGGER = LoggerFactory.getLogger(McpConfig.class); private McpSyncClient mcpClient; private List toolCallbacks;
666啊兄弟,收下我的膝盖
Comment From: 15185640400
这个方法可行,在这个代码基础上,我把自动配置chatClient改成自己配置,并且设定一个定时任务,定时ping mcp server,断开就尝试重新初始化chatClient,主要是这个chatClient的tools每次都是增量添加,导致我断开一次后再添加会报错 tool 重复,所以只能每次重新初始化chatClient
可以附一下代码吗兄弟?不,义父0.0
Comment From: luojinggit
这个方法可行,在这个代码基础上,我把自动配置chatClient改成自己配置,并且设定一个定时任务,定时ping mcp server,断开就尝试重新初始化chatClient,主要是这个chatClient的tools每次都是增量添加,导致我断开一次后再添加会报错 tool 重复,所以只能每次重新初始化chatClient
可以附一下代码吗兄弟?不,义父0.0
配置文件关闭client自动配置: ai: mcp: client: enabled: false
@Autowired
private OpenAiChatModel chatModel;
在上面的那位兄弟的基础上添加了这个方法,定时任务应该不用贴了吧
Comment From: 15185640400
这个方法可行,在这个代码基础上,我把自动配置chatClient改成自己配置,并且设定一个定时任务,定时ping mcp server,断开就尝试重新初始化chatClient,主要是这个chatClient的tools每次都是增量添加,导致我断开一次后再添加会报错 tool 重复,所以只能每次重新初始化chatClient
可以附一下代码吗兄弟?不,义父0.0
配置文件关闭client自动配置: ai: mcp: client: enabled: false
@Autowired private OpenAiChatModel chatModel;
在上面的那位兄弟的基础上添加了这个方法,定时任务应该不用贴了吧
谢谢大佬
Comment From: lyxfn
这个方法可行,在这个代码基础上,我把自动配置chatClient改成自己配置,并且设定一个定时任务,定时ping mcp server,断开就尝试重新初始化chatClient,主要是这个chatClient的tools每次都是增量添加,导致我断开一次后再添加会报错 tool 重复,所以只能每次重新初始化chatClient
可以附一下代码吗兄弟?不,义父0.0
配置文件关闭client自动配置: ai: mcp: client: enabled: false
@Autowired private OpenAiChatModel chatModel;
在上面的那位兄弟的基础上添加了这个方法,定时任务应该不用贴了吧
用InMemoryChatMemory的话,重新初始化chatClient会丢失现有的用户会话状态跟记录。可以使用alibaba的RedisChatMemory实现,正好我的VectorStore也是用的Redis。https://github.com/alibaba/spring-ai-alibaba/blob/main/community/memories/spring-ai-alibaba-redis-memory/src/main/java/com/alibaba/cloud/ai/memory/redis/RedisChatMemory.java
Comment From: luojinggit
这个方法可行,在这个代码基础上,我把自动配置chatClient改成自己配置,并且设定一个定时任务,定时ping mcp server,断开就尝试重新初始化chatClient,主要是这个chatClient的tools每次都是增量添加,导致我断开一次后再添加会报错 tool 重复,所以只能每次重新初始化chatClient
可以附一下代码吗兄弟?不,义父0.0
配置文件关闭client自动配置: ai: mcp: client: enabled: false
@Autowired private OpenAiChatModel chatModel;
在上面的那位兄弟的基础上添加了这个方法,定时任务应该不用贴了吧
用InMemoryChatMemory的话,重新初始化chatClient会丢失现有的用户会话状态跟记录。可以使用alibaba的RedisChatMemory实现,正好我的VectorStore也是用的Redis。https://github.com/alibaba/spring-ai-alibaba/blob/main/community/memories/spring-ai-alibaba-redis-memory/src/main/java/com/alibaba/cloud/ai/memory/redis/RedisChatMemory.java
666666,我才发现
Comment From: lyxfn
贴一下代码,
@Slf4j
@Service
@EnableScheduling
@EnableConfigurationProperties({McpSseClientProperties.class})
public class CustomerSupportAssistant {
private final McpSseClientProperties sseProperties;
public CustomerSupportAssistant(McpSseClientProperties sseProperties) {
this.sseProperties = sseProperties;
}
@Resource
ChatModel chatModel;
@Resource
VectorStore vectorStore;
@Resource
ChatMemory chatMemory;
@Resource
ConversationAdvisor conversationAdvisor;
private ChatClient chatClient;
//more than one mcp-client
private static final List<McpSyncClient> mcpSyncClients = new CopyOnWriteArrayList<>();
@Scheduled(cron = "0 * * * * ?")
public void ping() {
if (mcpSyncClients.isEmpty()) {
initChatClient();
} else {
Iterator<McpSyncClient> iterator = mcpSyncClients.iterator();
while (iterator.hasNext()) {
McpSyncClient mcpSyncClient = iterator.next();
try {
mcpSyncClient.ping();
log.info("ping mcp client success:{}", mcpSyncClient.getClientInfo().name());
} catch (Exception e) {
log.error("ping mcp client error:{}", mcpSyncClient.getClientInfo().name());
//close & remove old mcp-client
mcpSyncClient.close();
iterator.remove();
//re-init
initChatClient();
}
}
}
}
@PostConstruct
private void initChatClient() {
this.chatClient = ChatClient.builder(chatModel)
.defaultSystem(new ClassPathResource(SYSTEM_PROMPT_TEMPLATE_PATH))
.defaultAdvisors(
new MessageChatMemoryAdvisor(chatMemory),
new QuestionAnswerAdvisor(vectorStore, SearchRequest.builder().topK(3).build()),
new ReReadingAdvisor(),
new LoggingAdvisor(),
conversationAdvisor
)
.defaultTools(
initToolCallbacks()
)
.build();
}
private synchronized List<ToolCallback> initToolCallbacks() {
List<ToolCallback> toolCallbacks = new ArrayList<>();
this.sseProperties.getConnections().forEach((key, value) -> {
final WebClient.Builder builder = WebClient.builder().baseUrl(value.url());
final WebFluxSseClientTransport transport = new WebFluxSseClientTransport(builder);
final McpSyncClient mcpSyncClient = McpClient.sync(transport)
.clientInfo(new McpSchema.Implementation(
"mcp-client-".concat(value.url()),
"1.0.0"))
.build();
try {
McpSchema.InitializeResult initialize = mcpSyncClient.initialize();
log.info("initialize mcp client: {} mcp server :{}", mcpSyncClient.getClientInfo().name(), initialize.serverInfo().name());
mcpSyncClients.add(mcpSyncClient);
toolCallbacks.addAll(List.of(new SyncMcpToolCallbackProvider(mcpSyncClient).getToolCallbacks()));
} catch (Exception e) {
log.error("Failed to initialize mcp client for server: {}", value.url(), e);
}
});
return toolCallbacks;
}
//chat
public Flux<String> chat(String content) {
return this.chatClient.prompt()
.advisors()
.user(userMessageContent)
.toolContext()
.stream()
.content();
}
}
Comment From: zhihaoHappyeveryday
这个方法可行,在这个代码基础上,我把自动配置chatClient改成自己配置,并且设定一个定时任务,定时ping mcp server,断开就尝试重新初始化chatClient,主要是这个chatClient 的tools每次都是增量添加,导致我断开一次后再添加会报错 tool 重复,所以只能每次重新初始化chatClient
ChatClient.Builder builder = SpringUtil.getBean(ChatClient.Builder.class); ChatClient.Builder clone = builder.clone(); chatClient = clone .defaultTools(mcpConfig.getToolCallback().toArray(new ToolCallbackProvider[mcpConfig.getToolCallback().size()])) .build();
可以解决重复tools
Comment From: lyxfn
https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http MCP 官方已经有方案了,使用Streamable HTTP替代现在的SSE +HTTP传输模式,可以解决断线重连和恢复,SDK好像还在开发中,估计Spring AI在下个release会跟进这个。
Comment From: zhoujun134
参考各位大佬的使用,改写如下: 对于 同步调用的来说,可以尝试定义一个 McpConfig,具体代码如下:
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.springframework.ai.mcp.SyncMcpToolCallbackProvider;
import org.springframework.ai.mcp.client.autoconfigure.properties.McpSseClientProperties;
import org.springframework.ai.mcp.client.autoconfigure.properties.McpSseClientProperties.SseParameters;
import org.springframework.ai.tool.ToolCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.zj.study.ai.utils.gson.GsonUtils;
import cn.hutool.core.map.MapUtil;
import io.modelcontextprotocol.client.McpClient;
import io.modelcontextprotocol.client.McpSyncClient;
import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport;
import io.modelcontextprotocol.spec.McpSchema;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
/**
* @author zhoujun134
* Created on 2025-05-07
*/
@EnableScheduling
@Configuration
@EnableConfigurationProperties
@Slf4j
public class McpConfig {
/**
* MCP客户端的配置属性,用于存储MCP客户端的相关配置信息。
* key : 服务名称-版本号-服务器地址
*/
private final Map<String, McpSyncClient> mcpSyncClientMap = new HashMap<>();
private final Map<String, List<ToolCallback>> toolCallbacksMap = new HashMap<>();
@Autowired
private McpSseClientProperties mcpSseClientProperties;
/**
* 创建或重建MCP客户端。该方法会关闭旧客户端实例(如果存在),并初始化新的MCP客户端及关联的工具回调。
* <p>
* 同步方法确保客户端创建过程的线程安全。
*/
private synchronized void pingAndCheckMcpClient() {
if (MapUtil.isNotEmpty(this.mcpSyncClientMap)) {
final List<String> disableServers = new ArrayList<>();
this.mcpSyncClientMap.forEach((serverUrl, client) -> {
try {
// 检查与服务端的连接状态
client.ping();
} catch (final Exception e) {
// 捕获连接异常,记录错误日志并重建客户端
log.error("<---------------->MCP ping error: {}", e.getLocalizedMessage());
boolean createResult = this.createClientByServerUrl(serverUrl);
if (!createResult) {
disableServers.add(serverUrl);
}
}
});
if (CollectionUtils.isNotEmpty(disableServers)) {
disableServers.forEach(serverUrl -> {
if (MapUtil.isNotEmpty(this.mcpSyncClientMap)) {
this.mcpSyncClientMap.remove(serverUrl);
}
if (MapUtil.isNotEmpty(this.toolCallbacksMap)) {
this.toolCallbacksMap.remove(serverUrl);
}
});
log.info("<---------------->MCP server 状态检测完成, 当前存活{}个MCP客户端, urls={},"
+ " 已失效 {} 个 MCP 客户端,失效 urls={}",
this.mcpSyncClientMap.size(), GsonUtils.toJSONString(this.mcpSyncClientMap.keySet()),
disableServers.size(), GsonUtils.toJSONString(disableServers));
}
// 找出 mcpSyncClientMap 中不存在,配置文件中存在的连接信息,并再次重建 MCP 客户端
Map<String, SseParameters> connections = this.mcpSseClientProperties.getConnections();
if (MapUtil.isEmpty(connections)) {
log.warn("<---------------->MCP 没有配置连接信息,请检查配置文件");
return;
}
connections.forEach((key, sseParameter) -> {
String serverUrl = sseParameter.url();
if (this.mcpSyncClientMap.containsKey(serverUrl) || disableServers.contains(serverUrl)) {
return;
}
// 不在 mcpSyncClientMap 和 disableServers 中,则重建 MCP 客户端
this.createClientByServerUrl(serverUrl);
});
} else {
this.initMcpClient();
}
}
private boolean createClientByServerUrl(String serverUrl) {
if (StringUtils.isBlank(serverUrl)) {
log.warn("serverUrl is blank, please check the configuration file.");
return false;
}
try {
final HttpClientSseClientTransport transport = HttpClientSseClientTransport.builder(serverUrl).build();
final McpSyncClient newClient = McpClient.sync(transport).build();
// 初始化新客户端并记录结果
final McpSchema.InitializeResult init = newClient.initialize();
log.info("<----------------> sync MCP Initialized 完成: {}", init.serverInfo());
final List<ToolCallback> curToolCallbacks =
List.of(new SyncMcpToolCallbackProvider(newClient).getToolCallbacks());
this.toolCallbacksMap.put(serverUrl, curToolCallbacks);
this.mcpSyncClientMap.put(serverUrl, newClient);
} catch (Exception ex) {
log.error("<----------------> createClientByServerUrl 链接 MCP 客户端失败: {}, serverUrl={}",
ex.getLocalizedMessage(), serverUrl);
return false;
}
return true;
}
private synchronized void initMcpClient() {
Map<String, SseParameters> connectionsMap = mcpSseClientProperties.getConnections();
if (MapUtil.isEmpty(connectionsMap)) {
log.warn("initMcpClient <---------------->MCP 没有配置连接信息,请检查配置文件");
return;
}
connectionsMap.forEach((key, sseParameter) -> {
String serverUrl = sseParameter.url();
this.createClientByServerUrl(serverUrl);
});
}
/**
* 获取工具回调列表。该方法会确保MCP客户端处于有效连接状态,若连接异常则自动重建客户端。
*
* @return 已配置的工具回调列表
*/
public List<ToolCallback> getToolCallback() {
if (MapUtil.isNotEmpty(this.toolCallbacksMap)) {
return this.toolCallbacksMap.values().stream()
.flatMap(List::stream)
.collect(Collectors.toList());
}
return Collections.emptyList();
}
/**
* 每秒检查一次MCP客户端的状态,并重建异常的客户端。
*/
@Scheduled(cron = "* * * * * ?")
public void checkMcpClient() {
this.pingAndCheckMcpClient();
}
/**
* 在Bean初始化时创建MCP客户端。
*/
@PostConstruct
public void init() {
log.info("<---------------->MCP Initializing...");
// 在Bean初始化时强制执行
initMcpClient();
log.info("<---------------->MCP Initialized 完成, 共初始化{}个MCP客户端, urls={}",
this.mcpSyncClientMap.size(), GsonUtils.toJSONString(this.mcpSyncClientMap.keySet()));
}
}
然后在使用的地方通过以下的方式使用:
@Autowired
private McpConfig mcpConfig;
ChatClient chatClient = ChatClient.builder(zhiPuAiChatModel)
.defaultTools(mcpConfig.getToolCallback())
.build();
Comment From: zhouwenjun-hub
@hymmyh @zhoujun134 @luojinggit 各位大佬,使用上面的方案遇到:
i.m.spec.McpClientSession : Unexpected response for unkown id cd772c95-561
怎么解决呢?多个client同事启动会出现这个问题,谢谢
Comment From: fwq418233640
@zhoujun134 非常感谢,我参考了你的代码,然后重新实现了一下,因为我这边是基于Consul的微服务,所以我这边添加了基于注册中心来动态注册Mcp Server 下面代码供大家参考一下,有问题欢迎指出
package com.yuan.chat.conf;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.client.McpClient;
import io.modelcontextprotocol.client.McpSyncClient;
import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport;
import io.modelcontextprotocol.spec.McpSchema;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.mcp.SyncMcpToolCallbackProvider;
import org.springframework.ai.mcp.client.autoconfigure.NamedClientMcpTransport;
import org.springframework.ai.mcp.client.autoconfigure.configurer.McpSyncClientConfigurer;
import org.springframework.ai.mcp.client.autoconfigure.properties.McpClientCommonProperties;
import org.springframework.ai.mcp.customizer.McpSyncClientCustomizer;
import org.springframework.ai.tool.ToolCallback;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.consul.discovery.ConsulDiscoveryClient;
import org.springframework.cloud.consul.discovery.ConsulServiceInstance;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;
import java.net.http.HttpClient;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@Slf4j
@Configuration
@RequiredArgsConstructor
public class McpClientConfig {
private final ConsulDiscoveryClient consulDiscoveryClient;
private final ObjectProvider<ObjectMapper> objectMapperProvide;
private final McpClientCommonProperties commonProperties;
// 黑名单,用来剔除一些不需要注册的服务
private final Set<String> blackList = Set.of("consul");
private final ConcurrentMap<String, List<NamedClientMcpTransport>> sseTransportMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, List<McpSyncClient>> mcpSyncClientMap = new ConcurrentHashMap<>();
private final Set<McpSyncClient> mcpSyncClients = ConcurrentHashMap.newKeySet();
private McpSyncClientConfigurer mcpSyncClientConfigurer;
@Getter
@Setter
private boolean clientValid;
// ======== 注册自动配置器 ========
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = McpClientCommonProperties.CONFIG_PREFIX, name = "type", havingValue = "SYNC", matchIfMissing = true)
McpSyncClientConfigurer mcpSyncClientConfigurer(ObjectProvider<McpSyncClientCustomizer> customizerProvider) {
this.mcpSyncClientConfigurer = new McpSyncClientConfigurer(customizerProvider.orderedStream().toList());
return mcpSyncClientConfigurer;
}
// ======== 从注册中心拉取服务并创建传输连接 ========
public void getMcpServer(ObjectProvider<ObjectMapper> objectMapperProvider) {
ObjectMapper objectMapper = objectMapperProvider.getIfAvailable(ObjectMapper::new);
List<String> services = consulDiscoveryClient.getServices();
// 清空旧数据,防止重复添加
sseTransportMap.clear();
for (String service : services) {
if (blackList.contains(service)) continue;
for (ServiceInstance instance : consulDiscoveryClient.getInstances(service)) {
String baseUrl = instance.getUri().toString();
List<String> tags = ((ConsulServiceInstance) instance).getTags();
if (tags.isEmpty()) continue;
String sseEndpoint = tags.get(0).split("=")[1] + "/sse";
var transport = HttpClientSseClientTransport.builder(baseUrl)
.sseEndpoint(sseEndpoint)
.clientBuilder(HttpClient.newBuilder())
.objectMapper(objectMapper)
.build();
String connectedName = connectedClientName(commonProperties.getName(), instance.getInstanceId());
sseTransportMap.computeIfAbsent(connectedName, k -> new ArrayList<>())
.add(new NamedClientMcpTransport(instance.getInstanceId(), transport));
}
}
log.info("已加载 MCP 服务配置: {}", sseTransportMap.keySet());
}
// ======== 创建所有 MCP 客户端 ========
private List<McpSyncClient> mcpClients() {
this.getMcpServer(objectMapperProvide);
if (sseTransportMap.isEmpty()) {
log.warn("未发现 MCP 服务");
return List.of();
}
for (String key : sseTransportMap.keySet()) {
this.connectClient(key);
}
// 重新创建 ChatClient
this.clientValid = true;
log.info("已创建 MCP 客户端: {}", mcpSyncClients);
return new ArrayList<>(mcpSyncClients);
}
// ======== 每秒检查 MCP 客户端健康状态 ========
@Scheduled(cron = "* * * * * ?")
public void checkMcpClient() {
this.pingAndCheckMcpClient();
}
private synchronized void pingAndCheckMcpClient() {
log.debug("检查 MCP 客户端状态...");
if (sseTransportMap.isEmpty() || mcpSyncClientMap.isEmpty()) {
this.mcpClients();
return;
}
List<String> rebuildKeys = new ArrayList<>();
for (var entry : mcpSyncClientMap.entrySet()) {
List<McpSyncClient> clients = entry.getValue();
List<McpSyncClient> offlineClients = new ArrayList<>();
for (McpSyncClient client : clients) {
try {
client.ping();
} catch (Exception e) {
log.warn("客户端异常,准备重连: {}", entry.getKey());
offlineClients.add(client);
}
}
clients.removeAll(offlineClients);
offlineClients.forEach(mcpSyncClients::remove);
if (clients.isEmpty()) {
mcpSyncClientMap.remove(entry.getKey());
rebuildKeys.add(entry.getKey());
}
}
boolean rebuilt = false;
for (String name : rebuildKeys) {
if (this.connectClient(name)) {
rebuilt = true;
}
}
if (rebuilt) {
// 重新创建 ChatClient
this.clientValid = true;
}
}
// ======== 创建指定连接名称的 MCP 客户端 ========
private boolean connectClient(String key) {
List<NamedClientMcpTransport> transports = sseTransportMap.get(key);
if (transports == null) return false;
boolean connected = false;
for (NamedClientMcpTransport namedTransport : transports) {
String clientName = connectedClientName(commonProperties.getName(), namedTransport.name());
McpSchema.Implementation clientInfo = new McpSchema.Implementation(clientName, commonProperties.getVersion());
McpClient.SyncSpec spec = McpClient.sync(namedTransport.transport())
.clientInfo(clientInfo)
.requestTimeout(commonProperties.getRequestTimeout());
spec = mcpSyncClientConfigurer.configure(namedTransport.name(), spec);
McpSyncClient client = spec.build();
try {
if (commonProperties.isInitialized()) {
client.initialize();
}
if (mcpSyncClients.add(client)) {
mcpSyncClientMap.computeIfAbsent(clientName, k -> new ArrayList<>()).add(client);
log.info("连接 MCP 成功: {}", clientName);
connected = true;
}
} catch (Exception e) {
log.warn("连接 MCP 失败: {}", clientName);
}
}
return connected;
}
private String connectedClientName(String clientName, String serverConnectionName) {
return clientName + " - " + serverConnectionName;
}
// ======== MCP 客户端提供的工具回调集合 ========
public List<ToolCallback> getToolCallback() {
List<ToolCallback> allCallbacks = new ArrayList<>();
for (McpSyncClient client : mcpSyncClients) {
allCallbacks.addAll(List.of(new SyncMcpToolCallbackProvider(client).getToolCallbacks()));
}
return allCallbacks;
}
}
McpClientConfig 主要负责从注册中心获取服务注册成Mcp Server 以及断线重连
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.ollama.OllamaChatModel;
import org.springframework.ai.tool.ToolCallback;
import org.springframework.context.annotation.Configuration;
import java.util.List;
@Slf4j
@RequiredArgsConstructor
@Configuration
public class ChatClientConfig {
private ChatClient chatClient;
private final OllamaChatModel ollamaChatModel;
private final McpClientConfig mcpClientConfig;
// ======== ChatClient 构建与刷新 ========
private synchronized void createChatClient() {
// 仅在Mcp Server 重连之后需要重新创建ChatClient
// 重新创建之后标记为 false,防止重复重构,
mcpClientConfig.setClientValid(false);
log.debug("重新创建 Chat Client");
List<ToolCallback> toolCallbacks = mcpClientConfig.getToolCallback();
toolCallbacks.forEach(cb -> log.info("加载 ToolCallback: {}", cb));
this.chatClient = ChatClient.builder(ollamaChatModel)
.defaultToolCallbacks(toolCallbacks)
.build();
}
public synchronized ChatClient getChatClient() {
if (this.chatClient == null || mcpClientConfig.isClientValid()) {
this.createChatClient();
}
return this.chatClient;
}
public synchronized void refreshChatClient() {
this.createChatClient();
}
}
ChatClientConfig 主要负责提供ChatClient实例,方便其他地方调用
@Slf4j
@RestController
@RequestMapping("/api/chat")
@RequiredArgsConstructor
public class ChatController {
private final ChatClientConfig chatClientConfig;
/**
* 处理聊天请求,使用AI和MCP工具进行响应
*
* @return 包含AI回复的响应
*/
@GetMapping
public Flux<String> chat(@RequestParam String message) {
return chatClientConfig.getChatClient().prompt(message)
.advisors(new SimpleLoggerAdvisor())
.stream()
.content();
}
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> stream(@RequestParam String message) {
Prompt prompt = new Prompt(new UserMessage(message));
return chatClientConfig.getChatClient().prompt(prompt)
.advisors(new SimpleLoggerAdvisor())
.stream()
.content();
}
}
ChatController 则是使用方