Browse Source

Always stop the timer when profiling the fetch phase (#132570)

If setNextReader in any sub fetch phase throws we will not stop
the profile timer we started. Subphases do various things in
setNextReader from validations, throwing Exceptions outright, to
reading Lucene specific things like doc values.

This moves the timer.stop call in a finally block to make sure
everything timer related is copacetic.
Andrei Dan 2 months ago
parent
commit
f4a7948b5b

+ 5 - 0
docs/changelog/132570.yaml

@@ -0,0 +1,5 @@
+pr: 132570
+summary: Always stop the timer when profiling the fetch phase
+area: Search
+type: bug
+issues: []

+ 15 - 11
server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java

@@ -165,17 +165,21 @@ public final class FetchPhase {
             @Override
             protected void setNextReader(LeafReaderContext ctx, int[] docsInLeaf) throws IOException {
                 Timer timer = profiler.startNextReader();
-                this.ctx = ctx;
-                this.leafNestedDocuments = nestedDocuments.getLeafNestedDocuments(ctx);
-                this.leafStoredFieldLoader = storedFieldLoader.getLoader(ctx, docsInLeaf);
-                this.leafSourceLoader = sourceLoader.leaf(ctx.reader(), docsInLeaf);
-                this.leafIdLoader = idLoader.leaf(leafStoredFieldLoader, ctx.reader(), docsInLeaf);
-                fieldLookupProvider.setNextReader(ctx);
-                for (FetchSubPhaseProcessor processor : processors) {
-                    processor.setNextReader(ctx);
-                }
-                if (timer != null) {
-                    timer.stop();
+                try {
+                    this.ctx = ctx;
+                    this.leafNestedDocuments = nestedDocuments.getLeafNestedDocuments(ctx);
+                    this.leafStoredFieldLoader = storedFieldLoader.getLoader(ctx, docsInLeaf);
+                    this.leafSourceLoader = sourceLoader.leaf(ctx.reader(), docsInLeaf);
+                    this.leafIdLoader = idLoader.leaf(leafStoredFieldLoader, ctx.reader(), docsInLeaf);
+
+                    fieldLookupProvider.setNextReader(ctx);
+                    for (FetchSubPhaseProcessor processor : processors) {
+                        processor.setNextReader(ctx);
+                    }
+                } finally {
+                    if (timer != null) {
+                        timer.stop();
+                    }
                 }
             }
 

+ 74 - 1
server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java

@@ -50,6 +50,7 @@ import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.SearchPhaseResult;
 import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.fetch.FetchPhase;
+import org.elasticsearch.search.fetch.FetchPhaseExecutionException;
 import org.elasticsearch.search.fetch.FetchSearchResult;
 import org.elasticsearch.search.fetch.FetchSubPhase;
 import org.elasticsearch.search.fetch.FetchSubPhaseProcessor;
@@ -63,6 +64,7 @@ import org.elasticsearch.search.internal.ShardSearchContextId;
 import org.elasticsearch.search.internal.ShardSearchRequest;
 import org.elasticsearch.search.lookup.Source;
 import org.elasticsearch.search.profile.ProfileResult;
+import org.elasticsearch.search.profile.Profilers;
 import org.elasticsearch.search.profile.SearchProfileQueryPhaseResult;
 import org.elasticsearch.search.profile.SearchProfileShardResult;
 import org.elasticsearch.search.query.QuerySearchResult;
@@ -873,6 +875,63 @@ public class FetchSearchPhaseTests extends ESTestCase {
         }
     }
 
+    public void testTimerStoppedAndSubPhasesExceptionsPropagate() throws IOException {
+        // if the timer is not stopped properly whilst profiling the fetch phase the exceptions
+        // in sub phases#setNextReader will not propagate as the cause that failed the fetch phase (instead a timer illegal state exception
+        // will propagate)
+        // this tests ensures that exceptions in sub phases are propagated correctly as the cause of the fetch phase failure (which in turn
+        // implies the timer was handled correctly)
+        Directory dir = newDirectory();
+        RandomIndexWriter w = new RandomIndexWriter(random(), dir);
+
+        String body = "{ \"thefield\": \" " + randomAlphaOfLength(48_000) + "\" }";
+        for (int i = 0; i < 10; i++) {
+            Document document = new Document();
+            document.add(new StringField("id", Integer.toString(i), Field.Store.YES));
+            w.addDocument(document);
+        }
+        if (randomBoolean()) {
+            w.forceMerge(1);
+        }
+        IndexReader r = w.getReader();
+        w.close();
+        ContextIndexSearcher contextIndexSearcher = createSearcher(r);
+        try (
+            SearchContext searchContext = createSearchContext(
+                contextIndexSearcher,
+                true,
+                new NoopCircuitBreaker(CircuitBreaker.REQUEST),
+                true
+            )
+        ) {
+            FetchPhase fetchPhase = new FetchPhase(List.of(fetchContext -> new FetchSubPhaseProcessor() {
+                @Override
+                public void setNextReader(LeafReaderContext readerContext) throws IOException {
+                    throw new IOException("bad things");
+                }
+
+                @Override
+                public void process(FetchSubPhase.HitContext hitContext) throws IOException {
+                    Source source = hitContext.source();
+                    hitContext.hit().sourceRef(source.internalSourceRef());
+                }
+
+                @Override
+                public StoredFieldsSpec storedFieldsSpec() {
+                    return StoredFieldsSpec.NEEDS_SOURCE;
+                }
+            }));
+            FetchPhaseExecutionException fetchPhaseExecutionException = assertThrows(
+                FetchPhaseExecutionException.class,
+                () -> fetchPhase.execute(searchContext, IntStream.range(0, 100).toArray(), null)
+            );
+            assertThat(fetchPhaseExecutionException.getCause().getMessage(), is("bad things"));
+        } finally {
+            r.close();
+            dir.close();
+        }
+    }
+
     private static ContextIndexSearcher createSearcher(IndexReader reader) throws IOException {
         return new ContextIndexSearcher(reader, null, null, new QueryCachingPolicy() {
             @Override
@@ -910,13 +969,22 @@ public class FetchSearchPhaseTests extends ESTestCase {
     }
 
     private static SearchContext createSearchContext(ContextIndexSearcher contextIndexSearcher, boolean allowPartialResults) {
-        return createSearchContext(contextIndexSearcher, allowPartialResults, null);
+        return createSearchContext(contextIndexSearcher, allowPartialResults, null, false);
     }
 
     private static SearchContext createSearchContext(
         ContextIndexSearcher contextIndexSearcher,
         boolean allowPartialResults,
         @Nullable CircuitBreaker circuitBreaker
+    ) {
+        return createSearchContext(contextIndexSearcher, allowPartialResults, circuitBreaker, false);
+    }
+
+    private static SearchContext createSearchContext(
+        ContextIndexSearcher contextIndexSearcher,
+        boolean allowPartialResults,
+        @Nullable CircuitBreaker circuitBreaker,
+        boolean profileEnabled
     ) {
         IndexSettings indexSettings = new IndexSettings(
             IndexMetadata.builder("index")
@@ -999,6 +1067,11 @@ public class FetchSearchPhaseTests extends ESTestCase {
                     return super.circuitBreaker();
                 }
             }
+
+            @Override
+            public Profilers getProfilers() {
+                return profileEnabled ? new Profilers(contextIndexSearcher) : null;
+            }
         };
         searchContext.addReleasable(searchContext.fetchResult()::decRef);
         searchContext.setTask(new SearchShardTask(-1, "type", "action", "description", null, Collections.emptyMap()));