数据互导逻辑开发
This commit is contained in:
@@ -92,14 +92,16 @@ public class DbDatasourceController {
|
||||
newFactoryBeanList.add(factoryBean);
|
||||
}
|
||||
}
|
||||
// 创建新的数据源
|
||||
DatabaseFactoryBean databaseFactoryBean = DatasourceUtil.createDatabaseFactoryBean(dbDatasourceSel);
|
||||
if (databaseFactoryBean != null) {
|
||||
newFactoryBeanList.add(databaseFactoryBean);
|
||||
}
|
||||
databaseRegistrationBean.setDatabaseFactoryBeanList(newFactoryBeanList);
|
||||
if (databaseFactoryBean == null) {
|
||||
return DocDbResponseJson.warn("创建数据源失败,请检查配置是否正确");
|
||||
if (Optional.ofNullable(dbDatasourceSel.getYn()).orElse(0) == 1) {
|
||||
// 创建新的数据源
|
||||
DatabaseFactoryBean databaseFactoryBean = DatasourceUtil.createDatabaseFactoryBean(dbDatasourceSel);
|
||||
if (databaseFactoryBean != null) {
|
||||
newFactoryBeanList.add(databaseFactoryBean);
|
||||
}
|
||||
databaseRegistrationBean.setDatabaseFactoryBeanList(newFactoryBeanList);
|
||||
if (databaseFactoryBean == null) {
|
||||
return DocDbResponseJson.warn("创建数据源失败,请检查配置是否正确");
|
||||
}
|
||||
}
|
||||
return DocDbResponseJson.ok();
|
||||
}
|
||||
|
||||
@@ -76,6 +76,7 @@ public class DbSqlExecutorController {
|
||||
executeParam.setExecuteId(executeId);
|
||||
executeParam.setExecuteType(executeType);
|
||||
executeParam.setSql(sqlItem);
|
||||
executeParam.setMaxRows(1000);
|
||||
ExecuteResult executeResult = sqlExecutor.execute(executeParam);
|
||||
SerializeConfig mapping = new SerializeConfig();
|
||||
mapping.put(Date.class, new SimpleDateFormatSerializer("yyyy-MM-dd HH:mm:ss"));
|
||||
|
||||
@@ -0,0 +1,35 @@
|
||||
package com.zyplayer.doc.db.controller;
|
||||
|
||||
import com.zyplayer.doc.core.annotation.AuthMan;
|
||||
import com.zyplayer.doc.core.json.ResponseJson;
|
||||
import com.zyplayer.doc.db.framework.db.transfer.TransferDataServer;
|
||||
import com.zyplayer.doc.db.framework.json.DocDbResponseJson;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* 数据互导工具
|
||||
*
|
||||
* @author 暮光:城中城
|
||||
* @since 2019年9月28日
|
||||
*/
|
||||
@AuthMan
|
||||
@RestController
|
||||
@RequestMapping("/zyplayer-doc-db/transfer")
|
||||
public class DbTransferDataController {
|
||||
@Resource
|
||||
TransferDataServer transferDataServer;
|
||||
|
||||
@GetMapping(value = "/start")
|
||||
public ResponseJson doTransfer() {
|
||||
Long querySourceId = 5L;
|
||||
Long storageSourceId = 5L;
|
||||
String querySql = "select * from zyplayer_doc_manage.wiki_space where id > 0";
|
||||
String storageSql = "insert into zyplayer_doc_manage._temp_wiki_space_20190928(`id`, `name`, `type`, `space_explain`, `edit_type`, `tree_lazy_load`, `open_doc`, `uuid`, `create_user_id`, `create_user_name`, `create_time`, `del_flag`) VALUES (#{id}, #{name}, #{type}, #{space_explain}, #{edit_type}, #{tree_lazy_load}, #{open_doc}, #{uuid}, #{create_user_id}, #{create_user_name}, #{create_time}, #{del_flag})";
|
||||
transferDataServer.transferData(querySourceId, storageSourceId, querySql, storageSql);
|
||||
return DocDbResponseJson.ok();
|
||||
}
|
||||
}
|
||||
@@ -15,6 +15,7 @@ public class ExecuteParam {
|
||||
private List<Object> paramList;
|
||||
private List<ParameterMapping> parameterMappings;
|
||||
private Long datasourceId;
|
||||
private Integer maxRows;
|
||||
private String executeId;
|
||||
private ExecuteType executeType;
|
||||
|
||||
@@ -65,4 +66,12 @@ public class ExecuteParam {
|
||||
public void setParamList(List<Object> paramList) {
|
||||
this.paramList = paramList;
|
||||
}
|
||||
|
||||
public Integer getMaxRows() {
|
||||
return maxRows;
|
||||
}
|
||||
|
||||
public void setMaxRows(Integer maxRows) {
|
||||
this.maxRows = maxRows;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import com.baomidou.mybatisplus.core.MybatisConfiguration;
|
||||
import com.zyplayer.doc.data.service.manage.DbHistoryService;
|
||||
import com.zyplayer.doc.db.framework.db.bean.DatabaseFactoryBean;
|
||||
import com.zyplayer.doc.db.framework.db.bean.DatabaseRegistrationBean;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.ibatis.builder.SqlSourceBuilder;
|
||||
import org.apache.ibatis.builder.StaticSqlSource;
|
||||
import org.apache.ibatis.mapping.BoundSql;
|
||||
@@ -18,10 +19,7 @@ import javax.annotation.Resource;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.ResultSetMetaData;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
@@ -78,7 +76,10 @@ public class SqlExecutor {
|
||||
// BoundSql boundSql = getBoundSql(sql, paramMap);
|
||||
// sql = boundSql.getSql();
|
||||
// String sqlStr = SqlLogUtil.getSqlString(paramMap, boundSql);
|
||||
logger.info("sql ==> {}", executeParam.getSql());
|
||||
// 有参数的时候不输出日志,暂时只有导数据才有参数
|
||||
if (CollectionUtils.isEmpty(executeParam.getParamList())) {
|
||||
logger.info("sql ==> {}", executeParam.getSql());
|
||||
}
|
||||
|
||||
// List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
|
||||
PreparedStatement preparedStatement = null;
|
||||
@@ -97,13 +98,15 @@ public class SqlExecutor {
|
||||
preparedStatement.setObject(i + 1, paramDataList.get(i));
|
||||
}
|
||||
}
|
||||
// 限制下最大数量
|
||||
preparedStatement.setMaxRows(1000);
|
||||
if (ExecuteType.SELECT.equals(executeParam.getExecuteType())) {
|
||||
preparedStatement.executeQuery();
|
||||
} else {
|
||||
preparedStatement.execute();
|
||||
}
|
||||
// 限制下最大数量
|
||||
if (executeParam.getMaxRows() != null) {
|
||||
preparedStatement.setMaxRows(executeParam.getMaxRows());
|
||||
}
|
||||
// 查询的结果集
|
||||
ResultSet resultSet = preparedStatement.getResultSet();
|
||||
List<Map<String, Object>> resultList = new LinkedList<>();
|
||||
|
||||
@@ -5,9 +5,13 @@ import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
|
||||
import net.sf.jsqlparser.expression.operators.relational.ItemsList;
|
||||
import net.sf.jsqlparser.parser.CCJSqlParserManager;
|
||||
import net.sf.jsqlparser.schema.Column;
|
||||
import net.sf.jsqlparser.schema.Table;
|
||||
import net.sf.jsqlparser.statement.Statement;
|
||||
import net.sf.jsqlparser.statement.insert.Insert;
|
||||
import net.sf.jsqlparser.statement.select.Join;
|
||||
import net.sf.jsqlparser.statement.select.PlainSelect;
|
||||
import net.sf.jsqlparser.statement.select.Select;
|
||||
import net.sf.jsqlparser.statement.select.SelectBody;
|
||||
import org.apache.ibatis.builder.SqlSourceBuilder;
|
||||
import org.apache.ibatis.builder.StaticSqlSource;
|
||||
import org.apache.ibatis.mapping.BoundSql;
|
||||
@@ -16,11 +20,14 @@ import org.apache.ibatis.session.Configuration;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.io.StringReader;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* sql预处理工具
|
||||
*
|
||||
* @author 暮光:城中城
|
||||
* @since 2019年9月28日
|
||||
*/
|
||||
public class SqlParseUtil {
|
||||
|
||||
/**
|
||||
@@ -181,4 +188,60 @@ public class SqlParseUtil {
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取sql语句里面查询的列
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public static String getSelectCountSql(String sqlStr) {
|
||||
try {
|
||||
CCJSqlParserManager parser = new CCJSqlParserManager();
|
||||
Statement stmt = parser.parse(new StringReader(sqlStr));
|
||||
if (stmt instanceof Select) {
|
||||
Select selectStmt = (Select) stmt;
|
||||
SelectBody selectBody = selectStmt.getSelectBody();
|
||||
if (selectBody instanceof PlainSelect) {
|
||||
PlainSelect select = (PlainSelect) selectBody;
|
||||
StringBuilder countSql = new StringBuilder();
|
||||
countSql.append("SELECT count(0) counts");
|
||||
if (select.getFromItem() != null) {
|
||||
countSql.append(" FROM ").append(select.getFromItem());
|
||||
if (select.getJoins() != null) {
|
||||
Iterator<Join> it = select.getJoins().iterator();
|
||||
while (it.hasNext()) {
|
||||
Join join = it.next();
|
||||
if (join.isSimple()) {
|
||||
countSql.append(", ").append(join);
|
||||
} else {
|
||||
countSql.append(" ").append(join);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (select.getWhere() != null) {
|
||||
countSql.append(" WHERE ").append(select.getWhere());
|
||||
}
|
||||
if (select.getOracleHierarchical() != null) {
|
||||
countSql.append(select.getOracleHierarchical().toString());
|
||||
}
|
||||
countSql.append(PlainSelect.getFormatedList(select.getGroupByColumnReferences(), "GROUP BY"));
|
||||
if (select.getHaving() != null) {
|
||||
countSql.append(" HAVING ").append(select.getHaving());
|
||||
}
|
||||
}
|
||||
return countSql.toString();
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
String storageSql = "select * from zyplayer_doc_manage.wiki_space where id=1 group by id";
|
||||
storageSql = storageSql.replaceAll("(#\\{\\w+})", "'$1'");
|
||||
String selectCountSql = getSelectCountSql(storageSql);
|
||||
System.out.println(selectCountSql);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,10 @@ import com.zyplayer.doc.db.framework.db.mapper.base.ExecuteParam;
|
||||
import com.zyplayer.doc.db.framework.db.mapper.base.ExecuteResult;
|
||||
import com.zyplayer.doc.db.framework.db.mapper.base.ExecuteType;
|
||||
import com.zyplayer.doc.db.framework.db.mapper.base.SqlExecutor;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
@@ -14,8 +18,15 @@ import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 数据互导服务
|
||||
*
|
||||
* @author 暮光:城中城
|
||||
* @since 2019年9月28日
|
||||
*/
|
||||
@Service
|
||||
public class TransferDataServer {
|
||||
private static Logger logger = LoggerFactory.getLogger(TransferDataServer.class);
|
||||
|
||||
@Resource
|
||||
SqlExecutor sqlExecutor;
|
||||
@@ -24,30 +35,48 @@ public class TransferDataServer {
|
||||
// 批量插入一批次的条数
|
||||
private final Integer batchInsertNum = 100;
|
||||
|
||||
public void writeData(Long querySourceId, Long storageSourceId, String querySql, String storageSql) {
|
||||
public void transferData(Long querySourceId, Long storageSourceId, String querySql, String storageSql) {
|
||||
List<Map<String, Object>> selectResultList = new LinkedList<>();
|
||||
DatabaseFactoryBean factoryBean = databaseRegistrationBean.getFactoryById(querySourceId);
|
||||
ExecuteParam executeParam = new ExecuteParam();
|
||||
executeParam.setSql(querySql);
|
||||
executeParam.setDatasourceId(querySourceId);
|
||||
executeParam.setExecuteType(ExecuteType.SELECT);
|
||||
executeParam.setExecuteId(RandomUtil.simpleUUID());
|
||||
ExecuteResult executeResult = sqlExecutor.execute(factoryBean, executeParam, resultMap -> {
|
||||
selectResultList.add(resultMap);
|
||||
// 批量输出数据
|
||||
if (selectResultList.size() >= batchInsertNum) {
|
||||
try {
|
||||
String selectCountSql = SqlParseUtil.getSelectCountSql(querySql);
|
||||
executeParam.setSql(selectCountSql);
|
||||
ExecuteResult countResult = sqlExecutor.execute(executeParam);
|
||||
if (CollectionUtils.isEmpty(countResult.getResult())) {
|
||||
logger.error("获取总条数失败");
|
||||
return;
|
||||
}
|
||||
logger.info("总条数:{}", countResult.getResult().get(0).get("counts"));
|
||||
|
||||
executeParam.setSql(querySql);
|
||||
ExecuteResult executeResult = sqlExecutor.execute(factoryBean, executeParam, resultMap -> {
|
||||
selectResultList.add(resultMap);
|
||||
// 批量输出数据
|
||||
if (selectResultList.size() >= batchInsertNum) {
|
||||
this.writeData(storageSourceId, storageSql, selectResultList);
|
||||
}
|
||||
});
|
||||
// 不足100的数据
|
||||
if (selectResultList.size() > 0) {
|
||||
this.writeData(storageSourceId, storageSql, selectResultList);
|
||||
}
|
||||
});
|
||||
// 不足100的数据
|
||||
if (selectResultList.size() > 0) {
|
||||
this.writeData(storageSourceId, storageSql, selectResultList);
|
||||
if (StringUtils.isNotBlank(executeResult.getErrMsg())) {
|
||||
logger.error("执行出错:{}", executeResult.getErrMsg());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("SQL执行出错:", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void writeData(Long storageSourceId, String storageSql, List<Map<String, Object>> selectResultList) {
|
||||
private void writeData(Long storageSourceId, String storageSql, List<Map<String, Object>> selectResultList) {
|
||||
List<ExecuteParam> executeParamList = SqlParseUtil.getExecuteParamList(storageSql, selectResultList);
|
||||
for (ExecuteParam executeParam : executeParamList) {
|
||||
executeParam.setDatasourceId(storageSourceId);
|
||||
executeParam.setExecuteId(RandomUtil.simpleUUID());
|
||||
sqlExecutor.execute(executeParam);
|
||||
}
|
||||
selectResultList.clear();
|
||||
|
||||
Reference in New Issue
Block a user