Răsfoiți Sursa

Cut PersistedState interface from GatewayMetaState (#47001)

Today `GatewayMetaState` implements `PersistedState` but it's an error to use
it as a `PersistedState` before it's been started, or if the node is
master-ineligible. It also holds some fields that are meaningless on nodes that
do not persist their states. Finally, it takes responsibility for both loading
the original cluster state and some of the high-level logic for writing the
cluster state back to disk.

This commit addresses these concerns by introducing a more specific
`PersistedState` implementation for use on master-eligible nodes which is only
instantiated if and when it's appropriate. It also moves the fields and
high-level persistence logic into a new `IncrementalClusterStateWriter` with a
more appropriate lifecycle.

Follow-up to #46326 and #46532
Forward-port of #46655
David Turner 6 ani în urmă
părinte
comite
215be64d47

+ 112 - 416
server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java

@@ -29,6 +29,7 @@ import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateApplier;
 import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState;
 import org.elasticsearch.cluster.coordination.InMemoryPersistedState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -36,8 +37,6 @@ import org.elasticsearch.cluster.metadata.Manifest;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.routing.RoutingNode;
-import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.collect.Tuple;
@@ -48,63 +47,53 @@ import org.elasticsearch.plugins.MetaDataUpgrader;
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.UnaryOperator;
 
 /**
- * This class is responsible for storing/retrieving metadata to/from disk.
- * When instance of this class is created, constructor ensures that this version is compatible with state stored on disk and performs
- * state upgrade if necessary. Also it checks that atomic move is supported on the filesystem level, because it's a must for metadata
- * store algorithm.
- * Please note that the state being loaded when constructing the instance of this class is NOT the state that will be used as a
- * {@link ClusterState#metaData()}. Instead when node is starting up, it calls {@link #getMetaData()} method and if this node is
- * elected as master, it requests metaData from other master eligible nodes. After that, master node performs re-conciliation on the
- * gathered results, re-creates {@link ClusterState} and broadcasts this state to other nodes in the cluster.
+ * Loads (and maybe upgrades) cluster metadata at startup, and persistently stores cluster metadata for future restarts.
+ *
+ * When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that
+ * the state being loaded when constructing the instance of this class is not necessarily the state that will be used as {@link
+ * ClusterState#metaData()} because it might be stale or incomplete. Master-eligible nodes must perform an election to find a complete and
+ * non-stale state, and master-ineligible nodes receive the real cluster state from the elected master after joining the cluster.
  */
-public class GatewayMetaState implements PersistedState {
-    protected static final Logger logger = LogManager.getLogger(GatewayMetaState.class);
-
-    private final MetaStateService metaStateService;
-    private final Settings settings;
+public class GatewayMetaState {
+    private static final Logger logger = LogManager.getLogger(GatewayMetaState.class);
 
-    // On master-eligible Zen2 nodes, we use this very object for the PersistedState (so that the state is actually persisted); on other
-    // nodes we use an InMemoryPersistedState instead and persist using a cluster applier if needed. In all cases it's an error to try and
-    // use this object as a PersistedState before calling start(). TODO stop implementing PersistedState at the top level.
+    // Set by calling start()
     private final SetOnce<PersistedState> persistedState = new SetOnce<>();
 
-    // on master-eligible nodes we call updateClusterState under the Coordinator's mutex; on master-ineligible data nodes we call
-    // updateClusterState on the (unique) cluster applier thread; on other nodes we never call updateClusterState. In all cases there's no
-    // need to synchronize access to these variables.
-    protected Manifest previousManifest;
-    protected ClusterState previousClusterState;
-    protected boolean incrementalWrite;
+    public PersistedState getPersistedState() {
+        final PersistedState persistedState = this.persistedState.get();
+        assert persistedState != null : "not started";
+        return persistedState;
+    }
 
-    public GatewayMetaState(Settings settings, MetaStateService metaStateService) {
-        this.settings = settings;
-        this.metaStateService = metaStateService;
+    public MetaData getMetaData() {
+        return getPersistedState().getLastAcceptedState().metaData();
     }
 
-    public void start(TransportService transportService, ClusterService clusterService,
-                      MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader) {
-        assert previousClusterState == null : "should only start once, but already have " + previousClusterState;
+    public void start(Settings settings, TransportService transportService, ClusterService clusterService,
+                      MetaStateService metaStateService, MetaDataIndexUpgradeService metaDataIndexUpgradeService,
+                      MetaDataUpgrader metaDataUpgrader) {
+        assert persistedState.get() == null : "should only start once, but already have " + persistedState.get();
+
+        final Tuple<Manifest, ClusterState> manifestClusterStateTuple;
         try {
-            upgradeMetaData(metaDataIndexUpgradeService, metaDataUpgrader);
-            initializeClusterState(ClusterName.CLUSTER_NAME_SETTING.get(settings));
+            upgradeMetaData(settings, metaStateService, metaDataIndexUpgradeService, metaDataUpgrader);
+            manifestClusterStateTuple = loadStateAndManifest(ClusterName.CLUSTER_NAME_SETTING.get(settings), metaStateService);
         } catch (IOException e) {
             throw new ElasticsearchException("failed to load metadata", e);
         }
-        incrementalWrite = false;
 
-        applyClusterStateUpdaters(transportService, clusterService);
+        final IncrementalClusterStateWriter incrementalClusterStateWriter
+            = new IncrementalClusterStateWriter(metaStateService, manifestClusterStateTuple.v1(),
+                prepareInitialClusterState(transportService, clusterService, manifestClusterStateTuple.v2()));
         if (DiscoveryNode.isMasterNode(settings) == false) {
             if (DiscoveryNode.isDataNode(settings)) {
                 // Master-eligible nodes persist index metadata for all indices regardless of whether they hold any shards or not. It's
@@ -121,43 +110,36 @@ public class GatewayMetaState implements PersistedState {
                 // state on master-ineligible data nodes is mostly ignored - it's only there to support dangling index imports, which is
                 // inherently unsafe anyway. Thus we can safely delay metadata writes on master-ineligible data nodes until applying the
                 // cluster state, which is what this does:
-                clusterService.addLowPriorityApplier(this::applyClusterState);
+                clusterService.addLowPriorityApplier(new GatewayClusterApplier(incrementalClusterStateWriter));
             }
-            persistedState.set(new InMemoryPersistedState(getCurrentTerm(), getLastAcceptedState()));
+
+            // Master-ineligible nodes do not need to persist the cluster state when accepting it because they are not in the voting
+            // configuration, so it's ok if they have a stale or incomplete cluster state when restarted. We track the latest cluster state
+            // in memory instead.
+            persistedState.set(new InMemoryPersistedState(manifestClusterStateTuple.v1().getCurrentTerm(), manifestClusterStateTuple.v2()));
         } else {
-            persistedState.set(this);
+            // Master-ineligible nodes must persist the cluster state when accepting it because they must reload the (complete, fresh)
+            // last-accepted cluster state when restarted.
+            persistedState.set(new GatewayPersistedState(incrementalClusterStateWriter));
         }
     }
 
-    private void initializeClusterState(ClusterName clusterName) throws IOException {
-        long startNS = System.nanoTime();
-        Tuple<Manifest, MetaData> manifestAndMetaData = metaStateService.loadFullState();
-        previousManifest = manifestAndMetaData.v1();
-
-        final MetaData metaData = manifestAndMetaData.v2();
-
-        previousClusterState = ClusterState.builder(clusterName)
-                .version(previousManifest.getClusterStateVersion())
-                .metaData(metaData).build();
-
-        logger.debug("took {} to load state", TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - startNS)));
-    }
-
-    protected void applyClusterStateUpdaters(TransportService transportService, ClusterService clusterService) {
-        assert previousClusterState.nodes().getLocalNode() == null : "applyClusterStateUpdaters must only be called once";
+    // exposed so it can be overridden by tests
+    ClusterState prepareInitialClusterState(TransportService transportService, ClusterService clusterService, ClusterState clusterState) {
+        assert clusterState.nodes().getLocalNode() == null : "prepareInitialClusterState must only be called once";
         assert transportService.getLocalNode() != null : "transport service is not yet started";
-
-        previousClusterState = Function.<ClusterState>identity()
+        return Function.<ClusterState>identity()
             .andThen(ClusterStateUpdaters::addStateNotRecoveredBlock)
             .andThen(state -> ClusterStateUpdaters.setLocalNode(state, transportService.getLocalNode()))
             .andThen(state -> ClusterStateUpdaters.upgradeAndArchiveUnknownOrInvalidSettings(state, clusterService.getClusterSettings()))
             .andThen(ClusterStateUpdaters::recoverClusterBlocks)
-            .apply(previousClusterState);
+            .apply(clusterState);
     }
 
-    protected void upgradeMetaData(MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader)
-            throws IOException {
-        if (isMasterOrDataNode()) {
+    // exposed so it can be overridden by tests
+    void upgradeMetaData(Settings settings, MetaStateService metaStateService, MetaDataIndexUpgradeService metaDataIndexUpgradeService,
+                         MetaDataUpgrader metaDataUpgrader) throws IOException {
+        if (isMasterOrDataNode(settings)) {
             try {
                 final Tuple<Manifest, MetaData> metaStateAndData = metaStateService.loadFullState();
                 final Manifest manifest = metaStateAndData.v1();
@@ -170,7 +152,8 @@ public class GatewayMetaState implements PersistedState {
                 // if there is manifest file, it means metadata is properly persisted to all data paths
                 // if there is no manifest file (upgrade from 6.x to 7.x) metadata might be missing on some data paths,
                 // but anyway we will re-write it as soon as we receive first ClusterState
-                final AtomicClusterStateWriter writer = new AtomicClusterStateWriter(metaStateService, manifest);
+                final IncrementalClusterStateWriter.AtomicClusterStateWriter writer
+                    = new IncrementalClusterStateWriter.AtomicClusterStateWriter(metaStateService, manifest);
                 final MetaData upgradedMetaData = upgradeMetaData(metaData, metaDataIndexUpgradeService, metaDataUpgrader);
 
                 final long globalStateGeneration;
@@ -198,231 +181,23 @@ public class GatewayMetaState implements PersistedState {
         }
     }
 
-    private boolean isMasterOrDataNode() {
-        return DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings);
-    }
-
-    public PersistedState getPersistedState() {
-        final PersistedState persistedState = this.persistedState.get();
-        assert persistedState != null : "not started";
-        return persistedState;
-    }
-
-    public MetaData getMetaData() {
-        return previousClusterState.metaData();
-    }
-
-    private void applyClusterState(ClusterChangedEvent event) {
-        assert isMasterOrDataNode();
-
-        if (event.state().blocks().disableStatePersistence()) {
-            incrementalWrite = false;
-            return;
-        }
-
-        try {
-            // Hack: This is to ensure that non-master-eligible Zen2 nodes always store a current term
-            // that's higher than the last accepted term.
-            // TODO: can we get rid of this hack?
-            if (event.state().term() > getCurrentTerm()) {
-                innerSetCurrentTerm(event.state().term());
-            }
-
-            updateClusterState(event.state(), event.previousState());
-            incrementalWrite = true;
-        } catch (WriteStateException e) {
-            logger.warn("Exception occurred when storing new meta data", e);
-        }
-    }
-
-    @Override
-    public long getCurrentTerm() {
-        return previousManifest.getCurrentTerm();
-    }
-
-    @Override
-    public ClusterState getLastAcceptedState() {
-        assert previousClusterState.nodes().getLocalNode() != null : "Cluster state is not fully built yet";
-        return previousClusterState;
-    }
-
-    @Override
-    public void setCurrentTerm(long currentTerm) {
-        try {
-            innerSetCurrentTerm(currentTerm);
-        } catch (WriteStateException e) {
-            logger.error(new ParameterizedMessage("Failed to set current term to {}", currentTerm), e);
-            e.rethrowAsErrorOrUncheckedException();
-        }
-    }
-
-    private void innerSetCurrentTerm(long currentTerm) throws WriteStateException {
-        Manifest manifest = new Manifest(currentTerm, previousManifest.getClusterStateVersion(), previousManifest.getGlobalGeneration(),
-            new HashMap<>(previousManifest.getIndexGenerations()));
-        metaStateService.writeManifestAndCleanup("current term changed", manifest);
-        previousManifest = manifest;
-    }
-
-    @Override
-    public void setLastAcceptedState(ClusterState clusterState) {
-        try {
-            incrementalWrite = previousClusterState.term() == clusterState.term();
-            updateClusterState(clusterState, previousClusterState);
-        } catch (WriteStateException e) {
-            logger.error(new ParameterizedMessage("Failed to set last accepted state with version {}", clusterState.version()), e);
-            e.rethrowAsErrorOrUncheckedException();
-        }
-    }
-
-    /**
-     * This class is used to write changed global {@link MetaData}, {@link IndexMetaData} and {@link Manifest} to disk.
-     * This class delegates <code>write*</code> calls to corresponding write calls in {@link MetaStateService} and
-     * additionally it keeps track of cleanup actions to be performed if transaction succeeds or fails.
-     */
-    static class AtomicClusterStateWriter {
-        private static final String FINISHED_MSG = "AtomicClusterStateWriter is finished";
-        private final List<Runnable> commitCleanupActions;
-        private final List<Runnable> rollbackCleanupActions;
-        private final Manifest previousManifest;
-        private final MetaStateService metaStateService;
-        private boolean finished;
-
-        AtomicClusterStateWriter(MetaStateService metaStateService, Manifest previousManifest) {
-            this.metaStateService = metaStateService;
-            assert previousManifest != null;
-            this.previousManifest = previousManifest;
-            this.commitCleanupActions = new ArrayList<>();
-            this.rollbackCleanupActions = new ArrayList<>();
-            this.finished = false;
-        }
-
-        long writeGlobalState(String reason, MetaData metaData) throws WriteStateException {
-            assert finished == false : FINISHED_MSG;
-            try {
-                rollbackCleanupActions.add(() -> metaStateService.cleanupGlobalState(previousManifest.getGlobalGeneration()));
-                long generation = metaStateService.writeGlobalState(reason, metaData);
-                commitCleanupActions.add(() -> metaStateService.cleanupGlobalState(generation));
-                return generation;
-            } catch (WriteStateException e) {
-                rollback();
-                throw e;
-            }
-        }
-
-        long writeIndex(String reason, IndexMetaData metaData) throws WriteStateException {
-            assert finished == false : FINISHED_MSG;
-            try {
-                Index index = metaData.getIndex();
-                Long previousGeneration = previousManifest.getIndexGenerations().get(index);
-                if (previousGeneration != null) {
-                    // we prefer not to clean-up index metadata in case of rollback,
-                    // if it's not referenced by previous manifest file
-                    // not to break dangling indices functionality
-                    rollbackCleanupActions.add(() -> metaStateService.cleanupIndex(index, previousGeneration));
-                }
-                long generation = metaStateService.writeIndex(reason, metaData);
-                commitCleanupActions.add(() -> metaStateService.cleanupIndex(index, generation));
-                return generation;
-            } catch (WriteStateException e) {
-                rollback();
-                throw e;
-            }
-        }
-
-        void writeManifestAndCleanup(String reason, Manifest manifest) throws WriteStateException {
-            assert finished == false : FINISHED_MSG;
-            try {
-                metaStateService.writeManifestAndCleanup(reason, manifest);
-                commitCleanupActions.forEach(Runnable::run);
-                finished = true;
-            } catch (WriteStateException e) {
-                // if Manifest write results in dirty WriteStateException it's not safe to remove
-                // new metadata files, because if Manifest was actually written to disk and its deletion
-                // fails it will reference these new metadata files.
-                // In the future, we might decide to add more fine grained check to understand if after
-                // WriteStateException Manifest deletion has actually failed.
-                if (e.isDirty() == false) {
-                    rollback();
-                }
-                throw e;
-            }
-        }
-
-        void rollback() {
-            rollbackCleanupActions.forEach(Runnable::run);
-            finished = true;
-        }
-    }
-
-    /**
-     * Updates manifest and meta data on disk.
-     *
-     * @param newState new {@link ClusterState}
-     * @param previousState previous {@link ClusterState}
-     *
-     * @throws WriteStateException if exception occurs. See also {@link WriteStateException#isDirty()}.
-     */
-    private void updateClusterState(ClusterState newState, ClusterState previousState)
-            throws WriteStateException {
-        MetaData newMetaData = newState.metaData();
-
-        final AtomicClusterStateWriter writer = new AtomicClusterStateWriter(metaStateService, previousManifest);
-        long globalStateGeneration = writeGlobalState(writer, newMetaData);
-        Map<Index, Long> indexGenerations = writeIndicesMetadata(writer, newState, previousState);
-        Manifest manifest = new Manifest(previousManifest.getCurrentTerm(), newState.version(), globalStateGeneration, indexGenerations);
-        writeManifest(writer, manifest);
-
-        previousManifest = manifest;
-        previousClusterState = newState;
-    }
-
-    private void writeManifest(AtomicClusterStateWriter writer, Manifest manifest) throws WriteStateException {
-        if (manifest.equals(previousManifest) == false) {
-            writer.writeManifestAndCleanup("changed", manifest);
-        }
-    }
-
-    private Map<Index, Long> writeIndicesMetadata(AtomicClusterStateWriter writer, ClusterState newState, ClusterState previousState)
-            throws WriteStateException {
-        Map<Index, Long> previouslyWrittenIndices = previousManifest.getIndexGenerations();
-        Set<Index> relevantIndices = getRelevantIndices(newState, previousState, previouslyWrittenIndices.keySet());
+    private static Tuple<Manifest,ClusterState> loadStateAndManifest(ClusterName clusterName,
+                                                                     MetaStateService metaStateService) throws IOException {
+        final long startNS = System.nanoTime();
+        final Tuple<Manifest, MetaData> manifestAndMetaData = metaStateService.loadFullState();
+        final Manifest manifest = manifestAndMetaData.v1();
 
-        Map<Index, Long> newIndices = new HashMap<>();
+        final ClusterState clusterState = ClusterState.builder(clusterName)
+            .version(manifest.getClusterStateVersion())
+            .metaData(manifestAndMetaData.v2()).build();
 
-        MetaData previousMetaData = incrementalWrite ? previousState.metaData() : null;
-        Iterable<IndexMetaDataAction> actions = resolveIndexMetaDataActions(previouslyWrittenIndices, relevantIndices, previousMetaData,
-                newState.metaData());
-
-        for (IndexMetaDataAction action : actions) {
-            long generation = action.execute(writer);
-            newIndices.put(action.getIndex(), generation);
-        }
+        logger.debug("took {} to load state", TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - startNS)));
 
-        return newIndices;
+        return Tuple.tuple(manifest, clusterState);
     }
 
-    private long writeGlobalState(AtomicClusterStateWriter writer, MetaData newMetaData)
-            throws WriteStateException {
-        if (incrementalWrite == false || MetaData.isGlobalStateEquals(previousClusterState.metaData(), newMetaData) == false) {
-            return writer.writeGlobalState("changed", newMetaData);
-        }
-        return previousManifest.getGlobalGeneration();
-    }
-
-    public static Set<Index> getRelevantIndices(ClusterState state, ClusterState previousState, Set<Index> previouslyWrittenIndices) {
-        Set<Index> relevantIndices;
-        if (isDataOnlyNode(state)) {
-            relevantIndices = getRelevantIndicesOnDataOnlyNode(state, previousState, previouslyWrittenIndices);
-        } else if (state.nodes().getLocalNode().isMasterNode()) {
-            relevantIndices = getRelevantIndicesForMasterEligibleNode(state);
-        } else {
-            relevantIndices = Collections.emptySet();
-        }
-        return relevantIndices;
-    }
-
-    private static boolean isDataOnlyNode(ClusterState state) {
-        return state.nodes().getLocalNode().isMasterNode() == false && state.nodes().getLocalNode().isDataNode();
+    private static boolean isMasterOrDataNode(Settings settings) {
+        return DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings);
     }
 
     /**
@@ -480,160 +255,81 @@ public class GatewayMetaState implements PersistedState {
         return false;
     }
 
-    /**
-     * Returns list of {@link IndexMetaDataAction} for each relevant index.
-     * For each relevant index there are 3 options:
-     * <ol>
-     * <li>
-     * {@link KeepPreviousGeneration} - index metadata is already stored to disk and index metadata version is not changed, no
-     * action is required.
-     * </li>
-     * <li>
-     * {@link WriteNewIndexMetaData} - there is no index metadata on disk and index metadata for this index should be written.
-     * </li>
-     * <li>
-     * {@link WriteChangedIndexMetaData} - index metadata is already on disk, but index metadata version has changed. Updated
-     * index metadata should be written to disk.
-     * </li>
-     * </ol>
-     *
-     * @param previouslyWrittenIndices A list of indices for which the state was already written before
-     * @param relevantIndices          The list of indices for which state should potentially be written
-     * @param previousMetaData         The last meta data we know of
-     * @param newMetaData              The new metadata
-     * @return list of {@link IndexMetaDataAction} for each relevant index.
-     */
-    public static List<IndexMetaDataAction> resolveIndexMetaDataActions(Map<Index, Long> previouslyWrittenIndices,
-                                                                        Set<Index> relevantIndices,
-                                                                        MetaData previousMetaData,
-                                                                        MetaData newMetaData) {
-        List<IndexMetaDataAction> actions = new ArrayList<>();
-        for (Index index : relevantIndices) {
-            IndexMetaData newIndexMetaData = newMetaData.getIndexSafe(index);
-            IndexMetaData previousIndexMetaData = previousMetaData == null ? null : previousMetaData.index(index);
-
-            if (previouslyWrittenIndices.containsKey(index) == false || previousIndexMetaData == null) {
-                actions.add(new WriteNewIndexMetaData(newIndexMetaData));
-            } else if (previousIndexMetaData.getVersion() != newIndexMetaData.getVersion()) {
-                actions.add(new WriteChangedIndexMetaData(previousIndexMetaData, newIndexMetaData));
-            } else {
-                actions.add(new KeepPreviousGeneration(index, previouslyWrittenIndices.get(index)));
-            }
-        }
-        return actions;
-    }
-
-    private static Set<Index> getRelevantIndicesOnDataOnlyNode(ClusterState state, ClusterState previousState, Set<Index>
-            previouslyWrittenIndices) {
-        RoutingNode newRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
-        if (newRoutingNode == null) {
-            throw new IllegalStateException("cluster state does not contain this node - cannot write index meta state");
-        }
-        Set<Index> indices = new HashSet<>();
-        for (ShardRouting routing : newRoutingNode) {
-            indices.add(routing.index());
-        }
-        // we have to check the meta data also: closed indices will not appear in the routing table, but we must still write the state if
-        // we have it written on disk previously
-        for (IndexMetaData indexMetaData : state.metaData()) {
-            boolean isOrWasClosed = indexMetaData.getState().equals(IndexMetaData.State.CLOSE);
-            // if the index is open we might still have to write the state if it just transitioned from closed to open
-            // so we have to check for that as well.
-            IndexMetaData previousMetaData = previousState.metaData().index(indexMetaData.getIndex());
-            if (previousMetaData != null) {
-                isOrWasClosed = isOrWasClosed || previousMetaData.getState().equals(IndexMetaData.State.CLOSE);
-            }
-            if (previouslyWrittenIndices.contains(indexMetaData.getIndex()) && isOrWasClosed) {
-                indices.add(indexMetaData.getIndex());
-            }
-        }
-        return indices;
-    }
-
-    private static Set<Index> getRelevantIndicesForMasterEligibleNode(ClusterState state) {
-        Set<Index> relevantIndices = new HashSet<>();
-        // we have to iterate over the metadata to make sure we also capture closed indices
-        for (IndexMetaData indexMetaData : state.metaData()) {
-            relevantIndices.add(indexMetaData.getIndex());
-        }
-        return relevantIndices;
-    }
 
-    /**
-     * Action to perform with index metadata.
-     */
-    public interface IndexMetaDataAction {
-        /**
-         * @return index for index metadata.
-         */
-        Index getIndex();
-
-        /**
-         * Executes this action using provided {@link AtomicClusterStateWriter}.
-         *
-         * @return new index metadata state generation, to be used in manifest file.
-         * @throws WriteStateException if exception occurs.
-         */
-        long execute(AtomicClusterStateWriter writer) throws WriteStateException;
-    }
+    private static class GatewayClusterApplier implements ClusterStateApplier {
 
-    public static class KeepPreviousGeneration implements IndexMetaDataAction {
-        private final Index index;
-        private final long generation;
+        private final IncrementalClusterStateWriter incrementalClusterStateWriter;
 
-        KeepPreviousGeneration(Index index, long generation) {
-            this.index = index;
-            this.generation = generation;
+        private GatewayClusterApplier(IncrementalClusterStateWriter incrementalClusterStateWriter) {
+            this.incrementalClusterStateWriter = incrementalClusterStateWriter;
         }
 
         @Override
-        public Index getIndex() {
-            return index;
-        }
+        public void applyClusterState(ClusterChangedEvent event) {
+            if (event.state().blocks().disableStatePersistence()) {
+                incrementalClusterStateWriter.setIncrementalWrite(false);
+                return;
+            }
 
-        @Override
-        public long execute(AtomicClusterStateWriter writer) {
-            return generation;
+            try {
+                // Hack: This is to ensure that non-master-eligible Zen2 nodes always store a current term
+                // that's higher than the last accepted term.
+                // TODO: can we get rid of this hack?
+                if (event.state().term() > incrementalClusterStateWriter.getPreviousManifest().getCurrentTerm()) {
+                    incrementalClusterStateWriter.setCurrentTerm(event.state().term());
+                }
+
+                incrementalClusterStateWriter.updateClusterState(event.state(), event.previousState());
+                incrementalClusterStateWriter.setIncrementalWrite(true);
+            } catch (WriteStateException e) {
+                logger.warn("Exception occurred when storing new meta data", e);
+            }
         }
+
     }
 
-    public static class WriteNewIndexMetaData implements IndexMetaDataAction {
-        private final IndexMetaData indexMetaData;
+    private static class GatewayPersistedState implements PersistedState {
 
-        WriteNewIndexMetaData(IndexMetaData indexMetaData) {
-            this.indexMetaData = indexMetaData;
-        }
+        private final IncrementalClusterStateWriter incrementalClusterStateWriter;
 
-        @Override
-        public Index getIndex() {
-            return indexMetaData.getIndex();
+        GatewayPersistedState(IncrementalClusterStateWriter incrementalClusterStateWriter) {
+            this.incrementalClusterStateWriter = incrementalClusterStateWriter;
         }
 
         @Override
-        public long execute(AtomicClusterStateWriter writer) throws WriteStateException {
-            return writer.writeIndex("freshly created", indexMetaData);
+        public long getCurrentTerm() {
+            return incrementalClusterStateWriter.getPreviousManifest().getCurrentTerm();
         }
-    }
 
-    public static class WriteChangedIndexMetaData implements IndexMetaDataAction {
-        private final IndexMetaData newIndexMetaData;
-        private final IndexMetaData oldIndexMetaData;
-
-        WriteChangedIndexMetaData(IndexMetaData oldIndexMetaData, IndexMetaData newIndexMetaData) {
-            this.oldIndexMetaData = oldIndexMetaData;
-            this.newIndexMetaData = newIndexMetaData;
+        @Override
+        public ClusterState getLastAcceptedState() {
+            final ClusterState previousClusterState = incrementalClusterStateWriter.getPreviousClusterState();
+            assert previousClusterState.nodes().getLocalNode() != null : "Cluster state is not fully built yet";
+            return previousClusterState;
         }
 
         @Override
-        public Index getIndex() {
-            return newIndexMetaData.getIndex();
+        public void setCurrentTerm(long currentTerm) {
+            try {
+                incrementalClusterStateWriter.setCurrentTerm(currentTerm);
+            } catch (WriteStateException e) {
+                logger.error(new ParameterizedMessage("Failed to set current term to {}", currentTerm), e);
+                e.rethrowAsErrorOrUncheckedException();
+            }
         }
 
         @Override
-        public long execute(AtomicClusterStateWriter writer) throws WriteStateException {
-            return writer.writeIndex(
-                    "version changed from [" + oldIndexMetaData.getVersion() + "] to [" + newIndexMetaData.getVersion() + "]",
-                    newIndexMetaData);
+        public void setLastAcceptedState(ClusterState clusterState) {
+            try {
+                final ClusterState previousClusterState = incrementalClusterStateWriter.getPreviousClusterState();
+                incrementalClusterStateWriter.setIncrementalWrite(previousClusterState.term() == clusterState.term());
+                incrementalClusterStateWriter.updateClusterState(clusterState, previousClusterState);
+            } catch (WriteStateException e) {
+                logger.error(new ParameterizedMessage("Failed to set last accepted state with version {}", clusterState.version()), e);
+                e.rethrowAsErrorOrUncheckedException();
+            }
         }
+
     }
+
 }

+ 384 - 0
server/src/main/java/org/elasticsearch/gateway/IncrementalClusterStateWriter.java

@@ -0,0 +1,384 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.gateway;
+
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.Manifest;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.routing.RoutingNode;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.index.Index;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Tracks the metadata written to disk, allowing updated metadata to be written incrementally (i.e. only writing out the changed metadata).
+ */
+class IncrementalClusterStateWriter {
+
+    private final MetaStateService metaStateService;
+
+    // On master-eligible nodes we call updateClusterState under the Coordinator's mutex; on master-ineligible data nodes we call
+    // updateClusterState on the (unique) cluster applier thread; on other nodes we never call updateClusterState. In all cases there's
+    // no need to synchronize access to these fields.
+    private Manifest previousManifest;
+    private ClusterState previousClusterState;
+    private boolean incrementalWrite;
+
+    IncrementalClusterStateWriter(MetaStateService metaStateService, Manifest manifest, ClusterState clusterState) {
+        this.metaStateService = metaStateService;
+        this.previousManifest = manifest;
+        this.previousClusterState = clusterState;
+        this.incrementalWrite = false;
+    }
+
+    void setCurrentTerm(long currentTerm) throws WriteStateException {
+        Manifest manifest = new Manifest(currentTerm, previousManifest.getClusterStateVersion(), previousManifest.getGlobalGeneration(),
+            new HashMap<>(previousManifest.getIndexGenerations()));
+        metaStateService.writeManifestAndCleanup("current term changed", manifest);
+        previousManifest = manifest;
+    }
+
+    Manifest getPreviousManifest() {
+        return previousManifest;
+    }
+
+    ClusterState getPreviousClusterState() {
+        return previousClusterState;
+    }
+
+    void setIncrementalWrite(boolean incrementalWrite) {
+        this.incrementalWrite = incrementalWrite;
+    }
+
+    /**
+     * Updates manifest and meta data on disk.
+     *
+     * @param newState new {@link ClusterState}
+     * @param previousState previous {@link ClusterState}
+     *
+     * @throws WriteStateException if exception occurs. See also {@link WriteStateException#isDirty()}.
+     */
+    void updateClusterState(ClusterState newState, ClusterState previousState) throws WriteStateException {
+        MetaData newMetaData = newState.metaData();
+
+        final AtomicClusterStateWriter writer = new AtomicClusterStateWriter(metaStateService, previousManifest);
+        long globalStateGeneration = writeGlobalState(writer, newMetaData);
+        Map<Index, Long> indexGenerations = writeIndicesMetadata(writer, newState, previousState);
+        Manifest manifest = new Manifest(previousManifest.getCurrentTerm(), newState.version(), globalStateGeneration, indexGenerations);
+        writeManifest(writer, manifest);
+
+        previousManifest = manifest;
+        previousClusterState = newState;
+    }
+
+    private void writeManifest(AtomicClusterStateWriter writer, Manifest manifest) throws WriteStateException {
+        if (manifest.equals(previousManifest) == false) {
+            writer.writeManifestAndCleanup("changed", manifest);
+        }
+    }
+
+    private Map<Index, Long> writeIndicesMetadata(AtomicClusterStateWriter writer, ClusterState newState, ClusterState previousState)
+        throws WriteStateException {
+        Map<Index, Long> previouslyWrittenIndices = previousManifest.getIndexGenerations();
+        Set<Index> relevantIndices = getRelevantIndices(newState, previousState, previouslyWrittenIndices.keySet());
+
+        Map<Index, Long> newIndices = new HashMap<>();
+
+        MetaData previousMetaData = incrementalWrite ? previousState.metaData() : null;
+        Iterable<IndexMetaDataAction> actions = resolveIndexMetaDataActions(previouslyWrittenIndices, relevantIndices, previousMetaData,
+            newState.metaData());
+
+        for (IndexMetaDataAction action : actions) {
+            long generation = action.execute(writer);
+            newIndices.put(action.getIndex(), generation);
+        }
+
+        return newIndices;
+    }
+
+    private long writeGlobalState(AtomicClusterStateWriter writer, MetaData newMetaData) throws WriteStateException {
+        if (incrementalWrite == false || MetaData.isGlobalStateEquals(previousClusterState.metaData(), newMetaData) == false) {
+            return writer.writeGlobalState("changed", newMetaData);
+        }
+        return previousManifest.getGlobalGeneration();
+    }
+
+
+    /**
+     * Returns list of {@link IndexMetaDataAction} for each relevant index.
+     * For each relevant index there are 3 options:
+     * <ol>
+     * <li>
+     * {@link KeepPreviousGeneration} - index metadata is already stored to disk and index metadata version is not changed, no
+     * action is required.
+     * </li>
+     * <li>
+     * {@link WriteNewIndexMetaData} - there is no index metadata on disk and index metadata for this index should be written.
+     * </li>
+     * <li>
+     * {@link WriteChangedIndexMetaData} - index metadata is already on disk, but index metadata version has changed. Updated
+     * index metadata should be written to disk.
+     * </li>
+     * </ol>
+     *
+     * @param previouslyWrittenIndices A list of indices for which the state was already written before
+     * @param relevantIndices          The list of indices for which state should potentially be written
+     * @param previousMetaData         The last meta data we know of
+     * @param newMetaData              The new metadata
+     * @return list of {@link IndexMetaDataAction} for each relevant index.
+     */
+    // exposed for tests
+    static List<IndexMetaDataAction> resolveIndexMetaDataActions(Map<Index, Long> previouslyWrittenIndices,
+                                                                 Set<Index> relevantIndices,
+                                                                 MetaData previousMetaData,
+                                                                 MetaData newMetaData) {
+        List<IndexMetaDataAction> actions = new ArrayList<>();
+        for (Index index : relevantIndices) {
+            IndexMetaData newIndexMetaData = newMetaData.getIndexSafe(index);
+            IndexMetaData previousIndexMetaData = previousMetaData == null ? null : previousMetaData.index(index);
+
+            if (previouslyWrittenIndices.containsKey(index) == false || previousIndexMetaData == null) {
+                actions.add(new WriteNewIndexMetaData(newIndexMetaData));
+            } else if (previousIndexMetaData.getVersion() != newIndexMetaData.getVersion()) {
+                actions.add(new WriteChangedIndexMetaData(previousIndexMetaData, newIndexMetaData));
+            } else {
+                actions.add(new KeepPreviousGeneration(index, previouslyWrittenIndices.get(index)));
+            }
+        }
+        return actions;
+    }
+
+    private static Set<Index> getRelevantIndicesOnDataOnlyNode(ClusterState state, ClusterState previousState, Set<Index>
+        previouslyWrittenIndices) {
+        RoutingNode newRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
+        if (newRoutingNode == null) {
+            throw new IllegalStateException("cluster state does not contain this node - cannot write index meta state");
+        }
+        Set<Index> indices = new HashSet<>();
+        for (ShardRouting routing : newRoutingNode) {
+            indices.add(routing.index());
+        }
+        // we have to check the meta data also: closed indices will not appear in the routing table, but we must still write the state if
+        // we have it written on disk previously
+        for (IndexMetaData indexMetaData : state.metaData()) {
+            boolean isOrWasClosed = indexMetaData.getState().equals(IndexMetaData.State.CLOSE);
+            // if the index is open we might still have to write the state if it just transitioned from closed to open
+            // so we have to check for that as well.
+            IndexMetaData previousMetaData = previousState.metaData().index(indexMetaData.getIndex());
+            if (previousMetaData != null) {
+                isOrWasClosed = isOrWasClosed || previousMetaData.getState().equals(IndexMetaData.State.CLOSE);
+            }
+            if (previouslyWrittenIndices.contains(indexMetaData.getIndex()) && isOrWasClosed) {
+                indices.add(indexMetaData.getIndex());
+            }
+        }
+        return indices;
+    }
+
+    private static Set<Index> getRelevantIndicesForMasterEligibleNode(ClusterState state) {
+        Set<Index> relevantIndices = new HashSet<>();
+        // we have to iterate over the metadata to make sure we also capture closed indices
+        for (IndexMetaData indexMetaData : state.metaData()) {
+            relevantIndices.add(indexMetaData.getIndex());
+        }
+        return relevantIndices;
+    }
+
+    // exposed for tests
+    static Set<Index> getRelevantIndices(ClusterState state, ClusterState previousState, Set<Index> previouslyWrittenIndices) {
+        Set<Index> relevantIndices;
+        if (isDataOnlyNode(state)) {
+            relevantIndices = getRelevantIndicesOnDataOnlyNode(state, previousState, previouslyWrittenIndices);
+        } else if (state.nodes().getLocalNode().isMasterNode()) {
+            relevantIndices = getRelevantIndicesForMasterEligibleNode(state);
+        } else {
+            relevantIndices = Collections.emptySet();
+        }
+        return relevantIndices;
+    }
+
+    private static boolean isDataOnlyNode(ClusterState state) {
+        return state.nodes().getLocalNode().isMasterNode() == false && state.nodes().getLocalNode().isDataNode();
+    }
+
+    /**
+     * Action to perform with index metadata.
+     */
+    interface IndexMetaDataAction {
+        /**
+         * @return index for index metadata.
+         */
+        Index getIndex();
+
+        /**
+         * Executes this action using provided {@link AtomicClusterStateWriter}.
+         *
+         * @return new index metadata state generation, to be used in manifest file.
+         * @throws WriteStateException if exception occurs.
+         */
+        long execute(AtomicClusterStateWriter writer) throws WriteStateException;
+    }
+
+    /**
+     * This class is used to write changed global {@link MetaData}, {@link IndexMetaData} and {@link Manifest} to disk.
+     * This class delegates <code>write*</code> calls to corresponding write calls in {@link MetaStateService} and
+     * additionally it keeps track of cleanup actions to be performed if transaction succeeds or fails.
+     */
+    static class AtomicClusterStateWriter {
+        private static final String FINISHED_MSG = "AtomicClusterStateWriter is finished";
+        private final List<Runnable> commitCleanupActions;
+        private final List<Runnable> rollbackCleanupActions;
+        private final Manifest previousManifest;
+        private final MetaStateService metaStateService;
+        private boolean finished;
+
+        AtomicClusterStateWriter(MetaStateService metaStateService, Manifest previousManifest) {
+            this.metaStateService = metaStateService;
+            assert previousManifest != null;
+            this.previousManifest = previousManifest;
+            this.commitCleanupActions = new ArrayList<>();
+            this.rollbackCleanupActions = new ArrayList<>();
+            this.finished = false;
+        }
+
+        long writeGlobalState(String reason, MetaData metaData) throws WriteStateException {
+            assert finished == false : FINISHED_MSG;
+            try {
+                rollbackCleanupActions.add(() -> metaStateService.cleanupGlobalState(previousManifest.getGlobalGeneration()));
+                long generation = metaStateService.writeGlobalState(reason, metaData);
+                commitCleanupActions.add(() -> metaStateService.cleanupGlobalState(generation));
+                return generation;
+            } catch (WriteStateException e) {
+                rollback();
+                throw e;
+            }
+        }
+
+        long writeIndex(String reason, IndexMetaData metaData) throws WriteStateException {
+            assert finished == false : FINISHED_MSG;
+            try {
+                Index index = metaData.getIndex();
+                Long previousGeneration = previousManifest.getIndexGenerations().get(index);
+                if (previousGeneration != null) {
+                    // we prefer not to clean-up index metadata in case of rollback,
+                    // if it's not referenced by previous manifest file
+                    // not to break dangling indices functionality
+                    rollbackCleanupActions.add(() -> metaStateService.cleanupIndex(index, previousGeneration));
+                }
+                long generation = metaStateService.writeIndex(reason, metaData);
+                commitCleanupActions.add(() -> metaStateService.cleanupIndex(index, generation));
+                return generation;
+            } catch (WriteStateException e) {
+                rollback();
+                throw e;
+            }
+        }
+
+        void writeManifestAndCleanup(String reason, Manifest manifest) throws WriteStateException {
+            assert finished == false : FINISHED_MSG;
+            try {
+                metaStateService.writeManifestAndCleanup(reason, manifest);
+                commitCleanupActions.forEach(Runnable::run);
+                finished = true;
+            } catch (WriteStateException e) {
+                // If the Manifest write results in a dirty WriteStateException it's not safe to roll back, removing the new metadata files,
+                // because if the Manifest was actually written to disk and its deletion fails it will reference these new metadata files.
+                // On master-eligible nodes a dirty WriteStateException here is fatal to the node since we no longer really have any idea
+                // what the state on disk is and the only sensible response is to start again from scratch.
+                if (e.isDirty() == false) {
+                    rollback();
+                }
+                throw e;
+            }
+        }
+
+        void rollback() {
+            rollbackCleanupActions.forEach(Runnable::run);
+            finished = true;
+        }
+    }
+
+    static class KeepPreviousGeneration implements IndexMetaDataAction {
+        private final Index index;
+        private final long generation;
+
+        KeepPreviousGeneration(Index index, long generation) {
+            this.index = index;
+            this.generation = generation;
+        }
+
+        @Override
+        public Index getIndex() {
+            return index;
+        }
+
+        @Override
+        public long execute(AtomicClusterStateWriter writer) {
+            return generation;
+        }
+    }
+
+    static class WriteNewIndexMetaData implements IndexMetaDataAction {
+        private final IndexMetaData indexMetaData;
+
+        WriteNewIndexMetaData(IndexMetaData indexMetaData) {
+            this.indexMetaData = indexMetaData;
+        }
+
+        @Override
+        public Index getIndex() {
+            return indexMetaData.getIndex();
+        }
+
+        @Override
+        public long execute(AtomicClusterStateWriter writer) throws WriteStateException {
+            return writer.writeIndex("freshly created", indexMetaData);
+        }
+    }
+
+    static class WriteChangedIndexMetaData implements IndexMetaDataAction {
+        private final IndexMetaData newIndexMetaData;
+        private final IndexMetaData oldIndexMetaData;
+
+        WriteChangedIndexMetaData(IndexMetaData oldIndexMetaData, IndexMetaData newIndexMetaData) {
+            this.oldIndexMetaData = oldIndexMetaData;
+            this.newIndexMetaData = newIndexMetaData;
+        }
+
+        @Override
+        public Index getIndex() {
+            return newIndexMetaData.getIndex();
+        }
+
+        @Override
+        public long execute(AtomicClusterStateWriter writer) throws WriteStateException {
+            return writer.writeIndex(
+                    "version changed from [" + oldIndexMetaData.getVersion() + "] to [" + newIndexMetaData.getVersion() + "]",
+                    newIndexMetaData);
+        }
+    }
+}

+ 2 - 2
server/src/main/java/org/elasticsearch/node/Node.java

@@ -479,7 +479,7 @@ public class Node implements Closeable {
             ).collect(Collectors.toSet());
             final TransportService transportService = newTransportService(settings, transport, threadPool,
                 networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
-            final GatewayMetaState gatewayMetaState = new GatewayMetaState(settings, metaStateService);
+            final GatewayMetaState gatewayMetaState = new GatewayMetaState();
             final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
             final SearchTransportService searchTransportService =  new SearchTransportService(transportService,
                 SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
@@ -693,7 +693,7 @@ public class Node implements Closeable {
 
         // Load (and maybe upgrade) the metadata stored on disk
         final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
-        gatewayMetaState.start(transportService, clusterService,
+        gatewayMetaState.start(settings(), transportService, clusterService, injector.getInstance(MetaStateService.class),
             injector.getInstance(MetaDataIndexUpgradeService.class), injector.getInstance(MetaDataUpgrader.class));
         // we load the global state here (the persistent part of the cluster state stored on disk) to
         // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.

+ 26 - 22
server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java

@@ -24,6 +24,8 @@ import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.coordination.CoordinationMetaData;
 import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfigExclusion;
+import org.elasticsearch.cluster.coordination.CoordinationState;
+import org.elasticsearch.cluster.coordination.InMemoryPersistedState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.Manifest;
 import org.elasticsearch.cluster.metadata.MetaData;
@@ -35,10 +37,10 @@ import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.test.ESTestCase;
 
-import java.io.IOException;
 import java.util.Collections;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
 
 public class GatewayMetaStatePersistedStateTests extends ESTestCase {
@@ -63,21 +65,23 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
         super.tearDown();
     }
 
-    private MockGatewayMetaState newGateway() {
-        final MockGatewayMetaState gateway = new MockGatewayMetaState(settings, nodeEnvironment, xContentRegistry(), localNode);
-        gateway.start();
-        return gateway;
+    private CoordinationState.PersistedState newGatewayPersistedState() {
+        final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode);
+        gateway.start(settings, nodeEnvironment, xContentRegistry());
+        final CoordinationState.PersistedState persistedState = gateway.getPersistedState();
+        assertThat(persistedState, not(instanceOf(InMemoryPersistedState.class)));
+        return persistedState;
     }
 
-    private MockGatewayMetaState maybeNew(MockGatewayMetaState gateway) throws IOException {
+    private CoordinationState.PersistedState maybeNew(CoordinationState.PersistedState persistedState) {
         if (randomBoolean()) {
-            return newGateway();
+            return newGatewayPersistedState();
         }
-        return gateway;
+        return persistedState;
     }
 
-    public void testInitialState() throws IOException {
-        MockGatewayMetaState gateway = newGateway();
+    public void testInitialState() {
+        CoordinationState.PersistedState gateway = newGatewayPersistedState();
         ClusterState state = gateway.getLastAcceptedState();
         assertThat(state.getClusterName(), equalTo(clusterName));
         assertTrue(MetaData.isGlobalStateEquals(state.metaData(), MetaData.EMPTY_META_DATA));
@@ -88,8 +92,8 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
         assertThat(currentTerm, equalTo(Manifest.empty().getCurrentTerm()));
     }
 
-    public void testSetCurrentTerm() throws IOException {
-        MockGatewayMetaState gateway = newGateway();
+    public void testSetCurrentTerm() {
+        CoordinationState.PersistedState gateway = newGatewayPersistedState();
 
         for (int i = 0; i < randomIntBetween(1, 5); i++) {
             final long currentTerm = randomNonNegativeLong();
@@ -142,8 +146,8 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
         }
     }
 
-    public void testSetLastAcceptedState() throws IOException {
-        MockGatewayMetaState gateway = newGateway();
+    public void testSetLastAcceptedState() {
+        CoordinationState.PersistedState gateway = newGatewayPersistedState();
         final long term = randomNonNegativeLong();
 
         for (int i = 0; i < randomIntBetween(1, 5); i++) {
@@ -165,8 +169,8 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
         }
     }
 
-    public void testSetLastAcceptedStateTermChanged() throws IOException {
-        MockGatewayMetaState gateway = newGateway();
+    public void testSetLastAcceptedStateTermChanged() {
+        CoordinationState.PersistedState gateway = newGatewayPersistedState();
 
         final String indexName = randomAlphaOfLength(10);
         final int numberOfShards = randomIntBetween(1, 5);
@@ -178,7 +182,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
         gateway.setLastAcceptedState(state);
 
         gateway = maybeNew(gateway);
-        final long newTerm = randomValueOtherThan(term, () -> randomNonNegativeLong());
+        final long newTerm = randomValueOtherThan(term, ESTestCase::randomNonNegativeLong);
         final int newNumberOfShards = randomValueOtherThan(numberOfShards, () -> randomIntBetween(1,5));
         final IndexMetaData newIndexMetaData = createIndexMetaData(indexName, newNumberOfShards, version);
         final ClusterState newClusterState = createClusterState(randomNonNegativeLong(),
@@ -189,11 +193,11 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
         assertThat(gateway.getLastAcceptedState().metaData().index(indexName), equalTo(newIndexMetaData));
     }
 
-    public void testCurrentTermAndTermAreDifferent() throws IOException {
-        MockGatewayMetaState gateway = newGateway();
+    public void testCurrentTermAndTermAreDifferent() {
+        CoordinationState.PersistedState gateway = newGatewayPersistedState();
 
         long currentTerm = randomNonNegativeLong();
-        long term  = randomValueOtherThan(currentTerm, () -> randomNonNegativeLong());
+        long term  = randomValueOtherThan(currentTerm, ESTestCase::randomNonNegativeLong);
 
         gateway.setCurrentTerm(currentTerm);
         gateway.setLastAcceptedState(createClusterState(randomNonNegativeLong(),
@@ -204,8 +208,8 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
         assertThat(gateway.getLastAcceptedState().coordinationMetaData().term(), equalTo(term));
     }
 
-    public void testMarkAcceptedConfigAsCommitted() throws IOException {
-        MockGatewayMetaState gateway = newGateway();
+    public void testMarkAcceptedConfigAsCommitted() {
+        CoordinationState.PersistedState gateway = newGatewayPersistedState();
 
         //generate random coordinationMetaData with different lastAcceptedConfiguration and lastCommittedConfiguration
         CoordinationMetaData coordinationMetaData;

+ 2 - 395
server/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java

@@ -19,417 +19,24 @@
 
 package org.elasticsearch.gateway;
 
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.MockDirectoryWrapper;
 import org.elasticsearch.Version;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ESAllocationTestCase;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
-import org.elasticsearch.cluster.metadata.Manifest;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
-import org.elasticsearch.cluster.node.DiscoveryNodeRole;
-import org.elasticsearch.cluster.node.DiscoveryNodes;
-import org.elasticsearch.cluster.routing.RoutingTable;
-import org.elasticsearch.cluster.routing.allocation.AllocationService;
-import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.NamedXContentRegistry;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.env.NodeEnvironment;
-import org.elasticsearch.index.Index;
 import org.elasticsearch.plugins.MetaDataUpgrader;
+import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.TestCustomMetaData;
-import org.mockito.ArgumentCaptor;
 
-import java.io.IOException;
-import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
-import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasSize;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-public class GatewayMetaStateTests extends ESAllocationTestCase {
-
-    private ClusterState noIndexClusterState(boolean masterEligible) {
-        MetaData metaData = MetaData.builder().build();
-        RoutingTable routingTable = RoutingTable.builder().build();
-
-        return ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
-                .metaData(metaData)
-                .routingTable(routingTable)
-                .nodes(generateDiscoveryNodes(masterEligible))
-                .build();
-    }
-
-    private ClusterState clusterStateWithUnassignedIndex(IndexMetaData indexMetaData, boolean masterEligible) {
-        MetaData metaData = MetaData.builder()
-                .put(indexMetaData, false)
-                .build();
-
-        RoutingTable routingTable = RoutingTable.builder()
-                .addAsNew(metaData.index("test"))
-                .build();
-
-        return ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
-                .metaData(metaData)
-                .routingTable(routingTable)
-                .nodes(generateDiscoveryNodes(masterEligible))
-                .build();
-    }
-
-    private ClusterState clusterStateWithAssignedIndex(IndexMetaData indexMetaData, boolean masterEligible) {
-        AllocationService strategy = createAllocationService(Settings.builder()
-                .put("cluster.routing.allocation.node_concurrent_recoveries", 100)
-                .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
-                .put("cluster.routing.allocation.cluster_concurrent_rebalance", 100)
-                .put("cluster.routing.allocation.node_initial_primaries_recoveries", 100)
-                .build());
-
-        ClusterState oldClusterState = clusterStateWithUnassignedIndex(indexMetaData, masterEligible);
-        RoutingTable routingTable = strategy.reroute(oldClusterState, "reroute").routingTable();
-
-        MetaData metaDataNewClusterState = MetaData.builder()
-                .put(oldClusterState.metaData().index("test"), false)
-                .build();
-
-        return ClusterState.builder(oldClusterState).routingTable(routingTable)
-                .metaData(metaDataNewClusterState).version(oldClusterState.getVersion() + 1).build();
-    }
-
-    private ClusterState clusterStateWithClosedIndex(IndexMetaData indexMetaData, boolean masterEligible) {
-        ClusterState oldClusterState = clusterStateWithAssignedIndex(indexMetaData, masterEligible);
-
-        MetaData metaDataNewClusterState = MetaData.builder()
-                .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).state(IndexMetaData.State.CLOSE)
-                        .numberOfShards(5).numberOfReplicas(2))
-                .version(oldClusterState.metaData().version() + 1)
-                .build();
-        RoutingTable routingTable = RoutingTable.builder()
-                .addAsNew(metaDataNewClusterState.index("test"))
-                .build();
-
-        return ClusterState.builder(oldClusterState).routingTable(routingTable)
-                .metaData(metaDataNewClusterState).version(oldClusterState.getVersion() + 1).build();
-    }
-
-    private ClusterState clusterStateWithJustOpenedIndex(IndexMetaData indexMetaData, boolean masterEligible) {
-        ClusterState oldClusterState = clusterStateWithClosedIndex(indexMetaData, masterEligible);
-
-        MetaData metaDataNewClusterState = MetaData.builder()
-                .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).state(IndexMetaData.State.OPEN)
-                        .numberOfShards(5).numberOfReplicas(2))
-                .version(oldClusterState.metaData().version() + 1)
-                .build();
-
-        return ClusterState.builder(oldClusterState)
-                .metaData(metaDataNewClusterState).version(oldClusterState.getVersion() + 1).build();
-    }
-
-    private DiscoveryNodes.Builder generateDiscoveryNodes(boolean masterEligible) {
-        Set<DiscoveryNodeRole> dataOnlyRoles = Set.of(DiscoveryNodeRole.DATA_ROLE);
-        return DiscoveryNodes.builder().add(newNode("node1", masterEligible ? MASTER_DATA_ROLES : dataOnlyRoles))
-                .add(newNode("master_node", MASTER_DATA_ROLES)).localNodeId("node1").masterNodeId(masterEligible ? "node1" : "master_node");
-    }
-
-    private Set<Index> randomPrevWrittenIndices(IndexMetaData indexMetaData) {
-        if (randomBoolean()) {
-            return Collections.singleton(indexMetaData.getIndex());
-        } else {
-            return Collections.emptySet();
-        }
-    }
-
-    private IndexMetaData createIndexMetaData(String name) {
-        return IndexMetaData.builder(name).
-                settings(settings(Version.CURRENT)).
-                numberOfShards(5).
-                numberOfReplicas(2).
-                build();
-    }
-
-    public void testGetRelevantIndicesWithUnassignedShardsOnMasterEligibleNode() {
-        IndexMetaData indexMetaData = createIndexMetaData("test");
-        Set<Index> indices = GatewayMetaState.getRelevantIndices(
-                clusterStateWithUnassignedIndex(indexMetaData, true),
-                noIndexClusterState(true),
-                randomPrevWrittenIndices(indexMetaData));
-        assertThat(indices.size(), equalTo(1));
-    }
-
-    public void testGetRelevantIndicesWithUnassignedShardsOnDataOnlyNode() {
-        IndexMetaData indexMetaData = createIndexMetaData("test");
-        Set<Index> indices = GatewayMetaState.getRelevantIndices(
-                clusterStateWithUnassignedIndex(indexMetaData, false),
-                noIndexClusterState(false),
-                randomPrevWrittenIndices(indexMetaData));
-        assertThat(indices.size(), equalTo(0));
-    }
-
-    public void testGetRelevantIndicesWithAssignedShards() {
-        IndexMetaData indexMetaData = createIndexMetaData("test");
-        boolean masterEligible = randomBoolean();
-        Set<Index> indices = GatewayMetaState.getRelevantIndices(
-                clusterStateWithAssignedIndex(indexMetaData, masterEligible),
-                clusterStateWithUnassignedIndex(indexMetaData, masterEligible),
-                randomPrevWrittenIndices(indexMetaData));
-        assertThat(indices.size(), equalTo(1));
-    }
-
-    public void testGetRelevantIndicesForClosedPrevWrittenIndexOnDataOnlyNode() {
-        IndexMetaData indexMetaData = createIndexMetaData("test");
-        Set<Index> indices = GatewayMetaState.getRelevantIndices(
-                clusterStateWithClosedIndex(indexMetaData, false),
-                clusterStateWithAssignedIndex(indexMetaData, false),
-                Collections.singleton(indexMetaData.getIndex()));
-        assertThat(indices.size(), equalTo(1));
-    }
-
-    public void testGetRelevantIndicesForClosedPrevNotWrittenIndexOnDataOnlyNode() {
-        IndexMetaData indexMetaData = createIndexMetaData("test");
-        Set<Index> indices = GatewayMetaState.getRelevantIndices(
-                clusterStateWithJustOpenedIndex(indexMetaData, false),
-                clusterStateWithClosedIndex(indexMetaData, false),
-                Collections.emptySet());
-        assertThat(indices.size(), equalTo(0));
-    }
-
-    public void testGetRelevantIndicesForWasClosedPrevWrittenIndexOnDataOnlyNode() {
-        IndexMetaData indexMetaData = createIndexMetaData("test");
-        Set<Index> indices = GatewayMetaState.getRelevantIndices(
-                clusterStateWithJustOpenedIndex(indexMetaData, false),
-                clusterStateWithClosedIndex(indexMetaData, false),
-                Collections.singleton(indexMetaData.getIndex()));
-        assertThat(indices.size(), equalTo(1));
-    }
-
-    public void testResolveStatesToBeWritten() throws WriteStateException {
-        Map<Index, Long> indices = new HashMap<>();
-        Set<Index> relevantIndices = new HashSet<>();
-
-        IndexMetaData removedIndex = createIndexMetaData("removed_index");
-        indices.put(removedIndex.getIndex(), 1L);
-
-        IndexMetaData versionChangedIndex = createIndexMetaData("version_changed_index");
-        indices.put(versionChangedIndex.getIndex(), 2L);
-        relevantIndices.add(versionChangedIndex.getIndex());
 
-        IndexMetaData notChangedIndex = createIndexMetaData("not_changed_index");
-        indices.put(notChangedIndex.getIndex(), 3L);
-        relevantIndices.add(notChangedIndex.getIndex());
-
-        IndexMetaData newIndex = createIndexMetaData("new_index");
-        relevantIndices.add(newIndex.getIndex());
-
-        MetaData oldMetaData = MetaData.builder()
-                .put(removedIndex, false)
-                .put(versionChangedIndex, false)
-                .put(notChangedIndex, false)
-                .build();
-
-        MetaData newMetaData = MetaData.builder()
-                .put(versionChangedIndex, true)
-                .put(notChangedIndex, false)
-                .put(newIndex, false)
-                .build();
-
-        IndexMetaData newVersionChangedIndex = newMetaData.index(versionChangedIndex.getIndex());
-
-        List<GatewayMetaState.IndexMetaDataAction> actions =
-                GatewayMetaState.resolveIndexMetaDataActions(indices, relevantIndices, oldMetaData, newMetaData);
-
-        assertThat(actions, hasSize(3));
-
-        for (GatewayMetaState.IndexMetaDataAction action : actions) {
-            if (action instanceof GatewayMetaState.KeepPreviousGeneration) {
-                assertThat(action.getIndex(), equalTo(notChangedIndex.getIndex()));
-                GatewayMetaState.AtomicClusterStateWriter writer = mock(GatewayMetaState.AtomicClusterStateWriter.class);
-                assertThat(action.execute(writer), equalTo(3L));
-                verifyZeroInteractions(writer);
-            }
-            if (action instanceof GatewayMetaState.WriteNewIndexMetaData) {
-                assertThat(action.getIndex(), equalTo(newIndex.getIndex()));
-                GatewayMetaState.AtomicClusterStateWriter writer = mock(GatewayMetaState.AtomicClusterStateWriter.class);
-                when(writer.writeIndex("freshly created", newIndex)).thenReturn(0L);
-                assertThat(action.execute(writer), equalTo(0L));
-            }
-            if (action instanceof GatewayMetaState.WriteChangedIndexMetaData) {
-                assertThat(action.getIndex(), equalTo(newVersionChangedIndex.getIndex()));
-                GatewayMetaState.AtomicClusterStateWriter writer = mock(GatewayMetaState.AtomicClusterStateWriter.class);
-                when(writer.writeIndex(anyString(), eq(newVersionChangedIndex))).thenReturn(3L);
-                assertThat(action.execute(writer), equalTo(3L));
-                ArgumentCaptor<String> reason = ArgumentCaptor.forClass(String.class);
-                verify(writer).writeIndex(reason.capture(), eq(newVersionChangedIndex));
-                assertThat(reason.getValue(), containsString(Long.toString(versionChangedIndex.getVersion())));
-                assertThat(reason.getValue(), containsString(Long.toString(newVersionChangedIndex.getVersion())));
-            }
-        }
-    }
-
-    private static class MetaStateServiceWithFailures extends MetaStateService {
-        private final int invertedFailRate;
-        private boolean failRandomly;
-
-        private <T> MetaDataStateFormat<T> wrap(MetaDataStateFormat<T> format) {
-            return new MetaDataStateFormat<T>(format.getPrefix()) {
-                @Override
-                public void toXContent(XContentBuilder builder, T state) throws IOException {
-                   format.toXContent(builder, state);
-                }
-
-                @Override
-                public T fromXContent(XContentParser parser) throws IOException {
-                    return format.fromXContent(parser);
-                }
-
-                @Override
-                protected Directory newDirectory(Path dir) {
-                    MockDirectoryWrapper mock = newMockFSDirectory(dir);
-                    if (failRandomly) {
-                        MockDirectoryWrapper.Failure fail = new MockDirectoryWrapper.Failure() {
-                            @Override
-                            public void eval(MockDirectoryWrapper dir) throws IOException {
-                                int r = randomIntBetween(0, invertedFailRate);
-                                if (r == 0) {
-                                    throw new MockDirectoryWrapper.FakeIOException();
-                                }
-                            }
-                        };
-                        mock.failOn(fail);
-                    }
-                    closeAfterSuite(mock);
-                    return mock;
-                }
-            };
-        }
-
-        MetaStateServiceWithFailures(int invertedFailRate, NodeEnvironment nodeEnv, NamedXContentRegistry namedXContentRegistry) {
-            super(nodeEnv, namedXContentRegistry);
-            META_DATA_FORMAT = wrap(MetaData.FORMAT);
-            INDEX_META_DATA_FORMAT = wrap(IndexMetaData.FORMAT);
-            MANIFEST_FORMAT = wrap(Manifest.FORMAT);
-            failRandomly = false;
-            this.invertedFailRate = invertedFailRate;
-        }
-
-        void failRandomly() {
-            failRandomly = true;
-        }
-
-        void noFailures() {
-            failRandomly = false;
-        }
-    }
-
-    private boolean metaDataEquals(MetaData md1, MetaData md2) {
-        boolean equals = MetaData.isGlobalStateEquals(md1, md2);
-
-        for (IndexMetaData imd : md1) {
-            IndexMetaData imd2 = md2.index(imd.getIndex());
-            equals = equals && imd.equals(imd2);
-        }
-
-        for (IndexMetaData imd : md2) {
-            IndexMetaData imd2 = md1.index(imd.getIndex());
-            equals = equals && imd.equals(imd2);
-        }
-        return equals;
-    }
-
-    private static MetaData randomMetaDataForTx() {
-        int settingNo = randomIntBetween(0, 10);
-        MetaData.Builder builder = MetaData.builder()
-                .persistentSettings(Settings.builder().put("setting" + settingNo, randomAlphaOfLength(5)).build());
-        int numOfIndices = randomIntBetween(0, 3);
-
-        for (int i = 0; i < numOfIndices; i++) {
-            int indexNo = randomIntBetween(0, 50);
-            IndexMetaData indexMetaData = IndexMetaData.builder("index" + indexNo).settings(
-                    Settings.builder()
-                            .put(IndexMetaData.SETTING_INDEX_UUID, "index" + indexNo)
-                            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
-                            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
-                            .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
-                            .build()
-            ).build();
-            builder.put(indexMetaData, false);
-        }
-        return builder.build();
-    }
-
-    public void testAtomicityWithFailures() throws IOException {
-        try (NodeEnvironment env = newNodeEnvironment()) {
-            MetaStateServiceWithFailures metaStateService =
-                    new MetaStateServiceWithFailures(randomIntBetween(100, 1000), env, xContentRegistry());
-
-            // We only guarantee atomicity of writes, if there is initial Manifest file
-            Manifest manifest = Manifest.empty();
-            MetaData metaData = MetaData.EMPTY_META_DATA;
-            metaStateService.writeManifestAndCleanup("startup", Manifest.empty());
-            long currentTerm = randomNonNegativeLong();
-            long clusterStateVersion = randomNonNegativeLong();
-
-            metaStateService.failRandomly();
-            Set<MetaData> possibleMetaData = new HashSet<>();
-            possibleMetaData.add(metaData);
-
-            for (int i = 0; i < randomIntBetween(1, 5); i++) {
-                GatewayMetaState.AtomicClusterStateWriter writer =
-                        new GatewayMetaState.AtomicClusterStateWriter(metaStateService, manifest);
-                metaData = randomMetaDataForTx();
-                Map<Index, Long> indexGenerations = new HashMap<>();
-
-                try {
-                    long globalGeneration = writer.writeGlobalState("global", metaData);
-
-                    for (IndexMetaData indexMetaData : metaData) {
-                        long generation = writer.writeIndex("index", indexMetaData);
-                        indexGenerations.put(indexMetaData.getIndex(), generation);
-                    }
-
-                    Manifest newManifest = new Manifest(currentTerm, clusterStateVersion, globalGeneration, indexGenerations);
-                    writer.writeManifestAndCleanup("manifest", newManifest);
-                    possibleMetaData.clear();
-                    possibleMetaData.add(metaData);
-                    manifest = newManifest;
-                } catch (WriteStateException e) {
-                    if (e.isDirty()) {
-                        possibleMetaData.add(metaData);
-                        /*
-                         * If dirty WriteStateException occurred, it's only safe to proceed if there is subsequent
-                         * successful write of metadata and Manifest. We prefer to break here, not to over complicate test logic.
-                         * See also MetaDataStateFormat#testFailRandomlyAndReadAnyState, that does not break.
-                         */
-                        break;
-                    }
-                }
-            }
-
-            metaStateService.noFailures();
-
-            Tuple<Manifest, MetaData> manifestAndMetaData = metaStateService.loadFullState();
-            MetaData loadedMetaData = manifestAndMetaData.v2();
-
-            assertTrue(possibleMetaData.stream().anyMatch(md -> metaDataEquals(md, loadedMetaData)));
-        }
-    }
+public class GatewayMetaStateTests extends ESTestCase {
 
     public void testAddCustomMetaDataOnUpgrade() throws Exception {
         MetaData metaData = randomMetaData();

+ 429 - 0
server/src/test/java/org/elasticsearch/gateway/IncrementalClusterStateWriterTests.java

@@ -0,0 +1,429 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.gateway;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.MockDirectoryWrapper;
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ESAllocationTestCase;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.Manifest;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNodeRole;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.allocation.AllocationService;
+import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
+import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.index.Index;
+import org.mockito.ArgumentCaptor;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+public class IncrementalClusterStateWriterTests extends ESAllocationTestCase {
+
+    private ClusterState noIndexClusterState(boolean masterEligible) {
+        MetaData metaData = MetaData.builder().build();
+        RoutingTable routingTable = RoutingTable.builder().build();
+
+        return ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+            .metaData(metaData)
+            .routingTable(routingTable)
+            .nodes(generateDiscoveryNodes(masterEligible))
+            .build();
+    }
+
+    private ClusterState clusterStateWithUnassignedIndex(IndexMetaData indexMetaData, boolean masterEligible) {
+        MetaData metaData = MetaData.builder()
+            .put(indexMetaData, false)
+            .build();
+
+        RoutingTable routingTable = RoutingTable.builder()
+            .addAsNew(metaData.index("test"))
+            .build();
+
+        return ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+            .metaData(metaData)
+            .routingTable(routingTable)
+            .nodes(generateDiscoveryNodes(masterEligible))
+            .build();
+    }
+
+    private ClusterState clusterStateWithAssignedIndex(IndexMetaData indexMetaData, boolean masterEligible) {
+        AllocationService strategy = createAllocationService(Settings.builder()
+            .put("cluster.routing.allocation.node_concurrent_recoveries", 100)
+            .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
+            .put("cluster.routing.allocation.cluster_concurrent_rebalance", 100)
+            .put("cluster.routing.allocation.node_initial_primaries_recoveries", 100)
+            .build());
+
+        ClusterState oldClusterState = clusterStateWithUnassignedIndex(indexMetaData, masterEligible);
+        RoutingTable routingTable = strategy.reroute(oldClusterState, "reroute").routingTable();
+
+        MetaData metaDataNewClusterState = MetaData.builder()
+            .put(oldClusterState.metaData().index("test"), false)
+            .build();
+
+        return ClusterState.builder(oldClusterState).routingTable(routingTable)
+            .metaData(metaDataNewClusterState).version(oldClusterState.getVersion() + 1).build();
+    }
+
+    private ClusterState clusterStateWithClosedIndex(IndexMetaData indexMetaData, boolean masterEligible) {
+        ClusterState oldClusterState = clusterStateWithAssignedIndex(indexMetaData, masterEligible);
+
+        MetaData metaDataNewClusterState = MetaData.builder()
+            .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).state(IndexMetaData.State.CLOSE)
+                .numberOfShards(5).numberOfReplicas(2))
+            .version(oldClusterState.metaData().version() + 1)
+            .build();
+        RoutingTable routingTable = RoutingTable.builder()
+            .addAsNew(metaDataNewClusterState.index("test"))
+            .build();
+
+        return ClusterState.builder(oldClusterState).routingTable(routingTable)
+            .metaData(metaDataNewClusterState).version(oldClusterState.getVersion() + 1).build();
+    }
+
+    private ClusterState clusterStateWithJustOpenedIndex(IndexMetaData indexMetaData, boolean masterEligible) {
+        ClusterState oldClusterState = clusterStateWithClosedIndex(indexMetaData, masterEligible);
+
+        MetaData metaDataNewClusterState = MetaData.builder()
+            .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).state(IndexMetaData.State.OPEN)
+                .numberOfShards(5).numberOfReplicas(2))
+            .version(oldClusterState.metaData().version() + 1)
+            .build();
+
+        return ClusterState.builder(oldClusterState)
+            .metaData(metaDataNewClusterState).version(oldClusterState.getVersion() + 1).build();
+    }
+
+    private DiscoveryNodes.Builder generateDiscoveryNodes(boolean masterEligible) {
+        Set<DiscoveryNodeRole> dataOnlyRoles = Collections.singleton(DiscoveryNodeRole.DATA_ROLE);
+        return DiscoveryNodes.builder().add(newNode("node1", masterEligible ? MASTER_DATA_ROLES : dataOnlyRoles))
+            .add(newNode("master_node", MASTER_DATA_ROLES)).localNodeId("node1").masterNodeId(masterEligible ? "node1" : "master_node");
+    }
+
+    private Set<Index> randomPrevWrittenIndices(IndexMetaData indexMetaData) {
+        if (randomBoolean()) {
+            return Collections.singleton(indexMetaData.getIndex());
+        } else {
+            return Collections.emptySet();
+        }
+    }
+
+    private IndexMetaData createIndexMetaData(String name) {
+        return IndexMetaData.builder(name).
+            settings(settings(Version.CURRENT)).
+            numberOfShards(5).
+            numberOfReplicas(2).
+            build();
+    }
+
+    public void testGetRelevantIndicesWithUnassignedShardsOnMasterEligibleNode() {
+        IndexMetaData indexMetaData = createIndexMetaData("test");
+        Set<Index> indices = IncrementalClusterStateWriter.getRelevantIndices(
+            clusterStateWithUnassignedIndex(indexMetaData, true),
+            noIndexClusterState(true),
+            randomPrevWrittenIndices(indexMetaData));
+        assertThat(indices.size(), equalTo(1));
+    }
+
+    public void testGetRelevantIndicesWithUnassignedShardsOnDataOnlyNode() {
+        IndexMetaData indexMetaData = createIndexMetaData("test");
+        Set<Index> indices = IncrementalClusterStateWriter.getRelevantIndices(
+            clusterStateWithUnassignedIndex(indexMetaData, false),
+            noIndexClusterState(false),
+            randomPrevWrittenIndices(indexMetaData));
+        assertThat(indices.size(), equalTo(0));
+    }
+
+    public void testGetRelevantIndicesWithAssignedShards() {
+        IndexMetaData indexMetaData = createIndexMetaData("test");
+        boolean masterEligible = randomBoolean();
+        Set<Index> indices = IncrementalClusterStateWriter.getRelevantIndices(
+            clusterStateWithAssignedIndex(indexMetaData, masterEligible),
+            clusterStateWithUnassignedIndex(indexMetaData, masterEligible),
+            randomPrevWrittenIndices(indexMetaData));
+        assertThat(indices.size(), equalTo(1));
+    }
+
+    public void testGetRelevantIndicesForClosedPrevWrittenIndexOnDataOnlyNode() {
+        IndexMetaData indexMetaData = createIndexMetaData("test");
+        Set<Index> indices = IncrementalClusterStateWriter.getRelevantIndices(
+            clusterStateWithClosedIndex(indexMetaData, false),
+            clusterStateWithAssignedIndex(indexMetaData, false),
+            Collections.singleton(indexMetaData.getIndex()));
+        assertThat(indices.size(), equalTo(1));
+    }
+
+    public void testGetRelevantIndicesForClosedPrevNotWrittenIndexOnDataOnlyNode() {
+        IndexMetaData indexMetaData = createIndexMetaData("test");
+        Set<Index> indices = IncrementalClusterStateWriter.getRelevantIndices(
+            clusterStateWithJustOpenedIndex(indexMetaData, false),
+            clusterStateWithClosedIndex(indexMetaData, false),
+            Collections.emptySet());
+        assertThat(indices.size(), equalTo(0));
+    }
+
+    public void testGetRelevantIndicesForWasClosedPrevWrittenIndexOnDataOnlyNode() {
+        IndexMetaData indexMetaData = createIndexMetaData("test");
+        Set<Index> indices = IncrementalClusterStateWriter.getRelevantIndices(
+            clusterStateWithJustOpenedIndex(indexMetaData, false),
+            clusterStateWithClosedIndex(indexMetaData, false),
+            Collections.singleton(indexMetaData.getIndex()));
+        assertThat(indices.size(), equalTo(1));
+    }
+
+    public void testResolveStatesToBeWritten() throws WriteStateException {
+        Map<Index, Long> indices = new HashMap<>();
+        Set<Index> relevantIndices = new HashSet<>();
+
+        IndexMetaData removedIndex = createIndexMetaData("removed_index");
+        indices.put(removedIndex.getIndex(), 1L);
+
+        IndexMetaData versionChangedIndex = createIndexMetaData("version_changed_index");
+        indices.put(versionChangedIndex.getIndex(), 2L);
+        relevantIndices.add(versionChangedIndex.getIndex());
+
+        IndexMetaData notChangedIndex = createIndexMetaData("not_changed_index");
+        indices.put(notChangedIndex.getIndex(), 3L);
+        relevantIndices.add(notChangedIndex.getIndex());
+
+        IndexMetaData newIndex = createIndexMetaData("new_index");
+        relevantIndices.add(newIndex.getIndex());
+
+        MetaData oldMetaData = MetaData.builder()
+            .put(removedIndex, false)
+            .put(versionChangedIndex, false)
+            .put(notChangedIndex, false)
+            .build();
+
+        MetaData newMetaData = MetaData.builder()
+            .put(versionChangedIndex, true)
+            .put(notChangedIndex, false)
+            .put(newIndex, false)
+            .build();
+
+        IndexMetaData newVersionChangedIndex = newMetaData.index(versionChangedIndex.getIndex());
+
+        List<IncrementalClusterStateWriter.IndexMetaDataAction> actions =
+            IncrementalClusterStateWriter.resolveIndexMetaDataActions(indices, relevantIndices, oldMetaData, newMetaData);
+
+        assertThat(actions, hasSize(3));
+
+        for (IncrementalClusterStateWriter.IndexMetaDataAction action : actions) {
+            if (action instanceof IncrementalClusterStateWriter.KeepPreviousGeneration) {
+                assertThat(action.getIndex(), equalTo(notChangedIndex.getIndex()));
+                IncrementalClusterStateWriter.AtomicClusterStateWriter writer
+                    = mock(IncrementalClusterStateWriter.AtomicClusterStateWriter.class);
+                assertThat(action.execute(writer), equalTo(3L));
+                verifyZeroInteractions(writer);
+            }
+            if (action instanceof IncrementalClusterStateWriter.WriteNewIndexMetaData) {
+                assertThat(action.getIndex(), equalTo(newIndex.getIndex()));
+                IncrementalClusterStateWriter.AtomicClusterStateWriter writer
+                    = mock(IncrementalClusterStateWriter.AtomicClusterStateWriter.class);
+                when(writer.writeIndex("freshly created", newIndex)).thenReturn(0L);
+                assertThat(action.execute(writer), equalTo(0L));
+            }
+            if (action instanceof IncrementalClusterStateWriter.WriteChangedIndexMetaData) {
+                assertThat(action.getIndex(), equalTo(newVersionChangedIndex.getIndex()));
+                IncrementalClusterStateWriter.AtomicClusterStateWriter writer
+                    = mock(IncrementalClusterStateWriter.AtomicClusterStateWriter.class);
+                when(writer.writeIndex(anyString(), eq(newVersionChangedIndex))).thenReturn(3L);
+                assertThat(action.execute(writer), equalTo(3L));
+                ArgumentCaptor<String> reason = ArgumentCaptor.forClass(String.class);
+                verify(writer).writeIndex(reason.capture(), eq(newVersionChangedIndex));
+                assertThat(reason.getValue(), containsString(Long.toString(versionChangedIndex.getVersion())));
+                assertThat(reason.getValue(), containsString(Long.toString(newVersionChangedIndex.getVersion())));
+            }
+        }
+    }
+
+    private static class MetaStateServiceWithFailures extends MetaStateService {
+        private final int invertedFailRate;
+        private boolean failRandomly;
+
+        private <T> MetaDataStateFormat<T> wrap(MetaDataStateFormat<T> format) {
+            return new MetaDataStateFormat<T>(format.getPrefix()) {
+                @Override
+                public void toXContent(XContentBuilder builder, T state) throws IOException {
+                    format.toXContent(builder, state);
+                }
+
+                @Override
+                public T fromXContent(XContentParser parser) throws IOException {
+                    return format.fromXContent(parser);
+                }
+
+                @Override
+                protected Directory newDirectory(Path dir) {
+                    MockDirectoryWrapper mock = newMockFSDirectory(dir);
+                    if (failRandomly) {
+                        MockDirectoryWrapper.Failure fail = new MockDirectoryWrapper.Failure() {
+                            @Override
+                            public void eval(MockDirectoryWrapper dir) throws IOException {
+                                int r = randomIntBetween(0, invertedFailRate);
+                                if (r == 0) {
+                                    throw new MockDirectoryWrapper.FakeIOException();
+                                }
+                            }
+                        };
+                        mock.failOn(fail);
+                    }
+                    closeAfterSuite(mock);
+                    return mock;
+                }
+            };
+        }
+
+        MetaStateServiceWithFailures(int invertedFailRate, NodeEnvironment nodeEnv, NamedXContentRegistry namedXContentRegistry) {
+            super(nodeEnv, namedXContentRegistry);
+            META_DATA_FORMAT = wrap(MetaData.FORMAT);
+            INDEX_META_DATA_FORMAT = wrap(IndexMetaData.FORMAT);
+            MANIFEST_FORMAT = wrap(Manifest.FORMAT);
+            failRandomly = false;
+            this.invertedFailRate = invertedFailRate;
+        }
+
+        void failRandomly() {
+            failRandomly = true;
+        }
+
+        void noFailures() {
+            failRandomly = false;
+        }
+    }
+
+    private boolean metaDataEquals(MetaData md1, MetaData md2) {
+        boolean equals = MetaData.isGlobalStateEquals(md1, md2);
+
+        for (IndexMetaData imd : md1) {
+            IndexMetaData imd2 = md2.index(imd.getIndex());
+            equals = equals && imd.equals(imd2);
+        }
+
+        for (IndexMetaData imd : md2) {
+            IndexMetaData imd2 = md1.index(imd.getIndex());
+            equals = equals && imd.equals(imd2);
+        }
+        return equals;
+    }
+
+    private static MetaData randomMetaDataForTx() {
+        int settingNo = randomIntBetween(0, 10);
+        MetaData.Builder builder = MetaData.builder()
+            .persistentSettings(Settings.builder().put("setting" + settingNo, randomAlphaOfLength(5)).build());
+        int numOfIndices = randomIntBetween(0, 3);
+
+        for (int i = 0; i < numOfIndices; i++) {
+            int indexNo = randomIntBetween(0, 50);
+            IndexMetaData indexMetaData = IndexMetaData.builder("index" + indexNo).settings(
+                Settings.builder()
+                    .put(IndexMetaData.SETTING_INDEX_UUID, "index" + indexNo)
+                    .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                    .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+                    .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+                    .build()
+            ).build();
+            builder.put(indexMetaData, false);
+        }
+        return builder.build();
+    }
+
+    public void testAtomicityWithFailures() throws IOException {
+        try (NodeEnvironment env = newNodeEnvironment()) {
+            MetaStateServiceWithFailures metaStateService =
+                new MetaStateServiceWithFailures(randomIntBetween(100, 1000), env, xContentRegistry());
+
+            // We only guarantee atomicity of writes, if there is initial Manifest file
+            Manifest manifest = Manifest.empty();
+            MetaData metaData = MetaData.EMPTY_META_DATA;
+            metaStateService.writeManifestAndCleanup("startup", Manifest.empty());
+            long currentTerm = randomNonNegativeLong();
+            long clusterStateVersion = randomNonNegativeLong();
+
+            metaStateService.failRandomly();
+            Set<MetaData> possibleMetaData = new HashSet<>();
+            possibleMetaData.add(metaData);
+
+            for (int i = 0; i < randomIntBetween(1, 5); i++) {
+                IncrementalClusterStateWriter.AtomicClusterStateWriter writer =
+                    new IncrementalClusterStateWriter.AtomicClusterStateWriter(metaStateService, manifest);
+                metaData = randomMetaDataForTx();
+                Map<Index, Long> indexGenerations = new HashMap<>();
+
+                try {
+                    long globalGeneration = writer.writeGlobalState("global", metaData);
+
+                    for (IndexMetaData indexMetaData : metaData) {
+                        long generation = writer.writeIndex("index", indexMetaData);
+                        indexGenerations.put(indexMetaData.getIndex(), generation);
+                    }
+
+                    Manifest newManifest = new Manifest(currentTerm, clusterStateVersion, globalGeneration, indexGenerations);
+                    writer.writeManifestAndCleanup("manifest", newManifest);
+                    possibleMetaData.clear();
+                    possibleMetaData.add(metaData);
+                    manifest = newManifest;
+                } catch (WriteStateException e) {
+                    if (e.isDirty()) {
+                        possibleMetaData.add(metaData);
+                        /*
+                         * If dirty WriteStateException occurred, it's only safe to proceed if there is subsequent
+                         * successful write of metadata and Manifest. We prefer to break here, not to over complicate test logic.
+                         * See also MetaDataStateFormat#testFailRandomlyAndReadAnyState, that does not break.
+                         */
+                        break;
+                    }
+                }
+            }
+
+            metaStateService.noFailures();
+
+            Tuple<Manifest, MetaData> manifestAndMetaData = metaStateService.loadFullState();
+            MetaData loadedMetaData = manifestAndMetaData.v2();
+
+            assertTrue(possibleMetaData.stream().anyMatch(md -> metaDataEquals(md, loadedMetaData)));
+        }
+    }
+}

+ 4 - 6
test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java

@@ -703,9 +703,8 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
                     if (rarely()) {
                         nodeEnvironment = newNodeEnvironment();
                         nodeEnvironments.add(nodeEnvironment);
-                        final MockGatewayMetaState gatewayMetaState
-                            = new MockGatewayMetaState(Settings.EMPTY, nodeEnvironment, xContentRegistry(), localNode);
-                        gatewayMetaState.start();
+                        final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(localNode);
+                        gatewayMetaState.start(Settings.EMPTY, nodeEnvironment, xContentRegistry());
                         delegate = gatewayMetaState.getPersistedState();
                     } else {
                         nodeEnvironment = null;
@@ -736,9 +735,8 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
                                 new Manifest(updatedTerm, manifest.getClusterStateVersion(), manifest.getGlobalGeneration(),
                                     manifest.getIndexGenerations()));
                         }
-                        final MockGatewayMetaState gatewayMetaState
-                            = new MockGatewayMetaState(Settings.EMPTY, nodeEnvironment, xContentRegistry(), newLocalNode);
-                        gatewayMetaState.start();
+                        final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(newLocalNode);
+                        gatewayMetaState.start(Settings.EMPTY, nodeEnvironment, xContentRegistry());
                         delegate = gatewayMetaState.getPersistedState();
                     } else {
                         nodeEnvironment = null;

+ 8 - 8
test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.gateway;
 
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -37,24 +38,23 @@ import org.elasticsearch.transport.TransportService;
 public class MockGatewayMetaState extends GatewayMetaState {
     private final DiscoveryNode localNode;
 
-    public MockGatewayMetaState(Settings settings, NodeEnvironment nodeEnvironment,
-                                NamedXContentRegistry xContentRegistry, DiscoveryNode localNode) {
-        super(settings, new MetaStateService(nodeEnvironment, xContentRegistry));
+    public MockGatewayMetaState(DiscoveryNode localNode) {
         this.localNode = localNode;
     }
 
     @Override
-    protected void upgradeMetaData(MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader) {
+    void upgradeMetaData(Settings settings, MetaStateService metaStateService, MetaDataIndexUpgradeService metaDataIndexUpgradeService,
+                         MetaDataUpgrader metaDataUpgrader) {
         // MetaData upgrade is tested in GatewayMetaStateTests, we override this method to NOP to make mocking easier
     }
 
     @Override
-    public void applyClusterStateUpdaters(TransportService transportService, ClusterService clusterService) {
+    ClusterState prepareInitialClusterState(TransportService transportService, ClusterService clusterService, ClusterState clusterState) {
         // Just set localNode here, not to mess with ClusterService and IndicesService mocking
-        previousClusterState = ClusterStateUpdaters.setLocalNode(previousClusterState, localNode);
+        return ClusterStateUpdaters.setLocalNode(clusterState, localNode);
     }
 
-    public void start() {
-        start(null, null, null, null);
+    public void start(Settings settings, NodeEnvironment nodeEnvironment, NamedXContentRegistry xContentRegistry) {
+        start(settings, null, null, new MetaStateService(nodeEnvironment, xContentRegistry), null, null);
     }
 }