Преглед изворни кода

Merge pull request #970 from wingerx/master

 fix bug #968
agapple пре 6 година
родитељ
комит
ff592758cf

+ 18 - 3
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -2,6 +2,7 @@ package com.alibaba.otter.canal.parse.inbound.mysql;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.LockSupport;
 
@@ -36,6 +37,7 @@ import com.taobao.tddl.dbsync.binlog.event.RowsLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
 
+
 /**
  * 针对解析器提供一个多阶段协同的处理
  * 
@@ -65,6 +67,9 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
     private volatile CanalParseException exception;
     private AtomicLong                   eventsPublishBlockingTime;
     private GTIDSet                      gtidSet;
+    private WorkerPool<MessageEvent>     workerPool;
+    private BatchEventProcessor<MessageEvent> simpleParserStage;
+    private BatchEventProcessor<MessageEvent> sinkStoreStage;
 
     public MysqlMultiStageCoprocessor(int ringBufferSize, int parserThreadCount, LogEventConvert logEventConvert,
                                       EventTransactionBuffer transactionBuffer, String destination){
@@ -91,7 +96,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         SequenceBarrier sequenceBarrier = disruptorMsgBuffer.newBarrier();
         ExceptionHandler exceptionHandler = new SimpleFatalExceptionHandler();
         // stage 2
-        BatchEventProcessor<MessageEvent> simpleParserStage = new BatchEventProcessor<MessageEvent>(disruptorMsgBuffer,
+        simpleParserStage = new BatchEventProcessor<MessageEvent>(disruptorMsgBuffer,
             sequenceBarrier,
             new SimpleParserStage());
         simpleParserStage.setExceptionHandler(exceptionHandler);
@@ -103,7 +108,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         for (int i = 0; i < parserThreadCount; i++) {
             workHandlers[i] = new DmlParserStage();
         }
-        WorkerPool<MessageEvent> workerPool = new WorkerPool<MessageEvent>(disruptorMsgBuffer,
+        workerPool = new WorkerPool<MessageEvent>(disruptorMsgBuffer,
             dmlParserSequenceBarrier,
             exceptionHandler,
             workHandlers);
@@ -112,7 +117,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
 
         // stage 4
         SequenceBarrier sinkSequenceBarrier = disruptorMsgBuffer.newBarrier(sequence);
-        BatchEventProcessor<MessageEvent> sinkStoreStage = new BatchEventProcessor<MessageEvent>(disruptorMsgBuffer,
+        sinkStoreStage = new BatchEventProcessor<MessageEvent>(disruptorMsgBuffer,
             sinkSequenceBarrier,
             new SinkStoreStage());
         sinkStoreStage.setExceptionHandler(exceptionHandler);
@@ -126,14 +131,24 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
 
     @Override
     public void stop() {
+        // fix bug #968,对于pool与
+        workerPool.halt();
+        simpleParserStage.halt();
+        sinkStoreStage.halt();
         try {
             parserExecutor.shutdownNow();
+            while (!parserExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
+                parserExecutor.shutdownNow();
+            }
         } catch (Throwable e) {
             // ignore
         }
 
         try {
             stageExecutor.shutdownNow();
+            while (!stageExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
+                stageExecutor.shutdownNow();
+            }
         } catch (Throwable e) {
             // ignore
         }