|
@@ -78,7 +78,7 @@ public class RethrottleTests extends ReindexTestCase {
|
|
|
private void testCase(AbstractBulkByScrollRequestBuilder<?, ?> request, String actionName) throws Exception {
|
|
|
logger.info("Starting test for [{}] with [{}] slices", actionName, request.request().getSlices());
|
|
|
/* Add ten documents per slice so most slices will have many documents to process, having to go to multiple batches.
|
|
|
- * we can't rely on all of them doing so, but
|
|
|
+ * We can't rely on the slices being evenly sized but 10 means we have some pretty big slices.
|
|
|
*/
|
|
|
|
|
|
createIndex("test");
|
|
@@ -170,6 +170,8 @@ public class RethrottleTests extends ReindexTestCase {
|
|
|
|
|
|
// Now the response should come back quickly because we've rethrottled the request
|
|
|
BulkByScrollResponse response = responseListener.get();
|
|
|
+
|
|
|
+ // It'd be bad if the entire require completed in a single batch. The test wouldn't be testing anything.
|
|
|
assertThat("Entire request completed in a single batch. This may invalidate the test as throttling is done between batches.",
|
|
|
response.getBatches(), greaterThanOrEqualTo(numSlices));
|
|
|
}
|
|
@@ -189,8 +191,9 @@ public class RethrottleTests extends ReindexTestCase {
|
|
|
assertThat(rethrottleResponse.getTasks(), hasSize(1));
|
|
|
response.set(rethrottleResponse);
|
|
|
} catch (ElasticsearchException e) {
|
|
|
- // if it's the error we're expecting, rethrow as AssertionError so awaitBusy doesn't exit early
|
|
|
if (e.getCause() instanceof IllegalArgumentException) {
|
|
|
+ // We want to retry in this case so we throw an assertion error
|
|
|
+ logger.info("caught unprepared task, retrying until prepared");
|
|
|
throw new AssertionError("Rethrottle request for task [" + taskToRethrottle.getId() + "] failed", e);
|
|
|
} else {
|
|
|
throw e;
|
|
@@ -206,14 +209,32 @@ public class RethrottleTests extends ReindexTestCase {
|
|
|
do {
|
|
|
ListTasksResponse tasks = client().admin().cluster().prepareListTasks().setActions(actionName).setDetailed(true).get();
|
|
|
tasks.rethrowFailures("Finding tasks to rethrottle");
|
|
|
- assertThat(tasks.getTaskGroups(), hasSize(lessThan(2)));
|
|
|
+ assertThat("tasks are left over from the last execution of this test",
|
|
|
+ tasks.getTaskGroups(), hasSize(lessThan(2)));
|
|
|
if (0 == tasks.getTaskGroups().size()) {
|
|
|
+ // The parent task hasn't started yet
|
|
|
continue;
|
|
|
}
|
|
|
TaskGroup taskGroup = tasks.getTaskGroups().get(0);
|
|
|
- if (sliceCount != 1 && taskGroup.getChildTasks().size() == 0) {
|
|
|
- // If there are child tasks wait for at least one to start
|
|
|
- continue;
|
|
|
+ if (sliceCount != 1) {
|
|
|
+ BulkByScrollTask.Status status = (BulkByScrollTask.Status) taskGroup.getTaskInfo().getStatus();
|
|
|
+ /*
|
|
|
+ * If there are child tasks wait for all of them to start. It
|
|
|
+ * is possible that we'll end up with some very small slices
|
|
|
+ * (maybe even empty!) that complete super fast so we have to
|
|
|
+ * count them too.
|
|
|
+ */
|
|
|
+ long finishedChildStatuses = status.getSliceStatuses().stream()
|
|
|
+ .filter(n -> n != null)
|
|
|
+ .count();
|
|
|
+ logger.info("Expected [{}] total children, [{}] are running and [{}] are finished\n{}",
|
|
|
+ sliceCount, taskGroup.getChildTasks().size(), finishedChildStatuses, status.getSliceStatuses());
|
|
|
+ if (sliceCount == finishedChildStatuses) {
|
|
|
+ fail("all slices finished:\n" + status);
|
|
|
+ }
|
|
|
+ if (sliceCount != taskGroup.getChildTasks().size() + finishedChildStatuses) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
}
|
|
|
return taskGroup;
|
|
|
} while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10));
|