Преглед изворни кода

EQL: Improve sequence limiting (#59439)

Improve the way limit (in particular offset) is being applied to handle
the case where the matches are less than the offset and absolute limit.

Combine Matcher and SequenceStateMachine into one class since the two
have evolved beyond their original name and structure.
Costin Leau пре 5 година
родитељ
комит
63d3c62cdf

+ 2 - 1
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java

@@ -14,6 +14,7 @@ import org.elasticsearch.xpack.eql.execution.search.QueryRequest;
 import org.elasticsearch.xpack.eql.execution.search.RuntimeUtils;
 import org.elasticsearch.xpack.eql.execution.search.extractor.FieldHitExtractor;
 import org.elasticsearch.xpack.eql.execution.search.extractor.TimestampFieldHitExtractor;
+import org.elasticsearch.xpack.eql.execution.sequence.SequenceMatcher;
 import org.elasticsearch.xpack.eql.plan.physical.EsQueryExec;
 import org.elasticsearch.xpack.eql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.eql.querydsl.container.FieldExtractorRegistry;
@@ -86,7 +87,7 @@ public class ExecutionManager {
         }
         
         int completionStage = criteria.size() - 1;
-        Matcher matcher = new Matcher(completionStage, maxSpan, limit);
+        SequenceMatcher matcher = new SequenceMatcher(completionStage, maxSpan, limit);
 
         TumblingWindow w = new TumblingWindow(new BasicQueryClient(session),
                 criteria.subList(0, completionStage),

+ 0 - 87
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Matcher.java

@@ -1,87 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License;
- * you may not use this file except in compliance with the Elastic License.
- */
-package org.elasticsearch.xpack.eql.execution.assembler;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.elasticsearch.common.collect.Tuple;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.xpack.eql.execution.search.Limit;
-import org.elasticsearch.xpack.eql.execution.sequence.Sequence;
-import org.elasticsearch.xpack.eql.execution.sequence.SequenceStateMachine;
-import org.elasticsearch.xpack.eql.session.Payload;
-
-import java.util.List;
-
-/**
- * Executable tracking sequences at runtime.
- */
-class Matcher {
-
-    private final Logger log = LogManager.getLogger(Matcher.class);
-
-    // NB: just like in a list, this represents the total number of stages yet counting starts at 0
-    private final SequenceStateMachine stateMachine;
-    private final int numberOfStages;
-
-    Matcher(int numberOfStages, TimeValue maxSpan, Limit limit) {
-        this.numberOfStages = numberOfStages;
-        this.stateMachine = new SequenceStateMachine(numberOfStages, maxSpan, limit);
-    }
-
-    /**
-     * Match hits for the given stage.
-     * Returns false if the process needs to be stopped.
-     */
-    boolean match(int stage, Iterable<Tuple<KeyAndOrdinal, SearchHit>> hits) {
-        for (Tuple<KeyAndOrdinal, SearchHit> tuple : hits) {
-            KeyAndOrdinal ko = tuple.v1();
-            SearchHit hit = tuple.v2();
-
-            if (stage == 0) {
-                Sequence seq = new Sequence(ko.key, numberOfStages, ko.ordinal, hit);
-                stateMachine.trackSequence(seq);
-            } else {
-                stateMachine.match(stage, ko.key, ko.ordinal, hit);
-
-                // early skip in case of reaching the limit
-                // check the last stage to avoid calling the state machine in other stages
-                if (stateMachine.reachedLimit()) {
-                    log.trace("Limit reached {}", stateMachine.stats());
-                    return false;
-                }
-            }
-        }
-        log.trace("{}", stateMachine.stats());
-        return true;
-    }
-
-    void until(Iterable<KeyAndOrdinal> markers) {
-        stateMachine.until(markers);
-    }
-
-    boolean hasCandidates(int stage) {
-        return stateMachine.hasCandidates(stage);
-    }
-
-    void dropUntil() {
-        stateMachine.dropUntil();
-    }
-
-    Payload payload(long startTime) {
-        List<Sequence> completed = stateMachine.completeSequences();
-        TimeValue tookTime = new TimeValue(System.currentTimeMillis() - startTime);
-        Payload p = new SequencePayload(completed, false, tookTime);
-        stateMachine.clear();
-        return p;
-    }
-
-    @Override
-    public String toString() {
-        return stateMachine.toString();
-    }
-}

+ 4 - 2
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java

@@ -13,7 +13,9 @@ import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.xpack.eql.execution.search.Ordinal;
 import org.elasticsearch.xpack.eql.execution.search.QueryClient;
+import org.elasticsearch.xpack.eql.execution.sequence.KeyAndOrdinal;
 import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey;
+import org.elasticsearch.xpack.eql.execution.sequence.SequenceMatcher;
 import org.elasticsearch.xpack.eql.session.Payload;
 import org.elasticsearch.xpack.eql.util.ReversedIterator;
 
@@ -38,7 +40,7 @@ public class TumblingWindow implements Executable {
     private final QueryClient client;
     private final List<Criterion<BoxedQueryRequest>> criteria;
     private final Criterion<BoxedQueryRequest> until;
-    private final Matcher matcher;
+    private final SequenceMatcher matcher;
     // shortcut
     private final int maxStages;
     private final int windowSize;
@@ -58,7 +60,7 @@ public class TumblingWindow implements Executable {
     public TumblingWindow(QueryClient client,
                           List<Criterion<BoxedQueryRequest>> criteria,
                           Criterion<BoxedQueryRequest> until,
-                          Matcher matcher) {
+                          SequenceMatcher matcher) {
         this.client = client;
 
         this.until = until;

+ 28 - 4
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/Limit.java

@@ -8,13 +8,14 @@ package org.elasticsearch.xpack.eql.execution.search;
 
 import org.elasticsearch.xpack.eql.util.MathUtils;
 
+import java.util.List;
 import java.util.Objects;
 
 public class Limit {
 
-    public final int limit;
-    public final int offset;
-    public final int total;
+    private final int limit;
+    private final int offset;
+    private final int total;
 
     public Limit(int limit, int offset) {
         this.limit = limit;
@@ -26,6 +27,14 @@ public class Limit {
         return MathUtils.abs(limit);
     }
 
+    public int offset() {
+        return offset;
+    }
+
+    public int totalLimit() {
+        return total;
+    }
+
     @Override
     public int hashCode() {
         return Objects.hash(limit, offset);
@@ -44,4 +53,19 @@ public class Limit {
         Limit other = (Limit) obj;
         return Objects.equals(limit, other.limit) && Objects.equals(offset, other.offset);
     }
-}
+
+    /**
+     * Offer a limited view (including offset) for the given list.
+     */
+    public <E> List<E> view(List<E> values) {
+        int size = values.size();
+        if (size >= total) {
+            return values.subList(offset, total);
+        }
+        int l = absLimit();
+        if (size <= l) {
+            return values;
+        }
+        return values.subList(size - l, values.size());
+    }
+}

+ 2 - 2
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/SourceGenerator.java

@@ -63,8 +63,8 @@ public abstract class SourceGenerator {
             // add size and from
             source.size(container.limit().absLimit());
             // this should be added only for event queries
-            if (container.limit().offset > 0) {
-                source.from(container.limit().offset);
+            if (container.limit().offset() > 0) {
+                source.from(container.limit().offset());
             }
         }
 

+ 2 - 3
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/KeyAndOrdinal.java → x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/KeyAndOrdinal.java

@@ -4,10 +4,9 @@
  * you may not use this file except in compliance with the Elastic License.
  */
 
-package org.elasticsearch.xpack.eql.execution.assembler;
+package org.elasticsearch.xpack.eql.execution.sequence;
 
 import org.elasticsearch.xpack.eql.execution.search.Ordinal;
-import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey;
 
 import java.util.Objects;
 
@@ -15,7 +14,7 @@ public class KeyAndOrdinal {
     final SequenceKey key;
     final Ordinal ordinal;
 
-    KeyAndOrdinal(SequenceKey key, Ordinal ordinal) {
+    public KeyAndOrdinal(SequenceKey key, Ordinal ordinal) {
         this.key = key;
         this.ordinal = ordinal;
     }

+ 0 - 1
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/KeyToSequences.java

@@ -7,7 +7,6 @@
 package org.elasticsearch.xpack.eql.execution.sequence;
 
 import org.elasticsearch.common.logging.LoggerMessageFormat;
-import org.elasticsearch.xpack.eql.execution.assembler.KeyAndOrdinal;
 import org.elasticsearch.xpack.eql.execution.search.Ordinal;
 
 import java.util.LinkedHashMap;

+ 54 - 25
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java → x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java

@@ -6,20 +6,25 @@
 
 package org.elasticsearch.xpack.eql.execution.sequence;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.logging.LoggerMessageFormat;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.xpack.eql.execution.assembler.KeyAndOrdinal;
 import org.elasticsearch.xpack.eql.execution.search.Limit;
 import org.elasticsearch.xpack.eql.execution.search.Ordinal;
+import org.elasticsearch.xpack.eql.session.Payload;
 
 import java.util.LinkedList;
 import java.util.List;
 
 /**
- * State machine that holds and manages all in-flight sequences.
+ * Matcher of sequences. Keeps track of on-going sequences and advancing them through each stage.
  */
-public class SequenceStateMachine {
+public class SequenceMatcher {
+
+    private final Logger log = LogManager.getLogger(SequenceMatcher.class);
 
     static class Stats {
         long seen = 0;
@@ -53,20 +58,21 @@ public class SequenceStateMachine {
     /** Current keys on each stage */
     private final StageToKeys stageToKeys;
 
+    private final int numberOfStages;
     private final int completionStage;
 
     /** list of completed sequences - separate to avoid polluting the other stages */
     private final List<Sequence> completed;
     private final long maxSpanInMillis;
 
-    private int offset = 0;
-    private int limit = -1;
+    private Limit limit;
     private boolean limitReached = false;
 
     private final Stats stats = new Stats();
 
     @SuppressWarnings("rawtypes")
-    public SequenceStateMachine(int stages, TimeValue maxSpan, Limit limit) {
+    public SequenceMatcher(int stages, TimeValue maxSpan, Limit limit) {
+        this.numberOfStages = stages;
         this.completionStage = stages - 1;
 
         this.stageToKeys = new StageToKeys(completionStage);
@@ -75,15 +81,8 @@ public class SequenceStateMachine {
 
         this.maxSpanInMillis = maxSpan.millis();
 
-        // limit && offset
-        if (limit != null) {
-            this.offset = limit.offset;
-            this.limit = limit.absLimit();
-        }
-    }
-
-    public List<Sequence> completeSequences() {
-        return completed;
+        // limit
+        this.limit = limit;
     }
 
     public void trackSequence(Sequence sequence) {
@@ -95,11 +94,38 @@ public class SequenceStateMachine {
         stats.seen++;
     }
 
+    /**
+     * Match hits for the given stage.
+     * Returns false if the process needs to be stopped.
+     */
+    public boolean match(int stage, Iterable<Tuple<KeyAndOrdinal, SearchHit>> hits) {
+        for (Tuple<KeyAndOrdinal, SearchHit> tuple : hits) {
+            KeyAndOrdinal ko = tuple.v1();
+            SearchHit hit = tuple.v2();
+
+            if (stage == 0) {
+                Sequence seq = new Sequence(ko.key, numberOfStages, ko.ordinal, hit);
+                trackSequence(seq);
+            } else {
+                match(stage, ko.key, ko.ordinal, hit);
+
+                // early skip in case of reaching the limit
+                // check the last stage to avoid calling the state machine in other stages
+                if (reachedLimit()) {
+                    log.trace("Limit reached {}", stats);
+                    return false;
+                }
+            }
+        }
+        log.trace("{}", stats);
+        return true;
+    }
+
     /**
      * Match the given hit (based on key and timestamp and potential tiebreaker) with any potential sequence from the previous
      * given stage. If that's the case, update the sequence and the rest of the references.
      */
-    public void match(int stage, SequenceKey key, Ordinal ordinal, SearchHit hit) {
+    private void match(int stage, SequenceKey key, Ordinal ordinal, SearchHit hit) {
         stats.seen++;
         
         int previousStage = stage - 1;
@@ -150,15 +176,10 @@ public class SequenceStateMachine {
 
         // bump the stages
         if (stage == completionStage) {
-            // add the sequence only if needed
-            if (offset > 0) {
-                offset--;
-            } else {
-                if (limit < 0 || (limit > 0 && completed.size() < limit)) {
-                    completed.add(sequence);
-                    // update the bool lazily
-                    limitReached = limit > 0 && completed.size() == limit;
-                }
+            if (limitReached == false) {
+                completed.add(sequence);
+                // update the bool lazily
+                limitReached = limit != null && completed.size() == limit.totalLimit();
             }
         } else {
             stageToKeys.add(stage, key);
@@ -186,6 +207,14 @@ public class SequenceStateMachine {
         return false;
     }
 
+    public Payload payload(long startTime) {
+        TimeValue tookTime = new TimeValue(System.currentTimeMillis() - startTime);
+        List<Sequence> view = limit != null ? limit.view(completed) : completed;
+        Payload p = new SequencePayload(view, false, tookTime);
+        clear();
+        return p;
+    }
+
     public void dropUntil() {
         keyToSequences.dropUntil();
     }

+ 1 - 2
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequencePayload.java → x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequencePayload.java

@@ -4,11 +4,10 @@
  * you may not use this file except in compliance with the Elastic License.
  */
 
-package org.elasticsearch.xpack.eql.execution.assembler;
+package org.elasticsearch.xpack.eql.execution.sequence;
 
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.xpack.eql.execution.payload.AbstractPayload;
-import org.elasticsearch.xpack.eql.execution.sequence.Sequence;
 import org.elasticsearch.xpack.eql.session.Results.Type;
 import org.elasticsearch.xpack.eql.util.ReversedIterator;
 

+ 0 - 2
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/UntilGroup.java

@@ -6,8 +6,6 @@
 
 package org.elasticsearch.xpack.eql.execution.sequence;
 
-import org.elasticsearch.xpack.eql.execution.assembler.KeyAndOrdinal;
-
 public class UntilGroup extends OrdinalGroup<KeyAndOrdinal> {
 
     UntilGroup(SequenceKey key) {

+ 2 - 1
x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceSpecTests.java

@@ -18,6 +18,7 @@ import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence;
 import org.elasticsearch.xpack.eql.execution.assembler.SeriesUtils.SeriesSpec;
 import org.elasticsearch.xpack.eql.execution.search.QueryClient;
+import org.elasticsearch.xpack.eql.execution.sequence.SequenceMatcher;
 import org.elasticsearch.xpack.eql.session.Payload;
 import org.elasticsearch.xpack.eql.session.Results;
 import org.elasticsearch.xpack.eql.session.Results.Type;
@@ -196,7 +197,7 @@ public class SequenceSpecTests extends ESTestCase {
         }
 
         // convert the results through a test specific payload
-        Matcher matcher = new Matcher(stages, TimeValue.MINUS_ONE, null);
+        SequenceMatcher matcher = new SequenceMatcher(stages, TimeValue.MINUS_ONE, null);
         
         QueryClient testClient = (r, l) -> {
             int ordinal = r.searchSource().size();

+ 51 - 0
x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/search/LimitTests.java

@@ -0,0 +1,51 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.eql.execution.search;
+
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.List;
+
+import static java.util.Arrays.asList;
+
+
+public class LimitTests extends ESTestCase {
+
+    private final List<Integer> list = asList(1, 2, 3, 4, 5, 6, 7);
+
+    public void testLimitUnderResults() throws Exception {
+        assertEquals(asList(1, 2, 3, 4, 5), new Limit(5, 0).view(list));
+    }
+
+    public void testLimitWithOffsetEqualResults() throws Exception {
+        assertEquals(asList(5, 6, 7), new Limit(3, 4).view(list));
+    }
+
+    public void testLimitWithOffsetUnderResults() throws Exception {
+        assertEquals(asList(5, 6), new Limit(2, 4).view(list));
+    }
+
+    public void testLimitOverResultsNoOffset() throws Exception {
+        assertEquals(list, new Limit(8, randomInt(100)).view(list));
+    }
+
+    public void testLimitEqualResults() throws Exception {
+        assertEquals(list, new Limit(7, randomInt(100)).view(list));
+    }
+
+    public void testLimitOverResultsWithHigherOffset() throws Exception {
+        assertEquals(asList(6, 7), new Limit(2, 8).view(list));
+    }
+
+    public void testLimitOverResultsWithEqualOffset() throws Exception {
+        assertEquals(asList(6, 7), new Limit(2, 7).view(list));
+    }
+
+    public void testLimitOverResultsWithSmallerOffset() throws Exception {
+        assertEquals(asList(3, 4, 5, 6, 7), new Limit(5, 6).view(list));
+    }
+}