mcy 6 роки тому
батько
коміт
b45a27e231

+ 5 - 57
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -1,10 +1,11 @@
 package com.alibaba.otter.canal.client.adapter.rdb;
 
-import java.io.File;
 import java.sql.Connection;
-import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
-import java.util.*;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 
 import javax.sql.DataSource;
 
@@ -14,8 +15,8 @@ import org.slf4j.LoggerFactory;
 
 import com.alibaba.druid.pool.DruidDataSource;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
-import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.config.ConfigLoader;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbEtlService;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService;
 import com.alibaba.otter.canal.client.adapter.support.*;
@@ -34,10 +35,6 @@ public class RdbAdapter implements OuterAdapter {
 
     @Override
     public void init(OuterAdapterConfig configuration) {
-        System.out.println("xxxxx: " + this.getClass().getClassLoader().getResource("").getPath());
-        File file = new File(this.getClass().getClassLoader().getResource("").getPath() + "rdb" + File.separator);
-        System.out.println(file.getAbsolutePath());
-
         Map<String, MappingConfig> rdbMappingTmp = ConfigLoader.load();
         // 过滤不匹配的key的配置
         rdbMappingTmp.forEach((key, mappingConfig) -> {
@@ -73,55 +70,6 @@ public class RdbAdapter implements OuterAdapter {
             logger.error("ERROR ## failed to initial datasource: " + properties.get("jdbc.url"), e);
         }
 
-        rdbMapping.values().forEach(config -> {
-            try {
-                MappingConfig.DbMapping dbMapping = config.getDbMapping();
-                // 从源表加载所有字段名
-                if (dbMapping.getAllColumns() == null) {
-                    synchronized (RdbSyncService.class) {
-                        if (dbMapping.getAllColumns() == null) {
-                            DataSource srcDS = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
-                            Connection srcConn = srcDS.getConnection();
-                            String srcMetaSql = "SELECT * FROM " + dbMapping.getDatabase() + "." + dbMapping.getTable()
-                                                + " WHERE 1=2 ";
-                            List<String> srcColumns = new ArrayList<>();
-                            Util.sqlRS(srcConn, srcMetaSql, rs -> {
-                                try {
-                                    ResultSetMetaData rmd = rs.getMetaData();
-                                    int cnt = rmd.getColumnCount();
-                                    for (int i = 1; i <= cnt; i++) {
-                                        srcColumns.add(rmd.getColumnName(i).toLowerCase());
-                                    }
-                                } catch (SQLException e) {
-                                    logger.error(e.getMessage(), e);
-                                }
-                            });
-                            Map<String, String> columnsMap = new LinkedHashMap<>();
-
-                            for (String srcColumn : srcColumns) {
-                                String targetColumn = srcColumn;
-                                if (dbMapping.getTargetColumns() != null) {
-                                    for (Map.Entry<String, String> entry : dbMapping.getTargetColumns().entrySet()) {
-                                        String targetColumnName = entry.getKey();
-                                        String srcColumnName = entry.getValue();
-
-                                        if (srcColumnName != null
-                                            && srcColumnName.toLowerCase().equals(srcColumn.toUpperCase())) {
-                                            targetColumn = targetColumnName;
-                                        }
-                                    }
-                                }
-                                columnsMap.put(targetColumn, srcColumn);
-                            }
-                            dbMapping.setAllColumns(columnsMap);
-                        }
-                    }
-                }
-            } catch (SQLException e) {
-                logger.error(e.getMessage(), e);
-            }
-        });
-
         rdbSyncService = new RdbSyncService(dataSource);
     }
 

+ 8 - 8
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java

@@ -79,7 +79,7 @@ public class MappingConfig {
         private int                          readBatch   = 5000;
         private int                          commitBatch = 5000;                  // etl等批量提交大小
 
-        private volatile Map<String, String> allColumns;                          // mapAll为true,自动设置改字段
+//        private volatile Map<String, String> allColumns;                          // mapAll为true,自动设置改字段
 
         public String getDatabase() {
             return database;
@@ -161,12 +161,12 @@ public class MappingConfig {
             this.commitBatch = commitBatch;
         }
 
-        public Map<String, String> getAllColumns() {
-            return allColumns;
-        }
-
-        public void setAllColumns(Map<String, String> allColumns) {
-            this.allColumns = allColumns;
-        }
+//        public Map<String, String> getAllColumns() {
+//            return allColumns;
+//        }
+//
+//        public void setAllColumns(Map<String, String> allColumns) {
+//            this.allColumns = allColumns;
+//        }
     }
 }

+ 14 - 11
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java

@@ -13,6 +13,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import javax.sql.DataSource;
 
+import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,8 +92,8 @@ public class RdbEtlService {
                     } else {
                         sqlFinal = sql + " LIMIT " + offset + "," + cnt;
                     }
-                    Future<Boolean> future = executor.submit(
-                        () -> executeSqlImport(params, srcDS, targetDS, sqlFinal, dbMapping, successCount, errMsg));
+                    Future<Boolean> future = executor
+                        .submit(() -> executeSqlImport(srcDS, targetDS, sqlFinal, dbMapping, successCount, errMsg));
                     futures.add(future);
                 }
 
@@ -102,7 +103,7 @@ public class RdbEtlService {
 
                 executor.shutdown();
             } else {
-                executeSqlImport(params, srcDS, targetDS, sql.toString(), dbMapping, successCount, errMsg);
+                executeSqlImport(srcDS, targetDS, sql.toString(), dbMapping, successCount, errMsg);
             }
 
             logger.info(
@@ -160,8 +161,8 @@ public class RdbEtlService {
     /**
      * 执行导入
      */
-    private static boolean executeSqlImport(List<String> params, DataSource srcDS, DataSource targetDS, String sql,
-                                            DbMapping dbMapping, AtomicLong successCount, List<String> errMsg) {
+    private static boolean executeSqlImport(DataSource srcDS, DataSource targetDS, String sql, DbMapping dbMapping,
+                                            AtomicLong successCount, List<String> errMsg) {
         try {
             Util.sqlRS(srcDS, sql, rs -> {
                 int idx = 1;
@@ -172,16 +173,18 @@ public class RdbEtlService {
                     Map<String, Integer> columnType = new LinkedHashMap<>();
                     ResultSetMetaData rsd = rs.getMetaData();
                     int columnCount = rsd.getColumnCount();
+                    List<String> columns = new ArrayList<>();
                     for (int i = 1; i <= columnCount; i++) {
                         columnType.put(rsd.getColumnName(i).toLowerCase(), rsd.getColumnType(i));
+                        columns.add(rsd.getColumnName(i));
                     }
 
-                    Map<String, String> columnsMap;
-                    if (dbMapping.isMapAll()) {
-                        columnsMap = dbMapping.getAllColumns();
-                    } else {
-                        columnsMap = dbMapping.getTargetColumns();
-                    }
+                    Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, columns);
+                    // if (dbMapping.isMapAll()) {
+                    // columnsMap = dbMapping.getAllColumns();
+                    // } else {
+                    // columnsMap = dbMapping.getTargetColumns();
+                    // }
 
                     StringBuilder insertSql = new StringBuilder();
                     insertSql.append("INSERT INTO ").append(dbMapping.getTargetTable()).append(" (");

+ 5 - 15
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -10,11 +10,9 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
 
 import javax.sql.DataSource;
 
-import com.alibaba.otter.canal.client.adapter.support.Util;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -23,8 +21,9 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping;
-import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.Util;
 
 /**
  * RDB同步操作业务
@@ -84,12 +83,7 @@ public class RdbSyncService {
         Connection conn = dataSource.getConnection();
         conn.setAutoCommit(false);
         try {
-            Map<String, String> columnsMap;
-            if (dbMapping.isMapAll()) {
-                columnsMap = dbMapping.getAllColumns();
-            } else {
-                columnsMap = dbMapping.getTargetColumns();
-            }
+            Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data.get(0));
 
             StringBuilder insertSql = new StringBuilder();
             insertSql.append("INSERT INTO ").append(dbMapping.getTargetTable()).append(" (");
@@ -181,12 +175,7 @@ public class RdbSyncService {
         conn.setAutoCommit(false);
 
         try {
-            Map<String, String> columnsMap;
-            if (dbMapping.isMapAll()) {
-                columnsMap = dbMapping.getAllColumns();
-            } else {
-                columnsMap = dbMapping.getTargetColumns();
-            }
+            Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data.get(0));
 
             Map<String, Integer> ctype = getTargetColumnType(conn, config);
 
@@ -525,4 +514,5 @@ public class RdbSyncService {
             logger.error(e.getMessage(), e);
         }
     }
+
 }

+ 39 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java

@@ -0,0 +1,39 @@
+package com.alibaba.otter.canal.client.adapter.rdb.support;
+
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+
+public class SyncUtil {
+
+    public static Map<String, String> getColumnsMap(MappingConfig.DbMapping dbMapping, Map<String, Object> data) {
+        return getColumnsMap(dbMapping, data.keySet());
+    }
+
+    public static Map<String, String> getColumnsMap(MappingConfig.DbMapping dbMapping, Collection<String> columns) {
+        Map<String, String> columnsMap;
+        if (dbMapping.isMapAll()) {
+            columnsMap = new LinkedHashMap<>();
+            for (String srcColumn : columns) {
+                boolean flag = true;
+                if (dbMapping.getTargetColumns() != null) {
+                    for (Map.Entry<String, String> entry : dbMapping.getTargetColumns().entrySet()) {
+                        if (srcColumn.equals(entry.getValue())) {
+                            columnsMap.put(entry.getKey(), srcColumn);
+                            flag = false;
+                            break;
+                        }
+                    }
+                }
+                if (flag) {
+                    columnsMap.put(srcColumn, srcColumn);
+                }
+            }
+        } else {
+            columnsMap = dbMapping.getTargetColumns();
+        }
+        return columnsMap;
+    }
+}