|
@@ -27,7 +27,10 @@ import org.elasticsearch.core.TimeValue;
|
|
|
import org.elasticsearch.xcontent.XContentType;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.util.ArrayDeque;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Collection;
|
|
|
+import java.util.Deque;
|
|
|
import java.util.List;
|
|
|
|
|
|
/**
|
|
@@ -44,13 +47,14 @@ public class BulkRequestBuilder extends ActionRequestLazyBuilder<BulkRequest, Bu
|
|
|
*/
|
|
|
private final List<DocWriteRequest<?>> requests = new ArrayList<>();
|
|
|
private final List<FramedData> framedData = new ArrayList<>();
|
|
|
- private final List<ActionRequestLazyBuilder<? extends DocWriteRequest<?>, ? extends DocWriteResponse>> requestBuilders =
|
|
|
- new ArrayList<>();
|
|
|
+ private final Deque<ActionRequestLazyBuilder<? extends DocWriteRequest<?>, ? extends DocWriteResponse>> requestBuilders =
|
|
|
+ new ArrayDeque<>();
|
|
|
private ActiveShardCount waitForActiveShards;
|
|
|
private TimeValue timeout;
|
|
|
private String globalPipeline;
|
|
|
private String globalRouting;
|
|
|
private WriteRequest.RefreshPolicy refreshPolicy;
|
|
|
+ private boolean requestPreviouslyCalled = false;
|
|
|
|
|
|
public BulkRequestBuilder(ElasticsearchClient client, @Nullable String globalIndex) {
|
|
|
super(client, BulkAction.INSTANCE);
|
|
@@ -199,11 +203,19 @@ public class BulkRequestBuilder extends ActionRequestLazyBuilder<BulkRequest, Bu
|
|
|
|
|
|
@Override
|
|
|
public BulkRequest request() {
|
|
|
+ assert requestPreviouslyCalled == false : "Cannot call request() multiple times on the same BulkRequestBuilder object";
|
|
|
+ if (requestPreviouslyCalled) {
|
|
|
+ throw new IllegalStateException("Cannot call request() multiple times on the same BulkRequestBuilder object");
|
|
|
+ }
|
|
|
+ requestPreviouslyCalled = true;
|
|
|
validate();
|
|
|
BulkRequest request = new BulkRequest(globalIndex);
|
|
|
- for (ActionRequestLazyBuilder<? extends DocWriteRequest<?>, ? extends DocWriteResponse> requestBuilder : requestBuilders) {
|
|
|
- DocWriteRequest<?> childRequest = requestBuilder.request();
|
|
|
- request.add(childRequest);
|
|
|
+ /*
|
|
|
+ * In the following loop we intentionally remove the builders from requestBuilders so that they can be garbage collected. This is
|
|
|
+ * so that we don't require double the memory of all of the inner requests, which could be really bad for a lage bulk request.
|
|
|
+ */
|
|
|
+ for (var builder = requestBuilders.pollFirst(); builder != null; builder = requestBuilders.pollFirst()) {
|
|
|
+ request.add(builder.request());
|
|
|
}
|
|
|
for (DocWriteRequest<?> childRequest : requests) {
|
|
|
request.add(childRequest);
|
|
@@ -234,17 +246,17 @@ public class BulkRequestBuilder extends ActionRequestLazyBuilder<BulkRequest, Bu
|
|
|
}
|
|
|
|
|
|
private void validate() {
|
|
|
- if (countNonEmptyLists(requestBuilders, requests, framedData) > 1) {
|
|
|
+ if (countNonEmptyCollections(requestBuilders, requests, framedData) > 1) {
|
|
|
throw new IllegalStateException(
|
|
|
"Must use only request builders, requests, or byte arrays within a single bulk request. Cannot mix and match"
|
|
|
);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private int countNonEmptyLists(List<?>... lists) {
|
|
|
+ private int countNonEmptyCollections(Collection<?>... collections) {
|
|
|
int sum = 0;
|
|
|
- for (List<?> list : lists) {
|
|
|
- if (list.isEmpty() == false) {
|
|
|
+ for (Collection<?> collection : collections) {
|
|
|
+ if (collection.isEmpty() == false) {
|
|
|
sum++;
|
|
|
}
|
|
|
}
|