🔨 批量上传任务.

This commit is contained in:
lijiahang
2024-05-08 19:13:06 +08:00
parent 0774662b4f
commit 26172ea651
27 changed files with 891 additions and 77 deletions

View File

@@ -87,6 +87,8 @@ public interface ErrorMessage {
String LOG_ABSENT = "日志不存在";
String TASK_ABSENT = "任务不存在";
String ILLEGAL_STATUS = "当前状态不支持此操作";
String CHECK_AUTHORIZED_HOST = "请选择已授权的主机";

View File

@@ -10,7 +10,9 @@ Authorization: {{token}}
"scriptExec": 0,
"command": "echo 123",
"parameterSchema": "[]",
"hostIdList": [1]
"hostIdList": [
1
]
}

View File

@@ -1,9 +1,13 @@
package com.orion.ops.module.asset.dao;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.orion.ops.framework.mybatis.core.mapper.IMapper;
import com.orion.ops.framework.mybatis.core.query.Conditions;
import com.orion.ops.module.asset.entity.domain.UploadTaskFileDO;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
/**
* 上传任务文件 Mapper 接口
*
@@ -14,4 +18,48 @@ import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface UploadTaskFileDAO extends IMapper<UploadTaskFileDO> {
/**
* 通过 taskId 查询
*
* @param taskId taskId
* @return files
*/
default List<UploadTaskFileDO> selectByTaskId(Long taskId) {
return this.selectList(Conditions.eq(UploadTaskFileDO::getTaskId, taskId));
}
/**
* 通过 taskId hostId 更新状态
*
* @param taskId taskId
* @param hostId hostId
* @param status status
* @return effect
*/
default int updateStatusByTaskHostId(Long taskId, Long hostId, String status) {
// 条件
LambdaQueryWrapper<UploadTaskFileDO> wrapper = this.wrapper()
.eq(UploadTaskFileDO::getTaskId, taskId)
.eq(UploadTaskFileDO::getHostId, hostId);
// 修改值
UploadTaskFileDO update = new UploadTaskFileDO();
update.setStatus(status);
// 更新
return this.update(update, wrapper);
}
/**
* 通过 id 更新状态
*
* @param idList idList
* @param status status
* @return effect
*/
default int updateStatusByIdList(List<Long> idList, String status) {
UploadTaskFileDO update = new UploadTaskFileDO();
update.setStatus(status);
// 更新
return this.update(update, Conditions.in(UploadTaskFileDO::getId, idList));
}
}

View File

@@ -87,4 +87,28 @@ public interface AssetThreadPools {
.allowCoreThreadTimeout(true)
.build();
/**
* 批量上传任务线程池
*/
ThreadPoolExecutor UPLOAD_TASK = ExecutorBuilder.create()
.namedThreadFactory("upload-task-")
.corePoolSize(1)
.maxPoolSize(Integer.MAX_VALUE)
.keepAliveTime(Const.MS_S_60)
.workQueue(new SynchronousQueue<>())
.allowCoreThreadTimeout(true)
.build();
/**
* 批量上传主机线程池
*/
ThreadPoolExecutor UPLOAD_HOST = ExecutorBuilder.create()
.namedThreadFactory("upload-host-")
.corePoolSize(1)
.maxPoolSize(Integer.MAX_VALUE)
.keepAliveTime(Const.MS_S_60)
.workQueue(new SynchronousQueue<>())
.allowCoreThreadTimeout(true)
.build();
}

View File

@@ -40,8 +40,8 @@ public class UploadTaskQueryRequest extends PageRequest {
@Schema(description = "状态")
private String status;
@Schema(description = "开始时间-区间")
@Schema(description = "创建时间-区间")
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date[] startTimeRange;
private Date[] createTimeRange;
}

View File

@@ -52,4 +52,7 @@ public class UploadTaskFileVO implements Serializable {
@Schema(description = "结束时间")
private Date endTime;
@Schema(description = "传输进度")
private Long current;
}

View File

@@ -24,6 +24,11 @@ public enum UploadTaskFileStatusEnum {
*/
FINISHED,
/**
* 已失败
*/
FAILED,
/**
* 已取消
*/

View File

@@ -1,5 +1,8 @@
package com.orion.ops.module.asset.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 上传任务状态
*
@@ -7,30 +10,34 @@ package com.orion.ops.module.asset.enums;
* @version 1.0.0
* @since 2024/5/7 22:21
*/
@Getter
@AllArgsConstructor
public enum UploadTaskStatusEnum {
/**
* 准备中
*/
PREPARATION,
PREPARATION(true),
/**
* 上传中
*/
UPLOADING,
UPLOADING(true),
/**
* 已完成
*/
FINISHED,
FINISHED(false),
/**
* 已取消
*/
CANCELED,
CANCELED(false),
;
private final boolean cancelable;
public static UploadTaskStatusEnum of(String status) {
if (status == null) {
return null;

View File

@@ -42,13 +42,13 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public abstract class BaseExecCommandHandler implements IExecCommandHandler {
private final FileClient fileClient = SpringHolder.getBean("logsFileClient");
private static final FileClient fileClient = SpringHolder.getBean("logsFileClient");
private final ExecLogManager execLogManager = SpringHolder.getBean(ExecLogManager.class);
private static final ExecLogManager execLogManager = SpringHolder.getBean(ExecLogManager.class);
private final HostTerminalService hostTerminalService = SpringHolder.getBean(HostTerminalService.class);
private static final HostTerminalService hostTerminalService = SpringHolder.getBean(HostTerminalService.class);
private final ExecHostLogDAO execHostLogDAO = SpringHolder.getBean(ExecHostLogDAO.class);
private static final ExecHostLogDAO execHostLogDAO = SpringHolder.getBean(ExecHostLogDAO.class);
protected final ExecCommandDTO execCommand;

View File

@@ -79,7 +79,8 @@ public class TerminalCheckHandler extends AbstractTerminalHandler<TerminalCheckR
Exception ex = null;
try {
// 获取连接信息
connect = hostTerminalService.getTerminalConnectInfo(userId, host, connectType);
connect = hostTerminalService.getTerminalConnectInfo(userId, host);
connect.setConnectType(connectType.name());
// 设置到缓存中
channel.getAttributes().put(sessionId, connect);
log.info("TerminalCheckHandler-handle success userId: {}, hostId: {}, sessionId: {}", userId, hostId, sessionId);

View File

@@ -128,7 +128,7 @@ public class SftpSession extends TerminalSession implements ISftpSession {
} catch (Exception e) {
throw Exceptions.ioRuntime(e);
} finally {
// 同关闭 transfer downloader
// TODO Test
// 关闭 inputStream 可能会被阻塞 ???...??? 只能关闭 executor
Streams.close(this.executor);
this.connect();

View File

@@ -7,7 +7,6 @@ import com.orion.ops.framework.common.constant.ErrorMessage;
import com.orion.ops.framework.common.constant.ExtraFieldConst;
import com.orion.ops.framework.websocket.core.utils.WebSockets;
import com.orion.ops.module.asset.entity.dto.HostTerminalConnectDTO;
import com.orion.ops.module.asset.enums.HostConnectTypeEnum;
import com.orion.ops.module.asset.handler.host.transfer.enums.TransferOperatorType;
import com.orion.ops.module.asset.handler.host.transfer.enums.TransferReceiverType;
import com.orion.ops.module.asset.handler.host.transfer.model.TransferOperatorRequest;
@@ -107,7 +106,7 @@ public class TransferHandler implements ITransferHandler {
ITransferHostSession session = sessions.get(sessionKey);
if (session == null) {
// 获取主机信息
HostTerminalConnectDTO connectInfo = hostTerminalService.getTerminalConnectInfo(this.userId, hostId, HostConnectTypeEnum.SFTP);
HostTerminalConnectDTO connectInfo = hostTerminalService.getTerminalConnectInfo(this.userId, hostId);
SessionStore sessionStore = hostTerminalService.openSessionStore(connectInfo);
// 打开会话并初始化
if (TransferOperatorType.UPLOAD.equals(type.getOperator())) {

View File

@@ -1,20 +1,15 @@
package com.orion.ops.module.asset.handler.host.transfer.session;
import com.alibaba.fastjson.JSON;
import com.orion.lang.utils.Booleans;
import com.orion.lang.utils.Strings;
import com.orion.lang.utils.collect.Maps;
import com.orion.lang.utils.io.Streams;
import com.orion.net.host.SessionStore;
import com.orion.net.host.sftp.SftpExecutor;
import com.orion.net.host.sftp.SftpFile;
import com.orion.ops.framework.biz.operator.log.core.model.OperatorLogModel;
import com.orion.ops.framework.biz.operator.log.core.service.OperatorLogFrameworkService;
import com.orion.ops.framework.biz.operator.log.core.utils.OperatorLogs;
import com.orion.ops.module.asset.define.config.AppSftpConfig;
import com.orion.ops.module.asset.entity.dto.HostTerminalConnectDTO;
import com.orion.ops.module.asset.handler.host.terminal.utils.TerminalUtils;
import com.orion.ops.module.asset.handler.host.transfer.model.SftpFileBackupParams;
import com.orion.spring.SpringHolder;
import org.springframework.web.socket.WebSocketSession;
@@ -59,27 +54,6 @@ public abstract class TransferHostSession implements ITransferHostSession {
}
}
/**
* 检查文件是否存在 并且执行响应策略
*
* @param path path
*/
protected void doCheckFilePresent(String path) {
// 重复不备份
if (!Booleans.isTrue(SFTP_CONFIG.getUploadPresentBackup())) {
return;
}
// 检查文件是否存在
SftpFile file = executor.getFile(path);
if (file != null) {
// 文件存在则备份
SftpFileBackupParams backupParams = new SftpFileBackupParams(file.getName(), System.currentTimeMillis());
String target = Strings.format(SFTP_CONFIG.getBackupFileName(), JSON.parseObject(JSON.toJSONString(backupParams)));
// 移动
executor.move(path, target);
}
}
/**
* 保存操作日志
*

View File

@@ -7,6 +7,7 @@ import com.orion.ops.module.asset.define.operator.HostTerminalOperatorType;
import com.orion.ops.module.asset.entity.dto.HostTerminalConnectDTO;
import com.orion.ops.module.asset.handler.host.transfer.enums.TransferReceiverType;
import com.orion.ops.module.asset.handler.host.transfer.utils.TransferUtils;
import com.orion.ops.module.asset.utils.SftpUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.WebSocketSession;
@@ -39,7 +40,7 @@ public class UploadSession extends TransferHostSession implements IUploadSession
// 检查连接
this.init();
// 检查文件是否存在
this.doCheckFilePresent(path);
SftpUtils.checkUploadFilePresent(SFTP_CONFIG, executor, path);
// 打开输出流
this.outputStream = executor.openOutputStream(path);
// 响应结果

View File

@@ -0,0 +1,27 @@
package com.orion.ops.module.asset.handler.host.upload;
import com.orion.ops.module.asset.define.AssetThreadPools;
import com.orion.ops.module.asset.handler.host.upload.task.FileUploadTask;
/**
* 批量上传执行器
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/5/8 17:23
*/
public class FileUploadTasks {
private FileUploadTasks() {
}
/**
* 上传
*
* @param taskId taskId
*/
public static void start(Long taskId) {
AssetThreadPools.UPLOAD_TASK.execute(new FileUploadTask(taskId));
}
}

View File

@@ -0,0 +1,38 @@
package com.orion.ops.module.asset.handler.host.upload.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 文件上传文件对象
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/5/8 14:35
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(name = "FileUploadFileItemDTO", description = "文件上传文件对象")
public class FileUploadFileItemDTO {
@Schema(description = "id")
private Long id;
@Schema(description = "fileId")
private String fileId;
@Schema(description = "远程路径")
private String remotePath;
@Schema(description = "当前大小")
private Long current;
@Schema(description = "状态")
private String status;
}

View File

@@ -0,0 +1,49 @@
package com.orion.ops.module.asset.handler.host.upload.manager;
import com.orion.ops.module.asset.handler.host.upload.task.IFileUploadTask;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
/**
* 文件上传管理器
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/5/8 17:26
*/
@Component
public class FileUploadTaskManager {
private final ConcurrentHashMap<Long, IFileUploadTask> tasks = new ConcurrentHashMap<>();
/**
* 添加任务
*
* @param id id
* @param task task
*/
public void addTask(Long id, IFileUploadTask task) {
tasks.put(id, task);
}
/**
* 移除任务
*
* @param id id
*/
public void removeTask(Long id) {
tasks.remove(id);
}
/**
* 获取任务
*
* @param id id
* @return task
*/
public IFileUploadTask getTask(Long id) {
return tasks.get(id);
}
}

View File

@@ -0,0 +1,181 @@
package com.orion.ops.module.asset.handler.host.upload.task;
import com.orion.lang.utils.Threads;
import com.orion.lang.utils.io.Streams;
import com.orion.ops.framework.common.constant.Const;
import com.orion.ops.module.asset.dao.UploadTaskDAO;
import com.orion.ops.module.asset.dao.UploadTaskFileDAO;
import com.orion.ops.module.asset.define.AssetThreadPools;
import com.orion.ops.module.asset.entity.domain.UploadTaskDO;
import com.orion.ops.module.asset.entity.domain.UploadTaskFileDO;
import com.orion.ops.module.asset.enums.UploadTaskFileStatusEnum;
import com.orion.ops.module.asset.enums.UploadTaskStatusEnum;
import com.orion.ops.module.asset.handler.host.upload.dto.FileUploadFileItemDTO;
import com.orion.ops.module.asset.handler.host.upload.manager.FileUploadTaskManager;
import com.orion.ops.module.asset.handler.host.upload.uploader.FileUploader;
import com.orion.ops.module.asset.handler.host.upload.uploader.IFileUploader;
import com.orion.ops.module.asset.service.UploadTaskService;
import com.orion.spring.SpringHolder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* 上传任务 实现类
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/5/8 13:38
*/
@Slf4j
public class FileUploadTask implements IFileUploadTask {
private static final UploadTaskDAO uploadTaskDAO = SpringHolder.getBean(UploadTaskDAO.class);
private static final UploadTaskFileDAO uploadTaskFileDAO = SpringHolder.getBean(UploadTaskFileDAO.class);
private static final UploadTaskService uploadTaskService = SpringHolder.getBean(UploadTaskService.class);
private static final FileUploadTaskManager fileUploadTaskManager = SpringHolder.getBean(FileUploadTaskManager.class);
private final Long id;
@Getter
private final List<IFileUploader> uploaderList;
private UploadTaskDO record;
private volatile boolean canceled;
private volatile boolean closed;
public FileUploadTask(Long id) {
this.id = id;
this.uploaderList = new ArrayList<>();
}
@Override
public void run() {
log.info("FileUploadTask.run start id: {}", id);
// 查询任务
this.record = uploadTaskDAO.selectById(id);
if (record == null) {
return;
}
// 检查任务状态 非准备中则取消执行
if (!UploadTaskStatusEnum.PREPARATION.name().equals(record.getStatus())) {
return;
}
try {
// 添加任务
fileUploadTaskManager.addTask(id, this);
// 修改状态
this.updateStatus(UploadTaskStatusEnum.UPLOADING);
// 创建文件上传器
this.createFileUploader();
// 执行上传
this.runUpload();
log.info("FileUploadTask.run finish id: {}", id);
} catch (Exception e) {
log.error("FileUploadTask.run error id: {}", id, e);
} finally {
// 修改状态
if (canceled) {
this.updateStatus(UploadTaskStatusEnum.CANCELED);
} else {
this.updateStatus(UploadTaskStatusEnum.FINISHED);
}
// 移除任务
fileUploadTaskManager.removeTask(id);
// 释放资源
this.close();
}
}
@Override
public void cancel() {
log.info("FileUploadTask.cancel id: {}, canceled: {}, closed: {}", id, canceled, closed);
if (this.canceled || this.closed) {
return;
}
// 关闭
this.canceled = true;
uploaderList.forEach(IFileUploader::cancel);
this.close();
}
@Override
public void close() {
log.info("FileUploadTask.close id: {}, canceled: {}, closed: {}", id, canceled, closed);
if (closed) {
return;
}
this.closed = true;
// 删除临时文件
uploadTaskService.clearUploadSwapFiles(id);
// 关闭
uploaderList.forEach(Streams::close);
}
/**
* 创建文件上传器
*/
private void createFileUploader() {
// 查询文件
List<UploadTaskFileDO> uploadFiles = uploadTaskFileDAO.selectByTaskId(id);
Map<Long, List<UploadTaskFileDO>> hostFileGroup = uploadFiles.stream()
.collect(Collectors.groupingBy(UploadTaskFileDO::getHostId));
hostFileGroup.forEach((k, v) -> {
// 设置上传的文件
List<FileUploadFileItemDTO> files = v.stream()
.map(s -> FileUploadFileItemDTO.builder()
.id(s.getId())
.remotePath(record.getRemotePath() + Const.SLASH + s.getFilePath())
.status(UploadTaskFileStatusEnum.WAITING.name())
.current(0L)
.build())
.collect(Collectors.toList());
// 添加到上传器
uploaderList.add(new FileUploader(id, k, files));
});
}
/**
* 执行上传
*/
private void runUpload() throws Exception {
if (uploaderList.size() == 1) {
// 单个主机直接执行
IFileUploader handler = uploaderList.get(0);
handler.run();
} else {
// 多个主机异步阻塞执行
Threads.blockRun(uploaderList, AssetThreadPools.UPLOAD_HOST);
}
}
/**
* 更新状态
*
* @param status status
*/
private void updateStatus(UploadTaskStatusEnum status) {
UploadTaskDO update = new UploadTaskDO();
update.setId(id);
update.setStatus(status.name());
if (UploadTaskStatusEnum.UPLOADING.equals(status)) {
update.setStartTime(new Date());
} else if (UploadTaskStatusEnum.FINISHED.equals(status)) {
update.setEndTime(new Date());
} else if (UploadTaskStatusEnum.CANCELED.equals(status)) {
update.setEndTime(new Date());
}
uploadTaskDAO.updateById(update);
}
}

View File

@@ -0,0 +1,29 @@
package com.orion.ops.module.asset.handler.host.upload.task;
import com.orion.lang.able.SafeCloseable;
import com.orion.ops.module.asset.handler.host.upload.uploader.IFileUploader;
import java.util.List;
/**
* 上传任务
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/5/8 12:28
*/
public interface IFileUploadTask extends Runnable, SafeCloseable {
/**
* 取消上传
*/
void cancel();
/**
* 获取上传器
*
* @return uploader
*/
List<IFileUploader> getUploaderList();
}

View File

@@ -0,0 +1,244 @@
package com.orion.ops.module.asset.handler.host.upload.uploader;
import com.orion.lang.utils.Strings;
import com.orion.lang.utils.io.Streams;
import com.orion.net.host.SessionStore;
import com.orion.net.host.sftp.SftpExecutor;
import com.orion.ops.framework.common.constant.Const;
import com.orion.ops.framework.common.file.FileClient;
import com.orion.ops.module.asset.dao.UploadTaskFileDAO;
import com.orion.ops.module.asset.define.config.AppSftpConfig;
import com.orion.ops.module.asset.entity.domain.UploadTaskFileDO;
import com.orion.ops.module.asset.entity.dto.HostTerminalConnectDTO;
import com.orion.ops.module.asset.enums.UploadTaskFileStatusEnum;
import com.orion.ops.module.asset.handler.host.upload.dto.FileUploadFileItemDTO;
import com.orion.ops.module.asset.service.HostTerminalService;
import com.orion.ops.module.asset.service.UploadTaskService;
import com.orion.ops.module.asset.utils.SftpUtils;
import com.orion.spring.SpringHolder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
/**
* 主机文件上传器 实现类
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/5/8 13:41
*/
@Slf4j
public class FileUploader implements IFileUploader {
private static final HostTerminalService hostTerminalService = SpringHolder.getBean(HostTerminalService.class);
private static final UploadTaskFileDAO uploadTaskFileDAO = SpringHolder.getBean(UploadTaskFileDAO.class);
private static final AppSftpConfig SFTP_CONFIG = SpringHolder.getBean(AppSftpConfig.class);
private static final FileClient localFileClient = SpringHolder.getBean("localFileClient");
private SessionStore sessionStore;
private SftpExecutor executor;
private final Long taskId;
private final Long hostId;
@Getter
private final List<FileUploadFileItemDTO> files;
private InputStream inputStream;
private OutputStream outputStream;
private volatile boolean canceled;
private volatile boolean closed;
public FileUploader(Long taskId, Long hostId, List<FileUploadFileItemDTO> files) {
this.taskId = taskId;
this.hostId = hostId;
this.files = files;
}
@Override
public void run() {
try {
// 初始化会话
boolean run = this.initSession();
if (!run) {
return;
}
// 上传文件
for (FileUploadFileItemDTO file : files) {
if (closed) {
break;
}
// 上传
this.uploadFile(file);
}
// 检查是否取消
this.finishCheckCancel();
} finally {
// 释放资源
this.close();
}
}
/**
* 初始化会话
*
* @return 是否执行
*/
private boolean initSession() {
log.info("HostFileUploader.initSession start taskId: {}, hostId: {}", taskId, hostId);
try {
// TODO 测试 打开 executor 后 是否会connect, 不需要的话就关闭 executor 然后重新打开
// 打开会话
HostTerminalConnectDTO connectInfo = hostTerminalService.getTerminalConnectInfo(hostId);
this.sessionStore = hostTerminalService.openSessionStore(connectInfo);
this.executor = sessionStore.getSftpExecutor(connectInfo.getFileNameCharset());
executor.connect();
log.info("HostFileUploader.initSession success taskId: {}, hostId: {}", taskId, hostId);
return true;
} catch (Exception e) {
log.error("HostFileUploader.initSession error taskId: {}, hostId: {}", taskId, hostId, e);
// 修改状态
uploadTaskFileDAO.updateStatusByTaskHostId(taskId, hostId, UploadTaskFileStatusEnum.FAILED.name());
files.forEach(s -> s.setStatus(UploadTaskFileStatusEnum.FAILED.name()));
return false;
}
}
/**
* 上传文件
*
* @param file file
*/
private void uploadFile(FileUploadFileItemDTO file) {
log.info("HostFileUploader.uploadFile start taskId: {}, hostId: {}, id: {}", taskId, hostId, file.getId());
// 修改状态
this.updateStatus(file, UploadTaskFileStatusEnum.UPLOADING);
try {
// 获取本地文件路径
String endpoint = Strings.format(UploadTaskService.SWAP_ENDPOINT, taskId);
String localPath = localFileClient.getReturnPath(endpoint + Const.SLASH + file.getFileId());
// 检查文件是否存在
String remotePath = file.getRemotePath();
SftpUtils.checkUploadFilePresent(SFTP_CONFIG, executor, remotePath);
// 打开输出流
this.inputStream = localFileClient.getContentInputStream(localPath);
this.outputStream = executor.openOutputStream(remotePath);
// 传输
byte[] buffer = new byte[executor.getBufferSize()];
int read;
while ((read = inputStream.read(buffer)) != -1) {
outputStream.write(buffer, 0, read);
// FIXME test
file.setCurrent(file.getCurrent() + read);
}
outputStream.flush();
// 修改状态
this.updateStatus(file, UploadTaskFileStatusEnum.FINISHED);
log.info("HostFileUploader.uploadFile finish taskId: {}, hostId: {}, id: {}", taskId, hostId, file.getId());
} catch (Exception e) {
log.info("HostFileUploader.uploadFile error taskId: {}, hostId: {}, id: {}, canceled: {}", taskId, hostId, file.getId(), canceled);
// 修改状态
if (canceled) {
this.updateStatus(file, UploadTaskFileStatusEnum.CANCELED);
} else {
this.updateStatus(file, UploadTaskFileStatusEnum.FAILED);
}
} finally {
// 释放文件
this.resetFile();
}
}
/**
* 释放文件
*/
private void resetFile() {
Streams.close(outputStream);
Streams.close(inputStream);
}
/**
* 检查是否取消
*/
private void finishCheckCancel() {
if (!canceled) {
return;
}
// 将等待中的文件修改为已取消
List<Long> idList = files.stream()
.filter(s -> UploadTaskFileStatusEnum.WAITING.name().equals(s.getStatus()))
.map(FileUploadFileItemDTO::getId)
.collect(Collectors.toList());
if (idList.isEmpty()) {
return;
}
// 修改状态
uploadTaskFileDAO.updateStatusByIdList(idList, UploadTaskFileStatusEnum.CANCELED.name());
}
/**
* 更新状态
*
* @param file file
* @param status status
*/
private void updateStatus(FileUploadFileItemDTO file, UploadTaskFileStatusEnum status) {
file.setStatus(status.name());
UploadTaskFileDO update = new UploadTaskFileDO();
update.setId(file.getId());
update.setStatus(status.name());
if (UploadTaskFileStatusEnum.UPLOADING.equals(status)) {
// 上传中
update.setStartTime(new Date());
} else if (UploadTaskFileStatusEnum.FINISHED.equals(status)) {
// 已完成
update.setEndTime(new Date());
} else if (UploadTaskFileStatusEnum.FAILED.equals(status)) {
// 已失败
update.setEndTime(new Date());
} else if (UploadTaskFileStatusEnum.CANCELED.equals(status)) {
// 已失败
update.setEndTime(new Date());
}
uploadTaskFileDAO.updateById(update);
}
@Override
public void cancel() {
log.info("HostFileUploader.cancel taskId: {}, hostId: {}, canceled: {}, closed: {}", taskId, hostId, canceled, closed);
if (this.canceled || this.closed) {
return;
}
// 关闭
this.canceled = true;
this.close();
}
@Override
public void close() {
log.info("HostFileUploader.close taskId: {}, hostId: {}, closed: {}", taskId, hostId, closed);
if (closed) {
return;
}
this.closed = true;
// 释放资源
Streams.close(outputStream);
Streams.close(inputStream);
Streams.close(executor);
Streams.close(sessionStore);
}
}

View File

@@ -0,0 +1,29 @@
package com.orion.ops.module.asset.handler.host.upload.uploader;
import com.orion.lang.able.SafeCloseable;
import com.orion.ops.module.asset.handler.host.upload.dto.FileUploadFileItemDTO;
import java.util.List;
/**
* 主机文件上传器
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/5/8 13:41
*/
public interface IFileUploader extends Runnable, SafeCloseable {
/**
* 取消上传
*/
void cancel();
/**
* 获取传输文件
*
* @return files
*/
List<FileUploadFileItemDTO> getFiles();
}

View File

@@ -5,7 +5,6 @@ import com.orion.net.host.SessionStore;
import com.orion.ops.module.asset.entity.domain.HostDO;
import com.orion.ops.module.asset.entity.dto.HostTerminalAccessDTO;
import com.orion.ops.module.asset.entity.dto.HostTerminalConnectDTO;
import com.orion.ops.module.asset.enums.HostConnectTypeEnum;
/**
* 主机终端服务
@@ -38,25 +37,31 @@ public interface HostTerminalService {
*/
HostTerminalAccessDTO getAccessInfoByToken(String token);
/**
* 获取连接信息
*
* @param hostId hostId
* @return session
*/
HostTerminalConnectDTO getTerminalConnectInfo(Long hostId);
/**
* 使用用户配置获取连接信息
*
* @param hostId hostId
* @param userId userId
* @param type type
* @return session
*/
HostTerminalConnectDTO getTerminalConnectInfo(Long userId, Long hostId, HostConnectTypeEnum type);
HostTerminalConnectDTO getTerminalConnectInfo(Long userId, Long hostId);
/**
* 使用用户配置获取连接信息
*
* @param host host
* @param userId userId
* @param type type
* @return session
*/
HostTerminalConnectDTO getTerminalConnectInfo(Long userId, HostDO host, HostConnectTypeEnum type);
HostTerminalConnectDTO getTerminalConnectInfo(Long userId, HostDO host);
/**
* 使用默认配置打开主机会话

View File

@@ -17,6 +17,8 @@ import java.util.List;
*/
public interface UploadTaskService {
String SWAP_ENDPOINT = "/upload/swap/{}";
/**
* 创建上传任务
*

View File

@@ -113,15 +113,28 @@ public class HostTerminalServiceImpl implements HostTerminalService {
}
@Override
public HostTerminalConnectDTO getTerminalConnectInfo(Long userId, Long hostId, HostConnectTypeEnum type) {
public HostTerminalConnectDTO getTerminalConnectInfo(Long hostId) {
log.info("HostConnectService.getTerminalConnectInfo-withHost hostId: {}", hostId);
// 查询主机
HostDO host = hostDAO.selectById(hostId);
Valid.notNull(host, ErrorMessage.HOST_ABSENT);
return this.getTerminalConnectInfo(userId, host, type);
// 查询主机配置
HostSshConfigModel model = hostConfigService.getHostConfig(hostId, HostConfigTypeEnum.SSH);
Valid.notNull(model, ErrorMessage.CONFIG_ABSENT);
// 获取配置
return this.getHostConnectInfo(host, model, null);
}
@Override
public HostTerminalConnectDTO getTerminalConnectInfo(Long userId, HostDO host, HostConnectTypeEnum type) {
public HostTerminalConnectDTO getTerminalConnectInfo(Long userId, Long hostId) {
// 查询主机
HostDO host = hostDAO.selectById(hostId);
Valid.notNull(host, ErrorMessage.HOST_ABSENT);
return this.getTerminalConnectInfo(userId, host);
}
@Override
public HostTerminalConnectDTO getTerminalConnectInfo(Long userId, HostDO host) {
Long hostId = host.getId();
log.info("HostConnectService.getTerminalConnectInfo hostId: {}, userId: {}", hostId, userId);
// 验证主机是否有权限
@@ -149,22 +162,14 @@ public class HostTerminalServiceImpl implements HostTerminalService {
}
}
// 获取连接配置
HostTerminalConnectDTO connectInfo = this.getHostConnectInfo(host, config, extra);
connectInfo.setConnectType(type.name());
return connectInfo;
return this.getHostConnectInfo(host, config, extra);
}
@Override
public SessionStore openSessionStore(Long hostId) {
log.info("HostConnectService.openSessionStore-withHost hostId: {}", hostId);
// 查询主机
HostDO host = hostDAO.selectById(hostId);
Valid.notNull(host, ErrorMessage.HOST_ABSENT);
// 查询主机配置
HostSshConfigModel model = hostConfigService.getHostConfig(hostId, HostConfigTypeEnum.SSH);
Valid.notNull(model, ErrorMessage.CONFIG_ABSENT);
// 获取配置
HostTerminalConnectDTO connect = this.getHostConnectInfo(host, model, null);
HostTerminalConnectDTO connect = this.getTerminalConnectInfo(hostId);
// 打开连接
return this.openSessionStore(connect);
}

View File

@@ -11,6 +11,7 @@ import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
import java.util.stream.Collectors;
/**
* 上传任务文件 服务实现类
@@ -29,11 +30,10 @@ public class UploadTaskFileServiceImpl implements UploadTaskFileService {
@Override
public List<UploadTaskFileVO> getFileByTaskId(Long taskId) {
// 查询
return uploadTaskFileDAO.of()
.createWrapper()
.eq(UploadTaskFileDO::getTaskId, taskId)
.then()
.list(UploadTaskFileConvert.MAPPER::to);
return uploadTaskFileDAO.selectByTaskId(taskId)
.stream()
.map(UploadTaskFileConvert.MAPPER::to)
.collect(Collectors.toList());
}
@Override

View File

@@ -6,6 +6,7 @@ import com.orion.lang.define.wrapper.DataGrid;
import com.orion.lang.utils.Arrays1;
import com.orion.lang.utils.Strings;
import com.orion.lang.utils.collect.Lists;
import com.orion.lang.utils.collect.Maps;
import com.orion.lang.utils.io.Files1;
import com.orion.lang.utils.time.Dates;
import com.orion.ops.framework.biz.operator.log.core.utils.OperatorLogs;
@@ -13,6 +14,7 @@ import com.orion.ops.framework.common.constant.ErrorMessage;
import com.orion.ops.framework.common.file.FileClient;
import com.orion.ops.framework.common.security.LoginUser;
import com.orion.ops.framework.common.utils.Valid;
import com.orion.ops.framework.mybatis.core.query.Conditions;
import com.orion.ops.framework.security.core.utils.SecurityUtils;
import com.orion.ops.module.asset.convert.HostConvert;
import com.orion.ops.module.asset.convert.UploadTaskConvert;
@@ -32,6 +34,11 @@ import com.orion.ops.module.asset.entity.vo.UploadTaskVO;
import com.orion.ops.module.asset.enums.HostConfigTypeEnum;
import com.orion.ops.module.asset.enums.UploadTaskFileStatusEnum;
import com.orion.ops.module.asset.enums.UploadTaskStatusEnum;
import com.orion.ops.module.asset.handler.host.upload.FileUploadTasks;
import com.orion.ops.module.asset.handler.host.upload.dto.FileUploadFileItemDTO;
import com.orion.ops.module.asset.handler.host.upload.manager.FileUploadTaskManager;
import com.orion.ops.module.asset.handler.host.upload.task.IFileUploadTask;
import com.orion.ops.module.asset.handler.host.upload.uploader.IFileUploader;
import com.orion.ops.module.asset.service.AssetAuthorizedDataService;
import com.orion.ops.module.asset.service.UploadTaskFileService;
import com.orion.ops.module.asset.service.UploadTaskService;
@@ -41,8 +48,8 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
@@ -56,7 +63,7 @@ import java.util.stream.Collectors;
@Service
public class UploadTaskServiceImpl implements UploadTaskService {
private static final String SWAP_ENDPOINT = "/upload/swap/{}";
// TODO 测试空文件上传 0B
private static final String DEFAULT_DESC = "上传文件 {}";
@@ -75,6 +82,9 @@ public class UploadTaskServiceImpl implements UploadTaskService {
@Resource
private AssetAuthorizedDataService assetAuthorizedDataService;
@Resource
private FileUploadTaskManager fileUploadTaskManager;
@Resource
private FileClient localFileClient;
@@ -141,12 +151,44 @@ public class UploadTaskServiceImpl implements UploadTaskService {
// 查询任务
UploadTaskDO record = uploadTaskDAO.selectById(id);
Valid.notNull(record, ErrorMessage.DATA_ABSENT);
// 查询任务文件 TODO PROGRESS
// 查询任务文件
List<UploadTaskFileVO> files = uploadTaskFileService.getFileByTaskId(id);
// 获取上传任务进度
IFileUploadTask task = fileUploadTaskManager.getTask(id);
Map<Long, FileUploadFileItemDTO> fileItemMap;
if (task == null) {
fileItemMap = Maps.empty();
} else {
fileItemMap = task.getUploaderList()
.stream()
.map(IFileUploader::getFiles)
.flatMap(Collection::stream)
.collect(Collectors.toMap(FileUploadFileItemDTO::getId, Function.identity()));
}
// 设置进度
for (UploadTaskFileVO file : files) {
String status = file.getStatus();
if (UploadTaskFileStatusEnum.WAITING.name().equals(status)) {
file.setCurrent(0L);
} else if (UploadTaskFileStatusEnum.UPLOADING.name().equals(status)) {
// 上传中获取当前值
Long current = Optional.ofNullable(file.getId())
.map(fileItemMap::get)
.map(FileUploadFileItemDTO::getCurrent)
.orElse(file.getFileSize());
file.setCurrent(current);
} else if (UploadTaskFileStatusEnum.FINISHED.name().equals(status)) {
file.setCurrent(file.getFileSize());
} else if (UploadTaskFileStatusEnum.FAILED.name().equals(status)) {
file.setCurrent(0L);
} else if (UploadTaskFileStatusEnum.CANCELED.name().equals(status)) {
file.setCurrent(0L);
}
}
// 返回
UploadTaskVO task = UploadTaskConvert.MAPPER.to(record);
task.setFiles(files);
return task;
UploadTaskVO uploadTask = UploadTaskConvert.MAPPER.to(record);
uploadTask.setFiles(files);
return uploadTask;
}
@Override
@@ -185,8 +227,11 @@ public class UploadTaskServiceImpl implements UploadTaskService {
@Transactional(rollbackFor = Exception.class)
@Override
public Integer deleteUploadTaskByIdList(List<Long> idList) {
// TODO 暂停 / 删除交换区文件
log.info("UploadTaskService-deleteUploadTaskByIdList idList: {}", idList);
// 查询任务
List<UploadTaskDO> records = uploadTaskDAO.selectBatchIds(idList);
// 取消任务
this.checkCancelTask(records);
// 删除任务
int effect = uploadTaskDAO.deleteBatchIds(idList);
// 删除任务文件
@@ -199,12 +244,24 @@ public class UploadTaskServiceImpl implements UploadTaskService {
@Override
public void startUploadTask(Long id) {
// 查询任务
UploadTaskDO record = uploadTaskDAO.selectById(id);
Valid.notNull(record, ErrorMessage.TASK_ABSENT);
// 检查状态
Valid.eq(record.getStatus(), UploadTaskStatusEnum.PREPARATION.name(), ErrorMessage.ILLEGAL_STATUS);
// 执行任务
FileUploadTasks.start(id);
}
@Override
public void cancelUploadTask(Long id) {
// 查询任务
UploadTaskDO record = uploadTaskDAO.selectById(id);
Valid.notNull(record, ErrorMessage.TASK_ABSENT);
// 检查状态
Valid.isTrue(UploadTaskStatusEnum.of(record.getStatus()).isCancelable(), ErrorMessage.ILLEGAL_STATUS);
// 取消任务
this.checkCancelTask(Lists.singleton(record));
}
@Override
@@ -239,8 +296,8 @@ public class UploadTaskServiceImpl implements UploadTaskService {
.in(UploadTaskDO::getDescription, request.getDescription())
.eq(UploadTaskDO::getRemotePath, request.getRemotePath())
.eq(UploadTaskDO::getStatus, request.getStatus())
.ge(UploadTaskDO::getStartTime, Arrays1.getIfPresent(request.getStartTimeRange(), 0))
.le(UploadTaskDO::getStartTime, Arrays1.getIfPresent(request.getStartTimeRange(), 1))
.ge(UploadTaskDO::getCreateTime, Arrays1.getIfPresent(request.getCreateTimeRange(), 0))
.le(UploadTaskDO::getCreateTime, Arrays1.getIfPresent(request.getCreateTimeRange(), 1))
.orderByDesc(UploadTaskDO::getId);
}
@@ -257,4 +314,40 @@ public class UploadTaskServiceImpl implements UploadTaskService {
}
}
/**
* 检查需要取消的任务
*
* @param records records
*/
private void checkCancelTask(List<UploadTaskDO> records) {
// 需要取消的记录
List<UploadTaskDO> cancelableRecords = records.stream()
.filter(s -> UploadTaskStatusEnum.of(s.getStatus()).isCancelable())
.collect(Collectors.toList());
if (cancelableRecords.isEmpty()) {
return;
}
// 更新状态的 id
List<Long> updateIdList = cancelableRecords.stream()
.map(UploadTaskDO::getId)
.filter(s -> fileUploadTaskManager.getTask(s) == null)
.collect(Collectors.toList());
// 取消上传
cancelableRecords.stream()
.map(UploadTaskDO::getId)
.map(fileUploadTaskManager::getTask)
.filter(Objects::nonNull)
.forEach(IFileUploadTask::cancel);
// 更新状态
if (!updateIdList.isEmpty()) {
UploadTaskDO update = new UploadTaskDO();
update.setStatus(UploadTaskStatusEnum.CANCELED.name());
update.setEndTime(new Date());
// 更新
uploadTaskDAO.update(update, Conditions.in(UploadTaskDO::getId, updateIdList));
// 删除文件
this.clearUploadSwapFiles(updateIdList);
}
}
}

View File

@@ -0,0 +1,46 @@
package com.orion.ops.module.asset.utils;
import com.alibaba.fastjson.JSON;
import com.orion.lang.utils.Booleans;
import com.orion.lang.utils.Strings;
import com.orion.net.host.sftp.SftpExecutor;
import com.orion.net.host.sftp.SftpFile;
import com.orion.ops.module.asset.define.config.AppSftpConfig;
import com.orion.ops.module.asset.handler.host.transfer.model.SftpFileBackupParams;
/**
* sftp 工具类
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/5/8 16:17
*/
public class SftpUtils {
private SftpUtils() {
}
/**
* 检查上传文件是否存在 并且执行响应策略
*
* @param config config
* @param executor executor
* @param path path
*/
public static void checkUploadFilePresent(AppSftpConfig config, SftpExecutor executor, String path) {
// 重复不备份
if (!Booleans.isTrue(config.getUploadPresentBackup())) {
return;
}
// 检查文件是否存在
SftpFile file = executor.getFile(path);
if (file != null) {
// 文件存在则备份
SftpFileBackupParams backupParams = new SftpFileBackupParams(file.getName(), System.currentTimeMillis());
String target = Strings.format(config.getBackupFileName(), JSON.parseObject(JSON.toJSONString(backupParams)));
// 移动
executor.move(path, target);
}
}
}