Browse Source

es增加父子文档索引同步适配

mcy 6 years ago
parent
commit
4df60a0fb5

+ 16 - 31
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfig.java

@@ -26,12 +26,6 @@ public class ESSyncConfig {
     private ESMapping esMapping;
 
     public void validate() {
-        if (!esMapping.relations.isEmpty()) {
-            Map<String, RelationMapping> relationMappings = esMapping.relations.get(0);
-            RelationMapping relationMapping = relationMappings.values().iterator().next();
-            esMapping.isChild = StringUtils.isNotEmpty(relationMapping.getParent());
-        }
-
         if (esMapping._index == null) {
             throw new NullPointerException("esMapping._index");
         }
@@ -88,24 +82,23 @@ public class ESSyncConfig {
 
     public static class ESMapping {
 
-        private String                             _index;
-        private String                             _type;
-        private String                             _id;
-        private boolean                            upsert          = false;
-        private String                             pk;
-        private List<Map<String, RelationMapping>> relations       = new ArrayList<>();
-        private boolean                            isChild         = false;
-        private String                             sql;
+        private String                       _index;
+        private String                       _type;
+        private String                       _id;
+        private boolean                      upsert          = false;
+        private String                       pk;
+        private Map<String, RelationMapping> relations       = new LinkedHashMap<>();
+        private String                       sql;
         // 对象字段, 例: objFields:
         // - _labels: array:;
-        private Map<String, String>                objFields       = new LinkedHashMap<>();
-        private List<String>                       skips           = new ArrayList<>();
-        private int                                commitBatch     = 1000;
-        private String                             etlCondition;
-        private boolean                            syncByTimestamp = false;                // 是否按时间戳定时同步
-        private Long                               syncInterval;                           // 同步时间间隔
+        private Map<String, String>          objFields       = new LinkedHashMap<>();
+        private List<String>                 skips           = new ArrayList<>();
+        private int                          commitBatch     = 1000;
+        private String                       etlCondition;
+        private boolean                      syncByTimestamp = false;                // 是否按时间戳定时同步
+        private Long                         syncInterval;                           // 同步时间间隔
 
-        private SchemaItem                         schemaItem;                             // sql解析结果模型
+        private SchemaItem                   schemaItem;                             // sql解析结果模型
 
         public String get_index() {
             return _index;
@@ -163,22 +156,14 @@ public class ESSyncConfig {
             this.skips = skips;
         }
 
-        public List<Map<String, RelationMapping>> getRelations() {
+        public Map<String, RelationMapping> getRelations() {
             return relations;
         }
 
-        public void setRelations(List<Map<String, RelationMapping>> relations) {
+        public void setRelations(Map<String, RelationMapping> relations) {
             this.relations = relations;
         }
 
-        public boolean isChild() {
-            return isChild;
-        }
-
-        public void setChild(boolean child) {
-            isChild = child;
-        }
-
         public String getSql() {
             return sql;
         }

+ 3 - 3
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -43,7 +43,7 @@ public class ESSyncService {
         long begin = System.currentTimeMillis();
         if (esSyncConfigs != null) {
             if (logger.isTraceEnabled()) {
-                logger.trace("Destination: {}, database:{}, table:{}, type:{}, effect index count: {}",
+                logger.trace("Destination: {}, database:{}, table:{}, type:{}, affected index count: {}",
                     dml.getDestination(),
                     dml.getDatabase(),
                     dml.getTable(),
@@ -65,7 +65,7 @@ public class ESSyncService {
                 }
             }
             if (logger.isTraceEnabled()) {
-                logger.trace("Sync elapsed time: {} ms, effect index count:{}, destination: {}",
+                logger.trace("Sync elapsed time: {} ms, affected indexes count:{}, destination: {}",
                     (System.currentTimeMillis() - begin),
                     esSyncConfigs.size(),
                     dml.getDestination());
@@ -74,7 +74,7 @@ public class ESSyncService {
                 StringBuilder configIndexes = new StringBuilder();
                 esSyncConfigs
                     .forEach(esSyncConfig -> configIndexes.append(esSyncConfig.getEsMapping().get_index()).append(" "));
-                logger.debug("DML: {} \nEffect indexes: {}",
+                logger.debug("DML: {} \nAffected indexes: {}",
                     JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue),
                     configIndexes.toString());
             }

+ 100 - 9
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -2,6 +2,7 @@ package com.alibaba.otter.canal.client.adapter.es.support;
 
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -9,10 +10,13 @@ import java.util.concurrent.ConcurrentMap;
 
 import javax.sql.DataSource;
 
+import org.apache.commons.lang.StringUtils;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.update.UpdateRequestBuilder;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -64,13 +68,24 @@ public class ESTemplate {
      */
     public void insert(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
         if (mapping.get_id() != null) {
+            String parentVal = (String) esFieldData.remove("$parent_routing");
             if (mapping.isUpsert()) {
-                getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
+                UpdateRequestBuilder updateRequestBuilder = transportClient
+                    .prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
                     .setDoc(esFieldData)
-                    .setDocAsUpsert(true));
+                    .setDocAsUpsert(true);
+                if (StringUtils.isNotEmpty(parentVal)) {
+                    updateRequestBuilder.setRouting(parentVal);
+                }
+                getBulk().add(updateRequestBuilder);
             } else {
-                getBulk().add(transportClient.prepareIndex(mapping.get_index(), mapping.get_type(), pkVal.toString())
-                    .setSource(esFieldData));
+                IndexRequestBuilder indexRequestBuilder = transportClient
+                    .prepareIndex(mapping.get_index(), mapping.get_type(), pkVal.toString())
+                    .setSource(esFieldData);
+                if (StringUtils.isNotEmpty(parentVal)) {
+                    indexRequestBuilder.setRouting(parentVal);
+                }
+                getBulk().add(indexRequestBuilder);
             }
             commitBulk();
         } else {
@@ -137,7 +152,7 @@ public class ESTemplate {
             return count;
         });
         if (logger.isTraceEnabled()) {
-            logger.trace("Update ES by query effect {} records", syncCount);
+            logger.trace("Update ES by query affected {} records", syncCount);
         }
     }
 
@@ -200,13 +215,24 @@ public class ESTemplate {
 
     private void append4Update(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
         if (mapping.get_id() != null) {
+            String parentVal = (String) esFieldData.remove("$parent_routing");
             if (mapping.isUpsert()) {
-                getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
+                UpdateRequestBuilder updateRequestBuilder = transportClient
+                    .prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
                     .setDoc(esFieldData)
-                    .setDocAsUpsert(true));
+                    .setDocAsUpsert(true);
+                if (StringUtils.isNotEmpty(parentVal)) {
+                    updateRequestBuilder.setRouting(parentVal);
+                }
+                getBulk().add(updateRequestBuilder);
             } else {
-                getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
-                    .setDoc(esFieldData));
+                UpdateRequestBuilder updateRequestBuilder = transportClient
+                    .prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
+                    .setDoc(esFieldData);
+                if (StringUtils.isNotEmpty(parentVal)) {
+                    updateRequestBuilder.setRouting(parentVal);
+                }
+                getBulk().add(updateRequestBuilder);
             }
         } else {
             SearchResponse response = transportClient.prepareSearch(mapping.get_index())
@@ -257,6 +283,10 @@ public class ESTemplate {
                 esFieldData.put(fieldItem.getFieldName(), value);
             }
         }
+
+        // 添加父子文档关联信息
+        putRelationDataFromRS(mapping, schemaItem, resultSet, esFieldData);
+
         return resultIdVal;
     }
 
@@ -294,6 +324,10 @@ public class ESTemplate {
                 }
             }
         }
+
+        // 添加父子文档关联信息
+        putRelationDataFromRS(mapping, schemaItem, resultSet, esFieldData);
+
         return resultIdVal;
     }
 
@@ -340,6 +374,9 @@ public class ESTemplate {
                 esFieldData.put(fieldItem.getFieldName(), value);
             }
         }
+
+        // 添加父子文档关联信息
+        putRelationData(mapping, schemaItem, dmlData, esFieldData);
         return resultIdVal;
     }
 
@@ -368,9 +405,63 @@ public class ESTemplate {
                     getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName));
             }
         }
+
+        // 添加父子文档关联信息
+        putRelationData(mapping, schemaItem, dmlOld, esFieldData);
         return resultIdVal;
     }
 
+    private void putRelationDataFromRS(ESMapping mapping, SchemaItem schemaItem, ResultSet resultSet,
+                                       Map<String, Object> esFieldData) {
+        // 添加父子文档关联信息
+        if (!mapping.getRelations().isEmpty()) {
+            mapping.getRelations().forEach((relationField, relationMapping) -> {
+                Map<String, Object> relations = new HashMap<>();
+                relations.put("name", relationMapping.getName());
+                if (StringUtils.isNotEmpty(relationMapping.getParent())) {
+                    FieldItem parentFieldItem = schemaItem.getSelectFields().get(relationMapping.getParent());
+                    Object parentVal;
+                    try {
+                        parentVal = getValFromRS(mapping,
+                            resultSet,
+                            parentFieldItem.getFieldName(),
+                            parentFieldItem.getFieldName());
+                    } catch (SQLException e) {
+                        throw new RuntimeException(e);
+                    }
+                    if (parentVal != null) {
+                        relations.put("parent", parentVal.toString());
+                        esFieldData.put("$parent_routing", parentVal.toString());
+
+                    }
+                }
+                esFieldData.put(relationField, relations);
+            });
+        }
+    }
+
+    private void putRelationData(ESMapping mapping, SchemaItem schemaItem, Map<String, Object> dmlData,
+                                 Map<String, Object> esFieldData) {
+        // 添加父子文档关联信息
+        if (!mapping.getRelations().isEmpty()) {
+            mapping.getRelations().forEach((relationField, relationMapping) -> {
+                Map<String, Object> relations = new HashMap<>();
+                relations.put("name", relationMapping.getName());
+                if (StringUtils.isNotEmpty(relationMapping.getParent())) {
+                    FieldItem parentFieldItem = schemaItem.getSelectFields().get(relationMapping.getParent());
+                    String columnName = parentFieldItem.getColumnItems().iterator().next().getColumnName();
+                    Object parentVal = getValFromData(mapping, dmlData, parentFieldItem.getFieldName(), columnName);
+                    if (parentVal != null) {
+                        relations.put("parent", parentVal.toString());
+                        esFieldData.put("$parent_routing", parentVal.toString());
+
+                    }
+                }
+                esFieldData.put(relationField, relations);
+            });
+        }
+    }
+
     /**
      * es 字段类型本地缓存
      */

+ 21 - 0
client-adapter/elasticsearch/src/main/resources/es/biz_order.yml

@@ -0,0 +1,21 @@
+dataSourceKey: defaultDS
+destination: example
+groupId: g1
+esMapping:
+  _index: customer
+  _type: _doc
+  _id: _id
+  relations:
+    customer_order:
+      name: order
+      parent: customer_id
+  sql: "select concat('oid_', t.id) as _id,
+        t.customer_id,
+        t.id as order_id,
+        t.serial_code as order_serial,
+        t.c_time as order_time
+        from biz_order t"
+  skips:
+    - customer_id
+  etlCondition: "where t.c_time>='{0}'"
+  commitBatch: 3000

+ 47 - 0
client-adapter/elasticsearch/src/main/resources/es/customer.yml

@@ -0,0 +1,47 @@
+dataSourceKey: defaultDS
+destination: example
+groupId: g1
+esMapping:
+  _index: customer
+  _type: _doc
+  _id: id
+  relations:
+    customer_order:
+      name: customer
+  sql: "select t.id, t.name, t.email from customer t"
+  etlCondition: "where t.c_time>='{0}'"
+  commitBatch: 3000
+
+
+#{
+#  "mappings":{
+#    "_doc":{
+#      "properties":{
+#        "id": {
+#          "type": "long"
+#        },
+#        "name": {
+#          "type": "text"
+#        },
+#        "email": {
+#          "type": "text"
+#        },
+#        "order_id": {
+#          "type": "long"
+#        },
+#        "order_serial": {
+#          "type": "text"
+#        },
+#        "order_time": {
+#          "type": "date"
+#        },
+#        "customer_order":{
+#          "type":"join",
+#          "relations":{
+#            "customer":"order"
+#          }
+#        }
+#      }
+#    }
+#  }
+#}