|
@@ -2,20 +2,19 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
-import java.util.concurrent.Callable;
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Future;
|
|
|
|
|
|
-import com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch;
|
|
|
-import com.alibaba.otter.canal.adapter.launcher.config.SpringContext;
|
|
|
-import com.alibaba.otter.canal.client.adapter.support.Dml;
|
|
|
-import com.alibaba.otter.canal.protocol.FlatMessage;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch;
|
|
|
+import com.alibaba.otter.canal.adapter.launcher.config.SpringContext;
|
|
|
import com.alibaba.otter.canal.client.adapter.OuterAdapter;
|
|
|
+import com.alibaba.otter.canal.client.adapter.support.Dml;
|
|
|
import com.alibaba.otter.canal.client.adapter.support.MessageUtil;
|
|
|
+import com.alibaba.otter.canal.protocol.FlatMessage;
|
|
|
import com.alibaba.otter.canal.protocol.Message;
|
|
|
|
|
|
/**
|
|
@@ -44,21 +43,21 @@ public abstract class AbstractCanalAdapterWorker {
|
|
|
protected void writeOut(final Message message) {
|
|
|
List<Future<Boolean>> futures = new ArrayList<>();
|
|
|
// 组间适配器并行运行
|
|
|
- for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
|
|
|
+ canalOuterAdapters.forEach(outerAdapters -> {
|
|
|
final List<OuterAdapter> adapters = outerAdapters;
|
|
|
futures.add(groupInnerExecutorService.submit(() -> {
|
|
|
try {
|
|
|
// 组内适配器穿行运行,尽量不要配置组内适配器
|
|
|
- for (final OuterAdapter c : adapters) {
|
|
|
+ adapters.forEach(adapter -> {
|
|
|
long begin = System.currentTimeMillis();
|
|
|
- MessageUtil.parse4Dml(canalDestination, message, c::sync);
|
|
|
+ MessageUtil.parse4Dml(canalDestination, message, adapter::sync);
|
|
|
|
|
|
if (logger.isDebugEnabled()) {
|
|
|
logger.debug("{} elapsed time: {}",
|
|
|
- c.getClass().getName(),
|
|
|
+ adapter.getClass().getName(),
|
|
|
(System.currentTimeMillis() - begin));
|
|
|
}
|
|
|
- }
|
|
|
+ });
|
|
|
return true;
|
|
|
} catch (Exception e) {
|
|
|
return false;
|
|
@@ -67,79 +66,94 @@ public abstract class AbstractCanalAdapterWorker {
|
|
|
|
|
|
// 等待所有适配器写入完成
|
|
|
// 由于是组间并发操作,所以将阻塞直到耗时最久的工作组操作完成
|
|
|
- for (Future<Boolean> f : futures) {
|
|
|
+ futures.forEach(future -> {
|
|
|
try {
|
|
|
- if (!f.get()) {
|
|
|
+ if (!future.get()) {
|
|
|
logger.error("Outer adapter write failed");
|
|
|
}
|
|
|
} catch (InterruptedException | ExecutionException e) {
|
|
|
// ignore
|
|
|
}
|
|
|
- }
|
|
|
- }
|
|
|
+ });
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
protected void writeOut(final FlatMessage flatMessage) {
|
|
|
List<Future<Boolean>> futures = new ArrayList<>();
|
|
|
// 组间适配器并行运行
|
|
|
- for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
|
|
|
- final List<OuterAdapter> adapters = outerAdapters;
|
|
|
- futures.add(groupInnerExecutorService.submit(new Callable<Boolean>() {
|
|
|
-
|
|
|
- @Override
|
|
|
- public Boolean call() {
|
|
|
- try {
|
|
|
- // 组内适配器穿行运行,尽量不要配置组内适配器
|
|
|
- for (OuterAdapter c : adapters) {
|
|
|
- long begin = System.currentTimeMillis();
|
|
|
- Dml dml = MessageUtil.flatMessage2Dml(canalDestination, flatMessage);
|
|
|
- c.sync(dml);
|
|
|
- if (logger.isDebugEnabled()) {
|
|
|
- logger.debug("{} elapsed time: {}",
|
|
|
- c.getClass().getName(),
|
|
|
- (System.currentTimeMillis() - begin));
|
|
|
- }
|
|
|
+ canalOuterAdapters.forEach(outerAdapters -> {
|
|
|
+ futures.add(groupInnerExecutorService.submit(() -> {
|
|
|
+ try {
|
|
|
+ // 组内适配器穿行运行,尽量不要配置组内适配器
|
|
|
+ outerAdapters.forEach(adapter -> {
|
|
|
+ long begin = System.currentTimeMillis();
|
|
|
+ Dml dml = MessageUtil.flatMessage2Dml(canalDestination, flatMessage);
|
|
|
+ adapter.sync(dml);
|
|
|
+ if (logger.isDebugEnabled()) {
|
|
|
+ logger.debug("{} elapsed time: {}",
|
|
|
+ adapter.getClass().getName(),
|
|
|
+ (System.currentTimeMillis() - begin));
|
|
|
}
|
|
|
- return true;
|
|
|
- } catch (Exception e) {
|
|
|
- return false;
|
|
|
- }
|
|
|
+ });
|
|
|
+ return true;
|
|
|
+ } catch (Exception e) {
|
|
|
+ return false;
|
|
|
}
|
|
|
}));
|
|
|
|
|
|
// 等待所有适配器写入完成
|
|
|
// 由于是组间并发操作,所以将阻塞直到耗时最久的工作组操作完成
|
|
|
- for (Future<Boolean> f : futures) {
|
|
|
+ futures.forEach(future -> {
|
|
|
try {
|
|
|
- if (!f.get()) {
|
|
|
+ if (!future.get()) {
|
|
|
logger.error("Outer adapter write failed");
|
|
|
}
|
|
|
} catch (InterruptedException | ExecutionException e) {
|
|
|
// ignore
|
|
|
}
|
|
|
- }
|
|
|
+ });
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ public void start() {
|
|
|
+ if (!running) {
|
|
|
+ thread = new Thread(this::process);
|
|
|
+ thread.setUncaughtExceptionHandler(handler);
|
|
|
+ thread.start();
|
|
|
+ running = true;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- protected void stopOutAdapters() {
|
|
|
- if (thread != null) {
|
|
|
- try {
|
|
|
- thread.join();
|
|
|
- } catch (InterruptedException e) {
|
|
|
- // ignore
|
|
|
+ protected abstract void process();
|
|
|
+
|
|
|
+ public void stop() {
|
|
|
+ try {
|
|
|
+ if (!running) {
|
|
|
+ return;
|
|
|
}
|
|
|
- }
|
|
|
- groupInnerExecutorService.shutdown();
|
|
|
- logger.info("topic connectors' worker thread dead!");
|
|
|
- for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
|
|
|
- for (OuterAdapter adapter : outerAdapters) {
|
|
|
- adapter.destroy();
|
|
|
+
|
|
|
+ closeConnection();
|
|
|
+ running = false;
|
|
|
+
|
|
|
+ syncSwitch.release(canalDestination);
|
|
|
+
|
|
|
+ logger.info("destination {} is waiting for adapters' worker thread die!", canalDestination);
|
|
|
+ if (thread != null) {
|
|
|
+ try {
|
|
|
+ thread.join();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ // ignore
|
|
|
+ }
|
|
|
}
|
|
|
+ groupInnerExecutorService.shutdown();
|
|
|
+ logger.info("destination {} adapters worker thread dead!", canalDestination);
|
|
|
+ canalOuterAdapters.forEach(outerAdapters -> outerAdapters.forEach(OuterAdapter::destroy));
|
|
|
+ logger.info("destination {} all adapters destroyed!", canalDestination);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.error(e.getMessage(), e);
|
|
|
}
|
|
|
- logger.info("topic all connectors destroyed!");
|
|
|
}
|
|
|
|
|
|
- public abstract void start();
|
|
|
+ protected abstract void closeConnection();
|
|
|
|
|
|
- public abstract void stop();
|
|
|
}
|