重写复现方法
This commit is contained in:
412
src/main/java/com/mini/capi/sys/service/DbService.java
Normal file
412
src/main/java/com/mini/capi/sys/service/DbService.java
Normal file
@@ -0,0 +1,412 @@
|
||||
package com.mini.capi.sys.service;
|
||||
|
||||
import com.mini.capi.biz.domain.DbConfig;
|
||||
import com.mini.capi.biz.domain.SyncTask;
|
||||
import com.mini.capi.biz.domain.SyncTaskLog;
|
||||
import com.mini.capi.biz.service.DbConfigService;
|
||||
import com.mini.capi.biz.service.SyncTaskLogService;
|
||||
import com.mini.capi.biz.service.SyncTaskService;
|
||||
import com.mini.capi.config.DataSourceConfig;
|
||||
import com.mini.capi.model.ApiResult;
|
||||
import com.mini.capi.model.TabResult;
|
||||
import jakarta.annotation.Resource;
|
||||
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.sql.*;
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
@Service
|
||||
public class DbService {
|
||||
|
||||
@Resource
|
||||
private SyncTaskService syncTaskService;
|
||||
|
||||
@Resource
|
||||
private DbConfigService dbConfigService;
|
||||
|
||||
@Resource
|
||||
private SyncTaskLogService taskLogService;
|
||||
|
||||
@Resource
|
||||
private Executor hostExecutor;
|
||||
|
||||
|
||||
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
|
||||
private static final String dsValue = LocalDate.now().format(DATE_FORMATTER);
|
||||
|
||||
|
||||
public ApiResult<List<TabResult>> listSourceTables(String dbId) {
|
||||
// 1. 校验dbId参数
|
||||
if (dbId == null || dbId.trim().isEmpty()) {
|
||||
return ApiResult.error(400, "数据库ID不能为空");
|
||||
}
|
||||
// 2. 查询数据库配置并校验
|
||||
DbConfig dbConfig = dbConfigService.getById(dbId);
|
||||
if (dbConfig == null) {
|
||||
return ApiResult.error(404, "未找到ID为[" + dbId + "]的数据库配置");
|
||||
}
|
||||
try {
|
||||
JdbcTemplate jdbcTemplate = DataSourceConfig.createJdbcTemplate(dbConfig);
|
||||
// 补充参数传递
|
||||
String querySql = "SELECT TABLE_NAME,TABLE_COMMENT FROM information_schema.tables WHERE TABLE_SCHEMA = ?";
|
||||
List<Map<String, Object>> result = jdbcTemplate.queryForList(querySql, dbConfig.getDbName());
|
||||
List<TabResult> data = result.stream()
|
||||
.map(row -> {
|
||||
String tableName = row.get("TABLE_NAME") != null ? row.get("TABLE_NAME").toString() : "";
|
||||
String tableDesc = row.get("TABLE_COMMENT") != null ? row.get("TABLE_COMMENT").toString() : "";
|
||||
return new TabResult(tableName, getComment(tableName, tableDesc));
|
||||
})
|
||||
.sorted(Comparator.comparing(TabResult::getTableName)) // 按表名排序
|
||||
.toList();
|
||||
return ApiResult.success(data);
|
||||
} catch (Exception e) {
|
||||
return ApiResult.error(101, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private String getComment(String tableName, String tableDesc) {
|
||||
boolean hasTableDesc = tableDesc != null && !tableDesc.trim().isEmpty();
|
||||
// 根据表描述是否存在返回不同格式
|
||||
if (hasTableDesc) {
|
||||
return String.format("%s(%s)", tableDesc.trim(), tableName);
|
||||
} else {
|
||||
return tableName;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 运行全部任务
|
||||
*/
|
||||
public ApiResult<?> jobSyncAllTask() {
|
||||
List<SyncTask> syncTasks = syncTaskService.list();
|
||||
// 记录是否有任务失败(仅用于后台日志,不影响接口返回)
|
||||
List<String> errorMessages = new ArrayList<>();
|
||||
for (SyncTask task : syncTasks) {
|
||||
// 提交任务到线程池(异步执行,接口不等待)
|
||||
execSyncTask(task, errorMessages);
|
||||
}
|
||||
// 接口立即返回,不等待任务执行完成
|
||||
return ApiResult.success();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 运行单个任务
|
||||
*/
|
||||
public ApiResult<?> jobSyncOneTask( String taskId) {
|
||||
try {
|
||||
SyncTask task = syncTaskService.getById(taskId);
|
||||
// 记录是否有任务失败(仅用于后台日志,不影响接口返回)
|
||||
List<String> errorMessages = new ArrayList<>();
|
||||
execSyncTask(task, errorMessages);
|
||||
return ApiResult.success();
|
||||
} catch (Exception e) {
|
||||
return ApiResult.error(101, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private void execSyncTask(SyncTask task, List<String> errorMessages) {
|
||||
hostExecutor.execute(() -> {
|
||||
try {
|
||||
// 1. 获取源/目标数据库配置
|
||||
DbConfig sourceDbConfig = dbConfigService.getById(task.getSourceDbId());
|
||||
DbConfig targetDbConfig = dbConfigService.getById(task.getTargetDbId());
|
||||
// 2. 创建对应数据库的JdbcTemplate
|
||||
JdbcTemplate sourceJdbc = DataSourceConfig.createJdbcTemplate(sourceDbConfig);
|
||||
JdbcTemplate targetJdbc = DataSourceConfig.createJdbcTemplate(targetDbConfig);
|
||||
// 3. 执行表同步逻辑(异步执行)
|
||||
syncTableData(task, sourceJdbc, targetJdbc);
|
||||
} catch (Exception e) {
|
||||
// 捕获任务执行异常,记录错误信息(仅后台打印,不阻塞接口)
|
||||
String errorMsg = "任务 " + task.getTaskId() + " 同步失败: " + e.getMessage();
|
||||
System.err.println(errorMsg);
|
||||
// 加锁保证线程安全(多线程操作同一List)
|
||||
synchronized (errorMessages) {
|
||||
errorMessages.add(errorMsg);
|
||||
}
|
||||
}
|
||||
});
|
||||
System.out.println(errorMessages);
|
||||
}
|
||||
|
||||
/**
|
||||
* 同步表数据
|
||||
*/
|
||||
public void syncTableData(SyncTask task, JdbcTemplate sourceJdbc, JdbcTemplate targetJdbc) {
|
||||
try {
|
||||
// 确保源表和目标表名转为小写
|
||||
String sourceTable = task.getSourceTable();
|
||||
String targetTable = task.getTargetTable().toLowerCase();
|
||||
// 1. 检查并创建目标表
|
||||
if (!tableExists(targetJdbc, targetTable)) {
|
||||
createTargetTable(sourceJdbc, targetJdbc, sourceTable, targetTable);
|
||||
}
|
||||
// 2. 清空目标表当前ds值的数据
|
||||
clearTargetTableData(targetJdbc, targetTable);
|
||||
// 3. 全量同步数据
|
||||
syncAllData(task, sourceJdbc, targetJdbc, sourceTable, targetTable);
|
||||
} catch (Exception e) {
|
||||
System.err.println("表同步失败: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查目标表是否存在
|
||||
*/
|
||||
private boolean tableExists(JdbcTemplate jdbcTemplate, String tableName) throws SQLException {
|
||||
// 表名已转为小写,直接使用
|
||||
try (Connection connection = jdbcTemplate.getDataSource().getConnection()) {
|
||||
DatabaseMetaData metaData = connection.getMetaData();
|
||||
try (ResultSet rs = metaData.getTables(
|
||||
null, null, tableName, new String[]{"TABLE"})) {
|
||||
return rs.next();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建目标表结构(复制源表结构并添加ds字段)
|
||||
*/
|
||||
private void createTargetTable(JdbcTemplate sourceJdbc, JdbcTemplate targetJdbc,
|
||||
String sourceTable, String targetTable) throws SQLException {
|
||||
// 获取源表字段定义
|
||||
List<String> columnDefinitions = getColumnDefinitions(sourceJdbc, sourceTable);
|
||||
// 构建创建表SQL (PostgresSQL语法)
|
||||
StringBuilder createSql = new StringBuilder("CREATE TABLE ")
|
||||
.append(targetTable)
|
||||
.append(" (");
|
||||
// 添加原有字段
|
||||
for (int i = 0; i < columnDefinitions.size(); i++) {
|
||||
createSql.append(columnDefinitions.get(i));
|
||||
// 最后一个字段后不加逗号
|
||||
if (i < columnDefinitions.size() - 1) {
|
||||
createSql.append(", ");
|
||||
}
|
||||
}
|
||||
// 添加ds字段(如果有其他字段,需要加逗号分隔)
|
||||
if (!columnDefinitions.isEmpty()) {
|
||||
createSql.append(", ");
|
||||
}
|
||||
createSql.append("ds VARCHAR(20) NOT NULL)");
|
||||
// 执行创建表SQL
|
||||
targetJdbc.execute(createSql.toString());
|
||||
System.out.println("已创建目标表: " + targetTable);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取表字段列表
|
||||
*/
|
||||
private List<String> getTableColumns(JdbcTemplate jdbcTemplate, String tableName) throws SQLException {
|
||||
List<String> columns = new ArrayList<>();
|
||||
try (Connection connection = jdbcTemplate.getDataSource().getConnection()) {
|
||||
DatabaseMetaData metaData = connection.getMetaData();
|
||||
try (ResultSet rs = metaData.getColumns(null, null, tableName, null)) {
|
||||
while (rs.next()) {
|
||||
// 字段名转为小写
|
||||
columns.add(rs.getString("COLUMN_NAME").toLowerCase());
|
||||
}
|
||||
}
|
||||
}
|
||||
return columns;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取PostgresSQL兼容的字段定义
|
||||
*/
|
||||
private List<String> getColumnDefinitions(JdbcTemplate sourceJdbc, String tableName) throws SQLException {
|
||||
List<String> definitions = new ArrayList<>();
|
||||
try (Connection connection = sourceJdbc.getDataSource().getConnection()) {
|
||||
DatabaseMetaData metaData = connection.getMetaData();
|
||||
try (ResultSet rs = metaData.getColumns(null, null, tableName, null)) {
|
||||
while (rs.next()) {
|
||||
// 字段名转为小写
|
||||
String columnName = rs.getString("COLUMN_NAME").toLowerCase();
|
||||
String typeName = rs.getString("TYPE_NAME").toUpperCase();
|
||||
int columnSize = rs.getInt("COLUMN_SIZE");
|
||||
int decimalDigits = rs.getInt("DECIMAL_DIGITS");
|
||||
int nullable = rs.getInt("NULLABLE");
|
||||
// MySQL到PostgresSQL类型映射
|
||||
String pgType = mapMySqlTypeToPgType(typeName, columnSize, decimalDigits);
|
||||
// 构建字段定义
|
||||
StringBuilder colDef = new StringBuilder();
|
||||
colDef.append(columnName).append(" ").append(pgType);
|
||||
if (nullable == DatabaseMetaData.columnNoNulls) {
|
||||
colDef.append(" NOT NULL");
|
||||
}
|
||||
definitions.add(colDef.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
return definitions;
|
||||
}
|
||||
|
||||
/**
|
||||
* MySQL到PostgresSQL数据类型映射
|
||||
*/
|
||||
private String mapMySqlTypeToPgType(String mySqlType, int columnSize, int decimalDigits) {
|
||||
// 统一转为大写处理,避免类型字符串大小写问题
|
||||
String type = mySqlType.toUpperCase();
|
||||
return switch (type) {
|
||||
// 整数类型映射
|
||||
case "INT", "INTEGER" -> "INTEGER";
|
||||
case "TINYINT" -> columnSize == 1 ? "BOOLEAN" : "SMALLINT"; // TINYINT(1)通常表示布尔值
|
||||
case "SMALLINT" -> "SMALLINT";
|
||||
case "MEDIUMINT" -> "INTEGER"; // PostgresSQL无MEDIUMINT,用INTEGER兼容
|
||||
case "BIGINT" -> "BIGINT";
|
||||
// 浮点类型映射
|
||||
case "FLOAT" -> columnSize > 24 ? "DOUBLE PRECISION" : "REAL"; // FLOAT(24)以下映射为REAL
|
||||
case "DOUBLE", "DOUBLE PRECISION" -> "DOUBLE PRECISION";
|
||||
case "DECIMAL", "NUMERIC" -> {
|
||||
int precision = columnSize > 0 ? columnSize : 10;
|
||||
int scale = Math.max(decimalDigits, 0);
|
||||
yield "NUMERIC(" + precision + "," + scale + ")";
|
||||
}
|
||||
// 字符串类型映射
|
||||
case "VARCHAR" -> {
|
||||
// PostgresSQL VARCHAR无长度限制时建议用TEXT
|
||||
int length = Math.max(columnSize, 0);
|
||||
yield length > 0 ? "VARCHAR(" + length + ")" : "TEXT";
|
||||
}
|
||||
case "CHAR" -> "CHAR(" + (columnSize > 0 ? columnSize : 1) + ")";
|
||||
case "TEXT", "MEDIUMTEXT", "TINYTEXT" -> "TEXT";
|
||||
case "LONGTEXT" -> "TEXT"; // PostgresSQL TEXT无长度限制
|
||||
// 二进制类型映射
|
||||
case "BLOB" -> "BYTEA";
|
||||
case "TINYBLOB", "MEDIUMBLOB", "LONGBLOB" -> "BYTEA";
|
||||
case "BINARY" -> "BYTEA";
|
||||
case "VARBINARY" -> "BYTEA";
|
||||
// 日期时间类型映射
|
||||
case "DATE" -> "DATE";
|
||||
case "TIME" -> "TIME";
|
||||
case "DATETIME", "TIMESTAMP" -> "TIMESTAMP";
|
||||
case "YEAR" -> "SMALLINT"; // YEAR用SMALLINT存储更高效
|
||||
// 特殊类型映射
|
||||
case "BOOLEAN" -> "BOOLEAN";
|
||||
case "JSON", "JSONB" -> "JSONB"; // PostgresSQL推荐用JSONB
|
||||
case "ENUM" -> "VARCHAR(255)"; // ENUM转为字符串存储,需业务层保证合法性
|
||||
case "SET" -> "TEXT"; // SET用TEXT存储,逗号分隔
|
||||
// 几何类型(简化映射)
|
||||
case "POINT" -> "POINT";
|
||||
case "LINESTRING" -> "LINESTRING";
|
||||
case "POLYGON" -> "POLYGON";
|
||||
// 未匹配类型的默认处理
|
||||
default -> {
|
||||
// 日志输出未匹配的类型,便于后续优化
|
||||
System.err.println("未处理的MySQL类型: " + mySqlType);
|
||||
yield "TEXT"; // 用TEXT兼容大多数未明确映射的类型
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* 清空目标表中当前ds值的数据
|
||||
*/
|
||||
private void clearTargetTableData(JdbcTemplate targetJdbc, String targetTable) {
|
||||
String sql = "DELETE FROM " + targetTable + " WHERE ds = ?";
|
||||
targetJdbc.update(sql, dsValue);
|
||||
}
|
||||
|
||||
/**
|
||||
* 全量同步数据
|
||||
*/
|
||||
private void syncAllData(SyncTask task, JdbcTemplate sourceJdbc, JdbcTemplate targetJdbc,
|
||||
String sourceTable, String targetTable) {
|
||||
LocalDateTime startTime = LocalDateTime.now();
|
||||
int totalRows = 0;
|
||||
int successRows = 0;
|
||||
int failRows = 0;
|
||||
String ustatus = "1";
|
||||
String errorMsg = null;
|
||||
try {
|
||||
// 1. 获取源表所有数据
|
||||
String selectSql = "SELECT * FROM " + sourceTable;
|
||||
List<Map<String, Object>> dataList = sourceJdbc.queryForList(selectSql);
|
||||
totalRows = dataList.size();
|
||||
if (dataList.isEmpty()) {
|
||||
System.out.println("源表 " + sourceTable + " 没有数据需要同步");
|
||||
return;
|
||||
}
|
||||
// 2. 构建插入SQL
|
||||
List<String> columns = getTableColumnsWithoutException(sourceJdbc, sourceTable);
|
||||
String insertSql = buildInsertSql(targetTable, columns);
|
||||
// 3. 批量插入数据
|
||||
int[] batchResult = targetJdbc.batchUpdate(insertSql, new BatchPreparedStatementSetter() {
|
||||
@Override
|
||||
public void setValues(PreparedStatement ps, int i) throws SQLException {
|
||||
Map<String, Object> row = dataList.get(i);
|
||||
int paramIndex = 1;
|
||||
// 设置原有字段值
|
||||
for (String column : columns) {
|
||||
ps.setObject(paramIndex++, row.get(column));
|
||||
}
|
||||
// 设置ds字段值
|
||||
ps.setString(paramIndex, dsValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getBatchSize() {
|
||||
return dataList.size();
|
||||
}
|
||||
});
|
||||
// 统计成功/失败行数
|
||||
successRows = (int) Arrays.stream(batchResult).filter(row -> row >= 0).count();
|
||||
failRows = totalRows - successRows;
|
||||
} catch (Exception e) {
|
||||
ustatus = "2";
|
||||
errorMsg = e.getMessage() != null ? e.getMessage() : e.getClass().getSimpleName();
|
||||
System.err.println("同步数据失败: " + errorMsg);
|
||||
}
|
||||
DbConfig sourceDbConfig = dbConfigService.getById(task.getSourceDbId());
|
||||
DbConfig targetDbConfig = dbConfigService.getById(task.getTargetDbId());
|
||||
LocalDateTime endTime = LocalDateTime.now();
|
||||
SyncTaskLog taskLog = new SyncTaskLog(task.getTaskId(), task.getTaskName(), task.getSourceDbId(),
|
||||
sourceDbConfig.getDbName(), sourceTable, task.getTargetDbId(), targetDbConfig.getDbName(), targetTable,
|
||||
startTime, endTime, (long) totalRows, (long) successRows, (long) failRows, ustatus,
|
||||
errorMsg, (int) Duration.between(startTime, endTime).getSeconds(), "0");
|
||||
task.setLastSyncTime(endTime);
|
||||
taskLogService.save(taskLog);
|
||||
syncTaskService.updateById(task);
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建插入SQL语句
|
||||
*/
|
||||
private String buildInsertSql(String targetTable, List<String> columns) {
|
||||
StringBuilder sql = new StringBuilder();
|
||||
sql.append("INSERT INTO ").append(targetTable).append(" (");
|
||||
// 添加原有字段
|
||||
for (int i = 0; i < columns.size(); i++) {
|
||||
sql.append(columns.get(i));
|
||||
if (i < columns.size() - 1) {
|
||||
sql.append(", ");
|
||||
}
|
||||
}
|
||||
// 添加ds字段
|
||||
sql.append(", ds) VALUES (");
|
||||
// 添加参数占位符
|
||||
sql.append("?, ".repeat(columns.size()));
|
||||
sql.append("?)");
|
||||
return sql.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* 无异常获取表字段(工具方法)
|
||||
*/
|
||||
private List<String> getTableColumnsWithoutException(JdbcTemplate jdbcTemplate, String tableName) {
|
||||
try {
|
||||
return getTableColumns(jdbcTemplate, tableName);
|
||||
} catch (SQLException e) {
|
||||
throw new RuntimeException("获取表字段失败: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
117
src/main/java/com/mini/capi/sys/service/DockerService.java
Normal file
117
src/main/java/com/mini/capi/sys/service/DockerService.java
Normal file
@@ -0,0 +1,117 @@
|
||||
package com.mini.capi.sys.service;
|
||||
|
||||
import com.mini.capi.biz.domain.*;
|
||||
import com.mini.capi.biz.service.*;
|
||||
import com.mini.capi.model.ApiResult;
|
||||
import com.mini.capi.utils.HostInfo;
|
||||
import com.mini.capi.utils.vDate;
|
||||
import com.mini.capi.utils.vId;
|
||||
import jakarta.annotation.Resource;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Service
|
||||
public class DockerService {
|
||||
|
||||
|
||||
|
||||
@Resource
|
||||
private SshInfoService sshInfoService;
|
||||
|
||||
@Resource
|
||||
private SshUserService sshUserService;
|
||||
|
||||
@Resource
|
||||
private DockerHostService dockerHostService;
|
||||
|
||||
@Resource
|
||||
private DiskMountService diskMountService;
|
||||
|
||||
@Resource
|
||||
private SysHostService sysHostService;
|
||||
|
||||
|
||||
@Resource
|
||||
private Executor hostExecutor;
|
||||
|
||||
|
||||
/**
|
||||
* 获取容器主机的磁盘使用情况
|
||||
*/
|
||||
public ApiResult<?> jobHostDisk() {
|
||||
try {
|
||||
List<DockerHost> dockerHosts = dockerHostService.list();
|
||||
List<String> errorList = Collections.synchronizedList(new ArrayList<>());
|
||||
// 并行处理所有宿主机
|
||||
CompletableFuture<?>[] futures = dockerHosts.stream()
|
||||
.map(host -> CompletableFuture.runAsync(() -> handleSingleHost(host, errorList), hostExecutor))
|
||||
.toArray(CompletableFuture[]::new);
|
||||
// 等待全部完成
|
||||
CompletableFuture.allOf(futures).join();
|
||||
return errorList.isEmpty()
|
||||
? ApiResult.success()
|
||||
: ApiResult.error();
|
||||
} catch (Exception e) {
|
||||
return ApiResult.error(101, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void handleSingleHost(DockerHost host, List<String> errorList) {
|
||||
try {
|
||||
SshUser sshUser = sshUserService.getById(host.getUserId());
|
||||
SshInfo sshInfo = sshInfoService.getById(host.getHostId());
|
||||
/* 1. 采集实时数据 */
|
||||
HostInfo.Result r = HostInfo.collect(
|
||||
sshInfo.getHostIp(),
|
||||
Integer.parseInt(sshInfo.getHostPort()),
|
||||
sshUser.getCUsername(),
|
||||
sshUser.getCPassword());
|
||||
/* 2. 主机维度 saveOrUpdate */
|
||||
SysHost sysHost = r.host;
|
||||
sysHost.setSysHostId(host.getHostId());
|
||||
sysHost.setUpdateTime(vDate.getNow());
|
||||
sysHost.setDokerHostId(host.getDokerHostId());
|
||||
sysHostService.saveOrUpdate(sysHost);
|
||||
/* 3. 处理磁盘:先查库做索引,再比对 */
|
||||
List<DiskMount> dbDisks = diskMountService.lambdaQuery()
|
||||
.eq(DiskMount::getSysHostId, host.getHostId())
|
||||
.list();
|
||||
Map<String, DiskMount> dbDiskMap = dbDisks.stream()
|
||||
.collect(Collectors.toMap(DiskMount::getMountPoint, Function.identity()));
|
||||
|
||||
List<DiskMount> toSaveOrUpdate = new ArrayList<>();
|
||||
Set<String> liveMountPoint = new HashSet<>();
|
||||
for (DiskMount d : r.disks) {
|
||||
liveMountPoint.add(d.getMountPoint());
|
||||
DiskMount exist = dbDiskMap.get(d.getMountPoint());
|
||||
if (exist != null) {
|
||||
d.setDiskMountId(exist.getDiskMountId());
|
||||
} else {
|
||||
d.setDiskMountId(vId.getUid());
|
||||
}
|
||||
d.setSysHostId(host.getHostId());
|
||||
d.setUpdateTime(vDate.getNow());
|
||||
toSaveOrUpdate.add(d);
|
||||
}
|
||||
/* 4. 批量保存/更新 */
|
||||
diskMountService.saveOrUpdateBatch(toSaveOrUpdate);
|
||||
/* 5. 删除实时已消失的盘 */
|
||||
List<String> delIds = dbDisks.stream()
|
||||
.filter(d -> !liveMountPoint.contains(d.getMountPoint()))
|
||||
.map(DiskMount::getDiskMountId)
|
||||
.collect(Collectors.toList());
|
||||
if (!delIds.isEmpty()) {
|
||||
diskMountService.removeByIds(delIds);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// 仅记录异常,不中断其它任务
|
||||
errorList.add(String.format("hostId=%s, error=%s", host.getHostId(), e.getMessage()));
|
||||
}
|
||||
}
|
||||
}
|
||||
345
src/main/java/com/mini/capi/sys/service/HostService.java
Normal file
345
src/main/java/com/mini/capi/sys/service/HostService.java
Normal file
@@ -0,0 +1,345 @@
|
||||
package com.mini.capi.sys.service;
|
||||
|
||||
import com.mini.capi.biz.domain.DockerContainerInfo;
|
||||
import com.mini.capi.biz.domain.DockerHost;
|
||||
import com.mini.capi.biz.domain.SshInfo;
|
||||
import com.mini.capi.biz.domain.SshUser;
|
||||
import com.mini.capi.biz.service.DockerContainerInfoService;
|
||||
import com.mini.capi.biz.service.DockerHostService;
|
||||
import com.mini.capi.biz.service.SshInfoService;
|
||||
import com.mini.capi.biz.service.SshUserService;
|
||||
import com.mini.capi.model.ApiResult;
|
||||
import com.mini.capi.utils.HostRuntime;
|
||||
import com.mini.capi.utils.docker;
|
||||
import com.mini.capi.utils.vDate;
|
||||
import jakarta.annotation.Resource;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Service
|
||||
public class HostService {
|
||||
|
||||
|
||||
@Resource
|
||||
private SshInfoService sshInfoService;
|
||||
|
||||
@Resource
|
||||
private SshUserService sshUserService;
|
||||
|
||||
@Resource
|
||||
private DockerHostService dockerHostService;
|
||||
@Resource
|
||||
private DockerContainerInfoService dockerInfoService;
|
||||
|
||||
|
||||
@Resource
|
||||
private Executor hostExecutor;
|
||||
|
||||
|
||||
public static class SnapshotDTO {
|
||||
public String hostName;
|
||||
public String timestamp;
|
||||
public String cpuUsage;
|
||||
public String memTotal;
|
||||
public String memFree;
|
||||
public String memUsed;
|
||||
public String memUsage;
|
||||
public String swapTotal;
|
||||
public String swapUsed;
|
||||
public List<SnapshotDTO.DiskDTO> disks;
|
||||
public String netRxBytes;
|
||||
public String netTxBytes;
|
||||
public double load1;
|
||||
public int processCount;
|
||||
public String uptimeSec;
|
||||
|
||||
public static SnapshotDTO from(HostRuntime.Snapshot s) {
|
||||
SnapshotDTO dto = new SnapshotDTO();
|
||||
dto.hostName = s.hostName;
|
||||
dto.timestamp = Instant.ofEpochMilli(s.timestamp)
|
||||
.atZone(ZoneId.systemDefault())
|
||||
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
|
||||
dto.cpuUsage = String.format("%.2f %%", s.cpuUsage * 100);
|
||||
dto.memTotal = humanBytes(s.memTotal);
|
||||
dto.memFree = humanBytes(s.memFree);
|
||||
dto.memUsed = humanBytes(s.memUsed);
|
||||
dto.memUsage = String.format("%.2f %%", s.memUsage * 100);
|
||||
dto.swapTotal = humanBytes(s.swapTotal);
|
||||
dto.swapUsed = humanBytes(s.swapUsed);
|
||||
dto.disks = s.disks.stream()
|
||||
.map(d -> new SnapshotDTO.DiskDTO(d.path,
|
||||
humanBytes(d.total),
|
||||
humanBytes(d.free),
|
||||
humanBytes(d.used),
|
||||
String.format("%.2f %%", d.usage * 100)))
|
||||
.collect(Collectors.toList());
|
||||
dto.netRxBytes = humanBytes(s.netRxBytes);
|
||||
dto.netTxBytes = humanBytes(s.netTxBytes);
|
||||
dto.load1 = s.load1;
|
||||
dto.processCount = s.processCount;
|
||||
dto.uptimeSec = uptimeToHuman(s.uptimeSec);
|
||||
return dto;
|
||||
}
|
||||
|
||||
private static String humanBytes(long bytes) {
|
||||
if (bytes < 1024) return bytes + " B";
|
||||
int exp = (int) (Math.log(bytes) / Math.log(1024));
|
||||
String pre = "KMGTPE".charAt(exp - 1) + "iB";
|
||||
return String.format("%.1f %s", bytes / Math.pow(1024, exp), pre);
|
||||
}
|
||||
|
||||
private static String uptimeToHuman(long sec) {
|
||||
long h = sec / 3600;
|
||||
long m = (sec % 3600) / 60;
|
||||
long s = sec % 60;
|
||||
if (h > 0) return String.format("%d 时 %d 分 %d 秒", h, m, s);
|
||||
if (m > 0) return String.format("%d 分 %d 秒", m, s);
|
||||
return String.format("%d 秒", s);
|
||||
}
|
||||
|
||||
public static class DiskDTO {
|
||||
public String path;
|
||||
public String total;
|
||||
public String free;
|
||||
public String used;
|
||||
public String usage;
|
||||
|
||||
public DiskDTO(String path, String total, String free, String used, String usage) {
|
||||
this.path = path;
|
||||
this.total = total;
|
||||
this.free = free;
|
||||
this.used = used;
|
||||
this.usage = usage;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static final int MAX_SIZE = 100;
|
||||
|
||||
|
||||
private String buildKey(String dockerHostId, String image, String containerName) {
|
||||
return dockerHostId + "|" + image + "|" + containerName;
|
||||
}
|
||||
|
||||
private void addOrEdit(String dockerHostId, Map<String, DockerContainerInfo> oldMap, SshInfo sshInfo, List<docker.DockerInfo> remoteList) {
|
||||
for (docker.DockerInfo d : remoteList) {
|
||||
String key = buildKey(dockerHostId, d.getImageName(), d.getNames());
|
||||
DockerContainerInfo entity = oldMap.get(key);
|
||||
if (entity == null) {
|
||||
entity = new DockerContainerInfo();
|
||||
entity.setDokerHostId(dockerHostId);
|
||||
entity.setContainerId(d.getContainerId());
|
||||
entity.setImageName(d.getImageName());
|
||||
entity.setCommand(d.getCommand());
|
||||
entity.setCreatedAt(d.getCreatedAt());
|
||||
entity.setUnames(d.getNames());
|
||||
entity.setUstatus(d.getStatus());
|
||||
entity.setHostIp(sshInfo.getHostIp());
|
||||
entity.setPorts(d.getPorts());
|
||||
entity.setGetTime(vDate.getNow());
|
||||
entity.setDokerHostId(dockerHostId);
|
||||
entity.setFTenantId("0");
|
||||
dockerInfoService.save(entity);
|
||||
} else {
|
||||
entity.setContainerId(d.getContainerId());
|
||||
entity.setImageName(d.getImageName());
|
||||
entity.setCommand(d.getCommand());
|
||||
entity.setCreatedAt(d.getCreatedAt());
|
||||
entity.setUnames(d.getNames());
|
||||
entity.setUstatus(d.getStatus());
|
||||
entity.setHostIp(sshInfo.getHostIp());
|
||||
entity.setPorts(d.getPorts());
|
||||
entity.setDokerHostId(dockerHostId);
|
||||
entity.setUpdateTime(vDate.getNow());
|
||||
dockerInfoService.updateById(entity);
|
||||
oldMap.remove(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void refreshContainerTable(String dockerHostId,
|
||||
List<docker.DockerInfo> remoteList,
|
||||
SshInfo sshInfo) {
|
||||
|
||||
/* 1. 旧数据 */
|
||||
List<DockerContainerInfo> oldList = dockerInfoService.lambdaQuery()
|
||||
.eq(DockerContainerInfo::getDokerHostId, dockerHostId)
|
||||
.list();
|
||||
Map<String, DockerContainerInfo> oldMap = oldList.stream()
|
||||
.collect(Collectors.toMap(
|
||||
c -> buildKey(dockerHostId, c.getImageName(), c.getUnames()),
|
||||
c -> c));
|
||||
|
||||
/* 2. 遍历远端 */
|
||||
addOrEdit(dockerHostId, oldMap, sshInfo, remoteList);
|
||||
|
||||
/* 3. 无效删除 */
|
||||
if (!oldMap.isEmpty()) {
|
||||
List<String> ids = new ArrayList<>();
|
||||
for (DockerContainerInfo e : oldMap.values()) {
|
||||
ids.add(e.getId());
|
||||
}
|
||||
dockerInfoService.removeByIds(ids);
|
||||
}
|
||||
}
|
||||
|
||||
private void updateRunNum(DockerHost host, List<docker.DockerInfo> list) {
|
||||
long runningCnt = list.stream()
|
||||
.filter(r -> "1".equals(r.getStatus()))
|
||||
.count();
|
||||
host.setRunNum(runningCnt);
|
||||
host.setUpdateTime(vDate.getNow());
|
||||
dockerHostService.updateById(host);
|
||||
}
|
||||
|
||||
|
||||
private void handleSingleDockerHost(DockerHost host, List<String> errorList) {
|
||||
try {
|
||||
/* 1. 一次性加载现有数据 */
|
||||
List<DockerContainerInfo> oldList =
|
||||
dockerInfoService.lambdaQuery()
|
||||
.eq(DockerContainerInfo::getDokerHostId, host.getDokerHostId())
|
||||
.list();
|
||||
|
||||
Map<String, DockerContainerInfo> oldMap = oldList.stream()
|
||||
.collect(Collectors.toMap(
|
||||
c -> buildKey(c.getDokerHostId(), c.getImageName(), c.getUnames()),
|
||||
c -> c));
|
||||
|
||||
/* 2. 远程最新数据 */
|
||||
SshUser sshUser = sshUserService.getById(host.getUserId());
|
||||
SshInfo sshInfo = sshInfoService.getById(host.getHostId());
|
||||
List<docker.DockerInfo> remoteList =
|
||||
docker.getDockerInfo(sshInfo.getHostIp(),
|
||||
Long.valueOf(sshInfo.getHostPort()),
|
||||
sshUser.getCUsername(),
|
||||
sshUser.getCPassword());
|
||||
|
||||
/* 3. 新增或更新 */
|
||||
addOrEdit(host.getDokerHostId(), oldMap, sshInfo, remoteList);
|
||||
|
||||
/* 4. 删除已失效容器 */
|
||||
if (!oldMap.isEmpty()) {
|
||||
List<String> idsToDel = oldMap.values()
|
||||
.stream()
|
||||
.map(DockerContainerInfo::getId)
|
||||
.collect(Collectors.toList());
|
||||
dockerInfoService.removeByIds(idsToDel);
|
||||
}
|
||||
|
||||
/* 5. 更新运行数 */
|
||||
long runningCnt = remoteList.stream()
|
||||
.filter(r -> "1".equals(r.getStatus()))
|
||||
.count();
|
||||
host.setRunNum(runningCnt);
|
||||
host.setUpdateTime(vDate.getNow());
|
||||
dockerHostService.updateById(host);
|
||||
} catch (Exception e) {
|
||||
errorList.add(String.format("hostId=%s, error=%s",
|
||||
host.getDokerHostId(), e.getMessage()));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public ApiResult<List<SnapshotDTO>> getApiInfo() {
|
||||
try {
|
||||
// 1. 新建一个一次性 List
|
||||
List<HostRuntime.Snapshot> snapshots =
|
||||
Collections.synchronizedList(new LinkedList<>());
|
||||
// 2. 采集并加入
|
||||
HostRuntime.Snapshot snap = HostRuntime.collect();
|
||||
snapshots.add(snap);
|
||||
// 3. 如果只想保留一条,直接清空多余
|
||||
while (snapshots.size() > MAX_SIZE) {
|
||||
((LinkedList<HostRuntime.Snapshot>) snapshots).removeFirst();
|
||||
}
|
||||
return ApiResult.success(Collections.singletonList(SnapshotDTO.from(snap)));
|
||||
} catch (Exception e) {
|
||||
return ApiResult.error(101, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 获取容器列表
|
||||
*/
|
||||
public ApiResult<?> getDockerInfo() {
|
||||
try {
|
||||
List<DockerHost> dockerHosts = dockerHostService.list();
|
||||
List<String> errorList = Collections.synchronizedList(new ArrayList<>());
|
||||
CompletableFuture<?>[] futures = dockerHosts.stream()
|
||||
.map(host -> CompletableFuture.runAsync(
|
||||
() -> handleSingleDockerHost(host, errorList), hostExecutor))
|
||||
.toArray(CompletableFuture[]::new);
|
||||
// 等待全部执行完毕
|
||||
CompletableFuture.allOf(futures).join();
|
||||
return errorList.isEmpty()
|
||||
? ApiResult.success()
|
||||
: ApiResult.error();
|
||||
} catch (Exception e) {
|
||||
return ApiResult.error(101, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 启动容器
|
||||
*/
|
||||
public ApiResult<?> startDockerInfo(String id) {
|
||||
try {
|
||||
DockerContainerInfo cur = dockerInfoService.getById(id);
|
||||
DockerHost host = dockerHostService.getById(cur.getDokerHostId());
|
||||
SshUser sshUser = sshUserService.getById(host.getUserId());
|
||||
SshInfo sshInfo = sshInfoService.getById(host.getHostId());
|
||||
docker.startDocker(sshInfo.getHostIp(), Long.valueOf(sshInfo.getHostPort()), sshUser.getCUsername(), sshUser.getCPassword(), cur.getContainerId());
|
||||
/* 2. 取回最新列表 */
|
||||
List<docker.DockerInfo> remoteList = docker.getDockerInfo(
|
||||
sshInfo.getHostIp(),
|
||||
Long.valueOf(sshInfo.getHostPort()),
|
||||
sshUser.getCUsername(),
|
||||
sshUser.getCPassword());
|
||||
/* 3. 有则更新、无则插入、失效删除 */
|
||||
refreshContainerTable(cur.getDokerHostId(), remoteList, sshInfo);
|
||||
/* 4. 更新主机运行数 */
|
||||
updateRunNum(host, remoteList);
|
||||
return ApiResult.success();
|
||||
} catch (Exception e) {
|
||||
return ApiResult.error(101, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 停止容器
|
||||
*/
|
||||
public ApiResult<?> stopDockerInfo(String id) {
|
||||
try {
|
||||
DockerContainerInfo cur = dockerInfoService.getById(id);
|
||||
DockerHost host = dockerHostService.getById(cur.getDokerHostId());
|
||||
SshUser sshUser = sshUserService.getById(host.getUserId());
|
||||
SshInfo sshInfo = sshInfoService.getById(host.getHostId());
|
||||
docker.stopDocker(sshInfo.getHostIp(), Long.valueOf(sshInfo.getHostPort()), sshUser.getCUsername(), sshUser.getCPassword(), cur.getContainerId());
|
||||
/* 2. 取回最新列表 */
|
||||
List<docker.DockerInfo> remoteList = docker.getDockerInfo(
|
||||
sshInfo.getHostIp(),
|
||||
Long.valueOf(sshInfo.getHostPort()),
|
||||
sshUser.getCUsername(),
|
||||
sshUser.getCPassword());
|
||||
/* 3. 有则更新、无则插入、失效删除 */
|
||||
refreshContainerTable(cur.getDokerHostId(), remoteList, sshInfo);
|
||||
/* 4. 更新主机运行数 */
|
||||
updateRunNum(host, remoteList);
|
||||
return ApiResult.success();
|
||||
} catch (Exception e) {
|
||||
return ApiResult.error(101, e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user