Jelajahi Sumber

fixed format

agapple 5 tahun lalu
induk
melakukan
afbf125231
13 mengubah file dengan 251 tambahan dan 254 penghapusan
  1. 30 28
      client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/KuduAdapter.java
  2. 21 20
      client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/config/KuduMappingConfig.java
  3. 12 7
      client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/config/KuduMappingConfigLoader.java
  4. 32 24
      client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/monitor/KuduConfigMonitor.java
  5. 60 54
      client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/service/KuduEtlService.java
  6. 20 15
      client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/service/KuduSyncService.java
  7. 41 27
      client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/support/KuduTemplate.java
  8. 4 3
      client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/support/SyncUtil.java
  9. 6 6
      client-adapter/kudu/src/test/java/com/alibaba/otter/canal/client/adapter/kudu/test/KuduConnectionTest.java
  10. 6 22
      client-adapter/kudu/src/test/java/com/alibaba/otter/canal/client/adapter/kudu/test/TestConfig.java
  11. 5 5
      client-adapter/kudu/src/test/java/com/alibaba/otter/canal/client/adapter/kudu/test/TestConstant.java
  12. 4 21
      client-adapter/kudu/src/test/java/com/alibaba/otter/canal/client/adapter/kudu/test/sync/Common.java
  13. 10 22
      client-adapter/kudu/src/test/java/com/alibaba/otter/canal/client/adapter/kudu/test/sync/TestSyncKudu.java

+ 30 - 28
client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/KuduAdapter.java

@@ -1,5 +1,16 @@
 package com.alibaba.otter.canal.client.adapter.kudu;
 
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.kudu.config.KuduMappingConfig;
 import com.alibaba.otter.canal.client.adapter.kudu.config.KuduMappingConfigLoader;
@@ -11,12 +22,6 @@ import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.client.adapter.support.EtlResult;
 import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
 import com.alibaba.otter.canal.client.adapter.support.SPI;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * @author liuyadong
@@ -24,20 +29,21 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 @SPI("kudu")
 public class KuduAdapter implements OuterAdapter {
-    private static Logger logger = LoggerFactory.getLogger(KuduAdapter.class);
 
-    private Map<String, KuduMappingConfig> kuduMapping = new ConcurrentHashMap<>();                                    // 文件名对应配置
-    private Map<String, Map<String, KuduMappingConfig>> mappingConfigCache = new ConcurrentHashMap<>();                // 库名-表名对应配置
+    private static Logger                               logger             = LoggerFactory.getLogger(KuduAdapter.class);
+
+    private Map<String, KuduMappingConfig>              kuduMapping        = new ConcurrentHashMap<>();                 // 文件名对应配置
+    private Map<String, Map<String, KuduMappingConfig>> mappingConfigCache = new ConcurrentHashMap<>();                 // 库名-表名对应配置
 
-    private String dataSourceKey;
+    private String                                      dataSourceKey;
 
-    private KuduTemplate kuduTemplate;
+    private KuduTemplate                                kuduTemplate;
 
-    private KuduSyncService kuduSyncService;
+    private KuduSyncService                             kuduSyncService;
 
-    private KuduConfigMonitor kuduConfigMonitor;
+    private KuduConfigMonitor                           kuduConfigMonitor;
 
-    private Properties envProperties;
+    private Properties                                  envProperties;
 
     public Map<String, KuduMappingConfig> getKuduMapping() {
         return kuduMapping;
@@ -47,7 +53,6 @@ public class KuduAdapter implements OuterAdapter {
         return mappingConfigCache;
     }
 
-
     @Override
     public void init(OuterAdapterConfig configuration, Properties envProperties) {
         this.envProperties = envProperties;
@@ -55,13 +60,13 @@ public class KuduAdapter implements OuterAdapter {
         // 过滤不匹配的key的配置,获取连接key,key为配置文件名称
         kuduMappingTmp.forEach((key, mappingConfig) -> {
             if ((mappingConfig.getOuterAdapterKey() == null && configuration.getKey() == null)
-                    || (mappingConfig.getOuterAdapterKey() != null && mappingConfig.getOuterAdapterKey()
+                || (mappingConfig.getOuterAdapterKey() != null && mappingConfig.getOuterAdapterKey()
                     .equalsIgnoreCase(configuration.getKey()))) {
                 kuduMapping.put(key, mappingConfig);
                 dataSourceKey = mappingConfig.getDataSourceKey();
             }
         });
-        //判断目标字段是否为空
+        // 判断目标字段是否为空
         if (kuduMapping.isEmpty()) {
             throw new RuntimeException("No kudu adapter found for config key: " + configuration.getKey());
         }
@@ -71,16 +76,14 @@ public class KuduAdapter implements OuterAdapter {
             String k;
             if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
                 k = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "-"
-                        + StringUtils.trimToEmpty(mappingConfig.getGroupId()) + "_"
-                        + mappingConfig.getKuduMapping().getDatabase() + "-"
-                        + mappingConfig.getKuduMapping().getTable();
+                    + StringUtils.trimToEmpty(mappingConfig.getGroupId()) + "_"
+                    + mappingConfig.getKuduMapping().getDatabase() + "-" + mappingConfig.getKuduMapping().getTable();
             } else {
                 k = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "_"
-                        + mappingConfig.getKuduMapping().getDatabase() + "-"
-                        + mappingConfig.getKuduMapping().getTable();
+                    + mappingConfig.getKuduMapping().getDatabase() + "-" + mappingConfig.getKuduMapping().getTable();
             }
             Map<String, KuduMappingConfig> configMap = mappingConfigCache.computeIfAbsent(k,
-                    k1 -> new ConcurrentHashMap<>());
+                k1 -> new ConcurrentHashMap<>());
             configMap.put(configName, mappingConfig);
         }
         Map<String, String> properties = configuration.getProperties();
@@ -108,9 +111,7 @@ public class KuduAdapter implements OuterAdapter {
             String table = dml.getTable();
             Map<String, KuduMappingConfig> configMap;
             if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
-                configMap = mappingConfigCache.get(destination + "-"
-                        + groupId + "_"
-                        + database + "-" + table);
+                configMap = mappingConfigCache.get(destination + "-" + groupId + "_" + database + "-" + table);
             } else {
                 configMap = mappingConfigCache.get(destination + "_" + database + "-" + table);
             }
@@ -131,7 +132,8 @@ public class KuduAdapter implements OuterAdapter {
                     logger.error("groupID didn't mach,please check your gruopId ");
                 }
             } else {
-                logger.error("{} config didn't get,please check your map key ", destination + "_" + database + "-" + table);
+                logger.error("{} config didn't get,please check your map key ", destination + "_" + database + "-"
+                                                                                + table);
             }
         }
     }
@@ -141,7 +143,7 @@ public class KuduAdapter implements OuterAdapter {
         if (kuduConfigMonitor != null) {
             kuduConfigMonitor.destroy();
         }
-        //加入kudu client 关闭钩子
+        // 加入kudu client 关闭钩子
         kuduTemplate.closeKuduClient();
     }
 

+ 21 - 20
client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/config/KuduMappingConfig.java

@@ -1,27 +1,29 @@
 package com.alibaba.otter.canal.client.adapter.kudu.config;
 
-import com.alibaba.otter.canal.client.adapter.support.AdapterConfig;
-import org.apache.commons.lang.StringUtils;
-
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.apache.commons.lang.StringUtils;
+
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfig;
+
 /**
  * @author liuyadong
  * @description kudu配置文件映射
  */
 public class KuduMappingConfig implements AdapterConfig {
-    private String dataSourceKey;      // 数据源key
 
-    private String destination;        // canal实例或MQ的topic
+    private String      dataSourceKey;     // 数据源key
 
-    private String groupId;            // groupId
+    private String      destination;       // canal实例或MQ的topic
 
-    private String outerAdapterKey;    // 对应适配器的key
+    private String      groupId;           // groupId
 
-    private boolean concurrent = false; // 是否并行同步
+    private String      outerAdapterKey;   // 对应适配器的key
 
-    private KuduMapping kuduMapping;          // db映射配置
+    private boolean     concurrent = false; // 是否并行同步
+
+    private KuduMapping kuduMapping;       // db映射配置
 
     @Override
     public String getDataSourceKey() {
@@ -77,7 +79,6 @@ public class KuduMappingConfig implements AdapterConfig {
         this.destination = destination;
     }
 
-
     public void validate() {
         if (kuduMapping.database == null || kuduMapping.database.isEmpty()) {
             throw new NullPointerException("KuduMapping.database");
@@ -86,18 +87,18 @@ public class KuduMappingConfig implements AdapterConfig {
 
     public static class KuduMapping implements AdapterMapping {
 
-        private String database;                            // 数据库名或schema名
-        private String table;                               // 表名
-        private Map<String, String> targetPk = new LinkedHashMap<>(); // 目标表主键字段
-        private boolean mapAll = false;                 // 映射所有字段
-        private String targetDb;                            // 目标库名
-        private String targetTable;                         // 目标表名
-        private Map<String, String> targetColumns;                       // 目标表字段映射
+        private String              database;                           // 数据库名或schema名
+        private String              table;                              // 表名
+        private Map<String, String> targetPk    = new LinkedHashMap<>(); // 目标表主键字段
+        private boolean             mapAll      = false;                // 映射所有字段
+        private String              targetDb;                           // 目标库名
+        private String              targetTable;                        // 目标表名
+        private Map<String, String> targetColumns;                      // 目标表字段映射
 
-        private String etlCondition;                        // etl条件sql
+        private String              etlCondition;                       // etl条件sql
 
-        private int readBatch = 5000;
-        private int commitBatch = 5000;                  // etl等批量提交大小
+        private int                 readBatch   = 5000;
+        private int                 commitBatch = 5000;                 // etl等批量提交大小
 
         private Map<String, String> allMapColumns;
 

+ 12 - 7
client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/config/KuduMappingConfigLoader.java

@@ -1,19 +1,21 @@
 package com.alibaba.otter.canal.client.adapter.kudu.config;
 
-import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
-import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Properties;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
+import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
+
 /**
  * @author liuyadong
  * @description kudu表信息加载
  */
 public class KuduMappingConfigLoader {
+
     private static Logger logger = LoggerFactory.getLogger(KuduMappingConfigLoader.class);
 
     /**
@@ -28,8 +30,11 @@ public class KuduMappingConfigLoader {
 
         Map<String, String> configContentMap = MappingConfigsLoader.loadConfigs("kudu");
         configContentMap.forEach((fileName, content) -> {
-            KuduMappingConfig config = YmlConfigBinder
-                    .bindYmlToObj(null, content, KuduMappingConfig.class, null, envProperties);
+            KuduMappingConfig config = YmlConfigBinder.bindYmlToObj(null,
+                content,
+                KuduMappingConfig.class,
+                null,
+                envProperties);
             if (config == null) {
                 return;
             }

+ 32 - 24
client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/monitor/KuduConfigMonitor.java

@@ -1,10 +1,10 @@
 package com.alibaba.otter.canal.client.adapter.kudu.monitor;
 
-import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
-import com.alibaba.otter.canal.client.adapter.kudu.KuduAdapter;
-import com.alibaba.otter.canal.client.adapter.kudu.config.KuduMappingConfig;
-import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
-import com.alibaba.otter.canal.client.adapter.support.Util;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
 import org.apache.commons.io.filefilter.FileFilterUtils;
 import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
 import org.apache.commons.io.monitor.FileAlterationMonitor;
@@ -13,23 +13,25 @@ import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
+import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
+import com.alibaba.otter.canal.client.adapter.kudu.KuduAdapter;
+import com.alibaba.otter.canal.client.adapter.kudu.config.KuduMappingConfig;
+import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
+import com.alibaba.otter.canal.client.adapter.support.Util;
 
 /**
  * @author liuyadong
  * @description 配置文件监听
  */
 public class KuduConfigMonitor {
-    private static final Logger logger = LoggerFactory.getLogger(KuduConfigMonitor.class);
 
-    private static final String adapterName = "kudu";
+    private static final Logger   logger      = LoggerFactory.getLogger(KuduConfigMonitor.class);
+
+    private static final String   adapterName = "kudu";
 
-    private KuduAdapter kuduAdapter;
+    private KuduAdapter           kuduAdapter;
 
-    private Properties envProperties;
+    private Properties            envProperties;
 
     private FileAlterationMonitor fileMonitor;
 
@@ -39,7 +41,7 @@ public class KuduConfigMonitor {
         File confDir = Util.getConfDirPath(adapterName);
         try {
             FileAlterationObserver observer = new FileAlterationObserver(confDir,
-                    FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter("yml")));
+                FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter("yml")));
             FileListener listener = new FileListener();
             observer.addListener(listener);
             fileMonitor = new FileAlterationMonitor(3000, observer);
@@ -64,6 +66,7 @@ public class KuduConfigMonitor {
      * 配置文件监听
      */
     private class FileListener extends FileAlterationListenerAdaptor {
+
         @Override
         public void onFileCreate(File file) {
             super.onFileCreate(file);
@@ -71,8 +74,11 @@ public class KuduConfigMonitor {
             try {
                 // 加载新增的配置文件
                 String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
-                KuduMappingConfig config = YmlConfigBinder
-                        .bindYmlToObj(null, configContent, KuduMappingConfig.class, null, envProperties);
+                KuduMappingConfig config = YmlConfigBinder.bindYmlToObj(null,
+                    configContent,
+                    KuduMappingConfig.class,
+                    null,
+                    envProperties);
                 if (config == null) {
                     return;
                 }
@@ -92,14 +98,17 @@ public class KuduConfigMonitor {
             try {
                 if (kuduAdapter.getKuduMapping().containsKey(file.getName())) {
                     // 加载配置文件
-                    String configContent = MappingConfigsLoader
-                            .loadConfig(adapterName + File.separator + file.getName());
+                    String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator
+                                                                           + file.getName());
                     if (configContent == null) {
                         onFileDelete(file);
                         return;
                     }
-                    KuduMappingConfig config = YmlConfigBinder
-                            .bindYmlToObj(null, configContent, KuduMappingConfig.class, null, envProperties);
+                    KuduMappingConfig config = YmlConfigBinder.bindYmlToObj(null,
+                        configContent,
+                        KuduMappingConfig.class,
+                        null,
+                        envProperties);
                     if (config == null) {
                         return;
                     }
@@ -114,7 +123,6 @@ public class KuduConfigMonitor {
             }
         }
 
-
         @Override
         public void onFileDelete(File file) {
             super.onFileDelete(file);
@@ -138,9 +146,9 @@ public class KuduConfigMonitor {
         private void addConfigToCache(File file, KuduMappingConfig config) {
             kuduAdapter.getKuduMapping().put(file.getName(), config);
             Map<String, KuduMappingConfig> configMap = kuduAdapter.getMappingConfigCache()
-                    .computeIfAbsent(StringUtils.trimToEmpty(config.getDestination()) + "."
-                                    + config.getKuduMapping().getDatabase() + "." + config.getKuduMapping().getTable(),
-                            k1 -> new HashMap<>());
+                .computeIfAbsent(StringUtils.trimToEmpty(config.getDestination()) + "."
+                                 + config.getKuduMapping().getDatabase() + "." + config.getKuduMapping().getTable(),
+                    k1 -> new HashMap<>());
             configMap.put(file.getName(), config);
         }
 

+ 60 - 54
client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/service/KuduEtlService.java

@@ -1,5 +1,18 @@
 package com.alibaba.otter.canal.client.adapter.kudu.service;
 
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.sql.DataSource;
+
+import org.apache.kudu.client.KuduException;
+
 import com.alibaba.otter.canal.client.adapter.kudu.config.KuduMappingConfig;
 import com.alibaba.otter.canal.client.adapter.kudu.support.KuduTemplate;
 import com.alibaba.otter.canal.client.adapter.kudu.support.SyncUtil;
@@ -8,13 +21,6 @@ import com.alibaba.otter.canal.client.adapter.support.AdapterConfig;
 import com.alibaba.otter.canal.client.adapter.support.EtlResult;
 import com.alibaba.otter.canal.client.adapter.support.Util;
 import com.google.common.base.Joiner;
-import org.apache.kudu.client.KuduException;
-
-import javax.sql.DataSource;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * @author liuyadong
@@ -22,10 +28,10 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class KuduEtlService extends AbstractEtlService {
 
-    private KuduTemplate kuduTemplate;
+    private KuduTemplate      kuduTemplate;
     private KuduMappingConfig config;
 
-    public KuduEtlService(KuduTemplate kuduTemplate, KuduMappingConfig config) {
+    public KuduEtlService(KuduTemplate kuduTemplate, KuduMappingConfig config){
         super("kudu", config);
         this.kuduTemplate = kuduTemplate;
         this.config = config;
@@ -37,7 +43,7 @@ public class KuduEtlService extends AbstractEtlService {
 
         KuduMappingConfig.KuduMapping kuduMapping = config.getKuduMapping();
         boolean flag = kuduTemplate.tableExists(kuduMapping.getTargetTable());
-        //表不存在,停止导入
+        // 表不存在,停止导入
         if (!flag) {
             logger.info("{} is don't hava,please check your kudu table !", kuduMapping.getTargetTable());
             errMsg.add(kuduMapping.getTargetTable() + " is don't hava,please check your kudu table !");
@@ -49,12 +55,12 @@ public class KuduEtlService extends AbstractEtlService {
         return importData(sql, params);
     }
 
-
     @Override
-    protected boolean executeSqlImport(DataSource ds, String sql, List<Object> values, AdapterConfig.AdapterMapping mapping, AtomicLong impCount, List<String> errMsg) {
+    protected boolean executeSqlImport(DataSource ds, String sql, List<Object> values,
+                                       AdapterConfig.AdapterMapping mapping, AtomicLong impCount, List<String> errMsg) {
         KuduMappingConfig.KuduMapping kuduMapping = (KuduMappingConfig.KuduMapping) mapping;
-        //获取字段元数据
-        Map<String, String> columnsMap = new LinkedHashMap<>();//需要同步的字段
+        // 获取字段元数据
+        Map<String, String> columnsMap = new LinkedHashMap<>();// 需要同步的字段
 
         try {
             Util.sqlRS(ds, "SELECT * FROM " + SyncUtil.getDbTableName(kuduMapping) + " LIMIT 1", rs -> {
@@ -72,7 +78,7 @@ public class KuduEtlService extends AbstractEtlService {
                     return false;
                 }
             });
-            //写入数据
+            // 写入数据
             logger.info("etl select data sql is :{}", sql);
             Util.sqlRS(ds, sql, values, rs -> {
                 int idx = 1;
@@ -81,46 +87,46 @@ public class KuduEtlService extends AbstractEtlService {
                     while (rs.next()) {
                         Map<String, Object> data = new HashMap<>();
                         for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
-                            String mysqlColumnName = entry.getKey();//mysql字段名
-                            String kuduColumnName = entry.getValue();//kudu字段名
-                            if (kuduColumnName == null) {
-                                kuduColumnName = mysqlColumnName;
-                            }
-                            Object value = rs.getObject(kuduColumnName);
-                            if (value != null) {
-                                data.put(kuduColumnName, value);
-                            } else {
-                                data.put(kuduColumnName, null);
-                            }
-                        }
-                        dataList.add(data);
-                        idx++;
-                        impCount.incrementAndGet();
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("successful import count:" + impCount.get());
-                        }
-                        if (idx % kuduMapping.getCommitBatch() == 0) {
-                            kuduTemplate.upsert(kuduMapping.getTargetTable(), dataList);
-                            dataList.clear();
-                        }
-                    }
-                    if (!dataList.isEmpty()) {
-                        kuduTemplate.upsert(kuduMapping.getTargetTable(), dataList);
-                    }
-                    return true;
-
-                } catch (SQLException e) {
-                    e.printStackTrace();
-                    logger.error(kuduMapping.getTargetTable() + " etl failed! ==>" + e.getMessage(), e);
-                    errMsg.add(kuduMapping.getTargetTable() + " etl failed! ==>" + e.getMessage());
-                    return false;
-                } catch (KuduException e) {
-                    e.printStackTrace();
-                    logger.error(kuduMapping.getTargetTable() + " etl failed! ==>" + e.getMessage(), e);
-                    errMsg.add(kuduMapping.getTargetTable() + " etl failed! ==>" + e.getMessage());
-                    return false;
+                            String mysqlColumnName = entry.getKey();// mysql字段名
+                String kuduColumnName = entry.getValue();// kudu字段名
+                if (kuduColumnName == null) {
+                    kuduColumnName = mysqlColumnName;
                 }
-            });
+                Object value = rs.getObject(kuduColumnName);
+                if (value != null) {
+                    data.put(kuduColumnName, value);
+                } else {
+                    data.put(kuduColumnName, null);
+                }
+            }
+            dataList.add(data);
+            idx++;
+            impCount.incrementAndGet();
+            if (logger.isDebugEnabled()) {
+                logger.debug("successful import count:" + impCount.get());
+            }
+            if (idx % kuduMapping.getCommitBatch() == 0) {
+                kuduTemplate.upsert(kuduMapping.getTargetTable(), dataList);
+                dataList.clear();
+            }
+        }
+        if (!dataList.isEmpty()) {
+            kuduTemplate.upsert(kuduMapping.getTargetTable(), dataList);
+        }
+        return true;
+
+    } catch (SQLException e) {
+        e.printStackTrace();
+        logger.error(kuduMapping.getTargetTable() + " etl failed! ==>" + e.getMessage(), e);
+        errMsg.add(kuduMapping.getTargetTable() + " etl failed! ==>" + e.getMessage());
+        return false;
+    } catch (KuduException e) {
+        e.printStackTrace();
+        logger.error(kuduMapping.getTargetTable() + " etl failed! ==>" + e.getMessage(), e);
+        errMsg.add(kuduMapping.getTargetTable() + " etl failed! ==>" + e.getMessage());
+        return false;
+    }
+}           );
             return true;
         } catch (Exception e) {
             logger.error(e.getMessage(), e);

+ 20 - 15
client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/service/KuduSyncService.java

@@ -1,36 +1,42 @@
 package com.alibaba.otter.canal.client.adapter.kudu.service;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kudu.client.KuduException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.client.adapter.kudu.config.KuduMappingConfig;
 import com.alibaba.otter.canal.client.adapter.kudu.support.KuduTemplate;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
-import org.apache.kudu.client.KuduException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * @author liuyadong
  * @description kudu实时同步
  */
 public class KuduSyncService {
+
     private static Logger logger = LoggerFactory.getLogger(KuduSyncService.class);
 
-    private KuduTemplate kuduTemplate;
+    private KuduTemplate  kuduTemplate;
 
     // 源库表字段类型缓存: instance.schema.table -> <columnName, jdbcType>
-//    private Map<String, Map<String, Integer>> columnsTypeCache = new ConcurrentHashMap<>();
+    // private Map<String, Map<String, Integer>> columnsTypeCache = new
+    // ConcurrentHashMap<>();
 
-    public KuduSyncService(KuduTemplate kuduTemplate) {
+    public KuduSyncService(KuduTemplate kuduTemplate){
         this.kuduTemplate = kuduTemplate;
     }
 
-//    public Map<String, Map<String, Integer>> getColumnsTypeCache() {
-//        return columnsTypeCache;
-//    }
+    // public Map<String, Map<String, Integer>> getColumnsTypeCache() {
+    // return columnsTypeCache;
+    // }
 
     /**
      * 同步事件处理
@@ -71,7 +77,7 @@ public class KuduSyncService {
             if (data == null || data.isEmpty()) {
                 return;
             }
-            //判定主键映射
+            // 判定主键映射
             String pkId = "";
             Map<String, String> targetPk = kuduMapping.getTargetPk();
             for (Map.Entry<String, String> entry : targetPk.entrySet()) {
@@ -83,7 +89,7 @@ public class KuduSyncService {
                     pkId = kuduID;
                 }
             }
-            //切割联合主键
+            // 切割联合主键
             List<String> pkIds = Arrays.asList(pkId.split(","));
             try {
                 int idx = 1;
@@ -157,7 +163,6 @@ public class KuduSyncService {
             }
         }
 
-
     }
 
     /**

+ 41 - 27
client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/support/KuduTemplate.java

@@ -1,33 +1,47 @@
 package com.alibaba.otter.canal.client.adapter.kudu.support;
 
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
-import org.apache.kudu.client.*;
+import org.apache.kudu.client.Delete;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.OperationResponse;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.SessionConfiguration;
+import org.apache.kudu.client.Upsert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.*;
-
 /**
  * @author liuyadong
  * @description kudu 操作工具类
  */
 public class KuduTemplate {
 
-    private Logger logger = LoggerFactory.getLogger(this.getClass());
+    private Logger           logger          = LoggerFactory.getLogger(this.getClass());
 
-    private KuduClient kuduClient;
-    private String masters;
+    private KuduClient       kuduClient;
+    private String           masters;
 
     private final static int OPERATION_BATCH = 500;
 
-    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    private SimpleDateFormat sdf             = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
-    public KuduTemplate(String master_str) {
+    public KuduTemplate(String master_str){
         this.masters = master_str;
         checkClient();
     }
@@ -37,12 +51,12 @@ public class KuduTemplate {
      */
     private void checkClient() {
         if (kuduClient == null) {
-            //kudu master 以逗号分隔
+            // kudu master 以逗号分隔
             List<String> masterList = Arrays.asList(masters.split(","));
-            kuduClient = new KuduClient.KuduClientBuilder(masterList)
-                    .defaultOperationTimeoutMs(60000)
-                    .defaultSocketReadTimeoutMs(60000)
-                    .defaultAdminOperationTimeoutMs(60000).build();
+            kuduClient = new KuduClient.KuduClientBuilder(masterList).defaultOperationTimeoutMs(60000)
+                .defaultSocketReadTimeoutMs(60000)
+                .defaultAdminOperationTimeoutMs(60000)
+                .build();
         }
     }
 
@@ -76,7 +90,7 @@ public class KuduTemplate {
         try {
             session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
             session.setMutationBufferSpace(OPERATION_BATCH);
-            //获取元数据结构
+            // 获取元数据结构
             Map<String, Type> metaMap = new HashMap<>();
             Schema schema = kuduTable.getSchema();
             for (ColumnSchema columnSchema : schema.getColumns()) {
@@ -92,7 +106,7 @@ public class KuduTemplate {
                     String name = entry.getKey().toLowerCase();
                     Type type = metaMap.get(name);
                     Object value = entry.getValue();
-                    fillRow(row, name, value, type); //填充行数据
+                    fillRow(row, name, value, type); // 填充行数据
                 }
                 session.apply(delete);
                 // 对于手工提交, 需要buffer在未满的时候flush,这里采用了buffer一半时即提交
@@ -142,7 +156,7 @@ public class KuduTemplate {
         try {
             session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
             session.setMutationBufferSpace(OPERATION_BATCH);
-            //获取元数据结构
+            // 获取元数据结构
             Map<String, Type> metaMap = new HashMap<>();
             Schema schema = kuduTable.getSchema();
             for (ColumnSchema columnSchema : schema.getColumns()) {
@@ -158,7 +172,7 @@ public class KuduTemplate {
                     String name = entry.getKey().toLowerCase();
                     Type type = metaMap.get(name);
                     Object value = entry.getValue();
-                    fillRow(row, name, value, type); //填充行数据
+                    fillRow(row, name, value, type); // 填充行数据
                 }
                 session.apply(upsert);
                 // 对于手工提交, 需要buffer在未满的时候flush,这里采用了buffer一半时即提交
@@ -192,7 +206,6 @@ public class KuduTemplate {
             }
         }
 
-
     }
 
     /**
@@ -205,11 +218,12 @@ public class KuduTemplate {
     public void insert(String tableName, List<Map<String, Object>> dataList) throws KuduException {
         this.checkClient();
         KuduTable kuduTable = kuduClient.openTable(tableName);// 打开表
-        KuduSession session = kuduClient.newSession();  // 创建写session,kudu必须通过session写入
+        KuduSession session = kuduClient.newSession(); // 创建写session,kudu必须通过session写入
         try {
-            session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH); // 采取Flush方式 手动刷新
+            session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH); // 采取Flush方式
+                                                                               // 手动刷新
             session.setMutationBufferSpace(OPERATION_BATCH);
-            //获取元数据结构
+            // 获取元数据结构
             Map<String, Type> metaMap = new HashMap<>();
             Schema schema = kuduTable.getSchema();
             for (ColumnSchema columnSchema : schema.getColumns()) {
@@ -225,7 +239,7 @@ public class KuduTemplate {
                     String name = entry.getKey().toLowerCase();
                     Type type = metaMap.get(name);
                     Object value = entry.getValue();
-                    fillRow(row, name, value, type); //填充行数据
+                    fillRow(row, name, value, type); // 填充行数据
                 }
                 session.apply(insert);
                 // 对于手工提交, 需要buffer在未满的时候flush,这里采用了buffer一半时即提交
@@ -272,9 +286,9 @@ public class KuduTemplate {
         long rowCount = 0L;
         try {
             KuduTable kuduTable = kuduClient.openTable(tableName);
-            //创建scanner扫描
+            // 创建scanner扫描
             KuduScanner scanner = kuduClient.newScannerBuilder(kuduTable).build();
-            //遍历数据
+            // 遍历数据
             while (scanner.hasMoreRows()) {
                 while (scanner.nextRows().hasNext()) {
                     rowCount++;

+ 4 - 3
client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/support/SyncUtil.java

@@ -1,12 +1,13 @@
 package com.alibaba.otter.canal.client.adapter.kudu.support;
 
-import com.alibaba.otter.canal.client.adapter.kudu.config.KuduMappingConfig;
-import org.apache.commons.lang.StringUtils;
-
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang.StringUtils;
+
+import com.alibaba.otter.canal.client.adapter.kudu.config.KuduMappingConfig;
+
 /**
  * @author liuyadong
  * @description 工具

+ 6 - 6
client-adapter/kudu/src/test/java/com/alibaba/otter/canal/client/adapter/kudu/test/KuduConnectionTest.java

@@ -1,27 +1,27 @@
 package com.alibaba.otter.canal.client.adapter.kudu.test;
 
+import java.util.Arrays;
+import java.util.List;
+
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduException;
 import org.apache.kudu.client.KuduTable;
 import org.junit.Test;
 
-import java.util.Arrays;
-import java.util.List;
-
 public class KuduConnectionTest {
 
     @Test
     public void test01() {
         List<String> masterList = Arrays.asList("10.6.36.102:7051,10.6.36.187:7051,10.6.36.229:7051".split(","));
         KuduClient kuduClient = new KuduClient.KuduClientBuilder(masterList).defaultOperationTimeoutMs(60000)
-                .defaultSocketReadTimeoutMs(30000).defaultAdminOperationTimeoutMs(60000).build();
+            .defaultSocketReadTimeoutMs(30000)
+            .defaultAdminOperationTimeoutMs(60000)
+            .build();
         try {
             List<String> tablesList = kuduClient.getTablesList().getTablesList();
             System.out.println(tablesList.toString());
             KuduTable web_white_list = kuduClient.openTable("web_white_list");
 
-
-
         } catch (KuduException e) {
             e.printStackTrace();
         }

+ 6 - 22
client-adapter/kudu/src/test/java/com/alibaba/otter/canal/client/adapter/kudu/test/TestConfig.java

@@ -1,36 +1,20 @@
 package com.alibaba.otter.canal.client.adapter.kudu.test;
 
-import com.alibaba.otter.canal.client.adapter.kudu.config.KuduMappingConfig;
-import com.alibaba.otter.canal.client.adapter.kudu.config.KuduMappingConfigLoader;
-import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import java.util.Map;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.Map;
+import com.alibaba.otter.canal.client.adapter.kudu.config.KuduMappingConfig;
+import com.alibaba.otter.canal.client.adapter.kudu.config.KuduMappingConfigLoader;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 
 /**
- * ━━━━━━神兽出没━━━━━━
- *    ┏┓   ┏┓
- *   ┏┛┻━━━┛┻┓
- *   ┃   ━   ┃
- *   ┃ ┳┛ ┗┳ ┃
- *   ┃   ┻   ┃
- *   ┗━┓   ┏━┛
- *     ┃   ┃  神兽保佑
- *     ┃   ┃  代码无bug
- *     ┃   ┗━━━┓
- *     ┃       ┣┓
- *     ┃       ┏┛
- *     ┗┓┓┏━┳┓┏┛
- *      ┃┫┫ ┃┫┫
- *      ┗┻┛ ┗┻┛
- * ━━━━━━感觉萌萌哒━━━━━━
- * Created by Liuyadong on 2019-11-13
- *
  * @description
  */
 public class TestConfig {
+
     @Before
     public void before() {
         // 加载数据源连接池

+ 5 - 5
client-adapter/kudu/src/test/java/com/alibaba/otter/canal/client/adapter/kudu/test/TestConstant.java

@@ -1,14 +1,14 @@
 package com.alibaba.otter.canal.client.adapter.kudu.test;
 
-import com.alibaba.druid.pool.DruidDataSource;
-
 import java.sql.SQLException;
 
+import com.alibaba.druid.pool.DruidDataSource;
+
 public class TestConstant {
 
-    public final static String    jdbcUrl      = "jdbc:mysql://10.0.9.5:3306/canal_manager?useUnicode=true";
-    public final static String    jdbcUser     = "axtest";
-    public final static String    jdbcPassword = "axtest123";
+    public final static String          jdbcUrl      = "jdbc:mysql://10.0.9.5:3306/canal_manager?useUnicode=true";
+    public final static String          jdbcUser     = "axtest";
+    public final static String          jdbcPassword = "axtest123";
 
     public final static DruidDataSource dataSource;
 

+ 4 - 21
client-adapter/kudu/src/test/java/com/alibaba/otter/canal/client/adapter/kudu/test/sync/Common.java

@@ -1,35 +1,18 @@
 package com.alibaba.otter.canal.client.adapter.kudu.test.sync;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import com.alibaba.otter.canal.client.adapter.kudu.KuduAdapter;
 import com.alibaba.otter.canal.client.adapter.kudu.test.TestConstant;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
 
-import java.util.HashMap;
-import java.util.Map;
-
 /**
- * ━━━━━━神兽出没━━━━━━
- *    ┏┓   ┏┓
- *   ┏┛┻━━━┛┻┓
- *   ┃   ━   ┃
- *   ┃ ┳┛ ┗┳ ┃
- *   ┃   ┻   ┃
- *   ┗━┓   ┏━┛
- *     ┃   ┃  神兽保佑
- *     ┃   ┃  代码无bug
- *     ┃   ┗━━━┓
- *     ┃       ┣┓
- *     ┃       ┏┛
- *     ┗┓┓┏━┳┓┏┛
- *      ┃┫┫ ┃┫┫
- *      ┗┻┛ ┗┻┛
- * ━━━━━━感觉萌萌哒━━━━━━
- * Created by Liuyadong on 2019-11-13
- *
  * @description
  */
 public class Common {
+
     public static KuduAdapter init() {
         DatasourceConfig.DATA_SOURCES.put("defaultDS", TestConstant.dataSource);
 

+ 10 - 22
client-adapter/kudu/src/test/java/com/alibaba/otter/canal/client/adapter/kudu/test/sync/TestSyncKudu.java

@@ -1,31 +1,19 @@
 package com.alibaba.otter.canal.client.adapter.kudu.test.sync;
 
-import com.alibaba.otter.canal.client.adapter.kudu.KuduAdapter;
-import com.alibaba.otter.canal.client.adapter.support.Dml;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.*;
+import com.alibaba.otter.canal.client.adapter.kudu.KuduAdapter;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
 
 /**
- * ━━━━━━神兽出没━━━━━━
- *    ┏┓   ┏┓
- *   ┏┛┻━━━┛┻┓
- *   ┃   ━   ┃
- *   ┃ ┳┛ ┗┳ ┃
- *   ┃   ┻   ┃
- *   ┗━┓   ┏━┛
- *     ┃   ┃  神兽保佑
- *     ┃   ┃  代码无bug
- *     ┃   ┗━━━┓
- *     ┃       ┣┓
- *     ┃       ┏┛
- *     ┗┓┓┏━┳┓┏┛
- *      ┃┫┫ ┃┫┫
- *      ┗┻┛ ┗┻┛
- * ━━━━━━感觉萌萌哒━━━━━━
- * Created by Liuyadong on 2019-11-13
- *
  * @description
  */
 public class TestSyncKudu {
@@ -44,7 +32,7 @@ public class TestSyncKudu {
     }
 
     @Test
-    public void testCount(){
+    public void testCount() {
         kuduAdapter.count("kudutest_user.yml");
     }