Browse Source

Merge branch 'feature/es2'

mcy 6 years ago
parent
commit
941ed7f60d
17 changed files with 444 additions and 57 deletions
  1. 6 0
      client-adapter/elasticsearch/pom.xml
  2. 1 1
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java
  3. 47 15
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfig.java
  4. 18 7
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/monitor/ESConfigMonitor.java
  5. 46 10
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESEtlService.java
  6. 3 3
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java
  7. 100 9
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java
  8. 21 0
      client-adapter/elasticsearch/src/main/resources/es/biz_order.yml
  9. 47 0
      client-adapter/elasticsearch/src/main/resources/es/customer.yml
  10. 1 0
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/ConfigLoadTest.java
  11. 118 0
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/ESTest.java
  12. 1 1
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/TestConstant.java
  13. 1 1
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/Common.java
  14. 1 1
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java
  15. 16 4
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/monitor/HbaseConfigMonitor.java
  16. 1 1
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java
  17. 16 4
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/monitor/RdbConfigMonitor.java

+ 6 - 0
client-adapter/elasticsearch/pom.xml

@@ -40,6 +40,12 @@
             <version>4.12</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>5.1.40</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

+ 1 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -133,7 +133,7 @@ public class ESAdapter implements OuterAdapter {
             esSyncService = new ESSyncService(esTemplate);
 
             esConfigMonitor = new ESConfigMonitor();
-            esConfigMonitor.init(this);
+            esConfigMonitor.init(this, envProperties);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }

+ 47 - 15
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfig.java

@@ -5,6 +5,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang.StringUtils;
+
 /**
  * ES 映射配置
  *
@@ -31,7 +33,7 @@ public class ESSyncConfig {
             throw new NullPointerException("esMapping._type");
         }
         if (esMapping._id == null && esMapping.getPk() == null) {
-            throw new NullPointerException("esMapping._id and esMapping.pk");
+            throw new NullPointerException("esMapping._id or esMapping.pk");
         }
         if (esMapping.sql == null) {
             throw new NullPointerException("esMapping.sql");
@@ -80,23 +82,23 @@ public class ESSyncConfig {
 
     public static class ESMapping {
 
-        private String              _index;
-        private String              _type;
-        private String              _id;
-        private boolean             upsert          = false;
-        private String              pk;
-        // private String parent;
-        private String              sql;
+        private String                       _index;
+        private String                       _type;
+        private String                       _id;
+        private boolean                      upsert          = false;
+        private String                       pk;
+        private Map<String, RelationMapping> relations       = new LinkedHashMap<>();
+        private String                       sql;
         // 对象字段, 例: objFields:
         // - _labels: array:;
-        private Map<String, String> objFields       = new LinkedHashMap<>();
-        private List<String>        skips           = new ArrayList<>();
-        private int                 commitBatch     = 1000;
-        private String              etlCondition;
-        private boolean             syncByTimestamp = false;                // 是否按时间戳定时同步
-        private Long                syncInterval;                           // 同步时间间隔
+        private Map<String, String>          objFields       = new LinkedHashMap<>();
+        private List<String>                 skips           = new ArrayList<>();
+        private int                          commitBatch     = 1000;
+        private String                       etlCondition;
+        private boolean                      syncByTimestamp = false;                // 是否按时间戳定时同步
+        private Long                         syncInterval;                           // 同步时间间隔
 
-        private SchemaItem          schemaItem;                             // sql解析结果模型
+        private SchemaItem                   schemaItem;                             // sql解析结果模型
 
         public String get_index() {
             return _index;
@@ -154,6 +156,14 @@ public class ESSyncConfig {
             this.skips = skips;
         }
 
+        public Map<String, RelationMapping> getRelations() {
+            return relations;
+        }
+
+        public void setRelations(Map<String, RelationMapping> relations) {
+            this.relations = relations;
+        }
+
         public String getSql() {
             return sql;
         }
@@ -202,4 +212,26 @@ public class ESSyncConfig {
             this.schemaItem = schemaItem;
         }
     }
+
+    public static class RelationMapping {
+
+        private String name;
+        private String parent;
+
+        public String getName() {
+            return name;
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        public String getParent() {
+            return parent;
+        }
+
+        public void setParent(String parent) {
+            this.parent = parent;
+        }
+    }
 }

+ 18 - 7
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/monitor/ESConfigMonitor.java

@@ -3,9 +3,11 @@ package com.alibaba.otter.canal.client.adapter.es.monitor;
 import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
 import org.apache.commons.io.filefilter.FileFilterUtils;
 import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
 import org.apache.commons.io.monitor.FileAlterationMonitor;
@@ -31,10 +33,13 @@ public class ESConfigMonitor {
 
     private ESAdapter             esAdapter;
 
+    private Properties            envProperties;
+
     private FileAlterationMonitor fileMonitor;
 
-    public void init(ESAdapter esAdapter) {
+    public void init(ESAdapter esAdapter, Properties envProperties) {
         this.esAdapter = esAdapter;
+        this.envProperties = envProperties;
         File confDir = Util.getConfDirPath(adapterName);
         try {
             FileAlterationObserver observer = new FileAlterationObserver(confDir,
@@ -65,11 +70,13 @@ public class ESConfigMonitor {
             try {
                 // 加载新增的配置文件
                 String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
-                ESSyncConfig config = new Yaml().loadAs(configContent, ESSyncConfig.class);
-                config.validate();
-                addConfigToCache(file, config);
-
-                logger.info("Add a new es mapping config: {} to canal adapter", file.getName());
+                ESSyncConfig config = YmlConfigBinder
+                    .bindYmlToObj(null, configContent, ESSyncConfig.class, null, envProperties);
+                if (config != null) {
+                    config.validate();
+                    addConfigToCache(file, config);
+                    logger.info("Add a new es mapping config: {} to canal adapter", file.getName());
+                }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
             }
@@ -88,7 +95,11 @@ public class ESConfigMonitor {
                         onFileDelete(file);
                         return;
                     }
-                    ESSyncConfig config = new Yaml().loadAs(configContent, ESSyncConfig.class);
+                    ESSyncConfig config = YmlConfigBinder
+                        .bindYmlToObj(null, configContent, ESSyncConfig.class, null, envProperties);
+                    if (config == null) {
+                        return;
+                    }
                     config.validate();
                     if (esAdapter.getEsSyncConfig().containsKey(file.getName())) {
                         deleteConfigFromCache(file);

+ 46 - 10
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESEtlService.java

@@ -1,9 +1,7 @@
 package com.alibaba.otter.canal.client.adapter.es.service;
 
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+import java.sql.SQLException;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -13,13 +11,14 @@ import java.util.regex.Pattern;
 
 import javax.sql.DataSource;
 
+import org.apache.commons.lang.StringUtils;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.action.update.UpdateRequestBuilder;
 import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
@@ -199,21 +198,58 @@ public class ESEtlService {
                             Object val = esTemplate.getValFromRS(mapping, rs, fieldName, fieldName);
                             esFieldData.put(fieldName, val);
                         }
+
+                        if (!mapping.getRelations().isEmpty()) {
+                            mapping.getRelations().forEach((relationField, relationMapping) -> {
+                                Map<String, Object> relations = new HashMap<>();
+                                relations.put("name", relationMapping.getName());
+                                if (StringUtils.isNotEmpty(relationMapping.getParent())) {
+                                    FieldItem parentFieldItem = mapping.getSchemaItem()
+                                        .getSelectFields()
+                                        .get(relationMapping.getParent());
+                                    Object parentVal;
+                                    try {
+                                        parentVal = esTemplate.getValFromRS(mapping,
+                                            rs,
+                                            parentFieldItem.getFieldName(),
+                                            parentFieldItem.getFieldName());
+                                    } catch (SQLException e) {
+                                        throw new RuntimeException(e);
+                                    }
+                                    if (parentVal != null) {
+                                        relations.put("parent", parentVal.toString());
+                                        esFieldData.put("$parent_routing", parentVal.toString());
+
+                                    }
+                                }
+                                esFieldData.put(relationField, relations);
+                            });
+                        }
+
                         Object idVal = null;
                         if (mapping.get_id() != null) {
                             idVal = rs.getObject(mapping.get_id());
                         }
 
                         if (idVal != null) {
+                            String parentVal = (String) esFieldData.remove("$parent_routing");
                             if (mapping.isUpsert()) {
-                                bulkRequestBuilder.add(transportClient
+                                UpdateRequestBuilder updateRequestBuilder = transportClient
                                     .prepareUpdate(mapping.get_index(), mapping.get_type(), idVal.toString())
                                     .setDoc(esFieldData)
-                                    .setDocAsUpsert(true));
+                                    .setDocAsUpsert(true);
+                                if (StringUtils.isNotEmpty(parentVal)) {
+                                    updateRequestBuilder.setRouting(parentVal);
+                                }
+                                bulkRequestBuilder.add(updateRequestBuilder);
                             } else {
-                                bulkRequestBuilder.add(transportClient
+                                IndexRequestBuilder indexRequestBuilder = transportClient
                                     .prepareIndex(mapping.get_index(), mapping.get_type(), idVal.toString())
-                                    .setSource(esFieldData));
+                                    .setSource(esFieldData);
+                                if (StringUtils.isNotEmpty(parentVal)) {
+                                    indexRequestBuilder.setRouting(parentVal);
+                                }
+                                bulkRequestBuilder.add(indexRequestBuilder);
                             }
                         } else {
                             idVal = rs.getObject(mapping.getPk());

+ 3 - 3
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -43,7 +43,7 @@ public class ESSyncService {
         long begin = System.currentTimeMillis();
         if (esSyncConfigs != null) {
             if (logger.isTraceEnabled()) {
-                logger.trace("Destination: {}, database:{}, table:{}, type:{}, effect index count: {}",
+                logger.trace("Destination: {}, database:{}, table:{}, type:{}, affected index count: {}",
                     dml.getDestination(),
                     dml.getDatabase(),
                     dml.getTable(),
@@ -65,7 +65,7 @@ public class ESSyncService {
                 }
             }
             if (logger.isTraceEnabled()) {
-                logger.trace("Sync elapsed time: {} ms, effect index count:{}, destination: {}",
+                logger.trace("Sync elapsed time: {} ms, affected indexes count:{}, destination: {}",
                     (System.currentTimeMillis() - begin),
                     esSyncConfigs.size(),
                     dml.getDestination());
@@ -74,7 +74,7 @@ public class ESSyncService {
                 StringBuilder configIndexes = new StringBuilder();
                 esSyncConfigs
                     .forEach(esSyncConfig -> configIndexes.append(esSyncConfig.getEsMapping().get_index()).append(" "));
-                logger.debug("DML: {} \nEffect indexes: {}",
+                logger.debug("DML: {} \nAffected indexes: {}",
                     JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue),
                     configIndexes.toString());
             }

+ 100 - 9
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -2,6 +2,7 @@ package com.alibaba.otter.canal.client.adapter.es.support;
 
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -9,10 +10,13 @@ import java.util.concurrent.ConcurrentMap;
 
 import javax.sql.DataSource;
 
+import org.apache.commons.lang.StringUtils;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.update.UpdateRequestBuilder;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -64,13 +68,24 @@ public class ESTemplate {
      */
     public void insert(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
         if (mapping.get_id() != null) {
+            String parentVal = (String) esFieldData.remove("$parent_routing");
             if (mapping.isUpsert()) {
-                getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
+                UpdateRequestBuilder updateRequestBuilder = transportClient
+                    .prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
                     .setDoc(esFieldData)
-                    .setDocAsUpsert(true));
+                    .setDocAsUpsert(true);
+                if (StringUtils.isNotEmpty(parentVal)) {
+                    updateRequestBuilder.setRouting(parentVal);
+                }
+                getBulk().add(updateRequestBuilder);
             } else {
-                getBulk().add(transportClient.prepareIndex(mapping.get_index(), mapping.get_type(), pkVal.toString())
-                    .setSource(esFieldData));
+                IndexRequestBuilder indexRequestBuilder = transportClient
+                    .prepareIndex(mapping.get_index(), mapping.get_type(), pkVal.toString())
+                    .setSource(esFieldData);
+                if (StringUtils.isNotEmpty(parentVal)) {
+                    indexRequestBuilder.setRouting(parentVal);
+                }
+                getBulk().add(indexRequestBuilder);
             }
             commitBulk();
         } else {
@@ -137,7 +152,7 @@ public class ESTemplate {
             return count;
         });
         if (logger.isTraceEnabled()) {
-            logger.trace("Update ES by query effect {} records", syncCount);
+            logger.trace("Update ES by query affected {} records", syncCount);
         }
     }
 
@@ -200,13 +215,24 @@ public class ESTemplate {
 
     private void append4Update(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
         if (mapping.get_id() != null) {
+            String parentVal = (String) esFieldData.remove("$parent_routing");
             if (mapping.isUpsert()) {
-                getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
+                UpdateRequestBuilder updateRequestBuilder = transportClient
+                    .prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
                     .setDoc(esFieldData)
-                    .setDocAsUpsert(true));
+                    .setDocAsUpsert(true);
+                if (StringUtils.isNotEmpty(parentVal)) {
+                    updateRequestBuilder.setRouting(parentVal);
+                }
+                getBulk().add(updateRequestBuilder);
             } else {
-                getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
-                    .setDoc(esFieldData));
+                UpdateRequestBuilder updateRequestBuilder = transportClient
+                    .prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
+                    .setDoc(esFieldData);
+                if (StringUtils.isNotEmpty(parentVal)) {
+                    updateRequestBuilder.setRouting(parentVal);
+                }
+                getBulk().add(updateRequestBuilder);
             }
         } else {
             SearchResponse response = transportClient.prepareSearch(mapping.get_index())
@@ -257,6 +283,10 @@ public class ESTemplate {
                 esFieldData.put(fieldItem.getFieldName(), value);
             }
         }
+
+        // 添加父子文档关联信息
+        putRelationDataFromRS(mapping, schemaItem, resultSet, esFieldData);
+
         return resultIdVal;
     }
 
@@ -294,6 +324,10 @@ public class ESTemplate {
                 }
             }
         }
+
+        // 添加父子文档关联信息
+        putRelationDataFromRS(mapping, schemaItem, resultSet, esFieldData);
+
         return resultIdVal;
     }
 
@@ -340,6 +374,9 @@ public class ESTemplate {
                 esFieldData.put(fieldItem.getFieldName(), value);
             }
         }
+
+        // 添加父子文档关联信息
+        putRelationData(mapping, schemaItem, dmlData, esFieldData);
         return resultIdVal;
     }
 
@@ -368,9 +405,63 @@ public class ESTemplate {
                     getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName));
             }
         }
+
+        // 添加父子文档关联信息
+        putRelationData(mapping, schemaItem, dmlOld, esFieldData);
         return resultIdVal;
     }
 
+    private void putRelationDataFromRS(ESMapping mapping, SchemaItem schemaItem, ResultSet resultSet,
+                                       Map<String, Object> esFieldData) {
+        // 添加父子文档关联信息
+        if (!mapping.getRelations().isEmpty()) {
+            mapping.getRelations().forEach((relationField, relationMapping) -> {
+                Map<String, Object> relations = new HashMap<>();
+                relations.put("name", relationMapping.getName());
+                if (StringUtils.isNotEmpty(relationMapping.getParent())) {
+                    FieldItem parentFieldItem = schemaItem.getSelectFields().get(relationMapping.getParent());
+                    Object parentVal;
+                    try {
+                        parentVal = getValFromRS(mapping,
+                            resultSet,
+                            parentFieldItem.getFieldName(),
+                            parentFieldItem.getFieldName());
+                    } catch (SQLException e) {
+                        throw new RuntimeException(e);
+                    }
+                    if (parentVal != null) {
+                        relations.put("parent", parentVal.toString());
+                        esFieldData.put("$parent_routing", parentVal.toString());
+
+                    }
+                }
+                esFieldData.put(relationField, relations);
+            });
+        }
+    }
+
+    private void putRelationData(ESMapping mapping, SchemaItem schemaItem, Map<String, Object> dmlData,
+                                 Map<String, Object> esFieldData) {
+        // 添加父子文档关联信息
+        if (!mapping.getRelations().isEmpty()) {
+            mapping.getRelations().forEach((relationField, relationMapping) -> {
+                Map<String, Object> relations = new HashMap<>();
+                relations.put("name", relationMapping.getName());
+                if (StringUtils.isNotEmpty(relationMapping.getParent())) {
+                    FieldItem parentFieldItem = schemaItem.getSelectFields().get(relationMapping.getParent());
+                    String columnName = parentFieldItem.getColumnItems().iterator().next().getColumnName();
+                    Object parentVal = getValFromData(mapping, dmlData, parentFieldItem.getFieldName(), columnName);
+                    if (parentVal != null) {
+                        relations.put("parent", parentVal.toString());
+                        esFieldData.put("$parent_routing", parentVal.toString());
+
+                    }
+                }
+                esFieldData.put(relationField, relations);
+            });
+        }
+    }
+
     /**
      * es 字段类型本地缓存
      */

+ 21 - 0
client-adapter/elasticsearch/src/main/resources/es/biz_order.yml

@@ -0,0 +1,21 @@
+dataSourceKey: defaultDS
+destination: example
+groupId: g1
+esMapping:
+  _index: customer
+  _type: _doc
+  _id: _id
+  relations:
+    customer_order:
+      name: order
+      parent: customer_id
+  sql: "select concat('oid_', t.id) as _id,
+        t.customer_id,
+        t.id as order_id,
+        t.serial_code as order_serial,
+        t.c_time as order_time
+        from biz_order t"
+  skips:
+    - customer_id
+  etlCondition: "where t.c_time>='{0}'"
+  commitBatch: 3000

+ 47 - 0
client-adapter/elasticsearch/src/main/resources/es/customer.yml

@@ -0,0 +1,47 @@
+dataSourceKey: defaultDS
+destination: example
+groupId: g1
+esMapping:
+  _index: customer
+  _type: _doc
+  _id: id
+  relations:
+    customer_order:
+      name: customer
+  sql: "select t.id, t.name, t.email from customer t"
+  etlCondition: "where t.c_time>='{0}'"
+  commitBatch: 3000
+
+
+#{
+#  "mappings":{
+#    "_doc":{
+#      "properties":{
+#        "id": {
+#          "type": "long"
+#        },
+#        "name": {
+#          "type": "text"
+#        },
+#        "email": {
+#          "type": "text"
+#        },
+#        "order_id": {
+#          "type": "long"
+#        },
+#        "order_serial": {
+#          "type": "text"
+#        },
+#        "order_time": {
+#          "type": "date"
+#        },
+#        "customer_order":{
+#          "type":"join",
+#          "relations":{
+#            "customer":"order"
+#          }
+#        }
+#      }
+#    }
+#  }
+#}

+ 1 - 0
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/ConfigLoadTest.java

@@ -23,6 +23,7 @@ public class ConfigLoadTest {
     public void testLoad() {
         Map<String, ESSyncConfig> configMap = ESSyncConfigLoader.load(null);
         ESSyncConfig config = configMap.get("mytest_user.yml");
+        config.validate();
         Assert.assertNotNull(config);
         Assert.assertEquals("defaultDS", config.getDataSourceKey());
         ESSyncConfig.ESMapping esMapping = config.getEsMapping();

+ 118 - 0
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/ESTest.java

@@ -0,0 +1,118 @@
+package com.alibaba.otter.canal.client.adapter.es.test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ESTest {
+
+    private TransportClient transportClient;
+
+    @Before
+    public void init() throws UnknownHostException {
+        Settings.Builder settingBuilder = Settings.builder();
+        settingBuilder.put("cluster.name", TestConstant.clusterName);
+        Settings settings = settingBuilder.build();
+        transportClient = new PreBuiltTransportClient(settings);
+        String[] hostArray = TestConstant.esHosts.split(",");
+        for (String host : hostArray) {
+            int i = host.indexOf(":");
+            transportClient.addTransportAddress(new TransportAddress(InetAddress.getByName(host.substring(0, i)),
+                Integer.parseInt(host.substring(i + 1))));
+        }
+    }
+
+    @Test
+    public void test01() {
+        SearchResponse response = transportClient.prepareSearch("test")
+            .setTypes("osm")
+            .setQuery(QueryBuilders.termQuery("_id", "1"))
+            .setSize(10000)
+            .get();
+        for (SearchHit hit : response.getHits()) {
+            System.out.println(hit.getSourceAsMap().get("data").getClass());
+        }
+    }
+
+    @Test
+    public void test02() {
+        Map<String, Object> esFieldData = new LinkedHashMap<>();
+        esFieldData.put("userId", 2L);
+        esFieldData.put("eventId", 4L);
+        esFieldData.put("eventName", "网络异常");
+        esFieldData.put("description", "第四个事件信息");
+
+        Map<String, Object> relations = new LinkedHashMap<>();
+        esFieldData.put("user_event", relations);
+        relations.put("name", "event");
+        relations.put("parent", "2");
+
+        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
+        bulkRequestBuilder
+            .add(transportClient.prepareIndex("test", "osm", "2_4").setRouting("2").setSource(esFieldData));
+        commit(bulkRequestBuilder);
+    }
+
+    @Test
+    public void test03() {
+        Map<String, Object> esFieldData = new LinkedHashMap<>();
+        esFieldData.put("userId", 2L);
+        esFieldData.put("eventName", "网络异常1");
+
+        Map<String, Object> relations = new LinkedHashMap<>();
+        esFieldData.put("user_event", relations);
+        relations.put("name", "event");
+        relations.put("parent", "2");
+
+        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
+        bulkRequestBuilder.add(transportClient.prepareUpdate("test", "osm", "2_4").setRouting("2").setDoc(esFieldData));
+        commit(bulkRequestBuilder);
+    }
+
+    @Test
+    public void test04() {
+        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
+        bulkRequestBuilder.add(transportClient.prepareDelete("test", "osm", "2_4"));
+        commit(bulkRequestBuilder);
+    }
+
+    private void commit(BulkRequestBuilder bulkRequestBuilder) {
+        if (bulkRequestBuilder.numberOfActions() > 0) {
+            BulkResponse response = bulkRequestBuilder.execute().actionGet();
+            if (response.hasFailures()) {
+                for (BulkItemResponse itemResponse : response.getItems()) {
+                    if (!itemResponse.isFailed()) {
+                        continue;
+                    }
+
+                    if (itemResponse.getFailure().getStatus() == RestStatus.NOT_FOUND) {
+                        System.out.println(itemResponse.getFailureMessage());
+                    } else {
+                        System.out.println("ES bulk commit error" + itemResponse.getFailureMessage());
+                    }
+                }
+            }
+        }
+    }
+
+    @After
+    public void after() {
+        transportClient.close();
+    }
+}

+ 1 - 1
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/TestConstant.java

@@ -11,7 +11,7 @@ public class TestConstant {
     public final static String    jdbcPassword = "121212";
 
     public final static String    esHosts      = "127.0.0.1:9300";
-    public final static String    clusterNmae  = "elasticsearch";
+    public final static String    clusterName  = "elasticsearch";
 
     public static DruidDataSource dataSource;
 

+ 1 - 1
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/Common.java

@@ -22,7 +22,7 @@ public class Common {
         outerAdapterConfig.setName("es");
         outerAdapterConfig.setHosts(TestConstant.esHosts);
         Map<String, String> properties = new HashMap<>();
-        properties.put("cluster.name", TestConstant.clusterNmae);
+        properties.put("cluster.name", TestConstant.clusterName);
         outerAdapterConfig.setProperties(properties);
 
         ESAdapter esAdapter = new ESAdapter();

+ 1 - 1
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -96,7 +96,7 @@ public class HbaseAdapter implements OuterAdapter {
             hbaseSyncService = new HbaseSyncService(hbaseTemplate);
 
             configMonitor = new HbaseConfigMonitor();
-            configMonitor.init(this);
+            configMonitor.init(this, envProperties);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }

+ 16 - 4
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/monitor/HbaseConfigMonitor.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.client.adapter.hbase.monitor;
 import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.io.filefilter.FileFilterUtils;
 import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
@@ -11,8 +12,8 @@ import org.apache.commons.io.monitor.FileAlterationObserver;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
 
+import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
 import com.alibaba.otter.canal.client.adapter.hbase.HbaseAdapter;
 import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
@@ -26,10 +27,13 @@ public class HbaseConfigMonitor {
 
     private HbaseAdapter          hbaseAdapter;
 
+    private Properties            envProperties;
+
     private FileAlterationMonitor fileMonitor;
 
-    public void init(HbaseAdapter hbaseAdapter) {
+    public void init(HbaseAdapter hbaseAdapter, Properties envProperties) {
         this.hbaseAdapter = hbaseAdapter;
+        this.envProperties = envProperties;
         File confDir = Util.getConfDirPath(adapterName);
         try {
             FileAlterationObserver observer = new FileAlterationObserver(confDir,
@@ -60,7 +64,11 @@ public class HbaseConfigMonitor {
             try {
                 // 加载新增的配置文件
                 String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
-                MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
+                MappingConfig config = YmlConfigBinder
+                    .bindYmlToObj(null, configContent, MappingConfig.class, null, envProperties);
+                if (config == null) {
+                    return;
+                }
                 config.validate();
                 addConfigToCache(file, config);
 
@@ -83,7 +91,11 @@ public class HbaseConfigMonitor {
                         onFileDelete(file);
                         return;
                     }
-                    MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
+                    MappingConfig config = YmlConfigBinder
+                        .bindYmlToObj(null, configContent, MappingConfig.class, null, envProperties);
+                    if (config == null) {
+                        return;
+                    }
                     config.validate();
                     if (hbaseAdapter.getHbaseMapping().containsKey(file.getName())) {
                         deleteConfigFromCache(file);

+ 1 - 1
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -146,7 +146,7 @@ public class RdbAdapter implements OuterAdapter {
             skipDupException);
 
         rdbConfigMonitor = new RdbConfigMonitor();
-        rdbConfigMonitor.init(configuration.getKey(), this);
+        rdbConfigMonitor.init(configuration.getKey(), this, envProperties);
     }
 
     /**

+ 16 - 4
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/monitor/RdbConfigMonitor.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.client.adapter.rdb.monitor;
 import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.io.filefilter.FileFilterUtils;
 import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
@@ -11,8 +12,8 @@ import org.apache.commons.io.monitor.FileAlterationObserver;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
 
+import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
 import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MirrorDbConfig;
@@ -29,11 +30,14 @@ public class RdbConfigMonitor {
 
     private RdbAdapter            rdbAdapter;
 
+    private Properties            envProperties;
+
     private FileAlterationMonitor fileMonitor;
 
-    public void init(String key, RdbAdapter rdbAdapter) {
+    public void init(String key, RdbAdapter rdbAdapter, Properties envProperties) {
         this.key = key;
         this.rdbAdapter = rdbAdapter;
+        this.envProperties = envProperties;
         File confDir = Util.getConfDirPath(adapterName);
         try {
             FileAlterationObserver observer = new FileAlterationObserver(confDir,
@@ -64,7 +68,11 @@ public class RdbConfigMonitor {
             try {
                 // 加载新增的配置文件
                 String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
-                MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
+                MappingConfig config = YmlConfigBinder
+                    .bindYmlToObj(null, configContent, MappingConfig.class, null, envProperties);
+                if (config == null) {
+                    return;
+                }
                 config.validate();
                 if ((key == null && config.getOuterAdapterKey() == null)
                     || (key != null && key.equals(config.getOuterAdapterKey()))) {
@@ -90,7 +98,11 @@ public class RdbConfigMonitor {
                         onFileDelete(file);
                         return;
                     }
-                    MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
+                    MappingConfig config = YmlConfigBinder
+                        .bindYmlToObj(null, configContent, MappingConfig.class, null, envProperties);
+                    if (config == null) {
+                        return;
+                    }
                     config.validate();
                     if ((key == null && config.getOuterAdapterKey() == null)
                         || (key != null && key.equals(config.getOuterAdapterKey()))) {