
Refactor remote cluster handling in Analyzer (#126426)

* Refactor remote cluster handling in Analyzer

- Initialize clusters earlier
- Simplify cluster set calculation
- No need to keep separate skipped list for enrich resolution
Stanislav Malyshev, 6 months ago
parent commit b21e3253a8
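
The change is easiest to read as a call-flow refactor: EsqlSession now registers every target cluster in the EsqlExecutionInfo before enrich resolution runs, and EnrichPolicyResolver derives its cluster set from that shared state instead of receiving a separately computed target-cluster collection. Below is a minimal, runnable sketch of that flow in plain Java; the map and helper used here are simplified stand-ins, not the real EsqlExecutionInfo or IndicesExpressionGrouper APIs.

    // Sketch only: hypothetical stand-in types, not the real Elasticsearch classes.
    import java.util.HashSet;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Set;

    class ClusterInitSketch {
        public static void main(String[] args) {
            // stand-in for EsqlExecutionInfo: cluster alias -> index expression for that cluster
            Map<String, String> executionInfoClusters = new LinkedHashMap<>();

            // step 1 (initializeClusterData in the diff): group indices per cluster up front,
            // before enrich resolution, rather than during index pre-analysis
            groupIndicesPerCluster("logs*,remote1:logs*,remote2:logs*")
                .forEach(executionInfoClusters::put);

            // step 2 (resolvePolicies in the diff): derive the remote-cluster set from the
            // execution info; "" plays the role of RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY
            Set<String> remoteClusters = new HashSet<>(executionInfoClusters.keySet());
            boolean includeLocal = remoteClusters.isEmpty() || remoteClusters.remove("");
            System.out.println("remotes=" + remoteClusters + ", includeLocal=" + includeLocal);
        }

        // hypothetical helper standing in for IndicesExpressionGrouper.groupIndices: split
        // "cluster:index" patterns into a per-cluster, comma-joined expression map
        static Map<String, String> groupIndicesPerCluster(String pattern) {
            Map<String, String> grouped = new LinkedHashMap<>();
            for (String expr : pattern.split(",")) {
                int colon = expr.indexOf(':');
                String alias = colon < 0 ? "" : expr.substring(0, colon);
                String index = colon < 0 ? expr : expr.substring(colon + 1);
                grouped.merge(alias, index, (a, b) -> a + "," + b);
            }
            return grouped;
        }
    }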

+ 0 - 9
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java

@@ -23,7 +23,6 @@ public final class EnrichResolution {
 
     private final Map<Key, ResolvedEnrichPolicy> resolvedPolicies = ConcurrentCollections.newConcurrentMap();
     private final Map<Key, String> errors = ConcurrentCollections.newConcurrentMap();
-    private final Map<String, Exception> unavailableClusters = ConcurrentCollections.newConcurrentMap();
 
     public ResolvedEnrichPolicy getResolvedPolicy(String policyName, Enrich.Mode mode) {
         return resolvedPolicies.get(new Key(policyName, mode));
@@ -52,14 +51,6 @@ public final class EnrichResolution {
         errors.putIfAbsent(new Key(policyName, mode), reason);
     }
 
-    public void addUnavailableCluster(String clusterAlias, Exception e) {
-        unavailableClusters.put(clusterAlias, e);
-    }
-
-    public Map<String, Exception> getUnavailableClusters() {
-        return unavailableClusters;
-    }
-
     private record Key(String policyName, Enrich.Mode mode) {
 
     }

+ 17 - 15
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java

@@ -10,7 +10,6 @@ package org.elasticsearch.xpack.esql.enrich;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListenerResponseHandler;
-import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.support.ChannelActionListener;
 import org.elasticsearch.action.support.ContextPreservingActionListener;
 import org.elasticsearch.action.support.RefCountingListener;
@@ -37,6 +36,7 @@ import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.enrich.EnrichMetadata;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
+import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
 import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.core.type.EsField;
@@ -49,7 +49,6 @@ import org.elasticsearch.xpack.esql.session.IndexResolver;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -59,6 +58,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.markClusterWithFinalStateAndNoShards;
+
 /**
  * Resolves enrich policies across clusters in several steps:
  * 1. Calculates the policies that need to be resolved for each cluster, see {@link #lookupPolicies}.
@@ -98,21 +99,22 @@ public class EnrichPolicyResolver {
     /**
      * Resolves a set of enrich policies
      *
-     * @param targetClusters     the target clusters
      * @param unresolvedPolicies the unresolved policies
+     * @param executionInfo      the execution info
      * @param listener           notified with the enrich resolution
      */
     public void resolvePolicies(
-        Collection<String> targetClusters,
         Collection<UnresolvedPolicy> unresolvedPolicies,
+        EsqlExecutionInfo executionInfo,
         ActionListener<EnrichResolution> listener
     ) {
-        if (unresolvedPolicies.isEmpty() || targetClusters.isEmpty()) {
+        if (unresolvedPolicies.isEmpty()) {
             listener.onResponse(new EnrichResolution());
             return;
         }
-        final Set<String> remoteClusters = new HashSet<>(targetClusters);
-        final boolean includeLocal = remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
+
+        final Set<String> remoteClusters = new HashSet<>(executionInfo.getClusters().keySet());
+        final boolean includeLocal = remoteClusters.isEmpty() || remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
         lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, listener.map(lookupResponses -> {
             final EnrichResolution enrichResolution = new EnrichResolution();
 
@@ -121,7 +123,14 @@ public class EnrichPolicyResolver {
             for (Map.Entry<String, LookupResponse> entry : lookupResponses.entrySet()) {
                 String clusterAlias = entry.getKey();
                 if (entry.getValue().connectionError != null) {
-                    enrichResolution.addUnavailableCluster(clusterAlias, entry.getValue().connectionError);
+                    assert clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false
+                        : "Should never have a connection error for the local cluster";
+                    markClusterWithFinalStateAndNoShards(
+                        executionInfo,
+                        clusterAlias,
+                        EsqlExecutionInfo.Cluster.Status.SKIPPED,
+                        entry.getValue().connectionError
+                    );
                     // remove unavailable cluster from the list of clusters which is used below to create the ResolvedEnrichPolicy
                     remoteClusters.remove(clusterAlias);
                 } else {
@@ -445,11 +454,4 @@ public class EnrichPolicyResolver {
             listener
         );
     }
-
-    public Map<String, List<String>> groupIndicesPerCluster(Set<String> remoteClusterNames, String[] indices) {
-        return remoteClusterService.groupIndices(remoteClusterNames, SearchRequest.DEFAULT_INDICES_OPTIONS, indices)
-            .entrySet()
-            .stream()
-            .collect(Collectors.toMap(Map.Entry::getKey, e -> Arrays.asList(e.getValue().indices())));
-    }
 }
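
With the separate unavailable-clusters map gone from EnrichResolution, a per-cluster connection error discovered during policy lookup is now recorded directly on the execution info (via markClusterWithFinalStateAndNoShards) and the cluster is dropped from the set stored on the ResolvedEnrichPolicy. A rough sketch of that handling, using hypothetical stand-in types rather than the real EsqlExecutionInfo or EsqlCCSUtils API:

    // Sketch only: simplified stand-ins, not the real Elasticsearch classes.
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class SkipUnavailableSketch {
        enum Status { RUNNING, SUCCESSFUL, SKIPPED }

        // stand-in for EsqlExecutionInfo.Cluster: alias, status, failure (if any)
        record Cluster(String alias, Status status, Exception failure) {}

        public static void main(String[] args) {
            Map<String, Cluster> executionInfo = new HashMap<>();
            executionInfo.put("remote1", new Cluster("remote1", Status.RUNNING, null));
            executionInfo.put("remote2", new Cluster("remote2", Status.RUNNING, null));
            Set<String> remoteClusters = new HashSet<>(executionInfo.keySet());

            // a lookup response carried a connection error for remote2; the local cluster
            // is asserted never to reach this branch
            String clusterAlias = "remote2";
            Exception connectionError = new RuntimeException("connect_transport_exception");

            // stand-in for EsqlCCSUtils.markClusterWithFinalStateAndNoShards: mark the
            // cluster SKIPPED, recording its failure
            executionInfo.computeIfPresent(clusterAlias, (k, v) -> new Cluster(k, Status.SKIPPED, connectionError));

            // and exclude it from the clusters recorded on the ResolvedEnrichPolicy
            remoteClusters.remove(clusterAlias);

            System.out.println(executionInfo.get(clusterAlias) + "; remaining=" + remoteClusters);
        }
    }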

+ 22 - 47
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

@@ -9,7 +9,6 @@ package org.elasticsearch.xpack.esql.session;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.OriginalIndices;
-import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.common.Strings;
@@ -19,7 +18,6 @@ import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.DriverProfile;
 import org.elasticsearch.core.Releasables;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.mapper.IndexModeFieldMapper;
 import org.elasticsearch.index.query.BoolQueryBuilder;
@@ -85,7 +83,6 @@ import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
 import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -365,16 +362,10 @@ public class EsqlSession {
         final List<IndexPattern> indices = preAnalysis.indices;
 
         EsqlCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, configuredClusters, verifier.licenseState());
-
-        final Set<String> targetClusters = enrichPolicyResolver.groupIndicesPerCluster(
-            configuredClusters,
-            indices.stream()
-                .flatMap(index -> Arrays.stream(Strings.commaDelimitedListToStringArray(index.indexPattern())))
-                .toArray(String[]::new)
-        ).keySet();
+        initializeClusterData(indices, executionInfo);
 
         var listener = SubscribableListener.<EnrichResolution>newForked(
-            l -> enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, l)
+            l -> enrichPolicyResolver.resolvePolicies(unresolvedPolicies, executionInfo, l)
         )
             .<PreAnalysisResult>andThen((l, enrichResolution) -> resolveFieldNames(parsed, enrichResolution, l))
             .<PreAnalysisResult>andThen((l, preAnalysisResult) -> resolveInferences(preAnalysis.inferencePlans, preAnalysisResult, l));
@@ -400,12 +391,6 @@ public class EsqlSession {
         }).<PreAnalysisResult>andThen((l, result) -> {
             assert requestFilter != null : "The second pre-analysis shouldn't take place when there is no index filter in the request";
 
-            // "reset" execution information for all ccs or non-ccs (local) clusters, since we are performing the indices
-            // resolving one more time (the first attempt failed and the query had a filter)
-            for (String clusterAlias : executionInfo.clusterAliases()) {
-                executionInfo.swapCluster(clusterAlias, (k, v) -> null);
-            }
-
             // here the requestFilter is set to null, performing the pre-analysis after the first step failed
             preAnalyzeMainIndices(preAnalysis, executionInfo, result, null, l);
         }).<LogicalPlan>andThen((l, result) -> {
@@ -435,6 +420,26 @@ public class EsqlSession {
         // TODO: Verify that the resolved index actually has indexMode: "lookup"
     }
 
+    private void initializeClusterData(List<IndexPattern> indices, EsqlExecutionInfo executionInfo) {
+        if (indices.isEmpty()) {
+            return;
+        }
+        assert indices.size() == 1 : "Only single index pattern is supported";
+        Map<String, OriginalIndices> clusterIndices = indicesExpressionGrouper.groupIndices(
+            configuredClusters,
+            IndicesOptions.DEFAULT,
+            indices.getFirst().indexPattern()
+        );
+        for (Map.Entry<String, OriginalIndices> entry : clusterIndices.entrySet()) {
+            final String clusterAlias = entry.getKey();
+            String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices());
+            executionInfo.swapCluster(clusterAlias, (k, v) -> {
+                assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet";
+                return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias));
+            });
+        }
+    }
+
     private void preAnalyzeMainIndices(
         PreAnalyzer.PreAnalysis preAnalysis,
         EsqlExecutionInfo executionInfo,
@@ -448,38 +453,8 @@ public class EsqlSession {
             // Note: JOINs are not supported but we detect them when
             listener.onFailure(new MappingException("Queries with multiple indices are not supported"));
         } else if (indices.size() == 1) {
-            // known to be unavailable from the enrich policy API call
-            Map<String, Exception> unavailableClusters = result.enrichResolution.getUnavailableClusters();
             IndexPattern table = indices.getFirst();
 
-            Map<String, OriginalIndices> clusterIndices = indicesExpressionGrouper.groupIndices(
-                configuredClusters,
-                IndicesOptions.DEFAULT,
-                table.indexPattern()
-            );
-            for (Map.Entry<String, OriginalIndices> entry : clusterIndices.entrySet()) {
-                final String clusterAlias = entry.getKey();
-                String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices());
-                executionInfo.swapCluster(clusterAlias, (k, v) -> {
-                    assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet";
-                    if (unavailableClusters.containsKey(k)) {
-                        return new EsqlExecutionInfo.Cluster(
-                            clusterAlias,
-                            indexExpr,
-                            executionInfo.isSkipUnavailable(clusterAlias),
-                            EsqlExecutionInfo.Cluster.Status.SKIPPED,
-                            0,
-                            0,
-                            0,
-                            0,
-                            List.of(new ShardSearchFailure(unavailableClusters.get(k))),
-                            new TimeValue(0)
-                        );
-                    } else {
-                        return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias));
-                    }
-                });
-            }
             // if the preceding call to the enrich policy API found unavailable clusters, recreate the index expression to search
             // based only on available clusters (which could now be an empty list)
             String indexExpressionToResolve = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
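
For context on the comment above: createIndexExpressionFromAvailableClusters rebuilds the index expression from the clusters that were not marked SKIPPED, so index resolution only targets clusters still in play. A simplified sketch of that idea follows (assumed behavior, illustrated with stand-in types; not the real EsqlCCSUtils implementation):

    // Sketch only: assumed behavior, hypothetical stand-in types.
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.StringJoiner;

    class AvailableClustersSketch {
        // stand-in for EsqlExecutionInfo.Cluster: its index expression and whether it was skipped
        record Cluster(String indexExpression, boolean skipped) {}

        static String indexExpressionFromAvailableClusters(Map<String, Cluster> executionInfo) {
            StringJoiner expression = new StringJoiner(",");
            for (Map.Entry<String, Cluster> e : executionInfo.entrySet()) {
                if (e.getValue().skipped()) {
                    continue; // unavailable cluster: leave it out of the resolution request
                }
                String alias = e.getKey();
                for (String index : e.getValue().indexExpression().split(",")) {
                    // local-cluster indices stay unqualified, remote ones keep the "alias:" prefix
                    expression.add(alias.isEmpty() ? index : alias + ":" + index);
                }
            }
            return expression.toString(); // may be empty if every cluster was skipped
        }

        public static void main(String[] args) {
            Map<String, Cluster> executionInfo = new LinkedHashMap<>();
            executionInfo.put("", new Cluster("logs*", false));
            executionInfo.put("remote1", new Cluster("logs*", false));
            executionInfo.put("remote2", new Cluster("logs*", true)); // connection error -> SKIPPED
            System.out.println(indexExpressionFromAvailableClusters(executionInfo)); // prints logs*,remote1:logs*
        }
    }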

+ 6 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java

@@ -39,6 +39,7 @@ import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.enrich.EnrichMetadata;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
+import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
 import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
 import org.elasticsearch.xpack.esql.plan.logical.Enrich;
 import org.elasticsearch.xpack.esql.session.IndexResolver;
@@ -429,6 +430,10 @@ public class EnrichPolicyResolverTests extends ESTestCase {
 
         EnrichResolution resolvePolicies(Collection<String> clusters, Collection<UnresolvedPolicy> unresolvedPolicies) {
             PlainActionFuture<EnrichResolution> future = new PlainActionFuture<>();
+            EsqlExecutionInfo esqlExecutionInfo = new EsqlExecutionInfo(true);
+            for (String cluster : clusters) {
+                esqlExecutionInfo.swapCluster(cluster, (k, v) -> new EsqlExecutionInfo.Cluster(cluster, "*"));
+            }
             if (randomBoolean()) {
                 unresolvedPolicies = new ArrayList<>(unresolvedPolicies);
                 for (Enrich.Mode mode : Enrich.Mode.values()) {
@@ -442,7 +447,7 @@ public class EnrichPolicyResolverTests extends ESTestCase {
                     unresolvedPolicies.add(new UnresolvedPolicy("legacy-policy-1", randomFrom(Enrich.Mode.values())));
                 }
             }
-            super.resolvePolicies(clusters, unresolvedPolicies, future);
+            super.resolvePolicies(unresolvedPolicies, esqlExecutionInfo, future);
             return future.actionGet(30, TimeUnit.SECONDS);
         }