Browse Source

es父子文档索引ETL支持

mcy 6 years ago
parent
commit
e03705e211

+ 46 - 10
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESEtlService.java

@@ -1,9 +1,7 @@
 package com.alibaba.otter.canal.client.adapter.es.service;
 
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+import java.sql.SQLException;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -13,13 +11,14 @@ import java.util.regex.Pattern;
 
 import javax.sql.DataSource;
 
+import org.apache.commons.lang.StringUtils;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.action.update.UpdateRequestBuilder;
 import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
@@ -199,21 +198,58 @@ public class ESEtlService {
                             Object val = esTemplate.getValFromRS(mapping, rs, fieldName, fieldName);
                             esFieldData.put(fieldName, val);
                         }
+
+                        if (!mapping.getRelations().isEmpty()) {
+                            mapping.getRelations().forEach((relationField, relationMapping) -> {
+                                Map<String, Object> relations = new HashMap<>();
+                                relations.put("name", relationMapping.getName());
+                                if (StringUtils.isNotEmpty(relationMapping.getParent())) {
+                                    FieldItem parentFieldItem = mapping.getSchemaItem()
+                                        .getSelectFields()
+                                        .get(relationMapping.getParent());
+                                    Object parentVal;
+                                    try {
+                                        parentVal = esTemplate.getValFromRS(mapping,
+                                            rs,
+                                            parentFieldItem.getFieldName(),
+                                            parentFieldItem.getFieldName());
+                                    } catch (SQLException e) {
+                                        throw new RuntimeException(e);
+                                    }
+                                    if (parentVal != null) {
+                                        relations.put("parent", parentVal.toString());
+                                        esFieldData.put("$parent_routing", parentVal.toString());
+
+                                    }
+                                }
+                                esFieldData.put(relationField, relations);
+                            });
+                        }
+
                         Object idVal = null;
                         if (mapping.get_id() != null) {
                             idVal = rs.getObject(mapping.get_id());
                         }
 
                         if (idVal != null) {
+                            String parentVal = (String) esFieldData.remove("$parent_routing");
                             if (mapping.isUpsert()) {
-                                bulkRequestBuilder.add(transportClient
+                                UpdateRequestBuilder updateRequestBuilder = transportClient
                                     .prepareUpdate(mapping.get_index(), mapping.get_type(), idVal.toString())
                                     .setDoc(esFieldData)
-                                    .setDocAsUpsert(true));
+                                    .setDocAsUpsert(true);
+                                if (StringUtils.isNotEmpty(parentVal)) {
+                                    updateRequestBuilder.setRouting(parentVal);
+                                }
+                                bulkRequestBuilder.add(updateRequestBuilder);
                             } else {
-                                bulkRequestBuilder.add(transportClient
+                                IndexRequestBuilder indexRequestBuilder = transportClient
                                     .prepareIndex(mapping.get_index(), mapping.get_type(), idVal.toString())
-                                    .setSource(esFieldData));
+                                    .setSource(esFieldData);
+                                if (StringUtils.isNotEmpty(parentVal)) {
+                                    indexRequestBuilder.setRouting(parentVal);
+                                }
+                                bulkRequestBuilder.add(indexRequestBuilder);
                             }
                         } else {
                             idVal = rs.getObject(mapping.getPk());