Browse Source

block waiting interrupted by stop request

Neal Hu 7 years ago
parent
commit
eda7e4ca2b

+ 14 - 8
client/src/main/java/com/alibaba/otter/canal/client/impl/ClusterCanalConnector.java

@@ -23,7 +23,7 @@ public class ClusterCanalConnector implements CanalConnector {
     private String                  username;
     private String                  password;
     private int                     soTimeout     = 10000;
-    private int                     retryTimes    = 3;
+    private int                     retryTimes    = 3;                                       // 设置-1时可以subscribe阻塞等待时优雅停机
     private int                     retryInterval = 5000;                                    // 重试的时间间隔,默认5秒
     private CanalNodeAccessStrategy accessStrategy;
     private SimpleCanalConnector    currentConnector;
@@ -106,13 +106,19 @@ public class ClusterCanalConnector implements CanalConnector {
                 this.filter = filter;
                 return;
             } catch (Throwable t) {
-                logger.warn(String.format(
-                        "something goes wrong when subscribing from server: %s",
-                        currentConnector != null ? currentConnector.getAddress() : "null"),
-                        t);
-                times++;
-                restart();
-                logger.info("restart the connector for next round retry.");
+                if (retryTimes == -1 && t.getCause() instanceof InterruptedException) {
+                    logger.info("block waiting interrupted by other thread.");
+                    return;
+                } else {
+                    logger.warn(String.format(
+                            "something goes wrong when subscribing from server: %s",
+                            currentConnector != null ? currentConnector.getAddress() : "null"),
+                            t);
+                    times++;
+                    restart();
+                    logger.info("restart the connector for next round retry.");
+                }
+
             }
         }
 

+ 9 - 0
example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java

@@ -4,6 +4,7 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.List;
 
+import com.alibaba.otter.canal.client.impl.ClusterCanalConnector;
 import org.apache.commons.lang.SystemUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,6 +36,7 @@ public class AbstractCanalClientTest {
     protected static final String             SEP                = SystemUtils.LINE_SEPARATOR;
     protected static final String             DATE_FORMAT        = "yyyy-MM-dd HH:mm:ss";
     protected volatile boolean                running            = false;
+    private volatile boolean                  waiting            = true;
     protected Thread.UncaughtExceptionHandler handler            = new Thread.UncaughtExceptionHandler() {
 
                                                                      public void uncaughtException(Thread t, Throwable e) {
@@ -91,6 +93,12 @@ public class AbstractCanalClientTest {
             return;
         }
         running = false;
+        if (waiting) {
+            if (connector instanceof ClusterCanalConnector) {
+                ((ClusterCanalConnector) connector).setRetryTimes(-1);
+            }
+            thread.interrupt();
+        }
         if (thread != null) {
             try {
                 thread.join();
@@ -109,6 +117,7 @@ public class AbstractCanalClientTest {
                 MDC.put("destination", destination);
                 connector.connect();
                 connector.subscribe();
+                waiting = false;
                 while (running) {
                     Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                     long batchId = message.getId();