✨ 批量执行命令.
This commit is contained in:
@@ -120,4 +120,20 @@ public interface FileClient {
|
||||
*/
|
||||
OutputStream getContentOutputStream(String path, boolean append) throws Exception;
|
||||
|
||||
/**
|
||||
* 获取返回路径 用于客户端返回
|
||||
*
|
||||
* @param path path
|
||||
* @return returnPath
|
||||
*/
|
||||
String getReturnPath(String path);
|
||||
|
||||
/**
|
||||
* 获取实际存储路径 用于服务端的存储
|
||||
*
|
||||
* @param returnPath returnPath
|
||||
* @return absolutePath
|
||||
*/
|
||||
String getAbsolutePath(String returnPath);
|
||||
|
||||
}
|
||||
|
||||
@@ -152,6 +152,26 @@ public class FileClientUtils {
|
||||
return delegate.getContentOutputStream(path, append);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取返回路径 用于客户端返回
|
||||
*
|
||||
* @param path path
|
||||
* @return returnPath
|
||||
*/
|
||||
public static String getReturnPath(String path) {
|
||||
return delegate.getReturnPath(path);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取实际存储路径 用于服务端的存储
|
||||
*
|
||||
* @param returnPath returnPath
|
||||
* @return absolutePath
|
||||
*/
|
||||
public static String getAbsolutePath(String returnPath) {
|
||||
return delegate.getAbsolutePath(returnPath);
|
||||
}
|
||||
|
||||
public static void setDelegate(FileClient delegate) {
|
||||
if (FileClientUtils.delegate != null) {
|
||||
// unmodified
|
||||
|
||||
@@ -80,22 +80,6 @@ public abstract class AbstractFileClient<Config extends FileClientConfig> implem
|
||||
*/
|
||||
protected abstract String doUpload(String path, InputStream in, boolean autoClose, boolean overrideIfExist) throws Exception;
|
||||
|
||||
/**
|
||||
* 获取返回路径 用于客户端返回
|
||||
*
|
||||
* @param path path
|
||||
* @return returnPath
|
||||
*/
|
||||
protected abstract String getReturnPath(String path);
|
||||
|
||||
/**
|
||||
* 获取实际存储路径 用于服务端的存储
|
||||
*
|
||||
* @param returnPath returnPath
|
||||
* @return absolutePath
|
||||
*/
|
||||
protected abstract String getAbsolutePath(String returnPath);
|
||||
|
||||
/**
|
||||
* 获取文件路径 拼接前缀
|
||||
*
|
||||
|
||||
@@ -72,6 +72,16 @@ public class PrimaryFileClient implements FileClient {
|
||||
return delegate.getContentOutputStream(path, append);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getReturnPath(String path) {
|
||||
return delegate.getReturnPath(path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getAbsolutePath(String returnPath) {
|
||||
return delegate.getAbsolutePath(returnPath);
|
||||
}
|
||||
|
||||
public static void setDelegate(FileClient delegate) {
|
||||
if (PrimaryFileClient.delegate != null) {
|
||||
// unmodified
|
||||
|
||||
@@ -57,13 +57,13 @@ public class LocalFileClient extends AbstractFileClient<LocalFileClientConfig> {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getReturnPath(String path) {
|
||||
public String getReturnPath(String path) {
|
||||
// 拼接公共路径
|
||||
return Files1.getPath(config.getBasePath() + Const.SLASH + this.getFilePath(path));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getAbsolutePath(String returnPath) {
|
||||
public String getAbsolutePath(String returnPath) {
|
||||
// 拼接存储路径
|
||||
return Files1.getPath(config.getStoragePath() + Const.SLASH + returnPath);
|
||||
}
|
||||
|
||||
@@ -1,5 +1,15 @@
|
||||
### 批量执行
|
||||
POST {{baseUrl}}/asset/exec/start
|
||||
Content-Type: application/json
|
||||
Authorization: {{token}}
|
||||
|
||||
{
|
||||
"description": 1,
|
||||
"timeout": 10,
|
||||
"command": "echo @{{ hostAddress }}\nsleep 10\necho @{{ str }}",
|
||||
"parameter": "{\"str\":\"end\"}",
|
||||
"hostIdList": [1,7]
|
||||
}
|
||||
|
||||
|
||||
###
|
||||
|
||||
@@ -13,9 +13,10 @@ Authorization: {{token}}
|
||||
"limit": 10,
|
||||
"id": "",
|
||||
"userId": "",
|
||||
"username": "",
|
||||
"source": "",
|
||||
"sourceId": "",
|
||||
"desc": "",
|
||||
"description": "",
|
||||
"command": "",
|
||||
"status": ""
|
||||
}
|
||||
|
||||
@@ -15,6 +15,18 @@ import java.util.concurrent.ThreadPoolExecutor;
|
||||
*/
|
||||
public interface AssetThreadPools {
|
||||
|
||||
/**
|
||||
* 超时检查线程池
|
||||
*/
|
||||
ThreadPoolExecutor TIMEOUT_CHECK = ExecutorBuilder.create()
|
||||
.namedThreadFactory("timeout-check-")
|
||||
.corePoolSize(1)
|
||||
.maxPoolSize(Integer.MAX_VALUE)
|
||||
.keepAliveTime(Const.MS_S_60)
|
||||
.workQueue(new SynchronousQueue<>())
|
||||
.allowCoreThreadTimeout(true)
|
||||
.build();
|
||||
|
||||
/**
|
||||
* terminal 标准输出线程池
|
||||
*/
|
||||
@@ -39,4 +51,28 @@ public interface AssetThreadPools {
|
||||
.allowCoreThreadTimeout(true)
|
||||
.build();
|
||||
|
||||
/**
|
||||
* 批量执行任务线程池
|
||||
*/
|
||||
ThreadPoolExecutor EXEC_TASK = ExecutorBuilder.create()
|
||||
.namedThreadFactory("exec-task-")
|
||||
.corePoolSize(1)
|
||||
.maxPoolSize(Integer.MAX_VALUE)
|
||||
.keepAliveTime(Const.MS_S_60)
|
||||
.workQueue(new SynchronousQueue<>())
|
||||
.allowCoreThreadTimeout(true)
|
||||
.build();
|
||||
|
||||
/**
|
||||
* 批量执行主机命令线程池
|
||||
*/
|
||||
ThreadPoolExecutor EXEC_HOST = ExecutorBuilder.create()
|
||||
.namedThreadFactory("exec-host-")
|
||||
.corePoolSize(1)
|
||||
.maxPoolSize(Integer.MAX_VALUE)
|
||||
.keepAliveTime(Const.MS_S_60)
|
||||
.workQueue(new SynchronousQueue<>())
|
||||
.allowCoreThreadTimeout(true)
|
||||
.build();
|
||||
|
||||
}
|
||||
|
||||
@@ -64,6 +64,10 @@ public class ExecHostLogDO extends BaseDO {
|
||||
@TableField("log_path")
|
||||
private String logPath;
|
||||
|
||||
@Schema(description = "错误信息")
|
||||
@TableField("error_message")
|
||||
private String errorMessage;
|
||||
|
||||
@Schema(description = "执行开始时间")
|
||||
@TableField("start_time")
|
||||
private Date startTime;
|
||||
|
||||
@@ -36,6 +36,10 @@ public class ExecLogDO extends BaseDO {
|
||||
@TableField("user_id")
|
||||
private Long userId;
|
||||
|
||||
@Schema(description = "执行用户名")
|
||||
@TableField("username")
|
||||
private String username;
|
||||
|
||||
@Schema(description = "执行来源")
|
||||
@TableField("source")
|
||||
private String source;
|
||||
@@ -45,13 +49,17 @@ public class ExecLogDO extends BaseDO {
|
||||
private Long sourceId;
|
||||
|
||||
@Schema(description = "执行描述")
|
||||
@TableField("desc")
|
||||
private String desc;
|
||||
@TableField("description")
|
||||
private String description;
|
||||
|
||||
@Schema(description = "执行命令")
|
||||
@TableField("command")
|
||||
private String command;
|
||||
|
||||
@Schema(description = "超时时间")
|
||||
@TableField("timeout")
|
||||
private Integer timeout;
|
||||
|
||||
@Schema(description = "执行状态")
|
||||
@TableField("status")
|
||||
private String status;
|
||||
|
||||
@@ -29,6 +29,9 @@ public class ExecLogQueryRequest extends PageRequest {
|
||||
@Schema(description = "执行用户id")
|
||||
private Long userId;
|
||||
|
||||
@Schema(description = "执行用户名")
|
||||
private String username;
|
||||
|
||||
@Size(max = 12)
|
||||
@Schema(description = "执行来源")
|
||||
private String source;
|
||||
@@ -38,7 +41,7 @@ public class ExecLogQueryRequest extends PageRequest {
|
||||
|
||||
@Size(max = 128)
|
||||
@Schema(description = "执行描述")
|
||||
private String desc;
|
||||
private String description;
|
||||
|
||||
@Schema(description = "执行命令")
|
||||
private String command;
|
||||
|
||||
@@ -29,7 +29,11 @@ public class ExecRequest extends PageRequest {
|
||||
|
||||
@Size(max = 128)
|
||||
@Schema(description = "执行描述")
|
||||
private String desc;
|
||||
private String description;
|
||||
|
||||
@NonNull
|
||||
@Schema(description = "超时时间")
|
||||
private Integer timeout;
|
||||
|
||||
@NotBlank
|
||||
@Schema(description = "执行命令")
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
package com.orion.ops.module.asset.entity.vo;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.*;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.*;
|
||||
import java.math.*;
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* 批量执行主机日志 视图响应对象
|
||||
@@ -50,6 +52,9 @@ public class ExecHostLogVO implements Serializable {
|
||||
@Schema(description = "日志路径")
|
||||
private String logPath;
|
||||
|
||||
@Schema(description = "错误信息")
|
||||
private String errorMessage;
|
||||
|
||||
@Schema(description = "执行开始时间")
|
||||
private Date startTime;
|
||||
|
||||
|
||||
@@ -31,6 +31,9 @@ public class ExecLogVO implements Serializable {
|
||||
@Schema(description = "执行用户id")
|
||||
private Long userId;
|
||||
|
||||
@Schema(description = "执行用户名")
|
||||
private String username;
|
||||
|
||||
@Schema(description = "执行来源")
|
||||
private String source;
|
||||
|
||||
@@ -38,11 +41,14 @@ public class ExecLogVO implements Serializable {
|
||||
private Long sourceId;
|
||||
|
||||
@Schema(description = "执行描述")
|
||||
private String desc;
|
||||
private String description;
|
||||
|
||||
@Schema(description = "执行命令")
|
||||
private String command;
|
||||
|
||||
@Schema(description = "超时时间")
|
||||
private Integer timeout;
|
||||
|
||||
@Schema(description = "执行状态")
|
||||
private String status;
|
||||
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
package com.orion.ops.module.asset.handler.host.exec;
|
||||
|
||||
import com.orion.ops.module.asset.define.AssetThreadPools;
|
||||
import com.orion.ops.module.asset.handler.host.exec.dto.ExecCommandDTO;
|
||||
import com.orion.ops.module.asset.handler.host.exec.handler.ExecTaskHandler;
|
||||
|
||||
/**
|
||||
* 批量执行命令执行器
|
||||
*
|
||||
* @author Jiahang Li
|
||||
* @version 1.0.0
|
||||
* @since 2024/3/12 11:10
|
||||
*/
|
||||
public class ExecTaskExecutors {
|
||||
|
||||
/**
|
||||
* 执行命令
|
||||
*
|
||||
* @param command command
|
||||
*/
|
||||
public static void start(ExecCommandDTO command) {
|
||||
AssetThreadPools.EXEC_TASK.execute(new ExecTaskHandler(command));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
package com.orion.ops.module.asset.entity.dto;
|
||||
package com.orion.ops.module.asset.handler.host.exec.dto;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.AllArgsConstructor;
|
||||
@@ -19,13 +19,16 @@ import java.util.List;
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Schema(name = "ExecStartDTO", description = "批量执行启动对象")
|
||||
public class ExecStartDTO {
|
||||
@Schema(name = "ExecCommandDTO", description = "批量执行启动对象")
|
||||
public class ExecCommandDTO {
|
||||
|
||||
@Schema(description = "hostId")
|
||||
private Long logId;
|
||||
|
||||
@Schema(description = "超时时间")
|
||||
private Integer timeout;
|
||||
|
||||
@Schema(description = "主机")
|
||||
private List<ExecStartHostDTO> hosts;
|
||||
private List<ExecCommandHostDTO> hosts;
|
||||
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
package com.orion.ops.module.asset.entity.dto;
|
||||
package com.orion.ops.module.asset.handler.host.exec.dto;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.AllArgsConstructor;
|
||||
@@ -17,8 +17,8 @@ import lombok.NoArgsConstructor;
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Schema(name = "ExecStartHostDTO", description = "批量执行启动主机对象")
|
||||
public class ExecStartHostDTO {
|
||||
@Schema(name = "ExecCommandHostDTO", description = "批量执行启动主机对象")
|
||||
public class ExecCommandHostDTO {
|
||||
|
||||
@Schema(description = "hostLogId")
|
||||
private Long hostLogId;
|
||||
@@ -32,4 +32,7 @@ public class ExecStartHostDTO {
|
||||
@Schema(description = "执行命令")
|
||||
private String command;
|
||||
|
||||
@Schema(description = "超时时间")
|
||||
private Integer timeout;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,189 @@
|
||||
package com.orion.ops.module.asset.handler.host.exec.handler;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.orion.lang.exception.AuthenticationException;
|
||||
import com.orion.lang.exception.argument.InvalidArgumentException;
|
||||
import com.orion.lang.support.timeout.TimeoutChecker;
|
||||
import com.orion.lang.utils.Strings;
|
||||
import com.orion.lang.utils.io.Streams;
|
||||
import com.orion.net.host.SessionStore;
|
||||
import com.orion.net.host.ssh.command.CommandExecutor;
|
||||
import com.orion.ops.framework.common.file.FileClient;
|
||||
import com.orion.ops.module.asset.dao.ExecHostLogDAO;
|
||||
import com.orion.ops.module.asset.entity.domain.ExecHostLogDO;
|
||||
import com.orion.ops.module.asset.enums.ExecHostStatusEnum;
|
||||
import com.orion.ops.module.asset.handler.host.exec.dto.ExecCommandHostDTO;
|
||||
import com.orion.ops.module.asset.service.HostTerminalService;
|
||||
import com.orion.spring.SpringHolder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Date;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* 命令执行器
|
||||
*
|
||||
* @author Jiahang Li
|
||||
* @version 1.0.0
|
||||
* @since 2024/3/12 11:30
|
||||
*/
|
||||
@Slf4j
|
||||
public class ExecCommandHandler implements IExecCommandHandler {
|
||||
|
||||
private final FileClient fileClient = SpringHolder.getBean("logsFileClient");
|
||||
|
||||
private final HostTerminalService hostTerminalService = SpringHolder.getBean(HostTerminalService.class);
|
||||
|
||||
private final ExecHostLogDAO execHostLogDAO = SpringHolder.getBean(ExecHostLogDAO.class);
|
||||
|
||||
private final ExecCommandHostDTO command;
|
||||
|
||||
private final TimeoutChecker timeoutChecker;
|
||||
|
||||
private SessionStore sessionStore;
|
||||
|
||||
private CommandExecutor executor;
|
||||
|
||||
private OutputStream logOutputStream;
|
||||
|
||||
private volatile boolean closed;
|
||||
|
||||
private volatile boolean interrupted;
|
||||
|
||||
public ExecCommandHandler(ExecCommandHostDTO command, TimeoutChecker timeoutChecker) {
|
||||
this.command = command;
|
||||
this.timeoutChecker = timeoutChecker;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
Long id = command.getHostLogId();
|
||||
log.info("ExecCommandHandler run start id: {}, info: {}", id, JSON.toJSONString(command));
|
||||
try {
|
||||
// 更新状态
|
||||
this.updateStatus(ExecHostStatusEnum.RUNNING, null);
|
||||
// 执行命令
|
||||
this.execCommand();
|
||||
if (executor.isTimeout()) {
|
||||
// 更新状态
|
||||
this.updateStatus(ExecHostStatusEnum.FAILED, new TimeoutException());
|
||||
} else {
|
||||
// 更新状态
|
||||
this.updateStatus(ExecHostStatusEnum.COMPLETED, null);
|
||||
}
|
||||
log.info("ExecCommandHandler run complete id: {}", id);
|
||||
} catch (Exception e) {
|
||||
log.error("ExecCommandHandler run error id: {}", id, e);
|
||||
// TODO
|
||||
if (this.interrupted) {
|
||||
this.updateStatus(ExecHostStatusEnum.INTERRUPTED, null);
|
||||
} else {
|
||||
this.updateStatus(ExecHostStatusEnum.FAILED, e);
|
||||
}
|
||||
} finally {
|
||||
Streams.close(this);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行命令
|
||||
*
|
||||
* @throws IOException IOException
|
||||
*/
|
||||
private void execCommand() throws Exception {
|
||||
// 打开日志流
|
||||
this.logOutputStream = fileClient.getContentOutputStream(command.getLogPath());
|
||||
// 打开会话
|
||||
this.sessionStore = hostTerminalService.openSessionStore(command.getHostId());
|
||||
this.executor = sessionStore.getCommandExecutor(Strings.replaceCRLF(command.getCommand()));
|
||||
// TODO 超时
|
||||
// 执行命令
|
||||
executor.timeout(command.getTimeout(), TimeUnit.SECONDS, timeoutChecker);
|
||||
executor.merge();
|
||||
executor.transfer(logOutputStream);
|
||||
executor.connect();
|
||||
executor.exec();
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新状态
|
||||
*
|
||||
* @param status status
|
||||
* @param ex ex
|
||||
*/
|
||||
private void updateStatus(ExecHostStatusEnum status, Exception ex) {
|
||||
Long id = command.getHostLogId();
|
||||
String statusName = status.name();
|
||||
log.info("ExecCommandHandler.updateStatus id: {}, status: {}", id, statusName);
|
||||
ExecHostLogDO update = new ExecHostLogDO();
|
||||
update.setId(id);
|
||||
update.setStatus(statusName);
|
||||
if (ExecHostStatusEnum.RUNNING.equals(status)) {
|
||||
// 运行中
|
||||
update.setStartTime(new Date());
|
||||
} else if (ExecHostStatusEnum.COMPLETED.equals(status)) {
|
||||
// 完成
|
||||
update.setFinishTime(new Date());
|
||||
update.setExitStatus(executor.getExitCode());
|
||||
} else if (ExecHostStatusEnum.FAILED.equals(status)) {
|
||||
// 失败
|
||||
update.setFinishTime(new Date());
|
||||
update.setErrorMessage(this.getErrorMessage(ex));
|
||||
} else if (ExecHostStatusEnum.INTERRUPTED.equals(status)) {
|
||||
// 中断
|
||||
update.setFinishTime(new Date());
|
||||
}
|
||||
execHostLogDAO.updateById(update);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(String msg) {
|
||||
this.executor.write(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void interrupted() {
|
||||
if (this.interrupted || this.closed) {
|
||||
return;
|
||||
}
|
||||
// 关闭
|
||||
this.interrupted = true;
|
||||
this.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取错误信息
|
||||
*
|
||||
* @param ex ex
|
||||
* @return errorMessage
|
||||
*/
|
||||
private String getErrorMessage(Exception ex) {
|
||||
String message;
|
||||
if (ex instanceof TimeoutException) {
|
||||
message = "执行超时";
|
||||
} else if (ex instanceof InvalidArgumentException) {
|
||||
message = ex.getMessage();
|
||||
} else if (ex instanceof AuthenticationException) {
|
||||
message = "认证失败";
|
||||
} else {
|
||||
message = "执行失败";
|
||||
}
|
||||
return Strings.retain(message, 250);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
this.closed = true;
|
||||
Streams.close(executor);
|
||||
Streams.close(sessionStore);
|
||||
Streams.close(logOutputStream);
|
||||
// TODO 关闭日志
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,120 @@
|
||||
package com.orion.ops.module.asset.handler.host.exec.handler;
|
||||
|
||||
import com.orion.lang.support.timeout.TimeoutChecker;
|
||||
import com.orion.lang.utils.Threads;
|
||||
import com.orion.lang.utils.collect.Lists;
|
||||
import com.orion.lang.utils.io.Streams;
|
||||
import com.orion.ops.framework.common.constant.Const;
|
||||
import com.orion.ops.module.asset.dao.ExecLogDAO;
|
||||
import com.orion.ops.module.asset.define.AssetThreadPools;
|
||||
import com.orion.ops.module.asset.entity.domain.ExecLogDO;
|
||||
import com.orion.ops.module.asset.enums.ExecStatusEnum;
|
||||
import com.orion.ops.module.asset.handler.host.exec.dto.ExecCommandDTO;
|
||||
import com.orion.ops.module.asset.handler.host.exec.dto.ExecCommandHostDTO;
|
||||
import com.orion.spring.SpringHolder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 命令执行任务
|
||||
*
|
||||
* @author Jiahang Li
|
||||
* @version 1.0.0
|
||||
* @since 2024/3/12 11:43
|
||||
*/
|
||||
@Slf4j
|
||||
public class ExecTaskHandler implements IExecTaskHandler {
|
||||
|
||||
private static final ExecLogDAO execLogDAO = SpringHolder.getBean(ExecLogDAO.class);
|
||||
|
||||
private final ExecCommandDTO command;
|
||||
|
||||
private TimeoutChecker timeoutChecker;
|
||||
|
||||
private List<ExecCommandHandler> handlers;
|
||||
|
||||
public ExecTaskHandler(ExecCommandDTO command) {
|
||||
this.command = command;
|
||||
this.handlers = Lists.newList();
|
||||
}
|
||||
|
||||
// TODO manager
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
Long id = command.getLogId();
|
||||
log.info("ExecTaskHandler.run start id: {}", id);
|
||||
try {
|
||||
// TODO 添加
|
||||
// 更新状态
|
||||
this.updateStatus(ExecStatusEnum.RUNNING);
|
||||
// 执行命令
|
||||
this.runHostCommand(command.getHosts());
|
||||
// 更新状态-执行完成
|
||||
log.info("ExecTaskHandler.run completed id: {}", id);
|
||||
this.updateStatus(ExecStatusEnum.COMPLETED);
|
||||
} catch (Exception e) {
|
||||
// 更新状态-执行失败
|
||||
this.updateStatus(ExecStatusEnum.FAILED);
|
||||
log.error("ExecTaskHandler.run error id: {}", id, e);
|
||||
// TODO 移除
|
||||
} finally {
|
||||
this.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行主机命令
|
||||
*
|
||||
* @param hosts hosts
|
||||
* @throws Exception Exception
|
||||
*/
|
||||
private void runHostCommand(List<ExecCommandHostDTO> hosts) throws Exception {
|
||||
// 超时检查
|
||||
if (command.getTimeout() != 0) {
|
||||
this.timeoutChecker = TimeoutChecker.create(Const.MS_S_1);
|
||||
AssetThreadPools.TIMEOUT_CHECK.execute(this.timeoutChecker);
|
||||
}
|
||||
if (hosts.size() == 1) {
|
||||
// 单个主机直接执行
|
||||
new ExecCommandHandler(hosts.get(0), timeoutChecker).run();
|
||||
} else {
|
||||
this.handlers = hosts.stream()
|
||||
.map(s -> new ExecCommandHandler(s, timeoutChecker))
|
||||
.collect(Collectors.toList());
|
||||
// 多个主机异步阻塞执行
|
||||
Threads.blockRun(handlers, AssetThreadPools.EXEC_HOST);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private Integer updateStatus(ExecStatusEnum status) {
|
||||
Long id = command.getLogId();
|
||||
String statusName = status.name();
|
||||
log.info("ExecTaskHandler-updateStatus id: {}, status: {}", id, statusName);
|
||||
ExecLogDO update = new ExecLogDO();
|
||||
update.setId(id);
|
||||
update.setStatus(statusName);
|
||||
if (ExecStatusEnum.RUNNING.equals(status)) {
|
||||
// 执行中
|
||||
update.setStartTime(new Date());
|
||||
} else if (ExecStatusEnum.COMPLETED.equals(status)) {
|
||||
// 执行完成
|
||||
update.setFinishTime(new Date());
|
||||
} else if (ExecStatusEnum.FAILED.equals(status)) {
|
||||
// 执行失败
|
||||
update.setFinishTime(new Date());
|
||||
}
|
||||
return execLogDAO.updateById(update);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
Streams.close(timeoutChecker);
|
||||
this.handlers.forEach(Streams::close);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
package com.orion.ops.module.asset.handler.host.exec.handler;
|
||||
|
||||
import com.orion.lang.able.SafeCloseable;
|
||||
|
||||
/**
|
||||
* 命令执行器定义
|
||||
*
|
||||
* @author Jiahang Li
|
||||
* @version 1.0.0
|
||||
* @since 2024/3/12 11:31
|
||||
*/
|
||||
public interface IExecCommandHandler extends Runnable, SafeCloseable {
|
||||
|
||||
/**
|
||||
* 写入
|
||||
*
|
||||
* @param msg msg
|
||||
*/
|
||||
void write(String msg);
|
||||
|
||||
/**
|
||||
* 中断执行
|
||||
*/
|
||||
void interrupted();
|
||||
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
package com.orion.ops.module.asset.handler.host.exec.handler;
|
||||
|
||||
import com.orion.lang.able.SafeCloseable;
|
||||
|
||||
/**
|
||||
* 执行任务处理器
|
||||
*
|
||||
* @author Jiahang Li
|
||||
* @version 1.0.0
|
||||
* @since 2024/3/12 11:43
|
||||
*/
|
||||
public interface IExecTaskHandler extends Runnable, SafeCloseable {
|
||||
|
||||
}
|
||||
@@ -31,7 +31,7 @@ public abstract class TerminalSession implements ITerminalSession {
|
||||
@Getter
|
||||
protected final TerminalConfig config;
|
||||
|
||||
protected volatile boolean close;
|
||||
protected volatile boolean closed;
|
||||
|
||||
protected volatile boolean forceOffline;
|
||||
|
||||
@@ -50,7 +50,7 @@ public abstract class TerminalSession implements ITerminalSession {
|
||||
* 发送关闭消息
|
||||
*/
|
||||
protected void sendCloseMessage() {
|
||||
log.info("TerminalSession close {}, forClose: {}, forceOffline: {}", sessionId, this.close, this.forceOffline);
|
||||
log.info("TerminalSession close {}, forClose: {}, forceOffline: {}", sessionId, this.closed, this.forceOffline);
|
||||
// 发送关闭信息
|
||||
TerminalCloseResponse resp = TerminalCloseResponse.builder()
|
||||
.type(OutputTypeEnum.CLOSE.getType())
|
||||
@@ -86,10 +86,10 @@ public abstract class TerminalSession implements ITerminalSession {
|
||||
* @return close
|
||||
*/
|
||||
private boolean checkAndClose() {
|
||||
if (close) {
|
||||
if (closed) {
|
||||
return false;
|
||||
}
|
||||
this.close = true;
|
||||
this.closed = true;
|
||||
// 释放资源
|
||||
try {
|
||||
this.releaseResource();
|
||||
|
||||
@@ -80,9 +80,10 @@ public class ExecLogServiceImpl implements ExecLogService {
|
||||
return execLogDAO.wrapper()
|
||||
.eq(ExecLogDO::getId, request.getId())
|
||||
.eq(ExecLogDO::getUserId, request.getUserId())
|
||||
.eq(ExecLogDO::getUsername, request.getUsername())
|
||||
.eq(ExecLogDO::getSource, request.getSource())
|
||||
.eq(ExecLogDO::getSourceId, request.getSourceId())
|
||||
.eq(ExecLogDO::getDesc, request.getDesc())
|
||||
.eq(ExecLogDO::getDescription, request.getDescription())
|
||||
.eq(ExecLogDO::getCommand, request.getCommand())
|
||||
.eq(ExecLogDO::getStatus, request.getStatus())
|
||||
.ge(ExecLogDO::getStartTime, Arrays1.getIfPresent(request.getStartTimeRange(), 0))
|
||||
|
||||
@@ -9,8 +9,10 @@ import com.orion.lang.utils.json.matcher.NoMatchStrategy;
|
||||
import com.orion.lang.utils.json.matcher.ReplacementFormatter;
|
||||
import com.orion.lang.utils.json.matcher.ReplacementFormatters;
|
||||
import com.orion.lang.utils.time.Dates;
|
||||
import com.orion.ops.framework.biz.operator.log.core.utils.OperatorLogs;
|
||||
import com.orion.ops.framework.common.constant.Const;
|
||||
import com.orion.ops.framework.common.constant.ErrorMessage;
|
||||
import com.orion.ops.framework.common.file.FileClient;
|
||||
import com.orion.ops.framework.common.security.LoginUser;
|
||||
import com.orion.ops.framework.common.utils.Valid;
|
||||
import com.orion.ops.framework.security.core.utils.SecurityUtils;
|
||||
@@ -26,6 +28,9 @@ import com.orion.ops.module.asset.enums.ExecHostStatusEnum;
|
||||
import com.orion.ops.module.asset.enums.ExecSourceEnum;
|
||||
import com.orion.ops.module.asset.enums.ExecStatusEnum;
|
||||
import com.orion.ops.module.asset.enums.HostConfigTypeEnum;
|
||||
import com.orion.ops.module.asset.handler.host.exec.ExecTaskExecutors;
|
||||
import com.orion.ops.module.asset.handler.host.exec.dto.ExecCommandDTO;
|
||||
import com.orion.ops.module.asset.handler.host.exec.dto.ExecCommandHostDTO;
|
||||
import com.orion.ops.module.asset.service.AssetAuthorizedDataService;
|
||||
import com.orion.ops.module.asset.service.ExecService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -53,6 +58,9 @@ public class ExecServiceImpl implements ExecService {
|
||||
private static final ReplacementFormatter FORMATTER = ReplacementFormatters.create("@{{ ", " }}")
|
||||
.noMatchStrategy(NoMatchStrategy.EMPTY);
|
||||
|
||||
@Resource
|
||||
private FileClient logsFileClient;
|
||||
|
||||
@Resource
|
||||
private ExecLogDAO execLogDAO;
|
||||
|
||||
@@ -81,10 +89,12 @@ public class ExecServiceImpl implements ExecService {
|
||||
// 插入日志
|
||||
ExecLogDO execLog = ExecLogDO.builder()
|
||||
.userId(userId)
|
||||
.username(user.getUsername())
|
||||
.source(ExecSourceEnum.BATCH.name())
|
||||
.desc(Strings.ifBlank(request.getDesc(), Strings.retain(command, 60) + Const.OMIT))
|
||||
.description(Strings.ifBlank(request.getDescription(), Strings.retain(command, 60) + Const.OMIT))
|
||||
.command(command)
|
||||
.status(ExecStatusEnum.COMPLETED.name())
|
||||
.timeout(request.getTimeout())
|
||||
.status(ExecStatusEnum.WAITING.name())
|
||||
.build();
|
||||
execLogDAO.insert(execLog);
|
||||
Long execId = execLog.getId();
|
||||
@@ -105,9 +115,23 @@ public class ExecServiceImpl implements ExecService {
|
||||
.build();
|
||||
}).collect(Collectors.toList());
|
||||
execHostLogDAO.insertBatch(execHostLogs);
|
||||
// TODO 开始执行
|
||||
|
||||
|
||||
// 开始执行
|
||||
ExecCommandDTO exec = ExecCommandDTO.builder()
|
||||
.logId(execId)
|
||||
.timeout(request.getTimeout())
|
||||
.hosts(execHostLogs.stream()
|
||||
.map(s -> ExecCommandHostDTO.builder()
|
||||
.hostId(s.getHostId())
|
||||
.hostLogId(s.getId())
|
||||
.command(s.getCommand())
|
||||
.timeout(request.getTimeout())
|
||||
.logPath(s.getLogPath())
|
||||
.build())
|
||||
.collect(Collectors.toList()))
|
||||
.build();
|
||||
ExecTaskExecutors.start(exec);
|
||||
// 操作日志
|
||||
OperatorLogs.add(OperatorLogs.ID, execId);
|
||||
// 返回
|
||||
Map<String, Long> hostIdRel = execHostLogs.stream()
|
||||
.collect(Collectors.toMap(s -> String.valueOf(s.getHostId()), ExecHostLogDO::getId));
|
||||
@@ -125,7 +149,8 @@ public class ExecServiceImpl implements ExecService {
|
||||
* @return logPath
|
||||
*/
|
||||
private String buildLogPath(Long logId, Long hostId) {
|
||||
return "/exec/" + logId + "/" + hostId + ".log";
|
||||
String logFile = "/exec/" + logId + "/" + hostId + ".log";
|
||||
return logsFileClient.getReturnPath(logFile);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
<result column="parameter" property="parameter"/>
|
||||
<result column="exit_status" property="exitStatus"/>
|
||||
<result column="log_path" property="logPath"/>
|
||||
<result column="error_message" property="errorMessage"/>
|
||||
<result column="start_time" property="startTime"/>
|
||||
<result column="finish_time" property="finishTime"/>
|
||||
<result column="create_time" property="createTime"/>
|
||||
@@ -24,7 +25,7 @@
|
||||
|
||||
<!-- 通用查询结果列 -->
|
||||
<sql id="Base_Column_List">
|
||||
id, log_id, host_id, host_name, status, command, parameter, exit_status, log_path, start_time, finish_time, create_time, update_time, creator, updater, deleted
|
||||
id, log_id, host_id, host_name, status, command, parameter, exit_status, log_path, error_message, start_time, finish_time, create_time, update_time, creator, updater, deleted
|
||||
</sql>
|
||||
|
||||
</mapper>
|
||||
|
||||
@@ -6,10 +6,12 @@
|
||||
<resultMap id="BaseResultMap" type="com.orion.ops.module.asset.entity.domain.ExecLogDO">
|
||||
<id column="id" property="id"/>
|
||||
<result column="user_id" property="userId"/>
|
||||
<result column="username" property="username"/>
|
||||
<result column="source" property="source"/>
|
||||
<result column="source_id" property="sourceId"/>
|
||||
<result column="desc" property="desc"/>
|
||||
<result column="description" property="description"/>
|
||||
<result column="command" property="command"/>
|
||||
<result column="timeout" property="timeout"/>
|
||||
<result column="status" property="status"/>
|
||||
<result column="start_time" property="startTime"/>
|
||||
<result column="finish_time" property="finishTime"/>
|
||||
@@ -22,7 +24,7 @@
|
||||
|
||||
<!-- 通用查询结果列 -->
|
||||
<sql id="Base_Column_List">
|
||||
id, user_id, source, source_id, desc, command, status, start_time, finish_time, create_time, update_time, creator, updater, deleted
|
||||
id, user_id, username, source, source_id, description, command, timeout, status, start_time, finish_time, create_time, update_time, creator, updater, deleted
|
||||
</sql>
|
||||
|
||||
</mapper>
|
||||
|
||||
@@ -7,7 +7,7 @@ Authorization: {{token}}
|
||||
"key": "",
|
||||
"valueType": "",
|
||||
"extraSchema": "",
|
||||
"desc": ""
|
||||
"description": ""
|
||||
}
|
||||
|
||||
|
||||
@@ -21,7 +21,7 @@ Authorization: {{token}}
|
||||
"key": "",
|
||||
"valueType": "",
|
||||
"extraSchema": "",
|
||||
"desc": ""
|
||||
"description": ""
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ Authorization: {{token}}
|
||||
"key": "",
|
||||
"label": "",
|
||||
"value": "",
|
||||
"desc": "",
|
||||
"description": "",
|
||||
"extra": "",
|
||||
"sort": ""
|
||||
}
|
||||
@@ -25,7 +25,7 @@ Authorization: {{token}}
|
||||
"key": "",
|
||||
"label": "",
|
||||
"value": "",
|
||||
"desc": "",
|
||||
"description": "",
|
||||
"extra": "",
|
||||
"sort": ""
|
||||
}
|
||||
@@ -47,7 +47,7 @@ Authorization: {{token}}
|
||||
"keyId": "",
|
||||
"label": "",
|
||||
"value": "",
|
||||
"desc": ""
|
||||
"description": ""
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user