新增MySQL和pg数据库的同步

This commit is contained in:
2025-08-27 16:39:54 +08:00
parent ff5006a660
commit 306c93e9ad
8 changed files with 352 additions and 33 deletions

View File

@@ -0,0 +1,18 @@
package com.mini.capi.biz.controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* <p>
* 数据同步任务执行日志表 前端控制器
* </p>
*
* @author gaoxq
* @since 2025-08-27
*/
@RestController
@RequestMapping("/biz/syncTaskLog")
public class SyncTaskLogController {
}

View File

@@ -0,0 +1,183 @@
package com.mini.capi.biz.domain;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import java.io.Serializable;
import java.time.LocalDateTime;
import lombok.Getter;
import lombok.Setter;
/**
* <p>
* 数据同步任务执行日志表
* </p>
*
* @author gaoxq
* @since 2025-08-27
*/
@Getter
@Setter
@TableName("biz_sync_task_log")
public class SyncTaskLog implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 记录创建时间
*/
@TableField("create_time")
private LocalDateTime createTime;
/**
* 日志ID
*/
@TableId(value = "task_log_id", type = IdType.AUTO)
private String taskLogId;
/**
* 关联的同步任务ID
*/
@TableField("task_id")
private String taskId;
/**
* 同步任务名称
*/
@TableField("task_name")
private String taskName;
/**
* 源数据库ID
*/
@TableField("source_db_id")
private String sourceDbId;
/**
* 源数据库名称
*/
@TableField("source_db_name")
private String sourceDbName;
/**
* 源表名
*/
@TableField("source_table")
private String sourceTable;
/**
* 目标数据库ID
*/
@TableField("target_db_id")
private String targetDbId;
/**
* 目标数据库名称
*/
@TableField("target_db_name")
private String targetDbName;
/**
* 目标表名
*/
@TableField("target_table")
private String targetTable;
/**
* 同步开始时间
*/
@TableField("start_time")
private LocalDateTime startTime;
/**
* 同步结束时间
*/
@TableField("end_time")
private LocalDateTime endTime;
/**
* 总记录数
*/
@TableField("total_rows")
private Long totalRows;
/**
* 成功同步记录数
*/
@TableField("success_rows")
private Long successRows;
/**
* 失败记录数
*/
@TableField("fail_rows")
private Long failRows;
/**
* 同步状态(0:执行中,1:成功,2:失败)
*/
@TableField("sync_status")
private String syncStatus;
/**
* 错误信息(失败时记录)
*/
@TableField("error_msg")
private String errorMsg;
/**
* 耗时(秒)
*/
@TableField("cost_time")
private Integer costTime;
/**
* 租户id
*/
@TableField("f_tenant_id")
private String fTenantId;
/**
* 流程id
*/
@TableField("f_flow_id")
private String fFlowId;
/**
* 流程任务主键
*/
@TableField("f_flow_task_id")
private String fFlowTaskId;
/**
* 流程任务状态
*/
@TableField("f_flow_state")
private Integer fFlowState;
public SyncTaskLog(String taskId, String taskName, String sourceDbId, String sourceDbName, String sourceTable, String targetDbId
, String targetDbName, String targetTable, LocalDateTime startTime, LocalDateTime endTime, Long totalRows, Long successRows, Long failRows, String syncStatus
, String errorMsg, Integer costTime, String fTenantId) {
this.taskId = taskId;
this.taskName = taskName;
this.sourceDbId = sourceDbId;
this.sourceDbName = sourceDbName;
this.sourceTable = sourceTable;
this.targetDbId = targetDbId;
this.targetDbName = targetDbName;
this.targetTable = targetTable;
this.startTime = startTime;
this.endTime = endTime;
this.totalRows = totalRows;
this.successRows = successRows;
this.failRows = failRows;
this.syncStatus = syncStatus;
this.errorMsg = errorMsg;
this.costTime = costTime;
this.fTenantId = fTenantId;
}
}

View File

@@ -0,0 +1,16 @@
package com.mini.capi.biz.mapper;
import com.mini.capi.biz.domain.SyncTaskLog;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* <p>
* 数据同步任务执行日志表 Mapper 接口
* </p>
*
* @author gaoxq
* @since 2025-08-27
*/
public interface SyncTaskLogMapper extends BaseMapper<SyncTaskLog> {
}

View File

@@ -0,0 +1,16 @@
package com.mini.capi.biz.service;
import com.mini.capi.biz.domain.SyncTaskLog;
import com.baomidou.mybatisplus.extension.service.IService;
/**
* <p>
* 数据同步任务执行日志表 服务类
* </p>
*
* @author gaoxq
* @since 2025-08-27
*/
public interface SyncTaskLogService extends IService<SyncTaskLog> {
}

View File

@@ -0,0 +1,20 @@
package com.mini.capi.biz.service.impl;
import com.mini.capi.biz.domain.SyncTaskLog;
import com.mini.capi.biz.mapper.SyncTaskLogMapper;
import com.mini.capi.biz.service.SyncTaskLogService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Service;
/**
* <p>
* 数据同步任务执行日志表 服务实现类
* </p>
*
* @author gaoxq
* @since 2025-08-27
*/
@Service
public class SyncTaskLogServiceImpl extends ServiceImpl<SyncTaskLogMapper, SyncTaskLog> implements SyncTaskLogService {
}

View File

@@ -2,7 +2,9 @@ package com.mini.capi.job;
import com.mini.capi.biz.domain.DbConfig;
import com.mini.capi.biz.domain.SyncTask;
import com.mini.capi.biz.domain.SyncTaskLog;
import com.mini.capi.biz.service.DbConfigService;
import com.mini.capi.biz.service.SyncTaskLogService;
import com.mini.capi.biz.service.SyncTaskService;
import com.mini.capi.config.DataSourceConfig;
import com.mini.capi.model.ApiResult;
@@ -15,9 +17,12 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.sql.*;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -32,6 +37,9 @@ public class taskDbSync {
@Resource
private DbConfigService dbConfigService;
@Resource
private SyncTaskLogService taskLogService;
@Resource
private Executor hostExecutor;
@@ -80,7 +88,7 @@ public class taskDbSync {
public void syncTableData(SyncTask task, JdbcTemplate sourceJdbc, JdbcTemplate targetJdbc) {
try {
// 确保源表和目标表名转为小写
String sourceTable = task.getSourceTable().toLowerCase();
String sourceTable = task.getSourceTable();
String targetTable = task.getTargetTable().toLowerCase();
// 1. 检查并创建目标表
if (!tableExists(targetJdbc, targetTable)) {
@@ -89,8 +97,7 @@ public class taskDbSync {
// 2. 清空目标表当前ds值的数据
clearTargetTableData(targetJdbc, targetTable);
// 3. 全量同步数据
syncAllData(sourceJdbc, targetJdbc, sourceTable, targetTable);
System.out.println("" + sourceTable + " 同步到 " + targetTable + " 成功");
syncAllData(task, sourceJdbc, targetJdbc, sourceTable, targetTable);
} catch (Exception e) {
System.err.println("表同步失败: " + e.getMessage());
}
@@ -255,37 +262,60 @@ public class taskDbSync {
/**
* 全量同步数据
*/
private void syncAllData(JdbcTemplate sourceJdbc, JdbcTemplate targetJdbc,
private void syncAllData(SyncTask task, JdbcTemplate sourceJdbc, JdbcTemplate targetJdbc,
String sourceTable, String targetTable) {
// 1. 获取源表所有数据
String selectSql = "SELECT * FROM " + sourceTable;
List<Map<String, Object>> dataList = sourceJdbc.queryForList(selectSql);
if (dataList.isEmpty()) {
System.out.println("源表 " + sourceTable + " 没有数据需要同步");
return;
}
// 2. 构建插入SQL
List<String> columns = getTableColumnsWithoutException(sourceJdbc, sourceTable);
String insertSql = buildInsertSql(targetTable, columns);
// 3. 批量插入数据
targetJdbc.batchUpdate(insertSql, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
Map<String, Object> row = dataList.get(i);
int paramIndex = 1;
// 设置原有字段值
for (String column : columns) {
ps.setObject(paramIndex++, row.get(column));
LocalDateTime startTime = LocalDateTime.now();
int totalRows = 0;
int successRows = 0;
int failRows = 0;
String ustatus = "1";
String errorMsg = null;
try {
// 1. 获取源表所有数据
String selectSql = "SELECT * FROM " + sourceTable;
List<Map<String, Object>> dataList = sourceJdbc.queryForList(selectSql);
if (dataList.isEmpty()) {
System.out.println("源表 " + sourceTable + " 没有数据需要同步");
return;
}
// 2. 构建插入SQL
List<String> columns = getTableColumnsWithoutException(sourceJdbc, sourceTable);
String insertSql = buildInsertSql(targetTable, columns);
// 3. 批量插入数据
int[] batchResult = targetJdbc.batchUpdate(insertSql, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
Map<String, Object> row = dataList.get(i);
int paramIndex = 1;
// 设置原有字段值
for (String column : columns) {
ps.setObject(paramIndex++, row.get(column));
}
// 设置ds字段值
ps.setString(paramIndex, dsValue);
}
// 设置ds字段值
ps.setString(paramIndex, dsValue);
}
@Override
public int getBatchSize() {
return dataList.size();
}
});
System.out.println("成功同步 " + dataList.size() + " 条数据到表 " + targetTable);
@Override
public int getBatchSize() {
return dataList.size();
}
});
// 统计成功/失败行数
successRows = (int) Arrays.stream(batchResult).filter(row -> row >= 0).count();
failRows = totalRows - successRows;
} catch (Exception e) {
ustatus = "2";
errorMsg = e.getMessage() != null ? e.getMessage() : e.getClass().getSimpleName();
System.err.println("同步数据失败: " + errorMsg);
}
DbConfig sourceDbConfig = dbConfigService.getById(task.getSourceDbId());
DbConfig targetDbConfig = dbConfigService.getById(task.getTargetDbId());
LocalDateTime endTime = LocalDateTime.now();
SyncTaskLog taskLog = new SyncTaskLog(task.getTaskId(), task.getTaskName(), task.getSourceDbId(),
sourceDbConfig.getDbName(), sourceTable, task.getTargetDbId(), targetDbConfig.getDbName(), targetTable,
LocalDateTime.now(), LocalDateTime.now(), (long) totalRows, (long) successRows, (long) failRows, ustatus,
errorMsg, (int) Duration.between(startTime, endTime).getSeconds(), "0");
taskLogService.save(taskLog);
}
/**

View File

@@ -29,7 +29,7 @@ public class demo {
.pathInfo(Collections.singletonMap(OutputFile.xml, System.getProperty("user.dir") + "/src/main/resources/mapper"));
})
.strategyConfig(builder -> {
builder.addInclude("biz_sync_task")
builder.addInclude("biz_sync_task_log")
.addTablePrefix("biz_")
.entityBuilder()
.enableLombok()