diff --git a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/TransferMessageDispatcher.java b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/TransferMessageDispatcher.java index 4af25861..6aa9d946 100644 --- a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/TransferMessageDispatcher.java +++ b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/TransferMessageDispatcher.java @@ -50,7 +50,7 @@ public class TransferMessageDispatcher extends AbstractWebSocketHandler { // 获取处理器 ITransferHandler handler = hostTransferManager.getHandler(session.getId()); // 添加数据 - handler.putContent(message.getPayload().array()); + handler.handleMessage(message.getPayload().array()); } @Override diff --git a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/enums/TransferOperator.java b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/enums/TransferOperator.java new file mode 100644 index 00000000..6dc0bccc --- /dev/null +++ b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/enums/TransferOperator.java @@ -0,0 +1,53 @@ +package com.orion.visor.module.asset.handler.host.transfer.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * 传输操作类型 + * + * @author Jiahang Li + * @version 1.0.0 + * @since 2024/2/21 22:03 + */ +@Getter +@AllArgsConstructor +public enum TransferOperator { + + /** + * 开始传输 + */ + START("start"), + + /** + * 传输完成 + */ + FINISH("finish"), + + /** + * 传输失败 + */ + ERROR("error"), + + /** + * 传输中断 + */ + ABORT("abort"), + + ; + + private final String type; + + public static TransferOperator of(String type) { + if (type == null) { + return null; + } + for (TransferOperator value : values()) { + if (value.type.equals(type)) { + return value; + } + } + return null; + } + +} diff --git a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/enums/TransferOperatorType.java b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/enums/TransferOperatorType.java deleted file mode 100644 index e8a4e5ae..00000000 --- a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/enums/TransferOperatorType.java +++ /dev/null @@ -1,64 +0,0 @@ -package com.orion.visor.module.asset.handler.host.transfer.enums; - -import lombok.AllArgsConstructor; -import lombok.Getter; - -/** - * 传输操作类型 - * - * @author Jiahang Li - * @version 1.0.0 - * @since 2024/2/21 22:03 - */ -@Getter -@AllArgsConstructor -public enum TransferOperatorType { - - /** - * 开始上传 - */ - UPLOAD_START(TransferOperatorType.UPLOAD, "uploadStart"), - - /** - * 上传完成 - */ - UPLOAD_FINISH(TransferOperatorType.UPLOAD, "uploadFinish"), - - /** - * 上传失败 - */ - UPLOAD_ERROR(TransferOperatorType.UPLOAD, "uploadError"), - - /** - * 初始化下载 - */ - DOWNLOAD_INIT(TransferOperatorType.DOWNLOAD, "downloadInit"), - - /** - * 中断下载 - */ - DOWNLOAD_ABORT(TransferOperatorType.DOWNLOAD, "downloadAbort"), - - ; - - public static final String UPLOAD = "UPLOAD"; - - public static final String DOWNLOAD = "DOWNLOAD"; - - private final String kind; - - private final String type; - - public static TransferOperatorType of(String type) { - if (type == null) { - return null; - } - for (TransferOperatorType value : values()) { - if (value.type.equals(type)) { - return value; - } - } - return null; - } - -} diff --git a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/enums/TransferReceiver.java b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/enums/TransferReceiver.java new file mode 100644 index 00000000..c2e099a2 --- /dev/null +++ b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/enums/TransferReceiver.java @@ -0,0 +1,63 @@ +package com.orion.visor.module.asset.handler.host.transfer.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * 传输响应类型 + * + * @author Jiahang Li + * @version 1.0.0 + * @since 2024/2/21 22:03 + */ +@Getter +@AllArgsConstructor +public enum TransferReceiver { + + /** + * 请求下一分片 + */ + NEXT_PART("nextPart"), + + /** + * 开始 + */ + START("start"), + + /** + * 进度 + */ + PROGRESS("progress"), + + /** + * 完成 + */ + FINISH("finish"), + + /** + * 失败 + */ + ERROR("error"), + + /** + * 关闭 + */ + ABORT("abort"), + + ; + + private final String type; + + public static TransferReceiver of(String type) { + if (type == null) { + return null; + } + for (TransferReceiver value : values()) { + if (value.type.equals(type)) { + return value; + } + } + return null; + } + +} diff --git a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/enums/TransferReceiverType.java b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/enums/TransferReceiverType.java deleted file mode 100644 index 2e305297..00000000 --- a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/enums/TransferReceiverType.java +++ /dev/null @@ -1,73 +0,0 @@ -package com.orion.visor.module.asset.handler.host.transfer.enums; - -import lombok.AllArgsConstructor; -import lombok.Getter; - -/** - * 传输响应类型 - * - * @author Jiahang Li - * @version 1.0.0 - * @since 2024/2/21 22:03 - */ -@Getter -@AllArgsConstructor -public enum TransferReceiverType { - - /** - * 请求下一个传输任务 - */ - NEXT_TRANSFER("nextTransfer"), - - /** - * 请求下一块上传数据 - */ - UPLOAD_NEXT_BLOCK("uploadNextBlock"), - - /** - * 上传完成 - */ - UPLOAD_FINISH("uploadFinish"), - - /** - * 上传失败 - */ - UPLOAD_ERROR("uploadError"), - - /** - * 开始下载 - */ - DOWNLOAD_START("downloadStart"), - - /** - * 下载进度 - */ - DOWNLOAD_PROGRESS("downloadProgress"), - - /** - * 下载完成 - */ - DOWNLOAD_FINISH("downloadFinish"), - - /** - * 下载失败 - */ - DOWNLOAD_ERROR("downloadError"), - - ; - - private final String type; - - public static TransferReceiverType of(String type) { - if (type == null) { - return null; - } - for (TransferReceiverType value : values()) { - if (value.type.equals(type)) { - return value; - } - } - return null; - } - -} diff --git a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/enums/TransferType.java b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/enums/TransferType.java new file mode 100644 index 00000000..822d35d7 --- /dev/null +++ b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/enums/TransferType.java @@ -0,0 +1,43 @@ +package com.orion.visor.module.asset.handler.host.transfer.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * 传输类型 + * + * @author Jiahang Li + * @version 1.0.0 + * @since 2024/7/12 12:41 + */ +@Getter +@AllArgsConstructor +public enum TransferType { + + /** + * 上传 + */ + UPLOAD("upload"), + + /** + * 下载 + */ + DOWNLOAD("download"), + + ; + + private final String type; + + public static TransferType of(String type) { + if (type == null) { + return null; + } + for (TransferType value : values()) { + if (value.type.equals(type)) { + return value; + } + } + return null; + } + +} diff --git a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/handler/ITransferHandler.java b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/handler/ITransferHandler.java index 2d6ddf14..b057a4c1 100644 --- a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/handler/ITransferHandler.java +++ b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/handler/ITransferHandler.java @@ -2,9 +2,7 @@ package com.orion.visor.module.asset.handler.host.transfer.handler; import com.orion.lang.able.SafeCloseable; import com.orion.visor.module.asset.handler.host.transfer.model.TransferOperatorRequest; -import com.orion.visor.module.asset.handler.host.transfer.session.IDownloadSession; - -import java.util.Map; +import com.orion.visor.module.asset.handler.host.transfer.session.ITransferSession; /** * 传输处理器定义 @@ -16,24 +14,25 @@ import java.util.Map; public interface ITransferHandler extends SafeCloseable { /** - * 处理消息 + * 处理文本消息 * * @param payload payload */ void handleMessage(TransferOperatorRequest payload); /** - * 写入内容 + * 处理二进制消息 * * @param content content */ - void putContent(byte[] content); + void handleMessage(byte[] content); /** - * 获取 token sessions + * 通过 token 获取 session * - * @return token sessions + * @param token token + * @return session */ - Map getTokenSessions(); + ITransferSession getSessionByToken(String token); } diff --git a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/handler/TransferHandler.java b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/handler/TransferHandler.java index e2b300a5..fddb1c67 100644 --- a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/handler/TransferHandler.java +++ b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/handler/TransferHandler.java @@ -1,21 +1,21 @@ package com.orion.visor.module.asset.handler.host.transfer.handler; import com.orion.lang.id.UUIds; -import com.orion.lang.utils.Exceptions; import com.orion.lang.utils.io.Streams; import com.orion.net.host.SessionStore; import com.orion.spring.SpringHolder; -import com.orion.visor.framework.common.constant.ErrorMessage; import com.orion.visor.framework.common.constant.ExtraFieldConst; import com.orion.visor.framework.websocket.core.utils.WebSockets; import com.orion.visor.module.asset.entity.dto.HostTerminalConnectDTO; -import com.orion.visor.module.asset.handler.host.transfer.enums.TransferOperatorType; -import com.orion.visor.module.asset.handler.host.transfer.enums.TransferReceiverType; +import com.orion.visor.module.asset.handler.host.transfer.enums.TransferOperator; +import com.orion.visor.module.asset.handler.host.transfer.enums.TransferReceiver; +import com.orion.visor.module.asset.handler.host.transfer.enums.TransferType; import com.orion.visor.module.asset.handler.host.transfer.model.TransferOperatorRequest; -import com.orion.visor.module.asset.handler.host.transfer.session.*; +import com.orion.visor.module.asset.handler.host.transfer.session.DownloadSession; +import com.orion.visor.module.asset.handler.host.transfer.session.ITransferSession; +import com.orion.visor.module.asset.handler.host.transfer.session.UploadSession; import com.orion.visor.module.asset.handler.host.transfer.utils.TransferUtils; import com.orion.visor.module.asset.service.HostTerminalService; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.web.socket.WebSocketSession; @@ -33,98 +33,72 @@ public class TransferHandler implements ITransferHandler { private static final HostTerminalService hostTerminalService = SpringHolder.getBean(HostTerminalService.class); - private final Long userId; - private final WebSocketSession channel; - /** - * 当前会话 - */ - private ITransferHostSession currentSession; + private ITransferSession currentSession; - /** - * 会话列表 - */ - private final ConcurrentHashMap sessions; - - @Getter - private final ConcurrentHashMap tokenSessions; + private final ConcurrentHashMap sessions; public TransferHandler(WebSocketSession channel) { this.channel = channel; - this.userId = WebSockets.getAttr(channel, ExtraFieldConst.USER_ID); this.sessions = new ConcurrentHashMap<>(); - this.tokenSessions = new ConcurrentHashMap<>(); } @Override public void handleMessage(TransferOperatorRequest payload) { // 解析消息类型 - TransferOperatorType type = TransferOperatorType.of(payload.getType()); + TransferOperator operator = TransferOperator.of(payload.getOperator()); // 获取会话 - if (!this.getAndInitSession(payload, type)) { + if (!this.getAndInitSession(payload)) { return; } // 处理消息 - switch (type) { - case UPLOAD_START: - // 开始上传 - ((IUploadSession) currentSession).startUpload(payload.getPath()); - break; - case UPLOAD_FINISH: - // 上传完成 - ((IUploadSession) currentSession).uploadFinish(); - break; - case UPLOAD_ERROR: - // 上传失败 - ((IUploadSession) currentSession).uploadError(); - break; - case DOWNLOAD_INIT: - // 开始下载 - String token = UUIds.random32(); - tokenSessions.put(token, (IDownloadSession) currentSession); - ((IDownloadSession) currentSession).downloadInit(payload.getPath(), token); - break; - case DOWNLOAD_ABORT: - // 中断下载 - ((IDownloadSession) currentSession).abortDownload(); - break; - default: - break; + if (TransferOperator.START.equals(operator)) { + // 开始 + currentSession.setToken(UUIds.random32()); + currentSession.onStart(payload); + } else if (TransferOperator.FINISH.equals(operator)) { + // 完成 + currentSession.onFinish(payload); + } else if (TransferOperator.ERROR.equals(operator)) { + // 失败 + currentSession.onError(payload); + } else if (TransferOperator.ABORT.equals(operator)) { + // 中断 + currentSession.onAbort(payload); } } @Override - public void putContent(byte[] content) { - ((IUploadSession) currentSession).putContent(content); + public void handleMessage(byte[] content) { + currentSession.handleBinary(content); } /** * 获取并且初始化会话 * * @param payload payload - * @param type type * @return success */ - private boolean getAndInitSession(TransferOperatorRequest payload, TransferOperatorType type) { + private boolean getAndInitSession(TransferOperatorRequest payload) { Long hostId = payload.getHostId(); - String sessionKey = hostId + "_" + type.getKind(); + TransferType type = TransferType.of(payload.getType()); + String sessionKey = hostId + "_" + type.getType(); try { // 获取会话 - ITransferHostSession session = sessions.get(sessionKey); + ITransferSession session = sessions.get(sessionKey); if (session == null) { - // 获取主机信息 - HostTerminalConnectDTO connectInfo = hostTerminalService.getTerminalConnectInfo(this.userId, hostId); + // 获取主机连接信息 + Long userId = WebSockets.getAttr(channel, ExtraFieldConst.USER_ID); + HostTerminalConnectDTO connectInfo = hostTerminalService.getTerminalConnectInfo(userId, hostId); SessionStore sessionStore = hostTerminalService.openSessionStore(connectInfo); // 打开会话并初始化 - if (TransferOperatorType.UPLOAD.equals(type.getKind())) { + if (TransferType.UPLOAD.equals(type)) { // 上传操作 session = new UploadSession(connectInfo, sessionStore, this.channel); - } else if (TransferOperatorType.DOWNLOAD.equals(type.getKind())) { + } else if (TransferType.DOWNLOAD.equals(type)) { // 下载操作 session = new DownloadSession(connectInfo, sessionStore, this.channel); - } else { - throw Exceptions.invalidArgument(ErrorMessage.UNKNOWN_TYPE); } session.init(); sessions.put(sessionKey, session); @@ -135,16 +109,25 @@ public class TransferHandler implements ITransferHandler { } catch (Exception e) { log.error("TransferHandler.getAndInitSession error channelId: {}", channel.getId(), e); // 响应结果 - TransferUtils.sendMessage(this.channel, TransferReceiverType.NEXT_TRANSFER, e); + TransferUtils.sendMessage(this.channel, TransferReceiver.ERROR, e); return false; } } + @Override + public ITransferSession getSessionByToken(String token) { + return sessions.values() + .stream() + .filter(s -> token.equals(s.getToken())) + .findFirst() + .orElse(null); + } + @Override public void close() { log.info("TransferHandler.close channelId: {}", channel.getId()); sessions.values().forEach(Streams::close); - tokenSessions.clear(); + sessions.clear(); } } diff --git a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/model/TransferOperatorRequest.java b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/model/TransferOperatorRequest.java index 34589242..c305bfc6 100644 --- a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/model/TransferOperatorRequest.java +++ b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/model/TransferOperatorRequest.java @@ -28,6 +28,11 @@ public class TransferOperatorRequest { */ private String type; + /** + * operator + */ + private String operator; + /** * 主机id */ diff --git a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/DownloadSession.java b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/DownloadSession.java index 316ed712..0a9c5bda 100644 --- a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/DownloadSession.java +++ b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/DownloadSession.java @@ -11,10 +11,11 @@ import com.orion.visor.framework.common.constant.ErrorMessage; import com.orion.visor.module.asset.define.AssetThreadPools; import com.orion.visor.module.asset.define.operator.HostTerminalOperatorType; import com.orion.visor.module.asset.entity.dto.HostTerminalConnectDTO; -import com.orion.visor.module.asset.handler.host.transfer.enums.TransferReceiverType; +import com.orion.visor.module.asset.handler.host.transfer.enums.TransferReceiver; +import com.orion.visor.module.asset.handler.host.transfer.model.TransferOperatorRequest; import com.orion.visor.module.asset.handler.host.transfer.utils.TransferUtils; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody; import org.springframework.web.socket.WebSocketSession; import java.io.IOException; @@ -29,10 +30,11 @@ import java.io.OutputStream; * @since 2024/2/22 22:25 */ @Slf4j -public class DownloadSession extends TransferHostSession implements IDownloadSession { +public class DownloadSession extends TransferSession implements StreamingResponseBody { - @Getter - private String path; + private static final int BUFFER_SIZE = Const.BUFFER_KB_32; + + private static final int FLUSH_COUNT = Const.BUFFER_KB_1 * Const.BUFFER_KB_1 / Const.BUFFER_KB_32; private InputStream inputStream; @@ -41,10 +43,9 @@ public class DownloadSession extends TransferHostSession implements IDownloadSes } @Override - public void downloadInit(String path, String token) { - this.path = path; - String channelId = channel.getId(); + public void onStart(TransferOperatorRequest request) { try { + super.onStart(request); log.info("DownloadSession.startDownload open start channelId: {}, path: {}", channelId, path); // 保存操作日志 this.saveOperatorLog(HostTerminalOperatorType.SFTP_DOWNLOAD, path); @@ -56,13 +57,13 @@ public class DownloadSession extends TransferHostSession implements IDownloadSes if (file.getSize() == 0L) { // 文件为空 log.info("DownloadSession.startDownload file empty channelId: {}, path: {}", channelId, path); - TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_FINISH, null); + TransferUtils.sendMessage(channel, TransferReceiver.FINISH, null); return; } // 打开输入流 this.inputStream = executor.openInputStream(path); // 响应开始下载 - TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_START, null, e -> { + TransferUtils.sendMessage(channel, TransferReceiver.START, null, e -> { e.setChannelId(channelId); e.setTransferToken(token); }); @@ -70,23 +71,23 @@ public class DownloadSession extends TransferHostSession implements IDownloadSes } catch (Exception e) { log.error("DownloadSession.startDownload open error channelId: {}, path: {}", channelId, path, e); // 响应下载失败 - TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_ERROR, e); + TransferUtils.sendMessage(channel, TransferReceiver.ERROR, e); } } @Override - public void abortDownload() { - log.info("DownloadSession.abortDownload channelId: {}", channel.getId()); + public void onAbort(TransferOperatorRequest request) { + log.info("TransferSession.abort channelId: {}, path: {}", channelId, path); // 关闭流 this.closeStream(); + // download 的 abort 无需发送回调 } @Override public void writeTo(OutputStream outputStream) { - String channelId = channel.getId(); Ref ex = new Ref<>(); try { - byte[] buffer = new byte[Const.BUFFER_KB_32]; + byte[] buffer = new byte[BUFFER_SIZE]; int len; int i = 0; int size = 0; @@ -95,7 +96,7 @@ public class DownloadSession extends TransferHostSession implements IDownloadSes outputStream.write(buffer, 0, len); size += len; // 不要每次都 flush 和 send > 1mb - if (i == 32) { + if (i == FLUSH_COUNT) { i = 0; } // 首次触发 @@ -122,9 +123,9 @@ public class DownloadSession extends TransferHostSession implements IDownloadSes // 响应结果 Exception e = ex.getValue(); if (e == null) { - TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_FINISH, null); + TransferUtils.sendMessage(channel, TransferReceiver.FINISH, null); } else { - TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_ERROR, e); + TransferUtils.sendMessage(channel, TransferReceiver.ERROR, e); } }); } @@ -140,13 +141,12 @@ public class DownloadSession extends TransferHostSession implements IDownloadSes // flush outputStream.flush(); // send - TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_PROGRESS, null, e -> e.setCurrentSize(size)); + TransferUtils.sendMessage(channel, TransferReceiver.PROGRESS, null, e -> e.setCurrentSize(size)); } @Override protected void closeStream() { // 关闭 inputStream 可能会被阻塞 ???...??? 只能关闭 executor - this.path = null; Streams.close(this.executor); this.executor = null; this.inputStream = null; diff --git a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/IDownloadSession.java b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/IDownloadSession.java deleted file mode 100644 index 9cacdd40..00000000 --- a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/IDownloadSession.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.orion.visor.module.asset.handler.host.transfer.session; - -import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody; - -/** - * 下载会话定义 - * - * @author Jiahang Li - * @version 1.0.0 - * @since 2024/2/22 22:25 - */ -public interface IDownloadSession extends StreamingResponseBody { - - /** - * 初始化下载 - * - * @param path path - * @param token token - */ - void downloadInit(String path, String token); - - /** - * 停止下载 - */ - void abortDownload(); - - /** - * 获取下载文件路径 - * - * @return path - */ - String getPath(); - -} diff --git a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/ITransferHostSession.java b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/ITransferHostSession.java deleted file mode 100644 index 82d90534..00000000 --- a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/ITransferHostSession.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.orion.visor.module.asset.handler.host.transfer.session; - -import com.orion.lang.able.SafeCloseable; - -/** - * 主机传输会话定义 - * - * @author Jiahang Li - * @version 1.0.0 - * @since 2024/2/21 23:06 - */ -public interface ITransferHostSession extends SafeCloseable { - - /** - * 初始化 - */ - void init(); - -} diff --git a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/ITransferSession.java b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/ITransferSession.java new file mode 100644 index 00000000..13a83205 --- /dev/null +++ b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/ITransferSession.java @@ -0,0 +1,76 @@ +package com.orion.visor.module.asset.handler.host.transfer.session; + +import com.orion.lang.able.SafeCloseable; +import com.orion.visor.module.asset.handler.host.transfer.model.TransferOperatorRequest; + +/** + * 主机传输会话定义 + * + * @author Jiahang Li + * @version 1.0.0 + * @since 2024/2/21 23:06 + */ +public interface ITransferSession extends SafeCloseable { + + /** + * 初始化 + */ + void init(); + + /** + * 处理二进制内容 + * + * @param bytes bytes + */ + void handleBinary(byte[] bytes); + + /** + * 开始传输 + * + * @param request request + */ + void onStart(TransferOperatorRequest request); + + /** + * 传输完成 + * + * @param request request + */ + void onFinish(TransferOperatorRequest request); + + /** + * 传输失败 + * + * @param request request + */ + void onError(TransferOperatorRequest request); + + /** + * 传输中断 + * + * @param request request + */ + void onAbort(TransferOperatorRequest request); + + /** + * 获取文件路径 + * + * @return path + */ + String getPath(); + + /** + * 获取 token + * + * @return token + */ + String getToken(); + + /** + * 设置 token + * + * @param token token + */ + void setToken(String token); + +} diff --git a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/IUploadSession.java b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/IUploadSession.java deleted file mode 100644 index 824afa6f..00000000 --- a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/IUploadSession.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.orion.visor.module.asset.handler.host.transfer.session; - -/** - * 上传会话定义 - * - * @author Jiahang Li - * @version 1.0.0 - * @since 2024/2/22 22:03 - */ -public interface IUploadSession extends ITransferHostSession { - - /** - * 开始上传 - * - * @param path path - */ - void startUpload(String path); - - /** - * 写入内容 - * - * @param bytes bytes - */ - void putContent(byte[] bytes); - - /** - * 上传完成 - */ - void uploadFinish(); - - /** - * 上传失败 - */ - void uploadError(); - -} diff --git a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/TransferHostSession.java b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/TransferSession.java similarity index 59% rename from orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/TransferHostSession.java rename to orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/TransferSession.java index 461f5380..2028afb6 100644 --- a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/TransferHostSession.java +++ b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/TransferSession.java @@ -1,5 +1,6 @@ package com.orion.visor.module.asset.handler.host.transfer.session; +import com.orion.lang.exception.argument.InvalidArgumentException; import com.orion.lang.utils.collect.Maps; import com.orion.lang.utils.io.Streams; import com.orion.net.host.SessionStore; @@ -11,6 +12,12 @@ import com.orion.visor.framework.biz.operator.log.core.utils.OperatorLogs; import com.orion.visor.module.asset.define.config.AppSftpConfig; import com.orion.visor.module.asset.entity.dto.HostTerminalConnectDTO; import com.orion.visor.module.asset.handler.host.terminal.utils.TerminalUtils; +import com.orion.visor.module.asset.handler.host.transfer.enums.TransferReceiver; +import com.orion.visor.module.asset.handler.host.transfer.model.TransferOperatorRequest; +import com.orion.visor.module.asset.handler.host.transfer.utils.TransferUtils; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import org.springframework.web.socket.WebSocketSession; import java.util.Map; @@ -22,7 +29,8 @@ import java.util.Map; * @version 1.0.0 * @since 2024/2/21 21:12 */ -public abstract class TransferHostSession implements ITransferHostSession { +@Slf4j +public abstract class TransferSession implements ITransferSession { protected static final AppSftpConfig SFTP_CONFIG = SpringHolder.getBean(AppSftpConfig.class); @@ -34,10 +42,20 @@ public abstract class TransferHostSession implements ITransferHostSession { protected SftpExecutor executor; - public TransferHostSession(HostTerminalConnectDTO connectInfo, SessionStore sessionStore, WebSocketSession channel) { + protected String channelId; + + @Getter + protected String path; + + @Getter + @Setter + protected String token; + + public TransferSession(HostTerminalConnectDTO connectInfo, SessionStore sessionStore, WebSocketSession channel) { this.connectInfo = connectInfo; this.sessionStore = sessionStore; this.channel = channel; + this.channelId = channel.getId(); } @Override @@ -54,6 +72,52 @@ public abstract class TransferHostSession implements ITransferHostSession { } } + @Override + public void handleBinary(byte[] bytes) { + } + + @Override + public void onStart(TransferOperatorRequest request) { + this.path = request.getPath(); + } + + @Override + public void onFinish(TransferOperatorRequest request) { + log.info("TransferSession.uploadFinish channelId: {}", channelId); + this.closeStream(); + // 响应结果 + TransferUtils.sendMessage(channel, TransferReceiver.FINISH, null); + } + + @Override + public void onError(TransferOperatorRequest request) { + log.error("TransferSession.uploadError channelId: {}", channelId); + this.closeStream(); + // 响应结果 + TransferUtils.sendMessage(channel, TransferReceiver.ERROR, new InvalidArgumentException((String) null)); + } + + @Override + public void onAbort(TransferOperatorRequest request) { + log.info("TransferSession.abort channelId: {}, path: {}", channelId, path); + // 关闭流 + this.closeStream(); + // 响应结果 + TransferUtils.sendMessage(channel, TransferReceiver.ABORT, null); + } + + /** + * 关闭流 + */ + protected abstract void closeStream(); + + @Override + public void close() { + this.closeStream(); + Streams.close(executor); + Streams.close(sessionStore); + } + /** * 保存操作日志 * @@ -73,16 +137,4 @@ public abstract class TransferHostSession implements ITransferHostSession { SpringHolder.getBean(OperatorLogFrameworkService.class).insert(model); } - /** - * 关闭流 - */ - protected abstract void closeStream(); - - @Override - public void close() { - this.closeStream(); - Streams.close(executor); - Streams.close(sessionStore); - } - } diff --git a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/UploadSession.java b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/UploadSession.java index f44873e7..10452f91 100644 --- a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/UploadSession.java +++ b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/session/UploadSession.java @@ -1,11 +1,11 @@ package com.orion.visor.module.asset.handler.host.transfer.session; -import com.orion.lang.exception.argument.InvalidArgumentException; import com.orion.lang.utils.io.Streams; import com.orion.net.host.SessionStore; import com.orion.visor.module.asset.define.operator.HostTerminalOperatorType; import com.orion.visor.module.asset.entity.dto.HostTerminalConnectDTO; -import com.orion.visor.module.asset.handler.host.transfer.enums.TransferReceiverType; +import com.orion.visor.module.asset.handler.host.transfer.enums.TransferReceiver; +import com.orion.visor.module.asset.handler.host.transfer.model.TransferOperatorRequest; import com.orion.visor.module.asset.handler.host.transfer.utils.TransferUtils; import com.orion.visor.module.asset.utils.SftpUtils; import lombok.extern.slf4j.Slf4j; @@ -22,7 +22,7 @@ import java.io.OutputStream; * @since 2024/2/22 22:04 */ @Slf4j -public class UploadSession extends TransferHostSession implements IUploadSession { +public class UploadSession extends TransferSession { private OutputStream outputStream; @@ -31,8 +31,8 @@ public class UploadSession extends TransferHostSession implements IUploadSession } @Override - public void startUpload(String path) { - String channelId = channel.getId(); + public void onStart(TransferOperatorRequest request) { + super.onStart(request); try { log.info("UploadSession.startUpload start channelId: {}, path: {}", channelId, path); // 保存操作日志 @@ -44,52 +44,38 @@ public class UploadSession extends TransferHostSession implements IUploadSession // 打开输出流 this.outputStream = executor.openOutputStream(path); // 响应结果 - TransferUtils.sendMessage(this.channel, TransferReceiverType.UPLOAD_NEXT_BLOCK, null); + TransferUtils.sendMessage(channel, TransferReceiver.NEXT_PART, null); log.info("UploadSession.startUpload transfer channelId: {}, path: {}", channelId, path); } catch (Exception e) { log.error("UploadSession.startUpload error channelId: {}, path: {}", channelId, path, e); this.closeStream(); // 响应结果 - TransferUtils.sendMessage(this.channel, TransferReceiverType.UPLOAD_ERROR, e); + TransferUtils.sendMessage(channel, TransferReceiver.ERROR, e); } } @Override - public void putContent(byte[] bytes) { + public void handleBinary(byte[] bytes) { try { // 写入内容 outputStream.write(bytes); // 响应结果 - TransferUtils.sendMessage(this.channel, TransferReceiverType.UPLOAD_NEXT_BLOCK, null); + TransferUtils.sendMessage(channel, TransferReceiver.NEXT_PART, null); } catch (IOException e) { - log.error("UploadSession.putContent error channelId: {}", channel.getId(), e); + log.error("UploadSession.handleBinary error channelId: {}", channel.getId(), e); this.closeStream(); // 响应结果 - TransferUtils.sendMessage(this.channel, TransferReceiverType.UPLOAD_ERROR, e); + TransferUtils.sendMessage(channel, TransferReceiver.ERROR, e); } } - @Override - public void uploadFinish() { - log.info("UploadSession.uploadFinish channelId: {}", channel.getId()); - this.closeStream(); - // 响应结果 - TransferUtils.sendMessage(this.channel, TransferReceiverType.UPLOAD_FINISH, null); - } - - @Override - public void uploadError() { - log.error("UploadSession.uploadError channelId: {}", channel.getId()); - this.closeStream(); - // 响应结果 - TransferUtils.sendMessage(this.channel, TransferReceiverType.UPLOAD_ERROR, new InvalidArgumentException((String) null)); - } - @Override protected void closeStream() { - // 关闭流 - Streams.close(outputStream); - this.outputStream = null; + if (this.outputStream != null) { + // 关闭流 + Streams.close(outputStream); + this.outputStream = null; + } } } diff --git a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/utils/TransferUtils.java b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/utils/TransferUtils.java index 03e2bdfd..1e91dc7b 100644 --- a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/utils/TransferUtils.java +++ b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/handler/host/transfer/utils/TransferUtils.java @@ -5,7 +5,7 @@ import com.orion.lang.exception.argument.InvalidArgumentException; import com.orion.lang.utils.Strings; import com.orion.visor.framework.common.constant.ErrorMessage; import com.orion.visor.framework.websocket.core.utils.WebSockets; -import com.orion.visor.module.asset.handler.host.transfer.enums.TransferReceiverType; +import com.orion.visor.module.asset.handler.host.transfer.enums.TransferReceiver; import com.orion.visor.module.asset.handler.host.transfer.model.TransferOperatorResponse; import org.apache.catalina.connector.ClientAbortException; import org.springframework.web.socket.WebSocketSession; @@ -13,11 +13,9 @@ import org.springframework.web.socket.WebSocketSession; import java.util.function.Consumer; /** - * 传输工具类 - * * @author Jiahang Li * @version 1.0.0 - * @since 2024/2/22 22:14 + * @since 2024/7/12 15:06 */ public class TransferUtils { @@ -31,7 +29,7 @@ public class TransferUtils { * @param type type * @param ex ex */ - public static void sendMessage(WebSocketSession channel, TransferReceiverType type, Exception ex) { + public static void sendMessage(WebSocketSession channel, TransferReceiver type, Exception ex) { sendMessage(channel, type, ex, null); } @@ -43,7 +41,7 @@ public class TransferUtils { * @param ex ex * @param filler filler */ - public static void sendMessage(WebSocketSession channel, TransferReceiverType type, Exception ex, Consumer filler) { + public static void sendMessage(WebSocketSession channel, TransferReceiver type, Exception ex, Consumer filler) { TransferOperatorResponse resp = TransferOperatorResponse.builder() .type(type.getType()) .success(ex == null) diff --git a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/service/impl/HostSftpServiceImpl.java b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/service/impl/HostSftpServiceImpl.java index bf7363cb..765982d6 100644 --- a/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/service/impl/HostSftpServiceImpl.java +++ b/orion-visor-module-asset/orion-visor-module-asset-service/src/main/java/com/orion/visor/module/asset/service/impl/HostSftpServiceImpl.java @@ -15,9 +15,8 @@ import com.orion.visor.module.asset.convert.HostSftpLogConvert; import com.orion.visor.module.asset.define.operator.HostTerminalOperatorType; import com.orion.visor.module.asset.entity.request.host.HostSftpLogQueryRequest; import com.orion.visor.module.asset.entity.vo.HostSftpLogVO; -import com.orion.visor.module.asset.handler.host.transfer.handler.ITransferHandler; import com.orion.visor.module.asset.handler.host.transfer.manager.HostTransferManager; -import com.orion.visor.module.asset.handler.host.transfer.session.IDownloadSession; +import com.orion.visor.module.asset.handler.host.transfer.session.DownloadSession; import com.orion.visor.module.asset.service.HostSftpService; import com.orion.visor.module.infra.api.OperatorLogApi; import com.orion.visor.module.infra.entity.dto.operator.OperatorLogQueryDTO; @@ -82,10 +81,10 @@ public class HostSftpServiceImpl implements HostSftpService { @Override public StreamingResponseBody downloadWithTransferToken(String channelId, String transferToken, HttpServletResponse response) { // 获取会话 - IDownloadSession session = Optional.ofNullable(channelId) + DownloadSession session = (DownloadSession) Optional.ofNullable(channelId) .map(hostTransferManager::getHandler) - .map(ITransferHandler::getTokenSessions) - .map(s -> s.remove(transferToken)) + .map(s -> s.getSessionByToken(transferToken)) + .filter(s -> s instanceof DownloadSession) .orElse(null); // 响应会话 if (session == null) { diff --git a/orion-visor-ui/src/views/asset/host-config/components/ssh-config-form.vue b/orion-visor-ui/src/views/asset/host-config/components/ssh-config-form.vue index ee953980..686cf5c7 100644 --- a/orion-visor-ui/src/views/asset/host-config/components/ssh-config-form.vue +++ b/orion-visor-ui/src/views/asset/host-config/components/ssh-config-form.vue @@ -29,6 +29,14 @@ :options="toOptions(sshOsTypeKey)" placeholder="请选择系统类型" /> + + + + - - - - + :percent="file.fileSize ? (file.current || 0) / file.fileSize : 0" /> diff --git a/orion-visor-ui/src/views/host/terminal/handler/sftp-transfer-downloader.ts b/orion-visor-ui/src/views/host/terminal/handler/sftp-transfer-downloader.ts index 4c64cf09..08b49bf1 100644 --- a/orion-visor-ui/src/views/host/terminal/handler/sftp-transfer-downloader.ts +++ b/orion-visor-ui/src/views/host/terminal/handler/sftp-transfer-downloader.ts @@ -1,41 +1,33 @@ -import type { ISftpTransferDownloader, SftpTransferItem } from '../types/terminal.type'; -import { TransferOperatorType, TransferStatus } from '../types/terminal.const'; -import { getFileName, getPath } from '@/utils/file'; +import type { SftpTransferItem } from '../types/terminal.type'; +import { TransferStatus, TransferType } from '../types/terminal.const'; +import { getFileName, openDownloadFile } from '@/utils/file'; import { saveAs } from 'file-saver'; +import { getDownloadTransferUrl } from '@/api/asset/host-sftp'; +import SftpTransferHandler from './sftp-transfer-handler'; // sftp 下载器实现 -export default class SftpTransferDownloader implements ISftpTransferDownloader { - - public abort: boolean; - - private client: WebSocket; - private item: SftpTransferItem; +export default class SftpTransferDownloader extends SftpTransferHandler { constructor(item: SftpTransferItem, client: WebSocket) { - this.abort = false; - this.item = item; - this.client = client; + super(TransferType.DOWNLOAD, item, client); } - // 开始下载 - initDownload() { - this.item.status = TransferStatus.TRANSFERRING; - // 发送开始下载信息 - this.client?.send(JSON.stringify({ - type: TransferOperatorType.DOWNLOAD_INIT, - path: getPath(this.item.parentPath + '/' + this.item.name), - hostId: this.item.hostId - })); + // 开始回调 + onStart(channelId: string, token: string) { + super.onStart(channelId, token); + // 获取下载 url + const url = getDownloadTransferUrl(channelId, token); + // 打开 + openDownloadFile(url); } - // 下载完成 - downloadFinish() { - if (this.abort) { + // 完成回调 + onFinish() { + super.onFinish(); + if (this.aborted) { // 中断则不触发下载 return; } - // 设置实际大小 - this.item.currentSize = this.item.totalSize; if (this.item.totalSize === 0) { // 空文件直接触发下载 try { @@ -53,20 +45,4 @@ export default class SftpTransferDownloader implements ISftpTransferDownloader { } } - // 下载失败 - downloadError(msg: string | undefined) { - this.item.status = TransferStatus.ERROR; - this.item.errorMessage = msg || '下载失败'; - } - - // 下载中断 - downloadAbort() { - this.abort = true; - // 发送下载中断信息 - this.client?.send(JSON.stringify({ - type: TransferOperatorType.DOWNLOAD_ABORT, - hostId: this.item.hostId - })); - } - } diff --git a/orion-visor-ui/src/views/host/terminal/handler/sftp-transfer-handler.ts b/orion-visor-ui/src/views/host/terminal/handler/sftp-transfer-handler.ts new file mode 100644 index 00000000..36c633fd --- /dev/null +++ b/orion-visor-ui/src/views/host/terminal/handler/sftp-transfer-handler.ts @@ -0,0 +1,109 @@ +import type { ISftpTransferHandler, SftpTransferItem } from '../types/terminal.type'; +import { TransferOperator, TransferStatus } from '../types/terminal.const'; +import { getPath } from '@/utils/file'; + +// sftp 传输处理器定义 +export default abstract class SftpTransferHandler implements ISftpTransferHandler { + + public type: string; + public finished: boolean; + public aborted: boolean; + protected client: WebSocket; + protected item: SftpTransferItem; + + protected constructor(type: string, item: SftpTransferItem, client: WebSocket) { + this.type = type; + this.finished = false; + this.aborted = false; + this.item = item; + this.client = client; + } + + // 开始 + start() { + this.item.status = TransferStatus.TRANSFERRING; + // 发送开始信息 + this.client?.send(JSON.stringify({ + operator: TransferOperator.START, + type: this.type, + path: getPath(this.item.parentPath + '/' + this.item.name), + hostId: this.item.hostId + })); + }; + + // 完成 + finish() { + this.finished = true; + this.item.status = TransferStatus.SUCCESS; + // 发送完成的信息 + this.client?.send(JSON.stringify({ + operator: TransferOperator.FINISH, + type: this.type, + hostId: this.item.hostId + })); + }; + + // 失败 + error() { + this.finished = true; + this.item.status = TransferStatus.ERROR; + // 发送上传失败的信息 + this.client?.send(JSON.stringify({ + operator: TransferOperator.ERROR, + type: this.type, + hostId: this.item.hostId + })); + }; + + // 中断 + abort() { + this.aborted = true; + // 发送中断的信息 + this.client?.send(JSON.stringify({ + operator: TransferOperator.ABORT, + type: this.type, + hostId: this.item.hostId + })); + } + + // 是否有下一个分片 + hasNextPart() { + return false; + }; + + // 下一页分片回调 + onNextPart() { + return undefined as unknown as any; + }; + + // 开始回调 + onStart(channelId: string, token: string) { + }; + + // 进度回调 + onProgress(size: number) { + if (this.item && size) { + this.item.currentSize = size; + } + }; + + // 失败回调 + onError(msg: string | undefined) { + this.finished = true; + this.item.status = TransferStatus.ERROR; + this.item.errorMessage = msg || '传输失败'; + } + + // 完成回调 + onFinish() { + this.finished = true; + this.item.status = TransferStatus.SUCCESS; + this.item.currentSize = this.item.totalSize; + }; + + // 中断回调 + onAbort() { + this.aborted = true; + }; + +} diff --git a/orion-visor-ui/src/views/host/terminal/handler/sftp-transfer-manager.ts b/orion-visor-ui/src/views/host/terminal/handler/sftp-transfer-manager.ts index ed395cd5..42bb4326 100644 --- a/orion-visor-ui/src/views/host/terminal/handler/sftp-transfer-manager.ts +++ b/orion-visor-ui/src/views/host/terminal/handler/sftp-transfer-manager.ts @@ -1,13 +1,10 @@ -import type { ISftpTransferManager, ISftpTransferUploader, SftpTransferItem } from '../types/terminal.type'; -import { ISftpTransferDownloader, SftpFile, TransferOperatorResponse } from '../types/terminal.type'; -import { sessionCloseMsg, TransferReceiverType, TransferStatus, TransferType } from '../types/terminal.const'; +import type { ISftpTransferHandler, ISftpTransferManager, SftpFile, SftpTransferItem, TransferOperatorResponse } from '../types/terminal.type'; +import { sessionCloseMsg, TransferReceiver, TransferStatus, TransferType } from '../types/terminal.const'; import { Message } from '@arco-design/web-vue'; import { getTerminalAccessToken, openHostTransferChannel } from '@/api/asset/host-terminal'; import { nextId } from '@/utils'; -import { getDownloadTransferUrl } from '@/api/asset/host-sftp'; import SftpTransferUploader from './sftp-transfer-uploader'; import SftpTransferDownloader from './sftp-transfer-downloader'; -import { openDownloadFile } from '@/utils/file'; // sftp 传输管理器实现 export default class SftpTransferManager implements ISftpTransferManager { @@ -20,9 +17,7 @@ export default class SftpTransferManager implements ISftpTransferManager { private currentItem?: SftpTransferItem; - private currentUploader?: ISftpTransferUploader; - - private currentDownloader?: ISftpTransferDownloader; + private currentTransfer?: ISftpTransferHandler; public transferList: Array; @@ -57,13 +52,14 @@ export default class SftpTransferManager implements ISftpTransferManager { // 添加下载任务 addDownload(hostId: number, currentPath: string, files: Array) { + let pathIndex = currentPath === '/' ? 1 : currentPath.length + 1; // 转为下载文件 const items = files.map(s => { return { fileId: nextId(10), type: TransferType.DOWNLOAD, hostId: hostId, - name: s.path.substring(currentPath.length + 1), + name: s.path.substring(pathIndex), parentPath: currentPath, currentSize: 0, totalSize: s.size, @@ -87,11 +83,7 @@ export default class SftpTransferManager implements ISftpTransferManager { const item = this.transferList[index]; if (item.status === TransferStatus.TRANSFERRING) { // 传输中则中断传输 - if (this.currentUploader) { - this.currentUploader.uploadAbort(); - } else if (this.currentDownloader) { - this.currentDownloader.downloadAbort(); - } + this.currentTransfer?.abort(); } // 从列表中移除 this.transferList.splice(index, 1); @@ -154,8 +146,7 @@ export default class SftpTransferManager implements ISftpTransferManager { // 传输下一条任务 private transferNextItem() { - this.currentUploader = undefined; - this.currentDownloader = undefined; + this.currentTransfer = undefined; // 释放内存 if (this.currentItem) { this.currentItem.file = null as unknown as File; @@ -163,14 +154,15 @@ export default class SftpTransferManager implements ISftpTransferManager { // 获取任务 this.currentItem = this.transferList.find(s => s.status === TransferStatus.WAITING); if (this.currentItem) { - // 开始传输 if (this.currentItem.type === TransferType.UPLOAD) { // 上传 - this.uploadFile(); + this.currentTransfer = new SftpTransferUploader(this.currentItem, this.client as WebSocket); } else { // 下载 - this.downloadFile(); + this.currentTransfer = new SftpTransferDownloader(this.currentItem, this.client as WebSocket); } + // 开始 + this.currentTransfer?.start(); } else { // 无任务关闭会话 this.client?.close(); @@ -181,110 +173,33 @@ export default class SftpTransferManager implements ISftpTransferManager { private async resolveMessage(message: MessageEvent) { // 文本消息 const data = JSON.parse(message.data) as TransferOperatorResponse; - if (data.type === TransferReceiverType.NEXT_TRANSFER - || data.type === TransferReceiverType.UPLOAD_FINISH - || data.type === TransferReceiverType.UPLOAD_ERROR) { - // 执行下一个传输任务 - this.resolveNextTransfer(data); - } else if (data.type === TransferReceiverType.UPLOAD_NEXT_BLOCK) { - // 接收下一块上传数据 - await this.resolveUploadNextBlock(); - } else if (data.type === TransferReceiverType.DOWNLOAD_START) { - // 开始下载 - this.resolveDownloadStart(data); - } else if (data.type === TransferReceiverType.DOWNLOAD_PROGRESS) { - // 下载进度 - this.resolveDownloadProgress(data); - } else if (data.type === TransferReceiverType.DOWNLOAD_FINISH) { - // 下载完成 - this.resolveDownloadFinish(); - } else if (data.type === TransferReceiverType.DOWNLOAD_ERROR) { - // 下载失败 - this.resolveDownloadError(data.msg); + if (data.type === TransferReceiver.NEXT_PART) { + // 接收下一块数据回调 + await this.currentTransfer?.onNextPart(); + } else if (data.type === TransferReceiver.START) { + // 开始回调 + this.currentTransfer?.onStart(data.channelId as string, data.transferToken as string); + } else if (data.type === TransferReceiver.PROGRESS) { + // 进度回调 + this.currentTransfer?.onProgress(data.currentSize as number); + } else if (data.type === TransferReceiver.FINISH) { + // 完成回调 + this.currentTransfer?.onFinish(); + // 开始下一个传输任务 + this.transferNextItem(); + } else if (data.type === TransferReceiver.ERROR) { + // 失败回调 + this.currentTransfer?.onError(data.msg); + // 开始下一个传输任务 + this.transferNextItem(); + } else if (data.type === TransferReceiver.ABORT) { + // 中断回调 + this.currentTransfer?.onAbort(); + // 开始下一个传输任务 + this.transferNextItem(); } } - // 上传文件 - private uploadFile() { - // 创建上传器 - this.currentUploader = new SftpTransferUploader(this.currentItem as SftpTransferItem, this.client as WebSocket); - // 开始上传 - this.currentUploader.startUpload(); - } - - // 下载文件 - private downloadFile() { - // 创建下载器 - this.currentDownloader = new SftpTransferDownloader(this.currentItem as SftpTransferItem, this.client as WebSocket); - // 初始化下载 - this.currentDownloader.initDownload(); - } - - // 接收下一个传输任务响应 - private resolveNextTransfer(data: TransferOperatorResponse) { - if (this.currentItem) { - if (data.success) { - this.currentItem.status = TransferStatus.SUCCESS; - } else { - this.currentItem.status = TransferStatus.ERROR; - this.currentItem.errorMessage = data.msg || '传输失败'; - } - } - // 开始下一个传输任务 - this.transferNextItem(); - } - - // 接收下一块上传数据响应 - private async resolveUploadNextBlock() { - // 只可能为上传并且成功 - if (!this.currentUploader) { - return; - } - if (this.currentUploader.hasNextBlock() - && !this.currentUploader.abort - && !this.currentUploader.finish) { - try { - // 有下一个分片则上传 (上一个分片传输完成) - await this.currentUploader.uploadNextBlock(); - } catch (e) { - // 读取文件失败 - this.currentUploader.uploadError((e as Error).message); - } - } else { - // 没有下一个分片则发送完成 - this.currentUploader.uploadFinish(); - } - } - - // 接收开始下载响应 - private resolveDownloadStart(data: TransferOperatorResponse) { - // 获取下载 url - const url = getDownloadTransferUrl(data.channelId as string, data.transferToken as string); - // 打开 - openDownloadFile(url); - } - - // 接收下载进度响应 - private resolveDownloadProgress(data: TransferOperatorResponse) { - if (this.currentItem && data.currentSize) { - this.currentItem.currentSize = data.currentSize; - } - } - - // 接收下载完成响应 - private resolveDownloadFinish() { - this.currentDownloader?.downloadFinish(); - // 开始下一个传输任务 - this.transferNextItem(); - } - - // 接收下载失败响应 - private resolveDownloadError(msg: string | undefined) { - this.currentDownloader?.downloadError(msg); - // 开始下一个传输任务 - this.transferNextItem(); - } - // 关闭 释放资源 private close() { // 重置 run diff --git a/orion-visor-ui/src/views/host/terminal/handler/sftp-transfer-uploader.ts b/orion-visor-ui/src/views/host/terminal/handler/sftp-transfer-uploader.ts index 32f38947..6bb301ae 100644 --- a/orion-visor-ui/src/views/host/terminal/handler/sftp-transfer-uploader.ts +++ b/orion-visor-ui/src/views/host/terminal/handler/sftp-transfer-uploader.ts @@ -1,52 +1,54 @@ -import type { ISftpTransferUploader, SftpTransferItem } from '../types/terminal.type'; -import { TransferOperatorType, TransferStatus } from '../types/terminal.const'; -import { getPath } from '@/utils/file'; +import type { SftpTransferItem } from '../types/terminal.type'; +import { TransferType } from '../types/terminal.const'; +import SftpTransferHandler from './sftp-transfer-handler'; // 512 KB -export const BLOCK_SIZE = 512 * 1024; +export const PART_SIZE = 512 * 1024; // sftp 上传器实现 -export default class SftpTransferUploader implements ISftpTransferUploader { +export default class SftpTransferUploader extends SftpTransferHandler { - public finish: boolean; - public abort: boolean; - private currentBlock: number; - private totalBlock: number; - private client: WebSocket; - private item: SftpTransferItem; + private currentPart: number; + private readonly totalPart: number; private file: File; constructor(item: SftpTransferItem, client: WebSocket) { - this.abort = false; - this.finish = false; - this.item = item; - this.client = client; + super(TransferType.UPLOAD, item, client); this.file = item.file; - this.currentBlock = 0; - this.totalBlock = Math.ceil(item.file.size / BLOCK_SIZE); - } - - // 开始上传 - startUpload() { - this.item.status = TransferStatus.TRANSFERRING; - // 发送开始上传信息 - this.client?.send(JSON.stringify({ - type: TransferOperatorType.UPLOAD_START, - path: getPath(this.item.parentPath + '/' + this.item.name), - hostId: this.item.hostId - })); + this.currentPart = 0; + this.totalPart = Math.ceil(item.file.size / PART_SIZE); } // 是否有下一个分片 - hasNextBlock() { - return this.currentBlock < this.totalBlock; + hasNextPart() { + return this.currentPart < this.totalPart; } // 上传下一个分片 - async uploadNextBlock() { + async onNextPart() { + super.onNextPart(); + // 完成或者中断直接跳过 + if (this.aborted || this.finished) { + return; + } + if (this.hasNextPart()) { + try { + // 有下一个分片则上传 + await this.doUploadNextPart(); + } catch (e) { + // 读取文件失败 + this.error(); + } + } else { + this.finish(); + } + } + + // 执行上传下一分片 + private async doUploadNextPart() { // 读取数据 - const start = this.currentBlock * BLOCK_SIZE; - const end = Math.min(this.file.size, start + BLOCK_SIZE); + const start = this.currentPart * PART_SIZE; + const end = Math.min(this.file.size, start + PART_SIZE); const chunk = this.file.slice(start, end); const reader = new FileReader(); const arrayBuffer = await new Promise((resolve, reject) => { @@ -56,36 +58,8 @@ export default class SftpTransferUploader implements ISftpTransferUploader { }); // 发送数据 this.client?.send(arrayBuffer as ArrayBuffer); - this.currentBlock++; + this.currentPart++; this.item.currentSize += (end - start); } - // 上传完成 - uploadFinish() { - this.finish = true; - this.item.status = TransferStatus.SUCCESS; - // 发送上传完成的信息 - this.client?.send(JSON.stringify({ - type: TransferOperatorType.UPLOAD_FINISH, - hostId: this.item.hostId - })); - } - - // 上传失败 - uploadError(msg: string | undefined) { - this.finish = true; - this.item.status = TransferStatus.ERROR; - this.item.errorMessage = msg || '上传失败'; - // 发送上传完成的信息 - this.client?.send(JSON.stringify({ - type: TransferOperatorType.UPLOAD_ERROR, - hostId: this.item.hostId - })); - } - - // 上传中断 - uploadAbort() { - this.abort = true; - } - } diff --git a/orion-visor-ui/src/views/host/terminal/handler/terminal-output-processor.ts b/orion-visor-ui/src/views/host/terminal/handler/terminal-output-processor.ts index 793c6be5..7f9af867 100644 --- a/orion-visor-ui/src/views/host/terminal/handler/terminal-output-processor.ts +++ b/orion-visor-ui/src/views/host/terminal/handler/terminal-output-processor.ts @@ -1,4 +1,4 @@ -import { +import type { ISftpSession, ISshSession, ITerminalChannel, @@ -76,7 +76,7 @@ export default class TerminalOutputProcessor implements ITerminalOutputProcessor ssh.connect(); } else { // 未成功展示错误信息 - ssh.write(`${msg || ''}\r\n输入回车重新连接...\r\n\r\n`); + ssh.write(`${msg || ''}\r\n输入回车重新连接...\r\n\r\n`); ssh.status = TerminalStatus.CLOSED; } }, sftp => { @@ -109,7 +109,7 @@ export default class TerminalOutputProcessor implements ITerminalOutputProcessor // ssh 拼接关闭消息 ssh.write(`\r\n\r\n${msg || ''}\r\n`); if (!isForceClose) { - ssh.write('输入回车重新连接...\r\n\r\n'); + ssh.write(`${msg || ''}\r\n输入回车重新连接...\r\n\r\n`); } // 设置状态 ssh.status = TerminalStatus.CLOSED; diff --git a/orion-visor-ui/src/views/host/terminal/types/terminal.const.ts b/orion-visor-ui/src/views/host/terminal/types/terminal.const.ts index ddce8287..056f2ea0 100644 --- a/orion-visor-ui/src/views/host/terminal/types/terminal.const.ts +++ b/orion-visor-ui/src/views/host/terminal/types/terminal.const.ts @@ -327,25 +327,22 @@ export const TransferType = { DOWNLOAD: 'download' }; -// 传输操作类型 -export const TransferOperatorType = { - UPLOAD_START: 'uploadStart', - UPLOAD_FINISH: 'uploadFinish', - UPLOAD_ERROR: 'uploadError', - DOWNLOAD_INIT: 'downloadInit', - DOWNLOAD_ABORT: 'downloadAbort', +// 传输操作 +export const TransferOperator = { + START: 'start', + FINISH: 'finish', + ERROR: 'error', + ABORT: 'abort', }; -// 传输响应类型 -export const TransferReceiverType = { - NEXT_TRANSFER: 'nextTransfer', - UPLOAD_NEXT_BLOCK: 'uploadNextBlock', - UPLOAD_FINISH: 'uploadFinish', - UPLOAD_ERROR: 'uploadError', - DOWNLOAD_START: 'downloadStart', - DOWNLOAD_PROGRESS: 'downloadProgress', - DOWNLOAD_FINISH: 'downloadFinish', - DOWNLOAD_ERROR: 'downloadError', +// 传输响应 +export const TransferReceiver = { + NEXT_PART: 'nextPart', + START: 'start', + PROGRESS: 'progress', + FINISH: 'finish', + ERROR: 'error', + ABORT: 'abort', }; // 会话关闭信息 diff --git a/orion-visor-ui/src/views/host/terminal/types/terminal.type.ts b/orion-visor-ui/src/views/host/terminal/types/terminal.type.ts index 78516898..fad45e5c 100644 --- a/orion-visor-ui/src/views/host/terminal/types/terminal.type.ts +++ b/orion-visor-ui/src/views/host/terminal/types/terminal.type.ts @@ -414,38 +414,41 @@ export interface ISftpTransferManager { cancelAllTransfer: () => void; } -// sftp 上传器定义 -export interface ISftpTransferUploader { - // 是否完成 - finish: boolean; - // 是否中断 - abort: boolean; - // 开始上传 - startUpload: () => void; - // 是否有下一个分片 - hasNextBlock: () => boolean; - // 上传下一个分片 - uploadNextBlock: () => Promise; - // 上传完成 - uploadFinish: () => void; - // 上传失败 - uploadError: (msg: string | undefined) => void; - // 上传中断 - uploadAbort: () => void; + +// sftp 传输处理回调定义 +export interface ISftpTransferCallback { + // 下一分片回调 + onNextPart: () => Promise; + // 开始回调 + onStart: (channelId: string, token: string) => void; + // 进度回调 + onProgress: (size: number) => void; + // 失败回调 + onError: (msg: string | undefined) => void; + // 完成回调 + onFinish: () => void; + // 中断回调 + onAbort: () => void; } -// sftp 下载器定义 -export interface ISftpTransferDownloader { +// sftp 传输处理器定义 +export interface ISftpTransferHandler extends ISftpTransferCallback { + // 类型 + type: string; + // 是否完成 + finished: boolean; // 是否中断 - abort: boolean; - // 初始化下载 - initDownload: () => void; - // 下载完成 - downloadFinish: () => void; - // 下载失败 - downloadError: (msg: string | undefined) => void; - // 下载中断 - downloadAbort: () => void; + aborted: boolean; + // 开始 + start: () => void; + // 完成 + finish: () => void; + // 失败 + error: () => void; + // 中断 + abort: () => void; + // 是否有下一个分片 + hasNextPart: () => boolean; } // sftp 上传文件项