Files
c-api/src/main/java/com/mini/capi/sys/service/DbService.java
2025-09-22 23:43:05 +08:00

422 lines
18 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package com.mini.capi.sys.service;
import com.mini.capi.biz.domain.DbConfig;
import com.mini.capi.biz.domain.SyncTask;
import com.mini.capi.biz.domain.SyncTaskLog;
import com.mini.capi.biz.service.DbConfigService;
import com.mini.capi.biz.service.SyncTaskLogService;
import com.mini.capi.biz.service.SyncTaskService;
import com.mini.capi.config.DataSourceConfig;
import com.mini.capi.model.ApiResult;
import com.mini.capi.model.TabResult;
import com.mini.capi.utils.DateUtils;
import jakarta.annotation.Resource;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import java.sql.*;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.Executor;
/**
 * Service that lists the tables of a configured database and copies table data from a
 * source database into a target database, tagging every copied row with a {@code ds} value.
 */
@Service
public class DbService {
// Provides the configured sync tasks and persists their last-sync time.
@Resource
private SyncTaskService syncTaskService;
// Looks up database connection configurations by id.
@Resource
private DbConfigService dbConfigService;
// Persists one log record per executed sync.
@Resource
private SyncTaskLogService taskLogService;
// Executor that sync tasks are submitted to.
@Resource
private Executor hostExecutor;
// Format of the ds value written to target tables (for example 20250922).
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
/**
 * Lists the tables of the database registered under {@code dbId}.
 *
 * <p>Table names and comments are read from {@code information_schema.tables}, filtered by the
 * configured database name, and returned sorted by table name.
 *
 * @param dbId id of the database configuration to inspect
 * @return the table list on success; an error result with code 400 when {@code dbId} is
 *         null or blank, 404 when no configuration exists for it, or 101 when the lookup throws
 */
public ApiResult<List<TabResult>> listSourceTables(String dbId) {
    // Reject a missing or whitespace-only id before touching the configuration store.
    boolean idMissing = dbId == null || dbId.trim().isEmpty();
    if (idMissing) {
        return ApiResult.error(400, "数据库ID不能为空");
    }

    DbConfig dbConfig = dbConfigService.getById(dbId);
    if (dbConfig == null) {
        return ApiResult.error(404, "未找到ID为[" + dbId + "]的数据库配置");
    }

    try {
        String querySql = "SELECT TABLE_NAME,TABLE_COMMENT FROM information_schema.tables WHERE TABLE_SCHEMA = ?";
        List<Map<String, Object>> rows = DataSourceConfig.createJdbcTemplate(dbConfig)
                .queryForList(querySql, dbConfig.getDbName());

        List<TabResult> tables = rows.stream()
                .map(row -> {
                    // Missing name/comment values are treated as empty strings.
                    String name = Objects.toString(row.get("TABLE_NAME"), "");
                    String comment = Objects.toString(row.get("TABLE_COMMENT"), "");
                    return new TabResult(name, getComment(name, comment));
                })
                .sorted(Comparator.comparing(TabResult::getTableName))
                .toList();
        return ApiResult.success(tables);
    } catch (Exception e) {
        return ApiResult.error(101, e.getMessage());
    }
}
/**
 * Builds the display label for a table.
 *
 * @param tableName physical table name
 * @param tableDesc table comment; may be null or blank
 * @return {@code "<trimmed comment>(<tableName>)"} when a non-blank comment exists,
 *         otherwise {@code tableName} unchanged
 */
private String getComment(String tableName, String tableDesc) {
    // No usable comment: fall back to the bare table name.
    if (tableDesc == null || tableDesc.trim().isEmpty()) {
        return tableName;
    }
    return tableDesc.trim() + "(" + tableName + ")";
}
/**
 * Submits every configured sync task for execution.
 *
 * <p>Each task is handed to {@code execSyncTask}; this method does not wait for the
 * submitted work and always reports success.
 *
 * @return a success result
 */
public ApiResult<?> jobSyncAllTask() {
    // Shared collector for failure messages; it is only printed, never returned to the caller.
    List<String> failures = new ArrayList<>();
    syncTaskService.list().forEach(task -> execSyncTask(task, failures));
    return ApiResult.success();
}
/**
 * Submits a single sync task for execution.
 *
 * <p>The task is handed to {@code execSyncTask}; this method does not wait for the
 * submitted work to finish.
 *
 * @param taskId id of the sync task to run
 * @return a success result once the task has been submitted; an error result with code 404
 *         when no task exists for {@code taskId}, or code 101 when lookup or submission throws
 */
public ApiResult<?> jobSyncOneTask(String taskId) {
    try {
        SyncTask task = syncTaskService.getById(taskId);
        // An unknown id must be reported to the caller instead of submitting a null task,
        // which would only fail later on the worker thread while the API reports success.
        if (task == null) {
            return ApiResult.error(404, "未找到ID为[" + taskId + "]的同步任务");
        }
        // Failure messages are only collected for console output, never returned.
        List<String> errorMessages = new ArrayList<>();
        execSyncTask(task, errorMessages);
        return ApiResult.success();
    } catch (Exception e) {
        return ApiResult.error(101, e.getMessage());
    }
}
/**
 * Submits one sync task to {@code hostExecutor}.
 *
 * <p>The submitted work resolves the source and target database configurations, creates a
 * {@code JdbcTemplate} for each and runs {@code syncTableData}. Any exception is printed to
 * stderr and appended to {@code errorMessages}.
 *
 * @param task          the task to execute
 * @param errorMessages collector for failure messages; used as the lock for all access to itself
 */
private void execSyncTask(SyncTask task, List<String> errorMessages) {
    hostExecutor.execute(() -> {
        try {
            // 1. Resolve the source and target database configurations.
            DbConfig sourceDbConfig = dbConfigService.getById(task.getSourceDbId());
            DbConfig targetDbConfig = dbConfigService.getById(task.getTargetDbId());
            // 2. Create a JdbcTemplate for each side.
            JdbcTemplate sourceJdbc = DataSourceConfig.createJdbcTemplate(sourceDbConfig);
            JdbcTemplate targetJdbc = DataSourceConfig.createJdbcTemplate(targetDbConfig);
            // 3. Run the table synchronisation.
            syncTableData(task, sourceJdbc, targetJdbc);
        } catch (Exception e) {
            String errorMsg = "任务 " + task.getTaskId() + " 同步失败: " + e.getMessage();
            System.err.println(errorMsg);
            // The list may be shared by several submitted tasks; guard every mutation.
            synchronized (errorMessages) {
                errorMessages.add(errorMsg);
            }
        }
    });
    // Read the list under the same lock the workers use when appending, so rendering it
    // cannot race with a concurrent add.
    String snapshot;
    synchronized (errorMessages) {
        snapshot = errorMessages.toString();
    }
    System.out.println(snapshot);
}
/**
 * Synchronises one table from the source database into the target database.
 *
 * <p>Steps: create the target table when it does not exist, delete the target rows selected
 * by {@code clearTargetTableLastWeekData}, delete the rows carrying today's {@code ds}
 * value, then copy all source rows. Any exception is printed to stderr and not rethrown.
 *
 * @param task       task describing the source and target tables
 * @param sourceJdbc template bound to the source database
 * @param targetJdbc template bound to the target database
 */
public void syncTableData(SyncTask task, JdbcTemplate sourceJdbc, JdbcTemplate targetJdbc) {
    final String today = LocalDate.now().format(DATE_FORMATTER);
    try {
        // Only the target table name is lower-cased; the source name is used as configured.
        final String source = task.getSourceTable();
        final String target = task.getTargetTable().toLowerCase();

        // 1. Make sure the target table exists.
        boolean targetMissing = !tableExists(targetJdbc, target);
        if (targetMissing) {
            createTargetTable(sourceJdbc, targetJdbc, source, target);
        }

        // 2. Remove the expired rows, then the rows already written for today's ds.
        clearTargetTableLastWeekData(targetJdbc, target);
        clearTargetTableData(targetJdbc, target, today);

        // 3. Copy every source row.
        syncAllData(task, sourceJdbc, targetJdbc, source, target, today);
    } catch (Exception failure) {
        System.err.println("表同步失败: " + failure.getMessage());
    }
}
/**
 * Checks whether a table with the given name exists in the connection's current catalog.
 *
 * <p>{@code DatabaseMetaData.getTables} interprets the table name as a pattern, so
 * {@code _} and {@code %} inside the name are escaped with the driver's search-string
 * escape to force an exact-name match. The schema pattern is left unrestricted.
 *
 * @param jdbcTemplate template bound to the database to inspect
 * @param tableName    table name to look for (callers pass it already lower-cased)
 * @return {@code true} when at least one matching table of type {@code TABLE} exists
 * @throws SQLException when the metadata lookup fails
 */
private boolean tableExists(JdbcTemplate jdbcTemplate, String tableName) throws SQLException {
    try (Connection connection = jdbcTemplate.getDataSource().getConnection()) {
        DatabaseMetaData metaData = connection.getMetaData();

        // Escape pattern metacharacters so "user_info" cannot match "userXinfo".
        String escape = metaData.getSearchStringEscape();
        String namePattern = tableName;
        if (tableName != null && escape != null && !escape.isEmpty()) {
            namePattern = tableName.replace(escape, escape + escape)
                    .replace("_", escape + "_")
                    .replace("%", escape + "%");
        }

        // Restrict to the current catalog so a same-named table in another database on the
        // same server is not reported as existing.
        try (ResultSet rs = metaData.getTables(
                connection.getCatalog(), null, namePattern, new String[]{"TABLE"})) {
            return rs.next();
        }
    }
}
/**
 * Creates the target table by copying the source table's column definitions and appending
 * a {@code ds VARCHAR(20) NOT NULL} column.
 *
 * @param sourceJdbc  template bound to the source database
 * @param targetJdbc  template bound to the target database
 * @param sourceTable table whose columns are copied
 * @param targetTable name of the table to create
 * @throws SQLException when reading the source column metadata fails
 */
private void createTargetTable(JdbcTemplate sourceJdbc, JdbcTemplate targetJdbc,
        String sourceTable, String targetTable) throws SQLException {
    // Source columns first, then the ds partition column as the last entry.
    List<String> columnSpecs = new ArrayList<>(getColumnDefinitions(sourceJdbc, sourceTable));
    columnSpecs.add("ds VARCHAR(20) NOT NULL");

    String ddl = "CREATE TABLE " + targetTable + " (" + String.join(", ", columnSpecs) + ")";
    targetJdbc.execute(ddl);
    System.out.println("已创建目标表: " + targetTable);
}
/**
 * Returns the lower-cased column names of a table in the connection's current catalog.
 *
 * <p>{@code DatabaseMetaData.getColumns} interprets the table name as a pattern, so
 * {@code _} and {@code %} inside the name are escaped with the driver's search-string
 * escape. The lookup is limited to the current catalog so columns of a same-named table
 * in another database on the same server are not mixed in.
 *
 * @param jdbcTemplate template bound to the database to inspect
 * @param tableName    table whose columns are listed
 * @return column names in the order reported by the driver; empty when nothing matches
 * @throws SQLException when the metadata lookup fails
 */
private List<String> getTableColumns(JdbcTemplate jdbcTemplate, String tableName) throws SQLException {
    List<String> columns = new ArrayList<>();
    try (Connection connection = jdbcTemplate.getDataSource().getConnection()) {
        DatabaseMetaData metaData = connection.getMetaData();

        // Escape pattern metacharacters so only the exact table name matches.
        String escape = metaData.getSearchStringEscape();
        String namePattern = tableName;
        if (tableName != null && escape != null && !escape.isEmpty()) {
            namePattern = tableName.replace(escape, escape + escape)
                    .replace("_", escape + "_")
                    .replace("%", escape + "%");
        }

        try (ResultSet rs = metaData.getColumns(connection.getCatalog(), null, namePattern, null)) {
            while (rs.next()) {
                // Column names are normalised to lower case.
                columns.add(rs.getString("COLUMN_NAME").toLowerCase());
            }
        }
    }
    return columns;
}
/**
 * Builds one column definition ({@code name type [NOT NULL]}) per column of the source
 * table, with the type translated by {@code mapMySqlTypeToPgType}.
 *
 * <p>{@code DatabaseMetaData.getColumns} interprets the table name as a pattern, so
 * {@code _} and {@code %} inside the name are escaped with the driver's search-string
 * escape. The lookup is limited to the connection's current catalog so a same-named table
 * in another database on the same server cannot contribute duplicate columns.
 *
 * @param sourceJdbc template bound to the source database
 * @param tableName  source table whose columns are described
 * @return column definitions in the order reported by the driver; empty when nothing matches
 * @throws SQLException when the metadata lookup fails
 */
private List<String> getColumnDefinitions(JdbcTemplate sourceJdbc, String tableName) throws SQLException {
    List<String> definitions = new ArrayList<>();
    try (Connection connection = sourceJdbc.getDataSource().getConnection()) {
        DatabaseMetaData metaData = connection.getMetaData();

        // Escape pattern metacharacters so only the exact table name matches.
        String escape = metaData.getSearchStringEscape();
        String namePattern = tableName;
        if (tableName != null && escape != null && !escape.isEmpty()) {
            namePattern = tableName.replace(escape, escape + escape)
                    .replace("_", escape + "_")
                    .replace("%", escape + "%");
        }

        try (ResultSet rs = metaData.getColumns(connection.getCatalog(), null, namePattern, null)) {
            while (rs.next()) {
                // Column names are normalised to lower case.
                String columnName = rs.getString("COLUMN_NAME").toLowerCase();
                String typeName = rs.getString("TYPE_NAME").toUpperCase();
                int columnSize = rs.getInt("COLUMN_SIZE");
                int decimalDigits = rs.getInt("DECIMAL_DIGITS");
                int nullable = rs.getInt("NULLABLE");

                // Translate the source type into the target dialect.
                String pgType = mapMySqlTypeToPgType(typeName, columnSize, decimalDigits);

                StringBuilder colDef = new StringBuilder();
                colDef.append(columnName).append(" ").append(pgType);
                // Only columns the driver reports as definitely non-nullable get NOT NULL.
                if (nullable == DatabaseMetaData.columnNoNulls) {
                    colDef.append(" NOT NULL");
                }
                definitions.add(colDef.toString());
            }
        }
    }
    return definitions;
}
/**
 * Maps a MySQL column type name to a PostgreSQL column type.
 *
 * <p>The name is matched case-insensitively. {@code UNSIGNED} and {@code ZEROFILL}
 * modifiers (for example {@code "INT UNSIGNED"}) are stripped before matching, and unsigned
 * integer types are widened so the full unsigned range still fits. Type names that are not
 * recognised are reported on stderr and mapped to {@code TEXT}.
 *
 * @param mySqlType     type name as reported for the source column
 * @param columnSize    reported column size (length or precision); values {@code <= 0} select a default
 * @param decimalDigits reported scale; negative values are treated as 0
 * @return the PostgreSQL type to use in the generated DDL
 */
private String mapMySqlTypeToPgType(String mySqlType, int columnSize, int decimalDigits) {
    // Locale.ROOT keeps the upper-casing independent of the JVM default locale.
    String normalized = mySqlType.trim().toUpperCase(Locale.ROOT);
    boolean unsigned = normalized.contains("UNSIGNED");
    // Drop the modifiers so "INT UNSIGNED ZEROFILL" is matched as "INT".
    String type = normalized.replace("UNSIGNED", "")
            .replace("ZEROFILL", "")
            .trim()
            .replaceAll("\\s+", " ");
    return switch (type) {
        // Integer types; unsigned variants move up one size to hold the larger range.
        case "INT", "INTEGER" -> unsigned ? "BIGINT" : "INTEGER";
        case "TINYINT" -> columnSize == 1 ? "BOOLEAN" : "SMALLINT"; // TINYINT(1) usually models a boolean
        case "SMALLINT" -> unsigned ? "INTEGER" : "SMALLINT";
        case "MEDIUMINT" -> "INTEGER"; // no MEDIUMINT on the target side
        case "BIGINT" -> unsigned ? "NUMERIC(20,0)" : "BIGINT"; // unsigned 64-bit exceeds BIGINT
        // BIT(1) is a flag; wider bit fields are stored as raw bytes.
        case "BIT" -> columnSize <= 1 ? "BOOLEAN" : "BYTEA";
        // Floating-point types.
        case "FLOAT" -> columnSize > 24 ? "DOUBLE PRECISION" : "REAL";
        case "DOUBLE", "DOUBLE PRECISION" -> "DOUBLE PRECISION";
        case "DECIMAL", "NUMERIC" -> {
            int precision = columnSize > 0 ? columnSize : 10;
            int scale = Math.max(decimalDigits, 0);
            yield "NUMERIC(" + precision + "," + scale + ")";
        }
        // Character types.
        case "VARCHAR" -> {
            // Without a usable length fall back to TEXT.
            int length = Math.max(columnSize, 0);
            yield length > 0 ? "VARCHAR(" + length + ")" : "TEXT";
        }
        case "CHAR" -> "CHAR(" + (columnSize > 0 ? columnSize : 1) + ")";
        case "TEXT", "MEDIUMTEXT", "TINYTEXT", "LONGTEXT" -> "TEXT";
        // Binary types.
        case "BLOB", "TINYBLOB", "MEDIUMBLOB", "LONGBLOB", "BINARY", "VARBINARY" -> "BYTEA";
        // Date and time types.
        case "DATE" -> "DATE";
        case "TIME" -> "TIME";
        case "DATETIME", "TIMESTAMP" -> "TIMESTAMP";
        case "YEAR" -> "SMALLINT";
        // Special types.
        case "BOOLEAN", "BOOL" -> "BOOLEAN";
        case "JSON", "JSONB" -> "JSONB";
        case "ENUM" -> "VARCHAR(255)"; // value validity must be enforced by the application
        case "SET" -> "TEXT"; // stored as the comma-separated text form
        // Geometry types (simplified mapping).
        // NOTE(review): stock PostgreSQL appears to have no LINESTRING type — confirm the
        // target database provides one before relying on this mapping.
        case "POINT" -> "POINT";
        case "LINESTRING" -> "LINESTRING";
        case "POLYGON" -> "POLYGON";
        default -> {
            // Report the unmapped type so the mapping can be extended later.
            System.err.println("未处理的MySQL类型: " + mySqlType);
            yield "TEXT";
        }
    };
}
/**
 * Deletes the target-table rows whose {@code ds} column equals {@code dsValue}.
 *
 * @param targetJdbc  template bound to the target database
 * @param targetTable table to delete from; concatenated into the SQL text as-is
 * @param dsValue     ds value identifying the rows to remove
 */
private void clearTargetTableData(JdbcTemplate targetJdbc, String targetTable, String dsValue) {
    // Table names cannot be bound as parameters, so only the ds value is a placeholder.
    String deleteSql = new StringBuilder("DELETE FROM ")
            .append(targetTable)
            .append(" WHERE ds = ?")
            .toString();
    targetJdbc.update(deleteSql, dsValue);
}
/**
 * Deletes the target-table rows whose {@code ds} column equals the value returned by
 * {@code DateUtils.getSevenDaysAgo()}.
 *
 * <p>NOTE(review): presumably that helper yields the date seven days ago in the same format
 * as the ds column — confirm against {@code DateUtils}.
 *
 * @param targetJdbc  template bound to the target database
 * @param targetTable table to delete from; concatenated into the SQL text as-is
 */
private void clearTargetTableLastWeekData(JdbcTemplate targetJdbc, String targetTable) {
    String deleteSql = String.format("DELETE FROM %s WHERE ds = ?", targetTable);
    targetJdbc.update(deleteSql, DateUtils.getSevenDaysAgo());
}
/**
 * Copies every row of the source table into the target table and records the outcome.
 *
 * <p>All source rows are loaded with {@code SELECT *}, inserted in a single JDBC batch with
 * {@code dsValue} appended as the {@code ds} column, and a {@code SyncTaskLog} entry is
 * saved together with the task's updated last-sync time. When the source table is empty
 * the method returns early without writing a log entry. A failure while reading or writing
 * is recorded in the log entry (status {@code "2"}) instead of being rethrown.
 *
 * @param task        task being executed
 * @param sourceJdbc  template bound to the source database
 * @param targetJdbc  template bound to the target database
 * @param sourceTable table to read from
 * @param targetTable table to insert into
 * @param dsValue     value written to the {@code ds} column of every copied row
 */
private void syncAllData(SyncTask task, JdbcTemplate sourceJdbc, JdbcTemplate targetJdbc,
        String sourceTable, String targetTable, String dsValue) {
    LocalDateTime startTime = LocalDateTime.now();
    int totalRows = 0;
    int successRows = 0;
    int failRows = 0;
    String ustatus = "1";
    String errorMsg = null;
    try {
        // 1. Load all source rows.
        String selectSql = "SELECT * FROM " + sourceTable;
        List<Map<String, Object>> dataList = sourceJdbc.queryForList(selectSql);
        totalRows = dataList.size();
        if (dataList.isEmpty()) {
            System.out.println("源表 " + sourceTable + " 没有数据需要同步");
            return;
        }
        // 2. Build the INSERT statement from the source column list.
        List<String> columns = getTableColumnsWithoutException(sourceJdbc, sourceTable);
        String insertSql = buildInsertSql(targetTable, columns);
        // 3. Insert all rows as one batch.
        int[] batchResult = targetJdbc.batchUpdate(insertSql, new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                Map<String, Object> row = dataList.get(i);
                int paramIndex = 1;
                // Source column values, in column-list order.
                for (String column : columns) {
                    ps.setObject(paramIndex++, row.get(column));
                }
                // ds is always the last parameter.
                ps.setString(paramIndex, dsValue);
            }
            @Override
            public int getBatchSize() {
                return dataList.size();
            }
        });
        // Only EXECUTE_FAILED marks a failed statement; SUCCESS_NO_INFO (-2) means the row was
        // written but the driver did not report an update count.
        successRows = (int) Arrays.stream(batchResult)
                .filter(result -> result != Statement.EXECUTE_FAILED)
                .count();
        failRows = totalRows - successRows;
    } catch (Exception e) {
        ustatus = "2";
        errorMsg = e.getMessage() != null ? e.getMessage() : e.getClass().getSimpleName();
        System.err.println("同步数据失败: " + errorMsg);
    }
    DbConfig sourceDbConfig = dbConfigService.getById(task.getSourceDbId());
    DbConfig targetDbConfig = dbConfigService.getById(task.getTargetDbId());
    LocalDateTime endTime = LocalDateTime.now();
    // A configuration that can no longer be resolved must not prevent the log entry from
    // being written, so its name is recorded as null instead of dereferencing it.
    SyncTaskLog taskLog = new SyncTaskLog(task.getTaskId(), task.getTaskName(), task.getSourceDbId(),
            sourceDbConfig != null ? sourceDbConfig.getDbName() : null, sourceTable,
            task.getTargetDbId(),
            targetDbConfig != null ? targetDbConfig.getDbName() : null, targetTable,
            startTime, endTime, (long) totalRows, (long) successRows, (long) failRows, ustatus,
            errorMsg, (int) Duration.between(startTime, endTime).getSeconds(), "0");
    task.setLastSyncTime(endTime);
    taskLogService.save(taskLog);
    syncTaskService.updateById(task);
}
/**
 * Builds the parameterised INSERT statement for the target table.
 *
 * <p>The statement lists {@code columns} followed by {@code ds}, with one {@code ?}
 * placeholder per listed column. An empty {@code columns} list yields a valid statement
 * that inserts only {@code ds}.
 *
 * @param targetTable table to insert into; concatenated into the SQL text as-is
 * @param columns     source column names, in the order their values will be bound
 * @return the INSERT statement text
 */
private String buildInsertSql(String targetTable, List<String> columns) {
    // Treat ds as the last column so separators are correct even without source columns.
    List<String> insertColumns = new ArrayList<>(columns);
    insertColumns.add("ds");
    String placeholders = String.join(", ", Collections.nCopies(insertColumns.size(), "?"));
    return "INSERT INTO " + targetTable
            + " (" + String.join(", ", insertColumns) + ") VALUES (" + placeholders + ")";
}
/**
 * Variant of {@code getTableColumns} without a checked exception: a {@code SQLException}
 * is rethrown wrapped in a {@code RuntimeException} that keeps the original as its cause.
 *
 * @param jdbcTemplate template bound to the database to inspect
 * @param tableName    table whose columns are listed
 * @return the column names returned by {@code getTableColumns}
 */
private List<String> getTableColumnsWithoutException(JdbcTemplate jdbcTemplate, String tableName) {
    final List<String> columnNames;
    try {
        columnNames = getTableColumns(jdbcTemplate, tableName);
    } catch (SQLException sqlFailure) {
        throw new RuntimeException("获取表字段失败: " + sqlFailure.getMessage(), sqlFailure);
    }
    return columnNames;
}
}