Browse Source

Merge pull request #14 from alibaba/master

merge
rewerma 6 years ago
parent
commit
8446e17703
36 changed files with 376 additions and 1554 deletions
  1. 3 3
      client-adapter/README.md
  2. 0 149
      client-adapter/example/pom.xml
  3. 0 57
      client-adapter/example/src/main/assembly/dev.xml
  4. 0 57
      client-adapter/example/src/main/assembly/release.xml
  5. 0 25
      client-adapter/example/src/main/bin/startup.bat
  6. 0 100
      client-adapter/example/src/main/bin/startup.sh
  7. 0 65
      client-adapter/example/src/main/bin/stop.sh
  8. 0 66
      client-adapter/example/src/main/java/com/alibaba/otter/canal/client/ClientLauncher.java
  9. 0 168
      client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/AbstractCanalAdapterWorker.java
  10. 0 178
      client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterKafkaWorker.java
  11. 0 177
      client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterLoader.java
  12. 0 124
      client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterRocketMQWorker.java
  13. 0 186
      client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterWorker.java
  14. 0 18
      client-adapter/example/src/main/resources/canal-client.yml
  15. 0 47
      client-adapter/example/src/main/resources/logback.xml
  16. 54 42
      client-adapter/hbase/pom.xml
  17. 50 19
      client-adapter/launcher/pom.xml
  18. 0 0
      client-adapter/launcher/src/main/assembly/release.xml
  19. 35 23
      client-adapter/logger/pom.xml
  20. 0 1
      client-adapter/pom.xml
  21. 1 1
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java
  22. 1 0
      deployer/src/main/resources/canal.properties
  23. 1 0
      deployer/src/main/resources/spring/default-instance.xml
  24. 1 0
      deployer/src/main/resources/spring/file-instance.xml
  25. 1 0
      deployer/src/main/resources/spring/group-instance.xml
  26. 1 0
      deployer/src/main/resources/spring/memory-instance.xml
  27. 53 4
      instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java
  28. 45 0
      instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java
  29. 12 4
      protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java
  30. 1 1
      protocol/src/main/java/com/alibaba/otter/canal/protocol/Message.java
  31. 8 11
      server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java
  32. 5 0
      server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java
  33. 60 23
      server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java
  34. 8 2
      sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java
  35. 11 0
      store/src/main/java/com/alibaba/otter/canal/store/memory/MemoryEventStoreWithBuffer.java
  36. 25 3
      store/src/main/java/com/alibaba/otter/canal/store/model/Event.java

+ 3 - 3
client-adapter/README.md

@@ -68,7 +68,7 @@ adapter.conf:
   adapterConfigs:                   # 适配器内部配置列表
   - hbase/mytest_person2.yml        # 类型/配置文件名, 这里示例就是对应HBase适配器hbase目录下的mytest_person2.yml文件
 ```
-## 2.2 适配器管理REST接口
+## 2.2 同步管理REST接口
 #### 2.2.1 查询所有订阅同步的canal destination或MQ topic
 ```
 curl http://127.0.0.1:8081/destinations
@@ -224,7 +224,7 @@ $BIGINT                   对应PHOENIX里的BIGINT                 8字节
 $TINYINT                  对应PHOENIX里的TINYINT                1字节
 $SMALLINT                 对应PHOENIX里的SMALLINT               2字节
 $FLOAT                    对应PHOENIX里的FLOAT                  4字节
-$DOUBLE                    对应PHOENIX里的DOUBLE                 8字节
+$DOUBLE                   对应PHOENIX里的DOUBLE                 8字节
 $BOOLEAN                  对应PHOENIX里的BOOLEAN                1字节
 $TIME                     对应PHOENIX里的TIME                   8字节
 $DATE                     对应PHOENIX里的DATE                   8字节
@@ -248,4 +248,4 @@ create 'MYTEST.PERSON', {NAME=>'CF'}
 java -jar canal-adapter-launcher.jar
 ```
 #### 验证
-修改mysql mytest.person表的数据, 将会自动同步到HBase的MYTEST.PERSON表下面, 并会打出DML的log
+修改mysql mytest.person表的数据, 将会自动同步到HBase的MYTEST.PERSON表下面, 并会打出DML的log

+ 0 - 149
client-adapter/example/pom.xml

@@ -1,149 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>canal.client-adapter</artifactId>
-        <groupId>com.alibaba.otter</groupId>
-        <version>1.1.1-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-    <groupId>com.alibaba.otter</groupId>
-    <artifactId>client-adapter.example</artifactId>
-    <packaging>jar</packaging>
-    <name>canal client adapter example module for otter ${project.version}</name>
-
-    <dependencies>
-        <dependency>
-            <groupId>com.alibaba.otter</groupId>
-            <artifactId>client-adapter.common</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.alibaba.otter</groupId>
-            <artifactId>canal.client</artifactId>
-            <version>${canal_version}</version>
-        </dependency>
-        <!-- 单独引入rocketmq依赖 -->
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-client</artifactId>
-            <version>4.3.0</version>
-        </dependency>
-        <!-- 单独引入kafka依赖 -->
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>1.1.1</version>
-        </dependency>
-        <dependency>
-            <groupId>org.yaml</groupId>
-            <artifactId>snakeyaml</artifactId>
-            <version>1.17</version>
-        </dependency>
-
-        <!-- outer adapter -->
-        <dependency>
-            <groupId>com.alibaba.otter</groupId>
-            <artifactId>client-adapter.logger</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <!-- deploy模块的packaging通常是jar,如果项目中没有java 源代码或资源文件,加上这一段配置使项目能通过构建 -->
-            <plugin>
-                <artifactId>maven-jar-plugin</artifactId>
-                <version>3.0.2</version>
-                <configuration>
-                    <archive>
-                        <addMavenDescriptor>true</addMavenDescriptor>
-                    </archive>
-                    <excludes>
-                        <exclude>**/logback.xml</exclude>
-                        <exclude>**/canal-client.yml</exclude>
-                    </excludes>
-                </configuration>
-            </plugin>
-
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <!-- 这是最新版本,推荐使用这个版本 -->
-                <version>2.2.1</version>
-                <executions>
-                    <execution>
-                        <id>assemble</id>
-                        <goals>
-                            <goal>single</goal>
-                        </goals>
-                        <phase>package</phase>
-                    </execution>
-                </executions>
-                <configuration>
-                    <appendAssemblyId>false</appendAssemblyId>
-                    <attach>false</attach>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
-    <profiles>
-        <profile>
-            <id>dev</id>
-            <activation>
-                <activeByDefault>true</activeByDefault>
-                <property>
-                    <name>env</name>
-                    <value>!release</value>
-                </property>
-            </activation>
-
-            <build>
-                <plugins>
-                    <plugin>
-                        <artifactId>maven-assembly-plugin</artifactId>
-                        <configuration>
-                            <!-- maven assembly插件需要一个描述文件 来告诉插件包的结构以及打包所需的文件来自哪里 -->
-                            <descriptors>
-                                <descriptor>${basedir}/src/main/assembly/dev.xml</descriptor>
-                            </descriptors>
-                            <finalName>canal_client</finalName>
-                            <outputDirectory>${project.build.directory}</outputDirectory>
-                        </configuration>
-                    </plugin>
-                </plugins>
-            </build>
-
-        </profile>
-
-        <profile>
-            <id>release</id>
-            <activation>
-                <property>
-                    <name>env</name>
-                    <value>release</value>
-                </property>
-            </activation>
-
-            <build>
-                <plugins>
-                    <plugin>
-                        <artifactId>maven-assembly-plugin</artifactId>
-                        <configuration>
-                            <!-- 发布模式使用的maven assembly插件描述文件 -->
-                            <descriptors>
-                                <descriptor>${basedir}/src/main/assembly/release.xml</descriptor>
-                            </descriptors>
-                            <!-- 如果一个应用的包含多个deploy模块,如果使用同样的包名, 如果把它们复制的一个目录中可能会失败,所以包名加了 artifactId以示区分 -->
-                            <finalName>${project.artifactId}-${project.version}</finalName>
-                            <!-- scm 要求 release 模式打出的包放到顶级目录下的target子目录中 -->
-                            <outputDirectory>${project.parent.build.directory}</outputDirectory>
-                        </configuration>
-                    </plugin>
-                </plugins>
-            </build>
-        </profile>
-    </profiles>
-</project>

+ 0 - 57
client-adapter/example/src/main/assembly/dev.xml

@@ -1,57 +0,0 @@
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
-	<id>dist</id>
-	<formats>
-		<format>dir</format>
-	</formats>
-	<includeBaseDirectory>false</includeBaseDirectory>
-	<fileSets>
-		<fileSet>
-			<directory>.</directory>
-			<outputDirectory>/</outputDirectory>
-			<includes>
-				<include>README*</include>
-			</includes>
-		</fileSet>
-		<fileSet>
-			<directory>./src/main/bin</directory>
-			<outputDirectory>bin</outputDirectory>
-			<includes>
-				<include>**/*</include>
-			</includes>
-			<fileMode>0755</fileMode>
-		</fileSet>
-		<fileSet>
-			<directory>./src/main/conf</directory>
-			<outputDirectory>/conf</outputDirectory>
-			<includes>
-				<include>**/*</include>
-			</includes>
-		</fileSet>
-		<fileSet>
-			<directory>./src/main/resources</directory>
-			<outputDirectory>/conf</outputDirectory>
-			<includes>
-				<include>**/*</include>
-			</includes>
-			<excludes>
-				<exclude>META-INF/**</exclude>
-			</excludes>
-		</fileSet>
-		<fileSet>
-			<directory>target</directory>
-			<outputDirectory>logs</outputDirectory>
-			<excludes>
-				<exclude>**/*</exclude>
-			</excludes>
-		</fileSet>
-	</fileSets>
-	<dependencySets>
-		<dependencySet>
-			<outputDirectory>lib</outputDirectory>
-			<excludes>
-				<exclude>junit:junit</exclude>
-			</excludes>
-		</dependencySet>
-	</dependencySets>
-</assembly>

+ 0 - 57
client-adapter/example/src/main/assembly/release.xml

@@ -1,57 +0,0 @@
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
-	<id>dist</id>
-	<formats>
-		<format>tar.gz</format>
-	</formats>
-	<includeBaseDirectory>false</includeBaseDirectory>
-	<fileSets>
-		<fileSet>
-			<directory>.</directory>
-			<outputDirectory>/</outputDirectory>
-			<includes>
-				<include>README*</include>
-			</includes>
-		</fileSet>
-		<fileSet>
-			<directory>./src/main/bin</directory>
-			<outputDirectory>bin</outputDirectory>
-			<includes>
-				<include>**/*</include>
-			</includes>
-			<fileMode>0755</fileMode>
-		</fileSet>
-		<fileSet>
-			<directory>./src/main/conf</directory>
-			<outputDirectory>/conf</outputDirectory>
-			<includes>
-				<include>**/*</include>
-			</includes>
-		</fileSet>
-		<fileSet>
-			<directory>./src/main/resources</directory>
-			<outputDirectory>/conf</outputDirectory>
-			<includes>
-				<include>**/*</include>
-			</includes>
-			<excludes>
-				<exclude>META-INF/**</exclude>
-			</excludes>
-		</fileSet>
-		<fileSet>
-			<directory>target</directory>
-			<outputDirectory>logs</outputDirectory>
-			<excludes>
-				<exclude>**/*</exclude>
-			</excludes>
-		</fileSet>
-	</fileSets>
-	<dependencySets>
-		<dependencySet>
-			<outputDirectory>lib</outputDirectory>
-			<excludes>
-				<exclude>junit:junit</exclude>
-			</excludes>
-		</dependencySet>
-	</dependencySets>
-</assembly>

+ 0 - 25
client-adapter/example/src/main/bin/startup.bat

@@ -1,25 +0,0 @@
-@echo off
-@if not "%ECHO%" == ""  echo %ECHO%
-@if "%OS%" == "Windows_NT"  setlocal
-
-set ENV_PATH=.\
-if "%OS%" == "Windows_NT" set ENV_PATH=%~dp0%
-
-set conf_dir=%ENV_PATH%\..\conf
-set canal_conf=%conf_dir%\canal-client.yml
-set logback_configurationFile=%conf_dir%\logback.xml
-
-set CLASSPATH=%conf_dir%
-set CLASSPATH=%conf_dir%\..\lib\*;%CLASSPATH%
-
-set JAVA_MEM_OPTS= -Xms128m -Xmx512m -XX:PermSize=128m
-set JAVA_OPTS_EXT= -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dapplication.codeset=UTF-8 -Dfile.encoding=UTF-8
-set JAVA_DEBUG_OPT= -server -Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=9099,server=y,suspend=n
-set CANAL_OPTS= -DappName=otter-canal -Dlogback.configurationFile="%logback_configurationFile%" -Dcanal.conf="%canal_conf%"
-
-set JAVA_OPTS= %JAVA_MEM_OPTS% %JAVA_OPTS_EXT% %JAVA_DEBUG_OPT% %CANAL_OPTS%
-
-set CMD_STR= java %JAVA_OPTS% -classpath "%CLASSPATH%" java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.client.ClientLauncher
-echo start cmd : %CMD_STR%
-
-java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.client.ClientLauncher

+ 0 - 100
client-adapter/example/src/main/bin/startup.sh

@@ -1,100 +0,0 @@
-#!/bin/bash 
-
-current_path=`pwd`
-case "`uname`" in
-    Linux)
-		bin_abs_path=$(readlink -f $(dirname $0))
-		;;
-	*)
-		bin_abs_path=`cd $(dirname $0); pwd`
-		;;
-esac
-base=${bin_abs_path}/..
-canal_conf=$base/conf/canal-client.yml
-logback_configurationFile=$base/conf/logback.xml
-export LANG=en_US.UTF-8
-export BASE=$base
-
-if [ -f $base/bin/canal_client.pid ] ; then
-	echo "found canal_client.pid , Please run stop.sh first ,then startup.sh" 2>&2
-    exit 1
-fi
-
-## set java path
-if [ -z "$JAVA" ] ; then
-  JAVA=$(which java)
-fi
-
-ALIBABA_JAVA="/usr/alibaba/java/bin/java"
-TAOBAO_JAVA="/opt/taobao/java/bin/java"
-if [ -z "$JAVA" ]; then
-  if [ -f $ALIBABA_JAVA ] ; then
-  	JAVA=$ALIBABA_JAVA
-  elif [ -f $TAOBAO_JAVA ] ; then
-  	JAVA=$TAOBAO_JAVA
-  else
-  	echo "Cannot find a Java JDK. Please set either set JAVA or put java (>=1.5) in your PATH." 2>&2
-    exit 1
-  fi
-fi
-
-case "$#" 
-in
-0 ) 
-	;;
-1 )	
-	var=$*
-	if [ -f $var ] ; then 
-		canal_conf=$var
-	else
-		echo "THE PARAMETER IS NOT CORRECT.PLEASE CHECK AGAIN."
-        exit
-	fi;;
-2 )	
-	var=$1
-	if [ -f $var ] ; then
-		canal_conf=$var
-	else 
-		if [ "$1" = "debug" ]; then
-			DEBUG_PORT=$2
-			DEBUG_SUSPEND="n"
-			JAVA_DEBUG_OPT="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=$DEBUG_PORT,server=y,suspend=$DEBUG_SUSPEND"
-		fi
-     fi;;
-* )
-	echo "THE PARAMETERS MUST BE TWO OR LESS.PLEASE CHECK AGAIN."
-	exit;;
-esac
-
-str=`file -L $JAVA | grep 64-bit`
-if [ -n "$str" ]; then
-	JAVA_OPTS="-server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"
-else
-	JAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m "
-fi
-
-JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8"
-CANAL_OPTS="-DappName=otter-canal-client -Dlogback.configurationFile=$logback_configurationFile -Dcanal.conf=$canal_conf"
-
-if [ -e $canal_conf -a -e $logback_configurationFile ]
-then 
-	
-	for i in $base/lib/*;
-		do CLASSPATH=$i:"$CLASSPATH";
-	done
- 	CLASSPATH="$base/conf:$CLASSPATH";
- 	
- 	echo "cd to $bin_abs_path for workaround relative path"
-  	cd $bin_abs_path
- 	
-	echo LOG CONFIGURATION : $logback_configurationFile
-	echo canal conf : $canal_conf 
-	echo CLASSPATH :$CLASSPATH
-	$JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $CANAL_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.client.ClientLauncher 1>>$base/logs/canal_client.log 2>&1 &
-	echo $! > $base/bin/canal_client.pid
-	
-	echo "cd to $current_path for continue"
-  	cd $current_path
-else 
-	echo "canal client conf("$canal_conf") OR log configration file($logback_configurationFile) is not exist,please create then first!"
-fi

+ 0 - 65
client-adapter/example/src/main/bin/stop.sh

@@ -1,65 +0,0 @@
-#!/bin/bash
-
-cygwin=false;
-linux=false;
-case "`uname`" in
-    CYGWIN*)
-        cygwin=true
-        ;;
-    Linux*)
-    	linux=true
-    	;;
-esac
-
-get_pid() {	
-	STR=$1
-	PID=$2
-    if $cygwin; then
-        JAVA_CMD="$JAVA_HOME\bin\java"
-        JAVA_CMD=`cygpath --path --unix $JAVA_CMD`
-        JAVA_PID=`ps |grep $JAVA_CMD |awk '{print $1}'`
-    else
-    	if $linux; then
-	        if [ ! -z "$PID" ]; then
-	        	JAVA_PID=`ps -C java -f --width 1000|grep "$STR"|grep "$PID"|grep -v grep|awk '{print $2}'`
-		    else 
-		        JAVA_PID=`ps -C java -f --width 1000|grep "$STR"|grep -v grep|awk '{print $2}'`
-	        fi
-	    else
-	    	if [ ! -z "$PID" ]; then
-	        	JAVA_PID=`ps aux |grep "$STR"|grep "$PID"|grep -v grep|awk '{print $2}'`
-		    else 
-		        JAVA_PID=`ps aux |grep "$STR"|grep -v grep|awk '{print $2}'`
-	        fi
-	    fi
-    fi
-    echo $JAVA_PID;
-}
-
-base=`dirname $0`/..
-pidfile=$base/bin/canal_client.pid
-if [ ! -f "$pidfile" ];then
-	echo "canal client is not running. exists"
-	exit
-fi
-
-pid=`cat $pidfile`
-if [ "$pid" == "" ] ; then
-	pid=`get_pid "appName=otter-canal-client"`
-fi
-
-echo -e "`hostname`: stopping canal $pid ... "
-kill $pid
-
-LOOPS=0
-while (true); 
-do 
-	gpid=`get_pid "appName=otter-canal-client" "$pid"`
-    if [ "$gpid" == "" ] ; then
-    	echo "Oook! cost:$LOOPS"
-    	`rm $pidfile`
-    	break;
-    fi
-    let LOOPS=LOOPS+1
-    sleep 1
-done

+ 0 - 66
client-adapter/example/src/main/java/com/alibaba/otter/canal/client/ClientLauncher.java

@@ -1,66 +0,0 @@
-package com.alibaba.otter.canal.client;
-
-import java.io.FileInputStream;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
-
-import com.alibaba.otter.canal.client.adapter.loader.CanalAdapterLoader;
-import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
-
-public class ClientLauncher {
-
-    private static final String CLASSPATH_URL_PREFIX = "classpath:";
-    private static final Logger logger               = LoggerFactory.getLogger(ClientLauncher.class);
-
-    public static void main(String[] args) {
-        try {
-            logger.info("## set default uncaught exception handler");
-            setGlobalUncaughtExceptionHandler();
-
-            logger.info("## load canal client configurations");
-            String conf = System.getProperty("client.conf", "classpath:canal-client.yml");
-            CanalClientConfig canalClientConfig;
-            if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
-                conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
-                canalClientConfig = new Yaml().loadAs(ClientLauncher.class.getClassLoader().getResourceAsStream(conf),
-                    CanalClientConfig.class);
-            } else {
-                canalClientConfig = new Yaml().loadAs(new FileInputStream(conf), CanalClientConfig.class);
-            }
-            logger.info("## start the canal client adapters.");
-            final CanalAdapterLoader adapterLoader = new CanalAdapterLoader(canalClientConfig);
-            adapterLoader.init();
-            logger.info("## the canal client adapters are running now ......");
-            Runtime.getRuntime().addShutdownHook(new Thread() {
-
-                public void run() {
-                    try {
-                        logger.info("## stop the canal client adapters");
-                        adapterLoader.destroy();
-                    } catch (Throwable e) {
-                        logger.warn("## something goes wrong when stopping canal client adapters:", e);
-                    } finally {
-                        logger.info("## canal client adapters are down.");
-                    }
-                }
-
-            });
-        } catch (Throwable e) {
-            logger.error("## something goes wrong when starting up the canal client adapters:", e);
-            System.exit(0);
-        }
-    }
-
-    private static void setGlobalUncaughtExceptionHandler() {
-        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-
-            @Override
-            public void uncaughtException(Thread t, Throwable e) {
-                logger.error("UnCaughtException", e);
-            }
-        });
-    }
-}

+ 0 - 168
client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/AbstractCanalAdapterWorker.java

@@ -1,168 +0,0 @@
-package com.alibaba.otter.canal.client.adapter.loader;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.otter.canal.client.adapter.OuterAdapter;
-import com.alibaba.otter.canal.client.adapter.support.Dml;
-import com.alibaba.otter.canal.client.adapter.support.MessageUtil;
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.protocol.Message;
-
-/**
- * 适配器工作线程抽象类
- *
- * @author machengyuan 2018-8-19 下午11:30:49
- * @version 1.0.0
- */
-public abstract class AbstractCanalAdapterWorker {
-
-    protected final Logger                    logger  = LoggerFactory.getLogger(this.getClass());
-
-    protected String                          canalDestination;                                                 // canal实例
-    protected List<List<OuterAdapter>>        canalOuterAdapters;                                               // 外部适配器
-    protected ExecutorService                 groupInnerExecutorService;                                        // 组内工作线程池
-    protected volatile boolean                running = false;                                                  // 是否运行中
-    protected Thread                          thread  = null;
-    protected Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
-
-                                                          @Override
-                                                          public void uncaughtException(Thread t, Throwable e) {
-                                                              logger.error("parse events has an error", e);
-                                                          }
-                                                      };
-
-    protected void writeOut(final Message message) {
-        List<Future<Boolean>> futures = new ArrayList<>();
-        // 组间适配器并行运行
-        for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
-            final List<OuterAdapter> adapters = outerAdapters;
-            futures.add(groupInnerExecutorService.submit(new Callable<Boolean>() {
-
-                @Override
-                public Boolean call() {
-                    try {
-                        // 组内适配器穿行运行,尽量不要配置组内适配器
-                        for (final OuterAdapter c : adapters) {
-                            long begin = System.currentTimeMillis();
-                            MessageUtil.parse4Dml(canalDestination, message, new MessageUtil.Consumer<Dml>() {
-
-                                @Override
-                                public void accept(Dml dml) {
-                                    c.sync(dml);
-                                }
-                            });
-
-                            if (logger.isDebugEnabled()) {
-                                logger.debug("{} elapsed time: {}",
-                                    c.getClass().getName(),
-                                    (System.currentTimeMillis() - begin));
-                            }
-                        }
-                        return true;
-                    } catch (Exception e) {
-                        return false;
-                    }
-                }
-            }));
-
-            // 等待所有适配器写入完成
-            // 由于是组间并发操作,所以将阻塞直到耗时最久的工作组操作完成
-            for (Future<Boolean> f : futures) {
-                try {
-                    if (!f.get()) {
-                        logger.error("Outer adapter write failed");
-                    }
-                } catch (InterruptedException | ExecutionException e) {
-                    // ignore
-                }
-            }
-        }
-    }
-
-    protected void writeOut(final FlatMessage flatMessage) {
-        List<Future<Boolean>> futures = new ArrayList<>();
-        // 组间适配器并行运行
-        for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
-            final List<OuterAdapter> adapters = outerAdapters;
-            futures.add(groupInnerExecutorService.submit(new Callable<Boolean>() {
-
-                @Override
-                public Boolean call() {
-                    try {
-                        // 组内适配器穿行运行,尽量不要配置组内适配器
-                        for (OuterAdapter c : adapters) {
-                            long begin = System.currentTimeMillis();
-                            Dml dml = MessageUtil.flatMessage2Dml(canalDestination, flatMessage);
-                            c.sync(dml);
-                            if (logger.isDebugEnabled()) {
-                                logger.debug("{} elapsed time: {}",
-                                    c.getClass().getName(),
-                                    (System.currentTimeMillis() - begin));
-                            }
-                        }
-                        return true;
-                    } catch (Exception e) {
-                        return false;
-                    }
-                }
-            }));
-
-            // 等待所有适配器写入完成
-            // 由于是组间并发操作,所以将阻塞直到耗时最久的工作组操作完成
-            for (Future<Boolean> f : futures) {
-                try {
-                    if (!f.get()) {
-                        logger.error("Outer adapter write failed");
-                    }
-                } catch (InterruptedException | ExecutionException e) {
-                    // ignore
-                }
-            }
-        }
-    }
-
-    protected void writeOut(Message message, String topic) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("topic: {} batchId: {} batchSize: {} ", topic, message.getId(), message.getEntries().size());
-        }
-        long begin = System.currentTimeMillis();
-        writeOut(message);
-        long now = System.currentTimeMillis();
-        if ((System.currentTimeMillis() - begin) > 5 * 60 * 1000) {
-            logger.error("topic: {} batchId {} elapsed time: {} ms", topic, message.getId(), now - begin);
-        }
-        if (logger.isDebugEnabled()) {
-            logger.debug("topic: {} batchId {} elapsed time: {} ms", topic, message.getId(), now - begin);
-        }
-    }
-
-    protected void stopOutAdapters() {
-        if (thread != null) {
-            try {
-                thread.join();
-            } catch (InterruptedException e) {
-                // ignore
-            }
-        }
-        groupInnerExecutorService.shutdown();
-        logger.info("topic connectors' worker thread dead!");
-        for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
-            for (OuterAdapter adapter : outerAdapters) {
-                adapter.destroy();
-            }
-        }
-        logger.info("topic all connectors destroyed!");
-    }
-
-    public abstract void start();
-
-    public abstract void stop();
-}

+ 0 - 178
client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterKafkaWorker.java

@@ -1,178 +0,0 @@
-package com.alibaba.otter.canal.client.adapter.loader;
-
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.kafka.clients.consumer.CommitFailedException;
-import org.apache.kafka.common.errors.WakeupException;
-
-import com.alibaba.otter.canal.client.adapter.OuterAdapter;
-import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
-import com.alibaba.otter.canal.client.kafka.KafkaCanalConnectors;
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.protocol.Message;
-
-/**
- * kafka对应的client适配器工作线程
- *
- * @author machengyuan 2018-8-19 下午11:30:49
- * @version 1.0.0
- */
-public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
-
-    private KafkaCanalConnector connector;
-
-    private String              topic;
-
-    private boolean             flatMessage;
-
-    public CanalAdapterKafkaWorker(String bootstrapServers, String topic, String groupId,
-                                   List<List<OuterAdapter>> canalOuterAdapters, boolean flatMessage){
-        this.canalOuterAdapters = canalOuterAdapters;
-        this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
-        this.topic = topic;
-        this.canalDestination = topic;
-        this.flatMessage = flatMessage;
-        connector = KafkaCanalConnectors.newKafkaConnector(bootstrapServers, topic, null, groupId, flatMessage);
-        // connector.setSessionTimeout(1L, TimeUnit.MINUTES);
-
-        // super.initSwitcher(topic);
-    }
-
-    @Override
-    public void start() {
-        if (!running) {
-            thread = new Thread(new Runnable() {
-
-                @Override
-                public void run() {
-                    process();
-                }
-            });
-            thread.setUncaughtExceptionHandler(handler);
-            running = true;
-            thread.start();
-        }
-    }
-
-    @Override
-    public void stop() {
-        try {
-            if (!running) {
-                return;
-            }
-
-            connector.stopRunning();
-            running = false;
-
-            // if (switcher != null && !switcher.state()) {
-            // switcher.set(true);
-            // }
-
-            if (thread != null) {
-                try {
-                    thread.join();
-                } catch (InterruptedException e) {
-                    // ignore
-                }
-            }
-            groupInnerExecutorService.shutdown();
-            logger.info("topic {} connectors' worker thread dead!", this.topic);
-            for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
-                for (OuterAdapter adapter : outerAdapters) {
-                    adapter.destroy();
-                }
-            }
-            logger.info("topic {} all connectors destroyed!", this.topic);
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-        }
-    }
-
-    private void process() {
-        while (!running)
-            ;
-        ExecutorService executor = Executors.newSingleThreadExecutor();
-        // final AtomicBoolean executing = new AtomicBoolean(true);
-        while (running) {
-            try {
-                logger.info("=============> Start to connect topic: {} <=============", this.topic);
-                connector.connect();
-                logger.info("=============> Start to subscribe topic: {} <=============", this.topic);
-                connector.subscribe();
-                logger.info("=============> Subscribe topic: {} succeed <=============", this.topic);
-                while (running) {
-                    try {
-                        // switcher.get(); //等待开关开启
-
-                        List<?> messages;
-                        if (!flatMessage) {
-                            messages = connector.getWithoutAck();
-                        } else {
-                            messages = connector.getFlatMessageWithoutAck(100L, TimeUnit.MILLISECONDS);
-                        }
-                        if (messages != null) {
-                            for (final Object message : messages) {
-                                if (message instanceof FlatMessage) {
-                                    writeOut((FlatMessage) message);
-                                } else {
-                                    writeOut((Message) message);
-                                }
-                                // executing.set(true);
-                                // if (message != null) {
-                                // executor.submit(new Runnable() {
-                                //
-                                // @Override
-                                // public void run() {
-                                // try {
-                                // if (message instanceof FlatMessage) {
-                                // writeOut((FlatMessage) message);
-                                // } else {
-                                // writeOut((Message) message);
-                                // }
-                                // } catch (Exception e) {
-                                // logger.error(e.getMessage(), e);
-                                // } finally {
-                                // executing.compareAndSet(true, false);
-                                // }
-                                // }
-                                // });
-                                //
-                                // // 间隔一段时间ack一次, 防止因超时未响应切换到另外台客户端
-                                // long currentTS = System.currentTimeMillis();
-                                // while (executing.get()) {
-                                // // 大于10秒未消费完ack一次keep alive
-                                // if (System.currentTimeMillis() - currentTS > 10000) {
-                                // connector.ack();
-                                // currentTS = System.currentTimeMillis();
-                                // }
-                                // }
-                                // }
-                            }
-                        }
-                        connector.ack();
-                    } catch (CommitFailedException e) {
-                        logger.warn(e.getMessage());
-                    } catch (Exception e) {
-                        logger.error(e.getMessage(), e);
-                        TimeUnit.SECONDS.sleep(1L);
-                    }
-                }
-            } catch (Exception e) {
-                logger.error(e.getMessage(), e);
-            }
-        }
-
-        executor.shutdown();
-
-        try {
-            connector.unsubscribe();
-        } catch (WakeupException e) {
-            // No-op. Continue process
-        }
-        connector.disconnect();
-        logger.info("=============> Disconnect topic: {} <=============", this.topic);
-    }
-}

+ 0 - 177
client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterLoader.java

@@ -1,177 +0,0 @@
-package com.alibaba.otter.canal.client.adapter.loader;
-
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.otter.canal.client.adapter.OuterAdapter;
-import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
-import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
-import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
-
-/**
- * MQ外部适配器的加载器
- *
- * @version 1.0.0
- */
-public class CanalAdapterLoader {
-
-    private static final Logger                     logger        = LoggerFactory.getLogger(CanalAdapterLoader.class);
-
-    private CanalClientConfig                       canalClientConfig;
-
-    private Map<String, CanalAdapterWorker>         canalWorkers  = new HashMap<>();
-
-    private Map<String, AbstractCanalAdapterWorker> canalMQWorker = new HashMap<>();
-
-    private ExtensionLoader<OuterAdapter>      loader;
-
-    public CanalAdapterLoader(CanalClientConfig canalClientConfig){
-        this.canalClientConfig = canalClientConfig;
-    }
-
-    /**
-     * 初始化canal-client、 canal-client-rocketmq的适配器
-     */
-    public void init() {
-        // canal instances 和 mq topics 配置不能同时为空
-        if (canalClientConfig.getCanalInstances() == null && canalClientConfig.getMqTopics() == null) {
-            throw new RuntimeException("Blank config property: canalInstances or canalMQTopics");
-        }
-
-        loader = ExtensionLoader.getExtensionLoader(OuterAdapter.class, "" /*
-                                                                                 * TODO
-                                                                                 * canalClientConfig
-                                                                                 * .
-                                                                                 * getClassloaderPolicy
-                                                                                 * (
-                                                                                 * )
-                                                                                 */);
-
-        String canalServerHost = this.canalClientConfig.getCanalServerHost();
-        SocketAddress sa = null;
-        if (canalServerHost != null) {
-            String[] ipPort = canalServerHost.split(":");
-            sa = new InetSocketAddress(ipPort[0], Integer.parseInt(ipPort[1]));
-        }
-        String zkHosts = this.canalClientConfig.getZookeeperHosts();
-
-        // 初始化canal-client的适配器
-        if (canalClientConfig.getCanalInstances() != null) {
-            for (CanalClientConfig.CanalInstance instance : canalClientConfig.getCanalInstances()) {
-                List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
-
-                for (CanalClientConfig.AdapterGroup connectorGroup : instance.getAdapterGroups()) {
-                    List<OuterAdapter> canalOutConnectors = new ArrayList<>();
-                    for (OuterAdapterConfig c : connectorGroup.getOutAdapters()) {
-                        loadConnector(c, canalOutConnectors);
-                    }
-                    canalOuterAdapterGroups.add(canalOutConnectors);
-                }
-                CanalAdapterWorker worker;
-                if (zkHosts != null) {
-                    worker = new CanalAdapterWorker(instance.getInstance(), zkHosts, canalOuterAdapterGroups);
-                } else {
-                    worker = new CanalAdapterWorker(instance.getInstance(), sa, canalOuterAdapterGroups);
-                }
-                canalWorkers.put(instance.getInstance(), worker);
-                worker.start();
-                logger.info("Start adapter for canal instance: {} succeed", instance.getInstance());
-            }
-        }
-
-        // 初始化canal-client-mq的适配器
-        if (canalClientConfig.getMqTopics() != null) {
-            for (CanalClientConfig.MQTopic topic : canalClientConfig.getMqTopics()) {
-                for (CanalClientConfig.Group group : topic.getGroups()) {
-                    List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
-
-                    List<OuterAdapter> canalOuterAdapters = new ArrayList<>();
-
-                    for (OuterAdapterConfig config : group.getOutAdapters()) {
-                        loadConnector(config, canalOuterAdapters);
-                    }
-                    canalOuterAdapterGroups.add(canalOuterAdapters);
-                    if (StringUtils.isBlank(topic.getMqMode()) || "rocketmq".equalsIgnoreCase(topic.getMqMode())) {
-                        CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(canalClientConfig.getBootstrapServers(),
-                            topic.getTopic(),
-                            group.getGroupId(),
-                            canalOuterAdapterGroups);
-                        canalMQWorker.put(topic.getTopic() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
-                        rocketMQWorker.start();
-                    } else if ("kafka".equalsIgnoreCase(topic.getMqMode())) {
-                        CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(canalClientConfig.getBootstrapServers(),
-                            topic.getTopic(),
-                            group.getGroupId(),
-                            canalOuterAdapterGroups,
-                            canalClientConfig.getFlatMessage());
-                        canalMQWorker.put(topic.getTopic() + "-kafka-" + group.getGroupId(), canalKafkaWorker);
-                        canalKafkaWorker.start();
-                    }
-                    logger.info("Start adapter for canal-client rocketmq topic: {} succeed", topic.getTopic() + "-"
-                                                                                             + group.getGroupId());
-
-                }
-            }
-        }
-    }
-
-    private void loadConnector(OuterAdapterConfig config, List<OuterAdapter> canalOutConnectors) {
-        try {
-            OuterAdapter adapter = loader.getExtension(config.getName());
-            ClassLoader cl = Thread.currentThread().getContextClassLoader();
-            // 替换ClassLoader
-            Thread.currentThread().setContextClassLoader(adapter.getClass().getClassLoader());
-            adapter.init(config);
-            Thread.currentThread().setContextClassLoader(cl);
-            canalOutConnectors.add(adapter);
-            logger.info("Load canal adapter: {} succeed", config.getName());
-        } catch (Exception e) {
-            logger.error("Load canal adapter: {} failed", config.getName(), e);
-        }
-    }
-
-    /**
-     * 销毁所有适配器 为防止canal实例太多造成销毁阻塞, 并行销毁
-     */
-    public void destroy() {
-        if (canalWorkers.size() > 0) {
-            ExecutorService stopExecutorService = Executors.newFixedThreadPool(canalWorkers.size());
-            for (CanalAdapterWorker v : canalWorkers.values()) {
-                final CanalAdapterWorker caw = v;
-                stopExecutorService.submit(new Runnable() {
-
-                    @Override
-                    public void run() {
-                        caw.stop();
-                    }
-                });
-            }
-            stopExecutorService.shutdown();
-        }
-        if (canalMQWorker.size() > 0) {
-            ExecutorService stopMQWokerService = Executors.newFixedThreadPool(canalMQWorker.size());
-            for (AbstractCanalAdapterWorker tmp : canalMQWorker.values()) {
-                final AbstractCanalAdapterWorker worker = tmp;
-                stopMQWokerService.submit(new Runnable() {
-
-                    @Override
-                    public void run() {
-                        worker.stop();
-                    }
-                });
-            }
-            stopMQWokerService.shutdown();
-        }
-        logger.info("All canal adapters destroyed");
-    }
-}

+ 0 - 124
client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterRocketMQWorker.java

@@ -1,124 +0,0 @@
-package com.alibaba.otter.canal.client.adapter.loader;
-
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.kafka.clients.consumer.CommitFailedException;
-import org.apache.kafka.common.errors.WakeupException;
-
-import com.alibaba.otter.canal.client.adapter.OuterAdapter;
-import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
-import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectorProvider;
-import com.alibaba.otter.canal.protocol.Message;
-
-/**
- * kafka对应的client适配器工作线程
- *
- * @author machengyuan 2018-8-19 下午11:30:49
- * @version 1.0.0
- */
-public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
-
-    private RocketMQCanalConnector connector;
-
-    private String                 topic;
-
-    public CanalAdapterRocketMQWorker(String nameServers, String topic, String groupId,
-                                      List<List<OuterAdapter>> canalOuterAdapters){
-        logger.info("RocketMQ consumer config topic:{}, nameServer:{}, groupId:{}", topic, nameServers, groupId);
-        this.canalOuterAdapters = canalOuterAdapters;
-        this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
-        this.topic = topic;
-        this.canalDestination = topic;
-        connector = RocketMQCanalConnectorProvider.newRocketMQConnector(nameServers, topic, groupId);
-    }
-
-    @Override
-    public void start() {
-        if (!running) {
-            thread = new Thread(new Runnable() {
-
-                @Override
-                public void run() {
-                    process();
-                }
-            });
-            thread.setUncaughtExceptionHandler(handler);
-            running = true;
-            thread.start();
-        }
-    }
-
-    @Override
-    public void stop() {
-        try {
-            if (!running) {
-                return;
-            }
-            connector.stopRunning();
-            running = false;
-            logger.info("Stop topic {} out adapters begin", this.topic);
-            stopOutAdapters();
-            logger.info("Stop topic {} out adapters end", this.topic);
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-        }
-    }
-
-    private void process() {
-        while (!running)
-            ;
-        ExecutorService executor = Executors.newSingleThreadExecutor();
-        while (running) {
-            try {
-                logger.info("=============> Start to connect topic: {} <=============", this.topic);
-                connector.connect();
-                logger.info("=============> Start to subscribe topic: {}<=============", this.topic);
-                connector.subscribe();
-                logger.info("=============> Subscribe topic: {} succeed<=============", this.topic);
-                while (running) {
-                    try {
-                        // switcher.get(); //等待开关开启
-
-                        final Message message = connector.getWithoutAck(1);
-                        if (message != null) {
-                            executor.submit(new Runnable() {
-
-                                @Override
-                                public void run() {
-                                    try {
-                                        writeOut(message, topic);
-                                    } catch (Exception e) {
-                                        logger.error(e.getMessage(), e);
-                                    }
-                                    connector.ack(message.getId());
-                                }
-                            });
-                        } else {
-                            logger.debug("Message is null");
-                        }
-                    } catch (CommitFailedException e) {
-                        logger.warn(e.getMessage());
-                    } catch (Exception e) {
-                        logger.error(e.getMessage(), e);
-                        TimeUnit.SECONDS.sleep(1L);
-                    }
-                }
-            } catch (Exception e) {
-                logger.error(e.getMessage(), e);
-            }
-        }
-
-        executor.shutdown();
-
-        try {
-            connector.unsubscribe();
-        } catch (WakeupException e) {
-            // No-op. Continue process
-        }
-        connector.stopRunning();
-        logger.info("=============> Disconnect topic: {} <=============", this.topic);
-    }
-}

+ 0 - 186
client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterWorker.java

@@ -1,186 +0,0 @@
-package com.alibaba.otter.canal.client.adapter.loader;
-
-import java.net.SocketAddress;
-import java.util.List;
-import java.util.concurrent.Executors;
-
-import com.alibaba.otter.canal.client.CanalConnector;
-import com.alibaba.otter.canal.client.CanalConnectors;
-import com.alibaba.otter.canal.client.adapter.OuterAdapter;
-import com.alibaba.otter.canal.client.impl.ClusterCanalConnector;
-import com.alibaba.otter.canal.protocol.Message;
-
-/**
- * 原生canal-server对应的client适配器工作线程
- *
- * @author machengyuan 2018-8-19 下午11:30:49
- * @version 1.0.0
- */
-public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
-
-    private static final int BATCH_SIZE = 50;
-    private static final int SO_TIMEOUT = 0;
-
-    private CanalConnector   connector;
-
-    /**
-     * 单台client适配器worker的构造方法
-     *
-     * @param canalDestination canal实例名
-     * @param address canal-server地址
-     * @param canalOuterAdapters 外部适配器组
-     */
-    public CanalAdapterWorker(String canalDestination, SocketAddress address,
-                              List<List<OuterAdapter>> canalOuterAdapters){
-        this.canalOuterAdapters = canalOuterAdapters;
-        this.canalDestination = canalDestination;
-        groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
-        connector = CanalConnectors.newSingleConnector(address, canalDestination, "", "");
-    }
-
-    /**
-     * HA模式下client适配器worker的构造方法
-     *
-     * @param canalDestination canal实例名
-     * @param zookeeperHosts zookeeper地址
-     * @param canalOuterAdapters 外部适配器组
-     */
-    public CanalAdapterWorker(String canalDestination, String zookeeperHosts,
-                              List<List<OuterAdapter>> canalOuterAdapters){
-        this.canalOuterAdapters = canalOuterAdapters;
-        this.canalDestination = canalDestination;
-        groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
-        connector = CanalConnectors.newClusterConnector(zookeeperHosts, canalDestination, "", "");
-        ((ClusterCanalConnector) connector).setSoTimeout(SO_TIMEOUT);
-
-        // super.initSwitcher(canalDestination);
-    }
-
-    @Override
-    public void start() {
-        if (!running) {
-            thread = new Thread(new Runnable() {
-
-                @Override
-                public void run() {
-                    process();
-                }
-            });
-            thread.setUncaughtExceptionHandler(handler);
-            thread.start();
-            running = true;
-        }
-    }
-
-    @Override
-    public void stop() {
-        try {
-            if (!running) {
-                return;
-            }
-
-            // if (switcher != null && !switcher.state()) {
-            // switcher.set(true);
-            // }
-
-            connector.stopRunning();
-            running = false;
-
-            logger.info("destination {} is waiting for adapters' worker thread die!", canalDestination);
-            if (thread != null) {
-                try {
-                    thread.join();
-                } catch (InterruptedException e) {
-                    // ignore
-                }
-            }
-            groupInnerExecutorService.shutdown();
-            logger.info("destination {} adapters' worker thread dead!", canalDestination);
-            for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
-                for (OuterAdapter adapter : outerAdapters) {
-                    adapter.destroy();
-                }
-            }
-            logger.info("destination {} all adapters destroyed!", canalDestination);
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-        }
-    }
-
-    private void process() {
-        while (!running)
-            ; // waiting until running == true
-        while (running) {
-            try {
-                // if (switcher != null) {
-                // switcher.get();
-                // }
-                logger.info("=============> Start to connect destination: {} <=============", this.canalDestination);
-                connector.connect();
-                logger.info("=============> Start to subscribe destination: {} <=============", this.canalDestination);
-                connector.subscribe();
-                logger.info("=============> Subscribe destination: {} succeed <=============", this.canalDestination);
-                while (running) {
-                    // try {
-                    // if (switcher != null) {
-                    // switcher.get();
-                    // }
-                    // } catch (TimeoutException e) {
-                    // break;
-                    // }
-
-                    // server配置canal.instance.network.soTimeout(默认: 30s)
-                    // 范围内未与server交互,server将关闭本次socket连接
-                    Message message = connector.getWithoutAck(BATCH_SIZE); // 获取指定数量的数据
-                    long batchId = message.getId();
-                    try {
-                        int size = message.getEntries().size();
-
-                        if (batchId == -1 || size == 0) {
-                            try {
-                                Thread.sleep(1000);
-                            } catch (InterruptedException e) {
-                                // ignore
-                            }
-                        } else {
-                            if (logger.isDebugEnabled()) {
-                                logger.debug("destination: {} batchId: {} batchSize: {} ",
-                                    this.canalDestination,
-                                    batchId,
-                                    size);
-                            }
-                            long begin = System.currentTimeMillis();
-                            writeOut(message);
-                            long now = System.currentTimeMillis();
-                            if (logger.isDebugEnabled()) {
-                                logger.debug("destination: {} batchId: {} elapsed time: {} ms",
-                                    this.canalDestination,
-                                    batchId,
-                                    now - begin);
-                            }
-                        }
-                        connector.ack(batchId); // 提交确认
-                    } catch (Exception e) {
-                        connector.rollback(batchId); // 处理失败, 回滚数据
-                        throw e;
-                    }
-                }
-
-            } catch (Exception e) {
-                logger.error("process error!", e);
-            } finally {
-                connector.disconnect();
-                logger.info("=============> Disconnect destination: {} <=============", this.canalDestination);
-            }
-
-            if (running) { // is reconnect
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException e) {
-                    // ignore
-                }
-            }
-
-        }
-    }
-}

+ 0 - 18
client-adapter/example/src/main/resources/canal-client.yml

@@ -1,18 +0,0 @@
-canalServerHost: 127.0.0.1:11111
-#zookeeperHosts: slave1:2181
-#bootstrapServers: slave1:6667,slave2:6667 #or rocketmq nameservers:host1:9876;host2:9876
-flatMessage: false
-
-canalInstances:
-- instance: example
-  adapterGroups:
-  - outAdapters:
-    - name: logger
-
-#mqTopics:
-#- mqMode: rocketmq
-#  topic: example
-#  groups:
-#  - groupId: example
-#    outAdapters:
-#    - name: logger

+ 0 - 47
client-adapter/example/src/main/resources/logback.xml

@@ -1,47 +0,0 @@
-<configuration scan="true" scanPeriod=" 5 seconds">
-	<jmxConfigurator />
-	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-		<encoder>
-			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{56} - %msg%n
-			</pattern>
-		</encoder>
-	</appender>
-	
-	<appender name="CANAL-ROOT" class="ch.qos.logback.classic.sift.SiftingAppender">
-		<discriminator>
-			<Key>destination</Key>
-			<DefaultValue>canal_client</DefaultValue>
-		</discriminator>
-		<sift>
-			<appender name="FILE-${destination}" class="ch.qos.logback.core.rolling.RollingFileAppender">
-				<File>../logs/${destination}.log</File>
-				<rollingPolicy
-					class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-					<!-- rollover daily -->
-					<fileNamePattern>../logs/%d{yyyy-MM-dd}/${destination}-%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern>
-					<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
-						<!-- or whenever the file size reaches 100MB -->
-						<maxFileSize>512MB</maxFileSize>
-					</timeBasedFileNamingAndTriggeringPolicy>
-					<maxHistory>60</maxHistory>
-				</rollingPolicy>
-				<encoder>
-					<pattern>
-						%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{56} - %msg%n
-					</pattern>
-				</encoder>
-			</appender>
-		</sift>
-	</appender>
-
-	<logger name="com.alibaba.otter.canal.client" additivity="false">
-		<level value="INFO" />
-		<appender-ref ref="STDOUT"/>
-		<appender-ref ref="CANAL-ROOT" />
-	</logger>
-    
-	<root level="WARN">
-		<!--<appender-ref ref="STDOUT"/>-->
-		<appender-ref ref="CANAL-ROOT" />
-	</root>
-</configuration>

+ 54 - 42
client-adapter/hbase/pom.xml

@@ -43,48 +43,60 @@
         </dependency>
     </dependencies>
 
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <version>2.4</version>
-                <configuration>
-                    <descriptorRefs>
-                        <descriptorRef>jar-with-dependencies</descriptorRef>
-                    </descriptorRefs>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>make-assembly</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>single</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <artifactId>maven-antrun-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>run</goal>
-                        </goals>
+
+    <profiles>
+        <profile>
+            <id>release</id>
+            <activation>
+                <property>
+                    <name>env</name>
+                    <value>release</value>
+                </property>
+            </activation>
+
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-assembly-plugin</artifactId>
+                        <version>2.4</version>
                         <configuration>
-                            <tasks>
-                                <copy todir="${project.basedir}/../launcher/target/config/hbase" overwrite="true" >
-                                    <fileset dir="${project.basedir}/src/main/resources/hbase" erroronmissingdir="true">
-                                        <include name="*.yml"/>
-                                    </fileset>
-                                </copy>
-                            </tasks>
+                            <descriptorRefs>
+                                <descriptorRef>jar-with-dependencies</descriptorRef>
+                            </descriptorRefs>
                         </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-
+                        <executions>
+                            <execution>
+                                <id>make-assembly</id>
+                                <phase>package</phase>
+                                <goals>
+                                    <goal>single</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                    </plugin>
+                    <plugin>
+                        <artifactId>maven-antrun-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <phase>package</phase>
+                                <goals>
+                                    <goal>run</goal>
+                                </goals>
+                                <configuration>
+                                    <tasks>
+                                        <copy todir="${project.basedir}/../launcher/target/config/hbase" overwrite="true" >
+                                            <fileset dir="${project.basedir}/src/main/resources/hbase" erroronmissingdir="true">
+                                                <include name="*.yml"/>
+                                            </fileset>
+                                        </copy>
+                                    </tasks>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
 </project>

+ 50 - 19
client-adapter/launcher/pom.xml

@@ -104,6 +104,18 @@
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-maven-plugin</artifactId>
                 <version>2.0.1.RELEASE</version>
+                <configuration>
+                    <excludes>
+                        <exclude>
+                            <groupId>com.alibaba.otter</groupId>
+                            <artifactId>client-adapter.hbase</artifactId>
+                        </exclude>
+                        <exclude>
+                            <groupId>com.alibaba.otter</groupId>
+                            <artifactId>client-adapter.logger</artifactId>
+                        </exclude>
+                    </excludes>
+                </configuration>
                 <executions>
                     <execution>
                         <goals>
@@ -150,24 +162,43 @@
                     </execution>
                 </executions>
             </plugin>
-            <plugin>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <configuration>
-                    <appendAssemblyId>false</appendAssemblyId>
-                    <descriptors>
-                        <descriptor>assembly.xml</descriptor>
-                    </descriptors>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>make-assembly</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>single</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
         </plugins>
     </build>
-</project>
+
+    <profiles>
+        <profile>
+            <id>release</id>
+            <activation>
+                <property>
+                    <name>env</name>
+                    <value>release</value>
+                </property>
+            </activation>
+
+            <build>
+                <plugins>
+                    <plugin>
+                        <artifactId>maven-assembly-plugin</artifactId>
+                        <configuration>
+                            <appendAssemblyId>false</appendAssemblyId>
+                            <descriptors>
+                                <descriptor>${basedir}/src/main/assembly/release.xml</descriptor>
+                            </descriptors>
+                            <finalName>${project.artifactId}-${project.version}</finalName>
+                            <outputDirectory>${project.basedir}/../../target</outputDirectory>
+                        </configuration>
+                        <executions>
+                            <execution>
+                                <id>make-assembly</id>
+                                <phase>package</phase>
+                                <goals>
+                                    <goal>single</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+</project>

+ 0 - 0
client-adapter/launcher/assembly.xml → client-adapter/launcher/src/main/assembly/release.xml


+ 35 - 23
client-adapter/logger/pom.xml

@@ -21,27 +21,39 @@
         </dependency>
     </dependencies>
 
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <version>2.4</version>
-                <configuration>
-                    <descriptorRefs>
-                        <descriptorRef>jar-with-dependencies</descriptorRef>
-                    </descriptorRefs>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>make-assembly</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>single</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
+    <profiles>
+        <profile>
+            <id>release</id>
+            <activation>
+                <property>
+                    <name>env</name>
+                    <value>release</value>
+                </property>
+            </activation>
+
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-assembly-plugin</artifactId>
+                        <version>2.4</version>
+                        <configuration>
+                            <descriptorRefs>
+                                <descriptorRef>jar-with-dependencies</descriptorRef>
+                            </descriptorRefs>
+                        </configuration>
+                        <executions>
+                            <execution>
+                                <id>make-assembly</id>
+                                <phase>package</phase>
+                                <goals>
+                                    <goal>single</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
 </project>

+ 0 - 1
client-adapter/pom.xml

@@ -24,7 +24,6 @@
         <module>logger</module>
         <module>hbase</module>
         <module>launcher</module>
-        <module>example</module>
     </modules>
 
     <build>

+ 1 - 1
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java

@@ -859,7 +859,7 @@ public final class RowsLogBuffer {
                     // 转化为unsign long
                     switch (len) {
                         case 1:
-                            value = buffer.getInt8();
+                            value = buffer.getUint8();
                             break;
                         case 2:
                             value = buffer.getUint16();

+ 1 - 0
deployer/src/main/resources/canal.properties

@@ -20,6 +20,7 @@ canal.instance.memory.buffer.size = 16384
 canal.instance.memory.buffer.memunit = 1024 
 ## meory store gets mode used MEMSIZE or ITEMSIZE
 canal.instance.memory.batch.mode = MEMSIZE
+canal.instance.memory.rawEntry = true
 
 ## detecing config
 canal.instance.detecting.enable = false

+ 1 - 0
deployer/src/main/resources/spring/default-instance.xml

@@ -58,6 +58,7 @@
 		<property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" />
 		<property name="batchMode" value="${canal.instance.memory.batch.mode:MEMSIZE}" />
 		<property name="ddlIsolation" value="${canal.instance.get.ddl.isolation:false}" />
+		<property name="raw" value="${canal.instance.memory.rawEntry:true}" />
 	</bean>
 	
 	<bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">

+ 1 - 0
deployer/src/main/resources/spring/file-instance.xml

@@ -44,6 +44,7 @@
 		<property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" />
 		<property name="batchMode" value="${canal.instance.memory.batch.mode:MEMSIZE}" />
 		<property name="ddlIsolation" value="${canal.instance.get.ddl.isolation:false}" />
+		<property name="raw" value="${canal.instance.memory.rawEntry:true}" />
 	</bean>
 	
 	<bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">

+ 1 - 0
deployer/src/main/resources/spring/group-instance.xml

@@ -41,6 +41,7 @@
 		<property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" />
 		<property name="batchMode" value="${canal.instance.memory.batch.mode:MEMSIZE}" />
 		<property name="ddlIsolation" value="${canal.instance.get.ddl.isolation:false}" />
+		<property name="raw" value="${canal.instance.memory.rawEntry:true}" />
 	</bean>
 	
 	<bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">

+ 1 - 0
deployer/src/main/resources/spring/memory-instance.xml

@@ -41,6 +41,7 @@
 		<property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" />
 		<property name="batchMode" value="${canal.instance.memory.batch.mode:MEMSIZE}" />
 		<property name="ddlIsolation" value="${canal.instance.get.ddl.isolation:false}" />
+		<property name="raw" value="${canal.instance.memory.rawEntry:true}" />
 	</bean>
 	
 	<bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">

+ 53 - 4
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java

@@ -6,7 +6,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import com.alibaba.otter.canal.meta.FileMixedMetaManager;
+import org.apache.commons.lang.BooleanUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,7 +39,16 @@ import com.alibaba.otter.canal.parse.inbound.AbstractEventParser;
 import com.alibaba.otter.canal.parse.inbound.group.GroupEventParser;
 import com.alibaba.otter.canal.parse.inbound.mysql.LocalBinlogEventParser;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
-import com.alibaba.otter.canal.parse.index.*;
+import com.alibaba.otter.canal.parse.inbound.mysql.rds.RdsBinlogEventParserProxy;
+import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DefaultTableMetaTSDBFactory;
+import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDB;
+import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDBBuilder;
+import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
+import com.alibaba.otter.canal.parse.index.FailbackLogPositionManager;
+import com.alibaba.otter.canal.parse.index.MemoryLogPositionManager;
+import com.alibaba.otter.canal.parse.index.MetaLogPositionManager;
+import com.alibaba.otter.canal.parse.index.PeriodMixedLogPositionManager;
+import com.alibaba.otter.canal.parse.index.ZooKeeperLogPositionManager;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
 import com.alibaba.otter.canal.sink.entry.EntryEventSink;
@@ -138,6 +147,7 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
             memoryEventStore.setBufferMemUnit(parameters.getMemoryStorageBufferMemUnit());
             memoryEventStore.setBatchMode(BatchMode.valueOf(parameters.getStorageBatchMode().name()));
             memoryEventStore.setDdlIsolation(parameters.getDdlIsolation());
+            memoryEventStore.setRaw(parameters.getMemoryStorageRawEntry());
             eventStore = memoryEventStore;
         } else if (mode.isFile()) {
             // 后续版本支持
@@ -229,7 +239,18 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
     private CanalEventParser doInitEventParser(SourcingType type, List<InetSocketAddress> dbAddresses) {
         CanalEventParser eventParser;
         if (type.isMysql()) {
-            MysqlEventParser mysqlEventParser = new MysqlEventParser();
+            MysqlEventParser mysqlEventParser = null;
+            if (StringUtils.isNotEmpty(parameters.getRdsAccesskey())
+                && StringUtils.isNotEmpty(parameters.getRdsSecretkey())
+                && StringUtils.isNotEmpty(parameters.getRdsInstanceId())) {
+
+                mysqlEventParser = new RdsBinlogEventParserProxy();
+                ((RdsBinlogEventParserProxy) mysqlEventParser).setAccesskey(parameters.getRdsAccesskey());
+                ((RdsBinlogEventParserProxy) mysqlEventParser).setSecretkey(parameters.getRdsSecretkey());
+                ((RdsBinlogEventParserProxy) mysqlEventParser).setInstanceId(parameters.getRdsInstanceId());
+            } else {
+                mysqlEventParser = new MysqlEventParser();
+            }
             mysqlEventParser.setDestination(destination);
             // 编码参数
             mysqlEventParser.setConnectionCharset(Charset.forName(parameters.getConnectionCharset()));
@@ -273,6 +294,34 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
             mysqlEventParser.setFallbackIntervalInSeconds(parameters.getFallbackIntervalInSeconds());
             mysqlEventParser.setProfilingEnabled(false);
             mysqlEventParser.setFilterTableError(parameters.getFilterTableError());
+            mysqlEventParser.setIsGTIDMode(BooleanUtils.toBoolean(parameters.getGtidEnable()));
+            // tsdb
+            boolean tsdbEnable = BooleanUtils.toBoolean(parameters.getTsdbEnable());
+            if (tsdbEnable) {
+                mysqlEventParser.setEnableTsdb(tsdbEnable);
+                mysqlEventParser.setTableMetaTSDBFactory(new DefaultTableMetaTSDBFactory() {
+
+                    @Override
+                    public void destory(String destination) {
+                        TableMetaTSDBBuilder.destory(destination);
+                    }
+
+                    @Override
+                    public TableMetaTSDB build(String destination, String springXml) {
+                        try {
+                            System.setProperty("canal.instance.tsdb.url", parameters.getTsdbJdbcUrl());
+                            System.setProperty("canal.instance.tsdb.dbUsername", parameters.getTsdbJdbcUserName());
+                            System.setProperty("canal.instance.tsdb.dbPassword", parameters.getTsdbJdbcPassword());
+
+                            return TableMetaTSDBBuilder.build(destination, "classpath:spring/tsdb/mysql-tsdb.xml");
+                        } finally {
+                            System.setProperty("canal.instance.tsdb.url", "");
+                            System.setProperty("canal.instance.tsdb.dbUsername", "");
+                            System.setProperty("canal.instance.tsdb.dbPassword", "");
+                        }
+                    }
+                });
+            }
             eventParser = mysqlEventParser;
         } else if (type.isLocalBinlog()) {
             LocalBinlogEventParser localBinlogEventParser = new LocalBinlogEventParser();
@@ -291,8 +340,8 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
                     parameters.getDbUsername(),
                     parameters.getDbPassword(),
                     parameters.getDefaultDatabaseName()));
-
             }
+
             eventParser = localBinlogEventParser;
         } else if (type.isOracle()) {
             throw new CanalException("unsupport SourcingType for " + type);

+ 45 - 0
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java

@@ -39,6 +39,7 @@ public class CanalParameter implements Serializable {
     private BatchMode                storageBatchMode                   = BatchMode.MEMSIZE;         // 基于大小返回结果
     private Integer                  memoryStorageBufferSize            = 16 * 1024;                 // 内存存储的buffer大小
     private Integer                  memoryStorageBufferMemUnit         = 1024;                      // 内存存储的buffer内存占用单位,默认为1kb
+    private Boolean                  memoryStorageRawEntry              = Boolean.TRUE;              // 内存存储的对象是否启用raw的ByteString模式
     private String                   fileStorageDirectory;                                           // 文件存储的目录位置
     private Integer                  fileStorageStoreCount;                                          // 每个文件store存储的记录数
     private Integer                  fileStorageRollverCount;                                        // store文件的个数
@@ -97,6 +98,10 @@ public class CanalParameter implements Serializable {
     private String                   tsdbJdbcUrl;
     private String                   tsdbJdbcUserName;
     private String                   tsdbJdbcPassword;
+    private String                   rdsAccesskey;
+    private String                   rdsSecretkey;
+    private String                   rdsInstanceId;
+    private Boolean                  gtidEnable                         = Boolean.FALSE;             // 是否开启gtid
     // ================================== 兼容字段处理
     private InetSocketAddress        masterAddress;                                                  // 主库信息
     private String                   masterUsername;                                                 // 帐号
@@ -919,6 +924,46 @@ public class CanalParameter implements Serializable {
         this.tsdbJdbcPassword = tsdbJdbcPassword;
     }
 
+    public String getRdsAccesskey() {
+        return rdsAccesskey;
+    }
+
+    public void setRdsAccesskey(String rdsAccesskey) {
+        this.rdsAccesskey = rdsAccesskey;
+    }
+
+    public String getRdsSecretkey() {
+        return rdsSecretkey;
+    }
+
+    public void setRdsSecretkey(String rdsSecretkey) {
+        this.rdsSecretkey = rdsSecretkey;
+    }
+
+    public String getRdsInstanceId() {
+        return rdsInstanceId;
+    }
+
+    public void setRdsInstanceId(String rdsInstanceId) {
+        this.rdsInstanceId = rdsInstanceId;
+    }
+
+    public Boolean getGtidEnable() {
+        return gtidEnable;
+    }
+
+    public void setGtidEnable(Boolean gtidEnable) {
+        this.gtidEnable = gtidEnable;
+    }
+
+    public Boolean getMemoryStorageRawEntry() {
+        return memoryStorageRawEntry;
+    }
+
+    public void setMemoryStorageRawEntry(Boolean memoryStorageRawEntry) {
+        this.memoryStorageRawEntry = memoryStorageRawEntry;
+    }
+
     public String toString() {
         return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);
     }

+ 12 - 4
protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java

@@ -149,11 +149,19 @@ public class FlatMessage implements Serializable {
             }
 
             List<FlatMessage> flatMessages = new ArrayList<>();
+            List<CanalEntry.Entry> entrys = null;
+            if (message.isRaw()) {
+                List<ByteString> rawEntries = message.getRawEntries();
+                entrys = new ArrayList<CanalEntry.Entry>(rawEntries.size());
+                for (ByteString byteString : rawEntries) {
+                    CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString);
+                    entrys.add(entry);
+                }
+            } else {
+                entrys = message.getEntries();
+            }
 
-            List<ByteString> rawEntries = message.getRawEntries();
-
-            for (ByteString byteString : rawEntries) {
-                CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString);
+            for (CanalEntry.Entry entry : entrys) {
                 if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                     || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                     continue;

+ 1 - 1
protocol/src/main/java/com/alibaba/otter/canal/protocol/Message.java

@@ -19,7 +19,6 @@ public class Message implements Serializable {
     private static final long      serialVersionUID = 1234034768477580009L;
 
     private long                   id;
-    @Deprecated
     private List<CanalEntry.Entry> entries          = new ArrayList<CanalEntry.Entry>();
     // row data for performance, see:
     // https://github.com/alibaba/canal/issues/726
@@ -39,6 +38,7 @@ public class Message implements Serializable {
         } else {
             this.entries = entries == null ? new ArrayList<Entry>() : entries;
         }
+        this.raw = raw;
     }
 
     public Message(long id){

+ 8 - 11
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -1,24 +1,21 @@
 package com.alibaba.otter.canal.kafka;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.Future;
 
-import com.alibaba.fastjson.serializer.SerializerFeature;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
-import com.alibaba.otter.canal.spi.CanalMQProducer;;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
 
 /**
  * kafka producer 主操作类
@@ -104,8 +101,10 @@ public class CanalKafkaProducer implements CanalMQProducer {
                 for (FlatMessage flatMessage : flatMessages) {
                     if (canalDestination.getPartition() != null) {
                         try {
-                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination
-                                .getTopic(), canalDestination.getPartition(), null, JSON.toJSONString(flatMessage));
+                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
+                                canalDestination.getPartition(),
+                                null,
+                                JSON.toJSONString(flatMessage));
                             producer2.send(record);
                         } catch (Exception e) {
                             logger.error(e.getMessage(), e);
@@ -123,8 +122,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                                 FlatMessage flatMessagePart = partitionFlatMessage[i];
                                 if (flatMessagePart != null) {
                                     try {
-                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
-                                            canalDestination.getTopic(),
+                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
                                             i,
                                             null,
                                             JSON.toJSONString(flatMessagePart));
@@ -138,8 +136,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                             }
                         } else {
                             try {
-                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(
-                                    canalDestination.getTopic(),
+                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
                                     0,
                                     null,
                                     JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));

+ 5 - 0
server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -57,6 +57,11 @@ public class CanalMQStarter {
                 System.setProperty("canal.instance.filter.transaction.entry", "true");
             }
 
+            if (properties.getFlatMessage()) {
+                // 针对flat message模式,设置为raw避免ByteString->Entry的二次解析
+                System.setProperty("canal.instance.memory.rawEntry", "false");
+            }
+
             // 对应每个instance启动一个worker线程
             List<MQProperties.CanalDestination> destinations = properties.getCanalDestinations();
 

+ 60 - 23
server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java

@@ -1,11 +1,12 @@
 package com.alibaba.otter.canal.server.embedded;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
 import java.util.concurrent.TimeUnit;
 
-import com.alibaba.otter.canal.spi.CanalMetricsProvider;
-import com.alibaba.otter.canal.spi.CanalMetricsService;
-import com.alibaba.otter.canal.spi.NopCanalMetricsService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -14,6 +15,7 @@ import org.springframework.util.CollectionUtils;
 import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
 import com.alibaba.otter.canal.instance.core.CanalInstance;
 import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
+import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.position.LogPosition;
@@ -22,7 +24,11 @@ import com.alibaba.otter.canal.protocol.position.PositionRange;
 import com.alibaba.otter.canal.server.CanalServer;
 import com.alibaba.otter.canal.server.CanalService;
 import com.alibaba.otter.canal.server.exception.CanalServerException;
+import com.alibaba.otter.canal.spi.CanalMetricsProvider;
+import com.alibaba.otter.canal.spi.CanalMetricsService;
+import com.alibaba.otter.canal.spi.NopCanalMetricsService;
 import com.alibaba.otter.canal.store.CanalEventStore;
+import com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer;
 import com.alibaba.otter.canal.store.model.Event;
 import com.alibaba.otter.canal.store.model.Events;
 import com.google.common.base.Function;
@@ -40,12 +46,12 @@ import com.google.protobuf.ByteString;
  */
 public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements CanalServer, CanalService {
 
-    private static final Logger        logger           = LoggerFactory.getLogger(CanalServerWithEmbedded.class);
+    private static final Logger        logger  = LoggerFactory.getLogger(CanalServerWithEmbedded.class);
     private Map<String, CanalInstance> canalInstances;
     // private Map<ClientIdentity, Position> lastRollbackPostions;
     private CanalInstanceGenerator     canalInstanceGenerator;
     private int                        metricsPort;
-    private CanalMetricsService        metrics          = NopCanalMetricsService.NOP;
+    private CanalMetricsService        metrics = NopCanalMetricsService.NOP;
 
     private static class SingletonHolder {
 
@@ -207,7 +213,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
      * b. 如果timeout不为null
      *    1. timeout为0,则采用get阻塞方式,获取数据,不设置超时,直到有足够的batchSize数据才返回
      *    2. timeout不为0,则采用get+timeout方式,获取数据,超时还没有batchSize足够的数据,有多少返回多少
-     *
+     * 
      * 注意: meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
      * </pre>
      */
@@ -239,12 +245,23 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
             } else {
                 // 记录到流式信息
                 Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
-                List<ByteString> entrys = Lists.transform(events.getEvents(), new Function<Event, ByteString>() {
+                boolean raw = isRaw(canalInstance.getEventStore());
+                List entrys = null;
+                if (raw) {
+                    entrys = Lists.transform(events.getEvents(), new Function<Event, ByteString>() {
+
+                        public ByteString apply(Event input) {
+                            return input.getRawEntry();
+                        }
+                    });
+                } else {
+                    entrys = Lists.transform(events.getEvents(), new Function<Event, CanalEntry.Entry>() {
 
-                    public ByteString apply(Event input) {
-                        return input.getRawEntry();
-                    }
-                });
+                        public CanalEntry.Entry apply(Event input) {
+                            return input.getEntry();
+                        }
+                    });
+                }
                 if (logger.isInfoEnabled()) {
                     logger.info("get successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
                         clientIdentity.getClientId(),
@@ -255,7 +272,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
                 }
                 // 直接提交ack
                 ack(clientIdentity, batchId);
-                return new Message(batchId, true, entrys);
+                return new Message(batchId, raw, entrys);
             }
         }
     }
@@ -283,7 +300,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
      * b. 如果timeout不为null
      *    1. timeout为0,则采用get阻塞方式,获取数据,不设置超时,直到有足够的batchSize数据才返回
      *    2. timeout不为0,则采用get+timeout方式,获取数据,超时还没有batchSize足够的数据,有多少返回多少
-     *
+     * 
      * 注意: meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
      * </pre>
      */
@@ -311,7 +328,8 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
             }
 
             if (CollectionUtils.isEmpty(events.getEvents())) {
-                // logger.debug("getWithoutAck successfully, clientId:{} batchSize:{} but result
+                // logger.debug("getWithoutAck successfully, clientId:{}
+                // batchSize:{} but result
                 // is null",
                 // clientIdentity.getClientId(),
                 // batchSize);
@@ -319,12 +337,23 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
             } else {
                 // 记录到流式信息
                 Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
-                List<ByteString> entrys = Lists.transform(events.getEvents(), new Function<Event, ByteString>() {
+                boolean raw = isRaw(canalInstance.getEventStore());
+                List entrys = null;
+                if (raw) {
+                    entrys = Lists.transform(events.getEvents(), new Function<Event, ByteString>() {
+
+                        public ByteString apply(Event input) {
+                            return input.getRawEntry();
+                        }
+                    });
+                } else {
+                    entrys = Lists.transform(events.getEvents(), new Function<Event, CanalEntry.Entry>() {
 
-                    public ByteString apply(Event input) {
-                        return input.getRawEntry();
-                    }
-                });
+                        public CanalEntry.Entry apply(Event input) {
+                            return input.getEntry();
+                        }
+                    });
+                }
                 if (logger.isInfoEnabled()) {
                     logger.info("getWithoutAck successfully, clientId:{} batchSize:{}  real size is {} and result is [batchId:{} , position:{}]",
                         clientIdentity.getClientId(),
@@ -333,7 +362,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
                         batchId,
                         events.getPositionRange());
                 }
-                return new Message(batchId, true, entrys);
+                return new Message(batchId, raw, entrys);
             }
 
         }
@@ -515,17 +544,25 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
             // 发现provider, 进行初始化
             if (list.size() > 1) {
                 logger.warn("Found more than one CanalMetricsProvider, use the first one.");
-                //报告冲突
+                // 报告冲突
                 for (CanalMetricsProvider p : list) {
                     logger.warn("Found CanalMetricsProvider: {}.", p.getClass().getName());
                 }
             }
-            //默认使用第一个
+            // 默认使用第一个
             CanalMetricsProvider provider = list.get(0);
             this.metrics = provider.getService();
         }
     }
 
+    private boolean isRaw(CanalEventStore eventStore) {
+        if (eventStore instanceof MemoryEventStoreWithBuffer) {
+            return ((MemoryEventStoreWithBuffer) eventStore).isRaw();
+        }
+
+        return true;
+    }
+
     // ========= setter ==========
 
     public void setCanalInstanceGenerator(CanalInstanceGenerator canalInstanceGenerator) {

+ 8 - 2
sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java

@@ -20,6 +20,7 @@ import com.alibaba.otter.canal.sink.CanalEventDownStreamHandler;
 import com.alibaba.otter.canal.sink.CanalEventSink;
 import com.alibaba.otter.canal.sink.exception.CanalSinkException;
 import com.alibaba.otter.canal.store.CanalEventStore;
+import com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer;
 import com.alibaba.otter.canal.store.model.Event;
 
 /**
@@ -42,7 +43,8 @@ public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry
     protected AtomicLong           lastTransactionCount          = new AtomicLong(0L);
     protected volatile long        lastEmptyTransactionTimestamp = 0L;
     protected AtomicLong           lastEmptyTransactionCount     = new AtomicLong(0L);
-    private AtomicLong             eventsSinkBlockingTime        = new AtomicLong(0L);
+    protected AtomicLong           eventsSinkBlockingTime        = new AtomicLong(0L);
+    protected boolean              raw;
 
     public EntryEventSink(){
         addHandler(new HeartBeatEntryEventHandler());
@@ -52,6 +54,10 @@ public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry
         super.start();
         Assert.notNull(eventStore);
 
+        if (eventStore instanceof MemoryEventStoreWithBuffer) {
+            this.raw = ((MemoryEventStoreWithBuffer) eventStore).isRaw();
+        }
+
         for (CanalEventDownStreamHandler handler : getHandlers()) {
             if (!handler.isStart()) {
                 handler.start();
@@ -104,7 +110,7 @@ public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry
 
             hasRowData |= (entry.getEntryType() == EntryType.ROWDATA);
             hasHeartBeat |= (entry.getEntryType() == EntryType.HEARTBEAT);
-            Event event = new Event(new LogIdentity(remoteAddress, -1L), entry);
+            Event event = new Event(new LogIdentity(remoteAddress, -1L), entry, raw);
             events.add(event);
         }
 

+ 11 - 0
store/src/main/java/com/alibaba/otter/canal/store/memory/MemoryEventStoreWithBuffer.java

@@ -70,6 +70,7 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
 
     private BatchMode         batchMode     = BatchMode.ITEMSIZE;                        // 默认为内存大小模式
     private boolean           ddlIsolation  = false;
+    private boolean           raw           = true;                                      // 针对entry是否开启raw模式
 
     public MemoryEventStoreWithBuffer(){
 
@@ -628,6 +629,14 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
         this.ddlIsolation = ddlIsolation;
     }
 
+    public boolean isRaw() {
+        return raw;
+    }
+
+    public void setRaw(boolean raw) {
+        this.raw = raw;
+    }
+
     public AtomicLong getPutSequence() {
         return putSequence;
     }
@@ -671,4 +680,6 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
     public AtomicLong getAckTableRows() {
         return ackTableRows;
     }
+
+
 }

+ 25 - 3
store/src/main/java/com/alibaba/otter/canal/store/model/Event.java

@@ -34,10 +34,17 @@ public class Event implements Serializable {
     private long              rawLength;
     private int               rowsCount;
 
+    // ==== https://github.com/alibaba/canal/issues/1019
+    private CanalEntry.Entry  entry;
+
     public Event(){
     }
 
     public Event(LogIdentity logIdentity, CanalEntry.Entry entry){
+        this(logIdentity, entry, true);
+    }
+
+    public Event(LogIdentity logIdentity, CanalEntry.Entry entry, boolean raw){
         this.logIdentity = logIdentity;
         this.entryType = entry.getEntryType();
         this.executeTime = entry.getHeader().getExecuteTime();
@@ -46,9 +53,6 @@ public class Event implements Serializable {
         this.serverId = entry.getHeader().getServerId();
         this.gtid = entry.getHeader().getGtid();
         this.eventType = entry.getHeader().getEventType();
-        // build raw
-        this.rawEntry = entry.toByteString();
-        this.rawLength = rawEntry.size();
         if (entryType == EntryType.ROWDATA) {
             List<CanalEntry.Pair> props = entry.getHeader().getPropsList();
             if (props != null) {
@@ -60,6 +64,16 @@ public class Event implements Serializable {
                 }
             }
         }
+
+        if (raw) {
+            // build raw
+            this.rawEntry = entry.toByteString();
+            this.rawLength = rawEntry.size();
+        } else {
+            this.entry = entry;
+            // 按照3倍的event length预估
+            this.rawLength = entry.getHeader().getEventLength() * 3;
+        }
     }
 
     public LogIdentity getLogIdentity() {
@@ -150,6 +164,14 @@ public class Event implements Serializable {
         this.rowsCount = rowsCount;
     }
 
+    public CanalEntry.Entry getEntry() {
+        return entry;
+    }
+
+    public void setEntry(CanalEntry.Entry entry) {
+        this.entry = entry;
+    }
+
     public String toString() {
         return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);
     }