Bläddra i källkod

Timeout health API on busy master (#57587)

Today `GET _cluster/health?wait_for_events=...&timeout=...` will wait
indefinitely for the master to process the pending cluster health task,
ignoring the specified timeout. This could take a very long time if the master
is overloaded. This commit fixes this by adding a timeout to the pending
cluster health task.
David Turner 5 år sedan
förälder
incheckning
d81ea8e7c7

+ 57 - 9
server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterHealthIT.java

@@ -22,11 +22,13 @@ package org.elasticsearch.cluster;
 import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.InternalTestCluster;
 
@@ -261,12 +263,13 @@ public class ClusterHealthIT extends ESIntegTestCase {
         clusterHealthThread.join();
     }
 
-    public void testWaitForEventsRetriesIfOtherConditionsNotMet() throws Exception {
+    public void testWaitForEventsRetriesIfOtherConditionsNotMet() {
         final ActionFuture<ClusterHealthResponse> healthResponseFuture
             = client().admin().cluster().prepareHealth("index").setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute();
 
         final AtomicBoolean keepSubmittingTasks = new AtomicBoolean(true);
         final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName());
+        final PlainActionFuture<Void> completionFuture = new PlainActionFuture<>();
         clusterService.submitStateUpdateTask("looping task", new ClusterStateUpdateTask(Priority.LOW) {
                 @Override
                 public ClusterState execute(ClusterState currentState) {
@@ -275,6 +278,7 @@ public class ClusterHealthIT extends ESIntegTestCase {
 
                 @Override
                 public void onFailure(String source, Exception e) {
+                    completionFuture.onFailure(e);
                     throw new AssertionError(source, e);
                 }
 
@@ -282,19 +286,25 @@ public class ClusterHealthIT extends ESIntegTestCase {
                 public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                     if (keepSubmittingTasks.get()) {
                         clusterService.submitStateUpdateTask("looping task", this);
+                    } else {
+                        completionFuture.onResponse(null);
                     }
                 }
             });
 
-        createIndex("index");
-        assertFalse(client().admin().cluster().prepareHealth("index").setWaitForGreenStatus().get().isTimedOut());
+        try {
+            createIndex("index");
+            assertFalse(client().admin().cluster().prepareHealth("index").setWaitForGreenStatus().get().isTimedOut());
 
-        // at this point the original health response should not have returned: there was never a point where the index was green AND
-        // the master had processed all pending tasks above LANGUID priority.
-        assertFalse(healthResponseFuture.isDone());
-
-        keepSubmittingTasks.set(false);
-        assertFalse(healthResponseFuture.get().isTimedOut());
+            // at this point the original health response should not have returned: there was never a point where the index was green AND
+            // the master had processed all pending tasks above LANGUID priority.
+            assertFalse(healthResponseFuture.isDone());
+            keepSubmittingTasks.set(false);
+            assertFalse(healthResponseFuture.actionGet(TimeValue.timeValueSeconds(30)).isTimedOut());
+        } finally {
+            keepSubmittingTasks.set(false);
+            completionFuture.actionGet(TimeValue.timeValueSeconds(30));
+        }
     }
 
     public void testHealthOnMasterFailover() throws Exception {
@@ -311,4 +321,42 @@ public class ClusterHealthIT extends ESIntegTestCase {
             assertSame(responseFuture.get().getStatus(), ClusterHealthStatus.GREEN);
         }
     }
+
+    public void testWaitForEventsTimesOutIfMasterBusy() {
+        final AtomicBoolean keepSubmittingTasks = new AtomicBoolean(true);
+        final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName());
+        final PlainActionFuture<Void> completionFuture = new PlainActionFuture<>();
+        clusterService.submitStateUpdateTask("looping task", new ClusterStateUpdateTask(Priority.LOW) {
+            @Override
+            public ClusterState execute(ClusterState currentState) {
+                return currentState;
+            }
+
+            @Override
+            public void onFailure(String source, Exception e) {
+                completionFuture.onFailure(e);
+                throw new AssertionError(source, e);
+            }
+
+            @Override
+            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                if (keepSubmittingTasks.get()) {
+                    clusterService.submitStateUpdateTask("looping task", this);
+                } else {
+                    completionFuture.onResponse(null);
+                }
+            }
+        });
+
+        try {
+            final ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth()
+                .setWaitForEvents(Priority.LANGUID)
+                .setTimeout(TimeValue.timeValueSeconds(1))
+                .get(TimeValue.timeValueSeconds(30));
+            assertTrue(clusterHealthResponse.isTimedOut());
+        } finally {
+            keepSubmittingTasks.set(false);
+            completionFuture.actionGet(TimeValue.timeValueSeconds(30));
+        }
+    }
 }

+ 29 - 14
server/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java

@@ -35,6 +35,7 @@ import org.elasticsearch.cluster.NotMasterException;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
 import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -96,7 +97,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
             waitForEventsAndExecuteHealth(request, listener, waitCount, threadPool.relativeTimeInMillis() + request.timeout().millis());
         } else {
             executeHealth(request, clusterService.state(), listener, waitCount,
-                clusterState -> listener.onResponse(getResponse(request, clusterState, waitCount, false)));
+                clusterState -> listener.onResponse(getResponse(request, clusterState, waitCount, TimeoutState.OK)));
         }
     }
 
@@ -129,6 +130,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
                     }
                 });
         } else {
+            final TimeValue taskTimeout = TimeValue.timeValueMillis(Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis()));
             clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])",
                 new ClusterStateUpdateTask(request.waitForEvents()) {
                     @Override
@@ -136,6 +138,11 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
                         return currentState;
                     }
 
+                    @Override
+                    public TimeValue timeout() {
+                        return taskTimeout;
+                    }
+
                     @Override
                     public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                         final long timeoutInMillis = Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis());
@@ -161,8 +168,12 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
 
                     @Override
                     public void onFailure(String source, Exception e) {
-                        logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
-                        listener.onFailure(e);
+                        if (e instanceof ProcessClusterEventTimeoutException) {
+                            listener.onResponse(getResponse(request, clusterService.state(), waitCount, TimeoutState.TIMED_OUT));
+                        } else {
+                            logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
+                            listener.onFailure(e);
+                        }
                     }
                 });
         }
@@ -175,13 +186,13 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
                                final Consumer<ClusterState> onNewClusterStateAfterDelay) {
 
         if (request.timeout().millis() == 0) {
-            listener.onResponse(getResponse(request, currentState, waitCount, true));
+            listener.onResponse(getResponse(request, currentState, waitCount, TimeoutState.ZERO_TIMEOUT));
             return;
         }
 
         final Predicate<ClusterState> validationPredicate = newState -> validateRequest(request, newState, waitCount);
         if (validationPredicate.test(currentState)) {
-            listener.onResponse(getResponse(request, currentState, waitCount, false));
+            listener.onResponse(getResponse(request, currentState, waitCount, TimeoutState.OK));
         } else {
             final ClusterStateObserver observer
                 = new ClusterStateObserver(currentState, clusterService, null, logger, threadPool.getThreadContext());
@@ -198,7 +209,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
 
                 @Override
                 public void onTimeout(TimeValue timeout) {
-                    listener.onResponse(getResponse(request, observer.setAndGetObservedState(), waitCount, true));
+                    listener.onResponse(getResponse(request, observer.setAndGetObservedState(), waitCount, TimeoutState.TIMED_OUT));
                 }
             };
             observer.waitForNextChange(stateListener, validationPredicate, request.timeout());
@@ -234,19 +245,23 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
         return prepareResponse(request, response, clusterState, indexNameExpressionResolver) == waitCount;
     }
 
+    private enum TimeoutState {
+        OK,
+        TIMED_OUT,
+        ZERO_TIMEOUT
+    }
+
     private ClusterHealthResponse getResponse(final ClusterHealthRequest request, ClusterState clusterState,
-                                              final int waitFor, boolean timedOut) {
+                                              final int waitFor, TimeoutState timeoutState) {
         ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.getMasterService().numberOfPendingTasks(),
             allocationService.getNumberOfInFlightFetches(), clusterService.getMasterService().getMaxTaskWaitTime());
         int readyCounter = prepareResponse(request, response, clusterState, indexNameExpressionResolver);
         boolean valid = (readyCounter == waitFor);
-        assert valid || timedOut;
-        // we check for a timeout here since this method might be called from the wait_for_events
-        // response handler which might have timed out already.
-        // if the state is sufficient for what we where waiting for we don't need to mark this as timedOut.
-        // We spend too much time in waiting for events such that we might already reached a valid state.
-        // this should not mark the request as timed out
-        response.setTimedOut(timedOut && valid == false);
+        assert valid || (timeoutState != TimeoutState.OK);
+        // If valid && timeoutState == TimeoutState.ZERO_TIMEOUT then we immediately found **and processed** a valid state, so we don't
+        // consider this a timeout. However if timeoutState == TimeoutState.TIMED_OUT then we didn't process a valid state (perhaps we
+        // failed on wait_for_events) so this does count as a timeout.
+        response.setTimedOut(valid == false || timeoutState == TimeoutState.TIMED_OUT);
         return response;
     }