Browse Source

Add ProjectResolver to RemoteClusterService constructor (#133006)

Follow up to work done in #131894 to make RemoteClusterService
multi-project aware.  This task plumbs the ProjectResolver down
into the RemoteClusterService from NodeConstruction.

Resolves: ES-12572
Jeremy Dahlgren 1 month ago
parent
commit
feb99ea509

+ 2 - 1
server/src/main/java/org/elasticsearch/node/NodeConstruction.java

@@ -1121,7 +1121,8 @@ class NodeConstruction {
             settingsModule.getClusterSettings(),
             taskManager,
             telemetryProvider.getTracer(),
-            nodeEnvironment.nodeId()
+            nodeEnvironment.nodeId(),
+            projectResolver
         );
         final SearchResponseMetrics searchResponseMetrics = new SearchResponseMetrics(telemetryProvider.getMeterRegistry());
         final SearchTransportService searchTransportService = new SearchTransportService(

+ 14 - 2
server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java

@@ -42,6 +42,7 @@ import org.elasticsearch.search.fetch.FetchPhase;
 import org.elasticsearch.tasks.TaskManager;
 import org.elasticsearch.telemetry.tracing.Tracer;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.ClusterConnectionManager;
 import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportInterceptor;
 import org.elasticsearch.transport.TransportService;
@@ -119,9 +120,20 @@ class NodeServiceProvider {
         ClusterSettings clusterSettings,
         TaskManager taskManager,
         Tracer tracer,
-        String nodeId
+        String nodeId,
+        ProjectResolver projectResolver
     ) {
-        return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskManager);
+        return new TransportService(
+            settings,
+            transport,
+            threadPool,
+            interceptor,
+            localNodeFactory,
+            clusterSettings,
+            new ClusterConnectionManager(settings, transport, threadPool.getThreadContext()),
+            taskManager,
+            projectResolver
+        );
     }
 
     HttpServerTransport newHttpTransport(PluginsService pluginsService, NetworkModule networkModule) {

+ 3 - 5
server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

@@ -24,7 +24,6 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.ProjectId;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
-import org.elasticsearch.cluster.project.DefaultProjectResolver;
 import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.ClusterSettings;
@@ -85,15 +84,14 @@ public final class RemoteClusterService extends RemoteClusterAware
     private final ProjectResolver projectResolver;
     private final boolean canUseSkipUnavailable;
 
-    @FixForMultiProject(description = "Inject the ProjectResolver instance.")
-    RemoteClusterService(Settings settings, TransportService transportService) {
+    RemoteClusterService(Settings settings, TransportService transportService, ProjectResolver projectResolver) {
         super(settings);
         this.isRemoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings);
         this.isSearchNode = DiscoveryNode.hasRole(settings, DiscoveryNodeRole.SEARCH_ROLE);
         this.isStateless = DiscoveryNode.isStateless(settings);
         this.remoteClusterServerEnabled = REMOTE_CLUSTER_SERVER_ENABLED.get(settings);
         this.transportService = transportService;
-        this.projectResolver = DefaultProjectResolver.INSTANCE;
+        this.projectResolver = projectResolver;
         this.remoteClusters = projectResolver.supportsMultipleProjects()
             ? ConcurrentCollections.newConcurrentMap()
             : Map.of(ProjectId.DEFAULT, ConcurrentCollections.newConcurrentMap());
@@ -694,9 +692,9 @@ public final class RemoteClusterService extends RemoteClusterAware
     /**
      * Returns the map of connections for the given {@link ProjectId}.
      */
+    @FixForMultiProject(description = "Assert ProjectId.DEFAULT should not be used in multi-project environment")
     private Map<String, RemoteClusterConnection> getConnectionsMapForProject(ProjectId projectId) {
         if (projectResolver.supportsMultipleProjects()) {
-            assert ProjectId.DEFAULT.equals(projectId) == false : "The default project ID should not be used in multi-project environment";
             return remoteClusters.computeIfAbsent(projectId, unused -> ConcurrentCollections.newConcurrentMap());
         }
         assert ProjectId.DEFAULT.equals(projectId) : "Only the default project ID should be used when multiple projects are not supported";

+ 30 - 4
server/src/main/java/org/elasticsearch/transport/TransportService.java

@@ -18,6 +18,8 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.project.DefaultProjectResolver;
+import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.common.ReferenceDocs;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
@@ -232,7 +234,7 @@ public class TransportService extends AbstractLifecycleComponent
         );
     }
 
-    // NOTE: Only for use in tests
+    // Public access for tests.
     public TransportService(
         Settings settings,
         Transport transport,
@@ -249,12 +251,11 @@ public class TransportService extends AbstractLifecycleComponent
             transportInterceptor,
             localNodeFactory,
             clusterSettings,
-            new ClusterConnectionManager(settings, transport, threadPool.getThreadContext()),
             new TaskManager(settings, threadPool, taskHeaders)
         );
     }
 
-    @SuppressWarnings("this-escape")
+    // Public access for tests.
     public TransportService(
         Settings settings,
         Transport transport,
@@ -264,6 +265,31 @@ public class TransportService extends AbstractLifecycleComponent
         @Nullable ClusterSettings clusterSettings,
         ConnectionManager connectionManager,
         TaskManager taskManger
+    ) {
+        this(
+            settings,
+            transport,
+            threadPool,
+            transportInterceptor,
+            localNodeFactory,
+            clusterSettings,
+            connectionManager,
+            taskManger,
+            DefaultProjectResolver.INSTANCE
+        );
+    }
+
+    @SuppressWarnings("this-escape")
+    public TransportService(
+        Settings settings,
+        Transport transport,
+        ThreadPool threadPool,
+        TransportInterceptor transportInterceptor,
+        Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
+        @Nullable ClusterSettings clusterSettings,
+        ConnectionManager connectionManager,
+        TaskManager taskManger,
+        ProjectResolver projectResolver
     ) {
         this.transport = transport;
         transport.setSlowLogThreshold(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING.get(settings));
@@ -278,7 +304,7 @@ public class TransportService extends AbstractLifecycleComponent
         this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
         this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings);
         this.enableStackOverflowAvoidance = ENABLE_STACK_OVERFLOW_AVOIDANCE.get(settings);
-        remoteClusterService = new RemoteClusterService(settings, this);
+        remoteClusterService = new RemoteClusterService(settings, this, projectResolver);
         responseHandlers = transport.getResponseHandlers();
         if (clusterSettings != null) {
             clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude);

+ 27 - 22
server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java

@@ -20,6 +20,7 @@ import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.node.VersionInformation;
+import org.elasticsearch.cluster.project.DefaultProjectResolver;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.AbstractScopedSettings;
 import org.elasticsearch.common.settings.ClusterSettings;
@@ -72,6 +73,10 @@ public class RemoteClusterServiceTests extends ESTestCase {
         ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
     }
 
+    private RemoteClusterService createRemoteClusterService(final Settings settings, final MockTransportService transportService) {
+        return new RemoteClusterService(settings, transportService, DefaultProjectResolver.INSTANCE);
+    }
+
     private MockTransportService startTransport(
         String id,
         List<DiscoveryNode> knownNodes,
@@ -166,7 +171,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
                 Settings.Builder builder = Settings.builder();
                 builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString());
                 builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString());
-                try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
+                try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) {
                     assertFalse(hasRegisteredClusters(service));
                     service.initializeRemoteClusters();
                     assertTrue(hasRegisteredClusters(service));
@@ -380,7 +385,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
                 Settings.Builder builder = Settings.builder();
                 builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString());
                 builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString());
-                try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
+                try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) {
                     assertFalse(hasRegisteredClusters(service));
                     service.initializeRemoteClusters();
                     assertTrue(hasRegisteredClusters(service));
@@ -443,7 +448,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
             Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "node-1").build(),
             Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE)
         );
-        try (RemoteClusterService service = new RemoteClusterService(settings, null)) {
+        try (RemoteClusterService service = createRemoteClusterService(settings, null)) {
             expectThrows(IllegalArgumentException.class, service::ensureClientIsEnabled);
             assertFalse(hasRegisteredClusters(service));
             final IllegalArgumentException error = expectThrows(
@@ -487,7 +492,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
             ) {
                 transportService.start();
                 transportService.acceptIncomingRequests();
-                try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) {
+                try (RemoteClusterService service = createRemoteClusterService(Settings.EMPTY, transportService)) {
                     assertFalse(hasRegisteredClusters(service));
                     service.initializeRemoteClusters();
                     assertFalse(hasRegisteredClusters(service));
@@ -563,7 +568,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
             ) {
                 transportService.start();
                 transportService.acceptIncomingRequests();
-                try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) {
+                try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) {
                     assertFalse(hasRegisteredClusters(service));
                     service.initializeRemoteClusters();
                     assertTrue(hasRegisteredClusters(service));
@@ -627,7 +632,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
                 TimeValue pingSchedule2 = // randomBoolean() ? TimeValue.MINUS_ONE :
                     TimeValue.timeValueSeconds(randomIntBetween(1, 10));
                 builder.put("cluster.remote.cluster_2.transport.ping_schedule", pingSchedule2);
-                try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
+                try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) {
                     service.initializeRemoteClusters();
                     assertTrue(isRemoteClusterRegistered(service, "cluster_1"));
                     RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1");
@@ -666,7 +671,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
                 transportService.acceptIncomingRequests();
                 Settings.Builder builder = Settings.builder();
                 builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString());
-                try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
+                try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) {
                     service.initializeRemoteClusters();
                     RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1");
                     Settings.Builder settingsChange = Settings.builder();
@@ -751,7 +756,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
             ) {
                 transportService.start();
                 transportService.acceptIncomingRequests();
-                try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) {
+                try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) {
                     assertFalse(hasRegisteredClusters(service));
                     service.initializeRemoteClusters();
                     assertFalse(hasRegisteredClusters(service));
@@ -841,7 +846,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
             ) {
                 transportService.start();
                 transportService.acceptIncomingRequests();
-                try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) {
+                try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) {
                     assertFalse(hasRegisteredClusters(service));
                     service.initializeRemoteClusters();
                     assertFalse(hasRegisteredClusters(service));
@@ -936,7 +941,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
             ) {
                 transportService.start();
                 transportService.acceptIncomingRequests();
-                try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) {
+                try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) {
                     assertFalse(hasRegisteredClusters(service));
                     service.initializeRemoteClusters();
                     assertFalse(hasRegisteredClusters(service));
@@ -1090,7 +1095,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
             transportService.start();
             transportService.acceptIncomingRequests();
 
-            try (RemoteClusterService service = new RemoteClusterService(createSettings("cluster_1", seedList), transportService)) {
+            try (RemoteClusterService service = createRemoteClusterService(createSettings("cluster_1", seedList), transportService)) {
                 service.initializeRemoteClusters();
                 assertTrue(hasRegisteredClusters(service));
                 final var numTasks = between(3, 5);
@@ -1277,7 +1282,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
 
                 final Settings.Builder builder = Settings.builder();
                 builder.putList("cluster.remote.cluster_test.seeds", Collections.singletonList(node0.getAddress().toString()));
-                try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
+                try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) {
                     assertFalse(hasRegisteredClusters(service));
                     service.initializeRemoteClusters();
                     assertTrue(hasRegisteredClusters(service));
@@ -1420,7 +1425,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
             .put(nodeNameSettings)
             .put(onlyRole(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE))
             .build();
-        try (RemoteClusterService service = new RemoteClusterService(settingsWithRemoteClusterClientRole, null)) {
+        try (RemoteClusterService service = createRemoteClusterService(settingsWithRemoteClusterClientRole, null)) {
             service.ensureClientIsEnabled();
         }
 
@@ -1429,7 +1434,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
             .put(nodeNameSettings)
             .put(removeRoles(Set.of(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)))
             .build();
-        try (RemoteClusterService service = new RemoteClusterService(settingsWithoutRemoteClusterClientRole, null)) {
+        try (RemoteClusterService service = createRemoteClusterService(settingsWithoutRemoteClusterClientRole, null)) {
             final var exception = expectThrows(IllegalArgumentException.class, service::ensureClientIsEnabled);
             assertThat(exception.getMessage(), equalTo("node [node-1] does not have the [remote_cluster_client] role"));
         }
@@ -1440,7 +1445,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
             .put(onlyRole(DiscoveryNodeRole.INDEX_ROLE))
             .put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true)
             .build();
-        try (RemoteClusterService service = new RemoteClusterService(statelessEnabledSettingsOnNonSearchNode, null)) {
+        try (RemoteClusterService service = createRemoteClusterService(statelessEnabledSettingsOnNonSearchNode, null)) {
             final var exception = expectThrows(IllegalArgumentException.class, service::ensureClientIsEnabled);
             assertThat(
                 exception.getMessage(),
@@ -1457,7 +1462,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
             .put(onlyRole(DiscoveryNodeRole.SEARCH_ROLE))
             .put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true)
             .build();
-        try (RemoteClusterService service = new RemoteClusterService(statelessEnabledOnSearchNodeSettings, null)) {
+        try (RemoteClusterService service = createRemoteClusterService(statelessEnabledOnSearchNodeSettings, null)) {
             service.ensureClientIsEnabled();
         }
         final var statelessEnabledOnRemoteClusterClientSettings = Settings.builder()
@@ -1465,7 +1470,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
             .put(onlyRole(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE))
             .put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true)
             .build();
-        try (RemoteClusterService service = new RemoteClusterService(statelessEnabledOnRemoteClusterClientSettings, null)) {
+        try (RemoteClusterService service = createRemoteClusterService(statelessEnabledOnRemoteClusterClientSettings, null)) {
             service.ensureClientIsEnabled();
         }
         final var statelessEnabledOnSearchNodeAndRemoteClusterClientSettings = Settings.builder()
@@ -1473,7 +1478,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
             .put(onlyRoles(Set.of(DiscoveryNodeRole.SEARCH_ROLE, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)))
             .put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true)
             .build();
-        try (RemoteClusterService service = new RemoteClusterService(statelessEnabledOnSearchNodeAndRemoteClusterClientSettings, null)) {
+        try (RemoteClusterService service = createRemoteClusterService(statelessEnabledOnSearchNodeAndRemoteClusterClientSettings, null)) {
             service.ensureClientIsEnabled();
         }
     }
@@ -1544,7 +1549,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
                 });
                 transportService.start();
                 transportService.acceptIncomingRequests();
-                try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) {
+                try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) {
                     service.initializeRemoteClusters();
 
                     final CountDownLatch firstLatch = new CountDownLatch(1);
@@ -1618,7 +1623,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
                 transportService.start();
                 transportService.acceptIncomingRequests();
 
-                try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) {
+                try (RemoteClusterService service = createRemoteClusterService(Settings.EMPTY, transportService)) {
                     service.initializeRemoteClusters();
 
                     final Settings clusterSettings = buildRemoteClusterSettings("cluster_1", discoNode.getAddress().toString());
@@ -1707,7 +1712,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
                     alias -> alias.equals(goodCluster) || alias.equals(badCluster),
                     () -> randomAlphaOfLength(10)
                 );
-                try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) {
+                try (RemoteClusterService service = createRemoteClusterService(Settings.EMPTY, transportService)) {
                     service.initializeRemoteClusters();
 
                     final Settings cluster1Settings = buildRemoteClusterSettings(goodCluster, c1DiscoNode.getAddress().toString());
@@ -1793,7 +1798,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
         try (
             var remote = startTransport("remote", List.of(), VersionInformation.CURRENT, TransportVersion.current(), Settings.EMPTY);
             var local = startTransport("local", List.of(), VersionInformation.CURRENT, TransportVersion.current(), Settings.EMPTY);
-            var remoteClusterService = new RemoteClusterService(Settings.EMPTY, local)
+            var remoteClusterService = createRemoteClusterService(Settings.EMPTY, local)
         ) {
             var clusterSettings = ClusterSettings.createBuiltInClusterSettings();
             remoteClusterService.listenForUpdates(clusterSettings);

+ 4 - 2
test/framework/src/main/java/org/elasticsearch/node/MockNode.java

@@ -173,7 +173,8 @@ public class MockNode extends Node {
             ClusterSettings clusterSettings,
             TaskManager taskManager,
             Tracer tracer,
-            String nodeId
+            String nodeId,
+            ProjectResolver projectResolver
         ) {
 
             // we use the MockTransportService.TestPlugin class as a marker to create a network
@@ -191,7 +192,8 @@ public class MockNode extends Node {
                     clusterSettings,
                     taskManager,
                     tracer,
-                    nodeId
+                    nodeId,
+                    projectResolver
                 );
             } else {
                 return new MockTransportService(

+ 3 - 0
x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle

@@ -73,6 +73,9 @@ tasks.named("yamlRestTest").configure {
     '^reindex/60_wait_for_active_shards/can override wait_for_active_shards', // <- Requires a single shard
     '^reindex/90_remote/*',
     '^reindex/95_parent_join/Reindex from remote*',
+
+    // Linked project configuration via ClusterSettings currently does not support multi-project
+    '^cluster.stats/30_ccs_stats/cross-cluster search stats search',
   ];
   if (buildParams.snapshotBuild == false) {
     blacklist += [];