Bladeren bron

Add cluster alias to search_shards API (#95923)

While integrating the new search_shards API with CCS, I encountered an 
issue where the search_shards operation returned a wrong result when 
using the _index:cluster:index_name filter. The reason for this failure
is that we are not sending the cluster alias to the search_shards API,
resulting in the search context lacking a cluster alias to match the
index pattern filter.

Relates #94534
Nhat Nguyen 2 jaren geleden
bovenliggende
commit
cbcbd01d5e

+ 6 - 3
server/src/internalClusterTest/java/org/elasticsearch/action/search/SearchShardsIT.java

@@ -52,7 +52,8 @@ public class SearchShardsIT extends ESIntegTestCase {
                 rangeQuery,
                 null,
                 null,
-                randomBoolean()
+                randomBoolean(),
+                randomBoolean() ? null : randomAlphaOfLength(10)
             );
             var resp = client().execute(SearchShardsAction.INSTANCE, request).actionGet();
             assertThat(resp.getGroups(), hasSize(indicesWithData + indicesWithoutData));
@@ -79,7 +80,8 @@ public class SearchShardsIT extends ESIntegTestCase {
                 matchAll,
                 null,
                 null,
-                randomBoolean()
+                randomBoolean(),
+                randomBoolean() ? null : randomAlphaOfLength(10)
             );
             SearchShardsResponse resp = client().execute(SearchShardsAction.INSTANCE, request).actionGet();
             assertThat(resp.getGroups(), hasSize(indicesWithData + indicesWithoutData));
@@ -120,7 +122,8 @@ public class SearchShardsIT extends ESIntegTestCase {
                 rangeQuery,
                 null,
                 preference,
-                randomBoolean()
+                randomBoolean(),
+                randomBoolean() ? null : randomAlphaOfLength(10)
             );
             var searchShardsResponse = client().execute(SearchShardsAction.INSTANCE, searchShardsRequest).actionGet();
 

+ 67 - 0
server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CrossClusterSearchIT.java

@@ -15,6 +15,11 @@ import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchAction;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchShardsAction;
+import org.elasticsearch.action.search.SearchShardsGroup;
+import org.elasticsearch.action.search.SearchShardsRequest;
+import org.elasticsearch.action.search.SearchShardsResponse;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.client.internal.Client;
@@ -26,6 +31,7 @@ import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexModule;
 import org.elasticsearch.index.query.MatchAllQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.TermQueryBuilder;
 import org.elasticsearch.index.shard.SearchOperationListener;
 import org.elasticsearch.plugins.Plugin;
@@ -60,6 +66,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
@@ -447,4 +454,64 @@ public class CrossClusterSearchIT extends AbstractMultiClustersTestCase {
             super.onIndexModule(indexModule);
         }
     }
+
+    public void testSearchShardsWithIndexNameQuery() {
+        int numShards = randomIntBetween(1, 10);
+        Client remoteClient = client("cluster_a");
+        remoteClient.admin()
+            .indices()
+            .prepareCreate("my_index")
+            .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards))
+            .get();
+        int numDocs = randomIntBetween(100, 500);
+        for (int i = 0; i < numDocs; i++) {
+            remoteClient.prepareIndex("my_index").setSource("f", "v").get();
+        }
+        remoteClient.admin().indices().prepareRefresh("my_index").get();
+        String[] indices = new String[] { "my_index" };
+        IndicesOptions indicesOptions = IndicesOptions.strictSingleIndexNoExpandForbidClosed();
+        {
+            QueryBuilder query = new TermQueryBuilder("_index", "cluster_a:my_index");
+            SearchShardsRequest request = new SearchShardsRequest(indices, indicesOptions, query, null, null, randomBoolean(), "cluster_a");
+            SearchShardsResponse resp = remoteClient.execute(SearchShardsAction.INSTANCE, request).actionGet();
+            assertThat(resp.getGroups(), hasSize(numShards));
+            for (SearchShardsGroup group : resp.getGroups()) {
+                assertFalse(group.skipped());
+            }
+        }
+        {
+            QueryBuilder query = new TermQueryBuilder("_index", "cluster_a:my_index");
+            SearchShardsRequest request = new SearchShardsRequest(
+                indices,
+                indicesOptions,
+                query,
+                null,
+                null,
+                randomBoolean(),
+                randomFrom("cluster_b", null)
+            );
+            SearchShardsResponse resp = remoteClient.execute(SearchShardsAction.INSTANCE, request).actionGet();
+            assertThat(resp.getGroups(), hasSize(numShards));
+            for (SearchShardsGroup group : resp.getGroups()) {
+                assertTrue(group.skipped());
+            }
+        }
+        {
+            QueryBuilder query = new TermQueryBuilder("_index", "cluster_a:not_my_index");
+            SearchShardsRequest request = new SearchShardsRequest(
+                indices,
+                indicesOptions,
+                query,
+                null,
+                null,
+                randomBoolean(),
+                randomFrom("cluster_a", "cluster_b", null)
+            );
+            SearchShardsResponse resp = remoteClient.execute(SearchShardsAction.INSTANCE, request).actionGet();
+            assertThat(resp.getGroups(), hasSize(numShards));
+            for (SearchShardsGroup group : resp.getGroups()) {
+                assertTrue(group.skipped());
+            }
+        }
+    }
 }

+ 6 - 1
server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java

@@ -74,6 +74,7 @@ final class CanMatchPreFilterSearchPhase extends SearchPhase {
     private final Map<String, AliasFilter> aliasFilter;
     private final SearchTask task;
     private final Executor executor;
+    private final boolean requireAtLeastOneMatch;
 
     private final CanMatchSearchPhaseResults results;
     private final CoordinatorRewriteContextProvider coordinatorRewriteContextProvider;
@@ -89,6 +90,7 @@ final class CanMatchPreFilterSearchPhase extends SearchPhase {
         GroupShardsIterator<SearchShardIterator> shardsIts,
         TransportSearchAction.SearchTimeProvider timeProvider,
         SearchTask task,
+        boolean requireAtLeastOneMatch,
         CoordinatorRewriteContextProvider coordinatorRewriteContextProvider,
         ActionListener<GroupShardsIterator<SearchShardIterator>> listener
     ) {
@@ -103,6 +105,7 @@ final class CanMatchPreFilterSearchPhase extends SearchPhase {
         this.concreteIndexBoosts = concreteIndexBoosts;
         this.aliasFilter = aliasFilter;
         this.task = task;
+        this.requireAtLeastOneMatch = requireAtLeastOneMatch;
         this.coordinatorRewriteContextProvider = coordinatorRewriteContextProvider;
         this.executor = executor;
         this.shardItIndexMap = new HashMap<>();
@@ -142,6 +145,7 @@ final class CanMatchPreFilterSearchPhase extends SearchPhase {
     // tries to pre-filter shards based on information that's available to the coordinator
     // without having to reach out to the actual shards
     private void runCoordinatorRewritePhase() {
+        // TODO: the index filter (i.e, `_index:patten`) should be prefiltered on the coordinator
         assert assertSearchCoordinationThread();
         final List<SearchShardIterator> matchedShardLevelRequests = new ArrayList<>();
         for (SearchShardIterator searchShardIterator : shardsIts) {
@@ -497,7 +501,8 @@ final class CanMatchPreFilterSearchPhase extends SearchPhase {
     ) {
         int cardinality = results.getNumPossibleMatches();
         FixedBitSet possibleMatches = results.getPossibleMatches();
-        if (cardinality == 0) {
+        // TODO: pick the local shard when possible
+        if (requireAtLeastOneMatch && cardinality == 0) {
             // this is a special case where we have no hit but we need to get at least one search response in order
             // to produce a valid search result with all the aggs etc.
             // Since it's possible that some of the shards that we're skipping are

+ 17 - 5
server/src/main/java/org/elasticsearch/action/search/SearchShardsRequest.java

@@ -30,7 +30,6 @@ import java.util.Objects;
 public final class SearchShardsRequest extends ActionRequest implements IndicesRequest.Replaceable {
     private String[] indices;
     private final IndicesOptions indicesOptions;
-
     @Nullable
     private final QueryBuilder query;
 
@@ -41,13 +40,16 @@ public final class SearchShardsRequest extends ActionRequest implements IndicesR
 
     private final boolean allowPartialSearchResults;
 
+    private final String clusterAlias;
+
     public SearchShardsRequest(
         String[] indices,
         IndicesOptions indicesOptions,
         QueryBuilder query,
         String routing,
         String preference,
-        boolean allowPartialSearchResults
+        boolean allowPartialSearchResults,
+        String clusterAlias
     ) {
         this.indices = indices;
         this.indicesOptions = indicesOptions;
@@ -55,6 +57,7 @@ public final class SearchShardsRequest extends ActionRequest implements IndicesR
         this.routing = routing;
         this.preference = preference;
         this.allowPartialSearchResults = allowPartialSearchResults;
+        this.clusterAlias = clusterAlias;
     }
 
     public SearchShardsRequest(StreamInput in) throws IOException {
@@ -65,6 +68,7 @@ public final class SearchShardsRequest extends ActionRequest implements IndicesR
         this.routing = in.readOptionalString();
         this.preference = in.readOptionalString();
         this.allowPartialSearchResults = in.readBoolean();
+        this.clusterAlias = in.readOptionalString();
     }
 
     @Override
@@ -76,6 +80,7 @@ public final class SearchShardsRequest extends ActionRequest implements IndicesR
         out.writeOptionalString(routing);
         out.writeOptionalString(preference);
         out.writeBoolean(allowPartialSearchResults);
+        out.writeOptionalString(clusterAlias);
     }
 
     @Override
@@ -104,6 +109,10 @@ public final class SearchShardsRequest extends ActionRequest implements IndicesR
         return new SearchTask(id, type, action, this::description, parentTaskId, headers);
     }
 
+    public String clusterAlias() {
+        return clusterAlias;
+    }
+
     public QueryBuilder query() {
         return query;
     }
@@ -134,7 +143,9 @@ public final class SearchShardsRequest extends ActionRequest implements IndicesR
             + preference
             + '\''
             + ", allowPartialSearchResults="
-            + allowPartialSearchResults;
+            + allowPartialSearchResults
+            + ", clusterAlias="
+            + clusterAlias;
     }
 
     @Override
@@ -152,12 +163,13 @@ public final class SearchShardsRequest extends ActionRequest implements IndicesR
             && Objects.equals(query, request.query)
             && Objects.equals(routing, request.routing)
             && Objects.equals(preference, request.preference)
-            && allowPartialSearchResults == request.allowPartialSearchResults;
+            && allowPartialSearchResults == request.allowPartialSearchResults
+            && Objects.equals(clusterAlias, request.clusterAlias);
     }
 
     @Override
     public int hashCode() {
-        int result = Objects.hash(indicesOptions, query, routing, preference, allowPartialSearchResults);
+        int result = Objects.hash(indicesOptions, query, routing, preference, allowPartialSearchResults, clusterAlias);
         result = 31 * result + Arrays.hashCode(indices);
         return result;
     }

+ 10 - 2
server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

@@ -868,7 +868,13 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
             final Set<String> indicesAndAliases = indexNameExpressionResolver.resolveExpressions(clusterState, searchRequest.indices());
             aliasFilter = buildIndexAliasFilters(clusterState, indicesAndAliases, indices);
             aliasFilter.putAll(remoteAliasMap);
-            localShardIterators = getLocalShardsIterator(clusterState, searchRequest, indicesAndAliases, concreteLocalIndices);
+            localShardIterators = getLocalShardsIterator(
+                clusterState,
+                searchRequest,
+                searchRequest.getLocalClusterAlias(),
+                indicesAndAliases,
+                concreteLocalIndices
+            );
         }
         final GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardIterators, remoteShardIterators);
 
@@ -1043,6 +1049,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
                     shardIterators,
                     timeProvider,
                     task,
+                    true,
                     searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis),
                     ActionListener.wrap(iters -> {
                         SearchPhase action = newSearchPhase(
@@ -1334,6 +1341,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
     List<SearchShardIterator> getLocalShardsIterator(
         ClusterState clusterState,
         SearchRequest searchRequest,
+        String clusterAlias,
         Set<String> indicesAndAliases,
         String[] concreteIndices
     ) {
@@ -1356,7 +1364,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
         return StreamSupport.stream(shardRoutings.spliterator(), false).map(it -> {
             OriginalIndices finalIndices = originalIndices.get(it.shardId().getIndex().getName());
             assert finalIndices != null;
-            return new SearchShardIterator(searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), finalIndices);
+            return new SearchShardIterator(clusterAlias, it.shardId(), it.getShardRoutings(), finalIndices);
         }).toList();
     }
 }

+ 9 - 4
server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java

@@ -34,12 +34,14 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
 /**
  * An internal search shards API performs the can_match phase and returns target shards of indices that might match a query.
  */
 public class TransportSearchShardsAction extends HandledTransportAction<SearchShardsRequest, SearchShardsResponse> {
+    private final TransportService transportService;
     private final TransportSearchAction transportSearchAction;
     private final SearchService searchService;
     private final RemoteClusterService remoteClusterService;
@@ -59,6 +61,7 @@ public class TransportSearchShardsAction extends HandledTransportAction<SearchSh
         IndexNameExpressionResolver indexNameExpressionResolver
     ) {
         super(SearchShardsAction.NAME, transportService, actionFilters, SearchShardsRequest::new);
+        this.transportService = transportService;
         this.transportSearchAction = transportSearchAction;
         this.searchService = searchService;
         this.remoteClusterService = transportService.getRemoteClusterService();
@@ -108,13 +111,14 @@ public class TransportSearchShardsAction extends HandledTransportAction<SearchSh
                 var shardIterators = transportSearchAction.getLocalShardsIterator(
                     clusterState,
                     searchRequest,
+                    searchShardsRequest.clusterAlias(),
                     indicesAndAliases,
                     concreteIndexNames
                 );
-                var canMatchPhase = new CanMatchPreFilterSearchPhase(
-                    logger,
-                    searchTransportService,
-                    (clusterAlias, node) -> searchTransportService.getConnection(clusterAlias, clusterState.nodes().get(node)),
+                var canMatchPhase = new CanMatchPreFilterSearchPhase(logger, searchTransportService, (clusterAlias, node) -> {
+                    assert Objects.equals(clusterAlias, searchShardsRequest.clusterAlias());
+                    return transportService.getConnection(clusterState.nodes().get(node));
+                },
                     aliasFilters,
                     Map.of(),
                     threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION),
@@ -122,6 +126,7 @@ public class TransportSearchShardsAction extends HandledTransportAction<SearchSh
                     GroupShardsIterator.sortAndCreate(shardIterators),
                     timeProvider,
                     (SearchTask) task,
+                    false,
                     searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis),
                     listener.map(shardIts -> new SearchShardsResponse(toGroups(shardIts), clusterState.nodes().getAllNodes(), aliasFilters))
                 );

+ 5 - 0
server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java

@@ -149,6 +149,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
             shardsIter,
             timeProvider,
             null,
+            true,
             EMPTY_CONTEXT_PROVIDER,
             ActionListener.wrap(iter -> {
                 result.set(iter);
@@ -246,6 +247,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
             shardsIter,
             timeProvider,
             null,
+            true,
             EMPTY_CONTEXT_PROVIDER,
             ActionListener.wrap(iter -> {
                 result.set(iter);
@@ -338,6 +340,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
                 shardsIter,
                 timeProvider,
                 null,
+                true,
                 EMPTY_CONTEXT_PROVIDER,
                 ActionListener.wrap(iter -> {
                     result.set(iter);
@@ -439,6 +442,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
                 shardsIter,
                 timeProvider,
                 null,
+                true,
                 EMPTY_CONTEXT_PROVIDER,
                 ActionListener.wrap(iter -> {
                     result.set(iter);
@@ -811,6 +815,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
             shardsIter,
             timeProvider,
             null,
+            true,
             contextProvider,
             ActionListener.wrap(iter -> {
                 result.set(iter);

+ 62 - 8
server/src/test/java/org/elasticsearch/action/search/SearchShardsRequestTests.java

@@ -39,35 +39,76 @@ public class SearchShardsRequestTests extends AbstractWireSerializingTestCase<Se
         QueryBuilder query = QueryBuilders.termQuery(randomAlphaOfLengthBetween(5, 20), randomAlphaOfLengthBetween(5, 20));
         String routing = randomBoolean() ? null : randomAlphaOfLength(10);
         String preference = randomBoolean() ? null : randomAlphaOfLength(10);
-        return new SearchShardsRequest(indices, indicesOptions, query, routing, preference, randomBoolean());
+        String clusterAlias = randomBoolean() ? null : randomAlphaOfLength(10);
+        return new SearchShardsRequest(indices, indicesOptions, query, routing, preference, randomBoolean(), clusterAlias);
     }
 
     @Override
     protected SearchShardsRequest mutateInstance(SearchShardsRequest r) throws IOException {
-        return switch (between(0, 5)) {
+        return switch (between(0, 6)) {
             case 0 -> {
                 String[] extraIndices = randomArray(1, 10, String[]::new, () -> randomAlphaOfLength(10));
                 String[] indices = ArrayUtils.concat(r.indices(), extraIndices);
-                yield new SearchShardsRequest(indices, r.indicesOptions(), r.query(), r.routing(), r.preference(), randomBoolean());
+                yield new SearchShardsRequest(
+                    indices,
+                    r.indicesOptions(),
+                    r.query(),
+                    r.routing(),
+                    r.preference(),
+                    randomBoolean(),
+                    r.clusterAlias()
+                );
             }
             case 1 -> {
                 IndicesOptions indicesOptions = randomValueOtherThan(
                     r.indicesOptions(),
                     () -> IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())
                 );
-                yield new SearchShardsRequest(r.indices(), indicesOptions, r.query(), r.routing(), r.preference(), randomBoolean());
+                yield new SearchShardsRequest(
+                    r.indices(),
+                    indicesOptions,
+                    r.query(),
+                    r.routing(),
+                    r.preference(),
+                    randomBoolean(),
+                    r.clusterAlias()
+                );
             }
             case 2 -> {
                 QueryBuilder query = QueryBuilders.rangeQuery(randomAlphaOfLengthBetween(5, 20)).from(randomNonNegativeLong());
-                yield new SearchShardsRequest(r.indices(), r.indicesOptions(), query, r.routing(), r.preference(), randomBoolean());
+                yield new SearchShardsRequest(
+                    r.indices(),
+                    r.indicesOptions(),
+                    query,
+                    r.routing(),
+                    r.preference(),
+                    randomBoolean(),
+                    r.clusterAlias()
+                );
             }
             case 3 -> {
                 String routing = randomValueOtherThan(r.routing(), () -> randomBoolean() ? null : randomAlphaOfLength(10));
-                yield new SearchShardsRequest(r.indices(), r.indicesOptions(), r.query(), routing, r.preference(), randomBoolean());
+                yield new SearchShardsRequest(
+                    r.indices(),
+                    r.indicesOptions(),
+                    r.query(),
+                    routing,
+                    r.preference(),
+                    randomBoolean(),
+                    r.clusterAlias()
+                );
             }
             case 4 -> {
                 String preference = randomValueOtherThan(r.preference(), () -> randomBoolean() ? null : randomAlphaOfLength(10));
-                yield new SearchShardsRequest(r.indices(), r.indicesOptions(), r.query(), r.routing(), preference, randomBoolean());
+                yield new SearchShardsRequest(
+                    r.indices(),
+                    r.indicesOptions(),
+                    r.query(),
+                    r.routing(),
+                    preference,
+                    randomBoolean(),
+                    r.clusterAlias()
+                );
             }
             case 5 -> new SearchShardsRequest(
                 r.indices(),
@@ -75,8 +116,21 @@ public class SearchShardsRequestTests extends AbstractWireSerializingTestCase<Se
                 r.query(),
                 r.routing(),
                 r.preference(),
-                r.allowPartialSearchResults() == false
+                r.allowPartialSearchResults() == false,
+                r.clusterAlias()
             );
+            case 6 -> {
+                String clusterAlias = randomValueOtherThan(r.clusterAlias(), () -> randomBoolean() ? null : randomAlphaOfLength(10));
+                yield new SearchShardsRequest(
+                    r.indices(),
+                    r.indicesOptions(),
+                    r.query(),
+                    r.routing(),
+                    r.preference(),
+                    randomBoolean(),
+                    clusterAlias
+                );
+            }
             default -> throw new AssertionError("unexpected value");
         };
     }