Quellcode durchsuchen

fixed rocketmq ak/sk

agapple vor 6 Jahren
Ursprung
Commit
ec235f3efd

+ 20 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -30,6 +30,10 @@ public class CanalClientConfig {
     private Long               timeout;
     // 模式 tcp kafka rocketMQ
     private String             mode          = "tcp";
+    // aliyun ak/sk
+    private String             accessKey;
+    private String             secretKey;
+
     // canal adapters 配置
     private List<CanalAdapter> canalAdapters;
 
@@ -105,6 +109,22 @@ public class CanalClientConfig {
         this.mode = mode;
     }
 
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+
     public List<CanalAdapter> getCanalAdapters() {
         return canalAdapters;
     }

+ 23 - 23
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/ExtensionLoader.java

@@ -27,8 +27,7 @@ import org.slf4j.LoggerFactory;
  */
 public class ExtensionLoader<T> {
 
-    private static final Logger                                      logger                     = LoggerFactory
-        .getLogger(ExtensionLoader.class);
+    private static final Logger                                      logger                     = LoggerFactory.getLogger(ExtensionLoader.class);
 
     private static final String                                      SERVICES_DIRECTORY         = "META-INF/services/";
 
@@ -36,8 +35,7 @@ public class ExtensionLoader<T> {
 
     private static final String                                      DEFAULT_CLASSLOADER_POLICY = "internal";
 
-    private static final Pattern                                     NAME_SEPARATOR             = Pattern
-        .compile("\\s*[,]+\\s*");
+    private static final Pattern                                     NAME_SEPARATOR             = Pattern.compile("\\s*[,]+\\s*");
 
     private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS          = new ConcurrentHashMap<>();
 
@@ -173,8 +171,7 @@ public class ExtensionLoader<T> {
             return instance;
         } catch (Throwable t) {
             throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type
-                                            + ")  could not be instantiated: " + t.getMessage(),
-                t);
+                                            + ")  could not be instantiated: " + t.getMessage(), t);
         }
     }
 
@@ -194,8 +191,7 @@ public class ExtensionLoader<T> {
             return instance;
         } catch (Throwable t) {
             throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type
-                                            + ")  could not be instantiated: " + t.getMessage(),
-                t);
+                                            + ")  could not be instantiated: " + t.getMessage(), t);
         }
     }
 
@@ -334,10 +330,12 @@ public class ExtensionLoader<T> {
                                             // Class.forName(line, true,
                                             // classLoader);
                                             if (!type.isAssignableFrom(clazz)) {
-                                                throw new IllegalStateException(
-                                                    "Error when load extension class(interface: " + type
-                                                                                + ", class line: " + clazz.getName()
-                                                                                + "), class " + clazz.getName()
+                                                throw new IllegalStateException("Error when load extension class(interface: "
+                                                                                + type
+                                                                                + ", class line: "
+                                                                                + clazz.getName()
+                                                                                + "), class "
+                                                                                + clazz.getName()
                                                                                 + "is not subtype of interface.");
                                             } else {
                                                 try {
@@ -355,9 +353,9 @@ public class ExtensionLoader<T> {
                                                                 extensionClasses.put(n, clazz);
                                                             } else if (c != clazz) {
                                                                 cachedNames.remove(clazz);
-                                                                throw new IllegalStateException(
-                                                                    "Duplicate extension " + type.getName() + " name "
-                                                                                                + n + " on "
+                                                                throw new IllegalStateException("Duplicate extension "
+                                                                                                + type.getName()
+                                                                                                + " name " + n + " on "
                                                                                                 + c.getName() + " and "
                                                                                                 + clazz.getName());
                                                             }
@@ -367,9 +365,12 @@ public class ExtensionLoader<T> {
                                             }
                                         }
                                     } catch (Throwable t) {
-                                        IllegalStateException e = new IllegalStateException(
-                                            "Failed to load extension class(interface: " + type + ", class line: "
-                                                                                            + line + ") in " + url
+                                        IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: "
+                                                                                            + type
+                                                                                            + ", class line: "
+                                                                                            + line
+                                                                                            + ") in "
+                                                                                            + url
                                                                                             + ", cause: "
                                                                                             + t.getMessage(),
                                             t);
@@ -384,18 +385,17 @@ public class ExtensionLoader<T> {
                         }
                     } catch (Throwable t) {
                         logger.error("Exception when load extension class(interface: " + type + ", class file: " + url
-                                     + ") in " + url,
-                            t);
+                                     + ") in " + url, t);
                     }
                 } // end of while urls
             }
         } catch (Throwable t) {
-            logger.error(
-                "Exception when load extension class(interface: " + type + ", description file: " + fileName + ").",
-                t);
+            logger.error("Exception when load extension class(interface: " + type + ", description file: " + fileName
+                         + ").", t);
         }
     }
 
+    @SuppressWarnings("unused")
     private static ClassLoader findClassLoader() {
         return ExtensionLoader.class.getClassLoader();
     }

+ 0 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/CanalAdapterApplication.java

@@ -3,7 +3,6 @@ package com.alibaba.otter.canal.adapter.launcher;
 import org.springframework.boot.Banner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.builder.SpringApplicationBuilder;
 
 /**
  * 启动入口

+ 2 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/SyncSwitch.java

@@ -64,11 +64,12 @@ public class SyncSwitch {
         }
     }
 
+    @SuppressWarnings("resource")
     private synchronized void startListen(String destination, BooleanMutex mutex) {
         try {
             String path = SYN_SWITCH_ZK_NODE + destination;
             CuratorFramework curator = curatorClient.getCurator();
-            final NodeCache nodeCache = new NodeCache(curator, path);
+            NodeCache nodeCache = new NodeCache(curator, path);
             nodeCache.start();
             nodeCache.getListenable().addListener(() -> initMutex(curator, destination, mutex));
         } catch (Exception e) {

+ 1 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterCanalConfig.java

@@ -47,6 +47,7 @@ public class AdapterCanalConfig extends CanalClientConfig {
         return srcDataSources;
     }
 
+    @SuppressWarnings("resource")
     public void setSrcDataSources(Map<String, DatasourceConfig> srcDataSources) {
         this.srcDataSources = srcDataSources;
 

+ 1 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java

@@ -44,8 +44,7 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
         while (!running)
             ;
         ExecutorService workerExecutor = Executors.newSingleThreadExecutor();
-        int retry = canalClientConfig.getRetries() == null
-                    || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
+        int retry = canalClientConfig.getRetries() == null || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
         long timeout = canalClientConfig.getTimeout() == null ? 30000 : canalClientConfig.getTimeout(); // 默认超时30秒
 
         while (running) {

+ 2 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -122,6 +122,8 @@ public class CanalAdapterLoader {
                         canalAdapter.getInstance(),
                         group.getGroupId(),
                         canalOuterAdapterGroups,
+                        canalClientConfig.getAccessKey(),
+                        canalClientConfig.getSecretKey(),
                         canalClientConfig.getFlatMessage());
                     canalMQWorker.put(canalAdapter.getInstance() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
                     rocketMQWorker.start();

+ 5 - 9
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java

@@ -3,16 +3,12 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
-import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
 import org.apache.kafka.common.errors.WakeupException;
 
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
 import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.protocol.Message;
 
 /**
  * rocketmq对应的client适配器工作线程
@@ -26,13 +22,14 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
     private boolean                flatMessage;
 
     public CanalAdapterRocketMQWorker(CanalClientConfig canalClientConfig, String nameServers, String topic,
-                                      String groupId, List<List<OuterAdapter>> canalOuterAdapters, boolean flatMessage){
+                                      String groupId, List<List<OuterAdapter>> canalOuterAdapters, String accessKey,
+                                      String secretKey, boolean flatMessage){
         super(canalOuterAdapters);
         this.canalClientConfig = canalClientConfig;
         this.topic = topic;
         this.flatMessage = flatMessage;
         this.canalDestination = topic;
-        this.connector = new RocketMQCanalConnector(nameServers, topic, groupId, flatMessage);
+        this.connector = new RocketMQCanalConnector(nameServers, topic, groupId, accessKey, secretKey, flatMessage);
         logger.info("RocketMQ consumer config topic:{}, nameServer:{}, groupId:{}", topic, nameServers, groupId);
     }
 
@@ -42,8 +39,7 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
             ;
 
         ExecutorService workerExecutor = Executors.newSingleThreadExecutor();
-        int retry = canalClientConfig.getRetries() == null
-                    || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
+        int retry = canalClientConfig.getRetries() == null || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
         long timeout = canalClientConfig.getTimeout() == null ? 30000 : canalClientConfig.getTimeout(); // 默认超时30秒
 
         while (running) {

+ 2 - 0
client-adapter/launcher/src/main/resources/application.yml

@@ -21,6 +21,8 @@ canal.conf:
   syncBatchSize: 1000
   retries: 0
   timeout:
+  accessKey:
+  secretKey:
   mode: tcp # kafka rocketMQ
 #  srcDataSources:
 #    defaultDS: