Browse Source

es 批量提交修改

mcy 6 years ago
parent
commit
17efc2ada0

+ 7 - 3
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -72,6 +72,7 @@ public class ESTemplate {
                 getBulk().add(transportClient.prepareIndex(mapping.get_index(), mapping.get_type(), pkVal.toString())
                     .setSource(esFieldData));
             }
+            commitBulk();
         } else {
             SearchResponse response = transportClient.prepareSearch(mapping.get_index())
                 .setTypes(mapping.get_type())
@@ -81,9 +82,10 @@ public class ESTemplate {
             for (SearchHit hit : response.getHits()) {
                 getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), hit.getId())
                     .setDoc(esFieldData));
+                commitBulk();
             }
         }
-        commitBulk();
+
     }
 
     /**
@@ -149,6 +151,7 @@ public class ESTemplate {
     public void delete(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
         if (mapping.get_id() != null) {
             getBulk().add(transportClient.prepareDelete(mapping.get_index(), mapping.get_type(), pkVal.toString()));
+            commitBulk();
         } else {
             SearchResponse response = transportClient.prepareSearch(mapping.get_index())
                 .setTypes(mapping.get_type())
@@ -158,9 +161,10 @@ public class ESTemplate {
             for (SearchHit hit : response.getHits()) {
                 getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), hit.getId())
                     .setDoc(esFieldData));
+                commitBulk();
             }
         }
-        commitBulk();
+
     }
 
     /**
@@ -186,7 +190,7 @@ public class ESTemplate {
     }
 
     /**
-     * 如果大于批量则提交批次
+     * 如果大于批量则提交批次
      */
     private void commitBulk() {
         if (getBulk().numberOfActions() >= MAX_BATCH_SIZE) {