Browse Source

es配置加载

mcy 6 years ago
parent
commit
e1191b665e

+ 5 - 12
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -28,24 +28,17 @@ import com.alibaba.otter.canal.client.adapter.support.SPI;
 @SPI("es")
 public class ESAdapter implements OuterAdapter {
 
-    private static Logger                             logger       = LoggerFactory.getLogger(ESAdapter.class);
+    private static Logger   logger = LoggerFactory.getLogger(ESAdapter.class);
 
-    private static volatile Map<String, ESSyncConfig> esSyncConfig = null;                                    // 文件名对应配置
+    private TransportClient transportClient;
 
-    private TransportClient                           transportClient;
-
-    private ESSyncService                             esSyncService;
+    private ESSyncService   esSyncService;
 
     @Override
     public void init(OuterAdapterConfig configuration) {
         try {
-            if (esSyncConfig == null) {
-                synchronized (ESSyncConfig.class) {
-                    if (esSyncConfig == null) {
-                        esSyncConfig = ESSyncConfigLoader.load();
-                    }
-                }
-            }
+            ESSyncConfigLoader.load();
+
             Map<String, String> properties = configuration.getProperties();
             Settings.Builder settingBuilder = Settings.builder();
             properties.forEach(settingBuilder::put);

+ 9 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfig.java

@@ -53,6 +53,8 @@ public class ESSyncConfig {
         private int          commitBatch = 1000;
         private String       etlCondition;
 
+        private SchemaItem   schemaItem;                     // sql解析结果模型
+
         public String get_index() {
             return _index;
         }
@@ -133,5 +135,12 @@ public class ESSyncConfig {
             this.etlCondition = etlCondition;
         }
 
+        public SchemaItem getSchemaItem() {
+            return schemaItem;
+        }
+
+        public void setSchemaItem(SchemaItem schemaItem) {
+            this.schemaItem = schemaItem;
+        }
     }
 }

+ 42 - 13
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfigLoader.java

@@ -5,27 +5,38 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.LinkedHashMap;
-import java.util.Map;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
 
+import com.alibaba.druid.pool.DruidDataSource;
 import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 
 public class ESSyncConfigLoader {
 
-    private static Logger       logger    = LoggerFactory.getLogger(ESSyncConfigLoader.class);
+    private static Logger                                   logger              = LoggerFactory
+        .getLogger(ESSyncConfigLoader.class);
 
-    private static final String BASE_PATH = "es";
+    private static final String                             BASE_PATH           = "es";
 
-    public static Map<String, ESSyncConfig> load() {
-        logger.info("## Start loading mapping config ... ");
+    private static volatile Map<String, ESSyncConfig>       esSyncConfig        = new LinkedHashMap<>(); // 文件名对应配置
+    private static volatile Map<String, List<ESSyncConfig>> dbTableEsSyncConfig = new LinkedHashMap<>(); // schema-table对应配置
 
-        Map<String, ESSyncConfig> result = new LinkedHashMap<>();
+    public static Map<String, ESSyncConfig> getEsSyncConfig() {
+        return esSyncConfig;
+    }
 
+    public static Map<String, List<ESSyncConfig>> getDbTableEsSyncConfig() {
+        return dbTableEsSyncConfig;
+    }
+
+    public static synchronized void load() {
+        logger.info("## Start loading mapping config ... ");
         Collection<String> configs = AdapterConfigs.get("es");
         for (String c : configs) {
             if (c == null) {
@@ -47,17 +58,35 @@ public class ESSyncConfigLoader {
 
             try {
                 config.validate();
+                SchemaItem schemaItem = SqlParser.parse(config.getEsMapping().getSql());
+                config.getEsMapping().setSchemaItem(schemaItem);
+
+                DruidDataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+                if (dataSource == null || dataSource.getUrl() == null) {
+                    throw new RuntimeException("No data source found: " + config.getDataSourceKey());
+                }
+                Pattern pattern = Pattern.compile(".*:(.*)://.*/(.*)\\?.*$");
+                Matcher matcher = pattern.matcher(dataSource.getUrl());
+                if (!matcher.find()){
+                    throw new RuntimeException("No found the schema of jdbc-url: " + config.getDataSourceKey());
+                }
+                String schema = matcher.group(2);
+
+                schemaItem.getAliasTableItems().values().forEach(tableItem -> {
+                    List<ESSyncConfig> esSyncConfigs = dbTableEsSyncConfig
+                        .computeIfAbsent(schema + "-" + tableItem.getTableName(), k -> new ArrayList<>());
+                    esSyncConfigs.add(config);
+                });
             } catch (Exception e) {
-                throw new RuntimeException("ERROR Config: " + c + " " + e.getMessage(), e);
+                throw new RuntimeException("ERROR Config: " + c, e);
             }
-            result.put(c, config);
+            esSyncConfig.put(c, config);
         }
 
         logger.info("## Mapping config loaded");
-        return result;
     }
 
-    public static String readConfigContent(String config) {
+    private static String readConfigContent(String config) {
         InputStream in = null;
         try {
             // 先取本地文件,再取类路径
@@ -68,7 +97,7 @@ public class ESSyncConfigLoader {
                 in = ESSyncConfigLoader.class.getClassLoader().getResourceAsStream(config);
             }
             if (in == null) {
-                throw new RuntimeException("Config file not found.");
+                throw new RuntimeException("Config file: " + config + " not found.");
             }
 
             byte[] bytes = new byte[in.available()];

+ 1 - 1
client-adapter/elasticsearch/src/main/resources/es/myetst_user.yml

@@ -4,7 +4,7 @@ esMapping:
   _type: _doc
   _id: id
 #  pk: id
-  sql: "select a.id, concat(a.name,'_test') as name, a.role_id, b.name as role_name, c.labels from user.a
+  sql: "select a.id, concat(a.name,'_test') as name, a.role_id, b.name as role_name, c.labels from user a
         left join role b on a.role_id=b.id
         left join (select user_id, group_concat(label,',') as labels from user_label
         group by user_id) c on c.user_id=a.id"

+ 16 - 6
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/ConfigLoadTest.java

@@ -1,24 +1,31 @@
 package com.alibaba.otter.canal.client.adapter.es.test;
 
-import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
-import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfigLoader;
-import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.Map;
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfigLoader;
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 
 public class ConfigLoadTest {
 
     @Before
-    public void before() {
+    public void before() throws SQLException {
         AdapterConfigs.put("es", "myetst_user.yml");
+        // 加载数据源连接池
+        DatasourceConfig.DATA_SOURCES.put("defaultDS", DataSourceConstant.dataSource);
     }
 
     @Test
     public void testLoad() {
-        Map<String, ESSyncConfig> configMap = ESSyncConfigLoader.load();
+        ESSyncConfigLoader.load();
+        Map<String, ESSyncConfig> configMap = ESSyncConfigLoader.getEsSyncConfig();
         ESSyncConfig config = configMap.get("myetst_user.yml");
         Assert.assertNotNull(config);
         Assert.assertEquals("defaultDS", config.getDataSourceKey());
@@ -27,5 +34,8 @@ public class ConfigLoadTest {
         Assert.assertEquals("_doc", esMapping.get_type());
         Assert.assertEquals("id", esMapping.get_id());
         Assert.assertNotNull(esMapping.getSql());
+
+        Map<String, List<ESSyncConfig>> dbTableEsSyncConfig = ESSyncConfigLoader.getDbTableEsSyncConfig();
+        Assert.assertFalse(dbTableEsSyncConfig.isEmpty());
     }
 }

+ 31 - 0
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/DataSourceConstant.java

@@ -0,0 +1,31 @@
+package com.alibaba.otter.canal.client.adapter.es.test;
+
+import java.sql.SQLException;
+
+import com.alibaba.druid.pool.DruidDataSource;
+
+public class DataSourceConstant {
+
+    static DruidDataSource dataSource;
+    static {
+        dataSource = new DruidDataSource();
+        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
+        dataSource.setUrl("jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true");
+        dataSource.setUsername("root");
+        dataSource.setPassword("121212");
+        dataSource.setInitialSize(1);
+        dataSource.setMinIdle(1);
+        dataSource.setMaxActive(3);
+        dataSource.setMaxWait(60000);
+        dataSource.setTimeBetweenEvictionRunsMillis(60000);
+        dataSource.setMinEvictableIdleTimeMillis(300000);
+        dataSource.setPoolPreparedStatements(false);
+        dataSource.setMaxPoolPreparedStatementPerConnectionSize(20);
+        dataSource.setValidationQuery("select 1");
+        try {
+            dataSource.init();
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+    }
+}

+ 0 - 1
client/src/test/java/com/alibaba/otter/canal/client/running/kafka/KafkaClientRunningTest.java

@@ -11,7 +11,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
-import com.alibaba.otter.canal.client.kafka.KafkaCanalConnectors;
 import com.alibaba.otter.canal.protocol.Message;
 
 /**