🔨 批量执行日志.

This commit is contained in:
lijiahang
2024-03-18 18:58:48 +08:00
parent 6c0f20a6de
commit 2aaf3ee907
15 changed files with 152 additions and 38 deletions

View File

@@ -29,6 +29,8 @@ public interface FieldConst {
String STATUS = "status";
String INFO = "info";
String REL_ID = "relId";
String BEFORE = "before";

View File

@@ -1,7 +1,9 @@
package com.orion.ops.module.asset.config;
import com.orion.ops.module.asset.handler.host.transfer.TransferMessageDispatcher;
import com.orion.ops.module.asset.handler.host.exec.log.handler.ExecLogTailHandler;
import com.orion.ops.module.asset.handler.host.terminal.TerminalMessageDispatcher;
import com.orion.ops.module.asset.handler.host.transfer.TransferMessageDispatcher;
import com.orion.ops.module.asset.interceptor.ExecLogTailInterceptor;
import com.orion.ops.module.asset.interceptor.TerminalAccessInterceptor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@@ -26,12 +28,18 @@ public class AssetWebSocketConfiguration implements WebSocketConfigurer {
@Resource
private TerminalAccessInterceptor terminalAccessInterceptor;
@Resource
private ExecLogTailInterceptor execLogTailInterceptor;
@Resource
private TerminalMessageDispatcher terminalMessageDispatcher;
@Resource
private TransferMessageDispatcher transferMessageDispatcher;
@Resource
private ExecLogTailHandler execLogTailHandler;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// 终端
@@ -42,6 +50,10 @@ public class AssetWebSocketConfiguration implements WebSocketConfigurer {
registry.addHandler(transferMessageDispatcher, prefix + "/host/transfer/{accessToken}")
.addInterceptors(terminalAccessInterceptor)
.setAllowedOrigins("*");
// 执行日志
registry.addHandler(execLogTailHandler, prefix + "/exec/log/{token}")
.addInterceptors(execLogTailInterceptor)
.setAllowedOrigins("*");
}
}

View File

@@ -1,8 +1,8 @@
package com.orion.ops.module.asset.handler.host.exec;
package com.orion.ops.module.asset.handler.host.exec.command;
import com.orion.ops.module.asset.define.AssetThreadPools;
import com.orion.ops.module.asset.handler.host.exec.dto.ExecCommandDTO;
import com.orion.ops.module.asset.handler.host.exec.handler.ExecTaskHandler;
import com.orion.ops.module.asset.handler.host.exec.command.dto.ExecCommandDTO;
import com.orion.ops.module.asset.handler.host.exec.command.handler.ExecTaskHandler;
/**
* 批量执行命令执行器

View File

@@ -1,4 +1,4 @@
package com.orion.ops.module.asset.handler.host.exec.dto;
package com.orion.ops.module.asset.handler.host.exec.command.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;

View File

@@ -1,4 +1,4 @@
package com.orion.ops.module.asset.handler.host.exec.dto;
package com.orion.ops.module.asset.handler.host.exec.command.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;

View File

@@ -1,4 +1,4 @@
package com.orion.ops.module.asset.handler.host.exec.handler;
package com.orion.ops.module.asset.handler.host.exec.command.handler;
import com.alibaba.fastjson.JSON;
import com.orion.lang.exception.AuthenticationException;
@@ -13,7 +13,7 @@ import com.orion.ops.framework.common.file.FileClient;
import com.orion.ops.module.asset.dao.ExecHostLogDAO;
import com.orion.ops.module.asset.entity.domain.ExecHostLogDO;
import com.orion.ops.module.asset.enums.ExecHostStatusEnum;
import com.orion.ops.module.asset.handler.host.exec.dto.ExecCommandHostDTO;
import com.orion.ops.module.asset.handler.host.exec.command.dto.ExecCommandHostDTO;
import com.orion.ops.module.asset.service.HostTerminalService;
import com.orion.spring.SpringHolder;
import lombok.Getter;

View File

@@ -1,4 +1,4 @@
package com.orion.ops.module.asset.handler.host.exec.handler;
package com.orion.ops.module.asset.handler.host.exec.command.handler;
import com.orion.lang.support.timeout.TimeoutChecker;
import com.orion.lang.utils.Threads;
@@ -9,9 +9,9 @@ import com.orion.ops.module.asset.dao.ExecLogDAO;
import com.orion.ops.module.asset.define.AssetThreadPools;
import com.orion.ops.module.asset.entity.domain.ExecLogDO;
import com.orion.ops.module.asset.enums.ExecStatusEnum;
import com.orion.ops.module.asset.handler.host.exec.dto.ExecCommandDTO;
import com.orion.ops.module.asset.handler.host.exec.dto.ExecCommandHostDTO;
import com.orion.ops.module.asset.handler.host.exec.manager.ExecManager;
import com.orion.ops.module.asset.handler.host.exec.command.dto.ExecCommandDTO;
import com.orion.ops.module.asset.handler.host.exec.command.dto.ExecCommandHostDTO;
import com.orion.ops.module.asset.handler.host.exec.command.manager.ExecTaskManager;
import com.orion.spring.SpringHolder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -32,7 +32,7 @@ public class ExecTaskHandler implements IExecTaskHandler {
private static final ExecLogDAO execLogDAO = SpringHolder.getBean(ExecLogDAO.class);
private static final ExecManager execManager = SpringHolder.getBean(ExecManager.class);
private static final ExecTaskManager EXEC_TASK_MANAGER = SpringHolder.getBean(ExecTaskManager.class);
private final ExecCommandDTO execCommand;
@@ -50,7 +50,7 @@ public class ExecTaskHandler implements IExecTaskHandler {
public void run() {
Long id = execCommand.getLogId();
// 添加任务
execManager.addTask(id, this);
EXEC_TASK_MANAGER.addTask(id, this);
log.info("ExecTaskHandler.run start id: {}", id);
// 更新状态
this.updateStatus(ExecStatusEnum.RUNNING);
@@ -68,7 +68,7 @@ public class ExecTaskHandler implements IExecTaskHandler {
// 释放资源
Streams.close(this);
// 移除任务
execManager.removeTask(id);
EXEC_TASK_MANAGER.removeTask(id);
}
}

View File

@@ -1,4 +1,4 @@
package com.orion.ops.module.asset.handler.host.exec.handler;
package com.orion.ops.module.asset.handler.host.exec.command.handler;
import com.orion.lang.able.SafeCloseable;
import com.orion.ops.module.asset.enums.ExecHostStatusEnum;

View File

@@ -1,4 +1,4 @@
package com.orion.ops.module.asset.handler.host.exec.handler;
package com.orion.ops.module.asset.handler.host.exec.command.handler;
import com.orion.lang.able.SafeCloseable;

View File

@@ -1,6 +1,6 @@
package com.orion.ops.module.asset.handler.host.exec.manager;
package com.orion.ops.module.asset.handler.host.exec.command.manager;
import com.orion.ops.module.asset.handler.host.exec.handler.IExecTaskHandler;
import com.orion.ops.module.asset.handler.host.exec.command.handler.IExecTaskHandler;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
@@ -13,7 +13,7 @@ import java.util.concurrent.ConcurrentHashMap;
* @since 2024/3/13 11:20
*/
@Component
public class ExecManager {
public class ExecTaskManager {
private final ConcurrentHashMap<Long, IExecTaskHandler> execTasks = new ConcurrentHashMap<>();

View File

@@ -0,0 +1,49 @@
package com.orion.ops.module.asset.handler.host.exec.log.handler;
import com.orion.ops.framework.common.constant.ExtraFieldConst;
import com.orion.ops.framework.websocket.core.utils.WebSockets;
import com.orion.ops.module.asset.entity.dto.ExecLogTailDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
/**
* 执行日志查看处理器
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/3/18 18:38
*/
@Slf4j
@Component
public class ExecLogTailHandler extends AbstractWebSocketHandler {
@Override
public void afterConnectionEstablished(WebSocketSession session) {
log.info("ExecLogTailHandler-afterConnectionEstablished id: {}", session.getId());
// 获取参数
ExecLogTailDTO info = WebSockets.getAttr(session, ExtraFieldConst.INFO);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) {
log.error("ExecLogTailHandler-handleTransportError id: {}", session.getId(), exception);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
String id = session.getId();
log.info("ExecLogTailHandler-afterConnectionClosed id: {}, code: {}, reason: {}", id, status.getCode(), status.getReason());
// 关闭会话
}
}

View File

@@ -0,0 +1,51 @@
package com.orion.ops.module.asset.interceptor;
import com.orion.lang.utils.Urls;
import com.orion.ops.framework.common.constant.ExtraFieldConst;
import com.orion.ops.module.asset.entity.dto.ExecLogTailDTO;
import com.orion.ops.module.asset.service.ExecService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import javax.annotation.Resource;
import java.util.Map;
/**
* 执行日志拦截器
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/3/18 17:32
*/
@Slf4j
@Component
public class ExecLogTailInterceptor implements HandshakeInterceptor {
@Resource
private ExecService execService;
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) {
// 获取 token
String token = Urls.getUrlSource(request.getURI().getPath());
log.info("ExecLogTailInterceptor-beforeHandshake start token: {}", token);
// 获取日志数据
ExecLogTailDTO info = execService.getExecLogTailInfo(token);
if (info == null) {
log.error("ExecLogTailInterceptor-beforeHandshake absent token: {}", token);
return false;
}
// 保存
attributes.put(ExtraFieldConst.INFO, info);
return true;
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
}
}

View File

@@ -9,9 +9,9 @@ import com.orion.ops.module.asset.convert.ExecHostLogConvert;
import com.orion.ops.module.asset.dao.ExecHostLogDAO;
import com.orion.ops.module.asset.entity.domain.ExecHostLogDO;
import com.orion.ops.module.asset.entity.vo.ExecHostLogVO;
import com.orion.ops.module.asset.handler.host.exec.handler.IExecCommandHandler;
import com.orion.ops.module.asset.handler.host.exec.handler.IExecTaskHandler;
import com.orion.ops.module.asset.handler.host.exec.manager.ExecManager;
import com.orion.ops.module.asset.handler.host.exec.command.handler.IExecCommandHandler;
import com.orion.ops.module.asset.handler.host.exec.command.handler.IExecTaskHandler;
import com.orion.ops.module.asset.handler.host.exec.command.manager.ExecTaskManager;
import com.orion.ops.module.asset.service.ExecHostLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@@ -36,7 +36,7 @@ public class ExecHostLogServiceImpl implements ExecHostLogService {
private ExecHostLogDAO execHostLogDAO;
@Resource
private ExecManager execManager;
private ExecTaskManager execTaskManager;
@Override
public List<ExecHostLogVO> getExecHostLogList(Long logId) {
@@ -74,7 +74,7 @@ public class ExecHostLogServiceImpl implements ExecHostLogService {
Valid.notNull(record, ErrorMessage.DATA_ABSENT);
// 中断
Optional.ofNullable(record.getLogId())
.map(execManager::getTask)
.map(execTaskManager::getTask)
.map(IExecTaskHandler::getHandlers)
.flatMap(s -> s.stream()
.filter(h -> h.getHostId().equals(record.getHostId()))

View File

@@ -18,8 +18,8 @@ import com.orion.ops.module.asset.entity.request.exec.ExecLogQueryRequest;
import com.orion.ops.module.asset.entity.vo.ExecHostLogVO;
import com.orion.ops.module.asset.entity.vo.ExecLogStatusVO;
import com.orion.ops.module.asset.entity.vo.ExecLogVO;
import com.orion.ops.module.asset.handler.host.exec.handler.IExecTaskHandler;
import com.orion.ops.module.asset.handler.host.exec.manager.ExecManager;
import com.orion.ops.module.asset.handler.host.exec.command.handler.IExecTaskHandler;
import com.orion.ops.module.asset.handler.host.exec.command.manager.ExecTaskManager;
import com.orion.ops.module.asset.service.ExecHostLogService;
import com.orion.ops.module.asset.service.ExecLogService;
import lombok.extern.slf4j.Slf4j;
@@ -53,7 +53,7 @@ public class ExecLogServiceImpl implements ExecLogService {
private ExecHostLogService execHostLogService;
@Resource
private ExecManager execManager;
private ExecTaskManager execTaskManager;
@Override
public DataGrid<ExecLogVO> getExecLogPage(ExecLogQueryRequest request) {
@@ -226,7 +226,7 @@ public class ExecLogServiceImpl implements ExecLogService {
*/
private void interruptedTask(List<Long> idList) {
idList.stream()
.map(execManager::getTask)
.map(execTaskManager::getTask)
.filter(Objects::nonNull)
.forEach(IExecTaskHandler::interrupted);
}

View File

@@ -39,12 +39,12 @@ import com.orion.ops.module.asset.enums.ExecHostStatusEnum;
import com.orion.ops.module.asset.enums.ExecSourceEnum;
import com.orion.ops.module.asset.enums.ExecStatusEnum;
import com.orion.ops.module.asset.enums.HostConfigTypeEnum;
import com.orion.ops.module.asset.handler.host.exec.ExecTaskExecutors;
import com.orion.ops.module.asset.handler.host.exec.dto.ExecCommandDTO;
import com.orion.ops.module.asset.handler.host.exec.dto.ExecCommandHostDTO;
import com.orion.ops.module.asset.handler.host.exec.handler.IExecCommandHandler;
import com.orion.ops.module.asset.handler.host.exec.handler.IExecTaskHandler;
import com.orion.ops.module.asset.handler.host.exec.manager.ExecManager;
import com.orion.ops.module.asset.handler.host.exec.command.ExecTaskExecutors;
import com.orion.ops.module.asset.handler.host.exec.command.dto.ExecCommandDTO;
import com.orion.ops.module.asset.handler.host.exec.command.dto.ExecCommandHostDTO;
import com.orion.ops.module.asset.handler.host.exec.command.handler.IExecCommandHandler;
import com.orion.ops.module.asset.handler.host.exec.command.handler.IExecTaskHandler;
import com.orion.ops.module.asset.handler.host.exec.command.manager.ExecTaskManager;
import com.orion.ops.module.asset.service.AssetAuthorizedDataService;
import com.orion.ops.module.asset.service.ExecService;
import com.orion.web.servlet.web.Servlets;
@@ -93,7 +93,7 @@ public class ExecServiceImpl implements ExecService {
private AssetAuthorizedDataService assetAuthorizedDataService;
@Resource
private ExecManager execManager;
private ExecTaskManager execTaskManager;
@Override
@Transactional(rollbackFor = Exception.class)
@@ -201,7 +201,7 @@ public class ExecServiceImpl implements ExecService {
return;
}
// 中断执行
IExecTaskHandler task = execManager.getTask(logId);
IExecTaskHandler task = execTaskManager.getTask(logId);
if (task != null) {
log.info("ExecService.interruptExec interrupted logId: {}", logId);
// 中断
@@ -242,7 +242,7 @@ public class ExecServiceImpl implements ExecService {
return;
}
// 中断执行
IExecTaskHandler task = execManager.getTask(logId);
IExecTaskHandler task = execTaskManager.getTask(logId);
if (task != null) {
log.info("ExecService.interruptHostExec interrupted logId: {}, hostLogId: {}", logId, hostLogId);
IExecCommandHandler handler = task.getHandlers()