🔨 上传文件.
This commit is contained in:
@@ -1,49 +0,0 @@
|
|||||||
package com.orion.ops.framework.websocket.core.handler;
|
|
||||||
|
|
||||||
import org.springframework.web.socket.*;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 文本类型消息处理器
|
|
||||||
*
|
|
||||||
* @author Jiahang Li
|
|
||||||
* @version 1.0.0
|
|
||||||
* @since 2023/12/29 18:23
|
|
||||||
*/
|
|
||||||
public abstract class TextWebSocketHandler implements WebSocketHandler {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void afterConnectionEstablished(WebSocketSession session) {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) {
|
|
||||||
// 非 text message 不处理
|
|
||||||
if (!(message instanceof TextMessage)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// 处理消息
|
|
||||||
this.onMessage(session, (String) message.getPayload());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 处理消息
|
|
||||||
*
|
|
||||||
* @param session session
|
|
||||||
* @param payload payload
|
|
||||||
*/
|
|
||||||
public abstract void onMessage(WebSocketSession session, String payload);
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean supportsPartialMessages() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,5 +1,6 @@
|
|||||||
package com.orion.ops.module.asset.config;
|
package com.orion.ops.module.asset.config;
|
||||||
|
|
||||||
|
import com.orion.ops.module.asset.handler.host.sftp.TransferMessageHandler;
|
||||||
import com.orion.ops.module.asset.handler.host.terminal.TerminalMessageDispatcher;
|
import com.orion.ops.module.asset.handler.host.terminal.TerminalMessageDispatcher;
|
||||||
import com.orion.ops.module.asset.interceptor.TerminalAccessInterceptor;
|
import com.orion.ops.module.asset.interceptor.TerminalAccessInterceptor;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
@@ -28,12 +29,19 @@ public class AssetWebSocketConfiguration implements WebSocketConfigurer {
|
|||||||
@Resource
|
@Resource
|
||||||
private TerminalMessageDispatcher terminalMessageDispatcher;
|
private TerminalMessageDispatcher terminalMessageDispatcher;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private TransferMessageHandler transferMessageHandler;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
|
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
|
||||||
// 终端
|
// 终端
|
||||||
registry.addHandler(terminalMessageDispatcher, prefix + "/host/terminal/{accessToken}")
|
registry.addHandler(terminalMessageDispatcher, prefix + "/host/terminal/{accessToken}")
|
||||||
.addInterceptors(terminalAccessInterceptor)
|
.addInterceptors(terminalAccessInterceptor)
|
||||||
.setAllowedOrigins("*");
|
.setAllowedOrigins("*");
|
||||||
|
// 文件传输
|
||||||
|
registry.addHandler(transferMessageHandler, prefix + "/host/transfer/{accessToken}")
|
||||||
|
.addInterceptors(terminalAccessInterceptor)
|
||||||
|
.setAllowedOrigins("*");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,23 @@
|
|||||||
|
package com.orion.ops.module.asset.handler.host.sftp;
|
||||||
|
|
||||||
|
import com.orion.lang.define.collect.MultiConcurrentHashMap;
|
||||||
|
import com.orion.ops.module.asset.handler.host.terminal.session.ITerminalSession;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 传输管理器
|
||||||
|
*
|
||||||
|
* @author Jiahang Li
|
||||||
|
* @version 1.0.0
|
||||||
|
* @since 2024/2/21 19:05
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class TransferManager {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 会话存储器
|
||||||
|
*/
|
||||||
|
private final MultiConcurrentHashMap<String, Long, ITerminalSession> channelSessions = MultiConcurrentHashMap.create();
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,48 @@
|
|||||||
|
package com.orion.ops.module.asset.handler.host.sftp;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.web.socket.BinaryMessage;
|
||||||
|
import org.springframework.web.socket.CloseStatus;
|
||||||
|
import org.springframework.web.socket.TextMessage;
|
||||||
|
import org.springframework.web.socket.WebSocketSession;
|
||||||
|
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* sftp 传输消息处理器
|
||||||
|
*
|
||||||
|
* @author Jiahang Li
|
||||||
|
* @version 1.0.0
|
||||||
|
* @since 2024/2/21 18:22
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
public class TransferMessageHandler extends AbstractWebSocketHandler {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
|
||||||
|
System.out.println("text");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
|
||||||
|
System.out.println("binary");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterConnectionEstablished(WebSocketSession session) {
|
||||||
|
log.info("TransferMessageHandler-afterConnectionEstablished id: {}", session.getId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleTransportError(WebSocketSession session, Throwable exception) {
|
||||||
|
log.error("TransferMessageHandler-handleTransportError id: {}", session.getId(), exception);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
|
||||||
|
String id = session.getId();
|
||||||
|
log.info("TransferMessageHandler-afterConnectionClosed id: {}, code: {}, reason: {}", id, status.getCode(), status.getReason());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,9 @@
|
|||||||
|
package com.orion.ops.module.asset.handler.host.sftp.upload;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Jiahang Li
|
||||||
|
* @version 1.0.0
|
||||||
|
* @since 2024/2/21 19:04
|
||||||
|
*/
|
||||||
|
public class FileUploader {
|
||||||
|
}
|
||||||
@@ -1,12 +1,13 @@
|
|||||||
package com.orion.ops.module.asset.handler.host.terminal;
|
package com.orion.ops.module.asset.handler.host.terminal;
|
||||||
|
|
||||||
import com.orion.ops.framework.websocket.core.handler.TextWebSocketHandler;
|
|
||||||
import com.orion.ops.module.asset.handler.host.terminal.enums.InputTypeEnum;
|
import com.orion.ops.module.asset.handler.host.terminal.enums.InputTypeEnum;
|
||||||
import com.orion.ops.module.asset.handler.host.terminal.manager.TerminalManager;
|
import com.orion.ops.module.asset.handler.host.terminal.manager.TerminalManager;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.springframework.web.socket.CloseStatus;
|
import org.springframework.web.socket.CloseStatus;
|
||||||
|
import org.springframework.web.socket.TextMessage;
|
||||||
import org.springframework.web.socket.WebSocketSession;
|
import org.springframework.web.socket.WebSocketSession;
|
||||||
|
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
|
|
||||||
@@ -19,13 +20,14 @@ import javax.annotation.Resource;
|
|||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
public class TerminalMessageDispatcher extends TextWebSocketHandler {
|
public class TerminalMessageDispatcher extends AbstractWebSocketHandler {
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private TerminalManager terminalManager;
|
private TerminalManager terminalManager;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onMessage(WebSocketSession session, String payload) {
|
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
|
||||||
|
String payload = message.getPayload();
|
||||||
try {
|
try {
|
||||||
// 解析类型
|
// 解析类型
|
||||||
InputTypeEnum type = InputTypeEnum.of(payload);
|
InputTypeEnum type = InputTypeEnum.of(payload);
|
||||||
@@ -38,6 +40,11 @@ public class TerminalMessageDispatcher extends TextWebSocketHandler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterConnectionEstablished(WebSocketSession session) {
|
||||||
|
log.info("TerminalMessageDispatcher-afterConnectionEstablished id: {}", session.getId());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleTransportError(WebSocketSession session, Throwable exception) {
|
public void handleTransportError(WebSocketSession session, Throwable exception) {
|
||||||
log.error("TerminalMessageDispatcher-handleTransportError id: {}", session.getId(), exception);
|
log.error("TerminalMessageDispatcher-handleTransportError id: {}", session.getId(), exception);
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ import type { TerminalTheme, TerminalThemeSchema } from '@/api/asset/host-termin
|
|||||||
import { getTerminalThemes } from '@/api/asset/host-terminal';
|
import { getTerminalThemes } from '@/api/asset/host-terminal';
|
||||||
import { defineStore } from 'pinia';
|
import { defineStore } from 'pinia';
|
||||||
import { getPreference, updatePreference } from '@/api/user/preference';
|
import { getPreference, updatePreference } from '@/api/user/preference';
|
||||||
import { nextSessionId } from '@/utils';
|
import { nextId } from '@/utils';
|
||||||
import { Message } from '@arco-design/web-vue';
|
import { Message } from '@arco-design/web-vue';
|
||||||
import { PanelSessionType, TerminalTabs } from '@/views/host/terminal/types/terminal.const';
|
import { PanelSessionType, TerminalTabs } from '@/views/host/terminal/types/terminal.const';
|
||||||
import TerminalTabManager from '@/views/host/terminal/handler/terminal-tab-manager';
|
import TerminalTabManager from '@/views/host/terminal/handler/terminal-tab-manager';
|
||||||
@@ -152,7 +152,7 @@ export default defineStore('terminal', {
|
|||||||
: 1;
|
: 1;
|
||||||
// 打开 tab
|
// 打开 tab
|
||||||
this.panelManager.getPanel(panelIndex).openTab({
|
this.panelManager.getPanel(panelIndex).openTab({
|
||||||
key: nextSessionId(10),
|
key: nextId(10),
|
||||||
seq: nextSeq,
|
seq: nextSeq,
|
||||||
title: `(${nextSeq}) ${record.alias || record.name}`,
|
title: `(${nextSeq}) ${record.alias || record.name}`,
|
||||||
hostId: record.id,
|
hostId: record.id,
|
||||||
|
|||||||
@@ -206,7 +206,7 @@ export function getUUID() {
|
|||||||
/**
|
/**
|
||||||
* 获取会话id
|
* 获取会话id
|
||||||
*/
|
*/
|
||||||
export const nextSessionId = (len: number): string => {
|
export const nextId = (len: number): string => {
|
||||||
return getUUID().replaceAll('-', '').substring(0, len);
|
return getUUID().replaceAll('-', '').substring(0, len);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -81,6 +81,7 @@
|
|||||||
import { Message } from '@arco-design/web-vue';
|
import { Message } from '@arco-design/web-vue';
|
||||||
import useVisible from '@/hooks/visible';
|
import useVisible from '@/hooks/visible';
|
||||||
import { TransferStatus, TransferType } from '../../types/terminal.const';
|
import { TransferStatus, TransferType } from '../../types/terminal.const';
|
||||||
|
import { nextId } from '@/utils';
|
||||||
|
|
||||||
const { visible, setVisible } = useVisible();
|
const { visible, setVisible } = useVisible();
|
||||||
const { transferManager } = useTerminalStore();
|
const { transferManager } = useTerminalStore();
|
||||||
@@ -106,6 +107,7 @@
|
|||||||
// 添加到上传列表
|
// 添加到上传列表
|
||||||
const files = fileList.value.map(s => {
|
const files = fileList.value.map(s => {
|
||||||
return {
|
return {
|
||||||
|
id: nextId(10),
|
||||||
type: TransferType.UPLOAD,
|
type: TransferType.UPLOAD,
|
||||||
hostId: hostId.value,
|
hostId: hostId.value,
|
||||||
name: s.file.webkitRelativePath || s.file.name,
|
name: s.file.webkitRelativePath || s.file.name,
|
||||||
@@ -116,7 +118,7 @@
|
|||||||
file: s.file
|
file: s.file
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
transferManager.addUpload(files);
|
transferManager.addTransfer(files);
|
||||||
Message.success('已开始上传, 点击右侧传输列表查看进度');
|
Message.success('已开始上传, 点击右侧传输列表查看进度');
|
||||||
// 清空
|
// 清空
|
||||||
handlerClear();
|
handlerClear();
|
||||||
|
|||||||
@@ -14,7 +14,8 @@
|
|||||||
:data="transferManager.transferList">
|
:data="transferManager.transferList">
|
||||||
<!-- 空数据 -->
|
<!-- 空数据 -->
|
||||||
<template #empty>
|
<template #empty>
|
||||||
<a-empty description="无数据" />
|
<a-empty style="flex-direction: column;"
|
||||||
|
description="无传输文件" />
|
||||||
</template>
|
</template>
|
||||||
<!-- 数据 -->
|
<!-- 数据 -->
|
||||||
<template #item="{ item }">
|
<template #item="{ item }">
|
||||||
|
|||||||
@@ -1,17 +1,142 @@
|
|||||||
import type { ISftpTransferManager, SftpTransferItem } from '../types/terminal.type';
|
import type { ISftpTransferManager, SftpTransferItem } from '../types/terminal.type';
|
||||||
|
import { TransferStatus, TransferType } from '../types/terminal.const';
|
||||||
|
import { sleep } from '@/utils';
|
||||||
|
import { Message } from '@arco-design/web-vue';
|
||||||
|
import { getTerminalAccessToken } from '@/api/asset/host-terminal';
|
||||||
|
|
||||||
|
export const BLOCK_SIZE = 1024 * 1024;
|
||||||
|
|
||||||
|
export const wsBase = import.meta.env.VITE_WS_BASE_URL;
|
||||||
|
|
||||||
|
// todo 考虑一下单文件上传失败 (网络/文件被删除)
|
||||||
|
|
||||||
// sftp 传输管理器实现
|
// sftp 传输管理器实现
|
||||||
export default class SftpTransferManager implements ISftpTransferManager {
|
export default class SftpTransferManager implements ISftpTransferManager {
|
||||||
|
|
||||||
|
private client?: WebSocket;
|
||||||
|
|
||||||
|
private run: boolean;
|
||||||
|
|
||||||
transferList: Array<SftpTransferItem>;
|
transferList: Array<SftpTransferItem>;
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
|
this.run = false;
|
||||||
this.transferList = [];
|
this.transferList = [];
|
||||||
}
|
}
|
||||||
|
|
||||||
// 添加上传文件
|
// 添加传输
|
||||||
addUpload(items: Array<SftpTransferItem>): void {
|
addTransfer(items: Array<SftpTransferItem>): void {
|
||||||
this.transferList.push(...items);
|
this.transferList.push(...items);
|
||||||
|
// 开始传输
|
||||||
|
if (!this.run) {
|
||||||
|
this.startTransfer();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 打开会话
|
||||||
|
private async openClient() {
|
||||||
|
// 获取 access
|
||||||
|
const { data: accessToken } = await getTerminalAccessToken();
|
||||||
|
// 打开会话
|
||||||
|
this.client = new WebSocket(`${wsBase}/host/transfer/${accessToken}`);
|
||||||
|
this.client.onerror = event => {
|
||||||
|
// 打开失败将传输列表置为失效
|
||||||
|
Message.error('会话打开失败');
|
||||||
|
console.error('error', event);
|
||||||
|
this.transferList.forEach(s => {
|
||||||
|
s.status = TransferStatus.ERROR;
|
||||||
|
});
|
||||||
|
};
|
||||||
|
this.client.onclose = event => {
|
||||||
|
// 关闭会话重置 run
|
||||||
|
this.run = false;
|
||||||
|
console.warn('close', event);
|
||||||
|
};
|
||||||
|
this.client.onmessage = this.resolveMessage.bind(this);
|
||||||
|
// 等待会话连接
|
||||||
|
for (let i = 0; i < 100; i++) {
|
||||||
|
await sleep(50);
|
||||||
|
if (this.client.readyState !== WebSocket.CONNECTING) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 开始传输
|
||||||
|
private async startTransfer() {
|
||||||
|
this.run = true;
|
||||||
|
// 打开会话
|
||||||
|
await this.openClient();
|
||||||
|
if (!this.run) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// 开始传输
|
||||||
|
while (true) {
|
||||||
|
const item = this.transferList.find(s => s.status === TransferStatus.WAITING);
|
||||||
|
if (!item) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// 开始传输
|
||||||
|
try {
|
||||||
|
item.status = TransferStatus.TRANSFERRING;
|
||||||
|
if (item.type === TransferType.UPLOAD) {
|
||||||
|
// 上传
|
||||||
|
await this.uploadFile(item);
|
||||||
|
} else {
|
||||||
|
// 下载
|
||||||
|
await this.uploadDownload(item);
|
||||||
|
}
|
||||||
|
item.status = TransferStatus.SUCCESS;
|
||||||
|
} catch (e) {
|
||||||
|
item.status = TransferStatus.ERROR;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 接收消息
|
||||||
|
private async resolveMessage(message: MessageEvent) {
|
||||||
|
// TODO
|
||||||
|
console.log();
|
||||||
|
const data = message.data;
|
||||||
|
if (data === 'flush') {
|
||||||
|
|
||||||
|
} else if (data === 'error') {
|
||||||
|
|
||||||
|
} else if (data === 'close') {
|
||||||
|
// TODO 关闭会话
|
||||||
|
this.client?.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 上传文件
|
||||||
|
private async uploadFile(item: SftpTransferItem) {
|
||||||
|
const file = item.file;
|
||||||
|
// TODO 发送开始
|
||||||
|
// 计算分片数量
|
||||||
|
const totalBlock = Math.ceil(file.size / BLOCK_SIZE);
|
||||||
|
// 分片上传
|
||||||
|
for (let i = 0; i < totalBlock; i++) {
|
||||||
|
// TODO wait ACK
|
||||||
|
// 读取数据
|
||||||
|
const start = i * BLOCK_SIZE;
|
||||||
|
const end = Math.min(file.size, start + BLOCK_SIZE);
|
||||||
|
const chunk = file.slice(start, end);
|
||||||
|
const reader = new FileReader();
|
||||||
|
const arrayBuffer = await new Promise((resolve, reject) => {
|
||||||
|
reader.onload = () => resolve(reader.result);
|
||||||
|
reader.onerror = (error) => reject(error);
|
||||||
|
reader.readAsArrayBuffer(chunk);
|
||||||
|
});
|
||||||
|
// 上传 TODO
|
||||||
|
console.log(arrayBuffer);
|
||||||
|
this.client?.send(arrayBuffer as ArrayBuffer);
|
||||||
|
}
|
||||||
|
// TODO 发送 END
|
||||||
|
}
|
||||||
|
|
||||||
|
// 下载文件
|
||||||
|
private async uploadDownload(item: SftpTransferItem) {
|
||||||
|
// TODO
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -370,12 +370,13 @@ export interface SftpFile {
|
|||||||
// sftp 传输管理器定义
|
// sftp 传输管理器定义
|
||||||
export interface ISftpTransferManager {
|
export interface ISftpTransferManager {
|
||||||
transferList: Array<SftpTransferItem>;
|
transferList: Array<SftpTransferItem>;
|
||||||
// 添加上传文件
|
// 添加传输
|
||||||
addUpload: (items: Array<SftpTransferItem>) => void;
|
addTransfer: (items: Array<SftpTransferItem>) => void;
|
||||||
}
|
}
|
||||||
|
|
||||||
// sftp 上传文件项
|
// sftp 上传文件项
|
||||||
export interface SftpTransferItem {
|
export interface SftpTransferItem {
|
||||||
|
id: string;
|
||||||
type: string;
|
type: string;
|
||||||
hostId: number;
|
hostId: number;
|
||||||
name: string;
|
name: string;
|
||||||
|
|||||||
Reference in New Issue
Block a user