Ver código fonte

Add RocketMQ example and polish mq abstract class

duhengforever 6 anos atrás
pai
commit
a37a7ff142

+ 41 - 0
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/AbstractCanalAdapterWorker.java

@@ -77,6 +77,47 @@ public abstract class AbstractCanalAdapterWorker {
         }
     }
 
+    protected void writeOut(Message message,String topic){
+        if (logger.isDebugEnabled()) {
+            logger.debug("topic: {} batchId: {} batchSize: {} ",
+                topic,
+                message.getId(),
+                message.getEntries().size());
+        }
+        long begin = System.currentTimeMillis();
+        writeOut(message);
+        long now = System.currentTimeMillis();
+        if ((System.currentTimeMillis() - begin) > 5 * 60 * 1000) {
+            logger.error("topic: {} batchId {} elapsed time: {} ms",
+                topic,
+                message.getId(),
+                now - begin);
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug("topic: {} batchId {} elapsed time: {} ms",
+                topic,
+                message.getId(),
+                now - begin);
+        }
+    }
+
+    protected void stopOutAdapters(){
+        if (thread != null) {
+            try {
+                thread.join();
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+        groupInnerExecutorService.shutdown();
+        logger.info("topic connectors' worker thread dead!");
+        for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
+            for (CanalOuterAdapter adapter : outerAdapters) {
+                adapter.destroy();
+            }
+        }
+        logger.info("topic all connectors destroyed!");
+    }
     public abstract void start();
 
     public abstract void stop();

+ 6 - 44
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterKafkaWorker.java

@@ -24,10 +24,10 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
 
     private KafkaCanalConnector connector;
 
-    private String              topic;
+    private String topic;
 
     public CanalAdapterKafkaWorker(String zkServers, String bootstrapServers, String topic, String groupId,
-                                   List<List<CanalOuterAdapter>> canalOuterAdapters){
+        List<List<CanalOuterAdapter>> canalOuterAdapters) {
         this.canalOuterAdapters = canalOuterAdapters;
         this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
         this.topic = topic;
@@ -60,29 +60,11 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
             if (!running) {
                 return;
             }
-
             connector.stopRunning();
             running = false;
-
-            // if (switcher != null && !switcher.state()) {
-            // switcher.set(true);
-            // }
-
-            if (thread != null) {
-                try {
-                    thread.join();
-                } catch (InterruptedException e) {
-                    // ignore
-                }
-            }
-            groupInnerExecutorService.shutdown();
-            logger.info("topic {} connectors' worker thread dead!", this.topic);
-            for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
-                for (CanalOuterAdapter adapter : outerAdapters) {
-                    adapter.destroy();
-                }
-            }
-            logger.info("topic {} all connectors destroyed!", this.topic);
+            logger.info("Stop topic {} out adapters begin", this.topic);
+            stopOutAdapters();
+            logger.info("Stop topic {} out adapters end", this.topic);
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
         }
@@ -113,27 +95,7 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
                                 @Override
                                 public void run() {
                                     try {
-                                        if (logger.isDebugEnabled()) {
-                                            logger.debug("topic: {} batchId: {} batchSize: {} ",
-                                                topic,
-                                                message.getId(),
-                                                message.getEntries().size());
-                                        }
-                                        long begin = System.currentTimeMillis();
-                                        writeOut(message);
-                                        long now = System.currentTimeMillis();
-                                        if ((System.currentTimeMillis() - begin) > 5 * 60 * 1000) {
-                                            logger.error("topic: {} batchId {} elapsed time: {} ms",
-                                                topic,
-                                                message.getId(),
-                                                now - begin);
-                                        }
-                                        if (logger.isDebugEnabled()) {
-                                            logger.debug("topic: {} batchId {} elapsed time: {} ms",
-                                                topic,
-                                                message.getId(),
-                                                now - begin);
-                                        }
+                                        writeOut(message, topic);
                                     } catch (Exception e) {
                                         logger.error(e.getMessage(), e);
                                     } finally {

+ 4 - 4
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterLoader.java

@@ -82,7 +82,7 @@ public class CanalAdapterLoader {
             }
         }
 
-        // 初始化canal-client-rocketmq的适配器
+        // 初始化canal-client-mq的适配器
         if (canalClientConfig.getMqTopics() != null) {
             for (CanalClientConfig.MQTopic topic : canalClientConfig.getMqTopics()) {
                 for (CanalClientConfig.Group group : topic.getGroups()) {
@@ -153,17 +153,17 @@ public class CanalAdapterLoader {
             stopExecutorService.shutdown();
         }
         if (canalMQWorker.size() > 0) {
-            ExecutorService stopKafkaExecutorService = Executors.newFixedThreadPool(canalMQWorker.size());
+            ExecutorService stopMQWokerService = Executors.newFixedThreadPool(canalMQWorker.size());
             for (AbstractCanalAdapterWorker tmp : canalMQWorker.values()) {
                 final AbstractCanalAdapterWorker worker = tmp;
-                stopKafkaExecutorService.submit(new Runnable() {
+                stopMQWokerService.submit(new Runnable() {
                     @Override
                     public void run() {
                         worker.stop();
                     }
                 });
             }
-            stopKafkaExecutorService.shutdown();
+            stopMQWokerService.shutdown();
         }
         logger.info("All canal adapters destroyed");
     }

+ 5 - 48
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterRocketMQWorker.java

@@ -55,29 +55,11 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
             if (!running) {
                 return;
             }
-
             connector.stopRunning();
             running = false;
-
-            // if (switcher != null && !switcher.state()) {
-            // switcher.set(true);
-            // }
-
-            if (thread != null) {
-                try {
-                    thread.join();
-                } catch (InterruptedException e) {
-                    // ignore
-                }
-            }
-            groupInnerExecutorService.shutdown();
-            logger.info("topic {} connectors' worker thread dead!", this.topic);
-            for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
-                for (CanalOuterAdapter adapter : outerAdapters) {
-                    adapter.destroy();
-                }
-            }
-            logger.info("topic {} all connectors destroyed!", this.topic);
+            logger.info("Stop topic {} out adapters begin", this.topic);
+            stopOutAdapters();
+            logger.info("Stop topic {} out adapters end", this.topic);
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
         }
@@ -87,7 +69,6 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
         while (!running)
             ;
         ExecutorService executor = Executors.newSingleThreadExecutor();
-        final AtomicBoolean executing = new AtomicBoolean(true);
         while (running) {
             try {
                 logger.info("=============> Start to connect topic: {} <=============", this.topic);
@@ -100,41 +81,17 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
                         // switcher.get(); //等待开关开启
 
                         final Message message = connector.getWithoutAck(1);
-
-                        executing.set(true);
                         if (message != null) {
                             executor.submit(new Runnable() {
 
                                 @Override
                                 public void run() {
                                     try {
-                                        if (logger.isDebugEnabled()) {
-                                            logger.debug("topic: {} batchId: {} batchSize: {} ",
-                                                topic,
-                                                message.getId(),
-                                                message.getEntries().size());
-                                        }
-                                        long begin = System.currentTimeMillis();
-                                        writeOut(message);
-                                        long now = System.currentTimeMillis();
-                                        if ((System.currentTimeMillis() - begin) > 5 * 60 * 1000) {
-                                            logger.error("topic: {} batchId {} elapsed time: {} ms",
-                                                topic,
-                                                message.getId(),
-                                                now - begin);
-                                        }
-                                        if (logger.isDebugEnabled()) {
-                                            logger.debug("topic: {} batchId {} elapsed time: {} ms",
-                                                topic,
-                                                message.getId(),
-                                                now - begin);
-                                        }
-                                        connector.ack(message.getId());
+                                        writeOut(message, topic);
                                     } catch (Exception e) {
                                         logger.error(e.getMessage(), e);
-                                    } finally {
-                                        executing.compareAndSet(true, false);
                                     }
+                                    connector.ack(message.getId());
                                 }
                             });
                         } else {

+ 1 - 1
client-launcher/src/main/resources/canal-client.yml

@@ -1,6 +1,6 @@
 canalServerHost: 127.0.0.1:11111
 #zookeeperHosts: slave1:2181
-#bootstrapServers: slave1:6667,slave2:6667
+#bootstrapServers: slave1:6667,slave2:6667 #or rocketmq nameservers
 
 #canalInstances:
 #- instance: example

+ 6 - 3
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java

@@ -75,6 +75,7 @@ public class RocketMQCanalConnector implements CanalConnector {
                     context.setAutoCommit(true);
                     boolean isSuccess = process(messageExts);
                     if (isSuccess) {
+                        logger.info("Dispatch success!");
                         return ConsumeOrderlyStatus.SUCCESS;
                     } else {
                         return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
@@ -155,7 +156,7 @@ public class RocketMQCanalConnector implements CanalConnector {
     @Override
     public Message getWithoutAck(int batchSize) throws CanalClientException {
         ConsumerBatchMessage batchMessage = messageBlockingQueue.poll();
-        if (batchMessage != null){
+        if (batchMessage != null) {
             return getMessage(batchMessage);
         }
         return null;
@@ -168,14 +169,16 @@ public class RocketMQCanalConnector implements CanalConnector {
             return getMessage(batchMessage);
         } catch (InterruptedException ex) {
             logger.warn("Get message timeout", ex);
-            throw new CanalClientException("failed to fetch the data after " + timeout);
+            throw new CanalClientException("Failed to fetch the data after: " + timeout);
         }
     }
 
     @Override
     public void ack(long batchId) throws CanalClientException {
         ConsumerBatchMessage batchMessage = messageCache.get(batchId);
-        batchMessage.ack();
+        if (batchMessage != null) {
+            batchMessage.ack();
+        }
     }
 
     @Override

+ 8 - 0
client/src/test/java/com/alibaba/otter/canal/client/running/rocketmq/AbstractRocektMQTest.java

@@ -0,0 +1,8 @@
+package com.alibaba.otter.canal.client.running.rocketmq;
+
+public class AbstractRocektMQTest {
+    public static String topic = "example";
+    public static String groupId = "group";
+    public static String nameServers = "localhost:9876";
+
+}

+ 135 - 0
client/src/test/java/com/alibaba/otter/canal/client/running/rocketmq/CanalRocketMQClientExample.java

@@ -0,0 +1,135 @@
+package com.alibaba.otter.canal.client.running.rocketmq;
+
+import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
+import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectorProvider;
+import com.alibaba.otter.canal.client.running.kafka.AbstractKafkaTest;
+import com.alibaba.otter.canal.protocol.Message;
+import org.apache.kafka.common.errors.WakeupException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
+
+/**
+ * Kafka client example
+ *
+ * @author machengyuan @ 2018-6-12
+ * @version 1.0.0
+ */
+public class CanalRocketMQClientExample extends AbstractRocektMQTest {
+
+    protected final static Logger logger = LoggerFactory.getLogger(CanalRocketMQClientExample.class);
+
+    private RocketMQCanalConnector connector;
+
+    private static volatile boolean running = false;
+
+    private Thread thread = null;
+
+    private Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
+
+        public void uncaughtException(Thread t, Throwable e) {
+            logger.error("parse events has an error", e);
+        }
+    };
+
+    public CanalRocketMQClientExample(String nameServers, String topic,
+        String groupId) {
+        connector = RocketMQCanalConnectorProvider.newRocketMQConnector(nameServers,
+            topic,
+            groupId);
+    }
+
+    public static void main(String[] args) {
+        try {
+            final CanalRocketMQClientExample rocketMQClientExample = new CanalRocketMQClientExample(nameServers,
+                topic, groupId);
+            logger.info("## Start the rocketmq consumer: {}-{}", AbstractKafkaTest.topic, AbstractKafkaTest.groupId);
+            rocketMQClientExample.start();
+            logger.info("## The canal rocketmq consumer is running now ......");
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+
+                public void run() {
+                    try {
+                        logger.info("## Stop the rocketmq consumer");
+                        rocketMQClientExample.stop();
+                    } catch (Throwable e) {
+                        logger.warn("## Something goes wrong when stopping rocketmq consumer:", e);
+                    } finally {
+                        logger.info("## Rocketmq consumer is down.");
+                    }
+                }
+
+            });
+            while (running)
+                ;
+        } catch (Throwable e) {
+            logger.error("## Something going wrong when starting up the rocketmq consumer:", e);
+            System.exit(0);
+        }
+    }
+
+    public void start() {
+        Assert.notNull(connector, "connector is null");
+        thread = new Thread(new Runnable() {
+
+            public void run() {
+                process();
+            }
+        });
+        thread.setUncaughtExceptionHandler(handler);
+        thread.start();
+        running = true;
+    }
+
+    public void stop() {
+        if (!running) {
+            return;
+        }
+        connector.stopRunning();
+        running = false;
+        if (thread != null) {
+            try {
+                thread.join();
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+    }
+
+    private void process() {
+        while (!running)
+            ;
+        while (running) {
+            try {
+                connector.connect();
+                connector.subscribe();
+                while (running) {
+                    Message message = connector.getWithoutAck(1); // 获取message
+                    try {
+                        if (message == null) {
+                            continue;
+                        }
+                        long batchId = message.getId();
+                        int size = message.getEntries().size();
+                        if (batchId == -1 || size == 0) {
+                        } else {
+                            logger.info(message.toString());
+                        }
+                    } catch (Exception e) {
+                        logger.error(e.getMessage(), e);
+                    }
+                    connector.ack(message.getId()); // 提交确认
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        try {
+            connector.unsubscribe();
+        } catch (WakeupException e) {
+            // No-op. Continue process
+        }
+        connector.stopRunning();
+    }
+}

+ 1 - 1
deployer/src/main/resources/mq.yml

@@ -10,7 +10,7 @@ filterTransactionEntry: true
 canalDestinations:
   - canalDestination: example
     topic: example
-    partition: 0
+    partition: 1
     # 一个destination可以对应多个topic
     #topics:
     #  - topic: example