feat: 关闭主机回调.

This commit is contained in:
lijiahangmax
2024-01-07 01:29:17 +08:00
parent b160c6317e
commit 48156ebb0d
23 changed files with 287 additions and 160 deletions

View File

@@ -24,7 +24,7 @@ public enum InputTypeEnum {
*/
CHECK("ck",
TerminalCheckHandler.class,
new String[]{"type", "session", "hostId"},
new String[]{"type", "sessionId", "hostId"},
TerminalCheckRequest.class),
/**
@@ -32,7 +32,7 @@ public enum InputTypeEnum {
*/
CONNECT("co",
TerminalConnectHandler.class,
new String[]{"type", "session", "cols", "rows"},
new String[]{"type", "sessionId", "cols", "rows"},
TerminalConnectRequest.class),
/**
@@ -40,7 +40,7 @@ public enum InputTypeEnum {
*/
CLOSE("cl",
TerminalCloseHandler.class,
new String[]{"type", "session"},
new String[]{"type", "sessionId"},
TerminalBasePayload.class),
/**
@@ -56,7 +56,7 @@ public enum InputTypeEnum {
*/
RESIZE("rs",
TerminalResizeHandler.class,
new String[]{"type", "session", "cols", "rows"},
new String[]{"type", "sessionId", "cols", "rows"},
TerminalResizeRequest.class),
/**
@@ -64,7 +64,7 @@ public enum InputTypeEnum {
*/
EXEC("e",
TerminalExecHandler.class,
new String[]{"type", "session", "command"},
new String[]{"type", "sessionId", "command"},
TerminalExecRequest.class),
/**
@@ -72,7 +72,7 @@ public enum InputTypeEnum {
*/
INPUT("i",
TerminalInputHandler.class,
new String[]{"type", "session", "command"},
new String[]{"type", "sessionId", "command"},
TerminalInputRequest.class),
;

View File

@@ -18,12 +18,17 @@ public enum OutputTypeEnum {
/**
* 主机连接检查
*/
CHECK("ck", "${type}|${session}|${result}|${errorMessage}"),
CHECK("ck", "${type}|${sessionId}|${result}|${msg}"),
/**
* 主机连接
*/
CONNECT("co", "${type}|${session}|${result}|${errorMessage}"),
CONNECT("co", "${type}|${sessionId}|${result}|${msg}"),
/**
* 关闭连接
*/
CLOSE("cl", "${type}|${sessionId}|${msg}"),
/**
* pong
@@ -33,7 +38,7 @@ public enum OutputTypeEnum {
/**
* 输出
*/
OUTPUT("o", "${type}|${session}|${body}"),
OUTPUT("o", "${type}|${sessionId}|${body}"),
;

View File

@@ -59,7 +59,7 @@ public class TerminalCheckHandler extends AbstractTerminalHandler<TerminalCheckR
Long hostId = payload.getHostId();
Long userId = this.getAttr(channel, ExtraFieldConst.USER_ID);
long startTime = System.currentTimeMillis();
String sessionId = payload.getSession();
String sessionId = payload.getSessionId();
log.info("TerminalCheckHandler-handle start userId: {}, hostId: {}, sessionId: {}", userId, hostId, sessionId);
// 检查 session 是否存在
if (this.checkSession(channel, payload)) {
@@ -89,9 +89,9 @@ public class TerminalCheckHandler extends AbstractTerminalHandler<TerminalCheckR
this.send(channel,
OutputTypeEnum.CHECK,
TerminalCheckResponse.builder()
.session(payload.getSession())
.sessionId(payload.getSessionId())
.result(BooleanBit.of(ex == null).getValue())
.errorMessage(ex == null ? null : ex.getMessage())
.msg(ex == null ? null : ex.getMessage())
.build());
}
@@ -103,7 +103,7 @@ public class TerminalCheckHandler extends AbstractTerminalHandler<TerminalCheckR
* @return 是否存在
*/
private boolean checkSession(WebSocketSession channel, TerminalCheckRequest payload) {
ITerminalSession terminalSession = terminalManager.getSession(channel.getId(), payload.getSession());
ITerminalSession terminalSession = terminalManager.getSession(channel.getId(), payload.getSessionId());
if (terminalSession != null) {
this.sendCheckFailedMessage(channel, payload, ErrorMessage.SESSION_PRESENT);
return true;
@@ -137,13 +137,13 @@ public class TerminalCheckHandler extends AbstractTerminalHandler<TerminalCheckR
* @param msg msg
*/
private void sendCheckFailedMessage(WebSocketSession channel, TerminalCheckRequest payload, String msg) {
TerminalCheckResponse build = TerminalCheckResponse.builder()
.session(payload.getSession())
TerminalCheckResponse resp = TerminalCheckResponse.builder()
.sessionId(payload.getSessionId())
.result(BooleanBit.FALSE.getValue())
.errorMessage(msg)
.msg(msg)
.build();
// 发送
this.send(channel, OutputTypeEnum.CHECK, build);
this.send(channel, OutputTypeEnum.CHECK, resp);
}
/**

View File

@@ -24,9 +24,9 @@ public class TerminalCloseHandler extends AbstractTerminalHandler<TerminalBasePa
@Override
public void handle(WebSocketSession channel, TerminalBasePayload payload) {
log.info("TerminalCloseHandler-handle start session: {}", payload.getSession());
log.info("TerminalCloseHandler-handle start session: {}", payload.getSessionId());
// 关闭会话
terminalManager.closeSession(channel.getId(), payload.getSession());
terminalManager.closeSession(channel.getId(), payload.getSessionId());
}
}

View File

@@ -48,7 +48,7 @@ public class TerminalConnectHandler extends AbstractTerminalHandler<TerminalConn
@Override
public void handle(WebSocketSession channel, TerminalConnectRequest payload) {
String sessionId = payload.getSession();
String sessionId = payload.getSessionId();
log.info("TerminalConnectHandler-handle start sessionId: {}", sessionId);
// 获取主机连接信息
HostTerminalConnectDTO connect = this.getAttr(channel, sessionId);
@@ -57,9 +57,9 @@ public class TerminalConnectHandler extends AbstractTerminalHandler<TerminalConn
this.send(channel,
OutputTypeEnum.CONNECT,
TerminalConnectResponse.builder()
.session(payload.getSession())
.sessionId(payload.getSessionId())
.result(BooleanBit.FALSE.getValue())
.errorMessage(ErrorMessage.SESSION_ABSENT)
.msg(ErrorMessage.SESSION_ABSENT)
.build());
return;
}
@@ -80,9 +80,9 @@ public class TerminalConnectHandler extends AbstractTerminalHandler<TerminalConn
this.send(channel,
OutputTypeEnum.CONNECT,
TerminalConnectResponse.builder()
.session(payload.getSession())
.sessionId(payload.getSessionId())
.result(BooleanBit.of(ex == null).getValue())
.errorMessage(this.getConnectErrorMessage(ex))
.msg(this.getConnectErrorMessage(ex))
.build());
}

View File

@@ -26,7 +26,7 @@ public class TerminalExecHandler extends AbstractTerminalHandler<TerminalExecReq
@Override
public void handle(WebSocketSession channel, TerminalExecRequest payload) {
// 获取会话
ITerminalSession terminalSession = terminalManager.getSession(channel.getId(), payload.getSession());
ITerminalSession terminalSession = terminalManager.getSession(channel.getId(), payload.getSessionId());
if (terminalSession != null) {
// 执行命令
terminalSession.write(payload.getCommand());

View File

@@ -26,7 +26,7 @@ public class TerminalInputHandler extends AbstractTerminalHandler<TerminalInputR
@Override
public void handle(WebSocketSession channel, TerminalInputRequest payload) {
// 获取会话
ITerminalSession terminalSession = terminalManager.getSession(channel.getId(), payload.getSession());
ITerminalSession terminalSession = terminalManager.getSession(channel.getId(), payload.getSessionId());
if (terminalSession != null) {
// 处理输入
terminalSession.write(payload.getCommand());

View File

@@ -1,11 +1,17 @@
package com.orion.ops.module.asset.handler.host.terminal.handler;
import com.orion.lang.utils.collect.Maps;
import com.orion.ops.module.asset.handler.host.terminal.enums.OutputTypeEnum;
import com.orion.ops.module.asset.handler.host.terminal.manager.TerminalManager;
import com.orion.ops.module.asset.handler.host.terminal.model.TerminalBasePayload;
import com.orion.ops.module.asset.handler.host.terminal.session.ITerminalSession;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketSession;
import javax.annotation.Resource;
import java.util.Map;
/**
* ping 处理器
*
@@ -17,10 +23,20 @@ import org.springframework.web.socket.WebSocketSession;
@Component
public class TerminalPingHandler extends AbstractTerminalHandler<TerminalBasePayload> {
@Resource
private TerminalManager terminalManager;
@Override
public void handle(WebSocketSession channel, TerminalBasePayload payload) {
// 发送 pong
this.send(channel, OutputTypeEnum.PONG.getType());
// 活跃 terminal
Map<String, ITerminalSession> sessions = terminalManager.getSession(channel.getId());
if (!Maps.isEmpty(sessions)) {
for (ITerminalSession session : sessions.values()) {
session.keepAlive();
}
}
}
}

View File

@@ -26,7 +26,7 @@ public class TerminalResizeHandler extends AbstractTerminalHandler<TerminalResiz
@Override
public void handle(WebSocketSession channel, TerminalResizeRequest payload) {
// 获取会话
ITerminalSession terminalSession = terminalManager.getSession(channel.getId(), payload.getSession());
ITerminalSession terminalSession = terminalManager.getSession(channel.getId(), payload.getSessionId());
if (terminalSession != null) {
// 修改大小
terminalSession.resize(payload.getCols(), payload.getRows());

View File

@@ -7,6 +7,7 @@ import com.orion.ops.module.asset.handler.host.terminal.session.ITerminalSession
import com.orion.ops.module.asset.handler.host.terminal.session.TerminalSession;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -33,17 +34,6 @@ public class TerminalManager {
channelSessions.put(session.getChannel().getId(), session.getSessionId(), session);
}
/**
* 获取会话
*
* @param channelId channelId
* @param sessionId sessionId
* @return session
*/
public ITerminalSession getSession(String channelId, String sessionId) {
return channelSessions.get(channelId, sessionId);
}
/**
* 关闭会话
*
@@ -58,6 +48,27 @@ public class TerminalManager {
}
}
/**
* 获取会话
*
* @param channelId channelId
* @param sessionId sessionId
* @return session
*/
public ITerminalSession getSession(String channelId, String sessionId) {
return channelSessions.get(channelId, sessionId);
}
/**
* 获取会话
*
* @param channelId channelId
* @return session
*/
public Map<String, ITerminalSession> getSession(String channelId) {
return channelSessions.get(channelId);
}
/**
* 关闭全部会话
*

View File

@@ -21,7 +21,7 @@ import lombok.experimental.SuperBuilder;
public class TerminalBasePayload {
@Schema(description = "会话id")
private String session;
private String sessionId;
@Schema(description = "消息类型")
private String type;

View File

@@ -27,6 +27,6 @@ public class TerminalCheckResponse extends TerminalBasePayload {
private Integer result;
@Schema(description = "错误信息")
private String errorMessage;
private String msg;
}

View File

@@ -0,0 +1,29 @@
package com.orion.ops.module.asset.handler.host.terminal.model.response;
import com.orion.ops.module.asset.handler.host.terminal.model.TerminalBasePayload;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
/**
* 主机连接关闭响应 实体对象
*
* @author Jiahang Li
* @version 1.0.0
* @since 2023/12/29 16:20
*/
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
@Schema(name = "TerminalCloseResponse", description = "主机连接关闭响应 实体对象")
public class TerminalCloseResponse extends TerminalBasePayload {
@Schema(description = "关闭信息")
private String msg;
}

View File

@@ -27,6 +27,6 @@ public class TerminalConnectResponse extends TerminalBasePayload {
private Integer result;
@Schema(description = "错误信息")
private String errorMessage;
private String msg;
}

View File

@@ -41,4 +41,9 @@ public interface ITerminalSession extends SafeCloseable {
*/
void write(byte[] b);
/**
* 活跃回话
*/
void keepAlive();
}

View File

@@ -2,14 +2,15 @@ package com.orion.ops.module.asset.handler.host.terminal.session;
import com.orion.lang.utils.io.Streams;
import com.orion.net.host.SessionStore;
import com.orion.net.host.ssh.TerminalType;
import com.orion.net.host.ssh.shell.ShellExecutor;
import com.orion.ops.framework.common.constant.Const;
import com.orion.ops.framework.websocket.core.utils.WebSockets;
import com.orion.ops.module.asset.define.AssetThreadPools;
import com.orion.ops.module.asset.enums.HostConnectStatusEnum;
import com.orion.ops.module.asset.handler.host.terminal.constant.TerminalMessage;
import com.orion.ops.module.asset.handler.host.terminal.enums.OutputTypeEnum;
import com.orion.ops.module.asset.handler.host.terminal.model.TerminalConfig;
import com.orion.ops.module.asset.handler.host.terminal.model.response.TerminalCloseResponse;
import com.orion.ops.module.asset.handler.host.terminal.model.response.TerminalOutputResponse;
import com.orion.ops.module.asset.service.HostConnectLogService;
import com.orion.spring.SpringHolder;
@@ -64,9 +65,9 @@ public class TerminalSession implements ITerminalSession {
config.setRows(rows);
// 打开 shell
this.executor = sessionStore.getShellExecutor();
executor.terminalType(TerminalType.XTERM_256_COLOR);
executor.size(cols, rows);
executor.streamHandler(this::streamHandler);
executor.callback(this::eofCallback);
executor.connect();
// 开始监听输出
AssetThreadPools.TERMINAL_SCHEDULER.execute(executor);
@@ -97,8 +98,19 @@ public class TerminalSession implements ITerminalSession {
executor.write(b);
}
@Override
public void keepAlive() {
try {
// 发送个信号 保证 socket 不自动关闭
executor.sendSignal(Const.EMPTY);
} catch (Exception e) {
log.error("terminal keep-alive error {}", sessionId, e);
}
}
@Override
public void close() {
log.info("terminal close {}", sessionId);
if (close) {
return;
}
@@ -128,8 +140,8 @@ public class TerminalSession implements ITerminalSession {
String body = lastLine = new String(bs, 0, read, config.getCharset());
// 响应
TerminalOutputResponse resp = TerminalOutputResponse.builder()
.session(sessionId)
.type(OutputTypeEnum.OUTPUT.getType())
.sessionId(sessionId)
.body(body)
.build();
WebSockets.sendText(channel, OutputTypeEnum.OUTPUT.format(resp));
@@ -137,10 +149,22 @@ public class TerminalSession implements ITerminalSession {
} catch (IOException ex) {
log.error("terminal 读取流失败", ex);
}
// eof
if (close) {
log.info("terminal eof回调 {}", sessionId);
}
}
/**
* eof 回调
*/
private void eofCallback() {
log.info("terminal eof回调 {}, forClose: {}", sessionId, this.close);
// 发送关闭信息
TerminalCloseResponse resp = TerminalCloseResponse.builder()
.type(OutputTypeEnum.CLOSE.getType())
.sessionId(this.sessionId)
.msg(TerminalMessage.CLOSED_CONNECTION)
.build();
WebSockets.sendText(channel, OutputTypeEnum.CLOSE.format(resp));
// 需要调用关闭 - 可能是 logout 需要手动触发
this.close();
}
}