Pārlūkot izejas kodu

Further improvement for issue #771.

Chuanyi Li L 6 gadi atpakaļ
vecāks
revīzija
72b907a75f

+ 13 - 6
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -64,6 +64,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
     private ExecutorService              stageExecutor;
     private String                       destination;
     private volatile CanalParseException exception;
+    private volatile RuntimeException    incident;
 
     public MysqlMultiStageCoprocessor(int ringBufferSize, int parserThreadCount, LogEventConvert logEventConvert,
                                       EventTransactionBuffer transactionBuffer, String destination){
@@ -78,6 +79,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
     public void start() {
         super.start();
         this.exception = null;
+        this.incident = null;
         this.disruptorMsgBuffer = RingBuffer.createSingleProducer(new MessageEventFactory(),
             ringBufferSize,
             new BlockingWaitStrategy());
@@ -148,13 +150,17 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
 
     public boolean publish(LogBuffer buffer, String binlogFileName) {
         if (!isStart()) {
+            if (exception != null) {
+                throw exception;
+            }
             return false;
         }
-        /* 由于改为processor退出不stop,那么需要由exception标识coprocessor是否正常工作。
+
+        /* 由于改为processor仅终止自身stage而不是stop,那么需要由incident标识coprocessor是否正常工作。
          * 让dump线程能够及时感知
          */
-        if (exception != null) {
-            throw exception;
+        if (incident != null) {
+            throw incident;
         }
 
         boolean interupted = false;
@@ -399,11 +405,12 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
 
         @Override
         public void handleEventException(final Throwable ex, final long sequence, final Object event) {
-            /*如果仅仅抛出去,onShutdown会调用stop(),与instance主parse线程的reset()产生竞争,导致向terminated ExecutorService提交新task
+            /*
+             * 若异常不是onEvent body里产生的。如果仅仅抛出去,onShutdown会调用stop(),与dump线程的reset()产生竞争,导致向terminated ExecutorService提交新task
              * #771场景中的异常是InterruptedException,由shutdownNow产生。
              */
-            exception = new CanalParseException(ex);
-            throw exception;
+            incident = new RuntimeException(ex);
+            throw incident;
         }
 
         @Override