[[bulk]]
== Bulk API

The bulk API allows one to index and delete several documents in a
single request. Here is a sample usage:
[source,java]
--------------------------------------------------
import static org.elasticsearch.common.xcontent.XContentFactory.*;

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}
--------------------------------------------------
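The failure handling hinted at above can be sketched as follows: `BulkResponse` is iterable over
its `BulkItemResponse` entries, so each failed item can be inspected individually. This is a sketch
that assumes the `bulkResponse` variable from the example above; the logging is illustrative only:

[source,java]
--------------------------------------------------
// Sketch: inspect each item of the bulk response; each BulkItemResponse
// reports the status of one index/delete operation.
for (BulkItemResponse item : bulkResponse) {
    if (item.isFailed()) {
        // index, type, and id identify the document whose operation failed
        System.err.println("Failed: " + item.getIndex() + "/" + item.getType()
                + "/" + item.getId() + " - " + item.getFailureMessage());
    }
}
--------------------------------------------------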
[float]
== Using Bulk Processor

The `BulkProcessor` class offers a simple interface to flush bulk operations automatically based on the number or size
of requests, or after a given period.

To use it, first create a `BulkProcessor` instance:
[source,java]
--------------------------------------------------
import org.elasticsearch.action.bulk.BulkProcessor;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,  <1>
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... } <2>

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } <3>

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } <4>
        })
        .setBulkActions(10000) <5>
        .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) <6>
        .setFlushInterval(TimeValue.timeValueSeconds(5)) <7>
        .setConcurrentRequests(1) <8>
        .build();
--------------------------------------------------
<1> Add your elasticsearch client
<2> This method is called just before bulk is executed. You can for example see the number of actions with
    `request.numberOfActions()`
<3> This method is called after bulk execution. You can for example check whether there were any failing requests
    with `response.hasFailures()`
<4> This method is called when the bulk failed and raised a `Throwable`
<5> We want to execute the bulk every 10 000 requests
<6> We want to flush the bulk every 1gb
<7> We want to flush the bulk every 5 seconds whatever the number of requests
<8> Set the number of concurrent requests. A value of 0 means that only a single request will be allowed to be
    executed. A value of 1 means 1 concurrent request is allowed to be executed while accumulating new bulk requests.

Then you can simply add your requests to the `BulkProcessor`:
[source,java]
--------------------------------------------------
bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
--------------------------------------------------
By default, `BulkProcessor`:

* sets bulkActions to `1000`
* sets bulkSize to `5mb`
* does not set flushInterval
* sets concurrentRequests to 1
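These defaults apply whenever the corresponding setter is not called, so a minimal processor needs
only a client and a listener. A sketch, where `listener` stands in for a `BulkProcessor.Listener`
implementation like the one shown earlier:

[source,java]
--------------------------------------------------
// Sketch: with no setters called, this processor flushes every 1000
// actions or 5mb, allows 1 concurrent request, and never flushes on a timer.
BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener).build();
--------------------------------------------------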
When all documents are loaded to the `BulkProcessor` it can be closed by using the `awaitClose` or `close` methods:
[source,java]
--------------------------------------------------
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
--------------------------------------------------

or

[source,java]
--------------------------------------------------
bulkProcessor.close();
--------------------------------------------------
Both methods flush any remaining documents and disable all other scheduled flushes if they were scheduled by setting
`flushInterval`. If concurrent requests were enabled, the `awaitClose` method waits for up to the specified timeout for
all bulk requests to complete and then returns `true`; if the specified waiting time elapses before all bulk requests
complete, it returns `false`. The `close` method does not wait for any remaining bulk requests to complete and exits
immediately.
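Since `awaitClose` reports whether the flush finished in time, its return value is worth checking.
A sketch (the logging is illustrative; note that `awaitClose` declares `InterruptedException`, so it
must be caught or propagated):

[source,java]
--------------------------------------------------
// Sketch: close the processor and check whether all in-flight bulk
// requests completed within the timeout.
try {
    boolean terminated = bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
    if (!terminated) {
        // some bulk requests were still pending when the timeout elapsed
        System.err.println("Timed out waiting for bulk requests to finish");
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}
--------------------------------------------------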