@@ -21,179 +21,159 @@ package org.elasticsearch.index.reindex;

 import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
 import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
 import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.bulk.Retry;
-import org.elasticsearch.cluster.health.ClusterHealthStatus;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.plugins.Plugin;
-import org.elasticsearch.search.MockSearchService;
+import org.elasticsearch.test.ESSingleNodeTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.After;
+import org.junit.Before;

 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.function.IntFunction;
+import java.util.concurrent.CyclicBarrier;

-import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.index.reindex.ReindexTestCase.matcher;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;

 /**
  * Integration test for retry behavior. Useful because retrying relies on the way that the rest of Elasticsearch throws exceptions and unit
  * tests won't verify that.
  */
-@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/18456")
-public class RetryTests extends ReindexTestCase {
-    /**
-     * The number of concurrent requests to test.
-     */
-    private static final int CONCURRENT = 12;
-    /**
-     * Enough docs that the requests will likely step on each other.
-     */
-    private static final int DOC_COUNT = 200;
-    /**
-     * Maximum number of times to attempt the test case before bailing with a failure if we don't see both a bulk and a search retry.
-     */
-    private static final int ATTEMPTS = 10;
+public class RetryTests extends ESSingleNodeTestCase {
+    private static final int DOC_COUNT = 20;
+
+    private List<CyclicBarrier> blockedExecutors = new ArrayList<>();
+
+    @Override
+    protected Collection<Class<? extends Plugin>> getPlugins() {
+        return pluginList(ReindexPlugin.class);
+    }

     /**
      * Lower the queue sizes to be small enough that both bulk and searches will time out and have to be retried.
      */
     @Override
-    protected Settings nodeSettings(int nodeOrdinal) {
-        Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal));
-        settings.put("threadpool.bulk.queue_size", 1);
+    protected Settings nodeSettings() {
+        Settings.Builder settings = Settings.builder().put(super.nodeSettings());
+        // Use pools of size 1 so we can block them
         settings.put("threadpool.bulk.size", 1);
-        settings.put("threadpool.search.queue_size", 1);
         settings.put("threadpool.search.size", 1);
+        // Use queues of size 1 because size 0 is broken and because search requests need the queue to function
+        settings.put("threadpool.bulk.queue_size", 1);
+        settings.put("threadpool.search.queue_size", 1);
         return settings.build();
     }

-    /**
-     * Disable search context leak detection because we expect leaks when there is an {@link EsRejectedExecutionException} queueing the
-     * reduce phase.
-     */
-    @Override
-    protected Collection<Class<? extends Plugin>> getMockPlugins() {
-        List<Class<? extends Plugin>> mockPlugins = new ArrayList<>();
-        for (Class<? extends Plugin> plugin: super.getMockPlugins()) {
-            if (plugin.equals(MockSearchService.TestPlugin.class)) {
-                continue;
-            }
-            mockPlugins.add(plugin);
+    @Before
+    public void setupSourceIndex() throws Exception {
+        createIndex("source");
+        // Build the test data. Don't use indexRandom because that won't work consistently with such small thread pools.
+        BulkRequestBuilder bulk = client().prepareBulk();
+        for (int i = 0; i < DOC_COUNT; i++) {
+            bulk.add(client().prepareIndex("source", "test").setSource("foo", "bar " + i));
+        }
+        Retry retry = Retry.on(EsRejectedExecutionException.class).policy(BackoffPolicy.exponentialBackoff());
+        BulkResponse response = retry.withSyncBackoff(client(), bulk.request());
+        assertFalse(response.buildFailureMessage(), response.hasFailures());
+        client().admin().indices().prepareRefresh("source").get();
+    }
+
+    @After
+    public void forceUnblockAllExecutors() {
+        for (CyclicBarrier barrier: blockedExecutors) {
+            barrier.reset();
         }
-        return mockPlugins;
     }

     public void testReindex() throws Exception {
-        testCase(true, () -> setupSourceIndex("source"), i -> reindex().source("source").destination("dest" + i));
+        testCase(ReindexAction.NAME, ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest"),
+                matcher().created(DOC_COUNT));
     }

     public void testUpdateByQuery() throws Exception {
-        Runnable setup = () -> {
-            for (int i = 0; i < CONCURRENT; i++) {
-                setupSourceIndex("source" + i);
-            }
-        };
-        testCase(false, setup, i -> updateByQuery().source("source" + i));
+        testCase(UpdateByQueryAction.NAME, UpdateByQueryAction.INSTANCE.newRequestBuilder(client()).source("source"),
+                matcher().updated(DOC_COUNT));
     }

-    /**
-     * Repeatedly attempts to cause bulk and search retries, failing if any of the requests fail and if after 5 attempts it isn't able to
-     * cause at least one of both types of retries across all attempts.
-     *
-     * @param expectCreated should the number of effected documents be created (true) or updated (false)
-     * @param setup called before every request attempt to create the test data
-     * @param requestBuilder called to build each parallel request.
-     */
-    private void testCase(boolean expectCreated, Runnable setup, IntFunction<AbstractBulkIndexByScrollRequestBuilder<?, ?>> requestBuilder)
+    private void testCase(String action, AbstractBulkIndexByScrollRequestBuilder<?, ?> request, BulkIndexByScrollResponseMatcher matcher)
             throws Exception {
-        long bulkRetries = 0;
-        long searchRetries = 0;
-        int attempt = 0;
-        while (attempt < ATTEMPTS) {
-            client().admin().indices().prepareDelete("*").get();
-            setup.run();
-            Tuple<Long, Long> result = attemptTestCase(expectCreated, requestBuilder);
-            bulkRetries += result.v1();
-            searchRetries += result.v2();
-            attempt += 1;
-            if (bulkRetries == 0) {
-                logger.warn("Didn't get any bulk retries after {} attempts", attempt);
-                continue;
-            }
-            if (searchRetries == 0) {
-                logger.warn("Didn't get any search retries after {} attempts", attempt);
-                continue;
-            }
-            break;
-        }
-        // We expect at least one retry or this test isn't very useful
-        assertThat(bulkRetries, greaterThan(0L));
-        assertThat(searchRetries, greaterThan(0L));
-    }
+        logger.info("Blocking search");
+        CyclicBarrier initialSearchBlock = blockExecutor(ThreadPool.Names.SEARCH);

-    /**
-     * Attempts to cause retries for the requests provided by requestBuilder, failing if the requests fail. Returns a tuple of (number of
-     * bulk retries, number of search retries).
-     */
-    private Tuple<Long, Long> attemptTestCase(boolean expectCreated,
-            IntFunction<AbstractBulkIndexByScrollRequestBuilder<?, ?>> requestBuilder) throws Exception {
-        List<ListenableActionFuture<BulkIndexByScrollResponse>> futures = new ArrayList<>(CONCURRENT);
-        for (int i = 0; i < CONCURRENT; i++) {
-            AbstractBulkIndexByScrollRequestBuilder<?, ?> request = requestBuilder.apply(i);
-            // Make sure we use more than one batch so we get the full reindex behavior
-            request.source().setSize(DOC_COUNT / randomIntBetween(2, 10));
-            // Use a low, random initial wait so we are unlikely collide with others retrying.
-            request.setRetryBackoffInitialTime(timeValueMillis(randomIntBetween(10, 300)));
-            futures.add(request.execute());
-        }
+        // Make sure we use more than one batch so we have to scroll
+        request.source().setSize(DOC_COUNT / randomIntBetween(2, 10));

-        // Finish all the requests
-        List<BulkIndexByScrollResponse> responses = new ArrayList<>(CONCURRENT);
-        for (ListenableActionFuture<BulkIndexByScrollResponse> future : futures) {
-            responses.add(future.get());
-        }
+        logger.info("Starting request");
+        ListenableActionFuture<BulkIndexByScrollResponse> responseListener = request.execute();

-        // Now check them
-        long bulkRetries = 0;
-        long searchRetries = 0;
-        BulkIndexByScrollResponseMatcher matcher = matcher();
-        if (expectCreated) {
-            matcher.created(DOC_COUNT);
-        } else {
-            matcher.updated(DOC_COUNT);
-        }
-        for (BulkIndexByScrollResponse response : responses) {
-            assertThat(response, matcher);
-            bulkRetries += response.getBulkRetries();
-            searchRetries += response.getSearchRetries();
-        }
-        return new Tuple<>(bulkRetries, searchRetries);
+        logger.info("Waiting for search rejections on the initial search");
+        assertBusy(() -> assertThat(taskStatus(action).getSearchRetries(), greaterThan(0L)));
+
+        logger.info("Blocking bulk and unblocking search so we start to get bulk rejections");
+        CyclicBarrier bulkBlock = blockExecutor(ThreadPool.Names.BULK);
+        initialSearchBlock.await();
+
+        logger.info("Waiting for bulk rejections");
+        assertBusy(() -> assertThat(taskStatus(action).getBulkRetries(), greaterThan(0L)));
+
+        // Keep a copy of the current number of search rejections so we can assert that we get more when we block the scroll
+        long initialSearchRejections = taskStatus(action).getSearchRetries();
+
+        logger.info("Blocking search and unblocking bulk so we should get search rejections for the scroll");
+        CyclicBarrier scrollBlock = blockExecutor(ThreadPool.Names.SEARCH);
+        bulkBlock.await();
+
+        logger.info("Waiting for search rejections for the scroll");
+        assertBusy(() -> assertThat(taskStatus(action).getSearchRetries(), greaterThan(initialSearchRejections)));
+
+        logger.info("Unblocking the scroll");
+        scrollBlock.await();
+
+        logger.info("Waiting for the request to finish");
+        BulkIndexByScrollResponse response = responseListener.get();
+        assertThat(response, matcher);
+        assertThat(response.getBulkRetries(), greaterThan(0L));
+        assertThat(response.getSearchRetries(), greaterThan(initialSearchRejections));
     }

-    private void setupSourceIndex(String name) {
-        try {
-            // Build the test index with a single shard so we can be sure that a search request *can* complete with the one thread
-            assertAcked(client().admin().indices().prepareCreate(name).setSettings(
-                    "index.number_of_shards", 1,
-                    "index.number_of_replicas", 0).get());
-            waitForRelocation(ClusterHealthStatus.GREEN);
-            // Build the test data. Don't use indexRandom because that won't work consistently with such small thread pools.
-            BulkRequestBuilder bulk = client().prepareBulk();
-            for (int i = 0; i < DOC_COUNT; i++) {
-                bulk.add(client().prepareIndex(name, "test").setSource("foo", "bar " + i));
+    /**
+     * Blocks the named executor by having its only thread run a task that waits on a CyclicBarrier, and fills the queue with a noop
+     * task, so any further requests to this executor should get {@link EsRejectedExecutionException}s.
+     */
+    private CyclicBarrier blockExecutor(String name) throws Exception {
+        ThreadPool threadPool = getInstanceFromNode(ThreadPool.class);
+        CyclicBarrier barrier = new CyclicBarrier(2);
+        logger.info("Blocking the [{}] executor", name);
+        threadPool.executor(name).execute(() -> {
+            try {
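+                // Fill the executor's only queue slot with a noop task so any further submissions to it are rejected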
+                threadPool.executor(name).execute(() -> {});
+                barrier.await();
+                logger.info("Blocked the [{}] executor", name);
+                barrier.await();
+ logger.info("Ublocking the [{}] executor", name);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
             }
-            Retry retry = Retry.on(EsRejectedExecutionException.class).policy(BackoffPolicy.exponentialBackoff());
-            BulkResponse response = retry.withSyncBackoff(client(), bulk.request());
-            assertFalse(response.buildFailureMessage(), response.hasFailures());
-            refresh(name);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
+        });
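+        // Wait until the task above is actually occupying the pool's single thread before returning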
+        barrier.await();
+        blockedExecutors.add(barrier);
+        return barrier;
+    }
+
+    /**
+     * Fetch the status for a task of type "action". Fails if there isn't exactly one task of that type running.
+     */
+    private BulkByScrollTask.Status taskStatus(String action) {
+        ListTasksResponse response = client().admin().cluster().prepareListTasks().setActions(action).setDetailed(true).get();
+        assertThat(response.getTasks(), hasSize(1));
+        return (BulkByScrollTask.Status) response.getTasks().get(0).getStatus();
     }
 }