
canal adapter phoenix (#3290)

* canal adapter phoenix

Co-authored-by: lihuav2017 <lihua@able-elec.com>
spark, 3 years ago
Parent
Current commit
63407dc570
25 files changed, with 3132 insertions and 2 deletions
  1. +1 -1    client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/CanalAdapterApplication.java
  2. +8 -1    client-adapter/launcher/src/main/resources/application.yml
  3. +154 -0  client-adapter/phoenix/pom.xml
  4. +291 -0  client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/PhoenixAdapter.java
  5. +48 -0   client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/config/ConfigLoader.java
  6. +49 -0   client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/config/ConfigurationManager.java
  7. +288 -0  client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/config/MappingConfig.java
  8. +167 -0  client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/monitor/PhoenixConfigMonitor.java
  9. +429 -0  client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/service/PhoenixEtlService.java
  10. +644 -0 client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/service/PhoenixSyncService.java
  11. +108 -0 client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/support/BatchExecutor.java
  12. +97 -0  client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/support/PhoenixSupportUtil.java
  13. +97 -0  client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/support/SingleDml.java
  14. +274 -0 client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/support/SyncUtil.java
  15. +142 -0 client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/support/TypeUtil.java
  16. +1 -0   client-adapter/phoenix/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter
  17. +99 -0  client-adapter/phoenix/src/main/resources/hbase-site.xml
  18. +2 -0   client-adapter/phoenix/src/main/resources/phoenix/phoenix_common.properties
  19. +22 -0  client-adapter/phoenix/src/main/resources/phoenix/phoenixtest_user.yml
  20. +32 -0  client-adapter/phoenix/src/test/java/com/alibaba/otter/canal/client/adapter/phoenix/test/PhoenixConnectionTest.java
  21. +29 -0  client-adapter/phoenix/src/test/java/com/alibaba/otter/canal/client/adapter/phoenix/test/TestConfigLoad.java
  22. +40 -0  client-adapter/phoenix/src/test/java/com/alibaba/otter/canal/client/adapter/phoenix/test/TestConstant.java
  23. +36 -0  client-adapter/phoenix/src/test/java/com/alibaba/otter/canal/client/adapter/phoenix/test/sync/Common.java
  24. +73 -0  client-adapter/phoenix/src/test/java/com/alibaba/otter/canal/client/adapter/phoenix/test/sync/PhoenixSyncTest.java
  25. +1 -0   client-adapter/pom.xml

+ 1 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/CanalAdapterApplication.java

@@ -12,8 +12,8 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
  */
 @SpringBootApplication
 public class CanalAdapterApplication {
-
     public static void main(String[] args) {
+
         SpringApplication application = new SpringApplication(CanalAdapterApplication.class);
         application.setBannerMode(Banner.Mode.OFF);
         application.run(args);

+ 8 - 1
client-adapter/launcher/src/main/resources/application.yml

@@ -94,4 +94,11 @@ canal.conf:
 #        - name: kudu
 #          key: kudu
 #          properties:
-#            kudu.master.address: 127.0.0.1 # ',' split multi address
+#            kudu.master.address: 127.0.0.1 # ',' split multi address
+#        - name: phoenix
+#          key: phoenix
+#          properties:
+#            jdbc.driverClassName: org.apache.phoenix.jdbc.PhoenixDriver
+#            jdbc.url: jdbc:phoenix:127.0.0.1:2181:/hbase/db
+#            jdbc.username:
+#            jdbc.password:
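For reference, the same block as it would look when enabled (a minimal sketch: the instance name, groupId, and ZooKeeper address are placeholders, and the surrounding canalAdapters structure follows the launcher's standard application.yml layout):

    canal.conf:
      canalAdapters:
        - instance: example
          groups:
            - groupId: g1
              outerAdapters:
                - name: phoenix
                  key: phoenix
                  properties:
                    jdbc.driverClassName: org.apache.phoenix.jdbc.PhoenixDriver
                    jdbc.url: jdbc:phoenix:127.0.0.1:2181:/hbase/db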

+ 154 - 0
client-adapter/phoenix/pom.xml

@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>canal.client-adapter</artifactId>
+        <groupId>com.alibaba.otter</groupId>
+        <version>1.1.5-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>com.alibaba.otter</groupId>
+    <artifactId>client-adapter.phoenix</artifactId>
+    <packaging>jar</packaging>
+    <name>canal client adapter phoenix module for otter ${project.version}</name>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.phoenix</groupId>
+            <artifactId>phoenix-core</artifactId>
+            <version>4.14.1-HBase-1.4</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>1.4.8</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>2.5.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>client-adapter.common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.12</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+            <version>2.6</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>2.4</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>18.0</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>druid</artifactId>
+            <version>1.1.9</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.58</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>5.1.48</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.4</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                        <configuration>
+                            <tasks>
+                                <copy todir="${project.basedir}/../launcher/target/canal-adapter/conf/phoenix" overwrite="true">
+                                    <fileset dir="${project.basedir}/target/classes/phoenix" erroronmissingdir="true">
+                                        <include name="*.yml"/>
+                                    </fileset>
+                                </copy>
+                                <copy todir="${project.basedir}/../launcher/target/canal-adapter/plugin" overwrite="true">
+                                    <fileset dir="${project.basedir}/target/" erroronmissingdir="true">
+                                        <include name="*with-dependencies.jar"/>
+                                    </fileset>
+                                </copy>
+                                <copy todir="${project.basedir}/../launcher/target/classes/phoenix" overwrite="true">
+                                    <fileset dir="${project.basedir}/target/classes/phoenix" erroronmissingdir="true">
+                                        <include name="*.yml" />
+                                    </fileset>
+                                </copy>
+                            </tasks>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
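After mvn package, the antrun copy step above stages the module into the launcher's output directory; the resulting layout is roughly as follows (illustrative, derived from the todir/include patterns above):

    canal-adapter/
      conf/phoenix/
        phoenixtest_user.yml
      plugin/
        client-adapter.phoenix-1.1.5-SNAPSHOT-jar-with-dependencies.jar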

+ 291 - 0
client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/PhoenixAdapter.java

@@ -0,0 +1,291 @@
+package com.alibaba.otter.canal.client.adapter.phoenix;
+
+import com.alibaba.otter.canal.client.adapter.phoenix.config.ConfigLoader;
+import com.alibaba.otter.canal.client.adapter.phoenix.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.phoenix.monitor.PhoenixConfigMonitor;
+import com.alibaba.otter.canal.client.adapter.phoenix.service.PhoenixEtlService;
+import com.alibaba.otter.canal.client.adapter.phoenix.service.PhoenixSyncService;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.adapter.phoenix.support.SyncUtil;
+import com.alibaba.otter.canal.client.adapter.support.*;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author: lihua
+ * @date: 2021/1/5 15:01
+ * @Description: Phoenix adapter implementation
+ */
+@SPI("phoenix")
+public class PhoenixAdapter implements OuterAdapter {
+
+    private static final Logger logger = LoggerFactory.getLogger(PhoenixAdapter.class);
+
+    private Map<String, MappingConfig> phoenixMapping = new ConcurrentHashMap<>();                // config file name -> mapping config
+    private Map<String, Map<String, MappingConfig>> mappingConfigCache = new ConcurrentHashMap<>();                // schema-table key -> mapping configs
+
+    private static String driverClass;
+    private static String phoenixUrl;
+    private static final Properties phoenixPro = new Properties();
+
+    private PhoenixSyncService phoenixSyncService;
+
+    private PhoenixConfigMonitor phoenixConfigMonitor;
+
+    private Properties envProperties;
+
+
+
+    public Map<String, MappingConfig> getPhoenixMapping() {
+        return phoenixMapping;
+    }
+
+    public Map<String, Map<String, MappingConfig>> getMappingConfigCache() {
+        return mappingConfigCache;
+    }
+
+    public PhoenixAdapter() {
+        logger.info("PhoenixAdapter create: {} {}", this, Thread.currentThread().getStackTrace());
+    }
+    /**
+     * Initialization method
+     *
+     * @param configuration outer adapter configuration
+     */
+    @Override
+    public void init(OuterAdapterConfig configuration, Properties envProperties) {
+        this.envProperties = envProperties;
+        Map<String, MappingConfig> phoenixMappingTmp = ConfigLoader.load(envProperties);
+        // filter out configs whose key does not match
+        phoenixMappingTmp.forEach((key, mappingConfig) -> {
+            if ((mappingConfig.getOuterAdapterKey() == null && configuration.getKey() == null)
+                    || (mappingConfig.getOuterAdapterKey() != null
+                    && mappingConfig.getOuterAdapterKey().equalsIgnoreCase(configuration.getKey()))) {
+                phoenixMapping.put(key, mappingConfig);
+            }
+        });
+
+        if (phoenixMapping.isEmpty()) {
+            throw new RuntimeException("No phoenix adapter found for config key: " + configuration.getKey());
+        } else {
+            logger.info("[{}]phoenix config mapping: {}", this, phoenixMapping.keySet());
+        }
+
+        for (Map.Entry<String, MappingConfig> entry : phoenixMapping.entrySet()) {
+            String configName = entry.getKey();
+            MappingConfig mappingConfig = entry.getValue();
+            String key;
+            if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
+                key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "-"
+                        + StringUtils.trimToEmpty(mappingConfig.getGroupId()) + "_"
+                        + mappingConfig.getDbMapping().getDatabase() + "-" + mappingConfig.getDbMapping().getTable().toLowerCase();
+            } else {
+                key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "_"
+                        + mappingConfig.getDbMapping().getDatabase() + "-" + mappingConfig.getDbMapping().getTable().toLowerCase();
+            }
+            Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(key,
+                    k1 -> new ConcurrentHashMap<>());
+            configMap.put(configName, mappingConfig);
+        }
+
+
+        Map<String, String> properties = configuration.getProperties();
+
+        driverClass = properties.get("jdbc.driverClassName");
+        phoenixUrl = properties.get("jdbc.url");
+
+        try {
+            // Phoenix manages connections internally, so there is no need for a Druid pool
+            phoenixPro.setProperty("hbase.rpc.timeout", "600000");
+            phoenixPro.setProperty("hbase.client.scanner.timeout.period", "600000");
+            phoenixPro.setProperty("dfs.client.socket-timeout", "600000");
+            phoenixPro.setProperty("phoenix.query.keepAliveMs", "600000");
+            phoenixPro.setProperty("phoenix.query.timeoutMs", "3600000");
+            Class.forName(driverClass);
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException("Phoenix JDBC driver not found: " + driverClass, e);
+        }
+        String threads = properties.get("threads");
+        phoenixSyncService = new PhoenixSyncService(
+                threads != null ? Integer.valueOf(threads) : null
+        );
+
+        phoenixConfigMonitor = new PhoenixConfigMonitor();
+        phoenixConfigMonitor.init(configuration.getKey(), this, envProperties);
+    }
+
+
+    /**
+     * Get a Phoenix connection
+     *
+     * @return a new connection, or null if one could not be established
+     */
+    public static Connection getPhoenixConnection() {
+        try {
+            return DriverManager.getConnection(phoenixUrl, phoenixPro);
+        } catch (SQLException e) {
+            logger.error("getPhoenixConnection exception: " + e.getMessage(), e);
+        }
+        return null;
+    }
+
+
+
+    /**
+     * Sync method
+     *
+     * @param dmls DML batch
+     */
+    @Override
+    public void sync(List<Dml> dmls) {
+        if (dmls == null || dmls.isEmpty()) {
+            return;
+        }
+        try {
+            phoenixSyncService.sync(mappingConfigCache, dmls, envProperties);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * ETL method
+     *
+     * @param task   task name, i.e. the config name
+     * @param params etl filter conditions
+     * @return ETL result
+     */
+    @Override
+    public EtlResult etl(String task, List<String> params) {
+        EtlResult etlResult = new EtlResult();
+        MappingConfig config = phoenixMapping.get(task);
+        if (config != null) {
+            DataSource srcDataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+            if (srcDataSource != null) {
+                return PhoenixEtlService.importData(srcDataSource, getPhoenixConnection(), config, params);
+            } else {
+                etlResult.setSucceeded(false);
+                etlResult.setErrorMessage("DataSource not found");
+                return etlResult;
+            }
+        } else {
+            StringBuilder resultMsg = new StringBuilder();
+            boolean resSucc = true;
+            // no config under that name, so treat the task as a destination
+            for (MappingConfig configTmp : phoenixMapping.values()) {
+                // collect all configs whose destination equals the task
+                if (configTmp.getDestination().equals(task)) {
+                    DataSource srcDataSource = DatasourceConfig.DATA_SOURCES.get(configTmp.getDataSourceKey());
+                    if (srcDataSource == null) {
+                        continue;
+                    }
+                    EtlResult etlRes = PhoenixEtlService.importData(srcDataSource, getPhoenixConnection(), configTmp, params);
+                    if (!etlRes.getSucceeded()) {
+                        resSucc = false;
+                        resultMsg.append(etlRes.getErrorMessage()).append("\n");
+                    } else {
+                        resultMsg.append(etlRes.getResultMessage()).append("\n");
+                    }
+                }
+            }
+            if (resultMsg.length() > 0) {
+                etlResult.setSucceeded(resSucc);
+                if (resSucc) {
+                    etlResult.setResultMessage(resultMsg.toString());
+                } else {
+                    etlResult.setErrorMessage(resultMsg.toString());
+                }
+                return etlResult;
+            }
+        }
+        etlResult.setSucceeded(false);
+        etlResult.setErrorMessage("Task not found");
+        return etlResult;
+    }
+
+    /**
+     * Row count method
+     *
+     * @param task task name, i.e. the config name
+     * @return total row count
+     */
+    @Override
+    public Map<String, Object> count(String task) {
+        Map<String, Object> res = new LinkedHashMap<>();
+        MappingConfig config = phoenixMapping.get(task);
+        if (config == null) {
+            logger.info("[{}]phoenix config mapping: {}", this, phoenixMapping.keySet());
+            res.put("succeeded", false);
+            res.put("errorMessage", "Task[" + task + "] not found");
+            res.put("tasks", phoenixMapping.keySet());
+            return res;
+        }
+        MappingConfig.DbMapping dbMapping = config.getDbMapping();
+        String sql = "SELECT COUNT(1) AS cnt FROM " + SyncUtil.getDbTableName(dbMapping);
+        Connection conn = null;
+        try {
+            conn = getPhoenixConnection();
+            Util.sqlRS(conn, sql, rs -> {
+                try {
+                    if (rs.next()) {
+                        Long rowCount = rs.getLong("cnt");
+                        res.put("count", rowCount);
+                    }
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            });
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        } finally {
+            if (conn != null) {
+                try {
+                    conn.close();
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        }
+        res.put("targetTable", SyncUtil.getDbTableName(dbMapping));
+
+        return res;
+    }
+
+    /**
+     * Get the corresponding canal instance name or MQ topic
+     *
+     * @param task task name, i.e. the config name
+     * @return destination
+     */
+    @Override
+    public String getDestination(String task) {
+        MappingConfig config = phoenixMapping.get(task);
+        if (config != null) {
+            return config.getDestination();
+        }
+        return null;
+    }
+
+    /**
+     * Destroy method
+     */
+    @Override
+    public void destroy() {
+        if (phoenixConfigMonitor != null) {
+            phoenixConfigMonitor.destroy();
+        }
+
+        if (phoenixSyncService != null) {
+            phoenixSyncService.close();
+        }
+    }
+}
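Callers own the connection returned by getPhoenixConnection() and must close it, as count() above does in its finally block; note that the method returns null on failure, so production code should check before use. A minimal usage sketch (hypothetical table name, assumes init() has already run):

    try (Connection conn = PhoenixAdapter.getPhoenixConnection();
         PreparedStatement ps = conn.prepareStatement("SELECT COUNT(1) FROM \"MY_TABLE\"");
         ResultSet rs = ps.executeQuery()) {
        if (rs.next()) {
            System.out.println("rows: " + rs.getLong(1)); // total row count
        }
    }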

+ 48 - 0
client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/config/ConfigLoader.java

@@ -0,0 +1,48 @@
+package com.alibaba.otter.canal.client.adapter.phoenix.config;
+
+import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
+import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Phoenix table mapping config loader
+ */
+public class ConfigLoader {
+
+    private static final Logger logger = LoggerFactory.getLogger(ConfigLoader.class);
+
+    /**
+     * Load the Phoenix table mapping configs
+     *
+     * @return map of config (file) name to config object
+     */
+    public static Map<String, MappingConfig> load(Properties envProperties) {
+        logger.info("## Start loading phoenix mapping config ... ");
+
+        Map<String, MappingConfig> result = new LinkedHashMap<>();
+
+        Map<String, String> configContentMap = MappingConfigsLoader.loadConfigs("phoenix");
+        configContentMap.forEach((fileName, content) -> {
+            MappingConfig config = YmlConfigBinder
+                    .bindYmlToObj(null, content, MappingConfig.class, null, envProperties);
+            if (config == null) {
+                return;
+            }
+            try {
+                config.validate();
+            } catch (Exception e) {
+                throw new RuntimeException("ERROR Config: " + fileName + " " + e.getMessage(), e);
+            }
+            result.put(fileName, config);
+        });
+
+        logger.info("## Phoenix mapping config loaded");
+        logger.info("## Phoenix sync threads: " + ConfigurationManager.getInteger("threads"));
+        return result;
+    }
+}

+ 49 - 0
client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/config/ConfigurationManager.java

@@ -0,0 +1,49 @@
+package com.alibaba.otter.canal.client.adapter.phoenix.config;
+
+import java.io.InputStream;
+import java.util.Properties;
+
+/**
+ * Configuration management component
+ * @author Administrator
+ *
+ */
+public class ConfigurationManager {
+
+	private static Properties prop = new Properties();
+
+	static {
+		try (InputStream in = ConfigurationManager.class
+				.getClassLoader().getResourceAsStream("phoenix/phoenix_common.properties")) {
+			prop.load(in);
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+	}
+	
+	/**
+	 * Get the value for the given key
+	 *
+	 * @param key property key
+	 * @return the value as a string
+	 */
+	public static String getProperty(String key) {
+		return prop.getProperty(key);
+	}
+	
+	/**
+	 * Get an integer-typed config entry
+	 * @param key property key
+	 * @return value
+	 */
+	public static Integer getInteger(String key) {
+		String value = getProperty(key);
+		try {
+			return Integer.valueOf(value);
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+		return 0;
+	}
+}
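The phoenix/phoenix_common.properties resource this class loads is a two-line file whose contents are not shown in this diff; at minimum it carries the threads key that ConfigLoader logs above. An illustrative (hypothetical) example:

    # hypothetical contents of phoenix/phoenix_common.properties
    threads=3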

+ 288 - 0
client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/config/MappingConfig.java

@@ -0,0 +1,288 @@
+package com.alibaba.otter.canal.client.adapter.phoenix.config;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.util.*;
+
+/**
+ * Phoenix table mapping config
+ */
+@SuppressWarnings("unused")
+public class MappingConfig {
+
+    private String dataSourceKey;           // data source key
+    private String destination;             // canal instance or MQ topic
+    private String groupId;                 // groupId
+    private String outerAdapterKey;         // key of the corresponding outer adapter
+    private boolean concurrent = false;     // whether to sync concurrently
+    private DbMapping dbMapping;            // db mapping config
+    private boolean debug = false;          // debug flag
+
+    public String getDataSourceKey() {
+        return dataSourceKey;
+    }
+
+    public void setDataSourceKey(String dataSourceKey) {
+        this.dataSourceKey = dataSourceKey;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    public String getOuterAdapterKey() {
+        return outerAdapterKey;
+    }
+
+    public void setOuterAdapterKey(String outerAdapterKey) {
+        this.outerAdapterKey = outerAdapterKey;
+    }
+
+    public boolean getConcurrent() {
+        return concurrent;
+    }
+
+    public void setConcurrent(boolean concurrent) {
+        this.concurrent = concurrent;
+    }
+
+    public DbMapping getDbMapping() {
+        return dbMapping;
+    }
+
+    public void setDbMapping(DbMapping dbMapping) {
+        this.dbMapping = dbMapping;
+    }
+
+    public String getDestination() {
+        return destination;
+    }
+
+    public void setDestination(String destination) {
+        this.destination = destination;
+    }
+
+    public boolean isDebug() {
+        return debug;
+    }
+
+    public void setDebug(boolean debug) {
+        this.debug = debug;
+    }
+
+    public void validate() {
+        if (dbMapping.database == null || dbMapping.database.isEmpty()) {
+            throw new NullPointerException("dbMapping.database");
+        }
+        if ((dbMapping.table == null || dbMapping.table.isEmpty())) {
+            throw new NullPointerException("dbMapping.table");
+        }
+        if ((dbMapping.targetTable == null || dbMapping.targetTable.isEmpty())) {
+            throw new NullPointerException("dbMapping.targetTable");
+        }
+    }
+
+    public static class DbMapping {
+
+        private String database;                            // database or schema name
+        private String table;                               // table name
+        private Map<String, String> targetPk = new LinkedHashMap<>(); // target table primary key columns
+        private boolean mapAll = true;                      // map all columns
+
+        private boolean alter = true;                       // whether altering the target table is allowed
+        private boolean drop = false;                       // whether dropping columns is allowed
+        private boolean limit = false;                      // whether to apply column length limits
+        private boolean skipMissing = false;                // whether to skip missing columns
+        private boolean escapeUpper = true;                 // uppercase and double-quote column names by default
+
+        private String targetDb;                            // target database name
+        private String targetTable;                         // target table name
+        private Map<String, String> targetColumns;          // target table column mapping
+
+        private List<String> excludeColumns;                // columns excluded from mapping
+
+        private String etlCondition;                        // etl condition SQL
+        private int readBatch = 5000;
+        private int commitBatch = 5000;                     // commit batch size for etl
+        private Map<String, String> allMapColumns;
+
+        public String escape(String name) {
+            if (escapeUpper) {
+                return "\"" + name.toUpperCase() + "\"";
+            } else {
+                return name;
+            }
+        }
+
+        public String getDatabase() {
+            return database;
+        }
+
+        public void setDatabase(String database) {
+            this.database = database;
+        }
+
+        public String getTable() {
+            return table;
+        }
+
+        public void setTable(String table) {
+            this.table = table;
+        }
+
+        public Map<String, String> getTargetPk() {
+            return targetPk;
+        }
+
+        public void setTargetPk(Map<String, String> targetPk) {
+            this.targetPk = targetPk;
+        }
+
+        public Boolean getMapAll() {
+            return mapAll;
+        }
+
+        public void setMapAll(Boolean mapAll) {
+            this.mapAll = mapAll;
+        }
+
+        public boolean isAlter() {
+            return alter;
+        }
+
+        public void setAlter(boolean alter) {
+            this.alter = alter;
+        }
+
+        public boolean isLimit() {
+            return limit;
+        }
+
+        public void setLimit(boolean limit) {
+            this.limit = limit;
+        }
+
+        public boolean isDrop() {
+            return drop;
+        }
+
+        public void setDrop(boolean drop) {
+            this.drop = drop;
+        }
+
+        public boolean isSkipMissing() {
+            return skipMissing;
+        }
+
+        public void setSkipMissing(boolean skipMissing) {
+            this.skipMissing = skipMissing;
+        }
+
+        public boolean isEscapeUpper() {
+            return escapeUpper;
+        }
+
+        public void setEscapeUpper(boolean escapeUpper) {
+            this.escapeUpper = escapeUpper;
+        }
+
+        public String getTargetDb() {
+            return targetDb;
+        }
+
+        public void setTargetDb(String targetDb) {
+            this.targetDb = targetDb;
+        }
+
+        public String getTargetTable() {
+            return targetTable;
+        }
+
+        public void setTargetTable(String targetTable) {
+            this.targetTable = targetTable;
+        }
+
+        public Map<String, String> getTargetColumns() {
+            if (targetColumns != null) {
+                targetColumns.forEach((key, value) -> {
+                    if (StringUtils.isEmpty(value)) {
+                        targetColumns.put(key, key);
+                    }
+                });
+            } else {
+                targetColumns = new HashMap<>();
+            }
+            return targetColumns;
+        }
+
+        public void setTargetColumns(Map<String, String> targetColumns) {
+            this.targetColumns = targetColumns;
+        }
+
+        public void addTargetColumn(String key, String value) {
+            if (targetColumns == null) {
+                targetColumns = new HashMap<>();
+            }
+            targetColumns.put(key, value);
+            if (allMapColumns != null) {
+                allMapColumns.put(key, value);
+            }
+        }
+
+        public void removeTargetColumn(String key) {
+            if (targetColumns != null) {
+                targetColumns.remove(key);
+            }
+            if (allMapColumns != null) {
+                allMapColumns.remove(key);
+            }
+        }
+
+        public List<String> getExcludeColumns() {
+            if (excludeColumns == null) {
+                excludeColumns = new ArrayList<>();
+            }
+            return excludeColumns;
+        }
+
+        public void setExcludeColumns(List<String> excludeColumns) {
+            this.excludeColumns = excludeColumns;
+        }
+
+        public String getEtlCondition() {
+            return etlCondition;
+        }
+
+        public void setEtlCondition(String etlCondition) {
+            this.etlCondition = etlCondition;
+        }
+
+        public int getReadBatch() {
+            return readBatch;
+        }
+
+        public void setReadBatch(int readBatch) {
+            this.readBatch = readBatch;
+        }
+
+        public int getCommitBatch() {
+            return commitBatch;
+        }
+
+        public void setCommitBatch(int commitBatch) {
+            this.commitBatch = commitBatch;
+        }
+
+        public Map<String, String> getAllMapColumns() {
+            return allMapColumns;
+        }
+
+        public void setAllMapColumns(Map<String, String> allMapColumns) {
+            this.allMapColumns = allMapColumns;
+        }
+    }
+}
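The yml files under conf/phoenix bind onto this class. A minimal sketch of such a mapping file, based on the fields above (names and values are illustrative, not the shipped phoenixtest_user.yml):

    dataSourceKey: defaultDS        # source datasource from application.yml
    destination: example            # canal instance or MQ topic
    groupId: g1
    outerAdapterKey: phoenix        # must match the adapter's key
    concurrent: true
    dbMapping:
      database: mytest              # source MySQL schema
      table: user                   # source table
      targetTable: MYTEST.USER      # Phoenix target table
      targetPk:
        id: id                      # target pk column -> source column
      mapAll: true
      escapeUpper: true
      commitBatch: 3000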

+ 167 - 0
client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/monitor/PhoenixConfigMonitor.java

@@ -0,0 +1,167 @@
+package com.alibaba.otter.canal.client.adapter.phoenix.monitor;
+
+import com.alibaba.otter.canal.client.adapter.phoenix.PhoenixAdapter;
+import com.alibaba.otter.canal.client.adapter.phoenix.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
+import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+import org.apache.commons.io.filefilter.FileFilterUtils;
+import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
+import org.apache.commons.io.monitor.FileAlterationMonitor;
+import org.apache.commons.io.monitor.FileAlterationObserver;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * phoenix config monitor
+ */
+public class PhoenixConfigMonitor {
+
+    private static final Logger logger = LoggerFactory.getLogger(PhoenixConfigMonitor.class);
+
+    private static final String adapterName = "phoenix";  // adapter component name
+
+    private String key;
+
+    private PhoenixAdapter phoenixAdapter;   // the owning adapter instance
+
+    private Properties envProperties;
+
+    private FileAlterationMonitor fileMonitor;
+
+    public void init(String key, PhoenixAdapter phoenixAdapter, Properties envProperties) {
+        this.key = key;
+        this.phoenixAdapter = phoenixAdapter;
+        this.envProperties = envProperties;
+        File confDir = Util.getConfDirPath(adapterName);
+        try {
+            FileAlterationObserver observer = new FileAlterationObserver(confDir,
+                    FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter("yml")));
+            FileListener listener = new FileListener();
+            observer.addListener(listener);
+            fileMonitor = new FileAlterationMonitor(3000, observer);
+            fileMonitor.start();
+
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    public void destroy() {
+        try {
+            fileMonitor.stop();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    private class FileListener extends FileAlterationListenerAdaptor {
+
+        @Override
+        public void onFileCreate(File file) {
+            super.onFileCreate(file);
+            try {
+                // load the newly created config file
+                String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
+                MappingConfig config = YmlConfigBinder
+                        .bindYmlToObj(null, configContent, MappingConfig.class, null, envProperties);
+                if (config == null) {
+                    return;
+                }
+                config.validate();
+                if ((key == null && config.getOuterAdapterKey() == null)
+                        || (key != null && key.equals(config.getOuterAdapterKey()))) {
+                    addConfigToCache(file, config);
+
+                    logger.info("Add a new phoenix mapping config: {} to canal adapter", file.getName());
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        @Override
+        public void onFileChange(File file) {
+            super.onFileChange(file);
+
+            try {
+                if (phoenixAdapter.getPhoenixMapping().containsKey(file.getName())) {
+                    // reload the config file
+                    String configContent = MappingConfigsLoader
+                            .loadConfig(adapterName + File.separator + file.getName());
+                    if (configContent == null) {
+                        onFileDelete(file);
+                        return;
+                    }
+                    MappingConfig config = YmlConfigBinder
+                            .bindYmlToObj(null, configContent, MappingConfig.class, null, envProperties);
+                    if (config == null) {
+                        return;
+                    }
+                    config.validate();
+                    if ((key == null && config.getOuterAdapterKey() == null)
+                            || (key != null && key.equals(config.getOuterAdapterKey()))) {
+                        if (phoenixAdapter.getPhoenixMapping().containsKey(file.getName())) {
+                            deleteConfigFromCache(file);
+                        }
+                        addConfigToCache(file, config);
+                    } else {
+                        // 不能修改outerAdapterKey
+                        throw new RuntimeException("Outer adapter key not allowed modify");
+                    }
+                    logger.info("Change a phoenix mapping config: {} of canal adapter", file.getName());
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        @Override
+        public void onFileDelete(File file) {
+            super.onFileDelete(file);
+
+            try {
+                if (phoenixAdapter.getPhoenixMapping().containsKey(file.getName())) {
+                    deleteConfigFromCache(file);
+
+                    logger.info("Delete a phoenix mapping config: {} of canal adapter", file.getName());
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        private void addConfigToCache(File file, MappingConfig mappingConfig) {
+            if (mappingConfig == null || mappingConfig.getDbMapping() == null) {
+                return;
+            }
+            phoenixAdapter.getPhoenixMapping().put(file.getName(), mappingConfig);
+            // note: this key matches the tcp-mode format built in PhoenixAdapter.init(); the mq-mode variant with groupId is not rebuilt here
+            Map<String, MappingConfig> configMap = phoenixAdapter.getMappingConfigCache()
+                    .computeIfAbsent(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "_"
+                                    + mappingConfig.getDbMapping().getDatabase() + "-"
+                                    + mappingConfig.getDbMapping().getTable().toLowerCase(),
+                            k1 -> new HashMap<>());
+            configMap.put(file.getName(), mappingConfig);
+        }
+
+        private void deleteConfigFromCache(File file) {
+            logger.info("deleteConfigFromCache: {}", file.getName());
+            MappingConfig mappingConfig = phoenixAdapter.getPhoenixMapping().remove(file.getName());
+
+            if (mappingConfig == null || mappingConfig.getDbMapping() == null) {
+                return;
+            }
+            for (Map<String, MappingConfig> configMap : phoenixAdapter.getMappingConfigCache().values()) {
+                if (configMap != null) {
+                    configMap.remove(file.getName());
+                }
+            }
+        }
+    }
+}

+ 429 - 0
client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/service/PhoenixEtlService.java

@@ -0,0 +1,429 @@
+package com.alibaba.otter.canal.client.adapter.phoenix.service;
+
+import com.alibaba.otter.canal.client.adapter.phoenix.PhoenixAdapter;
+import com.alibaba.otter.canal.client.adapter.phoenix.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.phoenix.config.MappingConfig.DbMapping;
+import com.alibaba.otter.canal.client.adapter.phoenix.support.PhoenixSupportUtil;
+import com.alibaba.otter.canal.client.adapter.phoenix.support.SyncUtil;
+import com.alibaba.otter.canal.client.adapter.phoenix.support.TypeUtil;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+import com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Phoenix ETL service
+ */
+public class PhoenixEtlService {
+
+    private static final Logger logger = LoggerFactory.getLogger(PhoenixEtlService.class);
+
+    private static String[] splitNotEmpty(String s) {
+        if (s != null && s.trim().length() > 0) {
+            return s.trim().split(",");
+        }
+        return new String[]{};
+    }
+
+
+    static boolean syncSchema(Connection targetDSConnection, MappingConfig config) {
+        DataSource srcDataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+        if (srcDataSource == null) {
+            return false;
+        }
+        try {
+            return syncSchema(srcDataSource.getConnection(), targetDSConnection, config);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static boolean syncSchema(DataSource srcDS, Connection targetDSConnection, MappingConfig config) {
+        try {
+            return syncSchema(srcDS.getConnection(), targetDSConnection, config);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static boolean syncSchema(Connection srcDS, Connection targetDS, MappingConfig config) {
+        DbMapping dbMapping = config.getDbMapping();
+        if (dbMapping.getMapAll() && dbMapping.isAlter()) { // check for missing columns
+            Map<String, Integer> targetColumnType = new LinkedHashMap<>();
+            String targetTable = SyncUtil.getDbTableName(dbMapping);
+            try {
+                Util.sqlRS(targetDS, "SELECT * FROM " + targetTable + " LIMIT 1", rs -> {
+                    try {
+                        ResultSetMetaData rsd = rs.getMetaData();
+                        int columnCount = rsd.getColumnCount();
+                        for (int i = 1; i <= columnCount; i++) {
+                            targetColumnType.put(rsd.getColumnName(i).toLowerCase(), rsd.getColumnType(i));
+                        }
+                    } catch (Exception e) {
+                        logger.error(dbMapping.getTable() + " etl failed! ==>" + e.getMessage(), e);
+                    }
+                });
+            } catch (RuntimeException e) {
+                if (e.getCause() == null || !e.getCause().getClass().getName().endsWith("TableNotFoundException")) {
+                    throw e;
+                }
+            }
+            StringBuilder missing = new StringBuilder();
+            StringBuilder constraint = new StringBuilder();
+            Util.sqlRS(srcDS, "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '" + dbMapping.getDatabase() + "'  AND TABLE_NAME = '" + dbMapping.getTable() + "'", rs -> {
+                try {
+                    List<String> excludeColumns = config.getDbMapping().getExcludeColumns();
+                    while (rs.next()) {
+                        String name = rs.getString("COLUMN_NAME");
+                        String lower = name.toLowerCase();
+                        String colType = rs.getString("COLUMN_TYPE");
+                        if (targetColumnType.get(lower) == null && !excludeColumns.contains(lower)) {
+                            boolean isPri = rs.getString("COLUMN_KEY").equals("PRI");
+                            String[] args = splitNotEmpty(colType.replaceAll("^\\w+(?:\\(([^)]*)\\))?[\\s\\S]*$", "$1"));
+                            missing.append(dbMapping.escape(name)).append(" ").append(TypeUtil.getPhoenixType(
+                                    rs.getString("DATA_TYPE").toUpperCase(),
+                                    args,
+                                    colType.contains("unsigned"),
+                                    dbMapping.isLimit()
+                            ));
+                            if (isPri) {
+                                if (args.length > 0 && dbMapping.isLimit() || rs.getString("IS_NULLABLE").equals("NO")) {
+                                    missing.append(" NOT NULL");
+                                }
+                                constraint.append(dbMapping.escape(name)).append(',');
+                            }
+                            missing.append(',');
+                        }
+                    }
+                } catch (Exception e) {
+                    logger.error(dbMapping.getDatabase() + "." + dbMapping.getTable() + " schema failed! ==>" + e.getMessage(), e);
+                    throw new RuntimeException(e);
+                }
+            });
+            if (missing.length() > 0) {
+                String sql;
+                if (targetColumnType.isEmpty()) {
+                    if (constraint.length() > 0) {
+                        constraint.deleteCharAt(constraint.length() - 1);
+                        missing.append("CONSTRAINT pk PRIMARY KEY(").append(constraint.toString()).append(")");
+                    } else {
+                        missing.deleteCharAt(missing.length() - 1);
+                    }
+                    sql = "CREATE TABLE " + targetTable + " (" + missing.toString() + ")";
+                } else {
+                    missing.deleteCharAt(missing.length() - 1);
+                    sql = "ALTER TABLE " + targetTable + " ADD " + missing.toString();
+                }
+                logger.info("schema missing: {} {}", targetColumnType, sql);
+                try (PreparedStatement pstmt = targetDS.prepareStatement(sql)) {
+                    pstmt.executeUpdate();
+                } catch (SQLException e) {
+                    logger.error("sync schema error: " + e.getMessage(), e);
+                }
+            } else {
+                logger.debug("schema ok: {}", targetColumnType);
+            }
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Import data
+     */
+    public static EtlResult importData(DataSource srcDS, Connection targetDSConnection, MappingConfig config,
+                                       List<String> params) {
+        EtlResult etlResult = new EtlResult();
+        AtomicLong successCount = new AtomicLong();
+        List<String> errMsg = new ArrayList<>();
+        String hbaseTable = "";
+        try {
+            if (config == null) {
+                logger.error("Config is null!");
+                etlResult.setSucceeded(false);
+                etlResult.setErrorMessage("Config is null!");
+                return etlResult;
+            }
+            boolean debug = params != null && !params.isEmpty() && "_debug".equals(params.get(0));
+            if (debug) {
+                params = params.subList(1, params.size());
+            }
+            syncSchema(srcDS, targetDSConnection, config);
+            DbMapping dbMapping = config.getDbMapping();
+
+            long start = System.currentTimeMillis();
+
+            // build the select sql
+            StringBuilder sql = new StringBuilder(
+                    "SELECT * FROM " + dbMapping.getDatabase() + "." + dbMapping.getTable());
+
+            // append filter conditions
+            appendCondition(params, dbMapping, srcDS, sql);
+
+            // get the total row count
+            String countSql = "SELECT COUNT(1) FROM ( " + sql + ") _CNT ";
+            long cnt = (Long) Util.sqlRS(srcDS, countSql, rs -> {
+                Long count = null;
+                try {
+                    if (rs.next()) {
+                        count = ((Number) rs.getObject(1)).longValue();
+                    }
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                }
+                return count == null ? 0 : count;
+            });
+
+            // use multiple threads when there are 10,000 rows or more
+            if (cnt >= 10000) {
+                int threadCount = 3;
+                long perThreadCnt = cnt / threadCount;
+                ExecutorService executor = Util.newFixedThreadPool(threadCount, 5000L);
+                for (int i = 0; i < threadCount; i++) {
+                    long offset = i * perThreadCnt;
+                    Long size = null;
+                    if (i != threadCount - 1) {
+                        size = perThreadCnt;
+                    }
+                    String sqlFinal;
+                    if (size != null) {
+                        sqlFinal = sql + " LIMIT " + offset + "," + size;
+                    } else {
+                        sqlFinal = sql + " LIMIT " + offset + "," + cnt;
+                    }
+                    executor
+                            .execute(() -> executeSqlImport(srcDS, targetDSConnection, sqlFinal, dbMapping, successCount, errMsg, debug));
+                }
+
+                executor.shutdown();
+                //noinspection StatementWithEmptyBody
+                while (!executor.awaitTermination(3, TimeUnit.SECONDS)) ;
+            } else {
+                executeSqlImport(srcDS, targetDSConnection, sql.toString(), dbMapping, successCount, errMsg, debug);
+            }
+
+            logger.info(
+                    dbMapping.getTable() + " etl completed in: " + (System.currentTimeMillis() - start) / 1000 + "s!");
+
+            etlResult
+                    .setResultMessage("Imported " + successCount.get() + " rows into target table " + SyncUtil.getDbTableName(dbMapping));
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            errMsg.add(hbaseTable + " etl failed! ==>" + e.getMessage());
+        }
+
+        if (errMsg.isEmpty()) {
+            etlResult.setSucceeded(true);
+        } else {
+            etlResult.setErrorMessage(Joiner.on("\n").join(errMsg));
+        }
+        return etlResult;
+    }
+
+    private static void appendCondition(List<String> params, DbMapping dbMapping, DataSource ds,
+                                        StringBuilder sql) {
+        if (params != null && params.size() == 1 && dbMapping.getEtlCondition() == null) {
+            AtomicBoolean stExists = new AtomicBoolean(false);
+            // check whether a SYS_TIME column exists
+            Util.sqlRS(ds, sql.toString(), rs -> {
+                try {
+                    ResultSetMetaData rsmd = rs.getMetaData();
+                    int cnt = rsmd.getColumnCount();
+                    for (int i = 1; i <= cnt; i++) {
+                        String columnName = rsmd.getColumnName(i);
+                        if ("SYS_TIME".equalsIgnoreCase(columnName)) {
+                            stExists.set(true);
+                            break;
+                        }
+                    }
+                } catch (Exception e) {
+                    // ignore
+                }
+                return null;
+            });
+            if (stExists.get()) {
+                sql.append(" WHERE SYS_TIME >= '").append(params.get(0)).append("' ");
+            }
+        } else if (dbMapping.getEtlCondition() != null && params != null && params.size() > 0) {
+            String etlCondition = dbMapping.getEtlCondition();
+            int size = params.size();
+            for (int i = 0; i < size; i++) {
+                etlCondition = etlCondition.replace("{" + i + "}", params.get(i));
+            }
+
+            sql.append(" ").append(etlCondition);
+        }
+    }
+
+    /**
+     * 执行导入
+     */
+    private static boolean executeSqlImport(DataSource srcDS, Connection targetDSConnection, String sql, DbMapping dbMapping,
+                                            AtomicLong successCount, List<String> errMsg, boolean debug) {
+        try {
+            Map<String, String> columnsMap = new LinkedHashMap<>();
+            Map<String, Integer> columnType = new LinkedHashMap<>();
+
+
+            PhoenixSupportUtil.sqlRS(targetDSConnection, "SELECT * FROM " + SyncUtil.getDbTableName(dbMapping) + " LIMIT 1 ", rs -> {
+                try {
+
+                    ResultSetMetaData rsd = rs.getMetaData();
+                    int columnCount = rsd.getColumnCount();
+                    List<String> columns = new ArrayList<>();
+                    List<String> excludeColumns = dbMapping.getExcludeColumns();
+                    for (int i = 1; i <= columnCount; i++) {
+                        String lower = rsd.getColumnName(i).toLowerCase();
+                        if (!excludeColumns.contains(lower)) {
+                            columnType.put(lower, rsd.getColumnType(i));
+                            columns.add(lower);
+                        }
+                    }
+                    columnsMap.putAll(SyncUtil.getColumnsMap(dbMapping, columns));
+                    return true;
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                    return false;
+                }
+            });
+            Util.sqlRS(srcDS, sql, rs -> {
+                int idx = 1;
+
+                try {
+                    boolean completed = false;
+
+                    StringBuilder insertSql = new StringBuilder();
+                    insertSql.append("UPSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
+                    columnsMap
+                            .forEach((targetColumnName, srcColumnName) -> insertSql.append(dbMapping.escape(targetColumnName)).append(","));
+
+                    int len = insertSql.length();
+                    insertSql.delete(len - 1, len).append(") VALUES (");
+                    int mapLen = columnsMap.size();
+                    for (int i = 0; i < mapLen; i++) {
+                        insertSql.append("?,");
+                    }
+                    len = insertSql.length();
+                    insertSql.delete(len - 1, len).append(")");
+                    try (
+                         Connection connTarget = PhoenixAdapter.getPhoenixConnection();
+                         PreparedStatement pstmt = connTarget.prepareStatement(insertSql.toString())) {
+                        connTarget.setAutoCommit(false);
+
+                        while (rs.next()) {
+                            completed = false;
+
+                            pstmt.clearParameters();
+
+                            // 删除数据
+                            Map<String, Object> values = new LinkedHashMap<>();
+                            StringBuilder deleteSql = new StringBuilder(
+                                    "DELETE FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE ");
+                            appendCondition(dbMapping, deleteSql, values, rs);
+                            try (PreparedStatement pstmt2 = connTarget.prepareStatement(deleteSql.toString())) {
+                                int k = 1;
+                                for (Object val : values.values()) {
+                                    pstmt2.setObject(k++, val);
+                                }
+                                pstmt2.execute();
+                            }
+
+                            Map<String, Object> insertValues = new HashMap<>();
+                            int i = 1;
+                            for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
+                                String targetColumnName = entry.getKey();
+                                String srcColumnName = entry.getValue();
+                                if (srcColumnName == null) {
+                                    srcColumnName = targetColumnName;
+                                }
+
+                                Integer type = columnType.get(targetColumnName.toLowerCase());
+
+                                try {
+                                    Object value = rs.getObject(srcColumnName);
+                                    insertValues.put(srcColumnName, value);
+                                    if (value != null) {
+                                        SyncUtil.setPStmt(type, pstmt, value, i);
+                                    } else {
+                                        pstmt.setNull(i, type);
+                                    }
+                                } catch (SQLException e) {
+                                    insertValues.put(srcColumnName, null);
+                                    pstmt.setNull(i, type);
+                                }
+                                i++;
+                            }
+                            if (debug) {
+                                logger.info("insert sql: {} {} {}", insertSql, insertValues, pstmt);
+                            }
+
+                            pstmt.execute();
+                            if (logger.isTraceEnabled()) {
+                                logger.trace("Insert into target table, sql: {}", insertSql);
+                            }
+
+                            if (idx % dbMapping.getCommitBatch() == 0) {
+                                connTarget.commit();
+                                completed = true;
+                            }
+                            idx++;
+                            successCount.incrementAndGet();
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("successful import count: {}", successCount.get());
+                            }
+                        }
+                        if (!completed) {
+                            connTarget.commit();
+                        }
+                    }
+
+                } catch (Exception e) {
+                    logger.error(dbMapping.getTable() + " etl failed! ==>" + e.getMessage(), e);
+                    errMsg.add(dbMapping.getTable() + " etl failed! ==>" + e.getMessage());
+                }
+                return idx;
+            });
+            return true;
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            return false;
+        }
+    }
+
+    /**
+     * Build the WHERE clause from the target table's primary key
+     */
+    private static void appendCondition(DbMapping dbMapping, StringBuilder sql, Map<String, Object> values,
+                                        ResultSet rs) throws SQLException {
+        // append each primary key column as a condition
+        for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
+            String targetColumnName = entry.getKey();
+            String srcColumnName = entry.getValue();
+            if (srcColumnName == null) {
+                srcColumnName = targetColumnName;
+            }
+            sql.append(dbMapping.escape(targetColumnName)).append("=? AND ");
+            values.put(targetColumnName, rs.getObject(srcColumnName));
+        }
+        int len = sql.length();
+        sql.delete(len - 4, len);
+    }
+}
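
The ETL loop above follows a fixed cadence: stream rows from the source, delete any existing target row by primary key, UPSERT the new values, and commit every commitBatch rows. Below is a minimal standalone sketch of that commit cadence over Phoenix JDBC; the URL, table and column names are illustrative assumptions, not taken from the adapter:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.List;

    public class UpsertBatchSketch {
        // Commit every `commitBatch` rows, mirroring the ETL loop above.
        public static void load(List<Object[]> rows, int commitBatch) throws SQLException {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181");
                 PreparedStatement pstmt = conn.prepareStatement(
                         "UPSERT INTO \"TEST\".\"USER\" (\"ID\", \"NAME\") VALUES (?, ?)")) {
                conn.setAutoCommit(false);
                int idx = 1;
                for (Object[] row : rows) {
                    pstmt.setObject(1, row[0]);
                    pstmt.setObject(2, row[1]);
                    pstmt.execute();
                    if (idx++ % commitBatch == 0) {
                        conn.commit(); // Phoenix buffers mutations until commit
                    }
                }
                conn.commit(); // flush the tail of the last batch
            }
        }
    }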

+ 644 - 0
client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/service/PhoenixSyncService.java

@@ -0,0 +1,644 @@
+package com.alibaba.otter.canal.client.adapter.phoenix.service;
+
+import com.alibaba.druid.sql.SQLUtils;
+import com.alibaba.druid.sql.ast.SQLName;
+import com.alibaba.druid.sql.ast.SQLStatement;
+import com.alibaba.druid.sql.ast.statement.*;
+import com.alibaba.druid.sql.parser.ParserException;
+import com.alibaba.druid.util.JdbcConstants;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.client.adapter.phoenix.config.ConfigurationManager;
+import com.alibaba.otter.canal.client.adapter.phoenix.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.phoenix.config.MappingConfig.DbMapping;
+import com.alibaba.otter.canal.client.adapter.phoenix.support.BatchExecutor;
+import com.alibaba.otter.canal.client.adapter.phoenix.support.SingleDml;
+import com.alibaba.otter.canal.client.adapter.phoenix.support.SyncUtil;
+import com.alibaba.otter.canal.client.adapter.phoenix.support.TypeUtil;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.function.Function;
+
+/**
+ * Phoenix synchronization service
+ */
+public class PhoenixSyncService {
+
+    private static final Logger logger = LoggerFactory.getLogger(PhoenixSyncService.class);
+
+    // column type cache: instance.schema.table -> <columnName, jdbcType>
+    private Map<String, Map<String, Integer>> columnsTypeCache;
+
+    // number of sync threads
+    // defaults to 3; a custom count can be configured via the "threads" entry
+    private int threads = ConfigurationManager.getInteger("threads");
+
+    private List<SyncItem>[] dmlsPartition;
+    private BatchExecutor[] batchExecutors;
+    private ExecutorService[] executorThreads;
+
+    public PhoenixSyncService(Integer threads) {
+        this(threads, new ConcurrentHashMap<>());
+    }
+
+
+    @SuppressWarnings("unchecked")
+    private PhoenixSyncService(Integer threads, Map<String, Map<String, Integer>> columnsTypeCache) {
+        this.columnsTypeCache = columnsTypeCache;
+        try {
+            if (threads != null) {
+                this.threads = threads;
+            }
+            this.dmlsPartition = new List[this.threads];
+            this.batchExecutors = new BatchExecutor[this.threads];
+            this.executorThreads = new ExecutorService[this.threads];
+            for (int i = 0; i < this.threads; i++) {
+                dmlsPartition[i] = new ArrayList<>();
+                batchExecutors[i] = new BatchExecutor();
+                // single-threaded executor over an unbounded queue; if the thread dies it is recreated, so queued tasks still complete
+                executorThreads[i] = Executors.newSingleThreadExecutor();
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Batch sync callback
+     *
+     * @param dmls     batch of DMLs
+     * @param function callback applied to each DML
+     */
+    private void sync(List<Dml> dmls, Function<Dml, Boolean> function) {
+        try {
+            boolean toExecute = false;
+            for (Dml dml : dmls) {
+                if (!toExecute) {
+                    toExecute = function.apply(dml);
+                } else {
+                    function.apply(dml);
+                }
+            }
+            if (toExecute) {
+                List<Future<Boolean>> futures = new ArrayList<>();
+                for (int i = 0; i < threads; i++) {
+                    int j = i;
+                    if (dmlsPartition[j].isEmpty()) {
+                        // bypass
+                        continue;
+                    }
+
+                    futures.add(executorThreads[i].submit(() -> {
+                        try {
+                            dmlsPartition[j].forEach(syncItem -> sync(batchExecutors[j],
+                                    syncItem.config,
+                                    syncItem.singleDml));
+                            // unlike the RDB sync, dmlsPartition[j].clear() is deferred to the finally block,
+                            // so on table/column-not-found errors the partition can be replayed and batchExecutors[j].commit() retried once
+                            batchExecutors[j].commit();
+                            return true;
+                        } catch (Throwable e) {
+                            batchExecutors[j].rollback();
+                            if (!e.getClass().getName().endsWith("ColumnNotFoundException")
+                                    && !e.getClass().getName().endsWith("TableNotFoundException")) {
+                                throw new RuntimeException(e);
+                            }
+                            logger.info("table or column not found: " + e.getMessage());
+                            boolean synced = false;
+                            for (SyncItem syncItem : dmlsPartition[j]) {
+                                if (PhoenixEtlService.syncSchema(batchExecutors[j].getConn(), syncItem.config)) {
+                                    synced = true;
+                                }
+                            }
+                            if (!synced) {
+                                throw new RuntimeException(e);
+                            }
+                            dmlsPartition[j].forEach(syncItem -> sync(batchExecutors[j],
+                                    syncItem.config,
+                                    syncItem.singleDml));
+                            try {
+                                batchExecutors[j].commit();
+                                return true;
+                            } catch (Throwable e1) {
+                                batchExecutors[j].rollback();
+                                throw new RuntimeException(e1);
+                            }
+                        } finally {
+                            dmlsPartition[j].clear();
+                        }
+                    }));
+                }
+                futures.forEach(future -> {
+                    try {
+                        future.get();
+                    } catch (ExecutionException | InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            }
+        } finally {
+            for (BatchExecutor batchExecutor : batchExecutors) {
+                if (batchExecutor != null) {
+                    batchExecutor.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Batch sync: parse the batch DML into List<SingleDml> --> dmlsPartition[hash].add(syncItem)
+     * @param mappingConfig mapping configurations
+     * @param dmls   batch of DMLs
+     */
+    public void sync(Map<String, Map<String, MappingConfig>> mappingConfig, List<Dml> dmls, Properties envProperties) {
+        sync(dmls, dml -> {
+            String destination = StringUtils.trimToEmpty(dml.getDestination());
+            String groupId = StringUtils.trimToEmpty(dml.getGroupId());
+            String database = dml.getDatabase();
+            String table = dml.getTable().toLowerCase();
+            Map<String, MappingConfig> configMap;
+            if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
+                // kafka or RocketMQ mode: the lookup key includes the groupId
+                configMap = mappingConfig.get(destination + "-" + groupId + "_" + database + "-" + table);
+            } else {
+                // tcp mode
+                configMap = mappingConfig.get(destination + "_" + database + "-" + table);
+            }
+
+            if (configMap == null) {
+                if (logger.isTraceEnabled()) {
+                    logger.trace("no config map: destination={},groupId={}, database={}, table={}, keys={}", destination, groupId, database, table, mappingConfig.keySet());
+                }
+                return false;
+            }
+            if (configMap.values().isEmpty()) {
+                logger.info("config map empty: destination={},groupId={}, database={}, table={}, keys={}", destination, groupId, database, table, mappingConfig.keySet());
+                return false;
+            }
+            if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
+                // DDL
+                columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
+                List<SQLStatement> stmtList;
+                try {
+                    stmtList = SQLUtils.parseStatements(dml.getSql(), JdbcConstants.MYSQL, false);
+                } catch (ParserException e) {
+                    // some statements cannot be parsed, e.g. stored procedures
+                    logger.info("parse sql error: " + dml.getSql(), e);
+                    return false;
+                }
+                for (Map.Entry<String, MappingConfig> entry : configMap.entrySet()) {
+                    try {
+                        alter(batchExecutors[0], entry.getValue(), dml, stmtList, entry.getKey());
+                    } catch (SQLException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+                return false;
+            } else {
+                // DML
+                for (Map.Entry<String, MappingConfig> entry : configMap.entrySet()) {
+                    MappingConfig config = entry.getValue();
+                    if (config.isDebug()) {
+                        logger.info("DML: {} {}", entry.getKey(), JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
+                    }
+                    if (config.getConcurrent()) {
+                        // concurrent sync
+                        // split the batch DML into SingleDml items
+                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                        singleDmls.forEach(singleDml -> {
+                            // hash the primary key
+                            int hash = pkHash(config.getDbMapping(), singleDml.getData());
+                            SyncItem syncItem = new SyncItem(config, singleDml);
+                            // rows with the same primary key land in the same partition, so their order is preserved
+                            dmlsPartition[hash].add(syncItem);
+                        });
+                    } else {
+                        // serial sync
+                        int hash = 0;
+                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                        singleDmls.forEach(singleDml -> {
+                            SyncItem syncItem = new SyncItem(config, singleDml);
+                            // with the default of 3 threads, serial sync leaves 2 threads idle
+                            dmlsPartition[hash].add(syncItem);
+                        });
+                    }
+                }
+                return true;
+            }
+        });
+    }
+
+    /**
+     * Sync a single DML
+     *
+     * @param batchExecutor batch transaction executor
+     * @param config        corresponding mapping configuration
+     * @param dml           DML
+     */
+    private void sync(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
+        if (config != null) {
+            try {
+                String type = dml.getType();
+                if (type != null && type.equalsIgnoreCase("INSERT")) {
+                    insert(batchExecutor, config, dml);
+                } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
+                    // Phoenix UPSERT semantics make UPDATE identical to INSERT
+                    insert(batchExecutor, config, dml);
+                } else if (type != null && type.equalsIgnoreCase("DELETE")) {
+                    delete(batchExecutor, config, dml);
+                } else if (type != null && type.equalsIgnoreCase("TRUNCATE")) {
+                    truncate(batchExecutor, config);
+                } else if (logger.isInfoEnabled()){
+                    logger.info("SingleDml: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
+                }
+            } catch (SQLException e) {
+                logger.error("sync error: " + e.getMessage(), e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private void alter(BatchExecutor batchExecutor, MappingConfig config, Dml dml, List<SQLStatement> stmtList, String configFile) throws SQLException {
+        if (config.isDebug()) {
+            logger.info("DML: {} {}", configFile, JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
+        }
+        DbMapping dbMapping = config.getDbMapping();
+        if (!dbMapping.isAlter()) {
+            logger.info("not alterable table: {} {}", dml.getTable(), configFile);
+            return;
+        }
+
+        Map<String, String> columnsMap = dbMapping.getTargetColumns();
+
+        Map<String, String> columnsMap1 = new HashMap<>();
+        for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
+            columnsMap1.put(entry.getValue(), entry.getKey());
+        }
+
+        String targetTable = SyncUtil.getDbTableName(dbMapping);
+        Map<String, String> defValues = new HashMap<>();
+        for (SQLStatement statement : stmtList) {
+            if (statement instanceof SQLAlterTableStatement) {
+                SQLAlterTableStatement alterTable = (SQLAlterTableStatement) statement;
+                for (SQLAlterTableItem item : alterTable.getItems()) {
+                    if (item instanceof SQLAlterTableDropColumnItem) {
+                        SQLAlterTableDropColumnItem dropColumnItem = (SQLAlterTableDropColumnItem) item;
+                        if (!dbMapping.isDrop()) {
+                            logger.info("drop table column disabled: {} {}", targetTable, dropColumnItem.getColumns());
+                            continue;
+                        }
+                        for (SQLName sqlName : dropColumnItem.getColumns()) {
+                            String name = Util.cleanColumn(sqlName.getSimpleName());
+                            String sql = "ALTER TABLE " + targetTable + " DROP COLUMN IF EXISTS " +
+                                    dbMapping.escape(columnsMap1.getOrDefault(name, name));
+                            try {
+                                logger.info("drop table column: {} {}", sql, batchExecutor.executeUpdate(sql));
+                                dbMapping.removeTargetColumn(name);
+                            } catch (Exception e) {
+                                logger.warn("drop table column error: " + sql, e);
+                            }
+                        }
+                    } else if (item instanceof SQLAlterTableAddColumn) {
+                        SQLAlterTableAddColumn addColumn = (SQLAlterTableAddColumn) item;
+                        if (!dbMapping.getMapAll()) {
+                            logger.info("add table column disabled: {} {}", targetTable, addColumn.getColumns());
+                            continue;
+                        }
+                        for (SQLColumnDefinition definition : addColumn.getColumns()) {
+                            String name = Util.cleanColumn(definition.getNameAsString());
+                            if (dbMapping.getExcludeColumns().contains(name)) {
+                                continue;
+                            }
+                            String sql = "ALTER TABLE " + targetTable +
+                                    " ADD IF NOT EXISTS " +
+                                    dbMapping.escape(name) + " " + TypeUtil.getPhoenixType(definition, dbMapping.isLimit());
+                            try {
+                                logger.info("add table column: {} {}", sql, batchExecutor.executeUpdate(sql));
+                                dbMapping.addTargetColumn(name, name);
+                                if (definition.getDefaultExpr() != null) {
+                                    String defVal = definition.getDefaultExpr().toString();
+                                    if (!defVal.equalsIgnoreCase("NULL") && !defVal.equalsIgnoreCase("NOT NULL") && name.length() > 0) {
+                                        defValues.put(name, defVal);
+                                    }
+                                }
+                            } catch (Exception e) {
+                                logger.error("add table column error: " + sql, e);
+                                throw e;
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        if (!defValues.isEmpty()) {
+            StringBuilder defSql = new StringBuilder();
+            defSql.append("UPSERT INTO ").append(targetTable).append("(");
+            Set<Map.Entry<String, String>> pkSet = dbMapping.getTargetPk().entrySet();
+            Set<Map.Entry<String, String>> defSet = defValues.entrySet();
+            for (Map.Entry<String, String> entry : pkSet) {
+                defSql.append(dbMapping.escape(entry.getKey())).append(",");
+            }
+            for (Map.Entry<String, String> entry : defSet) {
+                defSql.append(dbMapping.escape(entry.getKey())).append(",");
+            }
+            defSql.deleteCharAt(defSql.length() - 1).append(") SELECT ");
+            for (Map.Entry<String, String> entry : pkSet) {
+                defSql.append(dbMapping.escape(entry.getKey())).append(",");
+            }
+            for (Map.Entry<String, String> entry : defSet) {
+                defSql.append(entry.getValue()).append(",");
+            }
+            defSql.deleteCharAt(defSql.length() - 1).append(" FROM ").append(targetTable);
+            try {
+                logger.info("set column default value: {} {}", defSql, batchExecutor.executeUpdate(defSql.toString()));
+                batchExecutor.commit();
+            } catch (SQLException e) {
+                logger.error("set column default value error: {}", defSql, e);
+                batchExecutor.rollback();
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * Insert operation
+     *
+     * @param config mapping configuration
+     * @param dml    DML data
+     */
+    private void insert(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
+        Map<String, Object> data = dml.getData();
+        if (data == null || data.isEmpty()) {
+            return;
+        }
+
+        DbMapping dbMapping = config.getDbMapping();
+        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
+
+        StringBuilder insertSql = new StringBuilder();
+        insertSql.append("UPSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
+
+        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
+
+        int mapLen = columnsMap.size();
+        List<Map<String, ?>> values = new ArrayList<>();
+        for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
+            String targetColumnName = entry.getKey();
+            String srcColumnName = entry.getValue();
+            if (srcColumnName == null) {
+                srcColumnName = Util.cleanColumn(targetColumnName);
+            }
+
+            Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
+            if (type == null) {
+                if (dbMapping.isSkipMissing()) {
+                    logger.warn("Target missing field: {}", targetColumnName);
+                    mapLen -= 1;
+                    continue;
+                } else if (dbMapping.getMapAll() && dbMapping.isAlter() && PhoenixEtlService.syncSchema(batchExecutor.getConn(), config)) {
+                    columnsTypeCache.remove(config.getDestination() + "." + dbMapping.getDatabase() + "." + dbMapping.getTable());
+                    ctype = getTargetColumnType(batchExecutor.getConn(), config);
+                    type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
+                }
+                if (type == null) {
+                    throw new RuntimeException("Target column: " + targetColumnName + " not matched");
+                }
+            }
+            insertSql.append(dbMapping.escape(targetColumnName)).append(",");
+            Object value = data.get(srcColumnName);
+            BatchExecutor.setValue(values, type, value);
+        }
+
+        int len = insertSql.length();
+        insertSql.delete(len - 1, len).append(") VALUES (");
+        for (int i = 0; i < mapLen; i++) {
+            insertSql.append("?,");
+        }
+        len = insertSql.length();
+        insertSql.delete(len - 1, len).append(")");
+
+        Map<String, Object> old = dml.getOld();
+        try {
+            if (old != null && !old.isEmpty()) {
+                boolean keyChanged = false;
+                List<Map<String, ?>> delValues = new ArrayList<>();
+                StringBuilder deleteSql = new StringBuilder();
+                deleteSql.append("DELETE FROM ").append(SyncUtil.getDbTableName(dbMapping)).append(" WHERE ");
+                for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
+                    String targetColumnName = entry.getKey();
+                    String srcColumnName = entry.getValue();
+                    if (srcColumnName == null) {
+                        srcColumnName = Util.cleanColumn(targetColumnName);
+                    }
+                    Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
+                    if (type != null) {
+                        deleteSql.append(dbMapping.escape(targetColumnName)).append("=? AND ");
+                        // handle the case where the primary key itself was updated
+                        if (old.containsKey(srcColumnName)) {
+                            keyChanged = true;
+                            BatchExecutor.setValue(delValues, type, old.get(srcColumnName));
+                        } else {
+                            BatchExecutor.setValue(delValues, type, data.get(srcColumnName));
+                        }
+                    }
+                }
+                if (keyChanged) {
+                    if (config.isDebug()) {
+                        logger.info("delete from table: {} {}", deleteSql, delValues);
+                    }
+                    batchExecutor.execute(deleteSql.toString(), delValues);
+                }
+            }
+            if (config.isDebug()) {
+                logger.info("insert into table: {} {}", insertSql, values);
+            }
+            batchExecutor.execute(insertSql.toString(), values);
+        } catch (SQLException | RuntimeException e) {
+            logger.warn("Insert into target table error, sql: {} {}", insertSql, values, e);
+            throw e;
+        }
+        if (logger.isTraceEnabled()) {
+            logger.trace("Insert into target table, sql: {}", insertSql);
+        }
+    }
+
+    /**
+     * Delete operation (unchanged from the RDB adapter)
+     *
+     * @param config MappingConfig
+     * @param dml    Single DML
+     */
+    private void delete(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
+        Map<String, Object> data = dml.getData();
+        if (data == null || data.isEmpty()) {
+            return;
+        }
+
+        DbMapping dbMapping = config.getDbMapping();
+
+        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
+
+        StringBuilder sql = new StringBuilder();
+        sql.append("DELETE FROM ").append(SyncUtil.getDbTableName(dbMapping)).append(" WHERE ");
+
+        List<Map<String, ?>> values = new ArrayList<>();
+        // append primary key conditions
+        appendCondition(dbMapping, sql, ctype, values, data);
+        try {
+            batchExecutor.execute(sql.toString(), values);
+            if (logger.isTraceEnabled()) {
+                logger.trace("Delete from target table, sql: {}", sql);
+            }
+        } catch (SQLException e) {
+            logger.warn("Delete from target error, sql: {} {}", sql, values, e);
+            throw e;
+        }
+    }
+
+    /**
+     * Truncate operation (unchanged from the RDB adapter)
+     *
+     * @param config MappingConfig
+     */
+    private void truncate(BatchExecutor batchExecutor, MappingConfig config) throws SQLException {
+        DbMapping dbMapping = config.getDbMapping();
+        StringBuilder sql = new StringBuilder();
+        sql.append("TRUNCATE TABLE ").append(SyncUtil.getDbTableName(dbMapping));
+        batchExecutor.execute(sql.toString(), new ArrayList<>());
+        if (logger.isTraceEnabled()) {
+            logger.trace("Truncate target table, sql: {}", sql);
+        }
+    }
+
+    /**
+     * Get the target table's column types
+     *
+     * @param conn   sql connection
+     * @param config mapping configuration
+     * @return column name -> java.sql.Types value
+     */
+    private Map<String, Integer> getTargetColumnType(Connection conn, MappingConfig config) {
+        DbMapping dbMapping = config.getDbMapping();
+        String cacheKey = config.getDestination() + "." + dbMapping.getDatabase() + "." + dbMapping.getTable();
+        Map<String, Integer> columnType = columnsTypeCache.get(cacheKey);
+        if (columnType == null) {
+            synchronized (PhoenixSyncService.class) {
+                columnType = columnsTypeCache.get(cacheKey);
+                if (columnType == null) {
+                    columnType = new LinkedHashMap<>();
+                    final Map<String, Integer> columnTypeTmp = columnType;
+                    String sql = "SELECT * FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE 1=2";
+                    try {
+                        Util.sqlRS(conn, sql, rs -> {
+                            try {
+                                ResultSetMetaData rsd = rs.getMetaData();
+                                int columnCount = rsd.getColumnCount();
+                                for (int i = 1; i <= columnCount; i++) {
+                                    columnTypeTmp.put(rsd.getColumnName(i).toLowerCase(), rsd.getColumnType(i));
+                                }
+                                columnsTypeCache.put(cacheKey, columnTypeTmp);
+                            } catch (SQLException e) {
+                                logger.error(e.getMessage(), e);
+                            }
+                        });
+                    } catch (RuntimeException e) {
+                        // new: on TableNotFoundException, sync the schema and retry the metadata query
+                        if (e.getCause() == null || !e.getCause().getClass().getName().endsWith("TableNotFoundException")) {
+                            throw e;
+                        }
+                        if (!PhoenixEtlService.syncSchema(conn, config)) {
+                            throw e;
+                        }
+                        Util.sqlRS(conn, sql, rs -> {
+                            try {
+                                ResultSetMetaData rsd = rs.getMetaData();
+                                int columnCount = rsd.getColumnCount();
+                                for (int i = 1; i <= columnCount; i++) {
+                                    columnTypeTmp.put(rsd.getColumnName(i).toLowerCase(), rsd.getColumnType(i));
+                                }
+                                columnsTypeCache.put(cacheKey, columnTypeTmp);
+                            } catch (SQLException e1) {
+                                logger.error(e1.getMessage(), e1);
+                            }
+                        });
+                    }
+                }
+            }
+        }
+        return columnType;
+    }
+
+    /**
+     * Build the primary-key WHERE clause
+     */
+    private void appendCondition(DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
+                                 List<Map<String, ?>> values, Map<String, Object> d) {
+        // append primary key conditions
+        for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
+            String targetColumnName = entry.getKey();
+            String srcColumnName = entry.getValue();
+            if (srcColumnName == null) {
+                srcColumnName = Util.cleanColumn(targetColumnName);
+            }
+            sql.append(dbMapping.escape(targetColumnName)).append("=? AND ");
+            Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
+            if (type == null) {
+                throw new RuntimeException("Target column: " + targetColumnName + " not matched");
+            }
+            BatchExecutor.setValue(values, type, d.get(srcColumnName));
+        }
+        int len = sql.length();
+        sql.delete(len - 4, len);
+    }
+
+    public static class SyncItem {
+
+        private MappingConfig config;
+        private SingleDml singleDml;
+
+        SyncItem(MappingConfig config, SingleDml singleDml) {
+            this.config = config;
+            this.singleDml = singleDml;
+        }
+    }
+
+    /**
+     * Hash the primary key to pick a partition
+     */
+    private int pkHash(DbMapping dbMapping, Map<String, Object> d) {
+        int hash = 0;
+        // collect primary key values
+        for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
+            String targetColumnName = entry.getKey();
+            String srcColumnName = entry.getValue();
+            if (srcColumnName == null) {
+                srcColumnName = Util.cleanColumn(targetColumnName);
+            }
+            Object value = null;
+            if (d != null) {
+                value = d.get(srcColumnName);
+            }
+            if (value != null) {
+                hash += value.hashCode();
+            }
+        }
+        // the second Math.abs guards against Integer.MIN_VALUE after the modulo
+        return Math.abs(Math.abs(hash) % threads);
+    }
+
+
+    public void close() {
+        for (int i = 0; i < threads; i++) {
+            executorThreads[i].shutdown();
+        }
+    }
+}
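
The concurrent path above rests on one invariant: hashing the primary key selects a fixed single-threaded executor, so changes to the same row replay in order while different rows proceed in parallel. A minimal sketch of that partitioning scheme, with an illustrative thread count and key set:

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class PkPartitionSketch {
        public static void main(String[] args) throws InterruptedException {
            int threads = 3;
            ExecutorService[] workers = new ExecutorService[threads];
            for (int i = 0; i < threads; i++) {
                workers[i] = Executors.newSingleThreadExecutor();
            }
            // Events for pk=1 all land on the same worker, preserving their order.
            List<String> pks = Arrays.asList("1", "2", "1", "3", "2", "1");
            for (String pk : pks) {
                int slot = Math.abs(pk.hashCode()) % threads;
                workers[slot].submit(() -> System.out.println(
                        Thread.currentThread().getName() + " handles pk=" + pk));
            }
            for (ExecutorService w : workers) {
                w.shutdown();
                w.awaitTermination(5, TimeUnit.SECONDS);
            }
        }
    }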

+ 108 - 0
client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/support/BatchExecutor.java

@@ -0,0 +1,108 @@
+package com.alibaba.otter.canal.client.adapter.phoenix.support;
+
+import com.alibaba.otter.canal.client.adapter.phoenix.PhoenixAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.Closeable;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * SQL batch executor (largely unchanged from the RDB adapter)
+ */
+public class BatchExecutor implements Closeable {
+
+    private static final Logger logger = LoggerFactory.getLogger(BatchExecutor.class);
+
+    private DataSource          dataSource;
+    private Connection          conn;
+    private AtomicInteger       idx    = new AtomicInteger(0);
+
+    public BatchExecutor(DataSource dataSource){
+        this.dataSource = dataSource;
+    }
+    public BatchExecutor(){
+    }
+
+    public Connection getConn() {
+        if (conn == null) {
+            try {
+                conn = PhoenixAdapter.getPhoenixConnection();
+                this.conn.setAutoCommit(false);
+            } catch (SQLException e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+        return conn;
+    }
+
+    public static void setValue(List<Map<String, ?>> values, int type, Object value) {
+        Map<String, Object> valueItem = new HashMap<>();
+        valueItem.put("type", type);
+        valueItem.put("value", value);
+        values.add(valueItem);
+    }
+
+    public int executeUpdate(String sql) throws SQLException {
+        logger.debug("execute: {}", sql);
+        Statement statement = getConn().createStatement();
+        int ret = statement.executeUpdate(sql);
+        statement.close();
+        return ret;
+    }
+
+    public void execute(String sql, List<Map<String, ?>> values) throws SQLException {
+        if (logger.isDebugEnabled()) {
+            logger.debug("execute: {} {}", sql, Arrays.toString(values.toArray()));
+        }
+        PreparedStatement pstmt = getConn().prepareStatement(sql);
+        int len = values.size();
+        for (int i = 0; i < len; i++) {
+            int type = (Integer) values.get(i).get("type");
+            Object value = values.get(i).get("value");
+            SyncUtil.setPStmt(type, pstmt, value, i + 1);
+        }
+
+        pstmt.execute();
+        idx.incrementAndGet();
+        pstmt.close();
+    }
+
+    public void commit() throws SQLException {
+        getConn().commit();
+        if (logger.isTraceEnabled()) {
+            logger.trace("Batch executor commit " + idx.get() + " rows");
+        }
+        idx.set(0);
+    }
+
+    public void rollback() throws SQLException {
+        getConn().rollback();
+        if (logger.isTraceEnabled()) {
+            logger.trace("Batch executor rollback " + idx.get() + " rows");
+        }
+        idx.set(0);
+    }
+
+    @Override
+    public void close() {
+        if (conn != null) {
+            try {
+                conn.close();
+            } catch (SQLException e) {
+                logger.error(e.getMessage(), e);
+            } finally {
+                conn = null;
+            }
+        }
+    }
+}
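
A hypothetical caller binds typed values with setValue, executes, then commits or rolls back and always closes. The UPSERT statement and table name below are assumptions for illustration, and the sketch presumes PhoenixAdapter has been initialized so a connection can be obtained:

    import com.alibaba.otter.canal.client.adapter.phoenix.support.BatchExecutor;

    import java.sql.SQLException;
    import java.sql.Types;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    public class BatchExecutorUsageSketch {
        public static void main(String[] args) throws SQLException {
            BatchExecutor executor = new BatchExecutor(); // lazily opens a Phoenix connection
            try {
                List<Map<String, ?>> values = new ArrayList<>();
                BatchExecutor.setValue(values, Types.INTEGER, 1);
                BatchExecutor.setValue(values, Types.VARCHAR, "alice");
                executor.execute("UPSERT INTO \"USER\" (\"ID\", \"NAME\") VALUES (?, ?)", values);
                executor.commit();   // flush buffered mutations to HBase
            } catch (SQLException e) {
                executor.rollback(); // discard uncommitted mutations
                throw e;
            } finally {
                executor.close();
            }
        }
    }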

+ 97 - 0
client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/support/PhoenixSupportUtil.java

@@ -0,0 +1,97 @@
+package com.alibaba.otter.canal.client.adapter.phoenix.support;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.function.Function;
+
+/**
+ * @author: lihua
+ * @date: 2020/12/30 18:43
+ * @Description: streaming ResultSet helper for Phoenix queries
+ */
+public class PhoenixSupportUtil {
+
+    public static final Logger logger = LoggerFactory.getLogger(PhoenixSupportUtil.class);
+
+    /**
+     * Run a query in streaming mode and hand the ResultSet to the callback.
+     * The supplied connection is closed once the query completes.
+     */
+    public static Object sqlRS(Connection dsConnection, String sql, Function<ResultSet, Object> fun) {
+        try (Connection conn = dsConnection;
+             Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
+            stmt.setFetchSize(Integer.MIN_VALUE);
+            try (ResultSet rs = stmt.executeQuery(sql)) {
+                return fun.apply(rs);
+            }
+        } catch (Exception e) {
+            logger.error("sqlRs has error, sql: {} ", sql);
+            throw new RuntimeException(e);
+        }
+    }
+}
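
A hedged usage sketch of sqlRS: the callback receives the streaming ResultSet and its return value is passed through; note the helper closes the supplied connection itself. The URL and table name are illustrative:

    import com.alibaba.otter.canal.client.adapter.phoenix.support.PhoenixSupportUtil;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;

    public class SqlRsSketch {
        public static void main(String[] args) throws SQLException {
            Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181");
            Object count = PhoenixSupportUtil.sqlRS(conn, "SELECT \"ID\" FROM \"USER\"", rs -> {
                int n = 0;
                try {
                    while (rs.next()) {
                        n++;
                    }
                } catch (SQLException e) {
                    throw new RuntimeException(e);
                }
                return n; // passed back as the sqlRS return value
            });
            System.out.println("rows: " + count); // conn is already closed by the helper
        }
    }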

+ 97 - 0
client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/support/SingleDml.java

@@ -0,0 +1,97 @@
+package com.alibaba.otter.canal.client.adapter.phoenix.support;
+
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Single-row DML (unchanged from the RDB adapter)
+ */
+@SuppressWarnings({"unused", "WeakerAccess"})
+public class SingleDml {
+
+    private String              destination;
+    private String              database;
+    private String              table;
+    private String              type;
+    private Map<String, Object> data;
+    private Map<String, Object> old;
+
+    public String getDestination() {
+        return destination;
+    }
+
+    public void setDestination(String destination) {
+        this.destination = destination;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public Map<String, Object> getData() {
+        return data;
+    }
+
+    public void setData(Map<String, Object> data) {
+        this.data = data;
+    }
+
+    public Map<String, Object> getOld() {
+        return old;
+    }
+
+    public void setOld(Map<String, Object> old) {
+        this.old = old;
+    }
+
+    public static List<SingleDml> dml2SingleDmls(Dml dml) {
+        List<SingleDml> singleDmls = new ArrayList<>();
+        if (dml.getData() != null) {
+            int size = dml.getData().size();
+            for (int i = 0; i < size; i++) {
+                SingleDml singleDml = new SingleDml();
+                singleDml.setDestination(dml.getDestination());
+                singleDml.setDatabase(dml.getDatabase());
+                singleDml.setTable(dml.getTable());
+                singleDml.setType(dml.getType());
+                singleDml.setData(dml.getData().get(i));
+                if (dml.getOld() != null) {
+                    singleDml.setOld(dml.getOld().get(i));
+                }
+                singleDmls.add(singleDml);
+            }
+            // MaxWell does not emit row data for TRUNCATE, so it is handled explicitly here
+        } else if ("TRUNCATE".equalsIgnoreCase(dml.getType())) {
+            SingleDml singleDml = new SingleDml();
+            singleDml.setDestination(dml.getDestination());
+            singleDml.setDatabase(dml.getDatabase());
+            singleDml.setTable(dml.getTable());
+            singleDml.setType(dml.getType());
+            singleDmls.add(singleDml);
+        }
+        return singleDmls;
+    }
+}
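
dml2SingleDmls simply fans a multi-row Dml out into one SingleDml per row, carrying old values along when present. A small sketch with illustrative field values:

    import com.alibaba.otter.canal.client.adapter.phoenix.support.SingleDml;
    import com.alibaba.otter.canal.client.adapter.support.Dml;

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SingleDmlSketch {
        public static void main(String[] args) {
            Dml dml = new Dml();
            dml.setDatabase("mytest");
            dml.setTable("user");
            dml.setType("INSERT");
            List<Map<String, Object>> data = new ArrayList<>();
            Map<String, Object> row1 = new HashMap<>();
            row1.put("id", 1);
            Map<String, Object> row2 = new HashMap<>();
            row2.put("id", 2);
            data.add(row1);
            data.add(row2);
            dml.setData(data);

            List<SingleDml> singles = SingleDml.dml2SingleDmls(dml);
            System.out.println(singles.size()); // 2: one SingleDml per row
        }
    }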

+ 274 - 0
client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/support/SyncUtil.java

@@ -0,0 +1,274 @@
+package com.alibaba.otter.canal.client.adapter.phoenix.support;
+
+import com.alibaba.otter.canal.client.adapter.phoenix.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+import org.apache.commons.lang.StringUtils;
+
+import java.io.Reader;
+import java.io.StringReader;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.*;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class SyncUtil {
+
+    public static Map<String, String> getColumnsMap(MappingConfig.DbMapping dbMapping, Map<String, Object> data) {
+        return getColumnsMap(dbMapping, data.keySet());
+    }
+
+    public static Map<String, String> getColumnsMap(MappingConfig.DbMapping dbMapping, Collection<String> columns) {
+        Map<String, String> columnsMap;
+        if (dbMapping.getMapAll()) {
+            if (dbMapping.getAllMapColumns() != null) {
+                return dbMapping.getAllMapColumns();
+            }
+            columnsMap = new LinkedHashMap<>();
+            for (String srcColumn : columns) {
+                boolean flag = true;
+                if (dbMapping.getTargetColumns() != null) {
+                    for (Map.Entry<String, String> entry : dbMapping.getTargetColumns().entrySet()) {
+                        if (srcColumn.equals(entry.getValue())) {
+                            columnsMap.put(entry.getKey(), srcColumn);
+                            flag = false;
+                            break;
+                        }
+                    }
+                }
+                // new: skip columns listed in getExcludeColumns()
+                if (flag && !dbMapping.getExcludeColumns().contains(srcColumn)) {
+                    columnsMap.put(srcColumn, srcColumn);
+                }
+            }
+            dbMapping.setAllMapColumns(columnsMap);
+        } else {
+            columnsMap = dbMapping.getTargetColumns();
+        }
+        return columnsMap;
+    }
+
+    /**
+     * Bind a parameter on the PreparedStatement
+     *
+     * @param type sqlType
+     * @param pstmt target PreparedStatement
+     * @param value value to bind
+     * @param i parameter index
+     */
+    public static void setPStmt(int type, PreparedStatement pstmt, Object value, int i) throws SQLException {
+        switch (type) {
+            case Types.BIT:
+            case Types.BOOLEAN:
+                if (value instanceof Boolean) {
+                    pstmt.setBoolean(i, (Boolean) value);
+                } else if (value instanceof String) {
+                    boolean v = !value.equals("0");
+                    pstmt.setBoolean(i, v);
+                } else if (value instanceof Number) {
+                    boolean v = ((Number) value).intValue() != 0;
+                    pstmt.setBoolean(i, v);
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.CHAR:
+            case Types.NCHAR:
+            case Types.VARCHAR:
+            case Types.LONGVARCHAR:
+                if (value instanceof String) {
+                    pstmt.setString(i, (String) value);
+                } else if (value == null) {
+                    pstmt.setNull(i, type);
+                } else {
+                    pstmt.setString(i, value.toString());
+                }
+                break;
+            case Types.TINYINT:
+                if (value instanceof Number) {
+                    pstmt.setByte(i, ((Number) value).byteValue());
+                } else if (value instanceof String) {
+                    pstmt.setByte(i, Byte.parseByte((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.SMALLINT:
+                if (value instanceof Number) {
+                    pstmt.setShort(i, ((Number) value).shortValue());
+                } else if (value instanceof String) {
+                    pstmt.setShort(i, Short.parseShort((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.INTEGER:
+                if (value instanceof Number) {
+                    pstmt.setInt(i, ((Number) value).intValue());
+                } else if (value instanceof String) {
+                    pstmt.setInt(i, Integer.parseInt((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.BIGINT:
+                if (value instanceof Number) {
+                    pstmt.setLong(i, ((Number) value).longValue());
+                } else if (value instanceof String) {
+                    pstmt.setLong(i, Long.parseLong((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.DECIMAL:
+            case Types.NUMERIC:
+                if (value instanceof BigDecimal) {
+                    pstmt.setBigDecimal(i, (BigDecimal) value);
+                } else if (value instanceof Byte) {
+                    pstmt.setInt(i, ((Byte) value).intValue());
+                } else if (value instanceof Short) {
+                    pstmt.setInt(i, ((Short) value).intValue());
+                } else if (value instanceof Integer) {
+                    pstmt.setInt(i, (Integer) value);
+                } else if (value instanceof Long) {
+                    pstmt.setLong(i, (Long) value);
+                } else if (value instanceof Float) {
+                    pstmt.setBigDecimal(i, new BigDecimal((float) value));
+                } else if (value instanceof Double) {
+                    pstmt.setBigDecimal(i, new BigDecimal((double) value));
+                } else if (value != null) {
+                    pstmt.setBigDecimal(i, new BigDecimal(value.toString()));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.REAL:
+                if (value instanceof Number) {
+                    pstmt.setFloat(i, ((Number) value).floatValue());
+                } else if (value instanceof String) {
+                    pstmt.setFloat(i, Float.parseFloat((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.FLOAT:
+            case Types.DOUBLE:
+                if (value instanceof Number) {
+                    pstmt.setDouble(i, ((Number) value).doubleValue());
+                } else if (value instanceof String) {
+                    pstmt.setDouble(i, Double.parseDouble((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.BINARY:
+            case Types.VARBINARY:
+            case Types.LONGVARBINARY:
+            case Types.BLOB:
+                if (value instanceof Blob) {
+                    pstmt.setBlob(i, (Blob) value);
+                } else if (value instanceof byte[]) {
+                    pstmt.setBytes(i, (byte[]) value);
+                } else if (value instanceof String) {
+                    pstmt.setBytes(i, ((String) value).getBytes(StandardCharsets.ISO_8859_1));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.CLOB:
+                if (value instanceof Clob) {
+                    pstmt.setClob(i, (Clob) value);
+                } else if (value instanceof byte[]) {
+                    pstmt.setBytes(i, (byte[]) value);
+                } else if (value instanceof String) {
+                    Reader clobReader = new StringReader((String) value);
+                    pstmt.setCharacterStream(i, clobReader);
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.DATE:
+                if (value instanceof Date) {
+                    pstmt.setDate(i, (Date) value);
+                } else if (value instanceof java.util.Date) {
+                    pstmt.setDate(i, new Date(((java.util.Date) value).getTime()));
+                } else if (value instanceof String) {
+                    String v = (String) value;
+                    if (!v.startsWith("0000-00-00")) {
+                        java.util.Date date = Util.parseDate(v);
+                        if (date != null) {
+                            pstmt.setDate(i, new Date(date.getTime()));
+                        } else {
+                            pstmt.setNull(i, type);
+                        }
+                    } else {
+                        pstmt.setObject(i, value);
+                    }
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.TIME:
+                if (value instanceof Time) {
+                    pstmt.setTime(i, (Time) value);
+                } else if (value instanceof java.util.Date) {
+                    pstmt.setTime(i, new Time(((java.util.Date) value).getTime()));
+                } else if (value instanceof String) {
+                    String v = (String) value;
+                    java.util.Date date = Util.parseDate(v);
+                    if (date != null) {
+                        pstmt.setTime(i, new Time(date.getTime()));
+                    } else {
+                        pstmt.setNull(i, type);
+                    }
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.TIMESTAMP:
+                if (value instanceof Timestamp) {
+                    pstmt.setTimestamp(i, (Timestamp) value);
+                } else if (value instanceof java.util.Date) {
+                    pstmt.setTimestamp(i, new Timestamp(((java.util.Date) value).getTime()));
+                } else if (value instanceof String) {
+                    String v = (String) value;
+                    if (!v.startsWith("0000-00-00")) {
+                        java.util.Date date = Util.parseDate(v);
+                        if (date != null) {
+                            pstmt.setTimestamp(i, new Timestamp(date.getTime()));
+                        } else {
+                            pstmt.setNull(i, type);
+                        }
+                    } else {
+                        pstmt.setObject(i, value);
+                    }
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            default:
+                pstmt.setObject(i, value, type);
+        }
+    }
+
+    public static String getDbTableName(MappingConfig.DbMapping dbMapping) {
+        String result = "";
+        if (StringUtils.isNotEmpty(dbMapping.getTargetDb())) {
+            if (dbMapping.isEscapeUpper()) {
+                // new: identifiers are upper-cased by default, so wrap them in double quotes
+                result += "\"" + dbMapping.getTargetDb() + "\".";
+            } else {
+                result += dbMapping.getTargetDb() + ".";
+            }
+        }
+
+        if (dbMapping.isEscapeUpper()) {
+            // new: identifiers are upper-cased by default, so wrap them in double quotes
+            result += "\"" + dbMapping.getTargetTable().replaceAll("\\.", "\".\"") + "\"";
+        } else {
+            result += dbMapping.getTargetTable();
+        }
+        return result;
+    }
+}
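
setPStmt's job is coercing canal's loosely typed values into the JDBC type of the target column: numeric strings become numbers, MySQL datetime strings become Timestamps, and unusable values fall back to setNull. A hedged sketch; the Phoenix URL and table are assumptions:

    import com.alibaba.otter.canal.client.adapter.phoenix.support.SyncUtil;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.sql.Types;

    public class SetPStmtSketch {
        public static void main(String[] args) throws SQLException {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181");
                 PreparedStatement pstmt = conn.prepareStatement(
                         "UPSERT INTO \"USER\" (\"ID\", \"CREATED\") VALUES (?, ?)")) {
                // the String "42" is parsed into an int for an INTEGER column
                SyncUtil.setPStmt(Types.INTEGER, pstmt, "42", 1);
                // a MySQL datetime string is parsed into a java.sql.Timestamp
                SyncUtil.setPStmt(Types.TIMESTAMP, pstmt, "2021-01-01 12:00:00", 2);
                pstmt.execute();
                conn.commit();
            }
        }
    }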

+ 142 - 0
client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/support/TypeUtil.java

@@ -0,0 +1,142 @@
+package com.alibaba.otter.canal.client.adapter.phoenix.support;
+
+import com.alibaba.druid.sql.ast.SQLDataType;
+import com.alibaba.druid.sql.ast.SQLDataTypeImpl;
+import com.alibaba.druid.sql.ast.statement.SQLColumnDefinition;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Phoenix type mapping (entirely new in this adapter)
+ */
+public class TypeUtil {
+
+    private static String joinArgs(String type, Object[] args) {
+        if (args.length > 0) {
+            return type + "(" + StringUtils.join(args, ",") + ")";
+        }
+        return type;
+    }
+
+    /**
+     * Return the Phoenix type definition for a given MySQL column definition
+     * @see "https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-type-conversions.html"
+     * @param definition MySQL column definition
+     * @param limit whether to keep length/precision arguments
+     * @return Phoenix type definition
+     */
+    public static String getPhoenixType(SQLColumnDefinition definition, boolean limit) {
+        if (definition == null) return "VARCHAR";
+        SQLDataType sqlDataType = definition.getDataType();
+        SQLDataTypeImpl sqlDataType1 = sqlDataType instanceof SQLDataTypeImpl ? (SQLDataTypeImpl)sqlDataType : null;
+        boolean isUnsigned = sqlDataType1 != null && sqlDataType1.isUnsigned();
+
+        return getPhoenixType(sqlDataType.getName().toUpperCase(), sqlDataType.getArguments().toArray(), isUnsigned, limit);
+    }
+
+    // one-to-one mapping from MySQL types to Phoenix types
+    public static String getPhoenixType(String name, Object[] args, boolean isUnsigned, boolean limit) {
+        switch (name) {
+            case "BIT":
+                if (limit) {
+                    return joinArgs("BINARY", args);
+                }
+                return "BINARY";
+            case "TINYINT":
+                if (isUnsigned) {
+                    return "UNSIGNED_TINYINT";
+                }
+                return "TINYINT";
+            case "BOOLEAN":
+            case "BOOL":
+                return "BOOLEAN";
+            case "SMALLINT":
+                if (isUnsigned) {
+                    return "UNSIGNED_SMALLINT";
+                }
+                return "SMALLINT";
+            case "MEDIUMINT":
+                return "INTEGER";
+            case "INT":
+            case "INTEGER":
+                if (isUnsigned) {
+                    return "UNSIGNED_INT";
+                }
+                return "INTEGER";
+            case "BIGINT":
+                if (isUnsigned) {
+                    return "UNSIGNED_LONG";
+                }
+                return "BIGINT";
+            case "FLOAT":
+                if (isUnsigned) {
+                    return "UNSIGNED_FLOAT";
+                }
+                return "FLOAT";
+            case "DOUBLE":
+                if (isUnsigned) {
+                    return "UNSIGNED_DOUBLE";
+                }
+                return "DOUBLE";
+            case "DECIMAL":
+                if (limit) {
+                    return joinArgs("DECIMAL", args);
+                }
+                return "DECIMAL";
+            case "DATE":
+                if (isUnsigned) {
+                    return "UNSIGNED_DATE";
+                }
+                return "DATE";
+            case "DATETIME":
+            case "TIMESTAMP":
+                if (isUnsigned) {
+                    return "UNSIGNED_TIMESTAMP";
+                }
+                return "TIMESTAMP";
+            case "TIME":
+                if (isUnsigned) {
+                    return "UNSIGNED_TIME";
+                }
+                return "TIME";
+            case "YEAR":
+                return "INTEGER";
+            case "CHAR":
+                if (limit) {
+                    return joinArgs(name, args);
+                }
+                return "VARCHAR";
+            case "VARCHAR":
+                if (limit) {
+                    return joinArgs(name, args);
+                }
+                return "VARCHAR";
+            case "BINARY":
+                if (limit) {
+                    return joinArgs(name, args);
+                }
+                return "VARBINARY";
+            case "VARBINARY":
+                return "VARBINARY";
+            case "TINYBLOB":
+                return "VARBINARY";
+            case "TINYTEXT":
+                return "VARCHAR";
+            case "BLOB":
+                return "VARBINARY";
+            case "TEXT":
+                return "VARCHAR";
+            case "MEDIUMBLOB":
+                return "VARBINARY";
+            case "MEDIUMTEXT":
+                return "VARCHAR";
+            case "LONGBLOB":
+                return "VARBINARY";
+            case "LONGTEXT":
+                return "VARCHAR";
+            case "ENUM":
+            case "SET":
+                return "VARCHAR";
+        }
+        return "VARCHAR";
+    }
+}
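
Illustration only, not part of the patch: a few representative conversions through
TypeUtil.getPhoenixType, following the switch above:

    String a = TypeUtil.getPhoenixType("INT", new Object[]{}, true, false);         // "UNSIGNED_INT"
    String b = TypeUtil.getPhoenixType("VARCHAR", new Object[]{255}, false, true);  // "VARCHAR(255)"
    String c = TypeUtil.getPhoenixType("DATETIME", new Object[]{}, false, false);   // "TIMESTAMP"
    String d = TypeUtil.getPhoenixType("LONGBLOB", new Object[]{}, false, false);   // "VARBINARY"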

+ 1 - 0
client-adapter/phoenix/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter

@@ -0,0 +1 @@
+phoenix=com.alibaba.otter.canal.client.adapter.phoenix.PhoenixAdapter

+ 90 - 0
client-adapter/phoenix/src/main/resources/hbase-site.xml

@@ -0,0 +1,90 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+
+  <!-- Enable NAMESPACE mapping -->
+   <property>
+          <name>phoenix.schema.isNamespaceMappingEnabled</name>
+          <value>true</value>
+   </property>
+   <property>
+          <name>phoenix.schema.mapSystemTablesToNamespace</name>
+          <value>true</value>
+   </property>
+  <!-- Enable Phoenix secondary indexes (custom WAL edits keep index updates correct) -->
+   <property>
+          <name>hbase.regionserver.wal.codec</name>
+          <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
+   </property>
+   <property>  
+        <name>hbase.table.sanity.checks</name>  
+        <value>false</value> 
+   </property> 
+  <!-- Require these compression codecs at region server startup -->
+   <property>
+     <name>hbase.regionserver.codecs</name>
+     <value>snappy,lzo</value>
+   </property>
+  <!-- RPC and query timeout settings -->
+	<property>
+	  <name>hbase.rpc.timeout</name>
+	  <value>1200000</value>
+	</property>
+	<property>
+	  <name>hbase.client.operation.timeout</name>
+	  <value>600000</value>
+	</property>
+	<property>
+	  <name>hbase.client.scanner.timeout.period</name>
+	  <value>1200000</value>
+	</property>
+	<property>
+	 <name>phoenix.query.timeoutMs</name>
+	 <value>1800000</value>
+	</property>
+	<property>
+	 <name>phoenix.query.keepAliveMs</name>
+	 <value>600000</value>
+	</property>
+	<property>
+	  <name>hbase.client.ipc.pool.type</name>
+	  <value>RoundRobinPool</value>
+	</property>
+	<property>
+	  <name>hbase.client.ipc.pool.size</name>
+	  <value>10</value>
+	</property>
+	<property>
+	  <name>index.builder.threads.keepalivetime</name>
+	  <value>1200000</value>
+	</property>
+	<property>
+	  <name>index.write.threads.keepalivetime</name>
+	  <value>1200000</value>
+	</property>
+	<property>
+	  <name>hbase.htable.threads.keepalivetime</name>
+	  <value>1200000</value>
+	</property>
+
+</configuration>

+ 2 - 0
client-adapter/phoenix/src/main/resources/phoenix/phoenix_common.properties

@@ -0,0 +1,2 @@
+# Number of sync threads
+threads = 3

+ 22 - 0
client-adapter/phoenix/src/main/resources/phoenix/phoenixtest_user.yml

@@ -0,0 +1,22 @@
+dataSourceKey: defaultDS
+destination: example
+groupId: g1
+outerAdapterKey: phoenix
+concurrent: true
+dbMapping:
+  database: mytest
+  table: user
+  targetTable: mytest.user
+  escapeUpper: true  # identifiers default to upper case and are wrapped in double quotes
+  targetPk:
+    id: ID
+  mapAll: true  # map all columns (default true; excluded columns are not mapped)
+  alter: true   # allow altering the target table structure (default true: columns can be added when mapAll=true and dropped when drop=true)
+  drop: false   # allow dropping columns (default false)
+  skipMissing: false # whether to skip columns missing from the target (default false: missing columns are synced automatically when adding columns is allowed; true skips them)
+  limit: false  # whether to mirror source column length limits (default false; leaving lengths unconstrained avoids a failed alter when a source length changes)
+  targetColumns:
+    id: ID
+    name: NAME
+  excludeColumns: # columns to exclude
+    - password
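
Sketch, not taken from PhoenixSyncService: with escapeUpper=true and the mapping above,
the adapter quotes each identifier, so a row insert would take roughly this Phoenix form
(Phoenix uses UPSERT INTO rather than INSERT):

    String upsert = "UPSERT INTO \"mytest\".\"user\" (\"ID\", \"NAME\") VALUES (?, ?)";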

+ 32 - 0
client-adapter/phoenix/src/test/java/com/alibaba/otter/canal/client/adapter/phoenix/test/PhoenixConnectionTest.java

@@ -0,0 +1,32 @@
+package com.alibaba.otter.canal.client.adapter.phoenix.test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.Properties;
+
+/**
+ * @author: lihua
+ * @date: 2021/1/5 16:58
+ * @Description: Phoenix JDBC connection smoke test
+ */
+public class PhoenixConnectionTest {
+
+    public static void main(String[] args) {
+        Properties phoenixPro = new Properties();
+        // Phoenix has its own internal connection pool; no need to initialize one via Druid
+        phoenixPro.setProperty("hbase.rpc.timeout","600000");
+        phoenixPro.setProperty("hbase.client.scanner.timeout.period","600000");
+        phoenixPro.setProperty("dfs.client.socket-timeout","600000");
+        phoenixPro.setProperty("phoenix.query.keepAliveMs","600000");
+        phoenixPro.setProperty("phoenix.query.timeoutMs","3600000");
+        try {
+            Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
+            Connection connection = DriverManager.getConnection("jdbc:phoenix:zookeeper01,zookeeper02,zookeeper03:2181:/hbase/db", phoenixPro);
+            System.out.println(connection);
+            connection.close();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+    }
+}

+ 29 - 0
client-adapter/phoenix/src/test/java/com/alibaba/otter/canal/client/adapter/phoenix/test/TestConfigLoad.java

@@ -0,0 +1,29 @@
+package com.alibaba.otter.canal.client.adapter.phoenix.test;
+
+import com.alibaba.otter.canal.client.adapter.phoenix.config.ConfigLoader;
+import com.alibaba.otter.canal.client.adapter.phoenix.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+/**
+ * @author: lihua
+ * @date: 2021/1/5 17:07
+ * @Description: verifies that Phoenix mapping configs load
+ */
+public class TestConfigLoad {
+    @Before
+    public void before() {
+        // load the data source connection pool
+        DatasourceConfig.DATA_SOURCES.put("defaultDS", TestConstant.dataSource);
+    }
+
+    @Test
+    public void testLoad() {
+        Map<String, MappingConfig> configMap = ConfigLoader.load(null);
+        Assert.assertFalse(configMap.isEmpty());
+    }
+}

+ 40 - 0
client-adapter/phoenix/src/test/java/com/alibaba/otter/canal/client/adapter/phoenix/test/TestConstant.java

@@ -0,0 +1,40 @@
+package com.alibaba.otter.canal.client.adapter.phoenix.test;
+
+import com.alibaba.druid.pool.DruidDataSource;
+
+import java.sql.SQLException;
+
+/**
+ * @author: lihua
+ * @date: 2021/1/5 17:09
+ * @Description: JDBC constants and a Druid data source for the tests
+ */
+public class TestConstant {
+    public final static String          jdbcUrl      = "jdbc:mysql://127.0.0.1:3306/canal_adapter?useUnicode=true";
+    public final static String          jdbcUser     = "root";
+    public final static String          jdbcPassword = "!123456";
+
+    public final static DruidDataSource dataSource;
+
+    static {
+        dataSource = new DruidDataSource();
+        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
+        dataSource.setUrl(jdbcUrl);
+        dataSource.setUsername(jdbcUser);
+        dataSource.setPassword(jdbcPassword);
+        dataSource.setInitialSize(1);
+        dataSource.setMinIdle(1);
+        dataSource.setMaxActive(1);
+        dataSource.setMaxWait(60000);
+        dataSource.setTimeBetweenEvictionRunsMillis(60000);
+        dataSource.setMinEvictableIdleTimeMillis(300000);
+        dataSource.setPoolPreparedStatements(false);
+        dataSource.setMaxPoolPreparedStatementPerConnectionSize(20);
+        dataSource.setValidationQuery("select 1");
+        try {
+            dataSource.init();
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+    }
+}

+ 36 - 0
client-adapter/phoenix/src/test/java/com/alibaba/otter/canal/client/adapter/phoenix/test/sync/Common.java

@@ -0,0 +1,36 @@
+package com.alibaba.otter.canal.client.adapter.phoenix.test.sync;
+
+import com.alibaba.otter.canal.client.adapter.phoenix.PhoenixAdapter;
+import com.alibaba.otter.canal.client.adapter.phoenix.test.TestConstant;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author: lihua
+ * @date: 2021/1/5 23:15
+ * @Description: builds a PhoenixAdapter instance for the sync tests
+ */
+public class Common {
+    public static PhoenixAdapter init() {
+        DatasourceConfig.DATA_SOURCES.put("defaultDS", TestConstant.dataSource);
+
+        OuterAdapterConfig outerAdapterConfig = new OuterAdapterConfig();
+        outerAdapterConfig.setName("phoenix");
+        outerAdapterConfig.setKey("phoenix");
+        Map<String, String> properties = new HashMap<>();
+        properties.put("jdbc.driverClassName", "org.apache.phoenix.jdbc.PhoenixDriver");
+        properties.put("jdbc.url", "jdbc:phoenix:zookeeper01,zookeeper02,zookeeper03:2181:/hbase/db");
+        outerAdapterConfig.setProperties(properties);
+
+        PhoenixAdapter adapter = new PhoenixAdapter();
+        adapter.init(outerAdapterConfig, null);
+        return adapter;
+    }
+
+    public static void main(String[] args) {
+        init();
+    }
+}

+ 73 - 0
client-adapter/phoenix/src/test/java/com/alibaba/otter/canal/client/adapter/phoenix/test/sync/PhoenixSyncTest.java

@@ -0,0 +1,73 @@
+package com.alibaba.otter.canal.client.adapter.phoenix.test.sync;
+
+import com.alibaba.otter.canal.client.adapter.phoenix.PhoenixAdapter;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.*;
+
+/**
+ * @author: lihua
+ * @date: 2021/1/5 23:16
+ * @Description: end-to-end sync tests for the Phoenix adapter
+ */
+public class PhoenixSyncTest {
+
+    private PhoenixAdapter phoenixAdapter;
+
+    @Before
+    public void init() {
+        phoenixAdapter = Common.init();
+    }
+
+    @Test
+    public void testEtl() {
+        List<String> param = new ArrayList<>();
+        phoenixAdapter.etl("phoenixtest_user.yml", param);
+    }
+    @Test
+    public void testCount() {
+        phoenixAdapter.count("phoenixtest_user.yml");
+    }
+    @Test
+    public void test01() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("INSERT");
+        dml.setDatabase("mytest");
+        dml.setTable("user");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1);
+        data.put("name", "sixPulseExcalibur");
+        data.put("password", "123456");
+        dml.setData(dataList);
+
+        phoenixAdapter.sync(Collections.singletonList(dml));
+    }
+
+    @Test
+    public void test02() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("UPDATE");
+        dml.setDatabase("mytest");
+        dml.setTable("user");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1);
+        data.put("name", "sixPulseExcalibur2");
+        dml.setData(dataList);
+        List<Map<String, Object>> oldList = new ArrayList<>();
+        Map<String, Object> old = new LinkedHashMap<>();
+        oldList.add(old);
+        old.put("name", "sixPulseExcalibur");
+        dml.setOld(oldList);
+        phoenixAdapter.sync(Collections.singletonList(dml));
+    }
+}
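
A possible third case, sketched here rather than in the patch, would cover deletes; it
mirrors test02 and assumes the adapter routes type "DELETE" through the same sync path:

    @Test
    public void test03() {
        Dml dml = new Dml();
        dml.setDestination("example");
        dml.setTs(new Date().getTime());
        dml.setType("DELETE");
        dml.setDatabase("mytest");
        dml.setTable("user");
        List<Map<String, Object>> dataList = new ArrayList<>();
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("id", 1); // primary key identifies the row to delete
        dataList.add(data);
        dml.setData(dataList);
        phoenixAdapter.sync(Collections.singletonList(dml));
    }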

+ 1 - 0
client-adapter/pom.xml

@@ -32,6 +32,7 @@
         <module>es7x</module>
         <module>escore</module>
         <module>kudu</module>
+        <module>phoenix</module>
     </modules>
 
     <licenses>