🔨 文件下载.

This commit is contained in:
lijiahangmax
2024-02-23 00:47:52 +08:00
parent 3d38246c25
commit c19680b213
30 changed files with 686 additions and 224 deletions

View File

@@ -81,4 +81,8 @@ public interface ErrorMessage {
String OPERATE_ERROR = "操作失败";
String UNKNOWN_TYPE = "未知类型";
String FILE_ABSENT = "文件不存在";
}

View File

@@ -27,4 +27,16 @@ public interface AssetThreadPools {
.allowCoreThreadTimeout(true)
.build();
/**
* SFTP 下载线程池
*/
ThreadPoolExecutor SFTP_DOWNLOAD_SCHEDULER = ExecutorBuilder.create()
.namedThreadFactory("sftp-download-thread-")
.corePoolSize(1)
.maxPoolSize(Integer.MAX_VALUE)
.keepAliveTime(Const.MS_S_60)
.workQueue(new SynchronousQueue<>())
.allowCoreThreadTimeout(true)
.build();
}

View File

@@ -124,12 +124,12 @@ public enum InputTypeEnum {
SftpChangeModRequest.class),
/**
* SFTP 下载文件夹 flat
* SFTP 下载文件夹展开文件
*/
SFTP_DOWNLOAD_DIRECTORY_FLAT("df",
SftpDownloadDirectoryFlatHandler.class,
SFTP_DOWNLOAD_FLAT_DIRECTORY("df",
SftpDownloadFlatDirectoryHandler.class,
new String[]{"type", "sessionId", "currentPath", "path"},
SftpDownloadDirectoryFlatRequest.class),
SftpDownloadFlatDirectoryRequest.class),
/**
* SFTP 获取内容

View File

@@ -76,9 +76,9 @@ public enum OutputTypeEnum {
SFTP_CHMOD("cm", "${type}|${sessionId}|${result}|${msg}"),
/**
* SFTP 下载文件夹 flat
* SFTP 下载文件夹展开文件
*/
SFTP_DOWNLOAD_DIRECTORY_FLAT("df", "${type}|${sessionId}|${currentPath}|${body}"),
SFTP_DOWNLOAD_FLAT_DIRECTORY("df", "${type}|${sessionId}|${currentPath}|${body}"),
/**
* SFTP 获取文件内容

View File

@@ -1,16 +1,22 @@
package com.orion.ops.module.asset.handler.host.terminal.handler;
import com.alibaba.fastjson.JSON;
import com.orion.lang.utils.collect.Lists;
import com.orion.ops.framework.common.enums.BooleanBit;
import com.orion.ops.module.asset.handler.host.terminal.enums.OutputTypeEnum;
import com.orion.ops.module.asset.handler.host.terminal.model.request.SftpDownloadDirectoryFlatRequest;
import com.orion.ops.module.asset.handler.host.terminal.model.request.SftpDownloadFlatDirectoryRequest;
import com.orion.ops.module.asset.handler.host.terminal.model.response.SftpDownloadDirectoryFlatResponse;
import com.orion.ops.module.asset.handler.host.terminal.model.response.SftpFileVO;
import com.orion.ops.module.asset.handler.host.terminal.session.ISftpSession;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketSession;
import java.util.Arrays;
import java.util.List;
/**
* sftp 下载文件夹 flat
* sftp 下载文件夹展开文件
*
* @author Jiahang Li
* @version 1.0.0
@@ -18,31 +24,30 @@ import org.springframework.web.socket.WebSocketSession;
*/
@Slf4j
@Component
public class SftpDownloadDirectoryFlatHandler extends AbstractTerminalHandler<SftpDownloadDirectoryFlatRequest> {
public class SftpDownloadFlatDirectoryHandler extends AbstractTerminalHandler<SftpDownloadFlatDirectoryRequest> {
@Override
public void handle(WebSocketSession channel, SftpDownloadDirectoryFlatRequest payload) {
public void handle(WebSocketSession channel, SftpDownloadFlatDirectoryRequest payload) {
// 获取会话
ISftpSession session = terminalManager.getSession(channel.getId(), payload.getSessionId());
String path = payload.getPath();
log.info("SftpDownloadDirectoryFlatHandler-handle session: {}, path: {}", payload.getSessionId(), path);
String[] paths = payload.getPath().split("\\|");
log.info("SftpDownloadFlatDirectoryHandler-handle session: {}, paths: {}", payload.getSessionId(), Arrays.toString(paths));
Exception ex = null;
// 获取文件夹内的全部文件
List<SftpFileVO> files = Lists.empty();
// 展开文件夹内的全部文件
try {
// TODO
files = session.flatDirectory(paths);
} catch (Exception e) {
log.error("SftpDownloadDirectoryFlatHandler-handle error", e);
log.error("SftpDownloadFlatDirectoryHandler-handle error", e);
ex = e;
}
// 返回
this.send(channel,
OutputTypeEnum.SFTP_DOWNLOAD_DIRECTORY_FLAT,
OutputTypeEnum.SFTP_DOWNLOAD_FLAT_DIRECTORY,
SftpDownloadDirectoryFlatResponse.builder()
.sessionId(payload.getSessionId())
.currentPath(payload.getPath())
// TODO
.body("")
.currentPath(payload.getCurrentPath())
.body(JSON.toJSONString(files))
.result(BooleanBit.of(ex == null).getValue())
.msg(this.getErrorMessage(ex))
.build());

View File

@@ -9,6 +9,8 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketSession;
import java.util.Arrays;
/**
* sftp 删除文件
*
@@ -25,7 +27,7 @@ public class SftpRemoveHandler extends AbstractTerminalHandler<SftpBaseRequest>
// 获取会话
ISftpSession session = terminalManager.getSession(channel.getId(), payload.getSessionId());
String[] paths = payload.getPath().split("\\|");
log.info("SftpRemoveHandler-handle session: {}, path: {}", payload.getSessionId(), paths);
log.info("SftpRemoveHandler-handle session: {}, path: {}", payload.getSessionId(), Arrays.toString(paths));
Exception ex = null;
// 删除
try {

View File

@@ -8,7 +8,7 @@ import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
/**
* sftp 下载文件夹 flat 实体对象
* sftp 下载文件夹展开文件 实体对象
* <p>
* i|eff00a1|currentPath|path
*
@@ -21,10 +21,10 @@ import lombok.experimental.SuperBuilder;
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
@Schema(name = "SftpDownloadDirectoryFlatRequest", description = "sftp 下载文件夹 flat 实体对象")
public class SftpDownloadDirectoryFlatRequest extends SftpBaseRequest {
@Schema(name = "SftpDownloadDirectoryFlatRequest", description = "sftp 下载文件夹展开文件 实体对象")
public class SftpDownloadFlatDirectoryRequest extends SftpBaseRequest {
@Schema(description = "当前路径")
private Integer currentPath;
private String currentPath;
}

View File

@@ -78,6 +78,14 @@ public interface ISftpSession extends ITerminalSession {
*/
void chmod(String path, int mod);
/**
* 展开文件夹内的所有文件
*
* @param paths paths
* @return files
*/
List<SftpFileVO> flatDirectory(String[] paths);
/**
* 获取内容
*

View File

@@ -16,10 +16,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.WebSocketSession;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;
/**
@@ -107,6 +104,15 @@ public class SftpSession extends TerminalSession implements ISftpSession {
executor.changeMode(path, mod);
}
@Override
public List<SftpFileVO> flatDirectory(String[] paths) {
return Arrays.stream(paths)
.map(s -> executor.listFiles(s, true, false))
.flatMap(Collection::stream)
.map(SftpSession::fileMapping)
.collect(Collectors.toList());
}
@Override
public String getContent(String path) {
path = Valid.checkNormalize(path);

View File

@@ -17,20 +17,36 @@ public enum TransferOperatorType {
/**
* 开始上传
*/
UPLOAD_START("uploadStart"),
UPLOAD_START(TransferOperatorType.UPLOAD, "uploadStart"),
/**
* 上传完成
*/
UPLOAD_FINISH("uploadFinish"),
UPLOAD_FINISH(TransferOperatorType.UPLOAD, "uploadFinish"),
/**
* 上传失败
*/
UPLOAD_ERROR("uploadError"),
UPLOAD_ERROR(TransferOperatorType.UPLOAD, "uploadError"),
/**
* 开始下载
*/
DOWNLOAD_START(TransferOperatorType.DOWNLOAD, "downloadStart"),
/**
* 中断下载
*/
DOWNLOAD_ABORT(TransferOperatorType.DOWNLOAD, "downloadAbort"),
;
public static final String UPLOAD = "UPLOAD";
public static final String DOWNLOAD = "DOWNLOAD";
private final String operator;
private final String type;
public static TransferOperatorType of(String type) {

View File

@@ -14,16 +14,36 @@ import lombok.Getter;
@AllArgsConstructor
public enum TransferReceiverType {
/**
* 请求下一块上传数据
*/
NEXT_BLOCK("nextBlock"),
/**
* 请求下一个传输任务
*/
NEXT_TRANSFER("nextTransfer"),
/**
* 请求下一块上传数据
*/
UPLOAD_NEXT_BLOCK("uploadNextBlock"),
/**
* 上传完成
*/
UPLOAD_FINISH("uploadFinish"),
/**
* 上传失败
*/
UPLOAD_ERROR("uploadError"),
/**
* 下载完成
*/
DOWNLOAD_FINISH("downloadFinish"),
/**
* 下载失败
*/
DOWNLOAD_ERROR("downloadError"),
;
private final String type;

View File

@@ -1,27 +1,22 @@
package com.orion.ops.module.asset.handler.host.transfer.handler;
import com.alibaba.fastjson.JSON;
import com.orion.lang.exception.argument.InvalidArgumentException;
import com.orion.lang.utils.Exceptions;
import com.orion.lang.utils.io.Streams;
import com.orion.net.host.SessionStore;
import com.orion.ops.framework.common.constant.Const;
import com.orion.ops.framework.common.constant.ErrorMessage;
import com.orion.ops.framework.common.constant.ExtraFieldConst;
import com.orion.ops.framework.websocket.core.utils.WebSockets;
import com.orion.ops.module.asset.entity.dto.HostTerminalConnectDTO;
import com.orion.ops.module.asset.enums.HostConnectTypeEnum;
import com.orion.ops.module.asset.handler.host.transfer.enums.TransferOperatorType;
import com.orion.ops.module.asset.handler.host.transfer.enums.TransferReceiverType;
import com.orion.ops.module.asset.handler.host.transfer.model.TransferOperatorRequest;
import com.orion.ops.module.asset.handler.host.transfer.model.TransferOperatorResponse;
import com.orion.ops.module.asset.handler.host.transfer.session.ITransferHostSession;
import com.orion.ops.module.asset.handler.host.transfer.session.TransferHostSession;
import com.orion.ops.module.asset.handler.host.transfer.session.*;
import com.orion.ops.module.asset.handler.host.transfer.utils.TransferUtils;
import com.orion.ops.module.asset.service.HostTerminalService;
import com.orion.spring.SpringHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -48,7 +43,7 @@ public class TransferHandler implements ITransferHandler {
/**
* 会话列表
*/
private final ConcurrentHashMap<Long, ITransferHostSession> sessions;
private final ConcurrentHashMap<String, ITransferHostSession> sessions;
public TransferHandler(WebSocketSession channel) {
this.channel = channel;
@@ -61,22 +56,30 @@ public class TransferHandler implements ITransferHandler {
// 解析消息类型
TransferOperatorType type = TransferOperatorType.of(payload.getType());
// 获取会话
if (!this.getAndInitSession(payload)) {
if (!this.getAndInitSession(payload, type)) {
return;
}
// 处理消息
switch (type) {
case UPLOAD_START:
// 准备上传
this.uploadStart(payload);
// 开始上传
((IUploadSession) currentSession).startUpload(payload.getPath());
break;
case UPLOAD_FINISH:
// 上传完成
this.uploadFinish();
((IUploadSession) currentSession).uploadFinish();
break;
case UPLOAD_ERROR:
// 上传失败
this.uploadError();
((IUploadSession) currentSession).uploadError();
break;
case DOWNLOAD_START:
// 开始下载
((IDownloadSession) currentSession).startDownload(payload.getPath());
break;
case DOWNLOAD_ABORT:
// 中断下载
((IDownloadSession) currentSession).abortDownload();
break;
default:
break;
@@ -85,119 +88,49 @@ public class TransferHandler implements ITransferHandler {
@Override
public void putContent(byte[] content) {
try {
// 写入内容
currentSession.putContent(content);
// 响应结果
this.sendMessage(TransferReceiverType.NEXT_BLOCK, null);
} catch (IOException e) {
log.error("TransferHandler.putContent error", e);
// 写入完成
currentSession.putFinish();
// 响应结果
this.sendMessage(TransferReceiverType.NEXT_TRANSFER, e);
}
}
/**
* 准备上传
*
* @param payload payload
*/
private void uploadStart(TransferOperatorRequest payload) {
try {
// 开始上传
currentSession.startUpload(payload.getPath());
// 响应结果
this.sendMessage(TransferReceiverType.NEXT_BLOCK, null);
} catch (Exception e) {
log.error("TransferHandler.uploadStart error", e);
// 传输完成
currentSession.putFinish();
// 响应结果
this.sendMessage(TransferReceiverType.NEXT_TRANSFER, e);
}
}
/**
* 上传完成
*/
private void uploadFinish() {
currentSession.putFinish();
// 响应结果
this.sendMessage(TransferReceiverType.NEXT_TRANSFER, null);
}
/**
* 上传失败
*/
private void uploadError() {
currentSession.putFinish();
// 响应结果
this.sendMessage(TransferReceiverType.NEXT_TRANSFER, new InvalidArgumentException(Const.EMPTY));
((IUploadSession) currentSession).putContent(content);
}
/**
* 获取并且初始化会话
*
* @param payload payload
* @param type type
* @return success
*/
private boolean getAndInitSession(TransferOperatorRequest payload) {
private boolean getAndInitSession(TransferOperatorRequest payload, TransferOperatorType type) {
Long hostId = payload.getHostId();
String sessionKey = hostId + "_" + type.getOperator();
try {
// 获取会话
ITransferHostSession session = sessions.get(hostId);
ITransferHostSession session = sessions.get(sessionKey);
if (session == null) {
// 获取主机信息
HostTerminalConnectDTO connectInfo = hostTerminalService.getTerminalConnectInfo(hostId, this.userId, HostConnectTypeEnum.SFTP);
SessionStore sessionStore = hostTerminalService.openSessionStore(connectInfo);
// 打开会话并初始化
session = new TransferHostSession(connectInfo, sessionStore);
if (TransferOperatorType.UPLOAD.equals(type.getOperator())) {
// 上传操作
session = new UploadSession(connectInfo, sessionStore, this.channel);
} else if (TransferOperatorType.DOWNLOAD.equals(type.getOperator())) {
// 下载操作
session = new DownloadSession(connectInfo, sessionStore, this.channel);
} else {
throw Exceptions.invalidArgument(ErrorMessage.UNKNOWN_TYPE);
}
session.init();
this.currentSession = session;
sessions.put(hostId, session);
sessions.put(sessionKey, session);
}
this.currentSession = session;
return true;
} catch (Exception e) {
log.error("TransferHandler.getAndInitSession error", e);
// 响应结果
this.sendMessage(TransferReceiverType.NEXT_TRANSFER, e);
TransferUtils.sendMessage(this.channel, TransferReceiverType.NEXT_TRANSFER, e);
return false;
}
}
/**
* 发送消息
*
* @param type type
* @param ex ex
*/
private void sendMessage(TransferReceiverType type, Exception ex) {
TransferOperatorResponse resp = TransferOperatorResponse.builder()
.type(type.getType())
.success(ex == null)
.msg(this.getErrorMessage(ex))
.build();
WebSockets.sendText(this.channel, JSON.toJSONString(resp));
}
/**
* 获取错误信息
*
* @param ex ex
* @return msg
*/
private String getErrorMessage(Exception ex) {
if (ex == null) {
return null;
}
if (ex instanceof InvalidArgumentException) {
return ex.getMessage();
}
return ErrorMessage.OPERATE_ERROR;
}
@Override
public void close() {
sessions.values().forEach(Streams::close);

View File

@@ -0,0 +1,96 @@
package com.orion.ops.module.asset.handler.host.transfer.session;
import com.orion.lang.utils.Valid;
import com.orion.lang.utils.io.Streams;
import com.orion.net.host.SessionStore;
import com.orion.net.host.sftp.SftpFile;
import com.orion.ops.framework.common.constant.Const;
import com.orion.ops.framework.common.constant.ErrorMessage;
import com.orion.ops.module.asset.define.AssetThreadPools;
import com.orion.ops.module.asset.entity.dto.HostTerminalConnectDTO;
import com.orion.ops.module.asset.handler.host.transfer.enums.TransferReceiverType;
import com.orion.ops.module.asset.handler.host.transfer.utils.TransferUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.InputStream;
/**
* 下载会话实现
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/2/22 22:25
*/
@Slf4j
public class DownloadSession extends TransferHostSession implements IDownloadSession {
private InputStream inputStream;
public DownloadSession(HostTerminalConnectDTO connectInfo, SessionStore sessionStore, WebSocketSession channel) {
super(connectInfo, sessionStore, channel);
}
@Override
public void startDownload(String path) {
try {
// 检查连接
this.init();
// 检查文件是否存在
SftpFile file = executor.getFile(path);
Valid.notNull(file, ErrorMessage.FILE_ABSENT);
// 打开输入流
this.inputStream = executor.openInputStream(path);
} catch (Exception e) {
log.error("DownloadSession.uploadStart error", e);
// 响应结果
TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_ERROR, e);
return;
}
// 异步读取文件内容 FIXME bug
AssetThreadPools.TERMINAL_SCHEDULER.execute(() -> {
try {
byte[] buffer = new byte[Const.BUFFER_KB_32];
int len;
// 响应文件内容
while ((len = this.inputStream.read(buffer)) != -1) {
this.channel.sendMessage(new BinaryMessage(buffer, 0, len, true));
}
// 响应结果
TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_FINISH, null);
} catch (Exception e) {
log.error("DownloadSession.transfer error", e);
// 响应结果
TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_ERROR, e);
} finally {
this.closeStream();
}
});
}
@Override
public void abortDownload() {
log.info("DownloadSession.abortDownload");
// 关闭流
Streams.close(inputStream);
// 响应结果
TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_FINISH, null);
}
@Override
public void close() {
super.close();
this.closeStream();
}
/**
* 关闭流
*/
private void closeStream() {
// 关闭流
Streams.close(inputStream);
this.inputStream = null;
}
}

View File

@@ -0,0 +1,24 @@
package com.orion.ops.module.asset.handler.host.transfer.session;
/**
* 下载会话定义
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/2/22 22:25
*/
public interface IDownloadSession {
/**
* 开始下载
*
* @param path path
*/
void startDownload(String path);
/**
* 停止下载
*/
void abortDownload();
}

View File

@@ -2,8 +2,6 @@ package com.orion.ops.module.asset.handler.host.transfer.session;
import com.orion.lang.able.SafeCloseable;
import java.io.IOException;
/**
* 主机传输会话定义
*
@@ -18,25 +16,4 @@ public interface ITransferHostSession extends SafeCloseable {
*/
void init();
/**
* 开始上传
*
* @param path path
* @throws IOException IOException
*/
void startUpload(String path) throws IOException;
/**
* 写入内容
*
* @param bytes bytes
* @throws IOException IOException
*/
void putContent(byte[] bytes) throws IOException;
/**
* 写入完成
*/
void putFinish();
}

View File

@@ -0,0 +1,36 @@
package com.orion.ops.module.asset.handler.host.transfer.session;
/**
* 上传会话定义
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/2/22 22:03
*/
public interface IUploadSession extends ITransferHostSession {
/**
* 开始上传
*
* @param path path
*/
void startUpload(String path);
/**
* 写入内容
*
* @param bytes bytes
*/
void putContent(byte[] bytes);
/**
* 上传完成
*/
void uploadFinish();
/**
* 上传失败
*/
void uploadError();
}

View File

@@ -3,11 +3,8 @@ package com.orion.ops.module.asset.handler.host.transfer.session;
import com.orion.lang.utils.io.Streams;
import com.orion.net.host.SessionStore;
import com.orion.net.host.sftp.SftpExecutor;
import com.orion.net.host.sftp.SftpFile;
import com.orion.ops.module.asset.entity.dto.HostTerminalConnectDTO;
import java.io.IOException;
import java.io.OutputStream;
import org.springframework.web.socket.WebSocketSession;
/**
* 主机传输会话实现
@@ -16,19 +13,20 @@ import java.io.OutputStream;
* @version 1.0.0
* @since 2024/2/21 21:12
*/
public class TransferHostSession implements ITransferHostSession {
public abstract class TransferHostSession implements ITransferHostSession {
private final HostTerminalConnectDTO connectInfo;
protected final HostTerminalConnectDTO connectInfo;
private final SessionStore sessionStore;
protected final SessionStore sessionStore;
private SftpExecutor executor;
protected final WebSocketSession channel;
private OutputStream currentOutputStream;
protected SftpExecutor executor;
public TransferHostSession(HostTerminalConnectDTO connectInfo, SessionStore sessionStore) {
public TransferHostSession(HostTerminalConnectDTO connectInfo, SessionStore sessionStore, WebSocketSession channel) {
this.connectInfo = connectInfo;
this.sessionStore = sessionStore;
this.channel = channel;
}
@Override
@@ -45,35 +43,10 @@ public class TransferHostSession implements ITransferHostSession {
}
}
@Override
public void startUpload(String path) throws IOException {
// 检查连接
this.init();
SftpFile file = executor.getFile(path);
if (file != null) {
// 文件存在则重命名
executor.move(path, file.getName() + "_bk_" + System.currentTimeMillis());
}
// 打开输出流
this.currentOutputStream = executor.openOutputStream(path);
}
@Override
public void putContent(byte[] bytes) throws IOException {
currentOutputStream.write(bytes);
}
@Override
public void putFinish() {
Streams.close(currentOutputStream);
this.currentOutputStream = null;
}
@Override
public void close() {
Streams.close(executor);
Streams.close(sessionStore);
Streams.close(currentOutputStream);
}
}

View File

@@ -0,0 +1,99 @@
package com.orion.ops.module.asset.handler.host.transfer.session;
import com.orion.lang.exception.argument.InvalidArgumentException;
import com.orion.lang.utils.io.Streams;
import com.orion.net.host.SessionStore;
import com.orion.net.host.sftp.SftpFile;
import com.orion.ops.module.asset.entity.dto.HostTerminalConnectDTO;
import com.orion.ops.module.asset.handler.host.transfer.enums.TransferReceiverType;
import com.orion.ops.module.asset.handler.host.transfer.utils.TransferUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.io.OutputStream;
/**
* 上传会话实现
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/2/22 22:04
*/
@Slf4j
public class UploadSession extends TransferHostSession implements IUploadSession {
private OutputStream outputStream;
public UploadSession(HostTerminalConnectDTO connectInfo, SessionStore sessionStore, WebSocketSession channel) {
super(connectInfo, sessionStore, channel);
}
@Override
public void startUpload(String path) {
try {
// 检查连接
this.init();
// 检查文件是否存在
SftpFile file = executor.getFile(path);
if (file != null) {
// 文件存在则重命名
executor.move(path, file.getName() + "_bk_" + System.currentTimeMillis());
}
// 打开输出流
this.outputStream = executor.openOutputStream(path);
// 响应结果
TransferUtils.sendMessage(this.channel, TransferReceiverType.UPLOAD_NEXT_BLOCK, null);
} catch (Exception e) {
log.error("UploadSession.uploadStart error", e);
this.closeStream();
// 响应结果
TransferUtils.sendMessage(this.channel, TransferReceiverType.UPLOAD_ERROR, e);
}
}
@Override
public void putContent(byte[] bytes) {
try {
// 写入内容
outputStream.write(bytes);
// 响应结果
TransferUtils.sendMessage(this.channel, TransferReceiverType.UPLOAD_NEXT_BLOCK, null);
} catch (IOException e) {
log.error("UploadSession.putContent error", e);
this.closeStream();
// 响应结果
TransferUtils.sendMessage(this.channel, TransferReceiverType.UPLOAD_ERROR, e);
}
}
@Override
public void uploadFinish() {
this.closeStream();
// 响应结果
TransferUtils.sendMessage(this.channel, TransferReceiverType.UPLOAD_FINISH, null);
}
@Override
public void uploadError() {
this.closeStream();
// 响应结果
TransferUtils.sendMessage(this.channel, TransferReceiverType.UPLOAD_ERROR, new InvalidArgumentException((String) null));
}
@Override
public void close() {
super.close();
this.closeStream();
}
/**
* 关闭流
*/
private void closeStream() {
// 关闭流
Streams.close(outputStream);
this.outputStream = null;
}
}

View File

@@ -0,0 +1,55 @@
package com.orion.ops.module.asset.handler.host.transfer.utils;
import com.alibaba.fastjson.JSON;
import com.orion.lang.exception.argument.InvalidArgumentException;
import com.orion.ops.framework.common.constant.ErrorMessage;
import com.orion.ops.framework.websocket.core.utils.WebSockets;
import com.orion.ops.module.asset.handler.host.transfer.enums.TransferReceiverType;
import com.orion.ops.module.asset.handler.host.transfer.model.TransferOperatorResponse;
import org.springframework.web.socket.WebSocketSession;
/**
* 传输工具类
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/2/22 22:14
*/
public class TransferUtils {
private TransferUtils() {
}
/**
* 发送消息
*
* @param channel channel
* @param type type
* @param ex ex
*/
public static void sendMessage(WebSocketSession channel, TransferReceiverType type, Exception ex) {
TransferOperatorResponse resp = TransferOperatorResponse.builder()
.type(type.getType())
.success(ex == null)
.msg(TransferUtils.getErrorMessage(ex))
.build();
WebSockets.sendText(channel, JSON.toJSONString(resp));
}
/**
* 获取错误信息
*
* @param ex ex
* @return msg
*/
public static String getErrorMessage(Exception ex) {
if (ex == null) {
return null;
}
if (ex instanceof InvalidArgumentException) {
return ex.getMessage();
}
return ErrorMessage.OPERATE_ERROR;
}
}

View File

@@ -57,6 +57,14 @@ export function getPath(path: string) {
.replace(new RegExp('/+', 'g'), '/');
}
/**
* 获取文件名
*/
export function getFileName(path: string) {
path = getPath(path);
return path.substring(path.lastIndexOf('/') + 1);
}
/**
* 获取父级路径
*/

View File

@@ -156,9 +156,11 @@
// 添加普通文件到下载队列
const normalFiles = files.filter(s => !s.isDir);
transferManager.addDownload(props.tab.hostId as number, currentPath.value, normalFiles);
// 将文件夹转为普通文件
const directoryFiles = files.filter(s => s.isDir);
// TODO
// 将文件夹展开普通文件
const directoryPaths = files.filter(s => s.isDir).map(s => s.path);
if (directoryPaths.length) {
session.value?.downloadFlatDirectory(currentPath.value, directoryPaths);
}
};
// 连接成功回调
@@ -224,6 +226,12 @@
Message.success('保存成功');
};
// 接收下载文件夹展开文件响应
const resolveDownloadFlatDirectory = (currentPath: string, list: Array<SftpFile>) => {
setTableLoading(false);
transferManager.addDownload(props.tab.hostId as number, currentPath, list);
};
// 初始化会话
onMounted(async () => {
// 创建终端处理器
@@ -236,6 +244,7 @@
resolveSftpMove: resolveFileAction,
resolveSftpRemove: resolveFileAction,
resolveSftpChmod: resolveFileAction,
resolveDownloadFlatDirectory,
resolveSftpGetContent,
resolveSftpSetContent,
});

View File

@@ -190,6 +190,7 @@
text-overflow: ellipsis;
width: fit-content;
max-width: 100%;
white-space: nowrap;
}
.transfer-progress, .target-path, .error-message {

View File

@@ -107,6 +107,16 @@ export default class SftpSession implements ISftpSession {
});
};
// 下载文件夹展开文件
downloadFlatDirectory(currentPath: string, path: string[]) {
this.resolver.setLoading(true);
this.channel.send(InputProtocol.SFTP_DOWNLOAD_FLAT_DIRECTORY, {
sessionId: this.sessionId,
currentPath,
path: path.join('|')
});
}
// 获取内容
getContent(path: string) {
this.channel.send(InputProtocol.SFTP_GET_CONTENT, {

View File

@@ -0,0 +1,79 @@
import type { ISftpTransferDownloader, SftpTransferItem } from '../types/terminal.type';
import { TransferOperatorType, TransferStatus } from '../types/terminal.const';
import { getFileName, getPath } from '@/utils/file';
import { saveAs } from 'file-saver';
// sftp 上传器实现
export default class SftpTransferDownloader implements ISftpTransferDownloader {
public abort: boolean;
private blobArr: Array<Blob>;
private client: WebSocket;
private item: SftpTransferItem;
constructor(item: SftpTransferItem, client: WebSocket) {
this.abort = false;
this.blobArr = [];
this.item = item;
this.client = client;
}
// 开始下载
startDownload() {
this.item.status = TransferStatus.TRANSFERRING;
// 发送开始下载信息
this.client?.send(JSON.stringify({
type: TransferOperatorType.DOWNLOAD_START,
path: getPath(this.item.parentPath + '/' + this.item.name),
hostId: this.item.hostId
}));
}
// 接收 blob
resolveBlob(blob: Blob) {
this.blobArr.push(blob);
this.item.currentSize += blob.size;
}
// 下载完成
downloadFinish() {
console.log(this.abort);
if (this.abort) {
// 中断则不触发下载
return;
}
// fixme bug
try {
console.log('saveAs');
// 触发下载
saveAs(new Blob(this.blobArr, {
type: 'application/octet-stream'
}), getFileName(this.item.name));
this.item.status = TransferStatus.SUCCESS;
} catch (e) {
this.item.status = TransferStatus.ERROR;
this.item.errorMessage = '保存失败';
} finally {
this.blobArr = [];
}
}
// 下载失败
downloadError(msg: string | undefined) {
this.blobArr = [];
this.item.status = TransferStatus.ERROR;
this.item.errorMessage = msg || '下载失败';
}
// 下载中断
downloadAbort() {
this.abort = true;
// 发送下载中断信息
this.client?.send(JSON.stringify({
type: TransferOperatorType.DOWNLOAD_ABORT,
hostId: this.item.hostId
}));
}
}

View File

@@ -1,10 +1,11 @@
import type { ISftpTransferManager, ISftpTransferUploader, SftpTransferItem } from '../types/terminal.type';
import { SftpFile, TransferOperatorResponse } from '../types/terminal.type';
import { ISftpTransferDownloader, SftpFile, TransferOperatorResponse } from '../types/terminal.type';
import { TransferReceiverType, TransferStatus, TransferType } from '../types/terminal.const';
import { Message } from '@arco-design/web-vue';
import { getTerminalAccessToken } from '@/api/asset/host-terminal';
import SftpTransferUploader from '@/views/host/terminal/handler/sftp-transfer-uploader';
import { nextId } from '@/utils';
import SftpTransferUploader from './sftp-transfer-uploader';
import SftpTransferDownloader from './sftp-transfer-downloader';
export const wsBase = import.meta.env.VITE_WS_BASE_URL;
@@ -19,6 +20,8 @@ export default class SftpTransferManager implements ISftpTransferManager {
private currentUploader?: ISftpTransferUploader;
private currentDownloader?: ISftpTransferDownloader;
public transferList: Array<SftpTransferItem>;
constructor() {
@@ -57,7 +60,8 @@ export default class SftpTransferManager implements ISftpTransferManager {
fileId: nextId(10),
type: TransferType.DOWNLOAD,
hostId: hostId,
name: s.path.substring(currentPath.length),
name: s.path.substring(currentPath.length + 1),
parentPath: currentPath,
currentSize: 0,
totalSize: s.size,
status: TransferStatus.WAITING,
@@ -81,6 +85,8 @@ export default class SftpTransferManager implements ISftpTransferManager {
// 传输中则中断传输
if (this.currentUploader) {
this.currentUploader.uploadAbort();
} else if (this.currentDownloader) {
this.currentDownloader.downloadAbort();
}
}
// 从列表中移除
@@ -121,6 +127,7 @@ export default class SftpTransferManager implements ISftpTransferManager {
// 传输下一条任务
private transferNextItem() {
this.currentUploader = undefined;
this.currentDownloader = undefined;
// 释放内存
if (this.currentItem) {
this.currentItem.file = null as unknown as File;
@@ -144,13 +151,27 @@ export default class SftpTransferManager implements ISftpTransferManager {
// 接收消息
private async resolveMessage(message: MessageEvent) {
const data = JSON.parse(message.data) as TransferOperatorResponse;
if (data.type === TransferReceiverType.NEXT_BLOCK) {
// 接收下一块上传数据
await this.resolveNextBlock();
} else if (data.type === TransferReceiverType.NEXT_TRANSFER) {
// 接收接收下一个传输任务处理完成
this.resolveNextTransfer(data);
if (message.data instanceof Blob) {
// 二进制消息 下载数据
this.resolveDownloadBlob(message.data);
} else {
// 文本消息
const data = JSON.parse(message.data) as TransferOperatorResponse;
if (data.type === TransferReceiverType.NEXT_TRANSFER
|| data.type === TransferReceiverType.UPLOAD_FINISH
|| data.type === TransferReceiverType.UPLOAD_ERROR) {
// 执行下一个传输任务
this.resolveNextTransfer(data);
} else if (data.type === TransferReceiverType.UPLOAD_NEXT_BLOCK) {
// 接收下一块上传数据
await this.resolveUploadNextBlock();
} else if (data.type === TransferReceiverType.DOWNLOAD_FINISH) {
// 下载完成
this.resolveDownloadFinish();
} else if (data.type === TransferReceiverType.DOWNLOAD_ERROR) {
// 下载失败
this.resolveDownloadError(data.msg);
}
}
}
@@ -164,11 +185,28 @@ export default class SftpTransferManager implements ISftpTransferManager {
// 下载文件
private uploadDownload() {
// TODO
// 创建下载器
this.currentDownloader = new SftpTransferDownloader(this.currentItem as SftpTransferItem, this.client as WebSocket);
// 开始下载
this.currentDownloader.startDownload();
}
// 接收下一块上传数据
private async resolveNextBlock() {
// 接收下一个传输任务响应
private resolveNextTransfer(data: TransferOperatorResponse) {
if (this.currentItem) {
if (data.success) {
this.currentItem.status = TransferStatus.SUCCESS;
} else {
this.currentItem.status = TransferStatus.ERROR;
this.currentItem.errorMessage = data.msg || '传输失败';
}
}
// 开始下一个传输任务
this.transferNextItem();
}
// 接收下一块上传数据响应
private async resolveUploadNextBlock() {
// 只可能为上传并且成功
if (!this.currentUploader) {
return;
@@ -189,16 +227,21 @@ export default class SftpTransferManager implements ISftpTransferManager {
}
}
// 接收下一个传输任务
private resolveNextTransfer(data: TransferOperatorResponse) {
if (this.currentItem) {
if (data.success) {
this.currentItem.status = TransferStatus.SUCCESS;
} else {
this.currentItem.status = TransferStatus.ERROR;
this.currentItem.errorMessage = data.msg || '上传失败';
}
}
// 接收下载数据
private resolveDownloadBlob(blob: Blob) {
this.currentDownloader?.resolveBlob(blob);
}
// 接收下载完成响应
private resolveDownloadFinish() {
this.currentDownloader?.downloadFinish();
// 开始下一个传输任务
this.transferNextItem();
}
// 接收下载失败响应
private resolveDownloadError(msg: string | undefined) {
this.currentDownloader?.downloadError(msg);
// 开始下一个传输任务
this.transferNextItem();
}

View File

@@ -7,8 +7,8 @@ export const BLOCK_SIZE = 1024 * 1024;
// sftp 上传器实现
export default class SftpTransferUploader implements ISftpTransferUploader {
public abort: boolean;
public finish: boolean;
public abort: boolean;
private currentBlock: number;
private totalBlock: number;
private client: WebSocket;

View File

@@ -154,6 +154,13 @@ export default class TerminalOutputProcessor implements ITerminalOutputProcessor
session && session.resolver.resolveSftpChmod(result, msg);
}
// 处理 SFTP 下载文件夹展开文件
processDownloadFlatDirectory({ sessionId, currentPath, body }: OutputPayload): void {
// 获取会话
const session = this.sessionManager.getSession<ISftpSession>(sessionId);
session && session.resolver.resolveDownloadFlatDirectory(currentPath, JSON.parse(body));
}
// 处理 SFTP 获取文件内容
processSftpGetContent({ sessionId, path, result, content }: OutputPayload): void {
// 获取会话

View File

@@ -304,12 +304,18 @@ export const TransferOperatorType = {
UPLOAD_START: 'uploadStart',
UPLOAD_FINISH: 'uploadFinish',
UPLOAD_ERROR: 'uploadError',
DOWNLOAD_START: 'downloadStart',
DOWNLOAD_ABORT: 'downloadAbort',
};
// 传输响应类型
export const TransferReceiverType = {
NEXT_BLOCK: 'nextBlock',
NEXT_TRANSFER: 'nextTransfer',
UPLOAD_NEXT_BLOCK: 'uploadNextBlock',
UPLOAD_FINISH: 'uploadFinish',
UPLOAD_ERROR: 'uploadError',
DOWNLOAD_FINISH: 'downloadFinish',
DOWNLOAD_ERROR: 'downloadError',
};
// 打开 sshSettingModal key

View File

@@ -60,6 +60,11 @@ export const InputProtocol = {
type: 'cm',
template: ['type', 'sessionId', 'path', 'mod']
},
// SFTP 修改文件权限
SFTP_DOWNLOAD_FLAT_DIRECTORY: {
type: 'df',
template: ['type', 'sessionId', 'currentPath', 'path']
},
// SFTP 获取内容
SFTP_GET_CONTENT: {
type: 'gc',
@@ -140,6 +145,12 @@ export const OutputProtocol = {
template: ['type', 'sessionId', 'result', 'msg'],
processMethod: 'processSftpChmod'
},
// SFTP 修改文件权限
SFTP_DOWNLOAD_FLAT_DIRECTORY: {
type: 'df',
template: ['type', 'sessionId', 'currentPath', 'body'],
processMethod: 'processDownloadFlatDirectory'
},
// SFTP 获取文件内容
SFTP_GET_CONTENT: {
type: 'gc',

View File

@@ -190,6 +190,8 @@ export interface ITerminalOutputProcessor {
processSftpRemove: (payload: OutputPayload) => void;
// 处理 SFTP 修改文件权限
processSftpChmod: (payload: OutputPayload) => void;
// 处理 SFTP 下载文件夹展开文件
processDownloadFlatDirectory: (payload: OutputPayload) => void;
// 处理 SFTP 获取文件内容
processSftpGetContent: (payload: OutputPayload) => void;
// 处理 SFTP 修改文件内容
@@ -325,6 +327,8 @@ export interface ISftpSession extends ITerminalSession {
remove: (path: string[]) => void;
// 修改权限
chmod: (path: string, mod: number) => void;
// 下载文件夹展开文件
downloadFlatDirectory: (currentPath: string, path: string[]) => void;
// 获取内容
getContent: (path: string) => void;
// 修改内容
@@ -349,6 +353,8 @@ export interface ISftpSessionResolver {
resolveSftpRemove: (result: string, msg: string) => void;
// 接收修改文件权限响应
resolveSftpChmod: (result: string, msg: string) => void;
// 接收下载文件夹展开文件响应
resolveDownloadFlatDirectory: (currentPath: string, list: Array<SftpFile>) => void;
// 接收获取文件内容响应
resolveSftpGetContent: (path: string, result: string, content: string) => void;
// 接收修改文件内容响应
@@ -400,6 +406,22 @@ export interface ISftpTransferUploader {
uploadAbort: () => void;
}
// sftp 下载器定义
export interface ISftpTransferDownloader {
// 是否中断
abort: boolean;
// 开始下载
startDownload: () => void;
// 接收 blob
resolveBlob: (blob: Blob) => void;
// 下载完成
downloadFinish: () => void;
// 下载失败
downloadError: (msg: string | undefined) => void;
// 下载中断
downloadAbort: () => void;
}
// sftp 上传文件项
export interface SftpTransferItem {
fileId: string;