瀏覽代碼

Use pooled search response sources in EQL (#105179)

Remove all usage of asUnpooled from EQL, use ref counting in the one spot where the logic forks.
Armin Braun 1 年之前
父節點
當前提交
4cf89428d7

+ 1 - 0
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchResponse.java

@@ -277,6 +277,7 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec
         private Event(StreamInput in) throws IOException {
             index = in.readString();
             id = in.readString();
+            // TODO: make this pooled?
             source = in.readBytesReference();
             if (in.getTransportVersion().onOrAfter(TransportVersions.V_7_13_0) && in.readBoolean()) {
                 fetchFields = in.readMap(DocumentField::new);

+ 4 - 4
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sample/SampleIterator.java

@@ -26,7 +26,6 @@ import org.elasticsearch.xpack.eql.execution.assembler.SampleQueryRequest;
 import org.elasticsearch.xpack.eql.execution.search.HitReference;
 import org.elasticsearch.xpack.eql.execution.search.Limit;
 import org.elasticsearch.xpack.eql.execution.search.QueryClient;
-import org.elasticsearch.xpack.eql.execution.search.RuntimeUtils;
 import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey;
 import org.elasticsearch.xpack.eql.session.EmptyPayload;
 import org.elasticsearch.xpack.eql.session.Payload;
@@ -34,6 +33,7 @@ import org.elasticsearch.xpack.eql.session.Payload.Type;
 import org.elasticsearch.xpack.ql.util.ActionListeners;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -222,9 +222,9 @@ public class SampleIterator implements Executable {
 
             for (int responseIndex = 0; responseIndex < response.length; responseIndex++) {
                 MultiSearchResponse.Item item = response[responseIndex];
-                final var hits = RuntimeUtils.searchHits(item.getResponse());
-                if (hits.size() > 0) {
-                    sample.add(hits);
+                final var hits = item.getResponse().getHits();
+                if (hits.getHits().length > 0) {
+                    sample.add(Arrays.asList(hits.getHits()));
                 }
                 if (docGroupsCounter == maxCriteria) {
                     List<List<SearchHit>> matches = matchSamples(sample, maxCriteria, maxSamplesPerKey);

+ 0 - 5
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/RuntimeUtils.java

@@ -181,11 +181,6 @@ public final class RuntimeUtils {
         return searchRequest;
     }
 
-    public static List<SearchHit> searchHits(SearchResponse response) {
-        // TODO remove unpooled usage
-        return Arrays.asList(response.getHits().asUnpooled().getHits());
-    }
-
     /**
      * optimized method that adds filter to existing bool queries without additional wrapping
      * additionally checks whether the given query exists for safe decoration

+ 21 - 12
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java

@@ -19,6 +19,7 @@ import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.query.RangeQueryBuilder;
 import org.elasticsearch.index.query.TermQueryBuilder;
 import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.xpack.eql.execution.assembler.BoxedQueryRequest;
@@ -38,6 +39,7 @@ import org.elasticsearch.xpack.ql.util.ActionListeners;
 import org.elasticsearch.xpack.ql.util.CollectionUtils;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -49,7 +51,6 @@ import static java.util.stream.Collectors.toList;
 import static org.elasticsearch.action.ActionListener.runAfter;
 import static org.elasticsearch.xpack.eql.execution.ExecutionUtils.copySource;
 import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.combineFilters;
-import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.searchHits;
 import static org.elasticsearch.xpack.eql.util.SearchHitUtils.qualifiedIndex;
 
 /**
@@ -361,18 +362,19 @@ public class TumblingWindow implements Executable {
      */
     private void baseCriterion(int baseStage, SearchResponse r, ActionListener<Payload> listener) {
         SequenceCriterion base = criteria.get(baseStage);
-        List<SearchHit> hits = searchHits(r);
+        SearchHits hits = r.getHits();
 
-        log.trace("Found [{}] hits", hits.size());
+        log.trace("Found [{}] hits", hits.getHits().length);
 
         Ordinal begin = null, end = null;
         WindowInfo info;
 
         // if there is at least one result, process it
-        if (hits.isEmpty() == false) {
+        if (hits.getHits().length > 0) {
             // get borders for the rest of the queries - but only when at least one result is found
-            begin = headOrdinal(hits, base);
-            end = tailOrdinal(hits, base);
+            var hitsAsList = Arrays.asList(hits.getHits());
+            begin = headOrdinal(hitsAsList, base);
+            end = tailOrdinal(hitsAsList, base);
             // always create an ASC window
             info = new WindowInfo(baseStage, begin, end);
 
@@ -391,7 +393,14 @@ public class TumblingWindow implements Executable {
             if (until != null && baseStage > 0) {
                 // find "until" ordinals - early on to discard data in-flight to avoid matching
                 // hits that can occur in other documents
-                untilCriterion(info, listener, () -> completeBaseCriterion(baseStage, hits, info, listener));
+                hits.incRef();
+                untilCriterion(info, listener, () -> {
+                    try {
+                        completeBaseCriterion(baseStage, hits, info, listener);
+                    } finally {
+                        hits.decRef();
+                    }
+                });
                 return;
             }
         } else {
@@ -405,17 +414,17 @@ public class TumblingWindow implements Executable {
         completeBaseCriterion(baseStage, hits, info, listener);
     }
 
-    private void completeBaseCriterion(int baseStage, List<SearchHit> hits, WindowInfo info, ActionListener<Payload> listener) {
+    private void completeBaseCriterion(int baseStage, SearchHits hits, WindowInfo info, ActionListener<Payload> listener) {
         SequenceCriterion base = criteria.get(baseStage);
 
         // check for matches - if the limit has been reached, abort
-        if (matcher.match(baseStage, wrapValues(base, hits)) == false) {
+        if (matcher.match(baseStage, wrapValues(base, Arrays.asList(hits.getHits()))) == false) {
             payload(listener);
             return;
         }
 
         int nextStage = nextPositiveStage(baseStage);
-        boolean windowCompleted = hits.size() < windowSize;
+        boolean windowCompleted = hits.getHits().length < windowSize;
 
         // there are still queries
         if (nextStage > 0) { // -1 means no further positive stages
@@ -527,7 +536,7 @@ public class TumblingWindow implements Executable {
         log.trace("Querying until stage {}", request);
 
         client.query(request, listener.delegateFailureAndWrap((delegate, r) -> {
-            List<SearchHit> hits = searchHits(r);
+            List<SearchHit> hits = Arrays.asList(r.getHits().getHits());
 
             log.trace("Found [{}] hits", hits.size());
             // no more results for until - let the other queries run
@@ -558,7 +567,7 @@ public class TumblingWindow implements Executable {
         log.trace("Querying (secondary) stage [{}] {}", criterion.stage(), request);
 
         client.query(request, listener.delegateFailureAndWrap((delegate, r) -> {
-            List<SearchHit> hits = searchHits(r);
+            List<SearchHit> hits = Arrays.asList(r.getHits().getHits());
 
             // filter hits that are escaping the window (same timestamp but different tiebreaker)
             // apply it only to ASC queries; DESC queries need it to find matches going the opposite direction