Forráskód Böngészése

es增加relations配置
配置monitor yml加载使用通用加载类

mcy 6 éve
szülő
commit
05ad671cc1

+ 6 - 0
client-adapter/elasticsearch/pom.xml

@@ -40,6 +40,12 @@
             <version>4.12</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>5.1.40</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

+ 1 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -133,7 +133,7 @@ public class ESAdapter implements OuterAdapter {
             esSyncService = new ESSyncService(esTemplate);
 
             esConfigMonitor = new ESConfigMonitor();
-            esConfigMonitor.init(this);
+            esConfigMonitor.init(this, envProperties);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }

+ 62 - 15
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfig.java

@@ -5,6 +5,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang.StringUtils;
+
 /**
  * ES 映射配置
  *
@@ -24,6 +26,12 @@ public class ESSyncConfig {
     private ESMapping esMapping;
 
     public void validate() {
+        if (!esMapping.relations.isEmpty()) {
+            Map<String, RelationMapping> relationMappings = esMapping.relations.get(0);
+            RelationMapping relationMapping = relationMappings.values().iterator().next();
+            esMapping.isChild = StringUtils.isNotEmpty(relationMapping.getParent());
+        }
+
         if (esMapping._index == null) {
             throw new NullPointerException("esMapping._index");
         }
@@ -31,7 +39,7 @@ public class ESSyncConfig {
             throw new NullPointerException("esMapping._type");
         }
         if (esMapping._id == null && esMapping.getPk() == null) {
-            throw new NullPointerException("esMapping._id and esMapping.pk");
+            throw new NullPointerException("esMapping._id or esMapping.pk");
         }
         if (esMapping.sql == null) {
             throw new NullPointerException("esMapping.sql");
@@ -80,23 +88,24 @@ public class ESSyncConfig {
 
     public static class ESMapping {
 
-        private String              _index;
-        private String              _type;
-        private String              _id;
-        private boolean             upsert          = false;
-        private String              pk;
-        // private String parent;
-        private String              sql;
+        private String                             _index;
+        private String                             _type;
+        private String                             _id;
+        private boolean                            upsert          = false;
+        private String                             pk;
+        private List<Map<String, RelationMapping>> relations       = new ArrayList<>();
+        private boolean                            isChild         = false;
+        private String                             sql;
         // 对象字段, 例: objFields:
         // - _labels: array:;
-        private Map<String, String> objFields       = new LinkedHashMap<>();
-        private List<String>        skips           = new ArrayList<>();
-        private int                 commitBatch     = 1000;
-        private String              etlCondition;
-        private boolean             syncByTimestamp = false;                // 是否按时间戳定时同步
-        private Long                syncInterval;                           // 同步时间间隔
+        private Map<String, String>                objFields       = new LinkedHashMap<>();
+        private List<String>                       skips           = new ArrayList<>();
+        private int                                commitBatch     = 1000;
+        private String                             etlCondition;
+        private boolean                            syncByTimestamp = false;                // 是否按时间戳定时同步
+        private Long                               syncInterval;                           // 同步时间间隔
 
-        private SchemaItem          schemaItem;                             // sql解析结果模型
+        private SchemaItem                         schemaItem;                             // sql解析结果模型
 
         public String get_index() {
             return _index;
@@ -154,6 +163,22 @@ public class ESSyncConfig {
             this.skips = skips;
         }
 
+        public List<Map<String, RelationMapping>> getRelations() {
+            return relations;
+        }
+
+        public void setRelations(List<Map<String, RelationMapping>> relations) {
+            this.relations = relations;
+        }
+
+        public boolean isChild() {
+            return isChild;
+        }
+
+        public void setChild(boolean child) {
+            isChild = child;
+        }
+
         public String getSql() {
             return sql;
         }
@@ -202,4 +227,26 @@ public class ESSyncConfig {
             this.schemaItem = schemaItem;
         }
     }
+
+    public static class RelationMapping {
+
+        private String name;
+        private String parent;
+
+        public String getName() {
+            return name;
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        public String getParent() {
+            return parent;
+        }
+
+        public void setParent(String parent) {
+            this.parent = parent;
+        }
+    }
 }

+ 18 - 7
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/monitor/ESConfigMonitor.java

@@ -3,9 +3,11 @@ package com.alibaba.otter.canal.client.adapter.es.monitor;
 import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
 import org.apache.commons.io.filefilter.FileFilterUtils;
 import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
 import org.apache.commons.io.monitor.FileAlterationMonitor;
@@ -31,10 +33,13 @@ public class ESConfigMonitor {
 
     private ESAdapter             esAdapter;
 
+    private Properties            envProperties;
+
     private FileAlterationMonitor fileMonitor;
 
-    public void init(ESAdapter esAdapter) {
+    public void init(ESAdapter esAdapter, Properties envProperties) {
         this.esAdapter = esAdapter;
+        this.envProperties = envProperties;
         File confDir = Util.getConfDirPath(adapterName);
         try {
             FileAlterationObserver observer = new FileAlterationObserver(confDir,
@@ -65,11 +70,13 @@ public class ESConfigMonitor {
             try {
                 // 加载新增的配置文件
                 String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
-                ESSyncConfig config = new Yaml().loadAs(configContent, ESSyncConfig.class);
-                config.validate();
-                addConfigToCache(file, config);
-
-                logger.info("Add a new es mapping config: {} to canal adapter", file.getName());
+                ESSyncConfig config = YmlConfigBinder
+                    .bindYmlToObj(null, configContent, ESSyncConfig.class, null, envProperties);
+                if (config != null) {
+                    config.validate();
+                    addConfigToCache(file, config);
+                    logger.info("Add a new es mapping config: {} to canal adapter", file.getName());
+                }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
             }
@@ -88,7 +95,11 @@ public class ESConfigMonitor {
                         onFileDelete(file);
                         return;
                     }
-                    ESSyncConfig config = new Yaml().loadAs(configContent, ESSyncConfig.class);
+                    ESSyncConfig config = YmlConfigBinder
+                        .bindYmlToObj(null, configContent, ESSyncConfig.class, null, envProperties);
+                    if (config == null) {
+                        return;
+                    }
                     config.validate();
                     if (esAdapter.getEsSyncConfig().containsKey(file.getName())) {
                         deleteConfigFromCache(file);

+ 1 - 0
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/ConfigLoadTest.java

@@ -23,6 +23,7 @@ public class ConfigLoadTest {
     public void testLoad() {
         Map<String, ESSyncConfig> configMap = ESSyncConfigLoader.load(null);
         ESSyncConfig config = configMap.get("mytest_user.yml");
+        config.validate();
         Assert.assertNotNull(config);
         Assert.assertEquals("defaultDS", config.getDataSourceKey());
         ESSyncConfig.ESMapping esMapping = config.getEsMapping();

+ 118 - 0
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/ESTest.java

@@ -0,0 +1,118 @@
+package com.alibaba.otter.canal.client.adapter.es.test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ESTest {
+
+    private TransportClient transportClient;
+
+    @Before
+    public void init() throws UnknownHostException {
+        Settings.Builder settingBuilder = Settings.builder();
+        settingBuilder.put("cluster.name", TestConstant.clusterName);
+        Settings settings = settingBuilder.build();
+        transportClient = new PreBuiltTransportClient(settings);
+        String[] hostArray = TestConstant.esHosts.split(",");
+        for (String host : hostArray) {
+            int i = host.indexOf(":");
+            transportClient.addTransportAddress(new TransportAddress(InetAddress.getByName(host.substring(0, i)),
+                Integer.parseInt(host.substring(i + 1))));
+        }
+    }
+
+    @Test
+    public void test01() {
+        SearchResponse response = transportClient.prepareSearch("test")
+            .setTypes("osm")
+            .setQuery(QueryBuilders.termQuery("_id", "1"))
+            .setSize(10000)
+            .get();
+        for (SearchHit hit : response.getHits()) {
+            System.out.println(hit.getSourceAsMap().get("data").getClass());
+        }
+    }
+
+    @Test
+    public void test02() {
+        Map<String, Object> esFieldData = new LinkedHashMap<>();
+        esFieldData.put("userId", 2L);
+        esFieldData.put("eventId", 4L);
+        esFieldData.put("eventName", "网络异常");
+        esFieldData.put("description", "第四个事件信息");
+
+        Map<String, Object> relations = new LinkedHashMap<>();
+        esFieldData.put("user_event", relations);
+        relations.put("name", "event");
+        relations.put("parent", "2");
+
+        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
+        bulkRequestBuilder
+            .add(transportClient.prepareIndex("test", "osm", "2_4").setRouting("2").setSource(esFieldData));
+        commit(bulkRequestBuilder);
+    }
+
+    @Test
+    public void test03() {
+        Map<String, Object> esFieldData = new LinkedHashMap<>();
+        esFieldData.put("userId", 2L);
+        esFieldData.put("eventName", "网络异常1");
+
+        Map<String, Object> relations = new LinkedHashMap<>();
+        esFieldData.put("user_event", relations);
+        relations.put("name", "event");
+        relations.put("parent", "2");
+
+        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
+        bulkRequestBuilder.add(transportClient.prepareUpdate("test", "osm", "2_4").setRouting("2").setDoc(esFieldData));
+        commit(bulkRequestBuilder);
+    }
+
+    @Test
+    public void test04() {
+        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
+        bulkRequestBuilder.add(transportClient.prepareDelete("test", "osm", "2_4"));
+        commit(bulkRequestBuilder);
+    }
+
+    private void commit(BulkRequestBuilder bulkRequestBuilder) {
+        if (bulkRequestBuilder.numberOfActions() > 0) {
+            BulkResponse response = bulkRequestBuilder.execute().actionGet();
+            if (response.hasFailures()) {
+                for (BulkItemResponse itemResponse : response.getItems()) {
+                    if (!itemResponse.isFailed()) {
+                        continue;
+                    }
+
+                    if (itemResponse.getFailure().getStatus() == RestStatus.NOT_FOUND) {
+                        System.out.println(itemResponse.getFailureMessage());
+                    } else {
+                        System.out.println("ES bulk commit error" + itemResponse.getFailureMessage());
+                    }
+                }
+            }
+        }
+    }
+
+    @After
+    public void after() {
+        transportClient.close();
+    }
+}

+ 1 - 1
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/TestConstant.java

@@ -11,7 +11,7 @@ public class TestConstant {
     public final static String    jdbcPassword = "121212";
 
     public final static String    esHosts      = "127.0.0.1:9300";
-    public final static String    clusterNmae  = "elasticsearch";
+    public final static String    clusterName  = "elasticsearch";
 
     public static DruidDataSource dataSource;
 

+ 1 - 1
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/Common.java

@@ -22,7 +22,7 @@ public class Common {
         outerAdapterConfig.setName("es");
         outerAdapterConfig.setHosts(TestConstant.esHosts);
         Map<String, String> properties = new HashMap<>();
-        properties.put("cluster.name", TestConstant.clusterNmae);
+        properties.put("cluster.name", TestConstant.clusterName);
         outerAdapterConfig.setProperties(properties);
 
         ESAdapter esAdapter = new ESAdapter();

+ 16 - 4
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/monitor/HbaseConfigMonitor.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.client.adapter.hbase.monitor;
 import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.io.filefilter.FileFilterUtils;
 import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
@@ -11,8 +12,8 @@ import org.apache.commons.io.monitor.FileAlterationObserver;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
 
+import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
 import com.alibaba.otter.canal.client.adapter.hbase.HbaseAdapter;
 import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
@@ -26,10 +27,13 @@ public class HbaseConfigMonitor {
 
     private HbaseAdapter          hbaseAdapter;
 
+    private Properties            envProperties;
+
     private FileAlterationMonitor fileMonitor;
 
-    public void init(HbaseAdapter hbaseAdapter) {
+    public void init(HbaseAdapter hbaseAdapter, Properties envProperties) {
         this.hbaseAdapter = hbaseAdapter;
+        this.envProperties = envProperties;
         File confDir = Util.getConfDirPath(adapterName);
         try {
             FileAlterationObserver observer = new FileAlterationObserver(confDir,
@@ -60,7 +64,11 @@ public class HbaseConfigMonitor {
             try {
                 // 加载新增的配置文件
                 String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
-                MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
+                MappingConfig config = YmlConfigBinder
+                    .bindYmlToObj(null, configContent, MappingConfig.class, null, envProperties);
+                if (config == null) {
+                    return;
+                }
                 config.validate();
                 addConfigToCache(file, config);
 
@@ -83,7 +91,11 @@ public class HbaseConfigMonitor {
                         onFileDelete(file);
                         return;
                     }
-                    MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
+                    MappingConfig config = YmlConfigBinder
+                        .bindYmlToObj(null, configContent, MappingConfig.class, null, envProperties);
+                    if (config == null) {
+                        return;
+                    }
                     config.validate();
                     if (hbaseAdapter.getHbaseMapping().containsKey(file.getName())) {
                         deleteConfigFromCache(file);

+ 1 - 1
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -146,7 +146,7 @@ public class RdbAdapter implements OuterAdapter {
             skipDupException);
 
         rdbConfigMonitor = new RdbConfigMonitor();
-        rdbConfigMonitor.init(configuration.getKey(), this);
+        rdbConfigMonitor.init(configuration.getKey(), this, envProperties);
     }
 
     /**

+ 16 - 4
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/monitor/RdbConfigMonitor.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.client.adapter.rdb.monitor;
 import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.io.filefilter.FileFilterUtils;
 import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
@@ -11,8 +12,8 @@ import org.apache.commons.io.monitor.FileAlterationObserver;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
 
+import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
 import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MirrorDbConfig;
@@ -29,11 +30,14 @@ public class RdbConfigMonitor {
 
     private RdbAdapter            rdbAdapter;
 
+    private Properties            envProperties;
+
     private FileAlterationMonitor fileMonitor;
 
-    public void init(String key, RdbAdapter rdbAdapter) {
+    public void init(String key, RdbAdapter rdbAdapter, Properties envProperties) {
         this.key = key;
         this.rdbAdapter = rdbAdapter;
+        this.envProperties = envProperties;
         File confDir = Util.getConfDirPath(adapterName);
         try {
             FileAlterationObserver observer = new FileAlterationObserver(confDir,
@@ -64,7 +68,11 @@ public class RdbConfigMonitor {
             try {
                 // 加载新增的配置文件
                 String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
-                MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
+                MappingConfig config = YmlConfigBinder
+                    .bindYmlToObj(null, configContent, MappingConfig.class, null, envProperties);
+                if (config == null) {
+                    return;
+                }
                 config.validate();
                 if ((key == null && config.getOuterAdapterKey() == null)
                     || (key != null && key.equals(config.getOuterAdapterKey()))) {
@@ -90,7 +98,11 @@ public class RdbConfigMonitor {
                         onFileDelete(file);
                         return;
                     }
-                    MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
+                    MappingConfig config = YmlConfigBinder
+                        .bindYmlToObj(null, configContent, MappingConfig.class, null, envProperties);
+                    if (config == null) {
+                        return;
+                    }
                     config.validate();
                     if ((key == null && config.getOuterAdapterKey() == null)
                         || (key != null && key.equals(config.getOuterAdapterKey()))) {