Browse Source

fixed outAdapterkey autoGen

jianghang.loujh 3 years ago
parent
commit
ee51f338e0

+ 37 - 52
client-adapter/escore/src/main/java/com/alibaba/otter/canal/client/adapter/es/core/ESAdapter.java

@@ -1,5 +1,14 @@
 package com.alibaba.otter.canal.client.adapter.es.core;
 
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang.StringUtils;
+
 import com.alibaba.druid.pool.DruidDataSource;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig;
@@ -9,20 +18,7 @@ import com.alibaba.otter.canal.client.adapter.es.core.config.SqlParser;
 import com.alibaba.otter.canal.client.adapter.es.core.monitor.ESConfigMonitor;
 import com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService;
 import com.alibaba.otter.canal.client.adapter.es.core.support.ESTemplate;
-import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
-import com.alibaba.otter.canal.client.adapter.support.Dml;
-import com.alibaba.otter.canal.client.adapter.support.EtlResult;
-import com.alibaba.otter.canal.client.adapter.support.FileName2KeyMapping;
-import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
-import com.alibaba.otter.canal.client.adapter.support.SPI;
-import com.alibaba.otter.canal.client.adapter.support.Util;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.apache.commons.lang.StringUtils;
+import com.alibaba.otter.canal.client.adapter.support.*;
 
 /**
  * ES外部适配器
@@ -96,12 +92,12 @@ public abstract class ESAdapter implements OuterAdapter {
         String table = dml.getTable();
         Map<String, ESSyncConfig> configMap;
         if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
-            configMap = dbTableEsSyncConfig.get(StringUtils.trimToEmpty(dml.getDestination()) + "-"
-                                                + StringUtils.trimToEmpty(dml.getGroupId()) + "_" + database + "-"
-                                                + table);
+            configMap = dbTableEsSyncConfig
+                .get(StringUtils.trimToEmpty(dml.getDestination()) + "-" + StringUtils.trimToEmpty(dml.getGroupId())
+                     + "_" + database + "-" + table);
         } else {
-            configMap = dbTableEsSyncConfig.get(StringUtils.trimToEmpty(dml.getDestination()) + "_" + database + "-"
-                                                + table);
+            configMap = dbTableEsSyncConfig
+                .get(StringUtils.trimToEmpty(dml.getDestination()) + "_" + database + "-" + table);
         }
 
         if (configMap != null && !configMap.values().isEmpty()) {
@@ -146,47 +142,37 @@ public abstract class ESAdapter implements OuterAdapter {
             throw new RuntimeException("Not found the schema of jdbc-url: " + config.getDataSourceKey());
         }
         String schema = matcher.group(2);
+        schemaItem.getAliasTableItems().values().forEach(tableItem -> {
+            Map<String, ESSyncConfig> esSyncConfigMap;
+            String schemaKey = tableItem.getSchema() == null ? schema : tableItem.getSchema();
+            if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
+                esSyncConfigMap = dbTableEsSyncConfig
+                    .computeIfAbsent(StringUtils.trimToEmpty(config.getDestination()) + "-"
+                                     + StringUtils.trimToEmpty(config.getGroupId()) + "_" + schemaKey + "-"
+                                     + tableItem.getTableName(),
+                        k -> new ConcurrentHashMap<>());
+            } else {
+                esSyncConfigMap = dbTableEsSyncConfig.computeIfAbsent(
+                    StringUtils.trimToEmpty(config.getDestination()) + "_" + schemaKey + "-" + tableItem.getTableName(),
+                    k -> new ConcurrentHashMap<>());
+            }
 
-        schemaItem.getAliasTableItems()
-                .values()
-                .forEach(tableItem -> {
-                    Map<String, ESSyncConfig> esSyncConfigMap;
-                    if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
-                        esSyncConfigMap = dbTableEsSyncConfig.computeIfAbsent(StringUtils.trimToEmpty(config.getDestination())
-                                        + "-"
-                                        + StringUtils.trimToEmpty(config.getGroupId())
-                                        + "_"
-                                        + tableItem.getSchema() == null ? schema : tableItem.getSchema()
-                                        + "-"
-                                        + tableItem.getTableName(),
-                                k -> new ConcurrentHashMap<>());
-                    } else {
-                        esSyncConfigMap = dbTableEsSyncConfig.computeIfAbsent(StringUtils.trimToEmpty(config.getDestination())
-                                        + "_"
-                                        + tableItem.getSchema() == null ? schema : tableItem.getSchema()
-                                        + "-"
-                                        + tableItem.getTableName(),
-                                k -> new ConcurrentHashMap<>());
-                    }
-
-                    esSyncConfigMap.put(configName, config);
-                });
+            esSyncConfigMap.put(configName, config);
+        });
     }
 
     public boolean addConfig(String fileName, ESSyncConfig config) {
         if (match(config)) {
             esSyncConfig.put(fileName, config);
             addSyncConfigToCache(fileName, config);
-            FileName2KeyMapping.register(getClass().getAnnotation(SPI.class).value(), fileName,
-                    configuration.getKey());
+            FileName2KeyMapping.register(getClass().getAnnotation(SPI.class).value(), fileName, configuration.getKey());
             return true;
         }
         return false;
     }
 
     public void updateConfig(String fileName, ESSyncConfig config) {
-        if (config.getOuterAdapterKey() != null && !config.getOuterAdapterKey()
-                .equals(configuration.getKey())) {
+        if (config.getOuterAdapterKey() != null && !config.getOuterAdapterKey().equals(configuration.getKey())) {
             // 理论上不允许改这个 因为本身就是通过这个关联起Adapter和Config的
             throw new RuntimeException("not allow to change outAdapterKey");
         }
@@ -205,12 +191,11 @@ public abstract class ESAdapter implements OuterAdapter {
     }
 
     private boolean match(ESSyncConfig config) {
-        boolean sameMatch = config.getOuterAdapterKey() != null && config.getOuterAdapterKey()
-                .equalsIgnoreCase(configuration.getKey());
+        boolean sameMatch = config.getOuterAdapterKey() != null
+                            && config.getOuterAdapterKey().equalsIgnoreCase(configuration.getKey());
         boolean prefixMatch = config.getOuterAdapterKey() == null && configuration.getKey()
-                .startsWith(StringUtils
-                        .join(new String[]{Util.AUTO_GENERATED_PREFIX, config.getDestination(),
-                                config.getGroupId()}, '-'));
+            .startsWith(StringUtils
+                .join(new String[] { Util.AUTO_GENERATED_PREFIX, config.getDestination(), config.getGroupId() }, '-'));
         return sameMatch || prefixMatch;
     }
 }

+ 24 - 147
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -1,11 +1,5 @@
 package com.alibaba.otter.canal.adapter.launcher.loader;
 
-import com.alibaba.otter.canal.adapter.launcher.config.SpringContext;
-import com.alibaba.otter.canal.client.adapter.OuterAdapter;
-import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
-import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
-import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
-import com.alibaba.otter.canal.client.adapter.support.Util;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -14,6 +8,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -22,6 +17,13 @@ import org.springframework.core.env.Environment;
 import org.springframework.core.env.PropertySource;
 import org.springframework.core.env.StandardEnvironment;
 
+import com.alibaba.otter.canal.adapter.launcher.config.SpringContext;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
+import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+
 /**
  * 外部适配器的加载器
  *
@@ -49,162 +51,37 @@ public class CanalAdapterLoader {
 
         for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) {
             for (CanalClientConfig.Group group : canalAdapter.getGroups()) {
-                int i = 0;
+                int autoGenId = 0;
                 List<List<OuterAdapter>> canalOuterAdapterGroups = new CopyOnWriteArrayList<>();
                 List<OuterAdapter> canalOuterAdapters = new CopyOnWriteArrayList<>();
 
                 for (OuterAdapterConfig config : group.getOuterAdapters()) {
                     // 保证一定有key
-                    if (config.getKey() == null) {
-                        String key = StringUtils.join(new String[]{Util.AUTO_GENERATED_PREFIX,
-                                        canalAdapter.getInstance(), group.getGroupId(), String.valueOf(i)},
-                                '-');
+                    if (StringUtils.isEmpty(config.getKey())) {
+                        String key = StringUtils.join(
+                            new String[] { Util.AUTO_GENERATED_PREFIX, canalAdapter.getInstance(), group.getGroupId(),
+                                           String.valueOf(autoGenId) },
+                            '-');
+                        //gen keyId
                         config.setKey(key);
                     }
-                    i++;
+                    autoGenId++;
                     loadAdapter(config, canalOuterAdapters);
                 }
                 canalOuterAdapterGroups.add(canalOuterAdapters);
 
-                AdapterProcessor adapterProcessor = canalAdapterProcessors.computeIfAbsent(canalAdapter.getInstance()
-                                + "|"
-                                + StringUtils.trimToEmpty(group.getGroupId()),
-                        f -> new AdapterProcessor(canalClientConfig,
-                                canalAdapter.getInstance(),
-                                group.getGroupId(),
-                                canalOuterAdapterGroups));
+                AdapterProcessor adapterProcessor = canalAdapterProcessors.computeIfAbsent(
+                    canalAdapter.getInstance() + "|" + StringUtils.trimToEmpty(group.getGroupId()),
+                    f -> new AdapterProcessor(canalClientConfig,
+                        canalAdapter.getInstance(),
+                        group.getGroupId(),
+                        canalOuterAdapterGroups));
                 adapterProcessor.start();
 
-                logger.info("Start adapter for canal-client mq topic: {} succeed", canalAdapter.getInstance() + "-"
-                        + group.getGroupId());
+                logger.info("Start adapter for canal-client mq topic: {} succeed",
+                    canalAdapter.getInstance() + "-" + group.getGroupId());
             }
         }
-
-        // if ("tcp".equalsIgnoreCase(canalClientConfig.getMode())) {
-        // // 初始化canal-client的适配器
-        // for (CanalClientConfig.CanalAdapter canalAdapter :
-        // canalClientConfig.getCanalAdapters()) {
-        // List<List<OuterAdapter>> canalOuterAdapterGroups = new
-        // CopyOnWriteArrayList<>();
-        //
-        // for (CanalClientConfig.Group connectorGroup :
-        // canalAdapter.getGroups()) {
-        // List<OuterAdapter> canalOutConnectors = new CopyOnWriteArrayList<>();
-        // for (OuterAdapterConfig c : connectorGroup.getOuterAdapters()) {
-        // loadAdapter(c, canalOutConnectors);
-        // }
-        // canalOuterAdapterGroups.add(canalOutConnectors);
-        // }
-        // CanalAdapterWorker worker;
-        // if (StringUtils.isNotEmpty(canalServerHost)) {
-        // worker = new CanalAdapterWorker(canalClientConfig,
-        // canalAdapter.getInstance(),
-        // canalServerHost,
-        // zkHosts,
-        // canalOuterAdapterGroups);
-        // } else if (zkHosts != null) {
-        // worker = new CanalAdapterWorker(canalClientConfig,
-        // canalAdapter.getInstance(),
-        // zkHosts,
-        // canalOuterAdapterGroups);
-        // } else {
-        // throw new RuntimeException("No canal server connector found");
-        // }
-        // canalWorkers.put(canalAdapter.getInstance(), worker);
-        // worker.start();
-        // logger.info("Start adapter for canal instance: {} succeed",
-        // canalAdapter.getInstance());
-        // }
-        // } else if ("kafka".equalsIgnoreCase(canalClientConfig.getMode())) {
-        // // 初始化canal-client-kafka的适配器
-        // for (CanalClientConfig.CanalAdapter canalAdapter :
-        // canalClientConfig.getCanalAdapters()) {
-        // for (CanalClientConfig.Group group : canalAdapter.getGroups()) {
-        // List<List<OuterAdapter>> canalOuterAdapterGroups = new
-        // CopyOnWriteArrayList<>();
-        // List<OuterAdapter> canalOuterAdapters = new CopyOnWriteArrayList<>();
-        // for (OuterAdapterConfig config : group.getOuterAdapters()) {
-        // loadAdapter(config, canalOuterAdapters);
-        // }
-        // canalOuterAdapterGroups.add(canalOuterAdapters);
-        //
-        // CanalAdapterKafkaWorker canalKafkaWorker = new
-        // CanalAdapterKafkaWorker(canalClientConfig,
-        // canalClientConfig.getMqServers(),
-        // canalAdapter.getInstance(),
-        // group.getGroupId(),
-        // canalOuterAdapterGroups,
-        // canalClientConfig.getFlatMessage());
-        // canalMQWorker.put(canalAdapter.getInstance() + "-kafka-" +
-        // group.getGroupId(), canalKafkaWorker);
-        // canalKafkaWorker.start();
-        // logger.info("Start adapter for canal-client mq topic: {} succeed",
-        // canalAdapter.getInstance() + "-" + group.getGroupId());
-        // }
-        // }
-        // } else if ("rocketMQ".equalsIgnoreCase(canalClientConfig.getMode()))
-        // {
-        // // 初始化canal-client-rocketMQ的适配器
-        // for (CanalClientConfig.CanalAdapter canalAdapter :
-        // canalClientConfig.getCanalAdapters()) {
-        // for (CanalClientConfig.Group group : canalAdapter.getGroups()) {
-        // List<List<OuterAdapter>> canalOuterAdapterGroups = new
-        // CopyOnWriteArrayList<>();
-        // List<OuterAdapter> canalOuterAdapters = new CopyOnWriteArrayList<>();
-        // for (OuterAdapterConfig config : group.getOuterAdapters()) {
-        // loadAdapter(config, canalOuterAdapters);
-        // }
-        // canalOuterAdapterGroups.add(canalOuterAdapters);
-        // CanalAdapterRocketMQWorker rocketMQWorker = new
-        // CanalAdapterRocketMQWorker(canalClientConfig,
-        // canalClientConfig.getMqServers(),
-        // canalAdapter.getInstance(),
-        // group.getGroupId(),
-        // canalOuterAdapterGroups,
-        // canalClientConfig.getAccessKey(),
-        // canalClientConfig.getSecretKey(),
-        // canalClientConfig.getFlatMessage(),
-        // canalClientConfig.isEnableMessageTrace(),
-        // canalClientConfig.getCustomizedTraceTopic(),
-        // canalClientConfig.getAccessChannel(),
-        // canalClientConfig.getNamespace());
-        // canalMQWorker.put(canalAdapter.getInstance() + "-rocketmq-" +
-        // group.getGroupId(), rocketMQWorker);
-        // rocketMQWorker.start();
-        //
-        // logger.info("Start adapter for canal-client mq topic: {} succeed",
-        // canalAdapter.getInstance() + "-" + group.getGroupId());
-        // }
-        // }
-        // } else if ("rabbitMQ".equalsIgnoreCase(canalClientConfig.getMode()))
-        // {
-        // // 初始化canal-client-rabbitMQ的适配器
-        // for (CanalClientConfig.CanalAdapter canalAdapter :
-        // canalClientConfig.getCanalAdapters()) {
-        // for (CanalClientConfig.Group group : canalAdapter.getGroups()) {
-        // List<List<OuterAdapter>> canalOuterAdapterGroups = new
-        // CopyOnWriteArrayList<>();
-        // List<OuterAdapter> canalOuterAdapters = new CopyOnWriteArrayList<>();
-        // for (OuterAdapterConfig config : group.getOuterAdapters()) {
-        // loadAdapter(config, canalOuterAdapters);
-        // }
-        // canalOuterAdapterGroups.add(canalOuterAdapters);
-        // CanalAdapterRabbitMQWorker rabbitMQWork = new
-        // CanalAdapterRabbitMQWorker(canalClientConfig,
-        // canalOuterAdapterGroups,
-        // canalAdapter.getInstance(),
-        // group.getGroupId(),
-        // canalClientConfig.getFlatMessage());
-        // canalMQWorker.put(canalAdapter.getInstance() + "-rabbitmq-" +
-        // group.getGroupId(), rabbitMQWork);
-        // rabbitMQWork.start();
-        //
-        // logger.info("Start adapter for canal-client mq topic: {} succeed",
-        // canalAdapter.getInstance() + "-" + group.getGroupId());
-        // }
-        // }
-        // // CanalAdapterRabbitMQWork
-        // }
     }
 
     private void loadAdapter(OuterAdapterConfig config, List<OuterAdapter> canalOutConnectors) {

+ 3 - 3
client-adapter/launcher/src/main/resources/bootstrap.yml

@@ -1,6 +1,6 @@
 canal:
   manager:
     jdbc:
-      url: jdbc:mysql://rm-bp1yjb2rh8249f1ft90130.mysql.rds.aliyuncs.com:3306/canal_manager?useUnicode=true
-      username: dev
-      password: dev123456
+      url: jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
+      username: root
+      password: 121212