新增MySQL和pg数据库的同步

This commit is contained in:
2025-08-27 09:52:25 +08:00
parent cb1b6f9349
commit 8a2218b3a8
3 changed files with 286 additions and 21 deletions

View File

@@ -18,7 +18,7 @@ public class springBean {
executor.setCorePoolSize(core * 2);
executor.setMaxPoolSize(core * 4);
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("host-disk-");
executor.setThreadNamePrefix("host-task-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;

View File

@@ -8,57 +8,279 @@ import com.mini.capi.config.DataSourceConfig;
import com.mini.capi.model.ApiResult;
import com.mini.capi.utils.vToken;
import jakarta.annotation.Resource;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.sql.*;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@RestController
@RequestMapping("/Sys/dbs")
public class taskDbSync {
@Resource
private SyncTaskService syncTaskService;
@Resource
private DbConfigService dbConfigService;
private static final int BATCH_SIZE = 1000;
@Resource
private Executor hostExecutor;
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
private static final String dsValue = LocalDate.now().format(DATE_FORMATTER);
@GetMapping("/getTaskSyncDbInfo")
public ApiResult<?> jobSyncTask(String token) {
if (vToken.isValidToken(token)) {
String dsValue = LocalDate.now().format(DATE_FORMATTER);
List<SyncTask> syncTasks = syncTaskService.list();
// 记录是否有任务失败(仅用于后台日志,不影响接口返回)
List<String> errorMessages = new ArrayList<>();
for (SyncTask task : syncTasks) {
DbConfig sourceDbConfig = dbConfigService.getById(task.getSourceDbId());
DbConfig targetDbConfig = dbConfigService.getById(task.getTargetDbId());
JdbcTemplate sourceJdbc = DataSourceConfig.createJdbcTemplate(sourceDbConfig);
JdbcTemplate targetJdbc = DataSourceConfig.createJdbcTemplate(targetDbConfig);
// 提交任务到线程池(异步执行,接口不等待)
hostExecutor.execute(() -> {
try {
// 1. 获取源/目标数据库配置
DbConfig sourceDbConfig = dbConfigService.getById(task.getSourceDbId());
DbConfig targetDbConfig = dbConfigService.getById(task.getTargetDbId());
// 2. 创建对应数据库的JdbcTemplate
JdbcTemplate sourceJdbc = DataSourceConfig.createJdbcTemplate(sourceDbConfig);
JdbcTemplate targetJdbc = DataSourceConfig.createJdbcTemplate(targetDbConfig);
// 3. 执行表同步逻辑(异步执行)
syncTableData(task, sourceJdbc, targetJdbc);
// 同步成功日志
System.out.println("任务 " + task.getTaskId() + "" + task.getSourceTable() + "" + task.getTargetTable() + ")已提交至后台执行");
} catch (Exception e) {
// 捕获任务执行异常,记录错误信息(仅后台打印,不阻塞接口)
String errorMsg = "任务 " + task.getTaskId() + " 同步失败: " + e.getMessage();
System.err.println(errorMsg);
// 加锁保证线程安全多线程操作同一List
synchronized (errorMessages) {
errorMessages.add(errorMsg);
}
}
});
System.out.println(errorMessages);
}
// 接口立即返回,不等待任务执行完成
return ApiResult.success();
}
return ApiResult.error();
}
/**
* 同步表数据
*/
public void syncTableData(SyncTask task, JdbcTemplate sourceJdbc, JdbcTemplate targetJdbc) {
try {
// 确保源表和目标表名转为小写
String sourceTable = task.getSourceTable().toLowerCase();
String targetTable = task.getTargetTable().toLowerCase();
// 1. 检查并创建目标表
if (!tableExists(targetJdbc, targetTable)) {
createTargetTable(sourceJdbc, targetJdbc, sourceTable, targetTable);
}
// 2. 清空目标表当前ds值的数据
clearTargetTableData(targetJdbc, targetTable);
// 3. 全量同步数据
syncAllData(sourceJdbc, targetJdbc, sourceTable, targetTable);
System.out.println("" + sourceTable + " 同步到 " + targetTable + " 成功");
} catch (Exception e) {
System.err.println("表同步失败: " + e.getMessage());
}
}
/**
* 检查目标表是否存在
*/
private boolean tableExists(JdbcTemplate jdbcTemplate, String tableName) throws SQLException {
// 表名已转为小写,直接使用
try (Connection connection = jdbcTemplate.getDataSource().getConnection()) {
DatabaseMetaData metaData = connection.getMetaData();
try (ResultSet rs = metaData.getTables(
null, null, tableName, new String[]{"TABLE"})) {
return rs.next();
}
}
}
/**
* 创建目标表结构复制源表结构并添加ds字段
*/
private void createTargetTable(JdbcTemplate sourceJdbc, JdbcTemplate targetJdbc,
String sourceTable, String targetTable) throws SQLException {
// 获取源表字段定义
List<String> columnDefinitions = getColumnDefinitions(sourceJdbc, sourceTable);
// 构建创建表SQL (PostgresSQL语法)
StringBuilder createSql = new StringBuilder("CREATE TABLE ")
.append(targetTable)
.append(" (");
// 添加原有字段
for (int i = 0; i < columnDefinitions.size(); i++) {
createSql.append(columnDefinitions.get(i));
// 最后一个字段后不加逗号
if (i < columnDefinitions.size() - 1) {
createSql.append(", ");
}
}
// 添加ds字段如果有其他字段需要加逗号分隔
if (!columnDefinitions.isEmpty()) {
createSql.append(", ");
}
createSql.append("ds VARCHAR(20) NOT NULL)");
// 执行创建表SQL
targetJdbc.execute(createSql.toString());
System.out.println("已创建目标表: " + targetTable);
}
/**
* 获取表字段列表
*/
private List<String> getTableColumns(JdbcTemplate jdbcTemplate, String tableName) throws SQLException {
List<String> columns = new ArrayList<>();
try (Connection connection = jdbcTemplate.getDataSource().getConnection()) {
DatabaseMetaData metaData = connection.getMetaData();
try (ResultSet rs = metaData.getColumns(null, null, tableName, null)) {
while (rs.next()) {
// 字段名转为小写
columns.add(rs.getString("COLUMN_NAME").toLowerCase());
}
}
}
return columns;
}
/**
* 获取PostgreSQL兼容的字段定义
*/
private List<String> getColumnDefinitions(JdbcTemplate sourceJdbc, String tableName) throws SQLException {
List<String> definitions = new ArrayList<>();
try (Connection connection = sourceJdbc.getDataSource().getConnection()) {
DatabaseMetaData metaData = connection.getMetaData();
try (ResultSet rs = metaData.getColumns(null, null, tableName, null)) {
while (rs.next()) {
// 字段名转为小写
String columnName = rs.getString("COLUMN_NAME").toLowerCase();
String typeName = rs.getString("TYPE_NAME").toUpperCase();
int columnSize = rs.getInt("COLUMN_SIZE");
int decimalDigits = rs.getInt("DECIMAL_DIGITS");
int nullable = rs.getInt("NULLABLE");
// MySQL到PostgresSQL类型映射
String pgType = mapMySqlTypeToPgType(typeName, columnSize, decimalDigits);
// 构建字段定义
StringBuilder colDef = new StringBuilder();
colDef.append(columnName).append(" ").append(pgType);
if (nullable == DatabaseMetaData.columnNoNulls) {
colDef.append(" NOT NULL");
}
definitions.add(colDef.toString());
}
}
}
return definitions;
}
/**
* MySQL到PostgresSQL数据类型映射
*/
private String mapMySqlTypeToPgType(String mySqlType, int columnSize, int decimalDigits) {
return switch (mySqlType) {
case "INT", "INTEGER" -> "INTEGER";
case "BIGINT" -> "BIGINT";
case "VARCHAR", "CHAR" -> "VARCHAR(" + columnSize + ")";
case "TEXT" -> "TEXT";
case "DATE" -> "DATE";
case "DATETIME", "TIMESTAMP" -> "TIMESTAMP";
case "FLOAT", "DOUBLE" -> "DOUBLE PRECISION";
case "DECIMAL", "NUMERIC" -> "NUMERIC(" + columnSize + "," + decimalDigits + ")";
case "BOOLEAN" -> "BOOLEAN";
case "TINYINT" -> columnSize == 1 ? "BOOLEAN" : "SMALLINT";
default -> "VARCHAR(" + (columnSize > 0 ? columnSize : 255) + ")";
};
}
/**
* 清空目标表中当前ds值的数据
*/
private void clearTargetTableData(JdbcTemplate targetJdbc, String targetTable) {
String sql = "DELETE FROM " + targetTable + " WHERE ds = ?";
targetJdbc.update(sql, dsValue);
}
/**
* 全量同步数据
*/
private void syncAllData(JdbcTemplate sourceJdbc, JdbcTemplate targetJdbc,
String sourceTable, String targetTable) {
// 1. 获取源表所有数据
String selectSql = "SELECT * FROM " + sourceTable;
List<Map<String, Object>> dataList = sourceJdbc.queryForList(selectSql);
if (dataList.isEmpty()) {
System.out.println("源表 " + sourceTable + " 没有数据需要同步");
return;
}
// 2. 构建插入SQL
List<String> columns = getTableColumnsWithoutException(sourceJdbc, sourceTable);
String insertSql = buildInsertSql(targetTable, columns);
// 3. 批量插入数据
targetJdbc.batchUpdate(insertSql, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
Map<String, Object> row = dataList.get(i);
int paramIndex = 1;
// 设置原有字段值
for (String column : columns) {
ps.setObject(paramIndex++, row.get(column));
}
// 设置ds字段值
ps.setString(paramIndex, dsValue);
}
@Override
public int getBatchSize() {
return dataList.size();
}
});
System.out.println("成功同步 " + dataList.size() + " 条数据到表 " + targetTable);
}
/**
* 构建插入SQL语句
*/
private String buildInsertSql(String targetTable, List<String> columns) {
StringBuilder sql = new StringBuilder();
sql.append("INSERT INTO ").append(targetTable).append(" (");
// 添加原有字段
for (int i = 0; i < columns.size(); i++) {
sql.append(columns.get(i));
if (i < columns.size() - 1) {
sql.append(", ");
}
}
// 添加ds字段
sql.append(", ds) VALUES (");
// 添加参数占位符
sql.append("?, ".repeat(columns.size()));
sql.append("?)");
return sql.toString();
}
/**
* 无异常获取表字段(工具方法)
*/
private List<String> getTableColumnsWithoutException(JdbcTemplate jdbcTemplate, String tableName) {
try {
return getTableColumns(jdbcTemplate, tableName);
} catch (SQLException e) {
throw new RuntimeException("获取表字段失败: " + e.getMessage(), e);
}
}
}

View File

@@ -0,0 +1,43 @@
package com.mini.capi.sys.controller;
import com.mini.capi.biz.domain.DbConfig;
import com.mini.capi.biz.service.DbConfigService;
import com.mini.capi.config.DataSourceConfig;
import com.mini.capi.model.ApiResult;
import com.mini.capi.utils.vToken;
import jakarta.annotation.Resource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import java.util.Map;
@RestController
@RequestMapping("/Sys/dbs")
public class dbController {
@Resource
private DbConfigService dbConfigService;
/**
* 获取MySQL的当前连接下的所有数据表
*/
@GetMapping("/getApiSourceTables")
public ApiResult<List<String>> listSourceTables(String token, String dbId) {
if (vToken.isValidToken(token)) {
DbConfig dbConfig = dbConfigService.getById(dbId);
JdbcTemplate jdbcTemplate = DataSourceConfig.createJdbcTemplate(dbConfig);
String querySql = "SELECT table_name FROM information_schema.tables WHERE table_schema = ?";
// 执行通用查询
List<Map<String, Object>> result = jdbcTemplate.queryForList(querySql);
List<String> data = result.stream()
.map(row -> row.values().iterator().next().toString())
.toList();
return ApiResult.success(data);
}
return ApiResult.error();
}
}