Explorar o código

新增client-launcher模块,外部数据落地适配器的总入口

mcy %!s(int64=6) %!d(string=hai) anos
pai
achega
47348e2e96
Modificáronse 23 ficheiros con 2304 adicións e 0 borrados
  1. 135 0
      client-launcher/pom.xml
  2. 57 0
      client-launcher/src/main/assembly/dev.xml
  3. 57 0
      client-launcher/src/main/assembly/release.xml
  4. 25 0
      client-launcher/src/main/bin/startup.bat
  5. 100 0
      client-launcher/src/main/bin/startup.sh
  6. 65 0
      client-launcher/src/main/bin/stop.sh
  7. 160 0
      client-launcher/src/main/java/com/alibaba/otter/canal/client/CanalAdapterLoader.java
  8. 63 0
      client-launcher/src/main/java/com/alibaba/otter/canal/client/ClientLauncher.java
  9. 156 0
      client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/AbstractCanalAdapterWorker.java
  10. 154 0
      client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/CanalAdapterKafkaWorker.java
  11. 174 0
      client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/CanalAdapterWorker.java
  12. 33 0
      client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/CanalOuterAdapter.java
  13. 51 0
      client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/CanalOuterAdapterConfiguration.java
  14. 36 0
      client-launcher/src/main/java/com/alibaba/otter/canal/client/example/LoggerAdapterExample.java
  15. 177 0
      client-launcher/src/main/java/com/alibaba/otter/canal/client/support/CanalClientConfig.java
  16. 115 0
      client-launcher/src/main/java/com/alibaba/otter/canal/client/support/Dml.java
  17. 527 0
      client-launcher/src/main/java/com/alibaba/otter/canal/client/support/ExtensionLoader.java
  18. 137 0
      client-launcher/src/main/java/com/alibaba/otter/canal/client/support/JdbcTypeUtil.java
  19. 11 0
      client-launcher/src/main/java/com/alibaba/otter/canal/client/support/SPI.java
  20. 1 0
      client-launcher/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.CanalOuterAdapter
  21. 22 0
      client-launcher/src/main/resources/canal-client.yml
  22. 47 0
      client-launcher/src/main/resources/logback.xml
  23. 1 0
      pom.xml

+ 135 - 0
client-launcher/pom.xml

@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>canal</artifactId>
+        <groupId>com.alibaba.otter</groupId>
+        <version>1.1.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>com.alibaba.otter</groupId>
+    <artifactId>canal.client-launcher</artifactId>
+    <packaging>jar</packaging>
+    <name>canal client launcher module for otter ${project.version}</name>
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>canal.client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>canal.kafka.client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>joda-time</groupId>
+            <artifactId>joda-time</artifactId>
+            <version>2.9.4</version>
+        </dependency>
+        <dependency>
+            <groupId>org.yaml</groupId>
+            <artifactId>snakeyaml</artifactId>
+            <version>1.17</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- deploy模块的packaging通常是jar,如果项目中没有java 源代码或资源文件,加上这一段配置使项目能通过构建 -->
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <configuration>
+                    <archive>
+                        <addMavenDescriptor>true</addMavenDescriptor>
+                    </archive>
+                    <excludes>
+                        <exclude>**/logback.xml</exclude>
+                        <exclude>**/canal-client.yml</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <!-- 这是最新版本,推荐使用这个版本 -->
+                <version>2.2.1</version>
+                <executions>
+                    <execution>
+                        <id>assemble</id>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <phase>package</phase>
+                    </execution>
+                </executions>
+                <configuration>
+                    <appendAssemblyId>false</appendAssemblyId>
+                    <attach>false</attach>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>dev</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+                <property>
+                    <name>env</name>
+                    <value>!release</value>
+                </property>
+            </activation>
+
+            <build>
+                <plugins>
+                    <plugin>
+                        <artifactId>maven-assembly-plugin</artifactId>
+                        <configuration>
+                            <!-- maven assembly插件需要一个描述文件 来告诉插件包的结构以及打包所需的文件来自哪里 -->
+                            <descriptors>
+                                <descriptor>${basedir}/src/main/assembly/dev.xml</descriptor>
+                            </descriptors>
+                            <finalName>canal_client</finalName>
+                            <outputDirectory>${project.build.directory}</outputDirectory>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+
+        </profile>
+
+        <profile>
+            <id>release</id>
+            <activation>
+                <property>
+                    <name>env</name>
+                    <value>release</value>
+                </property>
+            </activation>
+
+            <build>
+                <plugins>
+                    <plugin>
+                        <artifactId>maven-assembly-plugin</artifactId>
+                        <configuration>
+                            <!-- 发布模式使用的maven assembly插件描述文件 -->
+                            <descriptors>
+                                <descriptor>${basedir}/src/main/assembly/release.xml</descriptor>
+                            </descriptors>
+                            <!-- 如果一个应用的包含多个deploy模块,如果使用同样的包名, 如果把它们复制的一个目录中可能会失败,所以包名加了 artifactId以示区分 -->
+                            <finalName>${project.artifactId}-${project.version}</finalName>
+                            <!-- scm 要求 release 模式打出的包放到顶级目录下的target子目录中 -->
+                            <outputDirectory>${project.parent.build.directory}</outputDirectory>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+
+</project>

+ 57 - 0
client-launcher/src/main/assembly/dev.xml

@@ -0,0 +1,57 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+	<id>dist</id>
+	<formats>
+		<format>dir</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>.</directory>
+			<outputDirectory>/</outputDirectory>
+			<includes>
+				<include>README*</include>
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>./src/main/bin</directory>
+			<outputDirectory>bin</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+			<fileMode>0755</fileMode>
+		</fileSet>
+		<fileSet>
+			<directory>./src/main/conf</directory>
+			<outputDirectory>/conf</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>./src/main/resources</directory>
+			<outputDirectory>/conf</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+			<excludes>
+				<exclude>META-INF/**</exclude>
+			</excludes>
+		</fileSet>
+		<fileSet>
+			<directory>target</directory>
+			<outputDirectory>logs</outputDirectory>
+			<excludes>
+				<exclude>**/*</exclude>
+			</excludes>
+		</fileSet>
+	</fileSets>
+	<dependencySets>
+		<dependencySet>
+			<outputDirectory>lib</outputDirectory>
+			<excludes>
+				<exclude>junit:junit</exclude>
+			</excludes>
+		</dependencySet>
+	</dependencySets>
+</assembly>

+ 57 - 0
client-launcher/src/main/assembly/release.xml

@@ -0,0 +1,57 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+	<id>dist</id>
+	<formats>
+		<format>tar.gz</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>.</directory>
+			<outputDirectory>/</outputDirectory>
+			<includes>
+				<include>README*</include>
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>./src/main/bin</directory>
+			<outputDirectory>bin</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+			<fileMode>0755</fileMode>
+		</fileSet>
+		<fileSet>
+			<directory>./src/main/conf</directory>
+			<outputDirectory>/conf</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>./src/main/resources</directory>
+			<outputDirectory>/conf</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+			<excludes>
+				<exclude>META-INF/**</exclude>
+			</excludes>
+		</fileSet>
+		<fileSet>
+			<directory>target</directory>
+			<outputDirectory>logs</outputDirectory>
+			<excludes>
+				<exclude>**/*</exclude>
+			</excludes>
+		</fileSet>
+	</fileSets>
+	<dependencySets>
+		<dependencySet>
+			<outputDirectory>lib</outputDirectory>
+			<excludes>
+				<exclude>junit:junit</exclude>
+			</excludes>
+		</dependencySet>
+	</dependencySets>
+</assembly>

+ 25 - 0
client-launcher/src/main/bin/startup.bat

@@ -0,0 +1,25 @@
+@echo off
+@if not "%ECHO%" == ""  echo %ECHO%
+@if "%OS%" == "Windows_NT"  setlocal
+
+set ENV_PATH=.\
+if "%OS%" == "Windows_NT" set ENV_PATH=%~dp0%
+
+set conf_dir=%ENV_PATH%\..\conf
+set canal_conf=%conf_dir%\canal-client.yml
+set logback_configurationFile=%conf_dir%\logback.xml
+
+set CLASSPATH=%conf_dir%
+set CLASSPATH=%conf_dir%\..\lib\*;%CLASSPATH%
+
+set JAVA_MEM_OPTS= -Xms128m -Xmx512m -XX:PermSize=128m
+set JAVA_OPTS_EXT= -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dapplication.codeset=UTF-8 -Dfile.encoding=UTF-8
+set JAVA_DEBUG_OPT= -server -Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=9099,server=y,suspend=n
+set CANAL_OPTS= -DappName=otter-canal -Dlogback.configurationFile="%logback_configurationFile%" -Dcanal.conf="%canal_conf%"
+
+set JAVA_OPTS= %JAVA_MEM_OPTS% %JAVA_OPTS_EXT% %JAVA_DEBUG_OPT% %CANAL_OPTS%
+
+set CMD_STR= java %JAVA_OPTS% -classpath "%CLASSPATH%" java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.client.ClientLauncher
+echo start cmd : %CMD_STR%
+
+java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.client.ClientLauncher

+ 100 - 0
client-launcher/src/main/bin/startup.sh

@@ -0,0 +1,100 @@
+#!/bin/bash 
+
+current_path=`pwd`
+case "`uname`" in
+    Linux)
+		bin_abs_path=$(readlink -f $(dirname $0))
+		;;
+	*)
+		bin_abs_path=`cd $(dirname $0); pwd`
+		;;
+esac
+base=${bin_abs_path}/..
+canal_conf=$base/conf/canal-client.yml
+logback_configurationFile=$base/conf/logback.xml
+export LANG=en_US.UTF-8
+export BASE=$base
+
+if [ -f $base/bin/canal_client.pid ] ; then
+	echo "found canal_client.pid , Please run stop.sh first ,then startup.sh" 2>&2
+    exit 1
+fi
+
+## set java path
+if [ -z "$JAVA" ] ; then
+  JAVA=$(which java)
+fi
+
+ALIBABA_JAVA="/usr/alibaba/java/bin/java"
+TAOBAO_JAVA="/opt/taobao/java/bin/java"
+if [ -z "$JAVA" ]; then
+  if [ -f $ALIBABA_JAVA ] ; then
+  	JAVA=$ALIBABA_JAVA
+  elif [ -f $TAOBAO_JAVA ] ; then
+  	JAVA=$TAOBAO_JAVA
+  else
+  	echo "Cannot find a Java JDK. Please set either set JAVA or put java (>=1.5) in your PATH." 2>&2
+    exit 1
+  fi
+fi
+
+case "$#" 
+in
+0 ) 
+	;;
+1 )	
+	var=$*
+	if [ -f $var ] ; then 
+		canal_conf=$var
+	else
+		echo "THE PARAMETER IS NOT CORRECT.PLEASE CHECK AGAIN."
+        exit
+	fi;;
+2 )	
+	var=$1
+	if [ -f $var ] ; then
+		canal_conf=$var
+	else 
+		if [ "$1" = "debug" ]; then
+			DEBUG_PORT=$2
+			DEBUG_SUSPEND="n"
+			JAVA_DEBUG_OPT="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=$DEBUG_PORT,server=y,suspend=$DEBUG_SUSPEND"
+		fi
+     fi;;
+* )
+	echo "THE PARAMETERS MUST BE TWO OR LESS.PLEASE CHECK AGAIN."
+	exit;;
+esac
+
+str=`file -L $JAVA | grep 64-bit`
+if [ -n "$str" ]; then
+	JAVA_OPTS="-server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"
+else
+	JAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m "
+fi
+
+JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8"
+CANAL_OPTS="-DappName=otter-canal -Dlogback.configurationFile=$logback_configurationFile -Dcanal.conf=$canal_conf"
+
+if [ -e $canal_conf -a -e $logback_configurationFile ]
+then 
+	
+	for i in $base/lib/*;
+		do CLASSPATH=$i:"$CLASSPATH";
+	done
+ 	CLASSPATH="$base/conf:$CLASSPATH";
+ 	
+ 	echo "cd to $bin_abs_path for workaround relative path"
+  	cd $bin_abs_path
+ 	
+	echo LOG CONFIGURATION : $logback_configurationFile
+	echo canal conf : $canal_conf 
+	echo CLASSPATH :$CLASSPATH
+	$JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $CANAL_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.client.ClientLauncher 1>>$base/logs/canal_client.log 2>&1 &
+	echo $! > $base/bin/canal_client.pid
+	
+	echo "cd to $current_path for continue"
+  	cd $current_path
+else 
+	echo "canal client conf("$canal_conf") OR log configration file($logback_configurationFile) is not exist,please create then first!"
+fi

+ 65 - 0
client-launcher/src/main/bin/stop.sh

@@ -0,0 +1,65 @@
+#!/bin/bash
+
+cygwin=false;
+linux=false;
+case "`uname`" in
+    CYGWIN*)
+        cygwin=true
+        ;;
+    Linux*)
+    	linux=true
+    	;;
+esac
+
+get_pid() {	
+	STR=$1
+	PID=$2
+    if $cygwin; then
+        JAVA_CMD="$JAVA_HOME\bin\java"
+        JAVA_CMD=`cygpath --path --unix $JAVA_CMD`
+        JAVA_PID=`ps |grep $JAVA_CMD |awk '{print $1}'`
+    else
+    	if $linux; then
+	        if [ ! -z "$PID" ]; then
+	        	JAVA_PID=`ps -C java -f --width 1000|grep "$STR"|grep "$PID"|grep -v grep|awk '{print $2}'`
+		    else 
+		        JAVA_PID=`ps -C java -f --width 1000|grep "$STR"|grep -v grep|awk '{print $2}'`
+	        fi
+	    else
+	    	if [ ! -z "$PID" ]; then
+	        	JAVA_PID=`ps aux |grep "$STR"|grep "$PID"|grep -v grep|awk '{print $2}'`
+		    else 
+		        JAVA_PID=`ps aux |grep "$STR"|grep -v grep|awk '{print $2}'`
+	        fi
+	    fi
+    fi
+    echo $JAVA_PID;
+}
+
+base=`dirname $0`/..
+pidfile=$base/bin/canal_client.pid
+if [ ! -f "$pidfile" ];then
+	echo "canal client is not running. exists"
+	exit
+fi
+
+pid=`cat $pidfile`
+if [ "$pid" == "" ] ; then
+	pid=`get_pid "appName=otter-canal"`
+fi
+
+echo -e "`hostname`: stopping canal $pid ... "
+kill $pid
+
+LOOPS=0
+while (true); 
+do 
+	gpid=`get_pid "appName=otter-canal" "$pid"`
+    if [ "$gpid" == "" ] ; then
+    	echo "Oook! cost:$LOOPS"
+    	`rm $pidfile`
+    	break;
+    fi
+    let LOOPS=LOOPS+1
+    sleep 1
+done

+ 160 - 0
client-launcher/src/main/java/com/alibaba/otter/canal/client/CanalAdapterLoader.java

@@ -0,0 +1,160 @@
+package com.alibaba.otter.canal.client;
+
+import com.alibaba.otter.canal.client.adapter.CanalAdapterKafkaWorker;
+import com.alibaba.otter.canal.client.adapter.CanalAdapterWorker;
+import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.adapter.CanalOuterAdapterConfiguration;
+import com.alibaba.otter.canal.client.support.CanalClientConfig;
+import com.alibaba.otter.canal.client.support.ExtensionLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * 外部适配器的加载器
+ *
+ * @author machengyuan 2018-8-19 下午11:45:49
+ * @version 1.0.0
+ */
+public class CanalAdapterLoader {
+    private static final Logger logger = LoggerFactory.getLogger(CanalAdapterLoader.class);
+
+    private CanalClientConfig canalClientConfig;
+
+    private Map<String, CanalAdapterWorker> canalWorkers = new HashMap<>();
+
+    private Map<String, CanalAdapterKafkaWorker> canalKafkaWorkers = new HashMap<>();
+
+    private ExtensionLoader<CanalOuterAdapter> loader;
+
+    public CanalAdapterLoader(CanalClientConfig canalClientConfig) {
+        this.canalClientConfig = canalClientConfig;
+    }
+
+    /**
+     * 初始化canal-client、 canal-client-kafka的适配器
+     */
+    public void init() {
+        // canal instances 和 kafka topics 配置不能同时为空
+        if (canalClientConfig.getCanalInstances().isEmpty() && canalClientConfig.getKafkaTopics().isEmpty()) {
+            throw new RuntimeException("Blank config property: canalInstances or canalKafkaTopics");
+        }
+
+        loader = ExtensionLoader.getExtensionLoader(CanalOuterAdapter.class, "" /*TODO canalClientConfig.getClassloaderPolicy()*/);
+
+        String canalServerHost = this.canalClientConfig.getCanalServerHost();
+        SocketAddress sa = null;
+        if (canalServerHost != null) {
+            String[] ipPort = canalServerHost.split(":");
+            sa = new InetSocketAddress(ipPort[0], Integer.parseInt(ipPort[1]));
+        }
+        String zkHosts = this.canalClientConfig.getZookeeperHosts();
+
+        if (zkHosts == null && sa == null) {
+            throw new RuntimeException("Blank config property: canalServerHost or zookeeperHosts");
+        }
+
+        // 初始化canal-client的适配器
+        for (CanalClientConfig.CanalInstance instance : canalClientConfig.getCanalInstances()) {
+            List<List<CanalOuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
+
+            for (CanalClientConfig.AdapterGroup connectorGroup : instance.getAdapterGroups()) {
+                List<CanalOuterAdapter> canalOutConnectors = new ArrayList<>();
+                for (CanalOuterAdapterConfiguration c : connectorGroup.getOutAdapters()) {
+                    loadConnector(c, canalOutConnectors);
+                }
+                canalOuterAdapterGroups.add(canalOutConnectors);
+            }
+            CanalAdapterWorker worker;
+            if (zkHosts != null) {
+                worker = new CanalAdapterWorker(instance.getInstance(), zkHosts, canalOuterAdapterGroups);
+            } else {
+                worker = new CanalAdapterWorker(instance.getInstance(), sa, canalOuterAdapterGroups);
+            }
+            canalWorkers.put(instance.getInstance(), worker);
+            worker.start();
+            logger.info("Start adapter for canal instance: {} succeed", instance.getInstance());
+        }
+
+        // 初始化canal-client-kafka的适配器
+        for (CanalClientConfig.KafkaTopic kafkaTopic : canalClientConfig.getKafkaTopics()) {
+            for (CanalClientConfig.Group group : kafkaTopic.getGroups()) {
+                List<List<CanalOuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
+
+                List<CanalOuterAdapter> canalOuterAdapters = new ArrayList<>();
+
+                for (CanalOuterAdapterConfiguration config  : group.getOutAdapters()) {
+//                    for (CanalOuterAdapterConfiguration config : adaptor.getOutAdapters()) {
+                        loadConnector(config, canalOuterAdapters);
+//                    }
+                }
+                canalOuterAdapterGroups.add(canalOuterAdapters);
+
+                String zkServers = canalClientConfig.getZookeeperHosts();
+                CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(zkServers,
+                        canalClientConfig.getBootstrapServers(),
+                        kafkaTopic.getTopic(), group.getGroupId(), canalOuterAdapterGroups);
+                canalKafkaWorkers.put(kafkaTopic.getTopic() + "-" + group.getGroupId(), canalKafkaWorker);
+                canalKafkaWorker.start();
+                logger.info("Start adapter for canal-client kafka topic: {} succeed", kafkaTopic.getTopic() + "-" + group.getGroupId());
+            }
+        }
+    }
+
+    private void loadConnector(CanalOuterAdapterConfiguration config, List<CanalOuterAdapter> canalOutConnectors) {
+        try {
+            CanalOuterAdapter adapter = loader.getExtension(config.getName());
+            ClassLoader cl = Thread.currentThread().getContextClassLoader();
+            // 替换ClassLoader
+            Thread.currentThread().setContextClassLoader(adapter.getClass().getClassLoader());
+            adapter.init(config);
+            Thread.currentThread().setContextClassLoader(cl);
+            canalOutConnectors.add(adapter);
+            logger.info("Load canal adapter: {} succeed", config.getName());
+        } catch (Exception e) {
+            logger.error("Load canal adapter: {} failed", config.getName(), e);
+        }
+    }
+
+    /**
+     * 销毁所有适配器
+     * 为防止canal实例太多造成销毁阻塞, 并行销毁
+     */
+    public void destroy() {
+        if (canalWorkers.size() > 0) {
+            ExecutorService stopExecutorService = Executors.newFixedThreadPool(canalWorkers.size());
+            for (CanalAdapterWorker v : canalWorkers.values()) {
+                final CanalAdapterWorker caw = v;
+                stopExecutorService.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        caw.stop();
+                    }
+                });
+            }
+            stopExecutorService.shutdown();
+        }
+        if (canalKafkaWorkers.size() > 0) {
+            ExecutorService stopKafkaExecutorService = Executors.newFixedThreadPool(canalKafkaWorkers.size());
+            for (CanalAdapterKafkaWorker v : canalKafkaWorkers.values()) {
+                final CanalAdapterKafkaWorker cakw = v;
+                stopKafkaExecutorService.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        cakw.stop();
+                    }
+                });
+            }
+            stopKafkaExecutorService.shutdown();
+        }
+        logger.info("All canal adapters destroyed");
+    }
+}

+ 63 - 0
client-launcher/src/main/java/com/alibaba/otter/canal/client/ClientLauncher.java

@@ -0,0 +1,63 @@
+package com.alibaba.otter.canal.client;
+
+import com.alibaba.otter.canal.client.support.CanalClientConfig;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.FileInputStream;
+
+public class ClientLauncher {
+    private static final String CLASSPATH_URL_PREFIX = "classpath:";
+    private static final Logger logger = LoggerFactory.getLogger(ClientLauncher.class);
+
+    public static void main(String[] args) {
+        try {
+            logger.info("## set default uncaught exception handler");
+            setGlobalUncaughtExceptionHandler();
+
+            logger.info("## load canal client configurations");
+            String conf = System.getProperty("client.conf", "classpath:canal-client.yml");
+            CanalClientConfig canalClientConfig;
+            if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
+                conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
+                canalClientConfig = new Yaml().loadAs(ClientLauncher.class.getClassLoader().getResourceAsStream(conf),
+                        CanalClientConfig.class);
+            } else {
+                canalClientConfig = new Yaml().loadAs(new FileInputStream(conf), CanalClientConfig.class);
+            }
+            logger.info("## start the canal client adapters.");
+            final CanalAdapterLoader adapterLoader = new CanalAdapterLoader(canalClientConfig);
+            adapterLoader.init();
+            logger.info("## the canal client adapters are running now ......");
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+
+                public void run() {
+                    try {
+                        logger.info("## stop the canal client adapters");
+                        adapterLoader.destroy();
+                    } catch (Throwable e) {
+                        logger.warn("## something goes wrong when stopping canal client adapters:", e);
+                    } finally {
+                        logger.info("## canal client adapters are down.");
+                    }
+                }
+
+            });
+        } catch (Throwable e) {
+            logger.error("## something goes wrong when starting up the canal client adapters:", e);
+            System.exit(0);
+        }
+    }
+
+    private static void setGlobalUncaughtExceptionHandler() {
+        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                logger.error("UnCaughtException", e);
+            }
+        });
+    }
+}

+ 156 - 0
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/AbstractCanalAdapterWorker.java

@@ -0,0 +1,156 @@
+package com.alibaba.otter.canal.client.adapter;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.client.support.Dml;
+import com.alibaba.otter.canal.client.support.JdbcTypeUtil;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+/**
+ * 适配器工作线程抽象类
+ *
+ * @author machengyuan 2018-8-19 下午11:30:49
+ * @version 1.0.0
+ */
+public abstract class AbstractCanalAdapterWorker {
+    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    protected String canalDestination;                              // canal实例
+    protected List<List<CanalOuterAdapter>> canalOuterAdapters;     // 外部适配器
+    protected ExecutorService groupInnerExecutorService;            // 组内工作线程池
+    protected volatile boolean running = false;                     // 是否运行中
+    protected Thread thread = null;
+    protected Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+            logger.error("parse events has an error", e);
+        }
+    };
+
+    /**
+     * 将每个entry转换为DML操作对象并调用所有适配器写入
+     *
+     * @param entries
+     */
+    protected void convertAndWrite(List<CanalEntry.Entry> entries) {
+        for (CanalEntry.Entry entry : entries) {
+            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
+                continue;
+            }
+
+            CanalEntry.RowChange rowChange;
+            try {
+                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
+            } catch (Exception e) {
+                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
+            }
+
+            CanalEntry.EventType eventType = rowChange.getEventType();
+
+            final Dml dml = new Dml();
+            dml.setCanalDestination(canalDestination);
+            dml.setDatabase(entry.getHeader().getSchemaName());
+            dml.setTable(entry.getHeader().getTableName());
+            dml.setType(eventType.toString());
+            dml.setTs(System.currentTimeMillis());
+            dml.setSql(rowChange.getSql());
+            List<Map<String, Object>> data = new ArrayList<>();
+            List<Map<String, Object>> old = new ArrayList<>();
+
+            if (!rowChange.getIsDdl()) {
+                Set<String> updateSet = new HashSet<>();
+                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
+                    Map<String, Object> row = new LinkedHashMap<>();
+                    List<CanalEntry.Column> columns;
+
+                    if (eventType == CanalEntry.EventType.DELETE) {
+                        columns = rowData.getBeforeColumnsList();
+                    } else {
+                        columns = rowData.getAfterColumnsList();
+                    }
+
+                    for (CanalEntry.Column column : columns) {
+                        row.put(column.getName(), JdbcTypeUtil.typeConvert(dml.getTable(), column.getName(), column.getValue(),
+                                column.getSqlType(), column.getMysqlType()));
+                        //获取update为true的字段
+                        if (column.getUpdated()) {
+                            updateSet.add(column.getName());
+                        }
+                    }
+                    if (!row.isEmpty()) {
+                        data.add(row);
+                    }
+
+                    if (eventType == CanalEntry.EventType.UPDATE) {
+                        Map<String, Object> rowOld = new LinkedHashMap<>();
+                        for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
+                            if (updateSet.contains(column.getName())) {
+                                rowOld.put(column.getName(), JdbcTypeUtil.typeConvert(dml.getTable(), column.getName(),
+                                        column.getValue(), column.getSqlType(), column.getMysqlType()));
+                            }
+                        }
+                        // update操作将记录修改前的值
+                        if (!rowOld.isEmpty()) {
+                            old.add(rowOld);
+                        }
+                    }
+                }
+                if (!data.isEmpty()) {
+                    dml.setData(data);
+                }
+                if (!old.isEmpty()) {
+                    dml.setOld(old);
+                }
+            }
+            List<Future<Boolean>> futures = new ArrayList<>();
+            // 组间适配器并行运行
+            for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
+                final List<CanalOuterAdapter> adapters = outerAdapters;
+                futures.add(groupInnerExecutorService.submit(
+                        new Callable<Boolean>() {
+                            @Override
+                            public Boolean call() {
+                                boolean flag = true;
+                                // 组内适配器穿行运行,尽量不要配置组内适配器
+                                for (CanalOuterAdapter c : adapters) {
+                                    long begin = System.currentTimeMillis();
+                                    if (!c.writeOut(dml)) {
+                                        logger.error("{} write fail! data: {}", c.getClass().getName(), JSON.toJSONString(dml));
+
+                                        flag = false;
+                                    }
+                                    if (logger.isDebugEnabled()) {
+                                        logger.debug("{} elapsed time: {}", c.getClass().getName(), (System.currentTimeMillis() - begin));
+                                    }
+                                }
+                                return flag;
+                            }
+                        })
+                );
+
+                // 等待所有适配器写入完成
+                // 由于是组间并发操作,所以将阻塞直到耗时最久的工作组操作完成
+                for (Future<Boolean> f : futures) {
+                    try {
+                        if (!f.get()) {
+                            logger.error("Outer adapter write failed");
+                        }
+                    } catch (InterruptedException | ExecutionException e) {
+                        logger.error(e.getMessage(), e);
+                    }
+                }
+            }
+        }
+    }
+
+    public abstract void start();
+
+    public abstract void stop();
+}

+ 154 - 0
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/CanalAdapterKafkaWorker.java

@@ -0,0 +1,154 @@
+package com.alibaba.otter.canal.client.adapter;
+
+import com.alibaba.otter.canal.kafka.client.KafkaCanalConnector;
+import com.alibaba.otter.canal.kafka.client.KafkaCanalConnectors;
+import com.alibaba.otter.canal.protocol.Message;
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.common.errors.WakeupException;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
+    private KafkaCanalConnector connector;
+
+    private String topic;
+
+    public CanalAdapterKafkaWorker(String zkServers, String bootstrapServers, String topic, String groupId, List<List<CanalOuterAdapter>> canalOuterAdapters) {
+        this.canalOuterAdapters = canalOuterAdapters;
+        this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
+        this.topic = topic;
+        this.canalDestination = topic;
+        connector = KafkaCanalConnectors.newKafkaConnector(zkServers, bootstrapServers, topic, null, groupId);
+        // connector.setSessionTimeout(5L, TimeUnit.MINUTES);
+
+//        super.initSwitcher(topic);
+    }
+
+    @Override
+    public void start() {
+        if (!running) {
+            thread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    process();
+                }
+            });
+            thread.setUncaughtExceptionHandler(handler);
+            running = true;
+            thread.start();
+        }
+    }
+
+    @Override
+    public void stop() {
+        try {
+            if (!running) {
+                return;
+            }
+
+            connector.stopRunning();
+            running = false;
+
+//        if (switcher != null && !switcher.state()) {
+//            switcher.set(true);
+//        }
+
+            if (thread != null) {
+                try {
+                    thread.join();
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+            groupInnerExecutorService.shutdown();
+            logger.info("topic {} connectors' worker thread dead!", this.topic);
+            for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
+                for (CanalOuterAdapter adapter : outerAdapters) {
+                    adapter.destroy();
+                }
+            }
+            logger.info("topic {} all connectors destroyed!", this.topic);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    private void process() {
+        while (!running) ;
+        ExecutorService executor = Executors.newFixedThreadPool(1);
+        final AtomicBoolean executing = new AtomicBoolean(true);
+        while (running) {
+            try {
+                logger.info("=============> Start to connect topic: {} <=============", this.topic);
+                connector.connect();
+                logger.info("=============> Start to subscribe topic: {}<=============", this.topic);
+                connector.subscribe();
+                logger.info("=============> Subscribe topic: {} succeed<=============", this.topic);
+                while (running) {
+                    try {
+//                        switcher.get(); //等待开关开启
+
+                        final Message message = connector.getWithoutAck();
+
+                        executing.set(true);
+                        if (message != null) {
+                            executor.submit(
+                                    new Runnable() {
+                                        @Override
+                                        public void run() {
+                                            try {
+                                                if (logger.isDebugEnabled()) {
+                                                    logger.debug("topic: {} batchId: {} batchSize: {} ", topic, message.getId(), message.getEntries().size());
+                                                }
+                                                long begin = System.currentTimeMillis();
+                                                convertAndWrite(message.getEntries());
+                                                long now = System.currentTimeMillis();
+                                                if ((System.currentTimeMillis() - begin) > 5 * 60 * 1000) {
+                                                    logger.error("topic: {} batchId {} elapsed time: {} ms", topic, message.getId(), now - begin);
+                                                }
+                                                if (logger.isDebugEnabled()) {
+                                                    logger.debug("topic: {} batchId {} elapsed time: {} ms", topic, message.getId(), now - begin);
+                                                }
+                                            } catch (Exception e) {
+                                                logger.error(e.getMessage(), e);
+                                            } finally {
+                                                executing.compareAndSet(true, false);
+                                            }
+                                        }
+                                    }
+                            );
+
+                            while (executing.get()) { // keeping kafka client active
+                                connector.ack();
+                                Thread.sleep(500);
+                            }
+                        } else {
+                            connector.ack();
+                        }
+                    } catch (CommitFailedException e) {
+                        logger.warn(e.getMessage());
+                    } catch (Exception e) {
+                        logger.error(e.getMessage(), e);
+                        TimeUnit.SECONDS.sleep(1L);
+                    }
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        executor.shutdown();
+
+        try {
+            connector.unsubscribe();
+        } catch (WakeupException e) {
+            // No-op. Continue process
+        }
+        connector.disconnnect();
+        logger.info("=============> Disconnect topic: {} <=============", this.topic);
+    }
+}

+ 174 - 0
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/CanalAdapterWorker.java

@@ -0,0 +1,174 @@
+package com.alibaba.otter.canal.client.adapter;
+
+import com.alibaba.otter.canal.client.CanalConnector;
+import com.alibaba.otter.canal.client.CanalConnectors;
+import com.alibaba.otter.canal.client.impl.ClusterCanalConnector;
+import com.alibaba.otter.canal.protocol.Message;
+
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.concurrent.Executors;
+
+/**
+ * 原生canal-server对应的client适配器工作线程
+ *
+ * @author machengyuan 2018-8-19 下午11:30:49
+ * @version 1.0.0
+ */
+public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
+    private static final int BATCH_SIZE = 50;
+    private static final int SO_TIMEOUT = 0;
+
+    private CanalConnector connector;
+
+    /**
+     * 单台client适配器worker的构造方法
+     *
+     * @param canalDestination   canal实例名
+     * @param address            canal-server地址
+     * @param canalOuterAdapters 外部适配器组
+     */
+    public CanalAdapterWorker(String canalDestination, SocketAddress address, List<List<CanalOuterAdapter>> canalOuterAdapters) {
+        this.canalOuterAdapters = canalOuterAdapters;
+        this.canalDestination = canalDestination;
+        groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
+        connector = CanalConnectors.newSingleConnector(address, canalDestination, "", "");
+    }
+
+    /**
+     * HA模式下client适配器worker的构造方法
+     *
+     * @param canalDestination   canal实例名
+     * @param zookeeperHosts     zookeeper地址
+     * @param canalOuterAdapters 外部适配器组
+     */
+    public CanalAdapterWorker(String canalDestination, String zookeeperHosts, List<List<CanalOuterAdapter>> canalOuterAdapters) {
+        this.canalOuterAdapters = canalOuterAdapters;
+        this.canalDestination = canalDestination;
+        groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
+        connector = CanalConnectors.newClusterConnector(zookeeperHosts, canalDestination, "", "");
+        ((ClusterCanalConnector) connector).setSoTimeout(SO_TIMEOUT);
+
+//        super.initSwitcher(canalDestination);
+    }
+
+    @Override
+    public void start() {
+        if (!running) {
+            thread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    process();
+                }
+            });
+            thread.setUncaughtExceptionHandler(handler);
+            thread.start();
+            running = true;
+        }
+    }
+
+    @Override
+    public void stop() {
+        try {
+            if (!running) {
+                return;
+            }
+
+//        if (switcher != null && !switcher.state()) {
+//            switcher.set(true);
+//        }
+
+            connector.stopRunning();
+            running = false;
+
+            logger.info("destination {} is waiting for adapters' worker thread die!", canalDestination);
+            if (thread != null) {
+                try {
+                    thread.join();
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+            groupInnerExecutorService.shutdown();
+            logger.info("destination {} adapters' worker thread dead!", canalDestination);
+            for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
+                for (CanalOuterAdapter adapter : outerAdapters) {
+                    adapter.destroy();
+                }
+            }
+            logger.info("destination {} all adapters destroyed!", canalDestination);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    private void process() {
+        while (!running) ;       //waiting until running == true
+        while (running) {
+            try {
+//                if (switcher != null) {
+//                    switcher.get();
+//                }
+                logger.info("=============> Start to connect destination: {} <=============", this.canalDestination);
+                connector.connect();
+                logger.info("=============> Start to subscribe destination: {} <=============", this.canalDestination);
+                connector.subscribe();
+                logger.info("=============> Subscribe destination: {} succeed <=============", this.canalDestination);
+                while (running) {
+//                    try {
+//                        if (switcher != null) {
+//                            switcher.get();
+//                        }
+//                    } catch (TimeoutException e) {
+//                        break;
+//                    }
+
+                    // server配置canal.instance.network.soTimeout(默认: 30s) 范围内未与server交互,server将关闭本次socket连接
+                    Message message = connector.getWithoutAck(BATCH_SIZE); // 获取指定数量的数据
+                    long batchId = message.getId();
+                    try {
+                        int size = message.getEntries().size();
+
+                        if (batchId == -1 || size == 0) {
+                            try {
+                                Thread.sleep(1000);
+                            } catch (InterruptedException e) {
+                                //ignore
+                            }
+                        } else {
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("destination: {} batchId: {} batchSize: {} ", this.canalDestination, batchId, size);
+                            }
+                            long begin = System.currentTimeMillis();
+                            convertAndWrite(message.getEntries());
+                            long now = System.currentTimeMillis();
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("destination: {} batchId: {} elapsed time: {} ms", this.canalDestination, batchId, now - begin);
+                            }
+                        }
+                        connector.ack(batchId); // 提交确认
+                    } catch (Exception e) {
+                        connector.rollback(batchId); // 处理失败, 回滚数据
+                        throw e;
+                    }
+                }
+
+
+            } catch (Exception e) {
+                logger.error("process error!", e);
+            } finally {
+                connector.disconnect();
+                logger.info("=============> Disconnect destination: {} <=============", this.canalDestination);
+            }
+
+            if (running) { //is reconnect
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    //ignore
+                }
+            }
+
+        }
+    }
+}

+ 33 - 0
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/CanalOuterAdapter.java

@@ -0,0 +1,33 @@
+package com.alibaba.otter.canal.client.adapter;
+
+import com.alibaba.otter.canal.client.support.Dml;
+import com.alibaba.otter.canal.client.support.SPI;
+
+/**
+ * 外部适配器接口
+ *
+ * @author machengyuan 2018-8-18 下午10:14:02
+ * @version 1.0.0
+ */
+@SPI("logger")
+public interface CanalOuterAdapter {
+    /**
+     * 外部适配器初始化接口
+     *
+     * @param configuration 外部适配器配置信息
+     */
+    void init(CanalOuterAdapterConfiguration configuration);
+
+    /**
+     * 往适配器中写入数据
+     *
+     * @param dml DML操作数据包
+     * @return 是否成功
+     */
+    Boolean writeOut(Dml dml);
+
+    /**
+     * 外部适配器销毁接口
+     */
+    void destroy();
+}

+ 51 - 0
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/CanalOuterAdapterConfiguration.java

@@ -0,0 +1,51 @@
+package com.alibaba.otter.canal.client.adapter;
+
+import java.util.Properties;
+
+/**
+ * 外部适配器配置信息类
+ *
+ * @author machengyuan 2018-8-18 下午10:15:12
+ * @version 1.0.0
+ */
+public class CanalOuterAdapterConfiguration {
+    private String name;            // 适配器名称, 如: logger, hbase, es
+
+    private String hosts;           // 适配器内部的地址, 比如对应es该参数可以填写es的server地址
+
+    private String zkHosts;         // 适配器内部的ZK地址, 比如对应HBase该参数可以填写HBase对应的ZK地址
+
+    private Properties properties;  // 其余参数, 可填写适配器中的所需的配置信息
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getHosts() {
+        return hosts;
+    }
+
+    public void setHosts(String hosts) {
+        this.hosts = hosts;
+    }
+
+    public Properties getProperties() {
+        return properties;
+    }
+
+    public void setProperties(Properties properties) {
+        this.properties = properties;
+    }
+
+    public String getZkHosts() {
+        return zkHosts;
+    }
+
+    public void setZkHosts(String zkHosts) {
+        this.zkHosts = zkHosts;
+    }
+}

+ 36 - 0
client-launcher/src/main/java/com/alibaba/otter/canal/client/example/LoggerAdapterExample.java

@@ -0,0 +1,36 @@
+package com.alibaba.otter.canal.client.example;
+
+import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.adapter.CanalOuterAdapterConfiguration;
+import com.alibaba.otter.canal.client.support.Dml;
+import com.alibaba.otter.canal.client.support.SPI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 外部适配器示例
+ *
+ * @author machengyuan 2018-8-19 下午11:45:38
+ * @version 1.0.0
+ */
+@SPI("logger") // logger参数对应CanalOuterAdapterConfiguration配置中的name
+public class LoggerAdapterExample implements CanalOuterAdapter {
+    private Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    @Override
+    public Boolean writeOut(Dml dml) {
+        // 直接输出一个日志信息
+        logger.info(dml.toString());
+        return true;
+    }
+
+    @Override
+    public void init(CanalOuterAdapterConfiguration configuration) {
+
+    }
+
+    @Override
+    public void destroy() {
+
+    }
+}

+ 177 - 0
client-launcher/src/main/java/com/alibaba/otter/canal/client/support/CanalClientConfig.java

@@ -0,0 +1,177 @@
+package com.alibaba.otter.canal.client.support;
+
+import com.alibaba.otter.canal.client.adapter.CanalOuterAdapterConfiguration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * 配置信息类
+ *
+ * @author machengyuan 2018-8-18 下午10:40:12
+ * @version 1.0.0
+ */
+public class CanalClientConfig {
+    private String canalServerHost;
+
+    private String zookeeperHosts;
+
+    private Properties properties;
+
+    private String bootstrapServers;
+
+    private List<KafkaTopic> kafkaTopics = new ArrayList<>();
+
+    private List<CanalInstance> canalInstances = new ArrayList<>();
+
+    public String getCanalServerHost() {
+        return canalServerHost;
+    }
+
+    public void setCanalServerHost(String canalServerHost) {
+        this.canalServerHost = canalServerHost;
+    }
+
+    public String getZookeeperHosts() {
+        return zookeeperHosts;
+    }
+
+    public void setZookeeperHosts(String zookeeperHosts) {
+        this.zookeeperHosts = zookeeperHosts;
+    }
+
+    public Properties getProperties() {
+        return properties;
+    }
+
+    public void setProperties(Properties properties) {
+        this.properties = properties;
+    }
+
+    public String getBootstrapServers() {
+        return bootstrapServers;
+    }
+
+    public void setBootstrapServers(String bootstrapServers) {
+        this.bootstrapServers = bootstrapServers;
+    }
+
+    public List<KafkaTopic> getKafkaTopics() {
+        return kafkaTopics;
+    }
+
+    public void setKafkaTopics(List<KafkaTopic> kafkaTopics) {
+        this.kafkaTopics = kafkaTopics;
+    }
+
+    public List<CanalInstance> getCanalInstances() {
+        return canalInstances;
+    }
+
+    public void setCanalInstances(List<CanalInstance> canalInstances) {
+        this.canalInstances = canalInstances;
+    }
+
+    public static class CanalInstance {
+        private String instance;
+
+        private List<AdapterGroup> adapterGroups;
+
+        public String getInstance() {
+            return instance;
+        }
+
+        public void setInstance(String instance) {
+            if (instance != null) {
+                this.instance = instance.trim();
+            }
+        }
+
+        public List<AdapterGroup> getAdapterGroups() {
+            return adapterGroups;
+        }
+
+        public void setAdapterGroups(List<AdapterGroup> adapterGroups) {
+            this.adapterGroups = adapterGroups;
+        }
+    }
+
+    public static class AdapterGroup {
+        private List<CanalOuterAdapterConfiguration> outAdapters;
+
+        public List<CanalOuterAdapterConfiguration> getOutAdapters() {
+            return outAdapters;
+        }
+
+        public void setOutAdapters(List<CanalOuterAdapterConfiguration> outAdapters) {
+            this.outAdapters = outAdapters;
+        }
+    }
+
+    public static class KafkaTopic {
+        private String topic;
+
+        private List<Group> groups = new ArrayList<>();
+
+        public String getTopic() {
+            return topic;
+        }
+
+        public void setTopic(String topic) {
+            this.topic = topic;
+        }
+
+        public List<Group> getGroups() {
+            return groups;
+        }
+
+        public void setGroups(List<Group> groups) {
+            this.groups = groups;
+        }
+    }
+
+    public static class Group {
+        private String groupId;
+
+//        private List<Adaptor> adapters = new ArrayList<>();
+
+        private List<CanalOuterAdapterConfiguration> outAdapters;
+
+        public String getGroupId() {
+            return groupId;
+        }
+
+        public void setGroupId(String groupId) {
+            this.groupId = groupId;
+        }
+
+        public List<CanalOuterAdapterConfiguration> getOutAdapters() {
+            return outAdapters;
+        }
+
+        public void setOutAdapters(List<CanalOuterAdapterConfiguration> outAdapters) {
+            this.outAdapters = outAdapters;
+        }
+
+        //        public List<Adaptor> getAdapters() {
+//            return adapters;
+//        }
+//
+//        public void setAdapters(List<Adaptor> adapters) {
+//            this.adapters = adapters;
+//        }
+    }
+
+//    public static class Adaptor {
+//        private List<CanalOuterAdapterConfiguration> outAdapters;
+//
+//        public List<CanalOuterAdapterConfiguration> getOutAdapters() {
+//            return outAdapters;
+//        }
+//
+//        public void setOutAdapters(List<CanalOuterAdapterConfiguration> outAdapters) {
+//            this.outAdapters = outAdapters;
+//        }
+//    }
+}

+ 115 - 0
client-launcher/src/main/java/com/alibaba/otter/canal/client/support/Dml.java

@@ -0,0 +1,115 @@
+package com.alibaba.otter.canal.client.support;
+
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DML操作转换对象
+ *
+ * @author machengyuan 2018-8-19 下午11:30:49
+ * @version 1.0.0
+ */
+public class Dml implements Serializable {
+    private static final long serialVersionUID = 2611556444074013268L;
+
+    private String canalDestination;
+    private String database;
+    private String table;
+    private String type;
+    private Long ts;
+    private String sql;
+    private List<Map<String, Object>> data;
+    private List<Map<String, Object>> old;
+
+
+    public String getCanalDestination() {
+        return canalDestination;
+    }
+
+    public void setCanalDestination(String canalDestination) {
+        this.canalDestination = canalDestination;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public Long getTs() {
+        return ts;
+    }
+
+    public void setTs(Long ts) {
+        this.ts = ts;
+    }
+
+    public String getSql() {
+        return sql;
+    }
+
+    public void setSql(String sql) {
+        this.sql = sql;
+    }
+
+    public List<Map<String, Object>> getData() {
+        return data;
+    }
+
+    public void setData(List<Map<String, Object>> data) {
+        this.data = data;
+    }
+
+    public List<Map<String, Object>> getOld() {
+        return old;
+    }
+
+    public void setOld(List<Map<String, Object>> old) {
+        this.old = old;
+    }
+
+    public void clear() {
+        canalDestination = null;
+        database = null;
+        table = null;
+        type = null;
+        ts = null;
+        data = null;
+        old = null;
+        sql = null;
+    }
+
+    @Override
+    public String toString() {
+        return "Dml{" +
+                "canalDestination='" + canalDestination + '\'' +
+                ", database='" + database + '\'' +
+                ", table='" + table + '\'' +
+                ", type='" + type + '\'' +
+                ", ts=" + ts +
+                ", sql='" + sql + '\'' +
+                ", data=" + data +
+                ", old=" + old +
+                '}';
+    }
+}

+ 527 - 0
client-launcher/src/main/java/com/alibaba/otter/canal/client/support/ExtensionLoader.java

@@ -0,0 +1,527 @@
+package com.alibaba.otter.canal.client.support;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Pattern;
+
+/**
+ * SPI 类加载器
+ *
+ * @author machengyuan 2018-8-19 下午11:30:49
+ * @version 1.0.0
+ */
+public class ExtensionLoader<T> {
+    private static final Logger logger = LoggerFactory.getLogger(ExtensionLoader.class);
+
+    private static final String SERVICES_DIRECTORY = "META-INF/services/";
+
+    private static final String CANAL_DIRECTORY = "META-INF/canal/";
+
+    private static final String DEFAULT_CLASSLOADER_POLICY = "internal";
+
+    private static final Pattern NAME_SEPARATOR = Pattern.compile("\\s*[,]+\\s*");
+
+    private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<>();
+
+    private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<>();
+
+    private final Class<?> type;
+
+    private final String classLoaderPolicy;
+
+    private final ConcurrentMap<Class<?>, String> cachedNames = new ConcurrentHashMap<>();
+
+    private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<>();
+
+    private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<>();
+
+    private String cachedDefaultName;
+
+    private ConcurrentHashMap<String, IllegalStateException> exceptions = new ConcurrentHashMap<>();
+
+    private static <T> boolean withExtensionAnnotation(Class<T> type) {
+        return type.isAnnotationPresent(SPI.class);
+    }
+
+    public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
+        return getExtensionLoader(type, DEFAULT_CLASSLOADER_POLICY);
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type, String classLoaderPolicy) {
+        if (type == null)
+            throw new IllegalArgumentException("Extension type == null");
+        if (!type.isInterface()) {
+            throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
+        }
+        if (!withExtensionAnnotation(type)) {
+            throw new IllegalArgumentException("Extension type(" + type +
+                    ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
+        }
+
+        ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
+        if (loader == null) {
+            EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type, classLoaderPolicy));
+            loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
+        }
+        return loader;
+    }
+
+    private ExtensionLoader(Class<?> type, String classLoaderPolicy) {
+        this.type = type;
+        this.classLoaderPolicy = classLoaderPolicy;
+    }
+
+    public String getExtensionName(T extensionInstance) {
+        return getExtensionName(extensionInstance.getClass());
+    }
+
+    public String getExtensionName(Class<?> extensionClass) {
+        return cachedNames.get(extensionClass);
+    }
+
+
+    public ConcurrentHashMap<String, IllegalStateException> getExceptions() {
+        return exceptions;
+    }
+
+    /**
+     * 返回扩展点实例,如果没有指定的扩展点或是还没加载(即实例化)则返回<code>null</code>注意:此方法不会触发扩展点的加载
+     * <p/>
+     * 一般应该调用{@link #getExtension(String)}方法获得扩展,这个方法会触发扩展点加载
+     *
+     * @see #getExtension(String)
+     */
+    @SuppressWarnings("unchecked")
+    public T getLoadedExtension(String name) {
+        if (name == null || name.length() == 0)
+            throw new IllegalArgumentException("Extension name == null");
+        Holder<Object> holder = cachedInstances.get(name);
+        if (holder == null) {
+            cachedInstances.putIfAbsent(name, new Holder<>());
+            holder = cachedInstances.get(name);
+        }
+        return (T) holder.get();
+    }
+
+    /**
+     * 返回已经加载的扩展点的名字
+     * <p/>
+     * 一般应该调用{@link #getSupportedExtensions()}方法获得扩展,这个方法会返回所有的扩展点
+     *
+     * @see #getSupportedExtensions()
+     */
+    public Set<String> getLoadedExtensions() {
+        return Collections.unmodifiableSet(new TreeSet<>(cachedInstances.keySet()));
+    }
+
+    /**
+     * 返回指定名字的扩展
+     *
+     * @param name
+     * @return
+     */
+    @SuppressWarnings("unchecked")
+    public T getExtension(String name) {
+        if (name == null || name.length() == 0)
+            throw new IllegalArgumentException("Extension name == null");
+        if ("true".equals(name)) {
+            return getDefaultExtension();
+        }
+        Holder<Object> holder = cachedInstances.get(name);
+        if (holder == null) {
+            cachedInstances.putIfAbsent(name, new Holder<>());
+            holder = cachedInstances.get(name);
+        }
+        Object instance = holder.get();
+        if (instance == null) {
+            synchronized (holder) {
+                instance = holder.get();
+                if (instance == null) {
+                    instance = createExtension(name);
+                    holder.set(instance);
+                }
+            }
+        }
+        return (T) instance;
+    }
+
+    /**
+     * 返回缺省的扩展,如果没有设置则返回<code>null</code>
+     */
+    public T getDefaultExtension() {
+        getExtensionClasses();
+        if (null == cachedDefaultName || cachedDefaultName.length() == 0
+                || "true".equals(cachedDefaultName)) {
+            return null;
+        }
+        return getExtension(cachedDefaultName);
+    }
+
+    public boolean hasExtension(String name) {
+        if (name == null || name.length() == 0)
+            throw new IllegalArgumentException("Extension name == null");
+        try {
+            return getExtensionClass(name) != null;
+        } catch (Throwable t) {
+            return false;
+        }
+    }
+
+    public Set<String> getSupportedExtensions() {
+        Map<String, Class<?>> clazzes = getExtensionClasses();
+        return Collections.unmodifiableSet(new TreeSet<String>(clazzes.keySet()));
+    }
+
+    /**
+     * 返回缺省的扩展点名,如果没有设置缺省则返回<code>null</code>
+     */
+    public String getDefaultExtensionName() {
+        getExtensionClasses();
+        return cachedDefaultName;
+    }
+
+    /**
+     * 编程方式添加新扩展点
+     *
+     * @param name  扩展点名
+     * @param clazz 扩展点类
+     * @throws IllegalStateException 要添加扩展点名已经存在
+     */
+    public void addExtension(String name, Class<?> clazz) {
+        getExtensionClasses(); // load classes
+
+        if (!type.isAssignableFrom(clazz)) {
+            throw new IllegalStateException("Input type " +
+                    clazz + "not implement Extension " + type);
+        }
+        if (clazz.isInterface()) {
+            throw new IllegalStateException("Input type " +
+                    clazz + "can not be interface!");
+        }
+
+        if (StringUtils.isBlank(name)) {
+            throw new IllegalStateException("Extension name is blank (Extension " + type + ")!");
+        }
+        if (cachedClasses.get().containsKey(name)) {
+            throw new IllegalStateException("Extension name " +
+                    name + " already existed(Extension " + type + ")!");
+        }
+
+        cachedNames.put(clazz, name);
+        cachedClasses.get().put(name, clazz);
+    }
+
+    /**
+     * 编程方式添加替换已有扩展点
+     *
+     * @param name  扩展点名
+     * @param clazz 扩展点类
+     * @throws IllegalStateException 要添加扩展点名已经存在
+     * @deprecated 不推荐应用使用,一般只在测试时可以使用
+     */
+    @Deprecated
+    public void replaceExtension(String name, Class<?> clazz) {
+        getExtensionClasses(); // load classes
+
+        if (!type.isAssignableFrom(clazz)) {
+            throw new IllegalStateException("Input type " +
+                    clazz + "not implement Extension " + type);
+        }
+        if (clazz.isInterface()) {
+            throw new IllegalStateException("Input type " +
+                    clazz + "can not be interface!");
+        }
+
+        if (StringUtils.isBlank(name)) {
+            throw new IllegalStateException("Extension name is blank (Extension " + type + ")!");
+        }
+        if (!cachedClasses.get().containsKey(name)) {
+            throw new IllegalStateException("Extension name " +
+                    name + " not existed(Extension " + type + ")!");
+        }
+
+        cachedNames.put(clazz, name);
+        cachedClasses.get().put(name, clazz);
+        cachedInstances.remove(name);
+    }
+
+
+    @SuppressWarnings("unchecked")
+    private T createExtension(String name) {
+        Class<?> clazz = getExtensionClasses().get(name);
+        if (clazz == null) {
+            throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
+                    type + ")  could not be instantiated: class could not be found");
+        }
+        try {
+            T instance = (T) EXTENSION_INSTANCES.get(clazz);
+            if (instance == null) {
+                EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
+                instance = (T) EXTENSION_INSTANCES.get(clazz);
+            }
+            return instance;
+        } catch (Throwable t) {
+            throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
+                    type + ")  could not be instantiated: " + t.getMessage(), t);
+        }
+    }
+
+
+    private Class<?> getExtensionClass(String name) {
+        if (type == null)
+            throw new IllegalArgumentException("Extension type == null");
+        if (name == null)
+            throw new IllegalArgumentException("Extension name == null");
+        Class<?> clazz = getExtensionClasses().get(name);
+        if (clazz == null)
+            throw new IllegalStateException("No such extension \"" + name + "\" for " + type.getName() + "!");
+        return clazz;
+    }
+
+    private Map<String, Class<?>> getExtensionClasses() {
+        Map<String, Class<?>> classes = cachedClasses.get();
+        if (classes == null) {
+            synchronized (cachedClasses) {
+                classes = cachedClasses.get();
+                if (classes == null) {
+                    classes = loadExtensionClasses();
+                    cachedClasses.set(classes);
+                }
+            }
+        }
+        return classes;
+    }
+
+    private String getJarDirectoryPath() {
+        URL url = Thread.currentThread().getContextClassLoader().getResource("");
+        if (url == null) {
+            throw new IllegalStateException("failed to get class loader resource");
+        }
+        String dirtyPath = url.toString();
+        String jarPath = dirtyPath.replaceAll("^.*file:/", ""); //removes file:/ and everything before it
+        jarPath = jarPath.replaceAll("jar!.*", "jar"); //removes everything after .jar, if .jar exists in dirtyPath
+        jarPath = jarPath.replaceAll("%20", " "); //necessary if path has spaces within
+        if (!jarPath.endsWith(".jar")) { // this is needed if you plan to run the app using Spring Tools Suit play button.
+            jarPath = jarPath.replaceAll("/classes/.*", "/classes/");
+        }
+        return Paths.get(jarPath).getParent().toString(); //Paths - from java 8
+    }
+
+    private Map<String, Class<?>> loadExtensionClasses() {
+        final SPI defaultAnnotation = type.getAnnotation(SPI.class);
+        if (defaultAnnotation != null) {
+            String value = defaultAnnotation.value();
+            if ((value = value.trim()).length() > 0) {
+                String[] names = NAME_SEPARATOR.split(value);
+                if (names.length > 1) {
+                    throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
+                            + ": " + Arrays.toString(names));
+                }
+                if (names.length == 1) cachedDefaultName = names[0];
+            }
+        }
+
+        Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
+
+        //1. lib folder,customized extension classLoader (jar_dir/lib)
+        String dir = File.separator + this.getJarDirectoryPath() + File.separator + "lib";
+        logger.info("extension classpath dir: " + dir);
+        File externalLibDir = new File(dir);
+        if (externalLibDir.exists()) {
+            File[] files = externalLibDir.listFiles(new FilenameFilter() {
+                @Override
+                public boolean accept(File dir, String name) {
+                    return name.endsWith(".jar");
+                }
+            });
+            if (files != null) {
+                for (File f : files) {
+                    URL url = null;
+                    try {
+                        url = f.toURI().toURL();
+                    } catch (MalformedURLException e) {
+                        throw new RuntimeException("load extension jar failed!", e);
+                    }
+
+                    ClassLoader parent = Thread.currentThread().getContextClassLoader();
+                    URLClassLoader localClassLoader;
+                    if (!StringUtils.isEmpty(classLoaderPolicy) && DEFAULT_CLASSLOADER_POLICY.equalsIgnoreCase(classLoaderPolicy)) {
+                        localClassLoader = new URLClassLoader(new URL[]{url}, parent) {
+                            @Override
+                            public Class<?> loadClass(String name) throws ClassNotFoundException {
+                                Class<?> c = findLoadedClass(name);
+                                if (c != null) {
+                                    return c;
+                                }
+
+                                if (name.startsWith("java.")
+                                        || name.startsWith("org.slf4j.")
+                                        || name.startsWith("org.apache.logging")
+                                        || name.startsWith("org.apache.commons.logging.")) {
+                                    //|| name.startsWith("org.apache.hadoop.")) {
+                                    c = super.loadClass(name);
+                                }
+                                if (c != null)
+                                    return c;
+
+                                try {
+                                    // 先加载jar内的class,可避免jar冲突
+                                    c = findClass(name);
+                                } catch (ClassNotFoundException e) {
+                                    c = null;
+                                }
+                                if (c != null) {
+                                    return c;
+                                }
+
+                                return super.loadClass(name);
+                            }
+
+                            // @Override
+                            // public Enumeration<URL> getResources(String name) throws IOException {
+                            //     @SuppressWarnings("unchecked")
+                            //     Enumeration<URL>[] tmp = (Enumeration<URL>[]) new Enumeration<?>[2];
+
+                            //      tmp[0] = findResources(name);       //local class path first
+                            //     //tmp[1] = super.getResources(name);
+
+                            //     return new CompoundEnumeration<>(tmp);
+                            // }
+                        };
+                    } else {
+                        localClassLoader = new URLClassLoader(new URL[]{url}, parent);
+                    }
+
+                    loadFile(extensionClasses, CANAL_DIRECTORY, localClassLoader);
+                    loadFile(extensionClasses, SERVICES_DIRECTORY, localClassLoader);
+                }
+            }
+        }
+        //2. load inner extension class with default classLoader
+        ClassLoader classLoader = findClassLoader();
+        loadFile(extensionClasses, CANAL_DIRECTORY, classLoader);
+        loadFile(extensionClasses, SERVICES_DIRECTORY, classLoader);
+
+        return extensionClasses;
+    }
+
+    private void loadFile(Map<String, Class<?>> extensionClasses, String dir, ClassLoader classLoader) {
+        String fileName = dir + type.getName();
+        try {
+            Enumeration<URL> urls;
+            if (classLoader != null) {
+                urls = classLoader.getResources(fileName);
+            } else {
+                urls = ClassLoader.getSystemResources(fileName);
+            }
+            if (urls != null) {
+                while (urls.hasMoreElements()) {
+                    URL url = urls.nextElement();
+                    try {
+                        BufferedReader reader = null;
+                        try {
+                            reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8"));
+                            String line = null;
+                            while ((line = reader.readLine()) != null) {
+                                final int ci = line.indexOf('#');
+                                if (ci >= 0) line = line.substring(0, ci);
+                                line = line.trim();
+                                if (line.length() > 0) {
+                                    try {
+                                        String name = null;
+                                        int i = line.indexOf('=');
+                                        if (i > 0) {
+                                            name = line.substring(0, i).trim();
+                                            line = line.substring(i + 1).trim();
+                                        }
+                                        if (line.length() > 0) {
+                                            Class<?> clazz = classLoader.loadClass(line);
+                                            //Class<?> clazz = Class.forName(line, true, classLoader);
+                                            if (!type.isAssignableFrom(clazz)) {
+                                                throw new IllegalStateException("Error when load extension class(interface: " +
+                                                        type + ", class line: " + clazz.getName() + "), class "
+                                                        + clazz.getName() + "is not subtype of interface.");
+                                            } else {
+                                                try {
+                                                    clazz.getConstructor(type);
+                                                } catch (NoSuchMethodException e) {
+                                                    clazz.getConstructor();
+                                                    String[] names = NAME_SEPARATOR.split(name);
+                                                    if (names != null && names.length > 0) {
+                                                        for (String n : names) {
+                                                            if (!cachedNames.containsKey(clazz)) {
+                                                                cachedNames.put(clazz, n);
+                                                            }
+                                                            Class<?> c = extensionClasses.get(n);
+                                                            if (c == null) {
+                                                                extensionClasses.put(n, clazz);
+                                                            } else if (c != clazz) {
+                                                                cachedNames.remove(clazz);
+                                                                throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
+                                                            }
+                                                        }
+                                                    }
+                                                }
+                                            }
+                                        }
+                                    } catch (Throwable t) {
+                                        IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t);
+                                        exceptions.put(line, e);
+                                    }
+                                }
+                            } // end of while read lines
+                        } finally {
+                            if (reader != null) {
+                                reader.close();
+                            }
+                        }
+                    } catch (Throwable t) {
+                        logger.error("Exception when load extension class(interface: " +
+                                type + ", class file: " + url + ") in " + url, t);
+                    }
+                } // end of while urls
+            }
+        } catch (Throwable t) {
+            logger.error("Exception when load extension class(interface: " +
+                    type + ", description file: " + fileName + ").", t);
+        }
+    }
+
+
+    private static ClassLoader findClassLoader() {
+        return ExtensionLoader.class.getClassLoader();
+    }
+
+    @Override
+    public String toString() {
+        return this.getClass().getName() + "[" + type.getName() + "]";
+    }
+
+    private static class Holder<T> {
+
+        private volatile T value;
+
+        private void set(T value) {
+            this.value = value;
+        }
+
+        private T get() {
+            return value;
+        }
+
+    }
+}

+ 137 - 0
client-launcher/src/main/java/com/alibaba/otter/canal/client/support/JdbcTypeUtil.java

@@ -0,0 +1,137 @@
+package com.alibaba.otter.canal.client.support;
+
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+
+public class JdbcTypeUtil {
+    private static Logger logger = LoggerFactory.getLogger(JdbcTypeUtil.class);
+
+    public static Class<?> jdbcType2javaType(int jdbcType) {
+        switch (jdbcType) {
+            case Types.BIT:
+            case Types.BOOLEAN:
+                // return Boolean.class;
+            case Types.TINYINT:
+                return Byte.TYPE;
+            case Types.SMALLINT:
+                return Short.class;
+            case Types.INTEGER:
+                return Integer.class;
+            case Types.BIGINT:
+                return Long.class;
+            case Types.DECIMAL:
+            case Types.NUMERIC:
+                return BigDecimal.class;
+            case Types.REAL:
+                return Float.class;
+            case Types.FLOAT:
+            case Types.DOUBLE:
+                return Double.class;
+            case Types.CHAR:
+            case Types.VARCHAR:
+            case Types.LONGVARCHAR:
+                return String.class;
+            case Types.BINARY:
+            case Types.VARBINARY:
+            case Types.LONGVARBINARY:
+            case Types.BLOB:
+                return byte[].class;
+            case Types.DATE:
+                return Date.class;
+            case Types.TIME:
+                return Time.class;
+            case Types.TIMESTAMP:
+                return Timestamp.class;
+            default:
+                return String.class;
+        }
+    }
+
+    public static Object typeConvert(String tableName, String columnName, String value, int sqlType, String mysqlType) {
+        if (value == null || value.equals("")) {
+            return null;
+        }
+
+        try {
+            Object res;
+            switch (sqlType) {
+                case Types.INTEGER:
+                    res = Integer.parseInt(value);
+                    break;
+                case Types.SMALLINT:
+                    res = Short.parseShort(value);
+                    break;
+                case Types.BIT:
+                case Types.TINYINT:
+                    res = Byte.parseByte(value);
+                    break;
+                case Types.BIGINT:
+                    if (mysqlType.startsWith("bigint") && mysqlType.endsWith("unsigned")) {
+                        res = new BigInteger(value);
+                    } else {
+                        res = Long.parseLong(value);
+                    }
+                    break;
+                //case Types.BIT:
+                case Types.BOOLEAN:
+                    res = !"0".equals(value);
+                    break;
+                case Types.DOUBLE:
+                case Types.FLOAT:
+                    res = Double.parseDouble(value);
+                    break;
+                case Types.REAL:
+                    res = Float.parseFloat(value);
+                    break;
+                case Types.DECIMAL:
+                case Types.NUMERIC:
+                    res = new BigDecimal(value);
+                    break;
+                case Types.BINARY:
+                case Types.VARBINARY:
+                case Types.LONGVARBINARY:
+                case Types.BLOB:
+                    res = value.getBytes("ISO-8859-1");
+                    break;
+                case Types.DATE:
+                    if (!value.startsWith("0000-00-00")) {
+                        value = value.trim().replace(" ", "T");
+                        DateTime dt = new DateTime(value);
+                        res = new Date(dt.toDate().getTime());
+                    } else {
+                        res = null;
+                    }
+                    break;
+                case Types.TIME:
+                    value = "T" + value;
+                    DateTime dt = new DateTime(value);
+                    res = new Time(dt.toDate().getTime());
+                    break;
+                case Types.TIMESTAMP:
+                    if (!value.startsWith("0000-00-00")) {
+                        value = value.trim().replace(" ", "T");
+                        dt = new DateTime(value);
+                        res = new Timestamp(dt.toDate().getTime());
+                    } else {
+                        res = null;
+                    }
+                    break;
+                case Types.CLOB:
+                default:
+                    res = value;
+            }
+            return res;
+        } catch (Exception e) {
+            logger.error("table: {} column: {}, failed convert type {} to {}", tableName, columnName,value, sqlType);
+            return value;
+        }
+    }
+}

+ 11 - 0
client-launcher/src/main/java/com/alibaba/otter/canal/client/support/SPI.java

@@ -0,0 +1,11 @@
+package com.alibaba.otter.canal.client.support;
+
+import java.lang.annotation.*;
+
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface SPI {
+    //Default SPI name
+    String value() default "";
+}

+ 1 - 0
client-launcher/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.CanalOuterAdapter

@@ -0,0 +1 @@
+logger=com.alibaba.otter.canal.client.example.LoggerAdapterExample

+ 22 - 0
client-launcher/src/main/resources/canal-client.yml

@@ -0,0 +1,22 @@
+#canalServerHost: 127.0.0.1:11111
+zookeeperHosts: 127.0.0.1:2181
+#bootstrapServers: kafka1.mytest.com:9092,kafka2.mytest.com:9092
+
+canalInstances:
+- instance: example
+  adapterGroups:
+  - outAdapters:
+    - name: logger
+#    - name: hbase
+#      hosts:
+#      properties: {znodeParent: /hbase}
+
+#kafkaTopics:
+#- topic: devmysql4308
+#  groups:
+#  - groupId: devmysql4308_es
+#    adapters:
+#    - name: es
+#      hosts:
+#      zkHosts:
+#      properties: {clusterName: es-service-test}

+ 47 - 0
client-launcher/src/main/resources/logback.xml

@@ -0,0 +1,47 @@
+<configuration scan="true" scanPeriod=" 5 seconds">
+	<jmxConfigurator />
+	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+		<encoder>
+			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{56} - %msg%n
+			</pattern>
+		</encoder>
+	</appender>
+	
+	<appender name="CANAL-ROOT" class="ch.qos.logback.classic.sift.SiftingAppender">
+		<discriminator>
+			<Key>destination</Key>
+			<DefaultValue>canal_client</DefaultValue>
+		</discriminator>
+		<sift>
+			<appender name="FILE-${destination}" class="ch.qos.logback.core.rolling.RollingFileAppender">
+				<File>../logs/${destination}.log</File>
+				<rollingPolicy
+					class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+					<!-- rollover daily -->
+					<fileNamePattern>../logs/%d{yyyy-MM-dd}/${destination}-%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern>
+					<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+						<!-- or whenever the file size reaches 100MB -->
+						<maxFileSize>512MB</maxFileSize>
+					</timeBasedFileNamingAndTriggeringPolicy>
+					<maxHistory>60</maxHistory>
+				</rollingPolicy>
+				<encoder>
+					<pattern>
+						%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{56} - %msg%n
+					</pattern>
+				</encoder>
+			</appender>
+		</sift>
+	</appender>
+
+	<logger name="com.alibaba.otter.canal.client" additivity="false">
+		<level value="INFO" />
+		<appender-ref ref="STDOUT"/>
+		<appender-ref ref="CANAL-ROOT" />
+	</logger>
+    
+	<root level="WARN">
+		<appender-ref ref="STDOUT"/>
+		<appender-ref ref="CANAL-ROOT" />
+	</root>
+</configuration>

+ 1 - 0
pom.xml

@@ -120,6 +120,7 @@
         <module>kafka</module>
         <module>kafka-client</module>
         <module>prometheus</module>
+        <module>client-launcher</module>
     </modules>
 
     <dependencyManagement>