[[java-docs-bulk]]
=== Bulk API

The bulk API makes it possible to index and delete several documents in a
single request. Here is a sample usage:

[source,java]
--------------------------------------------------
import static org.elasticsearch.common.xcontent.XContentFactory.*;

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}
--------------------------------------------------
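
When `hasFailures()` returns `true`, the individual item results can be inspected; `BulkResponse` is iterable over its `BulkItemResponse` items. A minimal sketch of such a loop (the error logging is illustrative, not part of the API):

[source,java]
--------------------------------------------------
for (BulkItemResponse item : bulkResponse) {
    if (item.isFailed()) {
        // id of the document that could not be indexed/deleted,
        // plus the failure reason reported by the cluster
        System.err.println(item.getId() + ": " + item.getFailureMessage());
    }
}
--------------------------------------------------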

[[java-docs-bulk-processor]]
=== Using Bulk Processor

The `BulkProcessor` class offers a simple interface to flush bulk operations automatically based on the number or size
of requests, or after a given period.

To use it, first create a `BulkProcessor` instance:

[source,java]
--------------------------------------------------
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client, <1>
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... } <2>

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } <3>

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } <4>
        })
        .setBulkActions(10000) <5>
        .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) <6>
        .setFlushInterval(TimeValue.timeValueSeconds(5)) <7>
        .setConcurrentRequests(1) <8>
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) <9>
        .build();
--------------------------------------------------
<1> Add your Elasticsearch client
<2> This method is called just before the bulk is executed. You can, for example, see the number of actions with
`request.numberOfActions()`
<3> This method is called after the bulk is executed. You can, for example, check whether some requests failed
with `response.hasFailures()`
<4> This method is called when the bulk failed and raised a `Throwable`
<5> We want to execute the bulk every 10,000 requests
<6> We want to flush the bulk every 1GB
<7> We want to flush the bulk every 5 seconds, regardless of the number of requests
<8> Set the number of concurrent requests. A value of 0 means that only a single request is allowed to be
executed. A value of 1 means one concurrent request is allowed to be executed while accumulating new bulk requests.
<9> Set a custom backoff policy which will initially wait for 100ms, increase exponentially, and retry up to three
times. A retry is attempted whenever one or more bulk item requests fail with an `EsRejectedExecutionException`,
which indicates that there were too few compute resources available for processing the request. To disable backoff,
pass `BackoffPolicy.noBackoff()`.

Then you can simply add your requests to the `BulkProcessor`:

[source,java]
--------------------------------------------------
bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
--------------------------------------------------
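
A whole collection of documents can also be handed to the processor in a loop; the processor decides on its own when to flush, based on the thresholds configured above. In this sketch, `docs` is a hypothetical map from document id to JSON source string:

[source,java]
--------------------------------------------------
for (Map.Entry<String, String> doc : docs.entrySet()) {
    // each add() is cheap; the processor batches and flushes in the background
    bulkProcessor.add(new IndexRequest("twitter", "tweet", doc.getKey())
            .source(doc.getValue()));
}
--------------------------------------------------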

By default, `BulkProcessor`:

* sets bulkActions to `1000`
* sets bulkSize to `5mb`
* does not set flushInterval
* sets concurrentRequests to 1
* sets backoffPolicy to an exponential backoff with 8 retries and a start delay of 50ms. The total wait time is roughly 5.1 seconds.

When all documents have been loaded into the `BulkProcessor`, it can be closed using the `awaitClose` or `close` methods:

[source,java]
--------------------------------------------------
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
--------------------------------------------------

or

[source,java]
--------------------------------------------------
bulkProcessor.close();
--------------------------------------------------

Both methods flush any remaining documents and disable all other scheduled flushes (if they were scheduled by setting
`flushInterval`). If concurrent requests were enabled, the `awaitClose` method waits up to the specified timeout for
all bulk requests to complete and then returns `true`; if the specified waiting time elapses before all bulk requests
complete, `false` is returned. The `close` method doesn't wait for any remaining bulk requests to complete and exits
immediately.
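
Note that `awaitClose` throws `InterruptedException`, so callers usually wrap it; a typical shutdown sequence might look like this sketch:

[source,java]
--------------------------------------------------
try {
    boolean terminated = bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
    if (!terminated) {
        // the timeout elapsed while some bulk requests were still in flight
    }
} catch (InterruptedException e) {
    // restore the interrupt flag and stop waiting
    Thread.currentThread().interrupt();
}
--------------------------------------------------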