Quellcode durchsuchen

fixed #5270 , add CanalMQStarter stop latch

jianghang.loujh vor 7 Monaten
Ursprung
Commit
80ca4366db

+ 51 - 27
server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -1,5 +1,15 @@
 package com.alibaba.otter.canal.server;
 
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
 import com.alibaba.otter.canal.connector.core.config.MQProperties;
 import com.alibaba.otter.canal.connector.core.producer.MQDestination;
 import com.alibaba.otter.canal.connector.core.spi.CanalMQProducer;
@@ -9,18 +19,6 @@ import com.alibaba.otter.canal.instance.core.CanalMQConfig;
 import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 public class CanalMQStarter {
 
@@ -107,28 +105,33 @@ public class CanalMQStarter {
 
     public synchronized void startDestination(String destination) {
         CanalInstance canalInstance = canalServer.getCanalInstances().get(destination);
-        CanalMQRunnable canalMQRunable = canalMQWorks.get(destination);
-        //当zk出现session time out可能会导致instance出现多个消费者,从而出现乱序
-        //因此当抢占成功的实例已经启动CanalMQRunnable时就不需要stop然后start了
-        if (canalInstance != null && Objects.isNull(canalMQRunable)) {
-            stopDestination(destination);
-            CanalMQRunnable canalMQRunnable = new CanalMQRunnable(destination);
-            canalMQWorks.put(canalInstance.getDestination(), canalMQRunnable);
-            executorService.execute(canalMQRunnable);
-            logger.info("## Start the MQ work of destination:" + destination);
+        if (canalInstance != null) {
+            CanalMQRunnable canalMQRunnable = canalMQWorks.get(destination);
+            if (Objects.isNull(canalMQRunnable)) {
+                canalMQRunnable = new CanalMQRunnable(destination);
+                canalMQWorks.put(canalInstance.getDestination(), canalMQRunnable);
+                // 触发一下任务启动
+                Future future = executorService.submit(canalMQRunnable);
+                canalMQRunnable.setFuture(future);
+                logger.info("## Start the MQ work of destination:" + destination);
+            } else {
+                // 主段时间内的zk出现session time out,会默认优先当前节点抢占成功
+                // 如果没有出发过stop动作,这里可以忽略start的启动
+                logger.info("## Start the MQ work of destination:" + destination + " , ignore stop");
+            }
         }
     }
 
     public synchronized void stopDestination(String destination) {
-        CanalMQRunnable canalMQRunable = canalMQWorks.get(destination);
-        if (canalMQRunable != null) {
-            canalMQRunable.stop();
+        CanalMQRunnable canalMQRunnable = canalMQWorks.get(destination);
+        if (canalMQRunnable != null) {
+            canalMQRunnable.stop(true);
             canalMQWorks.remove(destination);
             logger.info("## Stop the MQ work of destination:" + destination);
         }
     }
 
-    private void worker(String destination, AtomicBoolean destinationRunning) {
+    private void worker(String destination, AtomicBoolean destinationRunning, CountDownLatch latch) {
         while (!running || !destinationRunning.get()) {
             try {
                 Thread.sleep(100);
@@ -210,6 +213,9 @@ public class CanalMQStarter {
                 logger.error("process error!", e);
             }
         }
+
+        // 确保一下关闭
+        latch.countDown();
     }
 
     private class CanalMQRunnable implements Runnable {
@@ -222,13 +228,31 @@ public class CanalMQStarter {
 
         private AtomicBoolean running = new AtomicBoolean(true);
 
+        private CountDownLatch latch   = new CountDownLatch(1);
+
+        private Future         future;
+
         @Override
         public void run() {
-            worker(destination, running);
+            worker(destination, running, latch);
         }
 
-        public void stop() {
+        public void stop(boolean wait) {
             running.set(false);
+            if (wait) {
+                try {
+                    // 触发一下interrupt
+                    future.cancel(true);
+                    // 等待MQ发送线程的正常退出
+                    latch.await();
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+        }
+
+        public void setFuture(Future future) {
+            this.future = future;
         }
     }
 }