6月5
Flux<Object> result = aOneAgentOpenApiService.chatRun(aOneAgentRunParam)
.replay() // 允许多个订阅者同时消费
.autoConnect(2); // 至少 2 个订阅者时开始(前台SSE + 后台元数据提取)
// 后台异步处理:提取元数据 + 保存历史记录
String userId = chatParam.getUserId();
String query = aOneAgentRunParam.getQuery();
CompletableFuture.runAsync(() -> {
try {
// 提取 AGENT_RUN_STARTED 事件
AOneAgentRunResponseVO resultVO = AOneSkillUtil.getAnswerFluxByEventType(
result, List.of(AOneAgentMessageEventTypeEnum.AGENT_RUN_STARTED));
ChatHistoryParam chatHistoryParam = ChatHistoryParam.builder()
.chatId(resultVO.getMessageGroupId())
.role(MessageRole.ASSISTANT.getValue())
.messageId(resultVO.getMessageId())
.sectionId(resultVO.getMessageGroupId())
.conversationId(resultVO.getConversationId())
.userId(userId)
.startTime(DateUtil.date(createdAt))
.platform(this.getPlatformType().getCode())
.input(query)
.latencyFirstResp(DateUtil.current() - createdAt)
.build();
// 提取 AGENT_ROUND_COMPLETED 元数据
resultVO = AOneSkillUtil.getAnswerFluxByEventType(
result, List.of(AOneAgentMessageEventTypeEnum.AGENT_ROUND_COMPLETED));
JSONObject metadata = resultVO.getMetadata();
if (ObjUtil.isNotNull(metadata)) {
chatHistoryParam.setInputTokens(Convert.toLong(metadata.getLong(INPUT_TOKEN)));
chatHistoryParam.setOutputTokens(Convert.toLong(metadata.getLong(OUTPUT_TOKEN)));
chatHistoryParam.setTotalTokens(Convert.toLong(metadata.getLong(TOTAL_TOKEN)));
}
// 提取完整回答
resultVO = AOneSkillUtil.getAnswerCompleteFlux(result);
if (ObjUtil.isNotNull(resultVO)) {
String answer = resultVO.getAnswer();
if (StrUtil.isBlank(answer) && StrUtil.isNotBlank(resultVO.getErrorCode())) {
answer = resultVO.getErrorMsg();
log.error("AOne领域智能体平台错误:{},{}", resultVO.getErrorCode(), answer);
}
chatHistoryParam.setOutput(answer);
chatHistoryParam.setRequestIp(chatParam.getRequestIp());
chatHistoryParam.setIsFallback(ZERO);
chatHistoryParam.setLatency(DateUtil.current() - createdAt);
chatHistoryEventPublisher.publishEvent(new ChatHistoryEvent(chatHistoryParam));
}
} catch (Exception e) {
log.error("保存对话历史记录失败", e);
}
});
// 立即返回流给前端,不等待元数据提取完成
return result;
// NOTE(review): this fragment begins mid-expression — `.replay()` has no receiver here and
// it directly follows a `return result;` statement. It looks like a repeated paste of the
// preceding snippet minus its first line; confirm and remove if so.
.replay() // allow multiple subscribers to consume the stream
.autoConnect(2); // start once at least 2 subscribers are present (front-end SSE + background metadata extraction)
// Background async processing: extract metadata + save the history record
String userId = chatParam.getUserId();
String query = aOneAgentRunParam.getQuery();
CompletableFuture.runAsync(() -> {
try {
// Extract the AGENT_RUN_STARTED event
AOneAgentRunResponseVO resultVO = AOneSkillUtil.getAnswerFluxByEventType(
result, List.of(AOneAgentMessageEventTypeEnum.AGENT_RUN_STARTED));
ChatHistoryParam chatHistoryParam = ChatHistoryParam.builder()
.chatId(resultVO.getMessageGroupId())
.role(MessageRole.ASSISTANT.getValue())
.messageId(resultVO.getMessageId())
.sectionId(resultVO.getMessageGroupId())
.conversationId(resultVO.getConversationId())
.userId(userId)
.startTime(DateUtil.date(createdAt))
.platform(this.getPlatformType().getCode())
.input(query)
.latencyFirstResp(DateUtil.current() - createdAt)
.build();
// Extract the AGENT_ROUND_COMPLETED metadata
resultVO = AOneSkillUtil.getAnswerFluxByEventType(
result, List.of(AOneAgentMessageEventTypeEnum.AGENT_ROUND_COMPLETED));
JSONObject metadata = resultVO.getMetadata();
if (ObjUtil.isNotNull(metadata)) {
chatHistoryParam.setInputTokens(Convert.toLong(metadata.getLong(INPUT_TOKEN)));
chatHistoryParam.setOutputTokens(Convert.toLong(metadata.getLong(OUTPUT_TOKEN)));
chatHistoryParam.setTotalTokens(Convert.toLong(metadata.getLong(TOTAL_TOKEN)));
}
// Extract the complete answer
resultVO = AOneSkillUtil.getAnswerCompleteFlux(result);
if (ObjUtil.isNotNull(resultVO)) {
String answer = resultVO.getAnswer();
// No answer text but an error code: the error message is stored as the output
if (StrUtil.isBlank(answer) && StrUtil.isNotBlank(resultVO.getErrorCode())) {
answer = resultVO.getErrorMsg();
log.error("AOne领域智能体平台错误:{},{}", resultVO.getErrorCode(), answer);
}
chatHistoryParam.setOutput(answer);
chatHistoryParam.setRequestIp(chatParam.getRequestIp());
chatHistoryParam.setIsFallback(ZERO);
chatHistoryParam.setLatency(DateUtil.current() - createdAt);
chatHistoryEventPublisher.publishEvent(new ChatHistoryEvent(chatHistoryParam));
}
} catch (Exception e) {
// Any failure in the background task is logged here and not rethrown
log.error("保存对话历史记录失败", e);
}
});
// Return the stream to the front end immediately, without waiting for metadata extraction to finish
return result;




