✨ 修改下载文件逻辑.
This commit is contained in:
@@ -63,13 +63,14 @@ public class HostSftpLogController {
|
||||
|
||||
@PermitAll
|
||||
@IgnoreWrapper
|
||||
@IgnoreLog(IgnoreLogMode.RET)
|
||||
@GetMapping("/download")
|
||||
@Operation(summary = "下载文件")
|
||||
@Parameter(name = "channelId", description = "channelId", required = true)
|
||||
@Parameter(name = "transferToken", description = "transferToken", required = true)
|
||||
public StreamingResponseBody downloadFile(@RequestParam("channelId") String channelId,
|
||||
@RequestParam("transferToken") String transferToken,
|
||||
HttpServletResponse response) {
|
||||
public StreamingResponseBody downloadWithTransferToken(@RequestParam("channelId") String channelId,
|
||||
@RequestParam("transferToken") String transferToken,
|
||||
HttpServletResponse response) {
|
||||
return hostSftpService.downloadWithTransferToken(channelId, transferToken, response);
|
||||
}
|
||||
|
||||
|
||||
@@ -30,9 +30,9 @@ public enum TransferOperatorType {
|
||||
UPLOAD_ERROR(TransferOperatorType.UPLOAD, "uploadError"),
|
||||
|
||||
/**
|
||||
* 开始下载
|
||||
* 初始化下载
|
||||
*/
|
||||
DOWNLOAD_START(TransferOperatorType.DOWNLOAD, "downloadStart"),
|
||||
DOWNLOAD_INIT(TransferOperatorType.DOWNLOAD, "downloadInit"),
|
||||
|
||||
/**
|
||||
* 中断下载
|
||||
|
||||
@@ -34,11 +34,6 @@ public enum TransferReceiverType {
|
||||
*/
|
||||
UPLOAD_ERROR("uploadError"),
|
||||
|
||||
/**
|
||||
* 下载完成
|
||||
*/
|
||||
DOWNLOAD_FINISH("downloadFinish"),
|
||||
|
||||
/**
|
||||
* 开始下载
|
||||
*/
|
||||
@@ -49,6 +44,11 @@ public enum TransferReceiverType {
|
||||
*/
|
||||
DOWNLOAD_PROGRESS("downloadProgress"),
|
||||
|
||||
/**
|
||||
* 下载完成
|
||||
*/
|
||||
DOWNLOAD_FINISH("downloadFinish"),
|
||||
|
||||
/**
|
||||
* 下载失败
|
||||
*/
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.orion.visor.module.asset.handler.host.transfer.handler;
|
||||
|
||||
import com.orion.lang.id.UUIds;
|
||||
import com.orion.lang.utils.Exceptions;
|
||||
import com.orion.lang.utils.io.Streams;
|
||||
import com.orion.net.host.SessionStore;
|
||||
@@ -14,6 +15,7 @@ import com.orion.visor.module.asset.handler.host.transfer.model.TransferOperator
|
||||
import com.orion.visor.module.asset.handler.host.transfer.session.*;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.utils.TransferUtils;
|
||||
import com.orion.visor.module.asset.service.HostTerminalService;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
|
||||
@@ -45,10 +47,14 @@ public class TransferHandler implements ITransferHandler {
|
||||
*/
|
||||
private final ConcurrentHashMap<String, ITransferHostSession> sessions;
|
||||
|
||||
@Getter
|
||||
private final ConcurrentHashMap<String, IDownloadSession> tokenSessions;
|
||||
|
||||
public TransferHandler(WebSocketSession channel) {
|
||||
this.channel = channel;
|
||||
this.userId = WebSockets.getAttr(channel, ExtraFieldConst.USER_ID);
|
||||
this.sessions = new ConcurrentHashMap<>();
|
||||
this.tokenSessions = new ConcurrentHashMap<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -73,9 +79,11 @@ public class TransferHandler implements ITransferHandler {
|
||||
// 上传失败
|
||||
((IUploadSession) currentSession).uploadError();
|
||||
break;
|
||||
case DOWNLOAD_START:
|
||||
case DOWNLOAD_INIT:
|
||||
// 开始下载
|
||||
((IDownloadSession) currentSession).startDownload(payload.getPath());
|
||||
String token = UUIds.random32();
|
||||
tokenSessions.put(token, (IDownloadSession) currentSession);
|
||||
((IDownloadSession) currentSession).downloadInit(payload.getPath(), token);
|
||||
break;
|
||||
case DOWNLOAD_ABORT:
|
||||
// 中断下载
|
||||
@@ -100,7 +108,7 @@ public class TransferHandler implements ITransferHandler {
|
||||
*/
|
||||
private boolean getAndInitSession(TransferOperatorRequest payload, TransferOperatorType type) {
|
||||
Long hostId = payload.getHostId();
|
||||
String sessionKey = hostId + "_" + type.getOperator();
|
||||
String sessionKey = hostId + "_" + type.getKind();
|
||||
try {
|
||||
// 获取会话
|
||||
ITransferHostSession session = sessions.get(sessionKey);
|
||||
@@ -109,10 +117,10 @@ public class TransferHandler implements ITransferHandler {
|
||||
HostTerminalConnectDTO connectInfo = hostTerminalService.getTerminalConnectInfo(this.userId, hostId);
|
||||
SessionStore sessionStore = hostTerminalService.openSessionStore(connectInfo);
|
||||
// 打开会话并初始化
|
||||
if (TransferOperatorType.UPLOAD.equals(type.getOperator())) {
|
||||
if (TransferOperatorType.UPLOAD.equals(type.getKind())) {
|
||||
// 上传操作
|
||||
session = new UploadSession(connectInfo, sessionStore, this.channel);
|
||||
} else if (TransferOperatorType.DOWNLOAD.equals(type.getOperator())) {
|
||||
} else if (TransferOperatorType.DOWNLOAD.equals(type.getKind())) {
|
||||
// 下载操作
|
||||
session = new DownloadSession(connectInfo, sessionStore, this.channel);
|
||||
} else {
|
||||
@@ -136,6 +144,7 @@ public class TransferHandler implements ITransferHandler {
|
||||
public void close() {
|
||||
log.info("TransferHandler.close channelId: {}", channel.getId());
|
||||
sessions.values().forEach(Streams::close);
|
||||
tokenSessions.clear();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -20,6 +20,9 @@ import lombok.NoArgsConstructor;
|
||||
@Schema(name = "FileOperatorResponse", description = "文件操作响应 实体对象")
|
||||
public class TransferOperatorResponse {
|
||||
|
||||
@Schema(description = "channelId")
|
||||
private String channelId;
|
||||
|
||||
@Schema(description = "type")
|
||||
private String type;
|
||||
|
||||
@@ -29,6 +32,12 @@ public class TransferOperatorResponse {
|
||||
@Schema(description = "是否成功")
|
||||
private Boolean success;
|
||||
|
||||
@Schema(description = "传输的大小")
|
||||
private Integer currentSize;
|
||||
|
||||
@Schema(description = "transferToken")
|
||||
private String transferToken;
|
||||
|
||||
@Schema(description = "消息")
|
||||
private String msg;
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.orion.visor.module.asset.handler.host.transfer.session;
|
||||
|
||||
import com.orion.lang.define.wrapper.Ref;
|
||||
import com.orion.lang.utils.Threads;
|
||||
import com.orion.lang.utils.Valid;
|
||||
import com.orion.lang.utils.io.Streams;
|
||||
@@ -12,11 +13,13 @@ import com.orion.visor.module.asset.define.operator.HostTerminalOperatorType;
|
||||
import com.orion.visor.module.asset.entity.dto.HostTerminalConnectDTO;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.enums.TransferReceiverType;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.utils.TransferUtils;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.web.socket.BinaryMessage;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
|
||||
/**
|
||||
* 下载会话实现
|
||||
@@ -28,6 +31,9 @@ import java.io.InputStream;
|
||||
@Slf4j
|
||||
public class DownloadSession extends TransferHostSession implements IDownloadSession {
|
||||
|
||||
@Getter
|
||||
private String path;
|
||||
|
||||
private InputStream inputStream;
|
||||
|
||||
public DownloadSession(HostTerminalConnectDTO connectInfo, SessionStore sessionStore, WebSocketSession channel) {
|
||||
@@ -35,7 +41,8 @@ public class DownloadSession extends TransferHostSession implements IDownloadSes
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startDownload(String path) {
|
||||
public void downloadInit(String path, String token) {
|
||||
this.path = path;
|
||||
String channelId = channel.getId();
|
||||
try {
|
||||
log.info("DownloadSession.startDownload open start channelId: {}, path: {}", channelId, path);
|
||||
@@ -54,39 +61,17 @@ public class DownloadSession extends TransferHostSession implements IDownloadSes
|
||||
}
|
||||
// 打开输入流
|
||||
this.inputStream = executor.openInputStream(path);
|
||||
// 响应开始下载
|
||||
TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_START, null, e -> {
|
||||
e.setChannelId(channelId);
|
||||
e.setTransferToken(token);
|
||||
});
|
||||
log.info("DownloadSession.startDownload open success channelId: {}, path: {}", channelId, path);
|
||||
} catch (Exception e) {
|
||||
log.error("DownloadSession.startDownload open error channelId: {}, path: {}", channelId, path, e);
|
||||
// 响应结果
|
||||
// 响应下载失败
|
||||
TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_ERROR, e);
|
||||
return;
|
||||
}
|
||||
// 异步读取文件内容
|
||||
AssetThreadPools.TERMINAL_OPERATOR.execute(() -> {
|
||||
Exception ex = null;
|
||||
try {
|
||||
byte[] buffer = new byte[Const.BUFFER_KB_32];
|
||||
int len;
|
||||
// 响应文件内容
|
||||
while (this.inputStream != null && (len = this.inputStream.read(buffer)) != -1) {
|
||||
this.channel.sendMessage(new BinaryMessage(buffer, 0, len, true));
|
||||
}
|
||||
log.info("DownloadSession.download finish channelId: {}, path: {}", channelId, path);
|
||||
} catch (Exception e) {
|
||||
log.error("DownloadSession.download error channelId: {}, path: {}", channelId, path, e);
|
||||
ex = e;
|
||||
}
|
||||
// 关闭等待 jsch 内部处理
|
||||
Threads.sleep(100);
|
||||
this.closeStream();
|
||||
Threads.sleep(100);
|
||||
// 响应结果
|
||||
if (ex == null) {
|
||||
TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_FINISH, null);
|
||||
} else {
|
||||
TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_ERROR, ex);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -96,9 +81,72 @@ public class DownloadSession extends TransferHostSession implements IDownloadSes
|
||||
this.closeStream();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(OutputStream outputStream) {
|
||||
String channelId = channel.getId();
|
||||
Ref<Exception> ex = new Ref<>();
|
||||
try {
|
||||
byte[] buffer = new byte[Const.BUFFER_KB_32];
|
||||
int len;
|
||||
int i = 0;
|
||||
int size = 0;
|
||||
// 响应文件内容
|
||||
while (this.inputStream != null && (len = this.inputStream.read(buffer)) != -1) {
|
||||
outputStream.write(buffer, 0, len);
|
||||
size += len;
|
||||
// 不要每次都 flush 和 send > 1mb
|
||||
if (i == 32) {
|
||||
i = 0;
|
||||
}
|
||||
// 首次触发
|
||||
if (i == 0) {
|
||||
this.flushAndSendProgress(outputStream, size);
|
||||
}
|
||||
i++;
|
||||
}
|
||||
// 最后一次也要 flush
|
||||
if (i != 0) {
|
||||
this.flushAndSendProgress(outputStream, size);
|
||||
}
|
||||
log.info("DownloadSession.download finish channelId: {}, path: {}", channelId, path);
|
||||
} catch (Exception e) {
|
||||
log.error("DownloadSession.download error channelId: {}, path: {}", channelId, path, e);
|
||||
ex.set(e);
|
||||
}
|
||||
// 异步关闭
|
||||
AssetThreadPools.TERMINAL_OPERATOR.execute(() -> {
|
||||
// 关闭等待 jsch 内部处理
|
||||
Threads.sleep(100);
|
||||
this.closeStream();
|
||||
Threads.sleep(100);
|
||||
// 响应结果
|
||||
Exception e = ex.getValue();
|
||||
if (e == null) {
|
||||
TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_FINISH, null);
|
||||
} else {
|
||||
TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_ERROR, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 刷流 & 发送进度
|
||||
*
|
||||
* @param outputStream outputStream
|
||||
* @param size size
|
||||
* @throws IOException IOException
|
||||
*/
|
||||
private void flushAndSendProgress(OutputStream outputStream, int size) throws IOException {
|
||||
// flush
|
||||
outputStream.flush();
|
||||
// send
|
||||
TransferUtils.sendMessage(this.channel, TransferReceiverType.DOWNLOAD_PROGRESS, null, e -> e.setCurrentSize(size));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void closeStream() {
|
||||
// 关闭 inputStream 可能会被阻塞 ???...??? 只能关闭 executor
|
||||
this.path = null;
|
||||
Streams.close(this.executor);
|
||||
this.executor = null;
|
||||
this.inputStream = null;
|
||||
|
||||
@@ -12,12 +12,12 @@ import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBo
|
||||
public interface IDownloadSession extends StreamingResponseBody {
|
||||
|
||||
/**
|
||||
* 开始下载
|
||||
* 初始化下载
|
||||
*
|
||||
* @param path path
|
||||
* @param token token
|
||||
*/
|
||||
void startDownload(String path, String token);
|
||||
void downloadInit(String path, String token);
|
||||
|
||||
/**
|
||||
* 停止下载
|
||||
|
||||
@@ -6,6 +6,7 @@ import com.orion.visor.framework.common.constant.ErrorMessage;
|
||||
import com.orion.visor.framework.websocket.core.utils.WebSockets;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.enums.TransferReceiverType;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.model.TransferOperatorResponse;
|
||||
import org.apache.catalina.connector.ClientAbortException;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
@@ -62,9 +63,10 @@ public class TransferUtils {
|
||||
public static String getErrorMessage(Exception ex) {
|
||||
if (ex == null) {
|
||||
return null;
|
||||
}
|
||||
if (ex instanceof InvalidArgumentException) {
|
||||
} else if (ex instanceof InvalidArgumentException) {
|
||||
return ex.getMessage();
|
||||
} else if (ex instanceof ClientAbortException) {
|
||||
return ErrorMessage.CLIENT_ABORT;
|
||||
}
|
||||
return ErrorMessage.OPERATE_ERROR;
|
||||
}
|
||||
|
||||
@@ -2,26 +2,37 @@ package com.orion.visor.module.asset.service.impl;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.orion.lang.constant.StandardContentType;
|
||||
import com.orion.lang.define.wrapper.DataGrid;
|
||||
import com.orion.lang.utils.Arrays1;
|
||||
import com.orion.lang.utils.Strings;
|
||||
import com.orion.lang.utils.io.Files1;
|
||||
import com.orion.visor.framework.biz.operator.log.core.utils.OperatorLogs;
|
||||
import com.orion.visor.framework.common.constant.Const;
|
||||
import com.orion.visor.framework.common.constant.ErrorMessage;
|
||||
import com.orion.visor.framework.common.constant.ExtraFieldConst;
|
||||
import com.orion.visor.module.asset.convert.HostSftpLogConvert;
|
||||
import com.orion.visor.module.asset.define.operator.HostTerminalOperatorType;
|
||||
import com.orion.visor.module.asset.entity.request.host.HostSftpLogQueryRequest;
|
||||
import com.orion.visor.module.asset.entity.vo.HostSftpLogVO;
|
||||
import com.orion.visor.module.asset.service.HostSftpLogService;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.handler.ITransferHandler;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.manager.HostTransferManager;
|
||||
import com.orion.visor.module.asset.handler.host.transfer.session.IDownloadSession;
|
||||
import com.orion.visor.module.asset.service.HostSftpService;
|
||||
import com.orion.visor.module.infra.api.OperatorLogApi;
|
||||
import com.orion.visor.module.infra.entity.dto.operator.OperatorLogQueryDTO;
|
||||
import com.orion.web.servlet.web.Servlets;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* SFTP 操作日志 服务实现类
|
||||
* SFTP 操作 服务实现类
|
||||
*
|
||||
* @author Jiahang Li
|
||||
* @version 1.0.0
|
||||
@@ -29,11 +40,14 @@ import java.util.List;
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class HostSftpLogServiceImpl implements HostSftpLogService {
|
||||
public class HostSftpServiceImpl implements HostSftpService {
|
||||
|
||||
@Resource
|
||||
private OperatorLogApi operatorLogApi;
|
||||
|
||||
@Resource
|
||||
private HostTransferManager hostTransferManager;
|
||||
|
||||
@Override
|
||||
public DataGrid<HostSftpLogVO> getHostSftpLogPage(HostSftpLogQueryRequest request) {
|
||||
// 查询
|
||||
@@ -62,6 +76,25 @@ public class HostSftpLogServiceImpl implements HostSftpLogService {
|
||||
return effect;
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamingResponseBody downloadWithTransferToken(String channelId, String transferToken, HttpServletResponse response) {
|
||||
// 获取会话
|
||||
IDownloadSession session = Optional.ofNullable(channelId)
|
||||
.map(hostTransferManager::getHandler)
|
||||
.map(ITransferHandler::getTokenSessions)
|
||||
.map(s -> s.remove(transferToken))
|
||||
.orElse(null);
|
||||
// 响应会话
|
||||
if (session == null) {
|
||||
Servlets.setContentType(response, StandardContentType.TEXT_HTML);
|
||||
Servlets.setCharset(response, Const.UTF_8);
|
||||
return outputStream -> outputStream.write(Strings.bytes(ErrorMessage.SESSION_ABSENT));
|
||||
}
|
||||
// 响应文件
|
||||
Servlets.setAttachmentHeader(response, Files1.getFileName(session.getPath()));
|
||||
return session;
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建查询对象
|
||||
*
|
||||
Reference in New Issue
Block a user