新增MySQL和pg数据库的同步

This commit is contained in:
2025-08-27 17:03:38 +08:00
parent 9e05905dba
commit a0b51f56a5

View File

@@ -46,40 +46,67 @@ public class taskDbSync {
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
private static final String dsValue = LocalDate.now().format(DATE_FORMATTER);
/**
* 运行全部任务
*/
@GetMapping("/getTaskSyncDbInfo")
public ApiResult<?> jobSyncTask(String token) {
if (vToken.isValidToken(token)) {
List<SyncTask> syncTasks = syncTaskService.list();
public ApiResult<?> jobSyncAllTask(String token) {
if (!vToken.isValidToken(token)) {
return ApiResult.error(401, "无效的访问令牌");
}
List<SyncTask> syncTasks = syncTaskService.list();
// 记录是否有任务失败(仅用于后台日志,不影响接口返回)
List<String> errorMessages = new ArrayList<>();
for (SyncTask task : syncTasks) {
// 提交任务到线程池(异步执行,接口不等待)
execSyncTask(task, errorMessages);
}
// 接口立即返回,不等待任务执行完成
return ApiResult.success();
}
/**
* 运行单个任务
*/
@GetMapping("/getTaskSyncDbByInfo")
public ApiResult<?> jobSyncOneTask(String token, String taskId) {
if (!vToken.isValidToken(token)) {
return ApiResult.error(401, "无效的访问令牌");
}
try {
SyncTask task = syncTaskService.getById(taskId);
// 记录是否有任务失败(仅用于后台日志,不影响接口返回)
List<String> errorMessages = new ArrayList<>();
for (SyncTask task : syncTasks) {
// 提交任务到线程池(异步执行,接口不等待)
hostExecutor.execute(() -> {
try {
// 1. 获取源/目标数据库配置
DbConfig sourceDbConfig = dbConfigService.getById(task.getSourceDbId());
DbConfig targetDbConfig = dbConfigService.getById(task.getTargetDbId());
// 2. 创建对应数据库的JdbcTemplate
JdbcTemplate sourceJdbc = DataSourceConfig.createJdbcTemplate(sourceDbConfig);
JdbcTemplate targetJdbc = DataSourceConfig.createJdbcTemplate(targetDbConfig);
// 3. 执行表同步逻辑(异步执行)
syncTableData(task, sourceJdbc, targetJdbc);
} catch (Exception e) {
// 捕获任务执行异常,记录错误信息(仅后台打印,不阻塞接口)
String errorMsg = "任务 " + task.getTaskId() + " 同步失败: " + e.getMessage();
System.err.println(errorMsg);
// 加锁保证线程安全多线程操作同一List
synchronized (errorMessages) {
errorMessages.add(errorMsg);
}
}
});
System.out.println(errorMessages);
}
// 接口立即返回,不等待任务执行完成
execSyncTask(task, errorMessages);
return ApiResult.success();
} catch (Exception e) {
return ApiResult.error(101, e.getMessage());
}
return ApiResult.error();
}
private void execSyncTask(SyncTask task, List<String> errorMessages) {
hostExecutor.execute(() -> {
try {
// 1. 获取源/目标数据库配置
DbConfig sourceDbConfig = dbConfigService.getById(task.getSourceDbId());
DbConfig targetDbConfig = dbConfigService.getById(task.getTargetDbId());
// 2. 创建对应数据库的JdbcTemplate
JdbcTemplate sourceJdbc = DataSourceConfig.createJdbcTemplate(sourceDbConfig);
JdbcTemplate targetJdbc = DataSourceConfig.createJdbcTemplate(targetDbConfig);
// 3. 执行表同步逻辑(异步执行)
syncTableData(task, sourceJdbc, targetJdbc);
} catch (Exception e) {
// 捕获任务执行异常,记录错误信息(仅后台打印,不阻塞接口)
String errorMsg = "任务 " + task.getTaskId() + " 同步失败: " + e.getMessage();
System.err.println(errorMsg);
// 加锁保证线程安全多线程操作同一List
synchronized (errorMessages) {
errorMessages.add(errorMsg);
}
}
});
System.out.println(errorMessages);
}
/**