mcy 6 years ago
parent
commit
a4856b9a73

+ 79 - 23
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/SchemaItem.java

@@ -4,6 +4,7 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
 
 public class SchemaItem {
@@ -22,7 +23,8 @@ public class SchemaItem {
         this.isAllFieldsSimple();
         aliasTableItems.values().forEach(tableItem -> {
             tableItem.getRelationTableFields();
-            tableItem.getRelationSelectFields();
+            tableItem.getRelationKeyFieldItems();
+            tableItem.getRelationSelectFieldItems();
         });
     }
 
@@ -131,20 +133,21 @@ public class SchemaItem {
 
     public static class TableItem {
 
-        private SchemaItem               schemaItem;
+        private SchemaItem                               schemaItem;
 
-        private String                   schema;
-        private String                   tableName;
-        private String                   alias;
-        private String                   subQuerySql;
-        private List<FieldItem>          subQueryFields = new ArrayList<>();
-        private List<RelationFieldsPair> relationFields = new ArrayList<>();
+        private String                                   schema;
+        private String                                   tableName;
+        private String                                   alias;
+        private String                                   subQuerySql;
+        private List<FieldItem>                          subQueryFields = new ArrayList<>();
+        private List<RelationFieldsPair>                 relationFields = new ArrayList<>();
 
-        private boolean                  main;
-        private boolean                  subQuery;
+        private boolean                                  main;
+        private boolean                                  subQuery;
 
-        private volatile List<FieldItem> relationTableFields;               // 当前表关联条件字段
-        private volatile List<FieldItem> relationSelectFieldItem;           // 关联条件字段在select中的对应字段
+        private volatile Map<FieldItem, List<FieldItem>> relationTableFields;               // 当前表关联条件字段
+        private volatile List<FieldItem>                 relationKeyFieldItems;             // 关联条件字段在select中的对应字段
+        private volatile List<FieldItem>                 relationSelectFieldItems;          // 子表所在主表的查询字段
 
         public TableItem(SchemaItem schemaItem){
             this.schemaItem = schemaItem;
@@ -222,18 +225,40 @@ public class SchemaItem {
             this.relationFields = relationFields;
         }
 
-        public List<FieldItem> getRelationTableFields() {
+        public Map<FieldItem, List<FieldItem>> getRelationTableFields() {
             if (relationTableFields == null) {
                 synchronized (SchemaItem.class) {
                     if (relationTableFields == null) {
-                        relationTableFields = new ArrayList<>();
+                        relationTableFields = new LinkedHashMap<>();
+
                         getRelationFields().forEach(relationFieldsPair -> {
                             FieldItem leftFieldItem = relationFieldsPair.getLeftFieldItem();
                             FieldItem rightFieldItem = relationFieldsPair.getRightFieldItem();
+                            FieldItem currentTableRelField = null;
                             if (getAlias().equals(leftFieldItem.getOwner())) {
-                                relationTableFields.add(leftFieldItem);
+                                // relationTableFields.add(leftFieldItem);
+                                currentTableRelField = leftFieldItem;
                             } else if (getAlias().equals(rightFieldItem.getOwner())) {
-                                relationTableFields.add(rightFieldItem);
+                                // relationTableFields.add(rightFieldItem);
+                                currentTableRelField = rightFieldItem;
+                            }
+
+                            if (currentTableRelField != null) {
+                                List<FieldItem> selectFieldItem = getSchemaItem().getColumnFields()
+                                    .get(leftFieldItem.getOwner() + "." + leftFieldItem.getColumn().getColumnName());
+                                if (selectFieldItem != null && !selectFieldItem.isEmpty()) {
+                                    relationTableFields.put(currentTableRelField, selectFieldItem);
+                                } else {
+                                    selectFieldItem = getSchemaItem().getColumnFields()
+                                        .get(rightFieldItem.getOwner() + "."
+                                             + rightFieldItem.getColumn().getColumnName());
+                                    if (selectFieldItem != null && !selectFieldItem.isEmpty()) {
+                                        relationTableFields.put(currentTableRelField, selectFieldItem);
+                                    } else {
+                                        throw new UnsupportedOperationException(
+                                            "Relation condition column must in select columns.");
+                                    }
+                                }
                             }
                         });
                     }
@@ -242,23 +267,23 @@ public class SchemaItem {
             return relationTableFields;
         }
 
-        public List<FieldItem> getRelationSelectFields() {
-            if (relationSelectFieldItem == null) {
+        public List<FieldItem> getRelationKeyFieldItems() {
+            if (relationKeyFieldItems == null) {
                 synchronized (SchemaItem.class) {
-                    if (relationSelectFieldItem == null) {
-                        relationSelectFieldItem = new ArrayList<>();
+                    if (relationKeyFieldItems == null) {
+                        relationKeyFieldItems = new ArrayList<>();
                         getRelationFields().forEach(relationFieldsPair -> {
                             FieldItem leftFieldItem = relationFieldsPair.getLeftFieldItem();
                             List<FieldItem> selectFieldItem = getSchemaItem().getColumnFields()
                                 .get(leftFieldItem.getOwner() + "." + leftFieldItem.getColumn().getColumnName());
                             if (selectFieldItem != null && !selectFieldItem.isEmpty()) {
-                                relationSelectFieldItem.addAll(selectFieldItem);
+                                relationKeyFieldItems.addAll(selectFieldItem);
                             } else {
                                 FieldItem rightFieldItem = relationFieldsPair.getRightFieldItem();
                                 selectFieldItem = getSchemaItem().getColumnFields()
                                     .get(rightFieldItem.getOwner() + "." + rightFieldItem.getColumn().getColumnName());
                                 if (selectFieldItem != null && !selectFieldItem.isEmpty()) {
-                                    relationSelectFieldItem.addAll(selectFieldItem);
+                                    relationKeyFieldItems.addAll(selectFieldItem);
                                 } else {
                                     throw new UnsupportedOperationException(
                                         "Relation condition column must in select columns.");
@@ -268,7 +293,23 @@ public class SchemaItem {
                     }
                 }
             }
-            return relationSelectFieldItem;
+            return relationKeyFieldItems;
+        }
+
+        public List<FieldItem> getRelationSelectFieldItems() {
+            if (relationSelectFieldItems == null) {
+                synchronized (SchemaItem.class) {
+                    if (relationSelectFieldItems == null) {
+                        relationSelectFieldItems = new ArrayList<>();
+                        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
+                            if (fieldItem.getOwners().contains(getAlias())) {
+                                relationSelectFieldItems.add(fieldItem);
+                            }
+                        }
+                    }
+                }
+            }
+            return relationSelectFieldItems;
         }
     }
 
@@ -367,6 +408,21 @@ public class SchemaItem {
                 return null;
             }
         }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            FieldItem fieldItem = (FieldItem) o;
+
+            return fieldName != null ? fieldName.equals(fieldItem.fieldName) : fieldItem.fieldName == null;
+        }
+
+        @Override
+        public int hashCode() {
+            return fieldName != null ? fieldName.hashCode() : 0;
+        }
     }
 
     public static class ColumnItem {

+ 64 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -6,6 +6,8 @@ import java.util.Map;
 
 import javax.sql.DataSource;
 
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -14,6 +16,8 @@ import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfigLoader;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.TableItem;
 import com.alibaba.otter.canal.client.adapter.es.support.ESSyncUtil;
 import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
@@ -165,6 +169,66 @@ public class ESSyncService {
                     return 0;
                 });
             }
+
+            // 从表的操作
+            for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {
+                if (tableItem.isMain()) {
+                    continue;
+                }
+                if (!tableItem.getTableName().equals(dml.getTable())) {
+                    continue;
+                }
+                // ------关联条件出现在主表查询条件------
+                boolean allFieldsSimple = true;
+                for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
+                    if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
+                        allFieldsSimple = false;
+                        break;
+                    }
+                }
+                // 所有查询字段均为简单字段
+                if (allFieldsSimple) {
+                    // 不是子查询
+                    if (!tableItem.isSubQuery()) {
+
+                        Map<String, Object> esFieldData = new LinkedHashMap<>();
+                        for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
+                            Object value = esTemplate
+                                .getValFromData(mapping, data, fieldItem.getColumn().getColumnName());
+                            esFieldData.put(fieldItem.getFieldName(), value);
+                        }
+                        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
+                        for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields()
+                            .entrySet()) {
+                            Object value = esTemplate
+                                .getValFromData(mapping, data, entry.getKey().getColumn().getColumnName());
+                            for (FieldItem fieldItem : entry.getValue()) {
+                                queryBuilder.must(QueryBuilders.termsQuery(fieldItem.getFieldName(), value));
+                            }
+                        }
+
+                        //
+                        // for (FieldItem fieldItem : tableItem.getRelationTableFields()) {
+                        // Object value = esTemplate
+                        // .getValFromData(mapping, data, fieldItem.getColumn().getColumnName());
+                        //
+                        // }
+
+                        // if (logger.isDebugEnabled()) {
+                        // logger.debug("从表insert, 且均为简单字段,对es进行update_by_query, queryBuilder: {},
+                        // esFieldData:{}", queryBuilder.toString(), esFieldData);
+                        // }
+                        boolean result = esTemplate.updateByQuery(mapping, queryBuilder, esFieldData);
+                        // if (!result) {
+                        // logger.error("从表insert,均为简单字段,直接对es进行update_by_query, es更新存在错误, table: {},
+                        // index: {}, dml: {}", dml.getTable(), config.getEsSyn().getIndex(), dml);
+                        // }
+                    }
+                    // TODO
+                } else {
+                    // TODO 查询总sql
+                }
+            }
         }
     }
 }

+ 5 - 8
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESSyncUtil.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.client.adapter.es.support;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
 import java.sql.*;
 import java.util.*;
 import java.util.Date;
@@ -148,14 +149,10 @@ public class ESSyncUtil {
                 Base64 base64 = new Base64();
                 res = base64.encodeAsString(b);
             } else if (val instanceof String) {
-                try {
-                    // 对应canal中的单字节编码
-                    byte[] b = ((String) val).getBytes("ISO-8859-1");
-                    Base64 base64 = new Base64();
-                    res = base64.encodeAsString(b);
-                } catch (UnsupportedEncodingException e) {
-                    logger.error(e.getMessage());
-                }
+                // 对应canal中的单字节编码
+                byte[] b = ((String) val).getBytes(StandardCharsets.ISO_8859_1);
+                Base64 base64 = new Base64();
+                res = base64.encodeAsString(b);
             }
         } else if ("geo_point".equals(esType)) {
             if (!(val instanceof String)) {

+ 35 - 29
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -133,24 +133,21 @@ public class ESTemplate {
     /**
      * update by query
      *
-     * @param esSyncConfig
+     * @param mapping
      * @param queryBuilder
      * @param esFieldData
      * @return
      */
-    private boolean updateByQuery(ESSyncConfig esSyncConfig, QueryBuilder queryBuilder,
-                                  Map<String, Object> esFieldData) {
-        return updateByQuery(esSyncConfig, queryBuilder, esFieldData, 0);
+    public boolean updateByQuery(ESMapping mapping, QueryBuilder queryBuilder, Map<String, Object> esFieldData) {
+        return updateByQuery(mapping, queryBuilder, esFieldData, 1);
     }
 
-    private boolean updateByQuery(ESSyncConfig esSyncConfig, QueryBuilder queryBuilder, Map<String, Object> esFieldData,
-                                  int counter) {
+    public boolean updateByQuery(ESMapping mapping, QueryBuilder queryBuilder, Map<String, Object> esFieldData,
+                                 int counter) {
         if (CollectionUtils.isEmpty(esFieldData)) {
             return true;
         }
 
-        ESMapping mapping = esSyncConfig.getEsMapping();
-
         StringBuilder sb = new StringBuilder();
         esFieldData.forEach((key, value) -> {
             if (value instanceof Map) {
@@ -216,7 +213,7 @@ public class ESTemplate {
             } catch (InterruptedException e) {
                 // ignore
             }
-            return updateByQuery(esSyncConfig, queryBuilder, esFieldData, ++counter);
+            return updateByQuery(mapping, queryBuilder, esFieldData, ++counter);
         }
 
         return true;
@@ -308,6 +305,18 @@ public class ESTemplate {
     // return ESSyncUtil.typeConvert(val, esType);
     // }
 
+    public Object getValFromRS(ESMapping mapping, ResultSet resultSet, String fieldName) throws SQLException {
+        String esType = getEsType(mapping, fieldName);
+
+        Object value = resultSet.getObject(fieldName);
+        if (value instanceof Boolean) {
+            if (!"boolean".equals(esType)) {
+                value = resultSet.getByte(fieldName);
+            }
+        }
+        return value;
+    }
+
     public Object getESDataFromRS(ESMapping mapping, ResultSet resultSet,
                                   Map<String, Object> esFieldData) throws SQLException {
         SchemaItem schemaItem = mapping.getSchemaItem();
@@ -315,22 +324,14 @@ public class ESTemplate {
         Object resultIdVal = null;
         for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
             String fieldName = Util.cleanColumn(fieldItem.getColumnItems().iterator().next().getColumnName());
-            String esType = getEsType(mapping, fieldName);
-
-            Object value = resultSet.getObject(fieldName);
-            if (value instanceof Boolean) {
-                // 判断es类型
-                if (!"boolean".equals(esType)) {
-                    value = resultSet.getByte(fieldName);
-                }
-            }
+            Object value = getValFromRS(mapping, resultSet, fieldName);
 
             if (fieldItem.getFieldName().equals(idFieldName)) {
-                resultIdVal = ESSyncUtil.typeConvert(value, esType);
+                resultIdVal = value;
             }
 
             if (!fieldItem.getFieldName().equals(mapping.get_id()) && !mapping.getSkips().contains(fieldName)) {
-                esFieldData.put(fieldName, ESSyncUtil.typeConvert(value, esType));
+                esFieldData.put(fieldName, value);
             }
         }
         return resultIdVal;
@@ -361,6 +362,17 @@ public class ESTemplate {
     // return null;
     // }
 
+    public Object getValFromData(ESMapping mapping, Map<String, Object> dmlData, String fieldName) {
+        String esType = getEsType(mapping, fieldName);
+        Object value = dmlData.get(fieldName);
+        if (value instanceof Byte) {
+            if ("boolean".equals(esType)) {
+                value = ((Byte) value).intValue() != 0;
+            }
+        }
+        return ESSyncUtil.typeConvert(value, esType);
+    }
+
     /**
      * 将dml的data转换为es的data
      *
@@ -376,20 +388,14 @@ public class ESTemplate {
         Object resultIdVal = null;
         for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
             String fieldName = Util.cleanColumn(fieldItem.getColumnItems().iterator().next().getColumnName());
-            String esType = getEsType(mapping, fieldName);
-            Object value = dmlData.get(fieldName);
-            if (value instanceof Byte) {
-                if ("boolean".equals(esType)) {
-                    value = ((Byte) value).intValue() != 0;
-                }
-            }
+            Object value = getValFromData(mapping, dmlData, fieldName);
 
             if (fieldItem.getFieldName().equals(idFieldName)) {
-                resultIdVal = ESSyncUtil.typeConvert(value, esType);
+                resultIdVal = value;
             }
 
             if (!fieldItem.getFieldName().equals(mapping.get_id()) && !mapping.getSkips().contains(fieldName)) {
-                esFieldData.put(fieldName, ESSyncUtil.typeConvert(value, esType));
+                esFieldData.put(fieldName, value);
             }
         }
         return resultIdVal;

+ 3 - 1
client-adapter/elasticsearch/src/main/resources/es/mytest_user.yml

@@ -10,6 +10,8 @@ esMapping:
 #        left join (select user_id, group_concat(label,',') as labels from user_label
 #        group by user_id) c on c.user_id=a.id"
 #  sql: "select id, name, c_time from user "
-  sql: "select id, concat(name, '_') as name, c_time from user "
+#  sql: "select id, concat(name, '_') as name, c_time from user "
+  sql: "select a.id, a.name, a.role_id, b.role_name, a.c_time from user a
+        left join role b on b.id=a.role_id"
   commitBatch: 3000
   etlCondition: "where a.c_time>='{0}' or b.c_time>='{0}' or c.c_time>='{0}'"

+ 5 - 3
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/SqlParseTest.java

@@ -1,6 +1,7 @@
 package com.alibaba.otter.canal.client.adapter.es.test;
 
 import java.util.List;
+import java.util.Map;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -33,11 +34,12 @@ public class SqlParseTest {
             fieldItem -> Assert.assertEquals("c.labels", fieldItem.getOwner() + "." + fieldItem.getFieldName()));
 
         // 获取当前表关联条件字段
-        List<FieldItem> relationTableFields = tableItem.getRelationTableFields();
-        relationTableFields.forEach(fieldItem -> Assert.assertEquals("user_id", fieldItem.getColumn().getColumnName()));
+        Map<FieldItem, List<FieldItem>> relationTableFields = tableItem.getRelationTableFields();
+        relationTableFields.keySet()
+            .forEach(fieldItem -> Assert.assertEquals("user_id", fieldItem.getColumn().getColumnName()));
 
         // 获取关联字段在select中的对应字段
-        List<FieldItem> relationSelectFieldItem = tableItem.getRelationSelectFields();
+        List<FieldItem> relationSelectFieldItem = tableItem.getRelationKeyFieldItems();
         relationSelectFieldItem.forEach(fieldItem -> Assert.assertEquals("c.labels",
             fieldItem.getOwner() + "." + fieldItem.getColumn().getColumnName()));
     }

+ 19 - 0
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncTest.java

@@ -89,6 +89,25 @@ public class UserSyncTest {
         Assert.assertEquals("Eric_", response.getSource().get("name"));
     }
 
+    @Test
+    public void insertTest03() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("INSERT");
+        dml.setDatabase("mytest");
+        dml.setTable("role");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("role_name", "admin");
+
+        dml.setData(dataList);
+
+        esAdapter.getEsSyncService().sync(dml);
+    }
+
     @After
     public void after() {
         esAdapter.destroy();

+ 17 - 17
client-adapter/launcher/src/main/resources/application.yml

@@ -16,34 +16,34 @@ hbase.zookeeper.property.clientPort: 2181
 hbase.zookeeper.znode.parent: /hbase
 
 canal.conf:
-  canalServerHost: 127.0.0.1:11111
+#  canalServerHost: 127.0.0.1:11111
 #  zookeeperHosts: slave1:2181
-#  bootstrapServers: slave1:6667 #or rocketmq nameservers:host1:9876;host2:9876
+  bootstrapServers: slave1:6667 #or rocketmq nameservers:host1:9876;host2:9876
   flatMessage: true
   retry: 3
   timeout: 20000
   batchSize: 50
-  canalInstances:
-  - instance: example
-    groups:
-    - outAdapters:
-      - name: es
-        hosts: 127.0.0.1:9300
-        properties:
-          cluster.name: elasticsearch
+#  canalInstances:
+#  - instance: example
+#    groups:
+#    - outAdapters:
+#      - name: es
+#        hosts: 127.0.0.1:9300
+#        properties:
+#          cluster.name: elasticsearch
 #      - name: logger
 #      - name: hbase
 #        properties:
 #          hbase.zookeeper.quorum: ${hbase.zookeeper.quorum}
 #          hbase.zookeeper.property.clientPort: ${hbase.zookeeper.property.clientPort}
 #          zookeeper.znode.parent: ${hbase.zookeeper.znode.parent}
-#  mqTopics:
-#  - mqMode: kafka
-#    topic: example
-#    groups:
-#    - groupId: g2
-#      outAdapters:
-#      - name: logger
+  mqTopics:
+  - mqMode: kafka
+    topic: example
+    groups:
+    - groupId: g2
+      outAdapters:
+      - name: logger
 #  mqTopics:
 #  - mqMode: rocketmq
 #    topic: example