From 85bad69784430d81465e36920857359ca7e0ae68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9A=AE=E5=85=89=EF=BC=9A=E5=9F=8E=E4=B8=AD=E5=9F=8E?= <806783409@qq.com> Date: Sun, 29 Sep 2019 22:05:35 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=BA=93=E6=96=87=E6=A1=A3or?= =?UTF-8?q?acle=E6=94=AF=E6=8C=81=E4=BC=98=E5=8C=96=EF=BC=8C=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E4=BA=92=E5=AF=BC=E9=9B=8F=E5=BD=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/DbSqlExecutorController.java | 9 +- .../configuration/DatasourceUtil.java | 3 + .../db/mapper/base/ExecuteParam.java | 68 +++++++ .../framework/db/mapper/base/SqlExecutor.java | 33 ++-- .../db/mapper/oracle/OracleBaseMapper.xml | 27 ++- .../framework/db/transfer/SqlParseUtil.java | 184 ++++++++++++++++++ .../db/transfer/TransferDataServer.java | 55 ++++++ zyplayer-doc-manage/pom.xml | 11 +- 8 files changed, 354 insertions(+), 36 deletions(-) create mode 100644 zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/db/mapper/base/ExecuteParam.java create mode 100644 zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/db/transfer/SqlParseUtil.java create mode 100644 zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/db/transfer/TransferDataServer.java diff --git a/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/controller/DbSqlExecutorController.java b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/controller/DbSqlExecutorController.java index 3bd5dba8..96b4d942 100644 --- a/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/controller/DbSqlExecutorController.java +++ b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/controller/DbSqlExecutorController.java @@ -16,6 +16,7 @@ import com.zyplayer.doc.data.repository.support.consts.DocAuthConst; import com.zyplayer.doc.data.service.manage.DbFavoriteService; import com.zyplayer.doc.data.service.manage.DbHistoryService; import com.zyplayer.doc.db.framework.consts.DbAuthType; +import com.zyplayer.doc.db.framework.db.mapper.base.ExecuteParam; import com.zyplayer.doc.db.framework.db.mapper.base.ExecuteResult; import com.zyplayer.doc.db.framework.db.mapper.base.ExecuteType; import com.zyplayer.doc.db.framework.db.mapper.base.SqlExecutor; @@ -69,9 +70,13 @@ public class DbSqlExecutorController { } sqlItem = sqlItem.trim(); try { - Map paramMap = JSON.parseObject(params); ExecuteType executeType = (!manageAuth && select) ? ExecuteType.SELECT : ExecuteType.ALL; - ExecuteResult executeResult = sqlExecutor.execute(sourceId, executeId, executeType, sqlItem, paramMap); + ExecuteParam executeParam = new ExecuteParam(); + executeParam.setDatasourceId(sourceId); + executeParam.setExecuteId(executeId); + executeParam.setExecuteType(executeType); + executeParam.setSql(sqlItem); + ExecuteResult executeResult = sqlExecutor.execute(executeParam); SerializeConfig mapping = new SerializeConfig(); mapping.put(Date.class, new SimpleDateFormatSerializer("yyyy-MM-dd HH:mm:ss")); mapping.put(Timestamp.class, new SimpleDateFormatSerializer("yyyy-MM-dd HH:mm:ss")); diff --git a/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/configuration/DatasourceUtil.java b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/configuration/DatasourceUtil.java index 6fae67de..06036b7c 100644 --- a/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/configuration/DatasourceUtil.java +++ b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/configuration/DatasourceUtil.java @@ -38,6 +38,9 @@ public class DatasourceUtil { dataSource.setConnectionErrorRetryAttempts(2); dataSource.setBreakAfterAcquireFailure(true); dataSource.setName("zyplayer-doc-db-" + dbDatasource.getId()); + if (dbDatasource.getSourceUrl().contains("oracle")) { + dataSource.setValidationQuery("select 1 from dual"); + } DruidPooledConnection connection = dataSource.getConnection(3000); if (connection == null) { throw new ConfirmException("尝试获取该数据源连接失败:" + dbDatasource.getSourceUrl()); diff --git a/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/db/mapper/base/ExecuteParam.java b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/db/mapper/base/ExecuteParam.java new file mode 100644 index 00000000..ed6918d7 --- /dev/null +++ b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/db/mapper/base/ExecuteParam.java @@ -0,0 +1,68 @@ +package com.zyplayer.doc.db.framework.db.mapper.base; + +import org.apache.ibatis.mapping.ParameterMapping; + +import java.util.List; + +/** + * 通过SQL和参数列表处理后的执行参数对象 + * + * @author 暮光:城中城 + * @since 2019-09-28 + */ +public class ExecuteParam { + private String sql; + private List paramList; + private List parameterMappings; + private Long datasourceId; + private String executeId; + private ExecuteType executeType; + + public Long getDatasourceId() { + return datasourceId; + } + + public void setDatasourceId(Long datasourceId) { + this.datasourceId = datasourceId; + } + + public String getExecuteId() { + return executeId; + } + + public void setExecuteId(String executeId) { + this.executeId = executeId; + } + + public ExecuteType getExecuteType() { + return executeType; + } + + public void setExecuteType(ExecuteType executeType) { + this.executeType = executeType; + } + + public List getParameterMappings() { + return parameterMappings; + } + + public void setParameterMappings(List parameterMappings) { + this.parameterMappings = parameterMappings; + } + + public String getSql() { + return sql; + } + + public void setSql(String sql) { + this.sql = sql; + } + + public List getParamList() { + return paramList; + } + + public void setParamList(List paramList) { + this.paramList = paramList; + } +} diff --git a/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/db/mapper/base/SqlExecutor.java b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/db/mapper/base/SqlExecutor.java index c709aa77..72823acd 100644 --- a/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/db/mapper/base/SqlExecutor.java +++ b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/db/mapper/base/SqlExecutor.java @@ -8,6 +8,7 @@ import com.zyplayer.doc.db.framework.db.bean.DatabaseRegistrationBean; import org.apache.ibatis.builder.SqlSourceBuilder; import org.apache.ibatis.builder.StaticSqlSource; import org.apache.ibatis.mapping.BoundSql; +import org.apache.ibatis.mapping.ParameterMapping; import org.apache.ibatis.parsing.GenericTokenParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,9 +61,9 @@ public class SqlExecutor { * @author 暮光:城中城 * @since 2019年8月18日 */ - public ExecuteResult execute(Long datasourceId, String executeId, ExecuteType executeType, String sql, Map paramMap) { - DatabaseFactoryBean factoryBean = databaseRegistrationBean.getFactoryById(datasourceId); - return this.execute(factoryBean, executeId, executeType, sql, paramMap, null); + public ExecuteResult execute(ExecuteParam param) { + DatabaseFactoryBean factoryBean = databaseRegistrationBean.getFactoryById(param.getDatasourceId()); + return this.execute(factoryBean, param, null); } /** @@ -70,14 +71,14 @@ public class SqlExecutor { * @author 暮光:城中城 * @since 2019年8月18日 */ - public ExecuteResult execute(DatabaseFactoryBean factoryBean, String executeId, ExecuteType executeType, String sqlStr, Map paramMap, ResultHandler handler) { + public ExecuteResult execute(DatabaseFactoryBean factoryBean, ExecuteParam executeParam, ResultHandler handler) { if (factoryBean == null) { - return ExecuteResult.error("未找到数据库连接", sqlStr); + return ExecuteResult.error("未找到数据库连接", executeParam.getSql()); } // BoundSql boundSql = getBoundSql(sql, paramMap); // sql = boundSql.getSql(); // String sqlStr = SqlLogUtil.getSqlString(paramMap, boundSql); - logger.info("sql ==> {}", sqlStr); + logger.info("sql ==> {}", executeParam.getSql()); // List parameterMappings = boundSql.getParameterMappings(); PreparedStatement preparedStatement = null; @@ -86,15 +87,19 @@ public class SqlExecutor { try { long startTime = System.currentTimeMillis(); connection = factoryBean.getDataSource().getConnection(); - preparedStatement = connection.prepareStatement(sqlStr); + preparedStatement = connection.prepareStatement(executeParam.getSql()); // 设置当前的PreparedStatement - statementMap.put(executeId, preparedStatement); -// for (int i = 0; i < parameterMappings.size(); i++) { -// preparedStatement.setObject(i + 1, paramMap.get(parameterMappings.get(i).getProperty())); -// } + statementMap.put(executeParam.getExecuteId(), preparedStatement); + List parameterMappings = executeParam.getParameterMappings(); + List paramDataList = executeParam.getParamList(); + if (parameterMappings != null && paramDataList != null && parameterMappings.size() > 0 && paramDataList.size() > 0) { + for (int i = 0; i < parameterMappings.size(); i++) { + preparedStatement.setObject(i + 1, paramDataList.get(i)); + } + } // 限制下最大数量 preparedStatement.setMaxRows(1000); - if (ExecuteType.SELECT.equals(executeType)) { + if (ExecuteType.SELECT.equals(executeParam.getExecuteType())) { preparedStatement.executeQuery(); } else { preparedStatement.execute(); @@ -119,11 +124,11 @@ public class SqlExecutor { // 更新的数量,小于0代表不是更新语句 int updateCount = preparedStatement.getUpdateCount(); long useTime = System.currentTimeMillis() - startTime; - return new ExecuteResult(updateCount, resultList, useTime, sqlStr); + return new ExecuteResult(updateCount, resultList, useTime, executeParam.getSql()); } catch (Exception e) { throw new RuntimeException(e); } finally { - statementMap.remove(executeId); + statementMap.remove(executeParam.getExecuteId()); try { if (preparedStatement != null && !preparedStatement.isClosed()) { preparedStatement.close(); diff --git a/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/db/mapper/oracle/OracleBaseMapper.xml b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/db/mapper/oracle/OracleBaseMapper.xml index ced31719..b8465908 100644 --- a/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/db/mapper/oracle/OracleBaseMapper.xml +++ b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/db/mapper/oracle/OracleBaseMapper.xml @@ -1,7 +1,7 @@ - + @@ -9,7 +9,7 @@ - + @@ -18,46 +18,43 @@ - + - + - + - + diff --git a/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/db/transfer/SqlParseUtil.java b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/db/transfer/SqlParseUtil.java new file mode 100644 index 00000000..da264a4e --- /dev/null +++ b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/db/transfer/SqlParseUtil.java @@ -0,0 +1,184 @@ +package com.zyplayer.doc.db.framework.db.transfer; + +import com.zyplayer.doc.db.framework.db.mapper.base.ExecuteParam; +import net.sf.jsqlparser.expression.operators.relational.ExpressionList; +import net.sf.jsqlparser.expression.operators.relational.ItemsList; +import net.sf.jsqlparser.parser.CCJSqlParserManager; +import net.sf.jsqlparser.schema.Column; +import net.sf.jsqlparser.statement.Statement; +import net.sf.jsqlparser.statement.insert.Insert; +import net.sf.jsqlparser.statement.select.PlainSelect; +import org.apache.ibatis.builder.SqlSourceBuilder; +import org.apache.ibatis.builder.StaticSqlSource; +import org.apache.ibatis.mapping.BoundSql; +import org.apache.ibatis.mapping.ParameterMapping; +import org.apache.ibatis.session.Configuration; +import org.springframework.util.CollectionUtils; + +import java.io.StringReader; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +public class SqlParseUtil { + + /** + * 通过SQL和参数列表获取处理后的SQL和参数 + * @param sql + * @param paramList + * @return + */ + public static List getExecuteParamList(String sql, List> paramList) { + // 单条操作 + if (paramList == null || paramList.size() <= 1) { + Map paramMap = CollectionUtils.isEmpty(paramList) ? Collections.emptyMap() : paramList.get(0); + ExecuteParam executeParam = getSingleExecuteParam(sql, paramMap); + return Collections.singletonList(executeParam); + } + // 批量的insert语法 + ExecuteParam multiExecuteParam = getMultiExecuteParam(sql, paramList); + if (multiExecuteParam != null) { + return Collections.singletonList(multiExecuteParam); + } + // 不是insert语法单条处理 + List executeParamList = new LinkedList<>(); + for (Map paramMap : paramList) { + ExecuteParam executeParam = getSingleExecuteParam(sql, paramMap); + executeParamList.add(executeParam); + } + return executeParamList; + } + + /** + * 解析sql,如果是insert则把values拼成多个,用于支持批量插入 + * 不是insert则返回null + * 不支持insert set语法,不支持values以外使用#{column}语法 + * @param sql + * @param paramList + * @return + */ + public static ExecuteParam getMultiExecuteParam(String sql, List> paramList) { + List resultParamList = new LinkedList<>(); + SqlSourceBuilder sqlSourceBuilder = new SqlSourceBuilder(new Configuration()); + String insertSql = sql.replaceAll("(#\\{\\w+})", "'$1'"); + // 处理${column}参数 + Insert insert = getInsertStmt(insertSql); + if (insert == null) { + return null; + } + StringBuilder sqlBuilder = new StringBuilder(); + sqlBuilder.append("INSERT "); + if (insert.getModifierPriority() != null) { + sqlBuilder.append(insert.getModifierPriority().name()).append(" "); + } + if (insert.isModifierIgnore()) { + sqlBuilder.append("IGNORE "); + } + sqlBuilder.append("INTO "); + sqlBuilder.append(insert.getTable()).append(" "); + if (insert.getColumns() != null) { + sqlBuilder.append(PlainSelect.getStringList(insert.getColumns(), true, true)).append(" "); + } + if (insert.isUseValues()) { + sqlBuilder.append("VALUES "); + } + if (insert.getItemsList() != null) { + ExpressionList expressionList = (ExpressionList) insert.getItemsList(); + String fillSql = expressionList.toString().replaceAll("'(#\\{\\w+})'", "$1"); + StringBuilder itemsSb = new StringBuilder(); + for (Map paramMap : paramList) { + if (itemsSb.length() > 0) { + itemsSb.append(", "); + } + itemsSb.append(fillSql); + // 解析获取参数 + StaticSqlSource parse = (StaticSqlSource) sqlSourceBuilder.parse(fillSql, Object.class, Collections.emptyMap()); + BoundSql boundSql = parse.getBoundSql(new Object()); + List parameterMappings = boundSql.getParameterMappings(); + for (ParameterMapping parameterMapping : parameterMappings) { + resultParamList.add(paramMap.get(parameterMapping.getProperty())); + } + } + sqlBuilder.append(itemsSb.toString()); + } +// pagehelper 使用了jsqlparser包,而且版本很低,,低版本没这个方法 +// if (insert.isUseSet()) { +// throw new RuntimeException("暂不支持insert set语法"); +// } + if (insert.isUseDuplicate()) { + sqlBuilder.append(" ON DUPLICATE KEY UPDATE "); + List duplicateUpdateColumns = insert.getDuplicateUpdateColumns(); + for (int i = 0; i < duplicateUpdateColumns.size(); i++) { + if (i != 0) { + sqlBuilder.append(", "); + } + sqlBuilder.append(duplicateUpdateColumns.get(i)).append(" = "); + sqlBuilder.append(insert.getDuplicateUpdateExpressionList().get(i)); + } + } + if (insert.isReturningAllColumns()) { + sqlBuilder.append(" RETURNING *"); + } else if (insert.getReturningExpressionList() != null) { + sqlBuilder.append(" RETURNING ").append(PlainSelect.getStringList(insert.getReturningExpressionList(), true, false)); + } + StaticSqlSource parse = (StaticSqlSource) sqlSourceBuilder.parse(sqlBuilder.toString(), Object.class, Collections.emptyMap()); + BoundSql boundSql = parse.getBoundSql(new Object()); + List parameterMappings = boundSql.getParameterMappings(); + // values 里面的参数个数和所需参数个数不一致,则不支持批量,说明values外面还存在#{column}的参数 + if (parameterMappings.size() != resultParamList.size()) { + throw new RuntimeException("insert除values以外不支持#{column}的动态参数"); + } + // 组装结果 + ExecuteParam executeParam = new ExecuteParam(); + executeParam.setParameterMappings(parameterMappings); + executeParam.setSql(boundSql.getSql()); + executeParam.setParamList(resultParamList); + return executeParam; + } + + /** + * 解析单条操作的SQL + * @param sql + * @param paramMap + * @return + */ + public static ExecuteParam getSingleExecuteParam(String sql, Map paramMap) { + ExecuteParam executeParam = new ExecuteParam(); + + SqlSourceBuilder sqlSourceBuilder = new SqlSourceBuilder(new Configuration()); + StaticSqlSource parse = (StaticSqlSource) sqlSourceBuilder.parse(sql, Object.class, paramMap); + BoundSql boundSql = parse.getBoundSql(new Object()); + List parameterMappings = boundSql.getParameterMappings(); + List resultParamList = new LinkedList<>(); + for (ParameterMapping parameterMapping : parameterMappings) { + resultParamList.add(paramMap.get(parameterMapping.getProperty())); + } + executeParam.setSql(boundSql.getSql()); + executeParam.setParameterMappings(parameterMappings); + executeParam.setParamList(resultParamList); + return executeParam; + } + + /** + * 获取sql语句里面查询的列 + * + * @return + */ + public static Insert getInsertStmt(String sql) { + try { + CCJSqlParserManager parser = new CCJSqlParserManager(); + Statement stmt = parser.parse(new StringReader(sql)); + if (stmt instanceof Insert) { + Insert insertStmt = (Insert) stmt; + ItemsList itemsList = insertStmt.getItemsList(); + if (itemsList instanceof ExpressionList) { + return insertStmt; + } + } + } catch (Exception e) { + e.printStackTrace(); + } + return null; + } +} diff --git a/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/db/transfer/TransferDataServer.java b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/db/transfer/TransferDataServer.java new file mode 100644 index 00000000..45eea49b --- /dev/null +++ b/zyplayer-doc-db/src/main/java/com/zyplayer/doc/db/framework/db/transfer/TransferDataServer.java @@ -0,0 +1,55 @@ +package com.zyplayer.doc.db.framework.db.transfer; + +import cn.hutool.core.util.RandomUtil; +import com.zyplayer.doc.db.framework.db.bean.DatabaseFactoryBean; +import com.zyplayer.doc.db.framework.db.bean.DatabaseRegistrationBean; +import com.zyplayer.doc.db.framework.db.mapper.base.ExecuteParam; +import com.zyplayer.doc.db.framework.db.mapper.base.ExecuteResult; +import com.zyplayer.doc.db.framework.db.mapper.base.ExecuteType; +import com.zyplayer.doc.db.framework.db.mapper.base.SqlExecutor; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +@Service +public class TransferDataServer { + + @Resource + SqlExecutor sqlExecutor; + @Resource + DatabaseRegistrationBean databaseRegistrationBean; + // 批量插入一批次的条数 + private final Integer batchInsertNum = 100; + + public void writeData(Long querySourceId, Long storageSourceId, String querySql, String storageSql) { + List> selectResultList = new LinkedList<>(); + DatabaseFactoryBean factoryBean = databaseRegistrationBean.getFactoryById(querySourceId); + ExecuteParam executeParam = new ExecuteParam(); + executeParam.setSql(querySql); + executeParam.setExecuteType(ExecuteType.SELECT); + executeParam.setExecuteId(RandomUtil.simpleUUID()); + ExecuteResult executeResult = sqlExecutor.execute(factoryBean, executeParam, resultMap -> { + selectResultList.add(resultMap); + // 批量输出数据 + if (selectResultList.size() >= batchInsertNum) { + this.writeData(storageSourceId, storageSql, selectResultList); + } + }); + // 不足100的数据 + if (selectResultList.size() > 0) { + this.writeData(storageSourceId, storageSql, selectResultList); + } + } + + public void writeData(Long storageSourceId, String storageSql, List> selectResultList) { + List executeParamList = SqlParseUtil.getExecuteParamList(storageSql, selectResultList); + for (ExecuteParam executeParam : executeParamList) { + executeParam.setDatasourceId(storageSourceId); + sqlExecutor.execute(executeParam); + } + selectResultList.clear(); + } +} diff --git a/zyplayer-doc-manage/pom.xml b/zyplayer-doc-manage/pom.xml index 6b7a2c7b..39e8e081 100644 --- a/zyplayer-doc-manage/pom.xml +++ b/zyplayer-doc-manage/pom.xml @@ -73,11 +73,12 @@ zyplayer-doc-data ${zyplayer.doc.version} - - com.zyplayer - zyplayer-doc-grpc - ${zyplayer.doc.version} - + + + + + + com.zyplayer