|
@@ -4,6 +4,7 @@ import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.Executors;
|
|
import java.util.concurrent.Executors;
|
|
import java.util.concurrent.locks.LockSupport;
|
|
import java.util.concurrent.locks.LockSupport;
|
|
|
|
|
|
|
|
+import com.alibaba.otter.canal.common.CanalException;
|
|
import org.apache.commons.lang.StringUtils;
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
|
|
|
import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
|
|
import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
|
|
@@ -52,17 +53,17 @@ import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
|
|
*/
|
|
*/
|
|
public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implements MultiStageCoprocessor {
|
|
public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implements MultiStageCoprocessor {
|
|
|
|
|
|
- private LogEventConvert logEventConvert;
|
|
|
|
- private EventTransactionBuffer transactionBuffer;
|
|
|
|
- private ErosaConnection connection;
|
|
|
|
|
|
+ private LogEventConvert logEventConvert;
|
|
|
|
+ private EventTransactionBuffer transactionBuffer;
|
|
|
|
+ private ErosaConnection connection;
|
|
|
|
|
|
- private int parserThreadCount;
|
|
|
|
- private int ringBufferSize;
|
|
|
|
- private RingBuffer<MessageEvent> disruptorMsgBuffer;
|
|
|
|
- private ExecutorService parserExecutor;
|
|
|
|
- private ExecutorService stageExecutor;
|
|
|
|
- private String destination;
|
|
|
|
- private CanalParseException exception;
|
|
|
|
|
|
+ private int parserThreadCount;
|
|
|
|
+ private int ringBufferSize;
|
|
|
|
+ private RingBuffer<MessageEvent> disruptorMsgBuffer;
|
|
|
|
+ private ExecutorService parserExecutor;
|
|
|
|
+ private ExecutorService stageExecutor;
|
|
|
|
+ private String destination;
|
|
|
|
+ private volatile CanalParseException exception;
|
|
|
|
|
|
public MysqlMultiStageCoprocessor(int ringBufferSize, int parserThreadCount, LogEventConvert logEventConvert,
|
|
public MysqlMultiStageCoprocessor(int ringBufferSize, int parserThreadCount, LogEventConvert logEventConvert,
|
|
EventTransactionBuffer transactionBuffer, String destination){
|
|
EventTransactionBuffer transactionBuffer, String destination){
|
|
@@ -147,11 +148,14 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
|
|
|
|
|
|
public boolean publish(LogBuffer buffer, String binlogFileName) {
|
|
public boolean publish(LogBuffer buffer, String binlogFileName) {
|
|
if (!isStart()) {
|
|
if (!isStart()) {
|
|
- if (exception != null) {
|
|
|
|
- throw exception;
|
|
|
|
- }
|
|
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
|
|
+ /* 由于改为processor退出不stop,那么需要由exception标识coprocessor是否正常工作。
|
|
|
|
+ * 让dump线程能够及时感知
|
|
|
|
+ */
|
|
|
|
+ if (exception != null) {
|
|
|
|
+ throw exception;
|
|
|
|
+ }
|
|
|
|
|
|
boolean interupted = false;
|
|
boolean interupted = false;
|
|
do {
|
|
do {
|
|
@@ -253,9 +257,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void onShutdown() {
|
|
public void onShutdown() {
|
|
- if (isStart()) {
|
|
|
|
- stop();
|
|
|
|
- }
|
|
|
|
|
|
+
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -291,9 +293,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void onShutdown() {
|
|
public void onShutdown() {
|
|
- if (isStart()) {
|
|
|
|
- stop();
|
|
|
|
- }
|
|
|
|
|
|
+
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -332,9 +332,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void onShutdown() {
|
|
public void onShutdown() {
|
|
- if (isStart()) {
|
|
|
|
- stop();
|
|
|
|
- }
|
|
|
|
|
|
+
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -401,7 +399,11 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void handleEventException(final Throwable ex, final long sequence, final Object event) {
|
|
public void handleEventException(final Throwable ex, final long sequence, final Object event) {
|
|
- throw new RuntimeException(ex);
|
|
|
|
|
|
+ /*如果仅仅抛出去,onShutdown会调用stop(),与instance主parse线程的reset()产生竞争,导致向terminated ExecutorService提交新task
|
|
|
|
+ * #771场景中的异常是InterruptedException,由shutdownNow产生。
|
|
|
|
+ */
|
|
|
|
+ exception = new CanalParseException(ex);
|
|
|
|
+ throw exception;
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|