优化批量上传逻辑.

This commit is contained in:
lijiahang
2024-09-26 17:38:39 +08:00
parent 011e8ad089
commit 541a9790ad
13 changed files with 212 additions and 88 deletions

View File

@@ -14,7 +14,7 @@ public interface AppConst extends OrionConst {
/**
* 同 ${orion.version} 迭代时候需要手动更改
*/
String VERSION = "2.1.6";
String VERSION = "2.1.7";
/**
* 同 ${spring.application.name}

View File

@@ -21,6 +21,8 @@ public interface ExtraFieldConst extends FieldConst {
String USERNAME = "username";
String HOME = "home";
String STATUS_NAME = "statusName";
String KEY_NAME = "keyName";

View File

@@ -1,6 +1,8 @@
package com.orion.visor.framework.common.utils;
import com.orion.lang.utils.Objects1;
import com.orion.lang.utils.Systems;
import com.orion.lang.utils.io.Files1;
import com.orion.visor.framework.common.constant.AppConst;
import com.orion.visor.framework.common.constant.Const;
@@ -24,9 +26,25 @@ public class PathUtils {
* @return 用户目录
*/
public static String getHomePath(boolean isWindows, String username) {
return getHomePath(isWindows, username, false);
}
/**
* 获取用户根目录
*
* @param isWindows isWindows
* @param username 用户名
* @param prependSeparator 是否在头部添加分隔符
* @return 用户目录
*/
public static String getHomePath(boolean isWindows, String username, boolean prependSeparator) {
if (isWindows) {
// windows
return "C:/Users/" + username;
if (prependSeparator) {
return "/C:/Users/" + username;
} else {
return "C:/Users/" + username;
}
} else {
// linux
if (Const.ROOT.equals(username)) {
@@ -66,4 +84,34 @@ public class PathUtils {
return path.toString();
}
/**
* 头部添加分隔符
*
* @param path path
* @return path
*/
public static String prependSeparator(String path) {
if (path.startsWith("/")) {
return path;
}
return "/" + path;
}
/**
* 获取 orion path
*
* @param path path
* @return path
*/
public static String getOrionPath(String path) {
path = Systems.HOME_DIR
+ Files1.SEPARATOR
+ AppConst.ORION
+ Files1.SEPARATOR
+ AppConst.APP_NAME
+ Files1.SEPARATOR
+ path;
return Files1.getPath(path);
}
}

View File

@@ -40,4 +40,6 @@ public interface HostConvert {
List<HostVO> toList(List<HostDO> domain);
List<HostBaseVO> toBaseList(List<HostDO> domain);
}

View File

@@ -48,6 +48,10 @@ public class UploadTaskFileDO extends BaseDO {
@TableField("file_path")
private String filePath;
@Schema(description = "实际文件路径")
@TableField("real_file_path")
private String realFilePath;
@Schema(description = "文件大小")
@TableField("file_size")
private Long fileSize;

View File

@@ -40,6 +40,9 @@ public class UploadTaskFileVO implements Serializable {
@Schema(description = "文件路径")
private String filePath;
@Schema(description = "实际文件路径")
private String realFilePath;
@Schema(description = "文件大小")
private Long fileSize;

View File

@@ -40,4 +40,24 @@ public enum HostSshOsTypeEnum {
return LINUX;
}
/**
* 是否为 linux 系统
*
* @param type type
* @return isLinux
*/
public static boolean isLinux(String type) {
return LINUX.name().equals(type);
}
/**
* 是否为 windows 系统
*
* @param type type
* @return isWindows
*/
public static boolean isWindows(String type) {
return WINDOWS.name().equals(type);
}
}

View File

@@ -17,6 +17,7 @@ import com.orion.net.host.ssh.command.CommandExecutor;
import com.orion.spring.SpringHolder;
import com.orion.visor.framework.common.constant.ErrorMessage;
import com.orion.visor.framework.common.file.FileClient;
import com.orion.visor.framework.common.utils.PathUtils;
import com.orion.visor.module.asset.dao.ExecHostLogDAO;
import com.orion.visor.module.asset.entity.domain.ExecHostLogDO;
import com.orion.visor.module.asset.entity.dto.HostTerminalConnectDTO;
@@ -159,11 +160,8 @@ public abstract class BaseExecCommandHandler implements IExecCommandHandler {
// 打开 sftp
sftpExecutor = sessionStore.getSftpExecutor(execHostCommand.getFileNameCharset());
sftpExecutor.connect();
// 必须要以 / 开头
String scriptPath = execHostCommand.getScriptPath();
if (!scriptPath.startsWith("/")) {
scriptPath = "/" + scriptPath;
}
// 文件上传必须要以 / 开头
String scriptPath = PathUtils.prependSeparator(execHostCommand.getScriptPath());
// 创建文件
sftpExecutor.touch(scriptPath);
// 写入命令
@@ -225,29 +223,33 @@ public abstract class BaseExecCommandHandler implements IExecCommandHandler {
Long id = execHostCommand.getHostLogId();
String statusName = status.name();
log.info("BaseExecCommandHandler.updateStatus start id: {}, status: {}", id, statusName);
updateRecord.setId(id);
updateRecord.setStatus(statusName);
if (ExecHostStatusEnum.RUNNING.equals(status)) {
// 运行中
updateRecord.setStartTime(new Date());
} else if (ExecHostStatusEnum.COMPLETED.equals(status)) {
// 完成
updateRecord.setFinishTime(new Date());
updateRecord.setExitCode(executor.getExitCode());
this.exitCode = executor.getExitCode();
} else if (ExecHostStatusEnum.FAILED.equals(status)) {
// 失败
updateRecord.setFinishTime(new Date());
updateRecord.setErrorMessage(this.getErrorMessage(ex));
} else if (ExecHostStatusEnum.TIMEOUT.equals(status)) {
// 超时
updateRecord.setFinishTime(new Date());
} else if (ExecHostStatusEnum.INTERRUPTED.equals(status)) {
// 中断
updateRecord.setFinishTime(new Date());
try {
updateRecord.setId(id);
updateRecord.setStatus(statusName);
if (ExecHostStatusEnum.RUNNING.equals(status)) {
// 运行中
updateRecord.setStartTime(new Date());
} else if (ExecHostStatusEnum.COMPLETED.equals(status)) {
// 完成
updateRecord.setFinishTime(new Date());
updateRecord.setExitCode(executor.getExitCode());
this.exitCode = executor.getExitCode();
} else if (ExecHostStatusEnum.FAILED.equals(status)) {
// 失败
updateRecord.setFinishTime(new Date());
updateRecord.setErrorMessage(this.getErrorMessage(ex));
} else if (ExecHostStatusEnum.TIMEOUT.equals(status)) {
// 超时
updateRecord.setFinishTime(new Date());
} else if (ExecHostStatusEnum.INTERRUPTED.equals(status)) {
// 中断
updateRecord.setFinishTime(new Date());
}
int effect = execHostLogDAO.updateById(updateRecord);
log.info("BaseExecCommandHandler.updateStatus finish id: {}, effect: {}", id, effect);
} catch (Exception e) {
log.error("BaseExecCommandHandler.updateStatus error id: {}", id, e);
}
int effect = execHostLogDAO.updateById(updateRecord);
log.info("BaseExecCommandHandler.updateStatus finish id: {}, effect: {}", id, effect);
}
@Override

View File

@@ -154,24 +154,28 @@ public class ExecTaskHandler implements IExecTaskHandler {
*/
private void updateStatus(ExecStatusEnum status) {
Long id = execCommand.getLogId();
String statusName = status.name();
log.info("ExecTaskHandler-updateStatus start id: {}, status: {}", id, statusName);
ExecLogDO update = new ExecLogDO();
update.setId(id);
update.setStatus(statusName);
if (ExecStatusEnum.RUNNING.equals(status)) {
// 执行中
this.startTime = new Date();
update.setStartTime(new Date());
} else if (ExecStatusEnum.COMPLETED.equals(status)) {
// 执行完成
update.setFinishTime(new Date());
} else if (ExecStatusEnum.FAILED.equals(status)) {
// 执行失败
update.setFinishTime(new Date());
try {
String statusName = status.name();
log.info("ExecTaskHandler-updateStatus start id: {}, status: {}", id, statusName);
ExecLogDO update = new ExecLogDO();
update.setId(id);
update.setStatus(statusName);
if (ExecStatusEnum.RUNNING.equals(status)) {
// 执行中
this.startTime = new Date();
update.setStartTime(new Date());
} else if (ExecStatusEnum.COMPLETED.equals(status)) {
// 执行完成
update.setFinishTime(new Date());
} else if (ExecStatusEnum.FAILED.equals(status)) {
// 执行失败
update.setFinishTime(new Date());
}
int effect = execLogDAO.updateById(update);
log.info("ExecTaskHandler-updateStatus finish id: {}, effect: {}", id, effect);
} catch (Exception e) {
log.error("ExecTaskHandler-updateStatus error id: {}", id, e);
}
int effect = execLogDAO.updateById(update);
log.info("ExecTaskHandler-updateStatus finish id: {}, effect: {}", id, effect);
}
/**

View File

@@ -1,11 +1,9 @@
package com.orion.visor.module.asset.handler.host.upload.task;
import com.orion.lang.utils.Threads;
import com.orion.lang.utils.io.Files1;
import com.orion.lang.utils.io.Streams;
import com.orion.lang.utils.time.Dates;
import com.orion.spring.SpringHolder;
import com.orion.visor.framework.common.constant.Const;
import com.orion.visor.framework.common.constant.ExtraFieldConst;
import com.orion.visor.module.asset.dao.UploadTaskDAO;
import com.orion.visor.module.asset.dao.UploadTaskFileDAO;
@@ -143,7 +141,7 @@ public class FileUploadTask implements IFileUploadTask {
.map(s -> FileUploadFileItemDTO.builder()
.id(s.getId())
.fileId(s.getFileId())
.remotePath(Files1.getPath(Const.SLASH + record.getRemotePath() + Const.SLASH + s.getFilePath()))
.remotePath(s.getRealFilePath())
.status(UploadTaskFileStatusEnum.WAITING.name())
.current(0L)
.build())

View File

@@ -1,8 +1,5 @@
package com.orion.visor.module.asset.handler.host.upload.uploader;
import com.orion.lang.utils.Strings;
import com.orion.lang.utils.collect.Maps;
import com.orion.lang.utils.io.Files1;
import com.orion.lang.utils.io.Streams;
import com.orion.net.host.SessionStore;
import com.orion.net.host.sftp.SftpExecutor;
@@ -10,12 +7,10 @@ import com.orion.spring.SpringHolder;
import com.orion.visor.framework.common.constant.Const;
import com.orion.visor.framework.common.enums.EndpointDefine;
import com.orion.visor.framework.common.file.FileClient;
import com.orion.visor.framework.common.utils.PathUtils;
import com.orion.visor.module.asset.dao.UploadTaskFileDAO;
import com.orion.visor.module.asset.define.config.AppSftpConfig;
import com.orion.visor.module.asset.entity.domain.UploadTaskFileDO;
import com.orion.visor.module.asset.entity.dto.HostTerminalConnectDTO;
import com.orion.visor.module.asset.enums.HostSshOsTypeEnum;
import com.orion.visor.module.asset.enums.UploadTaskFileStatusEnum;
import com.orion.visor.module.asset.handler.host.jsch.SessionStores;
import com.orion.visor.module.asset.handler.host.upload.model.FileUploadFileItemDTO;
@@ -28,7 +23,6 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
@@ -106,10 +100,8 @@ public class FileUploader implements IFileUploader {
private boolean initSession() {
log.info("HostFileUploader.initSession start taskId: {}, hostId: {}", taskId, hostId);
try {
// 替换用户路径
HostTerminalConnectDTO connectInfo = hostTerminalService.getTerminalConnectInfo(hostId);
this.replaceRemotePathVariable(connectInfo.getOsType(), connectInfo.getUsername());
// 打开会话
HostTerminalConnectDTO connectInfo = hostTerminalService.getTerminalConnectInfo(hostId);
this.sessionStore = SessionStores.openSessionStore(connectInfo);
this.executor = sessionStore.getSftpExecutor(connectInfo.getFileNameCharset());
executor.connect();
@@ -222,27 +214,6 @@ public class FileUploader implements IFileUploader {
uploadTaskFileDAO.updateById(update);
}
/**
* 替换文件路径变量
*
* @param osType osType
* @param username username
*/
private void replaceRemotePathVariable(String osType, String username) {
// 包含变量
if (!files.get(0).getRemotePath().contains(Const.DOLLAR)) {
return;
}
String home = PathUtils.getHomePath(HostSshOsTypeEnum.WINDOWS.name().equals(osType), username);
// 替换变量
Map<String, String> env = Maps.newMap(4);
env.put("username", username);
env.put("home", home);
for (FileUploadFileItemDTO file : files) {
file.setRemotePath(Files1.getPath(Strings.format(file.getRemotePath(), env)));
}
}
@Override
public void cancel() {
log.info("HostFileUploader.cancel taskId: {}, hostId: {}, canceled: {}, closed: {}", taskId, hostId, canceled, closed);

View File

@@ -3,6 +3,7 @@ package com.orion.visor.module.asset.service.impl;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.orion.lang.annotation.Keep;
import com.orion.lang.define.collect.MultiHashMap;
import com.orion.lang.define.wrapper.DataGrid;
import com.orion.lang.utils.Arrays1;
import com.orion.lang.utils.Booleans;
@@ -15,9 +16,11 @@ import com.orion.lang.utils.time.Dates;
import com.orion.visor.framework.biz.operator.log.core.utils.OperatorLogs;
import com.orion.visor.framework.common.constant.Const;
import com.orion.visor.framework.common.constant.ErrorMessage;
import com.orion.visor.framework.common.constant.ExtraFieldConst;
import com.orion.visor.framework.common.enums.EndpointDefine;
import com.orion.visor.framework.common.file.FileClient;
import com.orion.visor.framework.common.security.LoginUser;
import com.orion.visor.framework.common.utils.PathUtils;
import com.orion.visor.framework.common.utils.SqlUtils;
import com.orion.visor.framework.common.utils.Valid;
import com.orion.visor.framework.mybatis.core.query.Conditions;
@@ -28,20 +31,21 @@ import com.orion.visor.module.asset.convert.UploadTaskFileConvert;
import com.orion.visor.module.asset.dao.HostDAO;
import com.orion.visor.module.asset.dao.UploadTaskDAO;
import com.orion.visor.module.asset.dao.UploadTaskFileDAO;
import com.orion.visor.module.asset.entity.domain.HostDO;
import com.orion.visor.module.asset.entity.domain.UploadTaskDO;
import com.orion.visor.module.asset.entity.domain.UploadTaskFileDO;
import com.orion.visor.module.asset.entity.dto.UploadTaskExtraDTO;
import com.orion.visor.module.asset.entity.request.upload.*;
import com.orion.visor.module.asset.entity.vo.*;
import com.orion.visor.module.asset.enums.HostTypeEnum;
import com.orion.visor.module.asset.enums.UploadTaskFileStatusEnum;
import com.orion.visor.module.asset.enums.UploadTaskStatusEnum;
import com.orion.visor.module.asset.enums.*;
import com.orion.visor.module.asset.handler.host.config.model.HostSshConfigModel;
import com.orion.visor.module.asset.handler.host.upload.FileUploadTasks;
import com.orion.visor.module.asset.handler.host.upload.manager.FileUploadTaskManager;
import com.orion.visor.module.asset.handler.host.upload.model.FileUploadFileItemDTO;
import com.orion.visor.module.asset.handler.host.upload.task.IFileUploadTask;
import com.orion.visor.module.asset.handler.host.upload.uploader.IFileUploader;
import com.orion.visor.module.asset.service.AssetAuthorizedDataService;
import com.orion.visor.module.asset.service.HostConfigService;
import com.orion.visor.module.asset.service.UploadTaskFileService;
import com.orion.visor.module.asset.service.UploadTaskService;
import com.orion.visor.module.infra.api.FileUploadApi;
@@ -83,6 +87,9 @@ public class UploadTaskServiceImpl implements UploadTaskService {
@Resource
private AssetAuthorizedDataService assetAuthorizedDataService;
@Resource
private HostConfigService hostConfigService;
@Resource
private FileUploadTaskManager fileUploadTaskManager;
@@ -103,10 +110,9 @@ public class UploadTaskServiceImpl implements UploadTaskService {
// 检查主机是否有权限
this.checkHostPermission(hostIdList);
// 查询主机信息
List<HostBaseVO> hosts = hostDAO.selectBaseByIdList(hostIdList)
.stream()
.map(HostConvert.MAPPER::toBase)
.collect(Collectors.toList());
List<HostDO> hosts = this.getUploadTaskHosts(hostIdList);
// 计算文件路径
MultiHashMap<Long, String, String> realRemoteFilePathMap = this.setFileRealRemotePath(request, hosts);
// 转换
UploadTaskDO record = UploadTaskConvert.MAPPER.to(request);
record.setUserId(user.getId());
@@ -117,7 +123,7 @@ public class UploadTaskServiceImpl implements UploadTaskService {
record.setHostCount(hostIdList.size());
UploadTaskExtraDTO extra = UploadTaskExtraDTO.builder()
.hostIdList(hostIdList)
.hosts(hosts)
.hosts(HostConvert.MAPPER.toBaseList(hosts))
.build();
record.setExtraInfo(JSON.toJSONString(extra));
// 插入任务表
@@ -132,6 +138,7 @@ public class UploadTaskServiceImpl implements UploadTaskService {
.hostId(hostId)
.fileId(s.getFileId())
.filePath(s.getFilePath())
.realFilePath(realRemoteFilePathMap.get(hostId, s.getFileId()))
.fileSize(s.getFileSize())
.status(UploadTaskFileStatusEnum.WAITING.name())
.build())
@@ -336,6 +343,68 @@ public class UploadTaskServiceImpl implements UploadTaskService {
}
}
/**
* 查询上传任务主机信息
*
* @param hostIdList hostIdList
* @return hosts
*/
public List<HostDO> getUploadTaskHosts(List<Long> hostIdList) {
// 查询主机信息
List<HostDO> hosts = hostDAO.selectBatchIds(hostIdList);
// 检查主机数量
Valid.eq(hosts.size(), hostIdList.size(), ErrorMessage.HOST_ABSENT);
// 检查主机状态
boolean allEnabled = hosts.stream()
.map(HostDO::getStatus)
.allMatch(s -> HostStatusEnum.ENABLED.name().equals(s));
Valid.isTrue(allEnabled, ErrorMessage.HOST_NOT_ENABLED);
return hosts;
}
/**
* 设置文件实际路径
*
* @param request request
* @param hosts hosts
* @return realRemoteFilePathMap
*/
public MultiHashMap<Long, String, String> setFileRealRemotePath(UploadTaskCreateRequest request,
List<HostDO> hosts) {
MultiHashMap<Long, String, String> realRemoteFilePathMap = MultiHashMap.create();
// 计算上传目录
String remotePath = request.getRemotePath();
List<UploadTaskFileRequest> files = request.getFiles();
boolean containsEnv = remotePath.contains(Const.DOLLAR);
if (containsEnv) {
// 获取主机配置信息
Map<Long, HostSshConfigModel> hostConfigMap = hostConfigService.buildHostConfigMap(hosts, HostTypeEnum.SSH);
hostConfigMap.forEach((k, v) -> {
// 替换占位符
String username = v.getUsername();
String home = PathUtils.getHomePath(HostSshOsTypeEnum.isWindows(v.getOsType()), username);
// 替换环境变量路径
Map<String, String> env = Maps.newMap(4);
env.put(ExtraFieldConst.USERNAME, username);
env.put(ExtraFieldConst.HOME, home);
// 设置主机上传路径
String realRemotePath = Files1.getPath(Strings.format(remotePath, env));
for (UploadTaskFileRequest file : files) {
realRemoteFilePathMap.put(k, file.getFileId(), Files1.getPath(realRemotePath, file.getFilePath()));
}
});
} else {
// 无占位符
for (UploadTaskFileRequest file : files) {
String path = Files1.getPath(remotePath, file.getFilePath());
for (HostDO host : hosts) {
realRemoteFilePathMap.put(host.getId(), file.getFileId(), path);
}
}
}
return realRemoteFilePathMap;
}
/**
* 检查文件完整性
*

View File

@@ -9,6 +9,7 @@
<result column="host_id" property="hostId"/>
<result column="file_id" property="fileId"/>
<result column="file_path" property="filePath"/>
<result column="real_file_path" property="realFilePath"/>
<result column="file_size" property="fileSize"/>
<result column="status" property="status"/>
<result column="start_time" property="startTime"/>
@@ -22,7 +23,7 @@
<!-- 通用查询结果列 -->
<sql id="Base_Column_List">
id, task_id, host_id, file_id, file_path, file_size, status, start_time, end_time, create_time, update_time, creator, updater, deleted
id, task_id, host_id, file_id, file_path, real_file_path, file_size, status, start_time, end_time, create_time, update_time, creator, updater, deleted
</sql>
</mapper>