1
0
Эх сурвалжийг харах

EQL: Fix early trimming of in-flight data (#66493)

Rework trimToLast to take into account an ordinal for last trimming so
instead of keeping the last entry in a stage, it keeps the last entry
before the given ordinal.
This takes care of the case where a dense stage that requires several
passes does not discard valid data from a previous sparse stage that go
beyond the current stage point.
Costin Leau 4 жил өмнө
parent
commit
4f55749072

+ 11 - 8
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/KeyToSequences.java

@@ -95,27 +95,30 @@ class KeyToSequences {
     }
 
     /**
-     * Remove all matches expect the latest.
+     * Remove all matches except the latest occurring _before_ the given ordinal.
      */
-    void trimToTail() {
+    void trimToTail(Ordinal ordinal) {
         for (Iterator<SequenceEntry> it = keyToSequences.values().iterator(); it.hasNext(); ) {
             SequenceEntry seqs =  it.next();
-            // first remove the sequences
-            // and remember the last item from the first
-            // initialized stage to be used with until
+            // remember the last item found (will be ascending)
+            // to trim unneeded until that occur before it
             Sequence firstTail = null;
+            // remove any empty keys
+            boolean keyIsEmpty = true;
             for (SequenceGroup group : seqs.groups) {
                 if (group != null) {
-                    Sequence sequence = group.trimToLast();
+                    Sequence sequence = group.trimBeforeLast(ordinal);
                     if (firstTail == null) {
                         firstTail = sequence;
                     }
+                    keyIsEmpty &= group.isEmpty();
                 }
             }
             // there are no sequences on any stage for this key, drop it
-            if (firstTail == null) {
+            if (keyIsEmpty) {
                 it.remove();
-            } else {
+            }
+            if (firstTail != null) {
                 // drop any possible UNTIL that occurs before the last tail
                 UntilGroup until = seqs.until;
                 if (until != null) {

+ 17 - 12
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/OrdinalGroup.java

@@ -52,11 +52,27 @@ abstract class OrdinalGroup<E> implements Iterable<Ordinal> {
      * The element and everything before it is removed.
      */
     E trimBefore(Ordinal ordinal) {
+        return trimBefore(ordinal, true);
+    }
+
+    /**
+     * Returns the latest element from the group that has its timestamp
+     * less than the given argument alongside its position in the list.
+     * Everything before the element it is removed. The element is kept.
+     */
+    E trimBeforeLast(Ordinal ordinal) {
+        return trimBefore(ordinal, false);
+    }
+
+    private E trimBefore(Ordinal ordinal, boolean removeMatch) {
         Tuple<E, Integer> match = findBefore(ordinal);
 
         // trim
         if (match != null) {
-            int pos = match.v2() + 1;
+            int pos = match.v2();
+            if (removeMatch) {
+                pos = pos + 1;
+            }
             elements.subList(0, pos).clear();
 
             // update min time
@@ -76,17 +92,6 @@ abstract class OrdinalGroup<E> implements Iterable<Ordinal> {
         return match != null ? match.v1() : null;
     }
 
-    E trimToLast() {
-        E last = elements.peekLast();
-        if (last != null) {
-            elements.clear();
-            start = null;
-            stop = null;
-            add(last);
-        }
-        return last;
-    }
-
     private Tuple<E, Integer> findBefore(Ordinal ordinal) {
         E match = null;
         int matchPos = -1;

+ 3 - 7
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java

@@ -253,19 +253,15 @@ public class SequenceMatcher {
      * This allows the matcher to keep only the last match per stage
      * and adjust insertion positions.
      */
-    void trim(boolean everything) {
+    void trim(Ordinal ordinal) {
         // for descending sequences, remove all in-flight sequences
         // since the windows moves head and thus there is no chance
         // of new results coming in
-
-        // however this needs to be indicated from outside since
-        // the same window can be only ASC trimmed during a loop
-        // and fully once the DESC query moves
-        if (everything) {
+        if (ordinal == null) {
             keyToSequences.clear();
         } else {
             // keep only the tail
-            keyToSequences.trimToTail();
+            keyToSequences.trimToTail(ordinal);
         }
     }
 

+ 10 - 21
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java

@@ -77,7 +77,6 @@ public class TumblingWindow implements Executable {
     // flag used for DESC sequences to indicate whether
     // the window needs to restart (since the DESC query still has results)
     private boolean restartWindowFromTailQuery;
-    private final boolean earlyUntil;
 
     private long startTime;
 
@@ -107,7 +106,6 @@ public class TumblingWindow implements Executable {
         Criterion<BoxedQueryRequest> baseRequest = criteria.get(0);
         this.windowSize = baseRequest.queryRequest().searchSource().size();
         this.restartWindowFromTailQuery = baseRequest.descending();
-        this.earlyUntil = baseRequest.descending();
     }
 
     @Override
@@ -137,27 +135,19 @@ public class TumblingWindow implements Executable {
         // for descending queries clean everything
         if (restartWindowFromTailQuery) {
             if (currentStage == 0) {
-                matcher.trim(true);
+                matcher.trim(null);
             }
         }
-        // trim to last
         else {
-           // check case when a rebase occurred and the current query
-           // has a lot more results than the first once and hasn't
-           // covered the whole window. Running a trim early data before
-           // the whole window is matched
-           boolean trimToLast = false;
-           if (currentStage == 0) {
-               trimToLast = true;
-           }
-           else {
-               Ordinal current = criteria.get(currentStage).queryRequest().after();
-               Ordinal previous = criteria.get(currentStage - 1).queryRequest().after();
-               trimToLast = current.after(previous);
-           }
-           if (trimToLast) {
-               matcher.trim(false);
-           }
+            // trim to last until the current window
+            // that's because some stages can be sparse, other dense
+            // and results from the sparse stage can be after those in the dense one
+            // trimming to last removes these results
+            // same applies for rebase
+            Ordinal marker = criteria.get(currentStage).queryRequest().after();
+            if (marker != null) {
+                matcher.trim(marker);
+            }
         }
 
         advance(currentStage, listener);
@@ -200,7 +190,6 @@ public class TumblingWindow implements Executable {
             // get borders for the rest of the queries - but only when at least one result is found
             begin = headOrdinal(hits, base);
             end = tailOrdinal(hits, base);
-            boolean desc = base.descending();
             // always create an ASC window
             info = new WindowInfo(baseStage, begin, end);