diff --git a/orion-ops-framework/orion-ops-framework-common/src/main/java/com/orion/ops/framework/common/constant/ErrorMessage.java b/orion-ops-framework/orion-ops-framework-common/src/main/java/com/orion/ops/framework/common/constant/ErrorMessage.java index e520dd30..3ccfc298 100644 --- a/orion-ops-framework/orion-ops-framework-common/src/main/java/com/orion/ops/framework/common/constant/ErrorMessage.java +++ b/orion-ops-framework/orion-ops-framework-common/src/main/java/com/orion/ops/framework/common/constant/ErrorMessage.java @@ -81,4 +81,8 @@ public interface ErrorMessage { String OPERATE_ERROR = "操作失败"; + String UNKNOWN_TYPE = "未知类型"; + + String FILE_ABSENT = "文件不存在"; + } diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/define/AssetThreadPools.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/define/AssetThreadPools.java index 27dcf65a..d6b9e7e2 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/define/AssetThreadPools.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/define/AssetThreadPools.java @@ -27,4 +27,16 @@ public interface AssetThreadPools { .allowCoreThreadTimeout(true) .build(); + /** + * SFTP 下载线程池 + */ + ThreadPoolExecutor SFTP_DOWNLOAD_SCHEDULER = ExecutorBuilder.create() + .namedThreadFactory("sftp-download-thread-") + .corePoolSize(1) + .maxPoolSize(Integer.MAX_VALUE) + .keepAliveTime(Const.MS_S_60) + .workQueue(new SynchronousQueue<>()) + .allowCoreThreadTimeout(true) + .build(); + } diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/enums/InputTypeEnum.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/enums/InputTypeEnum.java index 7094af08..b73e5309 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/enums/InputTypeEnum.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/enums/InputTypeEnum.java @@ -124,12 +124,12 @@ public enum InputTypeEnum { SftpChangeModRequest.class), /** - * SFTP 下载文件夹 flat + * SFTP 下载文件夹展开文件 */ - SFTP_DOWNLOAD_DIRECTORY_FLAT("df", - SftpDownloadDirectoryFlatHandler.class, + SFTP_DOWNLOAD_FLAT_DIRECTORY("df", + SftpDownloadFlatDirectoryHandler.class, new String[]{"type", "sessionId", "currentPath", "path"}, - SftpDownloadDirectoryFlatRequest.class), + SftpDownloadFlatDirectoryRequest.class), /** * SFTP 获取内容 diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/enums/OutputTypeEnum.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/enums/OutputTypeEnum.java index 7f52e06d..9c414fcc 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/enums/OutputTypeEnum.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/enums/OutputTypeEnum.java @@ -76,9 +76,9 @@ public enum OutputTypeEnum { SFTP_CHMOD("cm", "${type}|${sessionId}|${result}|${msg}"), /** - * SFTP 下载文件夹 flat + * SFTP 下载文件夹展开文件 */ - SFTP_DOWNLOAD_DIRECTORY_FLAT("df", "${type}|${sessionId}|${currentPath}|${body}"), + SFTP_DOWNLOAD_FLAT_DIRECTORY("df", "${type}|${sessionId}|${currentPath}|${body}"), /** * SFTP 获取文件内容 diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/handler/SftpDownloadDirectoryFlatHandler.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/handler/SftpDownloadFlatDirectoryHandler.java similarity index 54% rename from orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/handler/SftpDownloadDirectoryFlatHandler.java rename to orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/handler/SftpDownloadFlatDirectoryHandler.java index e3f64ba7..b2624387 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/handler/SftpDownloadDirectoryFlatHandler.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/handler/SftpDownloadFlatDirectoryHandler.java @@ -1,16 +1,22 @@ package com.orion.ops.module.asset.handler.host.terminal.handler; +import com.alibaba.fastjson.JSON; +import com.orion.lang.utils.collect.Lists; import com.orion.ops.framework.common.enums.BooleanBit; import com.orion.ops.module.asset.handler.host.terminal.enums.OutputTypeEnum; -import com.orion.ops.module.asset.handler.host.terminal.model.request.SftpDownloadDirectoryFlatRequest; +import com.orion.ops.module.asset.handler.host.terminal.model.request.SftpDownloadFlatDirectoryRequest; import com.orion.ops.module.asset.handler.host.terminal.model.response.SftpDownloadDirectoryFlatResponse; +import com.orion.ops.module.asset.handler.host.terminal.model.response.SftpFileVO; import com.orion.ops.module.asset.handler.host.terminal.session.ISftpSession; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.web.socket.WebSocketSession; +import java.util.Arrays; +import java.util.List; + /** - * sftp 下载文件夹 flat + * sftp 下载文件夹展开文件 * * @author Jiahang Li * @version 1.0.0 @@ -18,31 +24,30 @@ import org.springframework.web.socket.WebSocketSession; */ @Slf4j @Component -public class SftpDownloadDirectoryFlatHandler extends AbstractTerminalHandler { +public class SftpDownloadFlatDirectoryHandler extends AbstractTerminalHandler { @Override - public void handle(WebSocketSession channel, SftpDownloadDirectoryFlatRequest payload) { + public void handle(WebSocketSession channel, SftpDownloadFlatDirectoryRequest payload) { // 获取会话 ISftpSession session = terminalManager.getSession(channel.getId(), payload.getSessionId()); - String path = payload.getPath(); - log.info("SftpDownloadDirectoryFlatHandler-handle session: {}, path: {}", payload.getSessionId(), path); + String[] paths = payload.getPath().split("\\|"); + log.info("SftpDownloadFlatDirectoryHandler-handle session: {}, paths: {}", payload.getSessionId(), Arrays.toString(paths)); Exception ex = null; - // 获取文件夹内的全部文件 + List files = Lists.empty(); + // 展开文件夹内的全部文件 try { - // TODO - + files = session.flatDirectory(paths); } catch (Exception e) { - log.error("SftpDownloadDirectoryFlatHandler-handle error", e); + log.error("SftpDownloadFlatDirectoryHandler-handle error", e); ex = e; } // 返回 this.send(channel, - OutputTypeEnum.SFTP_DOWNLOAD_DIRECTORY_FLAT, + OutputTypeEnum.SFTP_DOWNLOAD_FLAT_DIRECTORY, SftpDownloadDirectoryFlatResponse.builder() .sessionId(payload.getSessionId()) - .currentPath(payload.getPath()) - // TODO - .body("") + .currentPath(payload.getCurrentPath()) + .body(JSON.toJSONString(files)) .result(BooleanBit.of(ex == null).getValue()) .msg(this.getErrorMessage(ex)) .build()); diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/handler/SftpRemoveHandler.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/handler/SftpRemoveHandler.java index 17357bb1..dd34158b 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/handler/SftpRemoveHandler.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/handler/SftpRemoveHandler.java @@ -9,6 +9,8 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.web.socket.WebSocketSession; +import java.util.Arrays; + /** * sftp 删除文件 * @@ -25,7 +27,7 @@ public class SftpRemoveHandler extends AbstractTerminalHandler // 获取会话 ISftpSession session = terminalManager.getSession(channel.getId(), payload.getSessionId()); String[] paths = payload.getPath().split("\\|"); - log.info("SftpRemoveHandler-handle session: {}, path: {}", payload.getSessionId(), paths); + log.info("SftpRemoveHandler-handle session: {}, path: {}", payload.getSessionId(), Arrays.toString(paths)); Exception ex = null; // 删除 try { diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/model/request/SftpDownloadDirectoryFlatRequest.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/model/request/SftpDownloadFlatDirectoryRequest.java similarity index 75% rename from orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/model/request/SftpDownloadDirectoryFlatRequest.java rename to orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/model/request/SftpDownloadFlatDirectoryRequest.java index 738b96d5..d04d8319 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/model/request/SftpDownloadDirectoryFlatRequest.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/model/request/SftpDownloadFlatDirectoryRequest.java @@ -8,7 +8,7 @@ import lombok.NoArgsConstructor; import lombok.experimental.SuperBuilder; /** - * sftp 下载文件夹 flat 实体对象 + * sftp 下载文件夹展开文件 实体对象 *

* i|eff00a1|currentPath|path * @@ -21,10 +21,10 @@ import lombok.experimental.SuperBuilder; @NoArgsConstructor @AllArgsConstructor @EqualsAndHashCode(callSuper = true) -@Schema(name = "SftpDownloadDirectoryFlatRequest", description = "sftp 下载文件夹 flat 实体对象") -public class SftpDownloadDirectoryFlatRequest extends SftpBaseRequest { +@Schema(name = "SftpDownloadDirectoryFlatRequest", description = "sftp 下载文件夹展开文件 实体对象") +public class SftpDownloadFlatDirectoryRequest extends SftpBaseRequest { @Schema(description = "当前路径") - private Integer currentPath; + private String currentPath; } diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/session/ISftpSession.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/session/ISftpSession.java index e17392b7..d3b16ccb 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/session/ISftpSession.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/session/ISftpSession.java @@ -78,6 +78,14 @@ public interface ISftpSession extends ITerminalSession { */ void chmod(String path, int mod); + /** + * 展开文件夹内的所有文件 + * + * @param paths paths + * @return files + */ + List flatDirectory(String[] paths); + /** * 获取内容 * diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/session/SftpSession.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/session/SftpSession.java index 215173a4..4650a501 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/session/SftpSession.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/session/SftpSession.java @@ -16,10 +16,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.web.socket.WebSocketSession; import java.io.InputStream; -import java.util.Arrays; -import java.util.Comparator; -import java.util.List; -import java.util.Optional; +import java.util.*; import java.util.stream.Collectors; /** @@ -107,6 +104,15 @@ public class SftpSession extends TerminalSession implements ISftpSession { executor.changeMode(path, mod); } + @Override + public List flatDirectory(String[] paths) { + return Arrays.stream(paths) + .map(s -> executor.listFiles(s, true, false)) + .flatMap(Collection::stream) + .map(SftpSession::fileMapping) + .collect(Collectors.toList()); + } + @Override public String getContent(String path) { path = Valid.checkNormalize(path); diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/enums/TransferOperatorType.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/enums/TransferOperatorType.java index b4813367..f9e3d8d2 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/enums/TransferOperatorType.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/enums/TransferOperatorType.java @@ -17,20 +17,36 @@ public enum TransferOperatorType { /** * 开始上传 */ - UPLOAD_START("uploadStart"), + UPLOAD_START(TransferOperatorType.UPLOAD, "uploadStart"), /** * 上传完成 */ - UPLOAD_FINISH("uploadFinish"), + UPLOAD_FINISH(TransferOperatorType.UPLOAD, "uploadFinish"), /** * 上传失败 */ - UPLOAD_ERROR("uploadError"), + UPLOAD_ERROR(TransferOperatorType.UPLOAD, "uploadError"), + + /** + * 开始下载 + */ + DOWNLOAD_START(TransferOperatorType.DOWNLOAD, "downloadStart"), + + /** + * 中断下载 + */ + DOWNLOAD_ABORT(TransferOperatorType.DOWNLOAD, "downloadAbort"), ; + public static final String UPLOAD = "UPLOAD"; + + public static final String DOWNLOAD = "DOWNLOAD"; + + private final String operator; + private final String type; public static TransferOperatorType of(String type) { diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/enums/TransferReceiverType.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/enums/TransferReceiverType.java index 95902ca4..8b58095a 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/enums/TransferReceiverType.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/enums/TransferReceiverType.java @@ -14,16 +14,36 @@ import lombok.Getter; @AllArgsConstructor public enum TransferReceiverType { - /** - * 请求下一块上传数据 - */ - NEXT_BLOCK("nextBlock"), - /** * 请求下一个传输任务 */ NEXT_TRANSFER("nextTransfer"), + /** + * 请求下一块上传数据 + */ + UPLOAD_NEXT_BLOCK("uploadNextBlock"), + + /** + * 上传完成 + */ + UPLOAD_FINISH("uploadFinish"), + + /** + * 上传失败 + */ + UPLOAD_ERROR("uploadError"), + + /** + * 下载完成 + */ + DOWNLOAD_FINISH("downloadFinish"), + + /** + * 下载失败 + */ + DOWNLOAD_ERROR("downloadError"), + ; private final String type; diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/handler/TransferHandler.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/handler/TransferHandler.java index b2ab0874..da3ff2de 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/handler/TransferHandler.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/handler/TransferHandler.java @@ -1,27 +1,22 @@ package com.orion.ops.module.asset.handler.host.transfer.handler; -import com.alibaba.fastjson.JSON; -import com.orion.lang.exception.argument.InvalidArgumentException; +import com.orion.lang.utils.Exceptions; import com.orion.lang.utils.io.Streams; import com.orion.net.host.SessionStore; -import com.orion.ops.framework.common.constant.Const; import com.orion.ops.framework.common.constant.ErrorMessage; import com.orion.ops.framework.common.constant.ExtraFieldConst; -import com.orion.ops.framework.websocket.core.utils.WebSockets; import com.orion.ops.module.asset.entity.dto.HostTerminalConnectDTO; import com.orion.ops.module.asset.enums.HostConnectTypeEnum; import com.orion.ops.module.asset.handler.host.transfer.enums.TransferOperatorType; import com.orion.ops.module.asset.handler.host.transfer.enums.TransferReceiverType; import com.orion.ops.module.asset.handler.host.transfer.model.TransferOperatorRequest; -import com.orion.ops.module.asset.handler.host.transfer.model.TransferOperatorResponse; -import com.orion.ops.module.asset.handler.host.transfer.session.ITransferHostSession; -import com.orion.ops.module.asset.handler.host.transfer.session.TransferHostSession; +import com.orion.ops.module.asset.handler.host.transfer.session.*; +import com.orion.ops.module.asset.handler.host.transfer.utils.TransferUtils; import com.orion.ops.module.asset.service.HostTerminalService; import com.orion.spring.SpringHolder; import lombok.extern.slf4j.Slf4j; import org.springframework.web.socket.WebSocketSession; -import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; /** @@ -48,7 +43,7 @@ public class TransferHandler implements ITransferHandler { /** * 会话列表 */ - private final ConcurrentHashMap sessions; + private final ConcurrentHashMap sessions; public TransferHandler(WebSocketSession channel) { this.channel = channel; @@ -61,22 +56,30 @@ public class TransferHandler implements ITransferHandler { // 解析消息类型 TransferOperatorType type = TransferOperatorType.of(payload.getType()); // 获取会话 - if (!this.getAndInitSession(payload)) { + if (!this.getAndInitSession(payload, type)) { return; } // 处理消息 switch (type) { case UPLOAD_START: - // 准备上传 - this.uploadStart(payload); + // 开始上传 + ((IUploadSession) currentSession).startUpload(payload.getPath()); break; case UPLOAD_FINISH: // 上传完成 - this.uploadFinish(); + ((IUploadSession) currentSession).uploadFinish(); break; case UPLOAD_ERROR: // 上传失败 - this.uploadError(); + ((IUploadSession) currentSession).uploadError(); + break; + case DOWNLOAD_START: + // 开始下载 + ((IDownloadSession) currentSession).startDownload(payload.getPath()); + break; + case DOWNLOAD_ABORT: + // 中断下载 + ((IDownloadSession) currentSession).abortDownload(); break; default: break; @@ -85,119 +88,49 @@ public class TransferHandler implements ITransferHandler { @Override public void putContent(byte[] content) { - try { - // 写入内容 - currentSession.putContent(content); - // 响应结果 - this.sendMessage(TransferReceiverType.NEXT_BLOCK, null); - } catch (IOException e) { - log.error("TransferHandler.putContent error", e); - // 写入完成 - currentSession.putFinish(); - // 响应结果 - this.sendMessage(TransferReceiverType.NEXT_TRANSFER, e); - } - } - - /** - * 准备上传 - * - * @param payload payload - */ - private void uploadStart(TransferOperatorRequest payload) { - try { - // 开始上传 - currentSession.startUpload(payload.getPath()); - // 响应结果 - this.sendMessage(TransferReceiverType.NEXT_BLOCK, null); - } catch (Exception e) { - log.error("TransferHandler.uploadStart error", e); - // 传输完成 - currentSession.putFinish(); - // 响应结果 - this.sendMessage(TransferReceiverType.NEXT_TRANSFER, e); - } - } - - /** - * 上传完成 - */ - private void uploadFinish() { - currentSession.putFinish(); - // 响应结果 - this.sendMessage(TransferReceiverType.NEXT_TRANSFER, null); - } - - /** - * 上传失败 - */ - private void uploadError() { - currentSession.putFinish(); - // 响应结果 - this.sendMessage(TransferReceiverType.NEXT_TRANSFER, new InvalidArgumentException(Const.EMPTY)); + ((IUploadSession) currentSession).putContent(content); } /** * 获取并且初始化会话 * * @param payload payload + * @param type type * @return success */ - private boolean getAndInitSession(TransferOperatorRequest payload) { + private boolean getAndInitSession(TransferOperatorRequest payload, TransferOperatorType type) { Long hostId = payload.getHostId(); + String sessionKey = hostId + "_" + type.getOperator(); try { // 获取会话 - ITransferHostSession session = sessions.get(hostId); + ITransferHostSession session = sessions.get(sessionKey); if (session == null) { // 获取主机信息 HostTerminalConnectDTO connectInfo = hostTerminalService.getTerminalConnectInfo(hostId, this.userId, HostConnectTypeEnum.SFTP); SessionStore sessionStore = hostTerminalService.openSessionStore(connectInfo); // 打开会话并初始化 - session = new TransferHostSession(connectInfo, sessionStore); + if (TransferOperatorType.UPLOAD.equals(type.getOperator())) { + // 上传操作 + session = new UploadSession(connectInfo, sessionStore, this.channel); + } else if (TransferOperatorType.DOWNLOAD.equals(type.getOperator())) { + // 下载操作 + session = new DownloadSession(connectInfo, sessionStore, this.channel); + } else { + throw Exceptions.invalidArgument(ErrorMessage.UNKNOWN_TYPE); + } session.init(); - this.currentSession = session; - sessions.put(hostId, session); + sessions.put(sessionKey, session); } + this.currentSession = session; return true; } catch (Exception e) { log.error("TransferHandler.getAndInitSession error", e); // 响应结果 - this.sendMessage(TransferReceiverType.NEXT_TRANSFER, e); + TransferUtils.sendMessage(this.channel, TransferReceiverType.NEXT_TRANSFER, e); return false; } } - /** - * 发送消息 - * - * @param type type - * @param ex ex - */ - private void sendMessage(TransferReceiverType type, Exception ex) { - TransferOperatorResponse resp = TransferOperatorResponse.builder() - .type(type.getType()) - .success(ex == null) - .msg(this.getErrorMessage(ex)) - .build(); - WebSockets.sendText(this.channel, JSON.toJSONString(resp)); - } - - /** - * 获取错误信息 - * - * @param ex ex - * @return msg - */ - private String getErrorMessage(Exception ex) { - if (ex == null) { - return null; - } - if (ex instanceof InvalidArgumentException) { - return ex.getMessage(); - } - return ErrorMessage.OPERATE_ERROR; - } - @Override public void close() { sessions.values().forEach(Streams::close); diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/DownloadSession.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/DownloadSession.java new file mode 100644 index 00000000..3b733915 --- /dev/null +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/DownloadSession.java @@ -0,0 +1,96 @@ +package com.orion.ops.module.asset.handler.host.transfer.session; + +import com.orion.lang.utils.Valid; +import com.orion.lang.utils.io.Streams; +import com.orion.net.host.SessionStore; +import com.orion.net.host.sftp.SftpFile; +import com.orion.ops.framework.common.constant.Const; +import com.orion.ops.framework.common.constant.ErrorMessage; +import com.orion.ops.module.asset.define.AssetThreadPools; +import com.orion.ops.module.asset.entity.dto.HostTerminalConnectDTO; +import com.orion.ops.module.asset.handler.host.transfer.enums.TransferReceiverType; +import com.orion.ops.module.asset.handler.host.transfer.utils.TransferUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.socket.BinaryMessage; +import org.springframework.web.socket.WebSocketSession; + +import java.io.InputStream; + +/** + * 下载会话实现 + * + * @author Jiahang Li + * @version 1.0.0 + * @since 2024/2/22 22:25 + */ +@Slf4j +public class DownloadSession extends TransferHostSession implements IDownloadSession { + + private InputStream inputStream; + + public DownloadSession(HostTerminalConnectDTO connectInfo, SessionStore sessionStore, WebSocketSession channel) { + super(connectInfo, sessionStore, channel); + } + + @Override + public void startDownload(String path) { + try { + // 检查连接 + this.init(); + // 检查文件是否存在 + SftpFile file = executor.getFile(path); + Valid.notNull(file, ErrorMessage.FILE_ABSENT); + // 打开输入流 + this.inputStream = executor.openInputStream(path); + } catch (Exception e) { + log.error("DownloadSession.uploadStart error", e); + // 响应结果 + TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_ERROR, e); + return; + } + // 异步读取文件内容 FIXME bug + AssetThreadPools.TERMINAL_SCHEDULER.execute(() -> { + try { + byte[] buffer = new byte[Const.BUFFER_KB_32]; + int len; + // 响应文件内容 + while ((len = this.inputStream.read(buffer)) != -1) { + this.channel.sendMessage(new BinaryMessage(buffer, 0, len, true)); + } + // 响应结果 + TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_FINISH, null); + } catch (Exception e) { + log.error("DownloadSession.transfer error", e); + // 响应结果 + TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_ERROR, e); + } finally { + this.closeStream(); + } + }); + } + + @Override + public void abortDownload() { + log.info("DownloadSession.abortDownload"); + // 关闭流 + Streams.close(inputStream); + // 响应结果 + TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_FINISH, null); + } + + @Override + public void close() { + super.close(); + this.closeStream(); + } + + /** + * 关闭流 + */ + private void closeStream() { + // 关闭流 + Streams.close(inputStream); + this.inputStream = null; + } + +} diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/IDownloadSession.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/IDownloadSession.java new file mode 100644 index 00000000..2dd84601 --- /dev/null +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/IDownloadSession.java @@ -0,0 +1,24 @@ +package com.orion.ops.module.asset.handler.host.transfer.session; + +/** + * 下载会话定义 + * + * @author Jiahang Li + * @version 1.0.0 + * @since 2024/2/22 22:25 + */ +public interface IDownloadSession { + + /** + * 开始下载 + * + * @param path path + */ + void startDownload(String path); + + /** + * 停止下载 + */ + void abortDownload(); + +} diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/ITransferHostSession.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/ITransferHostSession.java index 57c5d651..93e01d1a 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/ITransferHostSession.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/ITransferHostSession.java @@ -2,8 +2,6 @@ package com.orion.ops.module.asset.handler.host.transfer.session; import com.orion.lang.able.SafeCloseable; -import java.io.IOException; - /** * 主机传输会话定义 * @@ -18,25 +16,4 @@ public interface ITransferHostSession extends SafeCloseable { */ void init(); - /** - * 开始上传 - * - * @param path path - * @throws IOException IOException - */ - void startUpload(String path) throws IOException; - - /** - * 写入内容 - * - * @param bytes bytes - * @throws IOException IOException - */ - void putContent(byte[] bytes) throws IOException; - - /** - * 写入完成 - */ - void putFinish(); - } diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/IUploadSession.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/IUploadSession.java new file mode 100644 index 00000000..e8e92ea7 --- /dev/null +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/IUploadSession.java @@ -0,0 +1,36 @@ +package com.orion.ops.module.asset.handler.host.transfer.session; + +/** + * 上传会话定义 + * + * @author Jiahang Li + * @version 1.0.0 + * @since 2024/2/22 22:03 + */ +public interface IUploadSession extends ITransferHostSession { + + /** + * 开始上传 + * + * @param path path + */ + void startUpload(String path); + + /** + * 写入内容 + * + * @param bytes bytes + */ + void putContent(byte[] bytes); + + /** + * 上传完成 + */ + void uploadFinish(); + + /** + * 上传失败 + */ + void uploadError(); + +} diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/TransferHostSession.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/TransferHostSession.java index 29a473be..a88fbab8 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/TransferHostSession.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/TransferHostSession.java @@ -3,11 +3,8 @@ package com.orion.ops.module.asset.handler.host.transfer.session; import com.orion.lang.utils.io.Streams; import com.orion.net.host.SessionStore; import com.orion.net.host.sftp.SftpExecutor; -import com.orion.net.host.sftp.SftpFile; import com.orion.ops.module.asset.entity.dto.HostTerminalConnectDTO; - -import java.io.IOException; -import java.io.OutputStream; +import org.springframework.web.socket.WebSocketSession; /** * 主机传输会话实现 @@ -16,19 +13,20 @@ import java.io.OutputStream; * @version 1.0.0 * @since 2024/2/21 21:12 */ -public class TransferHostSession implements ITransferHostSession { +public abstract class TransferHostSession implements ITransferHostSession { - private final HostTerminalConnectDTO connectInfo; + protected final HostTerminalConnectDTO connectInfo; - private final SessionStore sessionStore; + protected final SessionStore sessionStore; - private SftpExecutor executor; + protected final WebSocketSession channel; - private OutputStream currentOutputStream; + protected SftpExecutor executor; - public TransferHostSession(HostTerminalConnectDTO connectInfo, SessionStore sessionStore) { + public TransferHostSession(HostTerminalConnectDTO connectInfo, SessionStore sessionStore, WebSocketSession channel) { this.connectInfo = connectInfo; this.sessionStore = sessionStore; + this.channel = channel; } @Override @@ -45,35 +43,10 @@ public class TransferHostSession implements ITransferHostSession { } } - @Override - public void startUpload(String path) throws IOException { - // 检查连接 - this.init(); - SftpFile file = executor.getFile(path); - if (file != null) { - // 文件存在则重命名 - executor.move(path, file.getName() + "_bk_" + System.currentTimeMillis()); - } - // 打开输出流 - this.currentOutputStream = executor.openOutputStream(path); - } - - @Override - public void putContent(byte[] bytes) throws IOException { - currentOutputStream.write(bytes); - } - - @Override - public void putFinish() { - Streams.close(currentOutputStream); - this.currentOutputStream = null; - } - @Override public void close() { Streams.close(executor); Streams.close(sessionStore); - Streams.close(currentOutputStream); } } diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/UploadSession.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/UploadSession.java new file mode 100644 index 00000000..975266f2 --- /dev/null +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/session/UploadSession.java @@ -0,0 +1,99 @@ +package com.orion.ops.module.asset.handler.host.transfer.session; + +import com.orion.lang.exception.argument.InvalidArgumentException; +import com.orion.lang.utils.io.Streams; +import com.orion.net.host.SessionStore; +import com.orion.net.host.sftp.SftpFile; +import com.orion.ops.module.asset.entity.dto.HostTerminalConnectDTO; +import com.orion.ops.module.asset.handler.host.transfer.enums.TransferReceiverType; +import com.orion.ops.module.asset.handler.host.transfer.utils.TransferUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.socket.WebSocketSession; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * 上传会话实现 + * + * @author Jiahang Li + * @version 1.0.0 + * @since 2024/2/22 22:04 + */ +@Slf4j +public class UploadSession extends TransferHostSession implements IUploadSession { + + private OutputStream outputStream; + + public UploadSession(HostTerminalConnectDTO connectInfo, SessionStore sessionStore, WebSocketSession channel) { + super(connectInfo, sessionStore, channel); + } + + @Override + public void startUpload(String path) { + try { + // 检查连接 + this.init(); + // 检查文件是否存在 + SftpFile file = executor.getFile(path); + if (file != null) { + // 文件存在则重命名 + executor.move(path, file.getName() + "_bk_" + System.currentTimeMillis()); + } + // 打开输出流 + this.outputStream = executor.openOutputStream(path); + // 响应结果 + TransferUtils.sendMessage(this.channel, TransferReceiverType.UPLOAD_NEXT_BLOCK, null); + } catch (Exception e) { + log.error("UploadSession.uploadStart error", e); + this.closeStream(); + // 响应结果 + TransferUtils.sendMessage(this.channel, TransferReceiverType.UPLOAD_ERROR, e); + } + } + + @Override + public void putContent(byte[] bytes) { + try { + // 写入内容 + outputStream.write(bytes); + // 响应结果 + TransferUtils.sendMessage(this.channel, TransferReceiverType.UPLOAD_NEXT_BLOCK, null); + } catch (IOException e) { + log.error("UploadSession.putContent error", e); + this.closeStream(); + // 响应结果 + TransferUtils.sendMessage(this.channel, TransferReceiverType.UPLOAD_ERROR, e); + } + } + + @Override + public void uploadFinish() { + this.closeStream(); + // 响应结果 + TransferUtils.sendMessage(this.channel, TransferReceiverType.UPLOAD_FINISH, null); + } + + @Override + public void uploadError() { + this.closeStream(); + // 响应结果 + TransferUtils.sendMessage(this.channel, TransferReceiverType.UPLOAD_ERROR, new InvalidArgumentException((String) null)); + } + + @Override + public void close() { + super.close(); + this.closeStream(); + } + + /** + * 关闭流 + */ + private void closeStream() { + // 关闭流 + Streams.close(outputStream); + this.outputStream = null; + } + +} diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/utils/TransferUtils.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/utils/TransferUtils.java new file mode 100644 index 00000000..08b8641e --- /dev/null +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/transfer/utils/TransferUtils.java @@ -0,0 +1,55 @@ +package com.orion.ops.module.asset.handler.host.transfer.utils; + +import com.alibaba.fastjson.JSON; +import com.orion.lang.exception.argument.InvalidArgumentException; +import com.orion.ops.framework.common.constant.ErrorMessage; +import com.orion.ops.framework.websocket.core.utils.WebSockets; +import com.orion.ops.module.asset.handler.host.transfer.enums.TransferReceiverType; +import com.orion.ops.module.asset.handler.host.transfer.model.TransferOperatorResponse; +import org.springframework.web.socket.WebSocketSession; + +/** + * 传输工具类 + * + * @author Jiahang Li + * @version 1.0.0 + * @since 2024/2/22 22:14 + */ +public class TransferUtils { + + private TransferUtils() { + } + + /** + * 发送消息 + * + * @param channel channel + * @param type type + * @param ex ex + */ + public static void sendMessage(WebSocketSession channel, TransferReceiverType type, Exception ex) { + TransferOperatorResponse resp = TransferOperatorResponse.builder() + .type(type.getType()) + .success(ex == null) + .msg(TransferUtils.getErrorMessage(ex)) + .build(); + WebSockets.sendText(channel, JSON.toJSONString(resp)); + } + + /** + * 获取错误信息 + * + * @param ex ex + * @return msg + */ + public static String getErrorMessage(Exception ex) { + if (ex == null) { + return null; + } + if (ex instanceof InvalidArgumentException) { + return ex.getMessage(); + } + return ErrorMessage.OPERATE_ERROR; + } + +} diff --git a/orion-ops-ui/src/utils/file.ts b/orion-ops-ui/src/utils/file.ts index d2718e51..0110282c 100644 --- a/orion-ops-ui/src/utils/file.ts +++ b/orion-ops-ui/src/utils/file.ts @@ -57,6 +57,14 @@ export function getPath(path: string) { .replace(new RegExp('/+', 'g'), '/'); } +/** + * 获取文件名 + */ +export function getFileName(path: string) { + path = getPath(path); + return path.substring(path.lastIndexOf('/') + 1); +} + /** * 获取父级路径 */ diff --git a/orion-ops-ui/src/views/host/terminal/components/sftp/sftp-view.vue b/orion-ops-ui/src/views/host/terminal/components/sftp/sftp-view.vue index 8555bc09..579dbd38 100644 --- a/orion-ops-ui/src/views/host/terminal/components/sftp/sftp-view.vue +++ b/orion-ops-ui/src/views/host/terminal/components/sftp/sftp-view.vue @@ -156,9 +156,11 @@ // 添加普通文件到下载队列 const normalFiles = files.filter(s => !s.isDir); transferManager.addDownload(props.tab.hostId as number, currentPath.value, normalFiles); - // 将文件夹转为普通文件 - const directoryFiles = files.filter(s => s.isDir); - // TODO + // 将文件夹展开普通文件 + const directoryPaths = files.filter(s => s.isDir).map(s => s.path); + if (directoryPaths.length) { + session.value?.downloadFlatDirectory(currentPath.value, directoryPaths); + } }; // 连接成功回调 @@ -224,6 +226,12 @@ Message.success('保存成功'); }; + // 接收下载文件夹展开文件响应 + const resolveDownloadFlatDirectory = (currentPath: string, list: Array) => { + setTableLoading(false); + transferManager.addDownload(props.tab.hostId as number, currentPath, list); + }; + // 初始化会话 onMounted(async () => { // 创建终端处理器 @@ -236,6 +244,7 @@ resolveSftpMove: resolveFileAction, resolveSftpRemove: resolveFileAction, resolveSftpChmod: resolveFileAction, + resolveDownloadFlatDirectory, resolveSftpGetContent, resolveSftpSetContent, }); diff --git a/orion-ops-ui/src/views/host/terminal/components/transfer/transfer-drawer.vue b/orion-ops-ui/src/views/host/terminal/components/transfer/transfer-drawer.vue index 9b3c3949..ef605323 100644 --- a/orion-ops-ui/src/views/host/terminal/components/transfer/transfer-drawer.vue +++ b/orion-ops-ui/src/views/host/terminal/components/transfer/transfer-drawer.vue @@ -190,6 +190,7 @@ text-overflow: ellipsis; width: fit-content; max-width: 100%; + white-space: nowrap; } .transfer-progress, .target-path, .error-message { diff --git a/orion-ops-ui/src/views/host/terminal/handler/sftp-session.ts b/orion-ops-ui/src/views/host/terminal/handler/sftp-session.ts index cf9b5f30..45213dbe 100644 --- a/orion-ops-ui/src/views/host/terminal/handler/sftp-session.ts +++ b/orion-ops-ui/src/views/host/terminal/handler/sftp-session.ts @@ -107,6 +107,16 @@ export default class SftpSession implements ISftpSession { }); }; + // 下载文件夹展开文件 + downloadFlatDirectory(currentPath: string, path: string[]) { + this.resolver.setLoading(true); + this.channel.send(InputProtocol.SFTP_DOWNLOAD_FLAT_DIRECTORY, { + sessionId: this.sessionId, + currentPath, + path: path.join('|') + }); + } + // 获取内容 getContent(path: string) { this.channel.send(InputProtocol.SFTP_GET_CONTENT, { diff --git a/orion-ops-ui/src/views/host/terminal/handler/sftp-transfer-downloader.ts b/orion-ops-ui/src/views/host/terminal/handler/sftp-transfer-downloader.ts new file mode 100644 index 00000000..da44671d --- /dev/null +++ b/orion-ops-ui/src/views/host/terminal/handler/sftp-transfer-downloader.ts @@ -0,0 +1,79 @@ +import type { ISftpTransferDownloader, SftpTransferItem } from '../types/terminal.type'; +import { TransferOperatorType, TransferStatus } from '../types/terminal.const'; +import { getFileName, getPath } from '@/utils/file'; +import { saveAs } from 'file-saver'; + +// sftp 上传器实现 +export default class SftpTransferDownloader implements ISftpTransferDownloader { + + public abort: boolean; + + private blobArr: Array; + private client: WebSocket; + private item: SftpTransferItem; + + constructor(item: SftpTransferItem, client: WebSocket) { + this.abort = false; + this.blobArr = []; + this.item = item; + this.client = client; + } + + // 开始下载 + startDownload() { + this.item.status = TransferStatus.TRANSFERRING; + // 发送开始下载信息 + this.client?.send(JSON.stringify({ + type: TransferOperatorType.DOWNLOAD_START, + path: getPath(this.item.parentPath + '/' + this.item.name), + hostId: this.item.hostId + })); + } + + // 接收 blob + resolveBlob(blob: Blob) { + this.blobArr.push(blob); + this.item.currentSize += blob.size; + } + + // 下载完成 + downloadFinish() { + console.log(this.abort); + if (this.abort) { + // 中断则不触发下载 + return; + } + // fixme bug + try { + console.log('saveAs'); + // 触发下载 + saveAs(new Blob(this.blobArr, { + type: 'application/octet-stream' + }), getFileName(this.item.name)); + this.item.status = TransferStatus.SUCCESS; + } catch (e) { + this.item.status = TransferStatus.ERROR; + this.item.errorMessage = '保存失败'; + } finally { + this.blobArr = []; + } + } + + // 下载失败 + downloadError(msg: string | undefined) { + this.blobArr = []; + this.item.status = TransferStatus.ERROR; + this.item.errorMessage = msg || '下载失败'; + } + + // 下载中断 + downloadAbort() { + this.abort = true; + // 发送下载中断信息 + this.client?.send(JSON.stringify({ + type: TransferOperatorType.DOWNLOAD_ABORT, + hostId: this.item.hostId + })); + } + +} diff --git a/orion-ops-ui/src/views/host/terminal/handler/sftp-transfer-manager.ts b/orion-ops-ui/src/views/host/terminal/handler/sftp-transfer-manager.ts index 29beb4cf..1c4b5da9 100644 --- a/orion-ops-ui/src/views/host/terminal/handler/sftp-transfer-manager.ts +++ b/orion-ops-ui/src/views/host/terminal/handler/sftp-transfer-manager.ts @@ -1,10 +1,11 @@ import type { ISftpTransferManager, ISftpTransferUploader, SftpTransferItem } from '../types/terminal.type'; -import { SftpFile, TransferOperatorResponse } from '../types/terminal.type'; +import { ISftpTransferDownloader, SftpFile, TransferOperatorResponse } from '../types/terminal.type'; import { TransferReceiverType, TransferStatus, TransferType } from '../types/terminal.const'; import { Message } from '@arco-design/web-vue'; import { getTerminalAccessToken } from '@/api/asset/host-terminal'; -import SftpTransferUploader from '@/views/host/terminal/handler/sftp-transfer-uploader'; import { nextId } from '@/utils'; +import SftpTransferUploader from './sftp-transfer-uploader'; +import SftpTransferDownloader from './sftp-transfer-downloader'; export const wsBase = import.meta.env.VITE_WS_BASE_URL; @@ -19,6 +20,8 @@ export default class SftpTransferManager implements ISftpTransferManager { private currentUploader?: ISftpTransferUploader; + private currentDownloader?: ISftpTransferDownloader; + public transferList: Array; constructor() { @@ -57,7 +60,8 @@ export default class SftpTransferManager implements ISftpTransferManager { fileId: nextId(10), type: TransferType.DOWNLOAD, hostId: hostId, - name: s.path.substring(currentPath.length), + name: s.path.substring(currentPath.length + 1), + parentPath: currentPath, currentSize: 0, totalSize: s.size, status: TransferStatus.WAITING, @@ -81,6 +85,8 @@ export default class SftpTransferManager implements ISftpTransferManager { // 传输中则中断传输 if (this.currentUploader) { this.currentUploader.uploadAbort(); + } else if (this.currentDownloader) { + this.currentDownloader.downloadAbort(); } } // 从列表中移除 @@ -121,6 +127,7 @@ export default class SftpTransferManager implements ISftpTransferManager { // 传输下一条任务 private transferNextItem() { this.currentUploader = undefined; + this.currentDownloader = undefined; // 释放内存 if (this.currentItem) { this.currentItem.file = null as unknown as File; @@ -144,13 +151,27 @@ export default class SftpTransferManager implements ISftpTransferManager { // 接收消息 private async resolveMessage(message: MessageEvent) { - const data = JSON.parse(message.data) as TransferOperatorResponse; - if (data.type === TransferReceiverType.NEXT_BLOCK) { - // 接收下一块上传数据 - await this.resolveNextBlock(); - } else if (data.type === TransferReceiverType.NEXT_TRANSFER) { - // 接收接收下一个传输任务处理完成 - this.resolveNextTransfer(data); + if (message.data instanceof Blob) { + // 二进制消息 下载数据 + this.resolveDownloadBlob(message.data); + } else { + // 文本消息 + const data = JSON.parse(message.data) as TransferOperatorResponse; + if (data.type === TransferReceiverType.NEXT_TRANSFER + || data.type === TransferReceiverType.UPLOAD_FINISH + || data.type === TransferReceiverType.UPLOAD_ERROR) { + // 执行下一个传输任务 + this.resolveNextTransfer(data); + } else if (data.type === TransferReceiverType.UPLOAD_NEXT_BLOCK) { + // 接收下一块上传数据 + await this.resolveUploadNextBlock(); + } else if (data.type === TransferReceiverType.DOWNLOAD_FINISH) { + // 下载完成 + this.resolveDownloadFinish(); + } else if (data.type === TransferReceiverType.DOWNLOAD_ERROR) { + // 下载失败 + this.resolveDownloadError(data.msg); + } } } @@ -164,11 +185,28 @@ export default class SftpTransferManager implements ISftpTransferManager { // 下载文件 private uploadDownload() { - // TODO + // 创建下载器 + this.currentDownloader = new SftpTransferDownloader(this.currentItem as SftpTransferItem, this.client as WebSocket); + // 开始下载 + this.currentDownloader.startDownload(); } - // 接收下一块上传数据 - private async resolveNextBlock() { + // 接收下一个传输任务响应 + private resolveNextTransfer(data: TransferOperatorResponse) { + if (this.currentItem) { + if (data.success) { + this.currentItem.status = TransferStatus.SUCCESS; + } else { + this.currentItem.status = TransferStatus.ERROR; + this.currentItem.errorMessage = data.msg || '传输失败'; + } + } + // 开始下一个传输任务 + this.transferNextItem(); + } + + // 接收下一块上传数据响应 + private async resolveUploadNextBlock() { // 只可能为上传并且成功 if (!this.currentUploader) { return; @@ -189,16 +227,21 @@ export default class SftpTransferManager implements ISftpTransferManager { } } - // 接收下一个传输任务 - private resolveNextTransfer(data: TransferOperatorResponse) { - if (this.currentItem) { - if (data.success) { - this.currentItem.status = TransferStatus.SUCCESS; - } else { - this.currentItem.status = TransferStatus.ERROR; - this.currentItem.errorMessage = data.msg || '上传失败'; - } - } + // 接收下载数据 + private resolveDownloadBlob(blob: Blob) { + this.currentDownloader?.resolveBlob(blob); + } + + // 接收下载完成响应 + private resolveDownloadFinish() { + this.currentDownloader?.downloadFinish(); + // 开始下一个传输任务 + this.transferNextItem(); + } + + // 接收下载失败响应 + private resolveDownloadError(msg: string | undefined) { + this.currentDownloader?.downloadError(msg); // 开始下一个传输任务 this.transferNextItem(); } diff --git a/orion-ops-ui/src/views/host/terminal/handler/sftp-transfer-uploader.ts b/orion-ops-ui/src/views/host/terminal/handler/sftp-transfer-uploader.ts index 07b71853..93e75cd7 100644 --- a/orion-ops-ui/src/views/host/terminal/handler/sftp-transfer-uploader.ts +++ b/orion-ops-ui/src/views/host/terminal/handler/sftp-transfer-uploader.ts @@ -7,8 +7,8 @@ export const BLOCK_SIZE = 1024 * 1024; // sftp 上传器实现 export default class SftpTransferUploader implements ISftpTransferUploader { - public abort: boolean; public finish: boolean; + public abort: boolean; private currentBlock: number; private totalBlock: number; private client: WebSocket; diff --git a/orion-ops-ui/src/views/host/terminal/handler/terminal-output-processor.ts b/orion-ops-ui/src/views/host/terminal/handler/terminal-output-processor.ts index 2d0e7681..a7fba33f 100644 --- a/orion-ops-ui/src/views/host/terminal/handler/terminal-output-processor.ts +++ b/orion-ops-ui/src/views/host/terminal/handler/terminal-output-processor.ts @@ -154,6 +154,13 @@ export default class TerminalOutputProcessor implements ITerminalOutputProcessor session && session.resolver.resolveSftpChmod(result, msg); } + // 处理 SFTP 下载文件夹展开文件 + processDownloadFlatDirectory({ sessionId, currentPath, body }: OutputPayload): void { + // 获取会话 + const session = this.sessionManager.getSession(sessionId); + session && session.resolver.resolveDownloadFlatDirectory(currentPath, JSON.parse(body)); + } + // 处理 SFTP 获取文件内容 processSftpGetContent({ sessionId, path, result, content }: OutputPayload): void { // 获取会话 diff --git a/orion-ops-ui/src/views/host/terminal/types/terminal.const.ts b/orion-ops-ui/src/views/host/terminal/types/terminal.const.ts index 73fa4823..9711b117 100644 --- a/orion-ops-ui/src/views/host/terminal/types/terminal.const.ts +++ b/orion-ops-ui/src/views/host/terminal/types/terminal.const.ts @@ -304,12 +304,18 @@ export const TransferOperatorType = { UPLOAD_START: 'uploadStart', UPLOAD_FINISH: 'uploadFinish', UPLOAD_ERROR: 'uploadError', + DOWNLOAD_START: 'downloadStart', + DOWNLOAD_ABORT: 'downloadAbort', }; // 传输响应类型 export const TransferReceiverType = { - NEXT_BLOCK: 'nextBlock', NEXT_TRANSFER: 'nextTransfer', + UPLOAD_NEXT_BLOCK: 'uploadNextBlock', + UPLOAD_FINISH: 'uploadFinish', + UPLOAD_ERROR: 'uploadError', + DOWNLOAD_FINISH: 'downloadFinish', + DOWNLOAD_ERROR: 'downloadError', }; // 打开 sshSettingModal key diff --git a/orion-ops-ui/src/views/host/terminal/types/terminal.protocol.ts b/orion-ops-ui/src/views/host/terminal/types/terminal.protocol.ts index d1ed580a..37396696 100644 --- a/orion-ops-ui/src/views/host/terminal/types/terminal.protocol.ts +++ b/orion-ops-ui/src/views/host/terminal/types/terminal.protocol.ts @@ -60,6 +60,11 @@ export const InputProtocol = { type: 'cm', template: ['type', 'sessionId', 'path', 'mod'] }, + // SFTP 修改文件权限 + SFTP_DOWNLOAD_FLAT_DIRECTORY: { + type: 'df', + template: ['type', 'sessionId', 'currentPath', 'path'] + }, // SFTP 获取内容 SFTP_GET_CONTENT: { type: 'gc', @@ -140,6 +145,12 @@ export const OutputProtocol = { template: ['type', 'sessionId', 'result', 'msg'], processMethod: 'processSftpChmod' }, + // SFTP 修改文件权限 + SFTP_DOWNLOAD_FLAT_DIRECTORY: { + type: 'df', + template: ['type', 'sessionId', 'currentPath', 'body'], + processMethod: 'processDownloadFlatDirectory' + }, // SFTP 获取文件内容 SFTP_GET_CONTENT: { type: 'gc', diff --git a/orion-ops-ui/src/views/host/terminal/types/terminal.type.ts b/orion-ops-ui/src/views/host/terminal/types/terminal.type.ts index 57bd9fe4..10f846c0 100644 --- a/orion-ops-ui/src/views/host/terminal/types/terminal.type.ts +++ b/orion-ops-ui/src/views/host/terminal/types/terminal.type.ts @@ -190,6 +190,8 @@ export interface ITerminalOutputProcessor { processSftpRemove: (payload: OutputPayload) => void; // 处理 SFTP 修改文件权限 processSftpChmod: (payload: OutputPayload) => void; + // 处理 SFTP 下载文件夹展开文件 + processDownloadFlatDirectory: (payload: OutputPayload) => void; // 处理 SFTP 获取文件内容 processSftpGetContent: (payload: OutputPayload) => void; // 处理 SFTP 修改文件内容 @@ -325,6 +327,8 @@ export interface ISftpSession extends ITerminalSession { remove: (path: string[]) => void; // 修改权限 chmod: (path: string, mod: number) => void; + // 下载文件夹展开文件 + downloadFlatDirectory: (currentPath: string, path: string[]) => void; // 获取内容 getContent: (path: string) => void; // 修改内容 @@ -349,6 +353,8 @@ export interface ISftpSessionResolver { resolveSftpRemove: (result: string, msg: string) => void; // 接收修改文件权限响应 resolveSftpChmod: (result: string, msg: string) => void; + // 接收下载文件夹展开文件响应 + resolveDownloadFlatDirectory: (currentPath: string, list: Array) => void; // 接收获取文件内容响应 resolveSftpGetContent: (path: string, result: string, content: string) => void; // 接收修改文件内容响应 @@ -400,6 +406,22 @@ export interface ISftpTransferUploader { uploadAbort: () => void; } +// sftp 下载器定义 +export interface ISftpTransferDownloader { + // 是否中断 + abort: boolean; + // 开始下载 + startDownload: () => void; + // 接收 blob + resolveBlob: (blob: Blob) => void; + // 下载完成 + downloadFinish: () => void; + // 下载失败 + downloadError: (msg: string | undefined) => void; + // 下载中断 + downloadAbort: () => void; +} + // sftp 上传文件项 export interface SftpTransferItem { fileId: string;