From 9fa5c8e6a56863b7954e489bc8f0c603f9154815 Mon Sep 17 00:00:00 2001 From: diant Date: Mon, 17 Jul 2023 18:32:47 +0800 Subject: [PATCH] =?UTF-8?q?db=E6=96=B0=E5=A2=9Esse=E6=8E=A8=E9=80=81?= =?UTF-8?q?=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/DbSseEmitterController.java | 70 ++++++ .../sse/enums/DbSseEmitterParameterEnum.java | 35 +++ .../framework/sse/param/DbCommonSseParam.java | 19 ++ .../sse/service/DbSseEmitterService.java | 46 ++++ .../service/impl/DbSseEmitterServiceImpl.java | 118 ++++++++++ .../db/framework/sse/util/DbSseCacheUtil.java | 216 ++++++++++++++++++ 6 files changed, 504 insertions(+) create mode 100644 zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/controller/DbSseEmitterController.java create mode 100644 zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/enums/DbSseEmitterParameterEnum.java create mode 100644 zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/param/DbCommonSseParam.java create mode 100644 zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/service/DbSseEmitterService.java create mode 100644 zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/service/impl/DbSseEmitterServiceImpl.java create mode 100644 zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/util/DbSseCacheUtil.java diff --git a/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/controller/DbSseEmitterController.java b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/controller/DbSseEmitterController.java new file mode 100644 index 00000000..06c3cddd --- /dev/null +++ b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/controller/DbSseEmitterController.java @@ -0,0 +1,70 @@ +package com.zyplayer.doc.db.framework.sse.controller; + +import com.zyplayer.doc.core.annotation.AuthMan; +import com.zyplayer.doc.db.framework.sse.param.DbCommonSseParam; +import com.zyplayer.doc.db.framework.sse.service.DbSseEmitterService; +import org.springframework.web.bind.annotation.*; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +import javax.annotation.Resource; +import java.util.function.Consumer; + +/** + * SSE通信控制器 + * + * @author diantu + * @date 2023/7/17 + **/ +@AuthMan +@RestController +@RequestMapping("/zyplayer-doc-db/doc-db") +public class DbSseEmitterController { + + @Resource + private DbSseEmitterService dbSseEmitterService; + + /** + * 创建sse连接 + * + * @author diantu + * @date 2023/7/17 + **/ + @GetMapping("/sse/createConnect") + public SseEmitter createConnect(String clientId, @RequestParam(required = false)Boolean setHeartBeat, + @RequestParam(required = false) Consumer consumer){ + return dbSseEmitterService.createSseConnect(clientId,setHeartBeat,consumer); + } + + /** + * 关闭sse连接 + * + * @author diantu + * @date 2023/7/17 + **/ + @GetMapping("/sse/closeSseConnect") + public void closeSseConnect(String clientId){ + dbSseEmitterService.closeSseConnect(clientId); + } + + /** + * 推送消息到所有客户端 + * + * @author diantu + * @date 2023/7/17 + **/ + @PostMapping("/sse/broadcast") + public void sendMessageToAllClient(@RequestBody(required = false) String msg){ + dbSseEmitterService.sendMessageToAllClient(msg); + } + + /** + * 根据clientId发送消息给某一客户端 + * + * @author diantu + * @date 2023/7/17 + **/ + @PostMapping("/sse/sendMessage") + public void sendMessageToOneClient(String clientId,String msg){ + dbSseEmitterService.sendMessageToOneClient(clientId,msg); + } + +} diff --git a/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/enums/DbSseEmitterParameterEnum.java b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/enums/DbSseEmitterParameterEnum.java new file mode 100644 index 00000000..3f978cfd --- /dev/null +++ b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/enums/DbSseEmitterParameterEnum.java @@ -0,0 +1,35 @@ +package com.zyplayer.doc.db.framework.sse.enums; + +import lombok.Getter; + +/** + * SSE通信参数枚举 + * + * @author diantu + * @date 2023/7/17 + **/ +@Getter +public enum DbSseEmitterParameterEnum { + + /** + * 通信 + */ + EMITTER("EMITTER"), + + /** + * 心跳 + */ + FUTURE("FUTURE"), + + /** + * 用户ID + */ + LOGINID("LOGINID"); + + private final String value; + + DbSseEmitterParameterEnum(String value) { + this.value = value; + } + +} diff --git a/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/param/DbCommonSseParam.java b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/param/DbCommonSseParam.java new file mode 100644 index 00000000..d13f6d1d --- /dev/null +++ b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/param/DbCommonSseParam.java @@ -0,0 +1,19 @@ +package com.zyplayer.doc.db.framework.sse.param; + +import lombok.Getter; +import lombok.Setter; + +/** + * 通用SSE参数 + * + * @author diantu + * @date 2023/7/17 + */ +@Getter +@Setter +public class DbCommonSseParam { + + private String clientId; + + private String loginId; +} diff --git a/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/service/DbSseEmitterService.java b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/service/DbSseEmitterService.java new file mode 100644 index 00000000..24d8dc6e --- /dev/null +++ b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/service/DbSseEmitterService.java @@ -0,0 +1,46 @@ +package com.zyplayer.doc.db.framework.sse.service; + +import com.zyplayer.doc.db.framework.sse.param.DbCommonSseParam; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +import java.util.function.Consumer; + +/** + * SSE通信Service接口 + * + * @author diantu + * @date 2023/7/17 + **/ +public interface DbSseEmitterService { + + /** + * 创建连接 + * + * @author diantu + * @date 2023/7/17 + **/ + public SseEmitter createSseConnect(String clientId,Boolean setHeartBeat, Consumer consumer); + + /** + * 关闭连接 + * + * @author diantu + * @date 2023/7/17 + **/ + public void closeSseConnect(String clientId); + + /** + * 推送消息到所有客户端 + * + * @author diantu + * @date 2023/7/17 + **/ + public void sendMessageToAllClient(String msg); + + /** + * 根据clientId发送消息给某一客户端 + * + * @author diantu + * @date 2023/7/17 + **/ + public void sendMessageToOneClient(String clientId, String msg); +} diff --git a/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/service/impl/DbSseEmitterServiceImpl.java b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/service/impl/DbSseEmitterServiceImpl.java new file mode 100644 index 00000000..7da7ebff --- /dev/null +++ b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/service/impl/DbSseEmitterServiceImpl.java @@ -0,0 +1,118 @@ +package com.zyplayer.doc.db.framework.sse.service.impl; + +import cn.hutool.core.util.IdUtil; +import com.zyplayer.doc.data.config.security.DocUserDetails; +import com.zyplayer.doc.data.config.security.DocUserUtil; +import com.zyplayer.doc.db.framework.json.DocDbResponseJson; +import com.zyplayer.doc.db.framework.sse.param.DbCommonSseParam; +import com.zyplayer.doc.db.framework.sse.service.DbSseEmitterService; +import com.zyplayer.doc.db.framework.sse.util.DbSseCacheUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +/** + * SSE通信Service接口实现类 + * + * @author diantu + * @date 2023/7/17 + **/ +@Slf4j +@Service +public class DbSseEmitterServiceImpl implements DbSseEmitterService { + + /** + * 心跳线程池 + */ + private static final ScheduledExecutorService heartbeatExecutors = Executors.newScheduledThreadPool(10); + + + /** + * 创建连接 + * + * @author diantu + * @date 2023/7/17 + **/ + @Override + public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Consumer consumer) { + // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException + SseEmitter sseEmitter = new SseEmitter(0L); + DocUserDetails currentUser = DocUserUtil.getCurrentUser(); + String loginId = currentUser.getUserId().toString(); + // 判断连接是否有效 + if (DbSseCacheUtil.connectionValidity(clientId,loginId)) { + return DbSseCacheUtil.getSseEmitterByClientId(clientId); + }else{ + DbSseCacheUtil.removeConnection(clientId); + } + clientId = IdUtil.simpleUUID(); + String finalClientId = clientId; + // 增加心跳 + final ScheduledFuture future; + // 是否自定义心跳任务 + if (setHeartBeat!=null&&setHeartBeat) { + DbCommonSseParam commonSseParam = new DbCommonSseParam(); + commonSseParam.setClientId(clientId); + commonSseParam.setLoginId(loginId); + future = heartbeatExecutors.scheduleAtFixedRate(() -> consumer.accept(commonSseParam), + 2, 10, TimeUnit.SECONDS); + } else { + //默认心跳任务 + future = heartbeatExecutors.scheduleAtFixedRate(() -> + DbSseCacheUtil.sendMessageToOneClient(finalClientId,finalClientId+"-"+loginId), + 2, 10, TimeUnit.SECONDS); + } + // 长链接完成后回调(即关闭连接时调用) + sseEmitter.onCompletion(DbSseCacheUtil.completionCallBack(clientId,future)); + // 连接超时回调 + sseEmitter.onTimeout(DbSseCacheUtil.timeoutCallBack(clientId,future)); + // 推送消息异常回调 + sseEmitter.onError(DbSseCacheUtil.errorCallBack(clientId,future)); + // 增加连接 + DbSseCacheUtil.addConnection(clientId, loginId, sseEmitter, future); + // 初次建立连接,推送客户端id + //DocDbResponseJson message = new DocDbResponseJson(); + DocDbResponseJson message = new DocDbResponseJson(0,"",clientId); + DbSseCacheUtil.sendMessageToClientByClientId(clientId,message); + return sseEmitter; + } + + /** + * 关闭连接 + * + * @author diantu + * @date 2023/7/17 + **/ + @Override + public void closeSseConnect(String clientId){ + DbSseCacheUtil.removeConnection(clientId); + } + + /** + * 推送消息到所有客户端 + * + * @author diantu + * @date 2023/7/17 + **/ + @Override + public void sendMessageToAllClient(String msg) { + DbSseCacheUtil.sendMessageToAllClient(msg); + } + + /** + * 根据clientId发送消息给某一客户端 + * + * @author diantu + * @date 2023/7/17 + **/ + @Override + public void sendMessageToOneClient(String clientId, String msg) { + DbSseCacheUtil.sendMessageToOneClient(clientId,msg); + } + +} diff --git a/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/util/DbSseCacheUtil.java b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/util/DbSseCacheUtil.java new file mode 100644 index 00000000..672e88a4 --- /dev/null +++ b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/util/DbSseCacheUtil.java @@ -0,0 +1,216 @@ +package com.zyplayer.doc.db.framework.sse.util; + +import cn.hutool.core.util.StrUtil; +import com.zyplayer.doc.core.exception.ConfirmException; +import com.zyplayer.doc.db.framework.json.DocDbResponseJson; +import com.zyplayer.doc.db.framework.sse.enums.DbSseEmitterParameterEnum; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.MediaType; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; +import java.util.function.Consumer; + +/** + * SseEmitter工具类 + * + * @author diantu + * @date 2023/7/17 + **/ +@Slf4j +public class DbSseCacheUtil { + + /** + * 创建一个容器来存储所有的 SseEmitter(使用ConcurrentHashMap是因为它是线程安全的)。 + */ + public static Map> sseCache = new ConcurrentHashMap<>(); + + + /** + * 根据客户端id获取连接对象 + * + * @author diantu + * @date 2023/7/17 + **/ + public static SseEmitter getSseEmitterByClientId(String clientId) { + Map map = sseCache.get(clientId); + if (map == null || map.isEmpty()) { + return null; + } + return (SseEmitter) map.get(DbSseEmitterParameterEnum.EMITTER.getValue()); + } + + /** + * 判断容器是否存在连接 + * + * @author diantu + * @date 2023/7/17 + **/ + public static boolean existSseCache() { + return sseCache.size()>0; + } + + /** + * 判断连接是否有效 + * + * @author diantu + * @date 2023/7/17 + **/ + public static boolean connectionValidity(String clientId,String loginId){ + if(sseCache.get(clientId) == null){ + return false; + } + return Objects.equals(loginId, sseCache.get(clientId).get(DbSseEmitterParameterEnum.LOGINID.getValue())); + } + + /** + * 增加连接 + * + * @author diantu + * @date 2023/7/17 + **/ + public static void addConnection(String clientId,String loginId, SseEmitter emitter, ScheduledFuture future) { + final SseEmitter oldEmitter = getSseEmitterByClientId(clientId); + if (oldEmitter != null) { + throw new ConfirmException("连接已存在:"+clientId); + } + Map map = new ConcurrentHashMap<>(); + map.put(DbSseEmitterParameterEnum.EMITTER.getValue(),emitter); + map.put(DbSseEmitterParameterEnum.FUTURE.getValue(), future); + map.put(DbSseEmitterParameterEnum.LOGINID.getValue(), loginId); + sseCache.put(clientId, map); + } + + /** + * 移除连接 + * + * @author diantu + * @date 2023/7/17 + **/ + public static void removeConnection(String clientId) { + SseEmitter emitter = getSseEmitterByClientId(clientId); + if (emitter != null) { + cancelScheduledFuture((ScheduledFuture) sseCache.get(clientId).get(DbSseEmitterParameterEnum.FUTURE.getValue())); + } + sseCache.remove(clientId); + log.info("移除连接:{}", clientId); + } + + /** + * 中断心跳发送任务 + * + * @author diantu + * @date 2023/7/17 + */ + public static void cancelScheduledFuture(ScheduledFuture future){ + if (future != null) { + future.cancel(true); + } + } + + + /** + * 长链接完成后回调 + * + * @author diantu + * @date 2023/7/17 + **/ + public static Runnable completionCallBack(String clientId, ScheduledFuture future) { + return () -> { + log.info("结束连接:{}", clientId); + removeConnection(clientId); + cancelScheduledFuture(future); + }; + } + + /** + * 连接超时回调 + * + * @author diantu + * @date 2023/7/17 + **/ + public static Runnable timeoutCallBack(String clientId, ScheduledFuture future){ + return ()->{ + log.info("连接超时:{}", clientId); + removeConnection(clientId); + cancelScheduledFuture(future); + }; + } + + /** + * 推送消息异常时回调 + * + * @author diantu + * @date 2023/7/17 + **/ + public static Consumer errorCallBack(String clientId, ScheduledFuture future) { + return throwable -> { + log.info("推送消息异常:{}", clientId); + removeConnection(clientId); + cancelScheduledFuture(future); + }; + } + + /** + * 推送消息到所有客户端 + * + * @author diantu + * @date 2023/7/17 + **/ + public static void sendMessageToAllClient(String msg) { + if (!existSseCache()) { + return; + } + // 判断发送的消息是否为空 + if (StrUtil.isEmpty(msg)){ + log.info("群发消息为空"); + return; + } + for (Map.Entry> entry : sseCache.entrySet()) { + sendMessageToClientByClientId(entry.getKey(), DocDbResponseJson.ok(msg)); + } + } + + /** + * 根据clientId发送消息给某一客户端 + * + * @author diantu + * @date 2023/7/17 + **/ + public static void sendMessageToOneClient(String clientId, String msg) { + if (StrUtil.isEmpty(clientId)){ + log.info("客户端ID为空"); + return; + } + if (StrUtil.isEmpty(msg)){ + log.info("向客户端{}推送消息为空",clientId); + return; + } + sendMessageToClientByClientId(clientId,DocDbResponseJson.ok(msg)); + } + + /** + * 推送消息到客户端 + * + * @author diantu + * @date 2023/7/17 + **/ + public static void sendMessageToClientByClientId(String clientId, DocDbResponseJson message) { + Map map = sseCache.get(clientId); + if (map==null||map.size()==0) { + log.error("推送消息失败:客户端{}未创建长链接,失败消息:{}",clientId, message.toString()); + return; + } + SseEmitter.SseEventBuilder sendData = SseEmitter.event().data(message,MediaType.APPLICATION_JSON); + SseEmitter sseEmitter = getSseEmitterByClientId(clientId); + try { + Objects.requireNonNull(sseEmitter).send(sendData); + } catch (Exception e) { + log.error("推送消息失败,报错异常:",e); + removeConnection(clientId); + } + } + +}