优化上传逻辑.

This commit is contained in:
lijiahang
2024-02-22 16:31:45 +08:00
parent 5bd49d97f7
commit 1711981d80
11 changed files with 277 additions and 90 deletions

View File

@@ -4,7 +4,7 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 消息操作类型
* 传输操作类型
*
* @author Jiahang Li
* @version 1.0.0
@@ -14,20 +14,20 @@ import lombok.Getter;
@AllArgsConstructor
public enum TransferOperatorType {
/**
* 处理完成
*/
PROCESSED("processed"),
/**
* 开始上传
*/
UPLOAD_START("upload_start"),
UPLOAD_START("uploadStart"),
/**
* 上传完成
*/
UPLOAD_FINISH("upload_finish"),
UPLOAD_FINISH("uploadFinish"),
/**
* 上传失败
*/
UPLOAD_ERROR("uploadError"),
;

View File

@@ -0,0 +1,43 @@
package com.orion.ops.module.asset.handler.host.transfer.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 传输响应类型
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/2/21 22:03
*/
@Getter
@AllArgsConstructor
public enum TransferReceiverType {
/**
* 请求下一块上传数据
*/
NEXT_BLOCK("nextBlock"),
/**
* 请求下一个传输任务
*/
NEXT_TRANSFER("nextTransfer"),
;
private final String type;
public static TransferReceiverType of(String type) {
if (type == null) {
return null;
}
for (TransferReceiverType value : values()) {
if (value.type.equals(type)) {
return value;
}
}
return null;
}
}

View File

@@ -4,12 +4,14 @@ import com.alibaba.fastjson.JSON;
import com.orion.lang.exception.argument.InvalidArgumentException;
import com.orion.lang.utils.io.Streams;
import com.orion.net.host.SessionStore;
import com.orion.ops.framework.common.constant.Const;
import com.orion.ops.framework.common.constant.ErrorMessage;
import com.orion.ops.framework.common.constant.ExtraFieldConst;
import com.orion.ops.framework.websocket.core.utils.WebSockets;
import com.orion.ops.module.asset.entity.dto.HostTerminalConnectDTO;
import com.orion.ops.module.asset.enums.HostConnectTypeEnum;
import com.orion.ops.module.asset.handler.host.transfer.enums.TransferOperatorType;
import com.orion.ops.module.asset.handler.host.transfer.enums.TransferReceiverType;
import com.orion.ops.module.asset.handler.host.transfer.model.TransferOperatorRequest;
import com.orion.ops.module.asset.handler.host.transfer.model.TransferOperatorResponse;
import com.orion.ops.module.asset.handler.host.transfer.session.ITransferHostSession;
@@ -72,6 +74,10 @@ public class TransferHandler implements ITransferHandler {
// 上传完成
this.uploadFinish();
break;
case UPLOAD_ERROR:
// 上传失败
this.uploadError();
break;
default:
break;
}
@@ -79,23 +85,18 @@ public class TransferHandler implements ITransferHandler {
@Override
public void putContent(byte[] content) {
Exception ex = null;
try {
// 写入内容
currentSession.putContent(content);
// 响应结果
this.sendMessage(TransferReceiverType.NEXT_BLOCK, null);
} catch (IOException e) {
ex = e;
log.error("TransferHandler.putContent error", e);
// 写入完成
currentSession.putFinish();
// 响应结果
this.sendMessage(TransferReceiverType.NEXT_TRANSFER, e);
}
// 响应结果
TransferOperatorResponse resp = TransferOperatorResponse.builder()
.type(TransferOperatorType.PROCESSED.getType())
.success(ex == null)
.msg(this.getErrorMessage(ex))
.build();
WebSockets.sendText(this.channel, JSON.toJSONString(resp));
}
/**
@@ -104,20 +105,18 @@ public class TransferHandler implements ITransferHandler {
* @param payload payload
*/
private void uploadStart(TransferOperatorRequest payload) {
Exception ex = null;
try {
// 开始上传
currentSession.startUpload(payload.getPath());
// 响应结果
this.sendMessage(TransferReceiverType.NEXT_BLOCK, null);
} catch (Exception e) {
ex = e;
log.error("TransferHandler.uploadStart error", e);
// 传输完成
currentSession.putFinish();
// 响应结果
this.sendMessage(TransferReceiverType.NEXT_TRANSFER, e);
}
// 响应结果
TransferOperatorResponse resp = TransferOperatorResponse.builder()
.type(TransferOperatorType.PROCESSED.getType())
.success(ex == null)
.msg(this.getErrorMessage(ex))
.build();
WebSockets.sendText(this.channel, JSON.toJSONString(resp));
}
/**
@@ -126,11 +125,16 @@ public class TransferHandler implements ITransferHandler {
private void uploadFinish() {
currentSession.putFinish();
// 响应结果
TransferOperatorResponse resp = TransferOperatorResponse.builder()
.type(TransferOperatorType.PROCESSED.getType())
.success(true)
.build();
WebSockets.sendText(this.channel, JSON.toJSONString(resp));
this.sendMessage(TransferReceiverType.NEXT_TRANSFER, null);
}
/**
* 上传失败
*/
private void uploadError() {
currentSession.putFinish();
// 响应结果
this.sendMessage(TransferReceiverType.NEXT_TRANSFER, new InvalidArgumentException(Const.EMPTY));
}
/**
@@ -158,16 +162,26 @@ public class TransferHandler implements ITransferHandler {
} catch (Exception e) {
log.error("TransferHandler.getAndInitSession error", e);
// 响应结果
TransferOperatorResponse resp = TransferOperatorResponse.builder()
.type(TransferOperatorType.PROCESSED.getType())
.success(false)
.msg(this.getErrorMessage(e))
.build();
WebSockets.sendText(this.channel, JSON.toJSONString(resp));
this.sendMessage(TransferReceiverType.NEXT_TRANSFER, e);
return false;
}
}
/**
* 发送消息
*
* @param type type
* @param ex ex
*/
private void sendMessage(TransferReceiverType type, Exception ex) {
TransferOperatorResponse resp = TransferOperatorResponse.builder()
.type(type.getType())
.success(ex == null)
.msg(this.getErrorMessage(ex))
.build();
WebSockets.sendText(this.channel, JSON.toJSONString(resp));
}
/**
* 获取错误信息
*

View File

@@ -11,8 +11,11 @@
:on-before-ok="handlerOk"
@cancel="handleClose">
<div class="upload-container">
<div class="mb16">
上传至文件夹: {{ parentPath }}
<div class="parent-wrapper mb16">
<span class="parent-label">上传至文件夹:</span>
<a-input class="parent-input"
v-model="parentPath"
placeholder="上传目录" />
</div>
<a-space>
<!-- 选择文件 -->
@@ -101,13 +104,17 @@
// 确定
const handlerOk = () => {
if (!parentPath.value) {
Message.error('请输入上传目录');
return false;
}
if (!fileList.value.length) {
return true;
}
// 添加到上传列表
const files = fileList.value.map(s => {
return {
id: nextId(10),
fileId: nextId(10),
type: TransferType.UPLOAD,
hostId: hostId.value,
name: s.file.webkitRelativePath || s.file.name,
@@ -142,6 +149,20 @@
width: 100%;
}
.parent-wrapper {
display: flex;
justify-content: space-between;
align-items: center;
.parent-label {
width: 98px;
}
.parent-input {
width: 386px;
}
}
.file-list-uploader {
margin-top: 24px;

View File

@@ -207,6 +207,7 @@
onMounted(async () => {
// 创建终端处理器
session.value = await sessionManager.openSftp(props.tab, {
setLoading: setTableLoading,
connectCallback,
resolveList,
resolveSftpMkdir: resolveFileAction,

View File

@@ -2,7 +2,6 @@
<a-drawer v-model:visible="visible"
title="文件传输列表"
:width="388"
:mask-closable="false"
:unmount-on-close="false"
:footer="false">
<a-spin class="full" :loading="loading">
@@ -42,7 +41,15 @@
</a-tooltip>
<!-- 传输进度 -->
<span class="transfer-progress">
{{ getFileSize(item.currentSize) }}/{{ getFileSize(item.totalSize) }}
<!-- 当前大小 -->
<span v-if="item.status === TransferStatus.TRANSFERRING">{{ getFileSize(item.currentSize) }}</span>
<span class="mx4" v-if="item.status === TransferStatus.TRANSFERRING">/</span>
<!-- 总大小 -->
<span>{{ getFileSize(item.totalSize) }}</span>
<!-- 进度百分比 -->
<span class="ml8" v-if="item.status === TransferStatus.TRANSFERRING">
{{ (item.currentSize / item.totalSize * 100).toFixed(2) }}%
</span>
</span>
<!-- 目标目录 -->
<a-tooltip v-if="item.parentPath"
@@ -71,14 +78,24 @@
</div>
<!-- 右侧状态/操作-->
<div class="transfer-item-right">
<!-- 等待传输 -->
<icon-loading v-if="item.status === TransferStatus.WAITING" />
<!-- 传输进度 -->
<a-progress v-else
type="circle"
size="mini"
:status="item.status"
:percent="item.currentSize / item.totalSize" />
<!-- 传输状态 -->
<div class="transfer-item-right-progress">
<!-- 等待传输 -->
<icon-loading v-if="item.status === TransferStatus.WAITING" />
<!-- 传输进度 -->
<a-progress v-else
type="circle"
size="mini"
:status="item.status"
:percent="item.currentSize / item.totalSize" />
</div>
<!-- 传输操作 -->
<div class="transfer-item-right-actions">
<!-- 关闭 -->
<span class="close-icon" @click="removeTask(item.fileId)">
<icon-close />
</span>
</div>
</div>
</div>
</a-list-item>
@@ -112,6 +129,11 @@
defineExpose({ open });
// 移除任务
const removeTask = (fileId: string) => {
transferManager.cancelTransfer(fileId);
};
// 关闭
const handleClose = () => {
handlerClear();
@@ -125,6 +147,7 @@
</script>
<style lang="less" scoped>
@icon-size: 20px;
@item-left-width: 42px;
@item-right-width: 42px;
@item-center-width: 388px - @item-left-width - @item-right-width;
@@ -135,6 +158,16 @@
display: flex;
align-items: center;
&:hover {
.transfer-item-right-progress {
display: none;
}
.transfer-item-right-actions {
display: flex;
}
}
&-left {
width: @item-left-width;
display: flex;
@@ -173,8 +206,30 @@
&-right {
width: @item-right-width;
display: flex;
justify-content: center;
&-progress {
display: flex;
justify-content: center;
}
&-actions {
display: none;
justify-content: center;
.close-icon {
width: @icon-size;
height: @icon-size;
border-radius: 50%;
display: flex;
justify-content: center;
align-items: center;
cursor: pointer;
&:hover {
background: var(--color-fill-2);
}
}
}
}
}

View File

@@ -56,6 +56,7 @@ export default class SftpSession implements ISftpSession {
// 创建文件夹
mkdir(path: string) {
this.resolver.setLoading(true);
this.channel.send(InputProtocol.SFTP_MKDIR, {
sessionId: this.sessionId,
path
@@ -64,6 +65,7 @@ export default class SftpSession implements ISftpSession {
// 创建文件
touch(path: string) {
this.resolver.setLoading(true);
this.channel.send(InputProtocol.SFTP_TOUCH, {
sessionId: this.sessionId,
path
@@ -72,6 +74,7 @@ export default class SftpSession implements ISftpSession {
// 移动文件
move(path: string, target: string) {
this.resolver.setLoading(true);
this.channel.send(InputProtocol.SFTP_MOVE, {
sessionId: this.sessionId,
path,
@@ -85,6 +88,7 @@ export default class SftpSession implements ISftpSession {
title: '删除确认',
content: `确定要删除 ${path} 吗? 确定后将立即删除且无法恢复!`,
onOk: () => {
this.resolver.setLoading(true);
this.channel.send(InputProtocol.SFTP_REMOVE, {
sessionId: this.sessionId,
path: path.join('|')
@@ -95,6 +99,7 @@ export default class SftpSession implements ISftpSession {
// 修改权限
chmod(path: string, mod: number) {
this.resolver.setLoading(true);
this.channel.send(InputProtocol.SFTP_CHMOD, {
sessionId: this.sessionId,
path,

View File

@@ -1,15 +1,12 @@
import type { ISftpTransferManager, ISftpTransferUploader, SftpTransferItem } from '../types/terminal.type';
import { TransferOperatorResponse } from '../types/terminal.type';
import { TransferOperatorType, TransferStatus, TransferType } from '../types/terminal.const';
import { TransferReceiverType, TransferStatus, TransferType } from '../types/terminal.const';
import { Message } from '@arco-design/web-vue';
import { getTerminalAccessToken } from '@/api/asset/host-terminal';
import SftpTransferUploader from '@/views/host/terminal/handler/sftp-transfer-uploader';
export const wsBase = import.meta.env.VITE_WS_BASE_URL;
// todo 考虑一下单文件上传失败 (网络/文件被删除)
// todo 取消任务
// sftp 传输管理器实现
export default class SftpTransferManager implements ISftpTransferManager {
@@ -37,6 +34,23 @@ export default class SftpTransferManager implements ISftpTransferManager {
}
}
// 取消传输
cancelTransfer(fileId: string): void {
const index = this.transferList.findIndex(s => s.fileId === fileId);
if (index === -1) {
return;
}
const item = this.transferList[index];
if (item.status === TransferStatus.TRANSFERRING) {
// 传输中则中断传输
if (this.currentUploader) {
this.currentUploader.uploadAbort();
}
}
// 从列表中移除
this.transferList.splice(index, 1);
}
// 打开会话
private async openClient() {
this.run = true;
@@ -91,9 +105,12 @@ export default class SftpTransferManager implements ISftpTransferManager {
// 接收消息
private async resolveMessage(message: MessageEvent) {
const data = JSON.parse(message.data) as TransferOperatorResponse;
if (data.type === TransferOperatorType.PROCESSED) {
// 接收处理完成
this.resolveProcessed(data);
if (data.type === TransferReceiverType.NEXT_BLOCK) {
// 接收下一块上传数据
await this.resolveNextBlock();
} else if (data.type === TransferReceiverType.NEXT_TRANSFER) {
// 接收接收下一个传输任务处理完成
this.resolveNextTransfer(data);
}
}
@@ -110,35 +127,40 @@ export default class SftpTransferManager implements ISftpTransferManager {
// TODO
}
// 接收处理完成回调
private resolveProcessed(data: TransferOperatorResponse) {
// 操作回调
if (data.success) {
// 操作成功
if (this.currentUploader) {
if (this.currentUploader.hasNextBlock()) {
// 有下一个分片则上传 (上一个分片传输完成)
this.currentUploader.uploadNextBlock();
} else {
// 有下一个分片则检查是否完成
if (this.currentUploader.finish) {
// 已完成 开始下一个传输任务 (发送 finish 后的回调)
this.transferNextItem();
} else {
// 未完成则发送完成 (最后一个分片传输完成但还未发送 finish 指令)
this.currentUploader.uploadFinish();
}
}
// 接收下一块上传数据
private async resolveNextBlock() {
// 只可能为上传并且成功
if (!this.currentUploader) {
return;
}
if (this.currentUploader.hasNextBlock()
&& !this.currentUploader.abort
&& !this.currentUploader.finish) {
try {
// 有下一个分片则上传 (上一个分片传输完成)
await this.currentUploader.uploadNextBlock();
} catch (e) {
// 读取文件失败
this.currentUploader.uploadError((e as Error).message);
}
} else {
// 操作失败
if (this.currentUploader) {
// 上传失败
this.currentUploader.uploadError(data.msg);
}
// 开始下一个传输任务
this.transferNextItem();
// 没有下一个分片则发送完成
this.currentUploader.uploadFinish();
}
}
// 接收下一个传输任务
private resolveNextTransfer(data: TransferOperatorResponse) {
if (this.currentItem) {
if (data.success) {
this.currentItem.status = TransferStatus.SUCCESS;
} else {
this.currentItem.status = TransferStatus.ERROR;
this.currentItem.errorMessage = data.msg || '上传失败';
}
}
// 开始下一个传输任务
this.transferNextItem();
}
}

View File

@@ -7,6 +7,7 @@ export const BLOCK_SIZE = 1024 * 1024;
// sftp 上传器实现
export default class SftpTransferUploader implements ISftpTransferUploader {
public abort: boolean;
public finish: boolean;
private currentBlock: number;
private totalBlock: number;
@@ -15,6 +16,7 @@ export default class SftpTransferUploader implements ISftpTransferUploader {
private file: File;
constructor(item: SftpTransferItem, client: WebSocket) {
this.abort = false;
this.finish = false;
this.item = item;
this.client = client;
@@ -73,6 +75,16 @@ export default class SftpTransferUploader implements ISftpTransferUploader {
this.finish = true;
this.item.status = TransferStatus.ERROR;
this.item.errorMessage = msg || '上传失败';
// 发送上传完成的信息
this.client?.send(JSON.stringify({
type: TransferOperatorType.UPLOAD_ERROR,
hostId: this.item.hostId
}));
}
// 上传中断
uploadAbort() {
this.abort = true;
}
}

View File

@@ -301,9 +301,15 @@ export const TransferType = {
// 传输操作类型
export const TransferOperatorType = {
PROCESSED: 'processed',
UPLOAD_START: 'upload_start',
UPLOAD_FINISH: 'upload_finish'
UPLOAD_START: 'uploadStart',
UPLOAD_FINISH: 'uploadFinish',
UPLOAD_ERROR: 'uploadError',
};
// 传输响应类型
export const TransferReceiverType = {
NEXT_BLOCK: 'nextBlock',
NEXT_TRANSFER: 'nextTransfer',
};
// 打开 sshSettingModal key

View File

@@ -333,6 +333,8 @@ export interface ISftpSession extends ITerminalSession {
// sftp 会话接收器定义
export interface ISftpSessionResolver {
// 设置加载状态
setLoading: (loading: boolean) => void;
// 连接后回调
connectCallback: () => void;
// 接受文件列表响应
@@ -372,27 +374,33 @@ export interface ISftpTransferManager {
transferList: Array<SftpTransferItem>;
// 添加传输
addTransfer: (items: Array<SftpTransferItem>) => void;
// 取消传输
cancelTransfer: (fileId: string) => void;
}
// sftp 上传器定义
export interface ISftpTransferUploader {
// 是否完成
finish: boolean;
// 是否中断
abort: boolean;
// 开始上传
startUpload: () => void;
// 是否有下一个分片
hasNextBlock: () => boolean;
// 上传下一个分片
uploadNextBlock: () => void;
uploadNextBlock: () => Promise<void>;
// 上传完成
uploadFinish: () => void;
// 上传失败
uploadError: (msg: string | undefined) => void;
// 上传中断
uploadAbort: () => void;
}
// sftp 上传文件项
export interface SftpTransferItem {
id: string;
fileId: string;
type: string;
hostId: number;
name: string;