Browse Source

Allow plugins to validate cluster-state on join (#26595)

Today we don't have a pluggable way to validate if the cluster state
is compatible with the node that joins. We already apply some checks for index
compatibility that prevents nodes to join a cluster with indices it doesn't support
but for plugins this isn't possible. This change adds a cluster state validator that
allows plugins to prevent a join if the cluster-state is incompatible.
Simon Willnauer 8 years ago
parent
commit
42f3129d7b

+ 11 - 2
core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java

@@ -19,6 +19,8 @@
 
 package org.elasticsearch.discovery;
 
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterApplier;
 import org.elasticsearch.cluster.service.MasterService;
@@ -36,12 +38,15 @@ import org.elasticsearch.plugins.DiscoveryPlugin;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
@@ -62,7 +67,7 @@ public class DiscoveryModule {
                            ClusterApplier clusterApplier, ClusterSettings clusterSettings, List<DiscoveryPlugin> plugins,
                            AllocationService allocationService) {
         final UnicastHostsProvider hostsProvider;
-
+        final Collection<BiConsumer<DiscoveryNode,ClusterState>> joinValidators = new ArrayList<>();
         Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
         for (DiscoveryPlugin plugin : plugins) {
             plugin.getZenHostsProviders(transportService, networkService).entrySet().forEach(entry -> {
@@ -70,6 +75,10 @@ public class DiscoveryModule {
                     throw new IllegalArgumentException("Cannot register zen hosts provider [" + entry.getKey() + "] twice");
                 }
             });
+            BiConsumer<DiscoveryNode, ClusterState> joinValidator = plugin.getJoinValidator();
+            if (joinValidator != null) {
+                joinValidators.add(joinValidator);
+            }
         }
         Optional<String> hostsProviderName = DISCOVERY_HOSTS_PROVIDER_SETTING.get(settings);
         if (hostsProviderName.isPresent()) {
@@ -85,7 +94,7 @@ public class DiscoveryModule {
         Map<String, Supplier<Discovery>> discoveryTypes = new HashMap<>();
         discoveryTypes.put("zen",
             () -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
-                clusterSettings, hostsProvider, allocationService));
+                clusterSettings, hostsProvider, allocationService, Collections.unmodifiableCollection(joinValidators)));
         discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, transportService, masterService, clusterApplier));
         for (DiscoveryPlugin plugin : plugins) {
             plugin.getDiscoveryTypes(threadPool, transportService, namedWriteableRegistry,

+ 17 - 5
core/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

@@ -39,7 +39,10 @@ import org.elasticsearch.transport.TransportResponse;
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
 
 public class MembershipAction extends AbstractComponent {
 
@@ -63,7 +66,8 @@ public class MembershipAction extends AbstractComponent {
 
     private final MembershipListener listener;
 
-    public MembershipAction(Settings settings, TransportService transportService, MembershipListener listener) {
+    public MembershipAction(Settings settings, TransportService transportService, MembershipListener listener,
+                            Collection<BiConsumer<DiscoveryNode,ClusterState>> joinValidators) {
         super(settings);
         this.transportService = transportService;
         this.listener = listener;
@@ -73,7 +77,7 @@ public class MembershipAction extends AbstractComponent {
             ThreadPool.Names.GENERIC, new JoinRequestRequestHandler());
         transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME,
             () -> new ValidateJoinRequest(), ThreadPool.Names.GENERIC,
-            new ValidateJoinRequestRequestHandler());
+            new ValidateJoinRequestRequestHandler(transportService::getLocalNode, joinValidators));
         transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new,
             ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler());
     }
@@ -176,12 +180,20 @@ public class MembershipAction extends AbstractComponent {
     }
 
     static class ValidateJoinRequestRequestHandler implements TransportRequestHandler<ValidateJoinRequest> {
+        private final Supplier<DiscoveryNode> localNodeSupplier;
+        private final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators;
+
+        ValidateJoinRequestRequestHandler(Supplier<DiscoveryNode> localNodeSupplier,
+                                          Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators) {
+            this.localNodeSupplier = localNodeSupplier;
+            this.joinValidators = joinValidators;
+        }
 
         @Override
         public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception {
-            ensureNodesCompatibility(Version.CURRENT, request.state.getNodes());
-            ensureIndexCompatibility(Version.CURRENT, request.state.getMetaData());
-            // for now, the mere fact that we can serialize the cluster state acts as validation....
+            DiscoveryNode node = localNodeSupplier.get();
+            assert node != null : "local node is null";
+            joinValidators.stream().forEach(action -> action.accept(node, request.state));
             channel.sendResponse(TransportResponse.Empty.INSTANCE);
         }
     }

+ 27 - 6
core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

@@ -69,6 +69,8 @@ import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Set;
@@ -78,6 +80,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -146,15 +149,17 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
 
     private final NodeJoinController nodeJoinController;
     private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
-
     private final ClusterApplier clusterApplier;
     private final AtomicReference<ClusterState> committedState; // last committed cluster state
     private final Object stateMutex = new Object();
+    private final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators;
 
     public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService transportService,
                         NamedWriteableRegistry namedWriteableRegistry, MasterService masterService, ClusterApplier clusterApplier,
-                        ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider, AllocationService allocationService) {
+                        ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider, AllocationService allocationService,
+                        Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators) {
         super(settings);
+        this.onJoinValidators = addBuiltInJoinValidators(onJoinValidators);
         this.masterService = masterService;
         this.clusterApplier = clusterApplier;
         this.transportService = transportService;
@@ -211,7 +216,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
                         namedWriteableRegistry,
                         this,
                         discoverySettings);
-        this.membership = new MembershipAction(settings, transportService, new MembershipListener());
+        this.membership = new MembershipAction(settings, transportService, new MembershipListener(), onJoinValidators);
         this.joinThreadControl = new JoinThreadControl();
 
         this.nodeJoinController = new NodeJoinController(masterService, allocationService, electMaster, settings);
@@ -223,6 +228,17 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
             DISCOVERY_REJOIN_ACTION_NAME, RejoinClusterRequest::new, ThreadPool.Names.SAME, new RejoinClusterRequestHandler());
     }
 
+    static Collection<BiConsumer<DiscoveryNode,ClusterState>> addBuiltInJoinValidators(
+        Collection<BiConsumer<DiscoveryNode,ClusterState>> onJoinValidators) {
+        Collection<BiConsumer<DiscoveryNode, ClusterState>> validators = new ArrayList<>();
+        validators.add((node, state) -> {
+            MembershipAction.ensureNodesCompatibility(node.getVersion(), state.getNodes());
+            MembershipAction.ensureIndexCompatibility(node.getVersion(), state.getMetaData());
+        });
+        validators.addAll(onJoinValidators);
+        return Collections.unmodifiableCollection(validators);
+    }
+
     // protected to allow overriding in tests
     protected ZenPing newZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
                                  UnicastHostsProvider hostsProvider) {
@@ -885,8 +901,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
         } else {
             // we do this in a couple of places including the cluster update thread. This one here is really just best effort
             // to ensure we fail as fast as possible.
-            MembershipAction.ensureNodesCompatibility(node.getVersion(), state.getNodes());
-            MembershipAction.ensureIndexCompatibility(node.getVersion(), state.getMetaData());
+            onJoinValidators.stream().forEach(a -> a.accept(node, state));
             if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
                 MembershipAction.ensureMajorVersionBarrier(node.getVersion(), state.getNodes().getMinNodeVersion());
             }
@@ -898,7 +913,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
             try {
                 membership.sendValidateJoinRequestBlocking(node, state, joinTimeout);
             } catch (Exception e) {
-                logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to validate incoming join request from node [{}]", node), e);
+                logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to validate incoming join request from node [{}]", node),
+                    e);
                 callback.onFailure(new IllegalStateException("failure when sending a validation request to node", e));
                 return;
             }
@@ -1313,4 +1329,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
         }
 
     }
+
+    public final Collection<BiConsumer<DiscoveryNode, ClusterState>> getOnJoinValidators() {
+        return onJoinValidators;
+    }
+
 }

+ 11 - 0
core/src/main/java/org/elasticsearch/plugins/DiscoveryPlugin.java

@@ -19,10 +19,14 @@
 
 package org.elasticsearch.plugins;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterApplier;
 import org.elasticsearch.cluster.service.MasterService;
@@ -106,4 +110,11 @@ public interface DiscoveryPlugin {
                                                                              NetworkService networkService) {
         return Collections.emptyMap();
     }
+
+    /**
+     * Returns a consumer that validate the initial join cluster state. The validator, unless <code>null</code> is called exactly once per
+     * join attempt but might be called multiple times during the lifetime of a node. Validators are expected to throw a
+     * {@link IllegalStateException} if the node and the cluster-state are incompatible.
+     */
+    default BiConsumer<DiscoveryNode,ClusterState> getJoinValidator() { return null; }
 }

+ 21 - 1
core/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java

@@ -20,6 +20,8 @@ package org.elasticsearch.discovery;
 
 import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterApplier;
 import org.elasticsearch.cluster.service.MasterService;
@@ -40,10 +42,12 @@ import org.junit.Before;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 
 import static org.mockito.Mockito.mock;
@@ -160,7 +164,23 @@ public class DiscoveryModuleTests extends ESTestCase {
 
     public void testLazyConstructionHostsProvider() {
         DummyHostsProviderPlugin plugin = () -> Collections.singletonMap("custom",
-            () -> { throw new AssertionError("created hosts provider which was not selected"); });
+            () -> {
+                throw new AssertionError("created hosts provider which was not selected");
+            });
         newModule(Settings.EMPTY, Collections.singletonList(plugin));
     }
+
+    public void testJoinValidator() {
+        BiConsumer<DiscoveryNode, ClusterState> consumer = (a, b) -> {};
+        DiscoveryModule module = newModule(Settings.EMPTY, Collections.singletonList(new DiscoveryPlugin() {
+            @Override
+            public BiConsumer<DiscoveryNode, ClusterState> getJoinValidator() {
+                return consumer;
+            }
+        }));
+        ZenDiscovery discovery = (ZenDiscovery) module.getDiscovery();
+        Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators = discovery.getOnJoinValidators();
+        assertEquals(2, onJoinValidators.size());
+        assertTrue(onJoinValidators.contains(consumer));
+    }
 }

+ 6 - 2
core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java

@@ -320,7 +320,8 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
             }
         };
         ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),
-            masterService, clusterApplier, clusterSettings, Collections::emptyList, ESAllocationTestCase.createAllocationService());
+            masterService, clusterApplier, clusterSettings, Collections::emptyList, ESAllocationTestCase.createAllocationService(),
+            Collections.emptyList());
         zenDiscovery.start();
         return zenDiscovery;
     }
@@ -342,7 +343,10 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
             ClusterState.Builder stateBuilder = ClusterState.builder(ClusterName.DEFAULT);
             final DiscoveryNode otherNode = new DiscoveryNode("other_node", buildNewFakeTransportAddress(), emptyMap(),
                 EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT);
-            MembershipAction.ValidateJoinRequestRequestHandler request = new MembershipAction.ValidateJoinRequestRequestHandler();
+            final DiscoveryNode localNode = new DiscoveryNode("other_node", buildNewFakeTransportAddress(), emptyMap(),
+                EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT);
+            MembershipAction.ValidateJoinRequestRequestHandler request = new MembershipAction.ValidateJoinRequestRequestHandler
+                (() -> localNode, ZenDiscovery.addBuiltInJoinValidators(Collections.emptyList()));
             final boolean incompatible = randomBoolean();
             IndexMetaData indexMetaData = IndexMetaData.builder("test").settings(Settings.builder()
                 .put(SETTING_VERSION_CREATED, incompatible ? VersionUtils.getPreviousVersion(Version.CURRENT.minimumIndexCompatibilityVersion())

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java

@@ -83,7 +83,7 @@ public class TestZenDiscovery extends ZenDiscovery {
                              ClusterApplier clusterApplier, ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider,
                              AllocationService allocationService) {
         super(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier, clusterSettings,
-            hostsProvider, allocationService);
+            hostsProvider, allocationService, Collections.emptyList());
     }
 
     @Override