站内消息.

This commit is contained in:
lijiahang
2024-05-14 19:17:12 +08:00
parent 2fa3eb2251
commit 4fe6208d0e
19 changed files with 385 additions and 52 deletions

View File

@@ -0,0 +1,53 @@
package com.orion.ops.module.asset.define.message;
import com.orion.ops.module.infra.define.SystemMessageDefine;
import com.orion.ops.module.infra.enums.MessageClassifyEnum;
import lombok.Getter;
/**
* 命令执行 系统消息定义
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/5/14 17:23
*/
@Getter
public enum ExecMessageDefine implements SystemMessageDefine {
/**
* 命令执行部分失败
*/
EXEC_FAILED(MessageClassifyEnum.NOTICE,
"命令执行失败",
"您在 <sb>${time}</sb> 执行的命令部分失败, 或者返回了非零的 exitCode。点击查看详情 <sb>#${id}</sb> >>>"),
;
ExecMessageDefine(MessageClassifyEnum classify, String title, String content) {
this.classify = classify;
this.type = this.name();
this.title = title;
this.content = content;
}
/**
* 消息分类
*/
private final MessageClassifyEnum classify;
/**
* 消息类型
*/
private final String type;
/**
* 标题
*/
private final String title;
/**
* 内容
*/
private final String content;
}

View File

@@ -0,0 +1,53 @@
package com.orion.ops.module.asset.define.message;
import com.orion.ops.module.infra.define.SystemMessageDefine;
import com.orion.ops.module.infra.enums.MessageClassifyEnum;
import lombok.Getter;
/**
* 上传任务 系统消息定义
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/5/14 17:23
*/
@Getter
public enum UploadMessageDefine implements SystemMessageDefine {
/**
* 上传任务部分失败
*/
UPLOAD_FAILED(MessageClassifyEnum.NOTICE,
"批量上传失败",
"您在 <sb>${time}</sb> 提交的上传任务中, 有部分主机文件上传失败。点击查看详情 <sb>#${id}</sb> >>>"),
;
UploadMessageDefine(MessageClassifyEnum classify, String title, String content) {
this.classify = classify;
this.type = this.name();
this.title = title;
this.content = content;
}
/**
* 消息分类
*/
private final MessageClassifyEnum classify;
/**
* 消息类型
*/
private final String type;
/**
* 标题
*/
private final String title;
/**
* 内容
*/
private final String content;
}

View File

@@ -22,7 +22,7 @@ import java.util.List;
@Schema(name = "ExecCommandDTO", description = "批量执行启动对象")
public class ExecCommandDTO {
@Schema(description = "hostId")
@Schema(description = "logId")
private Long logId;
@Schema(description = "用户id")

View File

@@ -67,6 +67,9 @@ public abstract class BaseExecCommandHandler implements IExecCommandHandler {
private CommandExecutor executor;
@Getter
private Integer exitCode;
private volatile boolean closed;
private volatile boolean interrupted;
@@ -228,6 +231,7 @@ public abstract class BaseExecCommandHandler implements IExecCommandHandler {
// 完成
updateRecord.setFinishTime(new Date());
updateRecord.setExitStatus(executor.getExitCode());
this.exitCode = executor.getExitCode();
} else if (ExecHostStatusEnum.FAILED.equals(status)) {
// 失败
updateRecord.setFinishTime(new Date());

View File

@@ -7,20 +7,29 @@ import com.orion.lang.utils.Booleans;
import com.orion.lang.utils.Threads;
import com.orion.lang.utils.collect.Lists;
import com.orion.lang.utils.io.Streams;
import com.orion.lang.utils.time.Dates;
import com.orion.net.host.ssh.ExitCode;
import com.orion.ops.framework.common.constant.ExtraFieldConst;
import com.orion.ops.module.asset.dao.ExecLogDAO;
import com.orion.ops.module.asset.define.AssetThreadPools;
import com.orion.ops.module.asset.define.config.AppExecLogConfig;
import com.orion.ops.module.asset.define.message.ExecMessageDefine;
import com.orion.ops.module.asset.entity.domain.ExecLogDO;
import com.orion.ops.module.asset.enums.ExecHostStatusEnum;
import com.orion.ops.module.asset.enums.ExecStatusEnum;
import com.orion.ops.module.asset.handler.host.exec.command.dto.ExecCommandDTO;
import com.orion.ops.module.asset.handler.host.exec.command.dto.ExecCommandHostDTO;
import com.orion.ops.module.asset.handler.host.exec.command.manager.ExecTaskManager;
import com.orion.ops.module.infra.api.SystemMessageApi;
import com.orion.ops.module.infra.entity.dto.message.SystemMessageDTO;
import com.orion.spring.SpringHolder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 命令执行任务
@@ -38,6 +47,8 @@ public class ExecTaskHandler implements IExecTaskHandler {
private static final AppExecLogConfig appExecLogConfig = SpringHolder.getBean(AppExecLogConfig.class);
private static final SystemMessageApi systemMessageApi = SpringHolder.getBean(SystemMessageApi.class);
private final ExecCommandDTO execCommand;
private TimeoutChecker<TimeoutEndpoint> timeoutChecker;
@@ -45,6 +56,8 @@ public class ExecTaskHandler implements IExecTaskHandler {
@Getter
private final List<IExecCommandHandler> handlers;
private Date startTime;
public ExecTaskHandler(ExecCommandDTO execCommand) {
this.execCommand = execCommand;
this.handlers = Lists.newList();
@@ -56,9 +69,9 @@ public class ExecTaskHandler implements IExecTaskHandler {
// 添加任务
execTaskManager.addTask(id, this);
log.info("ExecTaskHandler.run start id: {}", id);
// 更新状态
this.updateStatus(ExecStatusEnum.RUNNING);
try {
// 更新状态
this.updateStatus(ExecStatusEnum.RUNNING);
// 执行命令
this.runHostCommand();
// 更新状态-执行完成
@@ -69,10 +82,12 @@ public class ExecTaskHandler implements IExecTaskHandler {
this.updateStatus(ExecStatusEnum.FAILED);
log.error("ExecTaskHandler.run error id: {}", id, e);
} finally {
// 释放资源
Streams.close(this);
// 检查是否发送消息
this.checkSendMessage();
// 移除任务
execTaskManager.removeTask(id);
// 释放资源
this.close();
}
}
@@ -82,6 +97,13 @@ public class ExecTaskHandler implements IExecTaskHandler {
handlers.forEach(IExecCommandHandler::interrupt);
}
@Override
public void close() {
log.info("ExecTaskHandler-close id: {}", execCommand.getLogId());
Streams.close(timeoutChecker);
this.handlers.forEach(Streams::close);
}
/**
* 执行主机命令
*
@@ -139,6 +161,7 @@ public class ExecTaskHandler implements IExecTaskHandler {
update.setStatus(statusName);
if (ExecStatusEnum.RUNNING.equals(status)) {
// 执行中
this.startTime = new Date();
update.setStartTime(new Date());
} else if (ExecStatusEnum.COMPLETED.equals(status)) {
// 执行完成
@@ -151,11 +174,30 @@ public class ExecTaskHandler implements IExecTaskHandler {
log.info("ExecTaskHandler-updateStatus finish id: {}, effect: {}", id, effect);
}
@Override
public void close() {
log.info("ExecTaskHandler-close id: {}", execCommand.getLogId());
Streams.close(timeoutChecker);
this.handlers.forEach(Streams::close);
/**
* 检查是否发送消息
*/
private void checkSendMessage() {
// 检查是否执行失败/exitCode
boolean hasError = handlers.stream().anyMatch(s ->
ExecHostStatusEnum.FAILED.equals(s.getStatus())
|| ExecHostStatusEnum.TIMEOUT.equals(s.getStatus())
|| !ExitCode.isSuccess(s.getExitCode()));
if (!hasError) {
return;
}
// 参数
Map<String, Object> params = new HashMap<>();
params.put(ExtraFieldConst.ID, execCommand.getLogId());
params.put(ExtraFieldConst.TIME, Dates.format(this.startTime, Dates.MD_HM));
SystemMessageDTO message = SystemMessageDTO.builder()
.receiverId(execCommand.getUserId())
.receiverUsername(execCommand.getUsername())
.relKey(String.valueOf(execCommand.getLogId()))
.params(params)
.build();
// 发送
systemMessageApi.create(ExecMessageDefine.EXEC_FAILED, message);
}
}

View File

@@ -31,6 +31,13 @@ public interface IExecCommandHandler extends Runnable, SafeCloseable {
*/
ExecHostStatusEnum getStatus();
/**
* 获取退出码
*
* @return exit code
*/
Integer getExitCode();
/**
* 获取主机 id
*

View File

@@ -3,10 +3,13 @@ package com.orion.ops.module.asset.handler.host.upload.task;
import com.orion.lang.utils.Threads;
import com.orion.lang.utils.io.Files1;
import com.orion.lang.utils.io.Streams;
import com.orion.lang.utils.time.Dates;
import com.orion.ops.framework.common.constant.Const;
import com.orion.ops.framework.common.constant.ExtraFieldConst;
import com.orion.ops.module.asset.dao.UploadTaskDAO;
import com.orion.ops.module.asset.dao.UploadTaskFileDAO;
import com.orion.ops.module.asset.define.AssetThreadPools;
import com.orion.ops.module.asset.define.message.UploadMessageDefine;
import com.orion.ops.module.asset.entity.domain.UploadTaskDO;
import com.orion.ops.module.asset.entity.domain.UploadTaskFileDO;
import com.orion.ops.module.asset.enums.UploadTaskFileStatusEnum;
@@ -16,14 +19,13 @@ import com.orion.ops.module.asset.handler.host.upload.manager.FileUploadTaskMana
import com.orion.ops.module.asset.handler.host.upload.uploader.FileUploader;
import com.orion.ops.module.asset.handler.host.upload.uploader.IFileUploader;
import com.orion.ops.module.asset.service.UploadTaskService;
import com.orion.ops.module.infra.api.SystemMessageApi;
import com.orion.ops.module.infra.entity.dto.message.SystemMessageDTO;
import com.orion.spring.SpringHolder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
/**
@@ -42,6 +44,8 @@ public class FileUploadTask implements IFileUploadTask {
private static final UploadTaskService uploadTaskService = SpringHolder.getBean(UploadTaskService.class);
private static final SystemMessageApi systemMessageApi = SpringHolder.getBean(SystemMessageApi.class);
private static final FileUploadTaskManager fileUploadTaskManager = SpringHolder.getBean(FileUploadTaskManager.class);
private final Long id;
@@ -91,6 +95,8 @@ public class FileUploadTask implements IFileUploadTask {
} else {
this.updateStatus(UploadTaskStatusEnum.FINISHED);
}
// 检查是否发送消息
this.checkSendMessage();
// 移除任务
fileUploadTaskManager.removeTask(id);
// 释放资源
@@ -187,4 +193,33 @@ public class FileUploadTask implements IFileUploadTask {
uploadTaskDAO.updateById(update);
}
/**
* 检查是否发送消息
*/
private void checkSendMessage() {
if (canceled) {
return;
}
// 检查是否上传失败
boolean hasError = uploaderList.stream()
.map(IFileUploader::getFiles)
.flatMap(Collection::stream)
.anyMatch(s -> UploadTaskFileStatusEnum.FAILED.name().equals(s.getStatus()));
if (!hasError) {
return;
}
// 参数
Map<String, Object> params = new HashMap<>();
params.put(ExtraFieldConst.ID, record.getId());
params.put(ExtraFieldConst.TIME, Dates.format(record.getCreateTime(), Dates.MD_HM));
SystemMessageDTO message = SystemMessageDTO.builder()
.receiverId(record.getUserId())
.receiverUsername(record.getUsername())
.relKey(String.valueOf(record.getId()))
.params(params)
.build();
// 发送
systemMessageApi.create(UploadMessageDefine.UPLOAD_FAILED, message);
}
}