1
0
Эх сурвалжийг харах

EQL: Fix matching of tail/desc queries (#59827)

When dealing with tail queries, data is returned descending for the base
criterion yet the rest of the queries are ascending. This caused a
problem during insertion: within a page the data is ASC, but between
pages the blocks of data are DESC.
This caused incorrect sorting inside a SequenceGroup, which led to
incorrect results.

Furthermore, in case of a limit, since the data in a page is ASC, early
return is not possible and neither is desc matching. Thus the page needs
to be consumed first before finding the final results.
A future improvement could be to keep only the top N results, dropping
the rest during insertion time.
Costin Leau 5 жил өмнө
parent
commit
77c88da054

+ 3 - 2
x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java

@@ -105,8 +105,9 @@ public abstract class CommonEqlActionTestCase extends ESRestTestCase {
 
             boolean[] values = spec.caseSensitive() == null ? new boolean[] { true, false } : new boolean[] { spec.caseSensitive() };
             
-            for (boolean bool : values) {
-                results.add(new Object[] { spec.query(), name, spec.expectedEventIds(), bool });
+            for (boolean sensitive : values) {
+                String prefixed = name + (sensitive ? "-sensitive" : "-insensitive");
+                results.add(new Object[] { spec.query(), prefixed, spec.expectedEventIds(), sensitive });
             }
         }
 

+ 2 - 2
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java

@@ -74,7 +74,7 @@ public class ExecutionManager {
                 QueryRequest original = () -> source;
                 BoxedQueryRequest boxedRequest = new BoxedQueryRequest(original, timestampName, tiebreakerName);
                 Criterion<BoxedQueryRequest> criterion =
-                        new Criterion<>(i, boxedRequest, keyExtractors, tsExtractor, tbExtractor, i > 0 && descending);
+                        new Criterion<>(i, boxedRequest, keyExtractors, tsExtractor, tbExtractor, i == 0 && descending);
                 criteria.add(criterion);
             } else {
                 // until
@@ -87,7 +87,7 @@ public class ExecutionManager {
         }
         
         int completionStage = criteria.size() - 1;
-        SequenceMatcher matcher = new SequenceMatcher(completionStage, maxSpan, limit);
+        SequenceMatcher matcher = new SequenceMatcher(completionStage, descending, maxSpan, limit);
 
         TumblingWindow w = new TumblingWindow(new BasicQueryClient(session),
                 criteria.subList(0, completionStage),

+ 20 - 0
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/Limit.java

@@ -8,9 +8,13 @@ package org.elasticsearch.xpack.eql.execution.search;
 
 import org.elasticsearch.xpack.eql.util.MathUtils;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
+import static java.util.Collections.emptyList;
+
 public class Limit {
 
     private final int limit;
@@ -23,6 +27,10 @@ public class Limit {
         this.total = MathUtils.abs(limit) + offset;
     }
 
+    public int limit() {
+        return limit;
+    }
+
     public int absLimit() {
         return MathUtils.abs(limit);
     }
@@ -58,7 +66,19 @@ public class Limit {
      * Offer a limited view (including offset) for the given list.
      */
     public <E> List<E> view(List<E> values) {
+        if (values == null || values.isEmpty()) {
+            return values;
+        }
+        if (limit == 0) {
+            return emptyList();
+        }
+        
+        if (limit < 0) {
+            values = new ArrayList<>(values);
+            Collections.reverse(values);
+        }
         int size = values.size();
+
         if (size >= total) {
             return values.subList(offset, total);
         }

+ 20 - 3
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/KeyToSequences.java

@@ -29,7 +29,7 @@ class KeyToSequences {
         this.keyToUntil = new LinkedHashMap<>();
     }
 
-    private SequenceGroup[] group(SequenceKey key) {
+    private SequenceGroup[] groups(SequenceKey key) {
         return keyToSequences.computeIfAbsent(key, k -> new SequenceGroup[listSize]);
     }
 
@@ -44,7 +44,7 @@ class KeyToSequences {
 
     void add(int stage, Sequence sequence) {
         SequenceKey key = sequence.key();
-        SequenceGroup[] groups = group(key);
+        SequenceGroup[] groups = groups(key);
         // create the group on demand
         if (groups[stage] == null) {
             groups[stage] = new SequenceGroup(key);
@@ -52,6 +52,24 @@ class KeyToSequences {
         groups[stage].add(sequence);
     }
 
+    void resetGroupInsertPosition() {
+        for (SequenceGroup[] groups : keyToSequences.values()) {
+            for (SequenceGroup group : groups) {
+                if (group != null) {
+                    group.resetInsertPosition();
+                }
+            }
+        }
+    }
+
+    void resetUntilInsertPosition() {
+        for (UntilGroup until : keyToUntil.values()) {
+            if (until != null) {
+                until.resetInsertPosition();
+            }
+        }
+    }
+
     void until(Iterable<KeyAndOrdinal> until) {
         for (KeyAndOrdinal keyAndOrdinal : until) {
             // ignore unknown keys
@@ -111,5 +129,4 @@ class KeyToSequences {
     public String toString() {
         return LoggerMessageFormat.format(null, "Keys=[{}], Until=[{}]", keyToSequences.size(), keyToUntil.size());
     }
-
 }

+ 25 - 6
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/OrdinalGroup.java

@@ -28,6 +28,13 @@ abstract class OrdinalGroup<E> implements Iterable<Ordinal> {
     // timestamp compression (whose range is known for the current frame).
     private final List<E> elements = new LinkedList<>();
 
+    /**
+     * index in the list used for resetting the insertion point
+     * it gets reset when dealing with descending queries since the data inserted is ascending in a page
+     * but descending compared to the previous stages.
+     */
+    private int insertPosition = 0;
+
     private int hashCode = 0;
 
     private Ordinal start, stop;
@@ -39,12 +46,11 @@ abstract class OrdinalGroup<E> implements Iterable<Ordinal> {
         this.extractor = extractor;
     }
 
-    public SequenceKey key() {
+    SequenceKey key() {
         return key;
     }
 
-    public void add(E element) {
-        elements.add(element);
+    void add(E element) {
         hashCode = 31 * hashCode + Objects.hashCode(element);
 
         Ordinal ordinal = extractor.apply(element);
@@ -60,8 +66,14 @@ abstract class OrdinalGroup<E> implements Iterable<Ordinal> {
                 stop = ordinal;
             }
         }
+        // add element at the current position
+        elements.add(insertPosition++, element);
     }
-    
+
+    void resetInsertPosition() {
+        insertPosition = 0;
+    }
+
     /**
      * Returns the latest element from the group that has its timestamp
      * less than the given argument alongside its position in the list.
@@ -72,7 +84,14 @@ abstract class OrdinalGroup<E> implements Iterable<Ordinal> {
 
         // trim
         if (match != null) {
-            elements.subList(0, match.v2() + 1).clear();
+            int pos = match.v2() + 1;
+            elements.subList(0, pos).clear();
+
+            // update insert position
+            insertPosition = insertPosition - pos;
+            if (insertPosition < 0) {
+                insertPosition = 0;
+            }
 
             // update min time
             if (elements.isEmpty() == false) {
@@ -107,7 +126,7 @@ abstract class OrdinalGroup<E> implements Iterable<Ordinal> {
         return match != null ? new Tuple<>(match, matchPos) : null;
     }
 
-    public boolean isEmpty() {
+    boolean isEmpty() {
         return elements.isEmpty();
     }
 

+ 69 - 18
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java

@@ -62,18 +62,23 @@ public class SequenceMatcher {
 
     /** list of completed sequences - separate to avoid polluting the other stages */
     private final List<Sequence> completed;
+    private int completedInsertPosition = 0;
+
     private final long maxSpanInMillis;
 
+    private final boolean descending;
+
     private Limit limit;
-    private boolean limitReached = false;
+    private boolean headLimit = false;
 
     private final Stats stats = new Stats();
 
     @SuppressWarnings("rawtypes")
-    public SequenceMatcher(int stages, TimeValue maxSpan, Limit limit) {
+    public SequenceMatcher(int stages, boolean descending, TimeValue maxSpan, Limit limit) {
         this.numberOfStages = stages;
         this.completionStage = stages - 1;
 
+        this.descending = descending;
         this.stageToKeys = new StageToKeys(completionStage);
         this.keyToSequences = new KeyToSequences(completionStage);
         this.completed = new LinkedList<>();
@@ -84,7 +89,7 @@ public class SequenceMatcher {
         this.limit = limit;
     }
 
-    public void trackSequence(Sequence sequence) {
+    private void trackSequence(Sequence sequence) {
         SequenceKey key = sequence.key();
 
         stageToKeys.add(0, key);
@@ -97,29 +102,42 @@ public class SequenceMatcher {
      * Match hits for the given stage.
      * Returns false if the process needs to be stopped.
      */
-    public boolean match(int stage, Iterable<Tuple<KeyAndOrdinal, HitReference>> hits) {
+    boolean match(int stage, Iterable<Tuple<KeyAndOrdinal, HitReference>> hits) {
         for (Tuple<KeyAndOrdinal, HitReference> tuple : hits) {
             KeyAndOrdinal ko = tuple.v1();
             HitReference hit = tuple.v2();
 
             if (stage == 0) {
                 Sequence seq = new Sequence(ko.key, numberOfStages, ko.ordinal, hit);
+                // descending queries return descending blocks of ASC data
+                // to avoid sorting things during insertion,
+
                 trackSequence(seq);
             } else {
                 match(stage, ko.key, ko.ordinal, hit);
 
                 // early skip in case of reaching the limit
                 // check the last stage to avoid calling the state machine in other stages
-                if (reachedLimit()) {
-                    log.trace("Limit reached {}", stats);
+                if (headLimit) {
+                    log.trace("(Head) Limit reached {}", stats);
                     return false;
                 }
             }
         }
+
+        // check tail limit
+        if (tailLimitReached()) {
+            log.trace("(Tail) Limit reached {}", stats);
+            return false;
+        }
         log.trace("{}", stats);
         return true;
     }
 
+    private boolean tailLimitReached() {
+        return limit != null && limit.limit() < 0 && limit.absLimit() <= completed.size();
+    }
+
     /**
      * Match the given hit (based on key and timestamp and potential tiebreaker) with any potential sequence from the previous
      * given stage. If that's the case, update the sequence and the rest of the references.
@@ -175,21 +193,41 @@ public class SequenceMatcher {
 
         // bump the stages
         if (stage == completionStage) {
-            if (limitReached == false) {
-                completed.add(sequence);
-                // update the bool lazily
-                limitReached = limit != null && completed.size() == limit.totalLimit();
+            // when dealing with descending queries
+            // avoid duplicate matching (since the ASC query can return previously seen results)
+            if (descending) {
+                for (Sequence seen : completed) {
+                    if (seen.key().equals(key) && seen.ordinal().equals(ordinal)) {
+                        return;
+                    }
+                }
             }
+
+            completed.add(completedInsertPosition++, sequence);
+            // update the bool lazily
+            // only consider positive limits / negative ones imply tail which means having to go
+            // through the whole page of results before selecting the last ones
+            // doing a limit early returns the 'head' not 'tail'
+            headLimit = limit != null && limit.limit() > 0 && completed.size() == limit.totalLimit();
         } else {
+            if (descending) {
+                // when dealing with descending queries
+                // avoid duplicate matching (since the ASC query can return previously seen results)
+                group = keyToSequences.groupIfPresent(stage, key);
+                if (group != null) {
+                    for (Ordinal previous : group) {
+                        if (previous.equals(ordinal)) {
+                            return;
+                        }
+                    }
+                }
+            }
+
             stageToKeys.add(stage, key);
             keyToSequences.add(stage, sequence);
         }
     }
 
-    public boolean reachedLimit() {
-        return limitReached;
-    }
-
     /**
      * Checks whether the rest of the stages have in-flight data.
      * This method is called when a query returns no data meaning
@@ -197,7 +235,7 @@ public class SequenceMatcher {
      * However sequences on higher stages can, hence this check to know whether
      * it's possible to advance the window early.
      */
-    public boolean hasCandidates(int stage) {
+    boolean hasCandidates(int stage) {
         for (int i = stage; i < completionStage; i++) {
             if (stageToKeys.isEmpty(i) == false) {
                 return true;
@@ -207,18 +245,31 @@ public class SequenceMatcher {
     }
 
 
-    public List<Sequence> completed() {
+    List<Sequence> completed() {
         return limit != null ? limit.view(completed) : completed;
     }
 
-    public void dropUntil() {
+    void dropUntil() {
         keyToSequences.dropUntil();
     }
 
-    public void until(Iterable<KeyAndOrdinal> markers) {
+    void until(Iterable<KeyAndOrdinal> markers) {
         keyToSequences.until(markers);
     }
 
+    void resetInsertPosition() {
+        // when dealing with descending calls
+        // update the insert point of all sequences
+        // for the next batch of hits which will be sorted ascending
+        // yet will occur _before_ the current batch
+        if (descending) {
+            keyToSequences.resetGroupInsertPosition();
+            keyToSequences.resetUntilInsertPosition();
+
+            completedInsertPosition = 0;
+        }
+    }
+
     public Stats stats() {
         return stats;
     }

+ 5 - 2
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java

@@ -87,6 +87,7 @@ public class TumblingWindow implements Executable {
         Criterion<BoxedQueryRequest> base = criteria.get(baseStage);
         // remove any potential upper limit (if a criteria has been promoted)
         base.queryRequest().to(null);
+        matcher.resetInsertPosition();
 
         log.trace("{}", matcher);
         log.trace("Querying base stage [{}] {}", base.stage(), base.queryRequest());
@@ -267,10 +268,11 @@ public class TumblingWindow implements Executable {
         final BoxedQueryRequest request = criterion.queryRequest();
         Criterion<BoxedQueryRequest> base = criteria.get(window.baseStage);
 
+        boolean reverse = criterion.reverse() != base.reverse();
         // first box the query
         // only the first base can be descending
         // all subsequence queries are ascending
-        if (criterion.reverse() != base.reverse()) {
+        if (reverse) {
             if (window.end.equals(request.from()) == false) {
                 // if that's the case, set the starting point
                 request.from(window.end);
@@ -282,7 +284,7 @@ public class TumblingWindow implements Executable {
             request.to(window.end);
         }
 
-        return criterion.reverse() != base.reverse();
+        return reverse;
     }
 
     private void payload(ActionListener<Payload> listener) {
@@ -305,6 +307,7 @@ public class TumblingWindow implements Executable {
     private TimeValue timeTook() {
         return new TimeValue(System.currentTimeMillis() - startTime);
     }
+
     Iterable<List<HitReference>> hits(List<Sequence> sequences) {
         return () -> {
             final Iterator<Sequence> delegate = criteria.get(0).reverse() != criteria.get(1).reverse() ?

+ 2 - 2
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/parser/LogicalPlanBuilder.java

@@ -210,8 +210,8 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
         LogicalPlan eventQuery = visitEventFilter(subqueryCtx.eventFilter());
 
         // add fetch size as a limit so it gets propagated into the resulting query
-        LogicalPlan fetchSize = new LimitWithOffset(synthetic("<fetch-size>"), 
-                new Literal(synthetic("<fetch-value>"), params.fetchSize(), DataTypes.INTEGER), 
+        LogicalPlan fetchSize = new LimitWithOffset(synthetic("<fetch-size>"),
+                new Literal(synthetic("<fetch-value>"), params.fetchSize(), DataTypes.INTEGER),
                 eventQuery);
         // filter fields
         LogicalPlan child = new Project(source(ctx), fetchSize, CollectionUtils.combine(keys, defaultProjection()));

+ 1 - 1
x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceSpecTests.java

@@ -218,7 +218,7 @@ public class SequenceSpecTests extends ESTestCase {
         }
 
         // convert the results through a test specific payload
-        SequenceMatcher matcher = new SequenceMatcher(stages, TimeValue.MINUS_ONE, null);
+        SequenceMatcher matcher = new SequenceMatcher(stages, false, TimeValue.MINUS_ONE, null);
         
         QueryClient testClient = new TestQueryClient();
         TumblingWindow window = new TumblingWindow(testClient, criteria, null, matcher);