Parcourir la source

Add RocketMQ canal connector

duhengforever il y a 6 ans
Parent
commit
d5aa324ce4
24 fichiers modifiés avec 881 ajouts et 328 suppressions
  1. 6 6
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java
  2. 21 22
      client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterLoader.java
  3. 165 0
      client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterRocketMQWorker.java
  4. 13 13
      client-launcher/src/main/resources/canal-client.yml
  5. 112 105
      client/pom.xml
  6. 42 0
      client/src/main/java/com/alibaba/otter/canal/client/DeserializerUtil.java
  7. 4 35
      client/src/main/java/com/alibaba/otter/canal/client/kafka/MessageDeserializer.java
  8. 47 0
      client/src/main/java/com/alibaba/otter/canal/client/rocketmq/ConsumerBatchMessage.java
  9. 197 0
      client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java
  10. 20 0
      client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnectorProvider.java
  11. 11 0
      client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalListener.java
  12. 1 1
      deployer/pom.xml
  13. 15 10
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java
  14. 2 2
      deployer/src/main/resources/canal.properties
  15. 5 4
      deployer/src/main/resources/example/instance.properties
  16. 1 1
      deployer/src/main/resources/mq.yml
  17. 5 0
      server/pom.xml
  18. 70 0
      server/src/main/java/com/alibaba/otter/canal/common/CanalMessageSerializer.java
  19. 10 2
      server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java
  20. 9 30
      server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java
  21. 2 54
      server/src/main/java/com/alibaba/otter/canal/kafka/MessageSerializer.java
  22. 58 0
      server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java
  23. 37 43
      server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java
  24. 28 0
      server/src/main/java/com/alibaba/otter/canal/spi/CanalMQProducer.java

+ 6 - 6
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -20,7 +20,7 @@ public class CanalClientConfig {
 
     private String              bootstrapServers;
 
-    private List<KafkaTopic>    kafkaTopics;
+    private List<MQTopic>       mqTopics;
 
     private List<CanalInstance> canalInstances;
 
@@ -56,12 +56,12 @@ public class CanalClientConfig {
         this.bootstrapServers = bootstrapServers;
     }
 
-    public List<KafkaTopic> getKafkaTopics() {
-        return kafkaTopics;
+    public List<MQTopic> getMqTopics() {
+        return mqTopics;
     }
 
-    public void setKafkaTopics(List<KafkaTopic> kafkaTopics) {
-        this.kafkaTopics = kafkaTopics;
+    public void setMqTopics(List<MQTopic> mqTopics) {
+        this.mqTopics = mqTopics;
     }
 
     public List<CanalInstance> getCanalInstances() {
@@ -110,7 +110,7 @@ public class CanalClientConfig {
         }
     }
 
-    public static class KafkaTopic {
+    public static class MQTopic {
 
         private String      topic;
 

+ 21 - 22
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterLoader.java

@@ -17,34 +17,34 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 /**
- * 外部适配器的加载器
+ * RocketMQ外部适配器的加载器
  *
- * @author machengyuan 2018-8-19 下午11:45:49
  * @version 1.0.0
  */
 public class CanalAdapterLoader {
 
-    private static final Logger                  logger            = LoggerFactory.getLogger(CanalAdapterLoader.class);
+    private static final Logger logger = LoggerFactory.getLogger(CanalAdapterLoader.class);
 
-    private CanalClientConfig                    canalClientConfig;
+    private CanalClientConfig canalClientConfig;
 
-    private Map<String, CanalAdapterWorker>      canalWorkers      = new HashMap<>();
+    private Map<String, CanalAdapterWorker> canalWorkers = new HashMap<>();
 
     private Map<String, CanalAdapterKafkaWorker> canalKafkaWorkers = new HashMap<>();
+    private Map<String, CanalAdapterRocketMQWorker> canalMQWorker = new HashMap<>();
 
-    private ExtensionLoader<CanalOuterAdapter>   loader;
+    private ExtensionLoader<CanalOuterAdapter> loader;
 
-    public CanalAdapterLoader(CanalClientConfig canalClientConfig){
+    public CanalAdapterLoader(CanalClientConfig canalClientConfig) {
         this.canalClientConfig = canalClientConfig;
     }
 
     /**
-     * 初始化canal-client、 canal-client-kafka的适配器
+     * 初始化canal-client、 canal-client-rocketmq的适配器
      */
     public void init() {
-        // canal instances 和 kafka topics 配置不能同时为空
-        if (canalClientConfig.getCanalInstances() == null && canalClientConfig.getKafkaTopics() == null) {
-            throw new RuntimeException("Blank config property: canalInstances or canalKafkaTopics");
+        // canal instances 和 rocketmq topics 配置不能同时为空
+        if (canalClientConfig.getCanalInstances() == null && canalClientConfig.getMqTopics() == null) {
+            throw new RuntimeException("Blank config property: canalInstances or canalMQTopics");
         }
 
         loader = ExtensionLoader.getExtensionLoader(CanalOuterAdapter.class,
@@ -87,10 +87,10 @@ public class CanalAdapterLoader {
             }
         }
 
-        // 初始化canal-client-kafka的适配器
-        if (canalClientConfig.getKafkaTopics() != null) {
-            for (CanalClientConfig.KafkaTopic kafkaTopic : canalClientConfig.getKafkaTopics()) {
-                for (CanalClientConfig.Group group : kafkaTopic.getGroups()) {
+        // 初始化canal-client-rocketmq的适配器
+        if (canalClientConfig.getMqTopics() != null) {
+            for (CanalClientConfig.MQTopic topic : canalClientConfig.getMqTopics()) {
+                for (CanalClientConfig.Group group : topic.getGroups()) {
                     List<List<CanalOuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
 
                     List<CanalOuterAdapter> canalOuterAdapters = new ArrayList<>();
@@ -102,16 +102,15 @@ public class CanalAdapterLoader {
                     }
                     canalOuterAdapterGroups.add(canalOuterAdapters);
 
-                    // String zkServers = canalClientConfig.getZookeeperHosts();
-                    CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(zkHosts,
+                    CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(
                         canalClientConfig.getBootstrapServers(),
-                        kafkaTopic.getTopic(),
+                        topic.getTopic(),
                         group.getGroupId(),
                         canalOuterAdapterGroups);
-                    canalKafkaWorkers.put(kafkaTopic.getTopic() + "-" + group.getGroupId(), canalKafkaWorker);
-                    canalKafkaWorker.start();
-                    logger.info("Start adapter for canal-client kafka topic: {} succeed",
-                        kafkaTopic.getTopic() + "-" + group.getGroupId());
+                    canalMQWorker.put(topic.getTopic() + "-" + group.getGroupId(), rocketMQWorker);
+                    rocketMQWorker.start();
+                    logger.info("Start adapter for canal-client rocketmq topic: {} succeed",
+                        topic.getTopic() + "-" + group.getGroupId());
                 }
             }
         }

+ 165 - 0
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterRocketMQWorker.java

@@ -0,0 +1,165 @@
+package com.alibaba.otter.canal.client.adapter.loader;
+
+import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
+import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectorProvider;
+import com.alibaba.otter.canal.protocol.Message;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.common.errors.WakeupException;
+
+/**
+ * RocketMQ对应的client适配器工作线程
+ *
+ * @author machengyuan 2018-8-19 下午11:30:49
+ * @version 1.0.0
+ */
+public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
+
+    private RocketMQCanalConnector connector;
+
+    private String topic;
+
+    public CanalAdapterRocketMQWorker(String nameServers, String topic, String groupId,
+        List<List<CanalOuterAdapter>> canalOuterAdapters) {
+        this.canalOuterAdapters = canalOuterAdapters;
+        this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
+        this.topic = topic;
+        this.canalDestination = topic;
+        connector = RocketMQCanalConnectorProvider.newRocketMQConnector(nameServers, topic, groupId);
+    }
+
+    @Override
+    public void start() {
+        if (!running) {
+            thread = new Thread(new Runnable() {
+
+                @Override
+                public void run() {
+                    process();
+                }
+            });
+            thread.setUncaughtExceptionHandler(handler);
+            running = true;
+            thread.start();
+        }
+    }
+
+    @Override
+    public void stop() {
+        try {
+            if (!running) {
+                return;
+            }
+
+            connector.stopRunning();
+            running = false;
+
+            // if (switcher != null && !switcher.state()) {
+            // switcher.set(true);
+            // }
+
+            if (thread != null) {
+                try {
+                    thread.join();
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+            groupInnerExecutorService.shutdown();
+            logger.info("topic {} connectors' worker thread dead!", this.topic);
+            for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
+                for (CanalOuterAdapter adapter : outerAdapters) {
+                    adapter.destroy();
+                }
+            }
+            logger.info("topic {} all connectors destroyed!", this.topic);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    private void process() {
+        while (!running)
+            ;
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        final AtomicBoolean executing = new AtomicBoolean(true);
+        while (running) {
+            try {
+                logger.info("=============> Start to connect topic: {} <=============", this.topic);
+                connector.connect();
+                logger.info("=============> Start to subscribe topic: {}<=============", this.topic);
+                connector.subscribe();
+                logger.info("=============> Subscribe topic: {} succeed<=============", this.topic);
+                while (running) {
+                    try {
+                        // switcher.get(); //等待开关开启
+
+                        final Message message = connector.getWithoutAck(1);
+
+                        executing.set(true);
+                        if (message != null) {
+                            executor.submit(new Runnable() {
+
+                                @Override
+                                public void run() {
+                                    try {
+                                        if (logger.isDebugEnabled()) {
+                                            logger.debug("topic: {} batchId: {} batchSize: {} ",
+                                                topic,
+                                                message.getId(),
+                                                message.getEntries().size());
+                                        }
+                                        long begin = System.currentTimeMillis();
+                                        writeOut(message);
+                                        long now = System.currentTimeMillis();
+                                        if ((System.currentTimeMillis() - begin) > 5 * 60 * 1000) {
+                                            logger.error("topic: {} batchId {} elapsed time: {} ms",
+                                                topic,
+                                                message.getId(),
+                                                now - begin);
+                                        }
+                                        if (logger.isDebugEnabled()) {
+                                            logger.debug("topic: {} batchId {} elapsed time: {} ms",
+                                                topic,
+                                                message.getId(),
+                                                now - begin);
+                                        }
+                                        connector.ack(message.getId());
+                                    } catch (Exception e) {
+                                        logger.error(e.getMessage(), e);
+                                    } finally {
+                                        executing.compareAndSet(true, false);
+                                    }
+                                }
+                            });
+                        } else {
+                            logger.debug("Message is null");
+                        }
+                    } catch (CommitFailedException e) {
+                        logger.warn(e.getMessage());
+                    } catch (Exception e) {
+                        logger.error(e.getMessage(), e);
+                        TimeUnit.SECONDS.sleep(1L);
+                    }
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        executor.shutdown();
+
+        try {
+            connector.unsubscribe();
+        } catch (WakeupException e) {
+            // No-op. Continue process
+        }
+        connector.stopRunning();
+        logger.info("=============> Disconnect topic: {} <=============", this.topic);
+    }
+}

+ 13 - 13
client-launcher/src/main/resources/canal-client.yml

@@ -2,20 +2,20 @@ canalServerHost: 127.0.0.1:11111
 #zookeeperHosts: slave1:2181
 #bootstrapServers: slave1:6667,slave2:6667
 
-canalInstances:
-- instance: example
-  adapterGroups:
-  - outAdapters:
-    - name: logger
-    - name: hbase
-      hosts: slave1:2181
-      properties: {znodeParent: "/hbase-unsecure"}
-#kafkaTopics:
-#- topic: example
-#  groups:
-#  - groupId: example_g1
-#    outAdapters:
+#canalInstances:
+#- instance: example
+#  adapterGroups:
+#  - outAdapters:
 #    - name: logger
 #    - name: hbase
 #      hosts: slave1:2181
 #      properties: {znodeParent: "/hbase-unsecure"}
+mqTopics:
+- topic: example
+  groups:
+  - groupId: example_g1
+    outAdapters:
+    - name: logger
+#    - name: hbase
+#      hosts: slave1:2181
+#      properties: {znodeParent: "/hbase-unsecure"}

+ 112 - 105
client/pom.xml

@@ -1,108 +1,115 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:p="http://www.springframework.org/schema/p" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" ">
-	<modelVersion>4.0.0</modelVersion>
-	<parent>
-		<groupId>com.alibaba.otter</groupId>
-		<artifactId>canal</artifactId>
-		<version>1.1.1-SNAPSHOT</version>
-		<relativePath>../pom.xml</relativePath>
-	</parent>
-	<groupId>com.alibaba.otter</groupId>
-	<artifactId>canal.client</artifactId>
-	<packaging>jar</packaging>
-	<name>canal client module for otter ${project.version}</name>
-	<dependencies>
-		<dependency>
-			<groupId>com.alibaba.otter</groupId>
-			<artifactId>canal.protocol</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<!-- 客户端要使用请单独引入kafka依赖 -->
-		<dependency>
-			<groupId>org.apache.kafka</groupId>
-			<artifactId>kafka-clients</artifactId>
-			<version>1.1.1</version>
-			<scope>provided</scope>
-		</dependency>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:p="http://www.springframework.org/schema/p"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" ">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.alibaba.otter</groupId>
+        <artifactId>canal</artifactId>
+        <version>1.1.1-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <groupId>com.alibaba.otter</groupId>
+    <artifactId>canal.client</artifactId>
+    <packaging>jar</packaging>
+    <name>canal client module for otter ${project.version}</name>
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>canal.protocol</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <!-- 客户端要使用请单独引入kafka依赖 -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>1.1.1</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>4.3.0</version>
+        </dependency>
+        <!-- junit -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+    </dependencies>
 
-		<!-- junit -->
-		<dependency>
-			<groupId>junit</groupId>
-			<artifactId>junit</artifactId>
-		</dependency>
-	</dependencies>
-	
-	<profiles>
-		<profile>
-			<id>dev</id>
-			<activation>
-				<activeByDefault>true</activeByDefault>
-				<property>
-					<name>env</name>
-					<value>!javadoc</value>
-				</property>
-			</activation>
-		</profile>
+    <profiles>
+        <profile>
+            <id>dev</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+                <property>
+                    <name>env</name>
+                    <value>!javadoc</value>
+                </property>
+            </activation>
+        </profile>
 
-		<profile>
-			<id>javadoc</id>
-			<activation>
-				<property>
-					<name>env</name>
-					<value>javadoc</value>
-				</property>
-			</activation>
-			<build>
-				<plugins>
-					<plugin>
-						<groupId>org.apache.maven.plugins</groupId>
-						<artifactId>maven-javadoc-plugin</artifactId>
-						<version>2.9.1</version>
-						<executions>
-							<execution>
-								<id>attach-javadocs</id>
-								<phase>package</phase>
-								<goals>
-									<goal>jar</goal>
-								</goals>
-							</execution>
-						</executions>
-						<configuration>
-							<aggregate>true</aggregate>
-							<show>public</show>
-							<nohelp>true</nohelp>
-							<header>${project.artifactId}-${project.version}</header>
-							<footer>${project.artifactId}-${project.version}</footer>
-							<doctitle>${project.artifactId}-${project.version}</doctitle>
-							<links>
-								<link>https://github.com/alibaba/canal</link>
-							</links>
-							<outputDirectory>${project.build.directory}/apidocs/apidocs/${project.version}</outputDirectory>
-						</configuration>
-					</plugin>
-					<plugin>
-						<groupId>org.apache.maven.plugins</groupId>
-						<artifactId>maven-scm-publish-plugin</artifactId>
-						<version>1.0-beta-2</version>
-						<executions>
-							<execution>
-								<id>attach-javadocs</id>
-								<phase>package</phase>
-								<goals>
-									<goal>publish-scm</goal>
-								</goals>
-							</execution>
-						</executions>
-						<configuration>
-							<checkoutDirectory>${project.build.directory}/scmpublish</checkoutDirectory>
-							<checkinComment>Publishing javadoc for ${project.artifactId}:${project.version}</checkinComment>
-							<content>${project.build.directory}/apidocs</content>
-							<skipDeletedFiles>true</skipDeletedFiles>
-							<pubScmUrl>scm:git:git@github.com:alibaba/canal.git</pubScmUrl>
-							<scmBranch>gh-pages</scmBranch> 
-						</configuration>
-					</plugin>
-				</plugins>
-			</build>
-		</profile>
-	</profiles>
+        <profile>
+            <id>javadoc</id>
+            <activation>
+                <property>
+                    <name>env</name>
+                    <value>javadoc</value>
+                </property>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-javadoc-plugin</artifactId>
+                        <version>2.9.1</version>
+                        <executions>
+                            <execution>
+                                <id>attach-javadocs</id>
+                                <phase>package</phase>
+                                <goals>
+                                    <goal>jar</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                        <configuration>
+                            <aggregate>true</aggregate>
+                            <show>public</show>
+                            <nohelp>true</nohelp>
+                            <header>${project.artifactId}-${project.version}</header>
+                            <footer>${project.artifactId}-${project.version}</footer>
+                            <doctitle>${project.artifactId}-${project.version}</doctitle>
+                            <links>
+                                <link>https://github.com/alibaba/canal</link>
+                            </links>
+                            <outputDirectory>${project.build.directory}/apidocs/apidocs/${project.version}
+                            </outputDirectory>
+                        </configuration>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-scm-publish-plugin</artifactId>
+                        <version>1.0-beta-2</version>
+                        <executions>
+                            <execution>
+                                <id>attach-javadocs</id>
+                                <phase>package</phase>
+                                <goals>
+                                    <goal>publish-scm</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                        <configuration>
+                            <checkoutDirectory>${project.build.directory}/scmpublish</checkoutDirectory>
+                            <checkinComment>Publishing javadoc for ${project.artifactId}:${project.version}
+                            </checkinComment>
+                            <content>${project.build.directory}/apidocs</content>
+                            <skipDeletedFiles>true</skipDeletedFiles>
+                            <pubScmUrl>scm:git:git@github.com:alibaba/canal.git</pubScmUrl>
+                            <scmBranch>gh-pages</scmBranch>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
 </project>

+ 42 - 0
client/src/main/java/com/alibaba/otter/canal/client/DeserializerUtil.java

@@ -0,0 +1,42 @@
+package com.alibaba.otter.canal.client;
+
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.CanalPacket;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DeserializerUtil {
+    private static Logger logger = LoggerFactory.getLogger(DeserializerUtil.class);
+
+    public static Message deserializer(byte[] data) {
+        try {
+            if (data == null) {
+                return null;
+            } else {
+                CanalPacket.Packet p = CanalPacket.Packet.parseFrom(data);
+                switch (p.getType()) {
+                    case MESSAGES: {
+                        if (!p.getCompression().equals(CanalPacket.Compression.NONE)) {
+                            throw new CanalClientException("compression is not supported in this connector");
+                        }
+
+                        CanalPacket.Messages messages = CanalPacket.Messages.parseFrom(p.getBody());
+                        Message result = new Message(messages.getBatchId());
+                        for (ByteString byteString : messages.getMessagesList()) {
+                            result.addEntry(CanalEntry.Entry.parseFrom(byteString));
+                        }
+                        return result;
+                    }
+                    default:
+                        break;
+                }
+            }
+        } catch (Exception e) {
+            logger.error("Error when deserializing byte[] to message ", e);
+        }
+        return null;
+    }
+}

+ 4 - 35
client/src/main/java/com/alibaba/otter/canal/client/kafka/MessageDeserializer.java

@@ -1,17 +1,12 @@
 package com.alibaba.otter.canal.client.kafka;
 
+import com.alibaba.otter.canal.client.DeserializerUtil;
+import com.alibaba.otter.canal.protocol.Message;
 import java.util.Map;
-
 import org.apache.kafka.common.serialization.Deserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.protocol.CanalEntry;
-import com.alibaba.otter.canal.protocol.CanalPacket;
-import com.alibaba.otter.canal.protocol.Message;
-import com.alibaba.otter.canal.protocol.exception.CanalClientException;
-import com.google.protobuf.ByteString;
-
 /**
  * Kafka Message类的反序列化
  *
@@ -27,34 +22,8 @@ public class MessageDeserializer implements Deserializer<Message> {
     }
 
     @Override
-    public Message deserialize(String topic, byte[] data) {
-        try {
-            if (data == null) {
-                return null;
-            }
-            else {
-                CanalPacket.Packet p = CanalPacket.Packet.parseFrom(data);
-                switch (p.getType()) {
-                    case MESSAGES: {
-                        if (!p.getCompression().equals(CanalPacket.Compression.NONE)) {
-                            throw new CanalClientException("compression is not supported in this connector");
-                        }
-
-                        CanalPacket.Messages messages = CanalPacket.Messages.parseFrom(p.getBody());
-                        Message result = new Message(messages.getBatchId());
-                        for (ByteString byteString : messages.getMessagesList()) {
-                            result.addEntry(CanalEntry.Entry.parseFrom(byteString));
-                        }
-                        return result;
-                    }
-                    default:
-                        break;
-                }
-            }
-        } catch (Exception e) {
-            logger.error("Error when deserializing byte[] to message ");
-        }
-        return null;
+    public Message deserialize(String topic1, byte[] data) {
+        return DeserializerUtil.deserializer(data);
     }
 
     @Override

+ 47 - 0
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/ConsumerBatchMessage.java

@@ -0,0 +1,47 @@
+package com.alibaba.otter.canal.client.rocketmq;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class ConsumerBatchMessage<T> {
+    private final BlockingQueue<T> data;
+    private CountDownLatch latch;
+    private boolean hasFailure = false;
+
+    public ConsumerBatchMessage(BlockingQueue<T> data) {
+        this.data = data;
+        latch = new CountDownLatch(data.size());
+    }
+
+    public boolean waitFinish(long timeout) throws InterruptedException {
+        return latch.await(timeout, TimeUnit.MILLISECONDS);
+    }
+
+    public boolean isSuccess() {
+        return !hasFailure;
+    }
+
+    public BlockingQueue<T> getData() {
+        return data;
+    }
+
+    /**
+     * Countdown if the sub message is successful.
+     */
+    public void ack() {
+        latch.countDown();
+    }
+
+    /**
+     * Countdown and fail-fast if the sub message is failed.
+     */
+    public void fail() {
+        hasFailure = true;
+        // fail fast
+        long count = latch.getCount();
+        for (int i = 0; i < count; i++) {
+            latch.countDown();
+        }
+    }
+}

+ 197 - 0
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java

@@ -0,0 +1,197 @@
+package com.alibaba.otter.canal.client.rocketmq;
+
+import com.alibaba.otter.canal.client.CanalConnector;
+import com.alibaba.otter.canal.client.DeserializerUtil;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.StringUtils;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocketMQCanalConnector implements CanalConnector {
+    private static final Logger logger = LoggerFactory.getLogger(RocketMQCanalConnector.class);
+
+    private String nameServer;
+    private String topic;
+    private String groupName;
+    private volatile boolean connected = false;
+    private DefaultMQPushConsumer rocketMQConsumer;
+    private BlockingQueue<ConsumerBatchMessage<Message>> messageBlockingQueue;
+    Map<Long, ConsumerBatchMessage<Message>> messageCache;
+    private long batchProcessTimeout = 10000;
+
+    public RocketMQCanalConnector(String nameServer, String topic, String groupName) {
+        this.nameServer = nameServer;
+        this.topic = topic;
+        this.groupName = groupName;
+        messageBlockingQueue = new LinkedBlockingQueue<>();
+        messageCache = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    public void connect() throws CanalClientException {
+        rocketMQConsumer = new DefaultMQPushConsumer(groupName);
+    }
+
+    @Override
+    public void disconnect() throws CanalClientException {
+        rocketMQConsumer.shutdown();
+    }
+
+    @Override
+    public boolean checkValid() throws CanalClientException {
+        return connected;
+    }
+
+    @Override
+    public synchronized void subscribe(String filter) throws CanalClientException {
+        if (connected) {
+            return;
+        }
+        try {
+            rocketMQConsumer.subscribe(topic, "*");
+            if (rocketMQConsumer == null) {
+                rocketMQConsumer = new DefaultMQPushConsumer(groupName);
+                if (!StringUtils.isBlank(nameServer)) {
+                    rocketMQConsumer.setNamesrvAddr(nameServer);
+                }
+            }
+            rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {
+                @Override
+                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts,
+                    ConsumeOrderlyContext context) {
+                    context.setAutoCommit(true);
+                    boolean isSuccess = process(messageExts);
+                    if (isSuccess) {
+                        return ConsumeOrderlyStatus.SUCCESS;
+                    } else {
+                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+                    }
+                }
+            });
+            rocketMQConsumer.start();
+        } catch (MQClientException ex) {
+            connected = false;
+            logger.error("Start RocketMQ consumer error", ex);
+        }
+        connected = true;
+    }
+
+    private boolean process(List<MessageExt> messageExts) {
+        BlockingQueue<Message> messageList = new LinkedBlockingQueue<>();
+        for (MessageExt messageExt : messageExts) {
+            byte[] data = messageExt.getBody();
+            Message message = DeserializerUtil.deserializer(data);
+            try {
+                messageList.put(message);
+            } catch (InterruptedException ex) {
+                logger.error("Add message error");
+            }
+        }
+        ConsumerBatchMessage<Message> batchMessage = new ConsumerBatchMessage<>(messageList);
+        try {
+            messageBlockingQueue.put(batchMessage);
+        } catch (InterruptedException e) {
+            logger.error("Put message to queue error", e);
+            throw new RuntimeException(e);
+        }
+        boolean isCompleted;
+        try {
+            isCompleted = batchMessage.waitFinish(batchProcessTimeout);
+        } catch (InterruptedException e) {
+            logger.error("Interrupted when waiting messages to be finished.", e);
+            throw new RuntimeException(e);
+        }
+        boolean isSuccess = batchMessage.isSuccess();
+        return isCompleted && isSuccess;
+    }
+
+    @Override
+    public void subscribe() throws CanalClientException {
+        this.subscribe(null);
+    }
+
+    @Override
+    public void unsubscribe() throws CanalClientException {
+        this.rocketMQConsumer.unsubscribe(this.topic);
+    }
+
+    @Override
+    public Message get(int batchSize) throws CanalClientException {
+        Message message = getWithoutAck(batchSize);
+        ack(message.getId());
+        return message;
+    }
+
+    @Override
+    public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
+        Message message = getWithoutAck(batchSize, timeout, unit);
+        ack(message.getId());
+        return message;
+    }
+
+    private Message getMessage(ConsumerBatchMessage consumerBatchMessage) {
+        BlockingQueue<Message> messageList = consumerBatchMessage.getData();
+        if (messageList != null & messageList.size() > 0) {
+            Message message = messageList.poll();
+            messageCache.put(message.getId(), consumerBatchMessage);
+            return message;
+        }
+        return null;
+    }
+
+    @Override
+    public Message getWithoutAck(int batchSize) throws CanalClientException {
+        ConsumerBatchMessage batchMessage = messageBlockingQueue.poll();
+        if (batchMessage != null){
+            return getMessage(batchMessage);
+        }
+        return null;
+    }
+
+    @Override
+    public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
+        try {
+            ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit);
+            return getMessage(batchMessage);
+        } catch (InterruptedException ex) {
+            logger.warn("Get message timeout", ex);
+            throw new CanalClientException("failed to fetch the data after " + timeout);
+        }
+    }
+
+    @Override
+    public void ack(long batchId) throws CanalClientException {
+        ConsumerBatchMessage batchMessage = messageCache.get(batchId);
+        batchMessage.ack();
+    }
+
+    @Override
+    public void rollback(long batchId) throws CanalClientException {
+
+    }
+
+    @Override
+    public void rollback() throws CanalClientException {
+
+    }
+
+    @Override
+    public void stopRunning() throws CanalClientException {
+        this.rocketMQConsumer.shutdown();
+        connected = false;
+    }
+
+}

+ 20 - 0
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnectorProvider.java

@@ -0,0 +1,20 @@
+package com.alibaba.otter.canal.client.rocketmq;
+
/**
 * Static factory for {@link RocketMQCanalConnector} instances.
 */
public class RocketMQCanalConnectorProvider {

    /**
     * Create RocketMQ connector
     *
     * @param nameServers name servers for RocketMQ, e.g. "host1:9876;host2:9876"
     * @param topic       topic the canal server publishes binlog messages to
     * @param groupId     consumer group id used by the created connector
     * @return {@link RocketMQCanalConnector}
     */
    public static RocketMQCanalConnector newRocketMQConnector(String nameServers, String topic,
        String groupId) {
        return new RocketMQCanalConnector(nameServers, topic, groupId);
    }

}

+ 11 - 0
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalListener.java

@@ -0,0 +1,11 @@
+package com.alibaba.otter.canal.client.rocketmq;
+
+import java.util.List;
+import org.apache.rocketmq.common.message.MessageExt;
+
/**
 * RocketMQ message listener: callback invoked with each delivered batch of
 * raw RocketMQ messages.
 */
public interface RocketMQCanalListener {

    /**
     * Handle one delivered batch.
     *
     * @param messageExts raw RocketMQ messages of the batch
     * @return true when the batch was consumed successfully
     */
    boolean onReceive(List<MessageExt> messageExts);
}

+ 1 - 1
deployer/pom.xml

@@ -40,7 +40,7 @@
 						<exclude>**/canal.properties</exclude>
 						<exclude>**/spring/**</exclude>
 						<exclude>**/example/**</exclude>
-						<exclude>**/kafka.yml</exclude>
+						<exclude>**/mq.yml</exclude>
 					</excludes>
 				</configuration>
 			</plugin>

+ 15 - 10
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -1,24 +1,26 @@
 package com.alibaba.otter.canal.deployer;
 
+import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
+import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
+import com.alibaba.otter.canal.server.CanalMQStarter;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
 import java.io.FileInputStream;
 import java.util.Properties;
 
-import com.alibaba.otter.canal.kafka.CanalKafkaStarter;
-import com.alibaba.otter.canal.server.CanalServerStarter;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * canal独立版本启动的入口类
- * 
+ *
  * @author jianghang 2012-11-6 下午05:20:49
  * @version 1.0.0
  */
 public class CanalLauncher {
 
     private static final String CLASSPATH_URL_PREFIX = "classpath:";
-    private static final Logger logger               = LoggerFactory.getLogger(CanalLauncher.class);
+    private static final Logger logger = LoggerFactory.getLogger(CanalLauncher.class);
 
     public static void main(String[] args) throws Throwable {
         try {
@@ -54,15 +56,18 @@ public class CanalLauncher {
 
             });
 
-            CanalServerStarter canalServerStarter = null;
+            CanalMQProducer canalMQProducer = null;
             String serverMode = properties.getProperty(CanalConstants.CANAL_SERVER_MODE, "tcp");
             if (serverMode.equalsIgnoreCase("kafka")) {
-                canalServerStarter = new CanalKafkaStarter();
-            } else if (serverMode.equalsIgnoreCase("rocketMQ")) {
-                // 预留rocketMQ启动
+                canalMQProducer = new CanalKafkaProducer();
+            } else if (serverMode.equalsIgnoreCase("rocketmq")) {
+                canalMQProducer = new CanalRocketMQProducer();
             }
-            if (canalServerStarter != null) {
-                canalServerStarter.init();
+            if (canalMQProducer != null) {
+                CanalMQStarter canalServerStarter = new CanalMQStarter(canalMQProducer);
+                if (canalServerStarter != null) {
+                    canalServerStarter.init();
+                }
             }
         } catch (Throwable e) {
             logger.error("## Something goes wrong when starting up the canal Server:", e);

+ 2 - 2
deployer/src/main/resources/canal.properties

@@ -9,8 +9,8 @@ canal.zkServers=
 # flush data to zk
 canal.zookeeper.flush.period = 1000
 canal.withoutNetty = false
-# tcp, kafka, rocketMQ
-canal.serverMode = tcp
+# tcp, kafka, rocketmq
+canal.serverMode = tcp
 # flush meta cursor/parse position to file
 canal.file.data.dir = ${canal.conf.dir}
 canal.file.flush.period = 1000

+ 5 - 4
deployer/src/main/resources/example/instance.properties

@@ -6,7 +6,7 @@
 canal.instance.gtidon=false
 
 # position info
-canal.instance.master.address=127.0.0.1:3306
+canal.instance.master.address=127.0.0.1:3306
 canal.instance.master.journal.name=
 canal.instance.master.position=
 canal.instance.master.timestamp=
@@ -18,8 +18,9 @@ canal.instance.rds.secretkey=
 canal.instance.rds.instanceId=
 
 # table meta tsdb info
-canal.instance.tsdb.enable=true
-#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
+#canal.instance.tsdb.enable=true
+#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
+#canal.instance.mysql.slaveId = 1234
 #canal.instance.tsdb.dbUsername=canal
 #canal.instance.tsdb.dbPassword=canal
 
@@ -31,7 +32,7 @@ canal.instance.tsdb.enable=true
 
 # username/password
 canal.instance.dbUsername=canal
-canal.instance.dbPassword=canal
+canal.instance.dbPassword=canal
 canal.instance.connectionCharset=UTF-8
 
 # table regex

+ 1 - 1
deployer/src/main/resources/kafka.yml → deployer/src/main/resources/mq.yml

@@ -1,4 +1,4 @@
-servers: slave1:6667,slave2:6667,slave3:6667
+servers: localhost:9876  # for RocketMQ this is the name server address
 retries: 0
 batchSize: 16384
 lingerMs: 1

+ 5 - 0
server/pom.xml

@@ -42,6 +42,11 @@
 				</exclusion>
 			</exclusions>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.rocketmq</groupId>
+			<artifactId>rocketmq-client</artifactId>
+			<version>4.3.0</version>
+		</dependency>
 		<!--kafka_2.11_1.1.1 exclusion掉了netty 的依赖,但CanalServerWithNetty 依赖 netty3,升级kafka至 1.1.1 需要显示加入,否则会启动失败 -->
 		<dependency>
 			<groupId>org.jboss.netty</groupId>

+ 70 - 0
server/src/main/java/com/alibaba/otter/canal/common/CanalMessageSerializer.java

@@ -0,0 +1,70 @@
+package com.alibaba.otter.canal.common;
+
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.CanalPacket;
+import com.alibaba.otter.canal.protocol.Message;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.WireFormat;
+import java.util.List;
+import org.apache.kafka.common.errors.SerializationException;
+import org.springframework.util.CollectionUtils;
+
+public class CanalMessageSerializer {
+    public static byte[] serializer(Message data){
+        try {
+            if (data != null) {
+                if (data.getId() != -1) {
+                    if (data.isRaw() && !CollectionUtils.isEmpty(data.getRawEntries())) {
+                        // for performance
+                        List<ByteString> rowEntries = data.getRawEntries();
+                        // message size
+                        int messageSize = 0;
+                        messageSize += CodedOutputStream.computeInt64Size(1, data.getId());
+
+                        int dataSize = 0;
+                        for (int i = 0; i < rowEntries.size(); i++) {
+                            dataSize += CodedOutputStream.computeBytesSizeNoTag(rowEntries.get(i));
+                        }
+                        messageSize += dataSize;
+                        messageSize += 1 * rowEntries.size();
+                        // packet size
+                        int size = 0;
+                        size += CodedOutputStream.computeEnumSize(3,
+                            CanalPacket.PacketType.MESSAGES.getNumber());
+                        size += CodedOutputStream.computeTagSize(5)
+                            + CodedOutputStream.computeRawVarint32Size(messageSize)
+                            + messageSize;
+                        // build data
+                        byte[] body = new byte[size];
+                        CodedOutputStream output = CodedOutputStream.newInstance(body);
+                        output.writeEnum(3, CanalPacket.PacketType.MESSAGES.getNumber());
+
+                        output.writeTag(5, WireFormat.WIRETYPE_LENGTH_DELIMITED);
+                        output.writeRawVarint32(messageSize);
+                        // message
+                        output.writeInt64(1, data.getId());
+                        for (int i = 0; i < rowEntries.size(); i++) {
+                            output.writeBytes(2, rowEntries.get(i));
+                        }
+                        output.checkNoSpaceLeft();
+                        return body;
+                    } else if (!CollectionUtils.isEmpty(data.getEntries())) {
+                        CanalPacket.Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
+                        for (CanalEntry.Entry entry : data.getEntries()) {
+                            messageBuilder.addMessages(entry.toByteString());
+                        }
+
+                        CanalPacket.Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
+                        packetBuilder.setType(CanalPacket.PacketType.MESSAGES);
+                        packetBuilder.setBody(messageBuilder.build().toByteString());
+                        return packetBuilder.build().toByteArray();
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new SerializationException("Error when serializing message to byte[] ");
+        }
+        return null;
+    }
+}

+ 10 - 2
server/src/main/java/com/alibaba/otter/canal/kafka/KafkaProperties.java → server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -1,4 +1,4 @@
-package com.alibaba.otter.canal.kafka;
+package com.alibaba.otter.canal.common;
 
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -11,7 +11,7 @@ import java.util.Set;
  * @author machengyuan 2018-6-11 下午05:30:49
  * @version 1.0.0
  */
-public class KafkaProperties {
+public class MQProperties {
 
     private String                 servers                = "localhost:6667";
     private int                    retries                = 0;
@@ -20,6 +20,7 @@ public class KafkaProperties {
     private long                   bufferMemory           = 33554432L;
     private boolean                filterTransactionEntry = true;
     private int                    canalBatchSize         = 5;
+    private String                 producerGroup          = "Canal-Producer";
 
     private List<CanalDestination> canalDestinations      = new ArrayList<CanalDestination>();
 
@@ -167,4 +168,11 @@ public class KafkaProperties {
         this.filterTransactionEntry = filterTransactionEntry;
     }
 
+    public String getProducerGroup() {
+        return producerGroup;
+    }
+
+    public void setProducerGroup(String producerGroup) {
+        this.producerGroup = producerGroup;
+    }
 }

+ 9 - 30
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -1,5 +1,7 @@
 package com.alibaba.otter.canal.kafka;
 
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
 import java.io.IOException;
 import java.util.Properties;
 
@@ -18,13 +20,14 @@ import com.alibaba.otter.canal.protocol.Message;
  * @author machengyuan 2018-6-11 下午05:30:49
  * @version 1.0.0
  */
-public class CanalKafkaProducer {
+public class CanalKafkaProducer implements CanalMQProducer {
 
-    private static final Logger       logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
+    private static final Logger logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
 
     private Producer<String, Message> producer;
 
-    public void init(KafkaProperties kafkaProperties) {
+    @Override
+    public void init(MQProperties kafkaProperties) {
         Properties properties = new Properties();
         properties.put("bootstrap.servers", kafkaProperties.getServers());
         properties.put("acks", "all");
@@ -37,6 +40,7 @@ public class CanalKafkaProducer {
         producer = new KafkaProducer<String, Message>(properties);
     }
 
+    @Override
     public void stop() {
         try {
             logger.info("## stop the kafka producer");
@@ -48,33 +52,8 @@ public class CanalKafkaProducer {
         }
     }
 
-    public void send(KafkaProperties.Topic topic, Message message) throws IOException {
-        // set canal.instance.filter.transaction.entry = true
-
-        // boolean valid = false;
-        // if (message != null) {
-        // if (message.isRaw() && !message.getRawEntries().isEmpty()) {
-        // for (ByteString byteString : message.getRawEntries()) {
-        // CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString);
-        // if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONBEGIN
-        // && entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) {
-        // valid = true;
-        // break;
-        // }
-        // }
-        // } else if (!message.getEntries().isEmpty()){
-        // for (CanalEntry.Entry entry : message.getEntries()) {
-        // if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONBEGIN
-        // && entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) {
-        // valid = true;
-        // break;
-        // }
-        // }
-        // }
-        // }
-        // if (!valid) {
-        // return;
-        // }
+    @Override
+    public void send(MQProperties.Topic topic, Message message) throws IOException {
         ProducerRecord<String, Message> record;
         if (topic.getPartition() != null) {
             record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);

+ 2 - 54
server/src/main/java/com/alibaba/otter/canal/kafka/MessageSerializer.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.kafka;
 
+import com.alibaba.otter.canal.common.CanalMessageSerializer;
 import java.util.List;
 import java.util.Map;
 
@@ -29,60 +30,7 @@ public class MessageSerializer implements Serializer<Message> {
 
     @Override
     public byte[] serialize(String topic, Message data) {
-        try {
-            if (data != null) {
-                if (data.getId() != -1) {
-                    if (data.isRaw() && !CollectionUtils.isEmpty(data.getRawEntries())) {
-                        // for performance
-                        List<ByteString> rowEntries = data.getRawEntries();
-                        // message size
-                        int messageSize = 0;
-                        messageSize += CodedOutputStream.computeInt64Size(1, data.getId());
-
-                        int dataSize = 0;
-                        for (int i = 0; i < rowEntries.size(); i++) {
-                            dataSize += CodedOutputStream.computeBytesSizeNoTag(rowEntries.get(i));
-                        }
-                        messageSize += dataSize;
-                        messageSize += 1 * rowEntries.size();
-                        // packet size
-                        int size = 0;
-                        size += CodedOutputStream.computeEnumSize(3,
-                            PacketType.MESSAGES.getNumber());
-                        size += CodedOutputStream.computeTagSize(5)
-                                + CodedOutputStream.computeRawVarint32Size(messageSize)
-                                + messageSize;
-                        // build data
-                        byte[] body = new byte[size];
-                        CodedOutputStream output = CodedOutputStream.newInstance(body);
-                        output.writeEnum(3, PacketType.MESSAGES.getNumber());
-
-                        output.writeTag(5, WireFormat.WIRETYPE_LENGTH_DELIMITED);
-                        output.writeRawVarint32(messageSize);
-                        // message
-                        output.writeInt64(1, data.getId());
-                        for (int i = 0; i < rowEntries.size(); i++) {
-                            output.writeBytes(2, rowEntries.get(i));
-                        }
-                        output.checkNoSpaceLeft();
-                        return body;
-                    } else if (!CollectionUtils.isEmpty(data.getEntries())) {
-                        CanalPacket.Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
-                        for (CanalEntry.Entry entry : data.getEntries()) {
-                            messageBuilder.addMessages(entry.toByteString());
-                        }
-
-                        CanalPacket.Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
-                        packetBuilder.setType(PacketType.MESSAGES);
-                        packetBuilder.setBody(messageBuilder.build().toByteString());
-                        return packetBuilder.build().toByteArray();
-                    }
-                }
-            }
-        } catch (Exception e) {
-            throw new SerializationException("Error when serializing message to byte[] ");
-        }
-        return null;
+        return CanalMessageSerializer.serializer(data);
     }
 
     @Override

+ 58 - 0
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -0,0 +1,58 @@
+package com.alibaba.otter.canal.rocketmq;
+
+import com.alibaba.otter.canal.common.CanalMessageSerializer;
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.server.exception.CanalServerException;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
+import java.util.List;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CanalRocketMQProducer implements CanalMQProducer {
+    private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQProducer.class);
+
+    private DefaultMQProducer defaultMQProducer;
+
+    @Override
+    public void init(MQProperties rocketMQProperties) {
+        defaultMQProducer = new DefaultMQProducer();
+        defaultMQProducer.setNamesrvAddr(rocketMQProperties.getServers());
+        defaultMQProducer.setProducerGroup(rocketMQProperties.getProducerGroup());
+        defaultMQProducer.setRetryTimesWhenSendFailed(rocketMQProperties.getRetries());
+        logger.info("##Start RocketMQ producer##");
+        try {
+            defaultMQProducer.start();
+        } catch (MQClientException ex) {
+            throw new CanalServerException("Start RocketMQ producer error", ex);
+        }
+    }
+
+    @Override
+    public void send(MQProperties.Topic topic, com.alibaba.otter.canal.protocol.Message data) {
+        try {
+            Message message = new Message(topic.getTopic(), CanalMessageSerializer.serializer(data));
+            this.defaultMQProducer.send(message, new MessageQueueSelector() {
+                @Override
+                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                 //   int index = (arg.hashCode() % mqs.size());
+                    return mqs.get(1);
+                }
+            }, null);
+        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
+            logger.error("Send message error!", e);
+        }
+    }
+
+    @Override
+    public void stop() {
+        logger.info("## Stop RocketMQ producer##");
+        this.defaultMQProducer.shutdown();
+    }
+}

+ 37 - 43
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaStarter.java → server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -1,69 +1,63 @@
-package com.alibaba.otter.canal.kafka;
+package com.alibaba.otter.canal.server;
 
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.protocol.ClientIdentity;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
 import java.io.FileInputStream;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
 
-import com.alibaba.otter.canal.kafka.KafkaProperties.CanalDestination;
-import com.alibaba.otter.canal.kafka.KafkaProperties.Topic;
-import com.alibaba.otter.canal.protocol.ClientIdentity;
-import com.alibaba.otter.canal.protocol.Message;
-import com.alibaba.otter.canal.server.CanalServerStarter;
-import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
-
-/**
- * kafka 启动类
- *
- * @author machengyuan 2018-6-11 下午05:30:49
- * @version 1.0.0
- */
-public class CanalKafkaStarter implements CanalServerStarter {
-
-    private static final Logger logger               = LoggerFactory.getLogger(CanalKafkaStarter.class);
+public class CanalMQStarter {
+    private static final Logger logger               = LoggerFactory.getLogger(CanalMQStarter.class);
 
     private static final String CLASSPATH_URL_PREFIX = "classpath:";
 
     private volatile boolean    running              = false;
 
-    private ExecutorService     executorService;
+    private ExecutorService executorService;
 
-    private CanalKafkaProducer canalKafkaProducer;
+    private CanalMQProducer canalMQProducer;
 
-    private KafkaProperties kafkaProperties;
+    private MQProperties properties;
+
+    public CanalMQStarter(CanalMQProducer canalMQProducer){
+        this.canalMQProducer = canalMQProducer;
+    }
 
     public void init() {
         try {
-            logger.info("## load kafka configurations");
-            String conf = System.getProperty("kafka.conf", "classpath:kafka.yml");
+            logger.info("## load MQ configurations");
+            String conf = System.getProperty("mq.conf", "classpath:mq.yml");
 
             if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
                 conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
-                kafkaProperties = new Yaml().loadAs(CanalKafkaStarter.class.getClassLoader().getResourceAsStream(conf),
-                    KafkaProperties.class);
+                properties = new Yaml().loadAs(CanalMQStarter.class.getClassLoader().getResourceAsStream(conf),
+                    MQProperties.class);
             } else {
-                kafkaProperties = new Yaml().loadAs(new FileInputStream(conf), KafkaProperties.class);
+                properties = new Yaml().loadAs(new FileInputStream(conf), MQProperties.class);
             }
 
             // 初始化 kafka producer
-            canalKafkaProducer = new CanalKafkaProducer();
-            canalKafkaProducer.init(kafkaProperties);
+//            canalMQProducer = new CanalKafkaProducer();
+            canalMQProducer.init(properties);
             // set filterTransactionEntry
-            if (kafkaProperties.isFilterTransactionEntry()) {
+            if (properties.isFilterTransactionEntry()) {
                 System.setProperty("canal.instance.filter.transaction.entry", "true");
             }
             // 对应每个instance启动一个worker线程
-            List<CanalDestination> destinations = kafkaProperties.getCanalDestinations();
+            List<MQProperties.CanalDestination> destinations = properties.getCanalDestinations();
 
             executorService = Executors.newFixedThreadPool(destinations.size());
 
-            logger.info("## start the kafka workers.");
-            for (final CanalDestination destination : destinations) {
+            logger.info("## start the MQ workers.");
+            for (final MQProperties.CanalDestination destination : destinations) {
                 executorService.execute(new Runnable() {
 
                     @Override
@@ -73,31 +67,31 @@ public class CanalKafkaStarter implements CanalServerStarter {
                 });
             }
             running = true;
-            logger.info("## the kafka workers is running now ......");
+            logger.info("## the MQ workers is running now ......");
             Runtime.getRuntime().addShutdownHook(new Thread() {
 
                 public void run() {
                     try {
-                        logger.info("## stop the kafka workers");
+                        logger.info("## stop the MQ workers");
                         running = false;
                         executorService.shutdown();
-                        canalKafkaProducer.stop();
+                        canalMQProducer.stop();
                     } catch (Throwable e) {
-                        logger.warn("##something goes wrong when stopping kafka workers:", e);
+                        logger.warn("##something goes wrong when stopping MQ workers:", e);
                     } finally {
-                        logger.info("## canal kafka is down.");
+                        logger.info("## canal MQ is down.");
                     }
                 }
 
             });
 
         } catch (Throwable e) {
-            logger.error("## Something goes wrong when starting up the canal kafka workers:", e);
+            logger.error("## Something goes wrong when starting up the canal MQ workers:", e);
             System.exit(0);
         }
     }
 
-    private void worker(CanalDestination destination) {
+    private void worker(MQProperties.CanalDestination destination) {
         while (!running)
             ;
         logger.info("## start the canal consumer: {}.", destination.getCanalDestination());
@@ -117,19 +111,19 @@ public class CanalKafkaStarter implements CanalServerStarter {
                 logger.info("## the canal consumer {} is running now ......", destination.getCanalDestination());
 
                 while (running) {
-                    Message message = server.getWithoutAck(clientIdentity, kafkaProperties.getCanalBatchSize()); // 获取指定数量的数据
+                    Message message = server.getWithoutAck(clientIdentity, properties.getCanalBatchSize()); // 获取指定数量的数据
                     long batchId = message.getId();
                     try {
                         int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();
                         if (batchId != -1 && size != 0) {
                             if (!StringUtils.isEmpty(destination.getTopic())) {
-                                Topic topic = new Topic();
+                                MQProperties.Topic topic = new MQProperties.Topic();
                                 topic.setTopic(destination.getTopic());
                                 topic.setPartition(destination.getPartition());
                                 destination.getTopics().add(topic);
                             }
-                            for (Topic topic : destination.getTopics()) {
-                                canalKafkaProducer.send(topic, message); // 发送message到所有topic
+                            for (MQProperties.Topic topic : destination.getTopics()) {
+                                canalMQProducer.send(topic, message); // 发送message到所有topic
                             }
                         }
 

+ 28 - 0
server/src/main/java/com/alibaba/otter/canal/spi/CanalMQProducer.java

@@ -0,0 +1,28 @@
+package com.alibaba.otter.canal.spi;
+
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.protocol.Message;
+import java.io.IOException;
+
+/**
+ * SPI contract for message-queue producers that publish canal {@link Message}s
+ * to an external broker (this commit adds a RocketMQ implementation alongside
+ * the existing Kafka one). Lifecycle is: {@link #init(MQProperties)} once,
+ * then repeated {@link #send(MQProperties.Topic, Message)} calls, then
+ * {@link #stop()} on shutdown.
+ */
+public interface CanalMQProducer {
+    /**
+     * Init producer.
+     *
+     * @param mqProperties MQ config
+     */
+    void init(MQProperties mqProperties);
+
+    /**
+     * Send canal message to related topic
+     *
+     * @param topic MQ topic
+     * @param message canal message
+     * @throws IOException if the message cannot be delivered to the broker
+     */
+    void send(MQProperties.Topic topic, Message message) throws IOException;
+
+    /**
+     * Stop MQ producer service
+     */
+    void stop();
+}