From 25b15559a4a74d5a36a82008d3007b99a14c7f7d Mon Sep 17 00:00:00 2001 From: lijiahangmax Date: Thu, 22 Feb 2024 00:06:24 +0800 Subject: [PATCH] =?UTF-8?q?:hammer:=20=E4=B8=8A=E4=BC=A0.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/AssetWebSocketConfiguration.java | 6 +- .../handler/host/sftp/TransferManager.java | 23 --- .../host/sftp/upload/FileUploader.java | 9 - .../handler/AbstractTerminalHandler.java | 2 +- .../host/terminal/session/SftpSession.java | 2 +- .../TransferMessageDispatcher.java} | 26 ++- .../transfer/enums/TransferOperatorType.java | 48 +++++ .../transfer/handler/ITransferHandler.java | 29 +++ .../transfer/handler/TransferHandler.java | 192 ++++++++++++++++++ .../model/TransferOperatorRequest.java | 35 ++++ .../model/TransferOperatorResponse.java | 38 ++++ .../session/ITransferHostSession.java | 42 ++++ .../transfer/session/TransferHostSession.java | 79 +++++++ .../terminal/handler/sftp-transfer-manager.ts | 65 ++++-- .../host/terminal/types/terminal.const.ts | 7 + .../host/terminal/types/terminal.type.ts | 10 + 16 files changed, 555 insertions(+), 58 deletions(-) delete mode 100644 orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/sftp/TransferManager.java delete mode 100644 orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/sftp/upload/FileUploader.java rename orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/{sftp/TransferMessageHandler.java => transfer/TransferMessageDispatcher.java} (55%) create mode 100644 orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/enums/TransferOperatorType.java create mode 100644 orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/handler/ITransferHandler.java create mode 100644 orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/handler/TransferHandler.java create mode 100644 orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/model/TransferOperatorRequest.java create mode 100644 orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/model/TransferOperatorResponse.java create mode 100644 orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/ITransferHostSession.java create mode 100644 orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/TransferHostSession.java diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/config/AssetWebSocketConfiguration.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/config/AssetWebSocketConfiguration.java index b6d0f5f4..518498b1 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/config/AssetWebSocketConfiguration.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/config/AssetWebSocketConfiguration.java @@ -1,6 +1,6 @@ package com.orion.ops.module.asset.config; -import com.orion.ops.module.asset.handler.host.sftp.TransferMessageHandler; +import com.orion.ops.module.asset.handler.host.transfer.TransferMessageDispatcher; import com.orion.ops.module.asset.handler.host.terminal.TerminalMessageDispatcher; import com.orion.ops.module.asset.interceptor.TerminalAccessInterceptor; import org.springframework.beans.factory.annotation.Value; @@ -30,7 +30,7 @@ public class AssetWebSocketConfiguration implements WebSocketConfigurer { private TerminalMessageDispatcher terminalMessageDispatcher; @Resource - private TransferMessageHandler transferMessageHandler; + private TransferMessageDispatcher transferMessageDispatcher; @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { @@ -39,7 +39,7 @@ public class AssetWebSocketConfiguration implements WebSocketConfigurer { .addInterceptors(terminalAccessInterceptor) .setAllowedOrigins("*"); // 文件传输 - registry.addHandler(transferMessageHandler, prefix + "/host/transfer/{accessToken}") + registry.addHandler(transferMessageDispatcher, prefix + "/host/transfer/{accessToken}") .addInterceptors(terminalAccessInterceptor) .setAllowedOrigins("*"); } diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/sftp/TransferManager.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/sftp/TransferManager.java deleted file mode 100644 index 5e91077b..00000000 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/sftp/TransferManager.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.orion.ops.module.asset.handler.host.sftp; - -import com.orion.lang.define.collect.MultiConcurrentHashMap; -import com.orion.ops.module.asset.handler.host.terminal.session.ITerminalSession; -import org.springframework.stereotype.Component; - -/** - * 传输管理器 - * - * @author Jiahang Li - * @version 1.0.0 - * @since 2024/2/21 19:05 - */ -@Component -public class TransferManager { - - /** - * 会话存储器 - */ - private final MultiConcurrentHashMap channelSessions = MultiConcurrentHashMap.create(); - - -} diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/sftp/upload/FileUploader.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/sftp/upload/FileUploader.java deleted file mode 100644 index 1c59430f..00000000 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/sftp/upload/FileUploader.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.orion.ops.module.asset.handler.host.sftp.upload; - -/** - * @author Jiahang Li - * @version 1.0.0 - * @since 2024/2/21 19:04 - */ -public class FileUploader { -} diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/handler/AbstractTerminalHandler.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/handler/AbstractTerminalHandler.java index 680e8070..0c5a3c60 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/handler/AbstractTerminalHandler.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/handler/AbstractTerminalHandler.java @@ -60,7 +60,7 @@ public abstract class AbstractTerminalHandler imp } /** - * 获取 sftp 错误信息 + * 获取错误信息 * * @param ex ex * @return msg diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/session/SftpSession.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/session/SftpSession.java index 26615c6e..ac67ba8a 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/session/SftpSession.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/session/SftpSession.java @@ -49,7 +49,7 @@ public class SftpSession extends TerminalSession implements ISftpSession { @Override public void connect() { - // 打开 shell + // 打开 sftp this.executor = sessionStore.getSftpExecutor(config.getFileNameCharset()); executor.connect(); } diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/sftp/TransferMessageHandler.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/TransferMessageDispatcher.java similarity index 55% rename from orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/sftp/TransferMessageHandler.java rename to orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/TransferMessageDispatcher.java index f9b22bc2..f1b647df 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/sftp/TransferMessageHandler.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/TransferMessageDispatcher.java @@ -1,5 +1,10 @@ -package com.orion.ops.module.asset.handler.host.sftp; +package com.orion.ops.module.asset.handler.host.transfer; +import com.alibaba.fastjson.JSON; +import com.orion.lang.utils.io.Streams; +import com.orion.ops.module.asset.handler.host.transfer.handler.ITransferHandler; +import com.orion.ops.module.asset.handler.host.transfer.handler.TransferHandler; +import com.orion.ops.module.asset.handler.host.transfer.model.TransferOperatorRequest; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.web.socket.BinaryMessage; @@ -8,6 +13,8 @@ import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.AbstractWebSocketHandler; +import java.util.concurrent.ConcurrentHashMap; + /** * sftp 传输消息处理器 * @@ -17,16 +24,21 @@ import org.springframework.web.socket.handler.AbstractWebSocketHandler; */ @Slf4j @Component -public class TransferMessageHandler extends AbstractWebSocketHandler { +public class TransferMessageDispatcher extends AbstractWebSocketHandler { + + private final ConcurrentHashMap handlers = new ConcurrentHashMap<>(); @Override - protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { - System.out.println("text"); + protected void handleTextMessage(WebSocketSession session, TextMessage message) { + // 获取处理器 + ITransferHandler handler = handlers.computeIfAbsent(session.getId(), s -> new TransferHandler(session)); + // 处理消息 + handler.handleMessage(JSON.parseObject(message.getPayload(), TransferOperatorRequest.class)); } @Override - protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception { - System.out.println("binary"); + protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) { + handlers.get(session.getId()).putContent(message.getPayload().array()); } @Override @@ -42,6 +54,8 @@ public class TransferMessageHandler extends AbstractWebSocketHandler { @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { String id = session.getId(); + // 关闭会话 + Streams.close(handlers.get(id)); log.info("TransferMessageHandler-afterConnectionClosed id: {}, code: {}, reason: {}", id, status.getCode(), status.getReason()); } diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/enums/TransferOperatorType.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/enums/TransferOperatorType.java new file mode 100644 index 00000000..f35bd4c8 --- /dev/null +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/enums/TransferOperatorType.java @@ -0,0 +1,48 @@ +package com.orion.ops.module.asset.handler.host.transfer.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * 消息操作类型 + * + * @author Jiahang Li + * @version 1.0.0 + * @since 2024/2/21 22:03 + */ +@Getter +@AllArgsConstructor +public enum TransferOperatorType { + + /** + * 处理完成 + */ + PROCESSED("processed"), + + /** + * 开始上传 + */ + UPLOAD_START("upload_start"), + + /** + * 上传完成 + */ + UPLOAD_FINISH("upload_finish"), + + ; + + private final String type; + + public static TransferOperatorType of(String type) { + if (type == null) { + return null; + } + for (TransferOperatorType value : values()) { + if (value.type.equals(type)) { + return value; + } + } + return null; + } + +} diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/handler/ITransferHandler.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/handler/ITransferHandler.java new file mode 100644 index 00000000..0330ddb6 --- /dev/null +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/handler/ITransferHandler.java @@ -0,0 +1,29 @@ +package com.orion.ops.module.asset.handler.host.transfer.handler; + +import com.orion.lang.able.SafeCloseable; +import com.orion.ops.module.asset.handler.host.transfer.model.TransferOperatorRequest; + +/** + * 传输处理器定义 + * + * @author Jiahang Li + * @version 1.0.0 + * @since 2024/2/21 22:46 + */ +public interface ITransferHandler extends SafeCloseable { + + /** + * 处理消息 + * + * @param payload payload + */ + void handleMessage(TransferOperatorRequest payload); + + /** + * 写入内容 + * + * @param content content + */ + void putContent(byte[] content); + +} diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/handler/TransferHandler.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/handler/TransferHandler.java new file mode 100644 index 00000000..a642e05b --- /dev/null +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/handler/TransferHandler.java @@ -0,0 +1,192 @@ +package com.orion.ops.module.asset.handler.host.transfer.handler; + +import com.alibaba.fastjson.JSON; +import com.orion.lang.exception.argument.InvalidArgumentException; +import com.orion.lang.utils.io.Streams; +import com.orion.net.host.SessionStore; +import com.orion.ops.framework.common.constant.ErrorMessage; +import com.orion.ops.framework.common.constant.ExtraFieldConst; +import com.orion.ops.framework.websocket.core.utils.WebSockets; +import com.orion.ops.module.asset.entity.dto.HostTerminalConnectDTO; +import com.orion.ops.module.asset.enums.HostConnectTypeEnum; +import com.orion.ops.module.asset.handler.host.transfer.enums.TransferOperatorType; +import com.orion.ops.module.asset.handler.host.transfer.model.TransferOperatorRequest; +import com.orion.ops.module.asset.handler.host.transfer.model.TransferOperatorResponse; +import com.orion.ops.module.asset.handler.host.transfer.session.ITransferHostSession; +import com.orion.ops.module.asset.handler.host.transfer.session.TransferHostSession; +import com.orion.ops.module.asset.service.HostTerminalService; +import com.orion.spring.SpringHolder; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.socket.WebSocketSession; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 传输处理器 + * + * @author Jiahang Li + * @version 1.0.0 + * @since 2024/2/21 20:57 + */ +@Slf4j +public class TransferHandler implements ITransferHandler { + + private static final HostTerminalService hostTerminalService = SpringHolder.getBean(HostTerminalService.class); + + private final Long userId; + + private final WebSocketSession channel; + + /** + * 当前会话 + */ + private ITransferHostSession currentSession; + + /** + * 会话列表 + */ + private final ConcurrentHashMap sessions; + + public TransferHandler(WebSocketSession channel) { + this.channel = channel; + this.userId = (Long) channel.getAttributes().get(ExtraFieldConst.USER_ID); + this.sessions = new ConcurrentHashMap<>(); + } + + @Override + public void handleMessage(TransferOperatorRequest payload) { + // 解析消息类型 + TransferOperatorType type = TransferOperatorType.of(payload.getType()); + // 获取会话 + if (!this.getAndInitSession(payload)) { + return; + } + // 处理消息 + switch (type) { + case UPLOAD_START: + // 准备上传 + this.uploadStart(payload); + break; + case UPLOAD_FINISH: + // 上传完成 + this.uploadFinish(); + break; + default: + break; + } + } + + @Override + public void putContent(byte[] content) { + Exception ex = null; + try { + // 写入内容 + currentSession.putContent(content); + } catch (IOException e) { + ex = e; + log.error("TransferHandler.putContent error", e); + // 写入完成 + currentSession.putFinish(); + } + // 响应结果 + TransferOperatorResponse resp = TransferOperatorResponse.builder() + .type(TransferOperatorType.PROCESSED.getType()) + .success(ex == null) + .msg(this.getErrorMessage(ex)) + .build(); + WebSockets.sendText(this.channel, JSON.toJSONString(resp)); + } + + /** + * 准备上传 + * + * @param payload payload + */ + private void uploadStart(TransferOperatorRequest payload) { + Exception ex = null; + try { + currentSession.startUpload(payload.getPath()); + } catch (Exception e) { + ex = e; + log.error("TransferHandler.uploadStart error", e); + } + // 响应结果 + TransferOperatorResponse resp = TransferOperatorResponse.builder() + .type(TransferOperatorType.PROCESSED.getType()) + .success(ex == null) + .msg(this.getErrorMessage(ex)) + .build(); + WebSockets.sendText(this.channel, JSON.toJSONString(resp)); + } + + /** + * 上传完成 + */ + private void uploadFinish() { + currentSession.putFinish(); + // 响应结果 + TransferOperatorResponse resp = TransferOperatorResponse.builder() + .type(TransferOperatorType.PROCESSED.getType()) + .success(true) + .build(); + WebSockets.sendText(this.channel, JSON.toJSONString(resp)); + } + + /** + * 获取并且初始化会话 + * + * @param payload payload + * @return success + */ + private boolean getAndInitSession(TransferOperatorRequest payload) { + Long hostId = payload.getHostId(); + try { + // 获取会话 + ITransferHostSession session = sessions.get(hostId); + if (session == null) { + // 获取主机信息 + HostTerminalConnectDTO connectInfo = hostTerminalService.getTerminalConnectInfo(hostId, this.userId, HostConnectTypeEnum.SFTP); + SessionStore sessionStore = hostTerminalService.openSessionStore(connectInfo); + // 打开会话并初始化 + session = new TransferHostSession(connectInfo, sessionStore); + session.init(); + this.currentSession = session; + sessions.put(hostId, session); + } + return true; + } catch (Exception e) { + log.error("TransferHandler.getAndInitSession error", e); + // 响应结果 + TransferOperatorResponse resp = TransferOperatorResponse.builder() + .type(TransferOperatorType.PROCESSED.getType()) + .success(false) + .msg(this.getErrorMessage(e)) + .build(); + WebSockets.sendText(this.channel, JSON.toJSONString(resp)); + return false; + } + } + + /** + * 获取错误信息 + * + * @param ex ex + * @return msg + */ + private String getErrorMessage(Exception ex) { + if (ex == null) { + return null; + } + if (ex instanceof InvalidArgumentException) { + return ex.getMessage(); + } + return ErrorMessage.OPERATE_ERROR; + } + + @Override + public void close() { + sessions.values().forEach(Streams::close); + } + +} diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/model/TransferOperatorRequest.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/model/TransferOperatorRequest.java new file mode 100644 index 00000000..1fab78be --- /dev/null +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/model/TransferOperatorRequest.java @@ -0,0 +1,35 @@ +package com.orion.ops.module.asset.handler.host.transfer.model; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +/** + * 文件操作请求 实体对象 + * + * @author Jiahang Li + * @version 1.0.0 + * @since 2024/2/21 21:01 + */ +@Data +@SuperBuilder +@NoArgsConstructor +@AllArgsConstructor +@Schema(name = "FileOperatorRequest", description = "文件操作请求 实体对象") +public class TransferOperatorRequest { + + @Schema(description = "上传路径") + private String path; + + @Schema(description = "type") + private String type; + + @Schema(description = "fileId") + private String fileId; + + @Schema(description = "主机id") + private Long hostId; + +} diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/model/TransferOperatorResponse.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/model/TransferOperatorResponse.java new file mode 100644 index 00000000..a05223f1 --- /dev/null +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/model/TransferOperatorResponse.java @@ -0,0 +1,38 @@ +package com.orion.ops.module.asset.handler.host.transfer.model; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +/** + * 文件操作响应 实体对象 + * + * @author Jiahang Li + * @version 1.0.0 + * @since 2024/2/21 22:38 + */ +@Data +@SuperBuilder +@NoArgsConstructor +@AllArgsConstructor +@Schema(name = "FileOperatorResponse", description = "文件操作响应 实体对象") +public class TransferOperatorResponse { + + @Schema(description = "type") + private String type; + + @Schema(description = "fileId") + private String fileId; + + @Schema(description = "主机id") + private Long hostId; + + @Schema(description = "是否成功") + private Boolean success; + + @Schema(description = "消息") + private String msg; + +} diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/ITransferHostSession.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/ITransferHostSession.java new file mode 100644 index 00000000..57c5d651 --- /dev/null +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/ITransferHostSession.java @@ -0,0 +1,42 @@ +package com.orion.ops.module.asset.handler.host.transfer.session; + +import com.orion.lang.able.SafeCloseable; + +import java.io.IOException; + +/** + * 主机传输会话定义 + * + * @author Jiahang Li + * @version 1.0.0 + * @since 2024/2/21 23:06 + */ +public interface ITransferHostSession extends SafeCloseable { + + /** + * 初始化 + */ + void init(); + + /** + * 开始上传 + * + * @param path path + * @throws IOException IOException + */ + void startUpload(String path) throws IOException; + + /** + * 写入内容 + * + * @param bytes bytes + * @throws IOException IOException + */ + void putContent(byte[] bytes) throws IOException; + + /** + * 写入完成 + */ + void putFinish(); + +} diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/TransferHostSession.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/TransferHostSession.java new file mode 100644 index 00000000..29a473be --- /dev/null +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/TransferHostSession.java @@ -0,0 +1,79 @@ +package com.orion.ops.module.asset.handler.host.transfer.session; + +import com.orion.lang.utils.io.Streams; +import com.orion.net.host.SessionStore; +import com.orion.net.host.sftp.SftpExecutor; +import com.orion.net.host.sftp.SftpFile; +import com.orion.ops.module.asset.entity.dto.HostTerminalConnectDTO; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * 主机传输会话实现 + * + * @author Jiahang Li + * @version 1.0.0 + * @since 2024/2/21 21:12 + */ +public class TransferHostSession implements ITransferHostSession { + + private final HostTerminalConnectDTO connectInfo; + + private final SessionStore sessionStore; + + private SftpExecutor executor; + + private OutputStream currentOutputStream; + + public TransferHostSession(HostTerminalConnectDTO connectInfo, SessionStore sessionStore) { + this.connectInfo = connectInfo; + this.sessionStore = sessionStore; + } + + @Override + public void init() { + if (executor == null) { + // 建立连接 + this.executor = sessionStore.getSftpExecutor(connectInfo.getFileNameCharset()); + executor.connect(); + } else { + // 检查连接 + if (!this.executor.isConnected()) { + executor.connect(); + } + } + } + + @Override + public void startUpload(String path) throws IOException { + // 检查连接 + this.init(); + SftpFile file = executor.getFile(path); + if (file != null) { + // 文件存在则重命名 + executor.move(path, file.getName() + "_bk_" + System.currentTimeMillis()); + } + // 打开输出流 + this.currentOutputStream = executor.openOutputStream(path); + } + + @Override + public void putContent(byte[] bytes) throws IOException { + currentOutputStream.write(bytes); + } + + @Override + public void putFinish() { + Streams.close(currentOutputStream); + this.currentOutputStream = null; + } + + @Override + public void close() { + Streams.close(executor); + Streams.close(sessionStore); + Streams.close(currentOutputStream); + } + +} diff --git a/orion-ops-ui/src/views/host/terminal/handler/sftp-transfer-manager.ts b/orion-ops-ui/src/views/host/terminal/handler/sftp-transfer-manager.ts index f67ec3c8..9365f740 100644 --- a/orion-ops-ui/src/views/host/terminal/handler/sftp-transfer-manager.ts +++ b/orion-ops-ui/src/views/host/terminal/handler/sftp-transfer-manager.ts @@ -1,8 +1,10 @@ import type { ISftpTransferManager, SftpTransferItem } from '../types/terminal.type'; -import { TransferStatus, TransferType } from '../types/terminal.const'; +import { TransferOperatorResponse } from '../types/terminal.type'; +import { TransferOperatorType, TransferStatus, TransferType } from '../types/terminal.const'; import { sleep } from '@/utils'; import { Message } from '@arco-design/web-vue'; import { getTerminalAccessToken } from '@/api/asset/host-terminal'; +import { getPath } from '@/utils/file'; export const BLOCK_SIZE = 1024 * 1024; @@ -17,6 +19,8 @@ export default class SftpTransferManager implements ISftpTransferManager { private run: boolean; + private resp?: TransferOperatorResponse; + transferList: Array; constructor() { @@ -96,27 +100,35 @@ export default class SftpTransferManager implements ISftpTransferManager { // 接收消息 private async resolveMessage(message: MessageEvent) { // TODO - console.log(); - const data = message.data; - if (data === 'flush') { - - } else if (data === 'error') { - - } else if (data === 'close') { - // TODO 关闭会话 - this.client?.close(); - } + this.resp = JSON.parse(message.data); + // // TODO 关闭会话 + // this.client?.close(); + // } } // 上传文件 private async uploadFile(item: SftpTransferItem) { const file = item.file; - // TODO 发送开始 + // 发送开始上传信息 + this.client?.send(JSON.stringify({ + type: TransferOperatorType.UPLOAD_START, + path: getPath(item.parentPath + '/' + item.name), + hostId: item.hostId + })); + // TODO 等待处理结果 吧错误信息展示出来 + try { + await this.awaitProcessedThrow(); + } catch (ex: any) { + console.log(ex); + item.status = TransferStatus.ERROR; + item.errorMessage = ex.message; + return; + } // 计算分片数量 const totalBlock = Math.ceil(file.size / BLOCK_SIZE); // 分片上传 for (let i = 0; i < totalBlock; i++) { - // TODO wait ACK + // 读取数据 const start = i * BLOCK_SIZE; const end = Math.min(file.size, start + BLOCK_SIZE); @@ -127,9 +139,9 @@ export default class SftpTransferManager implements ISftpTransferManager { reader.onerror = (error) => reject(error); reader.readAsArrayBuffer(chunk); }); - // 上传 TODO - console.log(arrayBuffer); this.client?.send(arrayBuffer as ArrayBuffer); + // TODO 等待处理结果 + await this.awaitProcessedThrow(); } // TODO 发送 END } @@ -139,4 +151,27 @@ export default class SftpTransferManager implements ISftpTransferManager { // TODO } + // 等待处理完成 + private async awaitProcessedThrow() { + for (let i = 0; i < 100; i++) { + await sleep(50); + if (this.resp) { + break; + } + } + const resp = this.resp; + // const resp = undefined; + this.resp = undefined; + // 抛出异常 + if (resp) { + if (resp.success) { + return; + } else { + throw new Error(resp.msg || '处理失败'); + } + } else { + throw new Error('处理超时'); + } + } + } diff --git a/orion-ops-ui/src/views/host/terminal/types/terminal.const.ts b/orion-ops-ui/src/views/host/terminal/types/terminal.const.ts index c492d544..f557dd9f 100644 --- a/orion-ops-ui/src/views/host/terminal/types/terminal.const.ts +++ b/orion-ops-ui/src/views/host/terminal/types/terminal.const.ts @@ -299,6 +299,13 @@ export const TransferType = { DOWNLOAD: 'download' }; +// 传输操作类型 +export const TransferOperatorType = { + PROCESSED: 'processed', + UPLOAD_START: 'upload_start', + UPLOAD_FINISH: 'upload_finish' +}; + // 打开 sshSettingModal key export const openSshSettingModalKey = Symbol(); diff --git a/orion-ops-ui/src/views/host/terminal/types/terminal.type.ts b/orion-ops-ui/src/views/host/terminal/types/terminal.type.ts index 5d5b2f00..6930b000 100644 --- a/orion-ops-ui/src/views/host/terminal/types/terminal.type.ts +++ b/orion-ops-ui/src/views/host/terminal/types/terminal.type.ts @@ -384,5 +384,15 @@ export interface SftpTransferItem { currentSize: number, totalSize: number; status: string; + errorMessage?: string; file: File; } + +// 传输操作响应 +export interface TransferOperatorResponse { + type: string; + fileId?: string; + hostId?: number; + success: boolean; + msg?: string; +}