Переглянути джерело

Handle partial search result with point in time (#81349)

Today, a search request with PIT would fail immediately if any 
associated indices or nodes are gone, which is inconsistent when
allow_partial_search_results is true.

Relates #81256
Nhat Nguyen 3 роки тому
батько
коміт
d0d91c690e

+ 3 - 2
docs/reference/search/point-in-time-api.asciidoc

@@ -54,7 +54,7 @@ POST /_search <1>
     }
 }
 --------------------------------------------------
-// TEST[catch:missing]
+// TEST[catch:unavailable]
 
 <1> A search request with the `pit` parameter must not specify `index`, `routing`,
 and {ref}/search-request-body.html#request-body-search-preference[`preference`]
@@ -88,7 +88,8 @@ Additionally, if a segment contains deleted or updated documents then the
 point in time must keep track of whether each document in the segment was live at
 the time of the initial search request. Ensure that your nodes have sufficient heap
 space if you have many open point-in-times on an index that is subject to ongoing
-deletes or updates.
+deletes or updates. Note that a point-in-time doesn't prevent its associated indices
+from being deleted.
 
 You can check how many point-in-times (i.e., search contexts) are open with the
 <<cluster-nodes-stats,nodes stats API>>:

+ 3 - 3
docs/reference/search/search-your-data/paginate-search-results.asciidoc

@@ -106,7 +106,7 @@ GET /_search
   ]
 }
 ----
-// TEST[catch:missing]
+// TEST[catch:unavailable]
 
 <1> PIT ID for the search.
 <2> Sorts hits for the search with an implicit tiebreak on `_shard_doc` ascending.
@@ -138,7 +138,7 @@ GET /_search
   ]
 }
 ----
-// TEST[catch:missing]
+// TEST[catch:unavailable]
 
 <1> PIT ID for the search.
 <2> Sorts hits for the search with an explicit tiebreak on `_shard_doc` descending.
@@ -205,7 +205,7 @@ GET /_search
   "track_total_hits": false                        <3>
 }
 ----
-// TEST[catch:missing]
+// TEST[catch:unavailable]
 
 <1> PIT ID returned by the previous search.
 <2> Sort values from the previous search's last hit.

+ 31 - 14
server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java

@@ -8,13 +8,13 @@
 
 package org.elasticsearch.action.search;
 
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.admin.indices.stats.CommonStats;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.query.MatchAllQueryBuilder;
@@ -241,21 +241,38 @@ public class PointInTimeIT extends ESIntegTestCase {
         }
         refresh();
         String pit = openPointInTime(new String[] { "index-*" }, TimeValue.timeValueMinutes(2));
-        SearchResponse resp1 = client().prepareSearch().setPreference(null).setPointInTime(new PointInTimeBuilder(pit)).get();
-        assertNoFailures(resp1);
-        assertHitCount(resp1, index1 + index2);
-        client().admin().indices().prepareDelete("index-1").get();
-        if (randomBoolean()) {
-            SearchResponse resp2 = client().prepareSearch("index-*").get();
-            assertNoFailures(resp2);
-            assertHitCount(resp2, index2);
+        try {
+            SearchResponse resp = client().prepareSearch().setPreference(null).setPointInTime(new PointInTimeBuilder(pit)).get();
+            assertNoFailures(resp);
+            assertHitCount(resp, index1 + index2);
+            client().admin().indices().prepareDelete("index-1").get();
+            if (randomBoolean()) {
+                resp = client().prepareSearch("index-*").get();
+                assertNoFailures(resp);
+                assertHitCount(resp, index2);
+            }
 
+            // Allow partial search result
+            resp = client().prepareSearch()
+                .setPreference(null)
+                .setAllowPartialSearchResults(true)
+                .setPointInTime(new PointInTimeBuilder(pit))
+                .get();
+            assertFailures(resp);
+            assertHitCount(resp, index2);
+
+            // Do not allow partial search result
+            expectThrows(
+                ElasticsearchException.class,
+                () -> client().prepareSearch()
+                    .setPreference(null)
+                    .setAllowPartialSearchResults(false)
+                    .setPointInTime(new PointInTimeBuilder(pit))
+                    .get()
+            );
+        } finally {
+            closePointInTime(pit);
         }
-        expectThrows(
-            IndexNotFoundException.class,
-            () -> client().prepareSearch().setPreference(null).setPointInTime(new PointInTimeBuilder(pit)).get()
-        );
-        closePointInTime(resp1.pointInTimeId());
     }
 
     public void testCanMatch() throws Exception {

+ 29 - 13
server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

@@ -45,8 +45,10 @@ import org.elasticsearch.common.util.concurrent.CountDown;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.query.Rewriteable;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardNotFoundException;
 import org.elasticsearch.indices.ExecutorSelector;
 import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.search.SearchPhaseResult;
@@ -914,6 +916,10 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
     ) {
 
         clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
+        if (searchRequest.allowPartialSearchResults() == null) {
+            // No user preference defined in search request - apply cluster service default
+            searchRequest.allowPartialSearchResults(searchService.defaultAllowPartialSearchResults());
+        }
 
         // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
         // date math expressions and $now in scripts. This way all apis will deal with now in the same way instead
@@ -931,7 +937,8 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
                 localIndices,
                 searchRequest.getLocalClusterAlias(),
                 searchContext,
-                searchRequest.pointInTimeBuilder().getKeepAlive()
+                searchRequest.pointInTimeBuilder().getKeepAlive(),
+                searchRequest.allowPartialSearchResults()
             );
         } else {
             final Index[] indices = resolveLocalIndices(localIndices, clusterState, timeProvider);
@@ -988,10 +995,6 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
             // if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard
             searchRequest.searchType(QUERY_THEN_FETCH);
         }
-        if (searchRequest.allowPartialSearchResults() == null) {
-            // No user preference defined in search request - apply cluster service default
-            searchRequest.allowPartialSearchResults(searchService.defaultAllowPartialSearchResults());
-        }
         if (searchRequest.isSuggestOnly()) {
             // disable request cache if we have only suggest
             searchRequest.requestCache(false);
@@ -1413,22 +1416,35 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
         OriginalIndices originalIndices,
         String localClusterAlias,
         SearchContextId searchContext,
-        TimeValue keepAlive
+        TimeValue keepAlive,
+        boolean allowPartialSearchResults
     ) {
         final List<SearchShardIterator> iterators = new ArrayList<>(searchContext.shards().size());
         for (Map.Entry<ShardId, SearchContextIdForNode> entry : searchContext.shards().entrySet()) {
             final SearchContextIdForNode perNode = entry.getValue();
             if (Strings.isEmpty(perNode.getClusterAlias())) {
                 final ShardId shardId = entry.getKey();
-                final ShardIterator shards = OperationRouting.getShards(clusterState, shardId);
-                final List<String> targetNodes = new ArrayList<>(shards.size());
-                targetNodes.add(perNode.getNode());
-                if (perNode.getSearchContextId().getSearcherId() != null) {
-                    for (ShardRouting shard : shards) {
-                        if (shard.currentNodeId().equals(perNode.getNode()) == false) {
-                            targetNodes.add(shard.currentNodeId());
+                final List<String> targetNodes = new ArrayList<>(2);
+                // Prefer executing shard requests on nodes that are part of PIT first.
+                if (clusterState.nodes().nodeExists(perNode.getNode())) {
+                    targetNodes.add(perNode.getNode());
+                }
+                try {
+                    final ShardIterator shards = OperationRouting.getShards(clusterState, shardId);
+                    if (perNode.getSearchContextId().getSearcherId() != null) {
+                        for (ShardRouting shard : shards) {
+                            if (shard.currentNodeId().equals(perNode.getNode()) == false) {
+                                targetNodes.add(shard.currentNodeId());
+                            }
                         }
                     }
+                } catch (IndexNotFoundException | ShardNotFoundException e) {
+                    // We can hit these exceptions if the index was deleted after creating PIT or the cluster state on
+                    // this coordinating node is outdated. It's fine to ignore these extra "retry-able" target shards
+                    // when allowPartialSearchResults is true; otherwise the exception is rethrown.
+                    if (allowPartialSearchResults == false) {
+                        throw e;
+                    }
                 }
                 OriginalIndices finalIndices = new OriginalIndices(
                     new String[] { shardId.getIndexName() },

+ 38 - 2
server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java

@@ -37,6 +37,7 @@ import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.query.InnerHitBuilder;
 import org.elasticsearch.index.query.MatchAllQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
@@ -81,6 +82,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
@@ -1273,7 +1275,7 @@ public class TransportSearchActionTests extends ESTestCase {
             } else {
                 // relocated or no longer assigned
                 relocatedContexts.add(new ShardId(indexMetadata.getIndex(), shardId));
-                targetNode = UUIDs.randomBase64UUID();
+                targetNode = randomFrom(clusterState.nodes().getAllNodes()).getId();
             }
             contexts.put(
                 new ShardId(indexMetadata.getIndex(), shardId),
@@ -1292,7 +1294,8 @@ public class TransportSearchActionTests extends ESTestCase {
             OriginalIndices.NONE,
             null,
             new SearchContextId(contexts, aliasFilterMap),
-            keepAlive
+            keepAlive,
+            randomBoolean()
         );
         shardIterators.sort(Comparator.comparing(SearchShardIterator::shardId));
         assertThat(shardIterators, hasSize(numberOfShards));
@@ -1319,5 +1322,38 @@ public class TransportSearchActionTests extends ESTestCase {
             assertThat(shardIterator.getSearchContextId(), equalTo(context.getSearchContextId()));
             assertThat(shardIterator.getSearchContextKeepAlive(), equalTo(keepAlive));
         }
+
+        // Fails when some indices don't exist and `allowPartialSearchResults` is false.
+        ShardId anotherShardId = new ShardId(new Index("another-index", IndexMetadata.INDEX_UUID_NA_VALUE), randomIntBetween(0, 10));
+        contexts.put(
+            anotherShardId,
+            new SearchContextIdForNode(
+                null,
+                randomFrom(clusterState.nodes().getAllNodes()).getId(),
+                new ShardSearchContextId(UUIDs.randomBase64UUID(), randomNonNegativeLong(), null)
+            )
+        );
+        IndexNotFoundException error = expectThrows(IndexNotFoundException.class, () -> {
+            TransportSearchAction.getLocalLocalShardsIteratorFromPointInTime(
+                clusterState,
+                OriginalIndices.NONE,
+                null,
+                new SearchContextId(contexts, aliasFilterMap),
+                keepAlive,
+                false
+            );
+        });
+        assertThat(error.getIndex().getName(), equalTo("another-index"));
+        // Ok when some indices don't exist and `allowPartialSearchResults` is true.
+        Optional<SearchShardIterator> anotherShardIterator = TransportSearchAction.getLocalLocalShardsIteratorFromPointInTime(
+            clusterState,
+            OriginalIndices.NONE,
+            null,
+            new SearchContextId(contexts, aliasFilterMap),
+            keepAlive,
+            true
+        ).stream().filter(si -> si.shardId().equals(anotherShardId)).findFirst();
+        assertTrue(anotherShardIterator.isPresent());
+        assertThat(anotherShardIterator.get().getTargetNodeIds(), hasSize(1));
     }
 }

+ 47 - 0
x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/frozen/FrozenIndexIT.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.index.engine.frozen;
 
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.ClosePointInTimeAction;
 import org.elasticsearch.action.search.ClosePointInTimeRequest;
@@ -46,6 +47,7 @@ import java.util.stream.Collectors;
 
 import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_SETTING;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
@@ -272,4 +274,49 @@ public class FrozenIndexIT extends ESIntegTestCase {
             client().execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(pitId)).actionGet();
         }
     }
+
+    public void testPointInTimeWithDeletedIndices() {
+        createIndex("index-1");
+        createIndex("index-2");
+
+        int index1 = randomIntBetween(10, 50);
+        for (int i = 0; i < index1; i++) {
+            String id = Integer.toString(i);
+            client().prepareIndex("index-1").setId(id).setSource("value", i).get();
+        }
+
+        int index2 = randomIntBetween(10, 50);
+        for (int i = 0; i < index2; i++) {
+            String id = Integer.toString(i);
+            client().prepareIndex("index-2").setId(id).setSource("value", i).get();
+        }
+
+        assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest("index-1", "index-2")).actionGet());
+        final OpenPointInTimeRequest openPointInTimeRequest = new OpenPointInTimeRequest("index-*").indicesOptions(
+            IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED
+        ).keepAlive(TimeValue.timeValueMinutes(2));
+
+        final String pitId = client().execute(OpenPointInTimeAction.INSTANCE, openPointInTimeRequest).actionGet().getPointInTimeId();
+        try {
+            client().admin().indices().prepareDelete("index-1").get();
+            // Returns partial results if partial search results are allowed
+            SearchResponse resp = client().prepareSearch()
+                .setPreference(null)
+                .setAllowPartialSearchResults(true)
+                .setPointInTime(new PointInTimeBuilder(pitId))
+                .get();
+            assertFailures(resp);
+            assertHitCount(resp, index2);
+            // Fails if partial search results are not allowed
+            expectThrows(
+                ElasticsearchException.class,
+                client().prepareSearch()
+                    .setPreference(null)
+                    .setAllowPartialSearchResults(false)
+                    .setPointInTime(new PointInTimeBuilder(pitId))::get
+            );
+        } finally {
+            client().execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(pitId)).actionGet();
+        }
+    }
 }