enhance clickhouse support (#4957)

xander-os 1 year ago
parent commit 9baeff55e4
23 changed files with 2687 additions and 0 deletions
  1. +84 -0    client-adapter/clickhouse/pom.xml
  2. +334 -0   client-adapter/clickhouse/src/main/java/com/alibaba/otter/canal/client/adapter/clickhouse/ClickHouseAdapter.java
  3. +49 -0    client-adapter/clickhouse/src/main/java/com/alibaba/otter/canal/client/adapter/clickhouse/config/ConfigLoader.java
  4. +224 -0   client-adapter/clickhouse/src/main/java/com/alibaba/otter/canal/client/adapter/clickhouse/config/MappingConfig.java
  5. +44 -0    client-adapter/clickhouse/src/main/java/com/alibaba/otter/canal/client/adapter/clickhouse/config/MirrorDbConfig.java
  6. +123 -0   client-adapter/clickhouse/src/main/java/com/alibaba/otter/canal/client/adapter/clickhouse/monitor/ClickHouseConfigMonitor.java
  7. +553 -0   client-adapter/clickhouse/src/main/java/com/alibaba/otter/canal/client/adapter/clickhouse/service/ClickHouseBatchSyncService.java
  8. +193 -0   client-adapter/clickhouse/src/main/java/com/alibaba/otter/canal/client/adapter/clickhouse/service/ClickHouseEtlService.java
  9. +168 -0   client-adapter/clickhouse/src/main/java/com/alibaba/otter/canal/client/adapter/clickhouse/service/ClickHouseMirrorDbBatchSyncService.java
  10. +112 -0  client-adapter/clickhouse/src/main/java/com/alibaba/otter/canal/client/adapter/clickhouse/support/BatchExecutor.java
  11. +111 -0  client-adapter/clickhouse/src/main/java/com/alibaba/otter/canal/client/adapter/clickhouse/support/SingleDml.java
  12. +291 -0  client-adapter/clickhouse/src/main/java/com/alibaba/otter/canal/client/adapter/clickhouse/support/SyncUtil.java
  13. +1 -0    client-adapter/clickhouse/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter
  14. +31 -0   client-adapter/clickhouse/src/main/resources/clickhouse/mytest_user.yml
  15. +109 -0  client-adapter/clickhouse/src/test/java/com/alibaba/otter/canal/client/adapter/clickhouse/ClickHouseBatchSyncServiceTest.java
  16. +79 -0   client-adapter/clickhouse/src/test/java/com/alibaba/otter/canal/client/adapter/clickhouse/ClickHouseBatchSyncThreadSafeTest.java
  17. +37 -0   client-adapter/clickhouse/src/test/java/com/alibaba/otter/canal/client/adapter/clickhouse/TestConstant.java
  18. +49 -0   client-adapter/clickhouse/src/test/java/com/alibaba/otter/canal/client/adapter/clickhouse/sync/Common.java
  19. +31 -0   client-adapter/clickhouse/src/test/resources/clickhouse/mytest_customer.yml
  20. +31 -0   client-adapter/clickhouse/src/test/resources/clickhouse/mytest_user.yml
  21. +17 -0   client-adapter/launcher/pom.xml
  22. +10 -0   client-adapter/launcher/src/main/resources/application.yml
  23. +6 -0    client-adapter/pom.xml

+ 84 - 0
client-adapter/clickhouse/pom.xml

@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>canal.client-adapter</artifactId>
+        <groupId>com.alibaba.otter</groupId>
+        <version>1.1.8-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>com.alibaba.otter</groupId>
+    <artifactId>client-adapter.clickhouse</artifactId>
+    <packaging>jar</packaging>
+    <name>canal client adapter clickhouse module for otter ${project.version}</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>client-adapter.common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>ru.yandex.clickhouse</groupId>
+            <artifactId>clickhouse-jdbc</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.yaml</groupId>
+            <artifactId>snakeyaml</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.4</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                        <configuration>
+                            <tasks>
+                                <copy todir="${project.basedir}/../launcher/target/classes/clickhouse" overwrite="true">
+                                    <fileset dir="${project.basedir}/target/classes/clickhouse" erroronmissingdir="true">
+                                        <include name="*.yml" />
+                                    </fileset>
+                                </copy>
+                            </tasks>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
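
The antrun step above copies the bundled mapping yml files into the launcher's classpath so the launcher sees them at startup. For orientation, an outerAdapter entry in the launcher's application.yml plausibly looks like the sketch below; the actual 10-line application.yml change of this commit is not shown here, and the host, credentials and tuning values are placeholders, but the property names are exactly the ones ClickHouseAdapter.init() reads:

    outerAdapters:
    - name: clickhouse                # SPI name of this adapter
      key: clickhouse1                # outerAdapterKey referenced by the mapping configs
      properties:
        jdbc.driverClassName: ru.yandex.clickhouse.ClickHouseDriver
        jdbc.url: jdbc:clickhouse://127.0.0.1:8123/default
        jdbc.username: default
        jdbc.password: 123456
        threads: 3                    # parallel sync threads (default 3)
        batchSize: 1000               # buffer flush threshold (default 1000)
        scheduleTime: 10              # scheduled flush interval in seconds (default 10)
        skipDupException: true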

+ 334 - 0
client-adapter/clickhouse/src/main/java/com/alibaba/otter/canal/client/adapter/clickhouse/ClickHouseAdapter.java

@@ -0,0 +1,334 @@
+package com.alibaba.otter.canal.client.adapter.clickhouse;
+
+import com.alibaba.druid.filter.stat.StatFilter;
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.druid.util.JdbcUtils;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.adapter.clickhouse.config.ConfigLoader;
+import com.alibaba.otter.canal.client.adapter.clickhouse.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.clickhouse.config.MirrorDbConfig;
+import com.alibaba.otter.canal.client.adapter.clickhouse.monitor.ClickHouseConfigMonitor;
+import com.alibaba.otter.canal.client.adapter.clickhouse.service.ClickHouseBatchSyncService;
+import com.alibaba.otter.canal.client.adapter.clickhouse.service.ClickHouseEtlService;
+import com.alibaba.otter.canal.client.adapter.clickhouse.service.ClickHouseMirrorDbBatchSyncService;
+import com.alibaba.otter.canal.client.adapter.clickhouse.support.SyncUtil;
+import com.alibaba.otter.canal.client.adapter.support.*;
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * ClickHouse Adapter implementation
+ *
+ * @author: Xander
+ * @date: Created in 2023/11/10 1:13
+ * @email: zhrunxin33@gmail.com
+ */
+@SPI
+public class ClickHouseAdapter implements OuterAdapter {
+
+    private static final Logger                         logger = LoggerFactory.getLogger(ClickHouseAdapter.class);
+
+    private Map<String, MappingConfig>                  clickHouseMapping = new ConcurrentHashMap<>();        // File name -> configuration, loaded from the yml files under the resource path
+
+    private Map<String, Map<String, MappingConfig>>     mappingConfigCache = new ConcurrentHashMap<>();       // Schema -> Table -> MappingConfig
+
+    private Map<String, MirrorDbConfig>                 mirrorDbConfigCache = new ConcurrentHashMap<>();      // Mirror DB Configuration, don't need to load column mapping
+
+    private DruidDataSource                             dataSource;
+
+    private ClickHouseBatchSyncService clickHouseBatchSyncService;
+
+    private ClickHouseMirrorDbBatchSyncService clickHouseMirrorDbBatchSyncService;
+
+    private Properties                                  envProperties;
+
+    private OuterAdapterConfig                          configuration;                                        // Launch configuration
+
+    private ClickHouseConfigMonitor clickHouseConfigMonitor;
+
+    public Map<String, MappingConfig> getClickHouseMapping() {
+        return clickHouseMapping;
+    }
+
+    @Override
+    public void init(OuterAdapterConfig configuration, Properties envProperties) {
+        this.envProperties = envProperties;
+        this.configuration = configuration;
+        // Load DB type from adapter.launch/bootstrap.yml
+        Map<String, String> properties = configuration.getProperties();
+        String dbType = JdbcUtils.getDbType(properties.get("jdbc.url"), null);
+        // Building the clickhouse yml configs here may throw an exception if a .yml file has a broken encoding
+        Map<String, MappingConfig> clickHouseMappingTmp = ConfigLoader.load(envProperties);
+        // Filter out configs whose key does not match
+        clickHouseMappingTmp.forEach((key, config) -> {
+            addConfig(key, config);
+        });
+
+        if (clickHouseMapping.isEmpty()) {
+            throw new RuntimeException("No clickhouse adapter found for config key: " + configuration.getKey());
+        }
+
+        // Initialize the connection pool
+        dataSource = new DruidDataSource();
+        dataSource.setDriverClassName(properties.get("jdbc.driverClassName"));
+        dataSource.setUrl(properties.get("jdbc.url"));
+        dataSource.setUsername(properties.get("jdbc.username"));
+        dataSource.setPassword(properties.get("jdbc.password"));
+        dataSource.setInitialSize(1);
+        dataSource.setMinIdle(1);
+        dataSource.setMaxActive(30);
+        dataSource.setMaxWait(60000);
+        dataSource.setTimeBetweenEvictionRunsMillis(60000);
+        dataSource.setMinEvictableIdleTimeMillis(300000);
+        dataSource.setUseUnfairLock(true);
+        dataSource.setDefaultAutoCommit(false); // disable auto commit or disable Transactional
+        dataSource.setDbType(dbType);
+
+        if ("true".equals(properties.getOrDefault("druid.stat.enable", "true"))) {
+            StatFilter statFilter = new StatFilter();
+            statFilter.setSlowSqlMillis(Long.parseLong(properties.getOrDefault("druid.stat.slowSqlMillis", "1000")));
+            statFilter.setMergeSql(true);
+            statFilter.setLogSlowSql(true);
+            dataSource.setProxyFilters(Collections.singletonList(statFilter));
+        }
+
+        try {
+            dataSource.init();
+        } catch (SQLException e) {
+            logger.error("ERROR ## failed to initial datasource: " + properties.get("jdbc.url"), e);
+        }
+
+        String threads = properties.get("threads");
+        String scheduleTime = properties.get("scheduleTime");
+        String batchSize = properties.get("batchSize");
+
+        boolean skipDupException = BooleanUtils.toBoolean(configuration.getProperties()
+                .getOrDefault("skipDupException", "true"));
+        clickHouseBatchSyncService = new ClickHouseBatchSyncService(dataSource,
+                threads != null ? Integer.valueOf(threads) : null,
+                batchSize != null ? Integer.valueOf(batchSize) : null,
+                scheduleTime != null ? Long.valueOf(scheduleTime) : null,
+                skipDupException);
+
+        clickHouseMirrorDbBatchSyncService = new ClickHouseMirrorDbBatchSyncService(mirrorDbConfigCache,
+                dataSource,
+                threads != null ? Integer.valueOf(threads) : null,
+                batchSize != null ? Integer.valueOf(batchSize) : null,
+                scheduleTime != null ? Long.valueOf(scheduleTime) : null,
+                clickHouseBatchSyncService.getColumnsTypeCache(),
+                skipDupException);
+
+        clickHouseConfigMonitor = new ClickHouseConfigMonitor();
+        clickHouseConfigMonitor.init(configuration.getKey(), this, envProperties);
+    }
+
+    /**
+     * Sync main entrance
+     *
+     * @param dmls data packets
+     */
+    @Override
+    public void sync(List<Dml> dmls) {
+        if (dmls == null || dmls.isEmpty()) {
+            return;
+        }
+        try {
+            // If mappingConfigCache (column mappings) is empty, this must be a mirror-DB synchronization
+            if (!mappingConfigCache.isEmpty()) {
+                clickHouseBatchSyncService.sync(mappingConfigCache, dmls, envProperties);
+            }
+            clickHouseMirrorDbBatchSyncService.sync(dmls);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Destroy
+     */
+    @Override
+    public void destroy() {
+        if (clickHouseConfigMonitor != null) {
+            clickHouseConfigMonitor.destroy();
+        }
+
+        if (clickHouseBatchSyncService != null) {
+            clickHouseBatchSyncService.close();
+        }
+
+        if (dataSource != null) {
+            dataSource.close();
+        }
+    }
+
+
+    /**
+     * ETL method
+     *
+     * @param task task name, corresponding to a config name
+     * @param params ETL filter conditions
+     * @return ETL result
+     */
+    @Override
+    public EtlResult etl(String task, List<String> params) {
+        EtlResult etlResult = new EtlResult();
+        MappingConfig config = clickHouseMapping.get(task);
+        if (config != null) {
+            ClickHouseEtlService clickhouseEtlService = new ClickHouseEtlService(dataSource, config);
+            return clickhouseEtlService.importData(params);
+        } else {
+            StringBuilder resultMsg = new StringBuilder();
+            boolean resSucc = true;
+            for (MappingConfig configTmp : clickHouseMapping.values()) {
+                // Collect all configs whose destination equals the task
+                if (configTmp.getDestination().equals(task)) {
+                    ClickHouseEtlService clickhouseEtlService = new ClickHouseEtlService(dataSource, configTmp);
+                    EtlResult etlRes = clickhouseEtlService.importData(params);
+                    if (!etlRes.getSucceeded()) {
+                        resSucc = false;
+                        resultMsg.append(etlRes.getErrorMessage()).append("\n");
+                    } else {
+                        resultMsg.append(etlRes.getResultMessage()).append("\n");
+                    }
+                }
+            }
+            if (resultMsg.length() > 0) {
+                etlResult.setSucceeded(resSucc);
+                if (resSucc) {
+                    etlResult.setResultMessage(resultMsg.toString());
+                } else {
+                    etlResult.setErrorMessage(resultMsg.toString());
+                }
+                return etlResult;
+            }
+        }
+        etlResult.setSucceeded(false);
+        etlResult.setErrorMessage("Task not found");
+        return etlResult;
+    }
+
+    /**
+     * Row count method
+     *
+     * @param task task name, corresponding to a config name
+     * @return total row count
+     */
+    @Override
+    public Map<String, Object> count(String task) {
+        MappingConfig config = clickHouseMapping.get(task);
+        MappingConfig.DbMapping dbMapping = config.getDbMapping();
+        String sql = "SELECT COUNT(1) AS cnt FROM " + SyncUtil.getDbTableName(dbMapping, dataSource.getDbType());
+        Connection conn = null;
+        Map<String, Object> res = new LinkedHashMap<>();
+        try {
+            conn = dataSource.getConnection();
+            Util.sqlRS(conn, sql, rs -> {
+                try {
+                    if (rs.next()) {
+                        Long rowCount = rs.getLong("cnt");
+                        res.put("count", rowCount);
+                    }
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            });
+        } catch (SQLException e) {
+            logger.error(e.getMessage(), e);
+        } finally {
+            if (conn != null) {
+                try {
+                    conn.close();
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        }
+        res.put("targetTable", SyncUtil.getDbTableName(dbMapping, dataSource.getDbType()));
+
+        return res;
+    }
+
+    /**
+     * Get the corresponding canal instance name or MQ topic
+     *
+     * @param task task name, corresponding to a config name
+     * @return destination
+     */
+    @Override
+    public String getDestination(String task) {
+        MappingConfig config = clickHouseMapping.get(task);
+        if (config != null) {
+            return config.getDestination();
+        }
+        return null;
+    }
+
+    private void addSyncConfigToCache(String configName, MappingConfig mappingConfig) {
+        if (!mappingConfig.getDbMapping().getMirrorDb()) {
+            String key;
+            if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
+                key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "-"
+                        + StringUtils.trimToEmpty(mappingConfig.getGroupId()) + "_"
+                        + mappingConfig.getDbMapping().getDatabase() + "-" + mappingConfig.getDbMapping().getTable();
+            } else {
+                key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "_"
+                        + mappingConfig.getDbMapping().getDatabase() + "-" + mappingConfig.getDbMapping().getTable();
+            }
+            Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(key,
+                    k1 -> new ConcurrentHashMap<>());
+            configMap.put(configName, mappingConfig);
+        } else {
+            // mirrorDB
+            String key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                    + mappingConfig.getDbMapping().getDatabase();
+            mirrorDbConfigCache.put(key, MirrorDbConfig.create(configName, mappingConfig));
+        }
+    }
+
+    public boolean addConfig(String fileName, MappingConfig config) {
+        if (match(config)) {
+            clickHouseMapping.put(fileName, config);
+            addSyncConfigToCache(fileName, config);
+            FileName2KeyMapping.register(getClass().getAnnotation(SPI.class).value(), fileName,
+                    configuration.getKey());
+            return true;
+        }
+        return false;
+    }
+
+    public void updateConfig(String fileName, MappingConfig config) {
+        if (config.getOuterAdapterKey() != null && !config.getOuterAdapterKey()
+                .equals(configuration.getKey())) {
+            // In theory this must not be changed, since it is exactly what links the Adapter and the Config
+            throw new RuntimeException("not allow to change outAdapterKey");
+        }
+        clickHouseMapping.put(fileName, config);
+        addSyncConfigToCache(fileName, config);
+    }
+
+    public void deleteConfig(String fileName) {
+        clickHouseMapping.remove(fileName);
+        for (Map<String, MappingConfig> configMap : mappingConfigCache.values()) {
+            if (configMap != null) {
+                configMap.remove(fileName);
+            }
+        }
+        FileName2KeyMapping.unregister(getClass().getAnnotation(SPI.class).value(), fileName);
+    }
+
+    private boolean match(MappingConfig config) {
+        boolean sameMatch = config.getOuterAdapterKey() != null && config.getOuterAdapterKey()
+                .equalsIgnoreCase(configuration.getKey());
+        boolean prefixMatch = config.getOuterAdapterKey() == null && configuration.getKey()
+                .startsWith(StringUtils
+                        .join(new String[]{Util.AUTO_GENERATED_PREFIX, config.getDestination(),
+                                config.getGroupId()}, '-'));
+        return sameMatch || prefixMatch;
+    }
+}
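
Once initialized, the etl() and count() entry points above can be exercised directly. A minimal driver sketch, not part of the commit, assuming configuration and envProperties have been prepared the way the launcher does:

    // Hypothetical driver code for the OuterAdapter API shown above
    ClickHouseAdapter adapter = new ClickHouseAdapter();
    adapter.init(configuration, envProperties);
    // Full load for one mapping config; params carries the optional ETL filter values
    EtlResult result = adapter.etl("mytest_user.yml", java.util.Collections.emptyList());
    System.out.println(result.getSucceeded() + ": " + result.getResultMessage());
    // Row count of the target table, returned under the "count" and "targetTable" keys
    Map<String, Object> count = adapter.count("mytest_user.yml");
    System.out.println(count.get("targetTable") + " -> " + count.get("count"));
    adapter.destroy();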

+ 49 - 0
client-adapter/clickhouse/src/main/java/com/alibaba/otter/canal/client/adapter/clickhouse/config/ConfigLoader.java

@@ -0,0 +1,49 @@
+package com.alibaba.otter.canal.client.adapter.clickhouse.config;
+
+import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
+import com.alibaba.otter.canal.client.adapter.support.YamlUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * ClickHouse table mapping config loader
+ *
+ * @author rewerma 2018-11-07 02:41:34 PM
+ * @version 1.0.0
+ */
+public class ConfigLoader {
+
+    private static Logger logger = LoggerFactory.getLogger(ConfigLoader.class);
+
+    /**
+     * Load the ClickHouse table mapping configs
+     *
+     * @return map of config name / config file name to config object
+     */
+    public static Map<String, MappingConfig> load(Properties envProperties) {
+        logger.info("## Start loading clickhouse mapping config ... ");
+
+        Map<String, MappingConfig> result = new LinkedHashMap<>();
+
+        Map<String, String> configContentMap = MappingConfigsLoader.loadConfigs("clickhouse");
+        configContentMap.forEach((fileName, content) -> {
+            MappingConfig config = YamlUtils.ymlToObj(null, content, MappingConfig.class, null, envProperties);
+            if (config == null) {
+                return;
+            }
+            try {
+                config.validate();
+            } catch (Exception e) {
+                throw new RuntimeException("ERROR Config: " + fileName + " " + e.getMessage(), e);
+            }
+            result.put(fileName, config);
+        });
+
+        logger.info("## ClickHouse mapping config loaded");
+        return result;
+    }
+}
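
A minimal sketch of using this loader outside the adapter, e.g. in a test; it assumes mapping yml files are present under a clickhouse/ directory on the classpath or conf dir, which is where MappingConfigsLoader.loadConfigs("clickhouse") looks:

    // Hypothetical usage: load and inspect all clickhouse mapping configs
    Map<String, MappingConfig> configs = ConfigLoader.load(new Properties());
    configs.forEach((fileName, cfg) ->
        System.out.println(fileName + " -> destination " + cfg.getDestination()
            + ", table " + cfg.getDbMapping().getTable()));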

+ 224 - 0
client-adapter/clickhouse/src/main/java/com/alibaba/otter/canal/client/adapter/clickhouse/config/MappingConfig.java

@@ -0,0 +1,224 @@
+package com.alibaba.otter.canal.client.adapter.clickhouse.config;
+
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfig;
+import org.apache.commons.lang.StringUtils;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * ClickHouse table mapping configuration
+ *
+ * @author rewerma 2018-11-07 02:41:34 PM
+ * @version 1.0.0
+ */
+public class MappingConfig implements AdapterConfig {
+
+    private String    dataSourceKey;      // data source key
+
+    private String    destination;        // canal instance or MQ topic
+
+    private String    groupId;            // groupId
+
+    private String    outerAdapterKey;    // key of the corresponding adapter
+
+    private boolean   concurrent = false; // whether to sync in parallel
+
+    private DbMapping dbMapping;          // db mapping configuration
+
+    public String getDataSourceKey() {
+        return dataSourceKey;
+    }
+
+    public void setDataSourceKey(String dataSourceKey) {
+        this.dataSourceKey = dataSourceKey;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    public String getOuterAdapterKey() {
+        return outerAdapterKey;
+    }
+
+    public void setOuterAdapterKey(String outerAdapterKey) {
+        this.outerAdapterKey = outerAdapterKey;
+    }
+
+    public boolean getConcurrent() {
+        return concurrent;
+    }
+
+    public void setConcurrent(boolean concurrent) {
+        this.concurrent = concurrent;
+    }
+
+    public DbMapping getDbMapping() {
+        return dbMapping;
+    }
+
+    public void setDbMapping(DbMapping dbMapping) {
+        this.dbMapping = dbMapping;
+    }
+
+    public String getDestination() {
+        return destination;
+    }
+
+    public void setDestination(String destination) {
+        this.destination = destination;
+    }
+
+    public AdapterMapping getMapping() {
+        return dbMapping;
+    }
+
+    public void validate() {
+        if (dbMapping.database == null || dbMapping.database.isEmpty()) {
+            throw new NullPointerException("dbMapping.database");
+        }
+        if (!dbMapping.getMirrorDb() && (dbMapping.table == null || dbMapping.table.isEmpty())) {
+            throw new NullPointerException("dbMapping.table");
+        }
+        if (!dbMapping.getMirrorDb() && (dbMapping.targetTable == null || dbMapping.targetTable.isEmpty())) {
+            throw new NullPointerException("dbMapping.targetTable");
+        }
+    }
+
+    public static class DbMapping implements AdapterMapping {
+
+        private boolean             mirrorDb        = false;                 // whether this is a mirror database
+        private String              database;                                // database or schema name
+        private String              table;                                   // table name
+        private Map<String, String> targetPk        = new LinkedHashMap<>(); // target table primary key columns
+        private boolean             mapAll          = false;                 // map all columns
+        private String              targetDb;                                // target database name
+        private String              targetTable;                             // target table name
+        private Map<String, String> targetColumns;                           // target table column mapping
+
+        private boolean             caseInsensitive = false;                 // whether the target table is case-insensitive, false by default
+
+        private String              etlCondition;                            // ETL condition SQL
+
+        private int                 readBatch       = 5000;
+        private int                 commitBatch     = 5000;                  // ETL batch commit size
+
+        private Map<String, String> allMapColumns;
+
+        public boolean getMirrorDb() {
+            return mirrorDb;
+        }
+
+        public void setMirrorDb(boolean mirrorDb) {
+            this.mirrorDb = mirrorDb;
+        }
+
+        public String getDatabase() {
+            return database;
+        }
+
+        public void setDatabase(String database) {
+            this.database = database;
+        }
+
+        public String getTable() {
+            return table;
+        }
+
+        public void setTable(String table) {
+            this.table = table;
+        }
+
+        public Map<String, String> getTargetPk() {
+            return targetPk;
+        }
+
+        public void setTargetPk(Map<String, String> targetPk) {
+            this.targetPk = targetPk;
+        }
+
+        public Boolean getMapAll() {
+            return mapAll;
+        }
+
+        public void setMapAll(Boolean mapAll) {
+            this.mapAll = mapAll;
+        }
+
+        public String getTargetDb() {
+            return targetDb;
+        }
+
+        public void setTargetDb(String targetDb) {
+            this.targetDb = targetDb;
+        }
+
+        public String getTargetTable() {
+            return targetTable;
+        }
+
+        public void setTargetTable(String targetTable) {
+            this.targetTable = targetTable;
+        }
+
+        public Map<String, String> getTargetColumns() {
+            if (targetColumns != null) {
+                targetColumns.forEach((key, value) -> {
+                    if (StringUtils.isEmpty(value)) {
+                        targetColumns.put(key, key);
+                    }
+                });
+            }
+            return targetColumns;
+        }
+
+        public void setTargetColumns(Map<String, String> targetColumns) {
+            this.targetColumns = targetColumns;
+        }
+
+        public boolean isCaseInsensitive() {
+            return caseInsensitive;
+        }
+
+        public void setCaseInsensitive(boolean caseInsensitive) {
+            this.caseInsensitive = caseInsensitive;
+        }
+
+        public String getEtlCondition() {
+            return etlCondition;
+        }
+
+        public void setEtlCondition(String etlCondition) {
+            this.etlCondition = etlCondition;
+        }
+
+        public int getReadBatch() {
+            return readBatch;
+        }
+
+        public void setReadBatch(int readBatch) {
+            this.readBatch = readBatch;
+        }
+
+        public int getCommitBatch() {
+            return commitBatch;
+        }
+
+        public void setCommitBatch(int commitBatch) {
+            this.commitBatch = commitBatch;
+        }
+
+        public Map<String, String> getAllMapColumns() {
+            return allMapColumns;
+        }
+
+        public void setAllMapColumns(Map<String, String> allMapColumns) {
+            this.allMapColumns = allMapColumns;
+        }
+    }
+}
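
A mapping yml such as the bundled mytest_user.yml deserializes into this class. A plausible sketch, assuming the same layout as canal's other rdb-style adapters; the actual 31-line mytest_user.yml of this commit is not shown here:

    dataSourceKey: defaultDS          # source data source key
    destination: example              # canal instance or MQ topic
    groupId: g1                       # group id for MQ mode
    outerAdapterKey: clickhouse1      # must match the adapter's key
    concurrent: true                  # enable hash-partitioned parallel sync
    dbMapping:
      database: mytest                # source database
      table: user                     # source table
      targetTable: mytest.user        # target table
      targetPk:
        id: id                        # target pk column -> source column
      mapAll: true                    # map all columns by name
      # targetColumns:                # or list an explicit column mapping instead
      #   id:
      #   name: _name
      commitBatch: 3000               # ETL batch commit size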

+ 44 - 0
client-adapter/clickhouse/src/main/java/com/alibaba/otter/canal/client/adapter/clickhouse/config/MirrorDbConfig.java

@@ -0,0 +1,44 @@
+package com.alibaba.otter.canal.client.adapter.clickhouse.config;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class MirrorDbConfig {
+
+    private String             fileName;
+    private MappingConfig      mappingConfig;
+    private Map<String, MappingConfig> tableConfig = new ConcurrentHashMap<>();
+
+    public static MirrorDbConfig create(String fileName, MappingConfig mappingConfig) {
+        return new MirrorDbConfig(fileName, mappingConfig);
+    }
+
+    public MirrorDbConfig(String fileName, MappingConfig mappingConfig){
+        this.fileName = fileName;
+        this.mappingConfig = mappingConfig;
+    }
+
+    public String getFileName() {
+        return fileName;
+    }
+
+    public void setFileName(String fileName) {
+        this.fileName = fileName;
+    }
+
+    public MappingConfig getMappingConfig() {
+        return mappingConfig;
+    }
+
+    public void setMappingConfig(MappingConfig mappingConfig) {
+        this.mappingConfig = mappingConfig;
+    }
+
+    public Map<String, MappingConfig> getTableConfig() {
+        return tableConfig;
+    }
+
+    public void setTableConfig(Map<String, MappingConfig> tableConfig) {
+        this.tableConfig = tableConfig;
+    }
+}

+ 123 - 0
client-adapter/clickhouse/src/main/java/com/alibaba/otter/canal/client/adapter/clickhouse/monitor/ClickHouseConfigMonitor.java

@@ -0,0 +1,123 @@
+package com.alibaba.otter.canal.client.adapter.clickhouse.monitor;
+
+import com.alibaba.otter.canal.client.adapter.clickhouse.ClickHouseAdapter;
+import com.alibaba.otter.canal.client.adapter.clickhouse.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+import com.alibaba.otter.canal.client.adapter.support.YamlUtils;
+import org.apache.commons.io.filefilter.FileFilterUtils;
+import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
+import org.apache.commons.io.monitor.FileAlterationMonitor;
+import org.apache.commons.io.monitor.FileAlterationObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Properties;
+
+public class ClickHouseConfigMonitor {
+
+    private static final Logger   logger      = LoggerFactory.getLogger(ClickHouseConfigMonitor.class);
+
+    private static final String   adapterName = "clickhouse";
+
+    private String                key;
+
+    private ClickHouseAdapter     clickHouseAdapter;
+
+    private Properties            envProperties;
+
+    private FileAlterationMonitor fileMonitor;
+
+    public void init(String key, ClickHouseAdapter clickHouseAdapter, Properties envProperties) {
+        this.key = key;
+        this.clickHouseAdapter = clickHouseAdapter;
+        this.envProperties = envProperties;
+        File confDir = Util.getConfDirPath(adapterName);
+        try {
+            FileAlterationObserver observer = new FileAlterationObserver(confDir,
+                FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter("yml")));
+            FileListener listener = new FileListener();
+            observer.addListener(listener);
+            fileMonitor = new FileAlterationMonitor(3000, observer);
+            fileMonitor.start();
+
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    public void destroy() {
+        try {
+            fileMonitor.stop();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    private class FileListener extends FileAlterationListenerAdaptor {
+
+        @Override
+        public void onFileCreate(File file) {
+            super.onFileCreate(file);
+            try {
+                // Load the newly created config file
+                String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
+                MappingConfig config = YamlUtils
+                    .ymlToObj(null, configContent, MappingConfig.class, null, envProperties);
+                if (config == null) {
+                    return;
+                }
+                config.validate();
+                boolean result = clickHouseAdapter.addConfig(file.getName(), config);
+                if (result) {
+                    logger.info("Add a new clickhouse mapping config: {} to canal adapter", file.getName());
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        @Override
+        public void onFileChange(File file) {
+            super.onFileChange(file);
+
+            try {
+                if (clickHouseAdapter.getClickHouseMapping().containsKey(file.getName())) {
+                    // Load the config file
+                    String configContent = MappingConfigsLoader
+                        .loadConfig(adapterName + File.separator + file.getName());
+                    if (configContent == null) {
+                        onFileDelete(file);
+                        return;
+                    }
+                    MappingConfig config = YamlUtils
+                        .ymlToObj(null, configContent, MappingConfig.class, null, envProperties);
+                    if (config == null) {
+                        return;
+                    }
+                    config.validate();
+                    clickHouseAdapter.updateConfig(file.getName(), config);
+                    logger.info("Change a clickhouse mapping config: {} of canal adapter", file.getName());
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        @Override
+        public void onFileDelete(File file) {
+            super.onFileDelete(file);
+
+            try {
+                if (clickHouseAdapter.getClickHouseMapping().containsKey(file.getName())) {
+                    clickHouseAdapter.deleteConfig(file.getName());
+
+                    logger.info("Delete a clickhouse mapping config: {} of canal adapter", file.getName());
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+    }
+}

+ 553 - 0
client-adapter/clickhouse/src/main/java/com/alibaba/otter/canal/client/adapter/clickhouse/service/ClickHouseBatchSyncService.java

@@ -0,0 +1,553 @@
+package com.alibaba.otter.canal.client.adapter.clickhouse.service;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONWriter.Feature;
+import com.alibaba.otter.canal.client.adapter.clickhouse.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.clickhouse.config.MappingConfig.DbMapping;
+import com.alibaba.otter.canal.client.adapter.clickhouse.support.BatchExecutor;
+import com.alibaba.otter.canal.client.adapter.clickhouse.support.SingleDml;
+import com.alibaba.otter.canal.client.adapter.clickhouse.support.SyncUtil;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * ClickHouse batch synchronization service
+ *
+ * @author: Xander
+ * @date: Created in 2023/11/10 22:23
+ * @email: zhrunxin33@gmail.com
+ */
+public class ClickHouseBatchSyncService {
+
+    private static final Logger                         logger  = LoggerFactory.getLogger(ClickHouseBatchSyncService.class);
+
+    private DruidDataSource                             dataSource;
+
+    private Map<String, Map<String, Integer>>           columnsTypeCache;     // Cache of instance.schema.table -> <columnName, jdbcType>
+
+    private Map<MappingConfig, List<SingleDml>>[]       bufferPools;          // Data buffer pools; incoming DMLs are dispersed across the array slots by hash value
+
+    private BatchExecutor[]                             batchExecutors;       // Batch executors
+
+    private BatchExecutor                               alterExecutors;       // Single executor for ALTER statements (update/delete/truncate)
+
+    private ExecutorService[]                           executorThreads;      // Initialized once
+    
+    private ScheduledExecutorService[]                  scheduledExecutors;
+
+    private int                                         threads = 3;          // Default parallel thread count
+    private int                                         batchSize = 1000;
+    private long                                        scheduleTime = 10;
+    private boolean                                     skipDupException;
+
+    public Map<String, Map<String, Integer>> getColumnsTypeCache() {
+        return columnsTypeCache;
+    }
+
+    public ClickHouseBatchSyncService(DruidDataSource dataSource, Integer threads, Integer batchSize, Long scheduleTime, boolean skipDupException){
+        this(dataSource, threads, batchSize, scheduleTime, new ConcurrentHashMap<>(), skipDupException);
+    }
+
+    @SuppressWarnings("unchecked")
+    public ClickHouseBatchSyncService(DruidDataSource dataSource, Integer threads, Integer batchSize, Long scheduleTime, Map<String, Map<String, Integer>> columnsTypeCache,
+                                      boolean skipDupException){
+        this.dataSource = dataSource;
+        this.columnsTypeCache = columnsTypeCache;
+        this.skipDupException = skipDupException;
+        try {
+            if (threads != null) {
+                this.threads = threads;
+            }
+            if (batchSize != null) {
+                this.batchSize = batchSize;
+            }
+            if (scheduleTime != null) {
+                this.scheduleTime = scheduleTime;
+            }
+            this.alterExecutors = new BatchExecutor(dataSource);
+            this.bufferPools = new ConcurrentHashMap[this.threads];
+            this.batchExecutors = new BatchExecutor[this.threads];
+            this.executorThreads = new ExecutorService[this.threads];
+            this.scheduledExecutors = new ScheduledExecutorService[this.threads];
+            for (int i = 0; i < this.threads; i++) {
+                bufferPools[i] = new ConcurrentHashMap<>();
+                batchExecutors[i] = new BatchExecutor(dataSource);
+                executorThreads[i] = Executors.newSingleThreadExecutor();
+                scheduledExecutors[i] = Executors.newSingleThreadScheduledExecutor();
+            }
+            scheduleBatchSync();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Timer-driven flush:
+     * starts the scheduled batch-sync thread pool
+     */
+    private void scheduleBatchSync() {
+        for (int i = 0; i < scheduledExecutors.length; i++) {
+            int index = i;
+            scheduledExecutors[i].scheduleAtFixedRate(()->{
+                List<Future<Boolean>> futures = new ArrayList<>();
+                for (MappingConfig mappingConfig : bufferPools[index].keySet()) {
+                    List<SingleDml> dmls = bufferPools[index].get(mappingConfig);
+                    if (dmls == null || dmls.isEmpty()) {
+                        continue; // skip empty buffers but keep flushing the other configs
+                    }
+                    List<SingleDml> tempDmls;
+                    synchronized (dmls) {
+                        tempDmls = new ArrayList<>(dmls);
+                        dmls.clear();
+                    }
+                    futures.add(executorThreads[index].submit(()->{
+                        try {
+                            insert(batchExecutors[index], mappingConfig, tempDmls);
+                            batchExecutors[index].commit();
+                            return true;
+                        } catch (Exception e){
+                            batchExecutors[index].rollback();
+                            throw new RuntimeException(e);
+                        }
+                    }));
+                }
+            }, 0, scheduleTime, TimeUnit.SECONDS);
+        }
+        logger.info("Schedule batch executors has started successfully!");
+    }
+
+    /**
+     * Batch sync callback
+     *
+     * @param dmls batch of DMLs
+     * @param function callback method
+     */
+    public void sync(List<Dml> dmls, Function<Dml, Boolean> function) {
+        boolean toExecute = false;
+        for (Dml dml : dmls) {
+            if (!toExecute) {
+                toExecute = function.apply(dml);
+            } else {
+                function.apply(dml);
+            }
+        }
+    }
+
+    /**
+     * Distribute DMLs into different partitions
+     *
+     * @param mappingConfig {@link com.alibaba.otter.canal.client.adapter.clickhouse.ClickHouseAdapter#mappingConfigCache }
+     * @param dmls received DML
+     */
+    public void sync(Map<String, Map<String, MappingConfig>> mappingConfig, List<Dml> dmls, Properties envProperties) {
+        sync(dmls, dml -> {
+            if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
+                // DDL (the column type cache must be invalidated when a DDL has been executed)
+                columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
+                return false;
+            } else {
+                // DML
+                String destination = StringUtils.trimToEmpty(dml.getDestination());
+                String groupId = StringUtils.trimToEmpty(dml.getGroupId());
+                String database = dml.getDatabase();
+                String table = dml.getTable();
+                Map<String, MappingConfig> configMap;
+                if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
+                    configMap = mappingConfig.get(destination + "-" + groupId + "_" + database + "-" + table);
+                } else {
+                    configMap = mappingConfig.get(destination + "_" + database + "-" + table);
+                }
+
+                if (configMap == null || configMap.values().isEmpty()) {
+                    return false;
+                }
+
+                for (MappingConfig config : configMap.values()) {
+                    distributeDml(config, dml);
+                }
+                return true;
+            }
+        });
+    }
+
+    /**
+     * Dml distributor
+     */
+    private void distributeDml(MappingConfig config, Dml dml) {
+        if (config != null) {
+            try {
+                String type = dml.getType();
+                if (type == null) return;
+
+                if (type.equalsIgnoreCase("INSERT")) {
+                    appendDmlBufferPartition(config, dml);
+                } else {
+                    boolean caseInsensitive = config.getDbMapping().isCaseInsensitive();
+                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml, caseInsensitive);
+
+                    if (type.equalsIgnoreCase("UPDATE")) {
+                        for (SingleDml singleDml : singleDmls) {
+                            update(alterExecutors, config, singleDml);
+                        }
+                    } else if (type.equalsIgnoreCase("DELETE")) {
+                        for (SingleDml singleDml : singleDmls) {
+                            delete(alterExecutors, config, singleDml);
+                        }
+                    } else if (type.equalsIgnoreCase("TRUNCATE")) {
+                        truncate(alterExecutors, config);
+                    }
+                }
+                if (logger.isDebugEnabled()) {
+                    logger.debug("DML: {}", JSON.toJSONString(dml, Feature.WriteNulls));
+                }
+            } catch (SQLException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public void appendDmlBufferPartition(MappingConfig config, Dml dml) {
+        boolean caseInsensitive = config.getDbMapping().isCaseInsensitive();
+        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml, caseInsensitive);
+
+        singleDmls.forEach(singleDml -> {
+            int hash = mappingHash(config.getDbMapping());
+            if (!config.getConcurrent()) {
+                hash = 0;
+            }
+            List<SingleDml> dmls = bufferPools[hash].computeIfAbsent(config, k -> new ArrayList<>());
+            synchronized (dmls) {
+                dmls.add(singleDml);
+                logger.info("Append one data into pool, id {}", singleDml.getData().get("id"));
+            }
+            // Check the size of the List, achieve when it reaches the maximum value
+            if (dmls.size() >= batchSize) syncToClickHouse(config, hash);
+        });
+    }
+
+    /**
+     * Sync when the size of a buffer list in {@link #bufferPools} reaches the batch size
+     *
+     * @param config mapping config used as the buffer key
+     * @param index parallel thread index
+     */
+    private void syncToClickHouse(MappingConfig config, int index) {
+        List<SingleDml> dmls = bufferPools[index].get(config);
+        if (dmls == null || dmls.isEmpty()) {
+            return;
+        }
+        logger.info("schema:{} table:{} reaches the batch size, ready to synchronize, size {}", config.getDbMapping().getDatabase(), config.getDbMapping().getTable(), dmls.size());
+        List<SingleDml> tempDmls;
+        synchronized (dmls) {
+            tempDmls = new ArrayList<>(dmls);
+            dmls.clear();
+        }
+        executorThreads[index].submit(() -> {
+            try {
+                insert(batchExecutors[index], config, tempDmls);
+                batchExecutors[index].commit();
+                return true;
+            } catch (Exception e) {
+                batchExecutors[index].rollback();
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    /**
+     * Insert
+     *
+     * @param batchExecutor batch translational executor
+     * @param config corresponding configuration object
+     * @param dmls DMLs
+     */
+    private void insert(BatchExecutor batchExecutor, MappingConfig config, List<SingleDml> dmls) throws SQLException {
+        if (dmls == null || dmls.isEmpty()) {
+            return;
+        }
+        List<SingleDml> clearDmls = dmls.stream().filter(e -> e.getData() != null && !e.getData().isEmpty()).collect(Collectors.toList());
+        if (clearDmls.isEmpty()) {
+            return;
+        }
+
+        DbMapping dbMapping = config.getDbMapping();
+        String backtick = SyncUtil.getBacktickByDbType(dataSource.getDbType());
+        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, clearDmls.get(0).getData());
+
+        StringBuilder insertSql = new StringBuilder();
+        insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping, dataSource.getDbType())).append(" (");
+
+        columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(backtick)
+            .append(targetColumnName)
+            .append(backtick)
+            .append(","));
+        int len = insertSql.length();
+        insertSql.delete(len - 1, len).append(") VALUES (");
+        int mapLen = columnsMap.size();
+        for (int i = 0; i < mapLen; i++) {
+            insertSql.append("?,");
+        }
+        len = insertSql.length();
+        insertSql.delete(len - 1, len).append(")");
+
+        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
+
+        List<List<Map<String, ?>>> values = new ArrayList<>();
+        boolean flag = false;
+        for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
+            String targetColumnName = entry.getKey();
+            String srcColumnName = entry.getValue();
+            if (srcColumnName == null) {
+                srcColumnName = Util.cleanColumn(targetColumnName);
+            }
+
+            Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
+            if (type == null) {
+                throw new RuntimeException("Target column: " + targetColumnName + " not matched");
+            }
+            for (int i = 0; i < clearDmls.size(); i++) {
+                Map<String, Object> dmlData = clearDmls.get(i).getData();
+                List<Map<String, ?>> item;
+                if (!flag) {
+                    item = new ArrayList<>();
+                    values.add(item);
+                } else {
+                    item = values.get(i);
+                }
+                Object value = dmlData.get(srcColumnName);
+                BatchExecutor.setValue(item, type, value);
+            }
+            flag = true;
+        }
+
+        try {
+            batchExecutor.batchExecute(insertSql.toString(), values);
+        } catch (SQLException e) {
+            if (skipDupException
+                && (e.getMessage().contains("Duplicate entry") || e.getMessage().startsWith("ORA-00001:"))) {
+                // ignore the duplicate-key error
+                // TODO add more primary-key conflict error codes for other relational databases
+            } else {
+                throw e;
+            }
+        }
+        if (logger.isTraceEnabled()) {
+            logger.trace("Insert into target table, sql: {}", insertSql);
+        }
+
+    }
+
+    /**
+     * Update operation
+     *
+     * @param config mapping configuration
+     * @param dml DML data
+     */
+    private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
+        Map<String, Object> data = dml.getData();
+        if (data == null || data.isEmpty()) {
+            return;
+        }
+
+        Map<String, Object> old = dml.getOld();
+        if (old == null || old.isEmpty()) {
+            return;
+        }
+
+        DbMapping dbMapping = config.getDbMapping();
+        String backtick = SyncUtil.getBacktickByDbType(dataSource.getDbType());
+        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
+
+        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
+
+        StringBuilder updateSql = new StringBuilder();
+        updateSql.append("ALTER TABLE ").append(SyncUtil.getDbTableName(dbMapping, dataSource.getDbType())).append(" UPDATE ");
+        List<Map<String, ?>> values = new ArrayList<>();
+        boolean hasMatched = false;
+        for (String srcColumnName : old.keySet()) {
+            List<String> targetColumnNames = new ArrayList<>();
+            columnsMap.forEach((targetColumn, srcColumn) -> {
+                if (srcColumnName.equalsIgnoreCase(srcColumn)) {
+                    targetColumnNames.add(targetColumn);
+                }
+            });
+            if (!targetColumnNames.isEmpty()) {
+                hasMatched = true;
+                for (String targetColumnName : targetColumnNames) {
+                    updateSql.append(backtick).append(targetColumnName).append(backtick).append("=?, ");
+                    Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
+                    if (type == null) {
+                        throw new RuntimeException("Target column: " + targetColumnName + " not matched");
+                    }
+                    BatchExecutor.setValue(values, type, data.get(srcColumnName));
+                }
+            }
+        }
+        if (!hasMatched) {
+            logger.warn("Did not matched any columns to update ");
+            return;
+        }
+        int len = updateSql.length();
+        updateSql.delete(len - 2, len).append(" WHERE ");
+
+        // Append the primary key condition
+        appendCondition(dbMapping, updateSql, ctype, values, data, old);
+        batchExecutor.execute(updateSql.toString(), values);
+        if (logger.isTraceEnabled()) {
+            logger.trace("Alter update target table, sql: {}", updateSql);
+        }
+    }
+
+    /**
+     * Delete operation
+     *
+     * @param config mapping configuration
+     * @param dml DML data
+     */
+    private void delete(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
+        Map<String, Object> data = dml.getData();
+        if (data == null || data.isEmpty()) {
+            return;
+        }
+
+        DbMapping dbMapping = config.getDbMapping();
+        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
+
+        StringBuilder sql = new StringBuilder();
+        sql.append("ALTER TABLE ").append(SyncUtil.getDbTableName(dbMapping, dataSource.getDbType())).append(" DELETE WHERE ");
+
+        List<Map<String, ?>> values = new ArrayList<>();
+        // Append the primary key condition
+        appendCondition(dbMapping, sql, ctype, values, data);
+        batchExecutor.execute(sql.toString(), values);
+        if (logger.isTraceEnabled()) {
+            logger.trace("Alter delete from target table, sql: {}", sql);
+        }
+    }
+
+    /**
+     * Truncate operation
+     *
+     * @param config mapping configuration
+     */
+    private void truncate(BatchExecutor batchExecutor, MappingConfig config) throws SQLException {
+        DbMapping dbMapping = config.getDbMapping();
+        StringBuilder sql = new StringBuilder();
+        sql.append("TRUNCATE TABLE ").append(SyncUtil.getDbTableName(dbMapping, dataSource.getDbType()));
+        batchExecutor.execute(sql.toString(), new ArrayList<>());
+        if (logger.isTraceEnabled()) {
+            logger.trace("Truncate target table, sql: {}", sql);
+        }
+    }
+
+    /**
+     * Get the target column types
+     *
+     * @param conn SQL connection
+     * @param config mapping configuration
+     * @return column name to java.sql.Types value mapping
+     */
+    private Map<String, Integer> getTargetColumnType(Connection conn, MappingConfig config) {
+        DbMapping dbMapping = config.getDbMapping();
+        String cacheKey = config.getDestination() + "." + dbMapping.getDatabase() + "." + dbMapping.getTable();
+        Map<String, Integer> columnType = columnsTypeCache.get(cacheKey);
+        if (columnType == null) {
+            synchronized (ClickHouseBatchSyncService.class) {
+                columnType = columnsTypeCache.get(cacheKey);
+                if (columnType == null) {
+                    columnType = new LinkedHashMap<>();
+                    final Map<String, Integer> columnTypeTmp = columnType;
+                    String sql = "SELECT * FROM " + SyncUtil.getDbTableName(dbMapping, dataSource.getDbType()) + " WHERE 1=2";
+                    Util.sqlRS(conn, sql, rs -> {
+                        try {
+                            ResultSetMetaData rsd = rs.getMetaData();
+                            int columnCount = rsd.getColumnCount();
+                            for (int i = 1; i <= columnCount; i++) {
+                                int colType = rsd.getColumnType(i);
+                                // Fix the "data truncated" issue when the YEAR type is handled as DATE
+                                if ("YEAR".equals(rsd.getColumnTypeName(i))) {
+                                    colType = Types.VARCHAR;
+                                }
+                                columnTypeTmp.put(rsd.getColumnName(i).toLowerCase(), colType);
+                            }
+                            columnsTypeCache.put(cacheKey, columnTypeTmp);
+                        } catch (SQLException e) {
+                            logger.error(e.getMessage(), e);
+                        }
+                    });
+                }
+            }
+        }
+        return columnType;
+    }
+
+    /**
+     * Append the primary key WHERE condition
+     */
+    private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
+                                 List<Map<String, ?>> values, Map<String, Object> d) {
+        appendCondition(dbMapping, sql, ctype, values, d, null);
+    }
+
+    private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
+                                 List<Map<String, ?>> values, Map<String, Object> d, Map<String, Object> o) {
+        String backtick = SyncUtil.getBacktickByDbType(dataSource.getDbType());
+
+        // append primary keys
+        for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
+            String targetColumnName = entry.getKey();
+            String srcColumnName = entry.getValue();
+            if (srcColumnName == null) {
+                srcColumnName = Util.cleanColumn(targetColumnName);
+            }
+            sql.append(backtick).append(targetColumnName).append(backtick).append("=? AND ");
+            Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
+            if (type == null) {
+                throw new RuntimeException("Target column: " + targetColumnName + " not matched");
+            }
+            // the primary key itself may have been updated
+            if (o != null && o.containsKey(srcColumnName)) {
+                BatchExecutor.setValue(values, type, o.get(srcColumnName));
+            } else {
+                BatchExecutor.setValue(values, type, d.get(srcColumnName));
+            }
+        }
+        int len = sql.length();
+        sql.delete(len - 4, len);
+    }
+
+    /**
+     * Route all DML for the same table to the same executor index
+     *
+     * @param dbMapping db mapping
+     * @return executor index in [0, threads)
+     */
+    private int mappingHash(MappingConfig.DbMapping dbMapping) {
+        int hash = dbMapping.getDatabase().toLowerCase().hashCode() + dbMapping.getTable().toLowerCase().hashCode();
+        // the second Math.abs covers Integer.MIN_VALUE, whose absolute value stays negative
+        hash = Math.abs(hash) % threads;
+        return Math.abs(hash);
+    }
+
+    public void close() {
+        for (int i = 0; i < threads; i++) {
+            executorThreads[i].shutdown();
+        }
+    }
+}
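
Note: ClickHouse has no conventional row-level DELETE, so the service above expresses deletes as asynchronous mutations via `ALTER TABLE ... DELETE WHERE ...`. A minimal standalone sketch of that pattern over plain JDBC; the URL, credentials, and table name are illustrative, not part of this change:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class MutationDeleteSketch {
    public static void main(String[] args) throws Exception {
        // Delete-by-primary-key as a ClickHouse mutation; the server applies it
        // asynchronously, so the row may remain visible briefly after execute().
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:clickhouse://127.0.0.1:8123/mytest", "default", "");
             PreparedStatement pstmt = conn.prepareStatement(
                 "ALTER TABLE mytest.user DELETE WHERE id = ?")) {
            pstmt.setLong(1, 1L);
            pstmt.execute();
        }
    }
}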

+ 193 - 0
client-adapter/clickhouse/src/main/java/com/alibaba/otter/canal/client/adapter/clickhouse/service/ClickHouseEtlService.java

@@ -0,0 +1,193 @@
+package com.alibaba.otter.canal.client.adapter.clickhouse.service;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.otter.canal.client.adapter.clickhouse.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.clickhouse.config.MappingConfig.DbMapping;
+import com.alibaba.otter.canal.client.adapter.clickhouse.support.SyncUtil;
+import com.alibaba.otter.canal.client.adapter.support.AbstractEtlService;
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+
+import javax.sql.DataSource;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * ClickHouse ETL service
+ *
+ * @author rewerma @ 2018-11-7
+ * @version 1.0.0
+ */
+public class ClickHouseEtlService extends AbstractEtlService {
+
+    private DataSource    targetDS;
+    private MappingConfig config;
+
+    public ClickHouseEtlService(DataSource targetDS, MappingConfig config){
+        super("CLICKHOUSE", config);
+        this.targetDS = targetDS;
+        this.config = config;
+    }
+
+    /**
+     * Import data
+     */
+    public EtlResult importData(List<String> params) {
+        DbMapping dbMapping = config.getDbMapping();
+        String sql = "SELECT * FROM " + dbMapping.getDatabase() + "." + dbMapping.getTable();
+        return importData(sql, params);
+    }
+
+    /**
+     * Execute the import
+     */
+    protected boolean executeSqlImport(DataSource srcDS, String sql, List<Object> values,
+                                       AdapterConfig.AdapterMapping mapping, AtomicLong impCount, List<String> errMsg) {
+        try {
+            DbMapping dbMapping = (DbMapping) mapping;
+            Map<String, String> columnsMap = new LinkedHashMap<>();
+            Map<String, Integer> columnType = new LinkedHashMap<>();
+            DruidDataSource dataSource = (DruidDataSource) srcDS;
+            String backtick = SyncUtil.getBacktickByDbType(dataSource.getDbType());
+
+            // probe the target table once to learn its column names and sql types
+            Util.sqlRS(targetDS,
+                "SELECT * FROM " + SyncUtil.getDbTableName(dbMapping, dataSource.getDbType()) + " LIMIT 1",
+                rs -> {
+                try {
+
+                    ResultSetMetaData rsd = rs.getMetaData();
+                    int columnCount = rsd.getColumnCount();
+                    List<String> columns = new ArrayList<>();
+                    for (int i = 1; i <= columnCount; i++) {
+                        columnType.put(rsd.getColumnName(i).toLowerCase(), rsd.getColumnType(i));
+                        columns.add(rsd.getColumnName(i));
+                    }
+
+                    columnsMap.putAll(SyncUtil.getColumnsMap(dbMapping, columns));
+                    return true;
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                    return false;
+                }
+            });
+
+            Util.sqlRS(srcDS, sql, values, rs -> {
+                int idx = 1;
+
+                try {
+                    boolean completed = false;
+
+                    StringBuilder insertSql = new StringBuilder();
+                    insertSql.append("INSERT INTO ")
+                        .append(SyncUtil.getDbTableName(dbMapping, dataSource.getDbType()))
+                        .append(" (");
+                    columnsMap
+                        .forEach((targetColumnName, srcColumnName) -> insertSql.append(backtick).append(targetColumnName).append(backtick).append(","));
+
+                    int len = insertSql.length();
+                    insertSql.delete(len - 1, len).append(") VALUES (");
+                    int mapLen = columnsMap.size();
+                    for (int i = 0; i < mapLen; i++) {
+                        insertSql.append("?,");
+                    }
+                    len = insertSql.length();
+                    insertSql.delete(len - 1, len).append(")");
+                    logger.info("executeSqlImport sql: {}", insertSql);
+                    try (Connection connTarget = targetDS.getConnection();
+                            PreparedStatement pstmt = connTarget.prepareStatement(insertSql.toString())) {
+                        connTarget.setAutoCommit(false);
+
+                        while (rs.next()) {
+                            completed = false;
+
+                            pstmt.clearParameters();
+
+                            // delete any existing row first (upsert semantics)
+                            Map<String, Object> pkVal = new LinkedHashMap<>();
+                            StringBuilder deleteSql = new StringBuilder(
+                                "ALTER TABLE " + SyncUtil.getDbTableName(dbMapping, dataSource.getDbType()) + " DELETE WHERE ");
+                            appendCondition(dbMapping, deleteSql, pkVal, rs, backtick);
+                            try (PreparedStatement pstmt2 = connTarget.prepareStatement(deleteSql.toString())) {
+                                int k = 1;
+                                for (Object val : pkVal.values()) {
+                                    pstmt2.setObject(k++, val);
+                                }
+                                pstmt2.execute();
+                            }
+
+                            int i = 1;
+                            for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
+                                String targetColumnName = entry.getKey();
+                                String srcColumnName = entry.getValue();
+                                if (srcColumnName == null) {
+                                    srcColumnName = targetColumnName;
+                                }
+
+                                Integer type = columnType.get(targetColumnName.toLowerCase());
+                                Object value = rs.getObject(srcColumnName);
+                                if (value != null) {
+                                    SyncUtil.setPStmt(type, pstmt, value, i);
+                                } else {
+                                    pstmt.setNull(i, type);
+                                }
+
+                                i++;
+                            }
+
+                            pstmt.execute();
+                            if (logger.isTraceEnabled()) {
+                                logger.trace("Insert into target table, sql: {}", insertSql);
+                            }
+
+                            if (idx % dbMapping.getCommitBatch() == 0) {
+                                connTarget.commit();
+                                completed = true;
+                            }
+                            idx++;
+                            impCount.incrementAndGet();
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("successful import count: {}", impCount.get());
+                            }
+                        }
+                        if (!completed) {
+                            connTarget.commit();
+                        }
+                    }
+
+                } catch (Exception e) {
+                    logger.error(dbMapping.getTable() + " etl failed! ==>" + e.getMessage(), e);
+                    errMsg.add(dbMapping.getTable() + " etl failed! ==>" + e.getMessage());
+                }
+                return idx;
+            });
+            return true;
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            return false;
+        }
+    }
+
+    /**
+     * Append the target table's primary-key WHERE condition
+     */
+    private static void appendCondition(DbMapping dbMapping, StringBuilder sql, Map<String, Object> values,
+                                        ResultSet rs, String backtick) throws SQLException {
+        // append primary keys
+        for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
+            String targetColumnName = entry.getKey();
+            String srcColumnName = entry.getValue();
+            if (srcColumnName == null) {
+                srcColumnName = targetColumnName;
+            }
+            sql.append(backtick).append(targetColumnName).append(backtick).append("=? AND ");
+            values.put(targetColumnName, rs.getObject(srcColumnName));
+        }
+        int len = sql.length();
+        sql.delete(len - 4, len);
+    }
+}
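
For reference, once a target DataSource and a MappingConfig are available (see the test bootstrap further down), the ETL service can be driven directly. A hedged usage sketch; passing null params is assumed to mean a full-table import with no etlCondition substitution:

import com.alibaba.otter.canal.client.adapter.clickhouse.config.MappingConfig;
import com.alibaba.otter.canal.client.adapter.clickhouse.service.ClickHouseEtlService;
import com.alibaba.otter.canal.client.adapter.support.EtlResult;

import javax.sql.DataSource;

public class EtlUsageSketch {
    // Sketch: full-table import; targetDS must point at ClickHouse and config
    // must be loaded from a mapping YAML such as clickhouse/mytest_user.yml.
    static EtlResult fullImport(DataSource targetDS, MappingConfig config) {
        ClickHouseEtlService etl = new ClickHouseEtlService(targetDS, config);
        return etl.importData(null); // null params: no "{}" placeholders to fill
    }
}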

+ 168 - 0
client-adapter/clickhouse/src/main/java/com/alibaba/otter/canal/client/adapter/clickhouse/service/ClickHouseMirrorDbBatchSyncService.java

@@ -0,0 +1,168 @@
+package com.alibaba.otter.canal.client.adapter.clickhouse.service;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONWriter.Feature;
+import com.alibaba.otter.canal.client.adapter.clickhouse.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.clickhouse.config.MirrorDbConfig;
+import com.alibaba.otter.canal.client.adapter.clickhouse.support.SyncUtil;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * ClickHouse mirror database sync service
+ *
+ * @author rewerma 2018-12-12 11:23 PM
+ * @version 1.0.0
+ */
+public class ClickHouseMirrorDbBatchSyncService {
+
+    private static final Logger         logger = LoggerFactory.getLogger(ClickHouseMirrorDbBatchSyncService.class);
+
+    private Map<String, MirrorDbConfig> mirrorDbConfigCache;                                           // mirror db configs
+    private DruidDataSource             dataSource;
+    private ClickHouseBatchSyncService  clickHouseBatchSyncService;                                    // delegate batch sync service
+
+    public ClickHouseMirrorDbBatchSyncService(Map<String, MirrorDbConfig> mirrorDbConfigCache, DruidDataSource dataSource,
+                                              Integer threads, Integer batchSize, Long scheduleTime, Map<String, Map<String, Integer>> columnsTypeCache,
+                                              boolean skipDupException){
+        this.mirrorDbConfigCache = mirrorDbConfigCache;
+        this.dataSource = dataSource;
+        this.clickHouseBatchSyncService = new ClickHouseBatchSyncService(dataSource, threads, batchSize, scheduleTime, columnsTypeCache, skipDupException);
+    }
+
+    /**
+     * Batch sync entry point
+     *
+     * @param dmls DML batch, may include DDL
+     */
+    public void sync(List<Dml> dmls) {
+        List<Dml> dmlList = new ArrayList<>();
+        for (Dml dml : dmls) {
+            String destination = StringUtils.trimToEmpty(dml.getDestination());
+            String database = dml.getDatabase();
+            MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache.get(destination + "." + database);
+            if (mirrorDbConfig == null) {
+                continue;
+            }
+            if (mirrorDbConfig.getMappingConfig() == null) {
+                continue;
+            }
+            if (dml.getGroupId() != null && StringUtils.isNotEmpty(mirrorDbConfig.getMappingConfig().getGroupId())) {
+                if (!mirrorDbConfig.getMappingConfig().getGroupId().equals(dml.getGroupId())) {
+                    continue; // skip if the groupId does not match
+                }
+            }
+
+            if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
+                // make sure buffered DML is flushed before the DDL runs
+                syncDml(dmlList);
+                dmlList.clear();
+
+                // DDL
+                if (logger.isDebugEnabled()) {
+                    logger.debug("DDL: {}", JSON.toJSONString(dml, Feature.WriteNulls));
+                }
+                executeDdl(mirrorDbConfig, dml);
+                clickHouseBatchSyncService.getColumnsTypeCache().remove(destination + "." + database + "." + dml.getTable());
+                mirrorDbConfig.getTableConfig().remove(dml.getTable()); // drop the cached table config
+            } else {
+                // DML
+                initMappingConfig(dml.getTable(), mirrorDbConfig.getMappingConfig(), mirrorDbConfig, dml);
+                dmlList.add(dml);
+            }
+        }
+        syncDml(dmlList);
+    }
+
+    /**
+     * Batch-sync DML
+     *
+     * @param dmlList DML list, DDL excluded
+     */
+    private void syncDml(List<Dml> dmlList) {
+        if (dmlList == null || dmlList.isEmpty()) {
+            return;
+        }
+        clickHouseBatchSyncService.sync(dmlList, dml -> {
+            MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache.get(dml.getDestination() + "." + dml.getDatabase());
+            if (mirrorDbConfig == null) {
+                return false;
+            }
+            String table = dml.getTable();
+            MappingConfig config = mirrorDbConfig.getTableConfig().get(table);
+
+            if (config == null) {
+                return false;
+            }
+            clickHouseBatchSyncService.appendDmlBufferPartition(config, dml);
+            return true;
+        });
+    }
+
+    /**
+     * Initialize the per-table mapping config
+     *
+     * @param key table name, used as the key into the mirror table config
+     * @param baseConfigMap db sync config
+     * @param dml DML
+     */
+    private void initMappingConfig(String key, MappingConfig baseConfigMap, MirrorDbConfig mirrorDbConfig, Dml dml) {
+        MappingConfig mappingConfig = mirrorDbConfig.getTableConfig().get(key);
+        if (mappingConfig == null) {
+            // build the table config on first use
+            mappingConfig = new MappingConfig();
+            mappingConfig.setDataSourceKey(baseConfigMap.getDataSourceKey());
+            mappingConfig.setDestination(baseConfigMap.getDestination());
+            mappingConfig.setGroupId(baseConfigMap.getGroupId());
+            mappingConfig.setOuterAdapterKey(baseConfigMap.getOuterAdapterKey());
+            mappingConfig.setConcurrent(baseConfigMap.getConcurrent());
+            MappingConfig.DbMapping dbMapping = new MappingConfig.DbMapping();
+            mappingConfig.setDbMapping(dbMapping);
+            dbMapping.setDatabase(dml.getDatabase());
+            dbMapping.setTable(dml.getTable());
+            dbMapping.setTargetDb(dml.getDatabase());
+            dbMapping.setTargetTable(dml.getTable());
+            dbMapping.setMapAll(true);
+            List<String> pkNames = dml.getPkNames();
+            Map<String, String> pkMapping = new LinkedHashMap<>();
+            pkNames.forEach(pkName -> pkMapping.put(pkName, pkName));
+            dbMapping.setTargetPk(pkMapping);
+
+            mirrorDbConfig.getTableConfig().put(key, mappingConfig);
+        }
+    }
+
+    /**
+     * Execute a DDL statement
+     *
+     * @param ddl DDL
+     */
+    private void executeDdl(MirrorDbConfig mirrorDbConfig, Dml ddl) {
+        try (Connection conn = dataSource.getConnection(); Statement statement = conn.createStatement()) {
+            // replace backticks with the target dialect's identifier quote
+            String sql = ddl.getSql();
+            String backtick = SyncUtil.getBacktickByDbType(dataSource.getDbType());
+            if (!"`".equals(backtick)) {
+                sql = sql.replaceAll("`", backtick);
+            }
+            statement.execute(sql);
+            // remove the cached table config
+            mirrorDbConfig.getTableConfig().remove(ddl.getTable());
+            if (logger.isTraceEnabled()) {
+                logger.trace("Execute DDL sql: {} for database: {}", ddl.getSql(), ddl.getDatabase());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
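
Aside: the mirror sync needs no per-table YAML, since initMappingConfig derives database, table, and targetPk from the incoming DML itself. A sketch of a DML that would be routed this way (setter names are assumed to mirror the getters used above; all values illustrative):

import com.alibaba.otter.canal.client.adapter.support.Dml;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class MirrorDmlSketch {
    static Dml sampleInsert() {
        Dml dml = new Dml();
        dml.setDestination("example");
        dml.setDatabase("mytest");
        dml.setTable("user");
        dml.setType("INSERT");
        dml.setPkNames(Collections.singletonList("id")); // becomes targetPk in initMappingConfig
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1L);
        row.put("name", "Eric");
        dml.setData(Collections.singletonList(row));
        return dml;
    }
}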

+ 112 - 0
client-adapter/clickhouse/src/main/java/com/alibaba/otter/canal/client/adapter/clickhouse/support/BatchExecutor.java

@@ -0,0 +1,112 @@
+package com.alibaba.otter.canal.client.adapter.clickhouse.support;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.Closeable;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Batch SQL executor
+ *
+ * @author rewerma 2018-11-7 06:45:49 PM
+ * @version 1.0.0
+ */
+public class BatchExecutor implements Closeable {
+
+    private static final Logger logger = LoggerFactory.getLogger(BatchExecutor.class);
+
+    private DataSource          dataSource;
+    private Connection          conn;
+    private AtomicInteger       idx    = new AtomicInteger(0);
+
+    public BatchExecutor(DataSource dataSource){
+        this.dataSource = dataSource;
+    }
+
+    public Connection getConn() {
+        if (conn == null) {
+            try {
+                conn = dataSource.getConnection();
+                this.conn.setAutoCommit(false);
+            } catch (SQLException e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+        return conn;
+    }
+
+    public static void setValue(List<Map<String, ?>> values, int type, Object value) {
+        Map<String, Object> valueItem = new HashMap<>();
+        valueItem.put("type", type);
+        valueItem.put("value", value);
+        values.add(valueItem);
+    }
+
+    public void execute(String sql, List<Map<String, ?>> values) throws SQLException {
+        // try-with-resources ensures the statement is closed even if a setter throws
+        try (PreparedStatement pstmt = getConn().prepareStatement(sql)) {
+            int len = values.size();
+            for (int i = 0; i < len; i++) {
+                int type = (Integer) values.get(i).get("type");
+                Object value = values.get(i).get("value");
+                SyncUtil.setPStmt(type, pstmt, value, i + 1);
+            }
+            pstmt.execute();
+            idx.incrementAndGet();
+        }
+    }
+
+    public void batchExecute(String sql, List<List<Map<String, ?>>> batchValues) throws SQLException {
+        // one PreparedStatement reused for the whole batch, closed on exit
+        try (PreparedStatement pstmt = getConn().prepareStatement(sql)) {
+            for (int i = 0; i < batchValues.size(); i++) {
+                List<Map<String, ?>> values = batchValues.get(i);
+                for (int j = 0; j < values.size(); j++) {
+                    int type = (Integer) values.get(j).get("type");
+                    Object value = values.get(j).get("value");
+                    SyncUtil.setPStmt(type, pstmt, value, j + 1);
+                }
+                pstmt.addBatch();
+            }
+            pstmt.executeBatch();
+            idx.addAndGet(batchValues.size());
+        }
+    }
+
+    public void commit() throws SQLException {
+        getConn().commit();
+        if (logger.isTraceEnabled()) {
+            logger.trace("Batch executor commit " + idx.get() + " rows");
+        }
+        idx.set(0);
+    }
+
+    public void rollback() throws SQLException {
+        getConn().rollback();
+        if (logger.isTraceEnabled()) {
+            logger.trace("Batch executor rollback " + idx.get() + " rows");
+        }
+        idx.set(0);
+    }
+
+    @Override
+    public void close() {
+        if (conn != null) {
+            try {
+                conn.close();
+            } catch (SQLException e) {
+                logger.error(e.getMessage(), e);
+            } finally {
+                conn = null;
+            }
+        }
+    }
+}
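
Callers pair setValue with positional parameters and commit explicitly, since the executor's connection runs with autoCommit disabled. A small usage sketch; the table and columns are illustrative, and the class is assumed to sit in the same package as BatchExecutor:

import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import javax.sql.DataSource;

public class BatchExecutorUsageSketch {
    static void insertOne(DataSource ds) throws SQLException {
        // BatchExecutor is Closeable, so try-with-resources releases the connection.
        try (BatchExecutor executor = new BatchExecutor(ds)) {
            List<Map<String, ?>> values = new ArrayList<>();
            BatchExecutor.setValue(values, Types.BIGINT, 1L);      // binds the first "?"
            BatchExecutor.setValue(values, Types.VARCHAR, "Eric"); // binds the second "?"
            executor.execute("INSERT INTO mytest.user (id, name) VALUES (?, ?)", values);
            executor.commit(); // autoCommit is off; nothing lands until here
        }
    }
}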

+ 111 - 0
client-adapter/clickhouse/src/main/java/com/alibaba/otter/canal/client/adapter/clickhouse/support/SingleDml.java

@@ -0,0 +1,111 @@
+package com.alibaba.otter.canal.client.adapter.clickhouse.support;
+
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import org.springframework.util.LinkedCaseInsensitiveMap;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class SingleDml {
+
+    private String              destination;
+    private String              database;
+    private String              table;
+    private String              type;
+    private Map<String, Object> data;
+    private Map<String, Object> old;
+
+    public String getDestination() {
+        return destination;
+    }
+
+    public void setDestination(String destination) {
+        this.destination = destination;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public Map<String, Object> getData() {
+        return data;
+    }
+
+    public void setData(Map<String, Object> data) {
+        this.data = data;
+    }
+
+    public Map<String, Object> getOld() {
+        return old;
+    }
+
+    public void setOld(Map<String, Object> old) {
+        this.old = old;
+    }
+
+    public static List<SingleDml> dml2SingleDmls(Dml dml, boolean caseInsensitive) {
+        List<SingleDml> singleDmls = new ArrayList<>();
+        if (dml.getData() != null) {
+            int size = dml.getData().size();
+            for (int i = 0; i < size; i++) {
+                SingleDml singleDml = new SingleDml();
+                singleDml.setDestination(dml.getDestination());
+                singleDml.setDatabase(dml.getDatabase());
+                singleDml.setTable(dml.getTable());
+                singleDml.setType(dml.getType());
+                Map<String, Object> data = dml.getData().get(i);
+                if (caseInsensitive) {
+                    data = toCaseInsensitiveMap(data);
+                }
+                singleDml.setData(data);
+                if (dml.getOld() != null) {
+                    Map<String, Object> oldData = dml.getOld().get(i);
+                    if (caseInsensitive) {
+                        oldData = toCaseInsensitiveMap(oldData);
+                    }
+                    singleDml.setOld(oldData);
+                }
+                singleDmls.add(singleDml);
+            }
+        } else if ("TRUNCATE".equalsIgnoreCase(dml.getType())) {
+            SingleDml singleDml = new SingleDml();
+            singleDml.setDestination(dml.getDestination());
+            singleDml.setDatabase(dml.getDatabase());
+            singleDml.setTable(dml.getTable());
+            singleDml.setType(dml.getType());
+            singleDmls.add(singleDml);
+        }
+        return singleDmls;
+    }
+
+    public static List<SingleDml> dml2SingleDmls(Dml dml) {
+        return dml2SingleDmls(dml, false);
+    }
+
+    private static <V> LinkedCaseInsensitiveMap<V> toCaseInsensitiveMap(Map<String, V> data) {
+        LinkedCaseInsensitiveMap<V> map = new LinkedCaseInsensitiveMap<>();
+        map.putAll(data);
+        return map;
+    }
+}
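
A multi-row Dml fans out into one SingleDml per row, and the case-insensitive option wraps each row map so later column lookups ignore casing differences. A sketch, assumed to sit in the same package as SingleDml:

import com.alibaba.otter.canal.client.adapter.support.Dml;

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SingleDmlFanOutSketch {
    static List<SingleDml> fanOut() {
        Dml dml = new Dml();
        dml.setType("INSERT");
        dml.setDatabase("mytest");
        dml.setTable("user");
        Map<String, Object> row1 = new LinkedHashMap<>();
        row1.put("ID", 1L); // later readable as "id" via the case-insensitive map
        Map<String, Object> row2 = new LinkedHashMap<>();
        row2.put("ID", 2L);
        dml.setData(Arrays.asList(row1, row2));
        return SingleDml.dml2SingleDmls(dml, true); // yields two SingleDml instances
    }
}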

+ 291 - 0
client-adapter/clickhouse/src/main/java/com/alibaba/otter/canal/client/adapter/clickhouse/support/SyncUtil.java

@@ -0,0 +1,291 @@
+package com.alibaba.otter.canal.client.adapter.clickhouse.support;
+
+import com.alibaba.druid.DbType;
+import com.alibaba.otter.canal.client.adapter.clickhouse.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Reader;
+import java.io.StringReader;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.*;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class SyncUtil {
+    private static final Logger logger  = LoggerFactory.getLogger(SyncUtil.class);
+
+    public static Map<String, String> getColumnsMap(MappingConfig.DbMapping dbMapping, Map<String, Object> data) {
+        return getColumnsMap(dbMapping, data.keySet());
+    }
+
+    public static Map<String, String> getColumnsMap(MappingConfig.DbMapping dbMapping, Collection<String> columns) {
+        Map<String, String> columnsMap;
+        if (dbMapping.getMapAll()) {
+            if (dbMapping.getAllMapColumns() != null) {
+                return dbMapping.getAllMapColumns();
+            }
+            columnsMap = new LinkedHashMap<>();
+            for (String srcColumn : columns) {
+                boolean flag = true;
+                if (dbMapping.getTargetColumns() != null) {
+                    for (Map.Entry<String, String> entry : dbMapping.getTargetColumns().entrySet()) {
+                        if (srcColumn.equals(entry.getValue())) {
+                            columnsMap.put(entry.getKey(), srcColumn);
+                            flag = false;
+                            break;
+                        }
+                    }
+                }
+                if (flag) {
+                    columnsMap.put(srcColumn, srcColumn);
+                }
+            }
+            dbMapping.setAllMapColumns(columnsMap);
+        } else {
+            columnsMap = dbMapping.getTargetColumns();
+        }
+        return columnsMap;
+    }
+
+    /**
+     * Bind a value into a PreparedStatement parameter
+     *
+     * @param type sqlType
+     * @param pstmt the PreparedStatement to bind into
+     * @param value value to bind
+     * @param i parameter index (1-based)
+     */
+    public static void setPStmt(int type, PreparedStatement pstmt, Object value, int i) throws SQLException {
+        switch (type) {
+            case Types.BIT:
+            case Types.BOOLEAN:
+                if (value instanceof Boolean) {
+                    pstmt.setBoolean(i, (Boolean) value);
+                } else if (value instanceof String) {
+                    boolean v = !value.equals("0");
+                    pstmt.setBoolean(i, v);
+                } else if (value instanceof Number) {
+                    boolean v = ((Number) value).intValue() != 0;
+                    pstmt.setBoolean(i, v);
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.CHAR:
+            case Types.NCHAR:
+            case Types.VARCHAR:
+            case Types.LONGVARCHAR:
+                if (value instanceof String) {
+                    pstmt.setString(i, (String) value);
+                } else if (value == null) {
+                    pstmt.setNull(i, type);
+                } else {
+                    pstmt.setString(i, value.toString());
+                }
+                break;
+            case Types.TINYINT:
+                // widen one level to handle unsigned values
+                if (value instanceof Number) {
+                    pstmt.setShort(i, ((Number) value).shortValue());
+                } else if (value instanceof String) {
+                    pstmt.setShort(i, Short.parseShort((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.SMALLINT:
+                if (value instanceof Number) {
+                    pstmt.setInt(i, ((Number) value).intValue());
+                } else if (value instanceof String) {
+                    pstmt.setInt(i, Integer.parseInt((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.INTEGER:
+                if (value instanceof Number) {
+                    pstmt.setLong(i, ((Number) value).longValue());
+                } else if (value instanceof String) {
+                    pstmt.setLong(i, Long.parseLong((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.BIGINT:
+                if (value instanceof Number) {
+                    pstmt.setBigDecimal(i, new BigDecimal(value.toString()));
+                } else if (value instanceof String) {
+                    pstmt.setBigDecimal(i, new BigDecimal(value.toString()));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.DECIMAL:
+            case Types.NUMERIC:
+                if (value instanceof BigDecimal) {
+                    pstmt.setBigDecimal(i, (BigDecimal) value);
+                } else if (value instanceof Byte) {
+                    pstmt.setInt(i, ((Byte) value).intValue());
+                } else if (value instanceof Short) {
+                    pstmt.setInt(i, ((Short) value).intValue());
+                } else if (value instanceof Integer) {
+                    pstmt.setInt(i, (Integer) value);
+                } else if (value instanceof Long) {
+                    pstmt.setLong(i, (Long) value);
+                } else if (value instanceof Float) {
+                    pstmt.setBigDecimal(i, new BigDecimal(value.toString())); // avoid binary float artifacts
+                } else if (value instanceof Double) {
+                    pstmt.setBigDecimal(i, new BigDecimal(value.toString()));
+                } else if (value != null) {
+                    pstmt.setBigDecimal(i, new BigDecimal(value.toString()));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.REAL:
+                if (value instanceof Number) {
+                    pstmt.setFloat(i, ((Number) value).floatValue());
+                } else if (value instanceof String) {
+                    pstmt.setFloat(i, Float.parseFloat((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.FLOAT:
+            case Types.DOUBLE:
+                if (value instanceof Number) {
+                    pstmt.setDouble(i, ((Number) value).doubleValue());
+                } else if (value instanceof String) {
+                    pstmt.setDouble(i, Double.parseDouble((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.BINARY:
+            case Types.VARBINARY:
+            case Types.LONGVARBINARY:
+            case Types.BLOB:
+                if (value instanceof Blob) {
+                    pstmt.setBlob(i, (Blob) value);
+                } else if (value instanceof byte[]) {
+                    pstmt.setBytes(i, (byte[]) value);
+                } else if (value instanceof String) {
+                    pstmt.setBytes(i, ((String) value).getBytes(StandardCharsets.ISO_8859_1));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.CLOB:
+                if (value instanceof Clob) {
+                    pstmt.setClob(i, (Clob) value);
+                } else if (value instanceof byte[]) {
+                    pstmt.setBytes(i, (byte[]) value);
+                } else if (value instanceof String) {
+                    Reader clobReader = new StringReader((String) value);
+                    pstmt.setCharacterStream(i, clobReader);
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.DATE:
+                if (value instanceof Date) {
+                    pstmt.setDate(i, (Date) value);
+                } else if (value instanceof java.util.Date) {
+                    pstmt.setDate(i, new Date(((java.util.Date) value).getTime()));
+                } else if (value instanceof String) {
+                    String v = (String) value;
+                    if (!v.startsWith("0000-00-00")) {
+                        java.util.Date date = Util.parseDate(v);
+                        if (date != null) {
+                            pstmt.setDate(i, new Date(date.getTime()));
+                        } else {
+                            pstmt.setNull(i, type);
+                        }
+                    } else {
+                        pstmt.setObject(i, value);
+                    }
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.TIME:
+                if (value instanceof Time) {
+                    pstmt.setTime(i, (Time) value);
+                } else if (value instanceof java.util.Date) {
+                    pstmt.setTime(i, new Time(((java.util.Date) value).getTime()));
+                } else if (value instanceof String) {
+                    String v = (String) value;
+                    java.util.Date date = Util.parseDate(v);
+                    if (date != null) {
+                        pstmt.setTime(i, new Time(date.getTime()));
+                    } else {
+                        pstmt.setNull(i, type);
+                    }
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.TIMESTAMP:
+                if (value instanceof Timestamp) {
+                    pstmt.setTimestamp(i, (Timestamp) value);
+                } else if (value instanceof java.util.Date) {
+                    pstmt.setTimestamp(i, new Timestamp(((java.util.Date) value).getTime()));
+                } else if (value instanceof String) {
+                    String v = (String) value;
+                    if (!v.startsWith("0000-00-00")) {
+                        java.util.Date date = Util.parseDate(v);
+                        if (date != null) {
+                            pstmt.setTimestamp(i, new Timestamp(date.getTime()));
+                        } else {
+                            pstmt.setNull(i, type);
+                        }
+                    } else {
+                        pstmt.setObject(i, value);
+                    }
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            default:
+                pstmt.setObject(i, value, type);
+        }
+    }
+
+    public static String getDbTableName(MappingConfig.DbMapping dbMapping, String dbType) {
+        String result = "";
+        String backtick = getBacktickByDbType(dbType);
+        if (StringUtils.isNotEmpty(dbMapping.getTargetDb())) {
+            result += (backtick + dbMapping.getTargetDb() + backtick + ".");
+        }
+        result += (backtick + dbMapping.getTargetTable() + backtick);
+        return result;
+    }
+
+    /**
+     * Return the identifier quote for a DbType: backtick or empty string
+     *
+     * @param dbTypeName DbType name
+     * @return backtick, or an empty string
+     */
+    public static String getBacktickByDbType(String dbTypeName) {
+        DbType dbType = DbType.of(dbTypeName);
+        if (dbType == null) {
+            dbType = DbType.other;
+        }
+
+        // only MySQL/MariaDB/OceanBase take backticks
+        switch (dbType) {
+            case mysql:
+            case mariadb:
+            case oceanbase:
+                return "`";
+            default:
+                return "";
+        }
+    }
+}
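
Identifier quoting therefore depends on the target dialect: MySQL-family types are wrapped in backticks, while ClickHouse and every other dialect are left bare. Expected behavior, sketched as assertions in the same package as SyncUtil; it assumes druid's DbType resolves both names:

public class QuotingSketch {
    public static void main(String[] args) {
        // Run with -ea to enable assertions.
        assert "`".equals(SyncUtil.getBacktickByDbType("mysql"));
        assert "".equals(SyncUtil.getBacktickByDbType("clickhouse"));
        // With targetDb=mytest and targetTable=user, getDbTableName yields
        // `mytest`.`user` for MySQL but mytest.user for ClickHouse.
    }
}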

+ 1 - 0
client-adapter/clickhouse/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter

@@ -0,0 +1 @@
+clickhouse=com.alibaba.otter.canal.client.adapter.clickhouse.ClickHouseAdapter

+ 31 - 0
client-adapter/clickhouse/src/main/resources/clickhouse/mytest_user.yml

@@ -0,0 +1,31 @@
+dataSourceKey: defaultDS
+destination: example
+groupId: g1
+outerAdapterKey: clickhouse1
+concurrent: true
+dbMapping:
+  database: mytest
+  table: user
+  targetTable: mytest.user
+  targetPk:
+    id: id
+#  mapAll: true
+  targetColumns:
+    id:
+    name:
+    role_id:
+    c_time:
+    test1:
+  etlCondition: "where c_time>={}"
+  commitBatch: 3000 # commit batch size
+
+
+## Mirror schema synchronization config
+#dataSourceKey: defaultDS
+#destination: example
+#groupId: g1
+#outerAdapterKey: mysql1
+#concurrent: true
+#dbMapping:
+#  mirrorDb: true
+#  database: mytest
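
The `{}` in etlCondition above is a positional placeholder; parameters passed to the ETL entry point are assumed to be substituted in order (behavior inferred from AbstractEtlService in the support module). A hedged sketch:

import com.alibaba.otter.canal.client.adapter.clickhouse.service.ClickHouseEtlService;
import com.alibaba.otter.canal.client.adapter.support.EtlResult;

import java.util.Collections;
import java.util.List;

public class EtlConditionSketch {
    // Sketch: one parameter fills the single "{}" in "where c_time>={}";
    // etlService is built as in the ETL usage sketch earlier in this change.
    static EtlResult importSince(ClickHouseEtlService etlService) {
        List<String> params = Collections.singletonList("2023-11-13 00:00:00");
        return etlService.importData(params);
    }
}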

+ 109 - 0
client-adapter/clickhouse/src/test/java/com/alibaba/otter/canal/client/adapter/clickhouse/ClickHouseBatchSyncServiceTest.java

@@ -0,0 +1,109 @@
+package com.alibaba.otter.canal.client.adapter.clickhouse;
+
+
+import com.alibaba.otter.canal.client.adapter.clickhouse.sync.Common;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.*;
+
+@Ignore
+public class ClickHouseBatchSyncServiceTest {
+
+    private ClickHouseAdapter clickHouseAdapter;
+
+    @Before
+    public void init() {
+        clickHouseAdapter = Common.init();
+    }
+
+    @Test
+    public void insert() throws InterruptedException {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("INSERT");
+        dml.setDatabase("mytest");
+        dml.setTable("user");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("name", "Eric");
+        data.put("role_id", 1L);
+        data.put("c_time", new Date());
+        data.put("test1", "sdfasdfawe中国asfwef");
+        dml.setData(dataList);
+
+        clickHouseAdapter.sync(Collections.singletonList(dml));
+        Thread.sleep(10000L); // wait for the scheduled batch flush to land the insert
+    }
+
+    @Test
+    public void update() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("UPDATE");
+        dml.setDatabase("mytest");
+        dml.setTable("user");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("name", "Eric2");
+        dml.setData(dataList);
+        List<Map<String, Object>> oldList = new ArrayList<>();
+        Map<String, Object> old = new LinkedHashMap<>();
+        oldList.add(old);
+        old.put("name", "Eric");
+        dml.setOld(oldList);
+
+        clickHouseAdapter.sync(Collections.singletonList(dml));
+    }
+
+    @Test
+    public void delete() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("DELETE");
+        dml.setDatabase("mytest");
+        dml.setTable("user");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("name", "Eric2");
+        dml.setData(dataList);
+
+        clickHouseAdapter.sync(Collections.singletonList(dml));
+    }
+
+    @Test
+    public void truncate() throws InterruptedException {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("TRUNCATE");
+        dml.setDatabase("mytest");
+        dml.setTable("user");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("name", "Eric2");
+        dml.setData(dataList);
+        List<Map<String, Object>> oldList = new ArrayList<>();
+        Map<String, Object> old = new LinkedHashMap<>();
+        oldList.add(old);
+        old.put("name", "Eric");
+        dml.setOld(oldList);
+
+        clickHouseAdapter.sync(Collections.singletonList(dml));
+        Thread.sleep(1000L); // give the async flush a moment to run
+    }
+}

+ 79 - 0
client-adapter/clickhouse/src/test/java/com/alibaba/otter/canal/client/adapter/clickhouse/ClickHouseBatchSyncThreadSafeTest.java

@@ -0,0 +1,79 @@
+package com.alibaba.otter.canal.client.adapter.clickhouse;
+
+/**
+ * @author: Xander
+ * @date: Created in 2023/11/13 22:27
+ * @email: zhrunxin33@gmail.com
+ * @description: Testing thread safety of the batch sync service
+ */
+
+import ch.qos.logback.classic.Level;
+import com.alibaba.otter.canal.client.adapter.clickhouse.sync.Common;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Ignore
+public class ClickHouseBatchSyncThreadSafeTest {
+
+    private ClickHouseAdapter clickHouseAdapter;
+
+    private ExecutorService executorService;
+
+    private String[] operations = new String[]{"INSERT", "UPDATE"};
+
+    private String[] tables = new String[]{"user", "customer"};
+
+    @Before
+    public void init() {
+        clickHouseAdapter = Common.init();
+        Common.setLogLevel(Level.INFO);
+        executorService = Executors.newFixedThreadPool(5);
+    }
+
+    @Test
+    public void test01() throws InterruptedException, ExecutionException {
+        List<Future<?>> list = new ArrayList<>();
+        AtomicInteger count = new AtomicInteger();
+        for (int i = 0; i < 10; i++) {
+            list.add(executorService.submit(() -> {
+                for (int j = 0; j < 300; j++) {
+                    Random random = new Random();
+                    int cou = count.incrementAndGet();
+                    // pick a random operation and table; the bounds must span the whole arrays
+                    String dmlType = operations[random.nextInt(operations.length)];
+                    Dml dml = new Dml();
+                    dml.setDestination("example");
+                    dml.setTs(new Date().getTime());
+                    dml.setType(dmlType);
+                    dml.setDatabase("mytest");
+                    dml.setTable(tables[random.nextInt(tables.length)]);
+                    List<Map<String, Object>> dataList = new ArrayList<>();
+                    Map<String, Object> data = new LinkedHashMap<>();
+                    dataList.add(data);
+                    data.put("id", cou);
+                    data.put("name", "Eric"+cou);
+                    data.put("role_id", cou);
+                    data.put("c_time", new Date());
+                    data.put("test1", "sdfasdfawe中国asfwef");
+                    dml.setData(dataList);
+                    clickHouseAdapter.sync(Collections.singletonList(dml));
+                }
+            }));
+
+        }
+        for (Future<?> future : list) {
+            future.get();
+        }
+        Thread.sleep(10000L);   // wait for the worker threads to flush their batches
+    }
+
+}

+ 37 - 0
client-adapter/clickhouse/src/test/java/com/alibaba/otter/canal/client/adapter/clickhouse/TestConstant.java

@@ -0,0 +1,37 @@
+package com.alibaba.otter.canal.client.adapter.clickhouse;
+
+import com.alibaba.druid.pool.DruidDataSource;
+
+import java.sql.SQLException;
+
+public class TestConstant {
+
+    public final static String    jdbcUrl      = "jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true";
+    public final static String    jdbcUser     = "root";
+    public final static String    jdbcPassword = "121212";
+
+    public final static DruidDataSource dataSource;
+
+    static {
+        dataSource = new DruidDataSource();
+        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
+        dataSource.setUrl(jdbcUrl);
+        dataSource.setUsername(jdbcUser);
+        dataSource.setPassword(jdbcPassword);
+        dataSource.setInitialSize(1);
+        dataSource.setMinIdle(1);
+        dataSource.setMaxActive(1);
+        dataSource.setMaxWait(60000);
+        dataSource.setTimeBetweenEvictionRunsMillis(60000);
+        dataSource.setMinEvictableIdleTimeMillis(300000);
+        dataSource.setPoolPreparedStatements(false);
+        dataSource.setMaxPoolPreparedStatementPerConnectionSize(20);
+        dataSource.setValidationQuery("select 1");
+        try {
+            dataSource.init();
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+    }
+
+}

+ 49 - 0
client-adapter/clickhouse/src/test/java/com/alibaba/otter/canal/client/adapter/clickhouse/sync/Common.java

@@ -0,0 +1,49 @@
+package com.alibaba.otter.canal.client.adapter.clickhouse.sync;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
+import com.alibaba.otter.canal.client.adapter.clickhouse.ClickHouseAdapter;
+import com.alibaba.otter.canal.client.adapter.clickhouse.TestConstant;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author: Xander
+ * @date: Created in 2023/11/13 0:16
+ * @email: zhrunxin33@gmail.com
+ * @description: Shared bootstrap for ClickHouse adapter tests
+ */
+public class Common {
+
+    public static ClickHouseAdapter init() {
+        DatasourceConfig.DATA_SOURCES.put("defaultDS", TestConstant.dataSource);
+
+        OuterAdapterConfig outerAdapterConfig = new OuterAdapterConfig();
+        outerAdapterConfig.setName("clickhouse");
+        outerAdapterConfig.setKey("clickhouse1");
+        Map<String, String> properties = new HashMap<>();
+        properties.put("jdbc.driveClassName", "ru.yandex.clickhouse.ClickHouseDriver");
+        properties.put("jdbc.url", "jdbc:clickhouse://127.0.0.1:8123/default");
+        properties.put("jdbc.username", "default");
+        properties.put("jdbc.password", "123456");
+        outerAdapterConfig.setProperties(properties);
+
+        ClickHouseAdapter adapter = new ClickHouseAdapter();
+        adapter.init(outerAdapterConfig, null);
+        return adapter;
+    }
+
+    /**
+     * Set the global log level
+     *
+     * @param logLevel target level
+     */
+    public static void setLogLevel(Level logLevel) {
+        Logger logger = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
+        logger.setLevel(logLevel);
+    }
+}

+ 31 - 0
client-adapter/clickhouse/src/test/resources/clickhouse/mytest_customer.yml

@@ -0,0 +1,31 @@
+dataSourceKey: defaultDS
+destination: example
+groupId: g1
+outerAdapterKey: clickhouse1
+concurrent: true
+dbMapping:
+  database: mytest
+  table: customer
+  targetTable: mytest.customer
+  targetPk:
+    id: id
+#  mapAll: true
+  targetColumns:
+    id:
+    name:
+    role_id:
+    c_time:
+    test1:
+  etlCondition: "where c_time>={}"
+  commitBatch: 3000 # commit batch size
+
+
+## Mirror schema synchronization config
+#dataSourceKey: defaultDS
+#destination: example
+#groupId: g1
+#outerAdapterKey: mysql1
+#concurrent: true
+#dbMapping:
+#  mirrorDb: true
+#  database: mytest

+ 31 - 0
client-adapter/clickhouse/src/test/resources/clickhouse/mytest_user.yml

@@ -0,0 +1,31 @@
+dataSourceKey: defaultDS
+destination: example
+groupId: g1
+outerAdapterKey: clickhouse1
+concurrent: true
+dbMapping:
+  database: mytest
+  table: user
+  targetTable: mytest.user
+  targetPk:
+    id: id
+#  mapAll: true
+  targetColumns:
+    id:
+    name:
+    role_id:
+    c_time:
+    test1:
+  etlCondition: "where c_time>={}"
+  commitBatch: 3000 # commit batch size
+
+
+## Mirror schema synchronization config
+#dataSourceKey: defaultDS
+#destination: example
+#groupId: g1
+#outerAdapterKey: mysql1
+#concurrent: true
+#dbMapping:
+#  mirrorDb: true
+#  database: mytest

+ 17 - 0
client-adapter/launcher/pom.xml

@@ -74,6 +74,10 @@
             <artifactId>hbase-shaded-client</artifactId>
             <scope>runtime</scope>
         </dependency>
+        <dependency>
+            <groupId>ru.yandex.clickhouse</groupId>
+            <artifactId>clickhouse-jdbc</artifactId>
+        </dependency>
 
         <!-- outer adapter jar with dependencies-->
         <dependency>
@@ -167,6 +171,19 @@
             <classifier>jar-with-dependencies</classifier>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>client-adapter.clickhouse</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>*</artifactId>
+                    <groupId>*</groupId>
+                </exclusion>
+            </exclusions>
+            <classifier>jar-with-dependencies</classifier>
+            <scope>provided</scope>
+        </dependency>
         <!-- connector plugin -->
         <dependency>
             <groupId>com.alibaba.otter</groupId>

+ 10 - 0
client-adapter/launcher/src/main/resources/application.yml

@@ -104,3 +104,13 @@ canal.conf:
 #          jdbc.url: jdbc:phoenix:127.0.0.1:2181:/hbase/db
 #          jdbc.username:
 #          jdbc.password:
+#      - name: clickhouse
+#        key: clickhouse1
+#        properties:
+#          jdbc.driverClassName: ru.yandex.clickhouse.ClickHouseDriver
+#          jdbc.url: jdbc:clickhouse://127.0.0.1:8123/default
+#          jdbc.username: default
+#          jdbc.password: 123456
+#          batchSize: 3000
+#          scheduleTime: 600   # flush interval, in seconds
+#          threads: 3          # parallel threads

+ 6 - 0
client-adapter/pom.xml

@@ -36,6 +36,7 @@
         <module>kudu</module>
         <module>phoenix</module>
         <module>tablestore</module>
+        <module>clickhouse</module>
     </modules>
 
     <licenses>
@@ -192,6 +193,11 @@
                 <artifactId>hbase-client</artifactId>
                 <version>1.4.8</version>
             </dependency>
+            <dependency>
+                <groupId>ru.yandex.clickhouse</groupId>
+                <artifactId>clickhouse-jdbc</artifactId>
+                <version>0.3.1</version>
+            </dependency>
             <dependency>
                 <groupId>com.aliyun.openservices</groupId>
                 <artifactId>tablestore</artifactId>