diff --git a/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/controller/DbSseEmitterController.java b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/controller/DbSseEmitterController.java index 06c3cddd..59292780 100644 --- a/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/controller/DbSseEmitterController.java +++ b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/controller/DbSseEmitterController.java @@ -29,9 +29,11 @@ public class DbSseEmitterController { * @date 2023/7/17 **/ @GetMapping("/sse/createConnect") - public SseEmitter createConnect(String clientId, @RequestParam(required = false)Boolean setHeartBeat, + public SseEmitter createConnect(String clientId, + @RequestParam(required = false)Boolean setHeartBeat, + @RequestParam(required = false)Boolean defaultHeartbeat, @RequestParam(required = false) Consumer consumer){ - return dbSseEmitterService.createSseConnect(clientId,setHeartBeat,consumer); + return dbSseEmitterService.createSseConnect(clientId,setHeartBeat,defaultHeartbeat,consumer); } /** diff --git a/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/service/DbSseEmitterService.java b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/service/DbSseEmitterService.java index 24d8dc6e..31dd4a5c 100644 --- a/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/service/DbSseEmitterService.java +++ b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/service/DbSseEmitterService.java @@ -13,12 +13,17 @@ import java.util.function.Consumer; public interface DbSseEmitterService { /** - * 创建连接 + * 创建SSE连接 * + * @param clientId 客户端id,不传则自动生成 + * @param setHeartBeat 是否设置心跳定时任务,默认为false(true:设置 false:不设置) + * @param defaultHeartbeat 是否使用默认心跳任务 + * @param consumer 自定义心跳任务,需要自定义实现Consumer接口中的accept方法(setHeartBeat必须为true,defaultHeartbeat为false才有意义) + * @return 初次建立连接会推送客户端id,状态码为0 * @author diantu * @date 2023/7/17 **/ - public SseEmitter createSseConnect(String clientId,Boolean setHeartBeat, Consumer consumer); + public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Boolean defaultHeartbeat, Consumer consumer); /** * 关闭连接 diff --git a/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/service/impl/DbSseEmitterServiceImpl.java b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/service/impl/DbSseEmitterServiceImpl.java index 7da7ebff..77b72235 100644 --- a/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/service/impl/DbSseEmitterServiceImpl.java +++ b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/service/impl/DbSseEmitterServiceImpl.java @@ -33,13 +33,18 @@ public class DbSseEmitterServiceImpl implements DbSseEmitterService { /** - * 创建连接 + * 创建SSE连接 * + * @param clientId 客户端id,不传则自动生成 + * @param setHeartBeat 是否设置心跳定时任务,默认为false(true:设置 false:不设置) + * @param defaultHeartbeat 是否使用默认心跳任务 + * @param consumer 自定义心跳任务,需要自定义实现Consumer接口中的accept方法(setHeartBeat必须为true,defaultHeartbeat为false才有意义) + * @return 初次建立连接会推送客户端id,状态码为0 * @author diantu * @date 2023/7/17 **/ @Override - public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Consumer consumer) { + public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Boolean defaultHeartbeat, Consumer consumer) { // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException SseEmitter sseEmitter = new SseEmitter(0L); DocUserDetails currentUser = DocUserUtil.getCurrentUser(); @@ -54,29 +59,35 @@ public class DbSseEmitterServiceImpl implements DbSseEmitterService { String finalClientId = clientId; // 增加心跳 final ScheduledFuture future; - // 是否自定义心跳任务 + // 是否设置心跳任务 if (setHeartBeat!=null&&setHeartBeat) { - DbCommonSseParam commonSseParam = new DbCommonSseParam(); - commonSseParam.setClientId(clientId); - commonSseParam.setLoginId(loginId); - future = heartbeatExecutors.scheduleAtFixedRate(() -> consumer.accept(commonSseParam), - 2, 10, TimeUnit.SECONDS); - } else { - //默认心跳任务 - future = heartbeatExecutors.scheduleAtFixedRate(() -> - DbSseCacheUtil.sendMessageToOneClient(finalClientId,finalClientId+"-"+loginId), - 2, 10, TimeUnit.SECONDS); + //是否使用默认心跳任务 + if(defaultHeartbeat!=null&&defaultHeartbeat){ + //默认心跳任务 + future = heartbeatExecutors.scheduleAtFixedRate(() -> + DbSseCacheUtil.sendMessageToOneClient(finalClientId,finalClientId+"-"+loginId), + 2, 10, TimeUnit.SECONDS); + }else{ + //自定义心跳任务 + DbCommonSseParam commonSseParam = new DbCommonSseParam(); + commonSseParam.setClientId(clientId); + commonSseParam.setLoginId(loginId); + future = heartbeatExecutors.scheduleAtFixedRate(() -> consumer.accept(commonSseParam), + 2, 10, TimeUnit.SECONDS); + } + // 增加连接 + DbSseCacheUtil.addConnection(clientId, loginId, sseEmitter, future); + }else{ + // 增加连接 + DbSseCacheUtil.addConnection(clientId, loginId, sseEmitter, null); } // 长链接完成后回调(即关闭连接时调用) - sseEmitter.onCompletion(DbSseCacheUtil.completionCallBack(clientId,future)); + sseEmitter.onCompletion(DbSseCacheUtil.completionCallBack(clientId)); // 连接超时回调 - sseEmitter.onTimeout(DbSseCacheUtil.timeoutCallBack(clientId,future)); + sseEmitter.onTimeout(DbSseCacheUtil.timeoutCallBack(clientId)); // 推送消息异常回调 - sseEmitter.onError(DbSseCacheUtil.errorCallBack(clientId,future)); - // 增加连接 - DbSseCacheUtil.addConnection(clientId, loginId, sseEmitter, future); + sseEmitter.onError(DbSseCacheUtil.errorCallBack(clientId)); // 初次建立连接,推送客户端id - //DocDbResponseJson message = new DocDbResponseJson(); DocDbResponseJson message = new DocDbResponseJson(0,"",clientId); DbSseCacheUtil.sendMessageToClientByClientId(clientId,message); return sseEmitter; diff --git a/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/util/DbSseCacheUtil.java b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/util/DbSseCacheUtil.java index 672e88a4..ae2ec619 100644 --- a/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/util/DbSseCacheUtil.java +++ b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/sse/util/DbSseCacheUtil.java @@ -42,6 +42,53 @@ public class DbSseCacheUtil { return (SseEmitter) map.get(DbSseEmitterParameterEnum.EMITTER.getValue()); } + /** + * 根据客户端id获取心跳 + * + * @author diantu + * @date 2023/7/18 + **/ + public static ScheduledFuture getSseFutureByClientId(String clientId) { + Map map = sseCache.get(clientId); + if (map == null || map.isEmpty()) { + return null; + } + return (ScheduledFuture) map.get(DbSseEmitterParameterEnum.FUTURE.getValue()); + } + + /** + * 根据客户端id获取用户id + * + * @author diantu + * @date 2023/7/18 + **/ + public static ScheduledFuture getLoginIdByClientId(String clientId) { + Map map = sseCache.get(clientId); + if (map == null || map.isEmpty()) { + return null; + } + return (ScheduledFuture) map.get(DbSseEmitterParameterEnum.LOGINID.getValue()); + } + + /** + * 根据用户id获取客户端id + * + * @author diantu + * @date 2023/7/18 + **/ + public static String getClientIdByLoginId(String loginId){ + if(existSseCache()){ + for (Map.Entry> entry : sseCache.entrySet()) { + Map map = sseCache.get(entry.getKey()); + String lId = (String) map.get(DbSseEmitterParameterEnum.LOGINID.getValue()); + if(loginId.equals(lId)){ + return entry.getKey(); + } + } + } + return null; + } + /** * 判断容器是否存在连接 * @@ -92,7 +139,7 @@ public class DbSseCacheUtil { public static void removeConnection(String clientId) { SseEmitter emitter = getSseEmitterByClientId(clientId); if (emitter != null) { - cancelScheduledFuture((ScheduledFuture) sseCache.get(clientId).get(DbSseEmitterParameterEnum.FUTURE.getValue())); + cancelScheduledFuture(clientId); } sseCache.remove(clientId); log.info("移除连接:{}", clientId); @@ -104,7 +151,8 @@ public class DbSseCacheUtil { * @author diantu * @date 2023/7/17 */ - public static void cancelScheduledFuture(ScheduledFuture future){ + public static void cancelScheduledFuture(String clientId){ + ScheduledFuture future = getSseFutureByClientId(clientId); if (future != null) { future.cancel(true); } @@ -117,11 +165,11 @@ public class DbSseCacheUtil { * @author diantu * @date 2023/7/17 **/ - public static Runnable completionCallBack(String clientId, ScheduledFuture future) { + public static Runnable completionCallBack(String clientId) { return () -> { log.info("结束连接:{}", clientId); removeConnection(clientId); - cancelScheduledFuture(future); + cancelScheduledFuture(clientId); }; } @@ -131,11 +179,11 @@ public class DbSseCacheUtil { * @author diantu * @date 2023/7/17 **/ - public static Runnable timeoutCallBack(String clientId, ScheduledFuture future){ + public static Runnable timeoutCallBack(String clientId){ return ()->{ log.info("连接超时:{}", clientId); removeConnection(clientId); - cancelScheduledFuture(future); + cancelScheduledFuture(clientId); }; } @@ -145,11 +193,11 @@ public class DbSseCacheUtil { * @author diantu * @date 2023/7/17 **/ - public static Consumer errorCallBack(String clientId, ScheduledFuture future) { + public static Consumer errorCallBack(String clientId) { return throwable -> { log.info("推送消息异常:{}", clientId); removeConnection(clientId); - cancelScheduledFuture(future); + cancelScheduledFuture(clientId); }; }