From 26172ea651100e4074877b0565f160120f2fa701 Mon Sep 17 00:00:00 2001 From: lijiahang Date: Wed, 8 May 2024 19:13:06 +0800 Subject: [PATCH] =?UTF-8?q?:hammer:=20=E6=89=B9=E9=87=8F=E4=B8=8A=E4=BC=A0?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/constant/ErrorMessage.java | 2 + .../asset/controller/ExecJobController.http | 4 +- .../module/asset/dao/UploadTaskFileDAO.java | 48 ++++ .../module/asset/define/AssetThreadPools.java | 24 ++ .../upload/UploadTaskQueryRequest.java | 4 +- .../asset/entity/vo/UploadTaskFileVO.java | 3 + .../asset/enums/UploadTaskFileStatusEnum.java | 5 + .../asset/enums/UploadTaskStatusEnum.java | 15 +- .../handler/BaseExecCommandHandler.java | 8 +- .../handler/TerminalCheckHandler.java | 3 +- .../host/terminal/session/SftpSession.java | 2 +- .../transfer/handler/TransferHandler.java | 3 +- .../transfer/session/TransferHostSession.java | 26 -- .../host/transfer/session/UploadSession.java | 3 +- .../handler/host/upload/FileUploadTasks.java | 27 ++ .../upload/dto/FileUploadFileItemDTO.java | 38 +++ .../upload/manager/FileUploadTaskManager.java | 49 ++++ .../host/upload/task/FileUploadTask.java | 181 +++++++++++++ .../host/upload/task/IFileUploadTask.java | 29 +++ .../host/upload/uploader/FileUploader.java | 244 ++++++++++++++++++ .../host/upload/uploader/IFileUploader.java | 29 +++ .../asset/service/HostTerminalService.java | 15 +- .../asset/service/UploadTaskService.java | 2 + .../service/impl/HostTerminalServiceImpl.java | 31 ++- .../impl/UploadTaskFileServiceImpl.java | 10 +- .../service/impl/UploadTaskServiceImpl.java | 117 ++++++++- .../ops/module/asset/utils/SftpUtils.java | 46 ++++ 27 files changed, 891 insertions(+), 77 deletions(-) create mode 100644 orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/upload/FileUploadTasks.java create mode 100644 orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/upload/dto/FileUploadFileItemDTO.java create mode 100644 orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/upload/manager/FileUploadTaskManager.java create mode 100644 orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/upload/task/FileUploadTask.java create mode 100644 orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/upload/task/IFileUploadTask.java create mode 100644 orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/upload/uploader/FileUploader.java create mode 100644 orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/upload/uploader/IFileUploader.java create mode 100644 orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/utils/SftpUtils.java diff --git a/orion-ops-framework/orion-ops-framework-common/src/main/java/com/orion/ops/framework/common/constant/ErrorMessage.java b/orion-ops-framework/orion-ops-framework-common/src/main/java/com/orion/ops/framework/common/constant/ErrorMessage.java index 3c56ac5b..07283454 100644 --- a/orion-ops-framework/orion-ops-framework-common/src/main/java/com/orion/ops/framework/common/constant/ErrorMessage.java +++ b/orion-ops-framework/orion-ops-framework-common/src/main/java/com/orion/ops/framework/common/constant/ErrorMessage.java @@ -87,6 +87,8 @@ public interface ErrorMessage { String LOG_ABSENT = "日志不存在"; + String TASK_ABSENT = "任务不存在"; + String ILLEGAL_STATUS = "当前状态不支持此操作"; String CHECK_AUTHORIZED_HOST = "请选择已授权的主机"; diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/controller/ExecJobController.http b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/controller/ExecJobController.http index 053e4425..42d16123 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/controller/ExecJobController.http +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/controller/ExecJobController.http @@ -10,7 +10,9 @@ Authorization: {{token}} "scriptExec": 0, "command": "echo 123", "parameterSchema": "[]", - "hostIdList": [1] + "hostIdList": [ + 1 + ] } diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/dao/UploadTaskFileDAO.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/dao/UploadTaskFileDAO.java index 344cb15c..7ce7b3f1 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/dao/UploadTaskFileDAO.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/dao/UploadTaskFileDAO.java @@ -1,9 +1,13 @@ package com.orion.ops.module.asset.dao; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.orion.ops.framework.mybatis.core.mapper.IMapper; +import com.orion.ops.framework.mybatis.core.query.Conditions; import com.orion.ops.module.asset.entity.domain.UploadTaskFileDO; import org.apache.ibatis.annotations.Mapper; +import java.util.List; + /** * 上传任务文件 Mapper 接口 * @@ -14,4 +18,48 @@ import org.apache.ibatis.annotations.Mapper; @Mapper public interface UploadTaskFileDAO extends IMapper { + /** + * 通过 taskId 查询 + * + * @param taskId taskId + * @return files + */ + default List selectByTaskId(Long taskId) { + return this.selectList(Conditions.eq(UploadTaskFileDO::getTaskId, taskId)); + } + + /** + * 通过 taskId hostId 更新状态 + * + * @param taskId taskId + * @param hostId hostId + * @param status status + * @return effect + */ + default int updateStatusByTaskHostId(Long taskId, Long hostId, String status) { + // 条件 + LambdaQueryWrapper wrapper = this.wrapper() + .eq(UploadTaskFileDO::getTaskId, taskId) + .eq(UploadTaskFileDO::getHostId, hostId); + // 修改值 + UploadTaskFileDO update = new UploadTaskFileDO(); + update.setStatus(status); + // 更新 + return this.update(update, wrapper); + } + + /** + * 通过 id 更新状态 + * + * @param idList idList + * @param status status + * @return effect + */ + default int updateStatusByIdList(List idList, String status) { + UploadTaskFileDO update = new UploadTaskFileDO(); + update.setStatus(status); + // 更新 + return this.update(update, Conditions.in(UploadTaskFileDO::getId, idList)); + } + } diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/define/AssetThreadPools.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/define/AssetThreadPools.java index d4e4030f..e299a7a0 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/define/AssetThreadPools.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/define/AssetThreadPools.java @@ -87,4 +87,28 @@ public interface AssetThreadPools { .allowCoreThreadTimeout(true) .build(); + /** + * 批量上传任务线程池 + */ + ThreadPoolExecutor UPLOAD_TASK = ExecutorBuilder.create() + .namedThreadFactory("upload-task-") + .corePoolSize(1) + .maxPoolSize(Integer.MAX_VALUE) + .keepAliveTime(Const.MS_S_60) + .workQueue(new SynchronousQueue<>()) + .allowCoreThreadTimeout(true) + .build(); + + /** + * 批量上传主机线程池 + */ + ThreadPoolExecutor UPLOAD_HOST = ExecutorBuilder.create() + .namedThreadFactory("upload-host-") + .corePoolSize(1) + .maxPoolSize(Integer.MAX_VALUE) + .keepAliveTime(Const.MS_S_60) + .workQueue(new SynchronousQueue<>()) + .allowCoreThreadTimeout(true) + .build(); + } diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/entity/request/upload/UploadTaskQueryRequest.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/entity/request/upload/UploadTaskQueryRequest.java index 0978c6fd..2c87ca8d 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/entity/request/upload/UploadTaskQueryRequest.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/entity/request/upload/UploadTaskQueryRequest.java @@ -40,8 +40,8 @@ public class UploadTaskQueryRequest extends PageRequest { @Schema(description = "状态") private String status; - @Schema(description = "开始时间-区间") + @Schema(description = "创建时间-区间") @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") - private Date[] startTimeRange; + private Date[] createTimeRange; } diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/entity/vo/UploadTaskFileVO.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/entity/vo/UploadTaskFileVO.java index ce39459e..2282903a 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/entity/vo/UploadTaskFileVO.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/entity/vo/UploadTaskFileVO.java @@ -52,4 +52,7 @@ public class UploadTaskFileVO implements Serializable { @Schema(description = "结束时间") private Date endTime; + @Schema(description = "传输进度") + private Long current; + } diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/enums/UploadTaskFileStatusEnum.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/enums/UploadTaskFileStatusEnum.java index 8b63d344..94387de7 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/enums/UploadTaskFileStatusEnum.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/enums/UploadTaskFileStatusEnum.java @@ -24,6 +24,11 @@ public enum UploadTaskFileStatusEnum { */ FINISHED, + /** + * 已失败 + */ + FAILED, + /** * 已取消 */ diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/enums/UploadTaskStatusEnum.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/enums/UploadTaskStatusEnum.java index 90e5b00e..1f78bb48 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/enums/UploadTaskStatusEnum.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/enums/UploadTaskStatusEnum.java @@ -1,5 +1,8 @@ package com.orion.ops.module.asset.enums; +import lombok.AllArgsConstructor; +import lombok.Getter; + /** * 上传任务状态 * @@ -7,30 +10,34 @@ package com.orion.ops.module.asset.enums; * @version 1.0.0 * @since 2024/5/7 22:21 */ +@Getter +@AllArgsConstructor public enum UploadTaskStatusEnum { /** * 准备中 */ - PREPARATION, + PREPARATION(true), /** * 上传中 */ - UPLOADING, + UPLOADING(true), /** * 已完成 */ - FINISHED, + FINISHED(false), /** * 已取消 */ - CANCELED, + CANCELED(false), ; + private final boolean cancelable; + public static UploadTaskStatusEnum of(String status) { if (status == null) { return null; diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/exec/command/handler/BaseExecCommandHandler.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/exec/command/handler/BaseExecCommandHandler.java index 86f3187f..dde24a38 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/exec/command/handler/BaseExecCommandHandler.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/exec/command/handler/BaseExecCommandHandler.java @@ -42,13 +42,13 @@ import java.util.concurrent.TimeUnit; @Slf4j public abstract class BaseExecCommandHandler implements IExecCommandHandler { - private final FileClient fileClient = SpringHolder.getBean("logsFileClient"); + private static final FileClient fileClient = SpringHolder.getBean("logsFileClient"); - private final ExecLogManager execLogManager = SpringHolder.getBean(ExecLogManager.class); + private static final ExecLogManager execLogManager = SpringHolder.getBean(ExecLogManager.class); - private final HostTerminalService hostTerminalService = SpringHolder.getBean(HostTerminalService.class); + private static final HostTerminalService hostTerminalService = SpringHolder.getBean(HostTerminalService.class); - private final ExecHostLogDAO execHostLogDAO = SpringHolder.getBean(ExecHostLogDAO.class); + private static final ExecHostLogDAO execHostLogDAO = SpringHolder.getBean(ExecHostLogDAO.class); protected final ExecCommandDTO execCommand; diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/handler/TerminalCheckHandler.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/handler/TerminalCheckHandler.java index c9ada7e6..851ef690 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/handler/TerminalCheckHandler.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/terminal/handler/TerminalCheckHandler.java @@ -79,7 +79,8 @@ public class TerminalCheckHandler extends AbstractTerminalHandler tasks = new ConcurrentHashMap<>(); + + /** + * 添加任务 + * + * @param id id + * @param task task + */ + public void addTask(Long id, IFileUploadTask task) { + tasks.put(id, task); + } + + /** + * 移除任务 + * + * @param id id + */ + public void removeTask(Long id) { + tasks.remove(id); + } + + /** + * 获取任务 + * + * @param id id + * @return task + */ + public IFileUploadTask getTask(Long id) { + return tasks.get(id); + } + +} diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/upload/task/FileUploadTask.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/upload/task/FileUploadTask.java new file mode 100644 index 00000000..a07149ab --- /dev/null +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/upload/task/FileUploadTask.java @@ -0,0 +1,181 @@ +package com.orion.ops.module.asset.handler.host.upload.task; + +import com.orion.lang.utils.Threads; +import com.orion.lang.utils.io.Streams; +import com.orion.ops.framework.common.constant.Const; +import com.orion.ops.module.asset.dao.UploadTaskDAO; +import com.orion.ops.module.asset.dao.UploadTaskFileDAO; +import com.orion.ops.module.asset.define.AssetThreadPools; +import com.orion.ops.module.asset.entity.domain.UploadTaskDO; +import com.orion.ops.module.asset.entity.domain.UploadTaskFileDO; +import com.orion.ops.module.asset.enums.UploadTaskFileStatusEnum; +import com.orion.ops.module.asset.enums.UploadTaskStatusEnum; +import com.orion.ops.module.asset.handler.host.upload.dto.FileUploadFileItemDTO; +import com.orion.ops.module.asset.handler.host.upload.manager.FileUploadTaskManager; +import com.orion.ops.module.asset.handler.host.upload.uploader.FileUploader; +import com.orion.ops.module.asset.handler.host.upload.uploader.IFileUploader; +import com.orion.ops.module.asset.service.UploadTaskService; +import com.orion.spring.SpringHolder; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * 上传任务 实现类 + * + * @author Jiahang Li + * @version 1.0.0 + * @since 2024/5/8 13:38 + */ +@Slf4j +public class FileUploadTask implements IFileUploadTask { + + private static final UploadTaskDAO uploadTaskDAO = SpringHolder.getBean(UploadTaskDAO.class); + + private static final UploadTaskFileDAO uploadTaskFileDAO = SpringHolder.getBean(UploadTaskFileDAO.class); + + private static final UploadTaskService uploadTaskService = SpringHolder.getBean(UploadTaskService.class); + + private static final FileUploadTaskManager fileUploadTaskManager = SpringHolder.getBean(FileUploadTaskManager.class); + + private final Long id; + + @Getter + private final List uploaderList; + + private UploadTaskDO record; + + private volatile boolean canceled; + + private volatile boolean closed; + + public FileUploadTask(Long id) { + this.id = id; + this.uploaderList = new ArrayList<>(); + } + + @Override + public void run() { + log.info("FileUploadTask.run start id: {}", id); + // 查询任务 + this.record = uploadTaskDAO.selectById(id); + if (record == null) { + return; + } + // 检查任务状态 非准备中则取消执行 + if (!UploadTaskStatusEnum.PREPARATION.name().equals(record.getStatus())) { + return; + } + try { + // 添加任务 + fileUploadTaskManager.addTask(id, this); + // 修改状态 + this.updateStatus(UploadTaskStatusEnum.UPLOADING); + // 创建文件上传器 + this.createFileUploader(); + // 执行上传 + this.runUpload(); + log.info("FileUploadTask.run finish id: {}", id); + } catch (Exception e) { + log.error("FileUploadTask.run error id: {}", id, e); + } finally { + // 修改状态 + if (canceled) { + this.updateStatus(UploadTaskStatusEnum.CANCELED); + } else { + this.updateStatus(UploadTaskStatusEnum.FINISHED); + } + // 移除任务 + fileUploadTaskManager.removeTask(id); + // 释放资源 + this.close(); + } + } + + @Override + public void cancel() { + log.info("FileUploadTask.cancel id: {}, canceled: {}, closed: {}", id, canceled, closed); + if (this.canceled || this.closed) { + return; + } + // 关闭 + this.canceled = true; + uploaderList.forEach(IFileUploader::cancel); + this.close(); + } + + @Override + public void close() { + log.info("FileUploadTask.close id: {}, canceled: {}, closed: {}", id, canceled, closed); + if (closed) { + return; + } + this.closed = true; + // 删除临时文件 + uploadTaskService.clearUploadSwapFiles(id); + // 关闭 + uploaderList.forEach(Streams::close); + } + + /** + * 创建文件上传器 + */ + private void createFileUploader() { + // 查询文件 + List uploadFiles = uploadTaskFileDAO.selectByTaskId(id); + Map> hostFileGroup = uploadFiles.stream() + .collect(Collectors.groupingBy(UploadTaskFileDO::getHostId)); + hostFileGroup.forEach((k, v) -> { + // 设置上传的文件 + List files = v.stream() + .map(s -> FileUploadFileItemDTO.builder() + .id(s.getId()) + .remotePath(record.getRemotePath() + Const.SLASH + s.getFilePath()) + .status(UploadTaskFileStatusEnum.WAITING.name()) + .current(0L) + .build()) + .collect(Collectors.toList()); + // 添加到上传器 + uploaderList.add(new FileUploader(id, k, files)); + }); + } + + /** + * 执行上传 + */ + private void runUpload() throws Exception { + if (uploaderList.size() == 1) { + // 单个主机直接执行 + IFileUploader handler = uploaderList.get(0); + handler.run(); + } else { + // 多个主机异步阻塞执行 + Threads.blockRun(uploaderList, AssetThreadPools.UPLOAD_HOST); + } + } + + /** + * 更新状态 + * + * @param status status + */ + private void updateStatus(UploadTaskStatusEnum status) { + UploadTaskDO update = new UploadTaskDO(); + update.setId(id); + update.setStatus(status.name()); + if (UploadTaskStatusEnum.UPLOADING.equals(status)) { + update.setStartTime(new Date()); + } else if (UploadTaskStatusEnum.FINISHED.equals(status)) { + update.setEndTime(new Date()); + } else if (UploadTaskStatusEnum.CANCELED.equals(status)) { + update.setEndTime(new Date()); + } + uploadTaskDAO.updateById(update); + } + +} diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/upload/task/IFileUploadTask.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/upload/task/IFileUploadTask.java new file mode 100644 index 00000000..9baa58ba --- /dev/null +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/upload/task/IFileUploadTask.java @@ -0,0 +1,29 @@ +package com.orion.ops.module.asset.handler.host.upload.task; + +import com.orion.lang.able.SafeCloseable; +import com.orion.ops.module.asset.handler.host.upload.uploader.IFileUploader; + +import java.util.List; + +/** + * 上传任务 + * + * @author Jiahang Li + * @version 1.0.0 + * @since 2024/5/8 12:28 + */ +public interface IFileUploadTask extends Runnable, SafeCloseable { + + /** + * 取消上传 + */ + void cancel(); + + /** + * 获取上传器 + * + * @return uploader + */ + List getUploaderList(); + +} diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/upload/uploader/FileUploader.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/upload/uploader/FileUploader.java new file mode 100644 index 00000000..693b3a86 --- /dev/null +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/upload/uploader/FileUploader.java @@ -0,0 +1,244 @@ +package com.orion.ops.module.asset.handler.host.upload.uploader; + +import com.orion.lang.utils.Strings; +import com.orion.lang.utils.io.Streams; +import com.orion.net.host.SessionStore; +import com.orion.net.host.sftp.SftpExecutor; +import com.orion.ops.framework.common.constant.Const; +import com.orion.ops.framework.common.file.FileClient; +import com.orion.ops.module.asset.dao.UploadTaskFileDAO; +import com.orion.ops.module.asset.define.config.AppSftpConfig; +import com.orion.ops.module.asset.entity.domain.UploadTaskFileDO; +import com.orion.ops.module.asset.entity.dto.HostTerminalConnectDTO; +import com.orion.ops.module.asset.enums.UploadTaskFileStatusEnum; +import com.orion.ops.module.asset.handler.host.upload.dto.FileUploadFileItemDTO; +import com.orion.ops.module.asset.service.HostTerminalService; +import com.orion.ops.module.asset.service.UploadTaskService; +import com.orion.ops.module.asset.utils.SftpUtils; +import com.orion.spring.SpringHolder; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Date; +import java.util.List; +import java.util.stream.Collectors; + +/** + * 主机文件上传器 实现类 + * + * @author Jiahang Li + * @version 1.0.0 + * @since 2024/5/8 13:41 + */ +@Slf4j +public class FileUploader implements IFileUploader { + + private static final HostTerminalService hostTerminalService = SpringHolder.getBean(HostTerminalService.class); + + private static final UploadTaskFileDAO uploadTaskFileDAO = SpringHolder.getBean(UploadTaskFileDAO.class); + + private static final AppSftpConfig SFTP_CONFIG = SpringHolder.getBean(AppSftpConfig.class); + + private static final FileClient localFileClient = SpringHolder.getBean("localFileClient"); + + private SessionStore sessionStore; + + private SftpExecutor executor; + + private final Long taskId; + + private final Long hostId; + + @Getter + private final List files; + + private InputStream inputStream; + + private OutputStream outputStream; + + private volatile boolean canceled; + + private volatile boolean closed; + + public FileUploader(Long taskId, Long hostId, List files) { + this.taskId = taskId; + this.hostId = hostId; + this.files = files; + } + + @Override + public void run() { + try { + // 初始化会话 + boolean run = this.initSession(); + if (!run) { + return; + } + // 上传文件 + for (FileUploadFileItemDTO file : files) { + if (closed) { + break; + } + // 上传 + this.uploadFile(file); + } + // 检查是否取消 + this.finishCheckCancel(); + } finally { + // 释放资源 + this.close(); + } + } + + /** + * 初始化会话 + * + * @return 是否执行 + */ + private boolean initSession() { + log.info("HostFileUploader.initSession start taskId: {}, hostId: {}", taskId, hostId); + try { + // TODO 测试 打开 executor 后 是否会connect, 不需要的话就关闭 executor 然后重新打开 + // 打开会话 + HostTerminalConnectDTO connectInfo = hostTerminalService.getTerminalConnectInfo(hostId); + this.sessionStore = hostTerminalService.openSessionStore(connectInfo); + this.executor = sessionStore.getSftpExecutor(connectInfo.getFileNameCharset()); + executor.connect(); + log.info("HostFileUploader.initSession success taskId: {}, hostId: {}", taskId, hostId); + return true; + } catch (Exception e) { + log.error("HostFileUploader.initSession error taskId: {}, hostId: {}", taskId, hostId, e); + // 修改状态 + uploadTaskFileDAO.updateStatusByTaskHostId(taskId, hostId, UploadTaskFileStatusEnum.FAILED.name()); + files.forEach(s -> s.setStatus(UploadTaskFileStatusEnum.FAILED.name())); + return false; + } + } + + /** + * 上传文件 + * + * @param file file + */ + private void uploadFile(FileUploadFileItemDTO file) { + log.info("HostFileUploader.uploadFile start taskId: {}, hostId: {}, id: {}", taskId, hostId, file.getId()); + // 修改状态 + this.updateStatus(file, UploadTaskFileStatusEnum.UPLOADING); + try { + // 获取本地文件路径 + String endpoint = Strings.format(UploadTaskService.SWAP_ENDPOINT, taskId); + String localPath = localFileClient.getReturnPath(endpoint + Const.SLASH + file.getFileId()); + // 检查文件是否存在 + String remotePath = file.getRemotePath(); + SftpUtils.checkUploadFilePresent(SFTP_CONFIG, executor, remotePath); + // 打开输出流 + this.inputStream = localFileClient.getContentInputStream(localPath); + this.outputStream = executor.openOutputStream(remotePath); + // 传输 + byte[] buffer = new byte[executor.getBufferSize()]; + int read; + while ((read = inputStream.read(buffer)) != -1) { + outputStream.write(buffer, 0, read); + // FIXME test + file.setCurrent(file.getCurrent() + read); + } + outputStream.flush(); + // 修改状态 + this.updateStatus(file, UploadTaskFileStatusEnum.FINISHED); + log.info("HostFileUploader.uploadFile finish taskId: {}, hostId: {}, id: {}", taskId, hostId, file.getId()); + } catch (Exception e) { + log.info("HostFileUploader.uploadFile error taskId: {}, hostId: {}, id: {}, canceled: {}", taskId, hostId, file.getId(), canceled); + // 修改状态 + if (canceled) { + this.updateStatus(file, UploadTaskFileStatusEnum.CANCELED); + } else { + this.updateStatus(file, UploadTaskFileStatusEnum.FAILED); + } + } finally { + // 释放文件 + this.resetFile(); + } + } + + /** + * 释放文件 + */ + private void resetFile() { + Streams.close(outputStream); + Streams.close(inputStream); + } + + /** + * 检查是否取消 + */ + private void finishCheckCancel() { + if (!canceled) { + return; + } + // 将等待中的文件修改为已取消 + List idList = files.stream() + .filter(s -> UploadTaskFileStatusEnum.WAITING.name().equals(s.getStatus())) + .map(FileUploadFileItemDTO::getId) + .collect(Collectors.toList()); + if (idList.isEmpty()) { + return; + } + // 修改状态 + uploadTaskFileDAO.updateStatusByIdList(idList, UploadTaskFileStatusEnum.CANCELED.name()); + } + + /** + * 更新状态 + * + * @param file file + * @param status status + */ + private void updateStatus(FileUploadFileItemDTO file, UploadTaskFileStatusEnum status) { + file.setStatus(status.name()); + UploadTaskFileDO update = new UploadTaskFileDO(); + update.setId(file.getId()); + update.setStatus(status.name()); + if (UploadTaskFileStatusEnum.UPLOADING.equals(status)) { + // 上传中 + update.setStartTime(new Date()); + } else if (UploadTaskFileStatusEnum.FINISHED.equals(status)) { + // 已完成 + update.setEndTime(new Date()); + } else if (UploadTaskFileStatusEnum.FAILED.equals(status)) { + // 已失败 + update.setEndTime(new Date()); + } else if (UploadTaskFileStatusEnum.CANCELED.equals(status)) { + // 已失败 + update.setEndTime(new Date()); + } + uploadTaskFileDAO.updateById(update); + } + + @Override + public void cancel() { + log.info("HostFileUploader.cancel taskId: {}, hostId: {}, canceled: {}, closed: {}", taskId, hostId, canceled, closed); + if (this.canceled || this.closed) { + return; + } + // 关闭 + this.canceled = true; + this.close(); + } + + @Override + public void close() { + log.info("HostFileUploader.close taskId: {}, hostId: {}, closed: {}", taskId, hostId, closed); + if (closed) { + return; + } + this.closed = true; + // 释放资源 + Streams.close(outputStream); + Streams.close(inputStream); + Streams.close(executor); + Streams.close(sessionStore); + } + +} diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/upload/uploader/IFileUploader.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/upload/uploader/IFileUploader.java new file mode 100644 index 00000000..a1390158 --- /dev/null +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/handler/host/upload/uploader/IFileUploader.java @@ -0,0 +1,29 @@ +package com.orion.ops.module.asset.handler.host.upload.uploader; + +import com.orion.lang.able.SafeCloseable; +import com.orion.ops.module.asset.handler.host.upload.dto.FileUploadFileItemDTO; + +import java.util.List; + +/** + * 主机文件上传器 + * + * @author Jiahang Li + * @version 1.0.0 + * @since 2024/5/8 13:41 + */ +public interface IFileUploader extends Runnable, SafeCloseable { + + /** + * 取消上传 + */ + void cancel(); + + /** + * 获取传输文件 + * + * @return files + */ + List getFiles(); + +} diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/service/HostTerminalService.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/service/HostTerminalService.java index 2fb76acf..5be516b3 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/service/HostTerminalService.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/service/HostTerminalService.java @@ -5,7 +5,6 @@ import com.orion.net.host.SessionStore; import com.orion.ops.module.asset.entity.domain.HostDO; import com.orion.ops.module.asset.entity.dto.HostTerminalAccessDTO; import com.orion.ops.module.asset.entity.dto.HostTerminalConnectDTO; -import com.orion.ops.module.asset.enums.HostConnectTypeEnum; /** * 主机终端服务 @@ -38,25 +37,31 @@ public interface HostTerminalService { */ HostTerminalAccessDTO getAccessInfoByToken(String token); + /** + * 获取连接信息 + * + * @param hostId hostId + * @return session + */ + HostTerminalConnectDTO getTerminalConnectInfo(Long hostId); + /** * 使用用户配置获取连接信息 * * @param hostId hostId * @param userId userId - * @param type type * @return session */ - HostTerminalConnectDTO getTerminalConnectInfo(Long userId, Long hostId, HostConnectTypeEnum type); + HostTerminalConnectDTO getTerminalConnectInfo(Long userId, Long hostId); /** * 使用用户配置获取连接信息 * * @param host host * @param userId userId - * @param type type * @return session */ - HostTerminalConnectDTO getTerminalConnectInfo(Long userId, HostDO host, HostConnectTypeEnum type); + HostTerminalConnectDTO getTerminalConnectInfo(Long userId, HostDO host); /** * 使用默认配置打开主机会话 diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/service/UploadTaskService.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/service/UploadTaskService.java index d0049eb1..d652bf5d 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/service/UploadTaskService.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/service/UploadTaskService.java @@ -17,6 +17,8 @@ import java.util.List; */ public interface UploadTaskService { + String SWAP_ENDPOINT = "/upload/swap/{}"; + /** * 创建上传任务 * diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/service/impl/HostTerminalServiceImpl.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/service/impl/HostTerminalServiceImpl.java index 5f4f5a26..e6b94524 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/service/impl/HostTerminalServiceImpl.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/service/impl/HostTerminalServiceImpl.java @@ -113,15 +113,28 @@ public class HostTerminalServiceImpl implements HostTerminalService { } @Override - public HostTerminalConnectDTO getTerminalConnectInfo(Long userId, Long hostId, HostConnectTypeEnum type) { + public HostTerminalConnectDTO getTerminalConnectInfo(Long hostId) { + log.info("HostConnectService.getTerminalConnectInfo-withHost hostId: {}", hostId); // 查询主机 HostDO host = hostDAO.selectById(hostId); Valid.notNull(host, ErrorMessage.HOST_ABSENT); - return this.getTerminalConnectInfo(userId, host, type); + // 查询主机配置 + HostSshConfigModel model = hostConfigService.getHostConfig(hostId, HostConfigTypeEnum.SSH); + Valid.notNull(model, ErrorMessage.CONFIG_ABSENT); + // 获取配置 + return this.getHostConnectInfo(host, model, null); } @Override - public HostTerminalConnectDTO getTerminalConnectInfo(Long userId, HostDO host, HostConnectTypeEnum type) { + public HostTerminalConnectDTO getTerminalConnectInfo(Long userId, Long hostId) { + // 查询主机 + HostDO host = hostDAO.selectById(hostId); + Valid.notNull(host, ErrorMessage.HOST_ABSENT); + return this.getTerminalConnectInfo(userId, host); + } + + @Override + public HostTerminalConnectDTO getTerminalConnectInfo(Long userId, HostDO host) { Long hostId = host.getId(); log.info("HostConnectService.getTerminalConnectInfo hostId: {}, userId: {}", hostId, userId); // 验证主机是否有权限 @@ -149,22 +162,14 @@ public class HostTerminalServiceImpl implements HostTerminalService { } } // 获取连接配置 - HostTerminalConnectDTO connectInfo = this.getHostConnectInfo(host, config, extra); - connectInfo.setConnectType(type.name()); - return connectInfo; + return this.getHostConnectInfo(host, config, extra); } @Override public SessionStore openSessionStore(Long hostId) { log.info("HostConnectService.openSessionStore-withHost hostId: {}", hostId); - // 查询主机 - HostDO host = hostDAO.selectById(hostId); - Valid.notNull(host, ErrorMessage.HOST_ABSENT); - // 查询主机配置 - HostSshConfigModel model = hostConfigService.getHostConfig(hostId, HostConfigTypeEnum.SSH); - Valid.notNull(model, ErrorMessage.CONFIG_ABSENT); // 获取配置 - HostTerminalConnectDTO connect = this.getHostConnectInfo(host, model, null); + HostTerminalConnectDTO connect = this.getTerminalConnectInfo(hostId); // 打开连接 return this.openSessionStore(connect); } diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/service/impl/UploadTaskFileServiceImpl.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/service/impl/UploadTaskFileServiceImpl.java index f98ce2be..bb8de5b8 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/service/impl/UploadTaskFileServiceImpl.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/service/impl/UploadTaskFileServiceImpl.java @@ -11,6 +11,7 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.List; +import java.util.stream.Collectors; /** * 上传任务文件 服务实现类 @@ -29,11 +30,10 @@ public class UploadTaskFileServiceImpl implements UploadTaskFileService { @Override public List getFileByTaskId(Long taskId) { // 查询 - return uploadTaskFileDAO.of() - .createWrapper() - .eq(UploadTaskFileDO::getTaskId, taskId) - .then() - .list(UploadTaskFileConvert.MAPPER::to); + return uploadTaskFileDAO.selectByTaskId(taskId) + .stream() + .map(UploadTaskFileConvert.MAPPER::to) + .collect(Collectors.toList()); } @Override diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/service/impl/UploadTaskServiceImpl.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/service/impl/UploadTaskServiceImpl.java index 68c49e4d..f4f7e409 100644 --- a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/service/impl/UploadTaskServiceImpl.java +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/service/impl/UploadTaskServiceImpl.java @@ -6,6 +6,7 @@ import com.orion.lang.define.wrapper.DataGrid; import com.orion.lang.utils.Arrays1; import com.orion.lang.utils.Strings; import com.orion.lang.utils.collect.Lists; +import com.orion.lang.utils.collect.Maps; import com.orion.lang.utils.io.Files1; import com.orion.lang.utils.time.Dates; import com.orion.ops.framework.biz.operator.log.core.utils.OperatorLogs; @@ -13,6 +14,7 @@ import com.orion.ops.framework.common.constant.ErrorMessage; import com.orion.ops.framework.common.file.FileClient; import com.orion.ops.framework.common.security.LoginUser; import com.orion.ops.framework.common.utils.Valid; +import com.orion.ops.framework.mybatis.core.query.Conditions; import com.orion.ops.framework.security.core.utils.SecurityUtils; import com.orion.ops.module.asset.convert.HostConvert; import com.orion.ops.module.asset.convert.UploadTaskConvert; @@ -32,6 +34,11 @@ import com.orion.ops.module.asset.entity.vo.UploadTaskVO; import com.orion.ops.module.asset.enums.HostConfigTypeEnum; import com.orion.ops.module.asset.enums.UploadTaskFileStatusEnum; import com.orion.ops.module.asset.enums.UploadTaskStatusEnum; +import com.orion.ops.module.asset.handler.host.upload.FileUploadTasks; +import com.orion.ops.module.asset.handler.host.upload.dto.FileUploadFileItemDTO; +import com.orion.ops.module.asset.handler.host.upload.manager.FileUploadTaskManager; +import com.orion.ops.module.asset.handler.host.upload.task.IFileUploadTask; +import com.orion.ops.module.asset.handler.host.upload.uploader.IFileUploader; import com.orion.ops.module.asset.service.AssetAuthorizedDataService; import com.orion.ops.module.asset.service.UploadTaskFileService; import com.orion.ops.module.asset.service.UploadTaskService; @@ -41,8 +48,8 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; -import java.util.ArrayList; -import java.util.List; +import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -56,7 +63,7 @@ import java.util.stream.Collectors; @Service public class UploadTaskServiceImpl implements UploadTaskService { - private static final String SWAP_ENDPOINT = "/upload/swap/{}"; + // TODO 测试空文件上传 0B private static final String DEFAULT_DESC = "上传文件 {}"; @@ -75,6 +82,9 @@ public class UploadTaskServiceImpl implements UploadTaskService { @Resource private AssetAuthorizedDataService assetAuthorizedDataService; + @Resource + private FileUploadTaskManager fileUploadTaskManager; + @Resource private FileClient localFileClient; @@ -141,12 +151,44 @@ public class UploadTaskServiceImpl implements UploadTaskService { // 查询任务 UploadTaskDO record = uploadTaskDAO.selectById(id); Valid.notNull(record, ErrorMessage.DATA_ABSENT); - // 查询任务文件 TODO PROGRESS + // 查询任务文件 List files = uploadTaskFileService.getFileByTaskId(id); + // 获取上传任务进度 + IFileUploadTask task = fileUploadTaskManager.getTask(id); + Map fileItemMap; + if (task == null) { + fileItemMap = Maps.empty(); + } else { + fileItemMap = task.getUploaderList() + .stream() + .map(IFileUploader::getFiles) + .flatMap(Collection::stream) + .collect(Collectors.toMap(FileUploadFileItemDTO::getId, Function.identity())); + } + // 设置进度 + for (UploadTaskFileVO file : files) { + String status = file.getStatus(); + if (UploadTaskFileStatusEnum.WAITING.name().equals(status)) { + file.setCurrent(0L); + } else if (UploadTaskFileStatusEnum.UPLOADING.name().equals(status)) { + // 上传中获取当前值 + Long current = Optional.ofNullable(file.getId()) + .map(fileItemMap::get) + .map(FileUploadFileItemDTO::getCurrent) + .orElse(file.getFileSize()); + file.setCurrent(current); + } else if (UploadTaskFileStatusEnum.FINISHED.name().equals(status)) { + file.setCurrent(file.getFileSize()); + } else if (UploadTaskFileStatusEnum.FAILED.name().equals(status)) { + file.setCurrent(0L); + } else if (UploadTaskFileStatusEnum.CANCELED.name().equals(status)) { + file.setCurrent(0L); + } + } // 返回 - UploadTaskVO task = UploadTaskConvert.MAPPER.to(record); - task.setFiles(files); - return task; + UploadTaskVO uploadTask = UploadTaskConvert.MAPPER.to(record); + uploadTask.setFiles(files); + return uploadTask; } @Override @@ -185,8 +227,11 @@ public class UploadTaskServiceImpl implements UploadTaskService { @Transactional(rollbackFor = Exception.class) @Override public Integer deleteUploadTaskByIdList(List idList) { - // TODO 暂停 / 删除交换区文件 log.info("UploadTaskService-deleteUploadTaskByIdList idList: {}", idList); + // 查询任务 + List records = uploadTaskDAO.selectBatchIds(idList); + // 取消任务 + this.checkCancelTask(records); // 删除任务 int effect = uploadTaskDAO.deleteBatchIds(idList); // 删除任务文件 @@ -199,12 +244,24 @@ public class UploadTaskServiceImpl implements UploadTaskService { @Override public void startUploadTask(Long id) { - + // 查询任务 + UploadTaskDO record = uploadTaskDAO.selectById(id); + Valid.notNull(record, ErrorMessage.TASK_ABSENT); + // 检查状态 + Valid.eq(record.getStatus(), UploadTaskStatusEnum.PREPARATION.name(), ErrorMessage.ILLEGAL_STATUS); + // 执行任务 + FileUploadTasks.start(id); } @Override public void cancelUploadTask(Long id) { - + // 查询任务 + UploadTaskDO record = uploadTaskDAO.selectById(id); + Valid.notNull(record, ErrorMessage.TASK_ABSENT); + // 检查状态 + Valid.isTrue(UploadTaskStatusEnum.of(record.getStatus()).isCancelable(), ErrorMessage.ILLEGAL_STATUS); + // 取消任务 + this.checkCancelTask(Lists.singleton(record)); } @Override @@ -239,8 +296,8 @@ public class UploadTaskServiceImpl implements UploadTaskService { .in(UploadTaskDO::getDescription, request.getDescription()) .eq(UploadTaskDO::getRemotePath, request.getRemotePath()) .eq(UploadTaskDO::getStatus, request.getStatus()) - .ge(UploadTaskDO::getStartTime, Arrays1.getIfPresent(request.getStartTimeRange(), 0)) - .le(UploadTaskDO::getStartTime, Arrays1.getIfPresent(request.getStartTimeRange(), 1)) + .ge(UploadTaskDO::getCreateTime, Arrays1.getIfPresent(request.getCreateTimeRange(), 0)) + .le(UploadTaskDO::getCreateTime, Arrays1.getIfPresent(request.getCreateTimeRange(), 1)) .orderByDesc(UploadTaskDO::getId); } @@ -257,4 +314,40 @@ public class UploadTaskServiceImpl implements UploadTaskService { } } + /** + * 检查需要取消的任务 + * + * @param records records + */ + private void checkCancelTask(List records) { + // 需要取消的记录 + List cancelableRecords = records.stream() + .filter(s -> UploadTaskStatusEnum.of(s.getStatus()).isCancelable()) + .collect(Collectors.toList()); + if (cancelableRecords.isEmpty()) { + return; + } + // 更新状态的 id + List updateIdList = cancelableRecords.stream() + .map(UploadTaskDO::getId) + .filter(s -> fileUploadTaskManager.getTask(s) == null) + .collect(Collectors.toList()); + // 取消上传 + cancelableRecords.stream() + .map(UploadTaskDO::getId) + .map(fileUploadTaskManager::getTask) + .filter(Objects::nonNull) + .forEach(IFileUploadTask::cancel); + // 更新状态 + if (!updateIdList.isEmpty()) { + UploadTaskDO update = new UploadTaskDO(); + update.setStatus(UploadTaskStatusEnum.CANCELED.name()); + update.setEndTime(new Date()); + // 更新 + uploadTaskDAO.update(update, Conditions.in(UploadTaskDO::getId, updateIdList)); + // 删除文件 + this.clearUploadSwapFiles(updateIdList); + } + } + } diff --git a/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/utils/SftpUtils.java b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/utils/SftpUtils.java new file mode 100644 index 00000000..8856ce82 --- /dev/null +++ b/orion-ops-module-asset/orion-ops-module-asset-service/src/main/java/com/orion/ops/module/asset/utils/SftpUtils.java @@ -0,0 +1,46 @@ +package com.orion.ops.module.asset.utils; + +import com.alibaba.fastjson.JSON; +import com.orion.lang.utils.Booleans; +import com.orion.lang.utils.Strings; +import com.orion.net.host.sftp.SftpExecutor; +import com.orion.net.host.sftp.SftpFile; +import com.orion.ops.module.asset.define.config.AppSftpConfig; +import com.orion.ops.module.asset.handler.host.transfer.model.SftpFileBackupParams; + +/** + * sftp 工具类 + * + * @author Jiahang Li + * @version 1.0.0 + * @since 2024/5/8 16:17 + */ +public class SftpUtils { + + private SftpUtils() { + } + + /** + * 检查上传文件是否存在 并且执行响应策略 + * + * @param config config + * @param executor executor + * @param path path + */ + public static void checkUploadFilePresent(AppSftpConfig config, SftpExecutor executor, String path) { + // 重复不备份 + if (!Booleans.isTrue(config.getUploadPresentBackup())) { + return; + } + // 检查文件是否存在 + SftpFile file = executor.getFile(path); + if (file != null) { + // 文件存在则备份 + SftpFileBackupParams backupParams = new SftpFileBackupParams(file.getName(), System.currentTimeMillis()); + String target = Strings.format(config.getBackupFileName(), JSON.parseObject(JSON.toJSONString(backupParams))); + // 移动 + executor.move(path, target); + } + } + +}