sse推送模块代码优化
This commit is contained in:
@@ -29,9 +29,11 @@ public class DbSseEmitterController {
|
|||||||
* @date 2023/7/17
|
* @date 2023/7/17
|
||||||
**/
|
**/
|
||||||
@GetMapping("/sse/createConnect")
|
@GetMapping("/sse/createConnect")
|
||||||
public SseEmitter createConnect(String clientId, @RequestParam(required = false)Boolean setHeartBeat,
|
public SseEmitter createConnect(String clientId,
|
||||||
|
@RequestParam(required = false)Boolean setHeartBeat,
|
||||||
|
@RequestParam(required = false)Boolean defaultHeartbeat,
|
||||||
@RequestParam(required = false) Consumer<DbCommonSseParam> consumer){
|
@RequestParam(required = false) Consumer<DbCommonSseParam> consumer){
|
||||||
return dbSseEmitterService.createSseConnect(clientId,setHeartBeat,consumer);
|
return dbSseEmitterService.createSseConnect(clientId,setHeartBeat,defaultHeartbeat,consumer);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -13,12 +13,17 @@ import java.util.function.Consumer;
|
|||||||
public interface DbSseEmitterService {
|
public interface DbSseEmitterService {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 创建连接
|
* 创建SSE连接
|
||||||
*
|
*
|
||||||
|
* @param clientId 客户端id,不传则自动生成
|
||||||
|
* @param setHeartBeat 是否设置心跳定时任务,默认为false(true:设置 false:不设置)
|
||||||
|
* @param defaultHeartbeat 是否使用默认心跳任务
|
||||||
|
* @param consumer 自定义心跳任务,需要自定义实现Consumer接口中的accept方法(setHeartBeat必须为true,defaultHeartbeat为false才有意义)
|
||||||
|
* @return 初次建立连接会推送客户端id,状态码为0
|
||||||
* @author diantu
|
* @author diantu
|
||||||
* @date 2023/7/17
|
* @date 2023/7/17
|
||||||
**/
|
**/
|
||||||
public SseEmitter createSseConnect(String clientId,Boolean setHeartBeat, Consumer<DbCommonSseParam> consumer);
|
public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Boolean defaultHeartbeat, Consumer<DbCommonSseParam> consumer);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 关闭连接
|
* 关闭连接
|
||||||
|
|||||||
@@ -33,13 +33,18 @@ public class DbSseEmitterServiceImpl implements DbSseEmitterService {
|
|||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 创建连接
|
* 创建SSE连接
|
||||||
*
|
*
|
||||||
|
* @param clientId 客户端id,不传则自动生成
|
||||||
|
* @param setHeartBeat 是否设置心跳定时任务,默认为false(true:设置 false:不设置)
|
||||||
|
* @param defaultHeartbeat 是否使用默认心跳任务
|
||||||
|
* @param consumer 自定义心跳任务,需要自定义实现Consumer接口中的accept方法(setHeartBeat必须为true,defaultHeartbeat为false才有意义)
|
||||||
|
* @return 初次建立连接会推送客户端id,状态码为0
|
||||||
* @author diantu
|
* @author diantu
|
||||||
* @date 2023/7/17
|
* @date 2023/7/17
|
||||||
**/
|
**/
|
||||||
@Override
|
@Override
|
||||||
public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Consumer<DbCommonSseParam> consumer) {
|
public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Boolean defaultHeartbeat, Consumer<DbCommonSseParam> consumer) {
|
||||||
// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
|
// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
|
||||||
SseEmitter sseEmitter = new SseEmitter(0L);
|
SseEmitter sseEmitter = new SseEmitter(0L);
|
||||||
DocUserDetails currentUser = DocUserUtil.getCurrentUser();
|
DocUserDetails currentUser = DocUserUtil.getCurrentUser();
|
||||||
@@ -54,29 +59,35 @@ public class DbSseEmitterServiceImpl implements DbSseEmitterService {
|
|||||||
String finalClientId = clientId;
|
String finalClientId = clientId;
|
||||||
// 增加心跳
|
// 增加心跳
|
||||||
final ScheduledFuture<?> future;
|
final ScheduledFuture<?> future;
|
||||||
// 是否自定义心跳任务
|
// 是否设置心跳任务
|
||||||
if (setHeartBeat!=null&&setHeartBeat) {
|
if (setHeartBeat!=null&&setHeartBeat) {
|
||||||
DbCommonSseParam commonSseParam = new DbCommonSseParam();
|
//是否使用默认心跳任务
|
||||||
commonSseParam.setClientId(clientId);
|
if(defaultHeartbeat!=null&&defaultHeartbeat){
|
||||||
commonSseParam.setLoginId(loginId);
|
//默认心跳任务
|
||||||
future = heartbeatExecutors.scheduleAtFixedRate(() -> consumer.accept(commonSseParam),
|
future = heartbeatExecutors.scheduleAtFixedRate(() ->
|
||||||
2, 10, TimeUnit.SECONDS);
|
DbSseCacheUtil.sendMessageToOneClient(finalClientId,finalClientId+"-"+loginId),
|
||||||
} else {
|
2, 10, TimeUnit.SECONDS);
|
||||||
//默认心跳任务
|
}else{
|
||||||
future = heartbeatExecutors.scheduleAtFixedRate(() ->
|
//自定义心跳任务
|
||||||
DbSseCacheUtil.sendMessageToOneClient(finalClientId,finalClientId+"-"+loginId),
|
DbCommonSseParam commonSseParam = new DbCommonSseParam();
|
||||||
2, 10, TimeUnit.SECONDS);
|
commonSseParam.setClientId(clientId);
|
||||||
|
commonSseParam.setLoginId(loginId);
|
||||||
|
future = heartbeatExecutors.scheduleAtFixedRate(() -> consumer.accept(commonSseParam),
|
||||||
|
2, 10, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
// 增加连接
|
||||||
|
DbSseCacheUtil.addConnection(clientId, loginId, sseEmitter, future);
|
||||||
|
}else{
|
||||||
|
// 增加连接
|
||||||
|
DbSseCacheUtil.addConnection(clientId, loginId, sseEmitter, null);
|
||||||
}
|
}
|
||||||
// 长链接完成后回调(即关闭连接时调用)
|
// 长链接完成后回调(即关闭连接时调用)
|
||||||
sseEmitter.onCompletion(DbSseCacheUtil.completionCallBack(clientId,future));
|
sseEmitter.onCompletion(DbSseCacheUtil.completionCallBack(clientId));
|
||||||
// 连接超时回调
|
// 连接超时回调
|
||||||
sseEmitter.onTimeout(DbSseCacheUtil.timeoutCallBack(clientId,future));
|
sseEmitter.onTimeout(DbSseCacheUtil.timeoutCallBack(clientId));
|
||||||
// 推送消息异常回调
|
// 推送消息异常回调
|
||||||
sseEmitter.onError(DbSseCacheUtil.errorCallBack(clientId,future));
|
sseEmitter.onError(DbSseCacheUtil.errorCallBack(clientId));
|
||||||
// 增加连接
|
|
||||||
DbSseCacheUtil.addConnection(clientId, loginId, sseEmitter, future);
|
|
||||||
// 初次建立连接,推送客户端id
|
// 初次建立连接,推送客户端id
|
||||||
//DocDbResponseJson message = new DocDbResponseJson();
|
|
||||||
DocDbResponseJson message = new DocDbResponseJson(0,"",clientId);
|
DocDbResponseJson message = new DocDbResponseJson(0,"",clientId);
|
||||||
DbSseCacheUtil.sendMessageToClientByClientId(clientId,message);
|
DbSseCacheUtil.sendMessageToClientByClientId(clientId,message);
|
||||||
return sseEmitter;
|
return sseEmitter;
|
||||||
|
|||||||
@@ -42,6 +42,53 @@ public class DbSseCacheUtil {
|
|||||||
return (SseEmitter) map.get(DbSseEmitterParameterEnum.EMITTER.getValue());
|
return (SseEmitter) map.get(DbSseEmitterParameterEnum.EMITTER.getValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根据客户端id获取心跳
|
||||||
|
*
|
||||||
|
* @author diantu
|
||||||
|
* @date 2023/7/18
|
||||||
|
**/
|
||||||
|
public static ScheduledFuture<?> getSseFutureByClientId(String clientId) {
|
||||||
|
Map<String,Object> map = sseCache.get(clientId);
|
||||||
|
if (map == null || map.isEmpty()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return (ScheduledFuture<?>) map.get(DbSseEmitterParameterEnum.FUTURE.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根据客户端id获取用户id
|
||||||
|
*
|
||||||
|
* @author diantu
|
||||||
|
* @date 2023/7/18
|
||||||
|
**/
|
||||||
|
public static ScheduledFuture<?> getLoginIdByClientId(String clientId) {
|
||||||
|
Map<String,Object> map = sseCache.get(clientId);
|
||||||
|
if (map == null || map.isEmpty()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return (ScheduledFuture<?>) map.get(DbSseEmitterParameterEnum.LOGINID.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根据用户id获取客户端id
|
||||||
|
*
|
||||||
|
* @author diantu
|
||||||
|
* @date 2023/7/18
|
||||||
|
**/
|
||||||
|
public static String getClientIdByLoginId(String loginId){
|
||||||
|
if(existSseCache()){
|
||||||
|
for (Map.Entry<String, Map<String, Object>> entry : sseCache.entrySet()) {
|
||||||
|
Map<String,Object> map = sseCache.get(entry.getKey());
|
||||||
|
String lId = (String) map.get(DbSseEmitterParameterEnum.LOGINID.getValue());
|
||||||
|
if(loginId.equals(lId)){
|
||||||
|
return entry.getKey();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 判断容器是否存在连接
|
* 判断容器是否存在连接
|
||||||
*
|
*
|
||||||
@@ -92,7 +139,7 @@ public class DbSseCacheUtil {
|
|||||||
public static void removeConnection(String clientId) {
|
public static void removeConnection(String clientId) {
|
||||||
SseEmitter emitter = getSseEmitterByClientId(clientId);
|
SseEmitter emitter = getSseEmitterByClientId(clientId);
|
||||||
if (emitter != null) {
|
if (emitter != null) {
|
||||||
cancelScheduledFuture((ScheduledFuture<?>) sseCache.get(clientId).get(DbSseEmitterParameterEnum.FUTURE.getValue()));
|
cancelScheduledFuture(clientId);
|
||||||
}
|
}
|
||||||
sseCache.remove(clientId);
|
sseCache.remove(clientId);
|
||||||
log.info("移除连接:{}", clientId);
|
log.info("移除连接:{}", clientId);
|
||||||
@@ -104,7 +151,8 @@ public class DbSseCacheUtil {
|
|||||||
* @author diantu
|
* @author diantu
|
||||||
* @date 2023/7/17
|
* @date 2023/7/17
|
||||||
*/
|
*/
|
||||||
public static void cancelScheduledFuture(ScheduledFuture<?> future){
|
public static void cancelScheduledFuture(String clientId){
|
||||||
|
ScheduledFuture<?> future = getSseFutureByClientId(clientId);
|
||||||
if (future != null) {
|
if (future != null) {
|
||||||
future.cancel(true);
|
future.cancel(true);
|
||||||
}
|
}
|
||||||
@@ -117,11 +165,11 @@ public class DbSseCacheUtil {
|
|||||||
* @author diantu
|
* @author diantu
|
||||||
* @date 2023/7/17
|
* @date 2023/7/17
|
||||||
**/
|
**/
|
||||||
public static Runnable completionCallBack(String clientId, ScheduledFuture<?> future) {
|
public static Runnable completionCallBack(String clientId) {
|
||||||
return () -> {
|
return () -> {
|
||||||
log.info("结束连接:{}", clientId);
|
log.info("结束连接:{}", clientId);
|
||||||
removeConnection(clientId);
|
removeConnection(clientId);
|
||||||
cancelScheduledFuture(future);
|
cancelScheduledFuture(clientId);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -131,11 +179,11 @@ public class DbSseCacheUtil {
|
|||||||
* @author diantu
|
* @author diantu
|
||||||
* @date 2023/7/17
|
* @date 2023/7/17
|
||||||
**/
|
**/
|
||||||
public static Runnable timeoutCallBack(String clientId, ScheduledFuture<?> future){
|
public static Runnable timeoutCallBack(String clientId){
|
||||||
return ()->{
|
return ()->{
|
||||||
log.info("连接超时:{}", clientId);
|
log.info("连接超时:{}", clientId);
|
||||||
removeConnection(clientId);
|
removeConnection(clientId);
|
||||||
cancelScheduledFuture(future);
|
cancelScheduledFuture(clientId);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -145,11 +193,11 @@ public class DbSseCacheUtil {
|
|||||||
* @author diantu
|
* @author diantu
|
||||||
* @date 2023/7/17
|
* @date 2023/7/17
|
||||||
**/
|
**/
|
||||||
public static Consumer<Throwable> errorCallBack(String clientId, ScheduledFuture<?> future) {
|
public static Consumer<Throwable> errorCallBack(String clientId) {
|
||||||
return throwable -> {
|
return throwable -> {
|
||||||
log.info("推送消息异常:{}", clientId);
|
log.info("推送消息异常:{}", clientId);
|
||||||
removeConnection(clientId);
|
removeConnection(clientId);
|
||||||
cancelScheduledFuture(future);
|
cancelScheduledFuture(clientId);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user