Browse Source

EQL: Introduce tie breaker support (#57787)

Allow a field inside the data to be used as a tie breaker for events
that have the same timestamp.
The field is optional by default.
If used, the tie-breaker always requires a non-null value since it is
used inside `search_after` which requires a non-null value.

Fix #56824
Costin Leau 5 years ago
parent
commit
e5719ecb47
29 changed files with 488 additions and 221 deletions
  1. 17 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchRequest.java
  2. 4 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/eql/EqlSearchRequestTests.java
  3. 5 13
      x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java
  4. 14 2
      x-pack/plugin/eql/qa/common/src/main/resources/mapping-default.json
  5. 0 22
      x-pack/plugin/eql/qa/common/src/main/resources/test_queries_supported.toml
  6. 9 62
      x-pack/plugin/eql/qa/common/src/main/resources/test_queries_unsupported.toml
  7. 21 2
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java
  8. 5 0
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequestBuilder.java
  9. 33 4
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java
  10. 6 6
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java
  11. 4 2
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/KeyAndOrdinal.java
  12. 28 13
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java
  13. 10 3
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Match.java
  14. 8 4
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java
  15. 25 5
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceFrame.java
  16. 3 1
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceKey.java
  17. 27 5
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java
  18. 60 0
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TimeOrdinal.java
  19. 40 20
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/parser/LogicalPlanBuilder.java
  20. 11 1
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/parser/ParserParams.java
  21. 26 13
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/logical/Join.java
  22. 17 10
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/logical/KeyedFilter.java
  23. 14 20
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/logical/Sequence.java
  24. 23 11
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/SequenceExec.java
  25. 2 1
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Mapper.java
  26. 1 0
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java
  27. 3 1
      x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntimeTests.java
  28. 68 0
      x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/expression/EmptyAttribute.java
  29. 4 0
      x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/expression/Expressions.java

+ 17 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchRequest.java

@@ -43,9 +43,11 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
     private int fetchSize = 50;
     private SearchAfterBuilder searchAfterBuilder;
     private String query;
+    private String tieBreakerField;
 
     static final String KEY_FILTER = "filter";
     static final String KEY_TIMESTAMP_FIELD = "timestamp_field";
+    static final String KEY_TIE_BREAKER_FIELD = "tie_breaker_field";
     static final String KEY_EVENT_CATEGORY_FIELD = "event_category_field";
     static final String KEY_IMPLICIT_JOIN_KEY_FIELD = "implicit_join_key_field";
     static final String KEY_SIZE = "size";
@@ -64,6 +66,9 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
             builder.field(KEY_FILTER, filter);
         }
         builder.field(KEY_TIMESTAMP_FIELD, timestampField());
+        if (tieBreakerField != null) {
+            builder.field(KEY_TIE_BREAKER_FIELD, tieBreakerField());
+        }
         builder.field(KEY_EVENT_CATEGORY_FIELD, eventCategoryField());
         if (implicitJoinKeyField != null) {
             builder.field(KEY_IMPLICIT_JOIN_KEY_FIELD, implicitJoinKeyField());
@@ -107,6 +112,16 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
         return this;
     }
 
+    public String tieBreakerField() {
+        return this.tieBreakerField;
+    }
+
+    public EqlSearchRequest tieBreakerField(String tieBreakerField) {
+        Objects.requireNonNull(tieBreakerField, "tie breaker field must not be null");
+        this.tieBreakerField = tieBreakerField;
+        return this;
+    }
+
     public String eventCategoryField() {
         return this.eventCategoryField;
     }
@@ -180,6 +195,7 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
                 Objects.equals(indicesOptions, that.indicesOptions) &&
                 Objects.equals(filter, that.filter) &&
                 Objects.equals(timestampField, that.timestampField) &&
+                Objects.equals(tieBreakerField, that.tieBreakerField) &&
                 Objects.equals(eventCategoryField, that.eventCategoryField) &&
                 Objects.equals(implicitJoinKeyField, that.implicitJoinKeyField) &&
                 Objects.equals(searchAfterBuilder, that.searchAfterBuilder) &&
@@ -194,6 +210,7 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
             filter,
             fetchSize,
             timestampField,
+            tieBreakerField,
             eventCategoryField,
             implicitJoinKeyField,
             searchAfterBuilder,

+ 4 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/eql/EqlSearchRequestTests.java

@@ -51,6 +51,9 @@ public class EqlSearchRequestTests extends AbstractRequestTestCase<EqlSearchRequ
         if (randomBoolean()) {
             EqlSearchRequest.timestampField(randomAlphaOfLength(10));
         }
+        if (randomBoolean()) {
+            EqlSearchRequest.tieBreakerField(randomAlphaOfLength(10));
+        }
         if (randomBoolean()) {
             EqlSearchRequest.searchAfter(randomArray(1, 4, Object[]::new, () -> randomAlphaOfLength(3)));
         }
@@ -75,6 +78,7 @@ public class EqlSearchRequestTests extends AbstractRequestTestCase<EqlSearchRequ
         assertThat(serverInstance.eventCategoryField(), equalTo(clientTestInstance.eventCategoryField()));
         assertThat(serverInstance.implicitJoinKeyField(), equalTo(clientTestInstance.implicitJoinKeyField()));
         assertThat(serverInstance.timestampField(), equalTo(clientTestInstance.timestampField()));
+        assertThat(serverInstance.tieBreakerField(), equalTo(clientTestInstance.tieBreakerField()));
         assertThat(serverInstance.filter(), equalTo(clientTestInstance.filter()));
         assertThat(serverInstance.query(), equalTo(clientTestInstance.query()));
         assertThat(serverInstance.searchAfter(), equalTo(clientTestInstance.searchAfter()));

+ 5 - 13
x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java

@@ -36,6 +36,7 @@ import org.junit.BeforeClass;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.stream.Collectors.toList;
@@ -77,18 +78,7 @@ public abstract class CommonEqlActionTestCase extends ESRestTestCase {
             List<Object> list = parser.list();
             for (Object item : list) {
                 assertThat(item, instanceOf(HashMap.class));
-
-                HashMap<String, Object> entry = (HashMap<String, Object>) item;
-
-                // Adjust the structure of the document with additional event.category and @timestamp fields
-                // Add event.category field
-                HashMap<String, Object> objEvent = new HashMap<>();
-                objEvent.put("category", entry.get("event_type"));
-                entry.put("event", objEvent);
-
-                // Add @timestamp field
-                entry.put("@timestamp", entry.get("timestamp"));
-
+                Map<String, Object> entry = (Map<String, Object>) item;
                 bulk.add(new IndexRequest(testIndexName).source(entry, XContentType.JSON));
             }
         }
@@ -162,8 +152,9 @@ public abstract class CommonEqlActionTestCase extends ESRestTestCase {
                 name = spec.note();
             }
             if (Strings.isNullOrEmpty(name)) {
-                name = spec.query();
+                name = "" + (counter.get() + 1);
             }
+
             return new Object[] { counter.incrementAndGet(), name, spec };
         }).collect(toList());
     }
@@ -197,6 +188,7 @@ public abstract class CommonEqlActionTestCase extends ESRestTestCase {
 
     protected EqlSearchResponse runQuery(String index, String query) throws Exception {
         EqlSearchRequest request = new EqlSearchRequest(testIndexName, query);
+        request.tieBreakerField("event.sequence");
         return eqlClient().search(request, RequestOptions.DEFAULT);
     }
 

+ 14 - 2
x-pack/plugin/eql/qa/common/src/main/resources/mapping-default.json

@@ -3,10 +3,18 @@
         "command_line" : {
             "type" : "keyword"
         },
+        "event_type" : {
+            "type" : "keyword"
+        },
         "event" : {
             "properties" : {
                 "category" : {
-                    "type" : "keyword"
+                    "type" : "alias",
+                    "path" : "event_type"
+                },
+                "sequence" : {
+                    "type" : "alias",
+                    "path" : "serial_event_id"
                 }
             }
         },
@@ -34,9 +42,13 @@
         "subtype" : {
             "type" : "keyword"
         },
-        "@timestamp" : {
+        "timestamp" : {
             "type" : "date"
         },
+        "@timestamp" : {
+            "type" : "alias",
+            "path" : "timestamp"
+        },
         "user" : {
             "type" : "keyword"
         },

+ 0 - 22
x-pack/plugin/eql/qa/common/src/main/resources/test_queries_supported.toml

@@ -187,28 +187,6 @@ query = "file where 66.0 / serial_event_id == 1"
 expected_event_ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 46]
 query = "process where serial_event_id + ((1 + 3) * 2 / (3 - 1)) * 2 == 54 or 70 + serial_event_id < 100"
 
-[[queries]]
-query = '''
-sequence
-  [process where serial_event_id = 1]
-  [process where serial_event_id = 2]
-'''
-expected_event_ids  = [1, 2]
-
-[[queries]]
-query = '''
-sequence
-  [process where serial_event_id=1] by unique_pid
-  [process where true] by unique_ppid'''
-expected_event_ids  = [1, 2]
-
-[[queries]]
-query = '''
-sequence
-  [process where serial_event_id<3] by unique_pid
-  [process where true] by unique_ppid
-'''
-expected_event_ids  = [1, 2, 2, 3]
 
 [[queries]]
 query = '''

+ 9 - 62
x-pack/plugin/eql/qa/common/src/main/resources/test_queries_unsupported.toml

@@ -12,6 +12,15 @@
 # query = 'process where serial_event_id = 1'
 # expected_event_ids  = [1]
 
+
+# fails because of string check - msbuild does not match MSBuild
+[[queries]]
+query = '''
+sequence by unique_pid [process where opcode=1 and process_name == 'msbuild.exe'] [network where true]'''
+expected_event_ids  = [75273, 75304]
+description = "test that process sequences are working correctly"
+
+
 [[queries]]
 query = 'process where true | head 6'
 expected_event_ids  = [1, 2, 3, 4, 5, 6]
@@ -229,22 +238,6 @@ process where true
 | sort md5 event_subtype_full null_field process_name
 | sort serial_event_id'''
 
-[[queries]]
-query = '''
-sequence
-  [process where serial_event_id < 5]
-  [process where serial_event_id = 5]
-'''
-expected_event_ids  = [4, 5]
-
-[[queries]]
-query = '''
-sequence
-  [process where serial_event_id<3] by unique_pid
-  [process where true] by unique_ppid
-'''
-expected_event_ids  = [1, 2, 2, 3]
-
 [[queries]]
 query = '''
 sequence
@@ -329,22 +322,6 @@ sequence with maxspan=0.5s
 | tail 2'''
 expected_event_ids = []
 
-[[queries]]
-query = '''
-sequence
-  [process where serial_event_id < 5]
-  [process where serial_event_id < 5]
-'''
-expected_event_ids  = [1, 2, 2, 3, 3, 4]
-
-[[queries]]
-query = '''
-sequence
-  [file where opcode=0 and file_name="svchost.exe"] by unique_pid
-  [process where opcode == 1] by unique_ppid
-'''
-expected_event_ids  = [55, 56]
-
 [[queries]]
 query = '''
 sequence
@@ -562,30 +539,6 @@ expected_event_ids  = [55, 95]
 query = 'process where event of [process where process_name = "python.exe" ]'
 expected_event_ids  = [48, 50, 51, 54, 93]
 
-[[queries]]
-query = '''
-sequence
-  [file where file_name="lsass.exe"] by file_path,process_path
-  [process where true] by process_path,parent_process_path
-'''
-expected_event_ids  = [61, 62]
-
-[[queries]]
-query = '''
-sequence by user_name
-  [file where file_name="lsass.exe"] by file_path, process_path
-  [process where true] by process_path, parent_process_path
-'''
-expected_event_ids  = [61, 62]
-
-[[queries]]
-query = '''
-sequence by pid
-  [file where file_name="lsass.exe"] by file_path,process_path
-  [process where true] by process_path,parent_process_path
-'''
-expected_event_ids = []
-
 [[queries]]
 query = '''
 sequence by user_name
@@ -839,12 +792,6 @@ process where process_name != original_file_name
 expected_event_ids = []
 description = "check that case insensitive comparisons are performed for fields."
 
-[[queries]]
-query = '''
-sequence by unique_pid [process where opcode=1 and process_name == 'msbuild.exe'] [network where true]'''
-expected_event_ids  = [75273, 75304]
-description = "test that process sequences are working correctly"
-
 [[queries]]
 expected_event_ids  = [57]
 description = "test arraySearch functionality for lists of strings, and lists of objects"

+ 21 - 2
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java

@@ -32,8 +32,8 @@ import java.util.function.Supplier;
 import static org.elasticsearch.action.ValidateActions.addValidationError;
 import static org.elasticsearch.xpack.eql.action.RequestDefaults.FETCH_SIZE;
 import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_EVENT_CATEGORY;
-import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_TIMESTAMP;
 import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_IMPLICIT_JOIN_KEY;
+import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_TIMESTAMP;
 
 public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Replaceable, ToXContent {
 
@@ -43,6 +43,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
 
     private QueryBuilder filter = null;
     private String timestampField = FIELD_TIMESTAMP;
+    private String tieBreakerField = null;
     private String eventCategoryField = FIELD_EVENT_CATEGORY;
     private String implicitJoinKeyField = FIELD_IMPLICIT_JOIN_KEY;
     private int fetchSize = FETCH_SIZE;
@@ -52,6 +53,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
 
     static final String KEY_FILTER = "filter";
     static final String KEY_TIMESTAMP_FIELD = "timestamp_field";
+    static final String KEY_TIE_BREAKER_FIELD = "tie_breaker_field";
     static final String KEY_EVENT_CATEGORY_FIELD = "event_category_field";
     static final String KEY_IMPLICIT_JOIN_KEY_FIELD = "implicit_join_key_field";
     static final String KEY_SIZE = "size";
@@ -61,6 +63,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
 
     static final ParseField FILTER = new ParseField(KEY_FILTER);
     static final ParseField TIMESTAMP_FIELD = new ParseField(KEY_TIMESTAMP_FIELD);
+    static final ParseField TIE_BREAKER_FIELD = new ParseField(KEY_TIE_BREAKER_FIELD);
     static final ParseField EVENT_CATEGORY_FIELD = new ParseField(KEY_EVENT_CATEGORY_FIELD);
     static final ParseField IMPLICIT_JOIN_KEY_FIELD = new ParseField(KEY_IMPLICIT_JOIN_KEY_FIELD);
     static final ParseField SIZE = new ParseField(KEY_SIZE);
@@ -80,6 +83,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
         indicesOptions = IndicesOptions.readIndicesOptions(in);
         filter = in.readOptionalNamedWriteable(QueryBuilder.class);
         timestampField = in.readString();
+        tieBreakerField = in.readOptionalString();
         eventCategoryField = in.readString();
         implicitJoinKeyField = in.readString();
         fetchSize = in.readVInt();
@@ -136,6 +140,9 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
             builder.field(KEY_FILTER, filter);
         }
         builder.field(KEY_TIMESTAMP_FIELD, timestampField());
+        if (tieBreakerField != null) {
+            builder.field(KEY_TIE_BREAKER_FIELD, tieBreakerField());
+        }
         builder.field(KEY_EVENT_CATEGORY_FIELD, eventCategoryField());
         if (implicitJoinKeyField != null) {
             builder.field(KEY_IMPLICIT_JOIN_KEY_FIELD, implicitJoinKeyField());
@@ -161,6 +168,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
         parser.declareObject(EqlSearchRequest::filter,
             (p, c) -> AbstractQueryBuilder.parseInnerQueryBuilder(p), FILTER);
         parser.declareString(EqlSearchRequest::timestampField, TIMESTAMP_FIELD);
+        parser.declareString(EqlSearchRequest::tieBreakerField, TIE_BREAKER_FIELD);
         parser.declareString(EqlSearchRequest::eventCategoryField, EVENT_CATEGORY_FIELD);
         parser.declareString(EqlSearchRequest::implicitJoinKeyField, IMPLICIT_JOIN_KEY_FIELD);
         parser.declareInt(EqlSearchRequest::fetchSize, SIZE);
@@ -191,6 +199,13 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
         return this;
     }
 
+    public String tieBreakerField() { return this.tieBreakerField; }
+
+    public EqlSearchRequest tieBreakerField(String tieBreakerField) {
+        this.tieBreakerField = tieBreakerField;
+        return this;
+    }
+
     public String eventCategoryField() { return this.eventCategoryField; }
 
     public EqlSearchRequest eventCategoryField(String eventCategoryField) {
@@ -250,6 +265,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
         indicesOptions.writeIndicesOptions(out);
         out.writeOptionalNamedWriteable(filter);
         out.writeString(timestampField);
+        out.writeOptionalString(tieBreakerField);
         out.writeString(eventCategoryField);
         out.writeString(implicitJoinKeyField);
         out.writeVInt(fetchSize);
@@ -272,6 +288,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
                 Objects.equals(indicesOptions, that.indicesOptions) &&
                 Objects.equals(filter, that.filter) &&
                 Objects.equals(timestampField, that.timestampField) &&
+                Objects.equals(tieBreakerField, that.tieBreakerField) &&
                 Objects.equals(eventCategoryField, that.eventCategoryField) &&
                 Objects.equals(implicitJoinKeyField, that.implicitJoinKeyField) &&
                 Objects.equals(searchAfterBuilder, that.searchAfterBuilder) &&
@@ -286,7 +303,9 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
             indicesOptions,
             filter,
             fetchSize,
-            timestampField, eventCategoryField,
+            timestampField,
+            tieBreakerField,
+            eventCategoryField,
             implicitJoinKeyField,
             searchAfterBuilder,
             query,

+ 5 - 0
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequestBuilder.java

@@ -30,6 +30,11 @@ public class EqlSearchRequestBuilder extends ActionRequestBuilder<EqlSearchReque
         return this;
     }
 
+    public EqlSearchRequestBuilder tieBreakerField(String tieBreakerField) {
+        request.tieBreakerField(tieBreakerField);
+        return this;
+    }
+
     public EqlSearchRequestBuilder eventCategoryField(String eventCategoryField) {
         request.eventCategoryField(eventCategoryField);
         return this;

+ 33 - 4
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java

@@ -6,7 +6,9 @@
 
 package org.elasticsearch.xpack.eql.execution.assembler;
 
+import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
 import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor;
 
 import java.util.List;
@@ -16,11 +18,14 @@ public class Criterion {
     private final SearchSourceBuilder searchSource;
     private final List<HitExtractor> keyExtractors;
     private final HitExtractor timestampExtractor;
+    private final HitExtractor tieBreakerExtractor;
 
-    public Criterion(SearchSourceBuilder searchSource, List<HitExtractor> searchAfterExractors, HitExtractor timestampExtractor) {
+    public Criterion(SearchSourceBuilder searchSource, List<HitExtractor> searchAfterExractors, HitExtractor timestampExtractor,
+                     HitExtractor tieBreakerExtractor) {
         this.searchSource = searchSource;
         this.keyExtractors = searchAfterExractors;
         this.timestampExtractor = timestampExtractor;
+        this.tieBreakerExtractor = tieBreakerExtractor;
     }
 
     public SearchSourceBuilder searchSource() {
@@ -35,8 +40,32 @@ public class Criterion {
         return timestampExtractor;
     }
 
-    public void fromTimestamp(long timestampMarker) {
+    public HitExtractor tieBreakerExtractor() {
+        return tieBreakerExtractor;
+    }
+
+    public long timestamp(SearchHit hit) {
+        Object ts = timestampExtractor.extract(hit);
+        if (ts instanceof Number) {
+            return ((Number) ts).longValue();
+        }
+        throw new EqlIllegalArgumentException("Expected timestamp as long but got {}", ts);
+    }
+
+    @SuppressWarnings({ "unchecked" })
+    public Comparable<Object> tieBreaker(SearchHit hit) {
+        if (tieBreakerExtractor == null) {
+            return null;
+        }
+        Object tb = tieBreakerExtractor.extract(hit);
+        if (tb instanceof Comparable) {
+            return (Comparable<Object>) tb;
+        }
+        throw new EqlIllegalArgumentException("Expected tiebreaker to be Comparable but got {}", tb);
+    }
+
+    public void fromMarkers(Object[] markers) {
         // TODO: this is likely to be rewritten afterwards
-        searchSource.searchAfter(new Object[] { timestampMarker });
+        searchSource.searchAfter(markers);
     }
-}
+}

+ 6 - 6
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java

@@ -24,7 +24,6 @@ import org.elasticsearch.xpack.eql.execution.search.extractor.FieldHitExtractor;
 import org.elasticsearch.xpack.eql.execution.search.extractor.TimestampFieldHitExtractor;
 import org.elasticsearch.xpack.eql.plan.physical.EsQueryExec;
 import org.elasticsearch.xpack.eql.plan.physical.PhysicalPlan;
-import org.elasticsearch.xpack.eql.plan.physical.SequenceExec;
 import org.elasticsearch.xpack.eql.querydsl.container.FieldExtractorRegistry;
 import org.elasticsearch.xpack.eql.querydsl.container.QueryContainer;
 import org.elasticsearch.xpack.eql.session.EqlConfiguration;
@@ -32,6 +31,7 @@ import org.elasticsearch.xpack.eql.session.EqlSession;
 import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor;
 import org.elasticsearch.xpack.ql.expression.Attribute;
 import org.elasticsearch.xpack.ql.expression.Expression;
+import org.elasticsearch.xpack.ql.expression.Expressions;
 import org.elasticsearch.xpack.ql.util.Check;
 import org.elasticsearch.xpack.ql.util.StringUtils;
 
@@ -57,18 +57,18 @@ public class ExecutionManager implements QueryClient {
         this.indices = cfg.indexAsWildcard();
     }
 
-    public Executable from(SequenceExec seqExec) {
+
+    public Executable assemble(List<List<Attribute>> listOfKeys, List<PhysicalPlan> plans, Attribute timestamp, Attribute tieBreaker) {
         FieldExtractorRegistry extractorRegistry = new FieldExtractorRegistry();
         
-        List<List<Attribute>> listOfKeys = seqExec.keys();
-        List<PhysicalPlan> plans = seqExec.children();
         List<Criterion> criteria = new ArrayList<>(plans.size() - 1);
         
         // build a criterion for each query
         for (int i = 0; i < plans.size() - 1; i++) {
             List<Attribute> keys = listOfKeys.get(i);
             // fields
-            HitExtractor tsExtractor = timestampExtractor(hitExtractor(seqExec.timestamp(), extractorRegistry));
+            HitExtractor tsExtractor = timestampExtractor(hitExtractor(timestamp, extractorRegistry));
+            HitExtractor tbExtractor = Expressions.isPresent(tieBreaker) ? hitExtractor(tieBreaker, extractorRegistry) : null;
             List<HitExtractor> keyExtractors = hitExtractors(keys, extractorRegistry);
 
             PhysicalPlan query = plans.get(i);
@@ -78,7 +78,7 @@ public class ExecutionManager implements QueryClient {
             QueryContainer container = ((EsQueryExec) query).queryContainer();
             SearchSourceBuilder searchSource = SourceGenerator.sourceBuilder(container, cfg.filter(), cfg.size());
             
-            criteria.add(new Criterion(searchSource, keyExtractors, tsExtractor));
+            criteria.add(new Criterion(searchSource, keyExtractors, tsExtractor, tbExtractor));
         }
         return new SequenceRuntime(criteria, this);
     }

+ 4 - 2
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/KeyWithTime.java → x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/KeyAndOrdinal.java

@@ -8,12 +8,14 @@ package org.elasticsearch.xpack.eql.execution.assembler;
 
 import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey;
 
-class KeyWithTime {
+class KeyAndOrdinal {
     final SequenceKey key;
     final long timestamp;
+    final Comparable<Object> tieBreaker;
 
-    KeyWithTime(SequenceKey key, long timestamp) {
+    KeyAndOrdinal(SequenceKey key, long timestamp, Comparable<Object> tieBreaker) {
         this.key = key;
         this.timestamp = timestamp;
+        this.tieBreaker = tieBreaker;
     }
 }

+ 28 - 13
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java

@@ -36,8 +36,9 @@ class SequenceRuntime implements Executable {
     SequenceRuntime(List<Criterion> criteria, QueryClient queryClient) {
         this.criteria = criteria;
         this.numberOfStages = criteria.size();
-        this.stateMachine = new SequenceStateMachine(numberOfStages);
         this.queryClient = queryClient;
+        boolean hasTieBreaker = criteria.get(0).tieBreakerExtractor() != null;
+        this.stateMachine = new SequenceStateMachine(numberOfStages, hasTieBreaker);
     }
 
     @Override
@@ -51,7 +52,7 @@ class SequenceRuntime implements Executable {
         queryClient.query(firstStage.searchSource(), wrap(payload -> {
 
             // 1. execute last stage (find keys)
-            startTracking(payload);
+            startTracking(payload, resultsListener);
 
             // 2. go descending through the rest of the stages, while adjusting the query
             inspectStage(1, resultsListener);
@@ -59,26 +60,40 @@ class SequenceRuntime implements Executable {
         }, resultsListener::onFailure));
     }
 
-    private void startTracking(Payload<SearchHit> payload) {
+    private void startTracking(Payload<SearchHit> payload, ActionListener<Results> resultsListener) {
         Criterion lastCriterion = criteria.get(0);
         List<SearchHit> hits = payload.values();
 
+        // nothing matches the first query, bail out early
+        if (hits.isEmpty()) {
+            resultsListener.onResponse(assembleResults());
+            return;
+        }
+        
         long tMin = Long.MAX_VALUE;
         long tMax = Long.MIN_VALUE;
+        
+        Comparable<Object> bMin = null;
         // we could have extracted that in the hit loop but that if would have been evaluated
         // for every document
         if (hits.isEmpty() == false) {
-            tMin = (Long) lastCriterion.timestampExtractor().extract(hits.get(0));
-            tMax = (Long) lastCriterion.timestampExtractor().extract(hits.get(hits.size() - 1));
+            tMin = lastCriterion.timestamp(hits.get(0));
+            tMax = lastCriterion.timestamp(hits.get(hits.size() - 1));
+            
+            if (lastCriterion.tieBreakerExtractor() != null) {
+               bMin = lastCriterion.tieBreaker(hits.get(0));
+            }
         }
 
         for (SearchHit hit : hits) {
-            KeyWithTime keyAndTime = findKey(hit, lastCriterion);
-            Sequence seq = new Sequence(keyAndTime.key, numberOfStages, keyAndTime.timestamp, hit);
+            KeyAndOrdinal ko = findKey(hit, lastCriterion);
+            Sequence seq = new Sequence(ko.key, numberOfStages, ko.timestamp, ko.tieBreaker, hit);
             stateMachine.trackSequence(seq, tMin, tMax);
         }
-        // TB: change
         stateMachine.setTimestampMarker(0, tMin);
+        if (bMin != null) {
+            stateMachine.setTieBreakerMarker(0, bMin);
+        }
     }
 
     private void inspectStage(int stage, ActionListener<Results> resultsListener) {
@@ -90,7 +105,7 @@ class SequenceRuntime implements Executable {
         // else continue finding matches
         Criterion currentCriterion = criteria.get(stage);
         // narrow by the previous stage timestamp marker
-        currentCriterion.fromTimestamp(stateMachine.getTimestampMarker(stage - 1));
+        currentCriterion.fromMarkers(stateMachine.getMarkers(stage - 1));
         
         queryClient.query(currentCriterion.searchSource(), wrap(payload -> {
             findMatches(stage, payload);
@@ -104,12 +119,12 @@ class SequenceRuntime implements Executable {
         
         // break the results per key
         for (SearchHit hit : hits) {
-            KeyWithTime kt = findKey(hit, currentCriterion);
-            stateMachine.match(currentStage, kt.key, kt.timestamp, hit);
+            KeyAndOrdinal ko = findKey(hit, currentCriterion);
+            stateMachine.match(currentStage, ko.key, ko.timestamp, ko.tieBreaker, hit);
         }
     }
 
-    private KeyWithTime findKey(SearchHit hit, Criterion criterion) {
+    private KeyAndOrdinal findKey(SearchHit hit, Criterion criterion) {
         List<HitExtractor> keyExtractors = criterion.keyExtractors();
 
         SequenceKey key;
@@ -123,7 +138,7 @@ class SequenceRuntime implements Executable {
             key = new SequenceKey(docKeys);
         }
 
-        return new KeyWithTime(key, (Long) criterion.timestampExtractor().extract(hit));
+        return new KeyAndOrdinal(key, criterion.timestamp(hit), criterion.tieBreaker(hit));
     }
 
     private Results assembleResults() {

+ 10 - 3
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Match.java

@@ -16,10 +16,12 @@ import java.util.Objects;
 class Match {
 
     private final long timestamp;
+    private final Comparable<Object> tieBreaker;
     private final SearchHit hit;
 
-    Match(long timestamp, SearchHit hit) {
+    Match(long timestamp, Comparable<Object> tieBreaker, SearchHit hit) {
         this.timestamp = timestamp;
+        this.tieBreaker = tieBreaker;
         this.hit = hit;
     }
 
@@ -27,13 +29,17 @@ class Match {
         return timestamp;
     }
 
+    Comparable<Object> tieBreaker() {
+        return tieBreaker;
+    }
+
     SearchHit hit() {
         return hit;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(timestamp, hit);
+        return Objects.hash(timestamp, tieBreaker, hit);
     }
 
     @Override
@@ -48,11 +54,12 @@ class Match {
 
         Match other = (Match) obj;
         return Objects.equals(timestamp, other.timestamp)
+                && Objects.equals(tieBreaker, other.tieBreaker)
                 && Objects.equals(hit, other.hit);
     }
 
     @Override
     public String toString() {
-        return timestamp + "->" + hit.getId();
+        return timestamp + "[" + (tieBreaker != null ? tieBreaker : "") + "]->" + hit.getId();
     }
 }

+ 8 - 4
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java

@@ -31,19 +31,19 @@ public class Sequence {
 
     private int currentStage = 0;
 
-    public Sequence(SequenceKey key, int stages, long timestamp, SearchHit firstHit) {
+    public Sequence(SequenceKey key, int stages, long timestamp, Comparable<Object> tieBreaker, SearchHit firstHit) {
         Check.isTrue(stages >= 2, "A sequence requires at least 2 criteria, given [{}]", stages);
         this.key = key;
         this.stages = stages;
         this.matches = new Match[stages];
-        this.matches[0] = new Match(timestamp, firstHit);
+        this.matches[0] = new Match(timestamp, tieBreaker, firstHit);
     }
 
-    public int putMatch(int stage, SearchHit hit, long timestamp) {
+    public int putMatch(int stage, SearchHit hit, long timestamp, Comparable<Object> tieBreaker) {
         if (stage == currentStage + 1) {
             int previousStage = currentStage;
             currentStage = stage;
-            matches[currentStage] = new Match(timestamp, hit);
+            matches[currentStage] = new Match(timestamp, tieBreaker, hit);
             return previousStage;
         }
         throw new EqlIllegalArgumentException("Incorrect stage [{}] specified for Sequence[key={}, stage=]", stage, key, currentStage);
@@ -61,6 +61,10 @@ public class Sequence {
         return matches[currentStage].timestamp();
     }
 
+    public Comparable<Object> currentTieBreaker() {
+        return matches[currentStage].tieBreaker();
+    }
+
     public long timestamp(int stage) {
         // stages not initialized yet return an out-of-band value to have no impact on the interval range
         if (stage > currentStage) {

+ 25 - 5
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceFrame.java

@@ -49,15 +49,24 @@ public class SequenceFrame {
      * Returns the latest Sequence from the group that has its timestamp
      * less than the given argument alongside its position in the list.
      */
-    public Tuple<Sequence, Integer> before(long timestamp) {
+    public Tuple<Sequence, Integer> before(long timestamp, Comparable<Object> tieBreaker) {
         Sequence matchSeq = null;
         int matchPos = -1;
         int position = -1;
         for (Sequence sequence : sequences) {
             position++;
+            // ts only comparison
             if (sequence.currentTimestamp() < timestamp) {
                 matchSeq = sequence;
                 matchPos = position;
+            }
+            // apply tiebreaker (null first, that is null is less than any value)
+            else if (tieBreaker != null && sequence.currentTimestamp() == timestamp) {
+                Comparable<Object> tb = sequence.currentTieBreaker();
+                if (tb == null || tb.compareTo(tieBreaker) < 0) {
+                    matchSeq = sequence;
+                    matchPos = position;
+                }
             } else {
                 break;
             }
@@ -69,18 +78,29 @@ public class SequenceFrame {
      * Returns the first Sequence from the group that has its timestamp
      * greater than the given argument alongside its position in the list.
      */
-    public Tuple<Sequence, Integer> after(long timestamp) {
-        Sequence match = null;
+    public Tuple<Sequence, Integer> after(long timestamp, Comparable<Object> tieBreaker) {
+        Sequence matchSeq = null;
+        int matchPos = -1;
         int position = -1;
         for (Sequence sequence : sequences) {
             position++;
+            // ts only comparison
             if (sequence.currentTimestamp() > timestamp) {
-                match = sequence;
+                matchSeq = sequence;
+                matchPos = position;
+            }
+            // apply tiebreaker (null first, that is null is less than any value)
+            else if (tieBreaker != null && sequence.currentTimestamp() == timestamp) {
+                Comparable<Object> tb = sequence.currentTieBreaker();
+                if (tb == null || tb.compareTo(tieBreaker) > 0) {
+                    matchSeq = sequence;
+                    matchPos = position;
+                }
             } else {
                 break;
             }
         }
-        return match != null ? new Tuple<>(match, position) : null;
+        return matchSeq != null ? new Tuple<>(matchSeq, matchPos) : null;
     }
 
     public boolean isEmpty() {

+ 3 - 1
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceKey.java

@@ -17,9 +17,11 @@ public class SequenceKey {
     public static final SequenceKey NONE = new SequenceKey();
 
     private final Object[] keys;
+    private final int hashCode;
 
     public SequenceKey(Object... keys) {
         this.keys = keys;
+        this.hashCode = Objects.hash(keys);
     }
 
     public List<String> asStringList() {
@@ -32,7 +34,7 @@ public class SequenceKey {
 
     @Override
     public int hashCode() {
-        return Objects.hash(keys);
+        return hashCode;
     }
 
     @Override

+ 27 - 5
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java

@@ -27,17 +27,25 @@ public class SequenceStateMachine {
     /** this ignores the key */
     private final long[] timestampMarkers;
 
+    private final Comparable<Object>[] tieBreakerMarkers;
+    private final boolean hasTieBreaker;
+
     private final int completionStage;
 
     /** list of completed sequences - separate to avoid polluting the other stages */
     private final List<Sequence> completed;
 
-    public SequenceStateMachine(int stages) {
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public SequenceStateMachine(int stages, boolean hasTieBreaker) {
         this.completionStage = stages - 1;
+
         this.stageToKeys = new StageToKeys(completionStage);
         this.keyToSequences = new KeyToSequences(completionStage);
         this.timestampMarkers = new long[completionStage];
+        this.tieBreakerMarkers = new Comparable[completionStage];
         this.completed = new LinkedList<>();
+
+        this.hasTieBreaker = hasTieBreaker;
     }
 
     public List<Sequence> completeSequences() {
@@ -48,10 +56,24 @@ public class SequenceStateMachine {
         return timestampMarkers[stage];
     }
 
+    public Comparable<?> getTieBreakerMarker(int stage) {
+        return tieBreakerMarkers[stage];
+    }
+
     public void setTimestampMarker(int stage, long timestamp) {
         timestampMarkers[stage] = timestamp;
     }
 
+    public void setTieBreakerMarker(int stage, Comparable<Object> tieBreaker) {
+        tieBreakerMarkers[stage] = tieBreaker;
+    }
+
+    public Object[] getMarkers(int stage) {
+        long ts = timestampMarkers[stage];
+        Comparable<Object> tb = tieBreakerMarkers[stage];
+        return hasTieBreaker ? new Object[] { ts, tb } : new Object[] { ts };
+    }
+
     public void trackSequence(Sequence sequence, long tMin, long tMax) {
         SequenceKey key = sequence.key();
 
@@ -62,10 +84,10 @@ public class SequenceStateMachine {
     }
 
     /**
-     * Match the given hit (based on key and timestamp) with any potential sequence from the previous
+     * Match the given hit (based on key and timestamp and potential tieBreaker) with any potential sequence from the previous
      * given stage. If that's the case, update the sequence and the rest of the references.
      */
-    public boolean match(int stage, SequenceKey key, long timestamp, SearchHit hit) {
+    public boolean match(int stage, SequenceKey key, long timestamp, Comparable<Object> tieBreaker, SearchHit hit) {
         int previousStage = stage - 1;
         // check key presence to avoid creating a collection
         SequenceFrame frame = keyToSequences.frameIfPresent(previousStage, key);
@@ -73,7 +95,7 @@ public class SequenceStateMachine {
             return false;
         }
         // pick the sequence with the highest timestamp lower than current match timestamp
-        Tuple<Sequence, Integer> before = frame.before(timestamp);
+        Tuple<Sequence, Integer> before = frame.before(timestamp, tieBreaker);
         if (before == null) {
             return false;
         }
@@ -81,7 +103,7 @@ public class SequenceStateMachine {
         // eliminate the match and all previous values from the frame
         frame.trim(before.v2() + 1);
         // update sequence
-        sequence.putMatch(stage, hit, timestamp);
+        sequence.putMatch(stage, hit, timestamp, tieBreaker);
 
         // remove the frame and keys early (as the key space is large)
         if (frame.isEmpty()) {

+ 60 - 0
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TimeOrdinal.java

@@ -0,0 +1,60 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.eql.execution.sequence;
+
+import org.elasticsearch.xpack.ql.capabilities.Resolvable;
+import org.elasticsearch.xpack.ql.expression.Attribute;
+
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+
+/**
+ * Time ordinal for a given event.
+ * It is an internal construct that wraps the mandatory timestamp attribute and the optional application tie-breaker.
+ */
+public class TimeOrdinal implements Resolvable {
+
+    private final Attribute timestamp;
+    private final Attribute tieBreaker;
+
+    public TimeOrdinal(Attribute timestamp, Attribute tieBreaker) {
+        this.timestamp = timestamp;
+        this.tieBreaker = tieBreaker;
+    }
+    
+    @Override
+    public int hashCode() {
+        return Objects.hash(timestamp, tieBreaker);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        
+        TimeOrdinal other = (TimeOrdinal) obj;
+        return Objects.equals(timestamp, other.timestamp) &&
+                Objects.equals(tieBreaker, other.tieBreaker);
+    }
+
+    @Override
+    public boolean resolved() {
+        return timestamp.resolved() && (tieBreaker == null || tieBreaker.resolved());
+    }
+
+    public List<Attribute> output() {
+        return tieBreaker == null ? singletonList(timestamp) : asList(timestamp, tieBreaker);
+    }
+}

+ 40 - 20
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/parser/LogicalPlanBuilder.java

@@ -5,22 +5,26 @@
  */
 package org.elasticsearch.xpack.eql.parser;
 
+import org.antlr.v4.runtime.tree.ParseTree;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.xpack.eql.parser.EqlBaseParser.EventFilterContext;
 import org.elasticsearch.xpack.eql.parser.EqlBaseParser.IntegerLiteralContext;
 import org.elasticsearch.xpack.eql.parser.EqlBaseParser.JoinContext;
+import org.elasticsearch.xpack.eql.parser.EqlBaseParser.JoinKeysContext;
 import org.elasticsearch.xpack.eql.parser.EqlBaseParser.JoinTermContext;
 import org.elasticsearch.xpack.eql.parser.EqlBaseParser.NumberContext;
 import org.elasticsearch.xpack.eql.parser.EqlBaseParser.SequenceContext;
 import org.elasticsearch.xpack.eql.parser.EqlBaseParser.SequenceParamsContext;
 import org.elasticsearch.xpack.eql.parser.EqlBaseParser.SequenceTermContext;
+import org.elasticsearch.xpack.eql.parser.EqlBaseParser.SubqueryContext;
 import org.elasticsearch.xpack.eql.plan.logical.Join;
 import org.elasticsearch.xpack.eql.plan.logical.KeyedFilter;
 import org.elasticsearch.xpack.eql.plan.logical.Sequence;
 import org.elasticsearch.xpack.eql.plan.physical.LocalRelation;
 import org.elasticsearch.xpack.ql.expression.Attribute;
+import org.elasticsearch.xpack.ql.expression.EmptyAttribute;
 import org.elasticsearch.xpack.ql.expression.Expression;
-import org.elasticsearch.xpack.ql.expression.FieldAttribute;
+import org.elasticsearch.xpack.ql.expression.Expressions;
 import org.elasticsearch.xpack.ql.expression.Literal;
 import org.elasticsearch.xpack.ql.expression.Order;
 import org.elasticsearch.xpack.ql.expression.UnresolvedAttribute;
@@ -33,7 +37,6 @@ import org.elasticsearch.xpack.ql.plan.logical.Project;
 import org.elasticsearch.xpack.ql.plan.logical.UnresolvedRelation;
 import org.elasticsearch.xpack.ql.tree.Source;
 import org.elasticsearch.xpack.ql.type.DataTypes;
-import org.elasticsearch.xpack.ql.type.UnsupportedEsField;
 import org.elasticsearch.xpack.ql.util.CollectionUtils;
 
 import java.util.ArrayList;
@@ -41,11 +44,11 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static java.util.Collections.emptyList;
-import static java.util.Collections.singletonList;
 
 public abstract class LogicalPlanBuilder extends ExpressionBuilder {
 
     private final UnresolvedRelation RELATION = new UnresolvedRelation(Source.EMPTY, null, "", false, "");
+    private final EmptyAttribute UNSPECIFIED_FIELD = new EmptyAttribute(Source.EMPTY);
 
     public LogicalPlanBuilder(ParserParams params) {
         super(params);
@@ -55,6 +58,10 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
         return new UnresolvedAttribute(Source.EMPTY, params.fieldTimestamp());
     }
 
+    private Attribute fieldTieBreaker() {
+        return params.fieldTieBreaker() != null ? new UnresolvedAttribute(Source.EMPTY, params.fieldTieBreaker()) : UNSPECIFIED_FIELD;
+    }
+
     @Override
     public LogicalPlan visitEventQuery(EqlBaseParser.EventQueryContext ctx) {
         return new Project(source(ctx), visitEventFilter(ctx.eventFilter()), emptyList());
@@ -77,9 +84,17 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
         }
 
         Filter filter = new Filter(source, RELATION, condition);
-        // add implicit sorting - when pipes are added, this would better sit there (as a default pipe)
-        Order order = new Order(source, fieldTimestamp(), Order.OrderDirection.ASC, Order.NullsPosition.FIRST);
-        OrderBy orderBy = new OrderBy(source, filter, singletonList(order));
+        List<Order> orders = new ArrayList<>(2);
+
+        // TODO: add implicit sorting - when pipes are added, this would better sit there (as a default pipe)
+        orders.add(new Order(source, fieldTimestamp(), Order.OrderDirection.ASC, Order.NullsPosition.FIRST));
+        // make sure to add the tieBreaker as well
+        Attribute tieBreaker = fieldTieBreaker();
+        if (Expressions.isPresent(tieBreaker)) {
+            orders.add(new Order(source, tieBreaker, Order.OrderDirection.ASC, Order.NullsPosition.FIRST));
+        }
+
+        OrderBy orderBy = new OrderBy(source, filter, orders);
         return orderBy;
     }
 
@@ -118,22 +133,30 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
             until = defaultUntil(source);
         }
 
-        return new Join(source, queries, until, fieldTimestamp());
+        return new Join(source, queries, until, fieldTimestamp(), fieldTieBreaker());
     }
 
     private KeyedFilter defaultUntil(Source source) {
         // no until declared means no results
-        // create a dummy keyed filter
-        String notUsed = "<not-used>";
-        Attribute tsField = new FieldAttribute(source, notUsed, new UnsupportedEsField(notUsed, notUsed));
-        return new KeyedFilter(source, new LocalRelation(source, emptyList()), emptyList(), tsField);
+        return new KeyedFilter(source, new LocalRelation(source, emptyList()), emptyList(), UNSPECIFIED_FIELD, UNSPECIFIED_FIELD);
     }
 
     public KeyedFilter visitJoinTerm(JoinTermContext ctx, List<Attribute> joinKeys) {
-        List<Attribute> keys = CollectionUtils.combine(joinKeys, visitJoinKeys(ctx.by));
-        LogicalPlan eventQuery = visitEventFilter(ctx.subquery().eventFilter());
-        LogicalPlan child = new Project(source(ctx), eventQuery, CollectionUtils.combine(keys, fieldTimestamp()));
-        return new KeyedFilter(source(ctx), child, keys, fieldTimestamp());
+        return keyedFilter(joinKeys, ctx, ctx.by, ctx.subquery());
+    }
+
+    private KeyedFilter keyedFilter(List<Attribute> joinKeys, ParseTree ctx, JoinKeysContext joinCtx, SubqueryContext subqueryCtx) {
+        List<Attribute> keys = CollectionUtils.combine(joinKeys, visitJoinKeys(joinCtx));
+        LogicalPlan eventQuery = visitEventFilter(subqueryCtx.eventFilter());
+
+        List<Attribute> output = CollectionUtils.combine(keys, fieldTimestamp());
+        Attribute fieldTieBreaker = fieldTieBreaker();
+        if (Expressions.isPresent(fieldTieBreaker)) {
+            output = CollectionUtils.combine(output, fieldTieBreaker);
+        }
+        LogicalPlan child = new Project(source(ctx), eventQuery, output);
+
+        return new KeyedFilter(source(ctx), child, keys, fieldTimestamp(), fieldTieBreaker());
     }
 
     @Override
@@ -176,7 +199,7 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
             until = defaultUntil(source);
         }
 
-        return new Sequence(source, queries, until, maxSpan, fieldTimestamp());
+        return new Sequence(source, queries, until, maxSpan, fieldTimestamp(), fieldTieBreaker());
     }
 
     public KeyedFilter visitSequenceTerm(SequenceTermContext ctx, List<Attribute> joinKeys) {
@@ -184,10 +207,7 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
             throw new ParsingException(source(ctx.FORK()), "sequence fork is unsupported");
         }
 
-        List<Attribute> keys = CollectionUtils.combine(joinKeys, visitJoinKeys(ctx.by));
-        LogicalPlan eventQuery = visitEventFilter(ctx.subquery().eventFilter());
-        LogicalPlan child = new Project(source(ctx), eventQuery, CollectionUtils.combine(keys, fieldTimestamp()));
-        return new KeyedFilter(source(ctx), child, keys, fieldTimestamp());
+        return keyedFilter(joinKeys, ctx, ctx.by, ctx.subquery());
     }
 
     @Override

+ 11 - 1
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/parser/ParserParams.java

@@ -11,14 +11,15 @@ import java.util.List;
 
 import static java.util.Collections.emptyList;
 import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_EVENT_CATEGORY;
-import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_TIMESTAMP;
 import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_IMPLICIT_JOIN_KEY;
+import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_TIMESTAMP;
 
 public class ParserParams {
 
     private final ZoneId zoneId;
     private String fieldEventCategory = FIELD_EVENT_CATEGORY;
     private String fieldTimestamp = FIELD_TIMESTAMP;
+    private String fieldTieBreaker = null;
     private String implicitJoinKey = FIELD_IMPLICIT_JOIN_KEY;
     private List<Object> queryParams = emptyList();
 
@@ -44,6 +45,15 @@ public class ParserParams {
         return this;
     }
 
+    public String fieldTieBreaker() {
+        return fieldTieBreaker;
+    }
+
+    public ParserParams fieldTieBreaker(String fieldTieBreaker) {
+        this.fieldTieBreaker = fieldTieBreaker;
+        return this;
+    }
+
     public String implicitJoinKey() {
         return implicitJoinKey;
     }

+ 26 - 13
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/logical/Join.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.eql.plan.logical;
 import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
 import org.elasticsearch.xpack.ql.capabilities.Resolvables;
 import org.elasticsearch.xpack.ql.expression.Attribute;
+import org.elasticsearch.xpack.ql.expression.Expressions;
 import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
 import org.elasticsearch.xpack.ql.tree.NodeInfo;
 import org.elasticsearch.xpack.ql.tree.Source;
@@ -25,17 +26,19 @@ public class Join extends LogicalPlan {
 
     private final List<KeyedFilter> queries;
     private final KeyedFilter until;
-    private final Attribute timestampField;
+    private final Attribute timestamp;
+    private final Attribute tieBreaker;
 
-    public Join(Source source, List<KeyedFilter> queries, KeyedFilter until, Attribute timestampField) {
+    public Join(Source source, List<KeyedFilter> queries, KeyedFilter until, Attribute timestamp, Attribute tieBreaker) {
         super(source, CollectionUtils.combine(queries, until));
         this.queries = queries;
         this.until = until;
-        this.timestampField = timestampField;
+        this.timestamp = timestamp;
+        this.tieBreaker = tieBreaker;
     }
 
-    private Join(Source source, List<LogicalPlan> queries, LogicalPlan until, Attribute timestampField) {
-        this(source, asKeyed(queries), asKeyed(until), timestampField);
+    private Join(Source source, List<LogicalPlan> queries, LogicalPlan until, Attribute timestamp, Attribute tieBreaker) {
+        this(source, asKeyed(queries), asKeyed(until), timestamp, tieBreaker);
     }
 
     static List<KeyedFilter> asKeyed(List<LogicalPlan> list) {
@@ -56,7 +59,7 @@ public class Join extends LogicalPlan {
 
     @Override
     protected NodeInfo<? extends Join> info() {
-        return NodeInfo.create(this, Join::new, queries, until, timestampField);
+        return NodeInfo.create(this, Join::new, queries, until, timestamp, tieBreaker);
     }
 
     @Override
@@ -65,13 +68,18 @@ public class Join extends LogicalPlan {
             throw new EqlIllegalArgumentException("expected at least [2] children but received [{}]", newChildren.size());
         }
         int lastIndex = newChildren.size() - 1;
-        return new Join(source(), newChildren.subList(0, lastIndex), newChildren.get(lastIndex), timestampField);
+        return new Join(source(), newChildren.subList(0, lastIndex), newChildren.get(lastIndex), timestamp, tieBreaker);
     }
 
     @Override
     public List<Attribute> output() {
         List<Attribute> out = new ArrayList<>();
-        out.add(timestampField);
+
+        out.add(timestamp);
+        if (Expressions.isPresent(tieBreaker)) {
+            out.add(tieBreaker);
+        }
+
         for (KeyedFilter query : queries) {
             out.addAll(query.output());
         }
@@ -80,7 +88,7 @@ public class Join extends LogicalPlan {
 
     @Override
     public boolean expressionsResolved() {
-        return timestampField.resolved() && until.resolved() && Resolvables.resolved(queries);
+        return timestamp.resolved() && tieBreaker.resolved() && until.resolved() && Resolvables.resolved(queries);
     }
 
     public List<KeyedFilter> queries() {
@@ -91,13 +99,17 @@ public class Join extends LogicalPlan {
         return until;
     }
 
-    public Attribute timestampField() {
-        return timestampField;
+    public Attribute timestamp() {
+        return timestamp;
+    }
+    
+    public Attribute tieBreaker() {
+        return tieBreaker;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(timestampField, queries, until);
+        return Objects.hash(timestamp, tieBreaker, queries, until);
     }
 
     @Override
@@ -113,7 +125,8 @@ public class Join extends LogicalPlan {
 
         return Objects.equals(queries, other.queries)
                 && Objects.equals(until, other.until)
-                && Objects.equals(timestampField, other.timestampField);
+                && Objects.equals(timestamp, other.timestamp)
+                && Objects.equals(tieBreaker, other.tieBreaker);
     }
 
     @Override

+ 17 - 10
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/logical/KeyedFilter.java

@@ -24,40 +24,46 @@ import java.util.Objects;
 public class KeyedFilter extends UnaryPlan {
 
     private final List<? extends NamedExpression> keys;
-    private final Attribute timestampField;
+    private final Attribute timestamp;
+    private final Attribute tieBreaker;
 
-    public KeyedFilter(Source source, LogicalPlan child, List<? extends NamedExpression> keys, Attribute timestampField) {
+    public KeyedFilter(Source source, LogicalPlan child, List<? extends NamedExpression> keys, Attribute timestamp, Attribute tieBreaker) {
         super(source, child);
         this.keys = keys;
-        this.timestampField = timestampField;
+        this.timestamp = timestamp;
+        this.tieBreaker = tieBreaker;
     }
 
     @Override
     protected NodeInfo<KeyedFilter> info() {
-        return NodeInfo.create(this, KeyedFilter::new, child(), keys, timestampField);
+        return NodeInfo.create(this, KeyedFilter::new, child(), keys, timestamp, tieBreaker);
     }
 
     @Override
     protected KeyedFilter replaceChild(LogicalPlan newChild) {
-        return new KeyedFilter(source(), newChild, keys, timestampField);
+        return new KeyedFilter(source(), newChild, keys, timestamp, tieBreaker);
     }
     
     public List<? extends NamedExpression> keys() {
         return keys;
     }
 
-    public Attribute timestampField() {
-        return timestampField;
+    public Attribute timestamp() {
+        return timestamp;
+    }
+    
+    public Attribute tieBreaker() {
+        return tieBreaker;
     }
 
     @Override
     public boolean expressionsResolved() {
-        return Resolvables.resolved(keys) && timestampField.resolved();
+        return Resolvables.resolved(keys) && timestamp.resolved() && tieBreaker.resolved();
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(keys, timestampField, child());
+        return Objects.hash(keys, timestamp, tieBreaker, child());
     }
     
     @Override
@@ -72,7 +78,8 @@ public class KeyedFilter extends UnaryPlan {
         KeyedFilter other = (KeyedFilter) obj;
 
         return Objects.equals(keys, other.keys)
-                && Objects.equals(timestampField, other.timestampField)
+                && Objects.equals(timestamp, other.timestamp)
+                && Objects.equals(tieBreaker, other.tieBreaker)
                 && Objects.equals(child(), other.child());
     }
 }

+ 14 - 20
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/logical/Sequence.java

@@ -22,19 +22,21 @@ public class Sequence extends Join {
 
     private final TimeValue maxSpan;
 
-    public Sequence(Source source, List<KeyedFilter> queries, KeyedFilter until, TimeValue maxSpan, Attribute timestampField) {
-        super(source, queries, until, timestampField);
+    public Sequence(Source source, List<KeyedFilter> queries, KeyedFilter until, TimeValue maxSpan, Attribute timestamp,
+                    Attribute tieBreaker) {
+        super(source, queries, until, timestamp, tieBreaker);
         this.maxSpan = maxSpan;
     }
 
-    private Sequence(Source source, List<LogicalPlan> queries, LogicalPlan until, TimeValue maxSpan, Attribute timestampField) {
-        super(source, asKeyed(queries), asKeyed(until), timestampField);
+    private Sequence(Source source, List<LogicalPlan> queries, LogicalPlan until, TimeValue maxSpan, Attribute timestamp,
+                     Attribute tieBreaker) {
+        super(source, asKeyed(queries), asKeyed(until), timestamp, tieBreaker);
         this.maxSpan = maxSpan;
     }
 
     @Override
     protected NodeInfo<Sequence> info() {
-        return NodeInfo.create(this, Sequence::new, queries(), until(), maxSpan, timestampField());
+        return NodeInfo.create(this, Sequence::new, queries(), until(), maxSpan, timestamp(), tieBreaker());
     }
 
     @Override
@@ -43,7 +45,7 @@ public class Sequence extends Join {
             throw new EqlIllegalArgumentException("expected at least [2] children but received [{}]", newChildren.size());
         }
         int lastIndex = newChildren.size() - 1;
-        return new Sequence(source(), newChildren.subList(0, lastIndex), newChildren.get(lastIndex), maxSpan, timestampField());
+        return new Sequence(source(), newChildren.subList(0, lastIndex), newChildren.get(lastIndex), maxSpan, timestamp(), tieBreaker());
     }
 
     public TimeValue maxSpan() {
@@ -52,28 +54,20 @@ public class Sequence extends Join {
 
     @Override
     public int hashCode() {
-        return Objects.hash(maxSpan, timestampField(), queries(), until());
+        return Objects.hash(maxSpan, super.hashCode());
     }
 
     @Override
     public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
+        if (super.equals(obj)) {
+            Sequence other = (Sequence) obj;
+            return Objects.equals(maxSpan, other.maxSpan);
         }
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-
-        Sequence other = (Sequence) obj;
-
-        return Objects.equals(maxSpan, other.maxSpan)
-                && Objects.equals(queries(), other.queries())
-                && Objects.equals(until(), other.until())
-                && Objects.equals(timestampField(), other.timestampField());
+        return false;
     }
 
     @Override
     public List<Object> nodeProperties() {
         return singletonList(maxSpan);
     }
-}
+}

+ 23 - 11
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/SequenceExec.java

@@ -16,52 +16,59 @@ import org.elasticsearch.xpack.ql.expression.Expressions;
 import org.elasticsearch.xpack.ql.expression.NamedExpression;
 import org.elasticsearch.xpack.ql.tree.NodeInfo;
 import org.elasticsearch.xpack.ql.tree.Source;
-import org.elasticsearch.xpack.ql.util.CollectionUtils;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
 import static java.util.Collections.singletonList;
+import static org.elasticsearch.xpack.ql.util.CollectionUtils.combine;
 
 public class SequenceExec extends PhysicalPlan {
 
     private final List<List<Attribute>> keys;
     private final Attribute timestamp;
+    private final Attribute tieBreaker;
 
     public SequenceExec(Source source,
                         List<List<Attribute>> keys,
                         List<PhysicalPlan> matches,
                         List<Attribute> untilKeys,
                         PhysicalPlan until,
-                        Attribute timestampField) {
-        this(source, CollectionUtils.combine(matches, until), CollectionUtils.combine(keys, singletonList(untilKeys)), timestampField);
+                        Attribute timestamp,
+                        Attribute tieBreaker) {
+        this(source, combine(matches, until), combine(keys, singletonList(untilKeys)), timestamp, tieBreaker);
     }
 
-    private SequenceExec(Source source, List<PhysicalPlan> children, List<List<Attribute>> keys, Attribute timestampField) {
+    private SequenceExec(Source source, List<PhysicalPlan> children, List<List<Attribute>> keys, Attribute ts, Attribute tb) {
         super(source, children);
         this.keys = keys;
-        this.timestamp = timestampField;
+        this.timestamp = ts;
+        this.tieBreaker = tb;
     }
 
     @Override
     protected NodeInfo<SequenceExec> info() {
-        return NodeInfo.create(this, SequenceExec::new, children(), keys, timestamp);
+        return NodeInfo.create(this, SequenceExec::new, children(), keys, timestamp, tieBreaker);
     }
 
     @Override
     public PhysicalPlan replaceChildren(List<PhysicalPlan> newChildren) {
         if (newChildren.size() != children().size()) {
-            throw new EqlIllegalArgumentException("Expected the same number of children [{}] but got [{}]", children().size(), newChildren
-                    .size());
+            throw new EqlIllegalArgumentException("Expected the same number of children [{}] but got [{}]",
+                    children().size(),
+                    newChildren.size());
         }
-        return new SequenceExec(source(), newChildren, keys, timestamp);
+        return new SequenceExec(source(), newChildren, keys, timestamp, tieBreaker);
     }
 
     @Override
     public List<Attribute> output() {
         List<Attribute> attrs = new ArrayList<>();
         attrs.add(timestamp);
+        if (Expressions.isPresent(tieBreaker)) {
+            attrs.add(tieBreaker);
+        }
         for (List<? extends NamedExpression> ne : keys) {
             attrs.addAll(Expressions.asAttributes(ne));
         }
@@ -76,14 +83,18 @@ public class SequenceExec extends PhysicalPlan {
         return timestamp;
     }
 
+    public Attribute tieBreaker() {
+        return tieBreaker;
+    }
+
     @Override
     public void execute(EqlSession session, ActionListener<Results> listener) {
-        new ExecutionManager(session).from(this).execute(listener);
+        new ExecutionManager(session).assemble(keys(), children(), timestamp(), tieBreaker()).execute(listener);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(timestamp, keys, children());
+        return Objects.hash(timestamp, tieBreaker, keys, children());
     }
 
     @Override
@@ -98,6 +109,7 @@ public class SequenceExec extends PhysicalPlan {
 
         SequenceExec other = (SequenceExec) obj;
         return Objects.equals(timestamp, other.timestamp)
+                && Objects.equals(tieBreaker, other.tieBreaker)
                 && Objects.equals(children(), other.children())
                 && Objects.equals(keys, other.keys);
     }

+ 2 - 1
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Mapper.java

@@ -72,7 +72,8 @@ class Mapper extends RuleExecutor<PhysicalPlan> {
                         matches,
                         Expressions.asAttributes(s.until().keys()),
                         map(s.until().child()),
-                        s.timestampField());
+                        s.timestamp(),
+                        s.tieBreaker());
             }
 
             if (p instanceof LocalRelation) {

+ 1 - 0
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java

@@ -67,6 +67,7 @@ public class TransportEqlSearchAction extends HandledTransportAction<EqlSearchRe
         ParserParams params = new ParserParams(zoneId)
             .fieldEventCategory(request.eventCategoryField())
             .fieldTimestamp(request.timestampField())
+            .fieldTieBreaker(request.tieBreakerField())
             .implicitJoinKey(request.implicitJoinKeyField());
 
         EqlConfiguration cfg = new EqlConfiguration(request.indices(), zoneId, username, clusterName, filter, timeout, request.fetchSize(),

+ 3 - 1
x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntimeTests.java

@@ -41,6 +41,7 @@ public class SequenceRuntimeTests extends ESTestCase {
     private final boolean hasKeys;
     private final List<HitExtractor> keyExtractors;
     private final HitExtractor tsExtractor;
+    private final HitExtractor tbExtractor;
 
     abstract static class EmptyHitExtractor implements HitExtractor {
         @Override
@@ -78,7 +79,7 @@ public class SequenceRuntimeTests extends ESTestCase {
         private final int ordinal;
 
         TestCriterion(int ordinal) {
-            super(SearchSourceBuilder.searchSource().size(ordinal), keyExtractors, tsExtractor);
+            super(SearchSourceBuilder.searchSource().size(ordinal), keyExtractors, tsExtractor, tbExtractor);
             this.ordinal = ordinal;
         }
 
@@ -162,6 +163,7 @@ public class SequenceRuntimeTests extends ESTestCase {
         this.hasKeys = spec.hasKeys;
         this.keyExtractors = hasKeys ? singletonList(new KeyExtractor()) : emptyList();
         this.tsExtractor = TimestampExtractor.INSTANCE;
+        this.tbExtractor = null;
     }
 
     @ParametersFactory(shuffle = false, argumentFormatting = "%0$s")

+ 68 - 0
x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/expression/EmptyAttribute.java

@@ -0,0 +1,68 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.ql.expression;
+
+import org.elasticsearch.xpack.ql.tree.NodeInfo;
+import org.elasticsearch.xpack.ql.tree.Source;
+import org.elasticsearch.xpack.ql.type.DataType;
+import org.elasticsearch.xpack.ql.type.DataTypes;
+import org.elasticsearch.xpack.ql.util.StringUtils;
+
+/**
+ * Marker for optional attributes. Acting as a dummy placeholder to avoid using null
+ * in the tree (which is not allowed).
+ */
+public class EmptyAttribute extends Attribute {
+
+    public EmptyAttribute(Source source) {
+        super(source, StringUtils.EMPTY, null, null);
+    }
+
+    @Override
+    protected Attribute clone(Source source, String name, DataType type, String qualifier, Nullability nullability, NameId id,
+                              boolean synthetic) {
+        return this;
+    }
+
+    @Override
+    protected String label() {
+        return "e";
+    }
+
+    @Override
+    public boolean resolved() {
+        return true;
+    }
+
+    @Override
+    public DataType dataType() {
+        return DataTypes.NULL;
+    }
+
+    @Override
+    protected NodeInfo<? extends Expression> info() {
+        return NodeInfo.create(this);
+    }
+
+    @Override
+    public int hashCode() {
+        return EmptyAttribute.class.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+
+        return true;
+    }
+}

+ 4 - 0
x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/expression/Expressions.java

@@ -152,6 +152,10 @@ public final class Expressions {
         return null;
     }
 
+    public static boolean isPresent(NamedExpression e) {
+        return e instanceof EmptyAttribute == false;
+    }
+
     public static boolean equalsAsAttribute(Expression left, Expression right) {
         if (!left.semanticEquals(right)) {
             Attribute l = attribute(left);