瀏覽代碼

Reindex: never report negative throttled_until

Just clamp the value at 0. It isn't useful to tell the user "this
thread should have woken 5ms ago".

Closes #17783
Nik Everett 9 年之前
父節點
當前提交
d3b1306069

+ 2 - 1
modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java

@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static java.lang.Math.max;
 import static java.lang.Math.round;
 import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
 
@@ -93,7 +94,7 @@ public class BulkByScrollTask extends CancellableTask {
         if (delayed.future == null) {
             return timeValueNanos(0);
         }
-        return timeValueNanos(delayed.future.getDelay(TimeUnit.NANOSECONDS));
+        return timeValueNanos(max(0, delayed.future.getDelay(TimeUnit.NANOSECONDS)));
     }
 
     /**

+ 63 - 0
modules/reindex/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskTests.java

@@ -29,8 +29,11 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
@@ -206,4 +209,64 @@ public class BulkByScrollTaskTests extends ESTestCase {
         }
         assertThat(errors, empty());
     }
+
+    public void testDelayNeverNegative() throws IOException {
+        // Thread pool that returns a ScheduledFuture that claims to have a negative delay
+        ThreadPool threadPool = new ThreadPool("test") {
+            public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
+                return new ScheduledFuture<Void>() {
+                    @Override
+                    public long getDelay(TimeUnit unit) {
+                        return -1;
+                    }
+
+                    @Override
+                    public int compareTo(Delayed o) {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    @Override
+                    public boolean cancel(boolean mayInterruptIfRunning) {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    @Override
+                    public boolean isCancelled() {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    @Override
+                    public boolean isDone() {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    @Override
+                    public Void get() throws InterruptedException, ExecutionException {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    @Override
+                    public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+            }
+        };
+        try {
+            // Have the task use the thread pool to delay a task that does nothing
+            task.delayPrepareBulkRequest(threadPool, timeValueSeconds(0), new AbstractRunnable() {
+                @Override
+                protected void doRun() throws Exception {
+                }
+                @Override
+                public void onFailure(Throwable t) {
+                    throw new UnsupportedOperationException();
+                }
+            });
+            // Even though the future returns a negative delay we just return 0 because the time is up.
+            assertEquals(timeValueSeconds(0), task.getStatus().getThrottledUntil());
+        } finally {
+            threadPool.shutdown();
+        }
+    }
 }