mcy před 6 roky
rodič
revize
05cc627b5d

+ 101 - 91
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java

@@ -13,12 +13,12 @@ import java.util.concurrent.atomic.AtomicLong;
 
 
 import javax.sql.DataSource;
 import javax.sql.DataSource;
 
 
-import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping;
+import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
 import com.alibaba.otter.canal.client.adapter.support.EtlResult;
 import com.alibaba.otter.canal.client.adapter.support.EtlResult;
 import com.alibaba.otter.canal.client.adapter.support.Util;
 import com.alibaba.otter.canal.client.adapter.support.Util;
 import com.google.common.base.Joiner;
 import com.google.common.base.Joiner;
@@ -36,7 +36,8 @@ public class RdbEtlService {
     /**
     /**
      * 导入数据
      * 导入数据
      */
      */
-    public static EtlResult importData(DataSource srcDS, DataSource targetDS, MappingConfig config, List<String> params) {
+    public static EtlResult importData(DataSource srcDS, DataSource targetDS, MappingConfig config,
+                                       List<String> params) {
         EtlResult etlResult = new EtlResult();
         EtlResult etlResult = new EtlResult();
         AtomicLong successCount = new AtomicLong();
         AtomicLong successCount = new AtomicLong();
         List<String> errMsg = new ArrayList<>();
         List<String> errMsg = new ArrayList<>();
@@ -53,8 +54,8 @@ public class RdbEtlService {
             long start = System.currentTimeMillis();
             long start = System.currentTimeMillis();
 
 
             // 拼接sql
             // 拼接sql
-            StringBuilder sql = new StringBuilder("SELECT * FROM " + dbMapping.getDatabase() + "."
-                                                  + dbMapping.getTable());
+            StringBuilder sql = new StringBuilder(
+                "SELECT * FROM " + dbMapping.getDatabase() + "." + dbMapping.getTable());
 
 
             // 拼接条件
             // 拼接条件
             appendCondition(params, dbMapping, srcDS, sql);
             appendCondition(params, dbMapping, srcDS, sql);
@@ -91,12 +92,8 @@ public class RdbEtlService {
                     } else {
                     } else {
                         sqlFinal = sql + " LIMIT " + offset + "," + cnt;
                         sqlFinal = sql + " LIMIT " + offset + "," + cnt;
                     }
                     }
-                    Future<Boolean> future = executor.submit(() -> executeSqlImport(srcDS,
-                        targetDS,
-                        sqlFinal,
-                        dbMapping,
-                        successCount,
-                        errMsg));
+                    Future<Boolean> future = executor
+                        .submit(() -> executeSqlImport(srcDS, targetDS, sqlFinal, dbMapping, successCount, errMsg));
                     futures.add(future);
                     futures.add(future);
                 }
                 }
 
 
@@ -109,11 +106,11 @@ public class RdbEtlService {
                 executeSqlImport(srcDS, targetDS, sql.toString(), dbMapping, successCount, errMsg);
                 executeSqlImport(srcDS, targetDS, sql.toString(), dbMapping, successCount, errMsg);
             }
             }
 
 
-            logger.info(dbMapping.getTable() + " etl completed in: " + (System.currentTimeMillis() - start) / 1000
-                        + "s!");
+            logger.info(
+                dbMapping.getTable() + " etl completed in: " + (System.currentTimeMillis() - start) / 1000 + "s!");
 
 
-            etlResult.setResultMessage("导入目标表 " + SyncUtil.getDbTableName(dbMapping) + " 数据:" + successCount.get()
-                                       + " 条");
+            etlResult
+                .setResultMessage("导入目标表 " + SyncUtil.getDbTableName(dbMapping) + " 数据:" + successCount.get() + " 条");
         } catch (Exception e) {
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
             logger.error(e.getMessage(), e);
             errMsg.add(hbaseTable + " etl failed! ==>" + e.getMessage());
             errMsg.add(hbaseTable + " etl failed! ==>" + e.getMessage());
@@ -127,8 +124,8 @@ public class RdbEtlService {
         return etlResult;
         return etlResult;
     }
     }
 
 
-    private static void appendCondition(List<String> params, DbMapping dbMapping, DataSource ds, StringBuilder sql)
-                                                                                                                   throws SQLException {
+    private static void appendCondition(List<String> params, DbMapping dbMapping, DataSource ds,
+                                        StringBuilder sql) throws SQLException {
         if (params != null && params.size() == 1 && dbMapping.getEtlCondition() == null) {
         if (params != null && params.size() == 1 && dbMapping.getEtlCondition() == null) {
             AtomicBoolean stExists = new AtomicBoolean(false);
             AtomicBoolean stExists = new AtomicBoolean(false);
             // 验证是否有SYS_TIME字段
             // 验证是否有SYS_TIME字段
@@ -145,9 +142,9 @@ public class RdbEtlService {
                     }
                     }
                 } catch (Exception e) {
                 } catch (Exception e) {
                     // ignore
                     // ignore
-            }
-            return null;
-        }   );
+                }
+                return null;
+            });
             if (stExists.get()) {
             if (stExists.get()) {
                 sql.append(" WHERE SYS_TIME >= '").append(params.get(0)).append("' ");
                 sql.append(" WHERE SYS_TIME >= '").append(params.get(0)).append("' ");
             }
             }
@@ -168,13 +165,12 @@ public class RdbEtlService {
     private static boolean executeSqlImport(DataSource srcDS, DataSource targetDS, String sql, DbMapping dbMapping,
     private static boolean executeSqlImport(DataSource srcDS, DataSource targetDS, String sql, DbMapping dbMapping,
                                             AtomicLong successCount, List<String> errMsg) {
                                             AtomicLong successCount, List<String> errMsg) {
         try {
         try {
-            Util.sqlRS(srcDS, sql, rs -> {
-                int idx = 1;
+            Map<String, String> columnsMap = new LinkedHashMap<>();
+            Map<String, Integer> columnType = new LinkedHashMap<>();
 
 
+            Util.sqlRS(targetDS, "SELECT * FROM " + SyncUtil.getDbTableName(dbMapping) + " LIMIT 1 ", rs -> {
                 try {
                 try {
-                    boolean completed = false;
 
 
-                    Map<String, Integer> columnType = new LinkedHashMap<>();
                     ResultSetMetaData rsd = rs.getMetaData();
                     ResultSetMetaData rsd = rs.getMetaData();
                     int columnCount = rsd.getColumnCount();
                     int columnCount = rsd.getColumnCount();
                     List<String> columns = new ArrayList<>();
                     List<String> columns = new ArrayList<>();
@@ -183,91 +179,105 @@ public class RdbEtlService {
                         columns.add(rsd.getColumnName(i));
                         columns.add(rsd.getColumnName(i));
                     }
                     }
 
 
-                    Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, columns);
+                    columnsMap.putAll(SyncUtil.getColumnsMap(dbMapping, columns));
+                    return true;
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                    return false;
+                }
+            });
+
+            Util.sqlRS(srcDS, sql, rs -> {
+                int idx = 1;
+
+                try {
+                    boolean completed = false;
+
                     // if (dbMapping.isMapAll()) {
                     // if (dbMapping.isMapAll()) {
                     // columnsMap = dbMapping.getAllColumns();
                     // columnsMap = dbMapping.getAllColumns();
                     // } else {
                     // } else {
                     // columnsMap = dbMapping.getTargetColumns();
                     // columnsMap = dbMapping.getTargetColumns();
                     // }
                     // }
 
 
-                StringBuilder insertSql = new StringBuilder();
-                insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
-                columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
+                    StringBuilder insertSql = new StringBuilder();
+                    insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
+                    columnsMap
+                            .forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
 
 
-                int len = insertSql.length();
-                insertSql.delete(len - 1, len).append(") VALUES (");
-                int mapLen = columnsMap.size();
-                for (int i = 0; i < mapLen; i++) {
-                    insertSql.append("?,");
-                }
-                len = insertSql.length();
-                insertSql.delete(len - 1, len).append(")");
-                try (Connection connTarget = targetDS.getConnection();
-                        PreparedStatement pstmt = connTarget.prepareStatement(insertSql.toString())) {
-                    connTarget.setAutoCommit(false);
-
-                    while (rs.next()) {
-                        pstmt.clearParameters();
-
-                        // 删除数据
-                        Map<String, Object> values = new LinkedHashMap<>();
-                        StringBuilder deleteSql = new StringBuilder("DELETE FROM " + SyncUtil.getDbTableName(dbMapping)
-                                                                    + " WHERE ");
-                        appendCondition(dbMapping, deleteSql, values, rs);
-                        try (PreparedStatement pstmt2 = connTarget.prepareStatement(deleteSql.toString())) {
-                            int k = 1;
-                            for (Object val : values.values()) {
-                                pstmt2.setObject(k++, val);
+                    int len = insertSql.length();
+                    insertSql.delete(len - 1, len).append(") VALUES (");
+                    int mapLen = columnsMap.size();
+                    for (int i = 0; i < mapLen; i++) {
+                        insertSql.append("?,");
+                    }
+                    len = insertSql.length();
+                    insertSql.delete(len - 1, len).append(")");
+                    try (Connection connTarget = targetDS.getConnection();
+                         PreparedStatement pstmt = connTarget.prepareStatement(insertSql.toString())) {
+                        connTarget.setAutoCommit(false);
+
+                        while (rs.next()) {
+                            pstmt.clearParameters();
+
+                            // 删除数据
+                            Map<String, Object> values = new LinkedHashMap<>();
+                            StringBuilder deleteSql = new StringBuilder(
+                                    "DELETE FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE ");
+                            appendCondition(dbMapping, deleteSql, values, rs);
+                            try (PreparedStatement pstmt2 = connTarget.prepareStatement(deleteSql.toString())) {
+                                int k = 1;
+                                for (Object val : values.values()) {
+                                    pstmt2.setObject(k++, val);
+                                }
+                                pstmt2.execute();
                             }
                             }
-                            pstmt2.execute();
-                        }
 
 
-                        int i = 1;
-                        for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
-                            String targetClolumnName = entry.getKey();
-                            String srcColumnName = entry.getValue();
-                            if (srcColumnName == null) {
-                                srcColumnName = targetClolumnName;
-                            }
+                            int i = 1;
+                            for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
+                                String targetClolumnName = entry.getKey();
+                                String srcColumnName = entry.getValue();
+                                if (srcColumnName == null) {
+                                    srcColumnName = targetClolumnName;
+                                }
 
 
-                            Integer type = columnType.get(targetClolumnName.toLowerCase());
+                                Integer type = columnType.get(targetClolumnName.toLowerCase());
 
 
-                            Object value = rs.getObject(srcColumnName);
-                            if (value != null) {
-                                SyncUtil.setPStmt(type, pstmt, value, i);
-                            } else {
-                                pstmt.setNull(i, type);
+                                Object value = rs.getObject(srcColumnName);
+                                if (value != null) {
+                                    SyncUtil.setPStmt(type, pstmt, value, i);
+                                } else {
+                                    pstmt.setNull(i, type);
+                                }
+
+                                i++;
                             }
                             }
 
 
-                            i++;
-                        }
+                            pstmt.execute();
+                            if (logger.isTraceEnabled()) {
+                                logger.trace("Insert into target table, sql: {}", insertSql);
+                            }
 
 
-                        pstmt.execute();
-                        if (logger.isTraceEnabled()) {
-                            logger.trace("Insert into target table, sql: {}", insertSql);
+                            if (idx % dbMapping.getCommitBatch() == 0) {
+                                connTarget.commit();
+                                completed = true;
+                            }
+                            idx++;
+                            successCount.incrementAndGet();
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("successful import count:" + successCount.get());
+                            }
                         }
                         }
-
-                        if (idx % dbMapping.getCommitBatch() == 0) {
+                        if (!completed) {
                             connTarget.commit();
                             connTarget.commit();
-                            completed = true;
-                        }
-                        idx++;
-                        successCount.incrementAndGet();
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("successful import count:" + successCount.get());
                         }
                         }
                     }
                     }
-                    if (!completed) {
-                        connTarget.commit();
-                    }
-                }
 
 
-            } catch (Exception e) {
-                logger.error(dbMapping.getTable() + " etl failed! ==>" + e.getMessage(), e);
-                errMsg.add(dbMapping.getTable() + " etl failed! ==>" + e.getMessage());
-            }
-            return idx;
-        }   );
+                } catch (Exception e) {
+                    logger.error(dbMapping.getTable() + " etl failed! ==>" + e.getMessage(), e);
+                    errMsg.add(dbMapping.getTable() + " etl failed! ==>" + e.getMessage());
+                }
+                return idx;
+            });
             return true;
             return true;
         } catch (Exception e) {
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
             logger.error(e.getMessage(), e);
@@ -278,8 +288,8 @@ public class RdbEtlService {
     /**
     /**
      * 拼接目标表主键where条件
      * 拼接目标表主键where条件
      */
      */
-    private static void appendCondition(DbMapping dbMapping, StringBuilder sql, Map<String, Object> values, ResultSet rs)
-                                                                                                                         throws SQLException {
+    private static void appendCondition(DbMapping dbMapping, StringBuilder sql, Map<String, Object> values,
+                                        ResultSet rs) throws SQLException {
         // 拼接主键
         // 拼接主键
         for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
         for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
             String targetColumnName = entry.getKey();
             String targetColumnName = entry.getKey();