Browse Source

修改测试完成

mcy 6 years ago
parent
commit
5b9ada1f8c

+ 83 - 111
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -102,51 +102,30 @@ public class ESSyncService {
 
 
     }
     }
 
 
-    private void update(ESSyncConfig config, Dml dml) {
+    /**
+     * 插入操作dml
+     * 
+     * @param config es配置
+     * @param dml dml数据
+     */
+    private void insert(ESSyncConfig config, Dml dml) {
         List<Map<String, Object>> dataList = dml.getData();
         List<Map<String, Object>> dataList = dml.getData();
-        List<Map<String, Object>> oldList = dml.getOld();
-        if (dataList == null || dataList.isEmpty() || oldList == null || oldList.isEmpty()) {
+        if (dataList == null || dataList.isEmpty()) {
             return;
             return;
         }
         }
         SchemaItem schemaItem = config.getEsMapping().getSchemaItem();
         SchemaItem schemaItem = config.getEsMapping().getSchemaItem();
-        int i = 0;
         for (Map<String, Object> data : dataList) {
         for (Map<String, Object> data : dataList) {
-            Map<String, Object> old = oldList.get(i);
-            if (data == null || data.isEmpty() || old == null || old.isEmpty()) {
+            if (data == null || data.isEmpty()) {
                 continue;
                 continue;
             }
             }
 
 
             if (schemaItem.getAliasTableItems().size() == 1 && schemaItem.isAllFieldsSimple()) {
             if (schemaItem.getAliasTableItems().size() == 1 && schemaItem.isAllFieldsSimple()) {
                 // ------单表 & 所有字段都为简单字段------
                 // ------单表 & 所有字段都为简单字段------
-                singleTableSimpleFiledUpdate(config, dml, data, old);
+                singleTableSimpleFiledInsert(config, dml, data);
             } else {
             } else {
-                // ------主表 查询sql来更新------
+                // ------是主表 查询sql来插入------
                 if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
                 if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
-                    ESMapping mapping = config.getEsMapping();
-                    String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
-                    FieldItem idFieldItem = schemaItem.getSelectFields().get(idFieldName);
-                    boolean idFieldSimple = true;
-                    if (idFieldItem.isMethod() || idFieldItem.isBinaryOp()) {
-                        idFieldSimple = false;
-                    }
-
-                    boolean allUpdateFieldSimple = true;
-                    out: for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
-                        for (ColumnItem columnItem : fieldItem.getColumnItems()) {
-                            if (old.containsKey(columnItem.getColumnName())) {
-                                if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
-                                    allUpdateFieldSimple = false;
-                                    break out;
-                                }
-                            }
-                        }
-                    }
-                    // 判断主键和所更新的字段是否全为简单字段
-                    if (idFieldSimple && allUpdateFieldSimple) {
-                        singleTableSimpleFiledUpdate(config, dml, data, old);
-                    } else {
-                        mainTableUpdate(config, dml, data, old);
-                    }
+                    mainTableInsert(config, dml, data);
                 }
                 }
 
 
                 // 从表的操作
                 // 从表的操作
@@ -157,7 +136,6 @@ public class ESSyncService {
                     if (!tableItem.getTableName().equals(dml.getTable())) {
                     if (!tableItem.getTableName().equals(dml.getTable())) {
                         continue;
                         continue;
                     }
                     }
-
                     // 关联条件出现在主表查询条件是否为简单字段
                     // 关联条件出现在主表查询条件是否为简单字段
                     boolean allFieldsSimple = true;
                     boolean allFieldsSimple = true;
                     for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
                     for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
@@ -166,62 +144,85 @@ public class ESSyncService {
                             break;
                             break;
                         }
                         }
                     }
                     }
-
                     // 所有查询字段均为简单字段
                     // 所有查询字段均为简单字段
                     if (allFieldsSimple) {
                     if (allFieldsSimple) {
                         // 不是子查询
                         // 不是子查询
                         if (!tableItem.isSubQuery()) {
                         if (!tableItem.isSubQuery()) {
-                            // ------关联表简单字段更新------
+                            // ------关联表简单字段插入------
                             Map<String, Object> esFieldData = new LinkedHashMap<>();
                             Map<String, Object> esFieldData = new LinkedHashMap<>();
                             for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
                             for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
-                                if (old.containsKey(fieldItem.getColumn().getColumnName())) {
-                                    Object value = esTemplate.getValFromData(config.getEsMapping(),
-                                        data,
-                                        fieldItem.getFieldName(),
-                                        fieldItem.getColumn().getColumnName());
-                                    esFieldData.put(fieldItem.getFieldName(), value);
-                                }
+                                Object value = esTemplate.getValFromData(config.getEsMapping(),
+                                    data,
+                                    fieldItem.getFieldName(),
+                                    fieldItem.getColumn().getColumnName());
+                                esFieldData.put(fieldItem.getFieldName(), value);
                             }
                             }
+
                             joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
                             joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
                         } else {
                         } else {
-                            // ------关联子表简单字段更新------
-                            subTableSimpleFieldOperation(config, dml, data, old, tableItem);
+                            // ------关联子表简单字段插入------
+                            subTableSimpleFieldOperation(config, dml, data, null, tableItem);
                         }
                         }
                     } else {
                     } else {
-                        // ------关联子表复杂字段更新 执行全sql更新es------
-                        jonTableWholeSqlOperation(config, dml, data, old, tableItem);
+                        // ------关联子表复杂字段插入 执行全sql更新es------
+                        jonTableWholeSqlOperation(config, dml, data, null, tableItem);
                     }
                     }
                 }
                 }
             }
             }
-
-            i++;
         }
         }
     }
     }
 
 
     /**
     /**
-     * 插入dml操作
-     * 
+     * 更新操作dml
+     *
      * @param config es配置
      * @param config es配置
      * @param dml dml数据
      * @param dml dml数据
      */
      */
-    private void insert(ESSyncConfig config, Dml dml) {
+    private void update(ESSyncConfig config, Dml dml) {
         List<Map<String, Object>> dataList = dml.getData();
         List<Map<String, Object>> dataList = dml.getData();
-        if (dataList == null || dataList.isEmpty()) {
+        List<Map<String, Object>> oldList = dml.getOld();
+        if (dataList == null || dataList.isEmpty() || oldList == null || oldList.isEmpty()) {
             return;
             return;
         }
         }
         SchemaItem schemaItem = config.getEsMapping().getSchemaItem();
         SchemaItem schemaItem = config.getEsMapping().getSchemaItem();
+        int i = 0;
         for (Map<String, Object> data : dataList) {
         for (Map<String, Object> data : dataList) {
-            if (data == null || data.isEmpty()) {
+            Map<String, Object> old = oldList.get(i);
+            if (data == null || data.isEmpty() || old == null || old.isEmpty()) {
                 continue;
                 continue;
             }
             }
 
 
             if (schemaItem.getAliasTableItems().size() == 1 && schemaItem.isAllFieldsSimple()) {
             if (schemaItem.getAliasTableItems().size() == 1 && schemaItem.isAllFieldsSimple()) {
                 // ------单表 & 所有字段都为简单字段------
                 // ------单表 & 所有字段都为简单字段------
-                singleTableSimpleFiledInsert(config, dml, data);
+                singleTableSimpleFiledUpdate(config, dml, data, old);
             } else {
             } else {
-                // ------是主表 查询sql来插入------
+                // ------主表 查询sql来更新------
                 if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
                 if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
-                    mainTableInsert(config, dml, data);
+                    ESMapping mapping = config.getEsMapping();
+                    String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
+                    FieldItem idFieldItem = schemaItem.getSelectFields().get(idFieldName);
+                    boolean idFieldSimple = true;
+                    if (idFieldItem.isMethod() || idFieldItem.isBinaryOp()) {
+                        idFieldSimple = false;
+                    }
+
+                    boolean allUpdateFieldSimple = true;
+                    out: for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
+                        for (ColumnItem columnItem : fieldItem.getColumnItems()) {
+                            if (old.containsKey(columnItem.getColumnName())) {
+                                if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
+                                    allUpdateFieldSimple = false;
+                                    break out;
+                                }
+                            }
+                        }
+                    }
+                    // 判断主键和所更新的字段是否全为简单字段
+                    if (idFieldSimple && allUpdateFieldSimple) {
+                        singleTableSimpleFiledUpdate(config, dml, data, old);
+                    } else {
+                        mainTableUpdate(config, dml, data, old);
+                    }
                 }
                 }
 
 
                 // 从表的操作
                 // 从表的操作
@@ -232,6 +233,7 @@ public class ESSyncService {
                     if (!tableItem.getTableName().equals(dml.getTable())) {
                     if (!tableItem.getTableName().equals(dml.getTable())) {
                         continue;
                         continue;
                     }
                     }
+
                     // 关联条件出现在主表查询条件是否为简单字段
                     // 关联条件出现在主表查询条件是否为简单字段
                     boolean allFieldsSimple = true;
                     boolean allFieldsSimple = true;
                     for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
                     for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
@@ -240,31 +242,35 @@ public class ESSyncService {
                             break;
                             break;
                         }
                         }
                     }
                     }
+
                     // 所有查询字段均为简单字段
                     // 所有查询字段均为简单字段
                     if (allFieldsSimple) {
                     if (allFieldsSimple) {
                         // 不是子查询
                         // 不是子查询
                         if (!tableItem.isSubQuery()) {
                         if (!tableItem.isSubQuery()) {
-                            // ------关联表简单字段插入------
+                            // ------关联表简单字段更新------
                             Map<String, Object> esFieldData = new LinkedHashMap<>();
                             Map<String, Object> esFieldData = new LinkedHashMap<>();
                             for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
                             for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
-                                Object value = esTemplate.getValFromData(config.getEsMapping(),
-                                    data,
-                                    fieldItem.getFieldName(),
-                                    fieldItem.getColumn().getColumnName());
-                                esFieldData.put(fieldItem.getFieldName(), value);
+                                if (old.containsKey(fieldItem.getColumn().getColumnName())) {
+                                    Object value = esTemplate.getValFromData(config.getEsMapping(),
+                                        data,
+                                        fieldItem.getFieldName(),
+                                        fieldItem.getColumn().getColumnName());
+                                    esFieldData.put(fieldItem.getFieldName(), value);
+                                }
                             }
                             }
-
                             joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
                             joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
                         } else {
                         } else {
-                            // ------关联子表简单字段插入------
-                            subTableSimpleFieldOperation(config, dml, data, null, tableItem);
+                            // ------关联子表简单字段更新------
+                            subTableSimpleFieldOperation(config, dml, data, old, tableItem);
                         }
                         }
                     } else {
                     } else {
-                        // ------关联子表复杂字段插入 执行全sql更新es------
-                        jonTableWholeSqlOperation(config, dml, data, null, tableItem);
+                        // ------关联子表复杂字段更新 执行全sql更新es------
+                        jonTableWholeSqlOperation(config, dml, data, old, tableItem);
                     }
                     }
                 }
                 }
             }
             }
+
+            i++;
         }
         }
     }
     }
 
 
@@ -412,23 +418,7 @@ public class ESSyncService {
         for (FieldItem fkFieldItem : tableItem.getRelationTableFields().keySet()) {
         for (FieldItem fkFieldItem : tableItem.getRelationTableFields().keySet()) {
             String columnName = fkFieldItem.getColumn().getColumnName();
             String columnName = fkFieldItem.getColumn().getColumnName();
             Object value = esTemplate.getValFromData(mapping, data, fkFieldItem.getFieldName(), columnName);
             Object value = esTemplate.getValFromData(mapping, data, fkFieldItem.getFieldName(), columnName);
-            if (value instanceof String) {
-                sql.append(tableItem.getAlias())
-                    .append(".")
-                    .append(columnName)
-                    .append("='")
-                    .append(value)
-                    .append("' ")
-                    .append(" AND ");
-            } else {
-                sql.append(tableItem.getAlias())
-                    .append(".")
-                    .append(columnName)
-                    .append("=")
-                    .append(value)
-                    .append(" ")
-                    .append(" AND ");
-            }
+            ESSyncUtil.appendCondition(sql, value, tableItem.getAlias(), columnName);
         }
         }
         int len = sql.length();
         int len = sql.length();
         sql.delete(len - 5, len);
         sql.delete(len - 5, len);
@@ -527,23 +517,7 @@ public class ESSyncService {
         for (FieldItem fkFieldItem : tableItem.getRelationTableFields().keySet()) {
         for (FieldItem fkFieldItem : tableItem.getRelationTableFields().keySet()) {
             String columnName = fkFieldItem.getColumn().getColumnName();
             String columnName = fkFieldItem.getColumn().getColumnName();
             Object value = esTemplate.getValFromData(mapping, data, fkFieldItem.getFieldName(), columnName);
             Object value = esTemplate.getValFromData(mapping, data, fkFieldItem.getFieldName(), columnName);
-            if (value instanceof String) {
-                sql.append(tableItem.getAlias())
-                    .append(".")
-                    .append(columnName)
-                    .append("='")
-                    .append(value)
-                    .append("' ")
-                    .append(" AND ");
-            } else {
-                sql.append(tableItem.getAlias())
-                    .append(".")
-                    .append(columnName)
-                    .append("=")
-                    .append(value)
-                    .append(" ")
-                    .append(" AND ");
-            }
+            ESSyncUtil.appendCondition(sql, value, tableItem.getAlias(), columnName);
         }
         }
         int len = sql.length();
         int len = sql.length();
         sql.delete(len - 5, len);
         sql.delete(len - 5, len);
@@ -602,16 +576,14 @@ public class ESSyncService {
                     BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
                     BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
                     for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields().entrySet()) {
                     for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields().entrySet()) {
                         for (FieldItem fieldItem : entry.getValue()) {
                         for (FieldItem fieldItem : entry.getValue()) {
-                            if (fieldItem.getColumnItems().size() == 1) {
-                                Object value = esTemplate
-                                    .getValFromRS(mapping, rs, fieldItem.getFieldName(), fieldItem.getFieldName());
-                                String fieldName = fieldItem.getFieldName();
-                                // 判断是否是主键
-                                if (fieldName.equals(mapping.get_id())) {
-                                    fieldName = "_id";
-                                }
-                                queryBuilder.must(QueryBuilders.termsQuery(fieldName, value));
+                            Object value = esTemplate
+                                .getValFromRS(mapping, rs, fieldItem.getFieldName(), fieldItem.getFieldName());
+                            String fieldName = fieldItem.getFieldName();
+                            // 判断是否是主键
+                            if (fieldName.equals(mapping.get_id())) {
+                                fieldName = "_id";
                             }
                             }
+                            queryBuilder.must(QueryBuilders.termsQuery(fieldName, value));
                         }
                         }
                     }
                     }
 
 

+ 10 - 3
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESSyncUtil.java

@@ -19,7 +19,6 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.ColumnItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.ColumnItem;
-import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.TableItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.TableItem;
 
 
 public class ESSyncUtil {
 public class ESSyncUtil {
@@ -161,7 +160,7 @@ public class ESSyncUtil {
             }
             }
 
 
             if (!((String) val).contains(",")) {
             if (!((String) val).contains(",")) {
-                logger.error("es type is geo_point, source value not contains ',' spit");
+                logger.error("es type is geo_point, source value not contains ',' separator");
                 return val;
                 return val;
             }
             }
 
 
@@ -225,7 +224,7 @@ public class ESSyncUtil {
         // 拼接condition
         // 拼接condition
         StringBuilder condition = new StringBuilder(" ");
         StringBuilder condition = new StringBuilder(" ");
         for (ColumnItem idColumn : idColumns) {
         for (ColumnItem idColumn : idColumns) {
-            Object idVal = data.get(Util.cleanColumn(idColumn.getColumnName()));
+            Object idVal = data.get(idColumn.getColumnName());
             if (mainTable.getAlias() != null) condition.append(mainTable.getAlias()).append(".");
             if (mainTable.getAlias() != null) condition.append(mainTable.getAlias()).append(".");
             condition.append(idColumn.getColumnName()).append("=");
             condition.append(idColumn.getColumnName()).append("=");
             if (idVal instanceof String) {
             if (idVal instanceof String) {
@@ -246,6 +245,14 @@ public class ESSyncUtil {
         return sql + " WHERE " + condition + " ";
         return sql + " WHERE " + condition + " ";
     }
     }
 
 
+    public static void appendCondition(StringBuilder sql, Object value, String owner, String columnName) {
+        if (value instanceof String) {
+            sql.append(owner).append(".").append(columnName).append("='").append(value).append("'  AND ");
+        } else {
+            sql.append(owner).append(".").append(columnName).append("=").append(value).append("  AND ");
+        }
+    }
+
     /**
     /**
      * 执行查询sql
      * 执行查询sql
      */
      */

+ 0 - 18
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/Util.java

@@ -1,18 +0,0 @@
-package com.alibaba.otter.canal.client.adapter.es.support;
-
-public class Util {
-    public static String cleanColumn(String column) {
-        if (column == null) {
-            return null;
-        }
-        if (column.contains("`")) {
-            column = column.replaceAll("`", "");
-        }
-
-        if (column.contains("'")) {
-            column = column.replaceAll("'", "");
-        }
-
-        return column;
-    }
-}

+ 1 - 1
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinSub2Test.java

@@ -69,7 +69,7 @@ public class RoleSyncJoinSub2Test {
         List<Map<String, Object>> oldList = new ArrayList<>();
         List<Map<String, Object>> oldList = new ArrayList<>();
         Map<String, Object> old = new LinkedHashMap<>();
         Map<String, Object> old = new LinkedHashMap<>();
         oldList.add(old);
         oldList.add(old);
-        old.put("label", "a");
+        old.put("label", "v");
         dml.setOld(oldList);
         dml.setOld(oldList);
 
 
         esAdapter.getEsSyncService().sync(dml);
         esAdapter.getEsSyncService().sync(dml);