瀏覽代碼

add tablestore adapter (#3754)

* add tablestore adapter

* add tablestore adapter

* fix bug and add log when etl fail

* fix bug,tinyint(1)以及rowupdatechange

Co-authored-by: agapple <jianghang115@gmail.com>
364102729 3 年之前
父節點
當前提交
0a2254eeaf
共有 17 個文件被更改,包括 1634 次插入、2 次刪除
  1. 10 0
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java
  2. 13 0
      client-adapter/launcher/pom.xml
  3. 7 0
      client-adapter/launcher/src/main/assembly/dev.xml
  4. 7 0
      client-adapter/launcher/src/main/assembly/release.xml
  5. 10 2
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AdapterProcessor.java
  6. 20 0
      client-adapter/pom.xml
  7. 93 0
      client-adapter/tablestore/pom.xml
  8. 318 0
      client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/TablestoreAdapter.java
  9. 14 0
      client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/common/PropertyConstants.java
  10. 51 0
      client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/config/ConfigLoader.java
  11. 318 0
      client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/config/MappingConfig.java
  12. 9 0
      client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/enums/TablestoreFieldType.java
  13. 134 0
      client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/service/TablestoreEtlService.java
  14. 397 0
      client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/service/TablestoreSyncService.java
  15. 211 0
      client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/support/SyncUtil.java
  16. 1 0
      client-adapter/tablestore/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter
  17. 21 0
      client-adapter/tablestore/src/main/resources/tablestore/test.yml

+ 10 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -54,6 +54,8 @@ public class CanalClientConfig {
     // canal adapters 配置
     private List<CanalAdapter> canalAdapters;
 
+    private Boolean terminateOnException = false;
+
     public String getCanalServerHost() {
         return canalServerHost;
     }
@@ -222,6 +224,14 @@ public class CanalClientConfig {
         this.namespace = namespace;
     }
 
+    public Boolean getTerminateOnException() {
+        return terminateOnException;
+    }
+
+    public void setTerminateOnException(Boolean terminateOnException) {
+        this.terminateOnException = terminateOnException;
+    }
+
     public static class CanalAdapter {
 
         private String      instance; // 实例名

+ 13 - 0
client-adapter/launcher/pom.xml

@@ -141,6 +141,19 @@
             <classifier>jar-with-dependencies</classifier>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>client-adapter.tablestore</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>*</artifactId>
+                    <groupId>*</groupId>
+                </exclusion>
+            </exclusions>
+            <classifier>jar-with-dependencies</classifier>
+            <scope>provided</scope>
+        </dependency>
         <!-- connector plugin -->
         <dependency>
             <groupId>com.alibaba.otter</groupId>

+ 7 - 0
client-adapter/launcher/src/main/assembly/dev.xml

@@ -64,6 +64,13 @@
                 <exclude>META-INF/**</exclude>
             </excludes>
         </fileSet>
+        <fileSet>
+            <directory>../tablestore/src/main/resources/</directory>
+            <outputDirectory>/conf</outputDirectory>
+            <excludes>
+                <exclude>META-INF/**</exclude>
+            </excludes>
+        </fileSet>
         <fileSet>
             <directory>target</directory>
             <outputDirectory>logs</outputDirectory>

+ 7 - 0
client-adapter/launcher/src/main/assembly/release.xml

@@ -65,6 +65,13 @@
                 <exclude>META-INF/**</exclude>
             </excludes>
         </fileSet>
+        <fileSet>
+            <directory>../tablestore/src/main/resources/</directory>
+            <outputDirectory>/conf</outputDirectory>
+            <excludes>
+                <exclude>META-INF/**</exclude>
+            </excludes>
+        </fileSet>
         <fileSet>
             <directory>target</directory>
             <outputDirectory>logs</outputDirectory>

+ 10 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AdapterProcessor.java

@@ -7,6 +7,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -217,8 +218,15 @@ public class AdapterProcessor {
                                 canalMsgConsumer.rollback(); // 处理失败, 回滚数据
                                 logger.error(e.getMessage() + " Error sync and rollback, execute times: " + (i + 1));
                             } else {
-                                canalMsgConsumer.ack();
-                                logger.error(e.getMessage() + " Error sync but ACK!");
+                                if (canalClientConfig.getTerminateOnException()) {
+                                    canalMsgConsumer.rollback();
+                                    logger.error("Retry fail, turn switch off and abort data transfer.");
+                                    syncSwitch.off(canalDestination);
+                                    logger.error("finish turn off switch of destination:" + canalDestination);
+                                } else {
+                                    canalMsgConsumer.ack();
+                                    logger.error(e.getMessage() + " Error sync but ACK!");
+                                }
                             }
                             Thread.sleep(500);
                         }

+ 20 - 0
client-adapter/pom.xml

@@ -33,6 +33,7 @@
         <module>escore</module>
         <module>kudu</module>
         <module>phoenix</module>
+        <module>tablestore</module>
     </modules>
 
     <licenses>
@@ -251,6 +252,25 @@
                 <version>1.9.0</version>
                 <scope>test</scope>
             </dependency>
+
+            <dependency>
+                <groupId>com.aliyun.openservices</groupId>
+                <artifactId>tablestore</artifactId>
+                <version>5.10.3</version>
+                <classifier>jar-with-dependencies</classifier>
+                <exclusions>
+                    <exclusion>
+                        <groupId>com.google.protobuf</groupId>
+                        <artifactId>protobuf-java</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.httpcomponents</groupId>
+                        <artifactId>httpasyncclient</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+
         </dependencies>
     </dependencyManagement>
 

+ 93 - 0
client-adapter/tablestore/pom.xml

@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>canal.client-adapter</artifactId>
+        <groupId>com.alibaba.otter</groupId>
+        <version>1.1.6-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>com.alibaba.otter</groupId>
+    <artifactId>client-adapter.tablestore</artifactId>
+    <packaging>jar</packaging>
+    <name>canal client adapter rdb module for otter ${project.version}</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>client-adapter.common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.aliyun.openservices</groupId>
+            <artifactId>tablestore</artifactId>
+            <version>5.10.3</version>
+            <classifier>jar-with-dependencies</classifier>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpasyncclient</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.4</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                        <configuration>
+                            <tasks>
+                                <copy todir="${project.basedir}/../launcher/target/classes/tablestore" overwrite="true">
+                                    <fileset dir="${project.basedir}/target/classes/tablestore" erroronmissingdir="true">
+                                        <include name="*.yml" />
+                                    </fileset>
+                                </copy>
+                            </tasks>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

+ 318 - 0
client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/TablestoreAdapter.java

@@ -0,0 +1,318 @@
+package com.alibaba.otter.canal.client.adapter.tablestore;
+
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
+
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
+import com.alibaba.otter.canal.client.adapter.tablestore.common.PropertyConstants;
+import com.alibaba.otter.canal.client.adapter.tablestore.config.ConfigLoader;
+import com.alibaba.otter.canal.client.adapter.tablestore.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.tablestore.service.TablestoreEtlService;
+import com.alibaba.otter.canal.client.adapter.tablestore.service.TablestoreSyncService;
+import com.alicloud.openservices.tablestore.DefaultTableStoreWriter;
+import com.alicloud.openservices.tablestore.TableStoreWriter;
+import com.alicloud.openservices.tablestore.core.auth.DefaultCredentials;
+import com.alicloud.openservices.tablestore.core.auth.ServiceCredentials;
+import com.alicloud.openservices.tablestore.writer.WriterConfig;
+import com.alicloud.openservices.tablestore.writer.WriterResult;
+import com.alicloud.openservices.tablestore.writer.enums.BatchRequestType;
+import com.alicloud.openservices.tablestore.writer.enums.DispatchMode;
+import com.alicloud.openservices.tablestore.writer.enums.WriteMode;
+import com.alicloud.openservices.tablestore.writer.enums.WriterRetryStrategy;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.CollectionUtils;
+
+
+@SPI("tablestore")
+public class TablestoreAdapter implements OuterAdapter {
+
+    private static final Logger logger              = LoggerFactory.getLogger(TablestoreAdapter.class);
+
+    private Map<String, MappingConfig>              tablestoreMapping          = new ConcurrentHashMap<>();                // 文件名对应配置
+
+    private Map<String, Map<String, MappingConfig>> mappingConfigCache  = new ConcurrentHashMap<>();
+
+    private Map<String, Map<String, TableStoreWriter>> writerCache  = new ConcurrentHashMap<>();
+
+    private TablestoreSyncService tablestoreSyncService;
+
+    private Properties                              envProperties;
+
+    private OuterAdapterConfig configuration;
+
+
+    @Override
+    public void init(OuterAdapterConfig configuration, Properties envProperties) {
+        this.envProperties = envProperties;
+        this.configuration = configuration;
+        Map<String, MappingConfig> tablestoreMappingTmp = ConfigLoader.load(envProperties);
+        // 过滤不匹配的key的配置
+        tablestoreMappingTmp.forEach((key, mappingConfig) -> {
+            if ((mappingConfig.getOuterAdapterKey() == null && configuration.getKey() == null)
+                    || (mappingConfig.getOuterAdapterKey() != null && mappingConfig.getOuterAdapterKey()
+                    .equalsIgnoreCase(configuration.getKey()))) {
+                tablestoreMapping.put(key, mappingConfig);
+                mappingConfig.getDbMapping().init(mappingConfig);
+            }
+        });
+
+        if (tablestoreMapping.isEmpty()) {
+            throw new RuntimeException("No tablestore adapter found for config key: " + configuration.getKey());
+        }
+
+        Map<String, String> properties = configuration.getProperties();
+
+        for (Map.Entry<String, MappingConfig> entry : tablestoreMapping.entrySet()) {
+            String configName = entry.getKey();
+            MappingConfig mappingConfig = entry.getValue();
+            String key;
+            if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
+                key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "-"
+                        + StringUtils.trimToEmpty(mappingConfig.getGroupId()) + "_"
+                        + mappingConfig.getDbMapping().getDatabase() + "-" + mappingConfig.getDbMapping().getTable();
+            } else {
+                key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "_"
+                        + mappingConfig.getDbMapping().getDatabase() + "-" + mappingConfig.getDbMapping().getTable();
+            }
+            Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(key,
+                    k1 -> new ConcurrentHashMap<>());
+            configMap.put(configName, mappingConfig);
+
+
+            // 构建对应的 TableStoreWriter
+            ServiceCredentials credentials = new DefaultCredentials(
+                    properties.get(PropertyConstants.TABLESTORE_ACCESSSECRETID),
+                    properties.get(PropertyConstants.TABLESTORE_ACCESSSECRETKEY)
+            );
+
+
+            WriterConfig config = getWriterConfig(mappingConfig);
+
+            TableStoreWriter writer = new DefaultTableStoreWriter(
+                    properties.get(PropertyConstants.TABLESTORE_ENDPOINT),
+                    credentials,
+                    properties.get(PropertyConstants.TABLESTORE_INSTANCENAME),
+                    mappingConfig.getDbMapping().getTargetTable(),
+                    config,
+                    null
+            );
+
+            Map<String, TableStoreWriter> config2writerMap = writerCache.computeIfAbsent(key,
+                    k1 -> new ConcurrentHashMap<>());
+            config2writerMap.put(configName, writer);
+
+        }
+
+        tablestoreSyncService = new TablestoreSyncService();
+    }
+
+    /**
+     * 根据配置文件获得tablestorewriter的WriterConfig信息
+     * @param mappingConfig
+     * @return
+     */
+    private WriterConfig getWriterConfig(MappingConfig mappingConfig) {
+        WriterConfig config = new WriterConfig();
+        MappingConfig.DbMapping mapping = mappingConfig.getDbMapping();
+        config.setMaxBatchRowsCount(mapping.getCommitBatch());
+        config.setConcurrency(mappingConfig.getThreads());
+        config.setDispatchMode(DispatchMode.HASH_PRIMARY_KEY);
+        config.setWriteMode(WriteMode.SEQUENTIAL);
+        config.setBatchRequestType(BatchRequestType.BULK_IMPORT);
+        config.setBucketCount(mappingConfig.getThreads());
+        config.setWriterRetryStrategy(WriterRetryStrategy.CERTAIN_ERROR_CODE_NOT_RETRY);
+        config.setAllowDuplicatedRowInBatchRequest(false);
+        return config;
+    }
+
+
+    @Override
+    public void sync(List<Dml> dmls) {
+        if (dmls == null || dmls.isEmpty()) {
+            return;
+        }
+
+        try {
+            Set<TableStoreWriter> writerSet = new HashSet<>();
+            List<Future<WriterResult>> futureList = new ArrayList<>();
+            for (Dml dml : dmls) {
+                String destination = StringUtils.trimToEmpty(dml.getDestination());
+                String groupId = StringUtils.trimToEmpty(dml.getGroupId());
+                String database = dml.getDatabase();
+                String table = dml.getTable();
+                String key;
+                if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
+                    key = destination + "-" + groupId + "_" + database + "-" + table;
+                } else {
+                    key = destination + "_" + database + "-" + table;
+                }
+                Map<String, MappingConfig> configMap = mappingConfigCache.get(key);
+                if (configMap == null) {
+                    // 可能有dml中涉及到的表并没有出现在配置中,说明此类dml并不需要同步
+                    continue;
+                }
+
+                Map<String, TableStoreWriter> writerMap = writerCache.get(key);
+
+                for (Map.Entry<String, MappingConfig> entry : configMap.entrySet()) {
+                    TableStoreWriter w = writerMap.get(entry.getKey());
+                    // 拿到所有future用于判定失败的记录
+                    Future<WriterResult> futureTemp = tablestoreSyncService.sync(entry.getValue(), dml, w);
+                    if (futureTemp != null) {
+                        writerSet.add(w);
+                        futureList.add(futureTemp);
+                    }
+                }
+            }
+
+            if (writerSet.isEmpty()) {
+                return;
+            }
+
+            writerSet.forEach(e -> e.flush());
+
+            List<WriterResult.RowChangeStatus> totalFailedRows = new ArrayList<>();
+            for (Future<WriterResult> future : futureList) {
+                try {
+                    WriterResult result = future.get();
+                    List<WriterResult.RowChangeStatus> failedRows = result.getFailedRows();
+                    if (!CollectionUtils.isEmpty(failedRows)) {
+                        totalFailedRows.addAll(failedRows);
+                    }
+                } catch (InterruptedException e) {
+                    logger.info("InterruptedException", e);
+                    Thread.currentThread().interrupt();
+                } catch (ExecutionException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            if (!CollectionUtils.isEmpty(totalFailedRows)) {
+                // 认为有失败的请求
+                List<String> msgs = totalFailedRows.stream().map(e -> buildErrorMsgForFailedRowChange(e)).collect(Collectors.toList());
+                throw new RuntimeException("Failed rows:" + org.springframework.util.StringUtils.collectionToDelimitedString(msgs, ",", "[", "]"));
+            }
+
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    /**
+     * 组装失败记录的信息
+     * @param rowChangeStatus
+     * @return
+     */
+    public static String buildErrorMsgForFailedRowChange(WriterResult.RowChangeStatus rowChangeStatus) {
+        StringBuilder sb = new StringBuilder("{Exception:");
+        sb.append(rowChangeStatus.getException().getMessage()).append(",Table:")
+        .append(rowChangeStatus.getRowChange().getTableName()).append(",PrimaryKey:")
+        .append("{").append(rowChangeStatus.getRowChange().getPrimaryKey().toString())
+        .append("}}");
+        return sb.toString();
+    }
+
+
+    @Override
+    public EtlResult etl(String task, List<String> params) {
+        EtlResult etlResult = new EtlResult();
+        MappingConfig config = tablestoreMapping.get(task);
+        if (config == null) {
+            etlResult.setErrorMessage("can not find config for " + task);
+            etlResult.setSucceeded(false);
+            return etlResult;
+        }
+
+        TableStoreWriter writer = null;
+        try {
+            writer = buildEtlWriter(configuration, config);
+
+            TablestoreEtlService rdbEtlService = new TablestoreEtlService(writer, config);
+            rdbEtlService.importData(params);
+
+            etlResult.setSucceeded(true);
+            return etlResult;
+        } catch (Exception e) {
+            logger.error("Error while etl for task " + task, e);
+            etlResult.setSucceeded(false);
+            etlResult.setErrorMessage(e.getMessage());
+            return etlResult;
+        } finally {
+            if (writer != null) {
+                writer.close();
+            }
+        }
+    }
+
+    /**
+     * 构造批量导入的writer
+     * @param configuration
+     * @param mappingConfig
+     * @return
+     */
+    private TableStoreWriter buildEtlWriter(OuterAdapterConfig configuration, MappingConfig mappingConfig) {
+        Map<String, String> properties = configuration.getProperties();
+
+        ServiceCredentials credentials = new DefaultCredentials(
+                properties.get(PropertyConstants.TABLESTORE_ACCESSSECRETID),
+                properties.get(PropertyConstants.TABLESTORE_ACCESSSECRETKEY)
+        );
+
+        WriterConfig config = getWriterConfig(mappingConfig);
+        config.setBucketCount(3);
+        config.setAllowDuplicatedRowInBatchRequest(true);
+        config.setConcurrency(8);
+        config.setWriteMode(WriteMode.PARALLEL);
+
+        TableStoreWriter writer = new DefaultTableStoreWriter(
+                properties.get(PropertyConstants.TABLESTORE_ENDPOINT),
+                credentials,
+                properties.get(PropertyConstants.TABLESTORE_INSTANCENAME),
+                mappingConfig.getDbMapping().getTargetTable(),
+                config,
+                null
+        );
+        return writer;
+    }
+
+
+    @Override
+    public Map<String, Object> count(String task) {
+        throw new RuntimeException("count is not supportted in tablestore");
+    }
+
+
+    @Override
+    public String getDestination(String task) {
+        MappingConfig config = tablestoreMapping.get(task);
+        if (config != null) {
+            return config.getDestination();
+        }
+        return null;
+    }
+
+    @Override
+    public void destroy() {
+        if (tablestoreSyncService != null) {
+            tablestoreSyncService.close();
+        }
+
+        if (writerCache != null) {
+            for (Map<String, TableStoreWriter> tmpMap : writerCache.values()) {
+                if (tmpMap != null) {
+                    for (TableStoreWriter writer : tmpMap.values()) {
+                        writer.close();
+                    }
+                }
+            }
+        }
+    }
+}

+ 14 - 0
client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/common/PropertyConstants.java

@@ -0,0 +1,14 @@
+package com.alibaba.otter.canal.client.adapter.tablestore.common;
+
+
+public class PropertyConstants {
+
+    public static final String TABLESTORE_ACCESSSECRETID = "tablestore.accessSecretId";
+
+    public static final String TABLESTORE_ACCESSSECRETKEY = "tablestore.accessSecretKey";
+
+    public static final String TABLESTORE_ENDPOINT = "tablestore.endpoint";
+
+    public static final String TABLESTORE_INSTANCENAME = "tablestore.instanceName";
+
+}

+ 51 - 0
client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/config/ConfigLoader.java

@@ -0,0 +1,51 @@
+package com.alibaba.otter.canal.client.adapter.tablestore.config;
+
+import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
+import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.StringUtils;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * RDB表映射配置加载器
+ *
+ * @author rewerma 2018-11-07 下午02:41:34
+ * @version 1.0.0
+ */
+public class ConfigLoader {
+
+    private static Logger logger = LoggerFactory.getLogger(ConfigLoader.class);
+
+    /**
+     * 加载RDB表映射配置
+     *
+     * @return 配置名/配置文件名--对象
+     */
+    public static Map<String, MappingConfig> load(Properties envProperties) {
+        logger.info("## Start loading tablestore mapping config ... ");
+
+        Map<String, MappingConfig> result = new LinkedHashMap<>();
+
+        Map<String, String> configContentMap = MappingConfigsLoader.loadConfigs("tablestore");
+        configContentMap.forEach((fileName, content) -> {
+            MappingConfig config = YmlConfigBinder
+                .bindYmlToObj(null, content, MappingConfig.class, null, envProperties);
+            if (config == null) {
+                return;
+            }
+            try {
+                config.validate();
+            } catch (Exception e) {
+                throw new RuntimeException("ERROR Config: " + fileName + " " + e.getMessage(), e);
+            }
+            result.put(fileName, config);
+        });
+
+        logger.info("## Tablestore mapping config loaded:" + StringUtils.collectionToCommaDelimitedString(result.keySet()));
+        return result;
+    }
+}

+ 318 - 0
client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/config/MappingConfig.java

@@ -0,0 +1,318 @@
+package com.alibaba.otter.canal.client.adapter.tablestore.config;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+import com.alibaba.otter.canal.client.adapter.tablestore.enums.TablestoreFieldType;
+import com.alibaba.otter.canal.client.adapter.tablestore.support.SyncUtil;
+import org.apache.commons.lang.StringUtils;
+
+import java.sql.ResultSetMetaData;
+import java.util.*;
+
+/**
+ * RDB表映射配置
+ *
+ * @author rewerma 2018-11-07 下午02:41:34
+ * @version 1.0.0
+ */
+public class MappingConfig implements AdapterConfig {
+
+    private String    dataSourceKey;      // 数据源key
+
+    private String    destination;        // canal实例或MQ的topic
+
+    private String    groupId;            // groupId
+
+    private String    outerAdapterKey;    // 对应适配器的key
+
+    private DbMapping dbMapping;          // db映射配置
+
+    private Boolean updateChangeColumns = false;
+
+    private Integer threads = 8;
+
+    public String getDataSourceKey() {
+        return dataSourceKey;
+    }
+
+    public void setDataSourceKey(String dataSourceKey) {
+        this.dataSourceKey = dataSourceKey;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    public String getOuterAdapterKey() {
+        return outerAdapterKey;
+    }
+
+    public void setOuterAdapterKey(String outerAdapterKey) {
+        this.outerAdapterKey = outerAdapterKey;
+    }
+
+    public DbMapping getDbMapping() {
+        return dbMapping;
+    }
+
+    public void setDbMapping(DbMapping dbMapping) {
+        this.dbMapping = dbMapping;
+    }
+
+    public String getDestination() {
+        return destination;
+    }
+
+    public void setDestination(String destination) {
+        this.destination = destination;
+    }
+
+    public AdapterMapping getMapping() {
+        return dbMapping;
+    }
+
+    public Boolean getUpdateChangeColumns() {
+        return updateChangeColumns;
+    }
+
+    public void setUpdateChangeColumns(Boolean updateChangeColumns) {
+        this.updateChangeColumns = updateChangeColumns;
+    }
+
+    public Integer getThreads() {
+        return threads;
+    }
+
+    public void setThreads(Integer threads) {
+        this.threads = threads;
+    }
+
+    public void validate() {
+        if (dbMapping.database == null || dbMapping.database.isEmpty()) {
+            throw new NullPointerException("dbMapping.database");
+        }
+        if (dbMapping.table == null || dbMapping.table.isEmpty()) {
+            throw new NullPointerException("dbMapping.table");
+        }
+        if (dbMapping.targetTable == null || dbMapping.targetTable.isEmpty()) {
+            throw new NullPointerException("dbMapping.targetTable");
+        }
+    }
+
+
+
+
+    public static class ColumnItem {
+        private String targetColumn;
+        private String  column;
+        private TablestoreFieldType type;
+
+        public String getColumn() {
+            return column;
+        }
+
+        public void setColumn(String column) {
+            this.column = column;
+        }
+
+        public TablestoreFieldType getType() {
+            return type;
+        }
+
+        public void setType(TablestoreFieldType type) {
+            this.type = type;
+        }
+
+        public String getTargetColumn() {
+            return targetColumn;
+        }
+
+        public void setTargetColumn(String targetColumn) {
+            this.targetColumn = targetColumn;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            ColumnItem that = (ColumnItem) o;
+            return Objects.equals(column, that.column);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(column);
+        }
+    }
+
+
+
+    public static class DbMapping implements AdapterMapping {
+
+        private String              database;                                // 数据库名或schema名
+        private String              table;                                   // 表名
+        private LinkedHashMap<String, String> targetPk        = new LinkedHashMap<>(); // 目标表主键字段
+//        private boolean             mapAll          = false;                 // 映射所有字段
+
+        private String              targetTable;                             // 目标表名
+        private Map<String, String> targetColumns;                           // 目标表字段映射
+        private Map<String, String> targetColumnsParsed;
+        private String              etlCondition;                            // etl条件sql
+
+        private int                 readBatch       = 5000;
+        private int                 commitBatch     = 5000;                  // etl等批量提交大小
+
+        private Map<String, ColumnItem> columnItems        = new LinkedHashMap<>(); // 转换后的字段映射列表
+
+        public String getDatabase() {
+            return database;
+        }
+
+        public void setDatabase(String database) {
+            this.database = database;
+        }
+
+        public String getTable() {
+            return table;
+        }
+
+        public void setTable(String table) {
+            this.table = table;
+        }
+
+
+        public LinkedHashMap<String, String> getTargetPk() {
+            return targetPk;
+        }
+
+        public void setTargetPk(LinkedHashMap<String, String> targetPk) {
+            this.targetPk = targetPk;
+        }
+
+        public String getTargetTable() {
+            return targetTable;
+        }
+
+        public void setTargetTable(String targetTable) {
+            this.targetTable = targetTable;
+        }
+
+        public Map<String, String> getTargetColumns() {
+            return targetColumns;
+        }
+
+
+        public void setTargetColumns(Map<String, String> targetColumns) {
+            this.targetColumns = targetColumns;
+
+        }
+
+        public Map<String, String> getTargetColumnsParsed() {
+            return targetColumnsParsed;
+        }
+
+        public String getEtlCondition() {
+            return etlCondition;
+        }
+
+        public void setEtlCondition(String etlCondition) {
+            this.etlCondition = etlCondition;
+        }
+
+        public int getReadBatch() {
+            return readBatch;
+        }
+
+        public void setReadBatch(int readBatch) {
+            this.readBatch = readBatch;
+        }
+
+        public int getCommitBatch() {
+            return commitBatch;
+        }
+
+        public void setCommitBatch(int commitBatch) {
+            this.commitBatch = commitBatch;
+        }
+
+        public Map<String, ColumnItem> getColumnItems() {
+            return columnItems;
+        }
+
+        public void setColumnItems(Map<String, ColumnItem> columnItems) {
+            this.columnItems = columnItems;
+        }
+
+        public void init(MappingConfig config) {
+            String splitBy = "$";
+            if (targetColumns != null) {
+                boolean needTypeInference = false;
+                for (Map.Entry<String, String> columnField : targetColumns.entrySet()) {
+                    String field = columnField.getValue();
+                    String type = null;
+                    if (field != null) {
+                        // 解析类型
+                        int i = field.indexOf(splitBy);
+                        if (i > -1) {
+                            type = field.substring(i + 1);
+                            field = field.substring(0, i);
+                        }
+                    }
+                    ColumnItem columnItem = new ColumnItem();
+                    columnItem.setColumn(columnField.getKey());
+                    columnItem.setTargetColumn(StringUtils.isBlank(field) ? columnField.getKey() : field);
+
+                    TablestoreFieldType fieldType = SyncUtil.getTablestoreType(type);
+                    if (fieldType == null) {
+                        needTypeInference = true;
+                    }
+                    columnItem.setType(fieldType);
+                    columnItems.put(columnField.getKey(), columnItem);
+                }
+                if (needTypeInference) {
+                    // 认为有field没有配置映射类型,需要进行类型推断
+                    DruidDataSource sourceDS = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+
+                    Util.sqlRS(sourceDS, "SELECT * FROM " + SyncUtil.getDbTableName(database, table) + " LIMIT 1 ", rs -> {
+                        try {
+                            ResultSetMetaData rsd = rs.getMetaData();
+                            int columnCount = rsd.getColumnCount();
+                            List<String> columns = new ArrayList<>();
+                            for (int i = 1; i <= columnCount; i++) {
+                                String columnName = rsd.getColumnName(i);
+                                if (columnItems.containsKey(columnName) && columnItems.get(columnName).getType() == null) {
+                                    int columnType = rsd.getColumnType(i);
+                                    columnItems.get(columnName).setType(SyncUtil.getDefaultTablestoreType(columnType));
+                                }
+                            }
+                            return true;
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
+                }
+
+            } else {
+                this.targetColumns = new LinkedHashMap<>();
+            }
+            targetColumnsParsed = new HashMap<>();
+
+            targetColumns.forEach((key, value) -> {
+                if (StringUtils.isEmpty(value)) {
+                    targetColumnsParsed.put(key, key);
+                } else if (value.contains(splitBy) && columnItems.containsKey(key)) {
+                    targetColumnsParsed.put(key, columnItems.get(key).targetColumn);
+                } else {
+                    targetColumnsParsed.put(key, value);
+                }
+            });
+        }
+
+    }
+}

+ 9 - 0
client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/enums/TablestoreFieldType.java

@@ -0,0 +1,9 @@
+package com.alibaba.otter.canal.client.adapter.tablestore.enums;
+
/**
 * Value types the Tablestore adapter can write for a mapped column.
 * Configured via the "$TYPE" suffix in the column mapping, or inferred
 * from JDBC metadata when no suffix is given.
 */
public enum TablestoreFieldType {
    INT,     // integral value, written as a long
    DOUBLE,  // floating-point value
    BOOL,    // boolean value
    STRING,  // text value
    BINARY   // raw byte[] value
}

+ 134 - 0
client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/service/TablestoreEtlService.java

@@ -0,0 +1,134 @@
+package com.alibaba.otter.canal.client.adapter.tablestore.service;
+
+
+import com.alibaba.otter.canal.client.adapter.support.*;
+import com.alibaba.otter.canal.client.adapter.tablestore.TablestoreAdapter;
+import com.alibaba.otter.canal.client.adapter.tablestore.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.tablestore.support.SyncUtil;
+import com.alicloud.openservices.tablestore.TableStoreWriter;
+import com.alicloud.openservices.tablestore.model.RowChange;
+import com.alicloud.openservices.tablestore.writer.WriterResult;
+import org.springframework.util.CollectionUtils;
+
+import javax.sql.DataSource;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+public class TablestoreEtlService extends AbstractEtlService {
+
+    private TableStoreWriter writer;
+    private MappingConfig config;
+    private TablestoreSyncService syncService;
+
+
+    public TablestoreEtlService(TableStoreWriter writer, MappingConfig config){
+        super("Tablestore", config);
+        this.writer = writer;
+        this.config = config;
+        syncService = new TablestoreSyncService();
+    }
+
+
+    public EtlResult importData(List<String> params) {
+        MappingConfig.DbMapping dbMapping = config.getDbMapping();
+        String sql = "SELECT * FROM " + SyncUtil.getDbTableName(dbMapping.getDatabase(), dbMapping.getTable());
+        return importData(sql, params);
+    }
+
+    @Override
+    protected boolean executeSqlImport(DataSource srcDS, String sql, List<Object> values,
+                                       AdapterConfig.AdapterMapping mapping, AtomicLong impCount, List<String> errMsg) {
+
+        try {
+            MappingConfig.DbMapping dbMapping = (MappingConfig.DbMapping) mapping;
+            Map<String, String> columnsMap = dbMapping.getTargetColumnsParsed();
+
+            Util.sqlRS(srcDS, sql, values, rs -> {
+                int idx = 0;
+                List<Future<WriterResult>> futureList = new ArrayList<>();
+                while (true) {
+                    try {
+                        if (!rs.next()) break;
+                    } catch (SQLException throwables) {
+                        logger.error("Error while get data from srcDs", throwables);
+                        break;
+                    }
+
+                    Dml dml = getDMLByRs(columnsMap, rs);
+
+                    List<RowChange> rowChanges = syncService.getRowChanges(dml, config);
+                    if (CollectionUtils.isEmpty(rowChanges)) {
+                        return null;
+                    }
+
+
+                    Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
+                    if (future != null) {
+                        futureList.add(future);
+                    }
+
+                }
+                writer.flush();
+                for (Future<WriterResult> future : futureList) {
+                    try {
+                        WriterResult result = future.get();
+                        if (result != null && result.isAllSucceed()) {
+                            impCount.incrementAndGet();
+                            idx++;
+                        } else if (result != null && !result.isAllSucceed()) {
+                            List<WriterResult.RowChangeStatus> totalFailedRows = result.getFailedRows();
+                            List<String> msgs = totalFailedRows.stream().map(e -> TablestoreAdapter.buildErrorMsgForFailedRowChange(e)).collect(Collectors.toList());
+                            logger.error("Failed rows when ETL:" + org.springframework.util.StringUtils.collectionToDelimitedString(msgs, ",", "[", "]"));
+                        }
+                    } catch (InterruptedException e) {
+                        logger.info("InterruptedException", e);
+                        errMsg.add(e.getMessage());
+                        Thread.currentThread().interrupt();
+                    } catch (ExecutionException e) {
+                        errMsg.add(e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                }
+
+                return idx;
+            });
+            return true;
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            return false;
+        }
+    }
+
+
+
+
+    private Dml getDMLByRs(Map<String, String> columnsMap, ResultSet rs) {
+        try {
+            Dml dml = new Dml();
+            dml.setType("INSERT");
+            Map<String, Object> dataMap = new HashMap<>();
+            List<Map<String, Object>> dataList = new ArrayList<>();
+            dataList.add(dataMap);
+            dml.setData(dataList);
+            for (String key : columnsMap.keySet()) {
+                dataMap.put(key, rs.getObject(key));
+            }
+
+            return dml;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+
+
+
+}

+ 397 - 0
client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/service/TablestoreSyncService.java

@@ -0,0 +1,397 @@
+package com.alibaba.otter.canal.client.adapter.tablestore.service;
+
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.client.adapter.tablestore.enums.TablestoreFieldType;
+import com.alibaba.otter.canal.client.adapter.tablestore.support.SyncUtil;
+import com.alicloud.openservices.tablestore.TableStoreWriter;
+import com.alicloud.openservices.tablestore.model.*;
+import com.alicloud.openservices.tablestore.writer.WriterResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.alibaba.otter.canal.client.adapter.tablestore.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import org.springframework.util.CollectionUtils;
+
+
+/**
+ * RDB同步操作业务
+ *
+ * @author rewerma 2018-11-7 下午06:45:49
+ * @version 1.0.0
+ */
+public class TablestoreSyncService {
+
+    private static final Logger               logger  = LoggerFactory.getLogger(TablestoreSyncService.class);
+
+    private Map<String, Map<String, Integer>> columnsTypeCache;
+
+
+    public Map<String, Map<String, Integer>> getColumnsTypeCache() {
+        return columnsTypeCache;
+    }
+
+    public TablestoreSyncService(){
+        this(new ConcurrentHashMap<>());
+    }
+
+    @SuppressWarnings("unchecked")
+    public TablestoreSyncService(Map<String, Map<String, Integer>> columnsTypeCache){
+        this.columnsTypeCache = columnsTypeCache;
+
+    }
+
+    public Future<WriterResult> sync(MappingConfig mappingConfig,
+                     Dml dml,
+                     TableStoreWriter writer) {
+
+        List<RowChange> rowChanges = getRowChanges(dml, mappingConfig);
+        if (CollectionUtils.isEmpty(rowChanges)) {
+            return null;
+        }
+
+        Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
+        }
+        return future;
+    }
+
+    public List<RowChange> getRowChanges(Dml dml, MappingConfig config) {
+        String type = dml.getType();
+        boolean updateColume = config.getUpdateChangeColumns();
+        if (type != null && type.equalsIgnoreCase("INSERT")) {
+            return getInsertChanges(dml, updateColume, config);
+        } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
+            return getUpdateChanges(dml, updateColume, config);
+        } else if (type != null && type.equalsIgnoreCase("DELETE")) {
+            return getDeleteChanges(dml, updateColume, config);
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Update 类型下构造rowChangeList
+     * @param dml
+     * @param isColumnUpdate
+     * @param config
+     * @return
+     */
+    private List<RowChange> getUpdateChanges(Dml dml, boolean isColumnUpdate, MappingConfig config) {
+        List<RowChange> changeList = new ArrayList<>();
+        Map<String, String> columnMap = config.getDbMapping().getTargetColumnsParsed();
+        Map<String, MappingConfig.ColumnItem> typeMap = SyncUtil.getTypeMap(config);
+        MappingConfig.DbMapping dbMapping = config.getDbMapping();
+
+        if (isColumnUpdate) {
+            // 列更新
+            for (int i = 0; i < dml.getData().size(); i++) {
+                Map<String, Object> map = dml.getData().get(i);
+                Map<String, Object> old = dml.getOld().get(i);
+                boolean isPrimaryKeyChange = isPrimaryKeyChange(old, dbMapping.getTargetPk());
+                if (isPrimaryKeyChange) {
+                    // 如果发现主键修改用put delete操作
+                    // 先 delete
+                    RowUpdateChange change = new RowUpdateChange(dbMapping.getTargetTable());
+                    PrimaryKey primaryKey = buildOldPrimaryKey(map, typeMap, columnMap, dbMapping.getTargetPk(), old);
+                    change.setPrimaryKey(primaryKey);
+                    for (Map.Entry<String, Object> entry : map.entrySet()) {
+                        if (dbMapping.getTargetPk().containsKey(entry.getKey())) {
+                            // 这是个主键, 不需要再次处理
+                            continue;
+                        }
+                        if (!dbMapping.getTargetColumns().containsKey(entry.getKey())) {
+                            // 可能是没有配置的字段
+                            continue;
+                        }
+                        // 非主键
+                        String targetColumn = columnMap.get(entry.getKey());
+                        change.deleteColumns(targetColumn);
+                    }
+                    changeList.add(change);
+
+                    // 然后再put
+                    change = new RowUpdateChange(dbMapping.getTargetTable());
+                    primaryKey = buildPrimaryKey(map, typeMap, columnMap, dbMapping.getTargetPk());
+                    change.setPrimaryKey(primaryKey);
+
+                    List<Column> columnList = getColumnsWhenPut(columnMap, dbMapping, map, typeMap);
+                    if (!CollectionUtils.isEmpty(columnList)) {
+                        change.put(columnList);
+                        changeList.add(change);
+                    }
+
+
+                } else {
+                    //否则用update
+                    RowUpdateChange change = new RowUpdateChange(dbMapping.getTargetTable());
+                    PrimaryKey primaryKey = buildPrimaryKey(map, typeMap, columnMap, dbMapping.getTargetPk());
+                    change.setPrimaryKey(primaryKey);
+                    // 部分update 部分delete
+
+                    boolean validData = false;
+                    for (Map.Entry<String, Object> entry : old.entrySet()) {
+                        if (dbMapping.getTargetPk().containsKey(entry.getKey())) {
+                            // 这是个主键, 不需要再次处理
+                            continue;
+                        }
+                        if (!dbMapping.getTargetColumns().containsKey(entry.getKey())) {
+                            // 可能是没有配置的字段
+                            continue;
+                        }
+                        // 非主键
+                        String targetColumn = columnMap.get(entry.getKey());
+                        Object value = map.get(entry.getKey());
+                        validData = true;
+                        if (value == null) {
+                            change.deleteColumns(targetColumn);
+                        } else {
+                            TablestoreFieldType type = typeMap.get(entry.getKey()).getType();
+                            ColumnValue columnValue = SyncUtil.getColumnValue(value, type);
+                            change.put(targetColumn, columnValue);
+                        }
+                    }
+                    if (validData) {
+                        changeList.add(change);
+                    }
+
+                }
+            }
+
+        } else {
+            // 列覆盖
+            for (int i = 0; i < dml.getData().size(); i++) {
+                Map<String, Object> map = dml.getData().get(i);
+
+                RowPutChange change = new RowPutChange(dbMapping.getTargetTable());
+                PrimaryKey primaryKey = buildPrimaryKey(map, typeMap, columnMap, dbMapping.getTargetPk());
+                change.setPrimaryKey(primaryKey);
+
+                List<Column> columnList = getColumnsWhenPut(columnMap, dbMapping, map, typeMap);
+                if (!CollectionUtils.isEmpty(columnList)) {
+                    change.addColumns(columnList);
+                    changeList.add(change);
+                }
+
+                if (dml.getOld() != null) {
+                    // 如果主键发生修改,需要在tablestore中删除对应的原记录
+                    Map<String, Object> old = dml.getOld().get(i);
+                    if (isPrimaryKeyChange(old, dbMapping.getTargetPk())) {
+                        RowDeleteChange delete = new RowDeleteChange(dbMapping.getTargetTable());
+                        PrimaryKey primaryKeyDelete = buildOldPrimaryKey(map, typeMap, columnMap, dbMapping.getTargetPk(), old);
+                        delete.setPrimaryKey(primaryKeyDelete);
+                        changeList.add(delete);
+                    }
+
+                }
+
+            }
+        }
+
+        return changeList;
+    }
+
+    /**
+     * Delete 类型下构造rowChangeList
+     * @param dml
+     * @param isColumnUpdate
+     * @param config
+     * @return
+     */
+    private List<RowChange> getDeleteChanges(Dml dml, boolean isColumnUpdate, MappingConfig config) {
+        List<RowChange> changeList = new ArrayList<>();
+        Map<String, String> columnMap = config.getDbMapping().getTargetColumnsParsed();
+        Map<String, MappingConfig.ColumnItem> typeMap = SyncUtil.getTypeMap(config);
+        MappingConfig.DbMapping dbMapping = config.getDbMapping();
+
+        if (isColumnUpdate) {
+            // 列更新
+            for (Map<String, Object> map : dml.getData()) {
+                RowUpdateChange change = new RowUpdateChange(dbMapping.getTargetTable());
+                PrimaryKey primaryKey = buildPrimaryKey(map, typeMap, columnMap, dbMapping.getTargetPk());
+                change.setPrimaryKey(primaryKey);
+                boolean validData = false;
+                for (Map.Entry<String, Object> entry : map.entrySet()) {
+                    if (dbMapping.getTargetPk().containsKey(entry.getKey())) {
+                        // 这是个主键, 不需要再次处理
+                        continue;
+                    }
+                    if (!dbMapping.getTargetColumns().containsKey(entry.getKey())) {
+                        // 可能是没有配置的字段
+                        continue;
+                    }
+                    // 非主键
+                    validData = true;
+                    String targetColumn = columnMap.get(entry.getKey());
+                    change.deleteColumns(targetColumn);
+                }
+                if (validData) {
+                    changeList.add(change);
+                }
+            }
+
+        } else {
+            // 列覆盖
+            for (Map<String, Object> map : dml.getData()) {
+                RowDeleteChange change = new RowDeleteChange(dbMapping.getTargetTable());
+                PrimaryKey primaryKey = buildPrimaryKey(map, typeMap, columnMap, dbMapping.getTargetPk());
+                change.setPrimaryKey(primaryKey);
+
+                changeList.add(change);
+            }
+        }
+
+        return changeList;
+    }
+
+    /**
+     * Insert 类型下构造rowChangeList
+     * @param dml
+     * @param isColumnUpdate
+     * @param config
+     * @return
+     */
+    private List<RowChange> getInsertChanges(Dml dml, boolean isColumnUpdate, MappingConfig config) {
+        List<RowChange> changeList = new ArrayList<>();
+
+        Map<String, String> columnMap = config.getDbMapping().getTargetColumnsParsed();
+        Map<String, MappingConfig.ColumnItem> typeMap = SyncUtil.getTypeMap(config);
+        MappingConfig.DbMapping dbMapping = config.getDbMapping();
+        if (isColumnUpdate) {
+            // 列更新
+            for (Map<String, Object> map : dml.getData()) {
+                RowUpdateChange change = new RowUpdateChange(dbMapping.getTargetTable());
+                PrimaryKey primaryKey = buildPrimaryKey(map, typeMap, columnMap, dbMapping.getTargetPk());
+                change.setPrimaryKey(primaryKey);
+
+                List<Column> columnList = getColumnsWhenPut(columnMap, dbMapping, map, typeMap);
+                if (!CollectionUtils.isEmpty(columnList)) {
+                    change.put(columnList);
+                    changeList.add(change);
+                }
+            }
+
+        } else {
+            // 列覆盖
+            for (Map<String, Object> map : dml.getData()) {
+                RowPutChange change = new RowPutChange(dbMapping.getTargetTable());
+                PrimaryKey primaryKey = buildPrimaryKey(map, typeMap, columnMap, dbMapping.getTargetPk());
+                change.setPrimaryKey(primaryKey);
+
+                List<Column> columnList = getColumnsWhenPut(columnMap, dbMapping, map, typeMap);
+                if (!CollectionUtils.isEmpty(columnList)) {
+                    change.addColumns(columnList);
+                }
+
+                changeList.add(change);
+            }
+        }
+        return changeList;
+    }
+
+
+    /**
+     * 组装rowChange的主键
+     * @param map 数据map
+     * @param typeMap  类型映射
+     * @param columnMap 字段名称映射
+     * @param targetPk  主键map
+     * @return
+     */
+    private PrimaryKey buildPrimaryKey(Map<String, Object> map, Map<String, MappingConfig.ColumnItem> typeMap, Map<String, String> columnMap, LinkedHashMap<String, String> targetPk) {
+        List primaryKeyList = new ArrayList<>();
+        for (Map.Entry<String, String> entry : targetPk.entrySet()) {
+            // build primary key
+            String targetColumn = columnMap.get(entry.getKey());
+            Object value = map.get(entry.getKey());
+            TablestoreFieldType type = typeMap.get(entry.getKey()).getType();
+            PrimaryKeyValue keyValue = SyncUtil.getPrimaryKeyValue(value, type);
+            PrimaryKeyColumn primaryKeyColumn = new PrimaryKeyColumn(targetColumn, keyValue);
+            primaryKeyList.add(primaryKeyColumn);
+        }
+        return new PrimaryKey(primaryKeyList);
+    }
+
+
+
+    private PrimaryKey buildOldPrimaryKey(Map<String, Object> map,
+                                          Map<String, MappingConfig.ColumnItem> typeMap,
+                                          Map<String, String> columnMap,
+                                          LinkedHashMap<String, String> targetPk,
+                                          Map<String, Object> old) {
+        List primaryKeyList = new ArrayList<>();
+        for (Map.Entry<String, String> entry : targetPk.entrySet()) {
+            // build primary key
+            String targetColumn = columnMap.get(entry.getKey());
+            Object value = old != null && old.containsKey(entry.getKey()) && !old.get(entry.getKey()).equals(map.get(entry.getKey())) ? old.get(entry.getKey()) : map.get(entry.getKey());
+            TablestoreFieldType type = typeMap.get(entry.getKey()).getType();
+            PrimaryKeyValue keyValue = SyncUtil.getPrimaryKeyValue(value, type);
+            PrimaryKeyColumn primaryKeyColumn = new PrimaryKeyColumn(targetColumn, keyValue);
+            primaryKeyList.add(primaryKeyColumn);
+        }
+        return new PrimaryKey(primaryKeyList);
+    }
+
+    /**
+     * 用于获得全量覆盖时的非主键列对应的columnlist
+     * @param columnMap
+     * @param dbMapping
+     * @param map
+     * @param typeMap
+     * @return
+     */
+    private List<Column> getColumnsWhenPut(Map<String, String> columnMap,
+                                           MappingConfig.DbMapping dbMapping,
+                                           Map<String, Object> map,
+                                           Map<String, MappingConfig.ColumnItem> typeMap) {
+        List<Column> columnList = new ArrayList<>();
+        for (Map.Entry<String, Object> entry : map.entrySet()) {
+            if (dbMapping.getTargetPk().containsKey(entry.getKey())) {
+                // 这是个主键, 不需要再次处理
+                continue;
+            }
+            if (!dbMapping.getTargetColumns().containsKey(entry.getKey())) {
+                // 可能是没有配置的字段
+                continue;
+            }
+            // 非主键
+            String targetColumn = columnMap.get(entry.getKey());
+            Object value = entry.getValue();
+            if (value == null) {
+                // insert时空值过滤掉
+                continue;
+            }
+            TablestoreFieldType type = typeMap.get(entry.getKey()).getType();
+            ColumnValue columnValue = SyncUtil.getColumnValue(value, type);
+            columnList.add(new Column(targetColumn, columnValue));
+        }
+        return columnList;
+    }
+
+    /**
+     * 检查是否主键被修改
+     * @param old       old中的数据
+     * @param targetPk  主键map
+     * @return
+     */
+    private boolean isPrimaryKeyChange(Map<String, Object> old, Map<String, String> targetPk) {
+        for (String pkCol : targetPk.keySet()) {
+            if (old.containsKey(pkCol)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public void close() {
+
+    }
+}

+ 211 - 0
client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/support/SyncUtil.java

@@ -0,0 +1,211 @@
package com.alibaba.otter.canal.client.adapter.tablestore.support;


import com.alibaba.otter.canal.client.adapter.tablestore.config.MappingConfig;
import com.alibaba.otter.canal.client.adapter.tablestore.enums.TablestoreFieldType;
import com.alicloud.openservices.tablestore.model.ColumnValue;
import com.alicloud.openservices.tablestore.model.PrimaryKeyValue;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.Map;

/**
 * Conversion helpers between JDBC/canal row values and Tablestore values.
 */
public class SyncUtil {

    private static final Logger logger = LoggerFactory.getLogger(SyncUtil.class);

    /**
     * Converts a source value to a Tablestore {@link PrimaryKeyValue}.
     * Tablestore primary keys only support STRING, INTEGER and BINARY; any
     * other configured type falls back to the value's string representation.
     *
     * @param value     source value (callers filter out nulls before this point;
     *                  a null here would throw a NullPointerException)
     * @param fieldType configured Tablestore type of the primary-key column
     * @return the converted primary-key value
     */
    public static PrimaryKeyValue getPrimaryKeyValue(Object value, TablestoreFieldType fieldType) {
        switch (fieldType) {
            case STRING:
                return PrimaryKeyValue.fromString((String) getTablestoreValue(value, fieldType));
            case INT:
                return PrimaryKeyValue.fromLong((long) getTablestoreValue(value, fieldType));
            case BINARY:
                return PrimaryKeyValue.fromBinary((byte[]) getTablestoreValue(value, fieldType));
            default:
                // BOOL/DOUBLE are not valid Tablestore pk types; store as string.
                return PrimaryKeyValue.fromString(value.toString());
        }
    }

    /**
     * Normalizes a JDBC/canal value into the Java type expected for the given
     * Tablestore field type (String / Long / byte[] / Boolean / Double).
     *
     * @return the normalized value, or null when no conversion rule applies
     */
    private static Object getTablestoreValue(Object value, TablestoreFieldType type) {
        switch (type) {
            case STRING:
                if (value instanceof byte[]) {
                    // Fix: use an explicit charset. Relying on the platform
                    // default made the result machine-dependent; canal delivers
                    // MySQL text payloads as UTF-8 bytes.
                    return new String((byte[]) value, StandardCharsets.UTF_8);
                }
                return value.toString();
            case INT:
                if (value instanceof Number) {
                    return ((Number) value).longValue();
                } else if (value instanceof Timestamp) {
                    // Checked before java.util.Date: Timestamp is a Date subclass.
                    return ((Timestamp) value).getTime();
                } else if (value instanceof String) {
                    try {
                        return Long.parseLong((String) value);
                    } catch (NumberFormatException e) {
                        logger.error("Error while parse long:" + value, e);
                        throw e;
                    }
                } else if (value instanceof java.util.Date) {
                    // Covers java.sql.Date and java.sql.Time as well; all three
                    // branches of the original returned getTime().
                    return ((java.util.Date) value).getTime();
                } else if (value instanceof Boolean) {
                    return ((Boolean) value) ? 1L : 0L;
                }
                return null;
            case BINARY:
                if (value instanceof byte[]) {
                    return value;
                } else if (value instanceof Blob) {
                    Blob item = (Blob) value;
                    try {
                        return item.getBytes(1, (int) item.length());
                    } catch (SQLException e) {
                        logger.error("Error while convert blob to binary, blob:" + item, e);
                        throw new RuntimeException(e);
                    }
                } else if (value instanceof String) {
                    return ((String) value).getBytes(StandardCharsets.ISO_8859_1);
                } else if (value instanceof Clob) {
                    Clob clob = (Clob) value;
                    try {
                        // Fix: read the actual CLOB content. The original used
                        // value.toString(), which returns the driver object's
                        // identity string, not the stored characters.
                        return clob.getSubString(1, (int) clob.length()).getBytes(StandardCharsets.ISO_8859_1);
                    } catch (SQLException e) {
                        logger.error("Error while convert clob to binary", e);
                        throw new RuntimeException(e);
                    }
                }
                return null;
            case BOOL:
                if (value instanceof Boolean) {
                    return value;
                } else if (value instanceof String) {
                    // tinyint(1) semantics: "0" is false, anything else is true.
                    return !value.equals("0");
                } else if (value instanceof Number) {
                    return ((Number) value).intValue() != 0;
                }
                return null;
            case DOUBLE:
                if (value instanceof Number) {
                    return ((Number) value).doubleValue();
                } else if (value instanceof String) {
                    try {
                        return Double.parseDouble((String) value);
                    } catch (NumberFormatException e) {
                        logger.error("Error while parse double:" + value, e);
                        throw e;
                    }
                }
                return null;
            default:
                return value;
        }
    }

    /**
     * Parses a user-configured field type name into a {@link TablestoreFieldType}.
     * Matching is case-insensitive and locale-independent.
     *
     * @param type configured type name, may be null
     * @return the matching type, or null when unknown or null input
     */
    public static TablestoreFieldType getTablestoreType(String type) {
        if (type == null) {
            return null;
        }
        if (type.equalsIgnoreCase("string")) {
            return TablestoreFieldType.STRING;
        }
        if (type.equalsIgnoreCase("int") || type.equalsIgnoreCase("integer")) {
            return TablestoreFieldType.INT;
        }
        if (type.equalsIgnoreCase("bool") || type.equalsIgnoreCase("boolean")) {
            return TablestoreFieldType.BOOL;
        }
        if (type.equalsIgnoreCase("binary")) {
            return TablestoreFieldType.BINARY;
        }
        if (type.equalsIgnoreCase("double") || type.equalsIgnoreCase("float")
            || type.equalsIgnoreCase("decimal")) {
            return TablestoreFieldType.DOUBLE;
        }
        return null;
    }

    /**
     * Converts a source value to a Tablestore attribute {@link ColumnValue}
     * according to the configured field type.
     *
     * @param value     source value (callers filter out nulls before this point)
     * @param fieldType configured Tablestore type of the column
     * @return the converted column value
     */
    public static ColumnValue getColumnValue(Object value, TablestoreFieldType fieldType) {
        switch (fieldType) {
            case STRING:
                return ColumnValue.fromString((String) getTablestoreValue(value, fieldType));
            case INT:
                return ColumnValue.fromLong((long) getTablestoreValue(value, fieldType));
            case BINARY:
                return ColumnValue.fromBinary((byte[]) getTablestoreValue(value, fieldType));
            case DOUBLE:
                return ColumnValue.fromDouble((double) getTablestoreValue(value, fieldType));
            case BOOL:
                return ColumnValue.fromBoolean((boolean) getTablestoreValue(value, fieldType));
            default:
                return ColumnValue.fromString(value.toString());
        }
    }

    /**
     * @return source column name -> configured column item map for the mapping
     */
    public static Map<String, MappingConfig.ColumnItem> getTypeMap(MappingConfig config) {
        return config.getDbMapping().getColumnItems();
    }

    /**
     * Maps a JDBC {@link Types} code to the default Tablestore field type used
     * when no explicit type is configured. Unrecognized codes (including
     * character and date/time types) default to STRING.
     *
     * @param sqlType a {@link java.sql.Types} constant
     */
    public static TablestoreFieldType getDefaultTablestoreType(int sqlType) {
        switch (sqlType) {
            case Types.BIT:
            case Types.BOOLEAN:
                return TablestoreFieldType.BOOL;
            case Types.BINARY:
            case Types.VARBINARY:
            case Types.LONGVARBINARY:
            case Types.BLOB:
                return TablestoreFieldType.BINARY;
            case Types.TINYINT:
            case Types.SMALLINT:
            case Types.INTEGER:
            case Types.BIGINT:
                return TablestoreFieldType.INT;
            case Types.DECIMAL:
            case Types.NUMERIC:
            case Types.REAL:
            case Types.FLOAT:
            case Types.DOUBLE:
                return TablestoreFieldType.DOUBLE;
            default:
                // CHAR/VARCHAR/CLOB and date/time types all map to STRING,
                // exactly as the original's explicit case list did.
                return TablestoreFieldType.STRING;
        }
    }

    /**
     * Builds a back-quoted {@code `db`.`table`} identifier; the database part
     * is omitted when {@code db} is null or empty.
     */
    public static String getDbTableName(String db, String table) {
        StringBuilder sb = new StringBuilder();
        if (StringUtils.isNotEmpty(db)) {
            sb.append('`').append(db).append("`.");
        }
        sb.append('`').append(table).append('`');
        return sb.toString();
    }
}

+ 1 - 0
client-adapter/tablestore/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter

@@ -0,0 +1 @@
+tablestore=com.alibaba.otter.canal.client.adapter.tablestore.TablestoreAdapter

+ 21 - 0
client-adapter/tablestore/src/main/resources/tablestore/test.yml

@@ -0,0 +1,21 @@
+dataSourceKey: defaultDS
+destination: test_ots
+groupId: g1
+outerAdapterKey: mysql1
+threads: 1
+updateChangeColumns: false
+dbMapping:
+  database: test_ots
+  table: test
+  targetTable: canal_target
+  targetPk:
+    oId: oId
+  targetColumns:
+    oId:
+    c_id:
+    p_price:
+    p_count:
+  etlCondition: 
+  commitBatch: 1 # 批量提交的大小
+
+