Explorar o código

Elevate MockPersistedState one level (#94362)

Extracted from #93911 as a sensible self-contained change which
significantly reduces the review difficulty.
David Turner %!s(int64=2) %!d(string=hai) anos
pai
achega
4adc2aeade

+ 5 - 2
server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java

@@ -1628,8 +1628,11 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
             }
 
             final ClusterNode newNode = cluster1.new ClusterNode(
-                nextNodeIndex.getAndIncrement(), nodeInOtherCluster.getLocalNode(), n -> cluster1.new MockPersistedState(
-                    n, nodeInOtherCluster.persistedState, Function.identity(), Function.identity()
+                nextNodeIndex.getAndIncrement(), nodeInOtherCluster.getLocalNode(), n -> cluster1.createPersistedStateFromExistingState(
+                    n,
+                    nodeInOtherCluster.persistedState,
+                    Function.identity(),
+                    Function.identity()
                 ), nodeInOtherCluster.nodeSettings, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info")
             );
 

+ 227 - 198
test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java

@@ -58,6 +58,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
 import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
 import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.TimeValue;
@@ -105,8 +106,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
+import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
@@ -154,7 +157,7 @@ import static org.hamcrest.Matchers.sameInstance;
 public class AbstractCoordinatorTestCase extends ESTestCase {
 
     protected final List<NodeEnvironment> nodeEnvironments = new ArrayList<>();
-    protected final Set<Cluster.MockPersistedState> openPersistedStates = new HashSet<>();
+    protected final Set<CoordinationState.PersistedState> openPersistedStates = new HashSet<>();
 
     protected final AtomicInteger nextNodeIndex = new AtomicInteger();
 
@@ -276,8 +279,6 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
         private final Recycler<BytesRef> recycler;
         private final NodeHealthService nodeHealthService;
 
-        private final Function<DiscoveryNode, MockPersistedState> defaultPersistedStateSupplier = MockPersistedState::new;
-
         @Nullable // null means construct a list from all the current nodes
         private List<TransportAddress> seedHostsList;
 
@@ -889,209 +890,41 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
             return emptyList();
         }
 
-        class MockPersistedState implements CoordinationState.PersistedState {
-            private final CoordinationState.PersistedState delegate;
-            private final NodeEnvironment nodeEnvironment;
-
-            MockPersistedState(DiscoveryNode localNode) {
-                try {
-                    if (rarely()) {
-                        nodeEnvironment = newNodeEnvironment();
-                        nodeEnvironments.add(nodeEnvironment);
-                        final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(localNode);
-                        gatewayMetaState.start(Settings.EMPTY, nodeEnvironment, xContentRegistry());
-                        delegate = gatewayMetaState.getPersistedState();
-                    } else {
-                        nodeEnvironment = null;
-                        delegate = new InMemoryPersistedState(
-                            0L,
-                            ClusterStateUpdaters.addStateNotRecoveredBlock(
-                                clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L)
-                            )
-                        );
-                    }
-                } catch (IOException e) {
-                    throw new UncheckedIOException("Unable to create MockPersistedState", e);
-                }
-            }
-
-            MockPersistedState(
-                DiscoveryNode newLocalNode,
-                MockPersistedState oldState,
-                Function<Metadata, Metadata> adaptGlobalMetadata,
-                Function<Long, Long> adaptCurrentTerm
-            ) {
-                try {
-                    if (oldState.nodeEnvironment != null) {
-                        nodeEnvironment = oldState.nodeEnvironment;
-                        final Metadata updatedMetadata = adaptGlobalMetadata.apply(oldState.getLastAcceptedState().metadata());
-                        final long updatedTerm = adaptCurrentTerm.apply(oldState.getCurrentTerm());
-
-                        final Settings.Builder writerSettings = Settings.builder();
-                        if (randomBoolean()) {
-                            writerSettings.put(
-                                PersistedClusterStateService.DOCUMENT_PAGE_SIZE.getKey(),
-                                ByteSizeValue.ofBytes(randomLongBetween(1, 1024))
-                            );
-                        }
-
-                        if (updatedMetadata != oldState.getLastAcceptedState().metadata() || updatedTerm != oldState.getCurrentTerm()) {
-                            try (
-                                PersistedClusterStateService.Writer writer = new PersistedClusterStateService(
-                                    nodeEnvironment,
-                                    xContentRegistry(),
-                                    new ClusterSettings(writerSettings.build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
-                                    deterministicTaskQueue::getCurrentTimeMillis
-                                ).createWriter()
-                            ) {
-                                writer.writeFullStateAndCommit(
-                                    updatedTerm,
-                                    ClusterState.builder(oldState.getLastAcceptedState()).metadata(updatedMetadata).build()
-                                );
-                            }
-                        }
-                        final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(newLocalNode);
-                        gatewayMetaState.start(Settings.EMPTY, nodeEnvironment, xContentRegistry());
-                        delegate = gatewayMetaState.getPersistedState();
-                    } else {
-                        nodeEnvironment = null;
-                        BytesStreamOutput outStream = new BytesStreamOutput();
-                        outStream.setTransportVersion(TransportVersion.CURRENT);
-
-                        final long persistedCurrentTerm;
-
-                        if ( // node is master-ineligible either before or after the restart ...
-                        (oldState.getLastAcceptedState().nodes().getLocalNode().isMasterNode() && newLocalNode.isMasterNode()) == false
-                            // ... and it's accepted some non-initial state so we can roll back ...
-                            && (oldState.getLastAcceptedState().term() > 0L || oldState.getLastAcceptedState().version() > 0L)
-                            // ... and we're feeling lucky ...
-                            && randomBoolean()) {
-
-                            // ... then we might not have reliably persisted the cluster state, so emulate a rollback
-
-                            persistedCurrentTerm = randomLongBetween(0L, oldState.getCurrentTerm());
-                            final long lastAcceptedTerm = oldState.getLastAcceptedState().term();
-                            final long lastAcceptedVersion = oldState.getLastAcceptedState().version();
-
-                            final long newLastAcceptedTerm;
-                            final long newLastAcceptedVersion;
-
-                            if (lastAcceptedVersion == 0L) {
-                                newLastAcceptedTerm = randomLongBetween(0L, Math.min(persistedCurrentTerm, lastAcceptedTerm - 1));
-                                newLastAcceptedVersion = randomNonNegativeLong();
-                            } else {
-                                newLastAcceptedTerm = randomLongBetween(0L, Math.min(persistedCurrentTerm, lastAcceptedTerm));
-                                newLastAcceptedVersion = randomLongBetween(
-                                    0L,
-                                    newLastAcceptedTerm == lastAcceptedTerm ? lastAcceptedVersion - 1 : Long.MAX_VALUE
-                                );
-                            }
-                            final VotingConfiguration newVotingConfiguration = new VotingConfiguration(
-                                randomBoolean() ? emptySet() : singleton(randomAlphaOfLength(10))
-                            );
-                            final long newValue = randomLong();
-
-                            logger.trace(
-                                "rolling back persisted cluster state on master-ineligible node [{}]: "
-                                    + "previously currentTerm={}, lastAcceptedTerm={}, lastAcceptedVersion={} "
-                                    + "but now currentTerm={}, lastAcceptedTerm={}, lastAcceptedVersion={}",
-                                newLocalNode,
-                                oldState.getCurrentTerm(),
-                                lastAcceptedTerm,
-                                lastAcceptedVersion,
-                                persistedCurrentTerm,
-                                newLastAcceptedTerm,
-                                newLastAcceptedVersion
-                            );
-
-                            clusterState(
-                                newLastAcceptedTerm,
-                                newLastAcceptedVersion,
-                                newLocalNode,
-                                newVotingConfiguration,
-                                newVotingConfiguration,
-                                newValue
-                            ).writeTo(outStream);
-                        } else {
-                            persistedCurrentTerm = oldState.getCurrentTerm();
-                            final Metadata updatedMetadata = adaptGlobalMetadata.apply(oldState.getLastAcceptedState().metadata());
-                            if (updatedMetadata != oldState.getLastAcceptedState().metadata()) {
-                                ClusterState.builder(oldState.getLastAcceptedState()).metadata(updatedMetadata).build().writeTo(outStream);
-                            } else {
-                                oldState.getLastAcceptedState().writeTo(outStream);
-                            }
-                        }
-
-                        final StreamInput inStream = new NamedWriteableAwareStreamInput(
-                            outStream.bytes().streamInput(),
-                            getNamedWriteableRegistry()
-                        );
-                        // adapt cluster state to new localNode instance and add blocks
-                        delegate = new InMemoryPersistedState(
-                            adaptCurrentTerm.apply(persistedCurrentTerm),
-                            ClusterStateUpdaters.addStateNotRecoveredBlock(ClusterState.readFrom(inStream, newLocalNode))
-                        );
-                    }
-                } catch (IOException e) {
-                    throw new UncheckedIOException("Unable to create MockPersistedState", e);
-                }
-            }
-
-            private void possiblyFail(String description) {
-                if (disruptStorage && rarely()) {
-                    logger.trace("simulating IO exception [{}]", description);
-                    // In the real-life IOError might be thrown, for example if state fsync fails.
-                    // This will require node restart and we're not emulating it here.
-                    throw new UncheckedIOException(new IOException("simulated IO exception [" + description + ']'));
-                }
-            }
-
-            @Override
-            public long getCurrentTerm() {
-                return delegate.getCurrentTerm();
-            }
-
-            @Override
-            public ClusterState getLastAcceptedState() {
-                return delegate.getLastAcceptedState();
-            }
-
-            @Override
-            public void setCurrentTerm(long currentTerm) {
-                possiblyFail("before writing term of " + currentTerm);
-                delegate.setCurrentTerm(currentTerm);
-            }
-
-            @Override
-            public void setLastAcceptedState(ClusterState clusterState) {
-                possiblyFail("before writing last-accepted state of term=" + clusterState.term() + ", version=" + clusterState.version());
-                delegate.setLastAcceptedState(clusterState);
-            }
-
-            @Override
-            public void close() {
-                assertTrue(openPersistedStates.remove(this));
-                try {
-                    delegate.close();
-                } catch (IOException e) {
-                    throw new AssertionError("unexpected", e);
-                }
-            }
-        }
-
         private NamedWriteableRegistry getNamedWriteableRegistry() {
             return new NamedWriteableRegistry(
                 Stream.concat(ClusterModule.getNamedWriteables().stream(), extraNamedWriteables().stream()).collect(Collectors.toList())
             );
         }
 
+        CoordinationState.PersistedState createFreshPersistedState(DiscoveryNode localNode) {
+            return new MockPersistedState(localNode, () -> disruptStorage);
+        }
+
+        CoordinationState.PersistedState createPersistedStateFromExistingState(
+            DiscoveryNode newLocalNode,
+            CoordinationState.PersistedState oldState,
+            Function<Metadata, Metadata> adaptGlobalMetadata,
+            Function<Long, Long> adaptCurrentTerm
+        ) {
+            assert oldState instanceof MockPersistedState : oldState.getClass();
+            return new MockPersistedState(
+                newLocalNode,
+                (MockPersistedState) oldState,
+                adaptGlobalMetadata,
+                adaptCurrentTerm,
+                deterministicTaskQueue::getCurrentTimeMillis,
+                getNamedWriteableRegistry(),
+                () -> disruptStorage
+            );
+        }
+
         public class ClusterNode {
             private final Logger logger = LogManager.getLogger(ClusterNode.class);
 
             private final int nodeIndex;
             Coordinator coordinator;
             private final DiscoveryNode localNode;
-            final MockPersistedState persistedState;
+            final CoordinationState.PersistedState persistedState;
             final Settings nodeSettings;
             private AckedFakeThreadPoolMasterService masterService;
             private DisruptableClusterApplierService clusterApplierService;
@@ -1109,7 +942,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
                 this(
                     nodeIndex,
                     createDiscoveryNode(nodeIndex, masterEligible),
-                    defaultPersistedStateSupplier,
+                    Cluster.this::createFreshPersistedState,
                     nodeSettings,
                     nodeHealthService
                 );
@@ -1118,7 +951,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
             ClusterNode(
                 int nodeIndex,
                 DiscoveryNode localNode,
-                Function<DiscoveryNode, MockPersistedState> persistedStateSupplier,
+                Function<DiscoveryNode, CoordinationState.PersistedState> persistedStateSupplier,
                 Settings nodeSettings,
                 NodeHealthService nodeHealthService
             ) {
@@ -1134,7 +967,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
                     success = true;
                 } finally {
                     if (success == false) {
-                        persistedState.close(); // removes it from openPersistedStates
+                        IOUtils.closeWhileHandlingException(persistedState); // removes it from openPersistedStates
                     }
                 }
             }
@@ -1348,7 +1181,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
                     return new ClusterNode(
                         nodeIndex,
                         newLocalNode,
-                        node -> new MockPersistedState(newLocalNode, persistedState, adaptGlobalMetadata, adaptCurrentTerm),
+                        node -> createPersistedStateFromExistingState(newLocalNode, persistedState, adaptGlobalMetadata, adaptCurrentTerm),
                         settings,
                         nodeHealthService
                     );
@@ -1970,4 +1803,200 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
     public interface CoordinatorTestClusterStateUpdateTask extends ClusterStateTaskListener {
         default void clusterStateProcessed(ClusterState oldState, ClusterState newState) {}
     }
+
+    class MockPersistedState implements CoordinationState.PersistedState {
+        private final CoordinationState.PersistedState delegate;
+        private final NodeEnvironment nodeEnvironment;
+        private final BooleanSupplier disruptStorage;
+
+        MockPersistedState(DiscoveryNode localNode, BooleanSupplier disruptStorage) {
+            this.disruptStorage = disruptStorage;
+            try {
+                if (rarely()) {
+                    nodeEnvironment = newNodeEnvironment();
+                    nodeEnvironments.add(nodeEnvironment);
+                    final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(localNode);
+                    gatewayMetaState.start(Settings.EMPTY, nodeEnvironment, xContentRegistry());
+                    delegate = gatewayMetaState.getPersistedState();
+                } else {
+                    nodeEnvironment = null;
+                    delegate = new InMemoryPersistedState(
+                        0L,
+                        ClusterStateUpdaters.addStateNotRecoveredBlock(
+                            clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L)
+                        )
+                    );
+                }
+            } catch (IOException e) {
+                throw new UncheckedIOException("Unable to create MockPersistedState", e);
+            }
+        }
+
+        MockPersistedState(
+            DiscoveryNode newLocalNode,
+            MockPersistedState oldState,
+            Function<Metadata, Metadata> adaptGlobalMetadata,
+            Function<Long, Long> adaptCurrentTerm,
+            LongSupplier currentTimeInMillisSupplier,
+            NamedWriteableRegistry namedWriteableRegistry,
+            BooleanSupplier disruptStorage
+        ) {
+            this.disruptStorage = disruptStorage;
+            try {
+                if (oldState.nodeEnvironment != null) {
+                    nodeEnvironment = oldState.nodeEnvironment;
+                    final Metadata updatedMetadata = adaptGlobalMetadata.apply(oldState.getLastAcceptedState().metadata());
+                    final long updatedTerm = adaptCurrentTerm.apply(oldState.getCurrentTerm());
+
+                    final Settings.Builder writerSettings = Settings.builder();
+                    if (randomBoolean()) {
+                        writerSettings.put(
+                            PersistedClusterStateService.DOCUMENT_PAGE_SIZE.getKey(),
+                            ByteSizeValue.ofBytes(randomLongBetween(1, 1024))
+                        );
+                    }
+
+                    if (updatedMetadata != oldState.getLastAcceptedState().metadata() || updatedTerm != oldState.getCurrentTerm()) {
+                        try (
+                            PersistedClusterStateService.Writer writer = new PersistedClusterStateService(
+                                nodeEnvironment,
+                                xContentRegistry(),
+                                new ClusterSettings(writerSettings.build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
+                                currentTimeInMillisSupplier
+                            ).createWriter()
+                        ) {
+                            writer.writeFullStateAndCommit(
+                                updatedTerm,
+                                ClusterState.builder(oldState.getLastAcceptedState()).metadata(updatedMetadata).build()
+                            );
+                        }
+                    }
+                    final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(newLocalNode);
+                    gatewayMetaState.start(Settings.EMPTY, nodeEnvironment, xContentRegistry());
+                    delegate = gatewayMetaState.getPersistedState();
+                } else {
+                    nodeEnvironment = null;
+                    BytesStreamOutput outStream = new BytesStreamOutput();
+                    outStream.setTransportVersion(TransportVersion.CURRENT);
+
+                    final long persistedCurrentTerm;
+
+                    if ( // node is master-ineligible either before or after the restart ...
+                    (oldState.getLastAcceptedState().nodes().getLocalNode().isMasterNode() && newLocalNode.isMasterNode()) == false
+                        // ... and it's accepted some non-initial state so we can roll back ...
+                        && (oldState.getLastAcceptedState().term() > 0L || oldState.getLastAcceptedState().version() > 0L)
+                        // ... and we're feeling lucky ...
+                        && randomBoolean()) {
+
+                        // ... then we might not have reliably persisted the cluster state, so emulate a rollback
+
+                        persistedCurrentTerm = randomLongBetween(0L, oldState.getCurrentTerm());
+                        final long lastAcceptedTerm = oldState.getLastAcceptedState().term();
+                        final long lastAcceptedVersion = oldState.getLastAcceptedState().version();
+
+                        final long newLastAcceptedTerm;
+                        final long newLastAcceptedVersion;
+
+                        if (lastAcceptedVersion == 0L) {
+                            newLastAcceptedTerm = randomLongBetween(0L, Math.min(persistedCurrentTerm, lastAcceptedTerm - 1));
+                            newLastAcceptedVersion = randomNonNegativeLong();
+                        } else {
+                            newLastAcceptedTerm = randomLongBetween(0L, Math.min(persistedCurrentTerm, lastAcceptedTerm));
+                            newLastAcceptedVersion = randomLongBetween(
+                                0L,
+                                newLastAcceptedTerm == lastAcceptedTerm ? lastAcceptedVersion - 1 : Long.MAX_VALUE
+                            );
+                        }
+                        final VotingConfiguration newVotingConfiguration = new VotingConfiguration(
+                            randomBoolean() ? emptySet() : singleton(randomAlphaOfLength(10))
+                        );
+                        final long newValue = randomLong();
+
+                        logger.trace(
+                            "rolling back persisted cluster state on master-ineligible node [{}]: "
+                                + "previously currentTerm={}, lastAcceptedTerm={}, lastAcceptedVersion={} "
+                                + "but now currentTerm={}, lastAcceptedTerm={}, lastAcceptedVersion={}",
+                            newLocalNode,
+                            oldState.getCurrentTerm(),
+                            lastAcceptedTerm,
+                            lastAcceptedVersion,
+                            persistedCurrentTerm,
+                            newLastAcceptedTerm,
+                            newLastAcceptedVersion
+                        );
+
+                        clusterState(
+                            newLastAcceptedTerm,
+                            newLastAcceptedVersion,
+                            newLocalNode,
+                            newVotingConfiguration,
+                            newVotingConfiguration,
+                            newValue
+                        ).writeTo(outStream);
+                    } else {
+                        persistedCurrentTerm = oldState.getCurrentTerm();
+                        final Metadata updatedMetadata = adaptGlobalMetadata.apply(oldState.getLastAcceptedState().metadata());
+                        if (updatedMetadata != oldState.getLastAcceptedState().metadata()) {
+                            ClusterState.builder(oldState.getLastAcceptedState()).metadata(updatedMetadata).build().writeTo(outStream);
+                        } else {
+                            oldState.getLastAcceptedState().writeTo(outStream);
+                        }
+                    }
+
+                    final StreamInput inStream = new NamedWriteableAwareStreamInput(
+                        outStream.bytes().streamInput(),
+                        namedWriteableRegistry
+                    );
+                    // adapt cluster state to new localNode instance and add blocks
+                    delegate = new InMemoryPersistedState(
+                        adaptCurrentTerm.apply(persistedCurrentTerm),
+                        ClusterStateUpdaters.addStateNotRecoveredBlock(ClusterState.readFrom(inStream, newLocalNode))
+                    );
+                }
+            } catch (IOException e) {
+                throw new UncheckedIOException("Unable to create MockPersistedState", e);
+            }
+        }
+
+        private void possiblyFail(String description) {
+            if (disruptStorage.getAsBoolean() && rarely()) {
+                logger.trace("simulating IO exception [{}]", description);
+                // In the real-life IOError might be thrown, for example if state fsync fails.
+                // This will require node restart and we're not emulating it here.
+                throw new UncheckedIOException(new IOException("simulated IO exception [" + description + ']'));
+            }
+        }
+
+        @Override
+        public long getCurrentTerm() {
+            return delegate.getCurrentTerm();
+        }
+
+        @Override
+        public ClusterState getLastAcceptedState() {
+            return delegate.getLastAcceptedState();
+        }
+
+        @Override
+        public void setCurrentTerm(long currentTerm) {
+            possiblyFail("before writing term of " + currentTerm);
+            delegate.setCurrentTerm(currentTerm);
+        }
+
+        @Override
+        public void setLastAcceptedState(ClusterState clusterState) {
+            possiblyFail("before writing last-accepted state of term=" + clusterState.term() + ", version=" + clusterState.version());
+            delegate.setLastAcceptedState(clusterState);
+        }
+
+        @Override
+        public void close() {
+            assertTrue(openPersistedStates.remove(this));
+            try {
+                delegate.close();
+            } catch (IOException e) {
+                throw new AssertionError("unexpected", e);
+            }
+        }
+    }
 }