Remove unused `BulkProcessor` (#129875)

The `BulkProcessor` and `BulkRequestHandler` classes were unused and
could thus be removed along with their test classes.
Niels Bauman, 3 months ago
parent
commit
5ccb772468

+ 0 - 373
server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessorIT.java

@@ -1,373 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the "Elastic License
- * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
- * Public License v 1"; you may not use this file except in compliance with, at
- * your election, the "Elastic License 2.0", the "GNU Affero General Public
- * License v3.0 only", or the "Server Side Public License, v 1".
- */
-
-package org.elasticsearch.action.bulk;
-
-import com.carrotsearch.randomizedtesting.generators.RandomPicks;
-
-import org.elasticsearch.action.get.MultiGetItemResponse;
-import org.elasticsearch.action.get.MultiGetRequestBuilder;
-import org.elasticsearch.action.get.MultiGetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.internal.Client;
-import org.elasticsearch.client.internal.Requests;
-import org.elasticsearch.cluster.metadata.IndexMetadata;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.test.ESIntegTestCase;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.hamcrest.Matchers.both;
-import static org.hamcrest.Matchers.either;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-
-public class BulkProcessorIT extends ESIntegTestCase {
-
-    public void testThatBulkProcessorCountIsCorrect() throws Exception {
-        final CountDownLatch latch = new CountDownLatch(1);
-        BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
-
-        int numDocs = randomIntBetween(10, 100);
-        try (
-            BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT")
-                // let's make sure that the bulk action limit trips: a single execution will index all the documents
-                .setConcurrentRequests(randomIntBetween(0, 1))
-                .setBulkActions(numDocs)
-                .setFlushInterval(TimeValue.timeValueHours(24))
-                .setBulkSize(ByteSizeValue.of(1, ByteSizeUnit.GB))
-                .build()
-        ) {
-
-            MultiGetRequestBuilder multiGetRequestBuilder = indexDocs(client(), processor, numDocs);
-
-            latch.await();
-
-            assertThat(listener.beforeCounts.get(), equalTo(1));
-            assertThat(listener.afterCounts.get(), equalTo(1));
-            assertThat(listener.bulkFailures.size(), equalTo(0));
-            assertResponseItems(listener.bulkItems, numDocs);
-            assertMultiGetResponse(multiGetRequestBuilder.get(), numDocs);
-        }
-    }
-
-    public void testBulkProcessorFlush() throws Exception {
-        final CountDownLatch latch = new CountDownLatch(1);
-        BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
-
-        int numDocs = randomIntBetween(10, 100);
-
-        try (
-            BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT")
-                // let's make sure that this bulk won't be automatically flushed
-                .setConcurrentRequests(randomIntBetween(0, 10))
-                .setBulkActions(numDocs + randomIntBetween(1, 100))
-                .setFlushInterval(TimeValue.timeValueHours(24))
-                .setBulkSize(ByteSizeValue.of(1, ByteSizeUnit.GB))
-                .build()
-        ) {
-
-            MultiGetRequestBuilder multiGetRequestBuilder = indexDocs(client(), processor, numDocs);
-
-            assertThat(latch.await(randomInt(500), TimeUnit.MILLISECONDS), equalTo(false));
-            // we really need an explicit flush as none of the bulk thresholds was reached
-            processor.flush();
-            latch.await();
-
-            assertThat(listener.beforeCounts.get(), equalTo(1));
-            assertThat(listener.afterCounts.get(), equalTo(1));
-            assertThat(listener.bulkFailures.size(), equalTo(0));
-            assertResponseItems(listener.bulkItems, numDocs);
-            assertMultiGetResponse(multiGetRequestBuilder.get(), numDocs);
-        }
-    }
-
-    public void testBulkProcessorFlushDisabled() throws Exception {
-        final CountDownLatch latch = new CountDownLatch(1);
-        BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
-
-        int numDocs = randomIntBetween(10, 100);
-
-        AtomicBoolean flushEnabled = new AtomicBoolean(false);
-        try (
-            BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT")
-                // let's make sure that this bulk won't be automatically flushed
-                .setConcurrentRequests(randomIntBetween(0, 10))
-                .setBulkActions(numDocs + randomIntBetween(1, 100))
-                .setFlushInterval(TimeValue.timeValueHours(24))
-                .setBulkSize(ByteSizeValue.of(1, ByteSizeUnit.GB))
-                .setFlushCondition(flushEnabled::get)
-                .build()
-        ) {
-
-            MultiGetRequestBuilder multiGetRequestBuilder = indexDocs(client(), processor, numDocs);
-            assertThat(latch.await(randomInt(500), TimeUnit.MILLISECONDS), equalTo(false));
-            // no documents will be indexed here
-            processor.flush();
-
-            flushEnabled.set(true);
-            processor.flush();
-            latch.await();
-
-            // disabled flush resulted in listener being triggered only once
-            assertThat(listener.beforeCounts.get(), equalTo(1));
-            assertThat(listener.afterCounts.get(), equalTo(1));
-            assertThat(listener.bulkFailures.size(), equalTo(0));
-            assertResponseItems(listener.bulkItems, numDocs);
-            assertMultiGetResponse(multiGetRequestBuilder.get(), numDocs);
-        }
-    }
-
-    public void testBulkProcessorConcurrentRequests() throws Exception {
-        int bulkActions = randomIntBetween(10, 100);
-        int numDocs = randomIntBetween(bulkActions, bulkActions + 100);
-        int concurrentRequests = randomIntBetween(0, 7);
-
-        int expectedBulkActions = numDocs / bulkActions;
-
-        final CountDownLatch latch = new CountDownLatch(expectedBulkActions);
-        int totalExpectedBulkActions = numDocs % bulkActions == 0 ? expectedBulkActions : expectedBulkActions + 1;
-        final CountDownLatch closeLatch = new CountDownLatch(totalExpectedBulkActions);
-
-        BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch);
-
-        MultiGetRequestBuilder multiGetRequestBuilder;
-
-        try (
-            BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT")
-                .setConcurrentRequests(concurrentRequests)
-                .setBulkActions(bulkActions)
-                // set interval and size to high values
-                .setFlushInterval(TimeValue.timeValueHours(24))
-                .setBulkSize(ByteSizeValue.of(1, ByteSizeUnit.GB))
-                .build()
-        ) {
-
-            multiGetRequestBuilder = indexDocs(client(), processor, numDocs);
-
-            latch.await();
-
-            assertThat(listener.beforeCounts.get(), equalTo(expectedBulkActions));
-            assertThat(listener.afterCounts.get(), equalTo(expectedBulkActions));
-            assertThat(listener.bulkFailures.size(), equalTo(0));
-            assertThat(listener.bulkItems.size(), equalTo(numDocs - numDocs % bulkActions));
-        }
-
-        closeLatch.await();
-
-        assertThat(listener.beforeCounts.get(), equalTo(totalExpectedBulkActions));
-        assertThat(listener.afterCounts.get(), equalTo(totalExpectedBulkActions));
-        assertThat(listener.bulkFailures.size(), equalTo(0));
-        assertThat(listener.bulkItems.size(), equalTo(numDocs));
-
-        Set<String> ids = new HashSet<>();
-        for (BulkItemResponse bulkItemResponse : listener.bulkItems) {
-            assertThat(bulkItemResponse.getFailureMessage(), bulkItemResponse.isFailed(), equalTo(false));
-            assertThat(bulkItemResponse.getIndex(), equalTo("test"));
-            // with concurrent requests > 1 we can't rely on the order of the bulk requests
-            assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(numDocs)));
-            // we do want to check that we don't get duplicate ids back
-            assertThat(ids.add(bulkItemResponse.getId()), equalTo(true));
-        }
-
-        assertMultiGetResponse(multiGetRequestBuilder.get(), numDocs);
-    }
-
-    public void testBulkProcessorWaitOnClose() throws Exception {
-        BulkProcessorTestListener listener = new BulkProcessorTestListener();
-
-        int numDocs = randomIntBetween(10, 100);
-        BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT")
-            // let's make sure that the bulk action limit trips: a single execution will index all the documents
-            .setConcurrentRequests(randomIntBetween(0, 1))
-            .setBulkActions(numDocs)
-            .setFlushInterval(TimeValue.timeValueHours(24))
-            .setBulkSize(ByteSizeValue.of(randomIntBetween(1, 10), RandomPicks.randomFrom(random(), ByteSizeUnit.values())))
-            .build();
-
-        MultiGetRequestBuilder multiGetRequestBuilder = indexDocs(client(), processor, numDocs);
-        assertThat(processor.isOpen(), is(true));
-        assertThat(processor.awaitClose(1, TimeUnit.MINUTES), is(true));
-        if (randomBoolean()) { // check if we can call it multiple times
-            if (randomBoolean()) {
-                assertThat(processor.awaitClose(1, TimeUnit.MINUTES), is(true));
-            } else {
-                processor.close();
-            }
-        }
-        assertThat(processor.isOpen(), is(false));
-
-        assertThat(listener.beforeCounts.get(), greaterThanOrEqualTo(1));
-        assertThat(listener.afterCounts.get(), greaterThanOrEqualTo(1));
-        assertThat(listener.bulkFailures.size(), equalTo(0));
-        assertResponseItems(listener.bulkItems, numDocs);
-        assertMultiGetResponse(multiGetRequestBuilder.get(), numDocs);
-    }
-
-    public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception {
-        createIndex("test-ro");
-        updateIndexSettings(Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true), "test-ro");
-        ensureGreen();
-
-        int bulkActions = randomIntBetween(10, 100);
-        int numDocs = randomIntBetween(bulkActions, bulkActions + 100);
-        int concurrentRequests = randomIntBetween(0, 10);
-
-        int expectedBulkActions = numDocs / bulkActions;
-
-        final CountDownLatch latch = new CountDownLatch(expectedBulkActions);
-        int totalExpectedBulkActions = numDocs % bulkActions == 0 ? expectedBulkActions : expectedBulkActions + 1;
-        final CountDownLatch closeLatch = new CountDownLatch(totalExpectedBulkActions);
-
-        int testDocs = 0;
-        int testReadOnlyDocs = 0;
-        MultiGetRequestBuilder multiGetRequestBuilder = client().prepareMultiGet();
-        BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch);
-
-        try (
-            BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT")
-                .setConcurrentRequests(concurrentRequests)
-                .setBulkActions(bulkActions)
-                // set interval and size to high values
-                .setFlushInterval(TimeValue.timeValueHours(24))
-                .setBulkSize(ByteSizeValue.of(1, ByteSizeUnit.GB))
-                .build()
-        ) {
-
-            for (int i = 1; i <= numDocs; i++) {
-                if (randomBoolean()) {
-                    testDocs++;
-                    processor.add(
-                        new IndexRequest("test").id(Integer.toString(testDocs)).source(Requests.INDEX_CONTENT_TYPE, "field", "value")
-                    );
-                    multiGetRequestBuilder.add("test", Integer.toString(testDocs));
-                } else {
-                    testReadOnlyDocs++;
-                    processor.add(
-                        new IndexRequest("test-ro").id(Integer.toString(testReadOnlyDocs))
-                            .source(Requests.INDEX_CONTENT_TYPE, "field", "value")
-                    );
-                }
-            }
-        }
-
-        closeLatch.await();
-
-        assertThat(listener.beforeCounts.get(), equalTo(totalExpectedBulkActions));
-        assertThat(listener.afterCounts.get(), equalTo(totalExpectedBulkActions));
-        assertThat(listener.bulkFailures.size(), equalTo(0));
-        assertThat(listener.bulkItems.size(), equalTo(testDocs + testReadOnlyDocs));
-
-        Set<String> ids = new HashSet<>();
-        Set<String> readOnlyIds = new HashSet<>();
-        for (BulkItemResponse bulkItemResponse : listener.bulkItems) {
-            assertThat(bulkItemResponse.getIndex(), either(equalTo("test")).or(equalTo("test-ro")));
-            if (bulkItemResponse.getIndex().equals("test")) {
-                assertThat(bulkItemResponse.isFailed(), equalTo(false));
-                // with concurrent requests > 1 we can't rely on the order of the bulk requests
-                assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testDocs)));
-                // we do want to check that we don't get duplicate ids back
-                assertThat(ids.add(bulkItemResponse.getId()), equalTo(true));
-            } else {
-                assertThat(bulkItemResponse.isFailed(), equalTo(true));
-                // with concurrent requests > 1 we can't rely on the order of the bulk requests
-                assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testReadOnlyDocs)));
-                // we do want to check that we don't get duplicate ids back
-                assertThat(readOnlyIds.add(bulkItemResponse.getId()), equalTo(true));
-            }
-        }
-
-        assertMultiGetResponse(multiGetRequestBuilder.get(), testDocs);
-    }
-
-    private static MultiGetRequestBuilder indexDocs(Client client, BulkProcessor processor, int numDocs) throws Exception {
-        MultiGetRequestBuilder multiGetRequestBuilder = client.prepareMultiGet();
-        for (int i = 1; i <= numDocs; i++) {
-            processor.add(
-                new IndexRequest("test").id(Integer.toString(i))
-                    .source(Requests.INDEX_CONTENT_TYPE, "field", randomRealisticUnicodeOfLengthBetween(1, 30))
-            );
-            multiGetRequestBuilder.add("test", Integer.toString(i));
-        }
-        return multiGetRequestBuilder;
-    }
-
-    private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses, int numDocs) {
-        assertThat(bulkItemResponses.size(), is(numDocs));
-        int i = 1;
-        for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
-            assertThat(bulkItemResponse.getIndex(), equalTo("test"));
-            assertThat(bulkItemResponse.getId(), equalTo(Integer.toString(i++)));
-            assertThat(
-                "item " + i + " failed with cause: " + bulkItemResponse.getFailureMessage(),
-                bulkItemResponse.isFailed(),
-                equalTo(false)
-            );
-        }
-    }
-
-    private static void assertMultiGetResponse(MultiGetResponse multiGetResponse, int numDocs) {
-        assertThat(multiGetResponse.getResponses().length, equalTo(numDocs));
-        int i = 1;
-        for (MultiGetItemResponse multiGetItemResponse : multiGetResponse) {
-            assertThat(multiGetItemResponse.getIndex(), equalTo("test"));
-            assertThat(multiGetItemResponse.getId(), equalTo(Integer.toString(i++)));
-        }
-    }
-
-    private static class BulkProcessorTestListener implements BulkProcessor.Listener {
-
-        private final CountDownLatch[] latches;
-        private final AtomicInteger beforeCounts = new AtomicInteger();
-        private final AtomicInteger afterCounts = new AtomicInteger();
-        private final List<BulkItemResponse> bulkItems = new CopyOnWriteArrayList<>();
-        private final List<Throwable> bulkFailures = new CopyOnWriteArrayList<>();
-
-        private BulkProcessorTestListener(CountDownLatch... latches) {
-            this.latches = latches;
-        }
-
-        @Override
-        public void beforeBulk(long executionId, BulkRequest request) {
-            beforeCounts.incrementAndGet();
-        }
-
-        @Override
-        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
-            bulkItems.addAll(Arrays.asList(response.getItems()));
-            afterCounts.incrementAndGet();
-            for (CountDownLatch latch : latches) {
-                latch.countDown();
-            }
-        }
-
-        @Override
-        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-            bulkFailures.add(failure);
-            afterCounts.incrementAndGet();
-            for (CountDownLatch latch : latches) {
-                latch.countDown();
-            }
-        }
-    }
-}
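
For context, a minimal sketch of how the removed API was driven, reconstructed from the builder calls in the tests above; the client, the listener body, and the index name are placeholders:

    BulkProcessor processor = BulkProcessor.builder(client::bulk, listener, "example-processor")
        .setBulkActions(1000)                              // flush after 1000 buffered actions...
        .setBulkSize(ByteSizeValue.of(5, ByteSizeUnit.MB)) // ...or after 5 MB of payload...
        .setFlushInterval(TimeValue.timeValueSeconds(5))   // ...or every 5 seconds
        .setConcurrentRequests(1)                          // one bulk in flight while accumulating the next
        .build();

    processor.add(new IndexRequest("example").id("1").source(Requests.INDEX_CONTENT_TYPE, "field", "value"));
    processor.flush();                          // push out anything still below the thresholds
    processor.awaitClose(30, TimeUnit.SECONDS); // drain in-flight bulks, then shut down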

+ 0 - 235
server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java

@@ -1,235 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the "Elastic License
- * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
- * Public License v 1"; you may not use this file except in compliance with, at
- * your election, the "Elastic License 2.0", the "GNU Affero General Public
- * License v3.0 only", or the "Server Side Public License, v 1".
- */
-package org.elasticsearch.action.bulk;
-
-import org.elasticsearch.ExceptionsHelper;
-import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
-import org.elasticsearch.common.BackoffPolicy;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.rest.RestStatus;
-import org.elasticsearch.test.ESIntegTestCase;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.lessThan;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-
-@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2)
-public class BulkProcessorRetryIT extends ESIntegTestCase {
-    private static final String INDEX_NAME = "test";
-
-    @Override
-    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
-        // Have very low pool and queue sizes to overwhelm internal pools easily
-        return Settings.builder()
-            .put(super.nodeSettings(nodeOrdinal, otherSettings))
-            // don't mess with this one! It's quite sensitive to a low queue size
-            // (see also ThreadedActionListener which is happily spawning threads even when we already got rejected)
-            // .put("thread_pool.listener.queue_size", 1)
-            .put("thread_pool.get.queue_size", 1)
-            // default is 200
-            .put("thread_pool.write.queue_size", 30)
-            .build();
-    }
-
-    public void testBulkRejectionLoadWithoutBackoff() throws Throwable {
-        boolean rejectedExecutionExpected = true;
-        executeBulkRejectionLoad(BackoffPolicy.noBackoff(), rejectedExecutionExpected);
-    }
-
-    public void testBulkRejectionLoadWithBackoff() throws Throwable {
-        boolean rejectedExecutionExpected = false;
-        executeBulkRejectionLoad(BackoffPolicy.exponentialBackoff(), rejectedExecutionExpected);
-    }
-
-    private void executeBulkRejectionLoad(BackoffPolicy backoffPolicy, boolean rejectedExecutionExpected) throws Throwable {
-        final CorrelatingBackoffPolicy internalPolicy = new CorrelatingBackoffPolicy(backoffPolicy);
-        int numberOfAsyncOps = randomIntBetween(600, 700);
-        final CountDownLatch latch = new CountDownLatch(numberOfAsyncOps);
-        final Set<Object> responses = ConcurrentCollections.newConcurrentSet();
-
-        assertAcked(prepareCreate(INDEX_NAME));
-        ensureGreen();
-
-        BulkProcessor bulkProcessor = BulkProcessor.builder(client()::bulk, new BulkProcessor.Listener() {
-            @Override
-            public void beforeBulk(long executionId, BulkRequest request) {
-                // no op
-            }
-
-            @Override
-            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
-                internalPolicy.logResponse(response);
-                responses.add(response);
-                latch.countDown();
-            }
-
-            @Override
-            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-                internalPolicy.logResponse(failure);
-                responses.add(failure);
-                latch.countDown();
-            }
-        }, "BulkProcessorRetryIT")
-            .setBulkActions(1)
-            // zero means that we're in the sync case, more means that we're in the async case
-            .setConcurrentRequests(randomIntBetween(0, 100))
-            .setBackoffPolicy(internalPolicy)
-            .build();
-        indexDocs(bulkProcessor, numberOfAsyncOps);
-        latch.await(10, TimeUnit.SECONDS);
-        bulkProcessor.close();
-
-        assertThat(responses.size(), equalTo(numberOfAsyncOps));
-
-        // validate all responses
-        boolean rejectedAfterAllRetries = false;
-        for (Object response : responses) {
-            if (response instanceof BulkResponse bulkResponse) {
-                for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
-                    if (bulkItemResponse.isFailed()) {
-                        BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
-                        if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) {
-                            if (rejectedExecutionExpected == false) {
-                                assertRetriedCorrectly(internalPolicy, bulkResponse, failure.getCause());
-                                rejectedAfterAllRetries = true;
-                            }
-                        } else {
-                            throw new AssertionError("Unexpected failure status: " + failure.getStatus());
-                        }
-                    }
-                }
-            } else {
-                if (ExceptionsHelper.status((Throwable) response) == RestStatus.TOO_MANY_REQUESTS) {
-                    if (rejectedExecutionExpected == false) {
-                        assertRetriedCorrectly(internalPolicy, response, ((Throwable) response).getCause());
-                        rejectedAfterAllRetries = true;
-                    }
-                    // ignored, we exceeded the write queue size when dispatching the initial bulk request
-                } else {
-                    Throwable t = (Throwable) response;
-                    // we're not expecting any other errors
-                    throw new AssertionError("Unexpected failure", t);
-                }
-            }
-        }
-
-        indicesAdmin().refresh(new RefreshRequest()).get();
-
-        final boolean finalRejectedAfterAllRetries = rejectedAfterAllRetries;
-        assertResponse(prepareSearch(INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).setSize(0), results -> {
-            if (rejectedExecutionExpected) {
-                assertThat((int) results.getHits().getTotalHits().value(), lessThanOrEqualTo(numberOfAsyncOps));
-            } else if (finalRejectedAfterAllRetries) {
-                assertThat((int) results.getHits().getTotalHits().value(), lessThan(numberOfAsyncOps));
-            } else {
-                assertThat((int) results.getHits().getTotalHits().value(), equalTo(numberOfAsyncOps));
-            }
-        });
-    }
-
-    private void assertRetriedCorrectly(CorrelatingBackoffPolicy internalPolicy, Object bulkResponse, Throwable failure) {
-        Iterator<TimeValue> backoffState = internalPolicy.backoffStateFor(bulkResponse);
-        assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState);
-        if (backoffState.hasNext()) {
-            // we're not expecting that we overwhelmed it even once when we maxed out the number of retries
-            throw new AssertionError("Got rejected although backoff policy would allow more retries", failure);
-        } else {
-            logger.debug("We maxed out the number of bulk retries and got rejected (this is ok).");
-        }
-    }
-
-    private static void indexDocs(BulkProcessor processor, int numDocs) {
-        for (int i = 1; i <= numDocs; i++) {
-            processor.add(
-                prepareIndex(INDEX_NAME).setId(Integer.toString(i))
-                    .setSource("field", randomRealisticUnicodeOfLengthBetween(1, 30))
-                    .request()
-            );
-        }
-    }
-
-    /**
-     * Internal helper class to correlate backoff states with bulk responses. This is needed to check whether we maxed out the number
-     * of retries but still got rejected (which is perfectly fine and can also happen from time to time under heavy load).
-     *
-     * This implementation relies on an implementation detail in Retry, namely that the bulk listener is notified on the same thread
-     * as the last call to the backoff policy's iterator. The advantage is that this is non-invasive to the rest of the production code.
-     */
-    private static class CorrelatingBackoffPolicy extends BackoffPolicy {
-        private final Map<Object, Iterator<TimeValue>> correlations = new ConcurrentHashMap<>();
-        // this is intentionally *not* static final. We will only ever have one instance of this class per test case and want the
-        // thread local to be eligible for garbage collection right after the test to avoid leaks.
-        private final ThreadLocal<Iterator<TimeValue>> iterators = new ThreadLocal<>();
-
-        private final BackoffPolicy delegate;
-
-        private CorrelatingBackoffPolicy(BackoffPolicy delegate) {
-            this.delegate = delegate;
-        }
-
-        public Iterator<TimeValue> backoffStateFor(Object response) {
-            return correlations.get(response);
-        }
-
-        // Assumption: This method is called from the same thread as the last call to the internal iterator's #hasNext() / #next()
-        // see also Retry.AbstractRetryHandler#onResponse().
-        public void logResponse(Object response) {
-            Iterator<TimeValue> iterator = iterators.get();
-            // did we ever retry?
-            if (iterator != null) {
-                // we should correlate any iterator only once
-                iterators.remove();
-                correlations.put(response, iterator);
-            }
-        }
-
-        @Override
-        public Iterator<TimeValue> iterator() {
-            return new CorrelatingIterator(iterators, delegate.iterator());
-        }
-
-        private static class CorrelatingIterator implements Iterator<TimeValue> {
-            private final Iterator<TimeValue> delegate;
-            private final ThreadLocal<Iterator<TimeValue>> iterators;
-
-            private CorrelatingIterator(ThreadLocal<Iterator<TimeValue>> iterators, Iterator<TimeValue> delegate) {
-                this.iterators = iterators;
-                this.delegate = delegate;
-            }
-
-            @Override
-            public boolean hasNext() {
-                // update on every invocation as we might get rescheduled on a different thread. Unfortunately, there is a chance that
-                // we pollute the thread local map with stale values. Due to the implementation of Retry and the life cycle of the
-                // enclosing class CorrelatingBackoffPolicy this should not pose a major problem though.
-                iterators.set(this);
-                return delegate.hasNext();
-            }
-
-            @Override
-            public TimeValue next() {
-                // update on every invocation
-                iterators.set(this);
-                return delegate.next();
-            }
-        }
-    }
-}
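
The CorrelatingBackoffPolicy above exploits the fact that BackoffPolicy is simply an Iterable of retry delays, so a delegating wrapper can observe every delay the retry logic consumes. A stripped-down sketch of that delegation idea (the class name and counter are illustrative, not part of the removed code):

    import org.elasticsearch.common.BackoffPolicy;
    import org.elasticsearch.core.TimeValue;

    import java.util.Iterator;
    import java.util.concurrent.atomic.AtomicInteger;

    class CountingBackoffPolicy extends BackoffPolicy {
        private final BackoffPolicy delegate;
        final AtomicInteger delaysConsumed = new AtomicInteger();

        CountingBackoffPolicy(BackoffPolicy delegate) {
            this.delegate = delegate;
        }

        @Override
        public Iterator<TimeValue> iterator() {
            Iterator<TimeValue> inner = delegate.iterator();
            return new Iterator<>() {
                @Override
                public boolean hasNext() {
                    return inner.hasNext();
                }

                @Override
                public TimeValue next() {
                    // each consumed delay corresponds to one scheduled retry
                    delaysConsumed.incrementAndGet();
                    return inner.next();
                }
            };
        }
    }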

+ 0 - 554
server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java

@@ -1,554 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the "Elastic License
- * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
- * Public License v 1"; you may not use this file except in compliance with, at
- * your election, the "Elastic License 2.0", the "GNU Affero General Public
- * License v3.0 only", or the "Server Side Public License, v 1".
- */
-
-package org.elasticsearch.action.bulk;
-
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.DocWriteRequest;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.common.BackoffPolicy;
-import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
-import org.elasticsearch.core.Nullable;
-import org.elasticsearch.core.RestApiVersion;
-import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.core.Tuple;
-import org.elasticsearch.threadpool.ScheduledExecutorServiceScheduler;
-import org.elasticsearch.threadpool.Scheduler;
-import org.elasticsearch.xcontent.XContentType;
-
-import java.io.Closeable;
-import java.util.Objects;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.BiConsumer;
-import java.util.function.Supplier;
-
-/**
- * A bulk processor is a thread-safe bulk processing class, allowing you to easily set when to "flush" a new bulk request
- * (either based on number of actions, based on the size, or time), and to easily control the number of concurrent bulk
- * requests allowed to be executed in parallel.
- * <p>
- * In order to create a new bulk processor, use the {@link Builder}.
- */
-public class BulkProcessor implements Closeable {
-
-    static final String FLUSH_SCHEDULER_NAME_SUFFIX = "-flush-scheduler";
-    static final String RETRY_SCHEDULER_NAME_SUFFIX = "-retry-scheduler";
-
-    /**
-     * A listener for the execution.
-     */
-    public interface Listener {
-
-        /**
-         * Callback before the bulk is executed.
-         */
-        void beforeBulk(long executionId, BulkRequest request);
-
-        /**
-         * Callback after a successful execution of bulk request.
-         */
-        void afterBulk(long executionId, BulkRequest request, BulkResponse response);
-
-        /**
-         * Callback after a failed execution of bulk request.
-         * <p>
-         * Note that in case an instance of <code>InterruptedException</code> is passed, which means that request processing has been
-         * cancelled externally, the thread's interruption status has been restored prior to calling this method.
-         */
-        void afterBulk(long executionId, BulkRequest request, Throwable failure);
-    }
-
-    /**
-     * A builder used to build an instance of a bulk processor.
-     */
-    public static class Builder {
-
-        private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
-        private final Listener listener;
-        private final Scheduler flushScheduler;
-        private final Scheduler retryScheduler;
-        private final Runnable onClose;
-        private int concurrentRequests = 1;
-        private int bulkActions = 1000;
-        private ByteSizeValue bulkSize = ByteSizeValue.of(5, ByteSizeUnit.MB);
-        private TimeValue flushInterval = null;
-        private BackoffPolicy backoffPolicy = BackoffPolicy.exponentialBackoff();
-        private String globalIndex;
-        private String globalRouting;
-        private String globalPipeline;
-        private Supplier<Boolean> flushCondition = () -> true;
-
-        private Builder(
-            BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer,
-            Listener listener,
-            Scheduler flushScheduler,
-            Scheduler retryScheduler,
-            Runnable onClose
-        ) {
-            this.consumer = consumer;
-            this.listener = listener;
-            this.flushScheduler = flushScheduler;
-            this.retryScheduler = retryScheduler;
-            this.onClose = onClose;
-        }
-
-        /**
-         * Sets the number of concurrent requests allowed to be executed. A value of 0 means that only a single
-         * request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed
-         * while accumulating new bulk requests. Defaults to {@code 1}.
-         */
-        public Builder setConcurrentRequests(int concurrentRequests) {
-            this.concurrentRequests = concurrentRequests;
-            return this;
-        }
-
-        /**
-         * Sets when to flush a new bulk request based on the number of actions currently added. Defaults to
-         * {@code 1000}. Can be set to {@code -1} to disable it.
-         */
-        public Builder setBulkActions(int bulkActions) {
-            this.bulkActions = bulkActions;
-            return this;
-        }
-
-        /**
-         * Sets when to flush a new bulk request based on the size of actions currently added. Defaults to
-         * {@code 5mb}. Can be set to {@code -1} to disable it.
-         */
-        public Builder setBulkSize(ByteSizeValue bulkSize) {
-            this.bulkSize = bulkSize;
-            return this;
-        }
-
-        /**
-         * Sets a flush interval that flushes *any* pending bulk actions once the interval passes. Defaults to not set.
-         * <p>
-         * Note, both {@link #setBulkActions(int)} and {@link #setBulkSize(org.elasticsearch.common.unit.ByteSizeValue)}
-         * can be set to {@code -1} with the flush interval set allowing for complete async processing of bulk actions.
-         */
-        public Builder setFlushInterval(TimeValue flushInterval) {
-            this.flushInterval = flushInterval;
-            return this;
-        }
-
-        public Builder setGlobalIndex(String globalIndex) {
-            this.globalIndex = globalIndex;
-            return this;
-        }
-
-        public Builder setGlobalRouting(String globalRouting) {
-            this.globalRouting = globalRouting;
-            return this;
-        }
-
-        public Builder setGlobalPipeline(String globalPipeline) {
-            this.globalPipeline = globalPipeline;
-            return this;
-        }
-
-        /**
-         * Sets a custom backoff policy. The backoff policy defines how the bulk processor should handle retries of bulk requests internally
-         * in case they have failed due to resource constraints (i.e. a thread pool was full).
-         *
-         * The default is to back off exponentially.
-         *
-         * @see BackoffPolicy#exponentialBackoff()
-         */
-        public Builder setBackoffPolicy(BackoffPolicy backoffPolicy) {
-            if (backoffPolicy == null) {
-                throw new NullPointerException("'backoffPolicy' must not be null. To disable backoff, pass BackoffPolicy.noBackoff()");
-            }
-            this.backoffPolicy = backoffPolicy;
-            return this;
-        }
-
-        /**
-         * Builds a new bulk processor.
-         */
-        public BulkProcessor build() {
-            return new BulkProcessor(
-                consumer,
-                backoffPolicy,
-                listener,
-                concurrentRequests,
-                bulkActions,
-                bulkSize,
-                flushInterval,
-                flushScheduler,
-                retryScheduler,
-                onClose,
-                createBulkRequestWithGlobalDefaults(),
-                flushCondition
-            );
-        }
-
-        private Supplier<BulkRequest> createBulkRequestWithGlobalDefaults() {
-            return () -> new BulkRequest(globalIndex).pipeline(globalPipeline).routing(globalRouting);
-        }
-
-        public Builder setFlushCondition(Supplier<Boolean> flushCondition) {
-            this.flushCondition = flushCondition;
-            return this;
-        }
-    }
-
-    /**
-     * @param consumer The consumer that is called to fulfil bulk operations
-     * @param listener The BulkProcessor listener that gets called on bulk events
-     * @param name     The name of this processor, e.g. to identify the scheduler threads
-     * @return the builder for BulkProcessor
-     */
-    public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener, String name) {
-        Objects.requireNonNull(consumer, "consumer");
-        Objects.requireNonNull(listener, "listener");
-        final ScheduledThreadPoolExecutor flushScheduler = Scheduler.initScheduler(Settings.EMPTY, name + FLUSH_SCHEDULER_NAME_SUFFIX);
-        final ScheduledThreadPoolExecutor retryScheduler = Scheduler.initScheduler(Settings.EMPTY, name + RETRY_SCHEDULER_NAME_SUFFIX);
-        return new Builder(consumer, listener, buildScheduler(flushScheduler), buildScheduler(retryScheduler), () -> {
-            Scheduler.terminate(flushScheduler, 10, TimeUnit.SECONDS);
-            Scheduler.terminate(retryScheduler, 10, TimeUnit.SECONDS);
-        });
-    }
-
-    private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) {
-        return new ScheduledExecutorServiceScheduler(scheduledThreadPoolExecutor);
-    }
-
-    private final int bulkActions;
-    private final long bulkSize;
-
-    private final Scheduler.Cancellable cancellableFlushTask;
-
-    private final AtomicLong executionIdGen = new AtomicLong();
-
-    private BulkRequest bulkRequest;
-    private final Supplier<BulkRequest> bulkRequestSupplier;
-    private final Supplier<Boolean> flushSupplier;
-    private final BulkRequestHandler bulkRequestHandler;
-    private final Runnable onClose;
-
-    private volatile boolean closed = false;
-    private final ReentrantLock lock = new ReentrantLock();
-
-    BulkProcessor(
-        BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer,
-        BackoffPolicy backoffPolicy,
-        Listener listener,
-        int concurrentRequests,
-        int bulkActions,
-        ByteSizeValue bulkSize,
-        @Nullable TimeValue flushInterval,
-        Scheduler flushScheduler,
-        Scheduler retryScheduler,
-        Runnable onClose,
-        Supplier<BulkRequest> bulkRequestSupplier,
-        Supplier<Boolean> flushSupplier
-    ) {
-        this.bulkActions = bulkActions;
-        this.bulkSize = bulkSize.getBytes();
-        this.bulkRequest = bulkRequestSupplier.get();
-        this.bulkRequestSupplier = bulkRequestSupplier;
-        this.flushSupplier = flushSupplier;
-        this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, retryScheduler, concurrentRequests);
-        // Start the periodic flushing task after everything is set up
-        this.cancellableFlushTask = startFlushTask(flushInterval, flushScheduler);
-        this.onClose = onClose;
-    }
-
-    BulkProcessor(
-        BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer,
-        BackoffPolicy backoffPolicy,
-        Listener listener,
-        int concurrentRequests,
-        int bulkActions,
-        ByteSizeValue bulkSize,
-        @Nullable TimeValue flushInterval,
-        Scheduler flushScheduler,
-        Scheduler retryScheduler,
-        Runnable onClose,
-        Supplier<BulkRequest> bulkRequestSupplier
-    ) {
-        this(
-            consumer,
-            backoffPolicy,
-            listener,
-            concurrentRequests,
-            bulkActions,
-            bulkSize,
-            flushInterval,
-            flushScheduler,
-            retryScheduler,
-            onClose,
-            bulkRequestSupplier,
-            () -> true
-        );
-    }
-
-    /**
-     * @deprecated use the {@link BulkProcessor} constructor which uses separate schedulers for flush and retry
-     */
-    @Deprecated
-    BulkProcessor(
-        BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer,
-        BackoffPolicy backoffPolicy,
-        Listener listener,
-        int concurrentRequests,
-        int bulkActions,
-        ByteSizeValue bulkSize,
-        @Nullable TimeValue flushInterval,
-        Scheduler scheduler,
-        Runnable onClose,
-        Supplier<BulkRequest> bulkRequestSupplier
-    ) {
-        this(
-            consumer,
-            backoffPolicy,
-            listener,
-            concurrentRequests,
-            bulkActions,
-            bulkSize,
-            flushInterval,
-            scheduler,
-            scheduler,
-            onClose,
-            bulkRequestSupplier
-        );
-    }
-
-    /**
-     * Closes the processor. If flushing by time is enabled, it is shut down. Any remaining bulk actions are flushed.
-     */
-    @Override
-    public void close() {
-        try {
-            awaitClose(0, TimeUnit.NANOSECONDS);
-        } catch (InterruptedException exc) {
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    /**
-     * Closes the processor. If flushing by time is enabled, it is shut down. Any remaining bulk actions are flushed.
-     * <p>
-     * If concurrent requests are not enabled, returns {@code true} immediately.
-     * If concurrent requests are enabled, waits for up to the specified timeout for all bulk requests to complete, then returns {@code true}.
-     * If the specified waiting time elapses before all bulk requests complete, {@code false} is returned.
-     *
-     * @param timeout The maximum time to wait for the bulk requests to complete
-     * @param unit    The time unit of the {@code timeout} argument
-     * @return {@code true} if all bulk requests completed and {@code false} if the waiting time elapsed before all the bulk requests
-     * completed
-     * @throws InterruptedException If the current thread is interrupted
-     */
-    public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
-        lock.lock();
-        try {
-            if (closed) {
-                return true;
-            }
-            closed = true;
-
-            this.cancellableFlushTask.cancel();
-
-            if (bulkRequest.numberOfActions() > 0) {
-                execute();
-            }
-            try {
-                return this.bulkRequestHandler.awaitClose(timeout, unit);
-            } finally {
-                onClose.run();
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    /**
-     * Adds an {@link IndexRequest} to the list of actions to execute. Follows the same behavior as {@link IndexRequest}
-     * (for example, an id is generated if none is provided, and the create flag is honored).
-     */
-    public BulkProcessor add(IndexRequest request) {
-        return add((DocWriteRequest<?>) request);
-    }
-
-    /**
-     * Adds a {@link DeleteRequest} to the list of actions to execute.
-     */
-    public BulkProcessor add(DeleteRequest request) {
-        return add((DocWriteRequest<?>) request);
-    }
-
-    /**
-     * Adds either a delete or an index request.
-     */
-    public BulkProcessor add(DocWriteRequest<?> request) {
-        internalAdd(request);
-        return this;
-    }
-
-    boolean isOpen() {
-        return closed == false;
-    }
-
-    protected void ensureOpen() {
-        if (closed) {
-            throw new IllegalStateException("bulk processor already closed");
-        }
-    }
-
-    private void internalAdd(DocWriteRequest<?> request) {
-        // Mutating bulkRequest and swapping the instance is not thread-safe, so execute the mutations under a lock.
-        // Once the bulk request is ready to be shipped, swap the instance reference, unlock, and send the local reference to the handler.
-        Tuple<BulkRequest, Long> bulkRequestToExecute = null;
-        lock.lock();
-        try {
-            ensureOpen();
-            bulkRequest.add(request);
-            bulkRequestToExecute = newBulkRequestIfNeeded();
-        } finally {
-            lock.unlock();
-        }
-        // Execute the local reference outside the lock to allow the handler to control the concurrency via its configuration.
-        if (bulkRequestToExecute != null) {
-            execute(bulkRequestToExecute.v1(), bulkRequestToExecute.v2());
-        }
-    }
-
-    /**
-     * Adds the data from the bytes to be processed by the bulk processor
-     */
-    public BulkProcessor add(
-        BytesReference data,
-        @Nullable String defaultIndex,
-        @Nullable String defaultPipeline,
-        XContentType xContentType
-    ) throws Exception {
-        Tuple<BulkRequest, Long> bulkRequestToExecute = null;
-        lock.lock();
-        try {
-            ensureOpen();
-            bulkRequest.add(
-                data,
-                defaultIndex,
-                null,
-                null,
-                defaultPipeline,
-                null,
-                null,
-                null,
-                true,
-                xContentType,
-                RestApiVersion.current()
-            );
-            bulkRequestToExecute = newBulkRequestIfNeeded();
-        } finally {
-            lock.unlock();
-        }
-
-        if (bulkRequestToExecute != null) {
-            execute(bulkRequestToExecute.v1(), bulkRequestToExecute.v2());
-        }
-        return this;
-    }
-
-    private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler scheduler) {
-        if (flushInterval == null) {
-            return new Scheduler.Cancellable() {
-                @Override
-                public boolean cancel() {
-                    return false;
-                }
-
-                @Override
-                public boolean isCancelled() {
-                    return true;
-                }
-            };
-        }
-        return scheduler.scheduleWithFixedDelay(new Flush(), flushInterval, EsExecutors.DIRECT_EXECUTOR_SERVICE);
-    }
-
-    // needs to be executed under a lock
-    private Tuple<BulkRequest, Long> newBulkRequestIfNeeded() {
-        ensureOpen();
-        if (isOverTheLimit() == false) {
-            return null;
-        }
-        final BulkRequest bulkRequest = this.bulkRequest;
-        this.bulkRequest = bulkRequestSupplier.get();
-        return new Tuple<>(bulkRequest, executionIdGen.incrementAndGet());
-    }
-
-    // may be executed without a lock
-    private void execute(BulkRequest bulkRequest, long executionId) {
-        this.bulkRequestHandler.execute(bulkRequest, executionId);
-    }
-
-    // needs to be executed under a lock
-    private void execute() {
-        if (flushSupplier.get()) {
-            final BulkRequest bulkRequest = this.bulkRequest;
-            final long executionId = executionIdGen.incrementAndGet();
-
-            this.bulkRequest = bulkRequestSupplier.get();
-            execute(bulkRequest, executionId);
-        }
-    }
-
-    // needs to be executed under a lock
-    private boolean isOverTheLimit() {
-        if (bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) {
-            return true;
-        }
-        if (bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize) {
-            return true;
-        }
-        return false;
-    }
-
-    /**
-     * Flush pending delete or index requests.
-     */
-    public void flush() {
-        lock.lock();
-        try {
-            ensureOpen();
-            if (bulkRequest.numberOfActions() > 0) {
-                execute();
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    class Flush implements Runnable {
-        @Override
-        public void run() {
-            lock.lock();
-            try {
-                if (closed) {
-                    return;
-                }
-                if (bulkRequest.numberOfActions() == 0) {
-                    return;
-                }
-                execute();
-            } finally {
-                lock.unlock();
-            }
-        }
-    }
-}
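
The thread-safety pattern in internalAdd() above is worth spelling out: mutations of the shared request happen under the lock, while the (potentially slow) network call runs outside it on a detached reference, so other writers are never blocked on I/O. A distilled sketch combining internalAdd() and newBulkRequestIfNeeded(), using the field names declared in the class above:

    void add(DocWriteRequest<?> request) {
        // fields (lock, bulkRequest, bulkRequestSupplier, executionIdGen, bulkRequestHandler)
        // are as declared in the class above
        BulkRequest toExecute = null;
        long executionId = -1;
        lock.lock();
        try {
            ensureOpen();
            bulkRequest.add(request);
            if (isOverTheLimit()) {                      // action-count or byte-size threshold tripped
                toExecute = bulkRequest;                 // detach the full request...
                bulkRequest = bulkRequestSupplier.get(); // ...and swap in a fresh one
                executionId = executionIdGen.incrementAndGet();
            }
        } finally {
            lock.unlock();
        }
        if (toExecute != null) {
            bulkRequestHandler.execute(toExecute, executionId); // slow part runs outside the lock
        }
    }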

+ 0 - 95
server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java

@@ -1,95 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the "Elastic License
- * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
- * Public License v 1"; you may not use this file except in compliance with, at
- * your election, the "Elastic License 2.0", the "GNU Affero General Public
- * License v3.0 only", or the "Server Side Public License, v 1".
- */
-package org.elasticsearch.action.bulk;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.common.BackoffPolicy;
-import org.elasticsearch.threadpool.Scheduler;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiConsumer;
-
-/**
- * Implements the low-level details of bulk request handling
- */
-public final class BulkRequestHandler {
-    private static final Logger logger = LogManager.getLogger(BulkRequestHandler.class);
-    private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
-    private final BulkProcessor.Listener listener;
-    private final Semaphore semaphore;
-    private final Retry retry;
-    private final int concurrentRequests;
-
-    BulkRequestHandler(
-        BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer,
-        BackoffPolicy backoffPolicy,
-        BulkProcessor.Listener listener,
-        Scheduler scheduler,
-        int concurrentRequests
-    ) {
-        assert concurrentRequests >= 0;
-        this.consumer = consumer;
-        this.listener = listener;
-        this.concurrentRequests = concurrentRequests;
-        this.retry = new Retry(backoffPolicy, scheduler);
-        this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1);
-    }
-
-    public void execute(BulkRequest bulkRequest, long executionId) {
-        Runnable toRelease = () -> {};
-        boolean bulkRequestSetupSuccessful = false;
-        try {
-            listener.beforeBulk(executionId, bulkRequest);
-            semaphore.acquire();
-            toRelease = semaphore::release;
-            CountDownLatch latch = new CountDownLatch(1);
-            retry.withBackoff(consumer, bulkRequest, ActionListener.runAfter(new ActionListener<BulkResponse>() {
-                @Override
-                public void onResponse(BulkResponse response) {
-                    listener.afterBulk(executionId, bulkRequest, response);
-                }
-
-                @Override
-                public void onFailure(Exception e) {
-                    listener.afterBulk(executionId, bulkRequest, e);
-                }
-            }, () -> {
-                semaphore.release();
-                latch.countDown();
-            }));
-            bulkRequestSetupSuccessful = true;
-            if (concurrentRequests == 0) {
-                latch.await();
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            logger.info(() -> "Bulk request " + executionId + " has been cancelled.", e);
-            listener.afterBulk(executionId, bulkRequest, e);
-        } catch (Exception e) {
-            logger.warn(() -> "Failed to execute bulk request " + executionId + ".", e);
-            listener.afterBulk(executionId, bulkRequest, e);
-        } finally {
-            if (bulkRequestSetupSuccessful == false) {  // if we fail on client.bulk() release the semaphore
-                toRelease.run();
-            }
-        }
-    }
-
-    boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
-        if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) {
-            semaphore.release(this.concurrentRequests);
-            return true;
-        }
-        return false;
-    }
-}
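
The concurrency control above boils down to a single Semaphore: permits cap the number of in-flight bulk requests, and awaitClose() "drains" by briefly acquiring every permit. A self-contained sketch of that idea using only JDK primitives (class and method names are illustrative):

    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;

    final class InFlightThrottle {
        private final Semaphore semaphore;
        private final int permits;

        InFlightThrottle(int concurrentRequests) {
            // zero concurrent requests still needs one permit: the caller thread
            // occupies it and waits for completion, making execution synchronous
            this.permits = concurrentRequests > 0 ? concurrentRequests : 1;
            this.semaphore = new Semaphore(permits);
        }

        void beforeSend() throws InterruptedException {
            semaphore.acquire();   // blocks once the in-flight cap is reached
        }

        void afterCompletion() {
            semaphore.release();   // called from the response/failure listener
        }

        boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
            // if every permit can be acquired, nothing is in flight
            if (semaphore.tryAcquire(permits, timeout, unit)) {
                semaphore.release(permits);
                return true;
            }
            return false;
        }
    }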

+ 0 - 557
server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java

@@ -1,557 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the "Elastic License
- * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
- * Public License v 1"; you may not use this file except in compliance with, at
- * your election, the "Elastic License 2.0", the "GNU Affero General Public
- * License v3.0 only", or the "Server Side Public License, v 1".
- */
-
-package org.elasticsearch.action.bulk;
-
-import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.ExceptionsHelper;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.DocWriteRequest;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.common.BackoffPolicy;
-import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
-import org.elasticsearch.common.util.concurrent.ThreadContext;
-import org.elasticsearch.core.Strings;
-import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.test.ESTestCase;
-import org.elasticsearch.threadpool.ScheduledExecutorServiceScheduler;
-import org.elasticsearch.threadpool.Scheduler;
-import org.elasticsearch.threadpool.TestThreadPool;
-import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.RemoteTransportException;
-import org.elasticsearch.xcontent.XContentType;
-import org.junit.After;
-import org.junit.Before;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
-
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-
-public class BulkProcessorTests extends ESTestCase {
-
-    private ThreadPool threadPool;
-
-    @Before
-    public void startThreadPool() {
-        threadPool = new TestThreadPool("BulkProcessorTests");
-    }
-
-    @After
-    public void stopThreadPool() throws InterruptedException {
-        terminate(threadPool);
-    }
-
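-    // The flush below runs on another thread, yet the consumer must still observe the caller's
-    // ThreadContext header and transient value, and nothing may leak into the enclosing context.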
-    public void testBulkProcessorFlushPreservesContext() throws InterruptedException {
-        final CountDownLatch latch = new CountDownLatch(1);
-        final String headerKey = randomAlphaOfLengthBetween(1, 8);
-        final String transientKey = randomAlphaOfLengthBetween(1, 8);
-        final String headerValue = randomAlphaOfLengthBetween(1, 32);
-        final Object transientValue = new Object();
-
-        BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) -> {
-            ThreadContext threadContext = threadPool.getThreadContext();
-            assertEquals(headerValue, threadContext.getHeader(headerKey));
-            assertSame(transientValue, threadContext.getTransient(transientKey));
-            latch.countDown();
-        };
-
-        final int bulkSize = randomIntBetween(2, 32);
-        final TimeValue flushInterval = TimeValue.timeValueSeconds(1L);
-        final BulkProcessor bulkProcessor;
-        assertNull(threadPool.getThreadContext().getHeader(headerKey));
-        assertNull(threadPool.getThreadContext().getTransient(transientKey));
-        try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
-            threadPool.getThreadContext().putHeader(headerKey, headerValue);
-            threadPool.getThreadContext().putTransient(transientKey, transientValue);
-            bulkProcessor = new BulkProcessor(
-                consumer,
-                BackoffPolicy.noBackoff(),
-                emptyListener(),
-                1,
-                bulkSize,
-                ByteSizeValue.of(5, ByteSizeUnit.MB),
-                flushInterval,
-                threadPool,
-                () -> {},
-                BulkRequest::new
-            );
-        }
-        assertNull(threadPool.getThreadContext().getHeader(headerKey));
-        assertNull(threadPool.getThreadContext().getTransient(transientKey));
-
-        // add a single item which won't be over the size or number of items
-        bulkProcessor.add(new IndexRequest());
-
-        // wait for flush to execute
-        latch.await();
-
-        assertNull(threadPool.getThreadContext().getHeader(headerKey));
-        assertNull(threadPool.getThreadContext().getTransient(transientKey));
-        bulkProcessor.close();
-    }
-
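-    // Rejections wrapped in RemoteTransportException are retried under the configured backoff
-    // policy; the first non-retryable failure is handed to afterBulk(executionId, request, Throwable).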
-    public void testRetry() throws Exception {
-        final int maxAttempts = between(1, 3);
-        final AtomicInteger attemptRef = new AtomicInteger();
-
-        final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) -> {
-            final int attempt = attemptRef.incrementAndGet();
-            assertThat(attempt, lessThanOrEqualTo(maxAttempts));
-            if (attempt != 1) {
-                assertThat(Thread.currentThread().getName(), containsString("[BulkProcessorTests-retry-scheduler]"));
-            }
-
-            if (attempt == maxAttempts) {
-                listener.onFailure(new ElasticsearchException("final failure"));
-            } else {
-                listener.onFailure(new RemoteTransportException("remote", new EsRejectedExecutionException("retryable failure")));
-            }
-        };
-
-        final CountDownLatch countDownLatch = new CountDownLatch(1);
-        final BulkProcessor.Listener listener = new BulkProcessor.Listener() {
-
-            @Override
-            public void beforeBulk(long executionId, BulkRequest request) {}
-
-            @Override
-            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
-                fail("afterBulk should not return success");
-            }
-
-            @Override
-            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-                assertThat(failure, instanceOf(ElasticsearchException.class));
-                assertThat(failure.getMessage(), equalTo("final failure"));
-                countDownLatch.countDown();
-            }
-        };
-
-        try (
-            BulkProcessor bulkProcessor = BulkProcessor.builder(consumer, listener, "BulkProcessorTests")
-                .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.ZERO, Integer.MAX_VALUE))
-                .build()
-        ) {
-            bulkProcessor.add(new IndexRequest());
-            bulkProcessor.flush();
-            assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
-        }
-
-        assertThat(attemptRef.get(), equalTo(maxAttempts));
-    }
-
-    public void testConcurrentExecutions() throws Exception {
-        final AtomicBoolean called = new AtomicBoolean(false);
-        final AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
-        int estimatedTimeForTest = Integer.MAX_VALUE;
-        final int simulateWorkTimeInMillis = 5;
-        int concurrentClients = 0;
-        int concurrentBulkRequests = 0;
-        int expectedExecutions = 0;
-        int maxBatchSize = 0;
-        int maxDocuments = 0;
-        int iterations = 0;
-        boolean runTest = true;
-        // find some randoms that allow this test to take under ~ 10 seconds
-        while (estimatedTimeForTest > 10_000) {
-            if (iterations++ > 1_000) { // extremely unlikely
-                runTest = false;
-                break;
-            }
-            maxBatchSize = randomIntBetween(1, 100);
-            maxDocuments = randomIntBetween(maxBatchSize, 1_000_000);
-            concurrentClients = randomIntBetween(1, 20);
-            concurrentBulkRequests = randomIntBetween(0, 20);
-            expectedExecutions = maxDocuments / maxBatchSize;
-            estimatedTimeForTest = (expectedExecutions * simulateWorkTimeInMillis) / Math.min(
-                concurrentBulkRequests + 1,
-                concurrentClients
-            );
-        }
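-        // e.g. maxBatchSize = 100 and maxDocuments = 200_000 give 2_000 expected executions; at 5 ms
-        // each, divided by the effective parallelism min(concurrentBulkRequests + 1, concurrentClients)
-        // (4 with three concurrent bulk requests and four clients), the estimate is 2_500 ms.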
-        assumeTrue("failed to find random values that allows test to run quickly", runTest);
-        BulkResponse bulkResponse = new BulkResponse(
-            new BulkItemResponse[] { BulkItemResponse.success(0, randomFrom(DocWriteRequest.OpType.values()), mockResponse()) },
-            0
-        );
-        AtomicInteger failureCount = new AtomicInteger(0);
-        AtomicInteger successCount = new AtomicInteger(0);
-        AtomicInteger requestCount = new AtomicInteger(0);
-        AtomicInteger docCount = new AtomicInteger(0);
-        BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) -> {
-            try {
-                Thread.sleep(simulateWorkTimeInMillis); // simulate work
-                listener.onResponse(bulkResponse);
-            } catch (InterruptedException e) {
-                // should never happen
-                Thread.currentThread().interrupt();
-                failureCount.getAndIncrement();
-                exceptionRef.set(ExceptionsHelper.useOrSuppress(exceptionRef.get(), e));
-            }
-        };
-        try (
-            BulkProcessor bulkProcessor = new BulkProcessor(
-                consumer,
-                BackoffPolicy.noBackoff(),
-                countingListener(requestCount, successCount, failureCount, docCount, exceptionRef),
-                concurrentBulkRequests,
-                maxBatchSize,
-                ByteSizeValue.ofBytes(Integer.MAX_VALUE),
-                null,
-                UnusedScheduler.INSTANCE,
-                () -> called.set(true),
-                BulkRequest::new
-            )
-        ) {
-
-            ExecutorService executorService = Executors.newFixedThreadPool(concurrentClients);
-            CountDownLatch startGate = new CountDownLatch(1 + concurrentClients);
-
-            IndexRequest indexRequest = new IndexRequest();
-            String bulkRequest = """
-                { "index" : { "_index" : "test", "_id" : "1" } }
-                { "field1" : "value1" }
-                """;
-            BytesReference bytesReference = BytesReference.fromByteBuffers(
-                new ByteBuffer[] { ByteBuffer.wrap(bulkRequest.getBytes(StandardCharsets.UTF_8)) }
-            );
-            List<Future<?>> futures = new ArrayList<>();
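-            // submit one task per document; the AtomicInteger here is just a mutable loop counter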
-            for (final AtomicInteger i = new AtomicInteger(0); i.getAndIncrement() < maxDocuments;) {
-                futures.add(executorService.submit(() -> {
-                    try {
-                        // don't start any work until all tasks are submitted
-                        startGate.countDown();
-                        startGate.await();
-                        // alternate between ways to add to the bulk processor
-                        if (randomBoolean()) {
-                            bulkProcessor.add(indexRequest);
-                        } else {
-                            bulkProcessor.add(bytesReference, null, null, XContentType.JSON);
-                        }
-                    } catch (Exception e) {
-                        throw ExceptionsHelper.convertToRuntime(e);
-                    }
-                }));
-            }
-            startGate.countDown();
-            startGate.await();
-
-            for (Future<?> f : futures) {
-                try {
-                    f.get();
-                } catch (Exception e) {
-                    failureCount.incrementAndGet();
-                    exceptionRef.set(ExceptionsHelper.useOrSuppress(exceptionRef.get(), e));
-                }
-            }
-            executorService.shutdown();
-            executorService.awaitTermination(10, TimeUnit.SECONDS);
-
-            if (failureCount.get() > 0 || successCount.get() != expectedExecutions || requestCount.get() != successCount.get()) {
-                if (exceptionRef.get() != null) {
-                    logger.error("exception(s) caught during test", exceptionRef.get());
-                }
-                String message = """
-
-                    Expected Bulks: %s
-                    Requested Bulks: %s
-                    Successful Bulks: %s
-                    Failed Bulks: %s
-                    Max Documents: %s
-                    Max Batch Size: %s
-                    Concurrent Clients: %s
-                    Concurrent Bulk Requests: %s
-                    """;
-                fail(
-                    Strings.format(
-                        message,
-                        expectedExecutions,
-                        requestCount.get(),
-                        successCount.get(),
-                        failureCount.get(),
-                        maxDocuments,
-                        maxBatchSize,
-                        concurrentClients,
-                        concurrentBulkRequests
-                    )
-                );
-            }
-        }
-        // count total docs after processor is closed since there may have been partial batches that are flushed on close.
-        assertEquals(maxDocuments, docCount.get());
-    }
-
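-    // Same concurrency exercise as above, except batches are cut by the flush interval rather than
-    // by size or action count (maxBatchSize is Integer.MAX_VALUE).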
-    public void testConcurrentExecutionsWithFlush() throws Exception {
-        final AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
-        final int maxDocuments = 100_000;
-        final int concurrentClients = 2;
-        final int maxBatchSize = Integer.MAX_VALUE; // don't flush based on size
-        final int concurrentBulkRequests = randomIntBetween(0, 20);
-        final int simulateWorkTimeInMillis = 5;
-        BulkResponse bulkResponse = new BulkResponse(
-            new BulkItemResponse[] { BulkItemResponse.success(0, randomFrom(DocWriteRequest.OpType.values()), mockResponse()) },
-            0
-        );
-        AtomicInteger failureCount = new AtomicInteger(0);
-        AtomicInteger successCount = new AtomicInteger(0);
-        AtomicInteger requestCount = new AtomicInteger(0);
-        AtomicInteger docCount = new AtomicInteger(0);
-        BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) -> {
-            try {
-                Thread.sleep(simulateWorkTimeInMillis); // simulate work
-                listener.onResponse(bulkResponse);
-            } catch (InterruptedException e) {
-                // should never happen
-                Thread.currentThread().interrupt();
-                failureCount.getAndIncrement();
-                exceptionRef.set(ExceptionsHelper.useOrSuppress(exceptionRef.get(), e));
-            }
-        };
-        ScheduledExecutorService flushExecutor = Executors.newScheduledThreadPool(1);
-        try (
-            BulkProcessor bulkProcessor = new BulkProcessor(
-                consumer,
-                BackoffPolicy.noBackoff(),
-                countingListener(requestCount, successCount, failureCount, docCount, exceptionRef),
-                concurrentBulkRequests,
-                maxBatchSize,
-                ByteSizeValue.ofBytes(Integer.MAX_VALUE),
-                TimeValue.timeValueMillis(simulateWorkTimeInMillis * 2),
-                new ScheduledExecutorServiceScheduler(flushExecutor),
-                () -> {
-                    flushExecutor.shutdown();
-                    try {
-                        flushExecutor.awaitTermination(10L, TimeUnit.SECONDS);
-                        if (flushExecutor.isTerminated() == false) {
-                            flushExecutor.shutdownNow();
-                        }
-                    } catch (InterruptedException ie) {
-                        Thread.currentThread().interrupt();
-                    }
-                },
-                BulkRequest::new
-            )
-        ) {
-
-            ExecutorService executorService = Executors.newFixedThreadPool(concurrentClients);
-            IndexRequest indexRequest = new IndexRequest();
-            String bulkRequest = """
-                { "index" : { "_index" : "test", "_id" : "1" } }
-                { "field1" : "value1" }
-                """;
-            BytesReference bytesReference = BytesReference.fromByteBuffers(
-                new ByteBuffer[] { ByteBuffer.wrap(bulkRequest.getBytes(StandardCharsets.UTF_8)) }
-            );
-            List<Future<?>> futures = new ArrayList<>();
-            CountDownLatch startGate = new CountDownLatch(1 + concurrentClients);
-            for (final AtomicInteger i = new AtomicInteger(0); i.getAndIncrement() < maxDocuments;) {
-                futures.add(executorService.submit(() -> {
-                    try {
-                        // don't start any work until all tasks are submitted
-                        startGate.countDown();
-                        startGate.await();
-                        // alternate between ways to add to the bulk processor
-                        if (randomBoolean()) {
-                            bulkProcessor.add(indexRequest);
-                        } else {
-                            bulkProcessor.add(bytesReference, null, null, XContentType.JSON);
-                        }
-                    } catch (Exception e) {
-                        throw ExceptionsHelper.convertToRuntime(e);
-                    }
-                }));
-            }
-            startGate.countDown();
-            startGate.await();
-
-            for (Future<?> f : futures) {
-                try {
-                    f.get();
-                } catch (Exception e) {
-                    failureCount.incrementAndGet();
-                    exceptionRef.set(ExceptionsHelper.useOrSuppress(exceptionRef.get(), e));
-                }
-            }
-            executorService.shutdown();
-            executorService.awaitTermination(10, TimeUnit.SECONDS);
-        }
-
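-        // evaluate the counters only after close(), which flushes any remaining partial batch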
-        if (failureCount.get() > 0 || requestCount.get() != successCount.get() || maxDocuments != docCount.get()) {
-            if (exceptionRef.get() != null) {
-                logger.error("exception(s) caught during test", exceptionRef.get());
-            }
-            String message = """
-
-                Requested Bulks: %d
-                Successful Bulks: %d
-                Failed Bulks: %d
-                Total Documents: %d
-                Max Documents: %d
-                Max Batch Size: %d
-                Concurrent Clients: %d
-                Concurrent Bulk Requests: %d
-                """;
-            fail(
-                Strings.format(
-                    message,
-                    requestCount.get(),
-                    successCount.get(),
-                    failureCount.get(),
-                    docCount.get(),
-                    maxDocuments,
-                    maxBatchSize,
-                    concurrentClients,
-                    concurrentBulkRequests
-                )
-            );
-        }
-    }
-
-    public void testAwaitOnCloseCallsOnClose() throws Exception {
-        final AtomicBoolean called = new AtomicBoolean(false);
-        BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) -> {};
-        BulkProcessor bulkProcessor = new BulkProcessor(
-            consumer,
-            BackoffPolicy.noBackoff(),
-            emptyListener(),
-            0,
-            10,
-            ByteSizeValue.ofBytes(1000),
-            null,
-            UnusedScheduler.INSTANCE,
-            () -> called.set(true),
-            BulkRequest::new
-        );
-
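-        // no request is ever added, so the onClose hook can only be triggered by awaitClose itself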
-        assertFalse(called.get());
-        bulkProcessor.awaitClose(100, TimeUnit.MILLISECONDS);
-        assertTrue(called.get());
-    }
-
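-    // flush() must be a no-op while the flush-condition supplier returns false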
-    public void testDisableFlush() {
-        final AtomicInteger attemptRef = new AtomicInteger();
-
-        BulkResponse bulkResponse = new BulkResponse(
-            new BulkItemResponse[] { BulkItemResponse.success(0, randomFrom(DocWriteRequest.OpType.values()), mockResponse()) },
-            0
-        );
-
-        final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) -> {
-            listener.onResponse(bulkResponse);
-        };
-
-        final BulkProcessor.Listener listener = new BulkProcessor.Listener() {
-
-            @Override
-            public void beforeBulk(long executionId, BulkRequest request) {}
-
-            @Override
-            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
-                attemptRef.incrementAndGet();
-            }
-
-            @Override
-            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
-        };
-
-        AtomicBoolean flushEnabled = new AtomicBoolean(false);
-        try (
-            BulkProcessor bulkProcessor = BulkProcessor.builder(consumer, listener, "BulkProcessorTests")
-                .setFlushCondition(flushEnabled::get)
-                .build()
-        ) {
-            bulkProcessor.add(new IndexRequest());
-            bulkProcessor.flush();
-            assertThat(attemptRef.get(), equalTo(0));
-
-            flushEnabled.set(true);
-            bulkProcessor.flush();
-            assertThat(attemptRef.get(), equalTo(1));
-        }
-    }
-
-    private BulkProcessor.Listener emptyListener() {
-        return new BulkProcessor.Listener() {
-            @Override
-            public void beforeBulk(long executionId, BulkRequest request) {}
-
-            @Override
-            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {}
-
-            @Override
-            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
-        };
-    }
-
-    private BulkProcessor.Listener countingListener(
-        AtomicInteger requestCount,
-        AtomicInteger successCount,
-        AtomicInteger failureCount,
-        AtomicInteger docCount,
-        AtomicReference<Throwable> exceptionRef
-    ) {
-
-        return new BulkProcessor.Listener() {
-            @Override
-            public void beforeBulk(long executionId, BulkRequest request) {
-                requestCount.incrementAndGet();
-            }
-
-            @Override
-            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
-                successCount.incrementAndGet();
-                docCount.addAndGet(request.requests().size());
-            }
-
-            @Override
-            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-                if (failure != null) {
-                    failureCount.incrementAndGet();
-                    exceptionRef.set(ExceptionsHelper.useOrSuppress(exceptionRef.get(), failure));
-                }
-            }
-        };
-    }
-
-    private DocWriteResponse mockResponse() {
-        return new IndexResponse(new ShardId("index", "uid", 0), "id", 1, 1, 1, true);
-    }
-
-    private static class UnusedScheduler implements Scheduler {
-        static final UnusedScheduler INSTANCE = new UnusedScheduler();
-
-        @Override
-        public ScheduledCancellable schedule(Runnable command, TimeValue delay, Executor executor) {
-            throw new AssertionError("should not be called");
-        }
-    }
-}
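
For reference, the deleted tests exercised the removed API roughly as follows. This sketch recombines builder calls that appear in the deleted sources; the `example-processor` name, `example-index`, the sample limits, and the `bulkConsumer` parameter are stand-ins, not values from the removed code:

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.XContentType;

import java.util.function.BiConsumer;

class RemovedBulkProcessorUsage {
    // 'bulkConsumer' stands in for the client::bulk reference used by the tests above.
    static void example(BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer) {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {}

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {}

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
        };
        try (
            BulkProcessor processor = BulkProcessor.builder(bulkConsumer, listener, "example-processor")
                .setConcurrentRequests(1)                          // bulks allowed in flight at once
                .setBulkActions(500)                               // flush after this many actions...
                .setBulkSize(ByteSizeValue.of(5, ByteSizeUnit.MB)) // ...or this much buffered data
                .setFlushInterval(TimeValue.timeValueSeconds(5))   // ...or this much elapsed time
                .build()
        ) {
            processor.add(new IndexRequest("example-index").source("{\"field\":\"value\"}", XContentType.JSON));
        } // close() flushes any remaining partial batch
    }
}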