浏览代码

[Docs] Document Bulk Processor for Java High Level REST Client (#25572)

Tanguy Leroux 8 年之前
父节点
当前提交
d9bc0f48b4

+ 100 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java

@@ -27,7 +27,9 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.delete.DeleteRequest;
@@ -45,6 +47,9 @@ import org.elasticsearch.client.ESRestHighLevelClientTestCase;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
@@ -55,12 +60,14 @@ import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptType;
 import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonMap;
@@ -855,4 +862,97 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase {
             // end::get-conflict
         }
     }
+
+    public void testBulkProcessor() throws InterruptedException, IOException {
+        Settings settings = Settings.builder().put("node.name", "my-application").build();
+        RestHighLevelClient client = highLevelClient();
+        {
+            // tag::bulk-processor-init
+            ThreadPool threadPool = new ThreadPool(settings); // <1>
+
+            BulkProcessor.Listener listener = new BulkProcessor.Listener() { // <2>
+                @Override
+                public void beforeBulk(long executionId, BulkRequest request) {
+                    // <3>
+                }
+
+                @Override
+                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+                    // <4>
+                }
+
+                @Override
+                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+                    // <5>
+                }
+            };
+
+            BulkProcessor bulkProcessor = new BulkProcessor.Builder(client::bulkAsync, listener, threadPool)
+                    .build(); // <6>
+            // end::bulk-processor-init
+            assertNotNull(bulkProcessor);
+
+            // tag::bulk-processor-add
+            IndexRequest one = new IndexRequest("posts", "doc", "1").
+                    source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?");
+            IndexRequest two = new IndexRequest("posts", "doc", "2")
+                    .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch");
+            IndexRequest three = new IndexRequest("posts", "doc", "3")
+                    .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch");
+
+            bulkProcessor.add(one);
+            bulkProcessor.add(two);
+            bulkProcessor.add(three);
+            // end::bulk-processor-add
+
+            // tag::bulk-processor-await
+            boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS); // <1>
+            // end::bulk-processor-await
+            assertTrue(terminated);
+
+            // tag::bulk-processor-close
+            bulkProcessor.close();
+            // end::bulk-processor-close
+            terminate(threadPool);
+        }
+        {
+            // tag::bulk-processor-listener
+            BulkProcessor.Listener listener = new BulkProcessor.Listener() {
+                @Override
+                public void beforeBulk(long executionId, BulkRequest request) {
+                    int numberOfActions = request.numberOfActions(); // <1>
+                    logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
+                }
+
+                @Override
+                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+                    if (response.hasFailures()) { // <2>
+                        logger.warn("Bulk [{}] executed with failures", executionId);
+                    } else {
+                        logger.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
+                    }
+                }
+
+                @Override
+                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+                    logger.error("Failed to execute bulk", failure); // <3>
+                }
+            };
+            // end::bulk-processor-listener
+
+            ThreadPool threadPool = new ThreadPool(settings);
+            try {
+                // tag::bulk-processor-options
+                BulkProcessor.Builder builder = new BulkProcessor.Builder(client::bulkAsync, listener, threadPool);
+                builder.setBulkActions(500); // <1>
+                builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); // <2>
+                builder.setConcurrentRequests(0); // <3>
+                builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // <4>
+                builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); // <5>
+                // end::bulk-processor-options
+            } finally {
+                terminate(threadPool);
+            }
+        }
+    }
 }

+ 94 - 0
docs/java-rest/high-level/apis/bulk.asciidoc

@@ -1,6 +1,8 @@
 [[java-rest-high-document-bulk]]
 === Bulk API
 
+NOTE: The Java High Level REST Client provides the <<java-rest-high-document-bulk-processor>> to assist with bulk requests
+
 [[java-rest-high-document-bulk-request]]
 ==== Bulk Request
 
@@ -115,3 +117,95 @@ include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-errors]
 --------------------------------------------------
 <1> Indicate if a given operation failed
 <2> Retrieve the failure of the failed operation
+
+[[java-rest-high-document-bulk-processor]]
+==== Bulk Processor
+
+The `BulkProcessor` simplifies the usage of the Bulk API by providing
+a utility class that allows index/update/delete operations to be
+transparently executed as they are added to the processor.
+
+In order to execute the requests, the `BulkProcessor` requires 3 components:
+
+`RestHighLevelClient`:: This client is used to execute the `BulkRequest`
+and to retrieve the `BulkResponse`
+`BulkProcessor.Listener`:: This listener is called before and after
+every `BulkRequest` execution or when a `BulkRequest` failed
+`ThreadPool`:: The `BulkRequest` executions are done using threads from this
+pool, allowing the `BulkProcessor` to work in a non-blocking manner and to
+accept new index/update/delete requests while bulk requests are executing.
+
+Then the `BulkProcessor.Builder` class can be used to build a new `BulkProcessor`:
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-processor-init]
+--------------------------------------------------
+<1> Create the `ThreadPool` using the given `Settings`
+<2> Create the `BulkProcessor.Listener`
+<3> This method is called before each execution of a `BulkRequest`
+<4> This method is called after each execution of a `BulkRequest`
+<5> This method is called when a `BulkRequest` failed
+<6> Create the `BulkProcessor` by calling the `build()` method from
+the `BulkProcessor.Builder`. The `RestHighLevelClient.bulkAsync()`
+method will be used to execute the `BulkRequest` under the hood.
+
+The `BulkProcessor.Builder` provides methods to configure how the `BulkProcessor`
+should handle requests execution:
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-processor-options]
+--------------------------------------------------
+<1> Set when to flush a new bulk request based on the number of
+actions currently added (defaults to 1000, use -1 to disable it)
+<2> Set when to flush a new bulk request based on the size of
+actions currently added (defaults to 5Mb, use -1 to disable it)
+<3> Set the number of concurrent requests allowed to be executed
+(default to 1, use 0 to only allow the execution of a single request)
+<4> Set a flush interval flushing any `BulkRequest` pending if the
+interval passes (defaults to not set)
+<5> Set a constant back off policy that initially waits for 1 second
+and retries up to 3 times. See `BackoffPolicy.noBackoff()`,
+`BackoffPolicy.constantBackoff()` and `BackoffPolicy.exponentialBackoff()`
+for more options.
+
+Once the `BulkProcessor` is created requests can be added to it:
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-processor-add]
+--------------------------------------------------
+
+The requests will be executed by the `BulkProcessor`, which takes care of
+calling the `BulkProcessor.Listener` for every bulk request.
+
+The listener provides methods to access to the `BulkRequest` and the `BulkResponse`:
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-processor-listener]
+--------------------------------------------------
+<1> Called before each execution of a `BulkRequest`, this method allows
+to know the number of operations that are going to be executed within the `BulkRequest`
+<2> Called after each execution of a `BulkRequest`, this method allows
+to know if the `BulkResponse` contains errors
+<3> Called if the `BulkRequest` failed, this method allows to know
+the failure
+
+Once all requests have been added to the `BulkProcessor`, its instance needs to
+be closed closed using one of the two available closing methods.
+
+The `awaitClose()` method can be used to wait until all requests have been processed
+ or the specified waiting time elapses:
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-processor-await]
+--------------------------------------------------
+<1> The method returns `true` if all bulk requests completed and `false` if the
+waiting time elapsed before all the bulk requests completed
+
+The `close()` method can be used to immediately close the `BulkProcessor`:
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-processor-close]
+--------------------------------------------------
+
+Both methods flush the requests added to the processor before closing the processor
+and also forbid any new request to be added to it.