A quick preface: yes, I did read the note about this missing feature in the docs.
Currently, the intermediate messages exchanged with a large-language model when performing tool calls are not stored in the memory. This is a limitation of the current implementation and will be addressed in future releases. If you need to store these messages, refer to the instructions for the User Controlled Tool Execution.
Source: https://docs.spring.io/spring-ai/reference/api/tools.html#_user_controlled_tool_execution
Questions:
1. Is there an ETA for which release will address this limitation?
2. I looked at the example in the docs that shows how to handle this manually using the blocking .call method (translated to Kotlin below for reference). Unfortunately, I want to use .stream, and I can't figure out how to make that work without essentially re-implementing the whole existing .stream logic myself.
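For reference, this is my reading of the docs' .call-based loop, translated to Kotlin and adapted to my retrieval tools. I haven't run it in this exact form; it assumes question, chatModel, toolCallingManager, and retrievalService are in scope:

import org.springframework.ai.chat.prompt.Prompt
import org.springframework.ai.model.tool.ToolCallingChatOptions
import org.springframework.ai.support.ToolCallbacks

// User-controlled tool execution: disable the framework's internal tool
// execution, then keep calling the model until no tool calls remain.
val retrievalToolCallbacks = ToolCallbacks.from(RetrievalTools(retrievalService))
val manualChatOptions = ToolCallingChatOptions.builder()
    .toolCallbacks(*retrievalToolCallbacks)
    .internalToolExecutionEnabled(false)
    .build()

var prompt = Prompt(question, manualChatOptions)
var chatResponse = chatModel.call(prompt)

while (chatResponse.hasToolCalls()) {
    val toolExecutionResult = toolCallingManager.executeToolCalls(prompt, chatResponse)
    // conversationHistory() contains the assistant tool-call message and the
    // tool responses -- exactly the messages that are missing from ChatMemory.
    prompt = Prompt(toolExecutionResult.conversationHistory(), manualChatOptions)
    chatResponse = chatModel.call(prompt)
}

This works, but only in blocking form, which brings me to what I tried next.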
What I tried: The furthest I've come so far is the implementation below. It still does not add the final LLM response to the history; I think I'd need the same aggregator that the implementation at https://github.com/spring-projects/spring-ai/blob/main/models/spring-ai-azure-openai/src/main/java/org/springframework/ai/azure/openai/AzureOpenAiChatModel.java#L380 uses (see the sketch after the code). It also feels quite hacky, since I'm rewriting the history (clearing and re-adding it) between steps:
import org.springframework.ai.chat.messages.Message
import org.springframework.ai.chat.messages.SystemMessage
import org.springframework.ai.chat.messages.UserMessage
import org.springframework.ai.chat.prompt.Prompt
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers

// Note: the ChatResponse/ChatResponseType returned here are our own streaming
// DTOs, not org.springframework.ai.chat.model.ChatResponse.
fun chatWithDocuments(
    question: String,
    conversationId: String
): Flux<ChatResponse> {
    val messages = mutableListOf<Message>()
    val systemMessageWithContext = """
        $systemPrompt
    """.trimIndent()

    // Helper function for recursive streaming with tool execution
    fun internalStream(prompt: Prompt): Flux<ChatResponse> {
        return chatClient
            .prompt(prompt)
            // .advisors { it.param(ChatMemory.CONVERSATION_ID, conversationId) }
            .tools(RetrievalTools(retrievalService))
            .stream()
            .chatClientResponse()
            .flatMap { chatClientResponse ->
                logger.info("Received content on next: {}", chatClientResponse.chatResponse?.result?.output?.text)
                val hasToolCalls = chatClientResponse.chatResponse?.hasToolCalls() ?: false
                logger.info("Tool calls: {}", hasToolCalls)
                val usage = chatClientResponse.chatResponse?.metadata?.usage
                logger.info(
                    "Current usage: {} total, {} prompt, {} completion tokens",
                    usage?.totalTokens, usage?.promptTokens, usage?.completionTokens
                )
                if (hasToolCalls) {
                    // Tool execution is blocking, so it must run on boundedElastic.
                    return@flatMap Flux.defer {
                        val toolExecutionResult =
                            toolCallingManager.executeToolCalls(prompt, chatClientResponse.chatResponse!!)
                        // The hacky part: rewrite the entire memory so the assistant
                        // tool-call message and the tool responses are persisted.
                        chatMemory.clear(conversationId)
                        chatMemory.add(conversationId, toolExecutionResult.conversationHistory())
                        internalStream(Prompt(toolExecutionResult.conversationHistory(), chatOptions))
                    }.subscribeOn(Schedulers.boundedElastic())
                }
                val content = chatClientResponse.chatResponse?.result?.output?.text ?: ""
                Flux.just(ChatResponse(ChatResponseType.CONTENT, content))
            }
    }

    // Start the recursive streaming process; add the system prompt only on the
    // first turn of the conversation so it is not duplicated in memory.
    val memory = chatMemory.get(conversationId)
    if (memory.isEmpty()) {
        messages.add(SystemMessage(systemMessageWithContext))
    } else {
        messages.addAll(memory)
    }
    messages.add(UserMessage(question))
    logger.info("Sending chat request with {} messages", messages.size)

    val prompt = Prompt(messages, chatOptions)
    return internalStream(prompt)
}
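For the missing final assistant message, the best idea I have so far is to wrap the response Flux in the same MessageAggregator that the model implementations use internally, and persist the aggregated message once the stream completes. This is an untested sketch; withFinalMessagePersistence is my own hypothetical helper, and it assumes a raw Flux from .stream().chatResponse() with chatMemory in scope:

import org.springframework.ai.chat.messages.AssistantMessage
import org.springframework.ai.chat.model.MessageAggregator
import org.springframework.ai.chat.model.ChatResponse as AiChatResponse
import reactor.core.publisher.Flux

// Sketch: once the stream completes, MessageAggregator hands the re-assembled
// response to the callback, where the assistant reply can be added to memory.
fun withFinalMessagePersistence(
    conversationId: String,
    responses: Flux<AiChatResponse>
): Flux<AiChatResponse> =
    MessageAggregator().aggregate(responses) { aggregated ->
        val reply: AssistantMessage = aggregated.result.output
        chatMemory.add(conversationId, reply) // hypothetical, untested
    }

The non-tool-call branch of internalStream would then go through this wrapper before mapping chunks to my own DTO, so the complete answer lands in memory without clearing and re-adding anything.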
Question:
Do you have any suggestions on how to achieve this more cleanly with .stream until the framework itself supports including tool-call messages in the history?
Any pointers would be really appreciated.