- 作者:老汪软件技巧
- 发表时间:2024-11-15 00:03
- 浏览量:
服务器推送技术:
服务器推送(Server Push)技术允许客户端和服务端在有新内容可用时主动向客户端推送更新,而不需要用户主动去查询。服务器推送的优点有两个:
用户体验更流畅。用户不需要一直去刷新页面来获取最新内容,系统会在有新的消息出现时自动推送给客户端。更高效。服务器只在有真正有用的内容时才主动推送,节省了大量不必要的客户端请求。
服务器推送技术:
SSE(Server-Sent Events) :
WebSocket:
SSE:
对比一下我们熟知的WebsocketWebsocket:
下面是通过SSE实现AI流式输出的举例用法
// 1. Controller 接口代码如下,为了防止串流以及后续支持客户端主动停止推流,每次请求携带唯一的用户id。
@PostMapping("/chat")
public SseEmitter chat(@Validated @RequestBody ChatRequest request){
return Facade.chat(request,getUserId());
}
//2.接下来是Facade层,使用CompletableFuture异步调用AI
public SseEmitter chat(ChatRequest request, Long userId){
//创建连接,并设置超时时间
SseEmitter sseEmitter =new SseEmitter(3600000L);
//注册完成回调,调用sseEmitter.complete()时触发
sseEmitter.onCompletion(()->
log.info(request,userId);
);
//注册超时回调,超时后触发
sseEmitter.onTimeOut(()->
log.info(request,userId);
);
// 注册异常回调,调用 emitter.completeWithError() 触发
sseEmitter.onError(()->
log.info(request,userId);
);
// 使用 CompletableFuture 异步调用 AI 获取结果
CompletableFuture.runAsync(() -> manager.chat(request.getAsk(),String.valueOf(userId), sseEmitter), taskExecutor);
return sseEmitter;
}
//3.接下来是Manager层
public void chat(String ask,String UserId,SseEmitter sseEmitter){
//使用okhttpClient创建事件源工厂
EventSource.Factory factory=EventSource.createFactory(okHttpClient);
//构建请求体(根据调用的AI的请求所需的请求体构建请求体)
...
String json=JSONObject.toJSONString(request);
...
//构建HTTP请求(根据你需要调用的AI构建对应的HTTP请求)
RequestBody requestBody=RequestBody.companion.create(json,MediaType.parse("applicationn/json;charset=utf-8"));//创建请求体
//构建HTTP请求
Request requestBuilder=new Request.Builder.url(...).post(requestBody).builder();
//创建自定义事件监听器
ZdyEventSourceListener eventSourceListener=new ZdyEventSourceListener(sseEmitter);
//创建并启动事件源,并用eventSourceListener处理返回的事件
factory.newEventSource(requestBuilder,eventSourceListener);
}
//下面是自定义监听器
@Slf4j
public class ZdyEventSourceListener extends EventSourceListener {
private final SseEmitter sseEmitter;
private final Map<String, String> sessionData = new HashMap<>();
public ZdyEventSourceListener(SseEmitter sseEmitter) {
this.sseEmitter = sseEmitter;
}
//连接打开
@Override
public void onOpen(@NotNull EventSource eventSource, @NotNull Response response) {
log.debug("ZdyEventSourceListener::onOpen");
super.onOpen(eventSource, response);
}
//接收到内容
@Override
public void onEvent(@NotNull EventSource eventSource, @Nullable String id, @Nullable String type, @NotNull String data) {
log.debug("ZdyEventSourceListener::onEvent data = {}", data);
// 解析数据
Map<String, Object> eventData = JSONObject.parseObject(data, Map.class);
String sessionId = (String) eventData.get("sessionId");
String message = (String) eventData.get("message");
// 更新会话数据
if (Objects.nonNull(sessionId) && Objects.nonNull(message)) {
sessionData.compute(sessionId, (key, oldValue) -> {
if (oldValue == null) {
return message;
} else {
return oldValue + message;
}
});
}
// 发送数据
this.send(data);
super.onEvent(eventSource, id, type, data);
}
//连接关闭
@Override
public void onClosed(@NotNull EventSource eventSource) {
log.debug("ZdyEventSourceListener::onClosed");
// 处理会话数据
sessionData.forEach((sessionId, data) -> {
log.debug("Session {} closed with data: {}", sessionId, data);
// 可以在这里进行进一步处理,例如保存到数据库
});
// 完成 sseEmitter
sseEmitter.complete();
super.onClosed(eventSource);
}
// 接收失败
@Override
public void onFailure(@NotNull EventSource eventSource, @Nullable Throwable t, @Nullable Response response) {
log.error("ZdyEventSourceListener::onFailure eventSource = {}, response = {}", eventSource, response, t);
// 发送错误消息
this.send(JSONObject.toJSONString(Map.of("error", "SSE connection failed")));
sseEmitter.complete();
super.onFailure(eventSource, t, response);
}
//发送消息
private void send(String data) {
try {
sseEmitter.send(data, org.springframework.http.MediaType.APPLICATION_JSON);
} catch (IOException e) {
log.error("ZdyEventSourceListener::send error, errorMsg = {}", e.getMessage(), e);
sseEmitter.complete();
}
}
}
水平有限以上文章如果有小错误还请见谅
最近在公司代码中看到了一些线程池的使用,让我想起了TL、ITL、TTL的用法,后续我会在总结总结分享给大家,多谢大家捧场!!!