• 作者:老汪软件技巧
  • 发表时间:2024-11-15 00:03
  • 浏览量:

服务器推送技术:

服务器推送(Server Push)技术允许客户端和服务端在有新内容可用时主动向客户端推送更新,而不需要用户主动去查询。服务器推送的优点有两个:

用户体验更流畅。用户不需要一直去刷新页面来获取最新内容,系统会在有新的消息出现时自动推送给客户端。更高效。服务器只在有真正有用的内容时才主动推送,节省了大量不必要的客户端请求。

服务器推送技术:

SSE(Server-Sent Events) :

WebSocket:

SSE:

对比一下我们熟知的WebsocketWebsocket:

Snipaste_2024-11-11_10-37-16.png

1c7dfc237e48408fb0063c0f56f26ecb.webp

下面是通过SSE实现AI流式输出的举例用法

// 1.  Controller 接口代码如下,为了防止串流以及后续支持客户端主动停止推流,每次请求携带唯一的用户id。
@PostMapping("/chat")
public SseEmitter chat(@Validated @RequestBody ChatRequest request){
       return Facade.chat(request,getUserId());
}
//2.接下来是Facade层,使用CompletableFuture异步调用AI
public SseEmitter chat(ChatRequest request, Long userId){
       //创建连接,并设置超时时间
       SseEmitter sseEmitter =new SseEmitter(3600000L);
       //注册完成回调,调用sseEmitter.complete()时触发
       sseEmitter.onCompletion(()->
                 log.info(request,userId);
       );
       //注册超时回调,超时后触发
       sseEmitter.onTimeOut(()->
                 log.info(request,userId);
       );
       // 注册异常回调,调用 emitter.completeWithError() 触发
       sseEmitter.onError(()->
                 log.info(request,userId);
       );
        // 使用 CompletableFuture 异步调用 AI 获取结果
        CompletableFuture.runAsync(() -> manager.chat(request.getAsk(),String.valueOf(userId), sseEmitter), taskExecutor);
       return sseEmitter;
}
//3.接下来是Manager层
public void chat(String ask,String UserId,SseEmitter sseEmitter){
       //使用okhttpClient创建事件源工厂
       EventSource.Factory factory=EventSource.createFactory(okHttpClient);
       //构建请求体(根据调用的AI的请求所需的请求体构建请求体)
       ...
       String json=JSONObject.toJSONString(request);
       ...
       //构建HTTP请求(根据你需要调用的AI构建对应的HTTP请求)
       RequestBody requestBody=RequestBody.companion.create(json,MediaType.parse("applicationn/json;charset=utf-8"));//创建请求体
       //构建HTTP请求
       Request requestBuilder=new Request.Builder.url(...).post(requestBody).builder();
       //创建自定义事件监听器
       ZdyEventSourceListener eventSourceListener=new ZdyEventSourceListener(sseEmitter);
       //创建并启动事件源,并用eventSourceListener处理返回的事件
       factory.newEventSource(requestBuilder,eventSourceListener);
}
//下面是自定义监听器
@Slf4j
public class ZdyEventSourceListener extends EventSourceListener {
    private final SseEmitter sseEmitter;
    private final Map<String, String> sessionData = new HashMap<>();
    public ZdyEventSourceListener(SseEmitter sseEmitter) {
        this.sseEmitter = sseEmitter;
    }
    //连接打开
    @Override

流式输入输出处理_输出流的flush方法_

public void onOpen(@NotNull EventSource eventSource, @NotNull Response response) { log.debug("ZdyEventSourceListener::onOpen"); super.onOpen(eventSource, response); } //接收到内容 @Override public void onEvent(@NotNull EventSource eventSource, @Nullable String id, @Nullable String type, @NotNull String data) { log.debug("ZdyEventSourceListener::onEvent data = {}", data); // 解析数据 Map<String, Object> eventData = JSONObject.parseObject(data, Map.class); String sessionId = (String) eventData.get("sessionId"); String message = (String) eventData.get("message"); // 更新会话数据 if (Objects.nonNull(sessionId) && Objects.nonNull(message)) { sessionData.compute(sessionId, (key, oldValue) -> { if (oldValue == null) { return message; } else { return oldValue + message; } }); } // 发送数据 this.send(data); super.onEvent(eventSource, id, type, data); } //连接关闭 @Override public void onClosed(@NotNull EventSource eventSource) { log.debug("ZdyEventSourceListener::onClosed"); // 处理会话数据 sessionData.forEach((sessionId, data) -> { log.debug("Session {} closed with data: {}", sessionId, data); // 可以在这里进行进一步处理,例如保存到数据库 }); // 完成 sseEmitter sseEmitter.complete(); super.onClosed(eventSource); } // 接收失败 @Override public void onFailure(@NotNull EventSource eventSource, @Nullable Throwable t, @Nullable Response response) { log.error("ZdyEventSourceListener::onFailure eventSource = {}, response = {}", eventSource, response, t); // 发送错误消息 this.send(JSONObject.toJSONString(Map.of("error", "SSE connection failed"))); sseEmitter.complete(); super.onFailure(eventSource, t, response); } //发送消息 private void send(String data) { try { sseEmitter.send(data, org.springframework.http.MediaType.APPLICATION_JSON); } catch (IOException e) { log.error("ZdyEventSourceListener::send error, errorMsg = {}", e.getMessage(), e); sseEmitter.complete(); } } }

水平有限以上文章如果有小错误还请见谅

最近在公司代码中看到了一些线程池的使用,让我想起了TL、ITL、TTL的用法,后续我会在总结总结分享给大家,多谢大家捧场!!!