Browse Source

Merge pull request #695 from rewerma/master

Add support for producing and consuming canal binlog messages via Kafka
agapple 6 years ago
Parent commit dca1b20043
28 changed files with 1677 additions and 11 deletions
  1. deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java (+1 -0)
  2. deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java (+14 -7)
  3. deployer/src/main/resources/canal.properties (+1 -0)
  4. deployer/src/main/resources/logback.xml (+5 -1)
  5. example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java (+1 -2)
  6. kafka-client/pom.xml (+111 -0)
  7. kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/KafkaCanalConnector.java (+128 -0)
  8. kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/KafkaCanalConnectors.java (+34 -0)
  9. kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/MessageDeserializer.java (+61 -0)
  10. kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/AbstractKafkaTest.java (+25 -0)
  11. kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/CanalKafkaClientExample.java (+132 -0)
  12. kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/KafkaClientRunningTest.java (+58 -0)
  13. kafka-client/src/test/resources/logback.xml (+19 -0)
  14. kafka/pom.xml (+140 -0)
  15. kafka/src/main/assembly/dev.xml (+64 -0)
  16. kafka/src/main/assembly/release.xml (+64 -0)
  17. kafka/src/main/bin/startup.bat (+25 -0)
  18. kafka/src/main/bin/startup.sh (+104 -0)
  19. kafka/src/main/bin/stop.sh (+65 -0)
  20. kafka/src/main/java/com/alibaba/otter/canal/kafka/CanalLauncher.java (+19 -0)
  21. kafka/src/main/java/com/alibaba/otter/canal/kafka/CanalServerStarter.java (+76 -0)
  22. kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaProducer.java (+74 -0)
  23. kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaStarter.java (+143 -0)
  24. kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/KafkaProperties.java (+158 -0)
  25. kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/MessageSerializer.java (+49 -0)
  26. kafka/src/main/resources/kafka.yml (+18 -0)
  27. kafka/src/main/resources/logback.xml (+85 -0)
  28. pom.xml (+3 -1)

+ 1 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -16,6 +16,7 @@ public class CanalConstants {
     public static final String CANAL_IP                          = ROOT + "." + "ip";
     public static final String CANAL_PORT                        = ROOT + "." + "port";
     public static final String CANAL_ZKSERVERS                   = ROOT + "." + "zkServers";
+    public static final String CANAL_WITHOUT_NETTY               = ROOT + "." + "withoutNetty";
 
     public static final String CANAL_DESTINATIONS                = ROOT + "." + "destinations";
     public static final String CANAL_AUTO_SCAN                   = ROOT + "." + "auto.scan";

+ 14 - 7
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -43,7 +43,7 @@ import com.google.common.collect.MigrateMap;
 
 /**
 * canal scheduling controller
- * 
+ *
  * @author jianghang 2012-11-8 下午12:03:11
  * @version 1.0.0
  */
@@ -97,9 +97,12 @@ public class CanalController {
         port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
         embededCanalServer = CanalServerWithEmbedded.instance();
         embededCanalServer.setCanalInstanceGenerator(instanceGenerator); // set the custom instanceGenerator
-        canalServer = CanalServerWithNetty.instance();
-        canalServer.setIp(ip);
-        canalServer.setPort(port);
+        String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);
+        if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) {
+            canalServer = CanalServerWithNetty.instance();
+            canalServer.setIp(ip);
+            canalServer.setPort(port);
+        }
 
         // if ip is empty, default to hostIp when registering in zk
         if (StringUtils.isEmpty(ip)) {
@@ -431,11 +434,15 @@ public class CanalController {
         }
 
         // start the network interface
-        canalServer.start();
+        if (canalServer != null) {
+            canalServer.start();
+        }
     }
 
     public void stop() throws Throwable {
-        canalServer.stop();
+        if (canalServer != null) {
+            canalServer.stop();
+        }
 
         if (autoScan) {
             for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
@@ -454,7 +461,7 @@ public class CanalController {
         // release canal's working node
         releaseCid(ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port));
         logger.info("## stop the canal server[{}:{}]", ip, port);
-        
+
         if (zkclientx != null) {
             zkclientx.close();
         }

+ 1 - 0
deployer/src/main/resources/canal.properties

@@ -7,6 +7,7 @@ canal.port= 11111
 canal.zkServers=
 # flush data to zk
 canal.zookeeper.flush.period = 1000
+canal.withoutNetty = false
 # flush meta cursor/parse position to file
 canal.file.data.dir = ${canal.conf.dir}
 canal.file.flush.period = 1000

+ 5 - 1
deployer/src/main/resources/logback.xml

@@ -70,9 +70,13 @@
         <appender-ref ref="CANAL-ROOT" />
     </logger>
     <logger name="com.alibaba.otter.canal.meta.FileMixedMetaManager" additivity="false">  
-        <level value="INFO" />  
+        <level value="INFO" />
         <appender-ref ref="CANAL-META" />
     </logger>
+	<logger name="com.alibaba.otter.canal.kafka" additivity="false">
+		<level value="INFO" />
+		<appender-ref ref="CANAL-ROOT" />
+	</logger>
     
 	<root level="WARN">
 		<appender-ref ref="STDOUT"/>

+ 1 - 2
example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java

@@ -38,7 +38,6 @@ public class AbstractCanalClientTest {
     protected static final String             SEP                = SystemUtils.LINE_SEPARATOR;
     protected static final String             DATE_FORMAT        = "yyyy-MM-dd HH:mm:ss";
     protected volatile boolean                running            = false;
-    private volatile boolean                  waiting            = true;
     protected Thread.UncaughtExceptionHandler handler            = new Thread.UncaughtExceptionHandler() {
 
                                                                      public void uncaughtException(Thread t, Throwable e) {
@@ -97,6 +96,7 @@ public class AbstractCanalClientTest {
             return;
         }
         connector.stopRunning();
+        running = false;
         if (thread != null) {
             try {
                 thread.join();
@@ -115,7 +115,6 @@ public class AbstractCanalClientTest {
                 MDC.put("destination", destination);
                 connector.connect();
                 connector.subscribe();
-                waiting = false;
                 while (running) {
                 Message message = connector.getWithoutAck(batchSize); // fetch the specified number of records
                     long batchId = message.getId();

+ 111 - 0
kafka-client/pom.xml

@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>canal</artifactId>
+        <groupId>com.alibaba.otter</groupId>
+        <version>1.0.26-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <groupId>com.alibaba.otter</groupId>
+    <artifactId>canal.kafka.client</artifactId>
+    <packaging>jar</packaging>
+    <name>canal kafka client module for otter ${project.version}</name>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>canal.protocol</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>0.10.0.1</version>
+        </dependency>
+
+        <!-- junit -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+    </dependencies>
+
+    <profiles>
+        <profile>
+            <id>dev</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+                <property>
+                    <name>env</name>
+                    <value>!javadoc</value>
+                </property>
+            </activation>
+        </profile>
+
+        <profile>
+            <id>javadoc</id>
+            <activation>
+                <property>
+                    <name>env</name>
+                    <value>javadoc</value>
+                </property>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-javadoc-plugin</artifactId>
+                        <version>2.9.1</version>
+                        <executions>
+                            <execution>
+                                <id>attach-javadocs</id>
+                                <phase>package</phase>
+                                <goals>
+                                    <goal>jar</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                        <configuration>
+                            <aggregate>true</aggregate>
+                            <show>public</show>
+                            <nohelp>true</nohelp>
+                            <header>${project.artifactId}-${project.version}</header>
+                            <footer>${project.artifactId}-${project.version}</footer>
+                            <doctitle>${project.artifactId}-${project.version}</doctitle>
+                            <links>
+                                <link>https://github.com/alibaba/canal</link>
+                            </links>
+                            <outputDirectory>${project.build.directory}/apidocs/apidocs/${project.version}</outputDirectory>
+                        </configuration>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-scm-publish-plugin</artifactId>
+                        <version>1.0-beta-2</version>
+                        <executions>
+                            <execution>
+                                <id>attach-javadocs</id>
+                                <phase>package</phase>
+                                <goals>
+                                    <goal>publish-scm</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                        <configuration>
+                            <checkoutDirectory>${project.build.directory}/scmpublish</checkoutDirectory>
+                            <checkinComment>Publishing javadoc for ${project.artifactId}:${project.version}</checkinComment>
+                            <content>${project.build.directory}/apidocs</content>
+                            <skipDeletedFiles>true</skipDeletedFiles>
+                            <pubScmUrl>scm:git:git@github.com:alibaba/canal.git</pubScmUrl>
+                            <scmBranch>gh-pages</scmBranch>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+</project>

+ 128 - 0
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/KafkaCanalConnector.java

@@ -0,0 +1,128 @@
+package com.alibaba.otter.canal.kafka.client;
+
+import com.alibaba.otter.canal.protocol.Message;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * canal kafka data access client
+ *
+ * @author machengyuan @ 2018-6-12
+ * @version 1.0.0
+ */
+public class KafkaCanalConnector {
+
+    private KafkaConsumer<String, Message> kafkaConsumer;
+
+    private String topic;
+
+    private Integer partition;
+
+
+    private Properties properties;
+
+    public KafkaCanalConnector(String servers, String topic, Integer partition, String groupId) {
+        this.topic = topic;
+        this.partition = partition;
+
+        properties = new Properties();
+        properties.put("bootstrap.servers", servers);
+        properties.put("group.id", groupId);
+        properties.put("enable.auto.commit", false);
+        properties.put("auto.commit.interval.ms", "1000");
+        properties.put("auto.offset.reset", "latest"); //earliest //如果没有offset则从最后的offset开始读
+        properties.put("request.timeout.ms", "40000"); //必须大于session.timeout.ms的设置
+        properties.put("session.timeout.ms", "30000"); //默认为30秒
+        properties.put("max.poll.records", "1"); //所以一次只取一条数据
+        properties.put("key.deserializer", StringDeserializer.class.getName());
+        properties.put("value.deserializer", MessageDeserializer.class.getName());
+    }
+
+    /**
+     * reset the session timeout
+     *
+     * @param timeout
+     * @param unit
+     */
+    public void setSessionTimeout(Long timeout, TimeUnit unit) {
+        long t = unit.toMillis(timeout);
+        properties.put("request.timeout.ms", String.valueOf(t + 60000));
+        properties.put("session.timeout.ms", String.valueOf(t));
+    }
+
+    /**
+     * close the connection
+     */
+    public void close() {
+        kafkaConsumer.close();
+    }
+
+    /**
+     * subscribe to the topic
+     */
+    public void subscribe() {
+        if (kafkaConsumer == null) {
+            kafkaConsumer = new KafkaConsumer<String, Message>(properties);
+        }
+        if (partition == null) {
+            kafkaConsumer.subscribe(Collections.singletonList(topic));
+        } else {
+            TopicPartition topicPartition = new TopicPartition(topic, partition);
+            kafkaConsumer.assign(Collections.singletonList(topicPartition));
+        }
+    }
+
+    /**
+     * unsubscribe
+     */
+    public void unsubscribe() {
+        kafkaConsumer.unsubscribe();
+    }
+
+    /**
+     * fetch data and ack automatically
+     *
+     * @return
+     */
+    public Message get() {
+        return get(100L, TimeUnit.MILLISECONDS);
+    }
+
+    public Message get(Long timeout, TimeUnit unit) {
+        Message message = getWithoutAck(timeout, unit);
+        this.ack();
+        return message;
+    }
+
+    public Message getWithoutAck() {
+        return getWithoutAck(100L, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * fetch data without ack; ack manually once processing is done
+     *
+     * @return
+     */
+    public Message getWithoutAck(Long timeout, TimeUnit unit) {
+        ConsumerRecords<String, Message> records =
+                kafkaConsumer.poll(unit.toMillis(timeout)); // per the configuration above, at most one record is polled
+
+        if (!records.isEmpty()) {
+            return records.iterator().next().value();
+        }
+        return null;
+    }
+
+    /**
+     * commit the offset; if no ack occurs within session.timeout.ms, an exception is thrown and the ack fails
+     */
+    public void ack() {
+        kafkaConsumer.commitSync();
+    }
+}
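
For orientation, a condensed consume loop against this connector might look like the sketch below; the broker list, topic and group id are illustrative placeholders, and the full runnable version is the CanalKafkaClientExample test further down.

    import java.util.concurrent.TimeUnit;

    import com.alibaba.otter.canal.kafka.client.KafkaCanalConnector;
    import com.alibaba.otter.canal.kafka.client.KafkaCanalConnectors;
    import com.alibaba.otter.canal.protocol.Message;

    public class KafkaClientQuickStart {

        public static void main(String[] args) {
            // placeholders: point these at a real broker, topic and consumer group
            KafkaCanalConnector connector =
                    KafkaCanalConnectors.newKafkaConnector("localhost:9092", "example", "g1");
            connector.subscribe();
            try {
                for (int i = 0; i < 100; i++) {
                    // max.poll.records=1, so at most one Message comes back per poll
                    Message message = connector.getWithoutAck(1L, TimeUnit.SECONDS);
                    if (message != null) {
                        System.out.println(message);
                    }
                    connector.ack(); // manual offset commit (enable.auto.commit=false)
                }
            } finally {
                connector.unsubscribe();
                connector.close();
            }
        }
    }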

+ 34 - 0
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/KafkaCanalConnectors.java

@@ -0,0 +1,34 @@
+package com.alibaba.otter.canal.kafka.client;
+
+/**
+ * factory class for creating canal kafka connectors
+ *
+ * @author machengyuan @ 2018-6-12
+ * @version 1.0.0
+ */
+public class KafkaCanalConnectors {
+    /**
+     * create a kafka client connection
+     *
+     * @param servers
+     * @param topic
+     * @param partition
+     * @param groupId
+     * @return
+     */
+    public static KafkaCanalConnector newKafkaConnector(String servers, String topic, Integer partition, String groupId) {
+        return new KafkaCanalConnector(servers, topic, partition, groupId);
+    }
+
+    /**
+     * create a kafka client connection
+     *
+     * @param servers
+     * @param topic
+     * @param groupId
+     * @return
+     */
+    public static KafkaCanalConnector newKafkaConnector(String servers, String topic, String groupId) {
+        return new KafkaCanalConnector(servers, topic, null, groupId);
+    }
+}

+ 61 - 0
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/MessageDeserializer.java

@@ -0,0 +1,61 @@
+package com.alibaba.otter.canal.kafka.client;
+
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.CanalPacket;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import com.google.protobuf.ByteString;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Kafka deserializer for the canal Message class
+ *
+ * @author machengyuan @ 2018-6-12
+ * @version 1.0.0
+ */
+public class MessageDeserializer implements Deserializer<Message> {
+    private static Logger logger = LoggerFactory.getLogger(MessageDeserializer.class);
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+    }
+
+    @Override
+    public Message deserialize(String topic, byte[] data) {
+        try {
+            if (data == null)
+                return null;
+            else {
+                CanalPacket.Packet p = CanalPacket.Packet.parseFrom(data);
+                switch (p.getType()) {
+                    case MESSAGES: {
+                        if (!p.getCompression().equals(CanalPacket.Compression.NONE)) {
+                            throw new CanalClientException("compression is not supported in this connector");
+                        }
+
+                        CanalPacket.Messages messages = CanalPacket.Messages.parseFrom(p.getBody());
+                        Message result = new Message(messages.getBatchId());
+                        for (ByteString byteString : messages.getMessagesList()) {
+                            result.addEntry(CanalEntry.Entry.parseFrom(byteString));
+                        }
+                        return result;
+                    }
+                    default:
+                        break;
+                }
+            }
+        } catch (Exception e) {
+            logger.error("Error when deserializing byte[] to message ");
+        }
+        return null;
+    }
+
+    @Override
+    public void close() {
+        // nothing to do
+    }
+}

+ 25 - 0
kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/AbstractKafkaTest.java

@@ -0,0 +1,25 @@
+package com.alibaba.otter.canal.kafka.client.running;
+
+import org.junit.Assert;
+
+/**
+ * Kafka test base class
+ *
+ * @author machengyuan @ 2018-6-12
+ * @version 1.0.0
+ */
+public abstract class AbstractKafkaTest {
+
+    public static String topic = "example";
+    public static Integer partition = null;
+    public static String groupId    = "g1";
+    public static String servers    = "slave1:6667,slave2:6667,slave3:6667";
+
+    public void sleep(long time) {
+        try {
+            Thread.sleep(time);
+        } catch (InterruptedException e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+}

+ 132 - 0
kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/CanalKafkaClientExample.java

@@ -0,0 +1,132 @@
+package com.alibaba.otter.canal.kafka.client.running;
+
+import com.alibaba.otter.canal.kafka.client.KafkaCanalConnector;
+import com.alibaba.otter.canal.kafka.client.KafkaCanalConnectors;
+import com.alibaba.otter.canal.protocol.Message;
+import org.apache.kafka.common.errors.WakeupException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Kafka client example
+ *
+ * @author machengyuan @ 2018-6-12
+ * @version 1.0.0
+ */
+public class CanalKafkaClientExample {
+    protected final static Logger logger = LoggerFactory.getLogger(CanalKafkaClientExample.class);
+
+    private KafkaCanalConnector connector;
+
+    private static volatile boolean running = false;
+
+    private Thread thread = null;
+
+    private Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
+        public void uncaughtException(Thread t, Throwable e) {
+            logger.error("parse events has an error", e);
+        }
+    };
+
+    public CanalKafkaClientExample(String servers, String topic, Integer partition, String groupId) {
+        connector = KafkaCanalConnectors.newKafkaConnector(servers, topic, partition, groupId);
+    }
+
+    public static void main(String[] args) {
+        try {
+            final CanalKafkaClientExample kafkaCanalClientExample = new CanalKafkaClientExample(AbstractKafkaTest.servers,
+                    AbstractKafkaTest.topic, AbstractKafkaTest.partition, AbstractKafkaTest.groupId);
+            logger.info("## start the kafka consumer: {}-{}", AbstractKafkaTest.topic, AbstractKafkaTest.groupId);
+            kafkaCanalClientExample.start();
+            logger.info("## the canal kafka consumer is running now ......");
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+
+                public void run() {
+                    try {
+                        logger.info("## stop the kafka consumer");
+                        kafkaCanalClientExample.stop();
+                    } catch (Throwable e) {
+                        logger.warn("##something goes wrong when stopping kafka consumer:", e);
+                    } finally {
+                        logger.info("## kafka consumer is down.");
+                    }
+                }
+
+            });
+            while (running) ;
+        } catch (Throwable e) {
+            logger.error("## Something goes wrong when starting up the kafka consumer:", e);
+            System.exit(0);
+        }
+    }
+
+    public void start() {
+        Assert.notNull(connector, "connector is null");
+        thread = new Thread(new Runnable() {
+
+            public void run() {
+                process();
+            }
+        });
+        thread.setUncaughtExceptionHandler(handler);
+        thread.start();
+        running = true;
+    }
+
+    public void stop() {
+        if (!running) {
+            return;
+        }
+        running = false;
+        if (thread != null) {
+            try {
+                thread.join();
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+    }
+
+    private void process() {
+        while (!running) ;
+        try {
+            connector.subscribe();
+            while (running) {
+                try {
+                    Message message = connector.getWithoutAck(1L, TimeUnit.SECONDS); // fetch message
+                    if (message == null) {
+                        continue;
+                    }
+                    long batchId = message.getId();
+                    int size = message.getEntries().size();
+                    if (batchId == -1 || size == 0) {
+                        // try {
+                        // Thread.sleep(1000);
+                        // } catch (InterruptedException e) {
+                        // }
+                    } else {
+                        // printSummary(message, batchId, size);
+                        // printEntry(message.getEntries());
+                        logger.info(message.toString());
+                    }
+
+                    connector.ack(); // commit ack
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+
+        try {
+            connector.unsubscribe();
+        } catch (WakeupException e) {
+            // No-op. Continue process
+        }
+        connector.close();
+    }
+}

+ 58 - 0
kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/KafkaClientRunningTest.java

@@ -0,0 +1,58 @@
+package com.alibaba.otter.canal.kafka.client.running;
+
+import com.alibaba.otter.canal.kafka.client.KafkaCanalConnector;
+import com.alibaba.otter.canal.kafka.client.KafkaCanalConnectors;
+import com.alibaba.otter.canal.protocol.Message;
+import org.apache.kafka.common.errors.WakeupException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * test example of fetching Messages with a Kafka consumer
+ *
+ * @author machengyuan @ 2018-6-12
+ * @version 1.0.0
+ */
+public class KafkaClientRunningTest extends AbstractKafkaTest {
+    private Logger logger = LoggerFactory.getLogger(KafkaClientRunningTest.class);
+
+    private boolean running = true;
+
+    @Test
+    public void testKafkaConsumer() {
+        final ExecutorService executor = Executors.newFixedThreadPool(1);
+
+        final KafkaCanalConnector connector = KafkaCanalConnectors.newKafkaConnector(servers, topic, partition, groupId);
+
+        executor.submit(new Runnable() {
+            @Override
+            public void run() {
+                connector.subscribe();
+                while (running) {
+                    try {
+                        Message message = connector.getWithoutAck(3L, TimeUnit.SECONDS);
+                        if (message != null) {
+                            System.out.println(message);
+                        }
+                        connector.ack();
+                    } catch (WakeupException e) {
+                        //ignore
+                    }
+                }
+                connector.unsubscribe();
+                connector.close();
+            }
+        });
+
+        sleep(60000);
+        running = false;
+        executor.shutdown();
+        logger.info("shutdown completed");
+    }
+
+}

+ 19 - 0
kafka-client/src/test/resources/logback.xml

@@ -0,0 +1,19 @@
+<configuration scan="true" scanPeriod=" 5 seconds">
+
+	<jmxConfigurator />
+	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+		<encoder>
+			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{56} - %msg%n
+			</pattern>
+		</encoder>
+	</appender>
+
+	<logger name="org.apache.kafka" additivity="false">
+		<level value="ERROR" />
+		<appender-ref ref="STDOUT" />
+	</logger>
+
+	<root level="INFO">
+		<appender-ref ref="STDOUT"/>
+	</root>
+</configuration>

+ 140 - 0
kafka/pom.xml

@@ -0,0 +1,140 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>canal</artifactId>
+        <groupId>com.alibaba.otter</groupId>
+        <version>1.0.26-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <groupId>com.alibaba.otter</groupId>
+    <artifactId>canal.kafka</artifactId>
+    <packaging>jar</packaging>
+    <name>canal kafka module for otter ${project.version}</name>
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>canal.deployer</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.yaml</groupId>
+            <artifactId>snakeyaml</artifactId>
+            <version>1.17</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>0.10.0.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- the deploy module's packaging is usually jar; if the project has no java sources or resources, this configuration lets the build pass -->
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <configuration>
+                    <archive>
+                        <addMavenDescriptor>true</addMavenDescriptor>
+                    </archive>
+                    <excludes>
+                        <exclude>**/logback.xml</exclude>
+                        <exclude>**/canal.properties</exclude>
+                        <exclude>**/spring/**</exclude>
+                        <exclude>**/example/**</exclude>
+                        <exclude>**/kafka.yml</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <!-- this is the latest version, and the recommended one -->
+                <version>2.2.1</version>
+                <executions>
+                    <execution>
+                        <id>assemble</id>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <phase>package</phase>
+                    </execution>
+                </executions>
+                <configuration>
+                    <appendAssemblyId>false</appendAssemblyId>
+                    <attach>false</attach>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>dev</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+                <property>
+                    <name>env</name>
+                    <value>!release</value>
+                </property>
+            </activation>
+
+            <build>
+                <plugins>
+                    <plugin>
+                        <artifactId>maven-assembly-plugin</artifactId>
+                        <configuration>
+                            <!-- the maven assembly plugin needs a descriptor file telling it the package layout and where the files to package come from -->
+                            <descriptors>
+                                <descriptor>${basedir}/src/main/assembly/dev.xml</descriptor>
+                            </descriptors>
+                            <finalName>canal</finalName>
+                            <outputDirectory>${project.build.directory}</outputDirectory>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+
+        </profile>
+
+        <profile>
+            <id>release</id>
+            <activation>
+                <property>
+                    <name>env</name>
+                    <value>release</value>
+                </property>
+            </activation>
+
+            <build>
+                <plugins>
+                    <plugin>
+                        <artifactId>maven-assembly-plugin</artifactId>
+                        <configuration>
+                            <!-- assembly descriptor used in release mode -->
+                            <descriptors>
+                                <descriptor>${basedir}/src/main/assembly/release.xml</descriptor>
+                            </descriptors>
+                            <!-- if an application contains several deploy modules with the same package name, copying them into one directory may fail, so the artifactId is appended to tell them apart -->
+                            <finalName>${project.artifactId}-${project.version}</finalName>
+                            <!-- scm requires release-mode packages to go into the top-level target directory -->
+                            <outputDirectory>${project.parent.build.directory}</outputDirectory>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+</project>

+ 64 - 0
kafka/src/main/assembly/dev.xml

@@ -0,0 +1,64 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+	<id>dist</id>
+	<formats>
+		<format>dir</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>.</directory>
+			<outputDirectory>/</outputDirectory>
+			<includes>
+				<include>README*</include>
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>./src/main/bin</directory>
+			<outputDirectory>bin</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+			<fileMode>0755</fileMode>
+		</fileSet>
+		<fileSet>
+			<directory>../deployer/src/main/conf</directory>
+			<outputDirectory>/conf</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>../deployer/src/main/resources</directory>
+			<outputDirectory>/conf</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+			<excludes>
+				<exclude>logback.xml</exclude>
+			</excludes>
+		</fileSet>
+		<fileSet>
+			<directory>./src/main/resources</directory>
+			<outputDirectory>/conf</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>target</directory>
+			<outputDirectory>logs</outputDirectory>
+			<excludes>
+				<exclude>**/*</exclude>
+			</excludes>
+		</fileSet>
+	</fileSets>
+	<dependencySets>
+		<dependencySet>
+			<outputDirectory>lib</outputDirectory>
+			<excludes>
+				<exclude>junit:junit</exclude>
+			</excludes>
+		</dependencySet>
+	</dependencySets>
+</assembly>

+ 64 - 0
kafka/src/main/assembly/release.xml

@@ -0,0 +1,64 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+	<id>dist</id>
+	<formats>
+		<format>tar.gz</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>.</directory>
+			<outputDirectory>/</outputDirectory>
+			<includes>
+				<include>README*</include>
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>./src/main/bin</directory>
+			<outputDirectory>bin</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+			<fileMode>0755</fileMode>
+		</fileSet>
+		<fileSet>
+			<directory>../deployer/src/main/conf</directory>
+			<outputDirectory>/conf</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>../deployer/src/main/resources</directory>
+			<outputDirectory>/conf</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+			<excludes>
+				<exclude>logback.xml</exclude>
+			</excludes>
+		</fileSet>
+		<fileSet>
+			<directory>./src/main/resources</directory>
+			<outputDirectory>/conf</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>target</directory>
+			<outputDirectory>logs</outputDirectory>
+			<excludes>
+				<exclude>**/*</exclude>
+			</excludes>
+		</fileSet>
+	</fileSets>
+	<dependencySets>
+		<dependencySet>
+			<outputDirectory>lib</outputDirectory>
+			<excludes>
+				<exclude>junit:junit</exclude>
+			</excludes>
+		</dependencySet>
+	</dependencySets>
+</assembly>

+ 25 - 0
kafka/src/main/bin/startup.bat

@@ -0,0 +1,25 @@
+@echo off
+@if not "%ECHO%" == ""  echo %ECHO%
+@if "%OS%" == "Windows_NT"  setlocal
+
+set ENV_PATH=.\
+if "%OS%" == "Windows_NT" set ENV_PATH=%~dp0%
+
+set conf_dir=%ENV_PATH%\..\conf
+set canal_conf=%conf_dir%\canal.properties
+set logback_configurationFile=%conf_dir%\logback.xml
+
+set CLASSPATH=%conf_dir%
+set CLASSPATH=%conf_dir%\..\lib\*;%CLASSPATH%
+
+set JAVA_MEM_OPTS= -Xms128m -Xmx512m -XX:PermSize=128m
+set JAVA_OPTS_EXT= -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dapplication.codeset=UTF-8 -Dfile.encoding=UTF-8
+set JAVA_DEBUG_OPT= -server -Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=9099,server=y,suspend=n
+set CANAL_OPTS= -DappName=otter-canal -Dlogback.configurationFile="%logback_configurationFile%" -Dcanal.conf="%canal_conf%"
+
+set JAVA_OPTS= %JAVA_MEM_OPTS% %JAVA_OPTS_EXT% %JAVA_DEBUG_OPT% %CANAL_OPTS%
+
+set CMD_STR= java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.kafka.CanalLauncher
+echo start cmd : %CMD_STR%
+
+java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.kafka.CanalLauncher

+ 104 - 0
kafka/src/main/bin/startup.sh

@@ -0,0 +1,104 @@
+#!/bin/bash 
+
+current_path=`pwd`
+case "`uname`" in
+    Linux)
+		bin_abs_path=$(readlink -f $(dirname $0))
+		;;
+	*)
+		bin_abs_path=`cd $(dirname $0); pwd`
+		;;
+esac
+base=${bin_abs_path}/..
+canal_conf=$base/conf/canal.properties
+logback_configurationFile=$base/conf/logback.xml
+export LANG=en_US.UTF-8
+export BASE=$base
+
+if [ -f $base/bin/canal.pid ] ; then
+	echo "found canal.pid , Please run stop.sh first ,then startup.sh" 2>&2
+    exit 1
+fi
+
+if [ ! -d $base/logs/canal ] ; then 
+	mkdir -p $base/logs/canal
+fi
+
+## set java path
+if [ -z "$JAVA" ] ; then
+  JAVA=$(which java)
+fi
+
+ALIBABA_JAVA="/usr/alibaba/java/bin/java"
+TAOBAO_JAVA="/opt/taobao/java/bin/java"
+if [ -z "$JAVA" ]; then
+  if [ -f $ALIBABA_JAVA ] ; then
+  	JAVA=$ALIBABA_JAVA
+  elif [ -f $TAOBAO_JAVA ] ; then
+  	JAVA=$TAOBAO_JAVA
+  else
+  	echo "Cannot find a Java JDK. Please either set JAVA or put java (>=1.5) in your PATH." >&2
+    exit 1
+  fi
+fi
+
+case "$#" 
+in
+0 ) 
+	;;
+1 )	
+	var=$*
+	if [ -f $var ] ; then 
+		canal_conf=$var
+	else
+		echo "THE PARAMETER IS NOT CORRECT.PLEASE CHECK AGAIN."
+        exit
+	fi;;
+2 )	
+	var=$1
+	if [ -f $var ] ; then
+		canal_conf=$var
+	else 
+		if [ "$1" = "debug" ]; then
+			DEBUG_PORT=$2
+			DEBUG_SUSPEND="n"
+			JAVA_DEBUG_OPT="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=$DEBUG_PORT,server=y,suspend=$DEBUG_SUSPEND"
+		fi
+     fi;;
+* )
+	echo "THE PARAMETERS MUST BE TWO OR LESS.PLEASE CHECK AGAIN."
+	exit;;
+esac
+
+str=`file -L $JAVA | grep 64-bit`
+if [ -n "$str" ]; then
+	JAVA_OPTS="-server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"
+else
+	JAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m "
+fi
+
+JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8"
+CANAL_OPTS="-DappName=otter-canal -Dlogback.configurationFile=$logback_configurationFile -Dcanal.conf=$canal_conf"
+
+if [ -e $canal_conf -a -e $logback_configurationFile ]
+then 
+	
+	for i in $base/lib/*;
+		do CLASSPATH=$i:"$CLASSPATH";
+	done
+ 	CLASSPATH="$base/conf:$CLASSPATH";
+ 	
+ 	echo "cd to $bin_abs_path for workaround relative path"
+  	cd $bin_abs_path
+ 	
+	echo LOG CONFIGURATION : $logback_configurationFile
+	echo canal conf : $canal_conf 
+	echo CLASSPATH :$CLASSPATH
+	$JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $CANAL_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.kafka.CanalLauncher 1>>$base/logs/canal/canal.log 2>&1 &
+	echo $! > $base/bin/canal.pid 
+	
+	echo "cd to $current_path for continue"
+  	cd $current_path
+else 
+	echo "canal conf("$canal_conf") OR log configration file($logback_configurationFile) is not exist,please create then first!"
+fi

+ 65 - 0
kafka/src/main/bin/stop.sh

@@ -0,0 +1,65 @@
+#!/bin/bash
+
+cygwin=false;
+linux=false;
+case "`uname`" in
+    CYGWIN*)
+        cygwin=true
+        ;;
+    Linux*)
+    	linux=true
+    	;;
+esac
+
+get_pid() {	
+	STR=$1
+	PID=$2
+    if $cygwin; then
+        JAVA_CMD="$JAVA_HOME\bin\java"
+        JAVA_CMD=`cygpath --path --unix $JAVA_CMD`
+        JAVA_PID=`ps |grep $JAVA_CMD |awk '{print $1}'`
+    else
+    	if $linux; then
+	        if [ ! -z "$PID" ]; then
+	        	JAVA_PID=`ps -C java -f --width 1000|grep "$STR"|grep "$PID"|grep -v grep|awk '{print $2}'`
+		    else 
+		        JAVA_PID=`ps -C java -f --width 1000|grep "$STR"|grep -v grep|awk '{print $2}'`
+	        fi
+	    else
+	    	if [ ! -z "$PID" ]; then
+	        	JAVA_PID=`ps aux |grep "$STR"|grep "$PID"|grep -v grep|awk '{print $2}'`
+		    else 
+		        JAVA_PID=`ps aux |grep "$STR"|grep -v grep|awk '{print $2}'`
+	        fi
+	    fi
+    fi
+    echo $JAVA_PID;
+}
+
+base=`dirname $0`/..
+pidfile=$base/bin/canal.pid
+if [ ! -f "$pidfile" ];then
+	echo "canal is not running. exists"
+	exit
+fi
+
+pid=`cat $pidfile`
+if [ "$pid" == "" ] ; then
+	pid=`get_pid "appName=otter-canal"`
+fi
+
+echo -e "`hostname`: stopping canal $pid ... "
+kill $pid
+
+LOOPS=0
+while (true); 
+do 
+	gpid=`get_pid "appName=otter-canal" "$pid"`
+    if [ "$gpid" == "" ] ; then
+    	echo "Oook! cost:$LOOPS"
+    	`rm $pidfile`
+    	break;
+    fi
+    let LOOPS=LOOPS+1
+    sleep 1
+done

+ 19 - 0
kafka/src/main/java/com/alibaba/otter/canal/kafka/CanalLauncher.java

@@ -0,0 +1,19 @@
+package com.alibaba.otter.canal.kafka;
+
+
+import com.alibaba.otter.canal.kafka.producer.CanalKafkaStarter;
+
+/**
+ * entry point for launching the standalone canal-kafka version
+ *
+ * @author machengyuan 2018-6-11 下午05:30:49
+ * @version 1.0.0
+ */
+public class CanalLauncher {
+
+    public static void main(String[] args) {
+        CanalServerStarter.init();
+
+        CanalKafkaStarter.init();
+    }
+}

+ 76 - 0
kafka/src/main/java/com/alibaba/otter/canal/kafka/CanalServerStarter.java

@@ -0,0 +1,76 @@
+package com.alibaba.otter.canal.kafka;
+
+import com.alibaba.otter.canal.deployer.CanalController;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.util.Properties;
+
+/**
+ * canal server startup class
+ *
+ * @author machengyuan 2018-6-11 下午05:30:49
+ * @version 1.0.0
+ */
+public class CanalServerStarter {
+    private static final String CLASSPATH_URL_PREFIX = "classpath:";
+    private static final Logger logger = LoggerFactory.getLogger(CanalServerStarter.class);
+
+    private static boolean running = false;
+
+    public static void init() {
+        try {
+            logger.info("## set default uncaught exception handler");
+            setGlobalUncaughtExceptionHandler();
+
+            logger.info("## load canal configurations");
+            String conf = System.getProperty("canal.conf", "classpath:canal.properties");
+            Properties properties = new Properties();
+            if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
+                conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
+                properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
+            } else {
+                properties.load(new FileInputStream(conf));
+            }
+
+            logger.info("## start the canal server.");
+            final CanalController controller = new CanalController(properties);
+            controller.start();
+            running = true;
+            logger.info("## the canal server is running now ......");
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+
+                public void run() {
+                    try {
+                        logger.info("## stop the canal server");
+                        running = false;
+                        controller.stop();
+                    } catch (Throwable e) {
+                        logger.warn("##something goes wrong when stopping canal Server:", e);
+                    } finally {
+                        logger.info("## canal server is down.");
+                    }
+                }
+
+            });
+        } catch (Throwable e) {
+            logger.error("## Something goes wrong when starting up the canal Server:", e);
+            System.exit(0);
+        }
+    }
+
+    public static boolean isRunning(){
+        return running;
+    }
+
+    private static void setGlobalUncaughtExceptionHandler() {
+        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                logger.error("UnCaughtException", e);
+            }
+        });
+    }
+}

+ 74 - 0
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaProducer.java

@@ -0,0 +1,74 @@
+package com.alibaba.otter.canal.kafka.producer;
+
+import com.alibaba.otter.canal.kafka.producer.KafkaProperties.Topic;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.Message;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * main kafka producer class
+ *
+ * @author machengyuan 2018-6-11 下午05:30:49
+ * @version 1.0.0
+ */
+public class CanalKafkaProducer {
+    private static final Logger logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
+
+    private Producer<String, Message> producer;
+
+    public void init(KafkaProperties kafkaProperties) {
+        Properties properties = new Properties();
+        properties.put("bootstrap.servers", kafkaProperties.getServers());
+        properties.put("acks", "all");
+        properties.put("retries", kafkaProperties.getRetries());
+        properties.put("batch.size", kafkaProperties.getBatchSize());
+        properties.put("linger.ms", kafkaProperties.getLingerMs());
+        properties.put("buffer.memory", kafkaProperties.getBufferMemory());
+        properties.put("key.serializer", StringSerializer.class.getName());
+        properties.put("value.serializer", MessageSerializer.class.getName());
+        producer = new KafkaProducer<String, Message>(properties);
+    }
+
+    public void stop() {
+        try {
+            logger.info("## stop the kafka producer");
+            producer.close();
+        } catch (Throwable e) {
+            logger.warn("##something goes wrong when stopping kafka producer:", e);
+        } finally {
+            logger.info("## kafka producer is down.");
+        }
+    }
+
+    public void send(Topic topic, Message message) {
+        boolean valid = false;
+        if (message != null && !message.getEntries().isEmpty()) {
+            for (CanalEntry.Entry entry : message.getEntries()) {
+                if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONBEGIN && entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) {
+                    valid = true;
+                    break;
+                }
+            }
+        }
+        if (!valid) {
+            return;
+        }
+
+
+        ProducerRecord<String, Message> record;
+        if (topic.getPartition() != null) {
+            record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);
+        } else {
+            record = new ProducerRecord<String, Message>(topic.getTopic(), message);
+        }
+        producer.send(record);
+        logger.debug("send message to kafka topic: {} \n {}", topic.getTopic(), message.toString());
+    }
+}

+ 143 - 0
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaStarter.java

@@ -0,0 +1,143 @@
+package com.alibaba.otter.canal.kafka.producer;
+
+import com.alibaba.otter.canal.kafka.CanalServerStarter;
+import com.alibaba.otter.canal.kafka.producer.KafkaProperties.CanalDestination;
+import com.alibaba.otter.canal.kafka.producer.KafkaProperties.Topic;
+import com.alibaba.otter.canal.protocol.ClientIdentity;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.FileInputStream;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * kafka startup class
+ *
+ * @author machengyuan 2018-6-11 下午05:30:49
+ * @version 1.0.0
+ */
+public class CanalKafkaStarter {
+    private static final String CLASSPATH_URL_PREFIX = "classpath:";
+    private static final Logger logger = LoggerFactory.getLogger(CanalKafkaStarter.class);
+
+    private volatile static boolean running = false;
+
+    private static ExecutorService executorService;
+
+    private static CanalKafkaProducer canalKafkaProducer;
+
+    private static KafkaProperties kafkaProperties;
+
+    public static void init() {
+        try {
+
+            logger.info("## load kafka configurations");
+            String conf = System.getProperty("kafka.conf", "classpath:kafka.yml");
+
+
+            if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
+                conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
+                kafkaProperties = new Yaml().loadAs(CanalKafkaStarter.class.getClassLoader().getResourceAsStream(conf), KafkaProperties.class);
+            } else {
+                kafkaProperties = new Yaml().loadAs(new FileInputStream(conf), KafkaProperties.class);
+            }
+
+            // initialize the kafka producer
+            canalKafkaProducer = new CanalKafkaProducer();
+            canalKafkaProducer.init(kafkaProperties);
+
+            // start one worker thread per instance
+            List<CanalDestination> destinations = kafkaProperties.getCanalDestinations();
+
+            executorService = Executors.newFixedThreadPool(destinations.size());
+
+            logger.info("## start the kafka workers.");
+            for (final CanalDestination destination : destinations) {
+                executorService.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        worker(destination);
+                    }
+                });
+            }
+            running = true;
+            logger.info("## the kafka workers is running now ......");
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                public void run() {
+                    try {
+                        logger.info("## stop the kafka workers");
+                        running = false;
+                        executorService.shutdown();
+                        canalKafkaProducer.stop();
+                    } catch (Throwable e) {
+                        logger.warn("##something goes wrong when stopping kafka workers:", e);
+                    } finally {
+                        logger.info("## canal kafka is down.");
+                    }
+                }
+
+            });
+
+        } catch (Throwable e) {
+            logger.error("## Something goes wrong when starting up the canal kafka workers:", e);
+            System.exit(0);
+        }
+    }
+
+
+    private static void worker(CanalDestination destination) {
+        while (!running) ;
+        while (!CanalServerStarter.isRunning()) ; // wait for the server to finish starting
+        logger.info("## start the canal consumer: {}.", destination.getCanalDestination());
+        CanalServerWithEmbedded server = CanalServerWithEmbedded.instance();
+        ClientIdentity clientIdentity = new ClientIdentity(destination.getCanalDestination(), (short) 1001, "");
+        while (running) {
+            try {
+                if (!server.getCanalInstances().containsKey(clientIdentity.getDestination())) {
+                    try {
+                        Thread.sleep(3000);
+                    } catch (InterruptedException e) {
+                        //ignore
+                    }
+                    continue;
+                }
+                server.subscribe(clientIdentity);
+                logger.info("## the canal consumer {} is running now ......", destination.getCanalDestination());
+
+                while (running) {
+                    Message message = server.getWithoutAck(clientIdentity, kafkaProperties.getCanalBatchSize()); // fetch the specified number of records
+                    long batchId = message.getId();
+                    try {
+                        int size = message.getEntries().size();
+                        if (batchId != -1 && size != 0) {
+                            if (!StringUtils.isEmpty(destination.getTopic())) {
+                                Topic topic = new Topic();
+                                topic.setTopic(destination.getTopic());
+                                topic.setPartition(destination.getPartition());
+                                destination.getTopics().add(topic);
+                            }
+                            for (Topic topic : destination.getTopics()) {
+                                canalKafkaProducer.send(topic, message); // send the message to every topic
+                            }
+                        }
+
+                        if (batchId != -1) {
+                            server.ack(clientIdentity, batchId); // commit ack
+                        }
+                    } catch (Exception e) {
+                        server.rollback(clientIdentity);
+                        logger.error(e.getMessage(), e);
+                    }
+                }
+            } catch (Exception e) {
+                logger.error("process error!", e);
+            }
+        }
+    }
+}

+ 158 - 0
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/KafkaProperties.java

@@ -0,0 +1,158 @@
+package com.alibaba.otter.canal.kafka.producer;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * kafka configuration properties
+ *
+ * @author machengyuan 2018-6-11 下午05:30:49
+ * @version 1.0.0
+ */
+public class KafkaProperties {
+    private String servers = "localhost:6667";
+    private int retries = 0;
+    private int batchSize = 16384;
+    private int lingerMs = 1;
+    private long bufferMemory = 33554432L;
+
+    private int canalBatchSize = 5;
+
+    private List<CanalDestination> canalDestinations = new ArrayList<CanalDestination>();
+
+    public static class CanalDestination {
+        private String canalDestination;
+        private String topic;
+        private Integer partition;
+        private Set<Topic> topics = new HashSet<Topic>();
+
+        public String getCanalDestination() {
+            return canalDestination;
+        }
+
+        public void setCanalDestination(String canalDestination) {
+            this.canalDestination = canalDestination;
+        }
+
+        public String getTopic() {
+            return topic;
+        }
+
+        public void setTopic(String topic) {
+            this.topic = topic;
+        }
+
+        public Integer getPartition() {
+            return partition;
+        }
+
+        public void setPartition(Integer partition) {
+            this.partition = partition;
+        }
+
+        public Set<Topic> getTopics() {
+            return topics;
+        }
+
+        public void setTopics(Set<Topic> topics) {
+            this.topics = topics;
+        }
+    }
+
+    public static class Topic {
+        private String topic;
+        private Integer partition;
+
+        public String getTopic() {
+            return topic;
+        }
+
+        public void setTopic(String topic) {
+            this.topic = topic;
+        }
+
+        public Integer getPartition() {
+            return partition;
+        }
+
+        public void setPartition(Integer partition) {
+            this.partition = partition;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            Topic topic1 = (Topic) o;
+
+            if (topic != null ? !topic.equals(topic1.topic) : topic1.topic != null) return false;
+            return partition != null ? partition.equals(topic1.partition) : topic1.partition == null;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = topic != null ? topic.hashCode() : 0;
+            result = 31 * result + (partition != null ? partition.hashCode() : 0);
+            return result;
+        }
+    }
+
+    public String getServers() {
+        return servers;
+    }
+
+    public void setServers(String servers) {
+        this.servers = servers;
+    }
+
+    public int getRetries() {
+        return retries;
+    }
+
+    public void setRetries(int retries) {
+        this.retries = retries;
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    public int getLingerMs() {
+        return lingerMs;
+    }
+
+    public void setLingerMs(int lingerMs) {
+        this.lingerMs = lingerMs;
+    }
+
+    public long getBufferMemory() {
+        return bufferMemory;
+    }
+
+    public void setBufferMemory(long bufferMemory) {
+        this.bufferMemory = bufferMemory;
+    }
+
+    public int getCanalBatchSize() {
+        return canalBatchSize;
+    }
+
+    public void setCanalBatchSize(int canalBatchSize) {
+        this.canalBatchSize = canalBatchSize;
+    }
+
+    public List<CanalDestination> getCanalDestinations() {
+        return canalDestinations;
+    }
+
+    public void setCanalDestinations(List<CanalDestination> canalDestinations) {
+        this.canalDestinations = canalDestinations;
+    }
+}
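These fields map one-to-one onto stock Kafka producer settings. A hedged sketch of wiring them into a producer follows; the ProducerConfig keys are the standard Kafka client constants, but this builder method itself is illustrative, not necessarily how CanalKafkaProducer assembles its config:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    import com.alibaba.otter.canal.protocol.Message;

    // illustrative wiring of KafkaProperties into a producer
    static KafkaProducer<String, Message> buildProducer(KafkaProperties p) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, p.getServers());
        props.put(ProducerConfig.RETRIES_CONFIG, p.getRetries());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, p.getBatchSize());
        props.put(ProducerConfig.LINGER_MS_CONFIG, p.getLingerMs());
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, p.getBufferMemory());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MessageSerializer.class.getName());
        return new KafkaProducer<String, Message>(props);
    }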

+ 49 - 0
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/MessageSerializer.java

@@ -0,0 +1,49 @@
+package com.alibaba.otter.canal.kafka.producer;
+
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.CanalPacket;
+import com.alibaba.otter.canal.protocol.Message;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Serializer;
+import org.springframework.util.CollectionUtils;
+
+import java.util.Map;
+
+/**
+ * Kafka serializer for canal's Message class
+ *
+ * @author machengyuan 2018-6-11 下午05:30:49
+ * @version 1.0.0
+ */
+public class MessageSerializer implements Serializer<Message> {
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+    }
+
+    @Override
+    public byte[] serialize(String topic, Message data) {
+        try {
+            if (data == null)
+                return null;
+            else {
+                CanalPacket.Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
+                if (data.getId() != -1 && !CollectionUtils.isEmpty(data.getEntries())) {
+                    for (CanalEntry.Entry entry : data.getEntries()) {
+                        messageBuilder.addMessages(entry.toByteString());
+                    }
+                }
+                CanalPacket.Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
+                packetBuilder.setType(CanalPacket.PacketType.MESSAGES);
+                packetBuilder.setBody(messageBuilder.build().toByteString());
+                return packetBuilder.build().toByteArray();
+            }
+        } catch (Exception e) {
+            throw new SerializationException("Error when serializing message to byte[]", e);
+        }
+    }
+
+    @Override
+    public void close() {
+        // nothing to do
+    }
+}
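Since serialize() wraps the entries in a MESSAGES packet, a consumer has to unwrap in the reverse order: parse the Packet, then the Messages body, then each Entry. A minimal sketch of that inverse step; the method name and error handling here are illustrative:

    import com.google.protobuf.ByteString;

    // hedged sketch of the inverse of serialize()
    public static Message deserialize(byte[] bytes) throws Exception {
        CanalPacket.Packet packet = CanalPacket.Packet.parseFrom(bytes);
        if (packet.getType() != CanalPacket.PacketType.MESSAGES) {
            throw new IllegalStateException("unexpected packet type: " + packet.getType());
        }
        CanalPacket.Messages messages = CanalPacket.Messages.parseFrom(packet.getBody());
        // batch id decodes as 0 here, since serialize() above never sets it on the builder
        Message result = new Message(messages.getBatchId());
        for (ByteString entryBytes : messages.getMessagesList()) {
            result.addEntry(CanalEntry.Entry.parseFrom(entryBytes));
        }
        return result;
    }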

+ 18 - 0
kafka/src/main/resources/kafka.yml

@@ -0,0 +1,18 @@
+servers: slave1:6667,slave2:6667,slave3:6667
+retries: 0
+batchSize: 16384
+lingerMs: 1
+bufferMemory: 33554432
+# canal batch size, unit: k
+canalBatchSize: 50
+
+canalDestinations:
+  - canalDestination: example
+    topic: example
+    partition:
+    # one destination can also fan out to multiple topics
+#    topics:
+#      - topic: example
+#        partition:
+
+

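This file is read into the KafkaProperties POJO shown above, so the YAML keys must mirror the bean property names; that is why the per-topic key in the commented block is "topic", not "topics". A minimal loading sketch, assuming SnakeYAML on the classpath; the class name here is illustrative:

    import java.io.InputStream;

    import org.yaml.snakeyaml.Yaml;

    public class KafkaYmlSmokeTest {
        public static void main(String[] args) {
            // load kafka.yml from the classpath and bind it to the POJO
            InputStream in = KafkaProperties.class.getClassLoader().getResourceAsStream("kafka.yml");
            KafkaProperties props = new Yaml().loadAs(in, KafkaProperties.class);
            System.out.println(props.getServers()); // slave1:6667,slave2:6667,slave3:6667
        }
    }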
+ 85 - 0
kafka/src/main/resources/logback.xml

@@ -0,0 +1,85 @@
+<configuration scan="true" scanPeriod="5 seconds">
+	<jmxConfigurator />
+	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+		<encoder>
+			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{56} - %msg%n
+			</pattern>
+		</encoder>
+	</appender>
+	
+	<appender name="CANAL-ROOT" class="ch.qos.logback.classic.sift.SiftingAppender">
+		<discriminator>
+			<Key>destination</Key>
+			<DefaultValue>canal</DefaultValue>
+		</discriminator>
+		<sift>
+			<appender name="FILE-${destination}" class="ch.qos.logback.core.rolling.RollingFileAppender">
+				<File>../logs/${destination}/${destination}.log</File>
+				<rollingPolicy
+					class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+					<!-- rollover daily -->
+					<fileNamePattern>../logs/${destination}/%d{yyyy-MM-dd}/${destination}-%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern>
+					<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+						<!-- or whenever the file size reaches 512MB -->
+						<maxFileSize>512MB</maxFileSize>
+					</timeBasedFileNamingAndTriggeringPolicy>
+					<maxHistory>60</maxHistory>
+				</rollingPolicy>
+				<encoder>
+					<pattern>
+						%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{56} - %msg%n
+					</pattern>
+				</encoder>
+			</appender>
+		</sift>
+	</appender>
+	
+	<appender name="CANAL-META" class="ch.qos.logback.classic.sift.SiftingAppender">
+		<discriminator>
+			<Key>destination</Key>
+			<DefaultValue>canal</DefaultValue>
+		</discriminator>
+		<sift>
+			<appender name="META-FILE-${destination}" class="ch.qos.logback.core.rolling.RollingFileAppender">
+				<File>../logs/${destination}/meta.log</File>
+				<rollingPolicy
+					class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+					<!-- rollover daily -->
+					<fileNamePattern>../logs/${destination}/%d{yyyy-MM-dd}/meta-%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern>
+					<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+						<!-- or whenever the file size reaches 32MB -->
+						<maxFileSize>32MB</maxFileSize>
+					</timeBasedFileNamingAndTriggeringPolicy>
+					<maxHistory>60</maxHistory>
+				</rollingPolicy>
+				<encoder>
+					<pattern>
+						%d{yyyy-MM-dd HH:mm:ss.SSS} - %msg%n
+					</pattern>
+				</encoder>
+			</appender>
+		</sift>
+	</appender>
+	
+    <logger name="com.alibaba.otter.canal.instance" additivity="false">  
+        <level value="INFO" />  
+        <appender-ref ref="CANAL-ROOT" />
+    </logger>
+    <logger name="com.alibaba.otter.canal.deployer" additivity="false">  
+        <level value="INFO" />  
+        <appender-ref ref="CANAL-ROOT" />
+    </logger>
+    <logger name="com.alibaba.otter.canal.meta.FileMixedMetaManager" additivity="false">  
+        <level value="INFO" />
+        <appender-ref ref="CANAL-META" />
+    </logger>
+	<logger name="com.alibaba.otter.canal.kafka" additivity="false">
+		<level value="INFO" />
+		<appender-ref ref="CANAL-ROOT" />
+	</logger>
+    
+	<root level="WARN">
+		<appender-ref ref="STDOUT"/>
+		<appender-ref ref="CANAL-ROOT" />
+	</root>
+</configuration>

+ 3 - 1
pom.xml

@@ -127,6 +127,8 @@
         <module>client</module>
         <module>deployer</module>
         <module>example</module>
+        <module>kafka</module>
+        <module>kafka-client</module>
     </modules>
 
     <dependencyManagement>
@@ -254,7 +256,7 @@
             <dependency>
                 <groupId>com.alibaba.fastsql</groupId>
                 <artifactId>fastsql</artifactId>
-                <version>2.0.0_preview_371</version>
+                <version>2.0.0_preview_186</version>
             </dependency>
             <dependency>
                 <groupId>com.alibaba</groupId>