瀏覽代碼

Cleanup esql session (#134998)

Ievgen Degtiarenko 3 周之前
父節點
當前提交
04ff798c82

+ 4 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

@@ -319,6 +319,10 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
         return clusterInfo.values().stream().filter(cluster -> cluster.getStatus() == status);
     }
 
+    public Stream<String> getRunningClusterAliases() {
+        return getClusterStates(Cluster.Status.RUNNING).map(Cluster::getClusterAlias);
+    }
+
     @Override
     public String toString() {
         return "EsqlExecutionInfo{"

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/PreAnalyzer.java

@@ -22,7 +22,7 @@ import java.util.List;
  */
 public class PreAnalyzer {
 
-    public record PreAnalysis(IndexMode indexMode, IndexPattern index, List<Enrich> enriches, List<IndexPattern> lookupIndices) {
+    public record PreAnalysis(IndexMode indexMode, IndexPattern indexPattern, List<Enrich> enriches, List<IndexPattern> lookupIndices) {
         public static final PreAnalysis EMPTY = new PreAnalysis(null, null, List.of(), List.of());
     }
 

+ 16 - 7
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java

@@ -40,7 +40,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.stream.Collectors;
+
+import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toSet;
 
 public class EsqlCCSUtils {
 
@@ -178,6 +180,15 @@ public class EsqlCCSUtils {
         }
     }
 
+    static String createQualifiedLookupIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo, String localPattern) {
+        if (executionInfo.getClusters().isEmpty()) {
+            return localPattern;
+        }
+        return executionInfo.getRunningClusterAliases()
+            .map(clusterAlias -> RemoteClusterAware.buildRemoteIndexName(clusterAlias, localPattern))
+            .collect(joining(","));
+    }
+
     static void updateExecutionInfoWithUnavailableClusters(
         EsqlExecutionInfo execInfo,
         Map<String, List<FieldCapabilitiesFailure>> failures
@@ -205,9 +216,7 @@ public class EsqlCCSUtils {
     ) {
         // Get the clusters which are still running, and we will check whether they have any matching indices.
         // NOTE: we assume that updateExecutionInfoWithUnavailableClusters() was already run and took care of unavailable clusters.
-        final Set<String> clustersWithNoMatchingIndices = executionInfo.getClusterStates(Cluster.Status.RUNNING)
-            .map(Cluster::getClusterAlias)
-            .collect(Collectors.toSet());
+        final Set<String> clustersWithNoMatchingIndices = executionInfo.getRunningClusterAliases().collect(toSet());
         for (String indexName : indexResolution.resolvedIndices()) {
             clustersWithNoMatchingIndices.remove(RemoteClusterAware.parseClusterAlias(indexName));
         }
@@ -323,10 +332,10 @@ public class EsqlCCSUtils {
     public static void initCrossClusterState(
         IndicesExpressionGrouper indicesGrouper,
         XPackLicenseState licenseState,
-        IndexPattern pattern,
+        IndexPattern indexPattern,
         EsqlExecutionInfo executionInfo
     ) throws ElasticsearchStatusException {
-        if (pattern == null) {
+        if (indexPattern == null) {
             return;
         }
         try {
@@ -335,7 +344,7 @@ public class EsqlCCSUtils {
                 // it is copied here so that we have the same resolution when request contains multiple remote cluster patterns with *
                 Set.copyOf(indicesGrouper.getConfiguredClusters()),
                 IndicesOptions.DEFAULT,
-                pattern.indexPattern()
+                indexPattern.indexPattern()
             );
 
             executionInfo.clusterInfoInitializing(true);

+ 20 - 43
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

@@ -84,8 +84,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static java.util.stream.Collectors.joining;
 import static java.util.stream.Collectors.toSet;
@@ -396,7 +394,7 @@ public class EsqlSession {
         }
 
         var preAnalysis = preAnalyzer.preAnalyze(parsed);
-        EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.index(), executionInfo);
+        EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indexPattern(), executionInfo);
 
         SubscribableListener. //
         <EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(preAnalysis.enriches(), executionInfo, l))
@@ -437,25 +435,9 @@ public class EsqlSession {
             ThreadPool.Names.SEARCH_COORDINATION,
             ThreadPool.Names.SYSTEM_READ
         );
-        Set<String> fieldNames = result.wildcardJoinIndices().contains(localPattern) ? IndexResolver.ALL_FIELDS : result.fieldNames;
-
-        String patternWithRemotes;
-
-        if (executionInfo.getClusters().isEmpty()) {
-            patternWithRemotes = localPattern;
-        } else {
-            // convert index -> cluster1:index,cluster2:index, etc.for each running cluster
-            patternWithRemotes = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING)
-                .map(c -> RemoteClusterAware.buildRemoteIndexName(c.getClusterAlias(), localPattern))
-                .collect(Collectors.joining(","));
-        }
-        if (patternWithRemotes.isEmpty()) {
-            return;
-        }
-        // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types
         indexResolver.resolveAsMergedMapping(
-            patternWithRemotes,
-            fieldNames,
+            EsqlCCSUtils.createQualifiedLookupIndexExpressionFromAvailableClusters(executionInfo, localPattern),
+            result.wildcardJoinIndices().contains(localPattern) ? IndexResolver.ALL_FIELDS : result.fieldNames,
             null,
             false,
             listener.map(indexResolution -> receiveLookupIndexResolution(result, localPattern, executionInfo, indexResolution))
@@ -560,10 +542,8 @@ public class EsqlSession {
         });
 
         // These are clusters that are still in the running, we need to have the index on all of them
-        Stream<EsqlExecutionInfo.Cluster> clusters = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING);
         // Verify that all active clusters have the lookup index resolved
-        clusters.forEach(cluster -> {
-            String clusterAlias = cluster.getClusterAlias();
+        executionInfo.getRunningClusterAliases().forEach(clusterAlias -> {
             if (clustersWithResolvedIndices.containsKey(clusterAlias) == false) {
                 // Missing cluster resolution
                 skipClusterOrError(clusterAlias, executionInfo, findFailure(lookupIndexResolution.failures(), index, clusterAlias));
@@ -622,9 +602,7 @@ public class EsqlSession {
      * concrete indices aliased to the same index name.
      */
     private void validateRemoteVersions(EsqlExecutionInfo executionInfo) {
-        Stream<EsqlExecutionInfo.Cluster> clusters = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING);
-        clusters.forEach(cluster -> {
-            String clusterAlias = cluster.getClusterAlias();
+        executionInfo.getRunningClusterAliases().forEach(clusterAlias -> {
             if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) {
                 // No need to check local, obviously
                 var connection = remoteClusterService.getConnection(clusterAlias);
@@ -655,31 +633,30 @@ public class EsqlSession {
             ThreadPool.Names.SEARCH_COORDINATION,
             ThreadPool.Names.SYSTEM_READ
         );
-        if (preAnalysis.index() != null) {
+        if (preAnalysis.indexPattern() != null) {
             String indexExpressionToResolve = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
             if (indexExpressionToResolve.isEmpty()) {
                 // if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution
                 listener.onResponse(
-                    result.withIndexResolution(IndexResolution.valid(new EsIndex(preAnalysis.index().indexPattern(), Map.of(), Map.of())))
+                    result.withIndexResolution(
+                        IndexResolution.valid(new EsIndex(preAnalysis.indexPattern().indexPattern(), Map.of(), Map.of()))
+                    )
                 );
             } else {
-                boolean includeAllDimensions = false;
-                // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types
-                if (preAnalysis.indexMode() == IndexMode.TIME_SERIES) {
-                    includeAllDimensions = true;
-                    // TODO: Maybe if no indices are returned, retry without index mode and provide a clearer error message.
-                    var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName());
-                    if (requestFilter != null) {
-                        requestFilter = new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter);
-                    } else {
-                        requestFilter = indexModeFilter;
-                    }
-                }
                 indexResolver.resolveAsMergedMapping(
                     indexExpressionToResolve,
                     result.fieldNames,
-                    requestFilter,
-                    includeAllDimensions,
+                    // Maybe if no indices are returned, retry without index mode and provide a clearer error message.
+                    switch (preAnalysis.indexMode()) {
+                        case IndexMode.TIME_SERIES -> {
+                            var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName());
+                            yield requestFilter != null
+                                ? new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter)
+                                : indexModeFilter;
+                        }
+                        default -> requestFilter;
+                    },
+                    preAnalysis.indexMode() == IndexMode.TIME_SERIES,
                     listener.delegateFailure((l, indexResolution) -> {
                         l.onResponse(result.withIndexResolution(indexResolution));
                     })

+ 2 - 2
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

@@ -530,12 +530,12 @@ public class CsvTests extends ESTestCase {
 
     private static CsvTestsDataLoader.MultiIndexTestDataset testDatasets(LogicalPlan parsed) {
         var preAnalysis = new PreAnalyzer().preAnalyze(parsed);
-        if (preAnalysis.index() == null) {
+        if (preAnalysis.indexPattern() == null) {
             // If the data set doesn't matter we'll just grab one we know works. Employees is fine.
             return CsvTestsDataLoader.MultiIndexTestDataset.of(CSV_DATASET_MAP.get("employees"));
         }
 
-        String indexName = preAnalysis.index().indexPattern();
+        String indexName = preAnalysis.indexPattern().indexPattern();
         List<CsvTestsDataLoader.TestDataset> datasets = new ArrayList<>();
         if (indexName.endsWith("*")) {
             String indexPrefix = indexName.substring(0, indexName.length() - 1);

+ 65 - 39
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java

@@ -32,12 +32,12 @@ import org.elasticsearch.xpack.esql.index.EsIndex;
 import org.elasticsearch.xpack.esql.index.IndexResolution;
 import org.elasticsearch.xpack.esql.plan.IndexPattern;
 import org.elasticsearch.xpack.esql.type.EsFieldTests;
+import org.hamcrest.Matcher;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -60,20 +60,16 @@ public class EsqlCCSUtilsTests extends ESTestCase {
     private final String REMOTE2_ALIAS = "remote2";
 
     public void testCreateIndexExpressionFromAvailableClusters() {
-
+        var skipped = EsqlExecutionInfo.Cluster.Status.SKIPPED;
         // no clusters marked as skipped
         {
             EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
             executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false));
             executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true));
             executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true));
-
-            String indexExpr = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
-            List<String> list = Arrays.stream(Strings.splitStringByCommaToArray(indexExpr)).toList();
-            assertThat(list.size(), equalTo(5));
-            assertThat(
-                new HashSet<>(list),
-                equalTo(Strings.commaDelimitedListToSet("logs*,remote1:*,remote2:mylogs1,remote2:mylogs2,remote2:logs*"))
+            assertIndexPattern(
+                EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo),
+                containsInAnyOrder("logs*", "remote1:*", "remote2:mylogs1", "remote2:mylogs2", "remote2:logs*")
             );
         }
 
@@ -84,38 +80,23 @@ public class EsqlCCSUtilsTests extends ESTestCase {
             executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*,foo", true));
             executionInfo.swapCluster(
                 REMOTE2_ALIAS,
-                (k, v) -> new EsqlExecutionInfo.Cluster(
-                    REMOTE2_ALIAS,
-                    "mylogs1,mylogs2,logs*",
-                    true,
-                    EsqlExecutionInfo.Cluster.Status.SKIPPED
-                )
+                (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true, skipped)
+            );
+            assertIndexPattern(
+                EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo),
+                containsInAnyOrder("logs*", "remote1:*", "remote1:foo")
             );
-
-            String indexExpr = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
-            List<String> list = Arrays.stream(Strings.splitStringByCommaToArray(indexExpr)).toList();
-            assertThat(list.size(), equalTo(3));
-            assertThat(new HashSet<>(list), equalTo(Strings.commaDelimitedListToSet("logs*,remote1:*,remote1:foo")));
         }
 
         // two clusters marked as skipped, so only local cluster present in revised index expression
         {
             EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
             executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false));
-            executionInfo.swapCluster(
-                REMOTE1_ALIAS,
-                (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*,foo", true, EsqlExecutionInfo.Cluster.Status.SKIPPED)
-            );
+            executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*,foo", true, skipped));
             executionInfo.swapCluster(
                 REMOTE2_ALIAS,
-                (k, v) -> new EsqlExecutionInfo.Cluster(
-                    REMOTE2_ALIAS,
-                    "mylogs1,mylogs2,logs*",
-                    true,
-                    EsqlExecutionInfo.Cluster.Status.SKIPPED
-                )
+                (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true, skipped)
             );
-
             assertThat(EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("logs*"));
         }
 
@@ -128,18 +109,64 @@ public class EsqlCCSUtilsTests extends ESTestCase {
             );
             executionInfo.swapCluster(
                 REMOTE2_ALIAS,
-                (k, v) -> new EsqlExecutionInfo.Cluster(
-                    REMOTE2_ALIAS,
-                    "mylogs1,mylogs2,logs*",
-                    true,
-                    EsqlExecutionInfo.Cluster.Status.SKIPPED
-                )
+                (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true, skipped)
             );
-
             assertThat(EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo(""));
         }
     }
 
+    public void testCreateQualifiedLookupIndexExpressionFromAvailableClusters() {
+
+        var skipped = EsqlExecutionInfo.Cluster.Status.SKIPPED;
+        // no clusters marked as skipped
+        {
+            EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
+            executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "", false));
+            executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "", true));
+            executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "", true));
+            assertIndexPattern(
+                EsqlCCSUtils.createQualifiedLookupIndexExpressionFromAvailableClusters(executionInfo, "lookup"),
+                containsInAnyOrder("lookup", REMOTE1_ALIAS + ":lookup", REMOTE2_ALIAS + ":lookup")
+            );
+        }
+        // one cluster marked as skipped
+        {
+            EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
+            executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "", false));
+            executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "", true));
+            executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "", true, skipped));
+            assertIndexPattern(
+                EsqlCCSUtils.createQualifiedLookupIndexExpressionFromAvailableClusters(executionInfo, "lookup"),
+                containsInAnyOrder("lookup", REMOTE1_ALIAS + ":lookup")
+            );
+        }
+        // all remotes marked as skipped
+        {
+            EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
+            executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "", false));
+            executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "", true, skipped));
+            executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "", true, skipped));
+            assertIndexPattern(
+                EsqlCCSUtils.createQualifiedLookupIndexExpressionFromAvailableClusters(executionInfo, "lookup"),
+                containsInAnyOrder("lookup")
+            );
+        }
+        // all remotes are skipped and no local
+        {
+            EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
+            executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "", true, skipped));
+            executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "", true, skipped));
+            assertIndexPattern(
+                EsqlCCSUtils.createQualifiedLookupIndexExpressionFromAvailableClusters(executionInfo, "lookup"),
+                containsInAnyOrder()
+            );
+        }
+    }
+
+    private static void assertIndexPattern(String indexPattern, Matcher<Iterable<? extends String>> matcher) {
+        assertThat(Set.of(Strings.splitStringByCommaToArray(indexPattern)), matcher);
+    }
+
     public void testUpdateExecutionInfoWithUnavailableClusters() {
 
         // skip_unavailable=true clusters are unavailable, both marked as SKIPPED
@@ -806,5 +833,4 @@ public class EsqlCCSUtilsTests extends ESTestCase {
             return originalIndicesMap;
         }
     }
-
 }