From 4a3acab23d61ae586b0b9aa7859c1985868f4686 Mon Sep 17 00:00:00 2001 From: lijiahang Date: Mon, 26 Feb 2024 11:16:48 +0800 Subject: [PATCH] =?UTF-8?q?:zap:=20=E5=BC=82=E6=AD=A5=E5=A4=84=E7=90=86?= =?UTF-8?q?=E7=BB=88=E7=AB=AF=E6=B6=88=E6=81=AF.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../websocket/core/utils/WebSockets.java | 27 ++++++++- .../module/asset/define/AssetThreadPools.java | 12 ++-- .../terminal/TerminalMessageDispatcher.java | 14 ++++- .../host/terminal/enums/InputTypeEnum.java | 55 +++++++++++++------ .../host/terminal/session/SshSession.java | 2 +- .../transfer/session/DownloadSession.java | 2 +- 6 files changed, 83 insertions(+), 29 deletions(-) diff --git a/orion-ops-framework/orion-ops-spring-boot-starter-websocket/src/main/java/com/orion/ops/framework/websocket/core/utils/WebSockets.java b/orion-ops-framework/orion-ops-spring-boot-starter-websocket/src/main/java/com/orion/ops/framework/websocket/core/utils/WebSockets.java index 2ec18417..156c2449 100644 --- a/orion-ops-framework/orion-ops-spring-boot-starter-websocket/src/main/java/com/orion/ops/framework/websocket/core/utils/WebSockets.java +++ b/orion-ops-framework/orion-ops-spring-boot-starter-websocket/src/main/java/com/orion/ops/framework/websocket/core/utils/WebSockets.java @@ -2,6 +2,7 @@ package com.orion.ops.framework.websocket.core.utils; import com.alibaba.fastjson.JSON; import com.orion.lang.utils.Exceptions; +import com.orion.lang.utils.Threads; import com.orion.ops.framework.websocket.core.constant.WsCloseCode; import lombok.extern.slf4j.Slf4j; import org.springframework.web.socket.CloseStatus; @@ -57,16 +58,38 @@ public class WebSockets { return; } try { - // 响应 + // 发重消息 session.sendMessage(new TextMessage(message)); } catch (IllegalStateException e) { // 并发异常 - log.error("发送消息失败 {}", Exceptions.getDigest(e)); + log.error("发送消息失败, 准备进行重试 {}", Exceptions.getDigest(e)); + // 并发重试 + retrySendText(session, message, 50); } catch (IOException e) { throw Exceptions.ioRuntime(e); } } + /** + * 重试发送消息 忽略并发报错 + * + * @param session session + * @param message message + * @param delay delay + */ + public static void retrySendText(WebSocketSession session, String message, long delay) { + if (!session.isOpen()) { + return; + } + try { + Threads.sleep(delay); + session.sendMessage(new TextMessage(message)); + Threads.sleep(delay); + } catch (Exception ex) { + throw Exceptions.ioRuntime(ex); + } + } + /** * 关闭会话 * diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/define/AssetThreadPools.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/define/AssetThreadPools.java index d6b9e7e2..582a5f02 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/define/AssetThreadPools.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/define/AssetThreadPools.java @@ -16,10 +16,10 @@ import java.util.concurrent.ThreadPoolExecutor; public interface AssetThreadPools { /** - * terminal 调度线程池 + * terminal 标准输出线程池 */ - ThreadPoolExecutor TERMINAL_SCHEDULER = ExecutorBuilder.create() - .namedThreadFactory("terminal-thread-") + ThreadPoolExecutor TERMINAL_STDOUT = ExecutorBuilder.create() + .namedThreadFactory("terminal-stdout-") .corePoolSize(1) .maxPoolSize(Integer.MAX_VALUE) .keepAliveTime(Const.MS_S_60) @@ -28,10 +28,10 @@ public interface AssetThreadPools { .build(); /** - * SFTP 下载线程池 + * terminal 操作线程池 */ - ThreadPoolExecutor SFTP_DOWNLOAD_SCHEDULER = ExecutorBuilder.create() - .namedThreadFactory("sftp-download-thread-") + ThreadPoolExecutor TERMINAL_OPERATOR = ExecutorBuilder.create() + .namedThreadFactory("terminal-operator-") .corePoolSize(1) .maxPoolSize(Integer.MAX_VALUE) .keepAliveTime(Const.MS_S_60) diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/TerminalMessageDispatcher.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/TerminalMessageDispatcher.java index 57859e22..a76dc6a5 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/TerminalMessageDispatcher.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/TerminalMessageDispatcher.java @@ -1,5 +1,6 @@ package com.orion.ops.module.asset.handler.host.terminal; +import com.orion.ops.module.asset.define.AssetThreadPools; import com.orion.ops.module.asset.handler.host.terminal.enums.InputTypeEnum; import com.orion.ops.module.asset.handler.host.terminal.manager.TerminalManager; import lombok.extern.slf4j.Slf4j; @@ -31,8 +32,17 @@ public class TerminalMessageDispatcher extends AbstractWebSocketHandler { try { // 解析类型 InputTypeEnum type = InputTypeEnum.of(payload); - if (type != null) { - // 解析并处理消息 + if (type == null) { + return; + } + // 解析并处理消息 + if (type.isAsyncExec()) { + // 异步执行 + AssetThreadPools.TERMINAL_OPERATOR.execute(() -> { + type.getHandler().handle(session, type.parse(payload)); + }); + } else { + // 同步执行 type.getHandler().handle(session, type.parse(payload)); } } catch (Exception e) { diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/enums/InputTypeEnum.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/enums/InputTypeEnum.java index b73e5309..fa357dae 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/enums/InputTypeEnum.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/enums/InputTypeEnum.java @@ -25,7 +25,8 @@ public enum InputTypeEnum { CHECK("ck", TerminalCheckHandler.class, new String[]{"type", "sessionId", "hostId", "connectType"}, - TerminalCheckRequest.class), + TerminalCheckRequest.class, + true), /** * 连接主机 @@ -33,7 +34,8 @@ public enum InputTypeEnum { CONNECT("co", TerminalConnectHandler.class, new String[]{"type", "sessionId", "terminalType", "cols", "rows"}, - TerminalConnectRequest.class), + TerminalConnectRequest.class, + true), /** * 关闭连接 @@ -41,7 +43,8 @@ public enum InputTypeEnum { CLOSE("cl", TerminalCloseHandler.class, new String[]{"type", "sessionId"}, - TerminalBasePayload.class), + TerminalBasePayload.class, + true), /** * ping @@ -49,7 +52,8 @@ public enum InputTypeEnum { PING("p", TerminalPingHandler.class, new String[]{"type"}, - TerminalBasePayload.class), + TerminalBasePayload.class, + true), /** * SSH 修改大小 @@ -57,7 +61,8 @@ public enum InputTypeEnum { SSH_RESIZE("rs", SshResizeHandler.class, new String[]{"type", "sessionId", "cols", "rows"}, - SshResizeRequest.class), + SshResizeRequest.class, + true), /** * SSH 输入 @@ -65,7 +70,8 @@ public enum InputTypeEnum { SSH_INPUT("i", SshInputHandler.class, new String[]{"type", "sessionId", "command"}, - SshInputRequest.class), + SshInputRequest.class, + false), /** * SFTP 文件列表 @@ -73,7 +79,8 @@ public enum InputTypeEnum { SFTP_LIST("ls", SftpListHandler.class, new String[]{"type", "sessionId", "showHiddenFile", "path"}, - SftpListRequest.class), + SftpListRequest.class, + true), /** * SFTP 创建文件夹 @@ -81,7 +88,8 @@ public enum InputTypeEnum { SFTP_MKDIR("mk", SftpMakeDirectoryHandler.class, new String[]{"type", "sessionId", "path"}, - SftpBaseRequest.class), + SftpBaseRequest.class, + true), /** * SFTP 创建文件 @@ -89,7 +97,8 @@ public enum InputTypeEnum { SFTP_TOUCH("to", SftpTouchHandler.class, new String[]{"type", "sessionId", "path"}, - SftpBaseRequest.class), + SftpBaseRequest.class, + true), /** * SFTP 移动文件 @@ -97,7 +106,8 @@ public enum InputTypeEnum { SFTP_MOVE("mv", SftpMoveHandler.class, new String[]{"type", "sessionId", "path", "target"}, - SftpMoveRequest.class), + SftpMoveRequest.class, + true), /** * SFTP 删除文件 @@ -105,7 +115,8 @@ public enum InputTypeEnum { SFTP_REMOVE("rm", SftpRemoveHandler.class, new String[]{"type", "sessionId", "path"}, - SftpBaseRequest.class), + SftpBaseRequest.class, + true), /** * SFTP 截断文件 @@ -113,7 +124,8 @@ public enum InputTypeEnum { SFTP_TRUNCATE("tc", SftpTruncateHandler.class, new String[]{"type", "sessionId", "path"}, - SftpBaseRequest.class), + SftpBaseRequest.class, + true), /** * SFTP 修改文件权限 @@ -121,7 +133,8 @@ public enum InputTypeEnum { SFTP_CHMOD("cm", SftpChangeModHandler.class, new String[]{"type", "sessionId", "path", "mod"}, - SftpChangeModRequest.class), + SftpChangeModRequest.class, + true), /** * SFTP 下载文件夹展开文件 @@ -129,7 +142,8 @@ public enum InputTypeEnum { SFTP_DOWNLOAD_FLAT_DIRECTORY("df", SftpDownloadFlatDirectoryHandler.class, new String[]{"type", "sessionId", "currentPath", "path"}, - SftpDownloadFlatDirectoryRequest.class), + SftpDownloadFlatDirectoryRequest.class, + true), /** * SFTP 获取内容 @@ -137,7 +151,8 @@ public enum InputTypeEnum { SFTP_GET_CONTENT("gc", SftpGetContentHandler.class, new String[]{"type", "sessionId", "path"}, - SftpBaseRequest.class), + SftpBaseRequest.class, + true), /** * SFTP 修改内容 @@ -145,7 +160,8 @@ public enum InputTypeEnum { SFTP_SET_CONTENT("sc", SftpSetContentHandler.class, new String[]{"type", "sessionId", "path", "content"}, - SftpSetContentRequest.class), + SftpSetContentRequest.class, + true), ; @@ -160,17 +176,22 @@ public enum InputTypeEnum { private final Class payloadClass; + @Getter + private final boolean asyncExec; + @Getter private ITerminalHandler handler; InputTypeEnum(String type, Class> handlerBean, String[] payloadDefine, - Class payloadClass) { + Class payloadClass, + boolean asyncExec) { this.type = type; this.handlerBean = handlerBean; this.payloadDefine = payloadDefine; this.payloadClass = payloadClass; + this.asyncExec = asyncExec; } public static InputTypeEnum of(String payload) { diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/session/SshSession.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/session/SshSession.java index 3798a43a..88b78403 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/session/SshSession.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/session/SshSession.java @@ -56,7 +56,7 @@ public class SshSession extends TerminalSession implements ISshSession { executor.callback(this::eofCallback); executor.connect(); // 开始监听输出 - AssetThreadPools.TERMINAL_SCHEDULER.execute(executor); + AssetThreadPools.TERMINAL_STDOUT.execute(executor); } @Override diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/DownloadSession.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/DownloadSession.java index e7877ad1..9bb23073 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/DownloadSession.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/DownloadSession.java @@ -62,7 +62,7 @@ public class DownloadSession extends TransferHostSession implements IDownloadSes return; } // 异步读取文件内容 - AssetThreadPools.SFTP_DOWNLOAD_SCHEDULER.execute(() -> { + AssetThreadPools.TERMINAL_OPERATOR.execute(() -> { Exception ex = null; try { byte[] buffer = new byte[Const.BUFFER_KB_32];