feat: 主机消息处理器.

This commit is contained in:
lijiahang
2023-12-29 19:15:11 +08:00
parent e580fcee87
commit 6ff8ca2526
16 changed files with 483 additions and 73 deletions

View File

@@ -13,6 +13,8 @@ public interface ExtraFieldConst extends FieldConst {
String TRACE_ID = "traceId";
String IDENTITY = "identity";
String GROUP_NAME = "groupName";
String ID_LIST = "idList";

View File

@@ -1,6 +1,7 @@
package com.orion.ops.framework.common.utils;
import com.orion.ops.framework.common.entity.RequestIdentity;
import com.orion.ops.framework.common.entity.RequestIdentityModel;
import com.orion.web.servlet.web.Servlets;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
@@ -19,6 +20,17 @@ public class Requests {
private Requests() {
}
/**
* 获取请求留痕信息
*
* @return model
*/
public static RequestIdentityModel getIdentity() {
RequestIdentityModel model = new RequestIdentityModel();
fillIdentity(model);
return model;
}
/**
* 填充请求留痕信息
*

View File

@@ -2,16 +2,20 @@ package com.orion.ops.framework.websocket.core.constant;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketSession;
/**
* ws服务端关闭code
* ws 服务端关闭 code
*
* @author Jiahang Li
* @version 1.0.0
* @since 2021/6/16 15:18
*/
@AllArgsConstructor
@Slf4j
@Getter
@AllArgsConstructor
public enum WsCloseCode {
/**
@@ -125,6 +129,22 @@ public enum WsCloseCode {
private final String reason;
/**
* 关闭会话
*
* @param session session
*/
public void close(WebSocketSession session) {
if (!session.isOpen()) {
return;
}
try {
session.close(new CloseStatus(code, reason));
} catch (Exception e) {
log.error("websocket close failure", e);
}
}
public static WsCloseCode of(int code) {
for (WsCloseCode value : values()) {
if (value.code == code) {

View File

@@ -0,0 +1,49 @@
package com.orion.ops.framework.websocket.core.handler;
import org.springframework.web.socket.*;
/**
* 文本类型消息处理器
*
* @author Jiahang Li
* @version 1.0.0
* @since 2023/12/29 18:23
*/
public abstract class TextWebSocketHandler implements WebSocketHandler {
@Override
public void afterConnectionEstablished(WebSocketSession session) {
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) {
// 非 text message 不处理
if (!(message instanceof TextMessage)) {
return;
}
// 处理消息
this.onMessage(session, (String) message.getPayload());
}
/**
* 处理消息
*
* @param session session
* @param payload payload
*/
public abstract void onMessage(WebSocketSession session, String payload);
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
}
@Override
public boolean supportsPartialMessages() {
return false;
}
}

View File

@@ -1,7 +1,7 @@
package com.orion.ops.module.asset.config;
import com.orion.ops.module.asset.handler.host.terminal.TerminalDispatchHandler;
import com.orion.ops.module.asset.interceptor.TerminalInterceptor;
import com.orion.ops.module.asset.handler.host.terminal.TerminalMessageDispatcher;
import com.orion.ops.module.asset.interceptor.TerminalAccessInterceptor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
@@ -23,16 +23,16 @@ public class AssetWebSocketConfiguration implements WebSocketConfigurer {
private String prefix;
@Resource
private TerminalInterceptor terminalInterceptor;
private TerminalAccessInterceptor terminalAccessInterceptor;
@Resource
private TerminalDispatchHandler terminalDispatchHandler;
private TerminalMessageDispatcher terminalMessageDispatcher;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// 终端
registry.addHandler(terminalDispatchHandler, prefix + "/host/terminal/{token}")
.addInterceptors(terminalInterceptor)
registry.addHandler(terminalMessageDispatcher, prefix + "/host/terminal/{token}")
.addInterceptors(terminalAccessInterceptor)
.setAllowedOrigins("*");
}

View File

@@ -1,52 +0,0 @@
package com.orion.ops.module.asset.handler.host.terminal;
import com.orion.ops.framework.biz.operator.log.core.service.OperatorLogFrameworkService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import javax.annotation.Resource;
/**
* 终端处理器
*
* @author Jiahang Li
* @version 1.0.0
* @since 2023/12/28 14:33
*/
@Slf4j
@Component
public class TerminalDispatchHandler implements WebSocketHandler {
@Resource
private OperatorLogFrameworkService operatorLogFrameworkService;
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
log.info("afterConnectionEstablished");
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
log.info("handleMessage");
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
log.info("handleTransportError");
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
log.info("afterConnectionClosed");
}
@Override
public boolean supportsPartialMessages() {
return false;
}
}

View File

@@ -0,0 +1,75 @@
package com.orion.ops.module.asset.handler.host.terminal;
import com.alibaba.fastjson.JSON;
import com.orion.ops.framework.websocket.core.handler.TextWebSocketHandler;
import com.orion.ops.module.asset.handler.host.terminal.entity.MessageWrapper;
import com.orion.ops.module.asset.handler.host.terminal.enums.InputOperatorTypeEnum;
import com.orion.ops.module.asset.handler.host.terminal.handler.TerminalConnectHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketSession;
import javax.annotation.Resource;
import java.util.Optional;
/**
* 终端处理器
*
* @author Jiahang Li
* @version 1.0.0
* @since 2023/12/28 14:33
*/
@Slf4j
@Component
public class TerminalMessageDispatcher extends TextWebSocketHandler {
@Resource
private TerminalConnectHandler terminalConnectHandler;
@Override
public void onMessage(WebSocketSession session, String payload) {
try {
// 解析类型
InputOperatorTypeEnum type = Optional.ofNullable(payload)
.map(s -> JSON.parseObject(s, MessageWrapper.class))
.map(MessageWrapper::getType)
.map(InputOperatorTypeEnum::of)
.orElse(null);
if (InputOperatorTypeEnum.CONNECT.equals(type)) {
// 连接主机
// {"t":"co","s": "1001","b":{"h":1}}
terminalConnectHandler.process(session, payload);
} else if (InputOperatorTypeEnum.CLOSE.equals(type)) {
// 关闭连接
} else if (InputOperatorTypeEnum.PING.equals(type)) {
// ping
} else if (InputOperatorTypeEnum.RESIZE.equals(type)) {
// resize
} else if (InputOperatorTypeEnum.EXEC.equals(type)) {
// 执行
} else if (InputOperatorTypeEnum.INPUT.equals(type)) {
// 输入
}
} catch (Exception e) {
log.error("TerminalDispatchHandler-handleMessage-error msg: {}", payload, e);
}
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
log.info("handleTransportError");
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
log.info("afterConnectionClosed");
// release session
}
}

View File

@@ -0,0 +1,36 @@
package com.orion.ops.module.asset.handler.host.terminal.entity;
import com.alibaba.fastjson.annotation.JSONField;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 消息体包装
*
* @author Jiahang Li
* @version 1.0.0
* @since 2023/12/29 16:24
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(name = "MessageWrapper", description = "消息体包装")
public class MessageWrapper<T> {
@JSONField(name = "s")
@Schema(description = "会话id")
private String session;
@JSONField(name = "t")
@Schema(description = "消息类型")
private String type;
@JSONField(name = "b")
@Schema(description = "消息体")
private T body;
}

View File

@@ -0,0 +1,28 @@
package com.orion.ops.module.asset.handler.host.terminal.entity.request;
import com.alibaba.fastjson.annotation.JSONField;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 终端连接请求 实体对象
*
* @author Jiahang Li
* @version 1.0.0
* @since 2023/12/29 16:20
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(name = "TerminalConnectRequest", description = "终端连接请求 实体对象")
public class TerminalConnectRequest {
@JSONField(name = "h")
@Schema(description = "主机id")
private String hostId;
}

View File

@@ -0,0 +1,28 @@
package com.orion.ops.module.asset.handler.host.terminal.entity.response;
import com.alibaba.fastjson.annotation.JSONField;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 终端连接响应 实体对象
*
* @author Jiahang Li
* @version 1.0.0
* @since 2023/12/29 16:20
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(name = "TerminalConnectResponse", description = "终端连接响应 实体对象")
public class TerminalConnectResponse {
@JSONField(name = "s")
@Schema(description = "会话id")
private String s;
}

View File

@@ -0,0 +1,63 @@
package com.orion.ops.module.asset.handler.host.terminal.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 输入操作类型枚举
*
* @author Jiahang Li
* @version 1.0.0
* @since 2023/12/29 15:33
*/
@Getter
@AllArgsConstructor
public enum InputOperatorTypeEnum {
/**
* 连接主机
*/
CONNECT("co"),
/**
* 关闭连接
*/
CLOSE("cl"),
/**
* ping
*/
PING("p"),
/**
* 修改大小
*/
RESIZE("rs"),
/**
* 执行
*/
EXEC("e"),
/**
* 输入
*/
INPUT("i"),
;
private final String type;
public static InputOperatorTypeEnum of(String type) {
if (type == null) {
return null;
}
for (InputOperatorTypeEnum value : values()) {
if (value.type.equals(type)) {
return value;
}
}
return null;
}
}

View File

@@ -0,0 +1,58 @@
package com.orion.ops.module.asset.handler.host.terminal.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 输出操作类型枚举
*
* @author Jiahang Li
* @version 1.0.0
* @since 2023/12/29 15:33
*/
@Getter
@AllArgsConstructor
public enum OutputOperatorTypeEnum {
/**
* 连接主机成功
*/
CONNECT_COMPLETE("cc"),
/**
* 连接主机失败
*/
CONNECT_FAILED("cf"),
/**
* pong
*/
PONG("p"),
/**
* 输出
*/
OUTPUT("o"),
/**
* 发生错误
*/
ERROR("e"),
;
private final String type;
public static OutputOperatorTypeEnum of(String type) {
if (type == null) {
return null;
}
for (OutputOperatorTypeEnum value : values()) {
if (value.type.equals(type)) {
return value;
}
}
return null;
}
}

View File

@@ -0,0 +1,39 @@
package com.orion.ops.module.asset.handler.host.terminal.handler;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.orion.ops.module.asset.handler.host.terminal.entity.MessageWrapper;
import org.springframework.web.socket.WebSocketSession;
/**
* 终端消息处理器 基类
*
* @author Jiahang Li
* @version 1.0.0
* @since 2023/12/29 18:59
*/
public abstract class AbstractTerminalHandler<T> implements ITerminalHandler {
/**
* 类型转换器
*/
private final TypeReference<MessageWrapper<T>> convert;
public AbstractTerminalHandler(TypeReference<MessageWrapper<T>> convert) {
this.convert = convert;
}
@Override
public void process(WebSocketSession session, String payload) {
this.onMessage(session, JSON.parseObject(payload, convert));
}
/**
* 处理消息
*
* @param session session
* @param msg msg
*/
protected abstract void onMessage(WebSocketSession session, MessageWrapper<T> msg);
}

View File

@@ -0,0 +1,22 @@
package com.orion.ops.module.asset.handler.host.terminal.handler;
import org.springframework.web.socket.WebSocketSession;
/**
* 终端消息处理器
*
* @author Jiahang Li
* @version 1.0.0
* @since 2023/12/29 18:53
*/
public interface ITerminalHandler {
/**
* 处理消息
*
* @param session session
* @param payload payload
*/
void process(WebSocketSession session, String payload);
}

View File

@@ -0,0 +1,29 @@
package com.orion.ops.module.asset.handler.host.terminal.handler;
import com.alibaba.fastjson.TypeReference;
import com.orion.ops.module.asset.handler.host.terminal.entity.MessageWrapper;
import com.orion.ops.module.asset.handler.host.terminal.entity.request.TerminalConnectRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketSession;
/**
* 连接主机处理器
*
* @author Jiahang Li
* @version 1.0.0
* @since 2023/12/29 15:32
*/
@Component
public class TerminalConnectHandler extends AbstractTerminalHandler<TerminalConnectRequest> {
public TerminalConnectHandler() {
super(new TypeReference<MessageWrapper<TerminalConnectRequest>>(TerminalConnectRequest.class) {
});
}
@Override
protected void onMessage(WebSocketSession session, MessageWrapper<TerminalConnectRequest> msg) {
System.out.println(msg);
}
}

View File

@@ -1,11 +1,9 @@
package com.orion.ops.module.asset.interceptor;
import com.orion.lang.utils.Urls;
import com.orion.ops.framework.biz.operator.log.core.model.OperatorLogModel;
import com.orion.ops.framework.common.constant.ExtraFieldConst;
import com.orion.ops.framework.common.entity.RequestIdentity;
import com.orion.ops.framework.common.meta.TraceIdHolder;
import com.orion.ops.framework.common.utils.Requests;
import com.orion.ops.module.asset.entity.dto.HostTerminalAccessDTO;
import com.orion.ops.module.asset.service.HostTerminalService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
@@ -26,7 +24,7 @@ import java.util.Map;
*/
@Slf4j
@Component
public class TerminalInterceptor implements HandshakeInterceptor {
public class TerminalAccessInterceptor implements HandshakeInterceptor {
@Resource
private HostTerminalService hostTerminalService;
@@ -36,16 +34,19 @@ public class TerminalInterceptor implements HandshakeInterceptor {
// 获取 token
String token = Urls.getUrlSource(request.getURI().getPath());
log.info("TerminalInterceptor-beforeHandshake start token: {}", token);
attributes.put(ExtraFieldConst.USER_ID, 1L);
attributes.put(ExtraFieldConst.TRACE_ID, TraceIdHolder.get());
attributes.put(ExtraFieldConst.IDENTITY, Requests.getIdentity());
// 获取连接数据
HostTerminalAccessDTO access = hostTerminalService.getAccessInfoByToken(token);
if (access == null) {
log.error("TerminalInterceptor-beforeHandshake absent token: {}", token);
return false;
}
// 设置参数
attributes.put(ExtraFieldConst.USER_ID, access.getUserId());
OperatorLogModel identity = new OperatorLogModel();
Requests.fillIdentity(identity);
// HostTerminalAccessDTO access = hostTerminalService.getAccessInfoByToken(token);
// if (access == null) {
// log.error("TerminalInterceptor-beforeHandshake absent token: {}", token);
// return false;
// }
// // 设置参数
// attributes.put(ExtraFieldConst.USER_ID, access.getUserId());
// attributes.put(ExtraFieldConst.TRACE_ID, TraceIdHolder.get());
// attributes.put(ExtraFieldConst.IDENTITY, Requests.getIdentity());
return true;
}