Browse Source

Update error states from inside the main state executor (#90346)

* Do a check inside the main cluster state executor to confirm we should apply an error state

This fixes #90337
Simon Cooper 3 years ago
parent
commit
f0aa4e7a42

+ 6 - 0
docs/changelog/90346.yaml

@@ -0,0 +1,6 @@
+pr: 90346
+summary: Update error states from inside the main state executor
+area: Infra/Core
+type: bug
+issues:
+ - 90337

+ 14 - 32
server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java

@@ -40,6 +40,8 @@ import java.util.stream.Collectors;
 
 import static org.elasticsearch.ExceptionsHelper.stackTrace;
 import static org.elasticsearch.core.Strings.format;
+import static org.elasticsearch.reservedstate.service.ReservedStateErrorTask.checkErrorVersion;
+import static org.elasticsearch.reservedstate.service.ReservedStateErrorTask.isNewError;
 import static org.elasticsearch.reservedstate.service.ReservedStateUpdateTask.checkMetadataVersion;
 import static org.elasticsearch.reservedstate.service.ReservedStateUpdateTask.keysForHandler;
 
@@ -110,7 +112,7 @@ public class ReservedClusterStateService {
             stateChunk = stateChunkParser.apply(parser, null);
         } catch (Exception e) {
             ErrorState errorState = new ErrorState(namespace, -1L, e, ReservedStateErrorMetadata.ErrorKind.PARSING);
-            saveErrorState(clusterService.state(), errorState);
+            updateErrorState(errorState);
             logger.debug("error processing state change request for [{}] with the following errors [{}]", namespace, errorState);
 
             errorListener.accept(
@@ -145,7 +147,7 @@ public class ReservedClusterStateService {
                 ReservedStateErrorMetadata.ErrorKind.PARSING
             );
 
-            saveErrorState(clusterService.state(), errorState);
+            updateErrorState(errorState);
             logger.debug("error processing state change request for [{}] with the following errors [{}]", namespace, errorState);
 
             errorListener.accept(
@@ -167,7 +169,8 @@ public class ReservedClusterStateService {
         // We trial run all handler validations to ensure that we can process all of the cluster state error free. During
         // the trial run we collect 'consumers' (functions) for any non cluster state transforms that need to run.
         var trialRunResult = trialRun(namespace, state, reservedStateChunk, orderedHandlers);
-        var error = checkAndReportError(namespace, trialRunResult.errors, state, reservedStateVersion);
+        // this is not using the modified trial state above, but that doesn't matter, we're just setting errors here
+        var error = checkAndReportError(namespace, trialRunResult.errors, reservedStateVersion);
 
         if (error != null) {
             errorListener.accept(error);
@@ -192,7 +195,7 @@ public class ReservedClusterStateService {
                         nonStateTransformResults,
                         handlers,
                         orderedHandlers,
-                        (clusterState, errorState) -> saveErrorState(clusterState, errorState),
+                        ReservedClusterStateService.this::updateErrorState,
                         new ActionListener<>() {
                             @Override
                             public void onResponse(ActionResponse.Empty empty) {
@@ -220,18 +223,13 @@ public class ReservedClusterStateService {
             @Override
             public void onFailure(Exception e) {
                 // If we encounter an error while running the non-state transforms, we avoid saving any cluster state.
-                errorListener.accept(checkAndReportError(namespace, List.of(e.getMessage()), state, reservedStateVersion));
+                errorListener.accept(checkAndReportError(namespace, List.of(e.getMessage()), reservedStateVersion));
             }
         });
     }
 
     // package private for testing
-    Exception checkAndReportError(
-        String namespace,
-        List<String> errors,
-        ClusterState currentState,
-        ReservedStateVersion reservedStateVersion
-    ) {
+    Exception checkAndReportError(String namespace, List<String> errors, ReservedStateVersion reservedStateVersion) {
         // Any errors should be discovered through validation performed in the transform calls
         if (errors.isEmpty() == false) {
             logger.debug("Error processing state change request for [{}] with the following errors [{}]", namespace, errors);
@@ -243,7 +241,7 @@ public class ReservedClusterStateService {
                 ReservedStateErrorMetadata.ErrorKind.VALIDATION
             );
 
-            saveErrorState(currentState, errorState);
+            updateErrorState(errorState);
 
             return new IllegalStateException("Error processing state change request for " + namespace + ", errors: " + errorState);
         }
@@ -252,26 +250,10 @@ public class ReservedClusterStateService {
     }
 
     // package private for testing
-    static boolean isNewError(ReservedStateMetadata existingMetadata, Long newStateVersion) {
-        return (existingMetadata == null
-            || existingMetadata.errorMetadata() == null
-            || newStateVersion <= 0 // version will be -1 when we can't even parse the file, it might be 0 on snapshot restore
-            || existingMetadata.errorMetadata().version() < newStateVersion);
-    }
-
-    // package private for testing
-    void saveErrorState(ClusterState clusterState, ErrorState errorState) {
-        ReservedStateMetadata existingMetadata = clusterState.metadata().reservedStateMetadata().get(errorState.namespace());
-
-        if (isNewError(existingMetadata, errorState.version()) == false) {
-            logger.info(
-                () -> format(
-                    "Not updating error state because version [%s] is less or equal to the last state error version [%s]",
-                    errorState.version(),
-                    existingMetadata.errorMetadata().version()
-                )
-            );
-
+    void updateErrorState(ErrorState errorState) {
+        // optimistic check here - the cluster state might change after this, so we also need to re-check later
+        if (checkErrorVersion(clusterService.state(), errorState) == false) {
+            // nothing to update
             return;
         }
 

+ 33 - 0
server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTask.java

@@ -8,6 +8,8 @@
 
 package org.elasticsearch.reservedstate.service;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.cluster.ClusterState;
@@ -16,6 +18,8 @@ import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata;
 import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
 
+import static org.elasticsearch.core.Strings.format;
+
 /**
  * Cluster state update task that sets the error state of the reserved cluster state metadata.
  * <p>
@@ -23,6 +27,7 @@ import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
  * the {@link ReservedStateChunk}.
  */
 public class ReservedStateErrorTask implements ClusterStateTaskListener {
+    private static final Logger logger = LogManager.getLogger(ReservedStateErrorTask.class);
 
     private final ErrorState errorState;
     private final ActionListener<ActionResponse.Empty> listener;
@@ -41,6 +46,34 @@ public class ReservedStateErrorTask implements ClusterStateTaskListener {
         return listener;
     }
 
+    // package private for testing
+    static boolean isNewError(ReservedStateMetadata existingMetadata, Long newStateVersion) {
+        return (existingMetadata == null
+            || existingMetadata.errorMetadata() == null
+            || newStateVersion <= 0 // version will be -1 when we can't even parse the file, it might be 0 on snapshot restore
+            || existingMetadata.errorMetadata().version() < newStateVersion);
+    }
+
+    static boolean checkErrorVersion(ClusterState currentState, ErrorState errorState) {
+        ReservedStateMetadata existingMetadata = currentState.metadata().reservedStateMetadata().get(errorState.namespace());
+        // check for noop here
+        if (isNewError(existingMetadata, errorState.version()) == false) {
+            logger.info(
+                () -> format(
+                    "Not updating error state because version [%s] is less or equal to the last state error version [%s]",
+                    errorState.version(),
+                    existingMetadata.errorMetadata().version()
+                )
+            );
+            return false;
+        }
+        return true;
+    }
+
+    boolean shouldUpdate(ClusterState currentState) {
+        return checkErrorVersion(currentState, errorState);
+    }
+
     ClusterState execute(ClusterState currentState) {
         ClusterState.Builder stateBuilder = new ClusterState.Builder(currentState);
         Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata());

+ 6 - 2
server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTaskExecutor.java

@@ -25,11 +25,15 @@ record ReservedStateErrorTaskExecutor() implements ClusterStateTaskExecutor<Rese
     @Override
     public ClusterState execute(BatchExecutionContext<ReservedStateErrorTask> batchExecutionContext) {
         var updatedState = batchExecutionContext.initialState();
+
         for (final var taskContext : batchExecutionContext.taskContexts()) {
             final var task = taskContext.getTask();
-            try (var ignored = taskContext.captureResponseHeaders()) {
-                updatedState = task.execute(updatedState);
+            if (task.shouldUpdate(updatedState)) {
+                try (var ignored = taskContext.captureResponseHeaders()) {
+                    updatedState = task.execute(updatedState);
+                }
             }
+            // if the task didn't run, it still 'succeeded', it just didn't have any effect
             taskContext.success(() -> task.listener().onResponse(ActionResponse.Empty.INSTANCE));
         }
         return updatedState;

+ 11 - 6
server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java

@@ -29,7 +29,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 
 import static org.elasticsearch.ExceptionsHelper.stackTrace;
 import static org.elasticsearch.core.Strings.format;
@@ -48,7 +48,7 @@ public class ReservedStateUpdateTask implements ClusterStateTaskListener {
     private final ReservedStateChunk stateChunk;
     private final Map<String, ReservedClusterStateHandler<?>> handlers;
     private final Collection<String> orderedHandlers;
-    private final BiConsumer<ClusterState, ErrorState> errorReporter;
+    private final Consumer<ErrorState> errorReporter;
     private final ActionListener<ActionResponse.Empty> listener;
     private final Collection<NonStateTransformResult> nonStateTransformResults;
 
@@ -58,7 +58,7 @@ public class ReservedStateUpdateTask implements ClusterStateTaskListener {
         Collection<NonStateTransformResult> nonStateTransformResults,
         Map<String, ReservedClusterStateHandler<?>> handlers,
         Collection<String> orderedHandlers,
-        BiConsumer<ClusterState, ErrorState> errorReporter,
+        Consumer<ErrorState> errorReporter,
         ActionListener<ActionResponse.Empty> listener
     ) {
         this.namespace = namespace;
@@ -105,7 +105,7 @@ public class ReservedStateUpdateTask implements ClusterStateTaskListener {
             }
         }
 
-        checkAndThrowOnError(errors, currentState, reservedStateVersion);
+        checkAndThrowOnError(errors, reservedStateVersion);
 
         // Once we have set all of the handler state from the cluster state update tasks, we add the reserved keys
         // from the non cluster state transforms.
@@ -122,7 +122,7 @@ public class ReservedStateUpdateTask implements ClusterStateTaskListener {
         return stateBuilder.metadata(metadataBuilder).build();
     }
 
-    private void checkAndThrowOnError(List<String> errors, ClusterState currentState, ReservedStateVersion reservedStateVersion) {
+    private void checkAndThrowOnError(List<String> errors, ReservedStateVersion reservedStateVersion) {
         // Any errors should be discovered through validation performed in the transform calls
         if (errors.isEmpty() == false) {
             logger.debug("Error processing state change request for [{}] with the following errors [{}]", namespace, errors);
@@ -134,7 +134,12 @@ public class ReservedStateUpdateTask implements ClusterStateTaskListener {
                 ReservedStateErrorMetadata.ErrorKind.VALIDATION
             );
 
-            errorReporter.accept(currentState, errorState);
+            /*
+             * It doesn't matter that this reporter needs to re-access the base state:
+             * any updates made by this task are simply discarded when the exception below is thrown,
+             * and the error state only needs to be set once.
+             */
+            errorReporter.accept(errorState);
 
             throw new IllegalStateException("Error processing state change request for " + namespace + ", errors: " + errorState);
         }

+ 13 - 11
server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java

@@ -153,7 +153,7 @@ public class ReservedClusterStateServiceTests extends ESTestCase {
                 List.of(),
                 Collections.emptyMap(),
                 Collections.emptySet(),
-                (clusterState, errorState) -> {},
+                errorState -> {},
                 new ActionListener<>() {
                     @Override
                     public void onResponse(ActionResponse.Empty empty) {}
@@ -326,10 +326,10 @@ public class ReservedClusterStateServiceTests extends ESTestCase {
         Metadata metadata = Metadata.builder().put(operatorMetadata).build();
         ClusterState state = ClusterState.builder(new ClusterName("test")).metadata(metadata).build();
 
-        assertFalse(ReservedClusterStateService.isNewError(operatorMetadata, 2L));
-        assertFalse(ReservedClusterStateService.isNewError(operatorMetadata, 1L));
-        assertTrue(ReservedClusterStateService.isNewError(operatorMetadata, 3L));
-        assertTrue(ReservedClusterStateService.isNewError(null, 1L));
+        assertFalse(ReservedStateErrorTask.isNewError(operatorMetadata, 2L));
+        assertFalse(ReservedStateErrorTask.isNewError(operatorMetadata, 1L));
+        assertTrue(ReservedStateErrorTask.isNewError(operatorMetadata, 3L));
+        assertTrue(ReservedStateErrorTask.isNewError(null, 1L));
 
         var chunk = new ReservedStateChunk(Map.of("one", "two", "maker", "three"), new ReservedStateVersion(2L, Version.CURRENT));
         var orderedHandlers = List.of(exceptionThrower.name(), newStateMaker.name());
@@ -343,7 +343,7 @@ public class ReservedClusterStateServiceTests extends ESTestCase {
             List.of(),
             Map.of(exceptionThrower.name(), exceptionThrower, newStateMaker.name(), newStateMaker),
             orderedHandlers,
-            (clusterState, errorState) -> { assertFalse(ReservedClusterStateService.isNewError(operatorMetadata, errorState.version())); },
+            errorState -> assertFalse(ReservedStateErrorTask.isNewError(operatorMetadata, errorState.version())),
             new ActionListener<>() {
                 @Override
                 public void onResponse(ActionResponse.Empty empty) {}
@@ -484,17 +484,19 @@ public class ReservedClusterStateServiceTests extends ESTestCase {
 
     public void testCheckAndReportError() {
         ClusterService clusterService = mock(ClusterService.class);
+        var state = ClusterState.builder(new ClusterName("elasticsearch")).build();
+        when(clusterService.state()).thenReturn(state);
+
         final var controller = spy(new ReservedClusterStateService(clusterService, List.of()));
 
-        assertNull(controller.checkAndReportError("test", List.of(), null, null));
-        verify(controller, times(0)).saveErrorState(any(), any());
+        assertNull(controller.checkAndReportError("test", List.of(), null));
+        verify(controller, times(0)).updateErrorState(any());
 
-        var state = ClusterState.builder(new ClusterName("elasticsearch")).build();
         var version = new ReservedStateVersion(2L, Version.CURRENT);
-        var error = controller.checkAndReportError("test", List.of("test error"), state, version);
+        var error = controller.checkAndReportError("test", List.of("test error"), version);
         assertThat(error, allOf(notNullValue(), instanceOf(IllegalStateException.class)));
         assertEquals("Error processing state change request for test, errors: test error", error.getMessage());
-        verify(controller, times(1)).saveErrorState(any(), any());
+        verify(controller, times(1)).updateErrorState(any());
     }
 
     public void testTrialRunExtractsNonStateActions() {