diff --git a/src/main/java/com/mini/capi/biz/controller/SyncTaskLogController.java b/src/main/java/com/mini/capi/biz/controller/SyncTaskLogController.java new file mode 100644 index 0000000..27009b9 --- /dev/null +++ b/src/main/java/com/mini/capi/biz/controller/SyncTaskLogController.java @@ -0,0 +1,18 @@ +package com.mini.capi.biz.controller; + +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + *

+ * 数据同步任务执行日志表 前端控制器 + *

+ * + * @author gaoxq + * @since 2025-08-27 + */ +@RestController +@RequestMapping("/biz/syncTaskLog") +public class SyncTaskLogController { + +} diff --git a/src/main/java/com/mini/capi/biz/domain/SyncTaskLog.java b/src/main/java/com/mini/capi/biz/domain/SyncTaskLog.java new file mode 100644 index 0000000..1fd2d4c --- /dev/null +++ b/src/main/java/com/mini/capi/biz/domain/SyncTaskLog.java @@ -0,0 +1,183 @@ +package com.mini.capi.biz.domain; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; + +import java.io.Serializable; +import java.time.LocalDateTime; + +import lombok.Getter; +import lombok.Setter; + +/** + *

+ * 数据同步任务执行日志表 + *

+ * + * @author gaoxq + * @since 2025-08-27 + */ +@Getter +@Setter +@TableName("biz_sync_task_log") +public class SyncTaskLog implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * 记录创建时间 + */ + @TableField("create_time") + private LocalDateTime createTime; + + /** + * 日志ID + */ + @TableId(value = "task_log_id", type = IdType.AUTO) + private String taskLogId; + + /** + * 关联的同步任务ID + */ + @TableField("task_id") + private String taskId; + + /** + * 同步任务名称 + */ + @TableField("task_name") + private String taskName; + + /** + * 源数据库ID + */ + @TableField("source_db_id") + private String sourceDbId; + + /** + * 源数据库名称 + */ + @TableField("source_db_name") + private String sourceDbName; + + /** + * 源表名 + */ + @TableField("source_table") + private String sourceTable; + + /** + * 目标数据库ID + */ + @TableField("target_db_id") + private String targetDbId; + + /** + * 目标数据库名称 + */ + @TableField("target_db_name") + private String targetDbName; + + /** + * 目标表名 + */ + @TableField("target_table") + private String targetTable; + + /** + * 同步开始时间 + */ + @TableField("start_time") + private LocalDateTime startTime; + + /** + * 同步结束时间 + */ + @TableField("end_time") + private LocalDateTime endTime; + + /** + * 总记录数 + */ + @TableField("total_rows") + private Long totalRows; + + /** + * 成功同步记录数 + */ + @TableField("success_rows") + private Long successRows; + + /** + * 失败记录数 + */ + @TableField("fail_rows") + private Long failRows; + + /** + * 同步状态(0:执行中,1:成功,2:失败) + */ + @TableField("sync_status") + private String syncStatus; + + /** + * 错误信息(失败时记录) + */ + @TableField("error_msg") + private String errorMsg; + + /** + * 耗时(秒) + */ + @TableField("cost_time") + private Integer costTime; + + /** + * 租户id + */ + @TableField("f_tenant_id") + private String fTenantId; + + /** + * 流程id + */ + @TableField("f_flow_id") + private String fFlowId; + + /** + * 流程任务主键 + */ + @TableField("f_flow_task_id") + private String fFlowTaskId; + + /** + * 流程任务状态 + */ + @TableField("f_flow_state") + private Integer fFlowState; + + + public SyncTaskLog(String taskId, String taskName, String sourceDbId, String sourceDbName, String sourceTable, String targetDbId + , String targetDbName, String targetTable, LocalDateTime startTime, LocalDateTime endTime, Long totalRows, Long successRows, Long failRows, String syncStatus + , String errorMsg, Integer costTime, String fTenantId) { + this.taskId = taskId; + this.taskName = taskName; + this.sourceDbId = sourceDbId; + this.sourceDbName = sourceDbName; + this.sourceTable = sourceTable; + this.targetDbId = targetDbId; + this.targetDbName = targetDbName; + this.targetTable = targetTable; + this.startTime = startTime; + this.endTime = endTime; + this.totalRows = totalRows; + this.successRows = successRows; + this.failRows = failRows; + this.syncStatus = syncStatus; + this.errorMsg = errorMsg; + this.costTime = costTime; + this.fTenantId = fTenantId; + } +} diff --git a/src/main/java/com/mini/capi/biz/mapper/SyncTaskLogMapper.java b/src/main/java/com/mini/capi/biz/mapper/SyncTaskLogMapper.java new file mode 100644 index 0000000..350aadb --- /dev/null +++ b/src/main/java/com/mini/capi/biz/mapper/SyncTaskLogMapper.java @@ -0,0 +1,16 @@ +package com.mini.capi.biz.mapper; + +import com.mini.capi.biz.domain.SyncTaskLog; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +/** + *

+ * 数据同步任务执行日志表 Mapper 接口 + *

+ * + * @author gaoxq + * @since 2025-08-27 + */ +public interface SyncTaskLogMapper extends BaseMapper { + +} diff --git a/src/main/java/com/mini/capi/biz/service/SyncTaskLogService.java b/src/main/java/com/mini/capi/biz/service/SyncTaskLogService.java new file mode 100644 index 0000000..99d156a --- /dev/null +++ b/src/main/java/com/mini/capi/biz/service/SyncTaskLogService.java @@ -0,0 +1,16 @@ +package com.mini.capi.biz.service; + +import com.mini.capi.biz.domain.SyncTaskLog; +import com.baomidou.mybatisplus.extension.service.IService; + +/** + *

+ * 数据同步任务执行日志表 服务类 + *

+ * + * @author gaoxq + * @since 2025-08-27 + */ +public interface SyncTaskLogService extends IService { + +} diff --git a/src/main/java/com/mini/capi/biz/service/impl/SyncTaskLogServiceImpl.java b/src/main/java/com/mini/capi/biz/service/impl/SyncTaskLogServiceImpl.java new file mode 100644 index 0000000..369ccb4 --- /dev/null +++ b/src/main/java/com/mini/capi/biz/service/impl/SyncTaskLogServiceImpl.java @@ -0,0 +1,20 @@ +package com.mini.capi.biz.service.impl; + +import com.mini.capi.biz.domain.SyncTaskLog; +import com.mini.capi.biz.mapper.SyncTaskLogMapper; +import com.mini.capi.biz.service.SyncTaskLogService; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import org.springframework.stereotype.Service; + +/** + *

+ * 数据同步任务执行日志表 服务实现类 + *

+ * + * @author gaoxq + * @since 2025-08-27 + */ +@Service +public class SyncTaskLogServiceImpl extends ServiceImpl implements SyncTaskLogService { + +} diff --git a/src/main/java/com/mini/capi/job/taskDbSync.java b/src/main/java/com/mini/capi/job/taskDbSync.java index 18093b7..bf45dc0 100644 --- a/src/main/java/com/mini/capi/job/taskDbSync.java +++ b/src/main/java/com/mini/capi/job/taskDbSync.java @@ -2,7 +2,9 @@ package com.mini.capi.job; import com.mini.capi.biz.domain.DbConfig; import com.mini.capi.biz.domain.SyncTask; +import com.mini.capi.biz.domain.SyncTaskLog; import com.mini.capi.biz.service.DbConfigService; +import com.mini.capi.biz.service.SyncTaskLogService; import com.mini.capi.biz.service.SyncTaskService; import com.mini.capi.config.DataSourceConfig; import com.mini.capi.model.ApiResult; @@ -15,9 +17,12 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.sql.*; +import java.time.Duration; import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; @@ -32,6 +37,9 @@ public class taskDbSync { @Resource private DbConfigService dbConfigService; + @Resource + private SyncTaskLogService taskLogService; + @Resource private Executor hostExecutor; @@ -80,7 +88,7 @@ public class taskDbSync { public void syncTableData(SyncTask task, JdbcTemplate sourceJdbc, JdbcTemplate targetJdbc) { try { // 确保源表和目标表名转为小写 - String sourceTable = task.getSourceTable().toLowerCase(); + String sourceTable = task.getSourceTable(); String targetTable = task.getTargetTable().toLowerCase(); // 1. 检查并创建目标表 if (!tableExists(targetJdbc, targetTable)) { @@ -89,8 +97,7 @@ public class taskDbSync { // 2. 清空目标表当前ds值的数据 clearTargetTableData(targetJdbc, targetTable); // 3. 全量同步数据 - syncAllData(sourceJdbc, targetJdbc, sourceTable, targetTable); - System.out.println("表 " + sourceTable + " 同步到 " + targetTable + " 成功"); + syncAllData(task, sourceJdbc, targetJdbc, sourceTable, targetTable); } catch (Exception e) { System.err.println("表同步失败: " + e.getMessage()); } @@ -255,37 +262,60 @@ public class taskDbSync { /** * 全量同步数据 */ - private void syncAllData(JdbcTemplate sourceJdbc, JdbcTemplate targetJdbc, + private void syncAllData(SyncTask task, JdbcTemplate sourceJdbc, JdbcTemplate targetJdbc, String sourceTable, String targetTable) { - // 1. 获取源表所有数据 - String selectSql = "SELECT * FROM " + sourceTable; - List> dataList = sourceJdbc.queryForList(selectSql); - if (dataList.isEmpty()) { - System.out.println("源表 " + sourceTable + " 没有数据需要同步"); - return; - } - // 2. 构建插入SQL - List columns = getTableColumnsWithoutException(sourceJdbc, sourceTable); - String insertSql = buildInsertSql(targetTable, columns); - // 3. 批量插入数据 - targetJdbc.batchUpdate(insertSql, new BatchPreparedStatementSetter() { - @Override - public void setValues(PreparedStatement ps, int i) throws SQLException { - Map row = dataList.get(i); - int paramIndex = 1; - // 设置原有字段值 - for (String column : columns) { - ps.setObject(paramIndex++, row.get(column)); + LocalDateTime startTime = LocalDateTime.now(); + int totalRows = 0; + int successRows = 0; + int failRows = 0; + String ustatus = "1"; + String errorMsg = null; + try { + // 1. 获取源表所有数据 + String selectSql = "SELECT * FROM " + sourceTable; + List> dataList = sourceJdbc.queryForList(selectSql); + if (dataList.isEmpty()) { + System.out.println("源表 " + sourceTable + " 没有数据需要同步"); + return; + } + // 2. 构建插入SQL + List columns = getTableColumnsWithoutException(sourceJdbc, sourceTable); + String insertSql = buildInsertSql(targetTable, columns); + // 3. 批量插入数据 + int[] batchResult = targetJdbc.batchUpdate(insertSql, new BatchPreparedStatementSetter() { + @Override + public void setValues(PreparedStatement ps, int i) throws SQLException { + Map row = dataList.get(i); + int paramIndex = 1; + // 设置原有字段值 + for (String column : columns) { + ps.setObject(paramIndex++, row.get(column)); + } + // 设置ds字段值 + ps.setString(paramIndex, dsValue); } - // 设置ds字段值 - ps.setString(paramIndex, dsValue); - } - @Override - public int getBatchSize() { - return dataList.size(); - } - }); - System.out.println("成功同步 " + dataList.size() + " 条数据到表 " + targetTable); + + @Override + public int getBatchSize() { + return dataList.size(); + } + }); + // 统计成功/失败行数 + successRows = (int) Arrays.stream(batchResult).filter(row -> row >= 0).count(); + failRows = totalRows - successRows; + } catch (Exception e) { + ustatus = "2"; + errorMsg = e.getMessage() != null ? e.getMessage() : e.getClass().getSimpleName(); + System.err.println("同步数据失败: " + errorMsg); + } + DbConfig sourceDbConfig = dbConfigService.getById(task.getSourceDbId()); + DbConfig targetDbConfig = dbConfigService.getById(task.getTargetDbId()); + LocalDateTime endTime = LocalDateTime.now(); + SyncTaskLog taskLog = new SyncTaskLog(task.getTaskId(), task.getTaskName(), task.getSourceDbId(), + sourceDbConfig.getDbName(), sourceTable, task.getTargetDbId(), targetDbConfig.getDbName(), targetTable, + LocalDateTime.now(), LocalDateTime.now(), (long) totalRows, (long) successRows, (long) failRows, ustatus, + errorMsg, (int) Duration.between(startTime, endTime).getSeconds(), "0"); + taskLogService.save(taskLog); } /** diff --git a/src/main/java/com/mini/capi/mybatis/demo.java b/src/main/java/com/mini/capi/mybatis/demo.java index c94e183..455c6db 100644 --- a/src/main/java/com/mini/capi/mybatis/demo.java +++ b/src/main/java/com/mini/capi/mybatis/demo.java @@ -29,7 +29,7 @@ public class demo { .pathInfo(Collections.singletonMap(OutputFile.xml, System.getProperty("user.dir") + "/src/main/resources/mapper")); }) .strategyConfig(builder -> { - builder.addInclude("biz_sync_task") + builder.addInclude("biz_sync_task_log") .addTablePrefix("biz_") .entityBuilder() .enableLombok() diff --git a/src/main/resources/mapper/SyncTaskLogMapper.xml b/src/main/resources/mapper/SyncTaskLogMapper.xml new file mode 100644 index 0000000..26abf36 --- /dev/null +++ b/src/main/resources/mapper/SyncTaskLogMapper.xml @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + create_time, task_log_id, task_id, task_name, source_db_id, source_db_name, source_table, target_db_id, target_db_name, target_table, start_time, end_time, total_rows, success_rows, fail_rows, sync_status, error_msg, cost_time, f_tenant_id, f_flow_id, f_flow_task_id, f_flow_state + + +