|
@@ -4,7 +4,6 @@ import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.locks.LockSupport;
|
|
|
|
|
|
-import com.alibaba.otter.canal.common.CanalException;
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
|
|
|
import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
|
|
@@ -64,7 +63,6 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
|
|
|
private ExecutorService stageExecutor;
|
|
|
private String destination;
|
|
|
private volatile CanalParseException exception;
|
|
|
- private volatile RuntimeException incident;
|
|
|
|
|
|
public MysqlMultiStageCoprocessor(int ringBufferSize, int parserThreadCount, LogEventConvert logEventConvert,
|
|
|
EventTransactionBuffer transactionBuffer, String destination){
|
|
@@ -79,7 +77,6 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
|
|
|
public void start() {
|
|
|
super.start();
|
|
|
this.exception = null;
|
|
|
- this.incident = null;
|
|
|
this.disruptorMsgBuffer = RingBuffer.createSingleProducer(new MessageEventFactory(),
|
|
|
ringBufferSize,
|
|
|
new BlockingWaitStrategy());
|
|
@@ -156,13 +153,13 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- /* 由于改为processor仅终止自身stage而不是stop,那么需要由incident标识coprocessor是否正常工作。
|
|
|
+ /**
|
|
|
+ * 由于改为processor仅终止自身stage而不是stop,那么需要由incident标识coprocessor是否正常工作。
|
|
|
* 让dump线程能够及时感知
|
|
|
*/
|
|
|
- if (incident != null) {
|
|
|
- throw incident;
|
|
|
+ if (exception != null) {
|
|
|
+ throw exception;
|
|
|
}
|
|
|
-
|
|
|
boolean interupted = false;
|
|
|
do {
|
|
|
try {
|
|
@@ -180,11 +177,6 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
|
|
|
interupted = Thread.interrupted();
|
|
|
}
|
|
|
} while (!interupted && isStart());
|
|
|
-
|
|
|
- if (exception != null) {
|
|
|
- throw exception;
|
|
|
- }
|
|
|
-
|
|
|
return isStart();
|
|
|
}
|
|
|
|
|
@@ -405,12 +397,6 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
|
|
|
|
|
|
@Override
|
|
|
public void handleEventException(final Throwable ex, final long sequence, final Object event) {
|
|
|
- /*
|
|
|
- * 若异常不是onEvent body里产生的。如果仅仅抛出去,onShutdown会调用stop(),与dump线程的reset()产生竞争,导致向terminated ExecutorService提交新task
|
|
|
- * #771场景中的异常是InterruptedException,由shutdownNow产生。
|
|
|
- */
|
|
|
- incident = new RuntimeException(ex);
|
|
|
- throw incident;
|
|
|
}
|
|
|
|
|
|
@Override
|