| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 |
- [[java-docs-bulk]]
- === Bulk API
- The bulk API allows one to index and delete several documents in a
- single request. Here is a sample usage:
- [source,java]
- --------------------------------------------------
- import static org.elasticsearch.common.xcontent.XContentFactory.*;
- BulkRequestBuilder bulkRequest = client.prepareBulk();
- // either use client#prepare, or use Requests# to directly build index/delete requests
- bulkRequest.add(client.prepareIndex("twitter", "_doc", "1")
- .setSource(jsonBuilder()
- .startObject()
- .field("user", "kimchy")
- .field("postDate", new Date())
- .field("message", "trying out Elasticsearch")
- .endObject()
- )
- );
- bulkRequest.add(client.prepareIndex("twitter", "_doc", "2")
- .setSource(jsonBuilder()
- .startObject()
- .field("user", "kimchy")
- .field("postDate", new Date())
- .field("message", "another post")
- .endObject()
- )
- );
-
- BulkResponse bulkResponse = bulkRequest.get();
- if (bulkResponse.hasFailures()) {
- // process failures by iterating through each bulk response item
- }
- --------------------------------------------------
- [[java-docs-bulk-processor]]
- === Using Bulk Processor
- The `BulkProcessor` class offers a simple interface to flush bulk operations automatically based on the number or size
- of requests, or after a given period.
- To use it, first create a `BulkProcessor` instance:
- [source,java]
- --------------------------------------------------
- import org.elasticsearch.action.bulk.BackoffPolicy;
- import org.elasticsearch.action.bulk.BulkProcessor;
- import org.elasticsearch.common.unit.ByteSizeUnit;
- import org.elasticsearch.common.unit.ByteSizeValue;
- import org.elasticsearch.common.unit.TimeValue;
- BulkProcessor bulkProcessor = BulkProcessor.builder(
- client, <1>
- new BulkProcessor.Listener() {
- @Override
- public void beforeBulk(long executionId,
- BulkRequest request) { ... } <2>
- @Override
- public void afterBulk(long executionId,
- BulkRequest request,
- BulkResponse response) { ... } <3>
- @Override
- public void afterBulk(long executionId,
- BulkRequest request,
- Throwable failure) { ... } <4>
- })
- .setBulkActions(10000) <5>
- .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) <6>
- .setFlushInterval(TimeValue.timeValueSeconds(5)) <7>
- .setConcurrentRequests(1) <8>
- .setBackoffPolicy(
- BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) <9>
- .build();
- --------------------------------------------------
- <1> Add your Elasticsearch client
- <2> This method is called just before bulk is executed. You can for example see the numberOfActions with
- `request.numberOfActions()`
- <3> This method is called after bulk execution. You can for example check if there was some failing requests
- with `response.hasFailures()`
- <4> This method is called when the bulk failed and raised a `Throwable`
- <5> We want to execute the bulk every 10 000 requests
- <6> We want to flush the bulk every 5mb
- <7> We want to flush the bulk every 5 seconds whatever the number of requests
- <8> Set the number of concurrent requests. A value of 0 means that only a single request will be allowed to be
- executed. A value of 1 means 1 concurrent request is allowed to be executed while accumulating new bulk requests.
- <9> Set a custom backoff policy which will initially wait for 100ms, increase exponentially and retries up to three
- times. A retry is attempted whenever one or more bulk item requests have failed with an `EsRejectedExecutionException`
- which indicates that there were too little compute resources available for processing the request. To disable backoff,
- pass `BackoffPolicy.noBackoff()`.
- By default, `BulkProcessor`:
- * sets bulkActions to `1000`
- * sets bulkSize to `5mb`
- * does not set flushInterval
- * sets concurrentRequests to 1, which means an asynchronous execution of the flush operation.
- * sets backoffPolicy to an exponential backoff with 8 retries and a start delay of 50ms. The total wait time is roughly 5.1 seconds.
- [[java-docs-bulk-processor-requests]]
- ==== Add requests
- Then you can simply add your requests to the `BulkProcessor`:
- [source,java]
- --------------------------------------------------
- bulkProcessor.add(new IndexRequest("twitter", "_doc", "1").source(/* your doc here */));
- bulkProcessor.add(new DeleteRequest("twitter", "_doc", "2"));
- --------------------------------------------------
- [[java-docs-bulk-processor-close]]
- ==== Closing the Bulk Processor
- When all documents are loaded to the `BulkProcessor` it can be closed by using `awaitClose` or `close` methods:
- [source,java]
- --------------------------------------------------
- bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
- --------------------------------------------------
- or
- [source,java]
- --------------------------------------------------
- bulkProcessor.close();
- --------------------------------------------------
- Both methods flush any remaining documents and disable all other scheduled flushes if they were scheduled by setting
- `flushInterval`. If concurrent requests were enabled the `awaitClose` method waits for up to the specified timeout for
- all bulk requests to complete then returns `true`, if the specified waiting time elapses before all bulk requests complete,
- `false` is returned. The `close` method doesn't wait for any remaining bulk requests to complete and exits immediately.
- [[java-docs-bulk-processor-tests]]
- ==== Using Bulk Processor in tests
- If you are running tests with Elasticsearch and are using the `BulkProcessor` to populate your dataset
- you should better set the number of concurrent requests to `0` so the flush operation of the bulk will be executed
- in a synchronous manner:
- [source,java]
- --------------------------------------------------
- BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
- .setBulkActions(10000)
- .setConcurrentRequests(0)
- .build();
- // Add your requests
- bulkProcessor.add(/* Your requests */);
- // Flush any remaining requests
- bulkProcessor.flush();
- // Or close the bulkProcessor if you don't need it anymore
- bulkProcessor.close();
- // Refresh your indices
- client.admin().indices().prepareRefresh().get();
- // Now you can start searching!
- client.prepareSearch().get();
- --------------------------------------------------
- [[java-docs-bulk-global-parameters]]
- ==== Global Parameters
- Global parameters can be specified on the BulkRequest as well as BulkProcessor, similar to the REST API. These global
- parameters serve as defaults and can be overridden by local parameters specified on each sub request. Some parameters
- have to be set before any sub request is added - index, type - and you have to specify them during BulkRequest or
- BulkProcessor creation. Some are optional - pipeline, routing - and can be specified at any point before the bulk is sent.
- ["source","java",subs="attributes,callouts,macros"]
- --------------------------------------------------
- include-tagged::{hlrc-tests}/BulkProcessorIT.java[bulk-processor-mix-parameters]
- --------------------------------------------------
- <1> global parameters from the BulkRequest will be applied on a sub request
- <2> local pipeline parameter on a sub request will override global parameters from BulkRequest
- ["source","java",subs="attributes,callouts,macros"]
- --------------------------------------------------
- include-tagged::{hlrc-tests}/BulkRequestWithGlobalParametersIT.java[bulk-request-mix-pipeline]
- --------------------------------------------------
- <1> local pipeline parameter on a sub request will override global pipeline from the BulkRequest
- <2> global parameter from the BulkRequest will be applied on a sub request
|