Browse Source

fixed issue #756 , improve exception logger

七锋 6 years ago
parent
commit
df8fe4a9eb

+ 5 - 0
common/src/main/java/com/alibaba/otter/canal/common/utils/NamedThreadFactory.java

@@ -23,6 +23,11 @@ public class NamedThreadFactory implements ThreadFactory {
 
                                                                        public void uncaughtException(Thread t,
                                                                                                      Throwable e) {
+                                                                           if (e instanceof InterruptedException
+                                                                               || (e.getCause() != null && e.getCause() instanceof InterruptedException)) {
+                                                                               return;
+                                                                           }
+
                                                                            logger.error("from " + t.getName(), e);
                                                                        }
                                                                    };

+ 34 - 7
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -18,7 +18,7 @@ import com.lmax.disruptor.BatchEventProcessor;
 import com.lmax.disruptor.BlockingWaitStrategy;
 import com.lmax.disruptor.EventFactory;
 import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.FatalExceptionHandler;
+import com.lmax.disruptor.ExceptionHandler;
 import com.lmax.disruptor.LifecycleAware;
 import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.Sequence;
@@ -85,11 +85,12 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         this.stageExecutor = Executors.newFixedThreadPool(2, new NamedThreadFactory("MultiStageCoprocessor-other-"
                                                                                     + destination));
         SequenceBarrier sequenceBarrier = disruptorMsgBuffer.newBarrier();
-
+        ExceptionHandler exceptionHandler = new SimpleFatalExceptionHandler();
         // stage 2
         BatchEventProcessor<MessageEvent> simpleParserStage = new BatchEventProcessor<MessageEvent>(disruptorMsgBuffer,
             sequenceBarrier,
             new SimpleParserStage());
+        simpleParserStage.setExceptionHandler(exceptionHandler);
         disruptorMsgBuffer.addGatingSequences(simpleParserStage.getSequence());
 
         // stage 3
@@ -100,7 +101,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         }
         WorkerPool<MessageEvent> workerPool = new WorkerPool<MessageEvent>(disruptorMsgBuffer,
             dmlParserSequenceBarrier,
-            new FatalExceptionHandler(),
+            exceptionHandler,
             workHandlers);
         Sequence[] sequence = workerPool.getWorkerSequences();
         disruptorMsgBuffer.addGatingSequences(sequence);
@@ -110,6 +111,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         BatchEventProcessor<MessageEvent> sinkStoreStage = new BatchEventProcessor<MessageEvent>(disruptorMsgBuffer,
             sinkSequenceBarrier,
             new SinkStoreStage());
+        sinkStoreStage.setExceptionHandler(exceptionHandler);
         disruptorMsgBuffer.addGatingSequences(sinkStoreStage.getSequence());
 
         // start work
@@ -161,7 +163,10 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
 
     @Override
     public void reset() {
-        stop();
+        if (isStart()) {
+            stop();
+        }
+
         start();
     }
 
@@ -231,7 +236,9 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
 
         @Override
         public void onShutdown() {
-            stop();
+            if (isStart()) {
+                stop();
+            }
         }
     }
 
@@ -267,7 +274,9 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
 
         @Override
         public void onShutdown() {
-            stop();
+            if (isStart()) {
+                stop();
+            }
         }
     }
 
@@ -306,7 +315,9 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
 
         @Override
         public void onShutdown() {
-            stop();
+            if (isStart()) {
+                stop();
+            }
         }
     }
 
@@ -369,6 +380,22 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
 
     }
 
+    class SimpleFatalExceptionHandler implements ExceptionHandler {
+
+        @Override
+        public void handleEventException(final Throwable ex, final long sequence, final Object event) {
+            throw new RuntimeException(ex);
+        }
+
+        @Override
+        public void handleOnStartException(final Throwable ex) {
+        }
+
+        @Override
+        public void handleOnShutdownException(final Throwable ex) {
+        }
+    }
+
     class MessageEventFactory implements EventFactory<MessageEvent> {
 
         public MessageEvent newInstance() {