|
@@ -34,10 +34,10 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
|
|
import org.elasticsearch.action.bulk.BackoffPolicy;
|
|
import org.elasticsearch.action.bulk.BackoffPolicy;
|
|
import org.elasticsearch.action.bulk.BulkItemResponse;
|
|
import org.elasticsearch.action.bulk.BulkItemResponse;
|
|
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
|
|
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
|
|
-import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource.Hit;
|
|
|
|
-import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource.SearchFailure;
|
|
|
|
import org.elasticsearch.action.bulk.BulkRequest;
|
|
import org.elasticsearch.action.bulk.BulkRequest;
|
|
import org.elasticsearch.action.bulk.BulkResponse;
|
|
import org.elasticsearch.action.bulk.BulkResponse;
|
|
|
|
+import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource.Hit;
|
|
|
|
+import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource.SearchFailure;
|
|
import org.elasticsearch.action.delete.DeleteRequest;
|
|
import org.elasticsearch.action.delete.DeleteRequest;
|
|
import org.elasticsearch.action.delete.DeleteResponse;
|
|
import org.elasticsearch.action.delete.DeleteResponse;
|
|
import org.elasticsearch.action.index.IndexRequest;
|
|
import org.elasticsearch.action.index.IndexRequest;
|
|
@@ -199,7 +199,8 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
|
client.scrollsToReject = randomIntBetween(0, testRequest.getMaxRetries() - 1);
|
|
client.scrollsToReject = randomIntBetween(0, testRequest.getMaxRetries() - 1);
|
|
DummyAsyncBulkByScrollAction action = new DummyActionWithoutBackoff();
|
|
DummyAsyncBulkByScrollAction action = new DummyActionWithoutBackoff();
|
|
action.setScroll(scrollId());
|
|
action.setScroll(scrollId());
|
|
- action.startNextScroll(timeValueNanos(System.nanoTime()), 0);
|
|
|
|
|
|
+ TimeValue now = timeValueNanos(System.nanoTime());
|
|
|
|
+ action.startNextScroll(now, now, 0);
|
|
assertBusy(() -> assertEquals(client.scrollsToReject + 1, client.scrollAttempts.get()));
|
|
assertBusy(() -> assertEquals(client.scrollsToReject + 1, client.scrollAttempts.get()));
|
|
if (listener.isDone()) {
|
|
if (listener.isDone()) {
|
|
Object result = listener.get();
|
|
Object result = listener.get();
|
|
@@ -213,7 +214,8 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
|
client.scrollsToReject = testRequest.getMaxRetries() + randomIntBetween(1, 100);
|
|
client.scrollsToReject = testRequest.getMaxRetries() + randomIntBetween(1, 100);
|
|
DummyAsyncBulkByScrollAction action = new DummyActionWithoutBackoff();
|
|
DummyAsyncBulkByScrollAction action = new DummyActionWithoutBackoff();
|
|
action.setScroll(scrollId());
|
|
action.setScroll(scrollId());
|
|
- action.startNextScroll(timeValueNanos(System.nanoTime()), 0);
|
|
|
|
|
|
+ TimeValue now = timeValueNanos(System.nanoTime());
|
|
|
|
+ action.startNextScroll(now, now, 0);
|
|
assertBusy(() -> assertEquals(testRequest.getMaxRetries() + 1, client.scrollAttempts.get()));
|
|
assertBusy(() -> assertEquals(testRequest.getMaxRetries() + 1, client.scrollAttempts.get()));
|
|
assertBusy(() -> assertTrue(listener.isDone()));
|
|
assertBusy(() -> assertTrue(listener.isDone()));
|
|
ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
|
|
ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
|
|
@@ -438,7 +440,9 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
|
// Set throttle to 1 request per second to make the math simpler
|
|
// Set throttle to 1 request per second to make the math simpler
|
|
testTask.rethrottle(1f);
|
|
testTask.rethrottle(1f);
|
|
// Make the last batch look nearly instant but have 100 documents
|
|
// Make the last batch look nearly instant but have 100 documents
|
|
- action.startNextScroll(timeValueNanos(System.nanoTime()), 100);
|
|
|
|
|
|
+ TimeValue lastBatchStartTime = timeValueNanos(System.nanoTime());
|
|
|
|
+ TimeValue now = timeValueNanos(lastBatchStartTime.nanos() + 1);
|
|
|
|
+ action.startNextScroll(lastBatchStartTime, now, 100);
|
|
|
|
|
|
// So the next request is going to have to wait an extra 100 seconds or so (base was 10 seconds, so 110ish)
|
|
// So the next request is going to have to wait an extra 100 seconds or so (base was 10 seconds, so 110ish)
|
|
assertThat(client.lastScroll.get().request.scroll().keepAlive().seconds(), either(equalTo(110L)).or(equalTo(109L)));
|
|
assertThat(client.lastScroll.get().request.scroll().keepAlive().seconds(), either(equalTo(110L)).or(equalTo(109L)));
|
|
@@ -451,14 +455,13 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
|
|
|
|
|
if (randomBoolean()) {
|
|
if (randomBoolean()) {
|
|
client.lastScroll.get().listener.onResponse(searchResponse);
|
|
client.lastScroll.get().listener.onResponse(searchResponse);
|
|
- // The delay is still 100ish seconds because there hasn't been much time between when we requested the bulk and when we got it.
|
|
|
|
- assertThat(capturedDelay.get().seconds(), either(equalTo(100L)).or(equalTo(99L)));
|
|
|
|
|
|
+ assertEquals(99, capturedDelay.get().seconds());
|
|
} else {
|
|
} else {
|
|
// Let's rethrottle between the starting the scroll and getting the response
|
|
// Let's rethrottle between the starting the scroll and getting the response
|
|
testTask.rethrottle(10f);
|
|
testTask.rethrottle(10f);
|
|
client.lastScroll.get().listener.onResponse(searchResponse);
|
|
client.lastScroll.get().listener.onResponse(searchResponse);
|
|
// The delay uses the new throttle
|
|
// The delay uses the new throttle
|
|
- assertThat(capturedDelay.get().seconds(), either(equalTo(10L)).or(equalTo(9L)));
|
|
|
|
|
|
+ assertEquals(9, capturedDelay.get().seconds());
|
|
}
|
|
}
|
|
|
|
|
|
// Running the command ought to increment the delay counter on the task.
|
|
// Running the command ought to increment the delay counter on the task.
|
|
@@ -483,7 +486,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
|
CountDownLatch successLatch = new CountDownLatch(1);
|
|
CountDownLatch successLatch = new CountDownLatch(1);
|
|
DummyAsyncBulkByScrollAction action = new DummyActionWithoutBackoff() {
|
|
DummyAsyncBulkByScrollAction action = new DummyActionWithoutBackoff() {
|
|
@Override
|
|
@Override
|
|
- void startNextScroll(TimeValue lastBatchStartTime, int lastBatchSize) {
|
|
|
|
|
|
+ void startNextScroll(TimeValue lastBatchStartTime, TimeValue now, int lastBatchSize) {
|
|
successLatch.countDown();
|
|
successLatch.countDown();
|
|
}
|
|
}
|
|
};
|
|
};
|
|
@@ -574,7 +577,8 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
|
}
|
|
}
|
|
|
|
|
|
public void testCancelBeforeStartNextScroll() throws Exception {
|
|
public void testCancelBeforeStartNextScroll() throws Exception {
|
|
- cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.startNextScroll(timeValueNanos(System.nanoTime()), 0));
|
|
|
|
|
|
+ TimeValue now = timeValueNanos(System.nanoTime());
|
|
|
|
+ cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.startNextScroll(now, now, 0));
|
|
}
|
|
}
|
|
|
|
|
|
public void testCancelBeforeRefreshAndFinish() throws Exception {
|
|
public void testCancelBeforeRefreshAndFinish() throws Exception {
|