
Merge pull request #973 from duhengforever/develop_rocketmq

[ISSUE 800] Provide native RocketMQ integration for canal
agapple 6 years ago
parent commit c3327e567e
28 changed files with 1260 additions and 693 deletions
  1. +20 -29   client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java
  2. +41 -0    client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/AbstractCanalAdapterWorker.java
  3. +2 -2     client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterKafkaWorker.java
  4. +46 -50   client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterLoader.java
  5. +123 -0   client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterRocketMQWorker.java
  6. +6 -4     client-launcher/src/main/resources/canal-client.yml
  7. +5 -1     client/pom.xml
  8. +42 -0    client/src/main/java/com/alibaba/otter/canal/client/CanalMessageDeserializer.java
  9. +4 -36    client/src/main/java/com/alibaba/otter/canal/client/kafka/MessageDeserializer.java
  10. +47 -0   client/src/main/java/com/alibaba/otter/canal/client/rocketmq/ConsumerBatchMessage.java
  11. +199 -0  client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java
  12. +20 -0   client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnectorProvider.java
  13. +11 -0   client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalListener.java
  14. +8 -0    client/src/test/java/com/alibaba/otter/canal/client/running/rocketmq/AbstractRocektMQTest.java
  15. +135 -0  client/src/test/java/com/alibaba/otter/canal/client/running/rocketmq/CanalRocketMQClientExample.java
  16. +1 -1    deployer/pom.xml
  17. +15 -12  deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java
  18. +2 -2    deployer/src/main/resources/canal.properties
  19. +4 -3    deployer/src/main/resources/mq.yml
  20. +264 -405 protocol/src/main/java/com/alibaba/otter/canal/protocol/CanalEntry.java
  21. +5 -0    server/pom.xml
  22. +71 -0   server/src/main/java/com/alibaba/otter/canal/common/CanalMessageSerializer.java
  23. +25 -16  server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java
  24. +18 -21  server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java
  25. +3 -65   server/src/main/java/com/alibaba/otter/canal/kafka/MessageSerializer.java
  26. +64 -0   server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java
  27. +43 -46  server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java
  28. +36 -0   server/src/main/java/com/alibaba/otter/canal/spi/CanalMQProducer.java

+ 20 - 29
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -20,9 +20,10 @@ public class CanalClientConfig {
 
     private String              bootstrapServers;
 
+    private List<MQTopic>       mqTopics;
+
     private Boolean             flatMessage = true;
 
-    private List<KafkaTopic>    kafkaTopics;
 
     private List<CanalInstance> canalInstances;
 
@@ -58,6 +59,10 @@ public class CanalClientConfig {
         this.bootstrapServers = bootstrapServers;
     }
 
+    public List<MQTopic> getMqTopics() {
+        return mqTopics;
+    }
+
     public Boolean getFlatMessage() {
         return flatMessage;
     }
@@ -66,12 +71,8 @@ public class CanalClientConfig {
         this.flatMessage = flatMessage;
     }
 
-    public List<KafkaTopic> getKafkaTopics() {
-        return kafkaTopics;
-    }
-
-    public void setKafkaTopics(List<KafkaTopic> kafkaTopics) {
-        this.kafkaTopics = kafkaTopics;
+    public void setMqTopics(List<MQTopic> mqTopics) {
+        this.mqTopics = mqTopics;
     }
 
     public List<CanalInstance> getCanalInstances() {
@@ -120,12 +121,21 @@ public class CanalClientConfig {
         }
     }
 
-    public static class KafkaTopic {
+    public static class MQTopic {
+        private String      mqMode;
 
         private String      topic;
 
         private List<Group> groups = new ArrayList<>();
 
+        public String getMqMode() {
+            return mqMode;
+        }
+
+        public void setMqMode(String mqMode) {
+            this.mqMode = mqMode;
+        }
+
         public String getTopic() {
             return topic;
         }
@@ -167,25 +177,6 @@ public class CanalClientConfig {
             this.outAdapters = outAdapters;
         }
 
-        // public List<Adaptor> getAdapters() {
-        // return adapters;
-        // }
-        //
-        // public void setAdapters(List<Adaptor> adapters) {
-        // this.adapters = adapters;
-        // }
-    }
-
-    // public static class Adaptor {
-    // private List<CanalOuterAdapterConfiguration> outAdapters;
-    //
-    // public List<CanalOuterAdapterConfiguration> getOutAdapters() {
-    // return outAdapters;
-    // }
-    //
-    // public void setOutAdapters(List<CanalOuterAdapterConfiguration> outAdapters)
-    // {
-    // this.outAdapters = outAdapters;
-    // }
-    // }
+    }
+
 }

+ 41 - 0
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/AbstractCanalAdapterWorker.java

@@ -129,6 +129,47 @@ public abstract class AbstractCanalAdapterWorker {
         }
     }
 
+    protected void writeOut(Message message,String topic){
+        if (logger.isDebugEnabled()) {
+            logger.debug("topic: {} batchId: {} batchSize: {} ",
+                topic,
+                message.getId(),
+                message.getEntries().size());
+        }
+        long begin = System.currentTimeMillis();
+        writeOut(message);
+        long now = System.currentTimeMillis();
+        if ((System.currentTimeMillis() - begin) > 5 * 60 * 1000) {
+            logger.error("topic: {} batchId {} elapsed time: {} ms",
+                topic,
+                message.getId(),
+                now - begin);
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug("topic: {} batchId {} elapsed time: {} ms",
+                topic,
+                message.getId(),
+                now - begin);
+        }
+    }
+
+    protected void stopOutAdapters(){
+        if (thread != null) {
+            try {
+                thread.join();
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+        groupInnerExecutorService.shutdown();
+        logger.info("topic connectors' worker thread dead!");
+        for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
+            for (CanalOuterAdapter adapter : outerAdapters) {
+                adapter.destroy();
+            }
+        }
+        logger.info("topic all connectors destroyed!");
+    }
     public abstract void start();
 
     public abstract void stop();

+ 2 - 2
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterKafkaWorker.java

@@ -30,7 +30,7 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
     private boolean             flatMessage;
 
     public CanalAdapterKafkaWorker(String bootstrapServers, String topic, String groupId,
-                                   List<List<CanalOuterAdapter>> canalOuterAdapters, boolean flatMessage){
+        List<List<CanalOuterAdapter>> canalOuterAdapters, boolean flatMessage){
         this.canalOuterAdapters = canalOuterAdapters;
         this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
         this.topic = topic;
@@ -171,4 +171,4 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
         connector.disconnect();
         logger.info("=============> Disconnect topic: {} <=============", this.topic);
     }
-}
+}

+ 46 - 50
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterLoader.java

@@ -1,5 +1,9 @@
 package com.alibaba.otter.canal.client.adapter.loader;
 
+import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
+import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
+import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.ArrayList;
@@ -8,44 +12,38 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
-import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
-import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
-import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
-
 /**
- * 外部适配器的加载器
+ * MQ外部适配器的加载器
  *
- * @author machengyuan 2018-8-19 下午11:45:49
  * @version 1.0.0
  */
 public class CanalAdapterLoader {
 
-    private static final Logger                  logger            = LoggerFactory.getLogger(CanalAdapterLoader.class);
+    private static final Logger logger = LoggerFactory.getLogger(CanalAdapterLoader.class);
 
-    private CanalClientConfig                    canalClientConfig;
+    private CanalClientConfig canalClientConfig;
 
-    private Map<String, CanalAdapterWorker>      canalWorkers      = new HashMap<>();
+    private Map<String, CanalAdapterWorker> canalWorkers = new HashMap<>();
 
-    private Map<String, CanalAdapterKafkaWorker> canalKafkaWorkers = new HashMap<>();
+    private Map<String, AbstractCanalAdapterWorker> canalMQWorker = new HashMap<>();
 
-    private ExtensionLoader<CanalOuterAdapter>   loader;
+    private ExtensionLoader<CanalOuterAdapter> loader;
 
-    public CanalAdapterLoader(CanalClientConfig canalClientConfig){
+    public CanalAdapterLoader(CanalClientConfig canalClientConfig) {
         this.canalClientConfig = canalClientConfig;
     }
 
     /**
-     * 初始化canal-client、 canal-client-kafka的适配器
+     * 初始化canal-client、 canal-client-rocketmq的适配器
      */
     public void init() {
-        // canal instances 和 kafka topics 配置不能同时为空
-        if (canalClientConfig.getCanalInstances() == null && canalClientConfig.getKafkaTopics() == null) {
-            throw new RuntimeException("Blank config property: canalInstances or canalKafkaTopics");
+        // canal instances 和 mq topics 配置不能同时为空
+        if (canalClientConfig.getCanalInstances() == null && canalClientConfig.getMqTopics() == null) {
+            throw new RuntimeException("Blank config property: canalInstances or canalMQTopics");
         }
 
         loader = ExtensionLoader.getExtensionLoader(CanalOuterAdapter.class,
@@ -59,13 +57,6 @@ public class CanalAdapterLoader {
         }
         String zkHosts = this.canalClientConfig.getZookeeperHosts();
 
-        boolean flatMessage = this.canalClientConfig.getFlatMessage();
-
-        // if (zkHosts == null && sa == null) {
-        // throw new RuntimeException("Blank config property: canalServerHost or
-        // zookeeperHosts");
-        // }
-
         // 初始化canal-client的适配器
         if (canalClientConfig.getCanalInstances() != null) {
             for (CanalClientConfig.CanalInstance instance : canalClientConfig.getCanalInstances()) {
@@ -90,32 +81,38 @@ public class CanalAdapterLoader {
             }
         }
 
-        // 初始化canal-client-kafka的适配器
-        if (canalClientConfig.getKafkaTopics() != null) {
-            for (CanalClientConfig.KafkaTopic kafkaTopic : canalClientConfig.getKafkaTopics()) {
-                for (CanalClientConfig.Group group : kafkaTopic.getGroups()) {
+        // 初始化canal-client-mq的适配器
+        if (canalClientConfig.getMqTopics() != null) {
+            for (CanalClientConfig.MQTopic topic : canalClientConfig.getMqTopics()) {
+                for (CanalClientConfig.Group group : topic.getGroups()) {
                     List<List<CanalOuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
 
                     List<CanalOuterAdapter> canalOuterAdapters = new ArrayList<>();
 
                     for (CanalOuterAdapterConfiguration config : group.getOutAdapters()) {
-                        // for (CanalOuterAdapterConfiguration config : adaptor.getOutAdapters()) {
                         loadConnector(config, canalOuterAdapters);
-                        // }
                     }
                     canalOuterAdapterGroups.add(canalOuterAdapters);
+                    if (StringUtils.isBlank(topic.getMqMode()) || "rocketmq".equalsIgnoreCase(topic.getMqMode())) {
+                        CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(
+                            canalClientConfig.getBootstrapServers(),
+                            topic.getTopic(),
+                            group.getGroupId(),
+                            canalOuterAdapterGroups);
+                        canalMQWorker.put(topic.getTopic() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
+                        rocketMQWorker.start();
+                    } else if ("kafka".equalsIgnoreCase(topic.getMqMode())) {
+                        CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(
+                            canalClientConfig.getBootstrapServers(),
+                            topic.getTopic(),
+                            group.getGroupId(),
+                            canalOuterAdapterGroups,canalClientConfig.getFlatMessage());
+                        canalMQWorker.put(topic.getTopic() + "-kafka-" + group.getGroupId(), canalKafkaWorker);
+                        canalKafkaWorker.start();
+                    }
+                    logger.info("Start adapter for canal-client rocketmq topic: {} succeed",
+                        topic.getTopic() + "-" + group.getGroupId());
 
-                    // String zkServers = canalClientConfig.getZookeeperHosts();
-                    CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(
-                        canalClientConfig.getBootstrapServers(),
-                        kafkaTopic.getTopic(),
-                        group.getGroupId(),
-                        canalOuterAdapterGroups,
-                        flatMessage);
-                    canalKafkaWorkers.put(kafkaTopic.getTopic() + "-" + group.getGroupId(), canalKafkaWorker);
-                    canalKafkaWorker.start();
-                    logger.info("Start adapter for canal-client kafka topic: {} succeed",
-                        kafkaTopic.getTopic() + "-" + group.getGroupId());
                 }
             }
         }
@@ -154,19 +151,18 @@ public class CanalAdapterLoader {
             }
             stopExecutorService.shutdown();
         }
-        if (canalKafkaWorkers.size() > 0) {
-            ExecutorService stopKafkaExecutorService = Executors.newFixedThreadPool(canalKafkaWorkers.size());
-            for (CanalAdapterKafkaWorker v : canalKafkaWorkers.values()) {
-                final CanalAdapterKafkaWorker cakw = v;
-                stopKafkaExecutorService.submit(new Runnable() {
-
+        if (canalMQWorker.size() > 0) {
+            ExecutorService stopMQWokerService = Executors.newFixedThreadPool(canalMQWorker.size());
+            for (AbstractCanalAdapterWorker tmp : canalMQWorker.values()) {
+                final AbstractCanalAdapterWorker worker = tmp;
+                stopMQWokerService.submit(new Runnable() {
                     @Override
                     public void run() {
-                        cakw.stop();
+                        worker.stop();
                     }
                 });
             }
-            stopKafkaExecutorService.shutdown();
+            stopMQWokerService.shutdown();
         }
         logger.info("All canal adapters destroyed");
     }

+ 123 - 0
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterRocketMQWorker.java

@@ -0,0 +1,123 @@
+package com.alibaba.otter.canal.client.adapter.loader;
+
+import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
+import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectorProvider;
+import com.alibaba.otter.canal.protocol.Message;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.common.errors.WakeupException;
+
+/**
+ * kafka对应的client适配器工作线程
+ *
+ * @author machengyuan 2018-8-19 下午11:30:49
+ * @version 1.0.0
+ */
+public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
+
+    private RocketMQCanalConnector connector;
+
+    private String topic;
+
+    public CanalAdapterRocketMQWorker(String nameServers, String topic, String groupId,
+        List<List<CanalOuterAdapter>> canalOuterAdapters) {
+        logger.info("RocketMQ consumer config topic:{}, nameServer:{}, groupId:{}", topic, nameServers, groupId);
+        this.canalOuterAdapters = canalOuterAdapters;
+        this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
+        this.topic = topic;
+        this.canalDestination = topic;
+        connector = RocketMQCanalConnectorProvider.newRocketMQConnector(nameServers, topic, groupId);
+    }
+
+    @Override
+    public void start() {
+        if (!running) {
+            thread = new Thread(new Runnable() {
+
+                @Override
+                public void run() {
+                    process();
+                }
+            });
+            thread.setUncaughtExceptionHandler(handler);
+            running = true;
+            thread.start();
+        }
+    }
+
+    @Override
+    public void stop() {
+        try {
+            if (!running) {
+                return;
+            }
+            connector.stopRunning();
+            running = false;
+            logger.info("Stop topic {} out adapters begin", this.topic);
+            stopOutAdapters();
+            logger.info("Stop topic {} out adapters end", this.topic);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    private void process() {
+        while (!running)
+            ;
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        while (running) {
+            try {
+                logger.info("=============> Start to connect topic: {} <=============", this.topic);
+                connector.connect();
+                logger.info("=============> Start to subscribe topic: {}<=============", this.topic);
+                connector.subscribe();
+                logger.info("=============> Subscribe topic: {} succeed<=============", this.topic);
+                while (running) {
+                    try {
+                        // switcher.get(); //等待开关开启
+
+                        final Message message = connector.getWithoutAck(1);
+                        if (message != null) {
+                            executor.submit(new Runnable() {
+
+                                @Override
+                                public void run() {
+                                    try {
+                                        writeOut(message, topic);
+                                    } catch (Exception e) {
+                                        logger.error(e.getMessage(), e);
+                                    }
+                                    connector.ack(message.getId());
+                                }
+                            });
+                        } else {
+                            logger.debug("Message is null");
+                        }
+                    } catch (CommitFailedException e) {
+                        logger.warn(e.getMessage());
+                    } catch (Exception e) {
+                        logger.error(e.getMessage(), e);
+                        TimeUnit.SECONDS.sleep(1L);
+                    }
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        executor.shutdown();
+
+        try {
+            connector.unsubscribe();
+        } catch (WakeupException e) {
+            // No-op. Continue process
+        }
+        connector.stopRunning();
+        logger.info("=============> Disconnect topic: {} <=============", this.topic);
+    }
+}

+ 6 - 4
client-launcher/src/main/resources/canal-client.yml

@@ -1,6 +1,6 @@
 #canalServerHost: 127.0.0.1:11111
 #zookeeperHosts: slave1:2181
-bootstrapServers: slave1:6667,slave2:6667
+bootstrapServers: slave1:6667,slave2:6667 #or rocketmq nameservers:host1:9876;host2:9876
 flatMessage: true
 
 #canalInstances:
@@ -11,10 +11,12 @@ flatMessage: true
 #    - name: hbase
 #      hosts: slave1:2181
 #      properties: {znodeParent: "/hbase-unsecure"}
-kafkaTopics:
-- topic: example
+
+mqTopics:
+- mqMode: rocketmq
+  topic: example
   groups:
-  - groupId: egroup
+  - groupId: example
     outAdapters:
     - name: logger
 #    - name: hbase

+ 5 - 1
client/pom.xml

@@ -101,7 +101,11 @@
 			<version>${spring_version}</version>
 			<scope>test</scope>
 		</dependency>
-
+		<dependency>
+			<groupId>org.apache.rocketmq</groupId>
+			<artifactId>rocketmq-client</artifactId>
+			<version>4.3.0</version>
+		</dependency>
 		<!-- 客户端要使用请单独引入kafka依赖 -->
 		<dependency>
 			<groupId>org.apache.kafka</groupId>

+ 42 - 0
client/src/main/java/com/alibaba/otter/canal/client/CanalMessageDeserializer.java

@@ -0,0 +1,42 @@
+package com.alibaba.otter.canal.client;
+
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.CanalPacket;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CanalMessageDeserializer {
+    private static Logger logger = LoggerFactory.getLogger(CanalMessageDeserializer.class);
+
+    public static Message deserializer(byte[] data) {
+        try {
+            if (data == null) {
+                return null;
+            } else {
+                CanalPacket.Packet p = CanalPacket.Packet.parseFrom(data);
+                switch (p.getType()) {
+                    case MESSAGES: {
+//                        if (!p.getCompression().equals(CanalPacket.Compression.NONE)) {
+//                            throw new CanalClientException("compression is not supported in this connector");
+//                        }
+
+                        CanalPacket.Messages messages = CanalPacket.Messages.parseFrom(p.getBody());
+                        Message result = new Message(messages.getBatchId());
+                        for (ByteString byteString : messages.getMessagesList()) {
+                            result.addEntry(CanalEntry.Entry.parseFrom(byteString));
+                        }
+                        return result;
+                    }
+                    default:
+                        break;
+                }
+            }
+        } catch (Exception e) {
+            logger.error("Error when deserializing byte[] to message ", e);
+        }
+        return null;
+    }
+}
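
Note: both MQ clients now delegate to this shared deserializer — the Kafka MessageDeserializer in the next hunk and RocketMQCanalConnector.process() further down. A minimal sketch of the call site, assuming a MessageExt already delivered by a running RocketMQ consumer (the handler method is illustrative, not part of this PR):

    import com.alibaba.otter.canal.client.CanalMessageDeserializer;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;
    import org.apache.rocketmq.common.message.MessageExt;

    public class DeserializeSketch {

        // messageExt is assumed to come from a consumer callback; its body is the raw CanalPacket bytes
        static void handle(MessageExt messageExt) {
            Message message = CanalMessageDeserializer.deserializer(messageExt.getBody());
            if (message == null) {
                return; // deserializer returns null for empty or non-MESSAGES packets
            }
            for (CanalEntry.Entry entry : message.getEntries()) {
                System.out.println(entry.getEntryType()); // e.g. TRANSACTIONBEGIN / ROWDATA / TRANSACTIONEND
            }
        }
    }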

+ 4 - 36
client/src/main/java/com/alibaba/otter/canal/client/kafka/MessageDeserializer.java

@@ -1,17 +1,12 @@
 package com.alibaba.otter.canal.client.kafka;
 
+import com.alibaba.otter.canal.client.CanalMessageDeserializer;
+import com.alibaba.otter.canal.protocol.Message;
 import java.util.Map;
-
 import org.apache.kafka.common.serialization.Deserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.protocol.CanalEntry;
-import com.alibaba.otter.canal.protocol.CanalPacket;
-import com.alibaba.otter.canal.protocol.Message;
-import com.alibaba.otter.canal.protocol.exception.CanalClientException;
-import com.google.protobuf.ByteString;
-
 /**
  * Kafka Message类的反序列化
  *
@@ -27,35 +22,8 @@ public class MessageDeserializer implements Deserializer<Message> {
     }
 
     @Override
-    public Message deserialize(String topic, byte[] data) {
-        try {
-            if (data == null) {
-                return null;
-            }
-            else {
-                CanalPacket.Packet p = CanalPacket.Packet.parseFrom(data);
-                switch (p.getType()) {
-                    case MESSAGES: {
-                        if (!p.getCompression().equals(CanalPacket.Compression.NONE)
-                                && !p.getCompression().equals(CanalPacket.Compression.COMPRESSIONCOMPATIBLEPROTO2)) {
-                            throw new CanalClientException("compression is not supported in this connector");
-                        }
-
-                        CanalPacket.Messages messages = CanalPacket.Messages.parseFrom(p.getBody());
-                        Message result = new Message(messages.getBatchId());
-                        for (ByteString byteString : messages.getMessagesList()) {
-                            result.addEntry(CanalEntry.Entry.parseFrom(byteString));
-                        }
-                        return result;
-                    }
-                    default:
-                        break;
-                }
-            }
-        } catch (Exception e) {
-            logger.error("Error when deserializing byte[] to message ");
-        }
-        return null;
+    public Message deserialize(String topic1, byte[] data) {
+        return CanalMessageDeserializer.deserializer(data);
     }
 
     @Override

+ 47 - 0
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/ConsumerBatchMessage.java

@@ -0,0 +1,47 @@
+package com.alibaba.otter.canal.client.rocketmq;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class ConsumerBatchMessage<T> {
+    private final BlockingQueue<T> data;
+    private CountDownLatch latch;
+    private boolean hasFailure = false;
+
+    public ConsumerBatchMessage(BlockingQueue<T> data) {
+        this.data = data;
+        latch = new CountDownLatch(data.size());
+    }
+
+    public boolean waitFinish(long timeout) throws InterruptedException {
+        return latch.await(timeout, TimeUnit.MILLISECONDS);
+    }
+
+    public boolean isSuccess() {
+        return !hasFailure;
+    }
+
+    public BlockingQueue<T> getData() {
+        return data;
+    }
+
+    /**
+     * Countdown if the sub message is successful.
+     */
+    public void ack() {
+        latch.countDown();
+    }
+
+    /**
+     * Countdown and fail-fast if the sub message is failed.
+     */
+    public void fail() {
+        hasFailure = true;
+        // fail fast
+        long count = latch.getCount();
+        for (int i = 0; i < count; i++) {
+            latch.countDown();
+        }
+    }
+}
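
Note: this class is the handshake between the push-style RocketMQ listener and the pull-style connector API in the next file — consumeMessage() blocks in waitFinish() until every element of the batch has been ack()ed (or fail() trips the latch). A self-contained sketch of that flow, using String as a stand-in for the deserialized canal Message:

    import com.alibaba.otter.canal.client.rocketmq.ConsumerBatchMessage;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;

    public class BatchHandshakeSketch {

        public static void main(String[] args) throws Exception {
            BlockingQueue<String> batch = new LinkedBlockingQueue<>();
            batch.put("entry-1"); // filled by process(List<MessageExt>) in the real connector
            final ConsumerBatchMessage<String> holder = new ConsumerBatchMessage<>(batch);

            // "client" side: what getWithoutAck()/ack() do on the other end of the queue
            ExecutorService client = Executors.newSingleThreadExecutor();
            client.submit(new Runnable() {
                public void run() {
                    holder.getData().poll(); // take one element of the batch
                    holder.ack();            // count the latch down
                }
            });

            // "listener" side: consumeMessage() blocks here before deciding SUCCESS vs SUSPEND
            boolean completed = holder.waitFinish(10000);
            System.out.println(completed && holder.isSuccess() ? "SUCCESS" : "SUSPEND_CURRENT_QUEUE_A_MOMENT");
            client.shutdown();
        }
    }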

+ 199 - 0
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java

@@ -0,0 +1,199 @@
+package com.alibaba.otter.canal.client.rocketmq;
+
+import com.alibaba.otter.canal.client.CanalConnector;
+import com.alibaba.otter.canal.client.CanalMessageDeserializer;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.StringUtils;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocketMQCanalConnector implements CanalConnector {
+    private static final Logger logger = LoggerFactory.getLogger(RocketMQCanalConnector.class);
+
+    private String nameServer;
+    private String topic;
+    private String groupName;
+    private volatile boolean connected = false;
+    private DefaultMQPushConsumer rocketMQConsumer;
+    private BlockingQueue<ConsumerBatchMessage<Message>> messageBlockingQueue;
+    Map<Long, ConsumerBatchMessage<Message>> messageCache;
+    private long batchProcessTimeout = 10000;
+
+    public RocketMQCanalConnector(String nameServer, String topic, String groupName) {
+        this.nameServer = nameServer;
+        this.topic = topic;
+        this.groupName = groupName;
+        messageBlockingQueue = new LinkedBlockingQueue<>();
+        messageCache = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    public void connect() throws CanalClientException {
+        rocketMQConsumer = new DefaultMQPushConsumer(groupName);
+        if (!StringUtils.isBlank(nameServer)) {
+            rocketMQConsumer.setNamesrvAddr(nameServer);
+        }
+    }
+
+    @Override
+    public void disconnect() throws CanalClientException {
+        rocketMQConsumer.shutdown();
+    }
+
+    @Override
+    public boolean checkValid() throws CanalClientException {
+        return connected;
+    }
+
+    @Override
+    public synchronized void subscribe(String filter) throws CanalClientException {
+        if (connected) {
+            return;
+        }
+        try {
+            if (rocketMQConsumer == null) {
+                this.connect();
+            }
+            rocketMQConsumer.subscribe(topic, "*");
+            rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {
+                @Override
+                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts,
+                    ConsumeOrderlyContext context) {
+                    context.setAutoCommit(true);
+                    boolean isSuccess = process(messageExts);
+                    if (isSuccess) {
+                        return ConsumeOrderlyStatus.SUCCESS;
+                    } else {
+                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+                    }
+                }
+            });
+            rocketMQConsumer.start();
+        } catch (MQClientException ex) {
+            connected = false;
+            logger.error("Start RocketMQ consumer error", ex);
+        }
+        connected = true;
+    }
+
+    private boolean process(List<MessageExt> messageExts) {
+        BlockingQueue<Message> messageList = new LinkedBlockingQueue<>();
+        for (MessageExt messageExt : messageExts) {
+            byte[] data = messageExt.getBody();
+            Message message = CanalMessageDeserializer.deserializer(data);
+            try {
+                messageList.put(message);
+            } catch (InterruptedException ex) {
+                logger.error("Add message error");
+            }
+        }
+        ConsumerBatchMessage<Message> batchMessage = new ConsumerBatchMessage<>(messageList);
+        try {
+            messageBlockingQueue.put(batchMessage);
+        } catch (InterruptedException e) {
+            logger.error("Put message to queue error", e);
+            throw new RuntimeException(e);
+        }
+        boolean isCompleted;
+        try {
+            isCompleted = batchMessage.waitFinish(batchProcessTimeout);
+        } catch (InterruptedException e) {
+            logger.error("Interrupted when waiting messages to be finished.", e);
+            throw new RuntimeException(e);
+        }
+        boolean isSuccess = batchMessage.isSuccess();
+        return isCompleted && isSuccess;
+    }
+
+    @Override
+    public void subscribe() throws CanalClientException {
+        this.subscribe(null);
+    }
+
+    @Override
+    public void unsubscribe() throws CanalClientException {
+        this.rocketMQConsumer.unsubscribe(this.topic);
+    }
+
+    @Override
+    public Message get(int batchSize) throws CanalClientException {
+        Message message = getWithoutAck(batchSize);
+        ack(message.getId());
+        return message;
+    }
+
+    @Override
+    public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
+        Message message = getWithoutAck(batchSize, timeout, unit);
+        ack(message.getId());
+        return message;
+    }
+
+    private Message getMessage(ConsumerBatchMessage consumerBatchMessage) {
+        BlockingQueue<Message> messageList = consumerBatchMessage.getData();
+        if (messageList != null & messageList.size() > 0) {
+            Message message = messageList.poll();
+            messageCache.put(message.getId(), consumerBatchMessage);
+            return message;
+        }
+        return null;
+    }
+
+    @Override
+    public Message getWithoutAck(int batchSize) throws CanalClientException {
+        ConsumerBatchMessage batchMessage = messageBlockingQueue.poll();
+        if (batchMessage != null) {
+            return getMessage(batchMessage);
+        }
+        return null;
+    }
+
+    @Override
+    public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
+        try {
+            ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit);
+            return getMessage(batchMessage);
+        } catch (InterruptedException ex) {
+            logger.warn("Get message timeout", ex);
+            throw new CanalClientException("Failed to fetch the data after: " + timeout);
+        }
+    }
+
+    @Override
+    public void ack(long batchId) throws CanalClientException {
+        ConsumerBatchMessage batchMessage = messageCache.get(batchId);
+        if (batchMessage != null) {
+            batchMessage.ack();
+        }
+    }
+
+    @Override
+    public void rollback(long batchId) throws CanalClientException {
+
+    }
+
+    @Override
+    public void rollback() throws CanalClientException {
+
+    }
+
+    @Override
+    public void stopRunning() throws CanalClientException {
+        this.rocketMQConsumer.shutdown();
+        connected = false;
+    }
+
+}
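
Note: although RocketMQ pushes messages internally, the connector is consumed pull-style; the full pattern is the CanalRocketMQClientExample test further down in this diff. A condensed sketch, reusing the topic/group/nameserver values from AbstractRocektMQTest:

    import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
    import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectorProvider;
    import com.alibaba.otter.canal.protocol.Message;

    public class ConnectorLoopSketch {

        public static void main(String[] args) {
            RocketMQCanalConnector connector =
                RocketMQCanalConnectorProvider.newRocketMQConnector("localhost:9876", "example", "group");
            connector.connect();   // builds the DefaultMQPushConsumer
            connector.subscribe(); // registers the orderly listener and starts consuming
            while (true) {
                Message message = connector.getWithoutAck(1); // batchSize is not used by this implementation
                if (message == null) {
                    continue;
                }
                // ... hand message.getEntries() to downstream processing ...
                connector.ack(message.getId()); // counts the batch latch down so consumeMessage() can return SUCCESS
            }
        }
    }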

+ 20 - 0
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnectorProvider.java

@@ -0,0 +1,20 @@
+package com.alibaba.otter.canal.client.rocketmq;
+
+/**
+ * RocketMQ connector provider.
+ */
+public class RocketMQCanalConnectorProvider {
+    /**
+     * Create RocketMQ connector
+     *
+     * @param nameServers name servers for RocketMQ
+     * @param topic
+     * @param groupId
+     * @return {@link RocketMQCanalConnector}
+     */
+    public static RocketMQCanalConnector newRocketMQConnector(String nameServers, String topic,
+        String groupId) {
+        return new RocketMQCanalConnector(nameServers, topic, groupId);
+    }
+
+}

+ 11 - 0
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalListener.java

@@ -0,0 +1,11 @@
+package com.alibaba.otter.canal.client.rocketmq;
+
+import java.util.List;
+import org.apache.rocketmq.common.message.MessageExt;
+
+/**
+ * RocketMQ message listener
+ */
+public interface RocketMQCanalListener {
+    boolean onReceive(List<MessageExt> messageExts);
+}

+ 8 - 0
client/src/test/java/com/alibaba/otter/canal/client/running/rocketmq/AbstractRocektMQTest.java

@@ -0,0 +1,8 @@
+package com.alibaba.otter.canal.client.running.rocketmq;
+
+public class AbstractRocektMQTest {
+    public static String topic = "example";
+    public static String groupId = "group";
+    public static String nameServers = "localhost:9876";
+
+}

+ 135 - 0
client/src/test/java/com/alibaba/otter/canal/client/running/rocketmq/CanalRocketMQClientExample.java

@@ -0,0 +1,135 @@
+package com.alibaba.otter.canal.client.running.rocketmq;
+
+import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
+import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectorProvider;
+import com.alibaba.otter.canal.client.running.kafka.AbstractKafkaTest;
+import com.alibaba.otter.canal.protocol.Message;
+import org.apache.kafka.common.errors.WakeupException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
+
+/**
+ * Kafka client example
+ *
+ * @author machengyuan @ 2018-6-12
+ * @version 1.0.0
+ */
+public class CanalRocketMQClientExample extends AbstractRocektMQTest {
+
+    protected final static Logger logger = LoggerFactory.getLogger(CanalRocketMQClientExample.class);
+
+    private RocketMQCanalConnector connector;
+
+    private static volatile boolean running = false;
+
+    private Thread thread = null;
+
+    private Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
+
+        public void uncaughtException(Thread t, Throwable e) {
+            logger.error("parse events has an error", e);
+        }
+    };
+
+    public CanalRocketMQClientExample(String nameServers, String topic,
+        String groupId) {
+        connector = RocketMQCanalConnectorProvider.newRocketMQConnector(nameServers,
+            topic,
+            groupId);
+    }
+
+    public static void main(String[] args) {
+        try {
+            final CanalRocketMQClientExample rocketMQClientExample = new CanalRocketMQClientExample(nameServers,
+                topic, groupId);
+            logger.info("## Start the rocketmq consumer: {}-{}", AbstractKafkaTest.topic, AbstractKafkaTest.groupId);
+            rocketMQClientExample.start();
+            logger.info("## The canal rocketmq consumer is running now ......");
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+
+                public void run() {
+                    try {
+                        logger.info("## Stop the rocketmq consumer");
+                        rocketMQClientExample.stop();
+                    } catch (Throwable e) {
+                        logger.warn("## Something goes wrong when stopping rocketmq consumer:", e);
+                    } finally {
+                        logger.info("## Rocketmq consumer is down.");
+                    }
+                }
+
+            });
+            while (running)
+                ;
+        } catch (Throwable e) {
+            logger.error("## Something going wrong when starting up the rocketmq consumer:", e);
+            System.exit(0);
+        }
+    }
+
+    public void start() {
+        Assert.notNull(connector, "connector is null");
+        thread = new Thread(new Runnable() {
+
+            public void run() {
+                process();
+            }
+        });
+        thread.setUncaughtExceptionHandler(handler);
+        thread.start();
+        running = true;
+    }
+
+    public void stop() {
+        if (!running) {
+            return;
+        }
+        connector.stopRunning();
+        running = false;
+        if (thread != null) {
+            try {
+                thread.join();
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+    }
+
+    private void process() {
+        while (!running)
+            ;
+        while (running) {
+            try {
+                connector.connect();
+                connector.subscribe();
+                while (running) {
+                    Message message = connector.getWithoutAck(1); // 获取message
+                    try {
+                        if (message == null) {
+                            continue;
+                        }
+                        long batchId = message.getId();
+                        int size = message.getEntries().size();
+                        if (batchId == -1 || size == 0) {
+                        } else {
+                            logger.info(message.toString());
+                        }
+                    } catch (Exception e) {
+                        logger.error(e.getMessage(), e);
+                    }
+                    connector.ack(message.getId()); // 提交确认
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        try {
+            connector.unsubscribe();
+        } catch (WakeupException e) {
+            // No-op. Continue process
+        }
+        connector.stopRunning();
+    }
+}

+ 1 - 1
deployer/pom.xml

@@ -40,7 +40,7 @@
 						<exclude>**/canal.properties</exclude>
 						<exclude>**/spring/**</exclude>
 						<exclude>**/example/**</exclude>
-						<exclude>**/kafka.yml</exclude>
+						<exclude>**/mq.yml</exclude>
 					</excludes>
 				</configuration>
 			</plugin>

+ 15 - 12
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -1,25 +1,25 @@
 package com.alibaba.otter.canal.deployer;
 
+import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
+import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
+import com.alibaba.otter.canal.server.CanalMQStarter;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
 import java.io.FileInputStream;
 import java.util.Properties;
-
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.kafka.CanalKafkaStarter;
-import com.alibaba.otter.canal.server.CanalServerStarter;
-
 /**
  * canal独立版本启动的入口类
- * 
+ *
  * @author jianghang 2012-11-6 下午05:20:49
  * @version 1.0.0
  */
 public class CanalLauncher {
 
     private static final String CLASSPATH_URL_PREFIX = "classpath:";
-    private static final Logger logger               = LoggerFactory.getLogger(CanalLauncher.class);
+    private static final Logger logger = LoggerFactory.getLogger(CanalLauncher.class);
 
     public static void main(String[] args) throws Throwable {
         try {
@@ -55,16 +55,19 @@ public class CanalLauncher {
 
             });
 
-            CanalServerStarter canalServerStarter = null;
+            CanalMQProducer canalMQProducer = null;
             String serverMode = controller.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
             if (serverMode.equalsIgnoreCase("kafka")) {
-                canalServerStarter = new CanalKafkaStarter();
-            } else if (serverMode.equalsIgnoreCase("rocketMQ")) {
-                // 预留rocketMQ启动
+                canalMQProducer = new CanalKafkaProducer();
+            } else if (serverMode.equalsIgnoreCase("rocketmq")) {
+                canalMQProducer = new CanalRocketMQProducer();
             }
+            if (canalMQProducer != null) {
+                CanalMQStarter canalServerStarter = new CanalMQStarter(canalMQProducer);
+                if (canalServerStarter != null) {
+                    canalServerStarter.init();
+                }
 
-            if (canalServerStarter != null) {
-                canalServerStarter.init();
             }
         } catch (Throwable e) {
             logger.error("## Something goes wrong when starting up the canal Server:", e);

+ 2 - 2
deployer/src/main/resources/canal.properties

@@ -9,8 +9,8 @@ canal.zkServers=
 # flush data to zk
 canal.zookeeper.flush.period = 1000
 canal.withoutNetty = false
-# tcp, kafka, rocketMQ
-canal.serverMode = tcp
+# tcp, kafka, RocketMQ
+canal.serverMode = rocketmq
 # flush meta cursor/parse position to file
 canal.file.data.dir = ${canal.conf.dir}
 canal.file.flush.period = 1000

+ 4 - 3
deployer/src/main/resources/kafka.yml → deployer/src/main/resources/mq.yml

@@ -1,4 +1,4 @@
-servers: slave1:6667,slave2:6667,slave3:6667
+servers: localhost:9876 #for rocketmq: means the nameserver
 retries: 0
 batchSize: 16384
 lingerMs: 1
@@ -11,8 +11,9 @@ canalGetTimeout: 100
 flatMessage: true
 
 canalDestinations:
-- canalDestination: example
-  topic: exp3
+  - canalDestination: example
+    topic: example
+    partition: 1
 #  #对应topic分区数量
 #  partitionsNum: 3
 #  partitionHash:

File diff suppressed because it is too large
+ 264 - 405
protocol/src/main/java/com/alibaba/otter/canal/protocol/CanalEntry.java


+ 5 - 0
server/pom.xml

@@ -42,6 +42,11 @@
 				</exclusion>
 			</exclusions>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.rocketmq</groupId>
+			<artifactId>rocketmq-client</artifactId>
+			<version>4.3.0</version>
+		</dependency>
 		<!--kafka_2.11_1.1.1 exclusion掉了netty 的依赖,但CanalServerWithNetty 依赖 netty3,升级kafka至 1.1.1 需要显示加入,否则会启动失败 -->
 		<dependency>
 			<groupId>org.jboss.netty</groupId>

+ 71 - 0
server/src/main/java/com/alibaba/otter/canal/common/CanalMessageSerializer.java

@@ -0,0 +1,71 @@
+package com.alibaba.otter.canal.common;
+
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.CanalPacket;
+import com.alibaba.otter.canal.protocol.CanalPacket.PacketType;
+import com.alibaba.otter.canal.protocol.Message;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.WireFormat;
+import java.util.List;
+import org.apache.kafka.common.errors.SerializationException;
+import org.springframework.util.CollectionUtils;
+
+public class CanalMessageSerializer {
+    public static byte[] serializer(Message data){
+        try {
+            if (data != null) {
+                if (data.getId() != -1) {
+                    if (data.isRaw() && !CollectionUtils.isEmpty(data.getRawEntries())) {
+                        // for performance
+                        List<ByteString> rowEntries = data.getRawEntries();
+                        // message size
+                        int messageSize = 0;
+                        messageSize += CodedOutputStream.computeInt64Size(1, data.getId());
+
+                        int dataSize = 0;
+                        for (int i = 0; i < rowEntries.size(); i++) {
+                            dataSize += CodedOutputStream.computeBytesSizeNoTag(rowEntries.get(i));
+                        }
+                        messageSize += dataSize;
+                        messageSize += 1 * rowEntries.size();
+                        // packet size
+                        int size = 0;
+                        size += CodedOutputStream.computeEnumSize(3,
+                            PacketType.MESSAGES.getNumber());
+                        size += CodedOutputStream.computeTagSize(5)
+                            + CodedOutputStream.computeRawVarint32Size(messageSize)
+                            + messageSize;
+                        // build data
+                        byte[] body = new byte[size];
+                        CodedOutputStream output = CodedOutputStream.newInstance(body);
+                        output.writeEnum(3, PacketType.MESSAGES.getNumber());
+
+                        output.writeTag(5, WireFormat.WIRETYPE_LENGTH_DELIMITED);
+                        output.writeRawVarint32(messageSize);
+                        // message
+                        output.writeInt64(1, data.getId());
+                        for (int i = 0; i < rowEntries.size(); i++) {
+                            output.writeBytes(2, rowEntries.get(i));
+                        }
+                        output.checkNoSpaceLeft();
+                        return body;
+                    } else if (!CollectionUtils.isEmpty(data.getEntries())) {
+                        CanalPacket.Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
+                        for (CanalEntry.Entry entry : data.getEntries()) {
+                            messageBuilder.addMessages(entry.toByteString());
+                        }
+
+                        CanalPacket.Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
+                        packetBuilder.setType(PacketType.MESSAGES);
+                        packetBuilder.setBody(messageBuilder.build().toByteString());
+                        return packetBuilder.build().toByteArray();
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new SerializationException("Error when serializing message to byte[] ");
+        }
+        return null;
+    }
+}
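
Note: this is the server-side mirror of the client's CanalMessageDeserializer above, now shared by the Kafka serializer and the new RocketMQ producer. The CanalRocketMQProducer diff below is truncated, so the exact send path (for example whether a MessageQueueSelector is used) is not visible here; the sketch shows only the obvious wiring and is an assumption, not the PR's implementation:

    import com.alibaba.otter.canal.common.CanalMessageSerializer;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;

    public class ProducerSketch {

        static void send(DefaultMQProducer producer, String topic,
                         com.alibaba.otter.canal.protocol.Message canalMessage) throws Exception {
            byte[] body = CanalMessageSerializer.serializer(canalMessage);
            if (body == null) {
                return; // nothing to send for empty batches (id == -1 or no entries)
            }
            // topic would come from the mq.yml canalDestinations mapping
            producer.send(new org.apache.rocketmq.common.message.Message(topic, body));
        }
    }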

+ 25 - 16
server/src/main/java/com/alibaba/otter/canal/kafka/KafkaProperties.java → server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -1,4 +1,4 @@
-package com.alibaba.otter.canal.kafka;
+package com.alibaba.otter.canal.common;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -10,26 +10,27 @@ import java.util.Map;
  * @author machengyuan 2018-6-11 下午05:30:49
  * @version 1.0.0
  */
-public class KafkaProperties {
+public class MQProperties {
 
-    private String                 servers                = "localhost:6667";
-    private int                    retries                = 0;
-    private int                    batchSize              = 16384;
-    private int                    lingerMs               = 1;
-    private long                   bufferMemory           = 33554432L;
-    private boolean                filterTransactionEntry = true;
-    private int                    canalBatchSize         = 50;
-    private Long                   canalGetTimeout;
-    private boolean                flatMessage            = true;
+    private String servers = "localhost:6667";
+    private int retries = 0;
+    private int batchSize = 16384;
+    private int lingerMs = 1;
+    private long bufferMemory = 33554432L;
+    private boolean filterTransactionEntry = true;
+    private String producerGroup = "Canal-Producer";
+    private int canalBatchSize = 50;
+    private Long canalGetTimeout;
+    private boolean flatMessage = true;
 
-    private List<CanalDestination> canalDestinations      = new ArrayList<CanalDestination>();
+    private List<CanalDestination> canalDestinations = new ArrayList<CanalDestination>();
 
     public static class CanalDestination {
 
-        private String              canalDestination;
-        private String              topic;
-        private Integer             partition;
-        private Integer             partitionsNum;
+        private String canalDestination;
+        private String topic;
+        private Integer partition;
+        private Integer partitionsNum;
         private Map<String, String> partitionHash;
 
         public String getCanalDestination() {
@@ -73,6 +74,7 @@ public class KafkaProperties {
         }
     }
 
+
     public String getServers() {
         return servers;
     }
@@ -153,4 +155,11 @@ public class KafkaProperties {
         this.filterTransactionEntry = filterTransactionEntry;
     }
 
+    public String getProducerGroup() {
+        return producerGroup;
+    }
+
+    public void setProducerGroup(String producerGroup) {
+        this.producerGroup = producerGroup;
+    }
 }

+ 18 - 21
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -1,8 +1,12 @@
 package com.alibaba.otter.canal.kafka;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
 import java.util.List;
 import java.util.Properties;
-
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -10,27 +14,24 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.protocol.Message;
-
 /**
  * kafka producer 主操作类
  *
  * @author machengyuan 2018-6-11 下午05:30:49
  * @version 1.0.0
  */
-public class CanalKafkaProducer {
+public class CanalKafkaProducer implements CanalMQProducer {
 
-    private static final Logger       logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
+    private static final Logger logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
 
     private Producer<String, Message> producer;
 
-    private Producer<String, String>  producer2;                                                 // 用于扁平message的数据投递
+    private Producer<String, String> producer2;                                                 // 用于扁平message的数据投递
 
-    private KafkaProperties           kafkaProperties;
+    private MQProperties kafkaProperties;
 
-    public void init(KafkaProperties kafkaProperties) {
+    @Override
+    public void init(MQProperties kafkaProperties) {
         this.kafkaProperties = kafkaProperties;
         Properties properties = new Properties();
         properties.put("bootstrap.servers", kafkaProperties.getServers());
@@ -51,6 +52,7 @@ public class CanalKafkaProducer {
         // producer.initTransactions();
     }
 
+    @Override
     public void stop() {
         try {
             logger.info("## stop the kafka producer");
@@ -67,7 +69,8 @@ public class CanalKafkaProducer {
         }
     }
 
-    public void send(KafkaProperties.CanalDestination canalDestination, Message message, Callback callback) {
+    @Override
+    public void send(MQProperties.CanalDestination canalDestination, Message message, Callback callback) {
         try {
             // producer.beginTransaction();
             if (!kafkaProperties.getFlatMessage()) {
@@ -102,10 +105,10 @@ public class CanalKafkaProducer {
                                     FlatMessage flatMessagePart = partitionFlatMessage[i];
                                     if (flatMessagePart != null) {
                                         ProducerRecord<String, String> record = new ProducerRecord<String, String>(
-                                                canalDestination.getTopic(),
-                                                i,
-                                                null,
-                                                JSON.toJSONString(flatMessagePart));
+                                            canalDestination.getTopic(),
+                                            i,
+                                            null,
+                                            JSON.toJSONString(flatMessagePart));
                                         producer2.send(record);
                                     }
                                 }
@@ -135,10 +138,4 @@ public class CanalKafkaProducer {
         }
     }
 
-    public interface Callback {
-
-        void commit();
-
-        void rollback();
-    }
 }

+ 3 - 65
server/src/main/java/com/alibaba/otter/canal/kafka/MessageSerializer.java

@@ -1,19 +1,9 @@
 package com.alibaba.otter.canal.kafka;
 
-import java.util.List;
+import com.alibaba.otter.canal.common.CanalMessageSerializer;
+import com.alibaba.otter.canal.protocol.Message;
 import java.util.Map;
-
-import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.serialization.Serializer;
-import org.springframework.util.CollectionUtils;
-
-import com.alibaba.otter.canal.protocol.CanalEntry;
-import com.alibaba.otter.canal.protocol.CanalPacket;
-import com.alibaba.otter.canal.protocol.CanalPacket.PacketType;
-import com.alibaba.otter.canal.protocol.Message;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.CodedOutputStream;
-import com.google.protobuf.WireFormat;
 
 /**
  * Kafka Message类的序列化
@@ -28,60 +18,8 @@ public class MessageSerializer implements Serializer<Message> {
     }
 
     @Override
-    @SuppressWarnings("deprecation")
     public byte[] serialize(String topic, Message data) {
-        try {
-            if (data != null) {
-                if (data.getId() != -1) {
-                    if (data.isRaw() && !CollectionUtils.isEmpty(data.getRawEntries())) {
-                        // for performance
-                        List<ByteString> rowEntries = data.getRawEntries();
-                        // message size
-                        int messageSize = 0;
-                        messageSize += CodedOutputStream.computeInt64Size(1, data.getId());
-
-                        int dataSize = 0;
-                        for (int i = 0; i < rowEntries.size(); i++) {
-                            dataSize += CodedOutputStream.computeBytesSizeNoTag(rowEntries.get(i));
-                        }
-                        messageSize += dataSize;
-                        messageSize += 1 * rowEntries.size();
-                        // packet size
-                        int size = 0;
-                        size += CodedOutputStream.computeEnumSize(3, PacketType.MESSAGES.getNumber());
-                        size += CodedOutputStream.computeTagSize(5)
-                                + CodedOutputStream.computeRawVarint32Size(messageSize) + messageSize;
-                        // build data
-                        byte[] body = new byte[size];
-                        CodedOutputStream output = CodedOutputStream.newInstance(body);
-                        output.writeEnum(3, PacketType.MESSAGES.getNumber());
-
-                        output.writeTag(5, WireFormat.WIRETYPE_LENGTH_DELIMITED);
-                        output.writeRawVarint32(messageSize);
-                        // message
-                        output.writeInt64(1, data.getId());
-                        for (int i = 0; i < rowEntries.size(); i++) {
-                            output.writeBytes(2, rowEntries.get(i));
-                        }
-                        output.checkNoSpaceLeft();
-                        return body;
-                    } else if (!CollectionUtils.isEmpty(data.getEntries())) {
-                        CanalPacket.Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
-                        for (CanalEntry.Entry entry : data.getEntries()) {
-                            messageBuilder.addMessages(entry.toByteString());
-                        }
-
-                        CanalPacket.Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
-                        packetBuilder.setType(PacketType.MESSAGES);
-                        packetBuilder.setBody(messageBuilder.build().toByteString());
-                        return packetBuilder.build().toByteArray();
-                    }
-                }
-            }
-        } catch (Exception e) {
-            throw new SerializationException("Error when serializing message to byte[] ");
-        }
-        return null;
+        return CanalMessageSerializer.serializer(data);
     }
 
     @Override

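The body deleted here is not lost: per this commit it moves into the shared com.alibaba.otter.canal.common.CanalMessageSerializer, whose serializer(Message) method both the Kafka and RocketMQ producers now call. Below is a sketch of the simpler, non-raw branch, reconstructed from the removed lines; the class and method names are illustrative, not the shared class itself.

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalPacket;
import com.alibaba.otter.canal.protocol.CanalPacket.PacketType;
import com.alibaba.otter.canal.protocol.Message;

public class NonRawSerializerSketch {

    // Wrap every entry of a canal Message into a CanalPacket of type MESSAGES.
    public static byte[] serialize(Message data) {
        CanalPacket.Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
        for (CanalEntry.Entry entry : data.getEntries()) {
            messageBuilder.addMessages(entry.toByteString());
        }
        CanalPacket.Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
        packetBuilder.setType(PacketType.MESSAGES);
        packetBuilder.setBody(messageBuilder.build().toByteString());
        return packetBuilder.build().toByteArray();
    }
}

The raw-entry fast path removed above (the manual CodedOutputStream writes) presumably lives alongside this branch in the shared class; it avoids re-encoding entries the server already holds as ByteString.
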
+ 64 - 0
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -0,0 +1,64 @@
+package com.alibaba.otter.canal.rocketmq;
+
+import com.alibaba.otter.canal.common.CanalMessageSerializer;
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.server.exception.CanalServerException;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
+import java.util.List;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CanalRocketMQProducer implements CanalMQProducer {
+    private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQProducer.class);
+
+    private DefaultMQProducer defaultMQProducer;
+
+    @Override
+    public void init(MQProperties rocketMQProperties) {
+        defaultMQProducer = new DefaultMQProducer();
+        defaultMQProducer.setNamesrvAddr(rocketMQProperties.getServers());
+        defaultMQProducer.setProducerGroup(rocketMQProperties.getProducerGroup());
+        defaultMQProducer.setRetryTimesWhenSendFailed(rocketMQProperties.getRetries());
+        logger.info("##Start RocketMQ producer##");
+        try {
+            defaultMQProducer.start();
+        } catch (MQClientException ex) {
+            throw new CanalServerException("Start RocketMQ producer error", ex);
+        }
+    }
+
+    @Override
+    public void send(final MQProperties.CanalDestination destination, com.alibaba.otter.canal.protocol.Message data,
+        Callback callback) {
+        try {
+            Message message = new Message(destination.getTopic(), CanalMessageSerializer.serializer(data));
+            this.defaultMQProducer.send(message, new MessageQueueSelector() {
+                @Override
+                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                    int partition = 0;
+                    if (destination.getPartition() != null) {
+                        partition = destination.getPartition();
+                    }
+                    return mqs.get(partition);
+                }
+            }, null);
+            callback.commit();
+        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
+            logger.error("Send message error!", e);
+            callback.rollback();
+        }
+    }
+
+    @Override
+    public void stop() {
+        logger.info("## Stop RocketMQ producer##");
+        this.defaultMQProducer.shutdown();
+    }
+}
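
A hypothetical usage sketch for the new producer. The getters called in the class above imply matching JavaBean setters on MQProperties and MQProperties.CanalDestination (the properties are bound from mq.yml by SnakeYAML, which relies on such setters), but those setter names, the namesrv address and the topic are assumptions, not taken from this diff.

import com.alibaba.otter.canal.common.MQProperties;
import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
import com.alibaba.otter.canal.spi.CanalMQProducer;

public class RocketMQProducerSketch {

    public static void main(String[] args) {
        MQProperties props = new MQProperties();
        props.setServers("127.0.0.1:9876");        // assumed RocketMQ namesrv address
        props.setProducerGroup("canal_producer");  // assumed producer group
        props.setRetries(3);

        MQProperties.CanalDestination destination = new MQProperties.CanalDestination();
        destination.setCanalDestination("example");
        destination.setTopic("example_topic");     // assumed topic
        destination.setPartition(0);

        CanalMQProducer producer = new CanalRocketMQProducer();
        producer.init(props);
        // Messages for send(destination, message, callback) normally come from
        // CanalServerWithEmbedded.getWithoutAck(...); see CanalMQStarter below.
        producer.stop();
    }
}

Note that send() resolves the target queue through a MessageQueueSelector, so destination.getPartition() must be a valid index into the topic's queue list; an out-of-range value would fail at mqs.get(partition).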

+ 43 - 46
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaStarter.java → server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -1,69 +1,66 @@
-package com.alibaba.otter.canal.kafka;
+package com.alibaba.otter.canal.server;
 
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
+import com.alibaba.otter.canal.protocol.ClientIdentity;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
 import java.io.FileInputStream;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
 
-import com.alibaba.otter.canal.kafka.KafkaProperties.CanalDestination;
-import com.alibaba.otter.canal.protocol.ClientIdentity;
-import com.alibaba.otter.canal.protocol.Message;
-import com.alibaba.otter.canal.server.CanalServerStarter;
-import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
-
-/**
- * Kafka starter class
- *
- * @author machengyuan 2018-6-11 05:30:49 PM
- * @version 1.0.0
- */
-public class CanalKafkaStarter implements CanalServerStarter {
-
-    private static final Logger logger               = LoggerFactory.getLogger(CanalKafkaStarter.class);
+public class CanalMQStarter {
+    private static final Logger logger = LoggerFactory.getLogger(CanalMQStarter.class);
 
     private static final String CLASSPATH_URL_PREFIX = "classpath:";
 
-    private volatile boolean    running              = false;
+    private volatile boolean running = false;
+
+    private ExecutorService executorService;
 
-    private ExecutorService     executorService;
+    private CanalMQProducer canalMQProducer;
 
-    private CanalKafkaProducer  canalKafkaProducer;
+    private MQProperties properties;
 
-    private KafkaProperties     kafkaProperties;
+    public CanalMQStarter(CanalMQProducer canalMQProducer) {
+        this.canalMQProducer = canalMQProducer;
+    }
 
     public void init() {
         try {
-            logger.info("## load kafka configurations");
-            String conf = System.getProperty("kafka.conf", "classpath:kafka.yml");
+            logger.info("## load MQ configurations");
+            String conf = System.getProperty("mq.conf", "classpath:mq.yml");
 
             if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
                 conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
-                kafkaProperties = new Yaml().loadAs(CanalKafkaStarter.class.getClassLoader().getResourceAsStream(conf),
-                    KafkaProperties.class);
+                properties = new Yaml().loadAs(CanalMQStarter.class.getClassLoader().getResourceAsStream(conf),
+                    MQProperties.class);
             } else {
-                kafkaProperties = new Yaml().loadAs(new FileInputStream(conf), KafkaProperties.class);
+                properties = new Yaml().loadAs(new FileInputStream(conf), MQProperties.class);
             }
 
             // initialize the kafka producer
-            canalKafkaProducer = new CanalKafkaProducer();
-            canalKafkaProducer.init(kafkaProperties);
+//            canalMQProducer = new CanalKafkaProducer();
+            canalMQProducer.init(properties);
             // set filterTransactionEntry
-            // if (kafkaProperties.isFilterTransactionEntry()) {
-            // System.setProperty("canal.instance.filter.transaction.entry", "true");
-            // }
+            if (properties.isFilterTransactionEntry()) {
+                System.setProperty("canal.instance.filter.transaction.entry", "true");
+            }
+
             // start one worker thread per instance
-            List<CanalDestination> destinations = kafkaProperties.getCanalDestinations();
+            List<MQProperties.CanalDestination> destinations = properties.getCanalDestinations();
 
             executorService = Executors.newFixedThreadPool(destinations.size());
 
-            logger.info("## start the kafka workers.");
-            for (final CanalDestination destination : destinations) {
+            logger.info("## start the MQ workers.");
+            for (final MQProperties.CanalDestination destination : destinations) {
                 executorService.execute(new Runnable() {
 
                     @Override
@@ -73,31 +70,31 @@ public class CanalKafkaStarter implements CanalServerStarter {
                 });
             }
             running = true;
-            logger.info("## the kafka workers is running now ......");
+            logger.info("## the MQ workers is running now ......");
             Runtime.getRuntime().addShutdownHook(new Thread() {
 
                 public void run() {
                     try {
-                        logger.info("## stop the kafka workers");
+                        logger.info("## stop the MQ workers");
                         running = false;
                         executorService.shutdown();
-                        canalKafkaProducer.stop();
+                        canalMQProducer.stop();
                     } catch (Throwable e) {
-                        logger.warn("##something goes wrong when stopping kafka workers:", e);
+                        logger.warn("##something goes wrong when stopping MQ workers:", e);
                     } finally {
-                        logger.info("## canal kafka is down.");
+                        logger.info("## canal MQ is down.");
                     }
                 }
 
             });
 
         } catch (Throwable e) {
-            logger.error("## Something goes wrong when starting up the canal kafka workers:", e);
+            logger.error("## Something goes wrong when starting up the canal MQ workers:", e);
             System.exit(0);
         }
     }
 
-    private void worker(CanalDestination destination) {
+    private void worker(MQProperties.CanalDestination destination) {
         while (!running)
             ;
         logger.info("## start the canal consumer: {}.", destination.getCanalDestination());
@@ -118,20 +115,20 @@ public class CanalKafkaStarter implements CanalServerStarter {
 
                 while (running) {
                     Message message;
-                    if (kafkaProperties.getCanalGetTimeout() != null) {
+                    if (properties.getCanalGetTimeout() != null) {
                         message = server.getWithoutAck(clientIdentity,
-                            kafkaProperties.getCanalBatchSize(),
-                            kafkaProperties.getCanalGetTimeout(),
+                            properties.getCanalBatchSize(),
+                            properties.getCanalGetTimeout(),
                             TimeUnit.MILLISECONDS);
                     } else {
-                        message = server.getWithoutAck(clientIdentity, kafkaProperties.getCanalBatchSize());
+                        message = server.getWithoutAck(clientIdentity, properties.getCanalBatchSize());
                     }
 
                     final long batchId = message.getId();
                     try {
                         int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();
                         if (batchId != -1 && size != 0) {
-                            canalKafkaProducer.send(destination, message, new CanalKafkaProducer.Callback() {
+                            canalMQProducer.send(destination, message, new CanalKafkaProducer.Callback() {
 
                                 @Override
                                 public void commit() {

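How the starter gets its producer is not visible in this hunk (the commented-out new CanalKafkaProducer() hints the choice now happens in the caller). A hypothetical bootstrap follows, assuming CanalKafkaProducer also implements the new CanalMQProducer SPI, as the @Override in its hunk suggests; the canal.mq.mode property is invented for the sketch.

import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
import com.alibaba.otter.canal.server.CanalMQStarter;
import com.alibaba.otter.canal.spi.CanalMQProducer;

public class MQStarterSketch {

    public static void main(String[] args) {
        // Hypothetical switch; how the deployer actually selects the producer is outside this hunk.
        String mode = System.getProperty("canal.mq.mode", "kafka");
        CanalMQProducer producer = "rocketmq".equalsIgnoreCase(mode)
            ? new CanalRocketMQProducer()
            : new CanalKafkaProducer();

        // init() loads mq.yml (overridable with -Dmq.conf=...), starts the producer
        // and spawns one worker thread per configured canal destination.
        CanalMQStarter starter = new CanalMQStarter(producer);
        starter.init();
    }
}
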
+ 36 - 0
server/src/main/java/com/alibaba/otter/canal/spi/CanalMQProducer.java

@@ -0,0 +1,36 @@
+package com.alibaba.otter.canal.spi;
+
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
+import com.alibaba.otter.canal.protocol.Message;
+import java.io.IOException;
+
+public interface CanalMQProducer {
+    /**
+     * Init producer.
+     *
+     * @param mqProperties MQ config
+     */
+    void init(MQProperties mqProperties);
+
+    /**
+     * Send canal message to related topic
+     *
+     * @param canalDestination canal mq destination
+     * @param message canal message
+     * @throws IOException
+     */
+    void send(MQProperties.CanalDestination canalDestination, Message message, Callback callback) throws IOException;
+
+    /**
+     * Stop MQ producer service
+     */
+    void stop();
+
+    interface Callback {
+
+        void commit();
+
+        void rollback();
+    }
+}
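
The Callback closes the loop between the MQ send and the canal batch protocol. The tail of the worker loop is truncated in the CanalMQStarter hunk above, so the ack/rollback bodies below are an assumed, typical use of the contract, not lines from this commit.

import com.alibaba.otter.canal.common.MQProperties;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import com.alibaba.otter.canal.spi.CanalMQProducer;

public class CallbackSketch {

    static void sendBatch(CanalMQProducer producer, MQProperties.CanalDestination destination,
                          final CanalServerWithEmbedded server, final ClientIdentity clientIdentity,
                          Message message) throws java.io.IOException {
        final long batchId = message.getId();
        producer.send(destination, message, new CanalMQProducer.Callback() {

            @Override
            public void commit() {
                // confirm the batch once it has reached the MQ broker
                server.ack(clientIdentity, batchId);
            }

            @Override
            public void rollback() {
                // return the batch to the server so it can be fetched again
                server.rollback(clientIdentity, batchId);
            }
        });
    }
}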
