소스 검색

增加对sql字段 `` 的支持
fix #1559

mcy 6 년 전
부모
커밋
a2360efff9

+ 13 - 10
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESEtlService.java

@@ -202,20 +202,23 @@ public class ESEtlService {
                     long batchBegin = System.currentTimeMillis();
                     while (rs.next()) {
                         Map<String, Object> esFieldData = new LinkedHashMap<>();
+                        Map<String, Object> idEsFieldData = new HashMap<>();
                         for (FieldItem fieldItem : mapping.getSchemaItem().getSelectFields().values()) {
 
-                            // 如果是主键字段则不插入
-                            if (fieldItem.getFieldName().equals(mapping.get_id())) {
-                                continue;
-                            }
-
                             String fieldName = fieldItem.getFieldName();
                             if (mapping.getSkips().contains(fieldName)) {
                                 continue;
                             }
 
-                            Object val = esTemplate.getValFromRS(mapping, rs, fieldName, fieldName);
-                            esFieldData.put(fieldName, val);
+                            // 如果是主键字段则不插入
+                            if (fieldItem.getFieldName().equals(mapping.get_id())) {
+                                Object val = esTemplate.getValFromRS(mapping, rs, fieldName, fieldName);
+                                idEsFieldData.put(Util.cleanColumn(fieldName), val);
+                            } else {
+                                Object val = esTemplate.getValFromRS(mapping, rs, fieldName, fieldName);
+                                esFieldData.put(Util.cleanColumn(fieldName), val);
+                            }
+
                         }
 
                         if (!mapping.getRelations().isEmpty()) {
@@ -241,13 +244,13 @@ public class ESEtlService {
 
                                     }
                                 }
-                                esFieldData.put(relationField, relations);
+                                esFieldData.put(Util.cleanColumn(relationField), relations);
                             });
                         }
 
                         Object idVal = null;
                         if (mapping.get_id() != null) {
-                            idVal = esFieldData.get(mapping.get_id());
+                            idVal = idEsFieldData.get(mapping.get_id());
                         }
 
                         if (idVal != null) {
@@ -271,7 +274,7 @@ public class ESEtlService {
                                 bulkRequestBuilder.add(indexRequestBuilder);
                             }
                         } else {
-                            idVal = esFieldData.get(mapping.getPk());
+                            idVal = idEsFieldData.get(mapping.getPk());
                             SearchResponse response = transportClient.prepareSearch(mapping.get_index())
                                 .setTypes(mapping.get_type())
                                 .setQuery(QueryBuilders.termQuery(mapping.getPk(), idVal))

+ 8 - 8
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -167,7 +167,7 @@ public class ESSyncService {
                                     data,
                                     fieldItem.getFieldName(),
                                     fieldItem.getColumn().getColumnName());
-                                esFieldData.put(fieldItem.getFieldName(), value);
+                                esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
                             }
 
                             joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
@@ -296,7 +296,7 @@ public class ESSyncService {
                                         data,
                                         fieldItem.getFieldName(),
                                         fieldItem.getColumn().getColumnName());
-                                    esFieldData.put(fieldItem.getFieldName(), value);
+                                    esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
                                 }
                             }
                             joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
@@ -408,7 +408,7 @@ public class ESSyncService {
                         // ------关联表简单字段更新为null------
                         Map<String, Object> esFieldData = new LinkedHashMap<>();
                         for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
-                            esFieldData.put(fieldItem.getFieldName(), null);
+                            esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), null);
                         }
                         joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
                     } else {
@@ -509,7 +509,7 @@ public class ESSyncService {
                     esTemplate.getESDataFromDmlData(mapping, data, esFieldData);
                     esFieldData.remove(mapping.getPk());
                     for (String key : esFieldData.keySet()) {
-                        esFieldData.put(key, null);
+                        esFieldData.put(Util.cleanColumn(key), null);
                     }
                 }
                 while (rs.next()) {
@@ -618,7 +618,7 @@ public class ESSyncService {
                                                     rs,
                                                     fieldItem.getFieldName(),
                                                     fieldItem.getColumn().getColumnName());
-                                                esFieldData.put(fieldItem.getFieldName(), val);
+                                                esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
                                                 break out;
                                             }
                                         }
@@ -629,7 +629,7 @@ public class ESSyncService {
                                 rs,
                                 fieldItem.getFieldName(),
                                 fieldItem.getColumn().getColumnName());
-                            esFieldData.put(fieldItem.getFieldName(), val);
+                            esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
                         }
                     }
 
@@ -725,7 +725,7 @@ public class ESSyncService {
                                                 rs,
                                                 fieldItem.getFieldName(),
                                                 fieldItem.getFieldName());
-                                            esFieldData.put(fieldItem.getFieldName(), val);
+                                            esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
                                             break;
                                         }
                                     }
@@ -734,7 +734,7 @@ public class ESSyncService {
                         } else {
                             Object val = esTemplate
                                 .getValFromRS(mapping, rs, fieldItem.getFieldName(), fieldItem.getFieldName());
-                            esFieldData.put(fieldItem.getFieldName(), val);
+                            esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
                         }
                     }
 

+ 9 - 5
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -112,7 +112,9 @@ public class ESTemplate {
      * @param esFieldData 数据Map
      */
     public void update(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
-        append4Update(mapping, pkVal, esFieldData);
+        Map<String, Object> esFieldDataTmp = new LinkedHashMap<>(esFieldData.size());
+        esFieldData.forEach((k, v) -> esFieldDataTmp.put(Util.cleanColumn(k), v));
+        append4Update(mapping, pkVal, esFieldDataTmp);
         commitBulk();
     }
 
@@ -250,6 +252,8 @@ public class ESTemplate {
 
     public Object getValFromRS(ESMapping mapping, ResultSet resultSet, String fieldName,
                                String columnName) throws SQLException {
+        fieldName = Util.cleanColumn(fieldName);
+        columnName = Util.cleanColumn(columnName);
         String esType = getEsType(mapping, fieldName);
 
         Object value = resultSet.getObject(columnName);
@@ -281,7 +285,7 @@ public class ESTemplate {
 
             if (!fieldItem.getFieldName().equals(mapping.get_id())
                 && !mapping.getSkips().contains(fieldItem.getFieldName())) {
-                esFieldData.put(fieldItem.getFieldName(), value);
+                esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
             }
         }
 
@@ -319,7 +323,7 @@ public class ESTemplate {
             for (ColumnItem columnItem : fieldItem.getColumnItems()) {
                 if (dmlOld.containsKey(columnItem.getColumnName())
                     && !mapping.getSkips().contains(fieldItem.getFieldName())) {
-                    esFieldData.put(fieldItem.getFieldName(),
+                    esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()),
                         getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName()));
                     break;
                 }
@@ -372,7 +376,7 @@ public class ESTemplate {
 
             if (!fieldItem.getFieldName().equals(mapping.get_id())
                 && !mapping.getSkips().contains(fieldItem.getFieldName())) {
-                esFieldData.put(fieldItem.getFieldName(), value);
+                esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
             }
         }
 
@@ -402,7 +406,7 @@ public class ESTemplate {
             }
 
             if (dmlOld.containsKey(columnName) && !mapping.getSkips().contains(fieldItem.getFieldName())) {
-                esFieldData.put(fieldItem.getFieldName(),
+                esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()),
                     getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName));
             }
         }