Browse Source

Ensure tasks preserve versions in `MasterService` (#109850)

`ClusterState#version`, `Metadata#version` and `RoutingTable#version`
are all managed solely by the `MasterService`, in the sense that it's a
definite bug for the cluster state update task executor to meddle with
them. Today if we encounter such a bug then we try and publish the
resulting state anyway, which hopefully fails (triggering a master
election) but it may in theory succeed (potentially reverting older
cluster state updates). Neither is a particularly good outcome.

With this commit we add a check for consistency of these version numbers
during the cluster state computation and fail the state update without a
master failover if a discrepancy is found.

It also fixes a super-subtle bug in `TransportMigrateToDataTiersAction`
that can muck up these version numbers.
David Turner 1 year ago
parent
commit
5aa9f442a3

+ 5 - 0
docs/changelog/109850.yaml

@@ -0,0 +1,5 @@
+pr: 109850
+summary: Ensure tasks preserve versions in `MasterService`
+area: Cluster Coordination
+type: bug
+issues: []

+ 31 - 1
server/src/main/java/org/elasticsearch/cluster/service/MasterService.java

@@ -524,6 +524,24 @@ public class MasterService extends AbstractLifecycleComponent {
         return ClusterState.builder(clusterState).incrementVersion();
     }
 
+    private static boolean versionNumbersPreserved(ClusterState oldState, ClusterState newState) {
+        if (oldState.nodes().getMasterNodeId() == null && newState.nodes().getMasterNodeId() != null) {
+            return true; // NodeJoinExecutor is special, we trust it to do the right thing with versions
+        }
+
+        if (oldState.version() != newState.version()) {
+            return false;
+        }
+        if (oldState.metadata().version() != newState.metadata().version()) {
+            return false;
+        }
+        if (oldState.routingTable().version() != newState.routingTable().version()) {
+            // GatewayService is special and for odd legacy reasons gets to do this:
+            return oldState.clusterRecovered() == false && newState.clusterRecovered() && newState.routingTable().version() == 0;
+        }
+        return true;
+    }
+
     /**
      * Submits an unbatched cluster state update task. This method exists for legacy reasons but is deprecated and forbidden in new
      * production code because unbatched tasks are a source of performance and stability bugs. You should instead implement your update
@@ -1035,6 +1053,8 @@ public class MasterService extends AbstractLifecycleComponent {
         return true;
     }
 
+    static final String TEST_ONLY_EXECUTOR_MAY_CHANGE_VERSION_NUMBER_TRANSIENT_NAME = "test_only_executor_may_change_version_number";
+
     private static <T extends ClusterStateTaskListener> ClusterState innerExecuteTasks(
         ClusterState previousClusterState,
         List<ExecutionResult<T>> executionResults,
@@ -1047,13 +1067,23 @@ public class MasterService extends AbstractLifecycleComponent {
             // to avoid leaking headers in production that were missed by tests
 
             try {
-                return executor.execute(
+                final var updatedState = executor.execute(
                     new ClusterStateTaskExecutor.BatchExecutionContext<>(
                         previousClusterState,
                         executionResults,
                         threadContext::newStoredContext
                     )
                 );
+                if (versionNumbersPreserved(previousClusterState, updatedState) == false) {
+                    // Shenanigans! Executors mustn't meddle with version numbers. Perhaps the executor based its update on the wrong
+                    // initial state, potentially losing an intervening cluster state update. That'd be very bad!
+                    final var exception = new IllegalStateException(
+                        "cluster state update executor did not preserve version numbers: [" + summary.toString() + "]"
+                    );
+                    assert threadContext.getTransient(TEST_ONLY_EXECUTOR_MAY_CHANGE_VERSION_NUMBER_TRANSIENT_NAME) != null : exception;
+                    throw exception;
+                }
+                return updatedState;
             } catch (Exception e) {
                 logger.trace(
                     () -> format(

+ 70 - 4
server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java

@@ -33,6 +33,7 @@ import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.component.Lifecycle;
@@ -77,6 +78,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.emptySet;
@@ -93,6 +95,7 @@ import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.startsWith;
 
 public class MasterServiceTests extends ESTestCase {
 
@@ -498,7 +501,7 @@ public class MasterServiceTests extends ESTestCase {
                 @Override
                 public ClusterState execute(ClusterState currentState) {
                     relativeTimeInMillis += TimeValue.timeValueSeconds(3).millis();
-                    return ClusterState.builder(currentState).incrementVersion().build();
+                    return ClusterState.builder(currentState).build();
                 }
 
                 @Override
@@ -1243,7 +1246,7 @@ public class MasterServiceTests extends ESTestCase {
                 public ClusterState execute(ClusterState currentState) {
                     relativeTimeInMillis += MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis()
                         + randomLongBetween(1, 1000000);
-                    return ClusterState.builder(currentState).incrementVersion().build();
+                    return ClusterState.builder(currentState).build();
                 }
 
                 @Override
@@ -1277,7 +1280,7 @@ public class MasterServiceTests extends ESTestCase {
             masterService.submitUnbatchedStateUpdateTask("test5", new ClusterStateUpdateTask() {
                 @Override
                 public ClusterState execute(ClusterState currentState) {
-                    return ClusterState.builder(currentState).incrementVersion().build();
+                    return ClusterState.builder(currentState).build();
                 }
 
                 @Override
@@ -1293,7 +1296,7 @@ public class MasterServiceTests extends ESTestCase {
             masterService.submitUnbatchedStateUpdateTask("test6", new ClusterStateUpdateTask() {
                 @Override
                 public ClusterState execute(ClusterState currentState) {
-                    return ClusterState.builder(currentState).incrementVersion().build();
+                    return ClusterState.builder(currentState).build();
                 }
 
                 @Override
@@ -2592,6 +2595,69 @@ public class MasterServiceTests extends ESTestCase {
         }
     }
 
+    public void testVersionNumberProtection() {
+        runVersionNumberProtectionTest(
+            currentState -> ClusterState.builder(currentState)
+                .version(randomFrom(currentState.version() - 1, currentState.version() + 1))
+                .build()
+        );
+
+        runVersionNumberProtectionTest(
+            currentState -> currentState.copyAndUpdateMetadata(
+                b -> b.version(randomFrom(currentState.metadata().version() - 1, currentState.metadata().version() + 1))
+            )
+        );
+
+        runVersionNumberProtectionTest(
+            currentState -> ClusterState.builder(currentState)
+                .routingTable(
+                    RoutingTable.builder(currentState.routingTable())
+                        .version(randomFrom(currentState.routingTable().version() - 1, currentState.routingTable().version() + 1))
+                        .build()
+                )
+                .build()
+        );
+    }
+
+    private void runVersionNumberProtectionTest(UnaryOperator<ClusterState> updateOperator) {
+        final var deterministicTaskQueue = new DeterministicTaskQueue();
+        final var threadPool = deterministicTaskQueue.getThreadPool();
+        final var threadContext = threadPool.getThreadContext();
+        final var failureCaught = new AtomicBoolean();
+
+        try (
+            var masterService = createMasterService(true, null, threadPool, deterministicTaskQueue.getPrioritizedEsThreadPoolExecutor());
+            var ignored = threadContext.stashContext()
+        ) {
+            final var taskId = randomIdentifier();
+
+            masterService.submitUnbatchedStateUpdateTask(taskId, new ClusterStateUpdateTask() {
+                @Override
+                public ClusterState execute(ClusterState currentState) {
+                    return updateOperator.apply(currentState);
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    assertThat(
+                        asInstanceOf(IllegalStateException.class, e).getMessage(),
+                        allOf(startsWith("cluster state update executor did not preserve version numbers"), containsString(taskId))
+                    );
+                    assertTrue(failureCaught.compareAndSet(false, true));
+                }
+            });
+
+            // suppress assertion errors to check production behaviour
+            threadContext.putTransient(MasterService.TEST_ONLY_EXECUTOR_MAY_CHANGE_VERSION_NUMBER_TRANSIENT_NAME, new Object());
+            threadContext.markAsSystemContext();
+            deterministicTaskQueue.runAllRunnableTasks();
+            assertFalse(deterministicTaskQueue.hasRunnableTasks());
+            assertFalse(deterministicTaskQueue.hasDeferredTasks());
+
+            assertTrue(failureCaught.get());
+        }
+    }
+
     /**
      * Returns the cluster state that the master service uses (and that is provided by the discovery layer)
      */

+ 13 - 2
server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java

@@ -20,6 +20,7 @@ import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.RerouteService;
+import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRoutingRoleStrategy;
 import org.elasticsearch.cluster.service.ClusterApplierService;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -494,12 +495,22 @@ public class GatewayServiceTests extends ESTestCase {
 
     private MasterServiceTaskQueue<SetClusterStateTask> createSetClusterStateTaskQueue(ClusterService clusterService) {
         return clusterService.createTaskQueue("set-cluster-state", Priority.NORMAL, batchExecutionContext -> {
-            ClusterState targetState = batchExecutionContext.initialState();
+            final var initialState = batchExecutionContext.initialState();
+            var targetState = initialState;
             for (var taskContext : batchExecutionContext.taskContexts()) {
                 targetState = taskContext.getTask().clusterState();
                 taskContext.success(() -> {});
             }
-            return targetState;
+            // fix up the version numbers
+            final var finalStateBuilder = ClusterState.builder(targetState)
+                .version(initialState.version())
+                .metadata(Metadata.builder(targetState.metadata()).version(initialState.metadata().version()));
+            if (initialState.clusterRecovered() || targetState.clusterRecovered() == false) {
+                finalStateBuilder.routingTable(
+                    RoutingTable.builder(targetState.routingTable()).version(initialState.routingTable().version())
+                );
+            }
+            return finalStateBuilder.build();
         });
     }
 }

+ 2 - 2
x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportMigrateToDataTiersAction.java

@@ -123,9 +123,9 @@ public class TransportMigrateToDataTiersAction extends TransportMasterNodeAction
         final SetOnce<MigratedEntities> migratedEntities = new SetOnce<>();
         submitUnbatchedTask("migrate-to-data-tiers []", new ClusterStateUpdateTask(Priority.HIGH) {
             @Override
-            public ClusterState execute(ClusterState currentState) throws Exception {
+            public ClusterState execute(ClusterState currentState) {
                 Tuple<ClusterState, MigratedEntities> migratedEntitiesTuple = migrateToDataTiersRouting(
-                    state,
+                    currentState,
                     request.getNodeAttributeName(),
                     request.getLegacyTemplateToDelete(),
                     xContentRegistry,