Browse Source

代码整理

mcy 6 years ago
parent
commit
0ef21e4155

+ 3 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -144,7 +144,9 @@ public class ESAdapter implements OuterAdapter {
             return;
         }
         for (Dml dml : dmls) {
-            sync(dml);
+            if (!dml.getIsDdl()) {
+                sync(dml);
+            }
         }
         esSyncService.commit(); // 批次统一提交
     }

+ 8 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -71,7 +71,12 @@ public class ESSyncService {
                     dml.getDestination());
             }
             if (logger.isDebugEnabled()) {
-                logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
+                StringBuilder configIndexes = new StringBuilder();
+                esSyncConfigs
+                    .forEach(esSyncConfig -> configIndexes.append(esSyncConfig.getEsMapping().get_index()).append(" "));
+                logger.debug("DML: {} \nEffect indexes: {}",
+                    JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue),
+                    configIndexes.toString());
             }
         }
     }
@@ -92,6 +97,8 @@ public class ESSyncService {
                 update(config, dml);
             } else if (type != null && type.equalsIgnoreCase("DELETE")) {
                 delete(config, dml);
+            } else {
+                return;
             }
 
             if (logger.isTraceEnabled()) {

+ 4 - 4
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -65,8 +65,8 @@ public class ESTemplate {
         if (mapping.get_id() != null) {
             if (mapping.isUpsert()) {
                 getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
-                        .setDoc(esFieldData)
-                        .setDocAsUpsert(true));
+                    .setDoc(esFieldData)
+                    .setDocAsUpsert(true));
             } else {
                 getBulk().add(transportClient.prepareIndex(mapping.get_index(), mapping.get_type(), pkVal.toString())
                     .setSource(esFieldData));
@@ -200,7 +200,7 @@ public class ESTemplate {
      * 提交批次
      */
     public void commit() {
-        if (getBulk().numberOfActions() >= 0) {
+        if (getBulk().numberOfActions() > 0) {
             BulkResponse response = getBulk().execute().actionGet();
             if (response.hasFailures()) {
                 for (BulkItemResponse itemResponse : response.getItems()) {
@@ -369,7 +369,7 @@ public class ESTemplate {
                 resultIdVal = getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName);
             }
 
-            if (dmlOld.get(columnName) != null && !mapping.getSkips().contains(fieldItem.getFieldName())) {
+            if (dmlOld.containsKey(columnName) && !mapping.getSkips().contains(fieldItem.getFieldName())) {
                 esFieldData.put(fieldItem.getFieldName(),
                     getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName));
             }