Browse Source

Fence retry delays from growing exceedingly large (#95515)

Currently, retry delays can grow significantly beyond the timeout
configured for the retryable action. This has surfaced in tests. This
commit improves the situation by allowing a retry to be scheduled at
most 20% beyond the configured timeout. Additionally, it allows a max
delay bound parameter to be passed, which caps how large the delay bound
can grow during exponential backoff.

Fixes #76165.
Tim Brooks 2 years ago
parent
commit
366d4236e4

+ 47 - 9
server/src/main/java/org/elasticsearch/action/support/RetryableAction.java

@@ -35,6 +35,7 @@ public abstract class RetryableAction<Response> {
     private final AtomicBoolean isDone = new AtomicBoolean(false);
     private final ThreadPool threadPool;
     private final long initialDelayMillis;
+    private final long maxDelayBoundMillis;
     private final long timeoutMillis;
     private final long startMillis;
     private final ActionListener<Response> finalListener;
@@ -59,17 +60,47 @@ public abstract class RetryableAction<Response> {
         TimeValue timeoutValue,
         ActionListener<Response> listener,
         String executor
+    ) {
+        this(logger, threadPool, initialDelay, TimeValue.MAX_VALUE, timeoutValue, listener, executor);
+    }
+
+    public RetryableAction( // Overload with an explicit max delay bound but no executor argument.
+        Logger logger,
+        ThreadPool threadPool,
+        TimeValue initialDelay,
+        TimeValue maxDelayBound,
+        TimeValue timeoutValue,
+        ActionListener<Response> listener
+    ) {
+        this(logger, threadPool, initialDelay, maxDelayBound, timeoutValue, listener, ThreadPool.Names.SAME); // delegate, passing ThreadPool.Names.SAME as the executor
+    }
+
+    public RetryableAction(
+        Logger logger,
+        ThreadPool threadPool,
+        TimeValue initialDelay,
+        TimeValue maxDelayBound,
+        TimeValue timeoutValue,
+        ActionListener<Response> listener,
+        String executor
     ) {
         this.logger = logger;
         this.threadPool = threadPool;
         this.initialDelayMillis = initialDelay.getMillis();
+        this.maxDelayBoundMillis = maxDelayBound.getMillis();
         if (initialDelayMillis < 1) {
             throw new IllegalArgumentException("Initial delay was less than 1 millisecond: " + initialDelay);
         }
+        if (maxDelayBoundMillis < initialDelayMillis) {
+            throw new IllegalArgumentException(
+                "Max delay bound [" + maxDelayBound + "] cannot be less than the initial delay [" + initialDelay + "]"
+            );
+        }
         this.timeoutMillis = timeoutValue.getMillis();
         this.startMillis = threadPool.relativeTimeInMillis();
         this.finalListener = listener;
         this.executor = executor;
+
     }
 
     public void run() {
@@ -104,8 +135,6 @@ public abstract class RetryableAction<Response> {
             @Override
             public void onRejection(Exception e) {
                 retryTask = null;
-                // TODO: The only implementations of this class use SAME which means the execution will not be
-                // rejected. Future implementations can adjust this functionality as needed.
                 onFailure(e);
             }
         };
@@ -116,11 +145,7 @@ public abstract class RetryableAction<Response> {
     public abstract boolean shouldRetry(Exception e);
 
     protected long calculateDelayBound(long previousDelayBound) {
-        return Math.min(previousDelayBound * 2, Integer.MAX_VALUE);
-    }
-
-    protected static long minimumDelayMillis() {
-        return 0L;
+        return Math.min(previousDelayBound * 2, maxDelayBoundMillis);
     }
 
     public void onFinished() {}
@@ -149,17 +174,30 @@ public abstract class RetryableAction<Response> {
         public void onFailure(Exception e) {
             if (shouldRetry(e)) {
                 final long elapsedMillis = threadPool.relativeTimeInMillis() - startMillis;
-                if (elapsedMillis >= timeoutMillis) {
+                long remainingMillis = timeoutMillis - elapsedMillis;
+                if (remainingMillis <= 0) {
                     logger.debug(() -> format("retryable action timed out after %s", TimeValue.timeValueMillis(elapsedMillis)), e);
                     onFinalFailure(e);
                 } else {
                     addException(e);
 
+                    // Compute the delay bound that the next retry will use
                     final long nextDelayMillisBound = calculateDelayBound(delayMillisBound);
                     final RetryingListener retryingListener = new RetryingListener(nextDelayMillisBound, caughtExceptions);
                     final Runnable runnable = createRunnable(retryingListener);
                     int range = Math.toIntExact((delayMillisBound + 1) / 2);
-                    final long delayMillis = Randomness.get().nextInt(range) + delayMillisBound - range + 1L;
+                    long delayMillis = Randomness.get().nextInt(range) + delayMillisBound - range + 1L;
+
+                    long millisExceedingTimeout = delayMillis - remainingMillis;
+                    if (millisExceedingTimeout > 0) {
+                        long twentyPercent = (long) (timeoutMillis * .2);
+                        if (millisExceedingTimeout > twentyPercent) {
+                            // Adjust the actual delay to only exceed the timeout by 10-20%
+                            int tenPercent = Math.toIntExact((long) (timeoutMillis * .1));
+                            int delayBeyondTimeout = Randomness.get().nextInt(tenPercent) + tenPercent;
+                            delayMillis = remainingMillis + delayBeyondTimeout;
+                        }
+                    }
                     assert delayMillis > 0;
                     if (isDone.get() == false) {
                         final TimeValue delay = TimeValue.timeValueMillis(delayMillis);

+ 45 - 3
server/src/test/java/org/elasticsearch/action/support/RetryableActionTests.java

@@ -108,7 +108,6 @@ public class RetryableActionTests extends ESTestCase {
         assertTrue(future.actionGet());
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/76165")
     public void testRetryableActionTimeout() {
         final AtomicInteger retryCount = new AtomicInteger();
         final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
@@ -152,8 +151,8 @@ public class RetryableActionTests extends ESTestCase {
         expectThrows(EsRejectedExecutionException.class, future::actionGet);
 
         long end = taskQueue.getCurrentTimeMillis();
-        // max 3x timeout since we minimum wait half the bound for every retry.
-        assertThat(end - begin, lessThanOrEqualTo(3000L));
+        // max 20% greater than the timeout.
+        assertThat(end - begin, lessThanOrEqualTo(1200L));
     }
 
     public void testTimeoutOfZeroMeansNoRetry() {
@@ -251,4 +250,47 @@ public class RetryableActionTests extends ESTestCase {
         assertEquals(1, executedCount.get());
         expectThrows(ElasticsearchException.class, future::actionGet);
     }
+
+    public void testMaxDelayBound() { // Checks that the gap between scheduled retries stays within the configured max delay bound.
+        final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
+        final RetryableAction<Boolean> retryableAction = new RetryableAction<>(
+            logger,
+            taskQueue.getThreadPool(),
+            TimeValue.timeValueMillis(10), // initial delay
+            TimeValue.timeValueMillis(50), // max delay bound
+            TimeValue.timeValueSeconds(1), // timeout
+            future
+        ) {
+
+            @Override
+            public void tryAction(ActionListener<Boolean> listener) {
+                if (randomBoolean()) { // always fail, either through the listener or by throwing directly
+                    listener.onFailure(new EsRejectedExecutionException());
+                } else {
+                    throw new EsRejectedExecutionException();
+                }
+            }
+
+            @Override
+            public boolean shouldRetry(Exception e) {
+                return e instanceof EsRejectedExecutionException; // the only exception type tryAction produces, so every failure is retryable
+            }
+        };
+        retryableAction.run();
+        taskQueue.runAllRunnableTasks();
+        long previousDeferredTime = 0;
+        while (previousDeferredTime < 1000) { // 1000 ms corresponds to the 1 second timeout passed above
+            assertTrue(taskQueue.hasDeferredTasks());
+            long latestDeferredExecutionTime = taskQueue.getLatestDeferredExecutionTime();
+            assertThat(latestDeferredExecutionTime - previousDeferredTime, lessThanOrEqualTo(50L)); // 50 ms is the max delay bound passed above
+            previousDeferredTime = latestDeferredExecutionTime;
+            taskQueue.advanceTime();
+            taskQueue.runAllRunnableTasks();
+        }
+
+        assertFalse(taskQueue.hasDeferredTasks()); // nothing further is scheduled once the loop exits
+        assertFalse(taskQueue.hasRunnableTasks());
+
+        expectThrows(EsRejectedExecutionException.class, future::actionGet); // the future completes with the retried exception type
+    }
 }