🔨 上传.

This commit is contained in:
lijiahangmax
2024-02-22 00:06:24 +08:00
parent 8377294662
commit 25b15559a4
16 changed files with 555 additions and 58 deletions

View File

@@ -1,6 +1,6 @@
package com.orion.ops.module.asset.config;
import com.orion.ops.module.asset.handler.host.sftp.TransferMessageHandler;
import com.orion.ops.module.asset.handler.host.transfer.TransferMessageDispatcher;
import com.orion.ops.module.asset.handler.host.terminal.TerminalMessageDispatcher;
import com.orion.ops.module.asset.interceptor.TerminalAccessInterceptor;
import org.springframework.beans.factory.annotation.Value;
@@ -30,7 +30,7 @@ public class AssetWebSocketConfiguration implements WebSocketConfigurer {
private TerminalMessageDispatcher terminalMessageDispatcher;
@Resource
private TransferMessageHandler transferMessageHandler;
private TransferMessageDispatcher transferMessageDispatcher;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
@@ -39,7 +39,7 @@ public class AssetWebSocketConfiguration implements WebSocketConfigurer {
.addInterceptors(terminalAccessInterceptor)
.setAllowedOrigins("*");
// 文件传输
registry.addHandler(transferMessageHandler, prefix + "/host/transfer/{accessToken}")
registry.addHandler(transferMessageDispatcher, prefix + "/host/transfer/{accessToken}")
.addInterceptors(terminalAccessInterceptor)
.setAllowedOrigins("*");
}

View File

@@ -1,23 +0,0 @@
package com.orion.ops.module.asset.handler.host.sftp;
import com.orion.lang.define.collect.MultiConcurrentHashMap;
import com.orion.ops.module.asset.handler.host.terminal.session.ITerminalSession;
import org.springframework.stereotype.Component;
/**
* 传输管理器
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/2/21 19:05
*/
@Component
public class TransferManager {
/**
* 会话存储器
*/
private final MultiConcurrentHashMap<String, Long, ITerminalSession> channelSessions = MultiConcurrentHashMap.create();
}

View File

@@ -1,9 +0,0 @@
package com.orion.ops.module.asset.handler.host.sftp.upload;
/**
* @author Jiahang Li
* @version 1.0.0
* @since 2024/2/21 19:04
*/
public class FileUploader {
}

View File

@@ -60,7 +60,7 @@ public abstract class AbstractTerminalHandler<T extends TerminalBasePayload> imp
}
/**
* 获取 sftp 错误信息
* 获取错误信息
*
* @param ex ex
* @return msg

View File

@@ -49,7 +49,7 @@ public class SftpSession extends TerminalSession implements ISftpSession {
@Override
public void connect() {
// 打开 shell
// 打开 sftp
this.executor = sessionStore.getSftpExecutor(config.getFileNameCharset());
executor.connect();
}

View File

@@ -1,5 +1,10 @@
package com.orion.ops.module.asset.handler.host.sftp;
package com.orion.ops.module.asset.handler.host.transfer;
import com.alibaba.fastjson.JSON;
import com.orion.lang.utils.io.Streams;
import com.orion.ops.module.asset.handler.host.transfer.handler.ITransferHandler;
import com.orion.ops.module.asset.handler.host.transfer.handler.TransferHandler;
import com.orion.ops.module.asset.handler.host.transfer.model.TransferOperatorRequest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.BinaryMessage;
@@ -8,6 +13,8 @@ import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
import java.util.concurrent.ConcurrentHashMap;
/**
* sftp 传输消息处理器
*
@@ -17,16 +24,21 @@ import org.springframework.web.socket.handler.AbstractWebSocketHandler;
*/
@Slf4j
@Component
public class TransferMessageHandler extends AbstractWebSocketHandler {
public class TransferMessageDispatcher extends AbstractWebSocketHandler {
private final ConcurrentHashMap<String, ITransferHandler> handlers = new ConcurrentHashMap<>();
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
System.out.println("text");
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
// 获取处理器
ITransferHandler handler = handlers.computeIfAbsent(session.getId(), s -> new TransferHandler(session));
// 处理消息
handler.handleMessage(JSON.parseObject(message.getPayload(), TransferOperatorRequest.class));
}
@Override
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
System.out.println("binary");
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) {
handlers.get(session.getId()).putContent(message.getPayload().array());
}
@Override
@@ -42,6 +54,8 @@ public class TransferMessageHandler extends AbstractWebSocketHandler {
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
String id = session.getId();
// 关闭会话
Streams.close(handlers.get(id));
log.info("TransferMessageHandler-afterConnectionClosed id: {}, code: {}, reason: {}", id, status.getCode(), status.getReason());
}

View File

@@ -0,0 +1,48 @@
package com.orion.ops.module.asset.handler.host.transfer.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 消息操作类型
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/2/21 22:03
*/
@Getter
@AllArgsConstructor
public enum TransferOperatorType {
/**
* 处理完成
*/
PROCESSED("processed"),
/**
* 开始上传
*/
UPLOAD_START("upload_start"),
/**
* 上传完成
*/
UPLOAD_FINISH("upload_finish"),
;
private final String type;
public static TransferOperatorType of(String type) {
if (type == null) {
return null;
}
for (TransferOperatorType value : values()) {
if (value.type.equals(type)) {
return value;
}
}
return null;
}
}

View File

@@ -0,0 +1,29 @@
package com.orion.ops.module.asset.handler.host.transfer.handler;
import com.orion.lang.able.SafeCloseable;
import com.orion.ops.module.asset.handler.host.transfer.model.TransferOperatorRequest;
/**
* 传输处理器定义
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/2/21 22:46
*/
public interface ITransferHandler extends SafeCloseable {
/**
* 处理消息
*
* @param payload payload
*/
void handleMessage(TransferOperatorRequest payload);
/**
* 写入内容
*
* @param content content
*/
void putContent(byte[] content);
}

View File

@@ -0,0 +1,192 @@
package com.orion.ops.module.asset.handler.host.transfer.handler;
import com.alibaba.fastjson.JSON;
import com.orion.lang.exception.argument.InvalidArgumentException;
import com.orion.lang.utils.io.Streams;
import com.orion.net.host.SessionStore;
import com.orion.ops.framework.common.constant.ErrorMessage;
import com.orion.ops.framework.common.constant.ExtraFieldConst;
import com.orion.ops.framework.websocket.core.utils.WebSockets;
import com.orion.ops.module.asset.entity.dto.HostTerminalConnectDTO;
import com.orion.ops.module.asset.enums.HostConnectTypeEnum;
import com.orion.ops.module.asset.handler.host.transfer.enums.TransferOperatorType;
import com.orion.ops.module.asset.handler.host.transfer.model.TransferOperatorRequest;
import com.orion.ops.module.asset.handler.host.transfer.model.TransferOperatorResponse;
import com.orion.ops.module.asset.handler.host.transfer.session.ITransferHostSession;
import com.orion.ops.module.asset.handler.host.transfer.session.TransferHostSession;
import com.orion.ops.module.asset.service.HostTerminalService;
import com.orion.spring.SpringHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
/**
* 传输处理器
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/2/21 20:57
*/
@Slf4j
public class TransferHandler implements ITransferHandler {
private static final HostTerminalService hostTerminalService = SpringHolder.getBean(HostTerminalService.class);
private final Long userId;
private final WebSocketSession channel;
/**
* 当前会话
*/
private ITransferHostSession currentSession;
/**
* 会话列表
*/
private final ConcurrentHashMap<Long, ITransferHostSession> sessions;
public TransferHandler(WebSocketSession channel) {
this.channel = channel;
this.userId = (Long) channel.getAttributes().get(ExtraFieldConst.USER_ID);
this.sessions = new ConcurrentHashMap<>();
}
@Override
public void handleMessage(TransferOperatorRequest payload) {
// 解析消息类型
TransferOperatorType type = TransferOperatorType.of(payload.getType());
// 获取会话
if (!this.getAndInitSession(payload)) {
return;
}
// 处理消息
switch (type) {
case UPLOAD_START:
// 准备上传
this.uploadStart(payload);
break;
case UPLOAD_FINISH:
// 上传完成
this.uploadFinish();
break;
default:
break;
}
}
@Override
public void putContent(byte[] content) {
Exception ex = null;
try {
// 写入内容
currentSession.putContent(content);
} catch (IOException e) {
ex = e;
log.error("TransferHandler.putContent error", e);
// 写入完成
currentSession.putFinish();
}
// 响应结果
TransferOperatorResponse resp = TransferOperatorResponse.builder()
.type(TransferOperatorType.PROCESSED.getType())
.success(ex == null)
.msg(this.getErrorMessage(ex))
.build();
WebSockets.sendText(this.channel, JSON.toJSONString(resp));
}
/**
* 准备上传
*
* @param payload payload
*/
private void uploadStart(TransferOperatorRequest payload) {
Exception ex = null;
try {
currentSession.startUpload(payload.getPath());
} catch (Exception e) {
ex = e;
log.error("TransferHandler.uploadStart error", e);
}
// 响应结果
TransferOperatorResponse resp = TransferOperatorResponse.builder()
.type(TransferOperatorType.PROCESSED.getType())
.success(ex == null)
.msg(this.getErrorMessage(ex))
.build();
WebSockets.sendText(this.channel, JSON.toJSONString(resp));
}
/**
* 上传完成
*/
private void uploadFinish() {
currentSession.putFinish();
// 响应结果
TransferOperatorResponse resp = TransferOperatorResponse.builder()
.type(TransferOperatorType.PROCESSED.getType())
.success(true)
.build();
WebSockets.sendText(this.channel, JSON.toJSONString(resp));
}
/**
* 获取并且初始化会话
*
* @param payload payload
* @return success
*/
private boolean getAndInitSession(TransferOperatorRequest payload) {
Long hostId = payload.getHostId();
try {
// 获取会话
ITransferHostSession session = sessions.get(hostId);
if (session == null) {
// 获取主机信息
HostTerminalConnectDTO connectInfo = hostTerminalService.getTerminalConnectInfo(hostId, this.userId, HostConnectTypeEnum.SFTP);
SessionStore sessionStore = hostTerminalService.openSessionStore(connectInfo);
// 打开会话并初始化
session = new TransferHostSession(connectInfo, sessionStore);
session.init();
this.currentSession = session;
sessions.put(hostId, session);
}
return true;
} catch (Exception e) {
log.error("TransferHandler.getAndInitSession error", e);
// 响应结果
TransferOperatorResponse resp = TransferOperatorResponse.builder()
.type(TransferOperatorType.PROCESSED.getType())
.success(false)
.msg(this.getErrorMessage(e))
.build();
WebSockets.sendText(this.channel, JSON.toJSONString(resp));
return false;
}
}
/**
* 获取错误信息
*
* @param ex ex
* @return msg
*/
private String getErrorMessage(Exception ex) {
if (ex == null) {
return null;
}
if (ex instanceof InvalidArgumentException) {
return ex.getMessage();
}
return ErrorMessage.OPERATE_ERROR;
}
@Override
public void close() {
sessions.values().forEach(Streams::close);
}
}

View File

@@ -0,0 +1,35 @@
package com.orion.ops.module.asset.handler.host.transfer.model;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
/**
* 文件操作请求 实体对象
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/2/21 21:01
*/
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
@Schema(name = "FileOperatorRequest", description = "文件操作请求 实体对象")
public class TransferOperatorRequest {
@Schema(description = "上传路径")
private String path;
@Schema(description = "type")
private String type;
@Schema(description = "fileId")
private String fileId;
@Schema(description = "主机id")
private Long hostId;
}

View File

@@ -0,0 +1,38 @@
package com.orion.ops.module.asset.handler.host.transfer.model;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
/**
* 文件操作响应 实体对象
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/2/21 22:38
*/
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
@Schema(name = "FileOperatorResponse", description = "文件操作响应 实体对象")
public class TransferOperatorResponse {
@Schema(description = "type")
private String type;
@Schema(description = "fileId")
private String fileId;
@Schema(description = "主机id")
private Long hostId;
@Schema(description = "是否成功")
private Boolean success;
@Schema(description = "消息")
private String msg;
}

View File

@@ -0,0 +1,42 @@
package com.orion.ops.module.asset.handler.host.transfer.session;
import com.orion.lang.able.SafeCloseable;
import java.io.IOException;
/**
* 主机传输会话定义
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/2/21 23:06
*/
public interface ITransferHostSession extends SafeCloseable {
/**
* 初始化
*/
void init();
/**
* 开始上传
*
* @param path path
* @throws IOException IOException
*/
void startUpload(String path) throws IOException;
/**
* 写入内容
*
* @param bytes bytes
* @throws IOException IOException
*/
void putContent(byte[] bytes) throws IOException;
/**
* 写入完成
*/
void putFinish();
}

View File

@@ -0,0 +1,79 @@
package com.orion.ops.module.asset.handler.host.transfer.session;
import com.orion.lang.utils.io.Streams;
import com.orion.net.host.SessionStore;
import com.orion.net.host.sftp.SftpExecutor;
import com.orion.net.host.sftp.SftpFile;
import com.orion.ops.module.asset.entity.dto.HostTerminalConnectDTO;
import java.io.IOException;
import java.io.OutputStream;
/**
* 主机传输会话实现
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/2/21 21:12
*/
public class TransferHostSession implements ITransferHostSession {
private final HostTerminalConnectDTO connectInfo;
private final SessionStore sessionStore;
private SftpExecutor executor;
private OutputStream currentOutputStream;
public TransferHostSession(HostTerminalConnectDTO connectInfo, SessionStore sessionStore) {
this.connectInfo = connectInfo;
this.sessionStore = sessionStore;
}
@Override
public void init() {
if (executor == null) {
// 建立连接
this.executor = sessionStore.getSftpExecutor(connectInfo.getFileNameCharset());
executor.connect();
} else {
// 检查连接
if (!this.executor.isConnected()) {
executor.connect();
}
}
}
@Override
public void startUpload(String path) throws IOException {
// 检查连接
this.init();
SftpFile file = executor.getFile(path);
if (file != null) {
// 文件存在则重命名
executor.move(path, file.getName() + "_bk_" + System.currentTimeMillis());
}
// 打开输出流
this.currentOutputStream = executor.openOutputStream(path);
}
@Override
public void putContent(byte[] bytes) throws IOException {
currentOutputStream.write(bytes);
}
@Override
public void putFinish() {
Streams.close(currentOutputStream);
this.currentOutputStream = null;
}
@Override
public void close() {
Streams.close(executor);
Streams.close(sessionStore);
Streams.close(currentOutputStream);
}
}

View File

@@ -1,8 +1,10 @@
import type { ISftpTransferManager, SftpTransferItem } from '../types/terminal.type';
import { TransferStatus, TransferType } from '../types/terminal.const';
import { TransferOperatorResponse } from '../types/terminal.type';
import { TransferOperatorType, TransferStatus, TransferType } from '../types/terminal.const';
import { sleep } from '@/utils';
import { Message } from '@arco-design/web-vue';
import { getTerminalAccessToken } from '@/api/asset/host-terminal';
import { getPath } from '@/utils/file';
export const BLOCK_SIZE = 1024 * 1024;
@@ -17,6 +19,8 @@ export default class SftpTransferManager implements ISftpTransferManager {
private run: boolean;
private resp?: TransferOperatorResponse;
transferList: Array<SftpTransferItem>;
constructor() {
@@ -96,27 +100,35 @@ export default class SftpTransferManager implements ISftpTransferManager {
// 接收消息
private async resolveMessage(message: MessageEvent) {
// TODO
console.log();
const data = message.data;
if (data === 'flush') {
} else if (data === 'error') {
} else if (data === 'close') {
// TODO 关闭会话
this.client?.close();
}
this.resp = JSON.parse(message.data);
// // TODO 关闭会话
// this.client?.close();
// }
}
// 上传文件
private async uploadFile(item: SftpTransferItem) {
const file = item.file;
// TODO 发送开始
// 发送开始上传信息
this.client?.send(JSON.stringify({
type: TransferOperatorType.UPLOAD_START,
path: getPath(item.parentPath + '/' + item.name),
hostId: item.hostId
}));
// TODO 等待处理结果 吧错误信息展示出来
try {
await this.awaitProcessedThrow();
} catch (ex: any) {
console.log(ex);
item.status = TransferStatus.ERROR;
item.errorMessage = ex.message;
return;
}
// 计算分片数量
const totalBlock = Math.ceil(file.size / BLOCK_SIZE);
// 分片上传
for (let i = 0; i < totalBlock; i++) {
// TODO wait ACK
// 读取数据
const start = i * BLOCK_SIZE;
const end = Math.min(file.size, start + BLOCK_SIZE);
@@ -127,9 +139,9 @@ export default class SftpTransferManager implements ISftpTransferManager {
reader.onerror = (error) => reject(error);
reader.readAsArrayBuffer(chunk);
});
// 上传 TODO
console.log(arrayBuffer);
this.client?.send(arrayBuffer as ArrayBuffer);
// TODO 等待处理结果
await this.awaitProcessedThrow();
}
// TODO 发送 END
}
@@ -139,4 +151,27 @@ export default class SftpTransferManager implements ISftpTransferManager {
// TODO
}
// 等待处理完成
private async awaitProcessedThrow() {
for (let i = 0; i < 100; i++) {
await sleep(50);
if (this.resp) {
break;
}
}
const resp = this.resp;
// const resp = undefined;
this.resp = undefined;
// 抛出异常
if (resp) {
if (resp.success) {
return;
} else {
throw new Error(resp.msg || '处理失败');
}
} else {
throw new Error('处理超时');
}
}
}

View File

@@ -299,6 +299,13 @@ export const TransferType = {
DOWNLOAD: 'download'
};
// 传输操作类型
export const TransferOperatorType = {
PROCESSED: 'processed',
UPLOAD_START: 'upload_start',
UPLOAD_FINISH: 'upload_finish'
};
// 打开 sshSettingModal key
export const openSshSettingModalKey = Symbol();

View File

@@ -384,5 +384,15 @@ export interface SftpTransferItem {
currentSize: number,
totalSize: number;
status: string;
errorMessage?: string;
file: File;
}
// 传输操作响应
export interface TransferOperatorResponse {
type: string;
fileId?: string;
hostId?: number;
success: boolean;
msg?: string;
}