浏览代码

修复子线程阻塞造成不能中止的问题 (#2508)

rewerma 5 年之前
父节点
当前提交
6d9ea57186

+ 15 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/DaemonThreadFactory.java

@@ -0,0 +1,15 @@
+package com.alibaba.otter.canal.client.adapter.support;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+public class DaemonThreadFactory implements ThreadFactory {
+
+    public static final ThreadFactory daemonThreadFactory = new DaemonThreadFactory();
+
+    public Thread newThread(Runnable r) {
+        Thread t = Executors.defaultThreadFactory().newThread(r);
+        t.setDaemon(true);
+        return t;
+    }
+}

+ 37 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Util.java

@@ -161,6 +161,43 @@ public class Util {
             });
     }
 
+    public static ThreadPoolExecutor newFixedDaemonThreadPool(int nThreads, long keepAliveTime) {
+        return new ThreadPoolExecutor(nThreads,
+                nThreads,
+                keepAliveTime,
+                TimeUnit.MILLISECONDS,
+                new SynchronousQueue<>(),
+                DaemonThreadFactory.daemonThreadFactory,
+                (r, exe) -> {
+                    if (!exe.isShutdown()) {
+                        try {
+                            exe.getQueue().put(r);
+                        } catch (InterruptedException e) {
+                            // ignore
+                        }
+                    }
+                }
+        );
+    }
+
+    public static ThreadPoolExecutor newSingleDaemonThreadExecutor(long keepAliveTime) {
+        return new ThreadPoolExecutor(1,
+                1,
+                keepAliveTime,
+                TimeUnit.MILLISECONDS,
+                new SynchronousQueue<>(),
+                DaemonThreadFactory.daemonThreadFactory,
+                (r, exe) -> {
+                    if (!exe.isShutdown()) {
+                        try {
+                            exe.getQueue().put(r);
+                        } catch (InterruptedException e) {
+                            // ignore
+                        }
+                    }
+                });
+    }
+
     public final static String  timeZone;    // 当前时区
     private static DateTimeZone dateTimeZone;
 

+ 5 - 4
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -2,9 +2,7 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -138,7 +136,8 @@ public abstract class AbstractCanalAdapterWorker {
 
     @SuppressWarnings("unchecked")
     protected boolean mqWriteOutData(int retry, long timeout, int i, final boolean flatMessage,
-                                     CanalMQConnector connector, ExecutorService workerExecutor) {
+                                     CanalMQConnector connector) {
+        ExecutorService workerExecutor =  Util.newSingleDaemonThreadExecutor(5000);
         try {
             List<?> messages;
             if (!flatMessage) {
@@ -185,6 +184,8 @@ public abstract class AbstractCanalAdapterWorker {
             } catch (InterruptedException e1) {
                 // ignore
             }
+        } finally {
+            workerExecutor.shutdown();
         }
         return false;
     }

+ 1 - 3
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java

@@ -49,7 +49,6 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
                 // ignore
             }
         }
-        ExecutorService workerExecutor = Util.newSingleThreadExecutor(5000L);
         int retry = canalClientConfig.getRetries() == null
                     || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
         long timeout = canalClientConfig.getTimeout() == null ? 30000 : canalClientConfig.getTimeout(); // 默认超时30秒
@@ -75,7 +74,7 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
                         if (!running) {
                             break;
                         }
-                        if (mqWriteOutData(retry, timeout, i, flatMessage, connector, workerExecutor)) {
+                        if (mqWriteOutData(retry, timeout, i, flatMessage, connector)) {
                             break;
                         }
                     }
@@ -85,7 +84,6 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
             }
         }
 
-        workerExecutor.shutdown();
 
         try {
             connector.unsubscribe();

+ 1 - 3
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRabbitMQWorker.java

@@ -43,7 +43,6 @@ public class CanalAdapterRabbitMQWorker extends AbstractCanalAdapterWorker {
             }
         }
 
-        ExecutorService workerExecutor = Util.newSingleThreadExecutor(5000L);
         int retry = canalClientConfig.getRetries() == null
                     || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
         long timeout = canalClientConfig.getTimeout() == null ? 30000 : canalClientConfig.getTimeout(); // 默认超时30秒
@@ -68,7 +67,7 @@ public class CanalAdapterRabbitMQWorker extends AbstractCanalAdapterWorker {
                         if (!running) {
                             break;
                         }
-                        if (mqWriteOutData(retry, timeout, i, flatMessage, connector, workerExecutor)) {
+                        if (mqWriteOutData(retry, timeout, i, flatMessage, connector)) {
                             break;
                         }
                     }
@@ -78,7 +77,6 @@ public class CanalAdapterRabbitMQWorker extends AbstractCanalAdapterWorker {
             }
         }
 
-        workerExecutor.shutdown();
 
         try {
             connector.unsubscribe();

+ 1 - 4
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java

@@ -73,7 +73,6 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
             }
         }
 
-        ExecutorService workerExecutor = Util.newSingleThreadExecutor(5000L);
         int retry = canalClientConfig.getRetries() == null
                     || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
         long timeout = canalClientConfig.getTimeout() == null ? 30000 : canalClientConfig.getTimeout(); // 默认超时30秒
@@ -99,7 +98,7 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
                         if (!running) {
                             break;
                         }
-                        if (mqWriteOutData(retry, timeout, i, flatMessage, connector, workerExecutor)) {
+                        if (mqWriteOutData(retry, timeout, i, flatMessage, connector)) {
                             break;
                         }
                     }
@@ -109,8 +108,6 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
             }
         }
 
-        workerExecutor.shutdown();
-
         try {
             connector.unsubscribe();
         } catch (WakeupException e) {