Browse Source

EQL: Improve retrieval of results (#59552)

Instead of retrieving an entire SearchHit, get just a reference and 
postpone the document retrieval when assembling the final results.
Remove sort information from results to make them consistent.
Move TumblingWindow under the sequence package.

Co-authored-by: James Rodewig <james.rodewig@elastic.co>
Costin Leau 5 years ago
parent
commit
bccfbcd81f
18 changed files with 309 additions and 144 deletions
  1. 5 16
      docs/reference/eql/eql-search-api.asciidoc
  2. 15 51
      docs/reference/eql/search.asciidoc
  3. 1 1
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java
  4. 4 4
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java
  5. 7 0
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/SearchResponsePayload.java
  6. 84 1
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java
  7. 55 0
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/HitReference.java
  8. 5 0
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryClient.java
  9. 9 1
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/SourceGenerator.java
  10. 5 5
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Match.java
  11. 5 5
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java
  12. 9 13
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java
  13. 9 10
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequencePayload.java
  14. 60 21
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java
  15. 5 5
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/EsQueryExec.java
  16. 7 1
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EmptyPayload.java
  17. 22 9
      x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceSpecTests.java
  18. 2 1
      x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/planner/QueryFolderOkTests.java

+ 5 - 16
docs/reference/eql/eql-search-api.asciidoc

@@ -564,10 +564,7 @@ the events in ascending, lexicographic order.
             "name": "cmd.exe",
             "path": "C:\\Windows\\System32\\cmd.exe"
           }
-        },
-        "sort": [
-          1607252647000
-        ]
+        }
       },
       {
         "_index": "my_index",
@@ -594,10 +591,7 @@ the events in ascending, lexicographic order.
             "name": "cmd.exe",
             "path": "C:\\Windows\\System32\\cmd.exe"
           }
-        },
-        "sort": [
-          1607339228000
-        ]
+        }
       }
     ]
   }
@@ -693,10 +687,7 @@ the events in ascending, lexicographic order.
                 "name": "cmd.exe",
                 "path": "C:\\Windows\\System32\\cmd.exe"
               }
-            },
-            "sort": [
-              1607339228000
-            ]
+            }
           },
           {
             "_index": "my_index",
@@ -716,10 +707,7 @@ the events in ascending, lexicographic order.
                 "name": "regsvr32.exe",
                 "path": "C:\\Windows\\System32\\regsvr32.exe"
               }
-            },
-            "sort": [
-              1607339229000
-            ]
+            }
           }
         ]
       }
@@ -728,3 +716,4 @@ the events in ascending, lexicographic order.
 }
 ----
 // TESTRESPONSE[s/"took": 6/"took": $body.took/]
+// TESTRESPONSE[skip: response format updated]

+ 15 - 51
docs/reference/eql/search.asciidoc

@@ -84,7 +84,7 @@ https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order.
       "relation": "eq"
     },
     "events": [
-      {
+       {
         "_index": "sec_logs",
         "_id": "1",
         "_score": null,
@@ -102,10 +102,7 @@ https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order.
             "name": "cmd.exe",
             "path": "C:\\Windows\\System32\\cmd.exe"
           }
-        },
-        "sort": [
-          1607252645000
-        ]
+        }
       },
       {
         "_index": "sec_logs",
@@ -125,10 +122,7 @@ https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order.
             "name": "cmd.exe",
             "path": "C:\\Windows\\System32\\cmd.exe"
           }
-        },
-        "sort": [
-          1607339167000
-        ]
+        }
       }
     ]
   }
@@ -220,10 +214,7 @@ the https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order.
                 "name": "cmd.exe",
                 "path": "C:\\Windows\\System32\\cmd.exe"
               }
-            },
-            "sort": [
-              1607339228000
-            ]
+            }
           },
           {
             "_index": "sec_logs",
@@ -243,10 +234,7 @@ the https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order.
                 "name": "regsvr32.exe",
                 "path": "C:\\Windows\\System32\\regsvr32.exe"
               }
-            },
-            "sort": [
-              1607339229000
-            ]
+            }
           }
         ]
       }
@@ -255,6 +243,7 @@ the https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order.
 }
 ----
 // TESTRESPONSE[s/"took": 60/"took": $body.took/]
+// TESTRESPONSE[skip: response format updated]
 
 You can use the <<eql-with-maxspan-keywords,`with maxspan` keywords>> to
 constrain a sequence to a specified timespan.
@@ -357,10 +346,7 @@ contains the shared `agent.id` value for each matching event.
                 "name": "cmd.exe",
                 "path": "C:\\Windows\\System32\\cmd.exe"
               }
-            },
-            "sort": [
-              1607339228000
-            ]
+            }
           },
           {
             "_index": "sec_logs",
@@ -380,10 +366,7 @@ contains the shared `agent.id` value for each matching event.
                 "name": "regsvr32.exe",
                 "path": "C:\\Windows\\System32\\regsvr32.exe"
               }
-            },
-            "sort": [
-              1607339229000
-            ]
+            }
           }
         ]
       }
@@ -392,6 +375,7 @@ contains the shared `agent.id` value for each matching event.
 }
 ----
 // TESTRESPONSE[s/"took": 60/"took": $body.took/]
+// TESTRESPONSE[skip: response format updated]
 
 You can use the <<eql-until-keyword,`until` keyword>> to specify an expiration
 event for sequences. Matching sequences must end before this event.
@@ -495,15 +479,7 @@ GET /sec_logs/_eql/search
 ----
 // TEST[s/search/search\?filter_path\=\-\*\.events\.\*fields/]
 
-The API returns the following response. Note the `sort` property of each
-matching event contains an array of two items:
-
-* The first item is the event's <<eql-search-api-timestamp-field,timestamp>>,
-converted to milliseconds since the https://en.wikipedia.org/wiki/Unix_time[Unix
-epoch].
-
-* The second item is the event's `event.id` value. This value is used as a sort
-tiebreaker for events with the same timestamp.
+The API returns the following response.
 
 [source,console-result]
 ----
@@ -518,7 +494,7 @@ tiebreaker for events with the same timestamp.
       "relation": "eq"
     },
     "events": [
-      {
+        {
         "_index": "sec_logs",
         "_id": "1",
         "_score": null,
@@ -536,13 +512,9 @@ tiebreaker for events with the same timestamp.
             "name": "cmd.exe",
             "path": "C:\\Windows\\System32\\cmd.exe"
           }
+        }
         },
-        "sort": [
-          1607252645000,                                <1>
-          "edwCRnyD"                                    <2>
-          ]
-        },
-            {
+        {
         "_index": "sec_logs",
         "_id": "3",
         "_score": null,
@@ -560,21 +532,13 @@ tiebreaker for events with the same timestamp.
             "name": "cmd.exe",
             "path": "C:\\Windows\\System32\\cmd.exe"
           }
-        },
-        "sort": [
-          1607339167000,                                <1>
-          "cMyt5SZ2"                                    <2>
-        ]
-      }
+        }   
+        }
     ]
   }
 }
 ----
 // TESTRESPONSE[s/"took": 34/"took": $body.took/]
-<1> The event's <<eql-search-api-timestamp-field,timestamp>>, converted to
-milliseconds since the https://en.wikipedia.org/wiki/Unix_time[Unix
-epoch]
-<2> The event's `event.id` value.
 ====
 
 

+ 1 - 1
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java

@@ -44,7 +44,7 @@ public class Criterion<Q extends QueryRequest> {
         return stage;
     }
 
-    boolean reverse() {
+    public boolean reverse() {
         return reverse;
     }
 

+ 4 - 4
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java

@@ -7,6 +7,7 @@
 package org.elasticsearch.xpack.eql.execution.assembler;
 
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
 import org.elasticsearch.xpack.eql.execution.search.BasicQueryClient;
 import org.elasticsearch.xpack.eql.execution.search.Limit;
@@ -15,6 +16,7 @@ import org.elasticsearch.xpack.eql.execution.search.RuntimeUtils;
 import org.elasticsearch.xpack.eql.execution.search.extractor.FieldHitExtractor;
 import org.elasticsearch.xpack.eql.execution.search.extractor.TimestampFieldHitExtractor;
 import org.elasticsearch.xpack.eql.execution.sequence.SequenceMatcher;
+import org.elasticsearch.xpack.eql.execution.sequence.TumblingWindow;
 import org.elasticsearch.xpack.eql.plan.physical.EsQueryExec;
 import org.elasticsearch.xpack.eql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.eql.querydsl.container.FieldExtractorRegistry;
@@ -68,10 +70,8 @@ public class ExecutionManager {
             PhysicalPlan query = plans.get(i);
             // search query
             if (query instanceof EsQueryExec) {
-                QueryRequest original = ((EsQueryExec) query).queryRequest(session);
-                
-                // increase the request size based on the fetch size (since size is applied already through limit)
-
+                SearchSourceBuilder source = ((EsQueryExec) query).source(session);
+                QueryRequest original = () -> source;
                 BoxedQueryRequest boxedRequest = new BoxedQueryRequest(original, timestampName, tiebreakerName);
                 Criterion<BoxedQueryRequest> criterion =
                         new Criterion<>(i, boxedRequest, keyExtractors, tsExtractor, tbExtractor, i > 0 && descending);

+ 7 - 0
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/SearchResponsePayload.java

@@ -7,7 +7,9 @@
 package org.elasticsearch.xpack.eql.execution.payload;
 
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchSortValues;
 import org.elasticsearch.xpack.eql.session.Results.Type;
 
 import java.util.Arrays;
@@ -20,6 +22,11 @@ public class SearchResponsePayload extends AbstractPayload {
     public SearchResponsePayload(SearchResponse response) {
         super(response.isTimedOut(), response.getTook());
         hits = Arrays.asList(response.getHits().getHits());
+        // clean hits
+        SearchSortValues sortValues = new SearchSortValues(new Object[0], new DocValueFormat[0]);
+        for (SearchHit hit : hits) {
+            hit.sortValues(sortValues);
+        }
     }
 
     @Override

+ 84 - 1
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java

@@ -8,15 +8,33 @@ package org.elasticsearch.xpack.eql.execution.search;
 
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.get.MultiGetItemResponse;
+import org.elasticsearch.action.get.MultiGetRequest.Item;
+import org.elasticsearch.action.get.MultiGetRequestBuilder;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.common.io.stream.InputStreamStreamInput;
+import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.index.get.GetResult;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
 import org.elasticsearch.tasks.TaskCancelledException;
 import org.elasticsearch.xpack.eql.session.EqlConfiguration;
 import org.elasticsearch.xpack.eql.session.EqlSession;
 import org.elasticsearch.xpack.eql.session.Payload;
 import org.elasticsearch.xpack.ql.util.StringUtils;
 
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.elasticsearch.action.ActionListener.wrap;
 import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.prepareRequest;
 
 public class BasicQueryClient implements QueryClient {
@@ -49,4 +67,69 @@ public class BasicQueryClient implements QueryClient {
         SearchRequest search = prepareRequest(client, searchSource, false, indices);
         client.search(search, new BasicListener(listener));
     }
-}
+
+    @Override
+    public void get(Iterable<List<HitReference>> refs, ActionListener<List<List<SearchHit>>> listener) {
+        MultiGetRequestBuilder requestBuilder = client.prepareMultiGet();
+        // no need for real-time
+        requestBuilder.setRealtime(false)
+                      .setRefresh(false);
+
+        int sz = 0;
+
+        for (List<HitReference> list : refs) {
+            sz = list.size();
+            for (HitReference ref : list) {
+                Item item = new Item(ref.index(), ref.id());
+                // make sure to get the whole source
+                item.fetchSourceContext(FetchSourceContext.FETCH_SOURCE);
+                requestBuilder.add(item);
+            }
+        }
+        
+        final int listSize = sz;
+        client.multiGet(requestBuilder.request(), wrap(r -> {
+            List<List<SearchHit>> hits = new ArrayList<>(r.getResponses().length / listSize);
+            
+            List<SearchHit> sequence = new ArrayList<>(listSize);
+            
+            // copy streams - reused across the whole loop
+            PipedInputStream in = new PipedInputStream();
+            PipedOutputStream out = new PipedOutputStream(in);
+            StreamOutput so = new OutputStreamStreamOutput(out);
+            StreamInput si = new InputStreamStreamInput(in);
+
+            int counter = 0;
+            for (MultiGetItemResponse mgr : r.getResponses()) {
+                if (mgr.isFailed()) {
+                    listener.onFailure(mgr.getFailure().getFailure());
+                    return;
+                }
+                // HACK: the only way to get GetResult is to serialize it and then load it back :(
+                mgr.getResponse().writeTo(so);
+                GetResult result = new GetResult(si);
+
+                SearchHit hit = new SearchHit(-1, result.getId(), result.getDocumentFields(), result.getMetadataFields());
+                hit.sourceRef(result.internalSourceRef());
+                // need to create these objects to set the index
+                hit.shard(new SearchShardTarget(null, new ShardId(result.getIndex(), "", -1), null, null));
+
+                hit.setSeqNo(result.getSeqNo());
+                hit.setPrimaryTerm(result.getPrimaryTerm());
+                hit.version(result.getVersion());
+
+
+                sequence.add(hit);
+
+                if (++counter == listSize) {
+                    counter = 0;
+                    hits.add(sequence);
+                    sequence = new ArrayList<>(listSize);
+                }
+            }
+            // send the results
+            listener.onResponse(hits);
+
+        }, listener::onFailure));
+    }
+}

+ 55 - 0
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/HitReference.java

@@ -0,0 +1,55 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.eql.execution.search;
+
+import org.elasticsearch.search.SearchHit;
+
+import java.util.Objects;
+
+public class HitReference {
+
+    private final String index;
+    private final String id;
+    
+    public HitReference(SearchHit hit) {
+        this.index = hit.getIndex();
+        this.id = hit.getId();
+    }
+    
+    public String index() {
+        return index;
+    }
+
+    public String id() {
+        return id;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(index, id);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        
+        HitReference other = (HitReference) obj;
+        return Objects.equals(index, other.index)
+                && Objects.equals(id, other.id);
+    }
+
+    @Override
+    public String toString() {
+        return "doc[" + index + "][" + id + "]";
+    }
+}

+ 5 - 0
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryClient.java

@@ -7,12 +7,17 @@
 package org.elasticsearch.xpack.eql.execution.search;
 
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.xpack.eql.session.Payload;
 
+import java.util.List;
+
 /**
  * Infrastructure interface used to decouple listener consumers from the stateful classes holding client-references and co.
  */
 public interface QueryClient {
 
     void query(QueryRequest request, ActionListener<Payload> listener);
+
+    void get(Iterable<List<HitReference>> refs, ActionListener<List<List<SearchHit>>> listener);
 }

+ 9 - 1
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/SourceGenerator.java

@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.eql.execution.search;
 
+import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
@@ -57,7 +58,14 @@ public abstract class SourceGenerator {
         sourceBuilder.build(source);
 
         sorting(container, source);
-        source.fetchSource(FetchSourceContext.FETCH_SOURCE);
+
+        // disable the source if there are no includes
+        if (source.fetchSource() == null || CollectionUtils.isEmpty(source.fetchSource().includes())) {
+            source.fetchSource(FetchSourceContext.DO_NOT_FETCH_SOURCE);
+        } else {
+            // use true to fetch only the needed bits from the source
+            source.fetchSource(true);
+        }
 
         if (container.limit() != null) {
             // add size and from

+ 5 - 5
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Match.java

@@ -6,7 +6,7 @@
 
 package org.elasticsearch.xpack.eql.execution.sequence;
 
-import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.xpack.eql.execution.search.HitReference;
 import org.elasticsearch.xpack.eql.execution.search.Ordinal;
 
 import java.util.Objects;
@@ -17,9 +17,9 @@ import java.util.Objects;
 class Match {
 
     private final Ordinal ordinal;
-    private final SearchHit hit;
+    private final HitReference hit;
 
-    Match(Ordinal ordinal, SearchHit hit) {
+    Match(Ordinal ordinal, HitReference hit) {
         this.ordinal = ordinal;
         this.hit = hit;
     }
@@ -28,7 +28,7 @@ class Match {
         return ordinal;
     }
 
-    SearchHit hit() {
+    HitReference hit() {
         return hit;
     }
 
@@ -54,6 +54,6 @@ class Match {
 
     @Override
     public String toString() {
-        return ordinal.toString() + "->" + hit.getId();
+        return ordinal + "->" + hit;
     }
 }

+ 5 - 5
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java

@@ -6,8 +6,8 @@
 
 package org.elasticsearch.xpack.eql.execution.sequence;
 
-import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
+import org.elasticsearch.xpack.eql.execution.search.HitReference;
 import org.elasticsearch.xpack.eql.execution.search.Ordinal;
 import org.elasticsearch.xpack.ql.util.Check;
 
@@ -32,7 +32,7 @@ public class Sequence {
 
     private int currentStage = 0;
 
-    public Sequence(SequenceKey key, int stages, Ordinal ordinal, SearchHit firstHit) {
+    public Sequence(SequenceKey key, int stages, Ordinal ordinal, HitReference firstHit) {
         Check.isTrue(stages >= 2, "A sequence requires at least 2 criteria, given [{}]", stages);
         this.key = key;
         this.stages = stages;
@@ -40,7 +40,7 @@ public class Sequence {
         this.matches[0] = new Match(ordinal, firstHit);
     }
 
-    public int putMatch(int stage, SearchHit hit, Ordinal ordinal) {
+    public int putMatch(int stage, Ordinal ordinal, HitReference hit) {
         if (stage == currentStage + 1) {
             int previousStage = currentStage;
             currentStage = stage;
@@ -62,8 +62,8 @@ public class Sequence {
         return matches[0].ordinal();
     }
 
-    public List<SearchHit> hits() {
-        List<SearchHit> hits = new ArrayList<>(matches.length);
+    public List<HitReference> hits() {
+        List<HitReference> hits = new ArrayList<>(matches.length);
         for (Match m : matches) {
             hits.add(m.hit());
         }

+ 9 - 13
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java

@@ -11,10 +11,9 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.logging.LoggerMessageFormat;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.xpack.eql.execution.search.HitReference;
 import org.elasticsearch.xpack.eql.execution.search.Limit;
 import org.elasticsearch.xpack.eql.execution.search.Ordinal;
-import org.elasticsearch.xpack.eql.session.Payload;
 
 import java.util.LinkedList;
 import java.util.List;
@@ -98,10 +97,10 @@ public class SequenceMatcher {
      * Match hits for the given stage.
      * Returns false if the process needs to be stopped.
      */
-    public boolean match(int stage, Iterable<Tuple<KeyAndOrdinal, SearchHit>> hits) {
-        for (Tuple<KeyAndOrdinal, SearchHit> tuple : hits) {
+    public boolean match(int stage, Iterable<Tuple<KeyAndOrdinal, HitReference>> hits) {
+        for (Tuple<KeyAndOrdinal, HitReference> tuple : hits) {
             KeyAndOrdinal ko = tuple.v1();
-            SearchHit hit = tuple.v2();
+            HitReference hit = tuple.v2();
 
             if (stage == 0) {
                 Sequence seq = new Sequence(ko.key, numberOfStages, ko.ordinal, hit);
@@ -125,7 +124,7 @@ public class SequenceMatcher {
      * Match the given hit (based on key and timestamp and potential tiebreaker) with any potential sequence from the previous
      * given stage. If that's the case, update the sequence and the rest of the references.
      */
-    private void match(int stage, SequenceKey key, Ordinal ordinal, SearchHit hit) {
+    private void match(int stage, SequenceKey key, Ordinal ordinal, HitReference hit) {
         stats.seen++;
         
         int previousStage = stage - 1;
@@ -172,7 +171,7 @@ public class SequenceMatcher {
             }
         }
         
-        sequence.putMatch(stage, hit, ordinal);
+        sequence.putMatch(stage, ordinal, hit);
 
         // bump the stages
         if (stage == completionStage) {
@@ -207,12 +206,9 @@ public class SequenceMatcher {
         return false;
     }
 
-    public Payload payload(long startTime) {
-        TimeValue tookTime = new TimeValue(System.currentTimeMillis() - startTime);
-        List<Sequence> view = limit != null ? limit.view(completed) : completed;
-        Payload p = new SequencePayload(view, false, tookTime);
-        clear();
-        return p;
+
+    public List<Sequence> completed() {
+        return limit != null ? limit.view(completed) : completed;
     }
 
     public void dropUntil() {

+ 9 - 10
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequencePayload.java

@@ -7,26 +7,25 @@
 package org.elasticsearch.xpack.eql.execution.sequence;
 
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.xpack.eql.execution.payload.AbstractPayload;
 import org.elasticsearch.xpack.eql.session.Results.Type;
-import org.elasticsearch.xpack.eql.util.ReversedIterator;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
 class SequencePayload extends AbstractPayload {
 
-    private final List<org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence> sequences;
+    private final List<org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence> values;
 
-    SequencePayload(List<Sequence> seq, boolean timedOut, TimeValue timeTook) {
+    SequencePayload(List<Sequence> sequences, List<List<SearchHit>> searchHits, boolean timedOut, TimeValue timeTook) {
         super(timedOut, timeTook);
-        sequences = new ArrayList<>(seq.size());
-        boolean needsReversal = seq.size() > 1 && (seq.get(0).ordinal().compareTo(seq.get(1).ordinal()) > 0);
+        values = new ArrayList<>(sequences.size());
         
-        for (Iterator<Sequence> it = needsReversal ? new ReversedIterator<>(seq) : seq.iterator(); it.hasNext();) {
-            Sequence s = it.next();
-            sequences.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence(s.key().asStringList(), s.hits()));
+        for (int i = 0; i < sequences.size(); i++) {
+            Sequence s = sequences.get(i);
+            List<SearchHit> hits = searchHits.get(i);
+            values.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence(s.key().asStringList(), hits));
         }
     }
 
@@ -38,6 +37,6 @@ class SequencePayload extends AbstractPayload {
     @SuppressWarnings("unchecked")
     @Override
     public <V> List<V> values() {
-        return (List<V>) sequences;
+        return (List<V>) values;
     }
 }

+ 60 - 21
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java → x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java

@@ -4,19 +4,23 @@
  * you may not use this file except in compliance with the Elastic License.
  */
 
-package org.elasticsearch.xpack.eql.execution.assembler;
+package org.elasticsearch.xpack.eql.execution.sequence;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.xpack.eql.execution.assembler.BoxedQueryRequest;
+import org.elasticsearch.xpack.eql.execution.assembler.Criterion;
+import org.elasticsearch.xpack.eql.execution.assembler.Executable;
+import org.elasticsearch.xpack.eql.execution.search.HitReference;
 import org.elasticsearch.xpack.eql.execution.search.Ordinal;
 import org.elasticsearch.xpack.eql.execution.search.QueryClient;
-import org.elasticsearch.xpack.eql.execution.sequence.KeyAndOrdinal;
-import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey;
-import org.elasticsearch.xpack.eql.execution.sequence.SequenceMatcher;
+import org.elasticsearch.xpack.eql.session.EmptyPayload;
 import org.elasticsearch.xpack.eql.session.Payload;
+import org.elasticsearch.xpack.eql.session.Results.Type;
 import org.elasticsearch.xpack.eql.util.ReversedIterator;
 
 import java.util.Iterator;
@@ -98,7 +102,7 @@ public class TumblingWindow implements Executable {
 
         if (hits.isEmpty() == false) {
             if (matcher.match(baseStage, wrapValues(base, hits)) == false) {
-                listener.onResponse(payload());
+                payload(listener);
                 return;
             }
         }
@@ -120,7 +124,7 @@ public class TumblingWindow implements Executable {
             }
             // there aren't going to be any matches so cancel search
             else {
-                listener.onResponse(payload());
+                payload(listener);
             }
             return;
         }
@@ -231,7 +235,7 @@ public class TumblingWindow implements Executable {
 
                 // if the limit has been reached, return what's available
                 if (matcher.match(criterion.stage(), wrapValues(criterion, hits)) == false) {
-                    listener.onResponse(payload());
+                    payload(listener);
                     return;
                 }
             }
@@ -281,48 +285,83 @@ public class TumblingWindow implements Executable {
         return criterion.reverse() != base.reverse();
     }
 
-    Iterable<Tuple<KeyAndOrdinal, SearchHit>> wrapValues(Criterion<?> criterion, List<SearchHit> hits) {
+    private void payload(ActionListener<Payload> listener) {
+        List<Sequence> completed = matcher.completed();
+
+        if (completed.isEmpty()) {
+            listener.onResponse(new EmptyPayload(Type.SEQUENCE, timeTook()));
+            matcher.clear();
+            return;
+        }
+
+        client.get(hits(completed), wrap(searchHits -> {
+            listener.onResponse(new SequencePayload(completed, searchHits, false, timeTook()));
+            matcher.clear();
+        }, listener::onFailure));
+    }
+
+    private TimeValue timeTook() {
+        return new TimeValue(System.currentTimeMillis() - startTime);
+    }
+    Iterable<List<HitReference>> hits(List<Sequence> sequences) {
         return () -> {
-            final Iterator<SearchHit> iter = criterion.reverse() ? new ReversedIterator<>(hits) : hits.iterator();
+            final Iterator<Sequence> delegate = criteria.get(0).reverse() != criteria.get(1).reverse() ?
+                    new ReversedIterator<>(sequences) :
+                    sequences.iterator();
+            
+            return new Iterator<>() {
+
+                @Override
+                public boolean hasNext() {
+                    return delegate.hasNext();
+                }
+
+                @Override
+                public List<HitReference> next() {
+                    return delegate.next().hits();
+                }
+            };
+        };
+    }
+
+    Iterable<Tuple<KeyAndOrdinal, HitReference>> wrapValues(Criterion<?> criterion, List<SearchHit> hits) {
+        return () -> {
+            final Iterator<SearchHit> delegate = criterion.reverse() ? new ReversedIterator<>(hits) : hits.iterator();
 
             return new Iterator<>() {
 
                 @Override
                 public boolean hasNext() {
-                    return iter.hasNext();
+                    return delegate.hasNext();
                 }
 
                 @Override
-                public Tuple<KeyAndOrdinal, SearchHit> next() {
-                    SearchHit hit = iter.next();
+                public Tuple<KeyAndOrdinal, HitReference> next() {
+                    SearchHit hit = delegate.next();
                     SequenceKey k = criterion.key(hit);
                     Ordinal o = criterion.ordinal(hit);
-                    return new Tuple<>(new KeyAndOrdinal(k, o), hit);
+                    return new Tuple<>(new KeyAndOrdinal(k, o), new HitReference(hit));
                 }
             };
         };
     }
 
-    Iterable<KeyAndOrdinal> wrapUntilValues(Iterable<Tuple<KeyAndOrdinal, SearchHit>> iterable) {
+    <E> Iterable<KeyAndOrdinal> wrapUntilValues(Iterable<Tuple<KeyAndOrdinal, E>> iterable) {
         return () -> {
-            final Iterator<Tuple<KeyAndOrdinal, SearchHit>> iter = iterable.iterator();
+            final Iterator<Tuple<KeyAndOrdinal, E>> delegate = iterable.iterator();
 
             return new Iterator<>() {
 
                 @Override
                 public boolean hasNext() {
-                    return iter.hasNext();
+                    return delegate.hasNext();
                 }
 
                 @Override
                 public KeyAndOrdinal next() {
-                    return iter.next().v1();
+                    return delegate.next().v1();
                 }
             };
         };
     }
-
-    Payload payload() {
-        return matcher.payload(startTime);
-    }
 }

+ 5 - 5
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/EsQueryExec.java

@@ -7,6 +7,7 @@ package org.elasticsearch.xpack.eql.plan.physical;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
 import org.elasticsearch.search.sort.SortBuilder;
 import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.xpack.eql.execution.search.BasicQueryClient;
@@ -49,17 +50,16 @@ public class EsQueryExec extends LeafExec {
         return output;
     }
 
-    public QueryRequest queryRequest(EqlSession session) {
+    public SearchSourceBuilder source(EqlSession session) {
         EqlConfiguration cfg = session.configuration();
         // by default use the configuration size
-        // join/sequence queries will want to override this
-        SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(queryContainer, cfg.filter());
-        return () -> sourceBuilder;
+        return SourceGenerator.sourceBuilder(queryContainer, cfg.filter());
     }
 
     @Override
     public void execute(EqlSession session, ActionListener<Payload> listener) {
-        QueryRequest request = queryRequest(session);
+        // endpoint - fetch all source
+        QueryRequest request = () -> source(session).fetchSource(FetchSourceContext.FETCH_SOURCE);
         listener = shouldReverse(request) ? new ReverseListener(listener) : listener;
         new BasicQueryClient(session).query(request, listener);
     }

+ 7 - 1
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EmptyPayload.java

@@ -16,9 +16,15 @@ import static java.util.Collections.emptyList;
 public class EmptyPayload implements Payload {
 
     private final Type type;
+    private final TimeValue timeTook;
 
     public EmptyPayload(Type type) {
+        this(type, TimeValue.ZERO);
+    }
+
+    public EmptyPayload(Type type, TimeValue timeTook) {
         this.type = type;
+        this.timeTook = timeTook;
     }
 
     @Override
@@ -33,7 +39,7 @@ public class EmptyPayload implements Payload {
 
     @Override
     public TimeValue timeTook() {
-        return TimeValue.ZERO;
+        return timeTook;
     }
 
     @Override

+ 22 - 9
x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceSpecTests.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.eql.execution.assembler;
 import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
 
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.unit.TimeValue;
@@ -17,8 +18,11 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence;
 import org.elasticsearch.xpack.eql.execution.assembler.SeriesUtils.SeriesSpec;
+import org.elasticsearch.xpack.eql.execution.search.HitReference;
 import org.elasticsearch.xpack.eql.execution.search.QueryClient;
+import org.elasticsearch.xpack.eql.execution.search.QueryRequest;
 import org.elasticsearch.xpack.eql.execution.sequence.SequenceMatcher;
+import org.elasticsearch.xpack.eql.execution.sequence.TumblingWindow;
 import org.elasticsearch.xpack.eql.session.Payload;
 import org.elasticsearch.xpack.eql.session.Results;
 import org.elasticsearch.xpack.eql.session.Results.Type;
@@ -170,6 +174,23 @@ public class SequenceSpecTests extends ESTestCase {
         }
     }
 
+    class TestQueryClient implements QueryClient {
+
+        @Override
+        public void query(QueryRequest r, ActionListener<Payload> l) {
+            int ordinal = r.searchSource().size();
+            if (ordinal != Integer.MAX_VALUE) {
+                r.searchSource().size(Integer.MAX_VALUE);
+            }
+            Map<Integer, Tuple<String, String>> evs = ordinal != Integer.MAX_VALUE ? events.get(ordinal) : emptyMap();
+            l.onResponse(new TestPayload(evs));
+        }
+
+        @Override
+        public void get(Iterable<List<HitReference>> refs, ActionListener<List<List<SearchHit>>> listener) {
+            //no-op
+        }
+    }
 
     public SequenceSpecTests(String testName, int lineNumber, SeriesSpec spec) {
         this.lineNumber = lineNumber;
@@ -199,15 +220,7 @@ public class SequenceSpecTests extends ESTestCase {
         // convert the results through a test specific payload
         SequenceMatcher matcher = new SequenceMatcher(stages, TimeValue.MINUS_ONE, null);
         
-        QueryClient testClient = (r, l) -> {
-            int ordinal = r.searchSource().size();
-            if (ordinal != Integer.MAX_VALUE) {
-                r.searchSource().size(Integer.MAX_VALUE);
-            }
-            Map<Integer, Tuple<String, String>> evs = ordinal != Integer.MAX_VALUE ? events.get(ordinal) : emptyMap();
-            l.onResponse(new TestPayload(evs));
-        };
-        
+        QueryClient testClient = new TestQueryClient();
         TumblingWindow window = new TumblingWindow(testClient, criteria, null, matcher);
 
         // finally make the assertion at the end of the listener

+ 2 - 1
x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/planner/QueryFolderOkTests.java

@@ -22,6 +22,7 @@ import java.util.Locale;
 import java.util.Map;
 
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.not;
 
 public class QueryFolderOkTests extends AbstractQueryFolderTestCase {
 
@@ -139,6 +140,6 @@ public class QueryFolderOkTests extends AbstractQueryFolderTestCase {
         assertThat(query, containsString("\"term\":{\"event.category\":{\"value\":\"process\""));
 
         // test field source extraction
-        assertThat(query, containsString("\"_source\":{\"includes\":[],\"excludes\":[]"));
+        assertThat(query, not(containsString("\"_source\":{\"includes\":[],\"excludes\":[]")));
     }
 }