|
@@ -1,5 +1,14 @@
|
|
|
package com.alibaba.otter.canal.client.adapter.clickhouse.service;
|
|
|
|
|
|
+import java.sql.*;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.LinkedHashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
|
+
|
|
|
+import javax.sql.DataSource;
|
|
|
+
|
|
|
import com.alibaba.druid.pool.DruidDataSource;
|
|
|
import com.alibaba.otter.canal.client.adapter.clickhouse.config.MappingConfig;
|
|
|
import com.alibaba.otter.canal.client.adapter.clickhouse.config.MappingConfig.DbMapping;
|
|
@@ -9,19 +18,13 @@ import com.alibaba.otter.canal.client.adapter.support.AdapterConfig;
|
|
|
import com.alibaba.otter.canal.client.adapter.support.EtlResult;
|
|
|
import com.alibaba.otter.canal.client.adapter.support.Util;
|
|
|
|
|
|
-import javax.sql.DataSource;
|
|
|
-import java.sql.*;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.LinkedHashMap;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.concurrent.atomic.AtomicLong;
|
|
|
-
|
|
|
/**
|
|
|
* ClickHouse ETL 操作业务类
|
|
|
*
|
|
|
- * @author rewerma @ 2018-11-7
|
|
|
- * @version 1.0.0
|
|
|
+ * @author: Xander
|
|
|
+ * @date: Created in 2023/11/10 22:23
|
|
|
+ * @email: zhrunxin33@gmail.com
|
|
|
+ * @version 1.1.8
|
|
|
*/
|
|
|
public class ClickHouseEtlService extends AbstractEtlService {
|
|
|
|
|
@@ -58,23 +61,23 @@ public class ClickHouseEtlService extends AbstractEtlService {
|
|
|
Util.sqlRS(targetDS,
|
|
|
"SELECT * FROM " + SyncUtil.getDbTableName(dbMapping, dataSource.getDbType()) + " LIMIT 1 ",
|
|
|
rs -> {
|
|
|
- try {
|
|
|
+ try {
|
|
|
+
|
|
|
+ ResultSetMetaData rsd = rs.getMetaData();
|
|
|
+ int columnCount = rsd.getColumnCount();
|
|
|
+ List<String> columns = new ArrayList<>();
|
|
|
+ for (int i = 1; i <= columnCount; i++) {
|
|
|
+ columnType.put(rsd.getColumnName(i).toLowerCase(), rsd.getColumnType(i));
|
|
|
+ columns.add(rsd.getColumnName(i));
|
|
|
+ }
|
|
|
|
|
|
- ResultSetMetaData rsd = rs.getMetaData();
|
|
|
- int columnCount = rsd.getColumnCount();
|
|
|
- List<String> columns = new ArrayList<>();
|
|
|
- for (int i = 1; i <= columnCount; i++) {
|
|
|
- columnType.put(rsd.getColumnName(i).toLowerCase(), rsd.getColumnType(i));
|
|
|
- columns.add(rsd.getColumnName(i));
|
|
|
+ columnsMap.putAll(SyncUtil.getColumnsMap(dbMapping, columns));
|
|
|
+ return true;
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.error(e.getMessage(), e);
|
|
|
+ return false;
|
|
|
}
|
|
|
-
|
|
|
- columnsMap.putAll(SyncUtil.getColumnsMap(dbMapping, columns));
|
|
|
- return true;
|
|
|
- } catch (Exception e) {
|
|
|
- logger.error(e.getMessage(), e);
|
|
|
- return false;
|
|
|
- }
|
|
|
- });
|
|
|
+ });
|
|
|
|
|
|
Util.sqlRS(srcDS, sql, values, rs -> {
|
|
|
int idx = 1;
|
|
@@ -86,8 +89,10 @@ public class ClickHouseEtlService extends AbstractEtlService {
|
|
|
insertSql.append("INSERT INTO ")
|
|
|
.append(SyncUtil.getDbTableName(dbMapping, dataSource.getDbType()))
|
|
|
.append(" (");
|
|
|
- columnsMap
|
|
|
- .forEach((targetColumnName, srcColumnName) -> insertSql.append(backtick).append(targetColumnName).append(backtick).append(","));
|
|
|
+ columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(backtick)
|
|
|
+ .append(targetColumnName)
|
|
|
+ .append(backtick)
|
|
|
+ .append(","));
|
|
|
|
|
|
int len = insertSql.length();
|
|
|
insertSql.delete(len - 1, len).append(") VALUES (");
|
|
@@ -97,7 +102,7 @@ public class ClickHouseEtlService extends AbstractEtlService {
|
|
|
}
|
|
|
len = insertSql.length();
|
|
|
insertSql.delete(len - 1, len).append(")");
|
|
|
- logger.info("executeSqlImport sql:{}",insertSql.toString());
|
|
|
+ logger.info("executeSqlImport sql:{}", insertSql.toString());
|
|
|
try (Connection connTarget = targetDS.getConnection();
|
|
|
PreparedStatement pstmt = connTarget.prepareStatement(insertSql.toString())) {
|
|
|
connTarget.setAutoCommit(false);
|
|
@@ -110,7 +115,8 @@ public class ClickHouseEtlService extends AbstractEtlService {
|
|
|
// 删除数据
|
|
|
Map<String, Object> pkVal = new LinkedHashMap<>();
|
|
|
StringBuilder deleteSql = new StringBuilder(
|
|
|
- "ALTER TABLE " + SyncUtil.getDbTableName(dbMapping, dataSource.getDbType()) + " DELETE WHERE ");
|
|
|
+ "ALTER TABLE " + SyncUtil.getDbTableName(dbMapping, dataSource.getDbType())
|
|
|
+ + " DELETE WHERE ");
|
|
|
appendCondition(dbMapping, deleteSql, pkVal, rs, backtick);
|
|
|
try (PreparedStatement pstmt2 = connTarget.prepareStatement(deleteSql.toString())) {
|
|
|
int k = 1;
|