فهرست منبع

Refactor RemoteClusterService to be multi-project aware (#131894)

This is the first in a series of PRs to refactor the
RemoteClusterService to support multi-project.  This
first PR uses the DefaultProjectResolver.INSTANCE in
the constructor.  A follow up PR will handle refactoring
needed to pass in the ProjectResolver in the constructor.
Other follow up PRs will handle refactoring
RemoteClusterAware.updateRemoteCluster() and other
settings related methods that require a project ID to
be provided.
Jeremy Dahlgren 2 ماه پیش
والد
کامیت
4be260af50
1فایلهای تغییر یافته به همراه113 افزوده شده و 41 حذف شده
  1. 113 41
      server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

+ 113 - 41
server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

@@ -21,8 +21,11 @@ import org.elasticsearch.action.support.RefCountingListener;
 import org.elasticsearch.action.support.RefCountingRunnable;
 import org.elasticsearch.client.internal.RemoteClusterClient;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.metadata.ProjectId;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
+import org.elasticsearch.cluster.project.DefaultProjectResolver;
+import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.SecureSetting;
@@ -31,6 +34,7 @@ import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.core.FixForMultiProject;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.indices.IndicesExpressionGrouper;
@@ -52,6 +56,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.elasticsearch.common.settings.Setting.boolSetting;
@@ -154,14 +159,20 @@ public final class RemoteClusterService extends RemoteClusterAware
     }
 
     private final TransportService transportService;
-    private final Map<String, RemoteClusterConnection> remoteClusters = ConcurrentCollections.newConcurrentMap();
+    private final Map<ProjectId, Map<String, RemoteClusterConnection>> remoteClusters;
     private final RemoteClusterCredentialsManager remoteClusterCredentialsManager;
+    private final ProjectResolver projectResolver;
 
+    @FixForMultiProject(description = "Inject the ProjectResolver instance.")
     RemoteClusterService(Settings settings, TransportService transportService) {
         super(settings);
         this.enabled = DiscoveryNode.isRemoteClusterClient(settings);
         this.remoteClusterServerEnabled = REMOTE_CLUSTER_SERVER_ENABLED.get(settings);
         this.transportService = transportService;
+        this.projectResolver = DefaultProjectResolver.INSTANCE;
+        this.remoteClusters = projectResolver.supportsMultipleProjects()
+            ? ConcurrentCollections.newConcurrentMap()
+            : Map.of(ProjectId.DEFAULT, ConcurrentCollections.newConcurrentMap());
         this.remoteClusterCredentialsManager = new RemoteClusterCredentialsManager(settings);
         if (remoteClusterServerEnabled) {
             registerRemoteClusterHandshakeRequestHandler(transportService);
@@ -250,8 +261,9 @@ public final class RemoteClusterService extends RemoteClusterAware
     /**
      * Returns the registered remote cluster names.
      */
+    @FixForMultiProject(description = "Analyze use cases, determine possible need for cluster scoped and project scoped versions.")
     public Set<String> getRegisteredRemoteClusterNames() {
-        return remoteClusters.keySet();
+        return getConnectionsMapForCurrentProject().keySet();
     }
 
     /**
@@ -328,7 +340,8 @@ public final class RemoteClusterService extends RemoteClusterAware
                 "this node does not have the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role"
             );
         }
-        RemoteClusterConnection connection = remoteClusters.get(cluster);
+        @FixForMultiProject(description = "Verify all callers will have the proper context set for resolving the origin project ID.")
+        RemoteClusterConnection connection = getConnectionsMapForCurrentProject().get(cluster);
         if (connection == null) {
             throw new NoSuchRemoteClusterException(cluster);
         }
@@ -342,48 +355,63 @@ public final class RemoteClusterService extends RemoteClusterAware
     }
 
     private synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) {
-        RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias);
+        RemoteClusterConnection remote = getConnectionsMapForCurrentProject().get(clusterAlias);
         if (remote != null) {
             remote.setSkipUnavailable(skipUnavailable);
         }
     }
 
+    @FixForMultiProject(description = "Refactor as needed to support project specific changes to linked remotes.")
     public synchronized void updateRemoteClusterCredentials(Supplier<Settings> settingsSupplier, ActionListener<Void> listener) {
+        final var projectId = projectResolver.getProjectId();
         final Settings settings = settingsSupplier.get();
         final UpdateRemoteClusterCredentialsResult result = remoteClusterCredentialsManager.updateClusterCredentials(settings);
         // We only need to rebuild connections when a credential was newly added or removed for a cluster alias, not if the credential
         // value was updated. Therefore, only consider added or removed aliases
         final int totalConnectionsToRebuild = result.addedClusterAliases().size() + result.removedClusterAliases().size();
         if (totalConnectionsToRebuild == 0) {
-            logger.debug("no connection rebuilding required after credentials update");
+            logger.debug("project [{}] no connection rebuilding required after credentials update", projectId);
             listener.onResponse(null);
             return;
         }
-        logger.info("rebuilding [{}] connections after credentials update", totalConnectionsToRebuild);
+        logger.info("project [{}] rebuilding [{}] connections after credentials update", projectId, totalConnectionsToRebuild);
         try (var connectionRefs = new RefCountingRunnable(() -> listener.onResponse(null))) {
             for (var clusterAlias : result.addedClusterAliases()) {
-                maybeRebuildConnectionOnCredentialsChange(clusterAlias, settings, connectionRefs);
+                maybeRebuildConnectionOnCredentialsChange(projectId, clusterAlias, settings, connectionRefs);
             }
             for (var clusterAlias : result.removedClusterAliases()) {
-                maybeRebuildConnectionOnCredentialsChange(clusterAlias, settings, connectionRefs);
+                maybeRebuildConnectionOnCredentialsChange(projectId, clusterAlias, settings, connectionRefs);
             }
         }
     }
 
-    // package-private for testing
-
-    private void maybeRebuildConnectionOnCredentialsChange(String clusterAlias, Settings settings, RefCountingRunnable connectionRefs) {
-        if (false == remoteClusters.containsKey(clusterAlias)) {
+    private void maybeRebuildConnectionOnCredentialsChange(
+        ProjectId projectId,
+        String clusterAlias,
+        Settings settings,
+        RefCountingRunnable connectionRefs
+    ) {
+        final var connectionsMap = getConnectionsMapForProject(projectId);
+        if (false == connectionsMap.containsKey(clusterAlias)) {
             // A credential was added or removed before a remote connection was configured.
             // Without an existing connection, there is nothing to rebuild.
-            logger.info("no connection rebuild required for remote cluster [{}] after credentials change", clusterAlias);
+            logger.info(
+                "project [{}] no connection rebuild required for remote cluster [{}] after credentials change",
+                projectId,
+                clusterAlias
+            );
             return;
         }
 
-        updateRemoteCluster(clusterAlias, settings, true, ActionListener.releaseAfter(new ActionListener<>() {
+        updateRemoteCluster(projectId, clusterAlias, settings, true, ActionListener.releaseAfter(new ActionListener<>() {
             @Override
             public void onResponse(RemoteClusterConnectionStatus status) {
-                logger.info("remote cluster connection [{}] updated after credentials change: [{}]", clusterAlias, status);
+                logger.info(
+                    "project [{}] remote cluster connection [{}] updated after credentials change: [{}]",
+                    projectId,
+                    clusterAlias,
+                    status
+                );
             }
 
             @Override
@@ -392,23 +420,32 @@ public final class RemoteClusterService extends RemoteClusterAware
                 // does *not* imply a failure to reload secure settings; however, that's how it would surface in the reload-settings call.
                 // Instead, we log a warning which is also consistent with how we handle remote cluster settings updates (logging instead of
                 // returning an error)
-                logger.warn(() -> "failed to update remote cluster connection [" + clusterAlias + "] after credentials change", e);
+                logger.warn(
+                    () -> "project ["
+                        + projectId
+                        + "] failed to update remote cluster connection ["
+                        + clusterAlias
+                        + "] after credentials change",
+                    e
+                );
             }
         }, connectionRefs.acquire()));
     }
 
     @Override
     protected void updateRemoteCluster(String clusterAlias, Settings settings) {
+        @FixForMultiProject(description = "ES-12270: Refactor as needed to support project specific changes to linked remotes.")
+        final var projectId = projectResolver.getProjectId();
         CountDownLatch latch = new CountDownLatch(1);
-        updateRemoteCluster(clusterAlias, settings, ActionListener.runAfter(new ActionListener<>() {
+        updateRemoteCluster(projectId, clusterAlias, settings, false, ActionListener.runAfter(new ActionListener<>() {
             @Override
             public void onResponse(RemoteClusterConnectionStatus status) {
-                logger.info("remote cluster connection [{}] updated: {}", clusterAlias, status);
+                logger.info("project [{}] remote cluster connection [{}] updated: {}", projectId, clusterAlias, status);
             }
 
             @Override
             public void onFailure(Exception e) {
-                logger.warn(() -> "failed to update remote cluster connection [" + clusterAlias + "]", e);
+                logger.warn(() -> "project [" + projectId + " failed to update remote cluster connection [" + clusterAlias + "]", e);
             }
         }, latch::countDown));
 
@@ -417,25 +454,35 @@ public final class RemoteClusterService extends RemoteClusterAware
             // are on the cluster state thread and our custom future implementation will throw an
             // assertion.
             if (latch.await(10, TimeUnit.SECONDS) == false) {
-                logger.warn("failed to update remote cluster connection [{}] within {}", clusterAlias, TimeValue.timeValueSeconds(10));
+                logger.warn(
+                    "project [{}] failed to update remote cluster connection [{}] within {}",
+                    projectId,
+                    clusterAlias,
+                    TimeValue.timeValueSeconds(10)
+                );
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
         }
     }
 
-    /**
-     * This method updates the list of remote clusters. It's intended to be used as an update consumer on the settings infrastructure
-     *
-     * @param clusterAlias a cluster alias to discovery node mapping representing the remote clusters seeds nodes
-     * @param newSettings the updated settings for the remote connection
-     * @param listener a listener invoked once every configured cluster has been connected to
-     */
+    // Package-access for testing.
+    @FixForMultiProject(description = "Refactor to supply the project ID associated with the alias and settings, or eliminate this method.")
     void updateRemoteCluster(String clusterAlias, Settings newSettings, ActionListener<RemoteClusterConnectionStatus> listener) {
-        updateRemoteCluster(clusterAlias, newSettings, false, listener);
+        updateRemoteCluster(projectResolver.getProjectId(), clusterAlias, newSettings, false, listener);
     }
 
+    /**
+     * Adds, rebuilds, or closes and removes the connection for the specified remote cluster.
+     *
+     * @param projectId The project the remote cluster is associated with.
+     * @param clusterAlias The alias used for the remote cluster being connected.
+     * @param newSettings The updated settings for the remote connection.
+     * @param forceRebuild Forces an existing connection to be closed and reconnected even if the connection strategy does not require it.
+     * @param listener The listener invoked once the configured cluster has been connected.
+     */
     private synchronized void updateRemoteCluster(
+        ProjectId projectId,
         String clusterAlias,
         Settings newSettings,
         boolean forceRebuild,
@@ -445,14 +492,15 @@ public final class RemoteClusterService extends RemoteClusterAware
             throw new IllegalArgumentException("remote clusters must not have the empty string as its key");
         }
 
-        RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias);
+        final var connectionMap = getConnectionsMapForProject(projectId);
+        RemoteClusterConnection remote = connectionMap.get(clusterAlias);
         if (RemoteConnectionStrategy.isConnectionEnabled(clusterAlias, newSettings) == false) {
             try {
                 IOUtils.close(remote);
             } catch (IOException e) {
-                logger.warn("failed to close remote cluster connections for cluster: " + clusterAlias, e);
+                logger.warn("project [" + projectId + "] failed to close remote cluster connections for cluster: " + clusterAlias, e);
             }
-            remoteClusters.remove(clusterAlias);
+            connectionMap.remove(clusterAlias);
             listener.onResponse(RemoteClusterConnectionStatus.DISCONNECTED);
             return;
         }
@@ -461,19 +509,19 @@ public final class RemoteClusterService extends RemoteClusterAware
             // this is a new cluster we have to add a new representation
             Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build();
             remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService, remoteClusterCredentialsManager);
-            remoteClusters.put(clusterAlias, remote);
+            connectionMap.put(clusterAlias, remote);
             remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.CONNECTED));
         } else if (forceRebuild || remote.shouldRebuildConnection(newSettings)) {
             // Changes to connection configuration. Must tear down existing connection
             try {
                 IOUtils.close(remote);
             } catch (IOException e) {
-                logger.warn("failed to close remote cluster connections for cluster: " + clusterAlias, e);
+                logger.warn("project [" + projectId + "] failed to close remote cluster connections for cluster: " + clusterAlias, e);
             }
-            remoteClusters.remove(clusterAlias);
+            connectionMap.remove(clusterAlias);
             Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build();
             remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService, remoteClusterCredentialsManager);
-            remoteClusters.put(clusterAlias, remote);
+            connectionMap.put(clusterAlias, remote);
             remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.RECONNECTED));
         } else {
             // No changes to connection configuration.
@@ -493,6 +541,8 @@ public final class RemoteClusterService extends RemoteClusterAware
      * to all configured seed nodes.
      */
     void initializeRemoteClusters() {
+        @FixForMultiProject(description = "Refactor for initializing connections to linked projects for each origin project supported.")
+        final var projectId = projectResolver.getProjectId();
         final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
         final PlainActionFuture<Void> future = new PlainActionFuture<>();
         Set<String> enabledClusters = RemoteClusterAware.getEnabledRemoteClusters(settings);
@@ -503,7 +553,7 @@ public final class RemoteClusterService extends RemoteClusterAware
 
         CountDownActionListener listener = new CountDownActionListener(enabledClusters.size(), future);
         for (String clusterAlias : enabledClusters) {
-            updateRemoteCluster(clusterAlias, settings, listener.map(ignored -> null));
+            updateRemoteCluster(projectId, clusterAlias, settings, false, listener.map(ignored -> null));
         }
 
         if (enabledClusters.isEmpty()) {
@@ -515,19 +565,20 @@ public final class RemoteClusterService extends RemoteClusterAware
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
         } catch (TimeoutException ex) {
-            logger.warn("failed to connect to remote clusters within {}", timeValue.toString());
+            logger.warn("project [{}] failed to connect to remote clusters within {}", projectId, timeValue.toString());
         } catch (Exception e) {
-            logger.warn("failed to connect to remote clusters", e);
+            logger.warn("project [" + projectId + "] failed to connect to remote clusters", e);
         }
     }
 
     @Override
     public void close() throws IOException {
-        IOUtils.close(remoteClusters.values());
+        IOUtils.close(remoteClusters.values().stream().flatMap(map -> map.values().stream()).collect(Collectors.toList()));
     }
 
+    @FixForMultiProject(description = "Analyze use cases, determine possible need for cluster scoped and project scoped versions.")
     public Stream<RemoteConnectionInfo> getRemoteConnectionInfos() {
-        return remoteClusters.values().stream().map(RemoteClusterConnection::getConnectionInfo);
+        return getConnectionsMapForCurrentProject().values().stream().map(RemoteClusterConnection::getConnectionInfo);
     }
 
     @Override
@@ -549,9 +600,11 @@ public final class RemoteClusterService extends RemoteClusterAware
                 "this node does not have the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role"
             );
         }
+        @FixForMultiProject(description = "Analyze usages and determine if the project ID must be provided.")
+        final var projectConnectionsMap = getConnectionsMapForCurrentProject();
         final var connectionsMap = new HashMap<String, RemoteClusterConnection>();
         for (String cluster : clusters) {
-            final var connection = this.remoteClusters.get(cluster);
+            final var connection = projectConnectionsMap.get(cluster);
             if (connection == null) {
                 listener.onFailure(new NoSuchRemoteClusterException(cluster));
                 return;
@@ -654,6 +707,25 @@ public final class RemoteClusterService extends RemoteClusterAware
         );
     }
 
+    /**
+     * Returns the map of connections for the {@link ProjectId} currently returned by the {@link ProjectResolver}.
+     */
+    private Map<String, RemoteClusterConnection> getConnectionsMapForCurrentProject() {
+        return getConnectionsMapForProject(projectResolver.getProjectId());
+    }
+
+    /**
+     * Returns the map of connections for the given {@link ProjectId}.
+     */
+    private Map<String, RemoteClusterConnection> getConnectionsMapForProject(ProjectId projectId) {
+        if (projectResolver.supportsMultipleProjects()) {
+            assert ProjectId.DEFAULT.equals(projectId) == false : "The default project ID should not be used in multi-project environment";
+            return remoteClusters.computeIfAbsent(projectId, unused -> ConcurrentCollections.newConcurrentMap());
+        }
+        assert ProjectId.DEFAULT.equals(projectId) : "Only the default project ID should be used when multiple projects are not supported";
+        return remoteClusters.get(projectId);
+    }
+
     private static class RemoteConnectionEnabled<T> implements Setting.Validator<T> {
 
         private final String clusterAlias;