数据互导功能开发

This commit is contained in:
暮光:城中城
2019-10-06 18:19:27 +08:00
parent 9b4decf68a
commit aebc6d2d8b
21 changed files with 902 additions and 39 deletions

View File

@@ -1,14 +1,22 @@
package com.zyplayer.doc.db.controller;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.zyplayer.doc.core.annotation.AuthMan;
import com.zyplayer.doc.core.json.ResponseJson;
import com.zyplayer.doc.data.config.security.DocUserDetails;
import com.zyplayer.doc.data.config.security.DocUserUtil;
import com.zyplayer.doc.data.repository.manage.entity.DbTransferTask;
import com.zyplayer.doc.data.service.manage.DbTransferTaskService;
import com.zyplayer.doc.db.framework.db.transfer.SqlParseUtil;
import com.zyplayer.doc.db.framework.db.transfer.TransferDataServer;
import com.zyplayer.doc.db.framework.json.DocDbResponseJson;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
/**
* 数据互导工具
@@ -20,16 +28,69 @@ import javax.annotation.Resource;
@RestController
@RequestMapping("/zyplayer-doc-db/transfer")
public class DbTransferDataController {
@Resource
TransferDataServer transferDataServer;
@Resource
DbTransferTaskService dbTransferTaskService;
@GetMapping(value = "/start")
public ResponseJson doTransfer() {
Long querySourceId = 5L;
Long storageSourceId = 5L;
String querySql = "select * from zyplayer_doc_manage.wiki_space where id > 0";
String storageSql = "insert into zyplayer_doc_manage._temp_wiki_space_20190928(`id`, `name`, `type`, `space_explain`, `edit_type`, `tree_lazy_load`, `open_doc`, `uuid`, `create_user_id`, `create_user_name`, `create_time`, `del_flag`) VALUES (#{id}, #{name}, #{type}, #{space_explain}, #{edit_type}, #{tree_lazy_load}, #{open_doc}, #{uuid}, #{create_user_id}, #{create_user_name}, #{create_time}, #{del_flag})";
transferDataServer.transferData(querySourceId, storageSourceId, querySql, storageSql);
@PostMapping(value = "/start")
public ResponseJson doTransfer(Long id) {
transferDataServer.transferData(id);
return DocDbResponseJson.ok();
}
@PostMapping(value = "/cancel")
public ResponseJson cancel(Long id) {
transferDataServer.cancel(id);
return DocDbResponseJson.ok();
}
@PostMapping(value = "/list")
public ResponseJson list() {
QueryWrapper<DbTransferTask> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("del_flag", 0);
queryWrapper.select(
"id", "name", "query_datasource_id", "storage_datasource_id", "need_count", "last_execute_status",
"query_sql", "storage_sql", "last_execute_time", "create_user_name", "create_time"
);
List<DbTransferTask> taskList = dbTransferTaskService.list(queryWrapper);
return DocDbResponseJson.ok(taskList);
}
@PostMapping(value = "/detail")
public ResponseJson detail(Long id) {
DbTransferTask transferTask = dbTransferTaskService.getById(id);
return DocDbResponseJson.ok(transferTask);
}
@PostMapping(value = "/update")
public ResponseJson update(DbTransferTask transferTask) {
DbTransferTask transferTaskUp = new DbTransferTask();
if (transferTask.getId() == null) {
DocUserDetails currentUser = DocUserUtil.getCurrentUser();
transferTaskUp.setCreateTime(new Date());
transferTaskUp.setCreateUserId(currentUser.getUserId());
transferTaskUp.setCreateUserName(currentUser.getUsername());
transferTaskUp.setDelFlag(0);
} else {
transferTaskUp.setId(transferTask.getId());
}
transferTaskUp.setName(transferTask.getName());
transferTaskUp.setQueryDatasourceId(transferTask.getQueryDatasourceId());
transferTaskUp.setStorageDatasourceId(transferTask.getStorageDatasourceId());
transferTaskUp.setQuerySql(transferTask.getQuerySql());
transferTaskUp.setStorageSql(transferTask.getStorageSql());
transferTaskUp.setNeedCount(transferTask.getNeedCount());
transferTaskUp.setDelFlag(transferTask.getDelFlag());
dbTransferTaskService.saveOrUpdate(transferTaskUp);
return DocDbResponseJson.ok();
}
@PostMapping("/sqlColumns")
public ResponseJson sqlColumns(String sql) {
List<String> selectNames = SqlParseUtil.getSelectNames(sql);
return DocDbResponseJson.ok(selectNames);
}
}

View File

@@ -45,13 +45,18 @@ public class SqlExecutor {
* @author 暮光:城中城
* @since 2019年8月18日
*/
public void cancel(String executeId) {
public boolean cancel(String executeId) {
PreparedStatement preparedStatement = statementMap.remove(executeId);
try {
preparedStatement.cancel();
if (preparedStatement != null) {
preparedStatement.cancel();
return true;
}
logger.error("未找到指定任务,取消执行失败:{}", executeId);
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
/**

View File

@@ -1,17 +1,17 @@
package com.zyplayer.doc.db.framework.db.transfer;
import com.zyplayer.doc.db.framework.db.mapper.base.ExecuteParam;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.expression.Alias;
import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
import net.sf.jsqlparser.expression.operators.relational.ItemsList;
import net.sf.jsqlparser.parser.CCJSqlParserManager;
import net.sf.jsqlparser.schema.Column;
import net.sf.jsqlparser.schema.Table;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.insert.Insert;
import net.sf.jsqlparser.statement.select.Join;
import net.sf.jsqlparser.statement.select.PlainSelect;
import net.sf.jsqlparser.statement.select.Select;
import net.sf.jsqlparser.statement.select.SelectBody;
import net.sf.jsqlparser.statement.select.*;
import org.apache.commons.lang.StringUtils;
import org.apache.ibatis.builder.SqlSourceBuilder;
import org.apache.ibatis.builder.StaticSqlSource;
import org.apache.ibatis.mapping.BoundSql;
@@ -21,6 +21,7 @@ import org.springframework.util.CollectionUtils;
import java.io.StringReader;
import java.util.*;
import java.util.stream.Collectors;
/**
* sql预处理工具
@@ -238,6 +239,52 @@ public class SqlParseUtil {
return null;
}
/**
* 获取sql语句里面查询的列
*
* @param sql
* @return
*/
public static List<String> getSelectNames(String sql) {
Statement stmt;
try {
CCJSqlParserManager parser = new CCJSqlParserManager();
stmt = parser.parse(new StringReader(sql));
} catch (JSQLParserException e) {
e.printStackTrace();
return Collections.emptyList();
}
if (stmt instanceof Select) {
Select selectStatement = (Select) stmt;
SelectBody selectBody = selectStatement.getSelectBody();
if (selectBody instanceof PlainSelect) {
PlainSelect plainBody = (PlainSelect) selectBody;
List<SelectItem> selectItems = plainBody.getSelectItems();
return selectItems.stream().map(val -> {
if (val instanceof SelectExpressionItem) {
Alias alias = ((SelectExpressionItem) val).getAlias();
if (alias != null) {
String name = alias.getName();
name = StringUtils.removeStart(name, "`");
name = StringUtils.removeEnd(name, "`");
return name;
}
Expression expression = ((SelectExpressionItem) val).getExpression();
if (expression instanceof Column) {
String name = ((Column) expression).getColumnName();
name = StringUtils.removeStart(name, "`");
name = StringUtils.removeEnd(name, "`");
return name;
}
}
return null;
})
.filter(StringUtils::isNotBlank).collect(Collectors.toList());
}
}
return Collections.emptyList();
}
public static void main(String[] args) {
String storageSql = "select * from zyplayer_doc_manage.wiki_space where id=1 group by id";
storageSql = storageSql.replaceAll("(#\\{\\w+})", "'$1'");

View File

@@ -1,6 +1,15 @@
package com.zyplayer.doc.db.framework.db.transfer;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.util.RandomUtil;
import com.zyplayer.doc.core.exception.ConfirmException;
import com.zyplayer.doc.data.config.security.DocUserDetails;
import com.zyplayer.doc.data.config.security.DocUserUtil;
import com.zyplayer.doc.data.repository.manage.entity.DbTransferTask;
import com.zyplayer.doc.data.repository.support.consts.DocAuthConst;
import com.zyplayer.doc.data.service.manage.DbTransferTaskService;
import com.zyplayer.doc.data.utils.ThreadPoolUtil;
import com.zyplayer.doc.db.framework.consts.DbAuthType;
import com.zyplayer.doc.db.framework.db.bean.DatabaseFactoryBean;
import com.zyplayer.doc.db.framework.db.bean.DatabaseRegistrationBean;
import com.zyplayer.doc.db.framework.db.mapper.base.ExecuteParam;
@@ -9,6 +18,7 @@ import com.zyplayer.doc.db.framework.db.mapper.base.ExecuteType;
import com.zyplayer.doc.db.framework.db.mapper.base.SqlExecutor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@@ -17,6 +27,9 @@ import javax.annotation.Resource;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
* 数据互导服务
@@ -31,30 +44,118 @@ public class TransferDataServer {
@Resource
SqlExecutor sqlExecutor;
@Resource
DbTransferTaskService dbTransferTaskService;
@Resource
DatabaseRegistrationBean databaseRegistrationBean;
// 批量插入一批次的条数
private final Integer batchInsertNum = 100;
// 多少条时输出条数日志
private final Integer executeCountLogNum = 5000;
// 执行记录信息
private static final Map<Long, String> taskExecuteMap = new ConcurrentHashMap<>();
public void transferData(Long querySourceId, Long storageSourceId, String querySql, String storageSql) {
/**
* 取消任务执行
* @param taskId
* @author 暮光:城中城
* @since 2019年9月28日
*/
public void cancel(Long taskId) {
String executeId = taskExecuteMap.get(taskId);
if (StringUtils.isBlank(executeId)) {
throw new ConfirmException("该任务不在执行中,取消失败");
}
if (sqlExecutor.cancel(executeId)) {
DocUserDetails currentUser = DocUserUtil.getCurrentUser();
String executeInfo = String.format("[%s] %s 手动终止了此任务", DateTime.now().toString(), currentUser.getUsername());
dbTransferTaskService.addExecuteInfo(taskId, TransferTaskStatus.CANCEL.getCode(), executeInfo);
} else {
throw new ConfirmException("终止该任务失败");
}
}
/**
* 查询任务,执行数据查询和数据写入
* @param taskId
* @author 暮光:城中城
* @since 2019年9月28日
*/
public void transferData(Long taskId) {
DbTransferTask transferTask = dbTransferTaskService.getById(taskId);
if (transferTask == null || transferTask.getDelFlag() == 1) {
throw new ConfirmException("未找到该任务记录,创建任务失败");
}
if (Objects.equals(transferTask.getLastExecuteStatus(), 1)) {
throw new ConfirmException("任务正在执行中,请勿重复执行");
}
boolean manageAuth = DocUserUtil.haveAuth(DocAuthConst.DB_DATASOURCE_MANAGE);
boolean querySelect = DocUserUtil.haveCustomAuth(DbAuthType.SELECT.getName(), DocAuthConst.DB + transferTask.getQueryDatasourceId());
boolean queryUpdate = DocUserUtil.haveCustomAuth(DbAuthType.UPDATE.getName(), DocAuthConst.DB + transferTask.getQueryDatasourceId());
if (!manageAuth && !querySelect && !queryUpdate) {
throw new ConfirmException("没有该数据源的查询权限,创建任务失败");
}
boolean storageUpdate = DocUserUtil.haveCustomAuth(DbAuthType.UPDATE.getName(), DocAuthConst.DB + transferTask.getStorageDatasourceId());
if (!manageAuth && !storageUpdate) {
throw new ConfirmException("没有该数据源的写入权限,创建任务失败");
}
dbTransferTaskService.resetExecuteInfo(taskId);
// 提交任务
ThreadPoolUtil.getThreadPool().submit(() -> {
String executeInfo = String.format("[%s] 任务开始执行", DateTime.now().toString());
dbTransferTaskService.addExecuteInfo(taskId, TransferTaskStatus.EXECUTING.getCode(), executeInfo);
String executeId = RandomUtil.simpleUUID();
taskExecuteMap.put(taskId, executeId);
this.transferData(transferTask, executeId);
});
}
/**
* 执行数据查询和数据写入
* @param transferTask
* @param executeId
* @author 暮光:城中城
* @since 2019年9月28日
*/
private void transferData(DbTransferTask transferTask, String executeId) {
Long querySourceId = transferTask.getQueryDatasourceId();
Long storageSourceId = transferTask.getStorageDatasourceId();
String querySql = transferTask.getQuerySql();
String storageSql = transferTask.getStorageSql();
List<Map<String, Object>> selectResultList = new LinkedList<>();
DatabaseFactoryBean factoryBean = databaseRegistrationBean.getFactoryById(querySourceId);
ExecuteParam executeParam = new ExecuteParam();
executeParam.setDatasourceId(querySourceId);
executeParam.setExecuteType(ExecuteType.SELECT);
executeParam.setExecuteId(RandomUtil.simpleUUID());
executeParam.setExecuteId(executeId);
try {
String selectCountSql = SqlParseUtil.getSelectCountSql(querySql);
executeParam.setSql(selectCountSql);
ExecuteResult countResult = sqlExecutor.execute(executeParam);
if (CollectionUtils.isEmpty(countResult.getResult())) {
logger.error("获取总条数失败");
return;
long executeStartTime = System.currentTimeMillis();
if (Objects.equals(transferTask.getNeedCount(), 1)) {
String selectCountSql = SqlParseUtil.getSelectCountSql(querySql);
executeParam.setSql(selectCountSql);
ExecuteResult countResult = sqlExecutor.execute(executeParam);
if (CollectionUtils.isEmpty(countResult.getResult())) {
String executeInfo = String.format("[%s] 获取总条数失败", DateTime.now().toString());
logger.error(executeInfo);
dbTransferTaskService.addExecuteInfo(transferTask.getId(), TransferTaskStatus.ERROR.getCode(), executeInfo);
return;
}
Object transferCount = countResult.getResult().get(0).get("counts");
String executeInfo = String.format("[%s] 待处理总条数:%s查询总条数耗时%sms", DateTime.now().toString(), transferCount, System.currentTimeMillis() - executeStartTime);
logger.info(executeInfo);
dbTransferTaskService.addExecuteInfo(transferTask.getId(), TransferTaskStatus.EXECUTING.getCode(), executeInfo);
} else {
String executeInfo = String.format("[%s] 未开启查询总条数,跳过条数查询", DateTime.now().toString());
dbTransferTaskService.addExecuteInfo(transferTask.getId(), TransferTaskStatus.EXECUTING.getCode(), executeInfo);
}
logger.info("总条数:{}", countResult.getResult().get(0).get("counts"));
AtomicLong readCount = new AtomicLong(0L);
executeParam.setSql(querySql);
// executeParam.setSql("select sleep(10)");
ExecuteResult executeResult = sqlExecutor.execute(factoryBean, executeParam, resultMap -> {
selectResultList.add(resultMap);
if (readCount.incrementAndGet() % executeCountLogNum == 0) {
String executeInfo = String.format("[%s] 已处理条数:%s", DateTime.now().toString(), readCount.get());
dbTransferTaskService.addExecuteInfo(transferTask.getId(), TransferTaskStatus.EXECUTING.getCode(), executeInfo);
}
// 批量输出数据
if (selectResultList.size() >= batchInsertNum) {
this.writeData(storageSourceId, storageSql, selectResultList);
@@ -65,13 +166,28 @@ public class TransferDataServer {
this.writeData(storageSourceId, storageSql, selectResultList);
}
if (StringUtils.isNotBlank(executeResult.getErrMsg())) {
logger.error("执行出错:{}", executeResult.getErrMsg());
String executeInfo = String.format("[%s] 执行出错:%s", DateTime.now().toString(), executeResult.getErrMsg());
logger.error(executeInfo);
dbTransferTaskService.addExecuteInfo(transferTask.getId(), TransferTaskStatus.ERROR.getCode(), executeInfo);
} else {
String executeInfo = String.format("[%s] 任务执行成功,处理总条数:%s总耗时%sms", DateTime.now().toString(), readCount.get(), System.currentTimeMillis() - executeStartTime);
dbTransferTaskService.addExecuteInfo(transferTask.getId(), TransferTaskStatus.SUCCESS.getCode(), executeInfo);
}
} catch (Exception e) {
logger.error("SQL执行出错", e);
String executeInfo = String.format("[%s] 处理出错:%s", DateTime.now().toString(), ExceptionUtils.getFullStackTrace(e));
dbTransferTaskService.addExecuteInfo(transferTask.getId(), TransferTaskStatus.ERROR.getCode(), executeInfo);
}
}
/**
* 写数据操作
* @param storageSourceId
* @param storageSql
* @param selectResultList
* @author 暮光:城中城
* @since 2019年9月28日
*/
private void writeData(Long storageSourceId, String storageSql, List<Map<String, Object>> selectResultList) {
List<ExecuteParam> executeParamList = SqlParseUtil.getExecuteParamList(storageSql, selectResultList);
for (ExecuteParam executeParam : executeParamList) {

View File

@@ -0,0 +1,19 @@
package com.zyplayer.doc.db.framework.db.transfer;
public enum TransferTaskStatus {
// 最后执行状态 0=未执行 1=执行中 2=执行成功 3=执行失败 4=取消执行
NOT_START(0), EXECUTING(1), SUCCESS(2), ERROR(3), CANCEL(4);
private Integer code;
TransferTaskStatus(Integer code) {
this.code = code;
}
public Integer getCode() {
return code;
}
public void setCode(Integer code) {
this.code = code;
}
}