异步处理终端消息.

This commit is contained in:
lijiahang
2024-02-26 11:16:48 +08:00
parent b30a48f174
commit 4a3acab23d
6 changed files with 83 additions and 29 deletions

View File

@@ -2,6 +2,7 @@ package com.orion.ops.framework.websocket.core.utils;
import com.alibaba.fastjson.JSON;
import com.orion.lang.utils.Exceptions;
import com.orion.lang.utils.Threads;
import com.orion.ops.framework.websocket.core.constant.WsCloseCode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.CloseStatus;
@@ -57,16 +58,38 @@ public class WebSockets {
return;
}
try {
// 响应
// 发重消息
session.sendMessage(new TextMessage(message));
} catch (IllegalStateException e) {
// 并发异常
log.error("发送消息失败 {}", Exceptions.getDigest(e));
log.error("发送消息失败, 准备进行重试 {}", Exceptions.getDigest(e));
// 并发重试
retrySendText(session, message, 50);
} catch (IOException e) {
throw Exceptions.ioRuntime(e);
}
}
/**
* 重试发送消息 忽略并发报错
*
* @param session session
* @param message message
* @param delay delay
*/
public static void retrySendText(WebSocketSession session, String message, long delay) {
if (!session.isOpen()) {
return;
}
try {
Threads.sleep(delay);
session.sendMessage(new TextMessage(message));
Threads.sleep(delay);
} catch (Exception ex) {
throw Exceptions.ioRuntime(ex);
}
}
/**
* 关闭会话
*

View File

@@ -16,10 +16,10 @@ import java.util.concurrent.ThreadPoolExecutor;
public interface AssetThreadPools {
/**
* terminal 调度线程池
* terminal 标准输出线程池
*/
ThreadPoolExecutor TERMINAL_SCHEDULER = ExecutorBuilder.create()
.namedThreadFactory("terminal-thread-")
ThreadPoolExecutor TERMINAL_STDOUT = ExecutorBuilder.create()
.namedThreadFactory("terminal-stdout-")
.corePoolSize(1)
.maxPoolSize(Integer.MAX_VALUE)
.keepAliveTime(Const.MS_S_60)
@@ -28,10 +28,10 @@ public interface AssetThreadPools {
.build();
/**
* SFTP 下载线程池
* terminal 操作线程池
*/
ThreadPoolExecutor SFTP_DOWNLOAD_SCHEDULER = ExecutorBuilder.create()
.namedThreadFactory("sftp-download-thread-")
ThreadPoolExecutor TERMINAL_OPERATOR = ExecutorBuilder.create()
.namedThreadFactory("terminal-operator-")
.corePoolSize(1)
.maxPoolSize(Integer.MAX_VALUE)
.keepAliveTime(Const.MS_S_60)

View File

@@ -1,5 +1,6 @@
package com.orion.ops.module.asset.handler.host.terminal;
import com.orion.ops.module.asset.define.AssetThreadPools;
import com.orion.ops.module.asset.handler.host.terminal.enums.InputTypeEnum;
import com.orion.ops.module.asset.handler.host.terminal.manager.TerminalManager;
import lombok.extern.slf4j.Slf4j;
@@ -31,8 +32,17 @@ public class TerminalMessageDispatcher extends AbstractWebSocketHandler {
try {
// 解析类型
InputTypeEnum type = InputTypeEnum.of(payload);
if (type != null) {
// 解析并处理消息
if (type == null) {
return;
}
// 解析并处理消息
if (type.isAsyncExec()) {
// 异步执行
AssetThreadPools.TERMINAL_OPERATOR.execute(() -> {
type.getHandler().handle(session, type.parse(payload));
});
} else {
// 同步执行
type.getHandler().handle(session, type.parse(payload));
}
} catch (Exception e) {

View File

@@ -25,7 +25,8 @@ public enum InputTypeEnum {
CHECK("ck",
TerminalCheckHandler.class,
new String[]{"type", "sessionId", "hostId", "connectType"},
TerminalCheckRequest.class),
TerminalCheckRequest.class,
true),
/**
* 连接主机
@@ -33,7 +34,8 @@ public enum InputTypeEnum {
CONNECT("co",
TerminalConnectHandler.class,
new String[]{"type", "sessionId", "terminalType", "cols", "rows"},
TerminalConnectRequest.class),
TerminalConnectRequest.class,
true),
/**
* 关闭连接
@@ -41,7 +43,8 @@ public enum InputTypeEnum {
CLOSE("cl",
TerminalCloseHandler.class,
new String[]{"type", "sessionId"},
TerminalBasePayload.class),
TerminalBasePayload.class,
true),
/**
* ping
@@ -49,7 +52,8 @@ public enum InputTypeEnum {
PING("p",
TerminalPingHandler.class,
new String[]{"type"},
TerminalBasePayload.class),
TerminalBasePayload.class,
true),
/**
* SSH 修改大小
@@ -57,7 +61,8 @@ public enum InputTypeEnum {
SSH_RESIZE("rs",
SshResizeHandler.class,
new String[]{"type", "sessionId", "cols", "rows"},
SshResizeRequest.class),
SshResizeRequest.class,
true),
/**
* SSH 输入
@@ -65,7 +70,8 @@ public enum InputTypeEnum {
SSH_INPUT("i",
SshInputHandler.class,
new String[]{"type", "sessionId", "command"},
SshInputRequest.class),
SshInputRequest.class,
false),
/**
* SFTP 文件列表
@@ -73,7 +79,8 @@ public enum InputTypeEnum {
SFTP_LIST("ls",
SftpListHandler.class,
new String[]{"type", "sessionId", "showHiddenFile", "path"},
SftpListRequest.class),
SftpListRequest.class,
true),
/**
* SFTP 创建文件夹
@@ -81,7 +88,8 @@ public enum InputTypeEnum {
SFTP_MKDIR("mk",
SftpMakeDirectoryHandler.class,
new String[]{"type", "sessionId", "path"},
SftpBaseRequest.class),
SftpBaseRequest.class,
true),
/**
* SFTP 创建文件
@@ -89,7 +97,8 @@ public enum InputTypeEnum {
SFTP_TOUCH("to",
SftpTouchHandler.class,
new String[]{"type", "sessionId", "path"},
SftpBaseRequest.class),
SftpBaseRequest.class,
true),
/**
* SFTP 移动文件
@@ -97,7 +106,8 @@ public enum InputTypeEnum {
SFTP_MOVE("mv",
SftpMoveHandler.class,
new String[]{"type", "sessionId", "path", "target"},
SftpMoveRequest.class),
SftpMoveRequest.class,
true),
/**
* SFTP 删除文件
@@ -105,7 +115,8 @@ public enum InputTypeEnum {
SFTP_REMOVE("rm",
SftpRemoveHandler.class,
new String[]{"type", "sessionId", "path"},
SftpBaseRequest.class),
SftpBaseRequest.class,
true),
/**
* SFTP 截断文件
@@ -113,7 +124,8 @@ public enum InputTypeEnum {
SFTP_TRUNCATE("tc",
SftpTruncateHandler.class,
new String[]{"type", "sessionId", "path"},
SftpBaseRequest.class),
SftpBaseRequest.class,
true),
/**
* SFTP 修改文件权限
@@ -121,7 +133,8 @@ public enum InputTypeEnum {
SFTP_CHMOD("cm",
SftpChangeModHandler.class,
new String[]{"type", "sessionId", "path", "mod"},
SftpChangeModRequest.class),
SftpChangeModRequest.class,
true),
/**
* SFTP 下载文件夹展开文件
@@ -129,7 +142,8 @@ public enum InputTypeEnum {
SFTP_DOWNLOAD_FLAT_DIRECTORY("df",
SftpDownloadFlatDirectoryHandler.class,
new String[]{"type", "sessionId", "currentPath", "path"},
SftpDownloadFlatDirectoryRequest.class),
SftpDownloadFlatDirectoryRequest.class,
true),
/**
* SFTP 获取内容
@@ -137,7 +151,8 @@ public enum InputTypeEnum {
SFTP_GET_CONTENT("gc",
SftpGetContentHandler.class,
new String[]{"type", "sessionId", "path"},
SftpBaseRequest.class),
SftpBaseRequest.class,
true),
/**
* SFTP 修改内容
@@ -145,7 +160,8 @@ public enum InputTypeEnum {
SFTP_SET_CONTENT("sc",
SftpSetContentHandler.class,
new String[]{"type", "sessionId", "path", "content"},
SftpSetContentRequest.class),
SftpSetContentRequest.class,
true),
;
@@ -160,17 +176,22 @@ public enum InputTypeEnum {
private final Class<? extends TerminalBasePayload> payloadClass;
@Getter
private final boolean asyncExec;
@Getter
private ITerminalHandler<? extends TerminalBasePayload> handler;
<T extends TerminalBasePayload> InputTypeEnum(String type,
Class<? extends ITerminalHandler<T>> handlerBean,
String[] payloadDefine,
Class<T> payloadClass) {
Class<T> payloadClass,
boolean asyncExec) {
this.type = type;
this.handlerBean = handlerBean;
this.payloadDefine = payloadDefine;
this.payloadClass = payloadClass;
this.asyncExec = asyncExec;
}
public static InputTypeEnum of(String payload) {

View File

@@ -56,7 +56,7 @@ public class SshSession extends TerminalSession implements ISshSession {
executor.callback(this::eofCallback);
executor.connect();
// 开始监听输出
AssetThreadPools.TERMINAL_SCHEDULER.execute(executor);
AssetThreadPools.TERMINAL_STDOUT.execute(executor);
}
@Override

View File

@@ -62,7 +62,7 @@ public class DownloadSession extends TransferHostSession implements IDownloadSes
return;
}
// 异步读取文件内容
AssetThreadPools.SFTP_DOWNLOAD_SCHEDULER.execute(() -> {
AssetThreadPools.TERMINAL_OPERATOR.execute(() -> {
Exception ex = null;
try {
byte[] buffer = new byte[Const.BUFFER_KB_32];