feat: 连接主机终端.

This commit is contained in:
lijiahang
2024-01-03 16:05:11 +08:00
parent f7867a8bcb
commit 195a0ba7dc
11 changed files with 284 additions and 87 deletions

View File

@@ -0,0 +1,30 @@
package com.orion.ops.module.asset.define;
import com.orion.lang.define.thread.ExecutorBuilder;
import com.orion.ops.framework.common.constant.Const;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 资产线程池
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/1/3 11:21
*/
public interface AssetThreadPools {
/**
* terminal 调度线程池
*/
ThreadPoolExecutor TERMINAL_SCHEDULER = ExecutorBuilder.create()
.namedThreadFactory("terminal-thread-")
.corePoolSize(1)
.maxPoolSize(Integer.MAX_VALUE)
.keepAliveTime(Const.MS_S_60)
.workQueue(new SynchronousQueue<>())
.allowCoreThreadTimeout(true)
.build();
}

View File

@@ -44,15 +44,9 @@ public class HostTerminalConnectDTO {
@Schema(description = "超时时间")
private Integer timeout;
@Schema(description = "SSH输出编码")
private String charset;
@Schema(description = "文件名称编码")
private String fileNameCharset;
@Schema(description = "文件内容编码")
private String fileContentCharset;
@Schema(description = "用户名")
private String username;

View File

@@ -54,21 +54,11 @@ public class HostSshConfigModel implements GenericsDataModel, UpdatePasswordActi
@Schema(description = "连接超时时间")
private Integer connectTimeout;
@NotBlank
@Size(max = 12)
@Schema(description = "SSH输出编码")
private String charset;
@NotBlank
@Size(max = 12)
@Schema(description = "文件名称编码")
private String fileNameCharset;
@NotBlank
@Size(max = 12)
@Schema(description = "文件内容编码")
private String fileContentCharset;
@Schema(description = "是否使用新密码 仅参数")
private Boolean useNewPassword;

View File

@@ -45,10 +45,8 @@ public class HostSshConfigStrategy implements MapDataStrategy<HostSshConfigModel
.port(SSH_PORT)
.username(USERNAME)
.authType(HostSshAuthTypeEnum.PASSWORD.name())
.charset(Const.UTF_8)
.connectTimeout(Const.MS_S_10)
.fileNameCharset(Const.UTF_8)
.fileContentCharset(Const.UTF_8)
.build();
}
@@ -57,9 +55,7 @@ public class HostSshConfigStrategy implements MapDataStrategy<HostSshConfigModel
// 验证认证类型
Valid.valid(HostSshAuthTypeEnum::of, model.getAuthType());
// 验证编码格式
this.validCharset(model.getCharset());
this.validCharset(model.getFileNameCharset());
this.validCharset(model.getFileContentCharset());
// 检查主机秘钥是否存在
Long keyId = model.getKeyId();
if (keyId != null) {

View File

@@ -43,12 +43,12 @@ public abstract class AbstractTerminalHandler<T> implements ITerminalHandler {
*
* @param session session
* @param attr attr
* @param <T> T
* @param <E> T
* @return T
*/
@SuppressWarnings("unchecked")
protected <T> T getAttr(WebSocketSession session, String attr) {
return (T) session.getAttributes().get(attr);
protected <E> E getAttr(WebSocketSession session, String attr) {
return (E) session.getAttributes().get(attr);
}
}

View File

@@ -1,6 +1,8 @@
package com.orion.ops.module.asset.handler.host.terminal.handler;
import com.orion.lang.id.UUIds;
import com.orion.lang.utils.collect.Maps;
import com.orion.lang.utils.io.Streams;
import com.orion.net.host.SessionStore;
import com.orion.ops.framework.biz.operator.log.core.service.OperatorLogFrameworkService;
import com.orion.ops.framework.biz.operator.log.core.uitls.OperatorLogFiller;
@@ -14,7 +16,8 @@ import com.orion.ops.module.asset.entity.request.host.HostConnectLogCreateReques
import com.orion.ops.module.asset.enums.HostConnectTypeEnum;
import com.orion.ops.module.asset.handler.host.terminal.entity.Message;
import com.orion.ops.module.asset.handler.host.terminal.entity.request.TerminalConnectRequest;
import com.orion.ops.module.asset.handler.host.terminal.manager.TerminalSession;
import com.orion.ops.module.asset.handler.host.terminal.manager.TerminalManager;
import com.orion.ops.module.asset.handler.host.terminal.session.TerminalSession;
import com.orion.ops.module.asset.service.HostConnectLogService;
import com.orion.ops.module.asset.service.HostTerminalService;
import lombok.extern.slf4j.Slf4j;
@@ -47,6 +50,9 @@ public class TerminalConnectHandler extends AbstractTerminalHandler<TerminalConn
@Resource
private OperatorLogFrameworkService operatorLogFrameworkService;
@Resource
private TerminalManager terminalManager;
public TerminalConnectHandler() {
super(TerminalConnectRequest.class);
}
@@ -66,21 +72,22 @@ public class TerminalConnectHandler extends AbstractTerminalHandler<TerminalConn
}
// 日志信息
long startTime = System.currentTimeMillis();
String terminalToken = UUIds.random15();
TerminalSession terminalSession = null;
Exception ex = null;
String terminalToken = null;
try {
// 连接主机
HostTerminalConnectDTO connect = hostTerminalService.getTerminalConnectInfo(userId, host);
terminalToken = connect.getToken();
SessionStore sessionStore = hostTerminalService.openSessionStore(connect);
TerminalSession terminalSession = new TerminalSession(session, connect, sessionStore);
terminalSession = new TerminalSession(terminalToken, session, sessionStore);
terminalSession.connect(body.getCols(), body.getRows());
log.info("TerminalConnectHandler-handle success userId: {}, hostId: {}", userId, hostId);
// TODO 添加到 manager
log.info("TerminalConnectHandler-handle success userId: {}, hostId: {}, token: {}", userId, hostId, terminalToken);
// 添加会话到 manager
terminalManager.addSession(terminalSession);
} catch (Exception e) {
log.error("TerminalConnectHandler-handle error userId: {}, hostId: {}", userId, hostId, e);
log.error("TerminalConnectHandler-handle error userId: {}, hostId: {}, token: {}", userId, hostId, terminalToken, e);
ex = e;
Streams.close(terminalSession);
} finally {
// 记录主机日志
this.saveTerminalLog(session, userId, host, startTime, ex, terminalToken);

View File

@@ -0,0 +1,73 @@
package com.orion.ops.module.asset.handler.host.terminal.manager;
import com.orion.lang.define.collect.MultiConcurrentHashMap;
import com.orion.lang.utils.collect.Maps;
import com.orion.lang.utils.io.Streams;
import com.orion.ops.module.asset.handler.host.terminal.session.ITerminalSession;
import com.orion.ops.module.asset.handler.host.terminal.session.TerminalSession;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
/**
* 终端管理器
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/1/3 11:35
*/
@Component
public class TerminalManager {
/**
* 会话存储器
*/
private final MultiConcurrentHashMap<String, String, ITerminalSession> sessions = MultiConcurrentHashMap.create();
/**
* 添加会话
*
* @param terminalSession terminalSession
*/
public void addSession(TerminalSession terminalSession) {
sessions.put(terminalSession.getSession().getId(), terminalSession.getToken(), terminalSession);
}
/**
* 获取会话
*
* @param id id
* @param token token
* @return session
*/
public ITerminalSession getSession(String id, String token) {
return sessions.get(id, token);
}
/**
* 关闭会话
*
* @param id id
* @param token token
*/
public void closeSession(String id, String token) {
ITerminalSession session = sessions.get(id, token);
Streams.close(session);
sessions.removeElement(id, token);
}
/**
* 关闭全部会话
*
* @param id id
*/
public void closeAll(String id) {
ConcurrentHashMap<String, ITerminalSession> session = sessions.get(id);
if (Maps.isEmpty(session)) {
return;
}
session.values().forEach(Streams::close);
sessions.remove(id);
}
}

View File

@@ -1,49 +0,0 @@
package com.orion.ops.module.asset.handler.host.terminal.manager;
import com.orion.lang.able.SafeCloseable;
import com.orion.net.host.SessionStore;
import com.orion.net.host.ssh.shell.ShellExecutor;
import com.orion.ops.module.asset.entity.dto.HostTerminalConnectDTO;
import org.springframework.web.socket.WebSocketSession;
/**
* 终端会话
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/1/2 17:28
*/
public class TerminalSession implements SafeCloseable {
private final WebSocketSession session;
private final HostTerminalConnectDTO connect;
private final SessionStore sessionStore;
private ShellExecutor executor;
public TerminalSession(WebSocketSession session,
HostTerminalConnectDTO connect,
SessionStore sessionStore) {
this.session = session;
this.connect = connect;
this.sessionStore = sessionStore;
}
/**
* 连接
*
* @param cols cols
* @param rows rows
*/
public void connect(int cols, int rows) {
this.executor = sessionStore.getShellExecutor();
executor.size(cols, rows);
}
@Override
public void close() {
}
}

View File

@@ -0,0 +1,44 @@
package com.orion.ops.module.asset.handler.host.terminal.session;
import com.orion.lang.able.SafeCloseable;
/**
* 终端会话定义
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/1/2 17:28
*/
public interface ITerminalSession extends SafeCloseable {
/**
* 连接
*
* @param cols cols
* @param rows rows
*/
void connect(int cols, int rows);
/**
* 重置大小
*
* @param cols cols
* @param rows rows
*/
void resize(int cols, int rows);
/**
* 写入内容
*
* @param b b
*/
void write(String b);
/**
* 写入内容
*
* @param b b
*/
void write(byte[] b);
}

View File

@@ -0,0 +1,118 @@
package com.orion.ops.module.asset.handler.host.terminal.session;
import com.orion.lang.utils.io.Streams;
import com.orion.net.host.SessionStore;
import com.orion.net.host.ssh.TerminalType;
import com.orion.net.host.ssh.shell.ShellExecutor;
import com.orion.ops.framework.common.constant.Const;
import com.orion.ops.module.asset.define.AssetThreadPools;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.WebSocketSession;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
/**
* 终端会话
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/1/2 17:28
*/
@Slf4j
public class TerminalSession implements ITerminalSession {
@Getter
private final String token;
@Getter
private final WebSocketSession session;
private final SessionStore sessionStore;
private ShellExecutor executor;
private volatile boolean close;
public TerminalSession(String token,
WebSocketSession session,
SessionStore sessionStore) {
this.token = token;
this.session = session;
this.sessionStore = sessionStore;
}
@Override
public void connect(int cols, int rows) {
this.executor = sessionStore.getShellExecutor();
executor.terminalType(TerminalType.XTERM_256_COLOR);
executor.size(cols, rows);
executor.streamHandler(this::streamHandler);
executor.connect();
// 开始监听输出
AssetThreadPools.TERMINAL_SCHEDULER.execute(executor);
}
@Override
public void resize(int cols, int rows) {
if (!executor.isConnected()) {
executor.connect();
}
executor.size(cols, rows);
executor.resize();
}
@Override
public void write(String b) {
executor.write(b);
}
@Override
public void write(byte[] b) {
executor.write(b);
}
@Override
public void close() {
if (close) {
return;
}
this.close = true;
try {
Streams.close(executor);
Streams.close(sessionStore);
} catch (Exception e) {
log.error("terminal 断开连接 失败 token: {}, {}", token, e);
}
}
/**
* 标准输出处理
*
* @param inputStream stream
*/
private void streamHandler(InputStream inputStream) {
byte[] bs = new byte[Const.BUFFER_KB_4];
BufferedInputStream in = new BufferedInputStream(inputStream, Const.BUFFER_KB_4);
int read;
try {
while (session.isOpen() && (read = in.read(bs)) != -1) {
// 响应
// byte[] msg = WsProtocol.OK.msg(bs, 0, read);
// WebSockets.sendText(session, msg);
}
} catch (IOException ex) {
log.error("terminal 读取流失败", ex);
// WebSockets.close(session, WsCloseCode.READ_EXCEPTION);
}
// eof
if (close) {
return;
}
// WebSockets.close(session, WsCloseCode.EOF);
log.info("terminal eof回调 {}", token);
}
}

View File

@@ -149,11 +149,7 @@ public class HostTerminalServiceImpl implements HostTerminalService {
}
}
// 获取连接配置
// TODO 看看需不需要 不需要的话就修改位置
HostTerminalConnectDTO connect = this.getHostConnectInfo(host, config, extra);
connect.setUserId(userId);
connect.setToken(UUIds.random15());
return connect;
return this.getHostConnectInfo(host, config, extra);
}
@Override
@@ -258,9 +254,7 @@ public class HostTerminalServiceImpl implements HostTerminalService {
conn.setHostName(host.getName());
conn.setHostAddress(host.getAddress());
conn.setPort(config.getPort());
conn.setCharset(config.getCharset());
conn.setFileNameCharset(config.getFileNameCharset());
conn.setFileContentCharset(config.getFileContentCharset());
conn.setTimeout(config.getConnectTimeout());
conn.setUsername(config.getUsername());
// 填充身份信息