瀏覽代碼

Add internalClusterTest for and fix leak in ExpandSearchPhase (#108562)

`ExpandSearchPhase` was leaking `SearchHits` when a pooled `SearchHits`
that was read from the wire was added to an unpooled `SearchHit`.
This commit makes the relevant `SearchHit` instances that need to be
pooled so they released nested hits, pooled. This requires a couple of
smaller adjustments in the codebase, mainly around error handling.
Armin Braun 1 年之前
父節點
當前提交
e64f1b6317

+ 6 - 0
docs/changelog/108562.yaml

@@ -0,0 +1,6 @@
+pr: 108562
+summary: Add `internalClusterTest` for and fix leak in `ExpandSearchPhase`
+area: Search
+type: bug
+issues:
+ - 108369

+ 42 - 0
server/src/internalClusterTest/java/org/elasticsearch/search/CollapseSearchResultsIT.java

@@ -0,0 +1,42 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.index.query.InnerHitBuilder;
+import org.elasticsearch.index.query.MatchAllQueryBuilder;
+import org.elasticsearch.search.collapse.CollapseBuilder;
+import org.elasticsearch.test.ESIntegTestCase;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse;
+
+public class CollapseSearchResultsIT extends ESIntegTestCase {
+
+    public void testCollapse() {
+        final String indexName = "test_collapse";
+        createIndex(indexName);
+        final String collapseField = "collapse_field";
+        assertAcked(indicesAdmin().preparePutMapping(indexName).setSource(collapseField, "type=keyword"));
+        index(indexName, "id_1", Map.of(collapseField, "value1"));
+        index(indexName, "id_2", Map.of(collapseField, "value2"));
+        refresh(indexName);
+        assertNoFailuresAndResponse(
+            prepareSearch(indexName).setQuery(new MatchAllQueryBuilder())
+                .setCollapse(new CollapseBuilder(collapseField).setInnerHits(new InnerHitBuilder("ih").setSize(2))),
+            searchResponse -> {
+                assertEquals(collapseField, searchResponse.getHits().getCollapseField());
+                assertEquals(Set.of(new BytesRef("value1"), new BytesRef("value2")), Set.of(searchResponse.getHits().getCollapseValues()));
+            }
+        );
+    }
+}

+ 1 - 0
server/src/main/java/org/elasticsearch/action/search/ExpandSearchPhase.java

@@ -101,6 +101,7 @@ final class ExpandSearchPhase extends SearchPhase {
                         hit.setInnerHits(Maps.newMapWithExpectedSize(innerHitBuilders.size()));
                     }
                     hit.getInnerHits().put(innerHitBuilder.getName(), innerHits);
+                    assert innerHits.isPooled() == false || hit.isPooled() : "pooled inner hits can only be added to a pooled hit";
                     innerHits.mustIncRef();
                 }
             }

+ 22 - 10
server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java

@@ -167,23 +167,35 @@ public final class FetchPhase {
                     leafSourceLoader,
                     leafIdLoader
                 );
-                sourceProvider.source = hit.source();
-                fieldLookupProvider.setPreloadedStoredFieldValues(hit.hit().getId(), hit.loadedFields());
-                for (FetchSubPhaseProcessor processor : processors) {
-                    processor.process(hit);
+                boolean success = false;
+                try {
+                    sourceProvider.source = hit.source();
+                    fieldLookupProvider.setPreloadedStoredFieldValues(hit.hit().getId(), hit.loadedFields());
+                    for (FetchSubPhaseProcessor processor : processors) {
+                        processor.process(hit);
+                    }
+                    success = true;
+                    return hit.hit();
+                } finally {
+                    if (success == false) {
+                        hit.hit().decRef();
+                    }
                 }
-                return hit.hit();
             }
         };
 
         SearchHit[] hits = docsIterator.iterate(context.shardTarget(), context.searcher().getIndexReader(), docIdsToLoad);
 
         if (context.isCancelled()) {
+            for (SearchHit hit : hits) {
+                // release all hits that would otherwise become owned and eventually released by SearchHits below
+                hit.decRef();
+            }
             throw new TaskCancelledException("cancelled");
         }
 
         TotalHits totalHits = context.getTotalHits();
-        return SearchHits.unpooled(hits, totalHits, context.getMaxScore());
+        return new SearchHits(hits, totalHits, context.getMaxScore());
     }
 
     List<FetchSubPhaseProcessor> getProcessors(SearchShardTarget target, FetchContext context, Profiler profiler) {
@@ -257,12 +269,12 @@ public final class FetchPhase {
 
         String id = idLoader.getId(subDocId);
         if (id == null) {
-            // TODO: can we use pooled buffers here as well?
-            SearchHit hit = SearchHit.unpooled(docId, null);
+            SearchHit hit = new SearchHit(docId);
+            // TODO: can we use real pooled buffers here as well?
             Source source = Source.lazy(lazyStoredSourceLoader(profiler, subReaderContext, subDocId));
             return new HitContext(hit, subReaderContext, subDocId, Map.of(), source);
         } else {
-            SearchHit hit = SearchHit.unpooled(docId, id);
+            SearchHit hit = new SearchHit(docId, id);
             Source source;
             if (requiresSource) {
                 Timer timer = profiler.startLoadingSource();
@@ -339,7 +351,7 @@ public final class FetchPhase {
         assert nestedIdentity != null;
         Source nestedSource = nestedIdentity.extractSource(rootSource);
 
-        SearchHit hit = SearchHit.unpooled(topDocId, rootId, nestedIdentity);
+        SearchHit hit = new SearchHit(topDocId, rootId, nestedIdentity);
         return new HitContext(hit, subReaderContext, nestedInfo.doc(), childFieldLoader.storedFields(), nestedSource);
     }
 

+ 1 - 0
server/src/main/java/org/elasticsearch/search/fetch/FetchPhaseDocsIterator.java

@@ -67,6 +67,7 @@ abstract class FetchPhaseDocsIterator {
                     setNextReader(ctx, docsInLeaf);
                 }
                 currentDoc = docs[i].docId;
+                assert searchHits[docs[i].index] == null;
                 searchHits[docs[i].index] = nextDoc(docs[i].docId);
             }
         } catch (Exception e) {

+ 6 - 1
server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java

@@ -61,8 +61,13 @@ public final class FetchSearchResult extends SearchPhaseResult {
 
     public void shardResult(SearchHits hits, ProfileResult profileResult) {
         assert assertNoSearchTarget(hits);
+        assert hasReferences();
+        var existing = this.hits;
+        if (existing != null) {
+            existing.decRef();
+        }
         this.hits = hits;
-        hits.incRef();
+        hits.mustIncRef();
         assert this.profileResult == null;
         this.profileResult = profileResult;
     }

+ 1 - 0
server/src/main/java/org/elasticsearch/search/fetch/subphase/InnerHitsPhase.java

@@ -104,6 +104,7 @@ public final class InnerHitsPhase implements FetchSubPhase {
                 }
             }
             var h = fetchResult.hits();
+            assert hit.isPooled() || h.isPooled() == false;
             results.put(entry.getKey(), h);
             h.mustIncRef();
         }