|
@@ -2,7 +2,6 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
|
|
|
|
|
|
import java.net.SocketAddress;
|
|
import java.net.SocketAddress;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
-import java.util.concurrent.Executors;
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeoutException;
|
|
import java.util.concurrent.TimeoutException;
|
|
|
|
|
|
@@ -10,6 +9,7 @@ import com.alibaba.otter.canal.client.CanalConnector;
|
|
import com.alibaba.otter.canal.client.CanalConnectors;
|
|
import com.alibaba.otter.canal.client.CanalConnectors;
|
|
import com.alibaba.otter.canal.client.adapter.OuterAdapter;
|
|
import com.alibaba.otter.canal.client.adapter.OuterAdapter;
|
|
import com.alibaba.otter.canal.client.impl.ClusterCanalConnector;
|
|
import com.alibaba.otter.canal.client.impl.ClusterCanalConnector;
|
|
|
|
+import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
|
|
import com.alibaba.otter.canal.protocol.Message;
|
|
import com.alibaba.otter.canal.protocol.Message;
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -36,7 +36,6 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
|
|
List<List<OuterAdapter>> canalOuterAdapters){
|
|
List<List<OuterAdapter>> canalOuterAdapters){
|
|
super(canalOuterAdapters);
|
|
super(canalOuterAdapters);
|
|
this.canalDestination = canalDestination;
|
|
this.canalDestination = canalDestination;
|
|
- groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
|
|
|
|
connector = CanalConnectors.newSingleConnector(address, canalDestination, "", "");
|
|
connector = CanalConnectors.newSingleConnector(address, canalDestination, "", "");
|
|
}
|
|
}
|
|
|
|
|
|
@@ -51,16 +50,10 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
|
|
List<List<OuterAdapter>> canalOuterAdapters){
|
|
List<List<OuterAdapter>> canalOuterAdapters){
|
|
super(canalOuterAdapters);
|
|
super(canalOuterAdapters);
|
|
this.canalDestination = canalDestination;
|
|
this.canalDestination = canalDestination;
|
|
- groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
|
|
|
|
connector = CanalConnectors.newClusterConnector(zookeeperHosts, canalDestination, "", "");
|
|
connector = CanalConnectors.newClusterConnector(zookeeperHosts, canalDestination, "", "");
|
|
((ClusterCanalConnector) connector).setSoTimeout(SO_TIMEOUT);
|
|
((ClusterCanalConnector) connector).setSoTimeout(SO_TIMEOUT);
|
|
}
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
|
- protected void closeConnection() {
|
|
|
|
- connector.stopRunning();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
@Override
|
|
@Override
|
|
protected void process() {
|
|
protected void process() {
|
|
while (!running)
|
|
while (!running)
|
|
@@ -132,4 +125,38 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void stop() {
|
|
|
|
+ try {
|
|
|
|
+ if (!running) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (connector instanceof ClusterCanalConnector) {
|
|
|
|
+ ((ClusterCanalConnector) connector).stopRunning();
|
|
|
|
+ } else if (connector instanceof SimpleCanalConnector) {
|
|
|
|
+ ((SimpleCanalConnector) connector).stopRunning();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ running = false;
|
|
|
|
+
|
|
|
|
+ syncSwitch.release(canalDestination);
|
|
|
|
+
|
|
|
|
+ logger.info("destination {} is waiting for adapters' worker thread die!", canalDestination);
|
|
|
|
+ if (thread != null) {
|
|
|
|
+ try {
|
|
|
|
+ thread.join();
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ // ignore
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ groupInnerExecutorService.shutdown();
|
|
|
|
+ logger.info("destination {} adapters worker thread dead!", canalDestination);
|
|
|
|
+ canalOuterAdapters.forEach(outerAdapters -> outerAdapters.forEach(OuterAdapter::destroy));
|
|
|
|
+ logger.info("destination {} all adapters destroyed!", canalDestination);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ logger.error(e.getMessage(), e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|