Browse Source

Add a historical feature for transport version fixups (#102211)

Make sure logging is configured in the historical versions task

Co-authored-by: Mark Vieira <portugee@gmail.com>
Simon Cooper 1 year ago
parent
commit
4c98fd9c5c

+ 1 - 0
server/src/main/java/module-info.java

@@ -407,6 +407,7 @@ module org.elasticsearch.server {
         with
             org.elasticsearch.features.FeatureInfrastructureFeatures,
             org.elasticsearch.health.HealthFeatures,
+            org.elasticsearch.cluster.service.TransportFeatures,
             org.elasticsearch.cluster.metadata.MetadataFeatures,
             org.elasticsearch.rest.RestFeatures,
             org.elasticsearch.indices.IndicesFeatures;

+ 24 - 0
server/src/main/java/org/elasticsearch/cluster/service/TransportFeatures.java

@@ -0,0 +1,24 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.cluster.service;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.features.FeatureSpecification;
+import org.elasticsearch.features.NodeFeature;
+
+import java.util.Map;
+
+public class TransportFeatures implements FeatureSpecification {
+    @Override
+    public Map<NodeFeature, Version> getHistoricalFeatures() {
+        // Transport versions were introduced in 8.8.0, but we need to wait until all nodes are > 8.8.0
+        // (hence the feature is mapped to 8.8.1) to properly detect when transport versions need fixing.
+        return Map.of(TransportVersionsFixupListener.FIX_TRANSPORT_VERSION, Version.V_8_8_1);
+    }
+}

+ 17 - 3
server/src/main/java/org/elasticsearch/cluster/service/TransportVersionsFixupListener.java

@@ -9,7 +9,6 @@
 package org.elasticsearch.cluster.service;
 
 import org.elasticsearch.TransportVersion;
-import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.FailedNodeException;
 import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
@@ -26,6 +25,9 @@ import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.UpdateForV9;
+import org.elasticsearch.features.FeatureService;
+import org.elasticsearch.features.NodeFeature;
 import org.elasticsearch.logging.LogManager;
 import org.elasticsearch.logging.Logger;
 import org.elasticsearch.threadpool.Scheduler;
@@ -47,10 +49,13 @@ import static org.elasticsearch.cluster.ClusterState.INFERRED_TRANSPORT_VERSION;
  * due to the master node not understanding cluster state with transport versions added in 8.8.0.
  * Any nodes with the inferred placeholder cluster state are then refreshed with their actual transport version
  */
+@UpdateForV9    // this can be removed in v9
 public class TransportVersionsFixupListener implements ClusterStateListener {
 
     private static final Logger logger = LogManager.getLogger(TransportVersionsFixupListener.class);
 
+    static final NodeFeature FIX_TRANSPORT_VERSION = new NodeFeature("transport.fix_transport_version");
+
     private static final TimeValue RETRY_TIME = TimeValue.timeValueSeconds(30);
 
     private final MasterServiceTaskQueue<NodeTransportVersionTask> taskQueue;
@@ -58,13 +63,20 @@ public class TransportVersionsFixupListener implements ClusterStateListener {
     private final Scheduler scheduler;
     private final Executor executor;
     private final Set<String> pendingNodes = Collections.synchronizedSet(new HashSet<>());
+    private final FeatureService featureService;
 
-    public TransportVersionsFixupListener(ClusterService service, ClusterAdminClient client, ThreadPool threadPool) {
+    public TransportVersionsFixupListener(
+        ClusterService service,
+        ClusterAdminClient client,
+        FeatureService featureService,
+        ThreadPool threadPool
+    ) {
         // there tends to be a lot of state operations on an upgrade - this one is not time-critical,
         // so use LOW priority. It just needs to be run at some point after upgrade.
         this(
             service.createTaskQueue("fixup-transport-versions", Priority.LOW, new TransportVersionUpdater()),
             client,
+            featureService,
             threadPool,
             threadPool.executor(ThreadPool.Names.CLUSTER_COORDINATION)
         );
@@ -73,11 +85,13 @@ public class TransportVersionsFixupListener implements ClusterStateListener {
     // Package-private constructor: takes every collaborator directly and stores each one in its matching field.
     TransportVersionsFixupListener(
         MasterServiceTaskQueue<NodeTransportVersionTask> taskQueue,
         ClusterAdminClient client,
+        FeatureService featureService,
         Scheduler scheduler,
         Executor executor
     ) {
         this.taskQueue = taskQueue;
         this.client = client;
+        this.featureService = featureService; // queried with clusterHasFeature(state, FIX_TRANSPORT_VERSION) before fixing up
         this.scheduler = scheduler;
         this.executor = executor;
     }
@@ -139,7 +153,7 @@ public class TransportVersionsFixupListener implements ClusterStateListener {
         // if every node has the FIX_TRANSPORT_VERSION feature (all nodes are > 8.8.0),
         // and the cluster state has some transport versions == 8.8.0,
         // then refresh all inferred transport versions to their real versions
         // now that everything should understand cluster state with transport versions
-        if (event.state().nodes().getMinNodeVersion().after(Version.V_8_8_0)
+        if (featureService.clusterHasFeature(event.state(), FIX_TRANSPORT_VERSION)
             && event.state().getMinTransportVersion().equals(INFERRED_TRANSPORT_VERSION)) {
 
             // find all the relevant nodes

+ 5 - 3
server/src/main/java/org/elasticsearch/node/NodeConstruction.java

@@ -721,17 +721,19 @@ class NodeConstruction {
             .flatMap(m -> m.entrySet().stream())
             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
+        FeatureService featureService = new FeatureService(pluginsService.loadServiceProviders(FeatureSpecification.class));
+
         if (DiscoveryNode.isMasterNode(settings)) {
             clusterService.addListener(new SystemIndexMappingUpdateService(systemIndices, client));
-            clusterService.addListener(new TransportVersionsFixupListener(clusterService, client.admin().cluster(), threadPool));
+            clusterService.addListener(
+                new TransportVersionsFixupListener(clusterService, client.admin().cluster(), featureService, threadPool)
+            );
         }
 
         final RerouteService rerouteService = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
         rerouteServiceReference.set(rerouteService);
         clusterService.setRerouteService(rerouteService);
 
-        FeatureService featureService = new FeatureService(pluginsService.loadServiceProviders(FeatureSpecification.class));
-
         final IndicesService indicesService = new IndicesService(
             settings,
             pluginsService,

+ 1 - 0
server/src/main/resources/META-INF/services/org.elasticsearch.features.FeatureSpecification

@@ -8,6 +8,7 @@
 
 org.elasticsearch.features.FeatureInfrastructureFeatures
 org.elasticsearch.health.HealthFeatures
+org.elasticsearch.cluster.service.TransportFeatures
 org.elasticsearch.cluster.metadata.MetadataFeatures
 org.elasticsearch.rest.RestFeatures
 org.elasticsearch.indices.IndicesFeatures

+ 43 - 6
server/src/test/java/org/elasticsearch/cluster/service/TransportVersionsFixupListenerTests.java

@@ -25,6 +25,7 @@ import org.elasticsearch.cluster.service.TransportVersionsFixupListener.NodeTran
 import org.elasticsearch.cluster.version.CompatibilityVersions;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.util.Maps;
+import org.elasticsearch.features.FeatureService;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.Scheduler;
 import org.mockito.ArgumentCaptor;
@@ -116,7 +117,13 @@ public class TransportVersionsFixupListenerTests extends ESTestCase {
             .nodeIdsToCompatibilityVersions(versions(new CompatibilityVersions(TransportVersions.V_8_8_0, Map.of())))
             .build();
 
-        TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null, null);
+        TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(
+            taskQueue,
+            client,
+            new FeatureService(List.of(new TransportFeatures())),
+            null,
+            null
+        );
         listeners.clusterChanged(new ClusterChangedEvent("test", testState, ClusterState.EMPTY_STATE));
 
         verify(taskQueue, never()).submitTask(anyString(), any(), any());
@@ -131,7 +138,13 @@ public class TransportVersionsFixupListenerTests extends ESTestCase {
             .nodeIdsToCompatibilityVersions(versions(new CompatibilityVersions(NEXT_TRANSPORT_VERSION, Map.of())))
             .build();
 
-        TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null, null);
+        TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(
+            taskQueue,
+            client,
+            new FeatureService(List.of(new TransportFeatures())),
+            null,
+            null
+        );
         listeners.clusterChanged(new ClusterChangedEvent("test", testState, ClusterState.EMPTY_STATE));
 
         verify(taskQueue, never()).submitTask(anyString(), any(), any());
@@ -151,7 +164,13 @@ public class TransportVersionsFixupListenerTests extends ESTestCase {
             )
             .build();
 
-        TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null, null);
+        TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(
+            taskQueue,
+            client,
+            new FeatureService(List.of(new TransportFeatures())),
+            null,
+            null
+        );
         listeners.clusterChanged(new ClusterChangedEvent("test", testState, ClusterState.EMPTY_STATE));
 
         verify(taskQueue, never()).submitTask(anyString(), any(), any());
@@ -175,7 +194,13 @@ public class TransportVersionsFixupListenerTests extends ESTestCase {
         ArgumentCaptor<ActionListener<NodesInfoResponse>> action = ArgumentCaptor.forClass(ActionListener.class);
         ArgumentCaptor<NodeTransportVersionTask> task = ArgumentCaptor.forClass(NodeTransportVersionTask.class);
 
-        TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null, null);
+        TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(
+            taskQueue,
+            client,
+            new FeatureService(List.of(new TransportFeatures())),
+            null,
+            null
+        );
         listeners.clusterChanged(new ClusterChangedEvent("test", testState, ClusterState.EMPTY_STATE));
         verify(client).nodesInfo(
             argThat(transformedMatch(NodesInfoRequest::nodesIds, arrayContainingInAnyOrder("node1", "node2"))),
@@ -201,7 +226,13 @@ public class TransportVersionsFixupListenerTests extends ESTestCase {
             )
             .build();
 
-        TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null, null);
+        TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(
+            taskQueue,
+            client,
+            new FeatureService(List.of(new TransportFeatures())),
+            null,
+            null
+        );
         listeners.clusterChanged(new ClusterChangedEvent("test", testState1, ClusterState.EMPTY_STATE));
         verify(client).nodesInfo(argThat(transformedMatch(NodesInfoRequest::nodesIds, arrayContainingInAnyOrder("node1", "node2"))), any());
         // don't send back the response yet
@@ -240,7 +271,13 @@ public class TransportVersionsFixupListenerTests extends ESTestCase {
         ArgumentCaptor<ActionListener<NodesInfoResponse>> action = ArgumentCaptor.forClass(ActionListener.class);
         ArgumentCaptor<Runnable> retry = ArgumentCaptor.forClass(Runnable.class);
 
-        TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, scheduler, executor);
+        TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(
+            taskQueue,
+            client,
+            new FeatureService(List.of(new TransportFeatures())),
+            scheduler,
+            executor
+        );
         listeners.clusterChanged(new ClusterChangedEvent("test", testState1, ClusterState.EMPTY_STATE));
         verify(client, times(1)).nodesInfo(any(), action.capture());
         // do response immediately

+ 6 - 0
test/metadata-extractor/src/main/java/org/elasticsearch/extractor/features/HistoricalFeaturesMetadataExtractor.java

@@ -9,6 +9,7 @@
 package org.elasticsearch.extractor.features;
 
 import org.elasticsearch.Version;
+import org.elasticsearch.common.logging.LogConfigurator;
 import org.elasticsearch.features.FeatureSpecification;
 import org.elasticsearch.features.NodeFeature;
 import org.elasticsearch.xcontent.XContentGenerator;
@@ -29,6 +30,11 @@ import java.util.ServiceLoader;
 public class HistoricalFeaturesMetadataExtractor {
     private final ClassLoader classLoader;
 
+    static {
+        // Make sure we initialize logging since this is normally done by Elasticsearch startup
+        LogConfigurator.configureESLogging();
+    }
+
     public HistoricalFeaturesMetadataExtractor(ClassLoader classLoader) {
         this.classLoader = classLoader;
     }