Pārlūkot izejas kodu

整合canal.server和canal.kafka

mcy 6 gadi atpakaļ
vecāks
revīzija
e7f07d7d3e

+ 1 - 0
deployer/pom.xml

@@ -40,6 +40,7 @@
 						<exclude>**/canal.properties</exclude>
 						<exclude>**/spring/**</exclude>
 						<exclude>**/example/**</exclude>
+						<exclude>**/kafka.yml</exclude>
 					</excludes>
 				</configuration>
 			</plugin>

+ 1 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -22,6 +22,7 @@ public class CanalConstants {
     public static final String CANAL_AUTO_SCAN                   = ROOT + "." + "auto.scan";
     public static final String CANAL_AUTO_SCAN_INTERVAL          = ROOT + "." + "auto.scan.interval";
     public static final String CANAL_CONF_DIR                    = ROOT + "." + "conf.dir";
+    public static final String CANAL_SERVER_MODE                 = ROOT + "." + "serverMode";
 
     public static final String CANAL_DESTINATION_SPLIT           = ",";
     public static final String GLOBAL_NAME                       = "global";

+ 13 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -3,6 +3,8 @@ package com.alibaba.otter.canal.deployer;
 import java.io.FileInputStream;
 import java.util.Properties;
 
+import com.alibaba.otter.canal.kafka.CanalKafkaStarter;
+import com.alibaba.otter.canal.server.CanalServerStarter;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,6 +53,17 @@ public class CanalLauncher {
                 }
 
             });
+
+            CanalServerStarter canalServerStarter = null;
+            String serverMode = properties.getProperty(CanalConstants.CANAL_SERVER_MODE, "tcp");
+            if (serverMode.equalsIgnoreCase("kafka")) {
+                canalServerStarter = new CanalKafkaStarter();
+            } else if (serverMode.equalsIgnoreCase("rocketMQ")) {
+                // 预留rocketMQ启动
+            }
+            if (canalServerStarter != null) {
+                canalServerStarter.init();
+            }
         } catch (Throwable e) {
             logger.error("## Something goes wrong when starting up the canal Server:", e);
             System.exit(0);

+ 2 - 0
deployer/src/main/resources/canal.properties

@@ -8,6 +8,8 @@ canal.zkServers=
 # flush data to zk
 canal.zookeeper.flush.period = 1000
 canal.withoutNetty = false
+# tcp, kafka, rocketMQ
+canal.serverMode = tcp
 # flush meta cursor/parse position to file
 canal.file.data.dir = ${canal.conf.dir}
 canal.file.flush.period = 1000

+ 3 - 3
kafka/src/main/resources/kafka.yml → deployer/src/main/resources/kafka.yml

@@ -12,8 +12,8 @@ canalDestinations:
     topic: example
     partition:
     # 一个destination可以对应多个topic
-#    topics:
-#      - topic: example
-#        partition:
+    #topics:
+    #  - topic: example
+    #    partition:
 
 

+ 0 - 145
kafka/pom.xml

@@ -1,145 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <artifactId>canal</artifactId>
-        <groupId>com.alibaba.otter</groupId>
-        <version>1.1.0-SNAPSHOT</version>
-        <relativePath>../pom.xml</relativePath>
-    </parent>
-    <groupId>com.alibaba.otter</groupId>
-    <artifactId>canal.kafka</artifactId>
-    <packaging>jar</packaging>
-    <name>canal kafka module for otter ${project.version}</name>
-    <dependencies>
-        <dependency>
-            <groupId>com.alibaba.otter</groupId>
-            <artifactId>canal.deployer</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.yaml</groupId>
-            <artifactId>snakeyaml</artifactId>
-            <version>1.17</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.11</artifactId>
-            <version>1.1.1</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <!--kafka_2.11_1.1.1 exclusion掉了netty 的依赖,但CanalServerWithNetty 依赖 netty3,升级kafka至 1.1.1 需要显示加入,否则会启动失败 -->
-        <dependency>
-            <groupId>org.jboss.netty</groupId>
-            <artifactId>netty</artifactId>
-            <version>3.2.2.Final</version>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <!-- deploy模块的packaging通常是jar,如果项目中没有java 源代码或资源文件,加上这一段配置使项目能通过构建 -->
-            <plugin>
-                <artifactId>maven-jar-plugin</artifactId>
-                <configuration>
-                    <archive>
-                        <addMavenDescriptor>true</addMavenDescriptor>
-                    </archive>
-                    <excludes>
-                        <exclude>**/logback.xml</exclude>
-                        <exclude>**/canal.properties</exclude>
-                        <exclude>**/spring/**</exclude>
-                        <exclude>**/example/**</exclude>
-                        <exclude>**/kafka.yml</exclude>
-                    </excludes>
-                </configuration>
-            </plugin>
-
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <!-- 这是最新版本,推荐使用这个版本 -->
-                <version>2.2.1</version>
-                <executions>
-                    <execution>
-                        <id>assemble</id>
-                        <goals>
-                            <goal>single</goal>
-                        </goals>
-                        <phase>package</phase>
-                    </execution>
-                </executions>
-                <configuration>
-                    <appendAssemblyId>false</appendAssemblyId>
-                    <attach>false</attach>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
-    <profiles>
-        <profile>
-            <id>dev</id>
-            <activation>
-                <activeByDefault>true</activeByDefault>
-                <property>
-                    <name>env</name>
-                    <value>!release</value>
-                </property>
-            </activation>
-
-            <build>
-                <plugins>
-                    <plugin>
-                        <artifactId>maven-assembly-plugin</artifactId>
-                        <configuration>
-                            <!-- maven assembly插件需要一个描述文件 来告诉插件包的结构以及打包所需的文件来自哪里 -->
-                            <descriptors>
-                                <descriptor>${basedir}/src/main/assembly/dev.xml</descriptor>
-                            </descriptors>
-                            <finalName>canal</finalName>
-                            <outputDirectory>${project.build.directory}</outputDirectory>
-                        </configuration>
-                    </plugin>
-                </plugins>
-            </build>
-
-        </profile>
-
-        <profile>
-            <id>release</id>
-            <activation>
-                <property>
-                    <name>env</name>
-                    <value>release</value>
-                </property>
-            </activation>
-
-            <build>
-                <plugins>
-                    <plugin>
-                        <artifactId>maven-assembly-plugin</artifactId>
-                        <configuration>
-                            <!-- 发布模式使用的maven assembly插件描述文件 -->
-                            <descriptors>
-                                <descriptor>${basedir}/src/main/assembly/release.xml</descriptor>
-                            </descriptors>
-                            <!-- 如果一个应用的包含多个deploy模块,如果使用同样的包名, 如果把它们复制的一个目录中可能会失败,所以包名加了 artifactId以示区分 -->
-                            <finalName>${project.artifactId}-${project.version}</finalName>
-                            <!-- scm 要求 release 模式打出的包放到顶级目录下的target子目录中 -->
-                            <outputDirectory>${project.parent.build.directory}</outputDirectory>
-                        </configuration>
-                    </plugin>
-                </plugins>
-            </build>
-        </profile>
-    </profiles>
-</project>

+ 0 - 64
kafka/src/main/assembly/dev.xml

@@ -1,64 +0,0 @@
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
-	<id>dist</id>
-	<formats>
-		<format>dir</format>
-	</formats>
-	<includeBaseDirectory>false</includeBaseDirectory>
-	<fileSets>
-		<fileSet>
-			<directory>.</directory>
-			<outputDirectory>/</outputDirectory>
-			<includes>
-				<include>README*</include>
-			</includes>
-		</fileSet>
-		<fileSet>
-			<directory>./src/main/bin</directory>
-			<outputDirectory>bin</outputDirectory>
-			<includes>
-				<include>**/*</include>
-			</includes>
-			<fileMode>0755</fileMode>
-		</fileSet>
-		<fileSet>
-			<directory>../deployer/src/main/conf</directory>
-			<outputDirectory>/conf</outputDirectory>
-			<includes>
-				<include>**/*</include>
-			</includes>
-		</fileSet>
-		<fileSet>
-			<directory>../deployer/src/main/resources</directory>
-			<outputDirectory>/conf</outputDirectory>
-			<includes>
-				<include>**/*</include>
-			</includes>
-			<excludes>
-				<exclude>logback.xml</exclude>
-			</excludes>
-		</fileSet>
-		<fileSet>
-			<directory>./src/main/resources</directory>
-			<outputDirectory>/conf</outputDirectory>
-			<includes>
-				<include>**/*</include>
-			</includes>
-		</fileSet>
-		<fileSet>
-			<directory>target</directory>
-			<outputDirectory>logs</outputDirectory>
-			<excludes>
-				<exclude>**/*</exclude>
-			</excludes>
-		</fileSet>
-	</fileSets>
-	<dependencySets>
-		<dependencySet>
-			<outputDirectory>lib</outputDirectory>
-			<excludes>
-				<exclude>junit:junit</exclude>
-			</excludes>
-		</dependencySet>
-	</dependencySets>
-</assembly>

+ 0 - 64
kafka/src/main/assembly/release.xml

@@ -1,64 +0,0 @@
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
-	<id>dist</id>
-	<formats>
-		<format>tar.gz</format>
-	</formats>
-	<includeBaseDirectory>false</includeBaseDirectory>
-	<fileSets>
-		<fileSet>
-			<directory>.</directory>
-			<outputDirectory>/</outputDirectory>
-			<includes>
-				<include>README*</include>
-			</includes>
-		</fileSet>
-		<fileSet>
-			<directory>./src/main/bin</directory>
-			<outputDirectory>bin</outputDirectory>
-			<includes>
-				<include>**/*</include>
-			</includes>
-			<fileMode>0755</fileMode>
-		</fileSet>
-		<fileSet>
-			<directory>../deployer/src/main/conf</directory>
-			<outputDirectory>/conf</outputDirectory>
-			<includes>
-				<include>**/*</include>
-			</includes>
-		</fileSet>
-		<fileSet>
-			<directory>../deployer/src/main/resources</directory>
-			<outputDirectory>/conf</outputDirectory>
-			<includes>
-				<include>**/*</include>
-			</includes>
-			<excludes>
-				<exclude>logback.xml</exclude>
-			</excludes>
-		</fileSet>
-		<fileSet>
-			<directory>./src/main/resources</directory>
-			<outputDirectory>/conf</outputDirectory>
-			<includes>
-				<include>**/*</include>
-			</includes>
-		</fileSet>
-		<fileSet>
-			<directory>target</directory>
-			<outputDirectory>logs</outputDirectory>
-			<excludes>
-				<exclude>**/*</exclude>
-			</excludes>
-		</fileSet>
-	</fileSets>
-	<dependencySets>
-		<dependencySet>
-			<outputDirectory>lib</outputDirectory>
-			<excludes>
-				<exclude>junit:junit</exclude>
-			</excludes>
-		</dependencySet>
-	</dependencySets>
-</assembly>

+ 0 - 25
kafka/src/main/bin/startup.bat

@@ -1,25 +0,0 @@
-@echo off
-@if not "%ECHO%" == ""  echo %ECHO%
-@if "%OS%" == "Windows_NT"  setlocal
-
-set ENV_PATH=.\
-if "%OS%" == "Windows_NT" set ENV_PATH=%~dp0%
-
-set conf_dir=%ENV_PATH%\..\conf
-set canal_conf=%conf_dir%\canal.properties
-set logback_configurationFile=%conf_dir%\logback.xml
-
-set CLASSPATH=%conf_dir%
-set CLASSPATH=%conf_dir%\..\lib\*;%CLASSPATH%
-
-set JAVA_MEM_OPTS= -Xms128m -Xmx512m -XX:PermSize=128m
-set JAVA_OPTS_EXT= -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dapplication.codeset=UTF-8 -Dfile.encoding=UTF-8
-set JAVA_DEBUG_OPT= -server -Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=9099,server=y,suspend=n
-set CANAL_OPTS= -DappName=otter-canal -Dlogback.configurationFile="%logback_configurationFile%" -Dcanal.conf="%canal_conf%"
-
-set JAVA_OPTS= %JAVA_MEM_OPTS% %JAVA_OPTS_EXT% %JAVA_DEBUG_OPT% %CANAL_OPTS%
-
-set CMD_STR= java %JAVA_OPTS% -classpath "%CLASSPATH%" java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.kafka.CanalLauncher
-echo start cmd : %CMD_STR%
-
-java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.kafka.CanalLauncher

+ 0 - 104
kafka/src/main/bin/startup.sh

@@ -1,104 +0,0 @@
-#!/bin/bash 
-
-current_path=`pwd`
-case "`uname`" in
-    Linux)
-		bin_abs_path=$(readlink -f $(dirname $0))
-		;;
-	*)
-		bin_abs_path=`cd $(dirname $0); pwd`
-		;;
-esac
-base=${bin_abs_path}/..
-canal_conf=$base/conf/canal.properties
-logback_configurationFile=$base/conf/logback.xml
-export LANG=en_US.UTF-8
-export BASE=$base
-
-if [ -f $base/bin/canal.pid ] ; then
-	echo "found canal.pid , Please run stop.sh first ,then startup.sh" 2>&2
-    exit 1
-fi
-
-if [ ! -d $base/logs/canal ] ; then 
-	mkdir -p $base/logs/canal
-fi
-
-## set java path
-if [ -z "$JAVA" ] ; then
-  JAVA=$(which java)
-fi
-
-ALIBABA_JAVA="/usr/alibaba/java/bin/java"
-TAOBAO_JAVA="/opt/taobao/java/bin/java"
-if [ -z "$JAVA" ]; then
-  if [ -f $ALIBABA_JAVA ] ; then
-  	JAVA=$ALIBABA_JAVA
-  elif [ -f $TAOBAO_JAVA ] ; then
-  	JAVA=$TAOBAO_JAVA
-  else
-  	echo "Cannot find a Java JDK. Please set either set JAVA or put java (>=1.5) in your PATH." 2>&2
-    exit 1
-  fi
-fi
-
-case "$#" 
-in
-0 ) 
-	;;
-1 )	
-	var=$*
-	if [ -f $var ] ; then 
-		canal_conf=$var
-	else
-		echo "THE PARAMETER IS NOT CORRECT.PLEASE CHECK AGAIN."
-        exit
-	fi;;
-2 )	
-	var=$1
-	if [ -f $var ] ; then
-		canal_conf=$var
-	else 
-		if [ "$1" = "debug" ]; then
-			DEBUG_PORT=$2
-			DEBUG_SUSPEND="n"
-			JAVA_DEBUG_OPT="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=$DEBUG_PORT,server=y,suspend=$DEBUG_SUSPEND"
-		fi
-     fi;;
-* )
-	echo "THE PARAMETERS MUST BE TWO OR LESS.PLEASE CHECK AGAIN."
-	exit;;
-esac
-
-str=`file -L $JAVA | grep 64-bit`
-if [ -n "$str" ]; then
-	JAVA_OPTS="-server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"
-else
-	JAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m "
-fi
-
-JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8"
-CANAL_OPTS="-DappName=otter-canal -Dlogback.configurationFile=$logback_configurationFile -Dcanal.conf=$canal_conf"
-
-if [ -e $canal_conf -a -e $logback_configurationFile ]
-then 
-	
-	for i in $base/lib/*;
-		do CLASSPATH=$i:"$CLASSPATH";
-	done
- 	CLASSPATH="$base/conf:$CLASSPATH";
- 	
- 	echo "cd to $bin_abs_path for workaround relative path"
-  	cd $bin_abs_path
- 	
-	echo LOG CONFIGURATION : $logback_configurationFile
-	echo canal conf : $canal_conf 
-	echo CLASSPATH :$CLASSPATH
-	$JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $CANAL_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.kafka.CanalLauncher 1>>$base/logs/canal/canal.log 2>&1 &
-	echo $! > $base/bin/canal.pid 
-	
-	echo "cd to $current_path for continue"
-  	cd $current_path
-else 
-	echo "canal conf("$canal_conf") OR log configration file($logback_configurationFile) is not exist,please create then first!"
-fi

+ 0 - 65
kafka/src/main/bin/stop.sh

@@ -1,65 +0,0 @@
-#!/bin/bash
-
-cygwin=false;
-linux=false;
-case "`uname`" in
-    CYGWIN*)
-        cygwin=true
-        ;;
-    Linux*)
-    	linux=true
-    	;;
-esac
-
-get_pid() {	
-	STR=$1
-	PID=$2
-    if $cygwin; then
-        JAVA_CMD="$JAVA_HOME\bin\java"
-        JAVA_CMD=`cygpath --path --unix $JAVA_CMD`
-        JAVA_PID=`ps |grep $JAVA_CMD |awk '{print $1}'`
-    else
-    	if $linux; then
-	        if [ ! -z "$PID" ]; then
-	        	JAVA_PID=`ps -C java -f --width 1000|grep "$STR"|grep "$PID"|grep -v grep|awk '{print $2}'`
-		    else 
-		        JAVA_PID=`ps -C java -f --width 1000|grep "$STR"|grep -v grep|awk '{print $2}'`
-	        fi
-	    else
-	    	if [ ! -z "$PID" ]; then
-	        	JAVA_PID=`ps aux |grep "$STR"|grep "$PID"|grep -v grep|awk '{print $2}'`
-		    else 
-		        JAVA_PID=`ps aux |grep "$STR"|grep -v grep|awk '{print $2}'`
-	        fi
-	    fi
-    fi
-    echo $JAVA_PID;
-}
-
-base=`dirname $0`/..
-pidfile=$base/bin/canal.pid
-if [ ! -f "$pidfile" ];then
-	echo "canal is not running. exists"
-	exit
-fi
-
-pid=`cat $pidfile`
-if [ "$pid" == "" ] ; then
-	pid=`get_pid "appName=otter-canal"`
-fi
-
-echo -e "`hostname`: stopping canal $pid ... "
-kill $pid
-
-LOOPS=0
-while (true); 
-do 
-	gpid=`get_pid "appName=otter-canal" "$pid"`
-    if [ "$gpid" == "" ] ; then
-    	echo "Oook! cost:$LOOPS"
-    	`rm $pidfile`
-    	break;
-    fi
-    let LOOPS=LOOPS+1
-    sleep 1
-done

+ 0 - 17
kafka/src/main/java/com/alibaba/otter/canal/kafka/CanalLauncher.java

@@ -1,17 +0,0 @@
-package com.alibaba.otter.canal.kafka;
-
-import com.alibaba.otter.canal.kafka.producer.CanalKafkaStarter;
-
-/**
- * canal-kafka独立版本启动的入口类
- *
- * @author machengyuan 2018-6-11 下午05:30:49
- * @version 1.0.0
- */
-public class CanalLauncher {
-
-    public static void main(String[] args) {
-        CanalServerStarter.init();
-        CanalKafkaStarter.init();
-    }
-}

+ 0 - 78
kafka/src/main/java/com/alibaba/otter/canal/kafka/CanalServerStarter.java

@@ -1,78 +0,0 @@
-package com.alibaba.otter.canal.kafka;
-
-import java.io.FileInputStream;
-import java.util.Properties;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.otter.canal.deployer.CanalController;
-
-/**
- * canal server 启动类
- *
- * @author machengyuan 2018-6-11 下午05:30:49
- * @version 1.0.0
- */
-public class CanalServerStarter {
-
-    private static final String     CLASSPATH_URL_PREFIX = "classpath:";
-    private static final Logger     logger               = LoggerFactory.getLogger(CanalServerStarter.class);
-    private volatile static boolean running              = false;
-
-    public static void init() {
-        try {
-            logger.info("## set default uncaught exception handler");
-            setGlobalUncaughtExceptionHandler();
-
-            logger.info("## load canal configurations");
-            String conf = System.getProperty("canal.conf", "classpath:canal.properties");
-            Properties properties = new Properties();
-            if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
-                conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
-                properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
-            } else {
-                properties.load(new FileInputStream(conf));
-            }
-
-            logger.info("## start the canal server.");
-            final CanalController controller = new CanalController(properties);
-            controller.start();
-            running = true;
-            logger.info("## the canal server is running now ......");
-            Runtime.getRuntime().addShutdownHook(new Thread() {
-
-                public void run() {
-                    try {
-                        logger.info("## stop the canal server");
-                        running = false;
-                        controller.stop();
-                    } catch (Throwable e) {
-                        logger.warn("##something goes wrong when stopping canal Server:", e);
-                    } finally {
-                        logger.info("## canal server is down.");
-                    }
-                }
-
-            });
-        } catch (Throwable e) {
-            logger.error("## Something goes wrong when starting up the canal Server:", e);
-            System.exit(0);
-        }
-    }
-
-    public static boolean isRunning() {
-        return running;
-    }
-
-    private static void setGlobalUncaughtExceptionHandler() {
-        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-
-            @Override
-            public void uncaughtException(Thread t, Throwable e) {
-                logger.error("UnCaughtException", e);
-            }
-        });
-    }
-}

+ 0 - 85
kafka/src/main/resources/logback.xml

@@ -1,85 +0,0 @@
-<configuration scan="true" scanPeriod=" 5 seconds">
-	<jmxConfigurator />
-	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-		<encoder>
-			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{56} - %msg%n
-			</pattern>
-		</encoder>
-	</appender>
-	
-	<appender name="CANAL-ROOT" class="ch.qos.logback.classic.sift.SiftingAppender">
-		<discriminator>
-			<Key>destination</Key>
-			<DefaultValue>canal</DefaultValue>
-		</discriminator>
-		<sift>
-			<appender name="FILE-${destination}" class="ch.qos.logback.core.rolling.RollingFileAppender">
-				<File>../logs/${destination}/${destination}.log</File>
-				<rollingPolicy
-					class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-					<!-- rollover daily -->
-					<fileNamePattern>../logs/${destination}/%d{yyyy-MM-dd}/${destination}-%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern>
-					<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
-						<!-- or whenever the file size reaches 100MB -->
-						<maxFileSize>512MB</maxFileSize>
-					</timeBasedFileNamingAndTriggeringPolicy>
-					<maxHistory>60</maxHistory>
-				</rollingPolicy>
-				<encoder>
-					<pattern>
-						%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{56} - %msg%n
-					</pattern>
-				</encoder>
-			</appender>
-		</sift>
-	</appender>
-	
-	<appender name="CANAL-META" class="ch.qos.logback.classic.sift.SiftingAppender">
-		<discriminator>
-			<Key>destination</Key>
-			<DefaultValue>canal</DefaultValue>
-		</discriminator>
-		<sift>
-			<appender name="META-FILE-${destination}" class="ch.qos.logback.core.rolling.RollingFileAppender">
-				<File>../logs/${destination}/meta.log</File>
-				<rollingPolicy
-					class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-					<!-- rollover daily -->
-					<fileNamePattern>../logs/${destination}/%d{yyyy-MM-dd}/meta-%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern>
-					<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
-						<!-- or whenever the file size reaches 100MB -->
-						<maxFileSize>32MB</maxFileSize>
-					</timeBasedFileNamingAndTriggeringPolicy>
-					<maxHistory>60</maxHistory>
-				</rollingPolicy>
-				<encoder>
-					<pattern>
-						%d{yyyy-MM-dd HH:mm:ss.SSS} - %msg%n
-					</pattern>
-				</encoder>
-			</appender>
-		</sift>
-	</appender>
-	
-    <logger name="com.alibaba.otter.canal.instance" additivity="false">  
-        <level value="INFO" />  
-        <appender-ref ref="CANAL-ROOT" />
-    </logger>
-    <logger name="com.alibaba.otter.canal.deployer" additivity="false">  
-        <level value="INFO" />  
-        <appender-ref ref="CANAL-ROOT" />
-    </logger>
-    <logger name="com.alibaba.otter.canal.meta.FileMixedMetaManager" additivity="false">  
-        <level value="INFO" />
-        <appender-ref ref="CANAL-META" />
-    </logger>
-	<logger name="com.alibaba.otter.canal.kafka" additivity="false">
-		<level value="INFO" />
-		<appender-ref ref="CANAL-ROOT" />
-	</logger>
-    
-	<root level="WARN">
-		<appender-ref ref="STDOUT"/>
-		<appender-ref ref="CANAL-ROOT" />
-	</root>
-</configuration>

+ 0 - 1
pom.xml

@@ -117,7 +117,6 @@
         <module>client</module>
         <module>deployer</module>
         <module>example</module>
-        <module>kafka</module>
         <module>kafka-client</module>
         <module>prometheus</module>
         <module>client-adapter</module>

+ 23 - 0
server/pom.xml

@@ -25,6 +25,29 @@
 			<artifactId>canal.instance.manager</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+		<!-- kafka -->
+		<dependency>
+			<groupId>org.yaml</groupId>
+			<artifactId>snakeyaml</artifactId>
+			<version>1.17</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_2.11</artifactId>
+			<version>1.1.1</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<!--kafka_2.11_1.1.1 exclusion掉了netty的依赖,但CanalServerWithNetty 依赖 netty3,升级kafka至 1.1.1 需要显式加入,否则会启动失败 -->
+		<dependency>
+			<groupId>org.jboss.netty</groupId>
+			<artifactId>netty</artifactId>
+			<version>3.2.2.Final</version>
+		</dependency>
 		
 		<!-- test dependency -->
 		<dependency>

+ 3 - 4
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaProducer.java → server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -1,4 +1,4 @@
-package com.alibaba.otter.canal.kafka.producer;
+package com.alibaba.otter.canal.kafka;
 
 import java.io.IOException;
 import java.util.Properties;
@@ -10,7 +10,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.kafka.producer.KafkaProperties.Topic;
 import com.alibaba.otter.canal.protocol.Message;
 
 /**
@@ -21,7 +20,7 @@ import com.alibaba.otter.canal.protocol.Message;
  */
 public class CanalKafkaProducer {
 
-    private static final Logger logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
+    private static final Logger       logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
 
     private Producer<String, Message> producer;
 
@@ -49,7 +48,7 @@ public class CanalKafkaProducer {
         }
     }
 
-    public void send(Topic topic, Message message) throws IOException {
+    public void send(KafkaProperties.Topic topic, Message message) throws IOException {
         // set canal.instance.filter.transaction.entry = true
 
         // boolean valid = false;

+ 15 - 16
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaStarter.java → server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaStarter.java

@@ -1,4 +1,4 @@
-package com.alibaba.otter.canal.kafka.producer;
+package com.alibaba.otter.canal.kafka;
 
 import java.io.FileInputStream;
 import java.util.List;
@@ -10,11 +10,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
 
-import com.alibaba.otter.canal.kafka.CanalServerStarter;
-import com.alibaba.otter.canal.kafka.producer.KafkaProperties.CanalDestination;
-import com.alibaba.otter.canal.kafka.producer.KafkaProperties.Topic;
+import com.alibaba.otter.canal.kafka.KafkaProperties.CanalDestination;
+import com.alibaba.otter.canal.kafka.KafkaProperties.Topic;
 import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.server.CanalServerStarter;
 import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
 
 /**
@@ -23,20 +23,21 @@ import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
  * @author machengyuan 2018-6-11 下午05:30:49
  * @version 1.0.0
  */
-public class CanalKafkaStarter {
+public class CanalKafkaStarter implements CanalServerStarter {
 
-    private static final String       CLASSPATH_URL_PREFIX = "classpath:";
-    private static final Logger       logger               = LoggerFactory.getLogger(CanalKafkaStarter.class);
+    private static final Logger logger               = LoggerFactory.getLogger(CanalKafkaStarter.class);
 
-    private volatile static boolean   running              = false;
+    private static final String CLASSPATH_URL_PREFIX = "classpath:";
 
-    private static ExecutorService    executorService;
+    private volatile boolean    running              = false;
 
-    private static CanalKafkaProducer canalKafkaProducer;
+    private ExecutorService     executorService;
 
-    private static KafkaProperties    kafkaProperties;
+    private CanalKafkaProducer canalKafkaProducer;
 
-    public static void init() {
+    private KafkaProperties kafkaProperties;
+
+    public void init() {
         try {
             logger.info("## load kafka configurations");
             String conf = System.getProperty("kafka.conf", "classpath:kafka.yml");
@@ -96,11 +97,9 @@ public class CanalKafkaStarter {
         }
     }
 
-    private static void worker(CanalDestination destination) {
+    private void worker(CanalDestination destination) {
         while (!running)
             ;
-        while (!CanalServerStarter.isRunning())
-            ; // 等待server启动完成
         logger.info("## start the canal consumer: {}.", destination.getCanalDestination());
         CanalServerWithEmbedded server = CanalServerWithEmbedded.instance();
         ClientIdentity clientIdentity = new ClientIdentity(destination.getCanalDestination(), (short) 1001, "");
@@ -121,7 +120,7 @@ public class CanalKafkaStarter {
                     Message message = server.getWithoutAck(clientIdentity, kafkaProperties.getCanalBatchSize()); // 获取指定数量的数据
                     long batchId = message.getId();
                     try {
-                        int size = message.isRaw() ?  message.getRawEntries().size() : message.getEntries().size();
+                        int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();
                         if (batchId != -1 && size != 0) {
                             if (!StringUtils.isEmpty(destination.getTopic())) {
                                 Topic topic = new Topic();

+ 1 - 1
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/KafkaProperties.java → server/src/main/java/com/alibaba/otter/canal/kafka/KafkaProperties.java

@@ -1,4 +1,4 @@
-package com.alibaba.otter.canal.kafka.producer;
+package com.alibaba.otter.canal.kafka;
 
 import java.util.ArrayList;
 import java.util.HashSet;

+ 7 - 7
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/MessageSerializer.java → server/src/main/java/com/alibaba/otter/canal/kafka/MessageSerializer.java

@@ -1,4 +1,4 @@
-package com.alibaba.otter.canal.kafka.producer;
+package com.alibaba.otter.canal.kafka;
 
 import java.util.List;
 import java.util.Map;
@@ -37,20 +37,20 @@ public class MessageSerializer implements Serializer<Message> {
                         List<ByteString> rowEntries = data.getRawEntries();
                         // message size
                         int messageSize = 0;
-                        messageSize += com.google.protobuf.CodedOutputStream.computeInt64Size(1, data.getId());
+                        messageSize += CodedOutputStream.computeInt64Size(1, data.getId());
 
                         int dataSize = 0;
                         for (int i = 0; i < rowEntries.size(); i++) {
-                            dataSize += com.google.protobuf.CodedOutputStream.computeBytesSizeNoTag(rowEntries.get(i));
+                            dataSize += CodedOutputStream.computeBytesSizeNoTag(rowEntries.get(i));
                         }
                         messageSize += dataSize;
                         messageSize += 1 * rowEntries.size();
                         // packet size
                         int size = 0;
-                        size += com.google.protobuf.CodedOutputStream.computeEnumSize(3,
+                        size += CodedOutputStream.computeEnumSize(3,
                             PacketType.MESSAGES.getNumber());
-                        size += com.google.protobuf.CodedOutputStream.computeTagSize(5)
-                                + com.google.protobuf.CodedOutputStream.computeRawVarint32Size(messageSize)
+                        size += CodedOutputStream.computeTagSize(5)
+                                + CodedOutputStream.computeRawVarint32Size(messageSize)
                                 + messageSize;
                         // build data
                         byte[] body = new byte[size];
@@ -73,7 +73,7 @@ public class MessageSerializer implements Serializer<Message> {
                         }
 
                         CanalPacket.Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
-                        packetBuilder.setType(CanalPacket.PacketType.MESSAGES);
+                        packetBuilder.setType(PacketType.MESSAGES);
                         packetBuilder.setBody(messageBuilder.build().toByteString());
                         return packetBuilder.build().toByteArray();
                     }

+ 12 - 0
server/src/main/java/com/alibaba/otter/canal/server/CanalServerStarter.java

@@ -0,0 +1,12 @@
+package com.alibaba.otter.canal.server;
+
+/**
+ * 外部服务如Kafka, RocketMQ启动接口
+ *
+ * @author machengyuan 2018-8-23 下午05:20:29
+ * @version 1.0.0
+ */
+public interface CanalServerStarter {
+
+    void init();
+}