From 8a2218b3a87312d8ae2a48e13a5cf7e1d4019b41 Mon Sep 17 00:00:00 2001 From: gaoxq <376340421@qq.com> Date: Wed, 27 Aug 2025 09:52:25 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9EMySQL=E5=92=8Cpg=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BA=93=E7=9A=84=E5=90=8C=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/mini/capi/config/springBean.java | 2 +- .../java/com/mini/capi/job/taskDbSync.java | 262 ++++++++++++++++-- .../capi/sys/controller/dbController.java | 43 +++ 3 files changed, 286 insertions(+), 21 deletions(-) create mode 100644 src/main/java/com/mini/capi/sys/controller/dbController.java diff --git a/src/main/java/com/mini/capi/config/springBean.java b/src/main/java/com/mini/capi/config/springBean.java index e17efa5..412ce63 100644 --- a/src/main/java/com/mini/capi/config/springBean.java +++ b/src/main/java/com/mini/capi/config/springBean.java @@ -18,7 +18,7 @@ public class springBean { executor.setCorePoolSize(core * 2); executor.setMaxPoolSize(core * 4); executor.setQueueCapacity(200); - executor.setThreadNamePrefix("host-disk-"); + executor.setThreadNamePrefix("host-task-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; diff --git a/src/main/java/com/mini/capi/job/taskDbSync.java b/src/main/java/com/mini/capi/job/taskDbSync.java index e3e77e1..065c9a8 100644 --- a/src/main/java/com/mini/capi/job/taskDbSync.java +++ b/src/main/java/com/mini/capi/job/taskDbSync.java @@ -8,57 +8,279 @@ import com.mini.capi.config.DataSourceConfig; import com.mini.capi.model.ApiResult; import com.mini.capi.utils.vToken; import jakarta.annotation.Resource; - +import org.springframework.jdbc.core.BatchPreparedStatementSetter; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; - - - +import java.sql.*; import java.time.LocalDate; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; import java.util.List; - +import java.util.Map; +import java.util.concurrent.Executor; @RestController @RequestMapping("/Sys/dbs") public class taskDbSync { - @Resource private SyncTaskService syncTaskService; @Resource private DbConfigService dbConfigService; - - private static final int BATCH_SIZE = 1000; + @Resource + private Executor hostExecutor; private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd"); - + private static final String dsValue = LocalDate.now().format(DATE_FORMATTER); @GetMapping("/getTaskSyncDbInfo") public ApiResult jobSyncTask(String token) { if (vToken.isValidToken(token)) { - String dsValue = LocalDate.now().format(DATE_FORMATTER); List syncTasks = syncTaskService.list(); + // 记录是否有任务失败(仅用于后台日志,不影响接口返回) + List errorMessages = new ArrayList<>(); for (SyncTask task : syncTasks) { - DbConfig sourceDbConfig = dbConfigService.getById(task.getSourceDbId()); - DbConfig targetDbConfig = dbConfigService.getById(task.getTargetDbId()); - JdbcTemplate sourceJdbc = DataSourceConfig.createJdbcTemplate(sourceDbConfig); - JdbcTemplate targetJdbc = DataSourceConfig.createJdbcTemplate(targetDbConfig); - - - - - - + // 提交任务到线程池(异步执行,接口不等待) + hostExecutor.execute(() -> { + try { + // 1. 获取源/目标数据库配置 + DbConfig sourceDbConfig = dbConfigService.getById(task.getSourceDbId()); + DbConfig targetDbConfig = dbConfigService.getById(task.getTargetDbId()); + // 2. 创建对应数据库的JdbcTemplate + JdbcTemplate sourceJdbc = DataSourceConfig.createJdbcTemplate(sourceDbConfig); + JdbcTemplate targetJdbc = DataSourceConfig.createJdbcTemplate(targetDbConfig); + // 3. 执行表同步逻辑(异步执行) + syncTableData(task, sourceJdbc, targetJdbc); + // 同步成功日志 + System.out.println("任务 " + task.getTaskId() + "(" + task.getSourceTable() + "→" + task.getTargetTable() + ")已提交至后台执行"); + } catch (Exception e) { + // 捕获任务执行异常,记录错误信息(仅后台打印,不阻塞接口) + String errorMsg = "任务 " + task.getTaskId() + " 同步失败: " + e.getMessage(); + System.err.println(errorMsg); + // 加锁保证线程安全(多线程操作同一List) + synchronized (errorMessages) { + errorMessages.add(errorMsg); + } + } + }); + System.out.println(errorMessages); } + // 接口立即返回,不等待任务执行完成 return ApiResult.success(); } return ApiResult.error(); } + /** + * 同步表数据 + */ + public void syncTableData(SyncTask task, JdbcTemplate sourceJdbc, JdbcTemplate targetJdbc) { + try { + // 确保源表和目标表名转为小写 + String sourceTable = task.getSourceTable().toLowerCase(); + String targetTable = task.getTargetTable().toLowerCase(); + + // 1. 检查并创建目标表 + if (!tableExists(targetJdbc, targetTable)) { + createTargetTable(sourceJdbc, targetJdbc, sourceTable, targetTable); + } + // 2. 清空目标表当前ds值的数据 + clearTargetTableData(targetJdbc, targetTable); + // 3. 全量同步数据 + syncAllData(sourceJdbc, targetJdbc, sourceTable, targetTable); + System.out.println("表 " + sourceTable + " 同步到 " + targetTable + " 成功"); + } catch (Exception e) { + System.err.println("表同步失败: " + e.getMessage()); + } + } + + /** + * 检查目标表是否存在 + */ + private boolean tableExists(JdbcTemplate jdbcTemplate, String tableName) throws SQLException { + // 表名已转为小写,直接使用 + try (Connection connection = jdbcTemplate.getDataSource().getConnection()) { + DatabaseMetaData metaData = connection.getMetaData(); + try (ResultSet rs = metaData.getTables( + null, null, tableName, new String[]{"TABLE"})) { + return rs.next(); + } + } + } + + /** + * 创建目标表结构(复制源表结构并添加ds字段) + */ + private void createTargetTable(JdbcTemplate sourceJdbc, JdbcTemplate targetJdbc, + String sourceTable, String targetTable) throws SQLException { + // 获取源表字段定义 + List columnDefinitions = getColumnDefinitions(sourceJdbc, sourceTable); + // 构建创建表SQL (PostgresSQL语法) + StringBuilder createSql = new StringBuilder("CREATE TABLE ") + .append(targetTable) + .append(" ("); + // 添加原有字段 + for (int i = 0; i < columnDefinitions.size(); i++) { + createSql.append(columnDefinitions.get(i)); + // 最后一个字段后不加逗号 + if (i < columnDefinitions.size() - 1) { + createSql.append(", "); + } + } + // 添加ds字段(如果有其他字段,需要加逗号分隔) + if (!columnDefinitions.isEmpty()) { + createSql.append(", "); + } + createSql.append("ds VARCHAR(20) NOT NULL)"); + // 执行创建表SQL + targetJdbc.execute(createSql.toString()); + System.out.println("已创建目标表: " + targetTable); + } + + /** + * 获取表字段列表 + */ + private List getTableColumns(JdbcTemplate jdbcTemplate, String tableName) throws SQLException { + List columns = new ArrayList<>(); + try (Connection connection = jdbcTemplate.getDataSource().getConnection()) { + DatabaseMetaData metaData = connection.getMetaData(); + try (ResultSet rs = metaData.getColumns(null, null, tableName, null)) { + while (rs.next()) { + // 字段名转为小写 + columns.add(rs.getString("COLUMN_NAME").toLowerCase()); + } + } + } + return columns; + } + + /** + * 获取PostgreSQL兼容的字段定义 + */ + private List getColumnDefinitions(JdbcTemplate sourceJdbc, String tableName) throws SQLException { + List definitions = new ArrayList<>(); + try (Connection connection = sourceJdbc.getDataSource().getConnection()) { + DatabaseMetaData metaData = connection.getMetaData(); + try (ResultSet rs = metaData.getColumns(null, null, tableName, null)) { + while (rs.next()) { + // 字段名转为小写 + String columnName = rs.getString("COLUMN_NAME").toLowerCase(); + String typeName = rs.getString("TYPE_NAME").toUpperCase(); + int columnSize = rs.getInt("COLUMN_SIZE"); + int decimalDigits = rs.getInt("DECIMAL_DIGITS"); + int nullable = rs.getInt("NULLABLE"); + // MySQL到PostgresSQL类型映射 + String pgType = mapMySqlTypeToPgType(typeName, columnSize, decimalDigits); + // 构建字段定义 + StringBuilder colDef = new StringBuilder(); + colDef.append(columnName).append(" ").append(pgType); + if (nullable == DatabaseMetaData.columnNoNulls) { + colDef.append(" NOT NULL"); + } + definitions.add(colDef.toString()); + } + } + } + return definitions; + } + + /** + * MySQL到PostgresSQL数据类型映射 + */ + private String mapMySqlTypeToPgType(String mySqlType, int columnSize, int decimalDigits) { + return switch (mySqlType) { + case "INT", "INTEGER" -> "INTEGER"; + case "BIGINT" -> "BIGINT"; + case "VARCHAR", "CHAR" -> "VARCHAR(" + columnSize + ")"; + case "TEXT" -> "TEXT"; + case "DATE" -> "DATE"; + case "DATETIME", "TIMESTAMP" -> "TIMESTAMP"; + case "FLOAT", "DOUBLE" -> "DOUBLE PRECISION"; + case "DECIMAL", "NUMERIC" -> "NUMERIC(" + columnSize + "," + decimalDigits + ")"; + case "BOOLEAN" -> "BOOLEAN"; + case "TINYINT" -> columnSize == 1 ? "BOOLEAN" : "SMALLINT"; + default -> "VARCHAR(" + (columnSize > 0 ? columnSize : 255) + ")"; + }; + } + + /** + * 清空目标表中当前ds值的数据 + */ + private void clearTargetTableData(JdbcTemplate targetJdbc, String targetTable) { + String sql = "DELETE FROM " + targetTable + " WHERE ds = ?"; + targetJdbc.update(sql, dsValue); + } + + /** + * 全量同步数据 + */ + private void syncAllData(JdbcTemplate sourceJdbc, JdbcTemplate targetJdbc, + String sourceTable, String targetTable) { + // 1. 获取源表所有数据 + String selectSql = "SELECT * FROM " + sourceTable; + List> dataList = sourceJdbc.queryForList(selectSql); + if (dataList.isEmpty()) { + System.out.println("源表 " + sourceTable + " 没有数据需要同步"); + return; + } + // 2. 构建插入SQL + List columns = getTableColumnsWithoutException(sourceJdbc, sourceTable); + String insertSql = buildInsertSql(targetTable, columns); + // 3. 批量插入数据 + targetJdbc.batchUpdate(insertSql, new BatchPreparedStatementSetter() { + @Override + public void setValues(PreparedStatement ps, int i) throws SQLException { + Map row = dataList.get(i); + int paramIndex = 1; + // 设置原有字段值 + for (String column : columns) { + ps.setObject(paramIndex++, row.get(column)); + } + // 设置ds字段值 + ps.setString(paramIndex, dsValue); + } + @Override + public int getBatchSize() { + return dataList.size(); + } + }); + System.out.println("成功同步 " + dataList.size() + " 条数据到表 " + targetTable); + } + + /** + * 构建插入SQL语句 + */ + private String buildInsertSql(String targetTable, List columns) { + StringBuilder sql = new StringBuilder(); + sql.append("INSERT INTO ").append(targetTable).append(" ("); + // 添加原有字段 + for (int i = 0; i < columns.size(); i++) { + sql.append(columns.get(i)); + if (i < columns.size() - 1) { + sql.append(", "); + } + } + // 添加ds字段 + sql.append(", ds) VALUES ("); + // 添加参数占位符 + sql.append("?, ".repeat(columns.size())); + sql.append("?)"); + return sql.toString(); + } + + /** + * 无异常获取表字段(工具方法) + */ + private List getTableColumnsWithoutException(JdbcTemplate jdbcTemplate, String tableName) { + try { + return getTableColumns(jdbcTemplate, tableName); + } catch (SQLException e) { + throw new RuntimeException("获取表字段失败: " + e.getMessage(), e); + } + } } diff --git a/src/main/java/com/mini/capi/sys/controller/dbController.java b/src/main/java/com/mini/capi/sys/controller/dbController.java new file mode 100644 index 0000000..63aaece --- /dev/null +++ b/src/main/java/com/mini/capi/sys/controller/dbController.java @@ -0,0 +1,43 @@ +package com.mini.capi.sys.controller; + +import com.mini.capi.biz.domain.DbConfig; +import com.mini.capi.biz.service.DbConfigService; +import com.mini.capi.config.DataSourceConfig; +import com.mini.capi.model.ApiResult; +import com.mini.capi.utils.vToken; +import jakarta.annotation.Resource; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; +import java.util.Map; + +@RestController +@RequestMapping("/Sys/dbs") +public class dbController { + + + @Resource + private DbConfigService dbConfigService; + + /** + * 获取MySQL的当前连接下的所有数据表 + */ + @GetMapping("/getApiSourceTables") + public ApiResult> listSourceTables(String token, String dbId) { + if (vToken.isValidToken(token)) { + DbConfig dbConfig = dbConfigService.getById(dbId); + JdbcTemplate jdbcTemplate = DataSourceConfig.createJdbcTemplate(dbConfig); + String querySql = "SELECT table_name FROM information_schema.tables WHERE table_schema = ?"; + // 执行通用查询 + List> result = jdbcTemplate.queryForList(querySql); + List data = result.stream() + .map(row -> row.values().iterator().next().toString()) + .toList(); + return ApiResult.success(data); + } + return ApiResult.error(); + } +}