🚧 连接sftp.

This commit is contained in:
lijiahang
2024-02-04 18:17:39 +08:00
parent de82a94f13
commit b4ceb3839c
26 changed files with 385 additions and 293 deletions

View File

@@ -1,8 +1,7 @@
{
"local": {
"baseUrl": "http://127.0.0.1:9200/orion-api",
"token": "Bearer YQJ3IpwJJv5HujIWY6ZTNDgUxXRY6aDt",
"timestamp": 1689577685914
"token": "Bearer YQJ3IpwJJv5HujIWY6ZTNDgUxXRY6aDt"
},
"gateway": {
"baseUrl": "http://127.0.0.1:9200/orion-api",

View File

@@ -35,4 +35,6 @@ public interface ExtraFieldConst extends FieldConst {
String SESSION_ID = "sessionId";
String CONNECT_TYPE = "connectType";
}

View File

@@ -23,6 +23,9 @@ import lombok.NoArgsConstructor;
@Schema(name = "HostTerminalConnectDTO", description = "主机终端连接参数")
public class HostTerminalConnectDTO {
@Schema(description = "连接类型")
private String connectType;
@Schema(description = "hostId")
private Long hostId;

View File

@@ -14,6 +14,11 @@ public enum HostConnectTypeEnum {
*/
SSH,
/**
* sftp
*/
SFTP,
;
public static HostConnectTypeEnum of(String type) {

View File

@@ -3,7 +3,10 @@ package com.orion.ops.module.asset.handler.host.terminal.enums;
import com.alibaba.fastjson.JSONObject;
import com.orion.ops.module.asset.handler.host.terminal.handler.*;
import com.orion.ops.module.asset.handler.host.terminal.model.TerminalBasePayload;
import com.orion.ops.module.asset.handler.host.terminal.model.request.*;
import com.orion.ops.module.asset.handler.host.terminal.model.request.TerminalCheckRequest;
import com.orion.ops.module.asset.handler.host.terminal.model.request.TerminalConnectRequest;
import com.orion.ops.module.asset.handler.host.terminal.model.request.TerminalInputRequest;
import com.orion.ops.module.asset.handler.host.terminal.model.request.TerminalResizeRequest;
import com.orion.spring.SpringHolder;
import lombok.Getter;
import org.springframework.stereotype.Component;
@@ -24,7 +27,7 @@ public enum InputTypeEnum {
*/
CHECK("ck",
TerminalCheckHandler.class,
new String[]{"type", "sessionId", "hostId"},
new String[]{"type", "sessionId", "hostId", "connectType"},
TerminalCheckRequest.class),
/**
@@ -59,14 +62,6 @@ public enum InputTypeEnum {
new String[]{"type", "sessionId", "cols", "rows"},
TerminalResizeRequest.class),
/**
* 执行
*/
EXEC("e",
TerminalExecHandler.class,
new String[]{"type", "sessionId", "command"},
TerminalExecRequest.class),
/**
* 输入
*/
@@ -76,10 +71,10 @@ public enum InputTypeEnum {
TerminalInputRequest.class),
// LS
// DEL
// MK
// RM
// MV
// TC
// MK
// CD
;

View File

@@ -61,6 +61,7 @@ public class TerminalCheckHandler extends AbstractTerminalHandler<TerminalCheckR
Long hostId = payload.getHostId();
Long userId = this.getAttr(channel, ExtraFieldConst.USER_ID);
long startTime = System.currentTimeMillis();
HostConnectTypeEnum connectType = HostConnectTypeEnum.of(payload.getConnectType());
String sessionId = payload.getSessionId();
log.info("TerminalCheckHandler-handle start userId: {}, hostId: {}, sessionId: {}", userId, hostId, sessionId);
// 检查 session 是否存在
@@ -77,7 +78,7 @@ public class TerminalCheckHandler extends AbstractTerminalHandler<TerminalCheckR
Exception ex = null;
try {
// 获取连接信息
HostTerminalConnectDTO connect = hostTerminalService.getTerminalConnectInfo(userId, host);
HostTerminalConnectDTO connect = hostTerminalService.getTerminalConnectInfo(userId, host, connectType);
// 设置到缓存中
channel.getAttributes().put(sessionId, connect);
log.info("TerminalCheckHandler-handle success userId: {}, hostId: {}, sessionId: {}", userId, hostId, sessionId);
@@ -89,7 +90,7 @@ public class TerminalCheckHandler extends AbstractTerminalHandler<TerminalCheckR
log.error("TerminalCheckHandler-handle exception userId: {}, hostId: {}, sessionId: {}", userId, hostId, sessionId, e);
}
// 记录主机日志
this.saveTerminalLog(channel, userId, host, startTime, ex, sessionId);
this.saveHostLog(channel, userId, host, startTime, ex, sessionId, connectType);
// 响应检查结果
this.send(channel,
OutputTypeEnum.CHECK,
@@ -108,8 +109,8 @@ public class TerminalCheckHandler extends AbstractTerminalHandler<TerminalCheckR
* @return 是否存在
*/
private boolean checkSession(WebSocketSession channel, TerminalCheckRequest payload) {
ITerminalSession terminalSession = terminalManager.getSession(channel.getId(), payload.getSessionId());
if (terminalSession != null) {
ITerminalSession session = terminalManager.getSession(channel.getId(), payload.getSessionId());
if (session != null) {
this.sendCheckFailedMessage(channel, payload, ErrorMessage.SESSION_PRESENT);
return true;
}
@@ -154,19 +155,21 @@ public class TerminalCheckHandler extends AbstractTerminalHandler<TerminalCheckR
/**
* 记录主机日志
*
* @param channel channel
* @param userId userId
* @param host host
* @param startTime startTime
* @param ex ex
* @param sessionId sessionId
* @param channel channel
* @param userId userId
* @param host host
* @param startTime startTime
* @param ex ex
* @param sessionId sessionId
* @param connectType connectType
*/
private void saveTerminalLog(WebSocketSession channel,
Long userId,
HostDO host,
long startTime,
Exception ex,
String sessionId) {
private void saveHostLog(WebSocketSession channel,
Long userId,
HostDO host,
long startTime,
Exception ex,
String sessionId,
HostConnectTypeEnum connectType) {
Long hostId = host.getId();
String hostName = host.getName();
String username = this.getAttr(channel, ExtraFieldConst.USERNAME);
@@ -174,6 +177,7 @@ public class TerminalCheckHandler extends AbstractTerminalHandler<TerminalCheckR
Map<String, Object> extra = Maps.newMap();
extra.put(OperatorLogs.ID, hostId);
extra.put(OperatorLogs.NAME, hostName);
extra.put(OperatorLogs.CONNECT_TYPE, connectType.name());
extra.put(OperatorLogs.CHANNEL_ID, channel.getId());
extra.put(OperatorLogs.SESSION_ID, sessionId);
// 日志参数
@@ -205,7 +209,7 @@ public class TerminalCheckHandler extends AbstractTerminalHandler<TerminalCheckR
.token(sessionId)
.extra(extra)
.build();
hostConnectLogService.create(HostConnectTypeEnum.SSH, connectLog);
hostConnectLogService.create(connectType, connectLog);
}
}

View File

@@ -11,13 +11,16 @@ import com.orion.ops.framework.common.constant.ErrorMessage;
import com.orion.ops.framework.common.enums.BooleanBit;
import com.orion.ops.module.asset.entity.dto.HostTerminalConnectDTO;
import com.orion.ops.module.asset.enums.HostConnectStatusEnum;
import com.orion.ops.module.asset.enums.HostConnectTypeEnum;
import com.orion.ops.module.asset.handler.host.terminal.constant.TerminalMessage;
import com.orion.ops.module.asset.handler.host.terminal.enums.OutputTypeEnum;
import com.orion.ops.module.asset.handler.host.terminal.manager.TerminalManager;
import com.orion.ops.module.asset.handler.host.terminal.model.TerminalConfig;
import com.orion.ops.module.asset.handler.host.terminal.model.request.TerminalConnectRequest;
import com.orion.ops.module.asset.handler.host.terminal.model.response.TerminalConnectResponse;
import com.orion.ops.module.asset.handler.host.terminal.session.TerminalSession;
import com.orion.ops.module.asset.handler.host.terminal.session.ITerminalSession;
import com.orion.ops.module.asset.handler.host.terminal.session.SftpSession;
import com.orion.ops.module.asset.handler.host.terminal.session.SshSession;
import com.orion.ops.module.asset.service.HostConnectLogService;
import com.orion.ops.module.asset.service.HostTerminalService;
import lombok.extern.slf4j.Slf4j;
@@ -68,9 +71,9 @@ public class TerminalConnectHandler extends AbstractTerminalHandler<TerminalConn
Exception ex = null;
try {
// 连接主机
TerminalSession terminalSession = this.connect(sessionId, connect, channel, payload);
ITerminalSession session = this.connect(sessionId, connect, channel, payload);
// 添加会话到 manager
terminalManager.addSession(terminalSession);
terminalManager.addSession(session);
} catch (Exception e) {
ex = e;
// 修改连接状态为失败
@@ -95,11 +98,12 @@ public class TerminalConnectHandler extends AbstractTerminalHandler<TerminalConn
* @param body body
* @return channel
*/
private TerminalSession connect(String sessionId,
HostTerminalConnectDTO connect,
WebSocketSession channel,
TerminalConnectRequest body) {
TerminalSession terminalSession = null;
private ITerminalSession connect(String sessionId,
HostTerminalConnectDTO connect,
WebSocketSession channel,
TerminalConnectRequest body) {
String connectType = connect.getConnectType();
ITerminalSession session = null;
try {
// 连接配置
TerminalConfig config = TerminalConfig.builder()
@@ -109,12 +113,21 @@ public class TerminalConnectHandler extends AbstractTerminalHandler<TerminalConn
.build();
// 建立连接
SessionStore sessionStore = hostTerminalService.openSessionStore(connect);
terminalSession = new TerminalSession(sessionId, channel, sessionStore, config);
terminalSession.connect(body.getTerminalType(), body.getCols(), body.getRows());
if (HostConnectTypeEnum.SSH.name().equals(connectType)) {
// 打开 ssh 会话
SshSession sshSession = new SshSession(sessionId, channel, sessionStore, config);
sshSession.connect(body.getTerminalType(), body.getCols(), body.getRows());
session = sshSession;
} else if (HostConnectTypeEnum.SFTP.name().equals(connectType)) {
// 打开 sftp 会话
SftpSession sftpSession = new SftpSession(sessionId, channel, sessionStore, config);
sftpSession.connect();
session = sftpSession;
}
log.info("TerminalConnectHandler-handle success sessionId: {}", sessionId);
return terminalSession;
return session;
} catch (Exception e) {
Streams.close(terminalSession);
Streams.close(session);
log.error("TerminalConnectHandler-handle error sessionId: {}", sessionId, e);
throw e;
}

View File

@@ -1,36 +0,0 @@
package com.orion.ops.module.asset.handler.host.terminal.handler;
import com.orion.ops.module.asset.handler.host.terminal.manager.TerminalManager;
import com.orion.ops.module.asset.handler.host.terminal.model.request.TerminalExecRequest;
import com.orion.ops.module.asset.handler.host.terminal.session.ITerminalSession;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketSession;
import javax.annotation.Resource;
/**
* 执行命令处理器
*
* @author Jiahang Li
* @version 1.0.0
* @since 2023/12/29 15:32
*/
@Slf4j
@Component
public class TerminalExecHandler extends AbstractTerminalHandler<TerminalExecRequest> {
@Resource
private TerminalManager terminalManager;
@Override
public void handle(WebSocketSession channel, TerminalExecRequest payload) {
// 获取会话
ITerminalSession terminalSession = terminalManager.getSession(channel.getId(), payload.getSessionId());
if (terminalSession != null) {
// 执行命令
terminalSession.write(payload.getCommand());
}
}
}

View File

@@ -2,6 +2,7 @@ package com.orion.ops.module.asset.handler.host.terminal.handler;
import com.orion.ops.module.asset.handler.host.terminal.manager.TerminalManager;
import com.orion.ops.module.asset.handler.host.terminal.model.request.TerminalInputRequest;
import com.orion.ops.module.asset.handler.host.terminal.session.ISshSession;
import com.orion.ops.module.asset.handler.host.terminal.session.ITerminalSession;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -26,10 +27,10 @@ public class TerminalInputHandler extends AbstractTerminalHandler<TerminalInputR
@Override
public void handle(WebSocketSession channel, TerminalInputRequest payload) {
// 获取会话
ITerminalSession terminalSession = terminalManager.getSession(channel.getId(), payload.getSessionId());
if (terminalSession != null) {
ITerminalSession session = terminalManager.getSession(channel.getId(), payload.getSessionId());
if (session instanceof ISshSession) {
// 处理输入
terminalSession.write(payload.getCommand());
((ISshSession) session).write(payload.getCommand());
}
}

View File

@@ -2,6 +2,7 @@ package com.orion.ops.module.asset.handler.host.terminal.handler;
import com.orion.ops.module.asset.handler.host.terminal.manager.TerminalManager;
import com.orion.ops.module.asset.handler.host.terminal.model.request.TerminalResizeRequest;
import com.orion.ops.module.asset.handler.host.terminal.session.ISshSession;
import com.orion.ops.module.asset.handler.host.terminal.session.ITerminalSession;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -26,10 +27,10 @@ public class TerminalResizeHandler extends AbstractTerminalHandler<TerminalResiz
@Override
public void handle(WebSocketSession channel, TerminalResizeRequest payload) {
// 获取会话
ITerminalSession terminalSession = terminalManager.getSession(channel.getId(), payload.getSessionId());
if (terminalSession != null) {
ITerminalSession session = terminalManager.getSession(channel.getId(), payload.getSessionId());
if (session instanceof ISshSession) {
// 修改大小
terminalSession.resize(payload.getCols(), payload.getRows());
((ISshSession) session).resize(payload.getCols(), payload.getRows());
}
}

View File

@@ -4,7 +4,6 @@ import com.orion.lang.define.collect.MultiConcurrentHashMap;
import com.orion.lang.utils.collect.Maps;
import com.orion.lang.utils.io.Streams;
import com.orion.ops.module.asset.handler.host.terminal.session.ITerminalSession;
import com.orion.ops.module.asset.handler.host.terminal.session.TerminalSession;
import org.springframework.stereotype.Component;
import java.util.Map;
@@ -30,8 +29,8 @@ public class TerminalManager {
*
* @param session session
*/
public void addSession(TerminalSession session) {
channelSessions.put(session.getChannel().getId(), session.getSessionId(), session);
public void addSession(ITerminalSession session) {
channelSessions.put(session.getChannelId(), session.getSessionId(), session);
}
/**

View File

@@ -28,4 +28,7 @@ public class TerminalCheckRequest extends TerminalBasePayload {
@Schema(description = "主机id")
private Long hostId;
@Schema(description = "连接类型")
private String connectType;
}

View File

@@ -1,31 +0,0 @@
package com.orion.ops.module.asset.handler.host.terminal.model.request;
import com.orion.ops.module.asset.handler.host.terminal.model.TerminalBasePayload;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
/**
* 执行命令请求 实体对象
* <p>
* e|eff00a1|command
*
* @author Jiahang Li
* @version 1.0.0
* @since 2023/12/29 16:20
*/
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
@Schema(name = "TerminalExecRequest", description = "执行命令请求 实体对象")
public class TerminalExecRequest extends TerminalBasePayload {
@Schema(description = "command")
private String command;
}

View File

@@ -0,0 +1,17 @@
package com.orion.ops.module.asset.handler.host.terminal.session;
/**
* sftp 会话定义
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/2/4 16:48
*/
public interface ISftpSession extends ITerminalSession {
/**
* 建立连接
*/
void connect();
}

View File

@@ -0,0 +1,43 @@
package com.orion.ops.module.asset.handler.host.terminal.session;
/**
* ssh 会话定义
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/2/4 16:47
*/
public interface ISshSession extends ITerminalSession {
/**
* 连接
*
* @param terminalType terminalType
* @param cols cols
* @param rows rows
*/
void connect(String terminalType, int cols, int rows);
/**
* 重置大小
*
* @param cols cols
* @param rows rows
*/
void resize(int cols, int rows);
/**
* 写入内容
*
* @param b b
*/
void write(String b);
/**
* 写入内容
*
* @param b b
*/
void write(byte[] b);
}

View File

@@ -12,38 +12,21 @@ import com.orion.lang.able.SafeCloseable;
public interface ITerminalSession extends SafeCloseable {
/**
* 连接
* 获取 sessionId
*
* @param terminalType terminalType
* @param cols cols
* @param rows rows
* @return sessionId
*/
void connect(String terminalType, int cols, int rows);
String getSessionId();
/**
* 重置大小
* 获取 channelId
*
* @param cols cols
* @param rows rows
* @return channelId
*/
void resize(int cols, int rows);
String getChannelId();
/**
* 写入内容
*
* @param b b
*/
void write(String b);
/**
* 写入内容
*
* @param b b
*/
void write(byte[] b);
/**
* 活跃回话
* 活跃会话
*/
void keepAlive();

View File

@@ -0,0 +1,59 @@
package com.orion.ops.module.asset.handler.host.terminal.session;
import com.orion.lang.utils.io.Streams;
import com.orion.net.host.SessionStore;
import com.orion.net.host.sftp.SftpExecutor;
import com.orion.ops.framework.common.constant.Const;
import com.orion.ops.module.asset.handler.host.terminal.model.TerminalConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.WebSocketSession;
/**
* 终端 ssh 会话
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/1/2 17:28
*/
@Slf4j
public class SftpSession extends TerminalSession implements ISftpSession {
private final TerminalConfig config;
private final SessionStore sessionStore;
private SftpExecutor executor;
public SftpSession(String sessionId,
WebSocketSession channel,
SessionStore sessionStore,
TerminalConfig config) {
super(sessionId, channel);
this.sessionStore = sessionStore;
this.config = config;
}
@Override
public void connect() {
// 打开 shell
this.executor = sessionStore.getSftpExecutor(config.getFileNameCharset());
executor.connect();
}
@Override
public void keepAlive() {
try {
// 发送个信号 保证 socket 不自动关闭
executor.sendSignal(Const.EMPTY);
} catch (Exception e) {
log.error("sftp keep-alive error {}", sessionId, e);
}
}
@Override
protected void releaseResource() {
Streams.close(executor);
Streams.close(sessionStore);
}
}

View File

@@ -0,0 +1,148 @@
package com.orion.ops.module.asset.handler.host.terminal.session;
import com.orion.lang.utils.io.Streams;
import com.orion.net.host.SessionStore;
import com.orion.net.host.ssh.shell.ShellExecutor;
import com.orion.ops.framework.common.constant.Const;
import com.orion.ops.framework.websocket.core.utils.WebSockets;
import com.orion.ops.module.asset.define.AssetThreadPools;
import com.orion.ops.module.asset.handler.host.terminal.constant.TerminalMessage;
import com.orion.ops.module.asset.handler.host.terminal.enums.OutputTypeEnum;
import com.orion.ops.module.asset.handler.host.terminal.model.TerminalConfig;
import com.orion.ops.module.asset.handler.host.terminal.model.response.TerminalCloseResponse;
import com.orion.ops.module.asset.handler.host.terminal.model.response.TerminalOutputResponse;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.WebSocketSession;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
/**
* 终端 ssh 会话
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/1/2 17:28
*/
@Slf4j
public class SshSession extends TerminalSession implements ISshSession {
private final TerminalConfig config;
private final SessionStore sessionStore;
private ShellExecutor executor;
@Getter
private String lastLine;
public SshSession(String sessionId,
WebSocketSession channel,
SessionStore sessionStore,
TerminalConfig config) {
super(sessionId, channel);
this.sessionStore = sessionStore;
this.config = config;
}
@Override
public void connect(String terminalType, int cols, int rows) {
config.setCols(cols);
config.setRows(rows);
// 打开 shell
this.executor = sessionStore.getShellExecutor();
executor.size(cols, rows);
executor.terminalType(terminalType);
executor.streamHandler(this::streamHandler);
executor.callback(this::eofCallback);
executor.connect();
// 开始监听输出
AssetThreadPools.TERMINAL_SCHEDULER.execute(executor);
}
@Override
public void resize(int cols, int rows) {
// FIXME 没啥用就删了
// if (!executor.isConnected()) {
// executor.connect();
// }
// 大小发生变化 则修改大小
if (cols != config.getCols() ||
rows != config.getRows()) {
config.setCols(cols);
config.setRows(rows);
executor.size(cols, rows);
executor.resize();
}
}
@Override
public void write(String b) {
executor.write(b);
}
@Override
public void write(byte[] b) {
executor.write(b);
}
@Override
public void keepAlive() {
try {
// 发送个信号 保证 socket 不自动关闭
executor.sendSignal(Const.EMPTY);
} catch (Exception e) {
log.error("ssh keep-alive error {}", sessionId, e);
}
}
@Override
protected void releaseResource() {
Streams.close(executor);
Streams.close(sessionStore);
}
/**
* 标准输出处理
*
* @param inputStream stream
*/
private void streamHandler(InputStream inputStream) {
byte[] bs = new byte[Const.BUFFER_KB_4];
BufferedInputStream in = new BufferedInputStream(inputStream, Const.BUFFER_KB_4);
int read;
try {
while (channel.isOpen() && (read = in.read(bs)) != -1) {
String body = lastLine = new String(bs, 0, read, config.getCharset());
// 响应
TerminalOutputResponse resp = TerminalOutputResponse.builder()
.type(OutputTypeEnum.OUTPUT.getType())
.sessionId(sessionId)
.body(body)
.build();
WebSockets.sendText(channel, OutputTypeEnum.OUTPUT.format(resp));
}
} catch (IOException ex) {
log.error("terminal 读取流失败", ex);
}
}
/**
* eof 回调
*/
private void eofCallback() {
log.info("terminal eof回调 {}, forClose: {}", sessionId, this.close);
// 发送关闭信息
TerminalCloseResponse resp = TerminalCloseResponse.builder()
.type(OutputTypeEnum.CLOSE.getType())
.sessionId(this.sessionId)
.msg(TerminalMessage.CLOSED_CONNECTION)
.build();
WebSockets.sendText(channel, OutputTypeEnum.CLOSE.format(resp));
// 需要调用关闭 - 可能是 logout 需要手动触发
this.close();
}
}

View File

@@ -1,114 +1,38 @@
package com.orion.ops.module.asset.handler.host.terminal.session;
import com.orion.lang.utils.io.Streams;
import com.orion.net.host.SessionStore;
import com.orion.net.host.ssh.TerminalType;
import com.orion.net.host.ssh.shell.ShellExecutor;
import com.orion.ops.framework.common.constant.Const;
import com.orion.ops.framework.websocket.core.utils.WebSockets;
import com.orion.ops.module.asset.define.AssetThreadPools;
import com.orion.ops.module.asset.enums.HostConnectStatusEnum;
import com.orion.ops.module.asset.handler.host.terminal.constant.TerminalMessage;
import com.orion.ops.module.asset.handler.host.terminal.enums.OutputTypeEnum;
import com.orion.ops.module.asset.handler.host.terminal.model.TerminalConfig;
import com.orion.ops.module.asset.handler.host.terminal.model.response.TerminalCloseResponse;
import com.orion.ops.module.asset.handler.host.terminal.model.response.TerminalOutputResponse;
import com.orion.ops.module.asset.service.HostConnectLogService;
import com.orion.spring.SpringHolder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.WebSocketSession;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
/**
* 终端会话
* 终端会话基类
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/1/2 17:28
* @since 2024/2/4 16:51
*/
@Slf4j
public class TerminalSession implements ITerminalSession {
public abstract class TerminalSession implements ITerminalSession {
@Getter
private final String sessionId;
protected final String sessionId;
@Getter
private final WebSocketSession channel;
protected final WebSocketSession channel;
private final TerminalConfig config;
protected volatile boolean close;
private final SessionStore sessionStore;
private ShellExecutor executor;
@Getter
private String lastLine;
private volatile boolean close;
public TerminalSession(String sessionId,
WebSocketSession channel,
SessionStore sessionStore,
TerminalConfig config) {
public TerminalSession(String sessionId, WebSocketSession channel) {
this.sessionId = sessionId;
this.channel = channel;
this.sessionStore = sessionStore;
this.config = config;
}
@Override
public void connect(String terminalType, int cols, int rows) {
config.setCols(cols);
config.setRows(rows);
// 打开 shell
this.executor = sessionStore.getShellExecutor();
executor.size(cols, rows);
executor.terminalType(terminalType);
executor.streamHandler(this::streamHandler);
executor.callback(this::eofCallback);
executor.connect();
// 开始监听输出
AssetThreadPools.TERMINAL_SCHEDULER.execute(executor);
}
@Override
public void resize(int cols, int rows) {
if (!executor.isConnected()) {
executor.connect();
}
// 大小发生变化 则修改大小
if (cols != config.getCols() ||
rows != config.getRows()) {
config.setCols(cols);
config.setRows(rows);
executor.size(cols, rows);
executor.resize();
}
}
@Override
public void write(String b) {
executor.write(b);
}
@Override
public void write(byte[] b) {
executor.write(b);
}
@Override
public void keepAlive() {
try {
// 发送个信号 保证 socket 不自动关闭
executor.sendSignal(Const.EMPTY);
} catch (Exception e) {
log.error("terminal keep-alive error {}", sessionId, e);
}
}
/**
* 释放资源
*/
protected abstract void releaseResource();
@Override
public void close() {
@@ -117,56 +41,19 @@ public class TerminalSession implements ITerminalSession {
return;
}
this.close = true;
// 关闭流
// 释放资源
try {
Streams.close(executor);
Streams.close(sessionStore);
this.releaseResource();
} catch (Exception e) {
log.error("terminal 断开连接失败 {}", sessionId, e);
log.error("terminal release error {}", sessionId, e);
}
// 修改状态
SpringHolder.getBean(HostConnectLogService.class).updateStatusByToken(sessionId, HostConnectStatusEnum.COMPLETE);
}
/**
* 标准输出处理
*
* @param inputStream stream
*/
private void streamHandler(InputStream inputStream) {
byte[] bs = new byte[Const.BUFFER_KB_4];
BufferedInputStream in = new BufferedInputStream(inputStream, Const.BUFFER_KB_4);
int read;
try {
while (channel.isOpen() && (read = in.read(bs)) != -1) {
String body = lastLine = new String(bs, 0, read, config.getCharset());
// 响应
TerminalOutputResponse resp = TerminalOutputResponse.builder()
.type(OutputTypeEnum.OUTPUT.getType())
.sessionId(sessionId)
.body(body)
.build();
WebSockets.sendText(channel, OutputTypeEnum.OUTPUT.format(resp));
}
} catch (IOException ex) {
log.error("terminal 读取流失败", ex);
}
}
/**
* eof 回调
*/
private void eofCallback() {
log.info("terminal eof回调 {}, forClose: {}", sessionId, this.close);
// 发送关闭信息
TerminalCloseResponse resp = TerminalCloseResponse.builder()
.type(OutputTypeEnum.CLOSE.getType())
.sessionId(this.sessionId)
.msg(TerminalMessage.CLOSED_CONNECTION)
.build();
WebSockets.sendText(channel, OutputTypeEnum.CLOSE.format(resp));
// 需要调用关闭 - 可能是 logout 需要手动触发
this.close();
@Override
public String getChannelId() {
return channel.getId();
}
}

View File

@@ -5,6 +5,7 @@ import com.orion.net.host.SessionStore;
import com.orion.ops.module.asset.entity.domain.HostDO;
import com.orion.ops.module.asset.entity.dto.HostTerminalAccessDTO;
import com.orion.ops.module.asset.entity.dto.HostTerminalConnectDTO;
import com.orion.ops.module.asset.enums.HostConnectTypeEnum;
/**
* 主机终端服务
@@ -42,18 +43,20 @@ public interface HostTerminalService {
*
* @param hostId hostId
* @param userId userId
* @param type type
* @return session
*/
HostTerminalConnectDTO getTerminalConnectInfo(Long userId, Long hostId);
HostTerminalConnectDTO getTerminalConnectInfo(Long userId, Long hostId, HostConnectTypeEnum type);
/**
* 使用用户配置获取连接信息
*
* @param host host
* @param userId userId
* @param type type
* @return session
*/
HostTerminalConnectDTO getTerminalConnectInfo(Long userId, HostDO host);
HostTerminalConnectDTO getTerminalConnectInfo(Long userId, HostDO host, HostConnectTypeEnum type);
/**
* 使用默认配置打开主机会话

View File

@@ -24,10 +24,7 @@ import com.orion.ops.module.asset.entity.domain.HostIdentityDO;
import com.orion.ops.module.asset.entity.domain.HostKeyDO;
import com.orion.ops.module.asset.entity.dto.HostTerminalAccessDTO;
import com.orion.ops.module.asset.entity.dto.HostTerminalConnectDTO;
import com.orion.ops.module.asset.enums.HostConfigTypeEnum;
import com.orion.ops.module.asset.enums.HostExtraItemEnum;
import com.orion.ops.module.asset.enums.HostExtraSshAuthTypeEnum;
import com.orion.ops.module.asset.enums.HostSshAuthTypeEnum;
import com.orion.ops.module.asset.enums.*;
import com.orion.ops.module.asset.handler.host.config.model.HostSshConfigModel;
import com.orion.ops.module.asset.handler.host.extra.model.HostSshExtraModel;
import com.orion.ops.module.asset.service.HostConfigService;
@@ -121,15 +118,15 @@ public class HostTerminalServiceImpl implements HostTerminalService {
}
@Override
public HostTerminalConnectDTO getTerminalConnectInfo(Long userId, Long hostId) {
public HostTerminalConnectDTO getTerminalConnectInfo(Long userId, Long hostId, HostConnectTypeEnum type) {
// 查询主机
HostDO host = hostDAO.selectById(hostId);
Valid.notNull(host, ErrorMessage.HOST_ABSENT);
return this.getTerminalConnectInfo(userId, host);
return this.getTerminalConnectInfo(userId, host, type);
}
@Override
public HostTerminalConnectDTO getTerminalConnectInfo(Long userId, HostDO host) {
public HostTerminalConnectDTO getTerminalConnectInfo(Long userId, HostDO host, HostConnectTypeEnum type) {
Long hostId = host.getId();
log.info("HostConnectService.getTerminalConnectInfo hostId: {}, userId: {}", hostId, userId);
// 查询用户
@@ -164,7 +161,9 @@ public class HostTerminalServiceImpl implements HostTerminalService {
}
}
// 获取连接配置
return this.getHostConnectInfo(host, config, extra);
HostTerminalConnectDTO connectInfo = this.getHostConnectInfo(host, config, extra);
connectInfo.setConnectType(type.name());
return connectInfo;
}
@Override

View File

@@ -18,7 +18,6 @@ public class TerminalPreferenceStrategy implements IPreferenceStrategy<TerminalP
@Override
public TerminalPreferenceModel getDefault() {
// ...快捷键 ...背景
// 默认显示设置
String defaultDisplaySetting = TerminalPreferenceModel.DisplaySettingModel
.builder()

View File

@@ -53,7 +53,7 @@ public interface MineService {
/**
* 获取当前用户会话列表
*
* @return 话列表
* @return 话列表
*/
List<UserSessionVO> getCurrentUserSessionList();

View File

@@ -18,7 +18,7 @@ public interface SystemUserManagementService {
* 获取用户会话列表
*
* @param userId userId
* @return 话列表
* @return 话列表
*/
List<UserSessionVO> getUserSessionList(Long userId);

View File

@@ -45,7 +45,8 @@ export default class TerminalSessionManager implements ITerminalSessionManager {
// 发送会话初始化请求
this.channel.send(InputProtocol.CHECK, {
sessionId,
hostId
hostId,
connectType: 'SSH'
});
return session;
}

View File

@@ -3,7 +3,7 @@ export const InputProtocol = {
// 主机连接检查
CHECK: {
type: 'ck',
template: ['type', 'sessionId', 'hostId']
template: ['type', 'sessionId', 'hostId', 'connectType']
},
// 连接主机
CONNECT: {
@@ -25,11 +25,6 @@ export const InputProtocol = {
type: 'rs',
template: ['type', 'sessionId', 'cols', 'rows']
},
// 执行
EXEC: {
type: 'e',
template: ['type', 'sessionId', 'command']
},
// 输入
INPUT: {
type: 'i',