Browse Source

Reindex negative TimeValue fix (#54057)

Reindex would use timeValueNanos(System.nanoTime()). The intended use
for TimeValue is as a duration, not as absolute time. In particular,
this could result in negative TimeValue's, being unsupported in #53913.
Modified to use the bare long nano-second value.
Henning Andersen 5 years ago
parent
commit
2a67bee874

+ 10 - 10
modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java

@@ -247,16 +247,16 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
     void onScrollResponse(ScrollableHitSource.AsyncResponse asyncResponse) {
         // lastBatchStartTime is essentially unused (see WorkerBulkByScrollTaskState.throttleWaitTime. Leaving it for now, since it seems
         // like a bug?
-        onScrollResponse(new TimeValue(System.nanoTime()), this.lastBatchSize, asyncResponse);
+        onScrollResponse(System.nanoTime(), this.lastBatchSize, asyncResponse);
     }
 
     /**
      * Process a scroll response.
-     * @param lastBatchStartTime the time when the last batch started. Used to calculate the throttling delay.
+     * @param lastBatchStartTimeNS the time when the last batch started. Used to calculate the throttling delay.
      * @param lastBatchSize the size of the last batch. Used to calculate the throttling delay.
      * @param asyncResponse the response to process from ScrollableHitSource
      */
-    void onScrollResponse(TimeValue lastBatchStartTime, int lastBatchSize, ScrollableHitSource.AsyncResponse asyncResponse) {
+    void onScrollResponse(long lastBatchStartTimeNS, int lastBatchSize, ScrollableHitSource.AsyncResponse asyncResponse) {
         ScrollableHitSource.Response response = asyncResponse.response();
         logger.debug("[{}]: got scroll response with [{}] hits", task.getId(), response.getHits().size());
         if (task.isCancelled()) {
@@ -284,7 +284,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
                  * It is important that the batch start time be calculated from here, scroll response to scroll response. That way the time
                  * waiting on the scroll doesn't count against this batch in the throttle.
                  */
-                prepareBulkRequest(timeValueNanos(System.nanoTime()), asyncResponse);
+                prepareBulkRequest(System.nanoTime(), asyncResponse);
             }
 
             @Override
@@ -293,7 +293,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
             }
         };
         prepareBulkRequestRunnable = (AbstractRunnable) threadPool.getThreadContext().preserveContext(prepareBulkRequestRunnable);
-        worker.delayPrepareBulkRequest(threadPool, lastBatchStartTime, lastBatchSize, prepareBulkRequestRunnable);
+        worker.delayPrepareBulkRequest(threadPool, lastBatchStartTimeNS, lastBatchSize, prepareBulkRequestRunnable);
     }
 
     /**
@@ -301,7 +301,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
      * delay has been slept. Uses the generic thread pool because reindex is rare enough not to need its own thread pool and because the
      * thread may be blocked by the user script.
      */
-    void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.AsyncResponse asyncResponse) {
+    void prepareBulkRequest(long thisBatchStartTimeNS, ScrollableHitSource.AsyncResponse asyncResponse) {
         ScrollableHitSource.Response response = asyncResponse.response();
         logger.debug("[{}]: preparing bulk request", task.getId());
         if (task.isCancelled()) {
@@ -327,12 +327,12 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
             /*
              * If we noop-ed the entire batch then just skip to the next batch or the BulkRequest would fail validation.
              */
-            notifyDone(thisBatchStartTime, asyncResponse, 0);
+            notifyDone(thisBatchStartTimeNS, asyncResponse, 0);
             return;
         }
         request.timeout(mainRequest.getTimeout());
         request.waitForActiveShards(mainRequest.getWaitForActiveShards());
-        sendBulkRequest(request, () -> notifyDone(thisBatchStartTime, asyncResponse, request.requests().size()));
+        sendBulkRequest(request, () -> notifyDone(thisBatchStartTimeNS, asyncResponse, request.requests().size()));
     }
 
     /**
@@ -418,14 +418,14 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
         }
     }
 
-    void notifyDone(TimeValue thisBatchStartTime, ScrollableHitSource.AsyncResponse asyncResponse, int batchSize) {
+    void notifyDone(long thisBatchStartTimeNS, ScrollableHitSource.AsyncResponse asyncResponse, int batchSize) {
         if (task.isCancelled()) {
             logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
             finishHim(null);
             return;
         }
         this.lastBatchSize = batchSize;
-        asyncResponse.done(worker.throttleWaitTime(thisBatchStartTime, timeValueNanos(System.nanoTime()), batchSize));
+        asyncResponse.done(worker.throttleWaitTime(thisBatchStartTimeNS, System.nanoTime(), batchSize));
     }
 
     private void recordFailure(Failure failure, List<Failure> failures) {

+ 10 - 11
modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java

@@ -111,7 +111,6 @@ import static java.util.Collections.synchronizedSet;
 import static org.apache.lucene.util.TestUtil.randomSimpleString;
 import static org.elasticsearch.action.bulk.BackoffPolicy.constantBackoff;
 import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
-import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
 import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsString;
@@ -255,7 +254,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
 
         long total = randomIntBetween(0, Integer.MAX_VALUE);
         ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), total, emptyList(), null);
-        simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueSeconds(0), 0, response);
+        simulateScrollResponse(new DummyAsyncBulkByScrollAction(), 0, 0, response);
         assertEquals(total, testTask.getStatus().getTotal());
     }
 
@@ -268,7 +267,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
             Hit hit = new ScrollableHitSource.BasicHit("index", "id", 0);
             ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 1, singletonList(hit), null);
             DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction();
-            simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 0, response);
+            simulateScrollResponse(action, System.nanoTime(), 0, response);
 
             // Use assert busy because the update happens on another thread
             final int expectedBatches = batches;
@@ -354,7 +353,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
             }
         });
         ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 0, emptyList(), null);
-        simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 10, response);
+        simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 10, response);
         ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
         assertThat(e.getCause(), instanceOf(EsRejectedExecutionException.class));
         assertThat(e.getCause(), hasToString(containsString("test")));
@@ -372,7 +371,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
         SearchFailure shardFailure = new SearchFailure(new RuntimeException("test"));
         ScrollableHitSource.Response scrollResponse = new ScrollableHitSource.Response(false, singletonList(shardFailure), 0,
                 emptyList(), null);
-        simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse);
+        simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 0, scrollResponse);
         BulkByScrollResponse response = listener.get();
         assertThat(response.getBulkFailures(), empty());
         assertThat(response.getSearchFailures(), contains(shardFailure));
@@ -386,7 +385,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
      */
     public void testSearchTimeoutsAbortRequest() throws Exception {
         ScrollableHitSource.Response scrollResponse = new ScrollableHitSource.Response(true, emptyList(), 0, emptyList(), null);
-        simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse);
+        simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 0, scrollResponse);
         BulkByScrollResponse response = listener.get();
         assertThat(response.getBulkFailures(), empty());
         assertThat(response.getSearchFailures(), empty());
@@ -423,7 +422,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
         ScrollableHitSource.BasicHit hit = new ScrollableHitSource.BasicHit("index", "id", 0);
         hit.setSource(new BytesArray("{}"), XContentType.JSON);
         ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 1, singletonList(hit), null);
-        simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 0, response);
+        simulateScrollResponse(action, System.nanoTime(), 0, response);
         ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
         assertThat(e.getCause(), instanceOf(RuntimeException.class));
         assertThat(e.getCause().getMessage(), equalTo("surprise"));
@@ -619,7 +618,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
     }
 
     public void testCancelBeforeScrollResponse() throws Exception {
-        cancelTaskCase((DummyAsyncBulkByScrollAction action) -> simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 1,
+        cancelTaskCase((DummyAsyncBulkByScrollAction action) -> simulateScrollResponse(action, System.nanoTime(), 1,
                 new ScrollableHitSource.Response(false, emptyList(), between(1, 100000), emptyList(), null)));
     }
 
@@ -634,7 +633,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
     }
 
     public void testCancelBeforeStartNextScroll() throws Exception {
-        TimeValue now = timeValueNanos(System.nanoTime());
+        long now = System.nanoTime();
         cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.notifyDone(now, null, 0));
     }
 
@@ -683,7 +682,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
         ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), total, emptyList(), null);
         // Use a long delay here so the test will time out if the cancellation doesn't reschedule the throttled task
         worker.rethrottle(1);
-        simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 1000, response);
+        simulateScrollResponse(action, System.nanoTime(), 1000, response);
 
         // Now that we've got our cancel we'll just verify that it all came through all right
         assertEquals(reason, listener.get(10, TimeUnit.SECONDS).getReasonCancelled());
@@ -712,7 +711,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
     /**
      * Simulate a scroll response by setting the scroll id and firing the onScrollResponse method.
      */
-    private void simulateScrollResponse(DummyAsyncBulkByScrollAction action, TimeValue lastBatchTime, int lastBatchSize,
+    private void simulateScrollResponse(DummyAsyncBulkByScrollAction action, long lastBatchTime, int lastBatchSize,
             ScrollableHitSource.Response response) {
         action.setScroll(scrollId());
         action.onScrollResponse(lastBatchTime, lastBatchSize, new ScrollableHitSource.AsyncResponse() {

+ 4 - 4
server/src/main/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskState.java

@@ -182,11 +182,11 @@ public class WorkerBulkByScrollTaskState implements SuccessfullyProcessed {
      * Schedule prepareBulkRequestRunnable to run after some delay. This is where throttling plugs into reindexing so the request can be
      * rescheduled over and over again.
      */
-    public void delayPrepareBulkRequest(ThreadPool threadPool, TimeValue lastBatchStartTime, int lastBatchSize,
+    public void delayPrepareBulkRequest(ThreadPool threadPool, long lastBatchStartTimeNS, int lastBatchSize,
                                         AbstractRunnable prepareBulkRequestRunnable) {
         // Synchronize so we are less likely to schedule the same request twice.
         synchronized (delayedPrepareBulkRequestReference) {
-            TimeValue delay = throttleWaitTime(lastBatchStartTime, timeValueNanos(System.nanoTime()), lastBatchSize);
+            TimeValue delay = throttleWaitTime(lastBatchStartTimeNS, System.nanoTime(), lastBatchSize);
             logger.debug("[{}]: preparing bulk request for [{}]", task.getId(), delay);
             try {
                 delayedPrepareBulkRequestReference.set(new DelayedPrepareBulkRequest(threadPool, getRequestsPerSecond(),
@@ -197,8 +197,8 @@ public class WorkerBulkByScrollTaskState implements SuccessfullyProcessed {
         }
     }
 
-    public TimeValue throttleWaitTime(TimeValue lastBatchStartTime, TimeValue now, int lastBatchSize) {
-        long earliestNextBatchStartTime = now.nanos() + (long) perfectlyThrottledBatchTime(lastBatchSize);
+    public TimeValue throttleWaitTime(long lastBatchStartTimeNS, long nowNS, int lastBatchSize) {
+        long earliestNextBatchStartTime = nowNS + (long) perfectlyThrottledBatchTime(lastBatchSize);
         long waitTime = min(MAX_THROTTLE_WAIT_TIME.nanos(), max(0, earliestNextBatchStartTime - System.nanoTime()));
         return timeValueNanos(waitTime);
     }

+ 2 - 3
server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java

@@ -36,7 +36,6 @@ import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
 import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
 import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.closeTo;
@@ -152,7 +151,7 @@ public class WorkerBulkByScrollTaskStateTests extends ESTestCase {
             }
         };
         try {
-            workerState.delayPrepareBulkRequest(threadPool, timeValueNanos(System.nanoTime()), batchSizeForMaxDelay,
+            workerState.delayPrepareBulkRequest(threadPool, System.nanoTime(), batchSizeForMaxDelay,
                 new AbstractRunnable() {
                     @Override
                     protected void doRun() throws Exception {
@@ -225,7 +224,7 @@ public class WorkerBulkByScrollTaskStateTests extends ESTestCase {
         };
         try {
             // Have the task use the thread pool to delay a task that does nothing
-            workerState.delayPrepareBulkRequest(threadPool, timeValueSeconds(0), 1, new AbstractRunnable() {
+            workerState.delayPrepareBulkRequest(threadPool, 0, 1, new AbstractRunnable() {
                 @Override
                 protected void doRun() throws Exception {
                 }