db新增sse推送模块

This commit is contained in:
diant
2023-07-17 18:32:47 +08:00
parent 1dd7dd0975
commit 9fa5c8e6a5
6 changed files with 504 additions and 0 deletions

View File

@@ -0,0 +1,70 @@
package com.zyplayer.doc.db.framework.sse.controller;
import com.zyplayer.doc.core.annotation.AuthMan;
import com.zyplayer.doc.db.framework.sse.param.DbCommonSseParam;
import com.zyplayer.doc.db.framework.sse.service.DbSseEmitterService;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.annotation.Resource;
import java.util.function.Consumer;
/**
* SSE通信控制器
*
* @author diantu
* @date 2023/7/17
**/
@AuthMan
@RestController
@RequestMapping("/zyplayer-doc-db/doc-db")
public class DbSseEmitterController {
@Resource
private DbSseEmitterService dbSseEmitterService;
/**
* 创建sse连接
*
* @author diantu
* @date 2023/7/17
**/
@GetMapping("/sse/createConnect")
public SseEmitter createConnect(String clientId, @RequestParam(required = false)Boolean setHeartBeat,
@RequestParam(required = false) Consumer<DbCommonSseParam> consumer){
return dbSseEmitterService.createSseConnect(clientId,setHeartBeat,consumer);
}
/**
* 关闭sse连接
*
* @author diantu
* @date 2023/7/17
**/
@GetMapping("/sse/closeSseConnect")
public void closeSseConnect(String clientId){
dbSseEmitterService.closeSseConnect(clientId);
}
/**
* 推送消息到所有客户端
*
* @author diantu
* @date 2023/7/17
**/
@PostMapping("/sse/broadcast")
public void sendMessageToAllClient(@RequestBody(required = false) String msg){
dbSseEmitterService.sendMessageToAllClient(msg);
}
/**
* 根据clientId发送消息给某一客户端
*
* @author diantu
* @date 2023/7/17
**/
@PostMapping("/sse/sendMessage")
public void sendMessageToOneClient(String clientId,String msg){
dbSseEmitterService.sendMessageToOneClient(clientId,msg);
}
}

View File

@@ -0,0 +1,35 @@
package com.zyplayer.doc.db.framework.sse.enums;
import lombok.Getter;
/**
* SSE通信参数枚举
*
* @author diantu
* @date 2023/7/17
**/
@Getter
public enum DbSseEmitterParameterEnum {
/**
* 通信
*/
EMITTER("EMITTER"),
/**
* 心跳
*/
FUTURE("FUTURE"),
/**
* 用户ID
*/
LOGINID("LOGINID");
private final String value;
DbSseEmitterParameterEnum(String value) {
this.value = value;
}
}

View File

@@ -0,0 +1,19 @@
package com.zyplayer.doc.db.framework.sse.param;
import lombok.Getter;
import lombok.Setter;
/**
* 通用SSE参数
*
* @author diantu
* @date 2023/7/17
*/
@Getter
@Setter
public class DbCommonSseParam {
private String clientId;
private String loginId;
}

View File

@@ -0,0 +1,46 @@
package com.zyplayer.doc.db.framework.sse.service;
import com.zyplayer.doc.db.framework.sse.param.DbCommonSseParam;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.function.Consumer;
/**
* SSE通信Service接口
*
* @author diantu
* @date 2023/7/17
**/
public interface DbSseEmitterService {
/**
* 创建连接
*
* @author diantu
* @date 2023/7/17
**/
public SseEmitter createSseConnect(String clientId,Boolean setHeartBeat, Consumer<DbCommonSseParam> consumer);
/**
* 关闭连接
*
* @author diantu
* @date 2023/7/17
**/
public void closeSseConnect(String clientId);
/**
* 推送消息到所有客户端
*
* @author diantu
* @date 2023/7/17
**/
public void sendMessageToAllClient(String msg);
/**
* 根据clientId发送消息给某一客户端
*
* @author diantu
* @date 2023/7/17
**/
public void sendMessageToOneClient(String clientId, String msg);
}

View File

@@ -0,0 +1,118 @@
package com.zyplayer.doc.db.framework.sse.service.impl;
import cn.hutool.core.util.IdUtil;
import com.zyplayer.doc.data.config.security.DocUserDetails;
import com.zyplayer.doc.data.config.security.DocUserUtil;
import com.zyplayer.doc.db.framework.json.DocDbResponseJson;
import com.zyplayer.doc.db.framework.sse.param.DbCommonSseParam;
import com.zyplayer.doc.db.framework.sse.service.DbSseEmitterService;
import com.zyplayer.doc.db.framework.sse.util.DbSseCacheUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* SSE通信Service接口实现类
*
* @author diantu
* @date 2023/7/17
**/
@Slf4j
@Service
public class DbSseEmitterServiceImpl implements DbSseEmitterService {
/**
* 心跳线程池
*/
private static final ScheduledExecutorService heartbeatExecutors = Executors.newScheduledThreadPool(10);
/**
* 创建连接
*
* @author diantu
* @date 2023/7/17
**/
@Override
public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Consumer<DbCommonSseParam> consumer) {
// 设置超时时间0表示不过期。默认30秒超过时间未完成会抛出异常AsyncRequestTimeoutException
SseEmitter sseEmitter = new SseEmitter(0L);
DocUserDetails currentUser = DocUserUtil.getCurrentUser();
String loginId = currentUser.getUserId().toString();
// 判断连接是否有效
if (DbSseCacheUtil.connectionValidity(clientId,loginId)) {
return DbSseCacheUtil.getSseEmitterByClientId(clientId);
}else{
DbSseCacheUtil.removeConnection(clientId);
}
clientId = IdUtil.simpleUUID();
String finalClientId = clientId;
// 增加心跳
final ScheduledFuture<?> future;
// 是否自定义心跳任务
if (setHeartBeat!=null&&setHeartBeat) {
DbCommonSseParam commonSseParam = new DbCommonSseParam();
commonSseParam.setClientId(clientId);
commonSseParam.setLoginId(loginId);
future = heartbeatExecutors.scheduleAtFixedRate(() -> consumer.accept(commonSseParam),
2, 10, TimeUnit.SECONDS);
} else {
//默认心跳任务
future = heartbeatExecutors.scheduleAtFixedRate(() ->
DbSseCacheUtil.sendMessageToOneClient(finalClientId,finalClientId+"-"+loginId),
2, 10, TimeUnit.SECONDS);
}
// 长链接完成后回调(即关闭连接时调用)
sseEmitter.onCompletion(DbSseCacheUtil.completionCallBack(clientId,future));
// 连接超时回调
sseEmitter.onTimeout(DbSseCacheUtil.timeoutCallBack(clientId,future));
// 推送消息异常回调
sseEmitter.onError(DbSseCacheUtil.errorCallBack(clientId,future));
// 增加连接
DbSseCacheUtil.addConnection(clientId, loginId, sseEmitter, future);
// 初次建立连接,推送客户端id
//DocDbResponseJson message = new DocDbResponseJson();
DocDbResponseJson message = new DocDbResponseJson(0,"",clientId);
DbSseCacheUtil.sendMessageToClientByClientId(clientId,message);
return sseEmitter;
}
/**
* 关闭连接
*
* @author diantu
* @date 2023/7/17
**/
@Override
public void closeSseConnect(String clientId){
DbSseCacheUtil.removeConnection(clientId);
}
/**
* 推送消息到所有客户端
*
* @author diantu
* @date 2023/7/17
**/
@Override
public void sendMessageToAllClient(String msg) {
DbSseCacheUtil.sendMessageToAllClient(msg);
}
/**
* 根据clientId发送消息给某一客户端
*
* @author diantu
* @date 2023/7/17
**/
@Override
public void sendMessageToOneClient(String clientId, String msg) {
DbSseCacheUtil.sendMessageToOneClient(clientId,msg);
}
}

View File

@@ -0,0 +1,216 @@
package com.zyplayer.doc.db.framework.sse.util;
import cn.hutool.core.util.StrUtil;
import com.zyplayer.doc.core.exception.ConfirmException;
import com.zyplayer.doc.db.framework.json.DocDbResponseJson;
import com.zyplayer.doc.db.framework.sse.enums.DbSseEmitterParameterEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Consumer;
/**
* SseEmitter工具类
*
* @author diantu
* @date 2023/7/17
**/
@Slf4j
public class DbSseCacheUtil {
/**
* 创建一个容器来存储所有的 SseEmitter(使用ConcurrentHashMap是因为它是线程安全的)。
*/
public static Map<String, Map<String,Object>> sseCache = new ConcurrentHashMap<>();
/**
* 根据客户端id获取连接对象
*
* @author diantu
* @date 2023/7/17
**/
public static SseEmitter getSseEmitterByClientId(String clientId) {
Map<String,Object> map = sseCache.get(clientId);
if (map == null || map.isEmpty()) {
return null;
}
return (SseEmitter) map.get(DbSseEmitterParameterEnum.EMITTER.getValue());
}
/**
* 判断容器是否存在连接
*
* @author diantu
* @date 2023/7/17
**/
public static boolean existSseCache() {
return sseCache.size()>0;
}
/**
* 判断连接是否有效
*
* @author diantu
* @date 2023/7/17
**/
public static boolean connectionValidity(String clientId,String loginId){
if(sseCache.get(clientId) == null){
return false;
}
return Objects.equals(loginId, sseCache.get(clientId).get(DbSseEmitterParameterEnum.LOGINID.getValue()));
}
/**
* 增加连接
*
* @author diantu
* @date 2023/7/17
**/
public static void addConnection(String clientId,String loginId, SseEmitter emitter, ScheduledFuture<?> future) {
final SseEmitter oldEmitter = getSseEmitterByClientId(clientId);
if (oldEmitter != null) {
throw new ConfirmException("连接已存在:"+clientId);
}
Map<String,Object> map = new ConcurrentHashMap<>();
map.put(DbSseEmitterParameterEnum.EMITTER.getValue(),emitter);
map.put(DbSseEmitterParameterEnum.FUTURE.getValue(), future);
map.put(DbSseEmitterParameterEnum.LOGINID.getValue(), loginId);
sseCache.put(clientId, map);
}
/**
* 移除连接
*
* @author diantu
* @date 2023/7/17
**/
public static void removeConnection(String clientId) {
SseEmitter emitter = getSseEmitterByClientId(clientId);
if (emitter != null) {
cancelScheduledFuture((ScheduledFuture<?>) sseCache.get(clientId).get(DbSseEmitterParameterEnum.FUTURE.getValue()));
}
sseCache.remove(clientId);
log.info("移除连接:{}", clientId);
}
/**
* 中断心跳发送任务
*
* @author diantu
* @date 2023/7/17
*/
public static void cancelScheduledFuture(ScheduledFuture<?> future){
if (future != null) {
future.cancel(true);
}
}
/**
* 长链接完成后回调
*
* @author diantu
* @date 2023/7/17
**/
public static Runnable completionCallBack(String clientId, ScheduledFuture<?> future) {
return () -> {
log.info("结束连接:{}", clientId);
removeConnection(clientId);
cancelScheduledFuture(future);
};
}
/**
* 连接超时回调
*
* @author diantu
* @date 2023/7/17
**/
public static Runnable timeoutCallBack(String clientId, ScheduledFuture<?> future){
return ()->{
log.info("连接超时:{}", clientId);
removeConnection(clientId);
cancelScheduledFuture(future);
};
}
/**
* 推送消息异常时回调
*
* @author diantu
* @date 2023/7/17
**/
public static Consumer<Throwable> errorCallBack(String clientId, ScheduledFuture<?> future) {
return throwable -> {
log.info("推送消息异常:{}", clientId);
removeConnection(clientId);
cancelScheduledFuture(future);
};
}
/**
* 推送消息到所有客户端
*
* @author diantu
* @date 2023/7/17
**/
public static void sendMessageToAllClient(String msg) {
if (!existSseCache()) {
return;
}
// 判断发送的消息是否为空
if (StrUtil.isEmpty(msg)){
log.info("群发消息为空");
return;
}
for (Map.Entry<String, Map<String, Object>> entry : sseCache.entrySet()) {
sendMessageToClientByClientId(entry.getKey(), DocDbResponseJson.ok(msg));
}
}
/**
* 根据clientId发送消息给某一客户端
*
* @author diantu
* @date 2023/7/17
**/
public static void sendMessageToOneClient(String clientId, String msg) {
if (StrUtil.isEmpty(clientId)){
log.info("客户端ID为空");
return;
}
if (StrUtil.isEmpty(msg)){
log.info("向客户端{}推送消息为空",clientId);
return;
}
sendMessageToClientByClientId(clientId,DocDbResponseJson.ok(msg));
}
/**
* 推送消息到客户端
*
* @author diantu
* @date 2023/7/17
**/
public static void sendMessageToClientByClientId(String clientId, DocDbResponseJson message) {
Map<String, Object> map = sseCache.get(clientId);
if (map==null||map.size()==0) {
log.error("推送消息失败:客户端{}未创建长链接,失败消息:{}",clientId, message.toString());
return;
}
SseEmitter.SseEventBuilder sendData = SseEmitter.event().data(message,MediaType.APPLICATION_JSON);
SseEmitter sseEmitter = getSseEmitterByClientId(clientId);
try {
Objects.requireNonNull(sseEmitter).send(sendData);
} catch (Exception e) {
log.error("推送消息失败,报错异常:",e);
removeConnection(clientId);
}
}
}