|
|
@@ -26,6 +26,7 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
|
|
import org.elasticsearch.action.bulk.BulkResponse;
|
|
|
import org.elasticsearch.action.bulk.Retry;
|
|
|
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
|
|
+import org.elasticsearch.common.collect.Tuple;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
|
|
import org.elasticsearch.plugins.Plugin;
|
|
|
@@ -54,6 +55,10 @@ public class RetryTests extends ReindexTestCase {
|
|
|
* Enough docs that the requests will likely step on each other.
|
|
|
*/
|
|
|
private static final int DOC_COUNT = 200;
|
|
|
+ /**
|
|
|
+ * Maximum number of times to attempt the test case before bailing with a failure if we don't see both a bulk and a search retry.
|
|
|
+ */
|
|
|
+ private static final int ATTEMPTS = 10;
|
|
|
|
|
|
/**
|
|
|
* Lower the queue sizes to be small enough that both bulk and searches will time out and have to be retried.
|
|
|
@@ -85,19 +90,59 @@ public class RetryTests extends ReindexTestCase {
|
|
|
}
|
|
|
|
|
|
public void testReindex() throws Exception {
|
|
|
- setupSourceIndex("source");
|
|
|
- testCase(true, i -> reindex().source("source").destination("dest" + i));
|
|
|
+ testCase(true, () -> setupSourceIndex("source"), i -> reindex().source("source").destination("dest" + i));
|
|
|
}
|
|
|
|
|
|
public void testUpdateByQuery() throws Exception {
|
|
|
- for (int i = 0; i < CONCURRENT; i++) {
|
|
|
- setupSourceIndex("source" + i);
|
|
|
- }
|
|
|
- testCase(false, i -> updateByQuery().source("source" + i));
|
|
|
+ Runnable setup = () -> {
|
|
|
+ for (int i = 0; i < CONCURRENT; i++) {
|
|
|
+ setupSourceIndex("source" + i);
|
|
|
+ }
|
|
|
+ };
|
|
|
+ testCase(false, setup, i -> updateByQuery().source("source" + i));
|
|
|
}
|
|
|
|
|
|
- private void testCase(boolean expectCreated, IntFunction<AbstractBulkIndexByScrollRequestBuilder<?, ?>> requestBuilder)
|
|
|
+ /**
|
|
|
+ * Repeatedly attempts to cause bulk and search retries, failing if any of the requests fail and if after 5 attempts it isn't able to
|
|
|
+ * cause at least one of both types of retries across all attempts.
|
|
|
+ *
|
|
|
+ * @param expectCreated should the number of effected documents be created (true) or updated (false)
|
|
|
+ * @param setup called before every request attempt to create the test data
|
|
|
+ * @param requestBuilder called to build each parallel request.
|
|
|
+ */
|
|
|
+ private void testCase(boolean expectCreated, Runnable setup, IntFunction<AbstractBulkIndexByScrollRequestBuilder<?, ?>> requestBuilder)
|
|
|
throws Exception {
|
|
|
+ long bulkRetries = 0;
|
|
|
+ long searchRetries = 0;
|
|
|
+ int attempt = 0;
|
|
|
+ while (attempt < ATTEMPTS) {
|
|
|
+ client().admin().indices().prepareDelete("*").get();
|
|
|
+ setup.run();
|
|
|
+ Tuple<Long, Long> result = attemptTestCase(expectCreated, requestBuilder);
|
|
|
+ bulkRetries += result.v1();
|
|
|
+ searchRetries += result.v2();
|
|
|
+ attempt += 1;
|
|
|
+ if (bulkRetries == 0) {
|
|
|
+ logger.warn("Didn't get any bulk retries after {} attempts", attempt);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (searchRetries == 0) {
|
|
|
+ logger.warn("Didn't get any search retries after {} attempts", attempt);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ // We expect at least one retry or this test isn't very useful
|
|
|
+ assertThat(bulkRetries, greaterThan(0L));
|
|
|
+ assertThat(searchRetries, greaterThan(0L));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Attempts to cause retries for the requests provided by requestBuilder, failing if the requests fail. Returns a tuple of (number of
|
|
|
+ * bulk retries, number of search retries).
|
|
|
+ */
|
|
|
+ private Tuple<Long, Long> attemptTestCase(boolean expectCreated,
|
|
|
+ IntFunction<AbstractBulkIndexByScrollRequestBuilder<?, ?>> requestBuilder) throws Exception {
|
|
|
List<ListenableActionFuture<BulkIndexByScrollResponse>> futures = new ArrayList<>(CONCURRENT);
|
|
|
for (int i = 0; i < CONCURRENT; i++) {
|
|
|
AbstractBulkIndexByScrollRequestBuilder<?, ?> request = requestBuilder.apply(i);
|
|
|
@@ -128,10 +173,7 @@ public class RetryTests extends ReindexTestCase {
|
|
|
bulkRetries += response.getBulkRetries();
|
|
|
searchRetries += response.getSearchRetries();
|
|
|
}
|
|
|
-
|
|
|
- // We expect at least one retry or this test isn't very useful
|
|
|
- assertThat(bulkRetries, greaterThan(0L));
|
|
|
- assertThat(searchRetries, greaterThan(0L));
|
|
|
+ return new Tuple<>(bulkRetries, searchRetries);
|
|
|
}
|
|
|
|
|
|
private void setupSourceIndex(String name) {
|