Browse Source

Fix es bulk (#1743)

thanks
Jet 6 năm trước cách đây
mục cha
commit
8f088cddc0

+ 6 - 6
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -459,7 +459,7 @@ public class ESSyncService {
         sql = ESSyncUtil.appendCondition(sql, condition);
         DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
         if (logger.isTraceEnabled()) {
-            logger.trace("Main table insert ot es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
+            logger.trace("Main table insert to es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
                 config.getDestination(),
                 dml.getTable(),
                 mapping.get_index(),
@@ -473,7 +473,7 @@ public class ESSyncService {
 
                     if (logger.isTraceEnabled()) {
                         logger.trace(
-                            "Main table insert ot es index by query sql, destination:{}, table: {}, index: {}, id: {}",
+                            "Main table insert to es index by query sql, destination:{}, table: {}, index: {}, id: {}",
                             config.getDestination(),
                             dml.getTable(),
                             mapping.get_index(),
@@ -517,7 +517,7 @@ public class ESSyncService {
 
                     if (logger.isTraceEnabled()) {
                         logger.trace(
-                            "Main table delete ot es index by query sql, destination:{}, table: {}, index: {}, id: {}",
+                            "Main table delete to es index by query sql, destination:{}, table: {}, index: {}, id: {}",
                             config.getDestination(),
                             dml.getTable(),
                             mapping.get_index(),
@@ -784,7 +784,7 @@ public class ESSyncService {
         Object idVal = esTemplate.getESDataFromDmlData(mapping, data, old, esFieldData);
 
         if (logger.isTraceEnabled()) {
-            logger.trace("Main table update ot es index, destination:{}, table: {}, index: {}, id: {}",
+            logger.trace("Main table update to es index, destination:{}, table: {}, index: {}, id: {}",
                 config.getDestination(),
                 dml.getTable(),
                 mapping.get_index(),
@@ -807,7 +807,7 @@ public class ESSyncService {
         sql = ESSyncUtil.appendCondition(sql, condition);
         DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
         if (logger.isTraceEnabled()) {
-            logger.trace("Main table update ot es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
+            logger.trace("Main table update to es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
                 config.getDestination(),
                 dml.getTable(),
                 mapping.get_index(),
@@ -821,7 +821,7 @@ public class ESSyncService {
 
                     if (logger.isTraceEnabled()) {
                         logger.trace(
-                            "Main table update ot es index by query sql, destination:{}, table: {}, index: {}, id: {}",
+                            "Main table update to es index by query sql, destination:{}, table: {}, index: {}, id: {}",
                             config.getDestination(),
                             dml.getTable(),
                             mapping.get_index(),

+ 6 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -58,6 +58,10 @@ public class ESTemplate {
         return bulkRequestBuilder;
     }
 
+    public void resetBulkRequestBuilder() {
+        this.bulkRequestBuilder = this.transportClient.prepareBulk();
+    }
+
     /**
      * 插入数据
      *
@@ -210,11 +214,12 @@ public class ESTemplate {
     }
 
     /**
-     * 如果大于批量数则提交批次
+     * 如果大于批量数则提交批次, 调用后es bulk请求后,numberOfActions不会清理,需要主动调用函数清0,否则不能起到批量请求的效果
      */
     private void commitBulk() {
         if (getBulk().numberOfActions() >= MAX_BATCH_SIZE) {
             commit();
+            resetBulkRequestBuilder();
         }
     }