Browse Source

Pass compatibility versions in ClusterCoordinationPlugin (#99396)

In https://github.com/elastic/elasticsearch/pull/99321, we passed
CompatibilityVersions into GatewayMetaState so that a node's initial
cluster state will hold correct initial values for CompatibilityVersions
for a single node, and not have to wait until a node join event to
populate that part of cluster state.

However, a ClusterCoordinationPlugin may provide a different
`PersistedClusterStateService`, which might also need its own access to
CompatibilityVersions. Here, we add a `CompatibilityVersions` argument
to `PersistedClusterStateServiceFactory#newPersistedClusterStateService`
so that an implementation of `PersistedClusterStateServiceFactory` can
pass CompatibilityVersions into whatever mechanism it uses for creating
the initial cluster state.
William Brafford 2 years ago
parent
commit
e386a3ffa4

+ 6 - 4
server/src/main/java/org/elasticsearch/node/Node.java

@@ -622,6 +622,7 @@ public class Node implements Closeable {
             resourcesToClose.add(circuitBreakerService);
             resourcesToClose.add(circuitBreakerService);
             modules.add(new GatewayModule());
             modules.add(new GatewayModule());
 
 
+            CompatibilityVersions compatibilityVersions = new CompatibilityVersions(TransportVersion.current());
             PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
             PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
             BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
             BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
             modules.add(settingsModule);
             modules.add(settingsModule);
@@ -629,7 +630,8 @@ public class Node implements Closeable {
             final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(
             final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(
                 xContentRegistry,
                 xContentRegistry,
                 clusterService.getClusterSettings(),
                 clusterService.getClusterSettings(),
-                threadPool
+                threadPool,
+                compatibilityVersions
             );
             );
 
 
             // collect engine factory providers from plugins
             // collect engine factory providers from plugins
@@ -927,7 +929,6 @@ public class Node implements Closeable {
             );
             );
             clusterInfoService.addListener(diskThresholdMonitor::onNewInfo);
             clusterInfoService.addListener(diskThresholdMonitor::onNewInfo);
 
 
-            CompatibilityVersions compatibilityVersions = new CompatibilityVersions(TransportVersion.current());
             final DiscoveryModule discoveryModule = new DiscoveryModule(
             final DiscoveryModule discoveryModule = new DiscoveryModule(
                 settings,
                 settings,
                 transportService,
                 transportService,
@@ -1359,7 +1360,8 @@ public class Node implements Closeable {
     private PersistedClusterStateService newPersistedClusterStateService(
     private PersistedClusterStateService newPersistedClusterStateService(
         NamedXContentRegistry xContentRegistry,
         NamedXContentRegistry xContentRegistry,
         ClusterSettings clusterSettings,
         ClusterSettings clusterSettings,
-        ThreadPool threadPool
+        ThreadPool threadPool,
+        CompatibilityVersions compatibilityVersions
     ) {
     ) {
         final List<ClusterCoordinationPlugin.PersistedClusterStateServiceFactory> persistedClusterStateServiceFactories = pluginsService
         final List<ClusterCoordinationPlugin.PersistedClusterStateServiceFactory> persistedClusterStateServiceFactories = pluginsService
             .filterPlugins(ClusterCoordinationPlugin.class)
             .filterPlugins(ClusterCoordinationPlugin.class)
@@ -1374,7 +1376,7 @@ public class Node implements Closeable {
 
 
         if (persistedClusterStateServiceFactories.size() == 1) {
         if (persistedClusterStateServiceFactories.size() == 1) {
             return persistedClusterStateServiceFactories.get(0)
             return persistedClusterStateServiceFactories.get(0)
-                .newPersistedClusterStateService(nodeEnvironment, xContentRegistry, clusterSettings, threadPool);
+                .newPersistedClusterStateService(nodeEnvironment, xContentRegistry, clusterSettings, threadPool, compatibilityVersions);
         }
         }
 
 
         return new PersistedClusterStateService(nodeEnvironment, xContentRegistry, clusterSettings, threadPool::relativeTimeInMillis);
         return new PersistedClusterStateService(nodeEnvironment, xContentRegistry, clusterSettings, threadPool::relativeTimeInMillis);

+ 13 - 0
server/src/main/java/org/elasticsearch/plugins/ClusterCoordinationPlugin.java

@@ -15,6 +15,7 @@ import org.elasticsearch.cluster.coordination.LeaderHeartbeatService;
 import org.elasticsearch.cluster.coordination.PreVoteCollector;
 import org.elasticsearch.cluster.coordination.PreVoteCollector;
 import org.elasticsearch.cluster.coordination.Reconfigurator;
 import org.elasticsearch.cluster.coordination.Reconfigurator;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.version.CompatibilityVersions;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.env.NodeEnvironment;
@@ -76,12 +77,24 @@ public interface ClusterCoordinationPlugin {
     }
     }
 
 
     interface PersistedClusterStateServiceFactory {
     interface PersistedClusterStateServiceFactory {
+
+        @Deprecated(forRemoval = true)
         PersistedClusterStateService newPersistedClusterStateService(
         PersistedClusterStateService newPersistedClusterStateService(
             NodeEnvironment nodeEnvironment,
             NodeEnvironment nodeEnvironment,
             NamedXContentRegistry xContentRegistry,
             NamedXContentRegistry xContentRegistry,
             ClusterSettings clusterSettings,
             ClusterSettings clusterSettings,
             ThreadPool threadPool
             ThreadPool threadPool
         );
         );
+
+        default PersistedClusterStateService newPersistedClusterStateService(
+            NodeEnvironment nodeEnvironment,
+            NamedXContentRegistry xContentRegistry,
+            ClusterSettings clusterSettings,
+            ThreadPool threadPool,
+            CompatibilityVersions compatibilityVersions
+        ) {
+            return newPersistedClusterStateService(nodeEnvironment, xContentRegistry, clusterSettings, threadPool);
+        }
     }
     }
 
 
     interface ReconfiguratorFactory {
     interface ReconfiguratorFactory {

+ 25 - 5
server/src/test/java/org/elasticsearch/node/NodeTests.java

@@ -16,12 +16,14 @@ import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.cluster.version.CompatibilityVersions;
 import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.component.LifecycleComponent;
 import org.elasticsearch.common.component.LifecycleComponent;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.BoundTransportAddress;
 import org.elasticsearch.common.transport.BoundTransportAddress;
 import org.elasticsearch.core.RestApiVersion;
 import org.elasticsearch.core.RestApiVersion;
@@ -633,15 +635,33 @@ public class NodeTests extends ESTestCase {
 
 
         @Override
         @Override
         public Optional<PersistedClusterStateServiceFactory> getPersistedClusterStateServiceFactory() {
         public Optional<PersistedClusterStateServiceFactory> getPersistedClusterStateServiceFactory() {
-            return Optional.of(
-                (nodeEnvironment, namedXContentRegistry, clusterSettings, threadPool) -> persistedClusterStateService =
-                    new PersistedClusterStateService(
+            return Optional.of(new PersistedClusterStateServiceFactory() {
+                @Override
+                public PersistedClusterStateService newPersistedClusterStateService(
+                    NodeEnvironment nodeEnvironment,
+                    NamedXContentRegistry xContentRegistry,
+                    ClusterSettings clusterSettings,
+                    ThreadPool threadPool
+                ) {
+                    throw new AssertionError("not called");
+                }
+
+                @Override
+                public PersistedClusterStateService newPersistedClusterStateService(
+                    NodeEnvironment nodeEnvironment,
+                    NamedXContentRegistry namedXContentRegistry,
+                    ClusterSettings clusterSettings,
+                    ThreadPool threadPool,
+                    CompatibilityVersions compatibilityVersions
+                ) {
+                    return persistedClusterStateService = new PersistedClusterStateService(
                         nodeEnvironment,
                         nodeEnvironment,
                         namedXContentRegistry,
                         namedXContentRegistry,
                         clusterSettings,
                         clusterSettings,
                         threadPool::relativeTimeInMillis
                         threadPool::relativeTimeInMillis
-                    )
-            );
+                    );
+                }
+            });
         }
         }
     }
     }