Просмотр исходного кода

增加 kafka producer 直接从canal server消费数据发送

rewerma 7 лет назад
Родитель
Сommit
31f72e1aac

+ 1 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -16,6 +16,7 @@ public class CanalConstants {
     public static final String CANAL_IP                          = ROOT + "." + "ip";
     public static final String CANAL_IP                          = ROOT + "." + "ip";
     public static final String CANAL_PORT                        = ROOT + "." + "port";
     public static final String CANAL_PORT                        = ROOT + "." + "port";
     public static final String CANAL_ZKSERVERS                   = ROOT + "." + "zkServers";
     public static final String CANAL_ZKSERVERS                   = ROOT + "." + "zkServers";
+    public static final String CANAL_WITHOUT_NETTY               = ROOT + "." + "withoutNetty";
 
 
     public static final String CANAL_DESTINATIONS                = ROOT + "." + "destinations";
     public static final String CANAL_DESTINATIONS                = ROOT + "." + "destinations";
     public static final String CANAL_AUTO_SCAN                   = ROOT + "." + "auto.scan";
     public static final String CANAL_AUTO_SCAN                   = ROOT + "." + "auto.scan";

+ 14 - 7
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -43,7 +43,7 @@ import com.google.common.collect.MigrateMap;
 
 
 /**
 /**
  * canal调度控制器
  * canal调度控制器
- * 
+ *
  * @author jianghang 2012-11-8 下午12:03:11
  * @author jianghang 2012-11-8 下午12:03:11
  * @version 1.0.0
  * @version 1.0.0
  */
  */
@@ -97,9 +97,12 @@ public class CanalController {
         port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
         port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
         embededCanalServer = CanalServerWithEmbedded.instance();
         embededCanalServer = CanalServerWithEmbedded.instance();
         embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
         embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
-        canalServer = CanalServerWithNetty.instance();
-        canalServer.setIp(ip);
-        canalServer.setPort(port);
+        String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);
+        // Start the Netty server only when canal.withoutNetty is absent or explicitly "false".
+        // (The original tested "true", which inverted the meaning of the property: setting
+        // withoutNetty=true would have started Netty, and the default "false" would skip it.)
+        if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) {
+            canalServer = CanalServerWithNetty.instance();
+            canalServer.setIp(ip);
+            canalServer.setPort(port);
+        }
 
 
         // 处理下ip为空,默认使用hostIp暴露到zk中
         // 处理下ip为空,默认使用hostIp暴露到zk中
         if (StringUtils.isEmpty(ip)) {
         if (StringUtils.isEmpty(ip)) {
@@ -431,11 +434,15 @@ public class CanalController {
         }
         }
 
 
         // 启动网络接口
         // 启动网络接口
-        canalServer.start();
+        // Netty server is optional (canal.withoutNetty); guard before starting
+        if (canalServer != null) {
+            canalServer.start();
+        }
     }
     }
 
 
     public void stop() throws Throwable {
     public void stop() throws Throwable {
-        canalServer.stop();
+        // Netty server is optional (canal.withoutNetty); guard before stopping
+        if (canalServer != null) {
+            canalServer.stop();
+        }
 
 
         if (autoScan) {
         if (autoScan) {
             for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
             for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
@@ -454,7 +461,7 @@ public class CanalController {
         // 释放canal的工作节点
         // 释放canal的工作节点
         releaseCid(ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port));
         releaseCid(ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port));
         logger.info("## stop the canal server[{}:{}]", ip, port);
         logger.info("## stop the canal server[{}:{}]", ip, port);
-        
+
         if (zkclientx != null) {
         if (zkclientx != null) {
             zkclientx.close();
             zkclientx.close();
         }
         }

+ 1 - 0
deployer/src/main/resources/canal.properties

@@ -7,6 +7,7 @@ canal.port= 11111
 canal.zkServers=
 canal.zkServers=
 # flush data to zk
 # flush data to zk
 canal.zookeeper.flush.period = 1000
 canal.zookeeper.flush.period = 1000
+canal.withoutNetty = false
 # flush meta cursor/parse position to file
 # flush meta cursor/parse position to file
 canal.file.data.dir = ${canal.conf.dir}
 canal.file.data.dir = ${canal.conf.dir}
 canal.file.flush.period = 1000
 canal.file.flush.period = 1000

+ 131 - 0
kafka/pom.xml

@@ -0,0 +1,131 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>canal</artifactId>
+        <groupId>com.alibaba.otter</groupId>
+        <version>1.0.26-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>canal.kafka</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>canal.deployer</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- Kafka -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>0.10.0.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>0.10.0.1</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- deploy modules are normally packaged as jars; this configuration lets the
+                 module build even without java sources or resource files -->
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <configuration>
+                    <archive>
+                        <addMavenDescriptor>true</addMavenDescriptor>
+                    </archive>
+                    <excludes>
+                        <exclude>**/logback.xml</exclude>
+                        <exclude>**/canal.properties</exclude>
+                        <exclude>**/spring/**</exclude>
+                        <exclude>**/example/**</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <!-- assembly-plugin version pinned for reproducible builds -->
+                <version>2.2.1</version>
+                <executions>
+                    <execution>
+                        <id>assemble</id>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <phase>package</phase>
+                    </execution>
+                </executions>
+                <configuration>
+                    <appendAssemblyId>false</appendAssemblyId>
+                    <attach>false</attach>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>dev</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+                <property>
+                    <name>env</name>
+                    <value>!release</value>
+                </property>
+            </activation>
+
+            <build>
+                <plugins>
+                    <plugin>
+                        <artifactId>maven-assembly-plugin</artifactId>
+                        <configuration>
+                            <!-- the assembly plugin needs a descriptor that defines the package
+                                 layout and where the packaged files come from -->
+                            <descriptors>
+                                <descriptor>${basedir}/src/main/assembly/dev.xml</descriptor>
+                            </descriptors>
+                            <finalName>canal</finalName>
+                            <outputDirectory>${project.build.directory}</outputDirectory>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+
+        </profile>
+
+        <profile>
+            <id>release</id>
+            <activation>
+                <property>
+                    <name>env</name>
+                    <value>release</value>
+                </property>
+            </activation>
+
+            <build>
+                <plugins>
+                    <plugin>
+                        <artifactId>maven-assembly-plugin</artifactId>
+                        <configuration>
+                            <!-- assembly descriptor used in release mode -->
+                            <descriptors>
+                                <descriptor>${basedir}/src/main/assembly/release.xml</descriptor>
+                            </descriptors>
+                            <!-- when several deploy modules share a package name, copying them into
+                                 one directory can collide, so the artifactId is added to the final
+                                 name to tell them apart -->
+                            <finalName>${project.artifactId}-${project.version}</finalName>
+                            <!-- scm requires release-mode packages to go into the top-level target directory -->
+                            <outputDirectory>${project.parent.build.directory}</outputDirectory>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+</project>

+ 61 - 0
kafka/src/main/assembly/dev.xml

@@ -0,0 +1,61 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+	<id>dist</id>
+	<formats>
+		<format>dir</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>.</directory>
+			<outputDirectory>/</outputDirectory>
+			<includes>
+				<include>README*</include>
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>./src/main/bin</directory>
+			<outputDirectory>bin</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+			<fileMode>0755</fileMode>
+		</fileSet>
+		<fileSet>
+			<directory>../deployer/src/main/conf</directory>
+			<outputDirectory>/conf</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>../deployer/src/main/resources</directory>
+			<outputDirectory>/conf</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>./src/main/resources</directory>
+			<outputDirectory>/conf</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>target</directory>
+			<outputDirectory>logs</outputDirectory>
+			<excludes>
+				<exclude>**/*</exclude>
+			</excludes>
+		</fileSet>
+	</fileSets>
+	<dependencySets>
+		<dependencySet>
+			<outputDirectory>lib</outputDirectory>
+			<excludes>
+				<exclude>junit:junit</exclude>
+			</excludes>
+		</dependencySet>
+	</dependencySets>
+</assembly>

+ 61 - 0
kafka/src/main/assembly/release.xml

@@ -0,0 +1,61 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+	<id>dist</id>
+	<formats>
+		<format>tar.gz</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>.</directory>
+			<outputDirectory>/</outputDirectory>
+			<includes>
+				<include>README*</include>
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>./src/main/bin</directory>
+			<outputDirectory>bin</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+			<fileMode>0755</fileMode>
+		</fileSet>
+		<fileSet>
+			<directory>../deployer/src/main/conf</directory>
+			<outputDirectory>/conf</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>../deployer/src/main/resources</directory>
+			<outputDirectory>/conf</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>./src/main/resources</directory>
+			<outputDirectory>/conf</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>target</directory>
+			<outputDirectory>logs</outputDirectory>
+			<excludes>
+				<exclude>**/*</exclude>
+			</excludes>
+		</fileSet>
+	</fileSets>
+	<dependencySets>
+		<dependencySet>
+			<outputDirectory>lib</outputDirectory>
+			<excludes>
+				<exclude>junit:junit</exclude>
+			</excludes>
+		</dependencySet>
+	</dependencySets>
+</assembly>

+ 25 - 0
kafka/src/main/bin/startup.bat

@@ -0,0 +1,25 @@
@echo off
@if not "%ECHO%" == ""  echo %ECHO%
@if "%OS%" == "Windows_NT"  setlocal

set ENV_PATH=.\
if "%OS%" == "Windows_NT" set ENV_PATH=%~dp0%

set conf_dir=%ENV_PATH%\..\conf
set canal_conf=%conf_dir%\canal.properties
set logback_configurationFile=%conf_dir%\logback.xml

set CLASSPATH=%conf_dir%
set CLASSPATH=%conf_dir%\..\lib\*;%CLASSPATH%

set JAVA_MEM_OPTS= -Xms128m -Xmx512m -XX:PermSize=128m
set JAVA_OPTS_EXT= -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dapplication.codeset=UTF-8 -Dfile.encoding=UTF-8
set JAVA_DEBUG_OPT= -server -Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=9099,server=y,suspend=n
set CANAL_OPTS= -DappName=otter-canal -Dlogback.configurationFile="%logback_configurationFile%" -Dcanal.conf="%canal_conf%"

set JAVA_OPTS= %JAVA_MEM_OPTS% %JAVA_OPTS_EXT% %JAVA_DEBUG_OPT% %CANAL_OPTS%

rem fixed: CMD_STR previously repeated 'java %%JAVA_OPTS%% -classpath "%%CLASSPATH%%"' twice,
rem so the echoed start command did not match the command actually executed below
set CMD_STR= java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.kafka.CanalLauncher
echo start cmd : %CMD_STR%

java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.kafka.CanalLauncher

+ 104 - 0
kafka/src/main/bin/startup.sh

@@ -0,0 +1,104 @@
+#!/bin/bash 
+
+current_path=`pwd`
+case "`uname`" in
+    Linux)
+		bin_abs_path=$(readlink -f $(dirname $0))
+		;;
+	*)
+		bin_abs_path=`cd $(dirname $0); pwd`
+		;;
+esac
+base=${bin_abs_path}/..
+canal_conf=$base/conf/canal.properties
+logback_configurationFile=$base/conf/logback.xml
+export LANG=en_US.UTF-8
+export BASE=$base
+
+if [ -f $base/bin/canal.pid ] ; then
+	echo "found canal.pid , Please run stop.sh first ,then startup.sh" >&2
+    exit 1
+fi
+
+if [ ! -d $base/logs/canal ] ; then 
+	mkdir -p $base/logs/canal
+fi
+
+## set java path
+if [ -z "$JAVA" ] ; then
+  JAVA=$(which java)
+fi
+
+ALIBABA_JAVA="/usr/alibaba/java/bin/java"
+TAOBAO_JAVA="/opt/taobao/java/bin/java"
+if [ -z "$JAVA" ]; then
+  if [ -f $ALIBABA_JAVA ] ; then
+  	JAVA=$ALIBABA_JAVA
+  elif [ -f $TAOBAO_JAVA ] ; then
+  	JAVA=$TAOBAO_JAVA
+  else
+  	echo "Cannot find a Java JDK. Please set either set JAVA or put java (>=1.5) in your PATH." >&2
+    exit 1
+  fi
+fi
+
+case "$#" 
+in
+0 ) 
+	;;
+1 )	
+	var=$*
+	if [ -f $var ] ; then 
+		canal_conf=$var
+	else
+		echo "THE PARAMETER IS NOT CORRECT.PLEASE CHECK AGAIN."
+        exit
+	fi;;
+2 )	
+	var=$1
+	if [ -f $var ] ; then
+		canal_conf=$var
+	else 
+		if [ "$1" = "debug" ]; then
+			DEBUG_PORT=$2
+			DEBUG_SUSPEND="n"
+			JAVA_DEBUG_OPT="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=$DEBUG_PORT,server=y,suspend=$DEBUG_SUSPEND"
+		fi
+     fi;;
+* )
+	echo "THE PARAMETERS MUST BE TWO OR LESS.PLEASE CHECK AGAIN."
+	exit;;
+esac
+
+str=`file -L $JAVA | grep 64-bit`
+if [ -n "$str" ]; then
+	JAVA_OPTS="-server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"
+else
+	JAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m "
+fi
+
+JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8"
+CANAL_OPTS="-DappName=otter-canal -Dlogback.configurationFile=$logback_configurationFile -Dcanal.conf=$canal_conf"
+
+if [ -e $canal_conf -a -e $logback_configurationFile ]
+then 
+	
+	for i in $base/lib/*;
+		do CLASSPATH=$i:"$CLASSPATH";
+	done
+ 	CLASSPATH="$base/conf:$CLASSPATH";
+ 	
+ 	echo "cd to $bin_abs_path for workaround relative path"
+  	cd $bin_abs_path
+ 	
+	echo LOG CONFIGURATION : $logback_configurationFile
+	echo canal conf : $canal_conf 
+	echo CLASSPATH :$CLASSPATH
+	$JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $CANAL_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.kafka.CanalLauncher 1>>$base/logs/canal/canal.log 2>&1 &
+	echo $! > $base/bin/canal.pid 
+	
+	echo "cd to $current_path for continue"
+  	cd $current_path
+else 
+	echo "canal conf("$canal_conf") OR log configration file($logback_configurationFile) is not exist,please create then first!"
+fi

+ 65 - 0
kafka/src/main/bin/stop.sh

@@ -0,0 +1,65 @@
+#!/bin/bash
+
+cygwin=false;
+linux=false;
+case "`uname`" in
+    CYGWIN*)
+        cygwin=true
+        ;;
+    Linux*)
+    	linux=true
+    	;;
+esac
+
+get_pid() {	
+	STR=$1
+	PID=$2
+    if $cygwin; then
+        JAVA_CMD="$JAVA_HOME\bin\java"
+        JAVA_CMD=`cygpath --path --unix $JAVA_CMD`
+        JAVA_PID=`ps |grep $JAVA_CMD |awk '{print $1}'`
+    else
+    	if $linux; then
+	        if [ ! -z "$PID" ]; then
+	        	JAVA_PID=`ps -C java -f --width 1000|grep "$STR"|grep "$PID"|grep -v grep|awk '{print $2}'`
+		    else 
+		        JAVA_PID=`ps -C java -f --width 1000|grep "$STR"|grep -v grep|awk '{print $2}'`
+	        fi
+	    else
+	    	if [ ! -z "$PID" ]; then
+	        	JAVA_PID=`ps aux |grep "$STR"|grep "$PID"|grep -v grep|awk '{print $2}'`
+		    else 
+		        JAVA_PID=`ps aux |grep "$STR"|grep -v grep|awk '{print $2}'`
+	        fi
+	    fi
+    fi
+    echo $JAVA_PID;
+}
+
+base=`dirname $0`/..
+pidfile=$base/bin/canal.pid
+if [ ! -f "$pidfile" ];then
+	echo "canal is not running. exists"
+	exit
+fi
+
+pid=`cat $pidfile`
+if [ "$pid" == "" ] ; then
+	pid=`get_pid "appName=otter-canal"`
+fi
+
+echo -e "`hostname`: stopping canal $pid ... "
+kill $pid
+
+LOOPS=0
+while (true); 
+do 
+	gpid=`get_pid "appName=otter-canal" "$pid"`
+    if [ "$gpid" == "" ] ; then
+    	echo "Oook! cost:$LOOPS"
+    	`rm $pidfile`
+    	break;
+    fi
+    let LOOPS=LOOPS+1
+    sleep 1
+done

+ 70 - 0
kafka/src/main/java/com/alibaba/otter/canal/kafka/CanalLauncher.java

@@ -0,0 +1,70 @@
+package com.alibaba.otter.canal.kafka;
+
+
+import com.alibaba.otter.canal.deployer.CanalController;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.util.Properties;
+
+/**
+ * canal-kafka独立版本启动的入口类
+ *
+ * @author machengyuan 2018-6-11 下午05:30:49
+ * @version 1.0.0
+ */
+public class CanalLauncher {
+
+    private static final String CLASSPATH_URL_PREFIX = "classpath:";
+    private static final Logger logger               = LoggerFactory.getLogger(CanalLauncher.class);
+
+    public static void main(String[] args) throws Throwable {
+        try {
+            logger.info("## set default uncaught exception handler");
+            setGlobalUncaughtExceptionHandler();
+
+            logger.info("## load canal configurations");
+            String conf = System.getProperty("canal.conf", "classpath:canal.properties");
+            Properties properties = new Properties();
+            if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
+                conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
+                properties.load(com.alibaba.otter.canal.deployer.CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
+            } else {
+                properties.load(new FileInputStream(conf));
+            }
+
+            logger.info("## start the canal server.");
+            final CanalController controller = new CanalController(properties);
+            controller.start();
+            logger.info("## the canal server is running now ......");
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+
+                public void run() {
+                    try {
+                        logger.info("## stop the canal server");
+                        controller.stop();
+                    } catch (Throwable e) {
+                        logger.warn("##something goes wrong when stopping canal Server:", e);
+                    } finally {
+                        logger.info("## canal server is down.");
+                    }
+                }
+
+            });
+        } catch (Throwable e) {
+            logger.error("## Something goes wrong when starting up the canal Server:", e);
+            System.exit(0);
+        }
+    }
+
+    private static void setGlobalUncaughtExceptionHandler() {
+        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                logger.error("UnCaughtException", e);
+            }
+        });
+    }
+}

+ 62 - 0
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaProducer.java

@@ -0,0 +1,62 @@
+package com.alibaba.otter.canal.kafka.producer;
+
+import com.alibaba.otter.canal.kafka.producer.KafkaProperties.Topic;
+import com.alibaba.otter.canal.protocol.Message;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * kafka producer 主操作类
+ *
+ * @author machengyuan 2018-6-11 下午05:30:49
+ * @version 1.0.0
+ */
+public class CanalKafkaProducer {
+    private static final Logger logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
+
+    private static Producer<String, Message> producer;
+
+    public static void init(KafkaProperties kafkaProperties) {
+        Properties properties = new Properties();
+        properties.put("bootstrap.servers", kafkaProperties.getServers());
+        properties.put("acks", "all");
+        properties.put("retries", kafkaProperties.getRetries());
+        properties.put("batch.size", kafkaProperties.getBatchSize());
+        properties.put("linger.ms", kafkaProperties.getLingerMs());
+        properties.put("buffer.memory", kafkaProperties.getBufferMemory());
+        properties.put("key.serializer", StringSerializer.class.getName());
+        properties.put("value.serializer", MessageSerializer.class.getName());
+        producer = new KafkaProducer<String, Message>(properties);
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+
+            public void run() {
+                try {
+                    logger.info("## stop the kafka producer");
+                    producer.close();
+                } catch (Throwable e) {
+                    logger.warn("##something goes wrong when stopping kafka producer:", e);
+                } finally {
+                    logger.info("## kafka producer is down.");
+                }
+            }
+
+        });
+    }
+
+    public static void send(Topic topic, Message message) {
+        ProducerRecord<String, Message> record;
+        if (topic.getPartition() != null) {
+            record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);
+        } else {
+            record = new ProducerRecord<String, Message>(topic.getTopic(), message);
+        }
+        producer.send(record);
+    }
+}

+ 88 - 0
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaStarter.java

@@ -0,0 +1,88 @@
+package com.alibaba.otter.canal.kafka.producer;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import com.alibaba.otter.canal.kafka.producer.KafkaProperties.Topic;
+import com.alibaba.otter.canal.protocol.ClientIdentity;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CanalKafkaStarter {
+    private static final Logger logger = LoggerFactory.getLogger(CanalKafkaStarter.class);
+
+    private volatile static boolean running = false;
+    private static short CLIENT_ID = 1001;
+
+    public static void init(KafkaProperties kafkaProperties) {
+        //初始化 kafka producer
+        CanalKafkaProducer.init(kafkaProperties);
+
+        //对应每个instance启动一个worker线程
+        List<Topic> topics = kafkaProperties.getTopics();
+
+        ExecutorService executorService = Executors.newFixedThreadPool(topics.size());
+
+        for (final Topic topic : topics) {
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    worker(topic);
+                }
+            });
+        }
+        running = true;
+    }
+
+    private static void sleep(long time) {
+        try {
+            Thread.sleep(time);
+        } catch (InterruptedException e) {
+            //ignore
+        }
+    }
+
+    public static void worker(Topic topic) {
+        while (!running) ;
+        //TODO 等待canal  server启动完毕
+        logger.info("## start the canal consumer: {}.", topic.getDestination());
+        CanalServerWithEmbedded server = CanalServerWithEmbedded.instance();
+        ClientIdentity clientIdentity = new ClientIdentity(topic.getDestination(), CLIENT_ID, "");
+        while (running) {
+            try {
+                if (!server.getCanalInstances().containsKey(clientIdentity.getDestination())) {
+                    sleep(3000);
+                    continue;
+                }
+                server.subscribe(clientIdentity);
+                logger.info("## the canal consumer {} is running now ......", topic.getDestination());
+
+                while (running) {
+                    Message message = server.getWithoutAck(clientIdentity, 5 * 1024); // 获取指定数量的数据
+                    long batchId = message.getId();
+                    try {
+                        int size = message.getEntries().size();
+                        if (batchId == -1 || size == 0) {
+                            sleep(1000);
+                        } else {
+                            CanalKafkaProducer.send(topic, message);
+                        }
+
+                        if (batchId != -1) {
+                            server.ack(clientIdentity, batchId); // 提交确认
+                        }
+                    } catch (Exception e) {
+                        server.rollback(clientIdentity);
+                        logger.error(e.getMessage(), e);
+                        sleep(1000);
+                    }
+                }
+            } catch (Exception e) {
+                logger.error("process error!", e);
+            }
+        }
+    }
+}

+ 111 - 0
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/KafkaProperties.java

@@ -0,0 +1,111 @@
+package com.alibaba.otter.canal.kafka.producer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * kafka 配置项
+ *
+ * @author machengyuan 2018-6-11 下午05:30:49
+ * @version 1.0.0
+ */
/**
 * kafka configuration (normally populated from kafka.yml).
 *
 * @author machengyuan 2018-6-11 下午05:30:49
 * @version 1.0.0
 */
public class KafkaProperties {

    // producer connection / tuning defaults, overridable via configuration
    private String servers = "localhost:6667";
    private int retries = 0;
    private int batchSize = 16384;
    private int lingerMs = 1;
    private long bufferMemory = 33554432L;

    private List<Topic> topics = new ArrayList<Topic>();
    // index of topics keyed by canal destination, kept in sync by setTopics()
    private Map<String, Topic> topicMap = new HashMap<String, Topic>();

    /**
     * One kafka topic (optionally pinned to a fixed partition) fed by one canal
     * destination (instance).
     */
    public static class Topic {
        private String topic;
        private Integer partition; // null lets kafka pick the partition
        private String destination;

        public String getTopic() {
            return topic;
        }

        public void setTopic(String topic) {
            this.topic = topic;
        }

        public Integer getPartition() {
            return partition;
        }

        public void setPartition(Integer partition) {
            this.partition = partition;
        }

        public String getDestination() {
            return destination;
        }

        public void setDestination(String destination) {
            this.destination = destination;
        }
    }

    public String getServers() {
        return servers;
    }

    public void setServers(String servers) {
        this.servers = servers;
    }

    public int getRetries() {
        return retries;
    }

    public void setRetries(int retries) {
        this.retries = retries;
    }

    public int getBatchSize() {
        return batchSize;
    }

    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    public int getLingerMs() {
        return lingerMs;
    }

    public void setLingerMs(int lingerMs) {
        this.lingerMs = lingerMs;
    }

    public long getBufferMemory() {
        return bufferMemory;
    }

    public void setBufferMemory(long bufferMemory) {
        this.bufferMemory = bufferMemory;
    }

    public List<Topic> getTopics() {
        return topics;
    }

    public void setTopics(List<Topic> topics) {
        this.topics = topics;

        // rebuild the destination index from scratch so entries from a previous
        // call cannot linger (the original only ever added to the map)
        this.topicMap.clear();
        if (topics != null) {
            for (Topic topic : topics) {
                this.topicMap.put(topic.getDestination(), topic);
            }
        }
    }

    /** @return the topic configured for the given canal destination, or null if none */
    public Topic getTopicByDestination(String destination) {
        return this.topicMap.get(destination);
    }
}

+ 49 - 0
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/MessageSerializer.java

@@ -0,0 +1,49 @@
+package com.alibaba.otter.canal.kafka.producer;
+
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.CanalPacket;
+import com.alibaba.otter.canal.protocol.Message;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Serializer;
+import org.springframework.util.CollectionUtils;
+
+import java.util.Map;
+
+/**
+ * Message类的序列化Kafka实现
+ *
+ * @author machengyuan 2018-6-11 下午05:30:49
+ * @version 1.0.0
+ */
+public class MessageSerializer implements Serializer<Message> {
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+    }
+
+    @Override
+    public byte[] serialize(String topic, Message data) {
+        try {
+            if (data == null)
+                return null;
+            else {
+                CanalPacket.Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
+                if (data.getId() != -1 && !CollectionUtils.isEmpty(data.getEntries())) {
+                    for (CanalEntry.Entry entry : data.getEntries()) {
+                        messageBuilder.addMessages(entry.toByteString());
+                    }
+                }
+                CanalPacket.Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
+                packetBuilder.setType(CanalPacket.PacketType.MESSAGES);
+                packetBuilder.setBody(messageBuilder.build().toByteString());
+                return packetBuilder.build().toByteArray();
+            }
+        } catch (Exception e) {
+            throw new SerializationException("Error when serializing message to byte[] ");
+        }
+    }
+
+    @Override
+    public void close() {
+        // nothing to do
+    }
+}

+ 12 - 0
kafka/src/main/resources/kafka.yml

@@ -0,0 +1,12 @@
+servers: localhost:6667
+retries: 0
+batchSize: 16384
+lingerMs: 1
+bufferMemory: 33554432
+
+topics:
+  - topic: example
+    partition:
+    destination: example
+
+

+ 2 - 1
pom.xml

@@ -127,6 +127,7 @@
         <module>client</module>
         <module>client</module>
         <module>deployer</module>
         <module>deployer</module>
         <module>example</module>
         <module>example</module>
+        <module>kafka</module>
     </modules>
     </modules>
 
 
     <dependencyManagement>
     <dependencyManagement>
@@ -254,7 +255,7 @@
             <dependency>
             <dependency>
                 <groupId>com.alibaba.fastsql</groupId>
                 <groupId>com.alibaba.fastsql</groupId>
                 <artifactId>fastsql</artifactId>
                 <artifactId>fastsql</artifactId>
-                <version>2.0.0_preview_371</version>
+                <version>2.0.0_preview_186</version>
             </dependency>
             </dependency>
             <dependency>
             <dependency>
                 <groupId>com.alibaba</groupId>
                 <groupId>com.alibaba</groupId>