
Revert "Remove MDP from PersistedClusterStateService (#72278)" (#78495)

This reverts commit 2dfaf7a9e03407cde6f01f685d32c71f50b299e4.

The revert was not clean; it required merging with a few changes made to
loadOnDiskState since the initial removal of MDP support.

relates #71205
Ryan Ernst · 4 years ago · commit 1aa54547e2
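
The net effect on callers is small but consistent across the files below: the PersistedClusterStateService constructor takes a Path[] again instead of a single Path, loadOnDiskState() is renamed back to loadBestOnDiskState(), and the test-only delete(Path) becomes the varargs deleteAll(Path...). A minimal caller-side sketch, assuming placeholder arguments (empty registry and settings, a zero clock) rather than real node wiring:

    import java.io.IOException;
    import java.nio.file.Path;

    import org.elasticsearch.common.settings.ClusterSettings;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.util.BigArrays;
    import org.elasticsearch.common.xcontent.NamedXContentRegistry;
    import org.elasticsearch.gateway.PersistedClusterStateService;

    public class MdpApiSketch {
        // Loads whatever cluster state is persisted under a single data path; wrapping the
        // path in a one-element Path[] is exactly what ElasticsearchNodeCommand does below.
        static PersistedClusterStateService.OnDiskState load(Path dataPath, String nodeId) throws IOException {
            PersistedClusterStateService service = new PersistedClusterStateService(
                new Path[] { dataPath },              // was a single Path before this revert
                nodeId,
                NamedXContentRegistry.EMPTY,          // placeholder registry for the sketch
                BigArrays.NON_RECYCLING_INSTANCE,
                new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
                () -> 0L);                            // relative-time supplier, unused here
            return service.loadBestOnDiskState();     // was loadOnDiskState()
        }
    }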

+ 3 - 3
server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java

@@ -156,7 +156,7 @@ public class UnsafeBootstrapAndDetachCommandIT extends ESIntegTestCase {
         internalCluster().stopRandomDataNode();
         Environment environment = TestEnvironment.newEnvironment(
             Settings.builder().put(internalCluster().getDefaultSettings()).put(dataPathSettings).build());
-        PersistedClusterStateService.delete(nodeEnvironment.nodeDataPath());
+        PersistedClusterStateService.deleteAll(nodeEnvironment.nodeDataPath());
 
         expectThrows(() -> unsafeBootstrap(environment), ElasticsearchNodeCommand.NO_NODE_METADATA_FOUND_MSG);
     }
@@ -170,7 +170,7 @@ public class UnsafeBootstrapAndDetachCommandIT extends ESIntegTestCase {
         internalCluster().stopRandomDataNode();
         Environment environment = TestEnvironment.newEnvironment(
             Settings.builder().put(internalCluster().getDefaultSettings()).put(dataPathSettings).build());
-        PersistedClusterStateService.delete(nodeEnvironment.nodeDataPath());
+        PersistedClusterStateService.deleteAll(nodeEnvironment.nodeDataPath());
 
         expectThrows(() -> detachCluster(environment), ElasticsearchNodeCommand.NO_NODE_METADATA_FOUND_MSG);
     }
@@ -253,7 +253,7 @@ public class UnsafeBootstrapAndDetachCommandIT extends ESIntegTestCase {
         logger.info("--> unsafely-bootstrap 1st master-eligible node");
         MockTerminal terminal = unsafeBootstrap(environmentMaster1);
         Metadata metadata = ElasticsearchNodeCommand.createPersistedClusterStateService(Settings.EMPTY, nodeEnvironment.nodeDataPath())
-            .loadOnDiskState().metadata;
+            .loadBestOnDiskState().metadata;
         assertThat(terminal.getOutput(), containsString(
             String.format(Locale.ROOT, UnsafeBootstrapMasterCommand.CLUSTER_STATE_TERM_VERSION_MSG_FORMAT,
                 metadata.coordinationMetadata().term(), metadata.version())));

+ 2 - 1
server/src/internalClusterTest/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java

@@ -567,7 +567,8 @@ public class RemoveCorruptedShardDataCommandIT extends ESIntegTestCase {
         for (String nodeName : nodeNames) {
             final Path indexPath = indexPathByNodeName.get(nodeName);
             final OptionSet options = parser.parse("--dir", indexPath.toAbsolutePath().toString());
-            command.findAndProcessShardPath(options, environmentByNodeName.get(nodeName), environmentByNodeName.get(nodeName).dataFile(),
+            command.findAndProcessShardPath(options, environmentByNodeName.get(nodeName),
+                environmentByNodeName.get(nodeName).dataFile(),
                 state, shardPath -> assertThat(shardPath.resolveIndex(), equalTo(indexPath)));
         }
     }

+ 2 - 2
server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java

@@ -106,7 +106,7 @@ public abstract class ElasticsearchNodeCommand extends EnvironmentAwareCommand {
         }
 
         String nodeId = nodeMetadata.nodeId();
-        return new PersistedClusterStateService(dataPath, nodeId, namedXContentRegistry, BigArrays.NON_RECYCLING_INSTANCE,
+        return new PersistedClusterStateService(new Path[] { dataPath }, nodeId, namedXContentRegistry, BigArrays.NON_RECYCLING_INSTANCE,
             new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
     }
 
@@ -119,7 +119,7 @@ public abstract class ElasticsearchNodeCommand extends EnvironmentAwareCommand {
 
     public static Tuple<Long, ClusterState> loadTermAndClusterState(PersistedClusterStateService psf,
                                                                     Environment env) throws IOException {
-        final PersistedClusterStateService.OnDiskState bestOnDiskState = psf.loadOnDiskState();
+        final PersistedClusterStateService.OnDiskState bestOnDiskState = psf.loadBestOnDiskState();
         if (bestOnDiskState.empty()) {
             throw new ElasticsearchException(CS_MISSING_MSG);
         }

+ 1 - 0
server/src/main/java/org/elasticsearch/cluster/coordination/RemoveSettingsCommand.java

@@ -52,6 +52,7 @@ public class RemoveSettingsCommand extends ElasticsearchNodeCommand {
         }
 
         final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(env.settings(), dataPath);
+
         terminal.println(Terminal.Verbosity.VERBOSE, "Loading cluster state");
         final Tuple<Long, ClusterState> termAndClusterState = loadTermAndClusterState(persistedClusterStateService, env);
         final ClusterState oldClusterState = termAndClusterState.v2();

+ 1 - 1
server/src/main/java/org/elasticsearch/env/NodeRepurposeCommand.java

@@ -143,7 +143,7 @@ public class NodeRepurposeCommand extends ElasticsearchNodeCommand {
 
     private ClusterState loadClusterState(Terminal terminal, Environment env, PersistedClusterStateService psf) throws IOException {
         terminal.println(Terminal.Verbosity.VERBOSE, "Loading cluster state");
-        return clusterState(env, psf.loadOnDiskState());
+        return clusterState(env, psf.loadBestOnDiskState());
     }
 
     private void outputVerboseInformation(Terminal terminal, Collection<Path> pathsToCleanup, Set<String> indexUUIDs, Metadata metadata) {

+ 20 - 18
server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java

@@ -22,10 +22,10 @@ import org.elasticsearch.cluster.coordination.CoordinationMetadata;
 import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState;
 import org.elasticsearch.cluster.coordination.InMemoryPersistedState;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
-import org.elasticsearch.cluster.metadata.IndexMetadataVerifier;
 import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
 import org.elasticsearch.cluster.metadata.Manifest;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.IndexMetadataVerifier;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -94,7 +94,7 @@ public class GatewayMetaState implements Closeable {
 
         if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.canContainData(settings)) {
             try {
-                final PersistedClusterStateService.OnDiskState onDiskState = persistedClusterStateService.loadOnDiskState();
+                final PersistedClusterStateService.OnDiskState onDiskState = persistedClusterStateService.loadBestOnDiskState();
 
                 Metadata metadata = onDiskState.metadata;
                 long lastAcceptedVersion = onDiskState.lastAcceptedVersion;
@@ -132,7 +132,7 @@ public class GatewayMetaState implements Closeable {
                     }
                     // write legacy node metadata to prevent accidental downgrades from spawning empty cluster state
                     NodeMetadata.FORMAT.writeAndCleanup(new NodeMetadata(persistedClusterStateService.getNodeId(), Version.CURRENT),
-                        persistedClusterStateService.getDataPath());
+                        persistedClusterStateService.getDataPaths());
                     success = true;
                 } finally {
                     if (success == false) {
@@ -148,21 +148,23 @@ public class GatewayMetaState implements Closeable {
             final long currentTerm = 0L;
             final ClusterState clusterState = prepareInitialClusterState(transportService, clusterService,
                     ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)).build());
-            // write empty cluster state just so that we have a persistent node id. There is no need to write out global metadata with
-            // cluster uuid as coordinating-only nodes do not snap into a cluster as they carry no state
-            try (PersistedClusterStateService.Writer persistenceWriter = persistedClusterStateService.createWriter()) {
-                persistenceWriter.writeFullStateAndCommit(currentTerm, clusterState);
-            } catch (IOException e) {
-                throw new ElasticsearchException("failed to load metadata", e);
-            }
-            try {
-                // delete legacy cluster state files
-                metaStateService.deleteAll();
-                // write legacy node metadata to prevent downgrades from spawning empty cluster state
-                NodeMetadata.FORMAT.writeAndCleanup(new NodeMetadata(persistedClusterStateService.getNodeId(), Version.CURRENT),
-                    persistedClusterStateService.getDataPath());
-            } catch (IOException e) {
-                throw new UncheckedIOException(e);
+            if (persistedClusterStateService.getDataPaths().length > 0) {
+                // write empty cluster state just so that we have a persistent node id. There is no need to write out global metadata with
+                // cluster uuid as coordinating-only nodes do not snap into a cluster as they carry no state
+                try (PersistedClusterStateService.Writer persistenceWriter = persistedClusterStateService.createWriter()) {
+                    persistenceWriter.writeFullStateAndCommit(currentTerm, clusterState);
+                } catch (IOException e) {
+                    throw new ElasticsearchException("failed to load metadata", e);
+                }
+                try {
+                    // delete legacy cluster state files
+                    metaStateService.deleteAll();
+                    // write legacy node metadata to prevent downgrades from spawning empty cluster state
+                    NodeMetadata.FORMAT.writeAndCleanup(new NodeMetadata(persistedClusterStateService.getNodeId(), Version.CURRENT),
+                        persistedClusterStateService.getDataPaths());
+                } catch (IOException e) {
+                    throw new UncheckedIOException(e);
+                }
             }
             persistedState.set(new InMemoryPersistedState(currentTerm, clusterState));
         }
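
The branch above only writes the placeholder state when the node actually has data paths, and it relies on the writer committing the same state to every path, the write-all-read-one strategy spelled out in the PersistedClusterStateService hunk that follows. A minimal sketch of that write path, again with placeholder arguments rather than real node wiring:

    import java.io.IOException;
    import java.nio.file.Path;

    import org.elasticsearch.cluster.ClusterState;
    import org.elasticsearch.common.settings.ClusterSettings;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.util.BigArrays;
    import org.elasticsearch.common.xcontent.NamedXContentRegistry;
    import org.elasticsearch.gateway.PersistedClusterStateService;

    public class WriteAllPathsSketch {
        // Commits an empty cluster state to every configured data path; createWriter() opens
        // one metadata index per path, so the single commit below lands on all of them.
        static void writeEmptyState(Path[] dataPaths, String nodeId) throws IOException {
            PersistedClusterStateService service = new PersistedClusterStateService(
                dataPaths, nodeId, NamedXContentRegistry.EMPTY, BigArrays.NON_RECYCLING_INSTANCE,
                new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
            try (PersistedClusterStateService.Writer writer = service.createWriter()) {
                writer.writeFullStateAndCommit(0L, ClusterState.EMPTY_STATE);
            }
        }
    }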

+ 130 - 73
server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.gateway;
 
 import com.carrotsearch.hppc.cursors.ObjectCursor;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
@@ -42,19 +43,14 @@ import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
-import org.elasticsearch.core.CheckedConsumer;
-import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.RecyclingBytesStreamOutput;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
-import org.elasticsearch.core.Releasable;
-import org.elasticsearch.core.Releasables;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Setting;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.ByteArray;
 import org.elasticsearch.common.util.PageCacheRecycler;
@@ -64,6 +60,11 @@ import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.core.CheckedConsumer;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.env.NodeMetadata;
@@ -130,7 +131,7 @@ public class PersistedClusterStateService {
     public static final Setting<TimeValue> SLOW_WRITE_LOGGING_THRESHOLD = Setting.timeSetting("gateway.slow_write_logging_threshold",
         TimeValue.timeValueSeconds(10), TimeValue.ZERO, Setting.Property.NodeScope, Setting.Property.Dynamic);
 
-    private final Path dataPath;
+    private final Path[] dataPaths;
     private final String nodeId;
     private final NamedXContentRegistry namedXContentRegistry;
     private final BigArrays bigArrays;
@@ -140,13 +141,13 @@ public class PersistedClusterStateService {
 
     public PersistedClusterStateService(NodeEnvironment nodeEnvironment, NamedXContentRegistry namedXContentRegistry, BigArrays bigArrays,
                                         ClusterSettings clusterSettings, LongSupplier relativeTimeMillisSupplier) {
-        this(nodeEnvironment.nodeDataPath(), nodeEnvironment.nodeId(), namedXContentRegistry, bigArrays, clusterSettings,
+        this(new Path[] { nodeEnvironment.nodeDataPath() }, nodeEnvironment.nodeId(), namedXContentRegistry, bigArrays, clusterSettings,
             relativeTimeMillisSupplier);
     }
 
-    public PersistedClusterStateService(Path dataPath, String nodeId, NamedXContentRegistry namedXContentRegistry, BigArrays bigArrays,
+    public PersistedClusterStateService(Path[] dataPaths, String nodeId, NamedXContentRegistry namedXContentRegistry, BigArrays bigArrays,
                                         ClusterSettings clusterSettings, LongSupplier relativeTimeMillisSupplier) {
-        this.dataPath = dataPath;
+        this.dataPaths = dataPaths;
         this.nodeId = nodeId;
         this.namedXContentRegistry = namedXContentRegistry;
         this.bigArrays = bigArrays;
@@ -171,12 +172,14 @@ public class PersistedClusterStateService {
         final List<Closeable> closeables = new ArrayList<>();
         boolean success = false;
         try {
-            final Directory directory = createDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME));
-            closeables.add(directory);
+            for (final Path path : dataPaths) {
+                final Directory directory = createDirectory(path.resolve(METADATA_DIRECTORY_NAME));
+                closeables.add(directory);
 
-            final IndexWriter indexWriter = createIndexWriter(directory, false);
-            closeables.add(indexWriter);
-            metadataIndexWriters.add(new MetadataIndexWriter(directory, indexWriter));
+                final IndexWriter indexWriter = createIndexWriter(directory, false);
+                closeables.add(indexWriter);
+                metadataIndexWriters.add(new MetadataIndexWriter(directory, indexWriter));
+            }
             success = true;
         } finally {
             if (success == false) {
@@ -201,11 +204,13 @@ public class PersistedClusterStateService {
     }
 
     /**
-     * Remove all persisted cluster states from the given data path, for use in tests. Should only be called when there is no open
-     * {@link Writer} on this path.
+     * Remove all persisted cluster states from the given data paths, for use in tests. Should only be called when there is no open
+     * {@link Writer} on these paths.
      */
-    public static void delete(Path dataPath) throws IOException {
-        Lucene.cleanLuceneIndex(new NIOFSDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME)));
+    public static void deleteAll(Path... dataPaths) throws IOException {
+        for (Path dataPath : dataPaths) {
+            Lucene.cleanLuceneIndex(new NIOFSDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME)));
+        }
     }
 
     // exposed for tests
@@ -215,8 +220,8 @@ public class PersistedClusterStateService {
         return new NIOFSDirectory(path);
     }
 
-    public Path getDataPath() {
-        return dataPath;
+    public Path[] getDataPaths() {
+        return dataPaths;
     }
 
     public static class OnDiskState {
@@ -242,27 +247,38 @@ public class PersistedClusterStateService {
     }
 
     /**
-     * Returns the node metadata for the given data path, and checks if the node ids are unique
-     * @param dataPath the data path to scan
+     * Returns the node metadata for the given data paths, and checks if the node ids are unique
+     * @param dataPaths the data paths to scan
      */
     @Nullable
-    public static NodeMetadata nodeMetadata(Path dataPath) throws IOException {
-        final Path indexPath = dataPath.resolve(METADATA_DIRECTORY_NAME);
-        if (Files.exists(indexPath)) {
-            try (DirectoryReader reader = DirectoryReader.open(new NIOFSDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME)))) {
-                final Map<String, String> userData = reader.getIndexCommit().getUserData();
-                assert userData.get(NODE_VERSION_KEY) != null;
-
-                final String nodeId = userData.get(NODE_ID_KEY);
-                assert nodeId != null;
-                final Version version = Version.fromId(Integer.parseInt(userData.get(NODE_VERSION_KEY)));
-                return new NodeMetadata(nodeId, version);
-
-            } catch (IndexNotFoundException e) {
-                logger.debug(new ParameterizedMessage("no on-disk state at {}", indexPath), e);
+    public static NodeMetadata nodeMetadata(Path... dataPaths) throws IOException {
+        String nodeId = null;
+        Version version = null;
+        for (final Path dataPath : dataPaths) {
+            final Path indexPath = dataPath.resolve(METADATA_DIRECTORY_NAME);
+            if (Files.exists(indexPath)) {
+                try (DirectoryReader reader = DirectoryReader.open(new NIOFSDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME)))) {
+                    final Map<String, String> userData = reader.getIndexCommit().getUserData();
+                    assert userData.get(NODE_VERSION_KEY) != null;
+
+                    final String thisNodeId = userData.get(NODE_ID_KEY);
+                    assert thisNodeId != null;
+                    if (nodeId != null && nodeId.equals(thisNodeId) == false) {
+                        throw new IllegalStateException("unexpected node ID in metadata, found [" + thisNodeId +
+                            "] in [" + dataPath + "] but expected [" + nodeId + "]");
+                    } else if (nodeId == null) {
+                        nodeId = thisNodeId;
+                        version = Version.fromId(Integer.parseInt(userData.get(NODE_VERSION_KEY)));
+                    }
+                } catch (IndexNotFoundException e) {
+                    logger.debug(new ParameterizedMessage("no on-disk state at {}", indexPath), e);
+                }
             }
         }
-        return null;
+        if (nodeId == null) {
+            return null;
+        }
+        return new NodeMetadata(nodeId, version);
     }
 
     /**
@@ -291,60 +307,100 @@ public class PersistedClusterStateService {
     }
 
     /**
-     * Loads the available on-disk cluster state. Returns {@link OnDiskState#NO_ON_DISK_STATE} if no such state was found.
+     * Loads the best available on-disk cluster state. Returns {@link OnDiskState#NO_ON_DISK_STATE} if no such state was found.
      */
-    public OnDiskState loadOnDiskState() throws IOException {
-        return loadOnDiskState(true);
+    public OnDiskState loadBestOnDiskState() throws IOException {
+        return loadBestOnDiskState(true);
     }
 
     /**
      * Loads the available on-disk cluster state. Returns {@link OnDiskState#NO_ON_DISK_STATE} if no such state was found.
      * @param checkClean whether to check the index for corruption before loading, only for tests
      */
-    OnDiskState loadOnDiskState(boolean checkClean) throws IOException {
-        OnDiskState onDiskState = OnDiskState.NO_ON_DISK_STATE;
-
-        final Path indexPath = dataPath.resolve(METADATA_DIRECTORY_NAME);
-        if (Files.exists(indexPath)) {
-            try (Directory directory = createDirectory(indexPath)) {
-                if (checkClean) {
-                    try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
-                        final boolean isClean;
-                        try (PrintStream printStream = new PrintStream(outputStream, true, StandardCharsets.UTF_8);
-                             CheckIndex checkIndex = new CheckIndex(directory)) {
-                            checkIndex.setInfoStream(printStream);
-                            checkIndex.setChecksumsOnly(true);
-                            isClean = checkIndex.checkIndex().clean;
-                        }
+    OnDiskState loadBestOnDiskState(boolean checkClean) throws IOException {
+        String committedClusterUuid = null;
+        Path committedClusterUuidPath = null;
+        OnDiskState bestOnDiskState = OnDiskState.NO_ON_DISK_STATE;
+        OnDiskState maxCurrentTermOnDiskState = bestOnDiskState;
+
+        // We use a write-all-read-one strategy: metadata is written to every data path when accepting it, which means it is mostly
+        // sufficient to read _any_ copy. "Mostly" sufficient because the user can change the set of data paths when restarting, and may
+        // add a data path containing a stale copy of the metadata. We deal with this by using the freshest copy we can find.
+        for (final Path dataPath : dataPaths) {
+            final Path indexPath = dataPath.resolve(METADATA_DIRECTORY_NAME);
+            if (Files.exists(indexPath)) {
+                try (Directory directory = createDirectory(indexPath)) {
+                    if (checkClean) {
+                        try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
+                            final boolean isClean;
+                            try (PrintStream printStream = new PrintStream(outputStream, true, StandardCharsets.UTF_8);
+                                 CheckIndex checkIndex = new CheckIndex(directory)) {
+                                checkIndex.setInfoStream(printStream);
+                                checkIndex.setChecksumsOnly(true);
+                                isClean = checkIndex.checkIndex().clean;
+                            }
 
-                        if (isClean == false) {
-                            if (logger.isErrorEnabled()) {
-                                outputStream.bytes().utf8ToString().lines().forEach(l -> logger.error("checkIndex: {}", l));
+                            if (isClean == false) {
+                                if (logger.isErrorEnabled()) {
+                                    outputStream.bytes().utf8ToString().lines().forEach(l -> logger.error("checkIndex: {}", l));
+                                }
+                                throw new IllegalStateException(
+                                    "the index containing the cluster metadata under the data path ["
+                                        + dataPath
+                                        + "] has been changed by an external force after it was last written by Elasticsearch and is "
+                                        + "now unreadable"
+                                );
                             }
-                            throw new IllegalStateException(
-                                "the index containing the cluster metadata under the data path ["
-                                    + dataPath
-                                    + "] has been changed by an external force after it was last written by Elasticsearch and is "
-                                    + "now unreadable"
-                            );
                         }
                     }
-                }
 
-                try (DirectoryReader directoryReader = DirectoryReader.open(directory)) {
-                    onDiskState = loadOnDiskState(dataPath, directoryReader);
+                    try (DirectoryReader directoryReader = DirectoryReader.open(directory)) {
+                        final OnDiskState onDiskState = loadOnDiskState(dataPath, directoryReader);
+
+                        if (nodeId.equals(onDiskState.nodeId) == false) {
+                            throw new IllegalStateException("the index containing the cluster metadata under the data path [" + dataPath +
+                                "] belongs to a node with ID [" + onDiskState.nodeId + "] but this node's ID is [" + nodeId + "]");
+                        }
+
+                        if (onDiskState.metadata.clusterUUIDCommitted()) {
+                            if (committedClusterUuid == null) {
+                                committedClusterUuid = onDiskState.metadata.clusterUUID();
+                                committedClusterUuidPath = dataPath;
+                            } else if (committedClusterUuid.equals(onDiskState.metadata.clusterUUID()) == false) {
+                                throw new IllegalStateException("mismatched cluster UUIDs in metadata, found [" + committedClusterUuid +
+                                    "] in [" + committedClusterUuidPath + "] and [" + onDiskState.metadata.clusterUUID() + "] in ["
+                                    + dataPath + "]");
+                            }
+                        }
+
+                        if (maxCurrentTermOnDiskState.empty() || maxCurrentTermOnDiskState.currentTerm < onDiskState.currentTerm) {
+                            maxCurrentTermOnDiskState = onDiskState;
+                        }
 
-                    if (nodeId.equals(onDiskState.nodeId) == false) {
-                        throw new IllegalStateException("the index containing the cluster metadata under the data path [" + dataPath +
-                            "] belongs to a node with ID [" + onDiskState.nodeId + "] but this node's ID is [" + nodeId + "]");
+                        long acceptedTerm = onDiskState.metadata.coordinationMetadata().term();
+                        long maxAcceptedTerm = bestOnDiskState.metadata.coordinationMetadata().term();
+                        if (bestOnDiskState.empty()
+                            || acceptedTerm > maxAcceptedTerm
+                            || (acceptedTerm == maxAcceptedTerm
+                                && (onDiskState.lastAcceptedVersion > bestOnDiskState.lastAcceptedVersion
+                                    || (onDiskState.lastAcceptedVersion == bestOnDiskState.lastAcceptedVersion)
+                                        && onDiskState.currentTerm > bestOnDiskState.currentTerm))) {
+                            bestOnDiskState = onDiskState;
+                        }
                     }
+                } catch (IndexNotFoundException e) {
+                    logger.debug(new ParameterizedMessage("no on-disk state at {}", indexPath), e);
                 }
-            } catch (IndexNotFoundException e) {
-                logger.debug(new ParameterizedMessage("no on-disk state at {}", indexPath), e);
             }
         }
 
-        return onDiskState;
+        if (bestOnDiskState.currentTerm != maxCurrentTermOnDiskState.currentTerm) {
+            throw new IllegalStateException("inconsistent terms found: best state is from [" + bestOnDiskState.dataPath +
+                "] in term [" + bestOnDiskState.currentTerm + "] but there is a stale state in [" + maxCurrentTermOnDiskState.dataPath +
+                "] with greater term [" + maxCurrentTermOnDiskState.currentTerm + "]");
+        }
+
+        return bestOnDiskState;
     }
 
     private OnDiskState loadOnDiskState(Path dataPath, DirectoryReader reader) throws IOException {
@@ -369,6 +425,7 @@ public class PersistedClusterStateService {
         }
 
         logger.trace("got global metadata, now reading index metadata");
+
         final Set<String> indexUUIDs = new HashSet<>();
         consumeFromType(searcher, INDEX_TYPE_NAME, bytes ->
         {
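
The heart of loadBestOnDiskState is the tie-break that decides which copy is freshest: higher accepted term first, then higher last-accepted version, then higher current term, with a final consistency check that fails if some other copy recorded a higher current term than the chosen one. The sketch below models that ordering with a hypothetical Copy record rather than the real OnDiskState class, and assumes a non-empty list of copies:

    import java.util.Comparator;
    import java.util.List;

    public class BestStateSelectionSketch {
        // Simplified stand-in for OnDiskState: one persisted copy per data path.
        record Copy(String path, long currentTerm, long acceptedTerm, long lastAcceptedVersion) {}

        static Copy pickBest(List<Copy> copies) {
            // Freshest copy wins: accepted term, then last-accepted version, then current term.
            Comparator<Copy> freshest = Comparator
                .comparingLong(Copy::acceptedTerm)
                .thenComparingLong(Copy::lastAcceptedVersion)
                .thenComparingLong(Copy::currentTerm);
            Copy best = copies.stream().max(freshest).orElseThrow();
            long maxCurrentTerm = copies.stream().mapToLong(Copy::currentTerm).max().orElseThrow();
            if (best.currentTerm() != maxCurrentTerm) {
                // Mirrors the IllegalStateException above: the freshest accepted state must not
                // live on a path that missed later term bumps recorded elsewhere.
                throw new IllegalStateException("inconsistent terms: best copy from [" + best.path()
                    + "] has current term [" + best.currentTerm()
                    + "] but a stale copy has current term [" + maxCurrentTerm + "]");
            }
            return best;
        }
    }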

+ 3 - 2
server/src/test/java/org/elasticsearch/env/NodeRepurposeCommandTests.java

@@ -51,6 +51,7 @@ public class NodeRepurposeCommandTests extends ESTestCase {
     private static final Index INDEX = new Index("testIndex", "testUUID");
     private Settings dataMasterSettings;
     private Environment environment;
+    private Path[] nodePaths;
     private Settings dataNoMasterSettings;
     private Settings noDataNoMasterSettings;
     private Settings noDataMasterSettings;
@@ -60,9 +61,9 @@ public class NodeRepurposeCommandTests extends ESTestCase {
         dataMasterSettings = buildEnvSettings(Settings.EMPTY);
         environment = TestEnvironment.newEnvironment(dataMasterSettings);
         try (NodeEnvironment nodeEnvironment = new NodeEnvironment(dataMasterSettings, environment)) {
-            Path nodePath = nodeEnvironment.nodeDataPath();
+            nodePaths = new Path[] { nodeEnvironment.nodeDataPath() };
             final String nodeId = randomAlphaOfLength(10);
-            try (PersistedClusterStateService.Writer writer = new PersistedClusterStateService(nodePath, nodeId,
+            try (PersistedClusterStateService.Writer writer = new PersistedClusterStateService(nodePaths, nodeId,
                 xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
                 new ClusterSettings(dataMasterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L).createWriter()) {
                 writer.writeFullStateAndCommit(1L, ClusterState.EMPTY_STATE);

+ 3 - 3
server/src/test/java/org/elasticsearch/env/OverrideNodeVersionCommandTests.java

@@ -45,7 +45,7 @@ public class OverrideNodeVersionCommandTests extends ESTestCase {
             nodePath = nodeEnvironment.nodeDataPath();
             nodeId = nodeEnvironment.nodeId();
 
-            try (PersistedClusterStateService.Writer writer = new PersistedClusterStateService(nodePath, nodeId,
+            try (PersistedClusterStateService.Writer writer = new PersistedClusterStateService(new Path[] { nodePath }, nodeId,
                 xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
                 new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L).createWriter()) {
                 writer.writeFullStateAndCommit(1L, ClusterState.builder(ClusterName.DEFAULT).metadata(Metadata.builder()
@@ -57,10 +57,10 @@ public class OverrideNodeVersionCommandTests extends ESTestCase {
 
     @After
     public void checkClusterStateIntact() throws IOException {
-        assertTrue(Metadata.SETTING_READ_ONLY_SETTING.get(new PersistedClusterStateService(nodePath, nodeId,
+        assertTrue(Metadata.SETTING_READ_ONLY_SETTING.get(new PersistedClusterStateService(new Path[] { nodePath }, nodeId,
             xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
             new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L)
-            .loadOnDiskState().metadata.persistentSettings()));
+            .loadBestOnDiskState().metadata.persistentSettings()));
     }
 
     public void testFailsOnEmptyPath() {

+ 6 - 6
server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java

@@ -316,7 +316,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
             final PersistedClusterStateService newPersistedClusterStateService =
                 new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(),
                     new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
-            final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService.loadOnDiskState();
+            final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService.loadBestOnDiskState();
             assertFalse(onDiskState.empty());
             assertThat(onDiskState.currentTerm, equalTo(42L));
             assertClusterStateEqual(state,
@@ -370,7 +370,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
             assertThat(persistedState.getLastAcceptedState().getLastAcceptedConfiguration(),
                 not(equalTo(persistedState.getLastAcceptedState().getLastCommittedConfiguration())));
             CoordinationMetadata persistedCoordinationMetadata =
-                persistedClusterStateService.loadOnDiskState(false).metadata.coordinationMetadata();
+                persistedClusterStateService.loadBestOnDiskState(false).metadata.coordinationMetadata();
             assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(),
                 equalTo(GatewayMetaState.AsyncPersistedState.staleStateConfiguration));
             assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(),
@@ -386,12 +386,12 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
                     .clusterUUID(state.metadata().clusterUUID()).clusterUUIDCommitted(true).build()).build();
 
             assertClusterStateEqual(expectedClusterState, persistedState.getLastAcceptedState());
-            persistedCoordinationMetadata = persistedClusterStateService.loadOnDiskState(false).metadata.coordinationMetadata();
+            persistedCoordinationMetadata = persistedClusterStateService.loadBestOnDiskState(false).metadata.coordinationMetadata();
             assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(),
                 equalTo(GatewayMetaState.AsyncPersistedState.staleStateConfiguration));
             assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(),
                 equalTo(GatewayMetaState.AsyncPersistedState.staleStateConfiguration));
-            assertTrue(persistedClusterStateService.loadOnDiskState(false).metadata.clusterUUIDCommitted());
+            assertTrue(persistedClusterStateService.loadBestOnDiskState(false).metadata.clusterUUIDCommitted());
 
             // generate a series of updates and check if batching works
             final String indexName = randomAlphaOfLength(10);
@@ -513,7 +513,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
             final PersistedClusterStateService newPersistedClusterStateService =
                 new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(),
                     new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
-            final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService.loadOnDiskState();
+            final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService.loadBestOnDiskState();
             assertFalse(onDiskState.empty());
             assertThat(onDiskState.currentTerm, equalTo(currentTerm));
             assertClusterStateEqual(state,
@@ -585,7 +585,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
             final PersistedClusterStateService newPersistedClusterStateService =
                 new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(),
                     new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
-            final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService.loadOnDiskState();
+            final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService.loadBestOnDiskState();
             assertFalse(onDiskState.empty());
             assertThat(onDiskState.currentTerm, equalTo(currentTerm));
             assertClusterStateEqual(state,

+ 8 - 8
server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java

@@ -78,13 +78,13 @@ public class PersistedClusterStateServiceTests extends ESTestCase {
             final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(nodeEnvironment);
             final long newTerm = randomNonNegativeLong();
 
-            assertThat(persistedClusterStateService.loadOnDiskState().currentTerm, equalTo(0L));
+            assertThat(persistedClusterStateService.loadBestOnDiskState().currentTerm, equalTo(0L));
             try (Writer writer = persistedClusterStateService.createWriter()) {
                 writer.writeFullStateAndCommit(newTerm, ClusterState.EMPTY_STATE);
-                assertThat(persistedClusterStateService.loadOnDiskState(false).currentTerm, equalTo(newTerm));
+                assertThat(persistedClusterStateService.loadBestOnDiskState(false).currentTerm, equalTo(newTerm));
             }
 
-            assertThat(persistedClusterStateService.loadOnDiskState().currentTerm, equalTo(newTerm));
+            assertThat(persistedClusterStateService.loadBestOnDiskState().currentTerm, equalTo(newTerm));
         }
     }
 
@@ -286,7 +286,7 @@ public class PersistedClusterStateServiceTests extends ESTestCase {
             }
 
             final String message = expectThrows(IllegalStateException.class,
-                () -> newPersistedClusterStateService(nodeEnvironment).loadOnDiskState()).getMessage();
+                () -> newPersistedClusterStateService(nodeEnvironment).loadBestOnDiskState()).getMessage();
             assertThat(message, allOf(containsString("no global metadata found"), containsString(brokenPath.toString())));
         }
     }
@@ -318,7 +318,7 @@ public class PersistedClusterStateServiceTests extends ESTestCase {
             }
 
             final String message = expectThrows(IllegalStateException.class,
-                () -> newPersistedClusterStateService(nodeEnvironment1).loadOnDiskState()).getMessage();
+                () -> newPersistedClusterStateService(nodeEnvironment1).loadBestOnDiskState()).getMessage();
             assertThat(message, allOf(containsString("duplicate global metadata found"), containsString(brokenPath.toString())));
         }
     }
@@ -364,7 +364,7 @@ public class PersistedClusterStateServiceTests extends ESTestCase {
             }
 
             final String message = expectThrows(IllegalStateException.class,
-                () -> newPersistedClusterStateService(nodeEnvironment1).loadOnDiskState()).getMessage();
+                () -> newPersistedClusterStateService(nodeEnvironment1).loadBestOnDiskState()).getMessage();
             assertThat(message, allOf(
                 containsString("duplicate metadata found"),
                 containsString(brokenPath.toString()),
@@ -664,7 +664,7 @@ public class PersistedClusterStateServiceTests extends ESTestCase {
                     .collect(Collectors.toList())));
             }
 
-            assertThat(expectThrows(IllegalStateException.class, persistedClusterStateService::loadOnDiskState).getMessage(), allOf(
+            assertThat(expectThrows(IllegalStateException.class, persistedClusterStateService::loadBestOnDiskState).getMessage(), allOf(
                     startsWith("the index containing the cluster metadata under the data path ["),
                     endsWith("] has been changed by an external force after it was last written by Elasticsearch and is now unreadable")));
         }
@@ -707,7 +707,7 @@ public class PersistedClusterStateServiceTests extends ESTestCase {
     }
 
     private static ClusterState loadPersistedClusterState(PersistedClusterStateService persistedClusterStateService) throws IOException {
-        final PersistedClusterStateService.OnDiskState onDiskState = persistedClusterStateService.loadOnDiskState(false);
+        final PersistedClusterStateService.OnDiskState onDiskState = persistedClusterStateService.loadBestOnDiskState(false);
         return clusterStateFromMetadata(onDiskState.lastAcceptedVersion, onDiskState.metadata);
     }
 

+ 1 - 1
server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java

@@ -131,7 +131,7 @@ public class RemoveCorruptedShardDataCommandTests extends IndexShardTestCase {
 
         try (NodeEnvironment.NodeLock lock = new NodeEnvironment.NodeLock(logger, environment, Files::exists)) {
             final NodeEnvironment.NodePath dataPath = lock.getNodePath();
-            try (PersistedClusterStateService.Writer writer = new PersistedClusterStateService(dataPath.path, nodeId,
+            try (PersistedClusterStateService.Writer writer = new PersistedClusterStateService(new Path[] { dataPath.path }, nodeId,
                 xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
                 new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L).createWriter()) {
                 writer.writeFullStateAndCommit(1L, clusterState);