|
@@ -36,3 +36,66 @@ if (bulkResponse.hasFailures()) {
|
|
|
// process failures by iterating through each bulk response item
|
|
|
}
|
|
|
--------------------------------------------------
|
|
|
+
|
|
|
+[float]
|
|
|
+== Using Bulk Processor
|
|
|
+
|
|
|
+The `BulkProcessor` class offers a simple interface to flush bulk operations automatically based on the number or size
|
|
|
+of requests, or after a given period.
|
|
|
+
|
|
|
+To use it, first create a `BulkProcessor` instance:
|
|
|
+
|
|
|
+[source,java]
|
|
|
+--------------------------------------------------
|
|
|
+import org.elasticsearch.action.bulk.BulkProcessor;
|
|
|
+
|
|
|
+BulkProcessor bulkProcessor = BulkProcessor.builder(
|
|
|
+ client, <1>
|
|
|
+ new BulkProcessor.Listener() {
|
|
|
+ @Override
|
|
|
+ public void beforeBulk(long executionId,
|
|
|
+ BulkRequest request) { ... } <2>
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void afterBulk(long executionId,
|
|
|
+ BulkRequest request,
|
|
|
+ BulkResponse response) { ... } <3>
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void afterBulk(long executionId,
|
|
|
+ BulkRequest request,
|
|
|
+ Throwable failure) { ... } <4>
|
|
|
+ })
|
|
|
+ .setBulkActions(10000) <5>
|
|
|
+ .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) <6>
|
|
|
+ .setFlushInterval(TimeValue.timeValueSeconds(5)) <7>
|
|
|
+ .setConcurrentRequests(1) <8>
|
|
|
+ .build();
|
|
|
+--------------------------------------------------
|
|
|
+<1> Add your elasticsearch client
|
|
|
+<2> This method is called just before bulk is executed. You can for example see the numberOfActions with
|
|
|
+ `request.numberOfActions()`
|
|
|
+<3> This method is called after bulk execution. You can for example check if there was some failing requests
|
|
|
+ with `response.hasFailures()`
|
|
|
+<4> This method is called when the bulk failed and raised a `Throwable`
|
|
|
+<5> We want to execute the bulk every 10 000 requests
|
|
|
+<6> We want to flush the bulk every 1gb
|
|
|
+<7> We want to flush the bulk every 5 seconds whatever the number of requests
|
|
|
+<8> Set the number of concurrent requests. A value of 0 means that only a single request will be allowed to be
|
|
|
+ executed. A value of 1 means 1 concurrent request is allowed to be executed while accumulating new bulk requests.
|
|
|
+
|
|
|
+Then you can simply add your requests to the `BulkProcessor`:
|
|
|
+
|
|
|
+[source,java]
|
|
|
+--------------------------------------------------
|
|
|
+bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
|
|
|
+bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
|
|
|
+--------------------------------------------------
|
|
|
+
|
|
|
+By default, `BulkProcessor`:
|
|
|
+
|
|
|
+* sets bulkActions to `1000`
|
|
|
+* sets bulkSize to `5mb`
|
|
|
+* does not set flushInterval
|
|
|
+* sets concurrentRequests to 1
|
|
|
+
|