Преглед на файлове

1. AbstractEventParser 中应该对multiStageCoprocess每次在while最后进行stop而不是reset, reset会在mysqlMultiStageCoprocessor中重新创建两个线程池, 如果在dump出错的情况下, 会无限多的创建multiStageCoprocess而且不释放

2. MySqlMultiStageComprocessor中需要判断是否已经shutdown, 实际情况中存在shutdown为true而terminal一直为false的情况
wallellen преди 6 години
родител
ревизия
7c44e5933d

+ 2 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -317,10 +317,10 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                     eventSink.interrupt();
                     transactionBuffer.reset();// 重置一下缓冲队列,重新记录数据
                     binlogParser.reset();// 重新置位
-                    if (multiStageCoprocessor != null) {
+                    if (multiStageCoprocessor != null && multiStageCoprocessor.isStart()) {
                         // 处理 RejectedExecutionException
                         try {
-                            multiStageCoprocessor.reset();
+                            multiStageCoprocessor.stop();
                         } catch (Throwable t) {
                             logger.debug("multi processor rejected:", t);
                         }

+ 8 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -138,6 +138,10 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         try {
             parserExecutor.shutdownNow();
             while (!parserExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
+                if (parserExecutor.isShutdown() || parserExecutor.isTerminated()) {
+                    break;
+                }
+
                 parserExecutor.shutdownNow();
             }
         } catch (Throwable e) {
@@ -147,6 +151,10 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         try {
             stageExecutor.shutdownNow();
             while (!stageExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
+                if (stageExecutor.isShutdown() || stageExecutor.isTerminated()) {
+                    break;
+                }
+
                 stageExecutor.shutdownNow();
             }
         } catch (Throwable e) {