⚡ 优化文件上传逻辑.
This commit is contained in:
@@ -50,7 +50,7 @@ public class TransferMessageDispatcher extends AbstractWebSocketHandler {
|
||||
// 获取处理器
|
||||
ITransferHandler handler = hostTransferManager.getHandler(session.getId());
|
||||
// 添加数据
|
||||
handler.putContent(message.getPayload().array());
|
||||
handler.handleMessage(message.getPayload().array());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -0,0 +1,53 @@
|
||||
package com.orion.visor.module.asset.handler.host.transfer.enums;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* 传输操作类型
|
||||
*
|
||||
* @author Jiahang Li
|
||||
* @version 1.0.0
|
||||
* @since 2024/2/21 22:03
|
||||
*/
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public enum TransferOperator {
|
||||
|
||||
/**
|
||||
* 开始传输
|
||||
*/
|
||||
START("start"),
|
||||
|
||||
/**
|
||||
* 传输完成
|
||||
*/
|
||||
FINISH("finish"),
|
||||
|
||||
/**
|
||||
* 传输失败
|
||||
*/
|
||||
ERROR("error"),
|
||||
|
||||
/**
|
||||
* 传输中断
|
||||
*/
|
||||
ABORT("abort"),
|
||||
|
||||
;
|
||||
|
||||
private final String type;
|
||||
|
||||
public static TransferOperator of(String type) {
|
||||
if (type == null) {
|
||||
return null;
|
||||
}
|
||||
for (TransferOperator value : values()) {
|
||||
if (value.type.equals(type)) {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,64 +0,0 @@
|
||||
package com.orion.visor.module.asset.handler.host.transfer.enums;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* 传输操作类型
|
||||
*
|
||||
* @author Jiahang Li
|
||||
* @version 1.0.0
|
||||
* @since 2024/2/21 22:03
|
||||
*/
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public enum TransferOperatorType {
|
||||
|
||||
/**
|
||||
* 开始上传
|
||||
*/
|
||||
UPLOAD_START(TransferOperatorType.UPLOAD, "uploadStart"),
|
||||
|
||||
/**
|
||||
* 上传完成
|
||||
*/
|
||||
UPLOAD_FINISH(TransferOperatorType.UPLOAD, "uploadFinish"),
|
||||
|
||||
/**
|
||||
* 上传失败
|
||||
*/
|
||||
UPLOAD_ERROR(TransferOperatorType.UPLOAD, "uploadError"),
|
||||
|
||||
/**
|
||||
* 初始化下载
|
||||
*/
|
||||
DOWNLOAD_INIT(TransferOperatorType.DOWNLOAD, "downloadInit"),
|
||||
|
||||
/**
|
||||
* 中断下载
|
||||
*/
|
||||
DOWNLOAD_ABORT(TransferOperatorType.DOWNLOAD, "downloadAbort"),
|
||||
|
||||
;
|
||||
|
||||
public static final String UPLOAD = "UPLOAD";
|
||||
|
||||
public static final String DOWNLOAD = "DOWNLOAD";
|
||||
|
||||
private final String kind;
|
||||
|
||||
private final String type;
|
||||
|
||||
public static TransferOperatorType of(String type) {
|
||||
if (type == null) {
|
||||
return null;
|
||||
}
|
||||
for (TransferOperatorType value : values()) {
|
||||
if (value.type.equals(type)) {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,63 @@
|
||||
package com.orion.visor.module.asset.handler.host.transfer.enums;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* 传输响应类型
|
||||
*
|
||||
* @author Jiahang Li
|
||||
* @version 1.0.0
|
||||
* @since 2024/2/21 22:03
|
||||
*/
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public enum TransferReceiver {
|
||||
|
||||
/**
|
||||
* 请求下一分片
|
||||
*/
|
||||
NEXT_PART("nextPart"),
|
||||
|
||||
/**
|
||||
* 开始
|
||||
*/
|
||||
START("start"),
|
||||
|
||||
/**
|
||||
* 进度
|
||||
*/
|
||||
PROGRESS("progress"),
|
||||
|
||||
/**
|
||||
* 完成
|
||||
*/
|
||||
FINISH("finish"),
|
||||
|
||||
/**
|
||||
* 失败
|
||||
*/
|
||||
ERROR("error"),
|
||||
|
||||
/**
|
||||
* 关闭
|
||||
*/
|
||||
ABORT("abort"),
|
||||
|
||||
;
|
||||
|
||||
private final String type;
|
||||
|
||||
public static TransferReceiver of(String type) {
|
||||
if (type == null) {
|
||||
return null;
|
||||
}
|
||||
for (TransferReceiver value : values()) {
|
||||
if (value.type.equals(type)) {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,73 +0,0 @@
|
||||
package com.orion.visor.module.asset.handler.host.transfer.enums;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* 传输响应类型
|
||||
*
|
||||
* @author Jiahang Li
|
||||
* @version 1.0.0
|
||||
* @since 2024/2/21 22:03
|
||||
*/
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public enum TransferReceiverType {
|
||||
|
||||
/**
|
||||
* 请求下一个传输任务
|
||||
*/
|
||||
NEXT_TRANSFER("nextTransfer"),
|
||||
|
||||
/**
|
||||
* 请求下一块上传数据
|
||||
*/
|
||||
UPLOAD_NEXT_BLOCK("uploadNextBlock"),
|
||||
|
||||
/**
|
||||
* 上传完成
|
||||
*/
|
||||
UPLOAD_FINISH("uploadFinish"),
|
||||
|
||||
/**
|
||||
* 上传失败
|
||||
*/
|
||||
UPLOAD_ERROR("uploadError"),
|
||||
|
||||
/**
|
||||
* 开始下载
|
||||
*/
|
||||
DOWNLOAD_START("downloadStart"),
|
||||
|
||||
/**
|
||||
* 下载进度
|
||||
*/
|
||||
DOWNLOAD_PROGRESS("downloadProgress"),
|
||||
|
||||
/**
|
||||
* 下载完成
|
||||
*/
|
||||
DOWNLOAD_FINISH("downloadFinish"),
|
||||
|
||||
/**
|
||||
* 下载失败
|
||||
*/
|
||||
DOWNLOAD_ERROR("downloadError"),
|
||||
|
||||
;
|
||||
|
||||
private final String type;
|
||||
|
||||
public static TransferReceiverType of(String type) {
|
||||
if (type == null) {
|
||||
return null;
|
||||
}
|
||||
for (TransferReceiverType value : values()) {
|
||||
if (value.type.equals(type)) {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,43 @@
|
||||
package com.orion.visor.module.asset.handler.host.transfer.enums;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* 传输类型
|
||||
*
|
||||
* @author Jiahang Li
|
||||
* @version 1.0.0
|
||||
* @since 2024/7/12 12:41
|
||||
*/
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public enum TransferType {
|
||||
|
||||
/**
|
||||
* 上传
|
||||
*/
|
||||
UPLOAD("upload"),
|
||||
|
||||
/**
|
||||
* 下载
|
||||
*/
|
||||
DOWNLOAD("download"),
|
||||
|
||||
;
|
||||
|
||||
private final String type;
|
||||
|
||||
public static TransferType of(String type) {
|
||||
if (type == null) {
|
||||
return null;
|
||||
}
|
||||
for (TransferType value : values()) {
|
||||
if (value.type.equals(type)) {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -2,9 +2,7 @@ package com.orion.visor.module.asset.handler.host.transfer.handler;
|
||||
|
||||
import com.orion.lang.able.SafeCloseable;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.model.TransferOperatorRequest;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.session.IDownloadSession;
|
||||
|
||||
import java.util.Map;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.session.ITransferSession;
|
||||
|
||||
/**
|
||||
* 传输处理器定义
|
||||
@@ -16,24 +14,25 @@ import java.util.Map;
|
||||
public interface ITransferHandler extends SafeCloseable {
|
||||
|
||||
/**
|
||||
* 处理消息
|
||||
* 处理文本消息
|
||||
*
|
||||
* @param payload payload
|
||||
*/
|
||||
void handleMessage(TransferOperatorRequest payload);
|
||||
|
||||
/**
|
||||
* 写入内容
|
||||
* 处理二进制消息
|
||||
*
|
||||
* @param content content
|
||||
*/
|
||||
void putContent(byte[] content);
|
||||
void handleMessage(byte[] content);
|
||||
|
||||
/**
|
||||
* 获取 token sessions
|
||||
* 通过 token 获取 session
|
||||
*
|
||||
* @return token sessions
|
||||
* @param token token
|
||||
* @return session
|
||||
*/
|
||||
Map<String, IDownloadSession> getTokenSessions();
|
||||
ITransferSession getSessionByToken(String token);
|
||||
|
||||
}
|
||||
|
||||
@@ -1,21 +1,21 @@
|
||||
package com.orion.visor.module.asset.handler.host.transfer.handler;
|
||||
|
||||
import com.orion.lang.id.UUIds;
|
||||
import com.orion.lang.utils.Exceptions;
|
||||
import com.orion.lang.utils.io.Streams;
|
||||
import com.orion.net.host.SessionStore;
|
||||
import com.orion.spring.SpringHolder;
|
||||
import com.orion.visor.framework.common.constant.ErrorMessage;
|
||||
import com.orion.visor.framework.common.constant.ExtraFieldConst;
|
||||
import com.orion.visor.framework.websocket.core.utils.WebSockets;
|
||||
import com.orion.visor.module.asset.entity.dto.HostTerminalConnectDTO;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.enums.TransferOperatorType;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.enums.TransferReceiverType;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.enums.TransferOperator;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.enums.TransferReceiver;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.enums.TransferType;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.model.TransferOperatorRequest;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.session.*;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.session.DownloadSession;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.session.ITransferSession;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.session.UploadSession;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.utils.TransferUtils;
|
||||
import com.orion.visor.module.asset.service.HostTerminalService;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
|
||||
@@ -33,98 +33,72 @@ public class TransferHandler implements ITransferHandler {
|
||||
|
||||
private static final HostTerminalService hostTerminalService = SpringHolder.getBean(HostTerminalService.class);
|
||||
|
||||
private final Long userId;
|
||||
|
||||
private final WebSocketSession channel;
|
||||
|
||||
/**
|
||||
* 当前会话
|
||||
*/
|
||||
private ITransferHostSession currentSession;
|
||||
private ITransferSession currentSession;
|
||||
|
||||
/**
|
||||
* 会话列表
|
||||
*/
|
||||
private final ConcurrentHashMap<String, ITransferHostSession> sessions;
|
||||
|
||||
@Getter
|
||||
private final ConcurrentHashMap<String, IDownloadSession> tokenSessions;
|
||||
private final ConcurrentHashMap<String, ITransferSession> sessions;
|
||||
|
||||
public TransferHandler(WebSocketSession channel) {
|
||||
this.channel = channel;
|
||||
this.userId = WebSockets.getAttr(channel, ExtraFieldConst.USER_ID);
|
||||
this.sessions = new ConcurrentHashMap<>();
|
||||
this.tokenSessions = new ConcurrentHashMap<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleMessage(TransferOperatorRequest payload) {
|
||||
// 解析消息类型
|
||||
TransferOperatorType type = TransferOperatorType.of(payload.getType());
|
||||
TransferOperator operator = TransferOperator.of(payload.getOperator());
|
||||
// 获取会话
|
||||
if (!this.getAndInitSession(payload, type)) {
|
||||
if (!this.getAndInitSession(payload)) {
|
||||
return;
|
||||
}
|
||||
// 处理消息
|
||||
switch (type) {
|
||||
case UPLOAD_START:
|
||||
// 开始上传
|
||||
((IUploadSession) currentSession).startUpload(payload.getPath());
|
||||
break;
|
||||
case UPLOAD_FINISH:
|
||||
// 上传完成
|
||||
((IUploadSession) currentSession).uploadFinish();
|
||||
break;
|
||||
case UPLOAD_ERROR:
|
||||
// 上传失败
|
||||
((IUploadSession) currentSession).uploadError();
|
||||
break;
|
||||
case DOWNLOAD_INIT:
|
||||
// 开始下载
|
||||
String token = UUIds.random32();
|
||||
tokenSessions.put(token, (IDownloadSession) currentSession);
|
||||
((IDownloadSession) currentSession).downloadInit(payload.getPath(), token);
|
||||
break;
|
||||
case DOWNLOAD_ABORT:
|
||||
// 中断下载
|
||||
((IDownloadSession) currentSession).abortDownload();
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
if (TransferOperator.START.equals(operator)) {
|
||||
// 开始
|
||||
currentSession.setToken(UUIds.random32());
|
||||
currentSession.onStart(payload);
|
||||
} else if (TransferOperator.FINISH.equals(operator)) {
|
||||
// 完成
|
||||
currentSession.onFinish(payload);
|
||||
} else if (TransferOperator.ERROR.equals(operator)) {
|
||||
// 失败
|
||||
currentSession.onError(payload);
|
||||
} else if (TransferOperator.ABORT.equals(operator)) {
|
||||
// 中断
|
||||
currentSession.onAbort(payload);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putContent(byte[] content) {
|
||||
((IUploadSession) currentSession).putContent(content);
|
||||
public void handleMessage(byte[] content) {
|
||||
currentSession.handleBinary(content);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取并且初始化会话
|
||||
*
|
||||
* @param payload payload
|
||||
* @param type type
|
||||
* @return success
|
||||
*/
|
||||
private boolean getAndInitSession(TransferOperatorRequest payload, TransferOperatorType type) {
|
||||
private boolean getAndInitSession(TransferOperatorRequest payload) {
|
||||
Long hostId = payload.getHostId();
|
||||
String sessionKey = hostId + "_" + type.getKind();
|
||||
TransferType type = TransferType.of(payload.getType());
|
||||
String sessionKey = hostId + "_" + type.getType();
|
||||
try {
|
||||
// 获取会话
|
||||
ITransferHostSession session = sessions.get(sessionKey);
|
||||
ITransferSession session = sessions.get(sessionKey);
|
||||
if (session == null) {
|
||||
// 获取主机信息
|
||||
HostTerminalConnectDTO connectInfo = hostTerminalService.getTerminalConnectInfo(this.userId, hostId);
|
||||
// 获取主机连接信息
|
||||
Long userId = WebSockets.getAttr(channel, ExtraFieldConst.USER_ID);
|
||||
HostTerminalConnectDTO connectInfo = hostTerminalService.getTerminalConnectInfo(userId, hostId);
|
||||
SessionStore sessionStore = hostTerminalService.openSessionStore(connectInfo);
|
||||
// 打开会话并初始化
|
||||
if (TransferOperatorType.UPLOAD.equals(type.getKind())) {
|
||||
if (TransferType.UPLOAD.equals(type)) {
|
||||
// 上传操作
|
||||
session = new UploadSession(connectInfo, sessionStore, this.channel);
|
||||
} else if (TransferOperatorType.DOWNLOAD.equals(type.getKind())) {
|
||||
} else if (TransferType.DOWNLOAD.equals(type)) {
|
||||
// 下载操作
|
||||
session = new DownloadSession(connectInfo, sessionStore, this.channel);
|
||||
} else {
|
||||
throw Exceptions.invalidArgument(ErrorMessage.UNKNOWN_TYPE);
|
||||
}
|
||||
session.init();
|
||||
sessions.put(sessionKey, session);
|
||||
@@ -135,16 +109,25 @@ public class TransferHandler implements ITransferHandler {
|
||||
} catch (Exception e) {
|
||||
log.error("TransferHandler.getAndInitSession error channelId: {}", channel.getId(), e);
|
||||
// 响应结果
|
||||
TransferUtils.sendMessage(this.channel, TransferReceiverType.NEXT_TRANSFER, e);
|
||||
TransferUtils.sendMessage(this.channel, TransferReceiver.ERROR, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ITransferSession getSessionByToken(String token) {
|
||||
return sessions.values()
|
||||
.stream()
|
||||
.filter(s -> token.equals(s.getToken()))
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
log.info("TransferHandler.close channelId: {}", channel.getId());
|
||||
sessions.values().forEach(Streams::close);
|
||||
tokenSessions.clear();
|
||||
sessions.clear();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -28,6 +28,11 @@ public class TransferOperatorRequest {
|
||||
*/
|
||||
private String type;
|
||||
|
||||
/**
|
||||
* operator
|
||||
*/
|
||||
private String operator;
|
||||
|
||||
/**
|
||||
* 主机id
|
||||
*/
|
||||
|
||||
@@ -11,10 +11,11 @@ import com.orion.visor.framework.common.constant.ErrorMessage;
|
||||
import com.orion.visor.module.asset.define.AssetThreadPools;
|
||||
import com.orion.visor.module.asset.define.operator.HostTerminalOperatorType;
|
||||
import com.orion.visor.module.asset.entity.dto.HostTerminalConnectDTO;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.enums.TransferReceiverType;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.enums.TransferReceiver;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.model.TransferOperatorRequest;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.utils.TransferUtils;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
|
||||
import java.io.IOException;
|
||||
@@ -29,10 +30,11 @@ import java.io.OutputStream;
|
||||
* @since 2024/2/22 22:25
|
||||
*/
|
||||
@Slf4j
|
||||
public class DownloadSession extends TransferHostSession implements IDownloadSession {
|
||||
public class DownloadSession extends TransferSession implements StreamingResponseBody {
|
||||
|
||||
@Getter
|
||||
private String path;
|
||||
private static final int BUFFER_SIZE = Const.BUFFER_KB_32;
|
||||
|
||||
private static final int FLUSH_COUNT = Const.BUFFER_KB_1 * Const.BUFFER_KB_1 / Const.BUFFER_KB_32;
|
||||
|
||||
private InputStream inputStream;
|
||||
|
||||
@@ -41,10 +43,9 @@ public class DownloadSession extends TransferHostSession implements IDownloadSes
|
||||
}
|
||||
|
||||
@Override
|
||||
public void downloadInit(String path, String token) {
|
||||
this.path = path;
|
||||
String channelId = channel.getId();
|
||||
public void onStart(TransferOperatorRequest request) {
|
||||
try {
|
||||
super.onStart(request);
|
||||
log.info("DownloadSession.startDownload open start channelId: {}, path: {}", channelId, path);
|
||||
// 保存操作日志
|
||||
this.saveOperatorLog(HostTerminalOperatorType.SFTP_DOWNLOAD, path);
|
||||
@@ -56,13 +57,13 @@ public class DownloadSession extends TransferHostSession implements IDownloadSes
|
||||
if (file.getSize() == 0L) {
|
||||
// 文件为空
|
||||
log.info("DownloadSession.startDownload file empty channelId: {}, path: {}", channelId, path);
|
||||
TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_FINISH, null);
|
||||
TransferUtils.sendMessage(channel, TransferReceiver.FINISH, null);
|
||||
return;
|
||||
}
|
||||
// 打开输入流
|
||||
this.inputStream = executor.openInputStream(path);
|
||||
// 响应开始下载
|
||||
TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_START, null, e -> {
|
||||
TransferUtils.sendMessage(channel, TransferReceiver.START, null, e -> {
|
||||
e.setChannelId(channelId);
|
||||
e.setTransferToken(token);
|
||||
});
|
||||
@@ -70,23 +71,23 @@ public class DownloadSession extends TransferHostSession implements IDownloadSes
|
||||
} catch (Exception e) {
|
||||
log.error("DownloadSession.startDownload open error channelId: {}, path: {}", channelId, path, e);
|
||||
// 响应下载失败
|
||||
TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_ERROR, e);
|
||||
TransferUtils.sendMessage(channel, TransferReceiver.ERROR, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abortDownload() {
|
||||
log.info("DownloadSession.abortDownload channelId: {}", channel.getId());
|
||||
public void onAbort(TransferOperatorRequest request) {
|
||||
log.info("TransferSession.abort channelId: {}, path: {}", channelId, path);
|
||||
// 关闭流
|
||||
this.closeStream();
|
||||
// download 的 abort 无需发送回调
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(OutputStream outputStream) {
|
||||
String channelId = channel.getId();
|
||||
Ref<Exception> ex = new Ref<>();
|
||||
try {
|
||||
byte[] buffer = new byte[Const.BUFFER_KB_32];
|
||||
byte[] buffer = new byte[BUFFER_SIZE];
|
||||
int len;
|
||||
int i = 0;
|
||||
int size = 0;
|
||||
@@ -95,7 +96,7 @@ public class DownloadSession extends TransferHostSession implements IDownloadSes
|
||||
outputStream.write(buffer, 0, len);
|
||||
size += len;
|
||||
// 不要每次都 flush 和 send > 1mb
|
||||
if (i == 32) {
|
||||
if (i == FLUSH_COUNT) {
|
||||
i = 0;
|
||||
}
|
||||
// 首次触发
|
||||
@@ -122,9 +123,9 @@ public class DownloadSession extends TransferHostSession implements IDownloadSes
|
||||
// 响应结果
|
||||
Exception e = ex.getValue();
|
||||
if (e == null) {
|
||||
TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_FINISH, null);
|
||||
TransferUtils.sendMessage(channel, TransferReceiver.FINISH, null);
|
||||
} else {
|
||||
TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_ERROR, e);
|
||||
TransferUtils.sendMessage(channel, TransferReceiver.ERROR, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -140,13 +141,12 @@ public class DownloadSession extends TransferHostSession implements IDownloadSes
|
||||
// flush
|
||||
outputStream.flush();
|
||||
// send
|
||||
TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_PROGRESS, null, e -> e.setCurrentSize(size));
|
||||
TransferUtils.sendMessage(channel, TransferReceiver.PROGRESS, null, e -> e.setCurrentSize(size));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void closeStream() {
|
||||
// 关闭 inputStream 可能会被阻塞 ???...??? 只能关闭 executor
|
||||
this.path = null;
|
||||
Streams.close(this.executor);
|
||||
this.executor = null;
|
||||
this.inputStream = null;
|
||||
|
||||
@@ -1,34 +0,0 @@
|
||||
package com.orion.visor.module.asset.handler.host.transfer.session;
|
||||
|
||||
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
|
||||
|
||||
/**
|
||||
* 下载会话定义
|
||||
*
|
||||
* @author Jiahang Li
|
||||
* @version 1.0.0
|
||||
* @since 2024/2/22 22:25
|
||||
*/
|
||||
public interface IDownloadSession extends StreamingResponseBody {
|
||||
|
||||
/**
|
||||
* 初始化下载
|
||||
*
|
||||
* @param path path
|
||||
* @param token token
|
||||
*/
|
||||
void downloadInit(String path, String token);
|
||||
|
||||
/**
|
||||
* 停止下载
|
||||
*/
|
||||
void abortDownload();
|
||||
|
||||
/**
|
||||
* 获取下载文件路径
|
||||
*
|
||||
* @return path
|
||||
*/
|
||||
String getPath();
|
||||
|
||||
}
|
||||
@@ -1,19 +0,0 @@
|
||||
package com.orion.visor.module.asset.handler.host.transfer.session;
|
||||
|
||||
import com.orion.lang.able.SafeCloseable;
|
||||
|
||||
/**
|
||||
* 主机传输会话定义
|
||||
*
|
||||
* @author Jiahang Li
|
||||
* @version 1.0.0
|
||||
* @since 2024/2/21 23:06
|
||||
*/
|
||||
public interface ITransferHostSession extends SafeCloseable {
|
||||
|
||||
/**
|
||||
* 初始化
|
||||
*/
|
||||
void init();
|
||||
|
||||
}
|
||||
@@ -0,0 +1,76 @@
|
||||
package com.orion.visor.module.asset.handler.host.transfer.session;
|
||||
|
||||
import com.orion.lang.able.SafeCloseable;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.model.TransferOperatorRequest;
|
||||
|
||||
/**
|
||||
* 主机传输会话定义
|
||||
*
|
||||
* @author Jiahang Li
|
||||
* @version 1.0.0
|
||||
* @since 2024/2/21 23:06
|
||||
*/
|
||||
public interface ITransferSession extends SafeCloseable {
|
||||
|
||||
/**
|
||||
* 初始化
|
||||
*/
|
||||
void init();
|
||||
|
||||
/**
|
||||
* 处理二进制内容
|
||||
*
|
||||
* @param bytes bytes
|
||||
*/
|
||||
void handleBinary(byte[] bytes);
|
||||
|
||||
/**
|
||||
* 开始传输
|
||||
*
|
||||
* @param request request
|
||||
*/
|
||||
void onStart(TransferOperatorRequest request);
|
||||
|
||||
/**
|
||||
* 传输完成
|
||||
*
|
||||
* @param request request
|
||||
*/
|
||||
void onFinish(TransferOperatorRequest request);
|
||||
|
||||
/**
|
||||
* 传输失败
|
||||
*
|
||||
* @param request request
|
||||
*/
|
||||
void onError(TransferOperatorRequest request);
|
||||
|
||||
/**
|
||||
* 传输中断
|
||||
*
|
||||
* @param request request
|
||||
*/
|
||||
void onAbort(TransferOperatorRequest request);
|
||||
|
||||
/**
|
||||
* 获取文件路径
|
||||
*
|
||||
* @return path
|
||||
*/
|
||||
String getPath();
|
||||
|
||||
/**
|
||||
* 获取 token
|
||||
*
|
||||
* @return token
|
||||
*/
|
||||
String getToken();
|
||||
|
||||
/**
|
||||
* 设置 token
|
||||
*
|
||||
* @param token token
|
||||
*/
|
||||
void setToken(String token);
|
||||
|
||||
}
|
||||
@@ -1,36 +0,0 @@
|
||||
package com.orion.visor.module.asset.handler.host.transfer.session;
|
||||
|
||||
/**
|
||||
* 上传会话定义
|
||||
*
|
||||
* @author Jiahang Li
|
||||
* @version 1.0.0
|
||||
* @since 2024/2/22 22:03
|
||||
*/
|
||||
public interface IUploadSession extends ITransferHostSession {
|
||||
|
||||
/**
|
||||
* 开始上传
|
||||
*
|
||||
* @param path path
|
||||
*/
|
||||
void startUpload(String path);
|
||||
|
||||
/**
|
||||
* 写入内容
|
||||
*
|
||||
* @param bytes bytes
|
||||
*/
|
||||
void putContent(byte[] bytes);
|
||||
|
||||
/**
|
||||
* 上传完成
|
||||
*/
|
||||
void uploadFinish();
|
||||
|
||||
/**
|
||||
* 上传失败
|
||||
*/
|
||||
void uploadError();
|
||||
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.orion.visor.module.asset.handler.host.transfer.session;
|
||||
|
||||
import com.orion.lang.exception.argument.InvalidArgumentException;
|
||||
import com.orion.lang.utils.collect.Maps;
|
||||
import com.orion.lang.utils.io.Streams;
|
||||
import com.orion.net.host.SessionStore;
|
||||
@@ -11,6 +12,12 @@ import com.orion.visor.framework.biz.operator.log.core.utils.OperatorLogs;
|
||||
import com.orion.visor.module.asset.define.config.AppSftpConfig;
|
||||
import com.orion.visor.module.asset.entity.dto.HostTerminalConnectDTO;
|
||||
import com.orion.visor.module.asset.handler.host.terminal.utils.TerminalUtils;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.enums.TransferReceiver;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.model.TransferOperatorRequest;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.utils.TransferUtils;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
|
||||
import java.util.Map;
|
||||
@@ -22,7 +29,8 @@ import java.util.Map;
|
||||
* @version 1.0.0
|
||||
* @since 2024/2/21 21:12
|
||||
*/
|
||||
public abstract class TransferHostSession implements ITransferHostSession {
|
||||
@Slf4j
|
||||
public abstract class TransferSession implements ITransferSession {
|
||||
|
||||
protected static final AppSftpConfig SFTP_CONFIG = SpringHolder.getBean(AppSftpConfig.class);
|
||||
|
||||
@@ -34,10 +42,20 @@ public abstract class TransferHostSession implements ITransferHostSession {
|
||||
|
||||
protected SftpExecutor executor;
|
||||
|
||||
public TransferHostSession(HostTerminalConnectDTO connectInfo, SessionStore sessionStore, WebSocketSession channel) {
|
||||
protected String channelId;
|
||||
|
||||
@Getter
|
||||
protected String path;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
protected String token;
|
||||
|
||||
public TransferSession(HostTerminalConnectDTO connectInfo, SessionStore sessionStore, WebSocketSession channel) {
|
||||
this.connectInfo = connectInfo;
|
||||
this.sessionStore = sessionStore;
|
||||
this.channel = channel;
|
||||
this.channelId = channel.getId();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -54,6 +72,52 @@ public abstract class TransferHostSession implements ITransferHostSession {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleBinary(byte[] bytes) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onStart(TransferOperatorRequest request) {
|
||||
this.path = request.getPath();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFinish(TransferOperatorRequest request) {
|
||||
log.info("TransferSession.uploadFinish channelId: {}", channelId);
|
||||
this.closeStream();
|
||||
// 响应结果
|
||||
TransferUtils.sendMessage(channel, TransferReceiver.FINISH, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(TransferOperatorRequest request) {
|
||||
log.error("TransferSession.uploadError channelId: {}", channelId);
|
||||
this.closeStream();
|
||||
// 响应结果
|
||||
TransferUtils.sendMessage(channel, TransferReceiver.ERROR, new InvalidArgumentException((String) null));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAbort(TransferOperatorRequest request) {
|
||||
log.info("TransferSession.abort channelId: {}, path: {}", channelId, path);
|
||||
// 关闭流
|
||||
this.closeStream();
|
||||
// 响应结果
|
||||
TransferUtils.sendMessage(channel, TransferReceiver.ABORT, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭流
|
||||
*/
|
||||
protected abstract void closeStream();
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
this.closeStream();
|
||||
Streams.close(executor);
|
||||
Streams.close(sessionStore);
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存操作日志
|
||||
*
|
||||
@@ -73,16 +137,4 @@ public abstract class TransferHostSession implements ITransferHostSession {
|
||||
SpringHolder.getBean(OperatorLogFrameworkService.class).insert(model);
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭流
|
||||
*/
|
||||
protected abstract void closeStream();
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
this.closeStream();
|
||||
Streams.close(executor);
|
||||
Streams.close(sessionStore);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,11 +1,11 @@
|
||||
package com.orion.visor.module.asset.handler.host.transfer.session;
|
||||
|
||||
import com.orion.lang.exception.argument.InvalidArgumentException;
|
||||
import com.orion.lang.utils.io.Streams;
|
||||
import com.orion.net.host.SessionStore;
|
||||
import com.orion.visor.module.asset.define.operator.HostTerminalOperatorType;
|
||||
import com.orion.visor.module.asset.entity.dto.HostTerminalConnectDTO;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.enums.TransferReceiverType;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.enums.TransferReceiver;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.model.TransferOperatorRequest;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.utils.TransferUtils;
|
||||
import com.orion.visor.module.asset.utils.SftpUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -22,7 +22,7 @@ import java.io.OutputStream;
|
||||
* @since 2024/2/22 22:04
|
||||
*/
|
||||
@Slf4j
|
||||
public class UploadSession extends TransferHostSession implements IUploadSession {
|
||||
public class UploadSession extends TransferSession {
|
||||
|
||||
private OutputStream outputStream;
|
||||
|
||||
@@ -31,8 +31,8 @@ public class UploadSession extends TransferHostSession implements IUploadSession
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startUpload(String path) {
|
||||
String channelId = channel.getId();
|
||||
public void onStart(TransferOperatorRequest request) {
|
||||
super.onStart(request);
|
||||
try {
|
||||
log.info("UploadSession.startUpload start channelId: {}, path: {}", channelId, path);
|
||||
// 保存操作日志
|
||||
@@ -44,52 +44,38 @@ public class UploadSession extends TransferHostSession implements IUploadSession
|
||||
// 打开输出流
|
||||
this.outputStream = executor.openOutputStream(path);
|
||||
// 响应结果
|
||||
TransferUtils.sendMessage(this.channel, TransferReceiverType.UPLOAD_NEXT_BLOCK, null);
|
||||
TransferUtils.sendMessage(channel, TransferReceiver.NEXT_PART, null);
|
||||
log.info("UploadSession.startUpload transfer channelId: {}, path: {}", channelId, path);
|
||||
} catch (Exception e) {
|
||||
log.error("UploadSession.startUpload error channelId: {}, path: {}", channelId, path, e);
|
||||
this.closeStream();
|
||||
// 响应结果
|
||||
TransferUtils.sendMessage(this.channel, TransferReceiverType.UPLOAD_ERROR, e);
|
||||
TransferUtils.sendMessage(channel, TransferReceiver.ERROR, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putContent(byte[] bytes) {
|
||||
public void handleBinary(byte[] bytes) {
|
||||
try {
|
||||
// 写入内容
|
||||
outputStream.write(bytes);
|
||||
// 响应结果
|
||||
TransferUtils.sendMessage(this.channel, TransferReceiverType.UPLOAD_NEXT_BLOCK, null);
|
||||
TransferUtils.sendMessage(channel, TransferReceiver.NEXT_PART, null);
|
||||
} catch (IOException e) {
|
||||
log.error("UploadSession.putContent error channelId: {}", channel.getId(), e);
|
||||
log.error("UploadSession.handleBinary error channelId: {}", channel.getId(), e);
|
||||
this.closeStream();
|
||||
// 响应结果
|
||||
TransferUtils.sendMessage(this.channel, TransferReceiverType.UPLOAD_ERROR, e);
|
||||
TransferUtils.sendMessage(channel, TransferReceiver.ERROR, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void uploadFinish() {
|
||||
log.info("UploadSession.uploadFinish channelId: {}", channel.getId());
|
||||
this.closeStream();
|
||||
// 响应结果
|
||||
TransferUtils.sendMessage(this.channel, TransferReceiverType.UPLOAD_FINISH, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void uploadError() {
|
||||
log.error("UploadSession.uploadError channelId: {}", channel.getId());
|
||||
this.closeStream();
|
||||
// 响应结果
|
||||
TransferUtils.sendMessage(this.channel, TransferReceiverType.UPLOAD_ERROR, new InvalidArgumentException((String) null));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void closeStream() {
|
||||
// 关闭流
|
||||
Streams.close(outputStream);
|
||||
this.outputStream = null;
|
||||
if (this.outputStream != null) {
|
||||
// 关闭流
|
||||
Streams.close(outputStream);
|
||||
this.outputStream = null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ import com.orion.lang.exception.argument.InvalidArgumentException;
|
||||
import com.orion.lang.utils.Strings;
|
||||
import com.orion.visor.framework.common.constant.ErrorMessage;
|
||||
import com.orion.visor.framework.websocket.core.utils.WebSockets;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.enums.TransferReceiverType;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.enums.TransferReceiver;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.model.TransferOperatorResponse;
|
||||
import org.apache.catalina.connector.ClientAbortException;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
@@ -13,11 +13,9 @@ import org.springframework.web.socket.WebSocketSession;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* 传输工具类
|
||||
*
|
||||
* @author Jiahang Li
|
||||
* @version 1.0.0
|
||||
* @since 2024/2/22 22:14
|
||||
* @since 2024/7/12 15:06
|
||||
*/
|
||||
public class TransferUtils {
|
||||
|
||||
@@ -31,7 +29,7 @@ public class TransferUtils {
|
||||
* @param type type
|
||||
* @param ex ex
|
||||
*/
|
||||
public static void sendMessage(WebSocketSession channel, TransferReceiverType type, Exception ex) {
|
||||
public static void sendMessage(WebSocketSession channel, TransferReceiver type, Exception ex) {
|
||||
sendMessage(channel, type, ex, null);
|
||||
}
|
||||
|
||||
@@ -43,7 +41,7 @@ public class TransferUtils {
|
||||
* @param ex ex
|
||||
* @param filler filler
|
||||
*/
|
||||
public static void sendMessage(WebSocketSession channel, TransferReceiverType type, Exception ex, Consumer<TransferOperatorResponse> filler) {
|
||||
public static void sendMessage(WebSocketSession channel, TransferReceiver type, Exception ex, Consumer<TransferOperatorResponse> filler) {
|
||||
TransferOperatorResponse resp = TransferOperatorResponse.builder()
|
||||
.type(type.getType())
|
||||
.success(ex == null)
|
||||
|
||||
@@ -15,9 +15,8 @@ import com.orion.visor.module.asset.convert.HostSftpLogConvert;
|
||||
import com.orion.visor.module.asset.define.operator.HostTerminalOperatorType;
|
||||
import com.orion.visor.module.asset.entity.request.host.HostSftpLogQueryRequest;
|
||||
import com.orion.visor.module.asset.entity.vo.HostSftpLogVO;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.handler.ITransferHandler;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.manager.HostTransferManager;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.session.IDownloadSession;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.session.DownloadSession;
|
||||
import com.orion.visor.module.asset.service.HostSftpService;
|
||||
import com.orion.visor.module.infra.api.OperatorLogApi;
|
||||
import com.orion.visor.module.infra.entity.dto.operator.OperatorLogQueryDTO;
|
||||
@@ -82,10 +81,10 @@ public class HostSftpServiceImpl implements HostSftpService {
|
||||
@Override
|
||||
public StreamingResponseBody downloadWithTransferToken(String channelId, String transferToken, HttpServletResponse response) {
|
||||
// 获取会话
|
||||
IDownloadSession session = Optional.ofNullable(channelId)
|
||||
DownloadSession session = (DownloadSession) Optional.ofNullable(channelId)
|
||||
.map(hostTransferManager::getHandler)
|
||||
.map(ITransferHandler::getTokenSessions)
|
||||
.map(s -> s.remove(transferToken))
|
||||
.map(s -> s.getSessionByToken(transferToken))
|
||||
.filter(s -> s instanceof DownloadSession)
|
||||
.orElse(null);
|
||||
// 响应会话
|
||||
if (session == null) {
|
||||
|
||||
Reference in New Issue
Block a user