|
@@ -33,6 +33,7 @@ import java.util.List;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import static org.hamcrest.Matchers.allOf;
|
|
|
+import static org.hamcrest.Matchers.both;
|
|
|
import static org.hamcrest.Matchers.empty;
|
|
|
import static org.hamcrest.Matchers.greaterThan;
|
|
|
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
|
@@ -120,9 +121,15 @@ public class RethrottleTests extends ReindexTestCase {
|
|
|
assertEquals(newRequestsPerSecond, status.getRequestsPerSecond(), Float.MIN_NORMAL);
|
|
|
} else {
|
|
|
/* Check that at least one slice was rethrottled. We won't always rethrottle all of them because they might have completed.
|
|
|
- * With multiple slices these numbers might not add up perfectly, thus the 0.0001f. */
|
|
|
- float expectedSliceRequestsPerSecond = newRequestsPerSecond == Float.POSITIVE_INFINITY ? Float.POSITIVE_INFINITY
|
|
|
- : newRequestsPerSecond / request.request().getSlices();
|
|
|
+ * With multiple slices these numbers might not add up perfectly, thus the 1.01F. */
|
|
|
+ long unfinished = status.getSliceStatuses().stream()
|
|
|
+ .filter(slice -> slice != null)
|
|
|
+ .filter(slice -> slice.getStatus().getTotal() > slice.getStatus().getSuccessfullyProcessed())
|
|
|
+ .count();
|
|
|
+ float maxExpectedSliceRequestsPerSecond = newRequestsPerSecond == Float.POSITIVE_INFINITY ?
|
|
|
+ Float.POSITIVE_INFINITY : (newRequestsPerSecond / unfinished) * 1.01F;
|
|
|
+ float minExpectedSliceRequestsPerSecond = newRequestsPerSecond == Float.POSITIVE_INFINITY ?
|
|
|
+ Float.POSITIVE_INFINITY : (newRequestsPerSecond / request.request().getSlices()) * 0.99F;
|
|
|
boolean oneSliceRethrottled = false;
|
|
|
float totalRequestsPerSecond = 0;
|
|
|
for (BulkByScrollTask.StatusOrException statusOrException : status.getSliceStatuses()) {
|
|
@@ -134,10 +141,12 @@ public class RethrottleTests extends ReindexTestCase {
|
|
|
assertNull(statusOrException.getException());
|
|
|
BulkByScrollTask.Status slice = statusOrException.getStatus();
|
|
|
if (slice.getTotal() > slice.getSuccessfullyProcessed()) {
|
|
|
- assertEquals(expectedSliceRequestsPerSecond, slice.getRequestsPerSecond(), expectedSliceRequestsPerSecond * 0.0001f);
|
|
|
+ // This slice reports as not having completed so it should have been processed.
|
|
|
+ assertThat(slice.getRequestsPerSecond(), both(greaterThanOrEqualTo(minExpectedSliceRequestsPerSecond))
|
|
|
+ .and(lessThanOrEqualTo(maxExpectedSliceRequestsPerSecond)));
|
|
|
}
|
|
|
- if (Math.abs(expectedSliceRequestsPerSecond - slice.getRequestsPerSecond()) <= expectedSliceRequestsPerSecond * 0.0001f
|
|
|
- || expectedSliceRequestsPerSecond == slice.getRequestsPerSecond()) {
|
|
|
+ if (minExpectedSliceRequestsPerSecond <= slice.getRequestsPerSecond()
|
|
|
+ && slice.getRequestsPerSecond() <= maxExpectedSliceRequestsPerSecond) {
|
|
|
oneSliceRethrottled = true;
|
|
|
}
|
|
|
totalRequestsPerSecond += slice.getRequestsPerSecond();
|