Browse Source

Merge pull request #782 from lcybo/master

MysqlMultiStageCoprocessor健壮性enhance
agapple 6 years ago
parent
commit
d624358cb5

+ 29 - 20
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -4,6 +4,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.locks.LockSupport;
 
+import com.alibaba.otter.canal.common.CanalException;
 import org.apache.commons.lang.StringUtils;
 
 import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
@@ -52,17 +53,18 @@ import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
  */
 public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implements MultiStageCoprocessor {
 
-    private LogEventConvert          logEventConvert;
-    private EventTransactionBuffer   transactionBuffer;
-    private ErosaConnection          connection;
+    private LogEventConvert              logEventConvert;
+    private EventTransactionBuffer       transactionBuffer;
+    private ErosaConnection              connection;
 
-    private int                      parserThreadCount;
-    private int                      ringBufferSize;
-    private RingBuffer<MessageEvent> disruptorMsgBuffer;
-    private ExecutorService          parserExecutor;
-    private ExecutorService          stageExecutor;
-    private String                   destination;
-    private CanalParseException      exception;
+    private int                          parserThreadCount;
+    private int                          ringBufferSize;
+    private RingBuffer<MessageEvent>     disruptorMsgBuffer;
+    private ExecutorService              parserExecutor;
+    private ExecutorService              stageExecutor;
+    private String                       destination;
+    private volatile CanalParseException exception;
+    private volatile RuntimeException    incident;
 
     public MysqlMultiStageCoprocessor(int ringBufferSize, int parserThreadCount, LogEventConvert logEventConvert,
                                       EventTransactionBuffer transactionBuffer, String destination){
@@ -77,6 +79,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
     public void start() {
         super.start();
         this.exception = null;
+        this.incident = null;
         this.disruptorMsgBuffer = RingBuffer.createSingleProducer(new MessageEventFactory(),
             ringBufferSize,
             new BlockingWaitStrategy());
@@ -153,6 +156,13 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
             return false;
         }
 
+        /* 由于改为processor仅终止自身stage而不是stop,那么需要由incident标识coprocessor是否正常工作。
+         * 让dump线程能够及时感知
+         */
+        if (incident != null) {
+            throw incident;
+        }
+
         boolean interupted = false;
         do {
             try {
@@ -253,9 +263,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
 
         @Override
         public void onShutdown() {
-            if (isStart()) {
-                stop();
-            }
+
         }
     }
 
@@ -291,9 +299,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
 
         @Override
         public void onShutdown() {
-            if (isStart()) {
-                stop();
-            }
+
         }
     }
 
@@ -332,9 +338,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
 
         @Override
         public void onShutdown() {
-            if (isStart()) {
-                stop();
-            }
+
         }
     }
 
@@ -401,7 +405,12 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
 
         @Override
         public void handleEventException(final Throwable ex, final long sequence, final Object event) {
-            throw new RuntimeException(ex);
+            /*
+             * 若异常不是onEvent body里产生的。如果仅仅抛出去,onShutdown会调用stop(),与dump线程的reset()产生竞争,导致向terminated ExecutorService提交新task
+             * #771场景中的异常是InterruptedException,由shutdownNow产生。
+             */
+            incident = new RuntimeException(ex);
+            throw incident;
         }
 
         @Override