1
0
Эх сурвалжийг харах

add proxy for SPI (#4520)

* 测试类报错

* add proxy for SPI
rewerma 2 жил өмнө
parent
commit
cd798b71e9

+ 87 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/ProxyOuterAdapter.java

@@ -0,0 +1,87 @@
+package com.alibaba.otter.canal.client.adapter;
+
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class ProxyOuterAdapter implements OuterAdapter {
+
+    private OuterAdapter outerAdapter;
+
+    public ProxyOuterAdapter(OuterAdapter outerAdapter) {
+        this.outerAdapter = outerAdapter;
+    }
+
+    private ClassLoader changeCL() {
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(outerAdapter.getClass().getClassLoader());
+        return cl;
+    }
+
+    private void revertCL(ClassLoader cl) {
+        Thread.currentThread().setContextClassLoader(cl);
+    }
+
+    @Override
+    public void init(OuterAdapterConfig configuration, Properties envProperties) {
+        ClassLoader cl = changeCL();
+        try {
+            outerAdapter.init(configuration, envProperties);
+        } finally {
+            revertCL(cl);
+        }
+    }
+
+    @Override
+    public void sync(List<Dml> dmls) {
+        ClassLoader cl = changeCL();
+        try {
+            outerAdapter.sync(dmls);
+        } finally {
+            revertCL(cl);
+        }
+    }
+
+    @Override
+    public void destroy() {
+        ClassLoader cl = changeCL();
+        try {
+            outerAdapter.destroy();
+        } finally {
+            revertCL(cl);
+        }
+    }
+
+    @Override
+    public EtlResult etl(String task, List<String> params) {
+        ClassLoader cl = changeCL();
+        try {
+            return OuterAdapter.super.etl(task, params);
+        } finally {
+            revertCL(cl);
+        }
+    }
+
+    @Override
+    public Map<String, Object> count(String task) {
+        ClassLoader cl = changeCL();
+        try {
+            return OuterAdapter.super.count(task);
+        } finally {
+            revertCL(cl);
+        }
+    }
+
+    @Override
+    public String getDestination(String task) {
+        ClassLoader cl = changeCL();
+        try {
+            return OuterAdapter.super.getDestination(task);
+        } finally {
+            revertCL(cl);
+        }
+    }
+}

+ 20 - 21
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AdapterProcessor.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.adapter.launcher.loader;
 
+import com.alibaba.otter.canal.connector.core.spi.ProxyCanalMsgConsumer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
@@ -26,33 +27,33 @@ import com.alibaba.otter.canal.connector.core.spi.ExtensionLoader;
 
 /**
  * 适配处理器
- * 
+ *
  * @author rewerma 2020-02-01
  * @version 1.0.0
  */
 public class AdapterProcessor {
 
-    private static final Logger             logger                    = LoggerFactory.getLogger(AdapterProcessor.class);
+    private static final Logger logger = LoggerFactory.getLogger(AdapterProcessor.class);
 
-    private static final String             CONNECTOR_SPI_DIR         = "/plugin";
-    private static final String             CONNECTOR_STANDBY_SPI_DIR = "/canal-adapter/plugin";
+    private static final String CONNECTOR_SPI_DIR = "/plugin";
+    private static final String CONNECTOR_STANDBY_SPI_DIR = "/canal-adapter/plugin";
 
-    private CanalMsgConsumer                canalMsgConsumer;
+    private CanalMsgConsumer canalMsgConsumer;
 
-    private String                          canalDestination;                                                           // canal实例
-    private String                          groupId                   = null;                                           // groupId
-    private List<List<OuterAdapter>>        canalOuterAdapters;                                                         // 外部适配器
-    private CanalClientConfig               canalClientConfig;                                                          // 配置
-    private ExecutorService                 groupInnerExecutorService;                                                  // 组内工作线程池
-    private volatile boolean                running                   = false;                                          // 是否运行中
-    private Thread                          thread                    = null;
-    private Thread.UncaughtExceptionHandler handler                   = (t, e) -> logger
+    private String canalDestination;                                                           // canal实例
+    private String groupId = null;                                           // groupId
+    private List<List<OuterAdapter>> canalOuterAdapters;                                                         // 外部适配器
+    private CanalClientConfig canalClientConfig;                                                          // 配置
+    private ExecutorService groupInnerExecutorService;                                                  // 组内工作线程池
+    private volatile boolean running = false;                                          // 是否运行中
+    private Thread thread = null;
+    private Thread.UncaughtExceptionHandler handler = (t, e) -> logger
         .error("parse events has an error", e);
 
-    private SyncSwitch                      syncSwitch;
+    private SyncSwitch syncSwitch;
 
     public AdapterProcessor(CanalClientConfig canalClientConfig, String destination, String groupId,
-                            List<List<OuterAdapter>> canalOuterAdapters){
+        List<List<OuterAdapter>> canalOuterAdapters) {
         this.canalClientConfig = canalClientConfig;
         this.canalDestination = destination;
         this.groupId = groupId;
@@ -63,17 +64,15 @@ public class AdapterProcessor {
 
         // load connector consumer
         ExtensionLoader<CanalMsgConsumer> loader = new ExtensionLoader<>(CanalMsgConsumer.class);
-        canalMsgConsumer = loader
-            .getExtension(canalClientConfig.getMode().toLowerCase(),destination ,CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR);
+        canalMsgConsumer = new ProxyCanalMsgConsumer(loader
+            .getExtension(canalClientConfig.getMode().toLowerCase(), destination, CONNECTOR_SPI_DIR,
+                CONNECTOR_STANDBY_SPI_DIR));
 
         Properties properties = canalClientConfig.getConsumerProperties();
         properties.put(CanalConstants.CANAL_MQ_FLAT_MESSAGE, canalClientConfig.getFlatMessage());
         properties.put(CanalConstants.CANAL_ALIYUN_ACCESS_KEY, canalClientConfig.getAccessKey());
         properties.put(CanalConstants.CANAL_ALIYUN_SECRET_KEY, canalClientConfig.getSecretKey());
-        ClassLoader cl = Thread.currentThread().getContextClassLoader();
-        Thread.currentThread().setContextClassLoader(canalMsgConsumer.getClass().getClassLoader());
         canalMsgConsumer.init(properties, canalDestination, groupId);
-        Thread.currentThread().setContextClassLoader(cl);
     }
 
     public void start() {
@@ -171,7 +170,7 @@ public class AdapterProcessor {
         }
 
         int retry = canalClientConfig.getRetries() == null
-                    || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
+            || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
         if (retry == -1) {
             // 重试次数-1代表异常时一直阻塞重试
             retry = Integer.MAX_VALUE;

+ 2 - 5
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.adapter.launcher.loader;
 
+import com.alibaba.otter.canal.client.adapter.ProxyOuterAdapter;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -87,11 +88,8 @@ public class CanalAdapterLoader {
     private void loadAdapter(OuterAdapterConfig config, List<OuterAdapter> canalOutConnectors) {
         try {
             OuterAdapter adapter;
-            adapter = loader.getExtension(config.getName(), config.getKey());
+            adapter = new ProxyOuterAdapter(loader.getExtension(config.getName(), config.getKey()));
 
-            ClassLoader cl = Thread.currentThread().getContextClassLoader();
-            // 替换ClassLoader
-            Thread.currentThread().setContextClassLoader(adapter.getClass().getClassLoader());
             Environment env = (Environment) SpringContext.getBean(Environment.class);
             Properties evnProperties = null;
             if (env instanceof StandardEnvironment) {
@@ -109,7 +107,6 @@ public class CanalAdapterLoader {
                 }
             }
             adapter.init(config, evnProperties);
-            Thread.currentThread().setContextClassLoader(cl);
             canalOutConnectors.add(adapter);
             logger.info("Load canal adapter: {} succeed", config.getName());
         } catch (Exception e) {

+ 66 - 0
connector/core/src/main/java/com/alibaba/otter/canal/connector/core/spi/ProxyCanalMQProducer.java

@@ -0,0 +1,66 @@
+package com.alibaba.otter.canal.connector.core.spi;
+
+import com.alibaba.otter.canal.connector.core.config.MQProperties;
+import com.alibaba.otter.canal.connector.core.producer.MQDestination;
+import com.alibaba.otter.canal.connector.core.util.Callback;
+import com.alibaba.otter.canal.protocol.Message;
+import java.util.Properties;
+
+public class ProxyCanalMQProducer implements CanalMQProducer {
+
+    private CanalMQProducer canalMQProducer;
+
+    public ProxyCanalMQProducer(CanalMQProducer canalMQProducer) {
+        this.canalMQProducer = canalMQProducer;
+    }
+
+    private ClassLoader changeCL() {
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(canalMQProducer.getClass().getClassLoader());
+        return cl;
+    }
+
+    private void revertCL(ClassLoader cl) {
+        Thread.currentThread().setContextClassLoader(cl);
+    }
+
+    @Override
+    public void init(Properties properties) {
+        ClassLoader cl = changeCL();
+        try {
+            canalMQProducer.init(properties);
+        } finally {
+            revertCL(cl);
+        }
+    }
+
+    @Override
+    public MQProperties getMqProperties() {
+        ClassLoader cl = changeCL();
+        try {
+            return canalMQProducer.getMqProperties();
+        } finally {
+            revertCL(cl);
+        }
+    }
+
+    @Override
+    public void send(MQDestination canalDestination, Message message, Callback callback) {
+        ClassLoader cl = changeCL();
+        try {
+            canalMQProducer.send(canalDestination, message, callback);
+        } finally {
+            revertCL(cl);
+        }
+    }
+
+    @Override
+    public void stop() {
+        ClassLoader cl = changeCL();
+        try {
+            canalMQProducer.stop();
+        } finally {
+            revertCL(cl);
+        }
+    }
+}

+ 86 - 0
connector/core/src/main/java/com/alibaba/otter/canal/connector/core/spi/ProxyCanalMsgConsumer.java

@@ -0,0 +1,86 @@
+package com.alibaba.otter.canal.connector.core.spi;
+
+import com.alibaba.otter.canal.connector.core.consumer.CommonMessage;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+public class ProxyCanalMsgConsumer implements CanalMsgConsumer {
+
+    private CanalMsgConsumer canalMsgConsumer;
+
+    public ProxyCanalMsgConsumer(CanalMsgConsumer canalMsgConsumer) {
+        this.canalMsgConsumer = canalMsgConsumer;
+    }
+
+    private ClassLoader changeCL() {
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(canalMsgConsumer.getClass().getClassLoader());
+        return cl;
+    }
+
+    private void revertCL(ClassLoader cl) {
+        Thread.currentThread().setContextClassLoader(cl);
+    }
+
+
+    @Override
+    public void init(Properties properties, String topic, String groupId) {
+        ClassLoader cl = changeCL();
+        try {
+            canalMsgConsumer.init(properties, topic, groupId);
+        } finally {
+            revertCL(cl);
+        }
+    }
+
+    @Override
+    public void connect() {
+        ClassLoader cl = changeCL();
+        try {
+            canalMsgConsumer.connect();
+        } finally {
+            revertCL(cl);
+        }
+    }
+
+    @Override
+    public List<CommonMessage> getMessage(Long timeout, TimeUnit unit) {
+        ClassLoader cl = changeCL();
+        try {
+            return canalMsgConsumer.getMessage(timeout, unit);
+        } finally {
+            revertCL(cl);
+        }
+    }
+
+    @Override
+    public void ack() {
+        ClassLoader cl = changeCL();
+        try {
+            canalMsgConsumer.ack();
+        } finally {
+            revertCL(cl);
+        }
+    }
+
+    @Override
+    public void rollback() {
+        ClassLoader cl = changeCL();
+        try {
+            canalMsgConsumer.rollback();
+        } finally {
+            revertCL(cl);
+        }
+    }
+
+    @Override
+    public void disconnect() {
+        ClassLoader cl = changeCL();
+        try {
+            canalMsgConsumer.disconnect();
+        } finally {
+            revertCL(cl);
+        }
+    }
+}

+ 10 - 11
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStarter.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.deployer;
 
+import com.alibaba.otter.canal.connector.core.spi.ProxyCanalMQProducer;
 import java.util.Properties;
 
 import com.alibaba.otter.canal.connector.core.config.MQProperties;
@@ -21,21 +22,21 @@ import com.alibaba.otter.canal.server.CanalMQStarter;
  */
 public class CanalStarter {
 
-    private static final Logger logger                    = LoggerFactory.getLogger(CanalStarter.class);
+    private static final Logger logger = LoggerFactory.getLogger(CanalStarter.class);
 
-    private static final String CONNECTOR_SPI_DIR         = "/plugin";
+    private static final String CONNECTOR_SPI_DIR = "/plugin";
     private static final String CONNECTOR_STANDBY_SPI_DIR = "/canal/plugin";
 
-    private CanalController     controller                = null;
-    private CanalMQProducer     canalMQProducer           = null;
-    private Thread              shutdownThread            = null;
-    private CanalMQStarter      canalMQStarter            = null;
+    private CanalController controller = null;
+    private CanalMQProducer canalMQProducer = null;
+    private Thread shutdownThread = null;
+    private CanalMQStarter canalMQStarter = null;
     private volatile Properties properties;
-    private volatile boolean    running                   = false;
+    private volatile boolean running = false;
 
     private CanalAdminWithNetty canalAdmin;
 
-    public CanalStarter(Properties properties){
+    public CanalStarter(Properties properties) {
         this.properties = properties;
     }
 
@@ -67,10 +68,8 @@ public class CanalStarter {
             canalMQProducer = loader
                 .getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR);
             if (canalMQProducer != null) {
-                ClassLoader cl = Thread.currentThread().getContextClassLoader();
-                Thread.currentThread().setContextClassLoader(canalMQProducer.getClass().getClassLoader());
+                canalMQProducer =  new ProxyCanalMQProducer(canalMQProducer);
                 canalMQProducer.init(properties);
-                Thread.currentThread().setContextClassLoader(cl);
             }
         }