Răsfoiți Sursa

fix: 同个instance配置多个group避免共享CanalMsgConsumer (#5175)

lwd-coding 10 luni în urmă
părinte
comite
9493ff733d

+ 12 - 12
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AdapterProcessor.java

@@ -1,16 +1,5 @@
 package com.alibaba.otter.canal.adapter.launcher.loader;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch;
 import com.alibaba.otter.canal.adapter.launcher.config.SpringContext;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
@@ -23,6 +12,16 @@ import com.alibaba.otter.canal.connector.core.consumer.CommonMessage;
 import com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer;
 import com.alibaba.otter.canal.connector.core.spi.ExtensionLoader;
 import com.alibaba.otter.canal.connector.core.spi.ProxyCanalMsgConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
  * 适配处理器
@@ -63,8 +62,9 @@ public class AdapterProcessor {
 
         // load connector consumer
         ExtensionLoader<CanalMsgConsumer> loader = new ExtensionLoader<>(CanalMsgConsumer.class);
+        String key = destination + "_" + groupId;
         canalMsgConsumer = new ProxyCanalMsgConsumer(loader
-            .getExtension(canalClientConfig.getMode().toLowerCase(), destination, CONNECTOR_SPI_DIR,
+            .getExtension(canalClientConfig.getMode().toLowerCase(), key, CONNECTOR_SPI_DIR,
                 CONNECTOR_STANDBY_SPI_DIR));
 
         Properties properties = canalClientConfig.getConsumerProperties();