Browse Source

增加对array和object类型的支持

mcy 6 years ago
parent
commit
0d9695c3a5

+ 26 - 13
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfig.java

@@ -1,7 +1,9 @@
 package com.alibaba.otter.canal.client.adapter.es.config;
 package com.alibaba.otter.canal.client.adapter.es.config;
 
 
 import java.util.ArrayList;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.List;
+import java.util.Map;
 
 
 /**
 /**
  * ES 映射配置
  * ES 映射配置
@@ -58,19 +60,22 @@ public class ESSyncConfig {
 
 
     public static class ESMapping {
     public static class ESMapping {
 
 
-        private String       _index;
-        private String       _type;
-        private String       _id;
-        private String       pk;
-        private String       parent;
-        private String       sql;
-        private List<String> skips           = new ArrayList<>();
-        private int          commitBatch     = 1000;
-        private String       etlCondition;
-        private boolean      syncByTimestamp = false;            // 是否按时间戳定时同步
-        private Long         syncInterval;                       // 同步时间间隔
-
-        private SchemaItem   schemaItem;                         // sql解析结果模型
+        private String              _index;
+        private String              _type;
+        private String              _id;
+        private String              pk;
+        private String              parent;
+        private String              sql;
+        // 对象字段, 例: objFields:
+        //              - _labels: array:;
+        private Map<String, String> objFields     = new LinkedHashMap<>();
+        private List<String>        skips           = new ArrayList<>();
+        private int                 commitBatch     = 1000;
+        private String              etlCondition;
+        private boolean             syncByTimestamp = false;                 // 是否按时间戳定时同步
+        private Long                syncInterval;                            // 同步时间间隔
+
+        private SchemaItem          schemaItem;                              // sql解析结果模型
 
 
         public String get_index() {
         public String get_index() {
             return _index;
             return _index;
@@ -112,6 +117,14 @@ public class ESSyncConfig {
             this.parent = parent;
             this.parent = parent;
         }
         }
 
 
+        public Map<String, String> getObjFields() {
+            return objFields;
+        }
+
+        public void setObjFields(Map<String, String> objFields) {
+            this.objFields = objFields;
+        }
+
         public List<String> getSkips() {
         public List<String> getSkips() {
             return skips;
             return skips;
         }
         }

+ 14 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESSyncUtil.java

@@ -31,6 +31,20 @@ public class ESSyncUtil {
 
 
     private static Logger logger = LoggerFactory.getLogger(ESSyncUtil.class);
     private static Logger logger = LoggerFactory.getLogger(ESSyncUtil.class);
 
 
+    public static Object convertToEsObj(Object val, String fieldInfo) {
+        if (val == null) {
+            return null;
+        }
+        if (fieldInfo.startsWith("array:")) {
+            String separator = fieldInfo.substring("array:".length()).trim();
+            String[] values = val.toString().split(separator);
+            return Arrays.asList(values);
+        } else if (fieldInfo.startsWith("object")) {
+            return JSON.parse(val.toString());
+        }
+        return null;
+    }
+
     /**
     /**
      * 类型转换为Mapping中对应的类型
      * 类型转换为Mapping中对应的类型
      */
      */

+ 23 - 7
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -2,16 +2,14 @@ package com.alibaba.otter.canal.client.adapter.es.support;
 
 
 import java.sql.ResultSet;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.SQLException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 
 
 import javax.sql.DataSource;
 import javax.sql.DataSource;
 
 
+import com.alibaba.fastjson.JSON;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.bulk.BulkResponse;
@@ -205,8 +203,14 @@ public class ESTemplate {
                         .append(mapValue.get("lat"))
                         .append(mapValue.get("lat"))
                         .append("];");
                         .append("];");
                 } else {
                 } else {
-                    logger.warn("Unsupported object type for script_update");
+                    sb.append("ctx._source").append("[\"").append(key).append("\"]").append(" = ");
+                    sb.append(JSON.toJSONString(value));
+                    sb.append(";");
                 }
                 }
+            } else if (value instanceof List) {
+                sb.append("ctx._source").append("[\"").append(key).append("\"]").append(" = ");
+                sb.append(JSON.toJSONString(value));
+                sb.append(";");
             } else if (value instanceof String) {
             } else if (value instanceof String) {
                 sb.append("ctx._source")
                 sb.append("ctx._source")
                     .append("['")
                     .append("['")
@@ -325,7 +329,13 @@ public class ESTemplate {
                 value = resultSet.getByte(columnName);
                 value = resultSet.getByte(columnName);
             }
             }
         }
         }
-        return ESSyncUtil.typeConvert(value, esType);
+
+        // 如果是对象类型
+        if (mapping.getObjFields().containsKey(fieldName)) {
+            return ESSyncUtil.convertToEsObj(value, mapping.getObjFields().get(fieldName));
+        } else {
+            return ESSyncUtil.typeConvert(value, esType);
+        }
     }
     }
 
 
     public Object getESDataFromRS(ESMapping mapping, ResultSet resultSet,
     public Object getESDataFromRS(ESMapping mapping, ResultSet resultSet,
@@ -393,7 +403,13 @@ public class ESTemplate {
                 value = ((Byte) value).intValue() != 0;
                 value = ((Byte) value).intValue() != 0;
             }
             }
         }
         }
-        return ESSyncUtil.typeConvert(value, esType);
+
+        // 如果是对象类型
+        if (mapping.getObjFields().containsKey(fieldName)) {
+            return ESSyncUtil.convertToEsObj(value, mapping.getObjFields().get(fieldName));
+        } else {
+            return ESSyncUtil.typeConvert(value, esType);
+        }
     }
     }
 
 
     /**
     /**

+ 2 - 0
client-adapter/elasticsearch/src/main/resources/es/mytest_user.yml

@@ -10,5 +10,7 @@ esMapping:
         left join role b on b.id=a.role_id
         left join role b on b.id=a.role_id
         left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
         left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
         group by user_id) c on c.user_id=a.id"
         group by user_id) c on c.user_id=a.id"
+#  objFields:
+#    _labels: array:;
   etlCondition: "where a.c_time>='{0}'"
   etlCondition: "where a.c_time>='{0}'"
   commitBatch: 3000
   commitBatch: 3000