浏览代码

client-adapter 写入kudu (#2396)

* client-adapter 提交写入kudu适配器

* kudu etl 处理数据缓存提交问题

* Update KuduTemplate.java

* 1.写入kudu前获取元数据方式更改
2.解决etl批量导入kudu数据少量丢失问题

* 1.修改文件头注释
2.调整kudu更新事件update为upsert
agapple 5 年之前
父节点
当前提交
b2b2a82554
共有 20 个文件被更改,包括 1779 次插入1 次删除
  1. 87 0
      client-adapter/kudu/pom.xml
  2. 204 0
      client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/KuduAdapter.java
  3. 199 0
      client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/config/KuduMappingConfig.java
  4. 47 0
      client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/config/KuduMappingConfigLoader.java
  5. 162 0
      client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/monitor/KuduConfigMonitor.java
  6. 131 0
      client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/service/KuduEtlService.java
  7. 204 0
      client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/service/KuduSyncService.java
  8. 410 0
      client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/support/KuduTemplate.java
  9. 67 0
      client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/support/SyncUtil.java
  10. 1 0
      client-adapter/kudu/src/main/resources/META-INF.canal/com.alibaba.otter.canal.client.adapter.OuterAdapter
  11. 19 0
      client-adapter/kudu/src/main/resources/kudu/kudutest_user.yml
  12. 29 0
      client-adapter/kudu/src/test/java/com/alibaba/otter/canal/client/adapter/kudu/test/KuduConnectionTest.java
  13. 45 0
      client-adapter/kudu/src/test/java/com/alibaba/otter/canal/client/adapter/kudu/test/TestConfig.java
  14. 37 0
      client-adapter/kudu/src/test/java/com/alibaba/otter/canal/client/adapter/kudu/test/TestConstant.java
  15. 47 0
      client-adapter/kudu/src/test/java/com/alibaba/otter/canal/client/adapter/kudu/test/sync/Common.java
  16. 71 0
      client-adapter/kudu/src/test/java/com/alibaba/otter/canal/client/adapter/kudu/test/sync/TestSyncKudu.java
  17. 7 0
      client-adapter/launcher/src/main/assembly/dev.xml
  18. 7 0
      client-adapter/launcher/src/main/assembly/release.xml
  19. 4 1
      client-adapter/launcher/src/main/resources/application.yml
  20. 1 0
      client-adapter/pom.xml

+ 87 - 0
client-adapter/kudu/pom.xml

@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>canal.client-adapter</artifactId>
+        <groupId>com.alibaba.otter</groupId>
+        <version>1.1.5-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>com.alibaba.otter</groupId>
+    <artifactId>client-adapter.kudu</artifactId>
+    <packaging>jar</packaging>
+    <name>canal client adapter kudu module for otter ${project.version}</name>
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>client-adapter.common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kudu</groupId>
+            <artifactId>kudu-client</artifactId>
+            <version>1.6.0</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>5.1.47</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.4</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                        <configuration>
+                            <tasks>
+                                <copy todir="${project.basedir}/../launcher/target/classes/kudu" overwrite="true">
+                                    <fileset dir="${project.basedir}/target/classes/kudu" erroronmissingdir="true">
+                                        <include name="*.yml"/>
+                                    </fileset>
+                                </copy>
+                            </tasks>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

+ 204 - 0
client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/KuduAdapter.java

@@ -0,0 +1,204 @@
+package com.alibaba.otter.canal.client.adapter.kudu;
+
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.adapter.kudu.config.KuduMappingConfig;
+import com.alibaba.otter.canal.client.adapter.kudu.config.KuduMappingConfigLoader;
+import com.alibaba.otter.canal.client.adapter.kudu.monitor.KuduConfigMonitor;
+import com.alibaba.otter.canal.client.adapter.kudu.service.KuduEtlService;
+import com.alibaba.otter.canal.client.adapter.kudu.service.KuduSyncService;
+import com.alibaba.otter.canal.client.adapter.kudu.support.KuduTemplate;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author liuyadong
+ * @description kudu适配器主类
+ */
+@SPI("kudu")
+public class KuduAdapter implements OuterAdapter {
+    private static Logger logger = LoggerFactory.getLogger(KuduAdapter.class);
+
+    private Map<String, KuduMappingConfig> kuduMapping = new ConcurrentHashMap<>();                                    // 文件名对应配置
+    private Map<String, Map<String, KuduMappingConfig>> mappingConfigCache = new ConcurrentHashMap<>();                // 库名-表名对应配置
+
+    private String dataSourceKey;
+
+    private KuduTemplate kuduTemplate;
+
+    private KuduSyncService kuduSyncService;
+
+    private KuduConfigMonitor kuduConfigMonitor;
+
+    private Properties envProperties;
+
+    public Map<String, KuduMappingConfig> getKuduMapping() {
+        return kuduMapping;
+    }
+
+    public Map<String, Map<String, KuduMappingConfig>> getMappingConfigCache() {
+        return mappingConfigCache;
+    }
+
+
+    @Override
+    public void init(OuterAdapterConfig configuration, Properties envProperties) {
+        this.envProperties = envProperties;
+        Map<String, KuduMappingConfig> kuduMappingTmp = KuduMappingConfigLoader.load(envProperties);
+        // 过滤不匹配的key的配置,获取连接key,key为配置文件名称
+        kuduMappingTmp.forEach((key, mappingConfig) -> {
+            if ((mappingConfig.getOuterAdapterKey() == null && configuration.getKey() == null)
+                    || (mappingConfig.getOuterAdapterKey() != null && mappingConfig.getOuterAdapterKey()
+                    .equalsIgnoreCase(configuration.getKey()))) {
+                kuduMapping.put(key, mappingConfig);
+                dataSourceKey = mappingConfig.getDataSourceKey();
+            }
+        });
+        //判断目标字段是否为空
+        if (kuduMapping.isEmpty()) {
+            throw new RuntimeException("No kudu adapter found for config key: " + configuration.getKey());
+        }
+        for (Map.Entry<String, KuduMappingConfig> entry : kuduMapping.entrySet()) {
+            String configName = entry.getKey();
+            KuduMappingConfig mappingConfig = entry.getValue();
+            String k;
+            if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
+                k = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "-"
+                        + StringUtils.trimToEmpty(mappingConfig.getGroupId()) + "_"
+                        + mappingConfig.getKuduMapping().getDatabase() + "-"
+                        + mappingConfig.getKuduMapping().getTable();
+            } else {
+                k = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "_"
+                        + mappingConfig.getKuduMapping().getDatabase() + "-"
+                        + mappingConfig.getKuduMapping().getTable();
+            }
+            Map<String, KuduMappingConfig> configMap = mappingConfigCache.computeIfAbsent(k,
+                    k1 -> new ConcurrentHashMap<>());
+            configMap.put(configName, mappingConfig);
+        }
+        Map<String, String> properties = configuration.getProperties();
+
+        String kudu_master = properties.get("kudu.master.address");
+        kuduTemplate = new KuduTemplate(kudu_master);
+        kuduSyncService = new KuduSyncService(kuduTemplate);
+
+        kuduConfigMonitor = new KuduConfigMonitor();
+        kuduConfigMonitor.init(this, envProperties);
+    }
+
+    @Override
+    public void sync(List<Dml> dmls) {
+        if (dmls == null || dmls.isEmpty()) {
+            return;
+        }
+        for (Dml dml : dmls) {
+            if (dml == null) {
+                return;
+            }
+            String destination = StringUtils.trimToEmpty(dml.getDestination());
+            String groupId = StringUtils.trimToEmpty(dml.getGroupId());
+            String database = dml.getDatabase();
+            String table = dml.getTable();
+            Map<String, KuduMappingConfig> configMap;
+            if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
+                configMap = mappingConfigCache.get(destination + "-"
+                        + groupId + "_"
+                        + database + "-" + table);
+            } else {
+                configMap = mappingConfigCache.get(destination + "_" + database + "-" + table);
+            }
+            if (configMap != null) {
+                List<KuduMappingConfig> configs = new ArrayList<>();
+                configMap.values().forEach(config -> {
+                    if (StringUtils.isNotEmpty(config.getGroupId())) {
+                        if (config.getGroupId().equals(dml.getGroupId())) {
+                            configs.add(config);
+                        }
+                    } else {
+                        configs.add(config);
+                    }
+                });
+                if (!configs.isEmpty()) {
+                    configs.forEach(config -> kuduSyncService.sync(config, dml));
+                } else {
+                    logger.error("groupID didn't mach,please check your gruopId ");
+                }
+            } else {
+                logger.error("{} config didn't get,please check your map key ", destination + "_" + database + "-" + table);
+            }
+        }
+    }
+
+    @Override
+    public void destroy() {
+        if (kuduConfigMonitor != null) {
+            kuduConfigMonitor.destroy();
+        }
+        //加入kudu client 关闭钩子
+        kuduTemplate.closeKuduClient();
+    }
+
+    @Override
+    public EtlResult etl(String task, List<String> params) {
+        EtlResult etlResult = new EtlResult();
+        KuduMappingConfig config = kuduMapping.get(task);
+        KuduEtlService hbaseEtlService = new KuduEtlService(kuduTemplate, config);
+        if (config != null) {
+            return hbaseEtlService.importData(params);
+        } else {
+            StringBuilder resultMsg = new StringBuilder();
+            boolean resSucc = true;
+            for (KuduMappingConfig configTmp : kuduMapping.values()) {
+                // 取所有的destination为task的配置
+                if (configTmp.getDestination().equals(task)) {
+                    EtlResult etlRes = hbaseEtlService.importData(params);
+                    if (!etlRes.getSucceeded()) {
+                        resSucc = false;
+                        resultMsg.append(etlRes.getErrorMessage()).append("\n");
+                    } else {
+                        resultMsg.append(etlRes.getResultMessage()).append("\n");
+                    }
+                }
+            }
+            if (resultMsg.length() > 0) {
+                etlResult.setSucceeded(resSucc);
+                if (resSucc) {
+                    etlResult.setResultMessage(resultMsg.toString());
+                } else {
+                    etlResult.setErrorMessage(resultMsg.toString());
+                }
+                return etlResult;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Map<String, Object> count(String task) {
+        Map<String, Object> res = new LinkedHashMap<>();
+        KuduMappingConfig config = kuduMapping.get(task);
+        if (config != null && config.getKuduMapping() != null) {
+            String tableName = config.getKuduMapping().getTargetTable();
+            long rowCount = kuduTemplate.countRow(tableName);
+            res.put("kuduTable", tableName);
+            res.put("count", rowCount);
+        }
+        return res;
+    }
+
+    @Override
+    public String getDestination(String task) {
+        KuduMappingConfig config = kuduMapping.get(task);
+        if (config != null && config.getKuduMapping() != null) {
+            return config.getDestination();
+        }
+        return null;
+    }
+}

+ 199 - 0
client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/config/KuduMappingConfig.java

@@ -0,0 +1,199 @@
+package com.alibaba.otter.canal.client.adapter.kudu.config;
+
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfig;
+import org.apache.commons.lang.StringUtils;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * @author liuyadong
+ * @description kudu配置文件映射
+ */
+public class KuduMappingConfig implements AdapterConfig {
+    private String dataSourceKey;      // 数据源key
+
+    private String destination;        // canal实例或MQ的topic
+
+    private String groupId;            // groupId
+
+    private String outerAdapterKey;    // 对应适配器的key
+
+    private boolean concurrent = false; // 是否并行同步
+
+    private KuduMapping kuduMapping;          // db映射配置
+
+    @Override
+    public String getDataSourceKey() {
+        return dataSourceKey;
+    }
+
+    @Override
+    public AdapterMapping getMapping() {
+        return kuduMapping;
+    }
+
+    public void setDataSourceKey(String dataSourceKey) {
+        this.dataSourceKey = dataSourceKey;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    public String getOuterAdapterKey() {
+        return outerAdapterKey;
+    }
+
+    public void setOuterAdapterKey(String outerAdapterKey) {
+        this.outerAdapterKey = outerAdapterKey;
+    }
+
+    public boolean getConcurrent() {
+        return concurrent;
+    }
+
+    public void setConcurrent(boolean concurrent) {
+        this.concurrent = concurrent;
+    }
+
+    public KuduMapping getKuduMapping() {
+        return kuduMapping;
+    }
+
+    public void setKuduMapping(KuduMapping kuduMapping) {
+        this.kuduMapping = kuduMapping;
+    }
+
+    public String getDestination() {
+        return destination;
+    }
+
+    public void setDestination(String destination) {
+        this.destination = destination;
+    }
+
+
+    public void validate() {
+        if (kuduMapping.database == null || kuduMapping.database.isEmpty()) {
+            throw new NullPointerException("KuduMapping.database");
+        }
+    }
+
+    public static class KuduMapping implements AdapterMapping {
+
+        private String database;                            // 数据库名或schema名
+        private String table;                               // 表名
+        private Map<String, String> targetPk = new LinkedHashMap<>(); // 目标表主键字段
+        private boolean mapAll = false;                 // 映射所有字段
+        private String targetDb;                            // 目标库名
+        private String targetTable;                         // 目标表名
+        private Map<String, String> targetColumns;                       // 目标表字段映射
+
+        private String etlCondition;                        // etl条件sql
+
+        private int readBatch = 5000;
+        private int commitBatch = 5000;                  // etl等批量提交大小
+
+        private Map<String, String> allMapColumns;
+
+        public String getDatabase() {
+            return database;
+        }
+
+        public void setDatabase(String database) {
+            this.database = database;
+        }
+
+        public String getTable() {
+            return table;
+        }
+
+        public void setTable(String table) {
+            this.table = table;
+        }
+
+        public Map<String, String> getTargetPk() {
+            return targetPk;
+        }
+
+        public void setTargetPk(Map<String, String> targetPk) {
+            this.targetPk = targetPk;
+        }
+
+        public Boolean getMapAll() {
+            return mapAll;
+        }
+
+        public void setMapAll(Boolean mapAll) {
+            this.mapAll = mapAll;
+        }
+
+        public String getTargetDb() {
+            return targetDb;
+        }
+
+        public void setTargetDb(String targetDb) {
+            this.targetDb = targetDb;
+        }
+
+        public String getTargetTable() {
+            return targetTable;
+        }
+
+        public void setTargetTable(String targetTable) {
+            this.targetTable = targetTable;
+        }
+
+        public Map<String, String> getTargetColumns() {
+            if (targetColumns != null) {
+                targetColumns.forEach((key, value) -> {
+                    if (StringUtils.isEmpty(value)) {
+                        targetColumns.put(key, key);
+                    }
+                });
+            }
+            return targetColumns;
+        }
+
+        public void setTargetColumns(Map<String, String> targetColumns) {
+            this.targetColumns = targetColumns;
+        }
+
+        public String getEtlCondition() {
+            return etlCondition;
+        }
+
+        public void setEtlCondition(String etlCondition) {
+            this.etlCondition = etlCondition;
+        }
+
+        public int getReadBatch() {
+            return readBatch;
+        }
+
+        public void setReadBatch(int readBatch) {
+            this.readBatch = readBatch;
+        }
+
+        public int getCommitBatch() {
+            return commitBatch;
+        }
+
+        public void setCommitBatch(int commitBatch) {
+            this.commitBatch = commitBatch;
+        }
+
+        public Map<String, String> getAllMapColumns() {
+            return allMapColumns;
+        }
+
+        public void setAllMapColumns(Map<String, String> allMapColumns) {
+            this.allMapColumns = allMapColumns;
+        }
+    }
+}

+ 47 - 0
client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/config/KuduMappingConfigLoader.java

@@ -0,0 +1,47 @@
+package com.alibaba.otter.canal.client.adapter.kudu.config;
+
+import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
+import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * @author liuyadong
+ * @description kudu表信息加载
+ */
+public class KuduMappingConfigLoader {
+    private static Logger logger = LoggerFactory.getLogger(KuduMappingConfigLoader.class);
+
+    /**
+     * 加载HBase表映射配置
+     *
+     * @return 配置名/配置文件名--对象
+     */
+    public static Map<String, KuduMappingConfig> load(Properties envProperties) {
+        logger.info("## Start loading kudu mapping config ... ");
+
+        Map<String, KuduMappingConfig> result = new LinkedHashMap<>();
+
+        Map<String, String> configContentMap = MappingConfigsLoader.loadConfigs("kudu");
+        configContentMap.forEach((fileName, content) -> {
+            KuduMappingConfig config = YmlConfigBinder
+                    .bindYmlToObj(null, content, KuduMappingConfig.class, null, envProperties);
+            if (config == null) {
+                return;
+            }
+            try {
+                config.validate();
+            } catch (Exception e) {
+                throw new RuntimeException("ERROR load Config: " + fileName + " " + e.getMessage(), e);
+            }
+            result.put(fileName, config);
+        });
+
+        logger.info("## kudu mapping config loaded");
+        return result;
+    }
+}

+ 162 - 0
client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/monitor/KuduConfigMonitor.java

@@ -0,0 +1,162 @@
+package com.alibaba.otter.canal.client.adapter.kudu.monitor;
+
+import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
+import com.alibaba.otter.canal.client.adapter.kudu.KuduAdapter;
+import com.alibaba.otter.canal.client.adapter.kudu.config.KuduMappingConfig;
+import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+import org.apache.commons.io.filefilter.FileFilterUtils;
+import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
+import org.apache.commons.io.monitor.FileAlterationMonitor;
+import org.apache.commons.io.monitor.FileAlterationObserver;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * @author liuyadong
+ * @description 配置文件监听
+ */
+public class KuduConfigMonitor {
+    private static final Logger logger = LoggerFactory.getLogger(KuduConfigMonitor.class);
+
+    private static final String adapterName = "kudu";
+
+    private KuduAdapter kuduAdapter;
+
+    private Properties envProperties;
+
+    private FileAlterationMonitor fileMonitor;
+
+    public void init(KuduAdapter kuduAdapter, Properties envProperties) {
+        this.kuduAdapter = kuduAdapter;
+        this.envProperties = envProperties;
+        File confDir = Util.getConfDirPath(adapterName);
+        try {
+            FileAlterationObserver observer = new FileAlterationObserver(confDir,
+                    FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter("yml")));
+            FileListener listener = new FileListener();
+            observer.addListener(listener);
+            fileMonitor = new FileAlterationMonitor(3000, observer);
+            fileMonitor.start();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 停止监听配置文件
+     */
+    public void destroy() {
+        try {
+            fileMonitor.stop();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 配置文件监听
+     */
+    private class FileListener extends FileAlterationListenerAdaptor {
+        @Override
+        public void onFileCreate(File file) {
+            super.onFileCreate(file);
+
+            try {
+                // 加载新增的配置文件
+                String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
+                KuduMappingConfig config = YmlConfigBinder
+                        .bindYmlToObj(null, configContent, KuduMappingConfig.class, null, envProperties);
+                if (config == null) {
+                    return;
+                }
+                config.validate();
+                addConfigToCache(file, config);
+
+                logger.info("Add a new kudu mapping config: {} to canal adapter", file.getName());
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        @Override
+        public void onFileChange(File file) {
+            super.onFileChange(file);
+
+            try {
+                if (kuduAdapter.getKuduMapping().containsKey(file.getName())) {
+                    // 加载配置文件
+                    String configContent = MappingConfigsLoader
+                            .loadConfig(adapterName + File.separator + file.getName());
+                    if (configContent == null) {
+                        onFileDelete(file);
+                        return;
+                    }
+                    KuduMappingConfig config = YmlConfigBinder
+                            .bindYmlToObj(null, configContent, KuduMappingConfig.class, null, envProperties);
+                    if (config == null) {
+                        return;
+                    }
+                    config.validate();
+                    if (kuduAdapter.getKuduMapping().containsKey(file.getName())) {
+                        deleteConfigFromCache(file);
+                    }
+                    addConfigToCache(file, config);
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+
+        @Override
+        public void onFileDelete(File file) {
+            super.onFileDelete(file);
+
+            try {
+                if (kuduAdapter.getKuduMapping().containsKey(file.getName())) {
+                    deleteConfigFromCache(file);
+                    logger.info("Delete a hbase mapping config: {} of canal adapter", file.getName());
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        /**
+         * 添加配置文件信息到缓存
+         *
+         * @param file
+         * @param config
+         */
+        private void addConfigToCache(File file, KuduMappingConfig config) {
+            kuduAdapter.getKuduMapping().put(file.getName(), config);
+            Map<String, KuduMappingConfig> configMap = kuduAdapter.getMappingConfigCache()
+                    .computeIfAbsent(StringUtils.trimToEmpty(config.getDestination()) + "."
+                                    + config.getKuduMapping().getDatabase() + "." + config.getKuduMapping().getTable(),
+                            k1 -> new HashMap<>());
+            configMap.put(file.getName(), config);
+        }
+
+        /**
+         * 从缓存中删除配置
+         *
+         * @param file 文件
+         */
+        private void deleteConfigFromCache(File file) {
+            kuduAdapter.getKuduMapping().remove(file.getName());
+            for (Map<String, KuduMappingConfig> configMap : kuduAdapter.getMappingConfigCache().values()) {
+                if (configMap != null) {
+                    configMap.remove(file.getName());
+                }
+            }
+        }
+
+    }
+}

+ 131 - 0
client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/service/KuduEtlService.java

@@ -0,0 +1,131 @@
+package com.alibaba.otter.canal.client.adapter.kudu.service;
+
+import com.alibaba.otter.canal.client.adapter.kudu.config.KuduMappingConfig;
+import com.alibaba.otter.canal.client.adapter.kudu.support.KuduTemplate;
+import com.alibaba.otter.canal.client.adapter.kudu.support.SyncUtil;
+import com.alibaba.otter.canal.client.adapter.support.AbstractEtlService;
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+import com.google.common.base.Joiner;
+import org.apache.kudu.client.KuduException;
+
+import javax.sql.DataSource;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * @author liuyadong
+ * @description kudu 拉取历史数据
+ */
+public class KuduEtlService extends AbstractEtlService {
+
+    private KuduTemplate kuduTemplate;
+    private KuduMappingConfig config;
+
+    public KuduEtlService(KuduTemplate kuduTemplate, KuduMappingConfig config) {
+        super("kudu", config);
+        this.kuduTemplate = kuduTemplate;
+        this.config = config;
+    }
+
+    public EtlResult importData(List<String> params) {
+        EtlResult etlResult = new EtlResult();
+        List<String> errMsg = new ArrayList<>();
+
+        KuduMappingConfig.KuduMapping kuduMapping = config.getKuduMapping();
+        boolean flag = kuduTemplate.tableExists(kuduMapping.getTargetTable());
+        //表不存在,停止导入
+        if (!flag) {
+            logger.info("{} is don't hava,please check your kudu table !", kuduMapping.getTargetTable());
+            errMsg.add(kuduMapping.getTargetTable() + " is don't hava,please check your kudu table !");
+            etlResult.setErrorMessage(Joiner.on("\n").join(errMsg));
+            return etlResult;
+        }
+        logger.info("{} etl is starting!", kuduMapping.getTargetTable());
+        String sql = "SELECT * FROM " + kuduMapping.getDatabase() + "." + kuduMapping.getTable();
+        return importData(sql, params);
+    }
+
+
+    @Override
+    protected boolean executeSqlImport(DataSource ds, String sql, List<Object> values, AdapterConfig.AdapterMapping mapping, AtomicLong impCount, List<String> errMsg) {
+        KuduMappingConfig.KuduMapping kuduMapping = (KuduMappingConfig.KuduMapping) mapping;
+        //获取字段元数据
+        Map<String, String> columnsMap = new LinkedHashMap<>();//需要同步的字段
+
+        try {
+            Util.sqlRS(ds, "SELECT * FROM " + SyncUtil.getDbTableName(kuduMapping) + " LIMIT 1", rs -> {
+                try {
+                    ResultSetMetaData rsd = rs.getMetaData();
+                    int columnCount = rsd.getColumnCount();
+                    List<String> columns = new ArrayList<>();
+                    for (int i = 1; i <= columnCount; i++) {
+                        columns.add(rsd.getColumnName(i).toLowerCase());
+                    }
+                    columnsMap.putAll(SyncUtil.getColumnsMap(kuduMapping, columns));
+                    return true;
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                    return false;
+                }
+            });
+            //写入数据
+            logger.info("etl select data sql is :{}", sql);
+            Util.sqlRS(ds, sql, values, rs -> {
+                int idx = 1;
+                try {
+                    List<Map<String, Object>> dataList = new ArrayList<>();
+                    while (rs.next()) {
+                        Map<String, Object> data = new HashMap<>();
+                        for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
+                            String mysqlColumnName = entry.getKey();//mysql字段名
+                            String kuduColumnName = entry.getValue();//kudu字段名
+                            if (kuduColumnName == null) {
+                                kuduColumnName = mysqlColumnName;
+                            }
+                            Object value = rs.getObject(kuduColumnName);
+                            if (value != null) {
+                                data.put(kuduColumnName, value);
+                            } else {
+                                data.put(kuduColumnName, null);
+                            }
+                        }
+                        dataList.add(data);
+                        idx++;
+                        impCount.incrementAndGet();
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("successful import count:" + impCount.get());
+                        }
+                        if (idx % kuduMapping.getCommitBatch() == 0) {
+                            kuduTemplate.upsert(kuduMapping.getTargetTable(), dataList);
+                            dataList.clear();
+                        }
+                    }
+                    if (!dataList.isEmpty()) {
+                        kuduTemplate.upsert(kuduMapping.getTargetTable(), dataList);
+                    }
+                    return true;
+
+                } catch (SQLException e) {
+                    e.printStackTrace();
+                    logger.error(kuduMapping.getTargetTable() + " etl failed! ==>" + e.getMessage(), e);
+                    errMsg.add(kuduMapping.getTargetTable() + " etl failed! ==>" + e.getMessage());
+                    return false;
+                } catch (KuduException e) {
+                    e.printStackTrace();
+                    logger.error(kuduMapping.getTargetTable() + " etl failed! ==>" + e.getMessage(), e);
+                    errMsg.add(kuduMapping.getTargetTable() + " etl failed! ==>" + e.getMessage());
+                    return false;
+                }
+            });
+            return true;
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            return false;
+        }
+
+    }
+}

+ 204 - 0
client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/service/KuduSyncService.java

@@ -0,0 +1,204 @@
+package com.alibaba.otter.canal.client.adapter.kudu.service;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.client.adapter.kudu.config.KuduMappingConfig;
+import com.alibaba.otter.canal.client.adapter.kudu.support.KuduTemplate;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import org.apache.kudu.client.KuduException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author liuyadong
+ * @description kudu实时同步
+ */
+public class KuduSyncService {
+    private static Logger logger = LoggerFactory.getLogger(KuduSyncService.class);
+
+    private KuduTemplate kuduTemplate;
+
+    // 源库表字段类型缓存: instance.schema.table -> <columnName, jdbcType>
+//    private Map<String, Map<String, Integer>> columnsTypeCache = new ConcurrentHashMap<>();
+
+    public KuduSyncService(KuduTemplate kuduTemplate) {
+        this.kuduTemplate = kuduTemplate;
+    }
+
+//    public Map<String, Map<String, Integer>> getColumnsTypeCache() {
+//        return columnsTypeCache;
+//    }
+
+    /**
+     * 同步事件处理
+     *
+     * @param config
+     * @param dml
+     */
+    public void sync(KuduMappingConfig config, Dml dml) {
+        if (config != null) {
+            String type = dml.getType();
+            if (type != null && type.equalsIgnoreCase("INSERT")) {
+                insert(config, dml);
+            } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
+                upsert(config, dml);
+            } else if (type != null && type.equalsIgnoreCase("DELETE")) {
+                delete(config, dml);
+            }
+            if (logger.isDebugEnabled()) {
+                logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
+            }
+        }
+    }
+
+    /**
+     * 删除事件
+     *
+     * @param config
+     * @param dml
+     */
+    private void delete(KuduMappingConfig config, Dml dml) {
+        KuduMappingConfig.KuduMapping kuduMapping = config.getKuduMapping();
+        String configTable = kuduMapping.getTable();
+        String configDatabase = kuduMapping.getDatabase();
+        String table = dml.getTable();
+        String database = dml.getDatabase();
+        if (configTable.equals(table) && configDatabase.equals(database)) {
+            List<Map<String, Object>> data = dml.getData();
+            if (data == null || data.isEmpty()) {
+                return;
+            }
+            //判定主键映射
+            String pkId = "";
+            Map<String, String> targetPk = kuduMapping.getTargetPk();
+            for (Map.Entry<String, String> entry : targetPk.entrySet()) {
+                String mysqlID = entry.getKey().toLowerCase();
+                String kuduID = entry.getValue();
+                if (kuduID == null) {
+                    pkId = mysqlID;
+                } else {
+                    pkId = kuduID;
+                }
+            }
+            //切割联合主键
+            List<String> pkIds = Arrays.asList(pkId.split(","));
+            try {
+                int idx = 1;
+                boolean completed = false;
+                List<Map<String, Object>> dataList = new ArrayList<>();
+
+                for (Map<String, Object> item : data) {
+                    Map<String, Object> primaryKeyMap = new HashMap<>();
+                    for (Map.Entry<String, Object> entry : item.entrySet()) {
+                        String columnName = entry.getKey().toLowerCase();
+                        Object value = entry.getValue();
+                        if (pkIds.contains(columnName)) {
+                            primaryKeyMap.put(columnName, value);
+                        }
+                    }
+                    dataList.add(primaryKeyMap);
+                    idx++;
+                    if (idx % kuduMapping.getCommitBatch() == 0) {
+                        kuduTemplate.delete(kuduMapping.getTargetTable(), dataList);
+                        dataList.clear();
+                        completed = true;
+                    }
+                }
+                if (!completed) {
+                    kuduTemplate.delete(kuduMapping.getTargetTable(), dataList);
+                }
+            } catch (KuduException e) {
+                logger.error(e.getMessage());
+                logger.error("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
+            }
+        }
+    }
+
+    /**
+     * 更新插入事件
+     *
+     * @param config
+     * @param dml
+     */
+    private void upsert(KuduMappingConfig config, Dml dml) {
+        KuduMappingConfig.KuduMapping kuduMapping = config.getKuduMapping();
+        String configTable = kuduMapping.getTable();
+        String configDatabase = kuduMapping.getDatabase();
+        String table = dml.getTable();
+        String database = dml.getDatabase();
+        if (configTable.equals(table) && configDatabase.equals(database)) {
+            List<Map<String, Object>> data = dml.getData();
+            if (data == null || data.isEmpty()) {
+                return;
+            }
+            try {
+                int idx = 1;
+                boolean completed = false;
+                List<Map<String, Object>> dataList = new ArrayList<>();
+
+                for (Map<String, Object> entry : data) {
+                    dataList.add(entry);
+                    idx++;
+                    if (idx % kuduMapping.getCommitBatch() == 0) {
+                        kuduTemplate.upsert(kuduMapping.getTargetTable(), dataList);
+                        dataList.clear();
+                        completed = true;
+                    }
+                }
+                if (!completed) {
+                    kuduTemplate.upsert(kuduMapping.getTargetTable(), dataList);
+                }
+            } catch (KuduException e) {
+                logger.error(e.getMessage());
+                logger.error("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
+            }
+        }
+
+
+    }
+
+    /**
+     * 插入事件
+     *
+     * @param config
+     * @param dml
+     */
+    private void insert(KuduMappingConfig config, Dml dml) {
+        KuduMappingConfig.KuduMapping kuduMapping = config.getKuduMapping();
+        String configTable = kuduMapping.getTable();
+        String configDatabase = kuduMapping.getDatabase();
+        String table = dml.getTable();
+        String database = dml.getDatabase();
+        if (configTable.equals(table) && configDatabase.equals(database)) {
+            List<Map<String, Object>> data = dml.getData();
+            if (data == null || data.isEmpty()) {
+                return;
+            }
+            try {
+                int idx = 1;
+                boolean completed = false;
+                List<Map<String, Object>> dataList = new ArrayList<>();
+
+                for (Map<String, Object> entry : data) {
+                    dataList.add(entry);
+                    idx++;
+                    if (idx % kuduMapping.getCommitBatch() == 0) {
+                        kuduTemplate.insert(kuduMapping.getTargetTable(), dataList);
+                        dataList.clear();
+                        completed = true;
+                    }
+                }
+                if (!completed) {
+                    kuduTemplate.insert(kuduMapping.getTargetTable(), dataList);
+                }
+            } catch (KuduException e) {
+                logger.error(e.getMessage());
+                logger.error("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
+            }
+        }
+    }
+
+}

+ 410 - 0
client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/support/KuduTemplate.java

@@ -0,0 +1,410 @@
+package com.alibaba.otter.canal.client.adapter.kudu.support;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+/**
+ * @author liuyadong
+ * @description kudu 操作工具类
+ */
+public class KuduTemplate {
+
+    private Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    private KuduClient kuduClient;
+    private String masters;
+
+    private final static int OPERATION_BATCH = 500;
+
+    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+    public KuduTemplate(String master_str) {
+        this.masters = master_str;
+        checkClient();
+    }
+
+    /**
+     * 检车连接
+     */
+    private void checkClient() {
+        if (kuduClient == null) {
+            //kudu master 以逗号分隔
+            List<String> masterList = Arrays.asList(masters.split(","));
+            kuduClient = new KuduClient.KuduClientBuilder(masterList)
+                    .defaultOperationTimeoutMs(60000)
+                    .defaultSocketReadTimeoutMs(60000)
+                    .defaultAdminOperationTimeoutMs(60000).build();
+        }
+    }
+
+    /**
+     * 查询表是否存在
+     *
+     * @param tableName
+     * @return
+     */
+    public boolean tableExists(String tableName) {
+        this.checkClient();
+        try {
+            return kuduClient.tableExists(tableName);
+        } catch (KuduException e) {
+            logger.error("kudu table exists check fail,message :{}", e.getMessage());
+            return true;
+        }
+    }
+
+    /**
+     * 删除行
+     *
+     * @param tableName
+     * @param dataList
+     * @throws KuduException
+     */
+    public void delete(String tableName, List<Map<String, Object>> dataList) throws KuduException {
+        this.checkClient();
+        KuduTable kuduTable = kuduClient.openTable(tableName);
+        KuduSession session = kuduClient.newSession();
+        try {
+            session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+            session.setMutationBufferSpace(OPERATION_BATCH);
+            //获取元数据结构
+            Map<String, Type> metaMap = new HashMap<>();
+            Schema schema = kuduTable.getSchema();
+            for (ColumnSchema columnSchema : schema.getColumns()) {
+                String colName = columnSchema.getName().toLowerCase();
+                Type type = columnSchema.getType();
+                metaMap.put(colName, type);
+            }
+            int uncommit = 0;
+            for (Map<String, Object> data : dataList) {
+                Delete delete = kuduTable.newDelete();
+                PartialRow row = delete.getRow();
+                for (Map.Entry<String, Object> entry : data.entrySet()) {
+                    String name = entry.getKey().toLowerCase();
+                    Type type = metaMap.get(name);
+                    Object value = entry.getValue();
+                    fillRow(row, name, value, type); //填充行数据
+                }
+                session.apply(delete);
+                // 对于手工提交, 需要buffer在未满的时候flush,这里采用了buffer一半时即提交
+                uncommit = uncommit + 1;
+                if (uncommit > OPERATION_BATCH / 3 * 2) {
+                    List<OperationResponse> delete_option = session.flush();
+                    if (delete_option.size() > 0) {
+                        OperationResponse response = delete_option.get(0);
+                        if (response.hasRowError()) {
+                            logger.error("delete row fail table name is :{} ", tableName);
+                            logger.error("error list is :{}", response.getRowError().getMessage());
+                        }
+                    }
+                    uncommit = 0;
+                }
+            }
+            List<OperationResponse> delete_option = session.flush();
+            if (delete_option.size() > 0) {
+                OperationResponse response = delete_option.get(0);
+                if (response.hasRowError()) {
+                    logger.error("delete row fail table name is :{} ", tableName);
+                    logger.error("error list is :{}", response.getRowError().getMessage());
+                }
+            }
+
+        } catch (KuduException e) {
+            logger.error("error message is :{}", dataList.toString());
+            throw e;
+        } finally {
+            if (!session.isClosed()) {
+                session.close();
+            }
+        }
+    }
+
+    /**
+     * 更新/插入字段
+     *
+     * @param tableName
+     * @param dataList
+     * @throws KuduException
+     */
+    public void upsert(String tableName, List<Map<String, Object>> dataList) throws KuduException {
+        this.checkClient();
+        KuduTable kuduTable = kuduClient.openTable(tableName);
+        KuduSession session = kuduClient.newSession();
+        try {
+            session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+            session.setMutationBufferSpace(OPERATION_BATCH);
+            //获取元数据结构
+            Map<String, Type> metaMap = new HashMap<>();
+            Schema schema = kuduTable.getSchema();
+            for (ColumnSchema columnSchema : schema.getColumns()) {
+                String colName = columnSchema.getName().toLowerCase();
+                Type type = columnSchema.getType();
+                metaMap.put(colName, type);
+            }
+            int uncommit = 0;
+            for (Map<String, Object> data : dataList) {
+                Upsert upsert = kuduTable.newUpsert();
+                PartialRow row = upsert.getRow();
+                for (Map.Entry<String, Object> entry : data.entrySet()) {
+                    String name = entry.getKey().toLowerCase();
+                    Type type = metaMap.get(name);
+                    Object value = entry.getValue();
+                    fillRow(row, name, value, type); //填充行数据
+                }
+                session.apply(upsert);
+                // 对于手工提交, 需要buffer在未满的时候flush,这里采用了buffer一半时即提交
+                uncommit = uncommit + 1;
+                if (uncommit > OPERATION_BATCH / 3 * 2) {
+                    List<OperationResponse> update_option = session.flush();
+                    if (update_option.size() > 0) {
+                        OperationResponse response = update_option.get(0);
+                        if (response.hasRowError()) {
+                            logger.error("update row fail table name is :{} ", tableName);
+                            logger.error("update list is :{}", response.getRowError().getMessage());
+                        }
+                    }
+                    uncommit = 0;
+                }
+            }
+            List<OperationResponse> update_option = session.flush();
+            if (update_option.size() > 0) {
+                OperationResponse response = update_option.get(0);
+                if (response.hasRowError()) {
+                    logger.error("update row fail table name is :{} ", tableName);
+                    logger.error("update list is :{}", response.getRowError().getMessage());
+                }
+            }
+        } catch (KuduException e) {
+            logger.error("error message is :{}", dataList.toString());
+            throw e;
+        } finally {
+            if (!session.isClosed()) {
+                session.close();
+            }
+        }
+
+
+    }
+
+    /**
+     * 插入数据
+     *
+     * @param tableName
+     * @param dataList
+     * @throws KuduException
+     */
+    public void insert(String tableName, List<Map<String, Object>> dataList) throws KuduException {
+        this.checkClient();
+        KuduTable kuduTable = kuduClient.openTable(tableName);// 打开表
+        KuduSession session = kuduClient.newSession();  // 创建写session,kudu必须通过session写入
+        try {
+            session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH); // 采取Flush方式 手动刷新
+            session.setMutationBufferSpace(OPERATION_BATCH);
+            //获取元数据结构
+            Map<String, Type> metaMap = new HashMap<>();
+            Schema schema = kuduTable.getSchema();
+            for (ColumnSchema columnSchema : schema.getColumns()) {
+                String colName = columnSchema.getName().toLowerCase();
+                Type type = columnSchema.getType();
+                metaMap.put(colName, type);
+            }
+            int uncommit = 0;
+            for (Map<String, Object> data : dataList) {
+                Insert insert = kuduTable.newInsert();
+                PartialRow row = insert.getRow();
+                for (Map.Entry<String, Object> entry : data.entrySet()) {
+                    String name = entry.getKey().toLowerCase();
+                    Type type = metaMap.get(name);
+                    Object value = entry.getValue();
+                    fillRow(row, name, value, type); //填充行数据
+                }
+                session.apply(insert);
+                // 对于手工提交, 需要buffer在未满的时候flush,这里采用了buffer一半时即提交
+                uncommit = uncommit + 1;
+                if (uncommit > OPERATION_BATCH / 3 * 2) {
+                    List<OperationResponse> insert_option = session.flush();
+                    if (insert_option.size() > 0) {
+                        OperationResponse response = insert_option.get(0);
+                        if (response.hasRowError()) {
+                            logger.error("insert row fail table name is :{} ", tableName);
+                            logger.error("insert list is :{}", response.getRowError().getMessage());
+                        }
+                    }
+                    uncommit = 0;
+                }
+            }
+            List<OperationResponse> insert_option = session.flush();
+            if (insert_option.size() > 0) {
+                OperationResponse response = insert_option.get(0);
+                if (response.hasRowError()) {
+                    logger.error("insert row fail table name is :{} ", tableName);
+                    logger.error("insert list is :{}", response.getRowError().getMessage());
+                }
+            }
+        } catch (KuduException e) {
+            logger.error("error message is :{}", dataList.toString());
+            throw e;
+        } finally {
+            if (!session.isClosed()) {
+                session.close();
+            }
+        }
+
+    }
+
+    /**
+     * 统计kudu表数据
+     *
+     * @param tableName
+     * @return
+     */
+    public long countRow(String tableName) {
+        this.checkClient();
+        long rowCount = 0L;
+        try {
+            KuduTable kuduTable = kuduClient.openTable(tableName);
+            //创建scanner扫描
+            KuduScanner scanner = kuduClient.newScannerBuilder(kuduTable).build();
+            //遍历数据
+            while (scanner.hasMoreRows()) {
+                while (scanner.nextRows().hasNext()) {
+                    rowCount++;
+                }
+            }
+        } catch (KuduException e) {
+            e.printStackTrace();
+        }
+        return rowCount;
+    }
+
+    /**
+     * 关闭钩子
+     *
+     * @throws IOException
+     */
+    public void closeKuduClient() {
+        if (kuduClient != null) {
+            try {
+                kuduClient.close();
+            } catch (Exception e) {
+                logger.error("ShutdownHook Close KuduClient Error! error message {}", e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * 封装kudu行数据
+     *
+     * @param row
+     * @param rawVal
+     * @param type
+     */
+    private void fillRow(PartialRow row, String colName, Object rawVal, Type type) {
+        String rowValue = "0";
+        if (!(rawVal == null || "".equals(rawVal))) {
+            rowValue = rawVal + "";
+        } else {
+            return;
+        }
+        if (type == null) {
+            logger.warn("got unknown type {} for column '{}'-- ignoring this column", type, colName);
+            return;
+        }
+        try {
+            switch (type) {
+                case INT8:
+                    row.addByte(colName, Byte.parseByte(rowValue));
+                    break;
+                case INT16:
+                    row.addShort(colName, Short.parseShort(rowValue));
+                    break;
+                case INT32:
+                    row.addInt(colName, Integer.parseInt(rowValue));
+                    break;
+                case INT64:
+                    row.addLong(colName, Long.parseLong(rowValue));
+                    break;
+                case BINARY:
+                    row.addBinary(colName, rowValue.getBytes());
+                    break;
+                case STRING:
+                    row.addString(colName, rowValue);
+                    break;
+                case BOOL:
+                    if (!("true".equalsIgnoreCase(rowValue) || "false".equalsIgnoreCase(rowValue))) {
+                        return;
+                    }
+                    row.addBoolean(colName, Boolean.parseBoolean(rowValue));
+                    break;
+                case FLOAT:
+                    row.addFloat(colName, Float.parseFloat(rowValue));
+                    break;
+                case DOUBLE:
+                    row.addDouble(colName, Double.parseDouble(rowValue));
+                    break;
+                case UNIXTIME_MICROS:
+                    if ("0".equals(rowValue)) {
+                        try {
+                            Date parse = sdf.parse("2099-11-11 11:11:11");
+                            row.addLong(colName, parse.getTime());
+                        } catch (ParseException e) {
+                            logger.warn("date column is null");
+                        }
+                    } else {
+                        try {
+                            Date parse = rowValue.length() > 19 ? sdf.parse(rowValue.substring(0, 19)) : sdf.parse(rowValue);
+                            row.addLong(colName, parse.getTime());
+                        } catch (ParseException e) {
+                            logger.warn("date format error, error data is :{}", rowValue);
+                            try {
+                                Date parse = sdf.parse("2099-11-11 11:11:11");
+                                row.addLong(colName, parse.getTime());
+                            } catch (ParseException ie) {
+                                logger.warn("date column is null");
+                            }
+                        }
+                    }
+                    break;
+                default:
+                    logger.warn("got unknown type {} for column '{}'-- ignoring this column", type, colName);
+            }
+        } catch (NumberFormatException e) {
+            logger.error(e.getMessage());
+            logger.error("column parse fail==> column name=>{},column type=>{},column value=>{}", colName, type, rawVal);
+        }
+    }
+
+    /**
+     * kudu数据类型映射
+     *
+     * @param
+     */
+    private Type toKuduType(String mysqlType) throws IllegalArgumentException {
+
+        switch (mysqlType) {
+            case "varchar":
+                return Type.STRING;
+            case "int":
+                return Type.INT8;
+            case "decimal":
+                return Type.DOUBLE;
+            case "double":
+                return Type.DOUBLE;
+            case "datetime":
+                return Type.STRING;
+            case "timestamp":
+                return Type.STRING;
+            default:
+                throw new IllegalArgumentException("The provided data type doesn't map to know any known one.");
+        }
+    }
+}

+ 67 - 0
client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/support/SyncUtil.java

@@ -0,0 +1,67 @@
+package com.alibaba.otter.canal.client.adapter.kudu.support;
+
+import com.alibaba.otter.canal.client.adapter.kudu.config.KuduMappingConfig;
+import org.apache.commons.lang.StringUtils;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author liuyadong
+ * @description 工具
+ */
+public class SyncUtil {
+
+    /**
+     * 获取源数据库 元数据结构
+     *
+     * @param kuduMapping
+     * @return
+     */
+    public static String getDbTableName(KuduMappingConfig.KuduMapping kuduMapping) {
+        String result = "";
+        if (StringUtils.isNotEmpty(kuduMapping.getTable())) {
+            result += kuduMapping.getDatabase() + ".";
+        }
+        result += kuduMapping.getTable();
+        return result;
+    }
+
+    /**
+     * 过滤转换字段
+     *
+     * @param kuduMapping
+     * @param columns
+     * @return
+     */
+    public static Map<String, String> getColumnsMap(KuduMappingConfig.KuduMapping kuduMapping, List<String> columns) {
+        Map<String, String> columnsMap;
+        if (kuduMapping.getMapAll()) {
+            if (kuduMapping.getAllMapColumns() != null) {
+                return kuduMapping.getAllMapColumns();
+            }
+            columnsMap = new LinkedHashMap<>();
+            for (String srcColumn : columns) {
+                boolean flag = true;
+                if (kuduMapping.getTargetColumns() != null) {
+                    for (Map.Entry<String, String> entry : kuduMapping.getTargetColumns().entrySet()) {
+                        if (srcColumn.equals(entry.getValue())) {
+                            columnsMap.put(entry.getKey(), srcColumn);
+                            flag = false;
+                            break;
+                        }
+                    }
+                }
+                if (flag) {
+                    columnsMap.put(srcColumn, srcColumn);
+                }
+            }
+            kuduMapping.setAllMapColumns(columnsMap);
+        } else {
+            columnsMap = kuduMapping.getTargetColumns();
+        }
+        return columnsMap;
+    }
+
+}

+ 1 - 0
client-adapter/kudu/src/main/resources/META-INF.canal/com.alibaba.otter.canal.client.adapter.OuterAdapter

@@ -0,0 +1 @@
+kudu=com.alibaba.otter.canal.client.adapter.kudu.KuduAdapter

+ 19 - 0
client-adapter/kudu/src/main/resources/kudu/kudutest_user.yml

@@ -0,0 +1,19 @@
+dataSourceKey: defaultDS
+destination: test_instance
+outerAdapterKey: kudu
+groupId: g1
+kuduMapping:
+  database: test
+  table: test
+  targetTable: test
+  targetPk:
+    id: id
+  mapAll: true
+  targetColumns:
+    id:
+    name:
+    role_id:
+    c_time:
+    test1:
+  etlCondition:
+  commitBatch: 3000 # 批量提交的大小

+ 29 - 0
client-adapter/kudu/src/test/java/com/alibaba/otter/canal/client/adapter/kudu/test/KuduConnectionTest.java

@@ -0,0 +1,29 @@
+package com.alibaba.otter.canal.client.adapter.kudu.test;
+
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduTable;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class KuduConnectionTest {
+
+    @Test
+    public void test01() {
+        List<String> masterList = Arrays.asList("10.6.36.102:7051,10.6.36.187:7051,10.6.36.229:7051".split(","));
+        KuduClient kuduClient = new KuduClient.KuduClientBuilder(masterList).defaultOperationTimeoutMs(60000)
+                .defaultSocketReadTimeoutMs(30000).defaultAdminOperationTimeoutMs(60000).build();
+        try {
+            List<String> tablesList = kuduClient.getTablesList().getTablesList();
+            System.out.println(tablesList.toString());
+            KuduTable web_white_list = kuduClient.openTable("web_white_list");
+
+
+
+        } catch (KuduException e) {
+            e.printStackTrace();
+        }
+    }
+}

+ 45 - 0
client-adapter/kudu/src/test/java/com/alibaba/otter/canal/client/adapter/kudu/test/TestConfig.java

@@ -0,0 +1,45 @@
+package com.alibaba.otter.canal.client.adapter.kudu.test;
+
+import com.alibaba.otter.canal.client.adapter.kudu.config.KuduMappingConfig;
+import com.alibaba.otter.canal.client.adapter.kudu.config.KuduMappingConfigLoader;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+/**
+ * ━━━━━━神兽出没━━━━━━
+ *    ┏┓   ┏┓
+ *   ┏┛┻━━━┛┻┓
+ *   ┃   ━   ┃
+ *   ┃ ┳┛ ┗┳ ┃
+ *   ┃   ┻   ┃
+ *   ┗━┓   ┏━┛
+ *     ┃   ┃  神兽保佑
+ *     ┃   ┃  代码无bug
+ *     ┃   ┗━━━┓
+ *     ┃       ┣┓
+ *     ┃       ┏┛
+ *     ┗┓┓┏━┳┓┏┛
+ *      ┃┫┫ ┃┫┫
+ *      ┗┻┛ ┗┻┛
+ * ━━━━━━感觉萌萌哒━━━━━━
+ * Created by Liuyadong on 2019-11-13
+ *
+ * @description
+ */
+public class TestConfig {
+    @Before
+    public void before() {
+        // 加载数据源连接池
+        DatasourceConfig.DATA_SOURCES.put("defaultDS", TestConstant.dataSource);
+    }
+
+    @Test
+    public void testLoad() {
+        Map<String, KuduMappingConfig> configMap = KuduMappingConfigLoader.load(null);
+        Assert.assertFalse(configMap.isEmpty());
+    }
+}

+ 37 - 0
client-adapter/kudu/src/test/java/com/alibaba/otter/canal/client/adapter/kudu/test/TestConstant.java

@@ -0,0 +1,37 @@
+package com.alibaba.otter.canal.client.adapter.kudu.test;
+
+import com.alibaba.druid.pool.DruidDataSource;
+
+import java.sql.SQLException;
+
+public class TestConstant {
+
+    public final static String    jdbcUrl      = "jdbc:mysql://10.0.9.5:3306/canal_manager?useUnicode=true";
+    public final static String    jdbcUser     = "axtest";
+    public final static String    jdbcPassword = "axtest123";
+
+    public final static DruidDataSource dataSource;
+
+    static {
+        dataSource = new DruidDataSource();
+        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
+        dataSource.setUrl(jdbcUrl);
+        dataSource.setUsername(jdbcUser);
+        dataSource.setPassword(jdbcPassword);
+        dataSource.setInitialSize(1);
+        dataSource.setMinIdle(1);
+        dataSource.setMaxActive(1);
+        dataSource.setMaxWait(60000);
+        dataSource.setTimeBetweenEvictionRunsMillis(60000);
+        dataSource.setMinEvictableIdleTimeMillis(300000);
+        dataSource.setPoolPreparedStatements(false);
+        dataSource.setMaxPoolPreparedStatementPerConnectionSize(20);
+        dataSource.setValidationQuery("select 1");
+        try {
+            dataSource.init();
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+    }
+
+}

+ 47 - 0
client-adapter/kudu/src/test/java/com/alibaba/otter/canal/client/adapter/kudu/test/sync/Common.java

@@ -0,0 +1,47 @@
+package com.alibaba.otter.canal.client.adapter.kudu.test.sync;
+
+import com.alibaba.otter.canal.client.adapter.kudu.KuduAdapter;
+import com.alibaba.otter.canal.client.adapter.kudu.test.TestConstant;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * ━━━━━━神兽出没━━━━━━
+ *    ┏┓   ┏┓
+ *   ┏┛┻━━━┛┻┓
+ *   ┃   ━   ┃
+ *   ┃ ┳┛ ┗┳ ┃
+ *   ┃   ┻   ┃
+ *   ┗━┓   ┏━┛
+ *     ┃   ┃  神兽保佑
+ *     ┃   ┃  代码无bug
+ *     ┃   ┗━━━┓
+ *     ┃       ┣┓
+ *     ┃       ┏┛
+ *     ┗┓┓┏━┳┓┏┛
+ *      ┃┫┫ ┃┫┫
+ *      ┗┻┛ ┗┻┛
+ * ━━━━━━感觉萌萌哒━━━━━━
+ * Created by Liuyadong on 2019-11-13
+ *
+ * @description
+ */
+public class Common {
+    public static KuduAdapter init() {
+        DatasourceConfig.DATA_SOURCES.put("defaultDS", TestConstant.dataSource);
+
+        OuterAdapterConfig outerAdapterConfig = new OuterAdapterConfig();
+        outerAdapterConfig.setName("kudu");
+        outerAdapterConfig.setKey("kudu");
+        Map<String, String> properties = new HashMap<>();
+        properties.put("kudu.master.address", "10.6.36.102,10.6.36.187,10.6.36.229");
+        outerAdapterConfig.setProperties(properties);
+
+        KuduAdapter adapter = new KuduAdapter();
+        adapter.init(outerAdapterConfig, null);
+        return adapter;
+    }
+}

+ 71 - 0
client-adapter/kudu/src/test/java/com/alibaba/otter/canal/client/adapter/kudu/test/sync/TestSyncKudu.java

@@ -0,0 +1,71 @@
+package com.alibaba.otter.canal.client.adapter.kudu.test.sync;
+
+import com.alibaba.otter.canal.client.adapter.kudu.KuduAdapter;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.*;
+
+/**
+ * ━━━━━━神兽出没━━━━━━
+ *    ┏┓   ┏┓
+ *   ┏┛┻━━━┛┻┓
+ *   ┃   ━   ┃
+ *   ┃ ┳┛ ┗┳ ┃
+ *   ┃   ┻   ┃
+ *   ┗━┓   ┏━┛
+ *     ┃   ┃  神兽保佑
+ *     ┃   ┃  代码无bug
+ *     ┃   ┗━━━┓
+ *     ┃       ┣┓
+ *     ┃       ┏┛
+ *     ┗┓┓┏━┳┓┏┛
+ *      ┃┫┫ ┃┫┫
+ *      ┗┻┛ ┗┻┛
+ * ━━━━━━感觉萌萌哒━━━━━━
+ * Created by Liuyadong on 2019-11-13
+ *
+ * @description
+ */
+public class TestSyncKudu {
+
+    private KuduAdapter kuduAdapter;
+
+    @Before
+    public void init() {
+        kuduAdapter = Common.init();
+    }
+
+    @Test
+    public void testEtl() {
+        List<String> param = new ArrayList<>();
+        kuduAdapter.etl("kudutest_user.yml", param);
+    }
+
+    @Test
+    public void testCount(){
+        kuduAdapter.count("kudutest_user.yml");
+    }
+
+    @Test
+    public void testSync() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setGroupId("g1");
+        dml.setTs(new Date().getTime());
+        dml.setType("DELETE");
+        dml.setDatabase("test1");
+        dml.setTable("user");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("name", "liuyadong");
+        data.put("role_id", 1L);
+        data.put("c_time", new Date());
+        dml.setData(dataList);
+
+        kuduAdapter.sync(Collections.singletonList(dml));
+    }
+}

+ 7 - 0
client-adapter/launcher/src/main/assembly/dev.xml

@@ -50,6 +50,13 @@
                 <include>**/*</include>
                 <include>**/*</include>
             </includes>
             </includes>
         </fileSet>
         </fileSet>
+        <fileSet>
+            <directory>../kudu/src/main/resources/kudu</directory>
+            <outputDirectory>/conf/kudu</outputDirectory>
+            <includes>
+                <include>**/*</include>
+            </includes>
+        </fileSet>
         <fileSet>
         <fileSet>
             <directory>../rdb/src/main/resources/</directory>
             <directory>../rdb/src/main/resources/</directory>
             <outputDirectory>/conf</outputDirectory>
             <outputDirectory>/conf</outputDirectory>

+ 7 - 0
client-adapter/launcher/src/main/assembly/release.xml

@@ -51,6 +51,13 @@
                 <include>**/*</include>
                 <include>**/*</include>
             </includes>
             </includes>
         </fileSet>
         </fileSet>
+        <fileSet>
+            <directory>../kudu/src/main/resources/kudu</directory>
+            <outputDirectory>/conf/kudu</outputDirectory>
+            <includes>
+                <include>**/*</include>
+            </includes>
+        </fileSet>
         <fileSet>
         <fileSet>
             <directory>../rdb/src/main/resources/</directory>
             <directory>../rdb/src/main/resources/</directory>
             <outputDirectory>/conf</outputDirectory>
             <outputDirectory>/conf</outputDirectory>

+ 4 - 1
client-adapter/launcher/src/main/resources/application.yml

@@ -91,4 +91,7 @@ canal.conf:
 #          mode: transport # or rest
 #          mode: transport # or rest
 #          # security.auth: test:123456 #  only used for rest mode
 #          # security.auth: test:123456 #  only used for rest mode
 #          cluster.name: elasticsearch
 #          cluster.name: elasticsearch
-
+#        - name: kudu
+#          key: kudu
+#          properties:
+#            kudu.master.address: 127.0.0.1 # ',' split multi address

+ 1 - 0
client-adapter/pom.xml

@@ -31,6 +31,7 @@
         <module>es6x</module>
         <module>es6x</module>
         <module>es7x</module>
         <module>es7x</module>
         <module>escore</module>
         <module>escore</module>
+        <module>kudu</module>
     </modules>
     </modules>
 
 
     <licenses>
     <licenses>