瀏覽代碼

CanalConnector接口去掉stopRunning接口

mcy 6 年之前
父節點
當前提交
b131e8cfa4

+ 0 - 3
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -134,7 +134,6 @@ public abstract class AbstractCanalAdapterWorker {
                 return;
             }
 
-            closeConnection();
             running = false;
 
             syncSwitch.release(canalDestination);
@@ -156,6 +155,4 @@ public abstract class AbstractCanalAdapterWorker {
         }
     }
 
-    protected abstract void closeConnection();
-
 }

+ 0 - 5
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java

@@ -34,11 +34,6 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
         // connector.setSessionTimeout(1L, TimeUnit.MINUTES);
     }
 
-    @Override
-    protected void closeConnection() {
-        connector.stopRunning();
-    }
-
     @Override
     protected void process() {
         while (!running)

+ 1 - 6
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java

@@ -32,11 +32,6 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
         logger.info("RocketMQ consumer config topic:{}, nameServer:{}, groupId:{}", topic, nameServers, groupId);
     }
 
-    @Override
-    protected void closeConnection() {
-        connector.stopRunning();
-    }
-
     @Override
     protected void process() {
         while (!running)
@@ -88,7 +83,7 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
         } catch (WakeupException e) {
             // No-op. Continue process
         }
-        connector.stopRunning();
+        connector.disconnect();
         logger.info("=============> Disconnect topic: {} <=============", this.topic);
     }
 }

+ 35 - 6
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java

@@ -2,7 +2,6 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.net.SocketAddress;
 import java.util.List;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -10,6 +9,7 @@ import com.alibaba.otter.canal.client.CanalConnector;
 import com.alibaba.otter.canal.client.CanalConnectors;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.impl.ClusterCanalConnector;
+import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
 import com.alibaba.otter.canal.protocol.Message;
 
 /**
@@ -54,11 +54,6 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
         ((ClusterCanalConnector) connector).setSoTimeout(SO_TIMEOUT);
     }
 
-    @Override
-    protected void closeConnection() {
-        connector.stopRunning();
-    }
-
     @Override
     protected void process() {
         while (!running)
@@ -130,4 +125,38 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
             }
         }
     }
+
+    @Override
+    public void stop() {
+        try {
+            if (!running) {
+                return;
+            }
+
+            if (connector instanceof ClusterCanalConnector) {
+                ((ClusterCanalConnector) connector).stopRunning();
+            } else if (connector instanceof SimpleCanalConnector) {
+                ((SimpleCanalConnector) connector).stopRunning();
+            }
+
+            running = false;
+
+            syncSwitch.release(canalDestination);
+
+            logger.info("destination {} is waiting for adapters' worker thread die!", canalDestination);
+            if (thread != null) {
+                try {
+                    thread.join();
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+            groupInnerExecutorService.shutdown();
+            logger.info("destination {} adapters worker thread dead!", canalDestination);
+            canalOuterAdapters.forEach(outerAdapters -> outerAdapters.forEach(OuterAdapter::destroy));
+            logger.info("destination {} all adapters destroyed!", canalDestination);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
 }

+ 0 - 7
client/src/main/java/com/alibaba/otter/canal/client/CanalConnector.java

@@ -149,11 +149,4 @@ public interface CanalConnector {
      * @throws CanalClientException
      */
     void rollback() throws CanalClientException;
-
-    /**
-     * 中断的阻塞,用于优雅停止client
-     * 
-     * @throws CanalClientException
-     */
-    void stopRunning() throws CanalClientException;
 }

+ 0 - 9
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

@@ -238,15 +238,6 @@ public class KafkaCanalConnector implements CanalMQConnector {
         }
     }
 
-    public void stopRunning() {
-        if (running) {
-            running = false; // 设置为非running状态
-            // if (!mutex.state()) {
-            // mutex.set(true); // 中断阻塞
-            // }
-        }
-    }
-
     @Override
     public void subscribe(String filter) throws CanalClientException {
         throw new CanalClientException("mq not support this method");

+ 1 - 6
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java

@@ -65,6 +65,7 @@ public class RocketMQCanalConnector implements CanalMQConnector {
 
     public void disconnect() throws CanalClientException {
         rocketMQConsumer.shutdown();
+        connected = false;
     }
 
     public boolean checkValid() throws CanalClientException {
@@ -259,10 +260,4 @@ public class RocketMQCanalConnector implements CanalMQConnector {
         throw new CanalClientException("mq not support this method");
     }
 
-    @Override
-    public void stopRunning() throws CanalClientException {
-        this.rocketMQConsumer.shutdown();
-        connected = false;
-    }
-
 }

+ 0 - 1
client/src/test/java/com/alibaba/otter/canal/client/running/kafka/CanalKafkaClientExample.java

@@ -88,7 +88,6 @@ public class CanalKafkaClientExample {
         if (!running) {
             return;
         }
-        connector.stopRunning();
         running = false;
         if (thread != null) {
             try {

+ 1 - 2
client/src/test/java/com/alibaba/otter/canal/client/running/rocketmq/CanalRocketMQClientExample.java

@@ -87,7 +87,6 @@ public class CanalRocketMQClientExample extends AbstractRocektMQTest {
         if (!running) {
             return;
         }
-        connector.stopRunning();
         running = false;
         if (thread != null) {
             try {
@@ -134,6 +133,6 @@ public class CanalRocketMQClientExample extends AbstractRocektMQTest {
         } catch (WakeupException e) {
             // No-op. Continue process
         }
-        connector.stopRunning();
+//        connector.stopRunning();
     }
 }

+ 0 - 1
example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java

@@ -96,7 +96,6 @@ public class AbstractCanalClientTest {
         if (!running) {
             return;
         }
-        connector.stopRunning();
         running = false;
         if (thread != null) {
             try {