فهرست منبع

Allow `ClusterState.Custom` to be created on initial cluster states (#26144)

Today we have a `null` invariant on all `ClusterState.Custom`. This makes
several code paths complicated and requires complex state handling in some cases.
This change allows to register a custom supplier that is used to initialize the
initial clusterstate with these transient customs.
Simon Willnauer 8 سال پیش
والد
کامیت
6f82b0c6e2
22فایلهای تغییر یافته به همراه220 افزوده شده و 37 حذف شده
  1. 17 6
      core/src/main/java/org/elasticsearch/cluster/ClusterModule.java
  2. 0 9
      core/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java
  3. 4 0
      core/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java
  4. 1 1
      core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java
  5. 6 0
      core/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java
  6. 9 1
      core/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java
  7. 17 2
      core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java
  8. 2 2
      core/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java
  9. 3 3
      core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
  10. 1 1
      core/src/main/java/org/elasticsearch/gateway/Gateway.java
  11. 5 3
      core/src/main/java/org/elasticsearch/node/Node.java
  12. 8 1
      core/src/main/java/org/elasticsearch/plugins/ClusterPlugin.java
  13. 45 1
      core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java
  14. 1 1
      core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java
  15. 1 1
      core/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java
  16. 62 0
      core/src/test/java/org/elasticsearch/cluster/service/ClusterSerivceTests.java
  17. 6 0
      core/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryTests.java
  18. 6 0
      core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java
  19. 2 1
      core/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java
  20. 2 2
      core/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java
  21. 21 0
      core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
  22. 1 2
      test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java

+ 17 - 6
core/src/main/java/org/elasticsearch/cluster/ClusterModule.java

@@ -73,6 +73,7 @@ import org.elasticsearch.tasks.TaskResultsService;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -94,7 +95,6 @@ public class ClusterModule extends AbstractModule {
     private final IndexNameExpressionResolver indexNameExpressionResolver;
     private final AllocationDeciders allocationDeciders;
     private final AllocationService allocationService;
-    private final Runnable onStarted;
     // pkg private for tests
     final Collection<AllocationDecider> deciderList;
     final ShardsAllocator shardsAllocator;
@@ -107,9 +107,24 @@ public class ClusterModule extends AbstractModule {
         this.clusterService = clusterService;
         this.indexNameExpressionResolver = new IndexNameExpressionResolver(settings);
         this.allocationService = new AllocationService(settings, allocationDeciders, shardsAllocator, clusterInfoService);
-        this.onStarted = () -> clusterPlugins.forEach(plugin -> plugin.onNodeStarted());
     }
 
+    public static Map<String, Supplier<ClusterState.Custom>> getClusterStateCustomSuppliers(List<ClusterPlugin> clusterPlugins) {
+        final Map<String, Supplier<ClusterState.Custom>> customSupplier = new HashMap<>();
+        customSupplier.put(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress::new);
+        customSupplier.put(RestoreInProgress.TYPE, RestoreInProgress::new);
+        customSupplier.put(SnapshotsInProgress.TYPE, SnapshotsInProgress::new);
+        for (ClusterPlugin plugin : clusterPlugins) {
+            Map<String, Supplier<ClusterState.Custom>> initialCustomSupplier = plugin.getInitialClusterStateCustomSupplier();
+            for (String key : initialCustomSupplier.keySet()) {
+                if (customSupplier.containsKey(key)) {
+                    throw new IllegalStateException("custom supplier key [" + key + "] is registered more than once");
+                }
+            }
+            customSupplier.putAll(initialCustomSupplier);
+        }
+        return Collections.unmodifiableMap(customSupplier);
+    }
 
     public static List<Entry> getNamedWriteables() {
         List<Entry> entries = new ArrayList<>();
@@ -243,8 +258,4 @@ public class ClusterModule extends AbstractModule {
         bind(AllocationDeciders.class).toInstance(allocationDeciders);
         bind(ShardsAllocator.class).toInstance(shardsAllocator);
     }
-
-    public Runnable onStarted() {
-        return onStarted;
-    }
 }

+ 0 - 9
core/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java

@@ -45,15 +45,6 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements
 
     private final List<Entry> entries;
 
-    /**
-     * Constructs new restore metadata
-     *
-     * @param entries list of currently running restore processes
-     */
-    public RestoreInProgress(List<Entry> entries) {
-        this.entries = entries;
-    }
-
     /**
      * Constructs new restore metadata
      *

+ 4 - 0
core/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java

@@ -45,6 +45,10 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
     // the list of snapshot deletion request entries
     private final List<Entry> entries;
 
+    public SnapshotDeletionsInProgress() {
+        this(Collections.emptyList());
+    }
+
     private SnapshotDeletionsInProgress(List<Entry> entries) {
         this.entries = Collections.unmodifiableList(entries);
     }

+ 1 - 1
core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java

@@ -67,7 +67,7 @@ public class SnapshotInProgressAllocationDecider extends AllocationDecider {
             // Only primary shards are snapshotted
 
             SnapshotsInProgress snapshotsInProgress = allocation.custom(SnapshotsInProgress.TYPE);
-            if (snapshotsInProgress == null) {
+            if (snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty()) {
                 // Snapshots are not running
                 return allocation.decision(Decision.YES, NAME, "no snapshots are currently running");
             }

+ 6 - 0
core/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java

@@ -39,4 +39,10 @@ public interface ClusterApplier {
      * @param listener callback that is invoked after cluster state is applied
      */
     void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterStateTaskListener listener);
+
+    /**
+     * Creates a new cluster state builder that is initialized with the cluster name and all initial cluster state customs.
+     */
+    ClusterState.Builder newClusterStateBuilder();
+
 }

+ 9 - 1
core/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java

@@ -97,14 +97,17 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
     private final AtomicReference<ClusterState> state; // last applied state
 
     private NodeConnectionsService nodeConnectionsService;
+    private Supplier<ClusterState.Builder> stateBuilderSupplier;
 
-    public ClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
+    public ClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, Supplier<ClusterState
+        .Builder> stateBuilderSupplier) {
         super(settings);
         this.clusterSettings = clusterSettings;
         this.threadPool = threadPool;
         this.state = new AtomicReference<>();
         this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
         this.localNodeMasterListeners = new LocalNodeMasterListeners(threadPool);
+        this.stateBuilderSupplier = stateBuilderSupplier;
     }
 
     public void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
@@ -653,4 +656,9 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
     protected long currentTimeInNanos() {
         return System.nanoTime();
     }
+
+    @Override
+    public ClusterState.Builder newClusterStateBuilder() {
+        return stateBuilderSupplier.get();
+    }
 }

+ 17 - 2
core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java

@@ -42,6 +42,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.function.Supplier;
 
 public class ClusterService extends AbstractLifecycleComponent {
 
@@ -58,16 +59,30 @@ public class ClusterService extends AbstractLifecycleComponent {
     private final OperationRouting operationRouting;
 
     private final ClusterSettings clusterSettings;
+    private final Map<String, Supplier<ClusterState.Custom>> initialClusterStateCustoms;
 
-    public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
+    public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool,
+                          Map<String, Supplier<ClusterState.Custom>> initialClusterStateCustoms) {
         super(settings);
-        this.clusterApplierService = new ClusterApplierService(settings, clusterSettings, threadPool);
         this.masterService = new MasterService(settings, threadPool);
         this.operationRouting = new OperationRouting(settings, clusterSettings);
         this.clusterSettings = clusterSettings;
         this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
         this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
             this::setSlowTaskLoggingThreshold);
+        this.initialClusterStateCustoms = initialClusterStateCustoms;
+        this.clusterApplierService = new ClusterApplierService(settings, clusterSettings, threadPool, this::newClusterStateBuilder);
+    }
+
+    /**
+     * Creates a new cluster state builder that is initialized with the cluster name and all initial cluster state customs.
+     */
+    public ClusterState.Builder newClusterStateBuilder() {
+        ClusterState.Builder builder = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings));
+        for (Map.Entry<String, Supplier<ClusterState.Custom>> entry : initialClusterStateCustoms.entrySet()) {
+            builder.putCustom(entry.getKey(), entry.getValue().get());
+        }
+        return builder;
     }
 
     private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {

+ 2 - 2
core/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java

@@ -117,8 +117,8 @@ public class SingleNodeDiscovery extends AbstractLifecycleComponent implements D
     }
 
     protected ClusterState createInitialState(DiscoveryNode localNode) {
-        return ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
-            .nodes(DiscoveryNodes.builder().add(localNode)
+        ClusterState.Builder builder = clusterApplier.newClusterStateBuilder();
+        return builder.nodes(DiscoveryNodes.builder().add(localNode)
                 .localNodeId(localNode.getId())
                 .masterNodeId(localNode.getId())
                 .build())

+ 3 - 3
core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

@@ -113,7 +113,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
 
     private final TransportService transportService;
     private final MasterService masterService;
-    private final ClusterName clusterName;
     private final DiscoverySettings discoverySettings;
     protected final ZenPing zenPing; // protected to allow tests access
     private final MasterFaultDetection masterFD;
@@ -169,7 +168,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
         this.maxPingsFromAnotherMaster = MAX_PINGS_FROM_ANOTHER_MASTER_SETTING.get(settings);
         this.sendLeaveRequest = SEND_LEAVE_REQUEST_SETTING.get(settings);
         this.threadPool = threadPool;
-        this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
+        ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
         this.committedState = new AtomicReference<>();
 
         this.masterElectionIgnoreNonMasters = MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING.get(settings);
@@ -238,7 +237,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
             // set initial state
             assert committedState.get() == null;
             assert localNode != null;
-            ClusterState initialState = ClusterState.builder(clusterName)
+            ClusterState.Builder builder = clusterApplier.newClusterStateBuilder();
+            ClusterState initialState = builder
                 .blocks(ClusterBlocks.builder()
                     .addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)
                     .addGlobalBlock(discoverySettings.getNoMasterBlock()))

+ 1 - 1
core/src/main/java/org/elasticsearch/gateway/Gateway.java

@@ -155,7 +155,7 @@ public class Gateway extends AbstractComponent implements ClusterStateApplier {
                 metaDataBuilder.transientSettings(),
                 e -> logUnknownSetting("transient", e),
                 (e, ex) -> logInvalidSetting("transient", e, ex)));
-        ClusterState.Builder builder = ClusterState.builder(clusterService.getClusterName());
+        ClusterState.Builder builder = clusterService.newClusterStateBuilder();
         builder.metaData(metaDataBuilder);
         listener.onSuccess(builder.build());
     }

+ 5 - 3
core/src/main/java/org/elasticsearch/node/Node.java

@@ -329,7 +329,10 @@ public class Node implements Closeable {
             resourcesToClose.add(resourceWatcherService);
             final NetworkService networkService = new NetworkService(
                 getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));
-            final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
+
+            List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);
+            final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool,
+               ClusterModule.getClusterStateCustomSuppliers(clusterPlugins));
             clusterService.addListener(scriptModule.getScriptService());
             resourcesToClose.add(clusterService);
             final IngestService ingestService = new IngestService(settings, threadPool, this.environment,
@@ -346,8 +349,7 @@ public class Node implements Closeable {
                 modules.add(pluginModule);
             }
             final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService);
-            ClusterModule clusterModule = new ClusterModule(settings, clusterService,
-                pluginsService.filterPlugins(ClusterPlugin.class), clusterInfoService);
+            ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService);
             modules.add(clusterModule);
             IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
             modules.add(indicesModule);

+ 8 - 1
core/src/main/java/org/elasticsearch/plugins/ClusterPlugin.java

@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.function.Supplier;
 
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
 import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
 import org.elasticsearch.common.settings.ClusterSettings;
@@ -63,6 +64,12 @@ public interface ClusterPlugin {
      * Called when the node is started
      */
     default void onNodeStarted() {
-
     }
+
+    /**
+     * Returns a map of {@link ClusterState.Custom} supplier that should be invoked to initialize the initial clusterstate.
+     * This allows custom clusterstate extensions to be always present and prevents invariants where clusterstates are published
+     * but customs are not initialized.
+     */
+    default Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() { return Collections.emptyMap(); }
 }

+ 45 - 1
core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java

@@ -59,7 +59,7 @@ import java.util.function.Supplier;
 public class ClusterModuleTests extends ModuleTestCase {
     private ClusterInfoService clusterInfoService = EmptyClusterInfoService.INSTANCE;
     private ClusterService clusterService = new ClusterService(Settings.EMPTY,
-        new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null);
+        new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, Collections.emptyMap());
     static class FakeAllocationDecider extends AllocationDecider {
         protected FakeAllocationDecider(Settings settings) {
             super(settings);
@@ -196,4 +196,48 @@ public class ClusterModuleTests extends ModuleTestCase {
             assertSame(decider.getClass(), expectedDeciders.get(idx++));
         }
     }
+
+    public void testCustomSuppliers() {
+        Map<String, Supplier<ClusterState.Custom>> customSuppliers = ClusterModule.getClusterStateCustomSuppliers(Collections.emptyList());
+        assertEquals(3, customSuppliers.size());
+        assertTrue(customSuppliers.containsKey(SnapshotsInProgress.TYPE));
+        assertTrue(customSuppliers.containsKey(SnapshotDeletionsInProgress.TYPE));
+        assertTrue(customSuppliers.containsKey(RestoreInProgress.TYPE));
+
+        customSuppliers = ClusterModule.getClusterStateCustomSuppliers(Collections.singletonList(new ClusterPlugin() {
+            @Override
+            public Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() {
+                return Collections.singletonMap("foo", () -> null);
+            }
+        }));
+        assertEquals(4, customSuppliers.size());
+        assertTrue(customSuppliers.containsKey(SnapshotsInProgress.TYPE));
+        assertTrue(customSuppliers.containsKey(SnapshotDeletionsInProgress.TYPE));
+        assertTrue(customSuppliers.containsKey(RestoreInProgress.TYPE));
+        assertTrue(customSuppliers.containsKey("foo"));
+
+
+        IllegalStateException ise = expectThrows(IllegalStateException.class,
+            () -> ClusterModule.getClusterStateCustomSuppliers(Collections.singletonList(new ClusterPlugin() {
+            @Override
+            public Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() {
+                return Collections.singletonMap(SnapshotsInProgress.TYPE, () -> null);
+            }
+        })));
+        assertEquals(ise.getMessage(), "custom supplier key [snapshots] is registered more than once");
+
+        ise = expectThrows(IllegalStateException.class,
+            () -> ClusterModule.getClusterStateCustomSuppliers(Arrays.asList(new ClusterPlugin() {
+            @Override
+            public Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() {
+                return Collections.singletonMap("foo", () -> null);
+            }
+        }, new ClusterPlugin() {
+            @Override
+            public Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() {
+                return Collections.singletonMap("foo", () -> null);
+            }
+        })));
+        assertEquals(ise.getMessage(), "custom supplier key [foo] is registered more than once");
+    }
 }

+ 1 - 1
core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java

@@ -74,7 +74,7 @@ import static org.mockito.Mockito.when;
 public class TemplateUpgradeServiceTests extends ESTestCase {
 
     private final ClusterService clusterService = new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
-        ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null);
+        ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, Collections.emptyMap());
 
     public void testCalculateChangesAddChangeAndDelete() {
 

+ 1 - 1
core/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java

@@ -413,7 +413,7 @@ public class ClusterApplierServiceTests extends ESTestCase {
         public volatile Long currentTimeOverride = null;
 
         TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
-            super(settings, clusterSettings, threadPool);
+            super(settings, clusterSettings, threadPool, () -> ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)));
         }
 
         @Override

+ 62 - 0
core/src/test/java/org/elasticsearch/cluster/service/ClusterSerivceTests.java

@@ -0,0 +1,62 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster.service;
+
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.Diff;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+import java.util.Collections;
+
+public class ClusterSerivceTests extends ESTestCase {
+
+    public void testNewBuilderContainsCustoms() {
+        ClusterState.Custom custom = new ClusterState.Custom() {
+            @Override
+            public Diff<ClusterState.Custom> diff(ClusterState.Custom previousState) {
+                return null;
+            }
+
+            @Override
+            public String getWriteableName() {
+                return null;
+            }
+
+            @Override
+            public void writeTo(StreamOutput out) throws IOException {
+
+            }
+
+            @Override
+            public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+                return null;
+            }
+        };
+        ClusterService service = new ClusterService(Settings.EMPTY,
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, Collections.singletonMap("foo", () ->
+            custom));
+        ClusterState.Builder builder = service.newClusterStateBuilder();
+        assertSame(builder.build().custom("foo"), custom);
+    }
+}

+ 6 - 0
core/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryTests.java

@@ -21,6 +21,7 @@ package org.elasticsearch.discovery.single;
 
 import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateTaskListener;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -64,6 +65,11 @@ public class SingleNodeDiscoveryTests extends ESTestCase {
                                 clusterState.set(initialState);
                             }
 
+                            @Override
+                            public ClusterState.Builder newClusterStateBuilder() {
+                                return ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings));
+                            }
+
                             @Override
                             public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier,
                                                           ClusterStateTaskListener listener) {

+ 6 - 0
core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java

@@ -47,6 +47,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.discovery.Discovery;
 import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.plugins.ClusterPlugin;
 import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.VersionUtils;
@@ -308,6 +309,11 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
 
             }
 
+            @Override
+            public ClusterState.Builder newClusterStateBuilder() {
+                return ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings));
+            }
+
             @Override
             public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterStateTaskListener listener) {
                 listener.clusterStateProcessed(source, clusterStateSupplier.get(), clusterStateSupplier.get());

+ 2 - 1
core/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java

@@ -27,13 +27,14 @@ import org.elasticsearch.test.ESTestCase;
 import org.hamcrest.Matchers;
 
 import java.io.IOException;
+import java.util.Collections;
 
 public class GatewayServiceTests extends ESTestCase {
 
     private GatewayService createService(Settings.Builder settings) {
         ClusterService clusterService = new ClusterService(Settings.builder().put("cluster.name", "GatewayServiceTests").build(),
                 new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
-                null);
+                null, Collections.emptyMap());
         return new GatewayService(settings.build(),
                 null, clusterService, null, null, null, null);
     }

+ 2 - 2
core/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java

@@ -22,7 +22,6 @@ package org.elasticsearch.node;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -35,6 +34,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.After;
 import org.junit.Before;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
@@ -53,7 +53,7 @@ public class ResponseCollectorServiceTests extends ESTestCase {
         threadpool = new TestThreadPool("response_collector_tests");
         clusterService = new ClusterService(Settings.EMPTY,
                 new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
-                threadpool);
+                threadpool, Collections.emptyMap());
         collector = new ResponseCollectorService(Settings.EMPTY, clusterService);
     }
 

+ 21 - 0
core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java

@@ -29,6 +29,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.client.Client;
@@ -36,6 +37,8 @@ import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.NamedDiff;
+import org.elasticsearch.cluster.RestoreInProgress;
+import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
 import org.elasticsearch.cluster.SnapshotsInProgress;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.cluster.metadata.MetaData;
@@ -148,6 +151,24 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
         return Arrays.asList(MockRepository.Plugin.class, TestCustomMetaDataPlugin.class);
     }
 
+    public void testClusterStateHasCustoms() throws Exception {
+        ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().all().get();
+        assertNotNull(clusterStateResponse.getState().custom(SnapshotsInProgress.TYPE));
+        assertNotNull(clusterStateResponse.getState().custom(RestoreInProgress.TYPE));
+        assertNotNull(clusterStateResponse.getState().custom(SnapshotDeletionsInProgress.TYPE));
+        internalCluster().ensureAtLeastNumDataNodes(2);
+        if (randomBoolean()) {
+            internalCluster().fullRestart();
+        } else {
+            internalCluster().rollingRestart();
+        }
+
+        clusterStateResponse = client().admin().cluster().prepareState().all().get();
+        assertNotNull(clusterStateResponse.getState().custom(SnapshotsInProgress.TYPE));
+        assertNotNull(clusterStateResponse.getState().custom(RestoreInProgress.TYPE));
+        assertNotNull(clusterStateResponse.getState().custom(SnapshotDeletionsInProgress.TYPE));
+    }
+
     public void testRestorePersistentSettings() throws Exception {
         logger.info("--> start 2 nodes");
         internalCluster().startNode();

+ 1 - 2
test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java

@@ -39,7 +39,6 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.discovery.Discovery.AckListener;
 import org.elasticsearch.threadpool.ThreadPool;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.concurrent.CountDownLatch;
@@ -133,7 +132,7 @@ public class ClusterServiceUtils {
 
     public static ClusterService createClusterService(ThreadPool threadPool, DiscoveryNode localNode, ClusterSettings clusterSettings) {
         ClusterService clusterService = new ClusterService(Settings.builder().put("cluster.name", "ClusterServiceTests").build(),
-            clusterSettings, threadPool);
+            clusterSettings, threadPool, Collections.emptyMap());
         clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
             @Override
             public void connectToNodes(DiscoveryNodes discoveryNodes) {