Browse Source

Make Reconfigurator and PreVoteCollector pluggable (#95248)

Francisco Fernández Castaño 2 years ago
parent
commit
83fadff83d

+ 47 - 2
server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java

@@ -16,6 +16,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.coordination.Coordinator;
 import org.elasticsearch.cluster.coordination.ElectionStrategy;
 import org.elasticsearch.cluster.coordination.LeaderHeartbeatService;
+import org.elasticsearch.cluster.coordination.PreVoteCollector;
 import org.elasticsearch.cluster.coordination.Reconfigurator;
 import org.elasticsearch.cluster.coordination.StatefulPreVoteCollector;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -48,6 +49,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.function.BiConsumer;
@@ -182,6 +184,9 @@ public class DiscoveryModule {
                 );
         }
 
+        var reconfigurator = getReconfigurator(settings, clusterSettings, clusterCoordinationPlugins);
+        var preVoteCollectorFactory = getPreVoteCollectorFactory(clusterCoordinationPlugins);
+
         if (MULTI_NODE_DISCOVERY_TYPE.equals(discoveryType)
             || LEGACY_MULTI_NODE_DISCOVERY_TYPE.equals(discoveryType)
             || SINGLE_NODE_DISCOVERY_TYPE.equals(discoveryType)) {
@@ -203,9 +208,9 @@ public class DiscoveryModule {
                 electionStrategy,
                 nodeHealthService,
                 circuitBreakerService,
-                new Reconfigurator(settings, clusterSettings),
+                reconfigurator,
                 LeaderHeartbeatService.NO_OP,
-                StatefulPreVoteCollector::new
+                preVoteCollectorFactory
             );
         } else {
             throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
@@ -214,6 +219,46 @@ public class DiscoveryModule {
         logger.info("using discovery type [{}] and seed hosts providers {}", discoveryType, seedProviderNames);
     }
 
+    // visible for testing
+    static Reconfigurator getReconfigurator(
+        Settings settings,
+        ClusterSettings clusterSettings,
+        List<ClusterCoordinationPlugin> clusterCoordinationPlugins
+    ) {
+        final var reconfiguratorFactories = clusterCoordinationPlugins.stream()
+            .map(ClusterCoordinationPlugin::getReconfiguratorFactory)
+            .flatMap(Optional::stream)
+            .toList();
+
+        if (reconfiguratorFactories.size() > 1) {
+            throw new IllegalStateException("multiple reconfigurator factories found: " + reconfiguratorFactories);
+        }
+
+        if (reconfiguratorFactories.size() == 1) {
+            return reconfiguratorFactories.get(0).newReconfigurator(settings, clusterSettings);
+        }
+
+        return new Reconfigurator(settings, clusterSettings);
+    }
+
+    // visible for testing
+    static PreVoteCollector.Factory getPreVoteCollectorFactory(List<ClusterCoordinationPlugin> clusterCoordinationPlugins) {
+        final var preVoteCollectorFactories = clusterCoordinationPlugins.stream()
+            .map(ClusterCoordinationPlugin::getPreVoteCollectorFactory)
+            .flatMap(Optional::stream)
+            .toList();
+
+        if (preVoteCollectorFactories.size() > 1) {
+            throw new IllegalStateException("multiple pre-vote collector factories found: " + preVoteCollectorFactories);
+        }
+
+        if (preVoteCollectorFactories.size() == 1) {
+            return preVoteCollectorFactories.get(0);
+        }
+
+        return StatefulPreVoteCollector::new;
+    }
+
     public static boolean isSingleNodeDiscovery(Settings settings) {
         return SINGLE_NODE_DISCOVERY_TYPE.equals(DISCOVERY_TYPE_SETTING.get(settings));
     }

+ 14 - 0
server/src/main/java/org/elasticsearch/plugins/ClusterCoordinationPlugin.java

@@ -11,6 +11,8 @@ package org.elasticsearch.plugins;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.coordination.CoordinationState;
 import org.elasticsearch.cluster.coordination.ElectionStrategy;
+import org.elasticsearch.cluster.coordination.PreVoteCollector;
+import org.elasticsearch.cluster.coordination.Reconfigurator;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
@@ -52,6 +54,14 @@ public interface ClusterCoordinationPlugin {
         return Optional.empty();
     }
 
+    default Optional<ReconfiguratorFactory> getReconfiguratorFactory() {
+        return Optional.empty();
+    }
+
+    default Optional<PreVoteCollector.Factory> getPreVoteCollectorFactory() {
+        return Optional.empty();
+    }
+
     interface PersistedStateFactory {
         CoordinationState.PersistedState createPersistedState(
             Settings settings,
@@ -68,4 +78,8 @@ public interface ClusterCoordinationPlugin {
             ThreadPool threadPool
         );
     }
+
+    interface ReconfiguratorFactory {
+        Reconfigurator newReconfigurator(Settings settings, ClusterSettings clusterSettings);
+    }
 }

+ 80 - 0
server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java

@@ -11,6 +11,9 @@ import org.elasticsearch.TransportVersion;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.coordination.Coordinator;
+import org.elasticsearch.cluster.coordination.PreVoteCollector;
+import org.elasticsearch.cluster.coordination.Reconfigurator;
+import org.elasticsearch.cluster.coordination.StatefulPreVoteCollector;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RerouteService;
 import org.elasticsearch.cluster.service.ClusterApplier;
@@ -24,6 +27,7 @@ import org.elasticsearch.gateway.GatewayMetaState;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.plugins.ClusterCoordinationPlugin;
 import org.elasticsearch.plugins.DiscoveryPlugin;
+import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -36,10 +40,13 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
 import static org.mockito.Mockito.mock;
 
 public class DiscoveryModuleTests extends ESTestCase {
@@ -219,4 +226,77 @@ public class DiscoveryModuleTests extends ESTestCase {
                 + "[multi-node] instead."
         );
     }
+
+    public void testRejectsMultipleReconfigurators() {
+        assertThat(
+            expectThrows(
+                IllegalStateException.class,
+                () -> DiscoveryModule.getReconfigurator(
+                    Settings.EMPTY,
+                    new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
+                    List.of(
+                        new BaseTestClusterCoordinationPlugin(),
+                        new TestClusterCoordinationPlugin1(),
+                        new TestClusterCoordinationPlugin2()
+                    )
+                )
+            ).getMessage(),
+            containsString("multiple reconfigurator factories found")
+        );
+
+        assertThat(
+            DiscoveryModule.getReconfigurator(
+                Settings.EMPTY,
+                new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
+                List.of(new BaseTestClusterCoordinationPlugin())
+            ),
+            is(BaseTestClusterCoordinationPlugin.reconfiguratorInstance)
+        );
+    }
+
+    public void testRejectsMultiplePreVoteCollectorFactories() {
+        assertThat(
+            expectThrows(
+                IllegalStateException.class,
+                () -> DiscoveryModule.getPreVoteCollectorFactory(
+                    List.of(new BaseTestClusterCoordinationPlugin(), new TestClusterCoordinationPlugin1() {
+                        @Override
+                        public Optional<ReconfiguratorFactory> getReconfiguratorFactory() {
+                            return Optional.empty();
+                        }
+                    }, new TestClusterCoordinationPlugin2() {
+                        @Override
+                        public Optional<ReconfiguratorFactory> getReconfiguratorFactory() {
+                            return Optional.empty();
+                        }
+                    })
+                )
+            ).getMessage(),
+            containsString("multiple pre-vote collector factories found")
+        );
+
+        assertThat(
+            DiscoveryModule.getPreVoteCollectorFactory(List.of(new BaseTestClusterCoordinationPlugin())),
+            is(BaseTestClusterCoordinationPlugin.preVoteCollectorFactory)
+        );
+    }
+
+    private static class BaseTestClusterCoordinationPlugin extends Plugin implements ClusterCoordinationPlugin {
+        static Reconfigurator reconfiguratorInstance;
+        static PreVoteCollector.Factory preVoteCollectorFactory = StatefulPreVoteCollector::new;
+
+        @Override
+        public Optional<ReconfiguratorFactory> getReconfiguratorFactory() {
+            return Optional.of((settings, clusterSettings) -> reconfiguratorInstance = new Reconfigurator(settings, clusterSettings));
+        }
+
+        @Override
+        public Optional<PreVoteCollector.Factory> getPreVoteCollectorFactory() {
+            return Optional.of(preVoteCollectorFactory);
+        }
+    }
+
+    public static class TestClusterCoordinationPlugin1 extends BaseTestClusterCoordinationPlugin {}
+
+    public static class TestClusterCoordinationPlugin2 extends BaseTestClusterCoordinationPlugin {}
 }