浏览代码

group 线程池没有初始化的bug

mcy 6 年之前
父节点
当前提交
00fce30072

+ 4 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -4,6 +4,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.Future;
 
 
 import org.slf4j.Logger;
 import org.slf4j.Logger;
@@ -36,7 +37,9 @@ public abstract class AbstractCanalAdapterWorker {
 
 
     protected SyncSwitch                      syncSwitch;
     protected SyncSwitch                      syncSwitch;
 
 
-    public AbstractCanalAdapterWorker(){
+    public AbstractCanalAdapterWorker(List<List<OuterAdapter>> canalOuterAdapters){
+        this.canalOuterAdapters = canalOuterAdapters;
+        this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
         syncSwitch = (SyncSwitch) SpringContext.getBean(SyncSwitch.class);
         syncSwitch = (SyncSwitch) SpringContext.getBean(SyncSwitch.class);
     }
     }
 
 

+ 1 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java

@@ -26,7 +26,7 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
 
 
     public CanalAdapterKafkaWorker(String bootstrapServers, String topic, String groupId,
     public CanalAdapterKafkaWorker(String bootstrapServers, String topic, String groupId,
                                    List<List<OuterAdapter>> canalOuterAdapters, boolean flatMessage){
                                    List<List<OuterAdapter>> canalOuterAdapters, boolean flatMessage){
-        this.canalOuterAdapters = canalOuterAdapters;
+        super(canalOuterAdapters);
         this.topic = topic;
         this.topic = topic;
         this.canalDestination = topic;
         this.canalDestination = topic;
         this.flatMessage = flatMessage;
         this.flatMessage = flatMessage;

+ 2 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java

@@ -24,12 +24,12 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
 
 
     public CanalAdapterRocketMQWorker(String nameServers, String topic, String groupId,
     public CanalAdapterRocketMQWorker(String nameServers, String topic, String groupId,
                                       List<List<OuterAdapter>> canalOuterAdapters, boolean flatMessage){
                                       List<List<OuterAdapter>> canalOuterAdapters, boolean flatMessage){
-        logger.info("RocketMQ consumer config topic:{}, nameServer:{}, groupId:{}", topic, nameServers, groupId);
-        this.canalOuterAdapters = canalOuterAdapters;
+        super(canalOuterAdapters);
         this.topic = topic;
         this.topic = topic;
         this.flatMessage = flatMessage;
         this.flatMessage = flatMessage;
         this.canalDestination = topic;
         this.canalDestination = topic;
         this.connector = RocketMQCanalConnectors.newRocketMQConnector(nameServers, topic, groupId, flatMessage);
         this.connector = RocketMQCanalConnectors.newRocketMQConnector(nameServers, topic, groupId, flatMessage);
+        logger.info("RocketMQ consumer config topic:{}, nameServer:{}, groupId:{}", topic, nameServers, groupId);
     }
     }
 
 
     @Override
     @Override

+ 2 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java

@@ -34,7 +34,7 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
      */
      */
     public CanalAdapterWorker(String canalDestination, SocketAddress address,
     public CanalAdapterWorker(String canalDestination, SocketAddress address,
                               List<List<OuterAdapter>> canalOuterAdapters){
                               List<List<OuterAdapter>> canalOuterAdapters){
-        this.canalOuterAdapters = canalOuterAdapters;
+        super(canalOuterAdapters);
         this.canalDestination = canalDestination;
         this.canalDestination = canalDestination;
         groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
         groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
         connector = CanalConnectors.newSingleConnector(address, canalDestination, "", "");
         connector = CanalConnectors.newSingleConnector(address, canalDestination, "", "");
@@ -49,7 +49,7 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
      */
      */
     public CanalAdapterWorker(String canalDestination, String zookeeperHosts,
     public CanalAdapterWorker(String canalDestination, String zookeeperHosts,
                               List<List<OuterAdapter>> canalOuterAdapters){
                               List<List<OuterAdapter>> canalOuterAdapters){
-        this.canalOuterAdapters = canalOuterAdapters;
+        super(canalOuterAdapters);
         this.canalDestination = canalDestination;
         this.canalDestination = canalDestination;
         groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
         groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
         connector = CanalConnectors.newClusterConnector(zookeeperHosts, canalDestination, "", "");
         connector = CanalConnectors.newClusterConnector(zookeeperHosts, canalDestination, "", "");