1
0
Эх сурвалжийг харах

EQL: Fix bug in skipping window (#59196)

Corrected condition that caused a sequence window to be skipped when a query 
returns no results by checking not just the current stage but also following
ones as they can match with in-flight sequences.
Improve logging
Fix NPE when emptying a SequenceGroup
Increase randomization in testing
Make maxspan inclusive (up to and equal to value vs just up to)
Costin Leau 5 жил өмнө
parent
commit
ad32c48868

+ 2 - 2
client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchRequest.java

@@ -191,8 +191,8 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
         return this.fetchSize;
     }
 
-    public EqlSearchRequest fetchSize(int size) {
-        this.fetchSize = size;
+    public EqlSearchRequest fetchSize(int fetchSize) {
+        this.fetchSize = fetchSize;
         if (fetchSize < 2) {
             throw new IllegalArgumentException("fetch size must be greater than 1");
         }

+ 1 - 1
x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java

@@ -146,7 +146,7 @@ public abstract class CommonEqlActionTestCase extends ESRestTestCase {
         request.tiebreakerField("event.sequence");
         // some queries return more than 10 results
         request.size(50);
-        request.fetchSize(2);
+        request.fetchSize(randomIntBetween(2, 50));
         return eqlClient().search(request, RequestOptions.DEFAULT);
     }
 

+ 3 - 5
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/BoxedQueryRequest.java

@@ -63,16 +63,14 @@ public class BoxedQueryRequest implements QueryRequest {
     }
 
     /**
-     * Sets the lower boundary for the query (non-inclusive).
+     * Sets the lower boundary for the query (inclusive).
      * Can be removed (when the query in unbounded) through null.
      */
     public BoxedQueryRequest from(Ordinal begin) {
         from = begin;
+        timestampRange.gte(begin != null ? begin.timestamp() : null);
         if (tiebreakerRange != null) {
-            timestampRange.gte(begin != null ? begin.timestamp() : null);
-            tiebreakerRange.gt(begin != null ? begin.tiebreaker() : null);
-        } else {
-            timestampRange.gt(begin != null ? begin.timestamp() : null);
+            tiebreakerRange.gte(begin != null ? begin.tiebreaker() : null);
         }
         return this;
     }

+ 5 - 1
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Matcher.java

@@ -61,7 +61,6 @@ class Matcher {
         return false;
     }
 
-
     public boolean hasCandidates(int stage) {
         return stateMachine.hasCandidates(stage);
     }
@@ -71,4 +70,9 @@ class Matcher {
         TimeValue tookTime = new TimeValue(System.currentTimeMillis() - startTime);
         return new SequencePayload(completed, false, tookTime);
     }
+
+    @Override
+    public String toString() {
+        return stateMachine.toString();
+    }
 }

+ 8 - 6
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java

@@ -33,7 +33,7 @@ import static org.elasticsearch.action.ActionListener.wrap;
  */
 public class TumblingWindow implements Executable {
 
-    private final Logger log = LogManager.getLogger(Matcher.class);
+    private final Logger log = LogManager.getLogger(TumblingWindow.class);
 
     private final QueryClient client;
     private final List<Criterion<BoxedQueryRequest>> criteria;
@@ -72,7 +72,7 @@ public class TumblingWindow implements Executable {
 
     @Override
     public void execute(ActionListener<Payload> listener) {
-        log.info("Starting sequence window...");
+        log.trace("Starting sequence window w/ fetch size [{}]", windowSize);
         startTime = System.currentTimeMillis();
         advance(0, listener);
     }
@@ -83,7 +83,8 @@ public class TumblingWindow implements Executable {
         // remove any potential upper limit (if a criteria has been promoted)
         base.queryRequest().to(null);
 
-        log.info("Querying base stage [{}] {}", base.stage(), base.queryRequest());
+        log.trace("{}", matcher);
+        log.trace("Querying base stage [{}] {}", base.stage(), base.queryRequest());
 
         client.query(base.queryRequest(), wrap(p -> baseCriterion(baseStage, p, listener), listener::onFailure));
     }
@@ -119,7 +120,7 @@ public class TumblingWindow implements Executable {
         // update current query for the next request
         base.queryRequest().nextAfter(end);
 
-        log.info("Found base [{}] window {} {}", base.stage(), begin, end);
+        log.trace("Found base [{}] window {}->{}", base.stage(), begin, end);
 
         // find until ordinals
         //NB: not currently implemented
@@ -153,11 +154,12 @@ public class TumblingWindow implements Executable {
             request.to(window.end);
         }
 
-        log.info("Querying (secondary) stage [{}] {}", criterion.stage(), request);
+        log.trace("Querying (secondary) stage [{}] {}", criterion.stage(), request);
 
         client.query(request, wrap(p -> {
             List<SearchHit> hits = p.values();
 
+            log.trace("Found [{}] hits", hits.size());
             // no more results for this query
             if (hits.isEmpty()) {
                 // put the markers in place before the next call
@@ -169,7 +171,7 @@ public class TumblingWindow implements Executable {
 
                 // if there are no candidates, advance the window
                 if (matcher.hasCandidates(criterion.stage()) == false) {
-                    log.info("Advancing window...");
+                    log.trace("Advancing window...");
                     advance(window.baseStage, listener);
                     return;
                 }

+ 4 - 0
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/KeyToSequences.java

@@ -46,4 +46,8 @@ class KeyToSequences {
         }
         groups[stage].add(sequence);
     }
+
+    int numberOfKeys() {
+        return keyToSequences.size();
+    }
 }

+ 2 - 0
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceGroup.java

@@ -37,6 +37,7 @@ public class SequenceGroup {
         Ordinal ordinal = sequence.ordinal();
         if (start == null) {
             start = ordinal;
+        } else if (stop == null) {
             stop = ordinal;
         } else {
             if (start.compareTo(ordinal) > 0) {
@@ -91,6 +92,7 @@ public class SequenceGroup {
         if (sequences.isEmpty() == false) {
             start = sequences.get(0).ordinal();
         } else {
+            start = null;
             stop = null;
         }
     }

+ 28 - 7
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java

@@ -7,6 +7,7 @@
 package org.elasticsearch.xpack.eql.execution.sequence;
 
 import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.logging.LoggerMessageFormat;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.xpack.eql.execution.search.Limit;
@@ -82,19 +83,19 @@ public class SequenceStateMachine {
         Sequence sequence = before.v1();
         // eliminate the match and all previous values from the frame
         group.trim(before.v2() + 1);
+
+        // remove the frame and keys early (as the key space is large)
+        if (group.isEmpty()) {
+            stageToKeys.keys(previousStage).remove(key);
+        }
         
         // check maxspan before continuing the sequence
-        if (maxSpanInMillis > 0 && (ordinal.timestamp() - sequence.startTimestamp() >= maxSpanInMillis)) {
+        if (maxSpanInMillis > 0 && (ordinal.timestamp() - sequence.startTimestamp() > maxSpanInMillis)) {
             return;
         }
 
         sequence.putMatch(stage, hit, ordinal);
 
-        // remove the frame and keys early (as the key space is large)
-        if (group.isEmpty()) {
-            stageToKeys.keys(previousStage).remove(key);
-        }
-
         // bump the stages
         if (stage == completionStage) {
             // add the sequence only if needed
@@ -117,7 +118,27 @@ public class SequenceStateMachine {
         return limitReached;
     }
 
+    /**
+     * Checks whether the rest of the stages have in-flight data.
+     * This method is called when a query returns no data meaning
+     * sequences on previous stages cannot match this window (since there's no new data).
+     * However sequences on higher stages can, hence this check to know whether
+     * it's possible to advance the window early.
+     */
     public boolean hasCandidates(int stage) {
-        return stage < completionStage && stageToKeys.keys(stage).isEmpty() == false;
+        for (int i = stage; i < completionStage; i++) {
+            if (stageToKeys.keys(i).isEmpty() == false) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return LoggerMessageFormat.format(null, "Tracking [{}] keys with [{}] completed and in-flight {}",
+                keyToSequences.numberOfKeys(),
+                completed.size(),
+                stageToKeys);
     }
 }

+ 6 - 2
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/StageToKeys.java

@@ -10,6 +10,7 @@ import java.util.Arrays;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.StringJoiner;
 
 /** Dedicated collection for mapping a stage (represented by the index collection) to a set of keys */
 class StageToKeys {
@@ -32,7 +33,10 @@ class StageToKeys {
         return set;
     }
 
-    Set<SequenceKey> completedKeys() {
-        return keys(stageToKey.size() - 1);
+    @Override
+    public String toString() {
+        StringJoiner sj = new StringJoiner(",", "[", "]");
+        stageToKey.forEach(s -> sj.add(s != null ? "" + s.size() : "0"));
+        return sj.toString();
     }
 }