主机在线会话.

This commit is contained in:
lijiahang
2024-05-07 10:52:30 +08:00
parent 320c4c272a
commit b19911bfa7
12 changed files with 113 additions and 29 deletions

View File

@@ -49,6 +49,14 @@ public class HostConnectLogController {
return hostConnectLogService.getHostConnectLogPage(request); return hostConnectLogService.getHostConnectLogPage(request);
} }
@IgnoreLog(IgnoreLogMode.RET)
@PostMapping("/session")
@Operation(summary = "分页查询主机连接会话")
@PreAuthorize("@ss.hasPermission('asset:host-connect-session:management:query')")
public List<HostConnectLogVO> getHostConnectSessions(@Validated @RequestBody HostConnectLogQueryRequest request) {
return hostConnectLogService.getHostConnectSessions(request);
}
@IgnoreLog(IgnoreLogMode.RET) @IgnoreLog(IgnoreLogMode.RET)
@PostMapping("/latest-connect") @PostMapping("/latest-connect")
@Operation(summary = "查询用户最近连接的主机") @Operation(summary = "查询用户最近连接的主机")
@@ -83,7 +91,7 @@ public class HostConnectLogController {
@OperatorLog(HostConnectLogOperatorType.FORCE_OFFLINE) @OperatorLog(HostConnectLogOperatorType.FORCE_OFFLINE)
@PutMapping("/force-offline") @PutMapping("/force-offline")
@Operation(summary = "强制断开主机连接") @Operation(summary = "强制断开主机连接")
@PreAuthorize("@ss.hasPermission('asset:host-connect-log:management:force-offline')") @PreAuthorize("@ss.hasPermission('asset:host-connect-log:management:force-offline', 'asset:host-connect-session:management:force-offline')")
public Integer forceOffline(@Validated(Id.class) @RequestBody HostConnectLogQueryRequest request) { public Integer forceOffline(@Validated(Id.class) @RequestBody HostConnectLogQueryRequest request) {
return hostConnectLogService.forceOffline(request); return hostConnectLogService.forceOffline(request);
} }

View File

@@ -23,6 +23,9 @@ import lombok.NoArgsConstructor;
@Schema(name = "HostTerminalConnectDTO", description = "主机终端连接参数") @Schema(name = "HostTerminalConnectDTO", description = "主机终端连接参数")
public class HostTerminalConnectDTO { public class HostTerminalConnectDTO {
@Schema(description = "logId")
private Long logId;
@Schema(description = "连接类型") @Schema(description = "连接类型")
private String connectType; private String connectType;

View File

@@ -7,6 +7,7 @@ import lombok.*;
import javax.validation.constraints.Size; import javax.validation.constraints.Size;
import java.util.Date; import java.util.Date;
import java.util.List;
/** /**
* 主机连接日志 查询请求对象 * 主机连接日志 查询请求对象
@@ -26,6 +27,9 @@ public class HostConnectLogQueryRequest extends PageRequest {
@Schema(description = "id") @Schema(description = "id")
private Long id; private Long id;
@Schema(description = "id")
private List<Long> idList;
@Schema(description = "用户id") @Schema(description = "用户id")
private Long userId; private Long userId;

View File

@@ -65,7 +65,7 @@ public class TerminalMessageDispatcher extends AbstractWebSocketHandler {
String id = session.getId(); String id = session.getId();
log.info("TerminalMessageDispatcher-afterConnectionClosed id: {}, code: {}, reason: {}", id, status.getCode(), status.getReason()); log.info("TerminalMessageDispatcher-afterConnectionClosed id: {}, code: {}, reason: {}", id, status.getCode(), status.getReason());
// 关闭会话 // 关闭会话
terminalManager.closeAll(id); terminalManager.closeSession(id);
} }
} }

View File

@@ -75,10 +75,11 @@ public class TerminalCheckHandler extends AbstractTerminalHandler<TerminalCheckR
log.info("TerminalCheckHandler-handle unknown host userId: {}, hostId: {}, sessionId: {}", userId, hostId, sessionId); log.info("TerminalCheckHandler-handle unknown host userId: {}, hostId: {}, sessionId: {}", userId, hostId, sessionId);
return; return;
} }
HostTerminalConnectDTO connect = null;
Exception ex = null; Exception ex = null;
try { try {
// 获取连接信息 // 获取连接信息
HostTerminalConnectDTO connect = hostTerminalService.getTerminalConnectInfo(userId, host, connectType); connect = hostTerminalService.getTerminalConnectInfo(userId, host, connectType);
// 设置到缓存中 // 设置到缓存中
channel.getAttributes().put(sessionId, connect); channel.getAttributes().put(sessionId, connect);
log.info("TerminalCheckHandler-handle success userId: {}, hostId: {}, sessionId: {}", userId, hostId, sessionId); log.info("TerminalCheckHandler-handle success userId: {}, hostId: {}, sessionId: {}", userId, hostId, sessionId);
@@ -93,7 +94,10 @@ public class TerminalCheckHandler extends AbstractTerminalHandler<TerminalCheckR
log.error("TerminalCheckHandler-handle exception userId: {}, hostId: {}, sessionId: {}", userId, hostId, sessionId, e); log.error("TerminalCheckHandler-handle exception userId: {}, hostId: {}, sessionId: {}", userId, hostId, sessionId, e);
} }
// 记录主机日志 // 记录主机日志
this.saveHostLog(channel, userId, host, startTime, ex, sessionId, connectType); Long logId = this.saveHostLog(channel, userId, host, startTime, ex, sessionId, connectType);
if (connect != null) {
connect.setLogId(logId);
}
// 响应检查结果 // 响应检查结果
this.send(channel, this.send(channel,
OutputTypeEnum.CHECK, OutputTypeEnum.CHECK,
@@ -165,8 +169,9 @@ public class TerminalCheckHandler extends AbstractTerminalHandler<TerminalCheckR
* @param ex ex * @param ex ex
* @param sessionId sessionId * @param sessionId sessionId
* @param connectType connectType * @param connectType connectType
* @return logId
*/ */
private void saveHostLog(WebSocketSession channel, private Long saveHostLog(WebSocketSession channel,
Long userId, Long userId,
HostDO host, HostDO host,
long startTime, long startTime,
@@ -206,7 +211,7 @@ public class TerminalCheckHandler extends AbstractTerminalHandler<TerminalCheckR
extra.put(OperatorLogs.USER_AGENT, logModel.getUserAgent()); extra.put(OperatorLogs.USER_AGENT, logModel.getUserAgent());
extra.put(OperatorLogs.ERROR_MESSAGE, logModel.getErrorMessage()); extra.put(OperatorLogs.ERROR_MESSAGE, logModel.getErrorMessage());
// 记录连接日志 // 记录连接日志
hostConnectLogService.create(connectType, connectLog); return hostConnectLogService.create(connectType, connectLog);
} }
} }

View File

@@ -78,7 +78,7 @@ public class TerminalConnectHandler extends AbstractTerminalHandler<TerminalConn
// 修改连接状态为失败 // 修改连接状态为失败
Map<String, Object> extra = Maps.newMap(4); Map<String, Object> extra = Maps.newMap(4);
extra.put(ExtraFieldConst.ERROR_MESSAGE, this.getConnectErrorMessage(e)); extra.put(ExtraFieldConst.ERROR_MESSAGE, this.getConnectErrorMessage(e));
hostConnectLogService.updateStatusByToken(sessionId, HostConnectStatusEnum.FAILED, extra); hostConnectLogService.updateStatusById(connect.getLogId(), HostConnectStatusEnum.FAILED, extra);
} }
// 返回连接状态 // 返回连接状态
this.send(channel, this.send(channel,
@@ -108,6 +108,7 @@ public class TerminalConnectHandler extends AbstractTerminalHandler<TerminalConn
try { try {
// 连接配置 // 连接配置
TerminalConfig config = TerminalConfig.builder() TerminalConfig config = TerminalConfig.builder()
.logId(connect.getLogId())
.hostId(connect.getHostId()) .hostId(connect.getHostId())
.hostName(connect.getHostName()) .hostName(connect.getHostName())
.address(connect.getHostAddress()) .address(connect.getHostAddress())

View File

@@ -34,7 +34,20 @@ public class TerminalManager {
} }
/** /**
* 关闭会话 * 通过 channel 关闭会话
*
* @param channelId channelId
*/
public void closeSession(String channelId) {
// 获取并移除
ConcurrentHashMap<String, ITerminalSession> session = channelSessions.remove(channelId);
if (!Maps.isEmpty(session)) {
session.values().forEach(Streams::close);
}
}
/**
* 通过 channel + sessionId 关闭会话
* *
* @param channelId channelId * @param channelId channelId
* @param sessionId sessionId * @param sessionId sessionId
@@ -71,16 +84,12 @@ public class TerminalManager {
} }
/** /**
* 关闭全部会话 * 获取全部会话
* *
* @param channelId channelId * @return session
*/ */
public void closeAll(String channelId) { public MultiConcurrentHashMap<String, String, ITerminalSession> getChannelSessions() {
// 获取并移除 return channelSessions;
ConcurrentHashMap<String, ITerminalSession> session = channelSessions.remove(channelId);
if (!Maps.isEmpty(session)) {
session.values().forEach(Streams::close);
}
} }
} }

View File

@@ -22,6 +22,9 @@ import lombok.NoArgsConstructor;
@Schema(name = "TerminalConfig", description = "主机终端连接参数") @Schema(name = "TerminalConfig", description = "主机终端连接参数")
public class TerminalConfig { public class TerminalConfig {
@Schema(description = "logId")
private Long logId;
@Schema(description = "主机id") @Schema(description = "主机id")
private Long hostId; private Long hostId;

View File

@@ -43,4 +43,11 @@ public interface ITerminalSession extends SafeCloseable {
*/ */
void forceOffline(); void forceOffline();
/**
* 是否已关闭
*
* @return closed
*/
boolean isClosed();
} }

View File

@@ -31,6 +31,7 @@ public abstract class TerminalSession implements ITerminalSession {
@Getter @Getter
protected final TerminalConfig config; protected final TerminalConfig config;
@Getter
protected volatile boolean closed; protected volatile boolean closed;
protected volatile boolean forceOffline; protected volatile boolean forceOffline;
@@ -68,7 +69,7 @@ public abstract class TerminalSession implements ITerminalSession {
if (this.checkAndClose()) { if (this.checkAndClose()) {
// 修改状态 // 修改状态
SpringHolder.getBean(HostConnectLogService.class) SpringHolder.getBean(HostConnectLogService.class)
.updateStatusByToken(sessionId, HostConnectStatusEnum.COMPLETE, null); .updateStatusById(config.getLogId(), HostConnectStatusEnum.COMPLETE, null);
} }
} }

View File

@@ -21,12 +21,13 @@ import java.util.concurrent.Future;
public interface HostConnectLogService { public interface HostConnectLogService {
/** /**
* 创建 * 创建主机连接日志
* *
* @param type type * @param type type
* @param request request * @param request request
* @return id
*/ */
void create(HostConnectTypeEnum type, HostConnectLogCreateRequest request); Long create(HostConnectTypeEnum type, HostConnectLogCreateRequest request);
/** /**
* 分页查询主机连接日志 * 分页查询主机连接日志
@@ -36,15 +37,23 @@ public interface HostConnectLogService {
*/ */
DataGrid<HostConnectLogVO> getHostConnectLogPage(HostConnectLogQueryRequest request); DataGrid<HostConnectLogVO> getHostConnectLogPage(HostConnectLogQueryRequest request);
/**
* 分页查询主机连接会话
*
* @param request request
* @return rows
*/
List<HostConnectLogVO> getHostConnectSessions(HostConnectLogQueryRequest request);
/** /**
* 更新连接状态 * 更新连接状态
* *
* @param token token * @param id id
* @param status status * @param status status
* @param extra extra * @param extra extra
* @return effect * @return effect
*/ */
Integer updateStatusByToken(String token, HostConnectStatusEnum status, Map<String, Object> extra); Integer updateStatusById(Long id, HostConnectStatusEnum status, Map<String, Object> extra);
/** /**
* 查询用户最近连接的主机 * 查询用户最近连接的主机

View File

@@ -6,6 +6,7 @@ import com.orion.lang.constant.Const;
import com.orion.lang.define.wrapper.DataGrid; import com.orion.lang.define.wrapper.DataGrid;
import com.orion.lang.utils.Arrays1; import com.orion.lang.utils.Arrays1;
import com.orion.lang.utils.Valid; import com.orion.lang.utils.Valid;
import com.orion.lang.utils.collect.Lists;
import com.orion.ops.framework.biz.operator.log.core.utils.OperatorLogs; import com.orion.ops.framework.biz.operator.log.core.utils.OperatorLogs;
import com.orion.ops.framework.common.constant.ErrorMessage; import com.orion.ops.framework.common.constant.ErrorMessage;
import com.orion.ops.framework.security.core.utils.SecurityUtils; import com.orion.ops.framework.security.core.utils.SecurityUtils;
@@ -19,6 +20,7 @@ import com.orion.ops.module.asset.entity.vo.HostConnectLogVO;
import com.orion.ops.module.asset.enums.HostConnectStatusEnum; import com.orion.ops.module.asset.enums.HostConnectStatusEnum;
import com.orion.ops.module.asset.enums.HostConnectTypeEnum; import com.orion.ops.module.asset.enums.HostConnectTypeEnum;
import com.orion.ops.module.asset.handler.host.terminal.manager.TerminalManager; import com.orion.ops.module.asset.handler.host.terminal.manager.TerminalManager;
import com.orion.ops.module.asset.handler.host.terminal.model.TerminalConfig;
import com.orion.ops.module.asset.handler.host.terminal.session.ITerminalSession; import com.orion.ops.module.asset.handler.host.terminal.session.ITerminalSession;
import com.orion.ops.module.asset.service.HostConnectLogService; import com.orion.ops.module.asset.service.HostConnectLogService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@@ -26,11 +28,11 @@ import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.Date; import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.stream.Collectors;
/** /**
* 主机连接日志 服务实现类 * 主机连接日志 服务实现类
@@ -50,7 +52,7 @@ public class HostConnectLogServiceImpl implements HostConnectLogService {
private TerminalManager terminalManager; private TerminalManager terminalManager;
@Override @Override
public void create(HostConnectTypeEnum type, HostConnectLogCreateRequest request) { public Long create(HostConnectTypeEnum type, HostConnectLogCreateRequest request) {
HostConnectLogDO record = HostConnectLogConvert.MAPPER.to(request); HostConnectLogDO record = HostConnectLogConvert.MAPPER.to(request);
record.setType(type.name()); record.setType(type.name());
String status = request.getStatus(); String status = request.getStatus();
@@ -62,6 +64,7 @@ public class HostConnectLogServiceImpl implements HostConnectLogService {
record.setEndTime(new Date()); record.setEndTime(new Date());
} }
hostConnectLogDAO.insert(record); hostConnectLogDAO.insert(record);
return record.getId();
} }
@Override @Override
@@ -79,17 +82,47 @@ public class HostConnectLogServiceImpl implements HostConnectLogService {
} }
@Override @Override
public Integer updateStatusByToken(String token, HostConnectStatusEnum status, Map<String, Object> partial) { public List<HostConnectLogVO> getHostConnectSessions(HostConnectLogQueryRequest request) {
log.info("HostConnectLogService-updateStatusByToken start token: {}, status: {}", token, status); // 查询全部
List<Long> idList = terminalManager.getChannelSessions()
.values()
.stream()
.map(ConcurrentHashMap::values)
.flatMap(Collection::stream)
.filter(s -> !s.isClosed())
.map(ITerminalSession::getConfig)
.filter(Objects::nonNull)
.map(TerminalConfig::getLogId)
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (idList.isEmpty()) {
return Lists.empty();
}
// 条件
request.setIdList(idList);
request.setStatus(HostConnectStatusEnum.CONNECTING.name());
LambdaQueryWrapper<HostConnectLogDO> wrapper = this.buildQueryWrapper(request);
// 查询
return hostConnectLogDAO.of(wrapper)
.list(s -> {
HostConnectLogVO vo = HostConnectLogConvert.MAPPER.to(s);
vo.setExtra(JSON.parseObject(s.getExtraInfo(), HostConnectLogExtraDTO.class));
return vo;
});
}
@Override
public Integer updateStatusById(Long id, HostConnectStatusEnum status, Map<String, Object> partial) {
log.info("HostConnectLogService-updateStatusByToken start id: {}, status: {}", id, status);
// 查询 // 查询
HostConnectLogDO record = hostConnectLogDAO.of() HostConnectLogDO record = hostConnectLogDAO.of()
.createWrapper() .createWrapper()
.eq(HostConnectLogDO::getToken, token) .eq(HostConnectLogDO::getId, id)
.orderByDesc(HostConnectLogDO::getId) .orderByDesc(HostConnectLogDO::getId)
.then() .then()
.getOne(); .getOne();
if (record == null) { if (record == null) {
log.info("HostConnectLogService-updateStatusByToken no record token: {}", token); log.info("HostConnectLogService-updateStatusByToken no record id: {}", id);
return Const.N_0; return Const.N_0;
} }
return this.updateStatus(record, status, partial); return this.updateStatus(record, status, partial);
@@ -189,6 +222,7 @@ public class HostConnectLogServiceImpl implements HostConnectLogService {
private LambdaQueryWrapper<HostConnectLogDO> buildQueryWrapper(HostConnectLogQueryRequest request) { private LambdaQueryWrapper<HostConnectLogDO> buildQueryWrapper(HostConnectLogQueryRequest request) {
return hostConnectLogDAO.wrapper() return hostConnectLogDAO.wrapper()
.eq(HostConnectLogDO::getId, request.getId()) .eq(HostConnectLogDO::getId, request.getId())
.in(HostConnectLogDO::getId, request.getIdList())
.eq(HostConnectLogDO::getUserId, request.getUserId()) .eq(HostConnectLogDO::getUserId, request.getUserId())
.eq(HostConnectLogDO::getHostId, request.getHostId()) .eq(HostConnectLogDO::getHostId, request.getHostId())
.like(HostConnectLogDO::getHostAddress, request.getHostAddress()) .like(HostConnectLogDO::getHostAddress, request.getHostAddress())