[[java-docs-bulk]]
=== Bulk API

The bulk API lets you index and delete several documents in a
single request. Here is a sample usage:

[source,java]
--------------------------------------------------
import static org.elasticsearch.common.xcontent.XContentFactory.*;

import java.util.Date;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "_doc", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "_doc", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
    for (BulkItemResponse item : bulkResponse) {
        if (item.isFailed()) {
            System.err.println(item.getId() + ": " + item.getFailureMessage());
        }
    }
}
--------------------------------------------------

[[java-docs-bulk-processor]]
=== Using Bulk Processor

The `BulkProcessor` class offers a simple interface to flush bulk operations automatically based on the number or size
of requests, or after a given period.

To use it, first create a `BulkProcessor` instance:

[source,java]
--------------------------------------------------
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,  <1>
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... } <2>

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } <3>

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } <4>
        })
        .setBulkActions(10000) <5>
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) <6>
        .setFlushInterval(TimeValue.timeValueSeconds(5)) <7>
        .setConcurrentRequests(1) <8>
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) <9>
        .build();
--------------------------------------------------
<1> Add your Elasticsearch client
<2> This method is called just before the bulk is executed. You can, for example, see the number of actions with
    `request.numberOfActions()`
<3> This method is called after the bulk is executed. You can, for example, check whether any requests failed
    with `response.hasFailures()`
<4> This method is called when the bulk failed and raised a `Throwable`
<5> Execute the bulk every 10,000 requests
<6> Flush the bulk every 5 MB
<7> Flush the bulk every 5 seconds, regardless of the number of requests
<8> Set the number of concurrent requests. A value of 0 means that only a single request is allowed to
    execute. A value of 1 means that 1 concurrent request is allowed to execute while new bulk requests accumulate.
<9> Set a custom backoff policy that initially waits 100ms, increases exponentially, and retries up to three
    times. A retry is attempted whenever one or more bulk item requests fail with an `EsRejectedExecutionException`,
    which indicates that too few compute resources were available to process the request. To disable backoff,
    pass `BackoffPolicy.noBackoff()`.

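
The action-count and size thresholds configured above can be pictured with a small stand-alone model. This is only a sketch in plain Java, not the `BulkProcessor` implementation: the class `FlushThresholdSketch` and its methods are hypothetical, and it assumes a flush fires as soon as either threshold is reached.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Toy model of count- and size-based flushing (hypothetical, not Elasticsearch code). */
final class FlushThresholdSketch {
    private final int maxActions;   // plays the role of setBulkActions
    private final long maxBytes;    // plays the role of setBulkSize
    private final List<String> pending = new ArrayList<>();
    private final List<Integer> flushedBatchSizes = new ArrayList<>();
    private long pendingBytes = 0;

    FlushThresholdSketch(int maxActions, long maxBytes) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
    }

    /** Buffers a document and flushes once either threshold is reached (assumed rule). */
    void add(String document) {
        pending.add(document);
        pendingBytes += document.getBytes(StandardCharsets.UTF_8).length;
        if (pending.size() >= maxActions || pendingBytes >= maxBytes) {
            flush();
        }
    }

    /** Records the size of the current batch and empties the buffer. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushedBatchSizes.add(pending.size());
        pending.clear();
        pendingBytes = 0;
    }

    List<Integer> flushedBatchSizes() {
        return flushedBatchSizes;
    }

    public static void main(String[] args) {
        FlushThresholdSketch sketch = new FlushThresholdSketch(3, 1024);
        for (int i = 0; i < 7; i++) {
            sketch.add("{\"n\":" + i + "}");
        }
        sketch.flush(); // push out the remainder, as closing the processor would
        System.out.println(sketch.flushedBatchSizes()); // prints [3, 3, 1]
    }
}
```

With a limit of 3 actions, seven small documents leave the buffer as batches of 3, 3 and 1. The last batch is only sent because of the explicit final flush.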
By default, `BulkProcessor`:

* sets bulkActions to `1000`
* sets bulkSize to `5mb`
* does not set flushInterval
* sets concurrentRequests to 1, which means the flush operation runs asynchronously
* sets backoffPolicy to an exponential backoff with 8 retries and a start delay of 50ms. The total wait time is roughly 5.1 seconds.

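
To see where a figure of roughly 5.1 seconds can come from, the sketch below sums the delays of an exponential backoff with 8 retries and a 50ms start delay. The delay formula `start + 10 * ((int) Math.exp(0.8 * attempt) - 1)` is an assumption, not something stated in this page, and `BackoffSketch` is a hypothetical helper in plain Java. Under that assumption the total is 5190ms, which is consistent with the figure above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that models an exponential backoff schedule. */
final class BackoffSketch {

    /** Delay per retry, using an assumed formula (not confirmed by this page). */
    static List<Integer> delaysMillis(int startMillis, int retries) {
        List<Integer> delays = new ArrayList<>();
        for (int attempt = 0; attempt < retries; attempt++) {
            delays.add(startMillis + 10 * ((int) Math.exp(0.8d * attempt) - 1));
        }
        return delays;
    }

    /** Total time spent waiting if every retry is used. */
    static int totalMillis(int startMillis, int retries) {
        int total = 0;
        for (int delay : delaysMillis(startMillis, retries)) {
            total += delay;
        }
        return total;
    }

    public static void main(String[] args) {
        // Values taken from the defaults listed above: 8 retries, 50ms start delay.
        System.out.println(delaysMillis(50, 8)); // prints [50, 60, 80, 150, 280, 580, 1250, 2740]
        System.out.println(totalMillis(50, 8));  // prints 5190
    }
}
```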
[[java-docs-bulk-processor-requests]]
==== Add requests

Then you can simply add your requests to the `BulkProcessor`:

[source,java]
--------------------------------------------------
bulkProcessor.add(new IndexRequest("twitter", "_doc", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "_doc", "2"));
--------------------------------------------------

[[java-docs-bulk-processor-close]]
==== Closing the Bulk Processor

When all documents have been loaded into the `BulkProcessor`, it can be closed with the `awaitClose` or `close` method:

[source,java]
--------------------------------------------------
import java.util.concurrent.TimeUnit;

bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
--------------------------------------------------

or

[source,java]
--------------------------------------------------
bulkProcessor.close();
--------------------------------------------------

Both methods flush any remaining documents and disable all other scheduled flushes if they were scheduled by setting
`flushInterval`. If concurrent requests were enabled, the `awaitClose` method waits up to the specified timeout for
all bulk requests to complete and then returns `true`. If the specified waiting time elapses before all bulk requests
complete, it returns `false`. The `close` method doesn't wait for any remaining bulk requests to complete and exits immediately.

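
The `true`/`false` contract described for `awaitClose` resembles that of `ExecutorService.awaitTermination` in the JDK. The sketch below illustrates that contract using only `java.util.concurrent`. It is an analogy, not the `BulkProcessor` implementation, and `AwaitCloseSketch` is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of a "wait up to a timeout, report completion" contract. */
final class AwaitCloseSketch {

    /** Returns {result while work is still in flight, result after it completes}. */
    static boolean[] demo() {
        CountDownLatch release = new CountDownLatch(1);
        ExecutorService inFlight = Executors.newSingleThreadExecutor();
        // Stand-in for a bulk request that is still being executed.
        inFlight.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        inFlight.shutdown(); // accept no new work, but let in-flight work finish
        try {
            // The timeout elapses first because the task is still blocked: false.
            boolean beforeCompletion = inFlight.awaitTermination(50, TimeUnit.MILLISECONDS);
            release.countDown(); // let the in-flight work complete
            // The work now finishes well within the timeout: true.
            boolean afterCompletion = inFlight.awaitTermination(5, TimeUnit.SECONDS);
            return new boolean[] { beforeCompletion, afterCompletion };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        boolean[] results = demo();
        System.out.println(results[0] + " " + results[1]);
    }
}
```

In calling code, checking the returned boolean is what distinguishes "everything was sent" from "the timeout expired with requests still pending".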
[[java-docs-bulk-processor-tests]]
==== Using Bulk Processor in tests

If you are running tests with Elasticsearch and are using the `BulkProcessor` to populate your dataset,
it is best to set the number of concurrent requests to `0` so that the flush operation of the bulk is executed
synchronously:

[source,java]
--------------------------------------------------
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
        .setBulkActions(10000)
        .setConcurrentRequests(0)
        .build();

// Add your requests
bulkProcessor.add(/* Your requests */);

// Flush any remaining requests
bulkProcessor.flush();

// Or close the bulkProcessor if you don't need it anymore
bulkProcessor.close();

// Refresh your indices
client.admin().indices().prepareRefresh().get();

// Now you can start searching!
client.prepareSearch().get();
--------------------------------------------------

[[java-docs-bulk-global-parameters]]
==== Global Parameters

Global parameters can be specified on the `BulkRequest` as well as the `BulkProcessor`, similar to the REST API. These global
parameters serve as defaults and can be overridden by local parameters specified on each sub request. Some parameters
(index, type) have to be set before any sub request is added, so you have to specify them during `BulkRequest` or
`BulkProcessor` creation. Others (pipeline, routing) are optional and can be specified at any point before the bulk is sent.

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{hlrc-tests}/BulkProcessorIT.java[bulk-processor-mix-parameters]
--------------------------------------------------
<1> global parameters from the `BulkRequest` are applied to a sub request
<2> a local pipeline parameter on a sub request overrides the global parameters from the `BulkRequest`

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{hlrc-tests}/BulkRequestWithGlobalParametersIT.java[bulk-request-mix-pipeline]
--------------------------------------------------
<1> a local pipeline parameter on a sub request overrides the global pipeline from the `BulkRequest`
<2> the global parameter from the `BulkRequest` is applied to a sub request
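
The "global default, local override" rule can be summarised in a few lines of plain Java. This is a minimal sketch of the resolution rule only. `GlobalParameterSketch`, `SubRequest` and `effectivePipeline` are hypothetical names and do not belong to the Elasticsearch client, and the pipeline names are made up for illustration.

```java
/** Hypothetical model of how a global parameter acts as a default for sub requests. */
final class GlobalParameterSketch {

    /** Stand-in for a sub request; a null pipeline means "not set locally". */
    static final class SubRequest {
        final String id;
        final String pipeline;

        SubRequest(String id, String pipeline) {
            this.id = id;
            this.pipeline = pipeline;
        }
    }

    /** The local value wins when present; otherwise the global default applies. */
    static String effectivePipeline(SubRequest request, String globalPipeline) {
        return request.pipeline != null ? request.pipeline : globalPipeline;
    }

    public static void main(String[] args) {
        String globalPipeline = "global-pipeline";
        SubRequest usesGlobal = new SubRequest("1", null);
        SubRequest overrides = new SubRequest("2", "local-pipeline");
        System.out.println(effectivePipeline(usesGlobal, globalPipeline)); // prints global-pipeline
        System.out.println(effectivePipeline(overrides, globalPipeline));  // prints local-pipeline
    }
}
```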