Browse Source

[8.19] Don't build can_match queries we can't push to data nodes (#130210) (#130386)

* Don't build can_match queries we can't push to data nodes (#130210)

Passes the minimum transport version down to expressions when
we convert them into queries that we'll use for can_match.
Right now all this is used for is skipping the can_match from
the wildcard like queries. The queries we make there aren't
serializable. We'll fix that - but this should give us the
levers that we need to do it in a backwards incompatible way.

* Fix for backport

* don't be backwards
Nik Everett 3 months ago
parent
commit
b1cf59be71

+ 4 - 0
server/src/main/java/org/elasticsearch/index/query/AutomatonQueryBuilder.java

@@ -48,6 +48,10 @@ public class AutomatonQueryBuilder extends AbstractQueryBuilder<AutomatonQueryBu
         return fieldName;
     }
 
+    public String description() {
+        return description;
+    }
+
     @Override
     public String getWriteableName() {
         throw new UnsupportedOperationException("AutomatonQueryBuilder does not support getWriteableName");

+ 170 - 12
x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java

@@ -29,6 +29,7 @@ import org.junit.rules.RuleChain;
 import org.junit.rules.TestRule;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -37,11 +38,15 @@ import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static org.elasticsearch.test.MapMatcher.assertMap;
+import static org.elasticsearch.test.MapMatcher.matchesMap;
 import static org.elasticsearch.xpack.esql.ccq.Clusters.REMOTE_CLUSTER_NAME;
 import static org.hamcrest.Matchers.any;
+import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
 
 @ThreadLeakFilters(filters = TestClustersThreadFilter.class)
 public class MultiClustersIT extends ESRestTestCase {
@@ -129,7 +134,7 @@ public class MultiClustersIT extends ESRestTestCase {
     }
 
     private Map<String, Object> run(String query, boolean includeCCSMetadata) throws IOException {
-        var queryBuilder = new RestEsqlTestCase.RequestObjectBuilder().query(query);
+        var queryBuilder = new RestEsqlTestCase.RequestObjectBuilder().query(query).profile(true);
         if (includeCCSMetadata) {
             queryBuilder.includeCCSMetadata(true);
         }
@@ -158,12 +163,51 @@ public class MultiClustersIT extends ESRestTestCase {
         }
     }
 
+    private <C, V> void assertResultMapForLike(
+        boolean includeCCSMetadata,
+        Map<String, Object> result,
+        C columns,
+        V values,
+        boolean remoteOnly,
+        boolean requireLikeListCapability
+    ) throws IOException {
+        List<String> requiredCapabilities = new ArrayList<>(List.of("like_on_index_fields"));
+        if (requireLikeListCapability) {
+            requiredCapabilities.add("like_list_on_index_fields");
+        }
+        // the feature is completely supported if both local and remote clusters support it
+        boolean isSupported = clusterHasCapability("POST", "/_query", List.of(), requiredCapabilities).orElse(false);
+        try (RestClient remoteClient = remoteClusterClient()) {
+            isSupported = isSupported
+                && clusterHasCapability(remoteClient, "POST", "/_query", List.of(), requiredCapabilities).orElse(false);
+        }
+
+        if (isSupported) {
+            assertResultMap(includeCCSMetadata, result, columns, values, remoteOnly);
+        } else {
+            logger.info("-->  skipping data check for like index test, cluster does not support like index feature");
+            // just verify that we did not get a partial result
+            var clusters = result.get("_clusters");
+            var reason = "unexpected partial results" + (clusters != null ? ": _clusters=" + clusters : "");
+            assertThat(reason, result.get("is_partial"), anyOf(nullValue(), is(false)));
+        }
+    }
+
+    private boolean capabilitiesSupportedNewAndOld(List<String> requiredCapabilities) throws IOException {
+        boolean isSupported = clusterHasCapability("POST", "/_query", List.of(), requiredCapabilities).orElse(false);
+        try (RestClient remoteClient = remoteClusterClient()) {
+            isSupported = isSupported
+                && clusterHasCapability(remoteClient, "POST", "/_query", List.of(), requiredCapabilities).orElse(false);
+        }
+        return isSupported;
+    }
+
     private <C, V> void assertResultMap(boolean includeCCSMetadata, Map<String, Object> result, C columns, V values, boolean remoteOnly) {
         MapMatcher mapMatcher = getResultMatcher(
             ccsMetadataAvailable(),
             result.containsKey("is_partial"),
             result.containsKey("documents_found")
-        );
+        ).extraOk();
         if (includeCCSMetadata) {
             mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
         }
@@ -251,11 +295,13 @@ public class MultiClustersIT extends ESRestTestCase {
 
         @SuppressWarnings("unchecked")
         Map<String, Object> remoteClusterShards = (Map<String, Object>) remoteCluster.get("_shards");
-        assertThat(remoteClusterShards.keySet(), equalTo(Set.of("total", "successful", "skipped", "failed")));
-        assertThat((Integer) remoteClusterShards.get("total"), greaterThanOrEqualTo(0));
-        assertThat((Integer) remoteClusterShards.get("successful"), equalTo((Integer) remoteClusterShards.get("total")));
-        assertThat((Integer) remoteClusterShards.get("skipped"), equalTo(0));
-        assertThat((Integer) remoteClusterShards.get("failed"), equalTo(0));
+        assertThat(
+            remoteClusterShards,
+            matchesMap().entry("total", greaterThanOrEqualTo(0))
+                .entry("successful", remoteClusterShards.get("total"))
+                .entry("skipped", greaterThanOrEqualTo(0))
+                .entry("failed", 0)
+        );
 
         if (remoteOnly == false) {
             @SuppressWarnings("unchecked")
@@ -267,11 +313,13 @@ public class MultiClustersIT extends ESRestTestCase {
 
             @SuppressWarnings("unchecked")
             Map<String, Object> localClusterShards = (Map<String, Object>) localCluster.get("_shards");
-            assertThat(localClusterShards.keySet(), equalTo(Set.of("total", "successful", "skipped", "failed")));
-            assertThat((Integer) localClusterShards.get("total"), greaterThanOrEqualTo(0));
-            assertThat((Integer) localClusterShards.get("successful"), equalTo((Integer) localClusterShards.get("total")));
-            assertThat((Integer) localClusterShards.get("skipped"), equalTo(0));
-            assertThat((Integer) localClusterShards.get("failed"), equalTo(0));
+            assertThat(
+                localClusterShards,
+                matchesMap().entry("total", greaterThanOrEqualTo(0))
+                    .entry("successful", localClusterShards.get("total"))
+                    .entry("skipped", greaterThanOrEqualTo(0))
+                    .entry("failed", 0)
+            );
         }
     }
 
@@ -371,6 +419,116 @@ public class MultiClustersIT extends ESRestTestCase {
         assertThat(clusterData, hasKey("took"));
     }
 
+    public void testLikeIndex() throws Exception {
+
+        boolean includeCCSMetadata = includeCCSMetadata();
+        Map<String, Object> result = run("""
+            FROM test-local-index,*:test-remote-index METADATA _index
+            | WHERE _index LIKE "*remote*"
+            | STATS c = COUNT(*) BY _index
+            | SORT _index ASC
+            """, includeCCSMetadata);
+        var columns = List.of(Map.of("name", "c", "type", "long"), Map.of("name", "_index", "type", "keyword"));
+        var values = List.of(List.of(remoteDocs.size(), REMOTE_CLUSTER_NAME + ":" + remoteIndex));
+        assertResultMapForLike(includeCCSMetadata, result, columns, values, false, false);
+    }
+
+    public void testNotLikeIndex() throws Exception {
+        boolean includeCCSMetadata = includeCCSMetadata();
+        Map<String, Object> result = run("""
+            FROM test-local-index,*:test-remote-index METADATA _index
+            | WHERE _index NOT LIKE "*remote*"
+            | STATS c = COUNT(*) BY _index
+            | SORT _index ASC
+            """, includeCCSMetadata);
+        var columns = List.of(Map.of("name", "c", "type", "long"), Map.of("name", "_index", "type", "keyword"));
+        var values = List.of(List.of(localDocs.size(), localIndex));
+        assertResultMapForLike(includeCCSMetadata, result, columns, values, false, false);
+    }
+
+    public void testLikeListIndex() throws Exception {
+        List<String> requiredCapabilities = new ArrayList<>(List.of("like_list_on_index_fields"));
+        // the feature is completely supported if both local and remote clusters support it
+        if (capabilitiesSupportedNewAndOld(requiredCapabilities) == false) {
+            logger.info("-->  skipping testNotLikeListIndex, due to missing capability");
+            return;
+        }
+        boolean includeCCSMetadata = includeCCSMetadata();
+        Map<String, Object> result = run("""
+            FROM test-local-index,*:test-remote-index METADATA _index
+            | WHERE _index LIKE ("*remote*", "not-exist*")
+            | STATS c = COUNT(*) BY _index
+            | SORT _index ASC
+            """, includeCCSMetadata);
+        var columns = List.of(Map.of("name", "c", "type", "long"), Map.of("name", "_index", "type", "keyword"));
+        var values = List.of(List.of(remoteDocs.size(), REMOTE_CLUSTER_NAME + ":" + remoteIndex));
+        assertResultMapForLike(includeCCSMetadata, result, columns, values, false, true);
+    }
+
+    public void testNotLikeListIndex() throws Exception {
+        List<String> requiredCapabilities = new ArrayList<>(List.of("like_list_on_index_fields"));
+        // the feature is completely supported if both local and remote clusters support it
+        if (capabilitiesSupportedNewAndOld(requiredCapabilities) == false) {
+            logger.info("-->  skipping testNotLikeListIndex, due to missing capability");
+            return;
+        }
+        boolean includeCCSMetadata = includeCCSMetadata();
+        Map<String, Object> result = run("""
+            FROM test-local-index,*:test-remote-index METADATA _index
+            | WHERE _index NOT LIKE ("*remote*", "not-exist*")
+            | STATS c = COUNT(*) BY _index
+            | SORT _index ASC
+            """, includeCCSMetadata);
+        var columns = List.of(Map.of("name", "c", "type", "long"), Map.of("name", "_index", "type", "keyword"));
+        var values = List.of(List.of(localDocs.size(), localIndex));
+        assertResultMapForLike(includeCCSMetadata, result, columns, values, false, true);
+    }
+
+    public void testNotLikeListKeyWord() throws Exception {
+        List<String> requiredCapabilities = new ArrayList<>(List.of("like_list_on_index_fields"));
+        // the feature is completely supported if both local and remote clusters support it
+        if (capabilitiesSupportedNewAndOld(requiredCapabilities) == false) {
+            logger.info("-->  skipping testNotLikeListIndex, due to missing capability");
+            return;
+        }
+        boolean includeCCSMetadata = includeCCSMetadata();
+        Map<String, Object> result = run("""
+            FROM test-local-index,*:test-remote-index METADATA _index
+            | WHERE color NOT LIKE ("*blue*", "*red*")
+            | STATS c = COUNT(*) BY _index
+            | SORT _index ASC
+            """, includeCCSMetadata);
+        var columns = List.of(Map.of("name", "c", "type", "long"), Map.of("name", "_index", "type", "keyword"));
+        var values = List.of(List.of(localDocs.size(), localIndex));
+        assertResultMapForLike(includeCCSMetadata, result, columns, values, false, true);
+    }
+
+    public void testRLikeIndex() throws Exception {
+        boolean includeCCSMetadata = includeCCSMetadata();
+        Map<String, Object> result = run("""
+            FROM test-local-index,*:test-remote-index METADATA _index
+            | WHERE _index RLIKE ".*remote.*"
+            | STATS c = COUNT(*) BY _index
+            | SORT _index ASC
+            """, includeCCSMetadata);
+        var columns = List.of(Map.of("name", "c", "type", "long"), Map.of("name", "_index", "type", "keyword"));
+        var values = List.of(List.of(remoteDocs.size(), REMOTE_CLUSTER_NAME + ":" + remoteIndex));
+        assertResultMapForLike(includeCCSMetadata, result, columns, values, false, false);
+    }
+
+    public void testNotRLikeIndex() throws Exception {
+        boolean includeCCSMetadata = includeCCSMetadata();
+        Map<String, Object> result = run("""
+            FROM test-local-index,*:test-remote-index METADATA _index
+            | WHERE _index NOT RLIKE ".*remote.*"
+            | STATS c = COUNT(*) BY _index
+            | SORT _index ASC
+            """, includeCCSMetadata);
+        var columns = List.of(Map.of("name", "c", "type", "long"), Map.of("name", "_index", "type", "keyword"));
+        var values = List.of(List.of(localDocs.size(), localIndex));
+        assertResultMapForLike(includeCCSMetadata, result, columns, values, false, false);
+    }
+
     private RestClient remoteClusterClient() throws IOException {
         var clusterHosts = parseClusterHosts(remoteCluster.getHttpAddresses());
         return buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[0]));

+ 6 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/regex/WildcardLikeList.java

@@ -92,8 +92,12 @@ public class WildcardLikeList extends RegexMatch<WildcardPatternList> {
      */
     @Override
     public Translatable translatable(LucenePushdownPredicates pushdownPredicates) {
-        return pushdownPredicates.isPushableAttribute(field()) ? Translatable.YES : Translatable.NO;
-
+        if (pushdownPredicates.minTransportVersion() == null) {
+            return pushdownPredicates.isPushableAttribute(field()) ? Translatable.YES : Translatable.NO;
+        } else {
+            // The AutomatonQuery that we use right now isn't serializable.
+            return Translatable.NO;
+        }
     }
 
     /**

+ 58 - 20
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/LucenePushdownPredicates.java

@@ -7,6 +7,9 @@
 
 package org.elasticsearch.xpack.esql.optimizer.rules.physical.local;
 
+import org.elasticsearch.TransportVersion;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.xpack.esql.core.expression.Expression;
 import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
 import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
@@ -30,6 +33,24 @@ import org.elasticsearch.xpack.esql.stats.SearchStats;
  * </ol>
  */
 public interface LucenePushdownPredicates {
+    /**
+     * If we're extracting a query for {@code can_match} then this is the
+     * minimum transport version in the cluster. Otherwise, this is {@code null}.
+     * <p>
+     *     If this is not null {@link Expression}s should not claim to be
+     *     serializable unless their {@link QueryBuilder}
+     *     {@link QueryBuilder#getMinimalSupportedVersion supports} the version.
+     * </p>
+     * <p>
+     *     This is done on the coordinating node <strong>and</strong>. And for
+     *     cross cluster search this is done on the coordinating node on the
+     *     remote cluster. So! We actually <strong>have</strong> the minimum
+     *     cluster transport version.
+     * </p>
+     */
+    @Nullable
+    TransportVersion minTransportVersion();
+
     /**
      * For TEXT fields, we need to check if the field has a subfield of type KEYWORD that can be used instead.
      */
@@ -101,29 +122,41 @@ public interface LucenePushdownPredicates {
      * In particular, it assumes TEXT fields have no exact subfields (underlying keyword field),
      * and that isAggregatable means indexed and has hasDocValues.
      */
-    LucenePushdownPredicates DEFAULT = new LucenePushdownPredicates() {
-        @Override
-        public boolean hasExactSubfield(FieldAttribute attr) {
-            return false;
-        }
+    LucenePushdownPredicates DEFAULT = forCanMatch(null);
 
-        @Override
-        public boolean isIndexedAndHasDocValues(FieldAttribute attr) {
-            // Is the FieldType.isAggregatable() check correct here? In FieldType isAggregatable usually only means hasDocValues
-            return attr.field().isAggregatable();
-        }
+    /**
+     * A {@link LucenePushdownPredicates} for use with the {@code can_match} phase.
+     */
+    static LucenePushdownPredicates forCanMatch(TransportVersion minTransportVersion) {
+        return new LucenePushdownPredicates() {
+            @Override
+            public TransportVersion minTransportVersion() {
+                return minTransportVersion;
+            }
 
-        @Override
-        public boolean isIndexed(FieldAttribute attr) {
-            // TODO: This is the original behaviour, but is it correct? In FieldType isAggregatable usually only means hasDocValues
-            return attr.field().isAggregatable();
-        }
+            @Override
+            public boolean hasExactSubfield(FieldAttribute attr) {
+                return false;
+            }
 
-        @Override
-        public boolean canUseEqualityOnSyntheticSourceDelegate(FieldAttribute attr, String value) {
-            return false;
-        }
-    };
+            @Override
+            public boolean isIndexedAndHasDocValues(FieldAttribute attr) {
+                // Is the FieldType.isAggregatable() check correct here? In FieldType isAggregatable usually only means hasDocValues
+                return attr.field().isAggregatable();
+            }
+
+            @Override
+            public boolean isIndexed(FieldAttribute attr) {
+                // TODO: This is the original behaviour, but is it correct? In FieldType isAggregatable usually only means hasDocValues
+                return attr.field().isAggregatable();
+            }
+
+            @Override
+            public boolean canUseEqualityOnSyntheticSourceDelegate(FieldAttribute attr, String value) {
+                return false;
+            }
+        };
+    }
 
     /**
      * If we have access to {@link SearchStats} over a collection of shards, we can make more fine-grained decisions about what can be
@@ -133,6 +166,11 @@ public interface LucenePushdownPredicates {
         // TODO: use FieldAttribute#fieldName, otherwise this doesn't apply to field attributes used for union types.
         // C.f. https://github.com/elastic/elasticsearch/issues/128905
         return new LucenePushdownPredicates() {
+            @Override
+            public TransportVersion minTransportVersion() {
+                return null;
+            }
+
             @Override
             public boolean hasExactSubfield(FieldAttribute attr) {
                 return stats.hasExactSubfield(new FieldAttribute.FieldName(attr.name()));

+ 10 - 8
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.esql.planner;
 
+import org.elasticsearch.TransportVersion;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.breaker.NoopCircuitBreaker;
 import org.elasticsearch.common.util.BigArrays;
@@ -187,8 +188,8 @@ public class PlannerUtils {
     /**
      * Extracts a filter that can be used to skip unmatched shards on the coordinator.
      */
-    public static QueryBuilder canMatchFilter(PhysicalPlan plan) {
-        return detectFilter(plan, CoordinatorRewriteContext.SUPPORTED_FIELDS::contains);
+    public static QueryBuilder canMatchFilter(TransportVersion minTransportVersion, PhysicalPlan plan) {
+        return detectFilter(minTransportVersion, plan, CoordinatorRewriteContext.SUPPORTED_FIELDS::contains);
     }
 
     /**
@@ -196,11 +197,14 @@ public class PlannerUtils {
      * We currently only use this filter for the @timestamp field, which is always a date field. Any tests that wish to use this should
      * take care to not use it with TEXT fields.
      */
-    static QueryBuilder detectFilter(PhysicalPlan plan, Predicate<String> fieldName) {
+    static QueryBuilder detectFilter(TransportVersion minTransportVersion, PhysicalPlan plan, Predicate<String> fieldName) {
         // first position is the REST filter, the second the query filter
         final List<QueryBuilder> requestFilters = new ArrayList<>();
+        final LucenePushdownPredicates ctx = LucenePushdownPredicates.forCanMatch(minTransportVersion);
         plan.forEachDown(FragmentExec.class, fe -> {
-            requestFilters.add(fe.esFilter());
+            if (fe.esFilter() != null && minTransportVersion.onOrAfter(fe.esFilter().getMinimalSupportedVersion())) {
+                requestFilters.add(fe.esFilter());
+            }
             // detect filter inside the query
             fe.fragment().forEachUp(Filter.class, f -> {
                 // the only filter that can be pushed down is that on top of the relation
@@ -218,15 +222,13 @@ public class PlannerUtils {
                         // and the expression is pushable (functions can be fully translated)
                         if (matchesField
                             && refsBuilder.isEmpty()
-                            && translatable(exp, LucenePushdownPredicates.DEFAULT).finish() == TranslationAware.FinishedTranslatable.YES) {
+                            && translatable(exp, ctx).finish() == TranslationAware.FinishedTranslatable.YES) {
                             matches.add(exp);
                         }
                     }
                 }
                 if (matches.isEmpty() == false) {
-                    requestFilters.add(
-                        TRANSLATOR_HANDLER.asQuery(LucenePushdownPredicates.DEFAULT, Predicates.combineAnd(matches)).toQueryBuilder()
-                    );
+                    requestFilters.add(TRANSLATOR_HANDLER.asQuery(ctx, Predicates.combineAnd(matches)).toQueryBuilder());
                 }
             });
         });

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

@@ -111,7 +111,7 @@ final class DataNodeComputeHandler implements TransportRequestHandler<DataNodeRe
             esqlExecutor,
             parentTask,
             originalIndices,
-            PlannerUtils.canMatchFilter(dataNodePlan),
+            PlannerUtils.canMatchFilter(clusterService.state().getMinTransportVersion(), dataNodePlan),
             clusterAlias,
             configuration.allowPartialResults(),
             maxConcurrentNodesPerCluster == null ? -1 : maxConcurrentNodesPerCluster,

+ 4 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/querydsl/query/SingleValueQuery.java

@@ -224,6 +224,10 @@ public class SingleValueQuery extends Query {
             builder.add(rewrite, BooleanClause.Occur.FILTER);
             return builder.build();
         }
+
+        public String fieldName() {
+            return field;
+        }
     }
 
     /**

+ 20 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/WildcardLikeListTests.java

@@ -11,6 +11,9 @@ import com.carrotsearch.randomizedtesting.annotations.Name;
 import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
 
 import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.TransportVersion;
+import org.elasticsearch.xpack.esql.EsqlTestUtils;
+import org.elasticsearch.xpack.esql.capabilities.TranslationAware;
 import org.elasticsearch.xpack.esql.core.expression.Expression;
 import org.elasticsearch.xpack.esql.core.expression.FoldContext;
 import org.elasticsearch.xpack.esql.core.expression.Literal;
@@ -22,6 +25,7 @@ import org.elasticsearch.xpack.esql.expression.function.AbstractScalarFunctionTe
 import org.elasticsearch.xpack.esql.expression.function.FunctionName;
 import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier;
 import org.elasticsearch.xpack.esql.expression.function.scalar.string.regex.WildcardLikeList;
+import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.LucenePushdownPredicates;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -96,4 +100,20 @@ public class WildcardLikeListTests extends AbstractScalarFunctionTestCase {
                 ? new WildcardLikeList(source, expression, wildcardPatternList)
                 : new WildcardLikeList(source, expression, wildcardPatternList, false));
     }
+
+    public void testNotPushableOverCanMatch() {
+        TranslationAware translatable = (TranslationAware) buildFieldExpression(testCase);
+        assertThat(
+            translatable.translatable(LucenePushdownPredicates.forCanMatch(TransportVersion.current())).finish(),
+            equalTo(TranslationAware.FinishedTranslatable.NO)
+        );
+    }
+
+    public void testPushable() {
+        TranslationAware translatable = (TranslationAware) buildFieldExpression(testCase);
+        assertThat(
+            translatable.translatable(LucenePushdownPredicates.from(new EsqlTestUtils.TestSearchStats())).finish(),
+            equalTo(TranslationAware.FinishedTranslatable.YES)
+        );
+    }
 }

+ 49 - 13
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/FilterTests.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.esql.planner;
 
+import org.elasticsearch.TransportVersion;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -15,6 +16,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.logging.LoggerMessageFormat;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.query.AbstractQueryBuilder;
+import org.elasticsearch.index.query.AutomatonQueryBuilder;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.test.ESTestCase;
@@ -45,6 +47,7 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static java.util.Arrays.asList;
 import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
@@ -65,6 +68,7 @@ public class FilterTests extends ESTestCase {
 
     // use a field that already exists in the mapping
     private static final String EMP_NO = "emp_no";
+    private static final String LAST_NAME = "last_name";
     private static final String OTHER_FIELD = "salary";
 
     private static EsqlParser parser;
@@ -104,7 +108,7 @@ public class FilterTests extends ESTestCase {
             |WHERE {} > 10
             """, OTHER_FIELD), restFilter);
 
-        var filter = filterQueryForTransportNodes(plan);
+        var filter = filterQueryForTransportNodes(TransportVersion.current(), plan);
         assertEquals(restFilter.toString(), filter.toString());
     }
 
@@ -117,7 +121,7 @@ public class FilterTests extends ESTestCase {
             """, EMP_NO, value);
         var plan = plan(query, null);
 
-        var filter = filterQueryForTransportNodes(plan);
+        var filter = filterQueryForTransportNodes(TransportVersion.current(), plan);
         var expected = singleValueQuery(query, unscore(rangeQuery(EMP_NO).gt(value)), EMP_NO, ((SingleValueQuery.Builder) filter).source());
         assertEquals(expected.toString(), filter.toString());
     }
@@ -132,7 +136,7 @@ public class FilterTests extends ESTestCase {
             """, EMP_NO, value);
         var plan = plan(query, restFilter);
 
-        var filter = filterQueryForTransportNodes(plan);
+        var filter = filterQueryForTransportNodes(TransportVersion.current(), plan);
         var builder = ((BoolQueryBuilder) filter).filter().get(1);
         var queryFilter = singleValueQuery(
             query,
@@ -155,7 +159,7 @@ public class FilterTests extends ESTestCase {
             """, EMP_NO, lowValue, EMP_NO, highValue);
         var plan = plan(query, restFilter);
 
-        var filter = filterQueryForTransportNodes(plan);
+        var filter = filterQueryForTransportNodes(TransportVersion.current(), plan);
         var musts = ((BoolQueryBuilder) ((BoolQueryBuilder) filter).filter().get(1)).must();
         var left = singleValueQuery(
             query,
@@ -184,7 +188,7 @@ public class FilterTests extends ESTestCase {
             |WHERE {} > {} OR {} < {}
             """, OTHER_FIELD, lowValue, EMP_NO, highValue), restFilter);
 
-        var filter = filterQueryForTransportNodes(plan);
+        var filter = filterQueryForTransportNodes(TransportVersion.current(), plan);
         var expected = restFilter;
         assertEquals(expected.toString(), filter.toString());
     }
@@ -200,7 +204,7 @@ public class FilterTests extends ESTestCase {
             """, EMP_NO, lowValue, EMP_NO, highValue);
         var plan = plan(query, restFilter);
 
-        var filter = filterQueryForTransportNodes(plan);
+        var filter = filterQueryForTransportNodes(TransportVersion.current(), plan);
         var shoulds = ((BoolQueryBuilder) ((BoolQueryBuilder) filter).filter().get(1)).should();
         var left = singleValueQuery(
             query,
@@ -231,7 +235,7 @@ public class FilterTests extends ESTestCase {
             """, EMP_NO, lowValue, OTHER_FIELD, eqValue, EMP_NO, highValue);
         var plan = plan(query, restFilter);
 
-        var filter = filterQueryForTransportNodes(plan);
+        var filter = filterQueryForTransportNodes(TransportVersion.current(), plan);
         var musts = ((BoolQueryBuilder) ((BoolQueryBuilder) filter).filter().get(1)).must();
         var left = singleValueQuery(
             query,
@@ -265,7 +269,7 @@ public class FilterTests extends ESTestCase {
             """, EMP_NO, lowValue, EMP_NO, eqValue, EMP_NO, highValue);
         var plan = plan(query, restFilter);
 
-        var filter = filterQueryForTransportNodes(plan);
+        var filter = filterQueryForTransportNodes(TransportVersion.current(), plan);
         var builder = ((BoolQueryBuilder) filter).filter().get(1);
         var queryFilter = singleValueQuery(
             query,
@@ -286,7 +290,7 @@ public class FilterTests extends ESTestCase {
             |WHERE {} > {}
             """, EMP_NO, OTHER_FIELD, EMP_NO, eqValue), null);
 
-        var filter = filterQueryForTransportNodes(plan);
+        var filter = filterQueryForTransportNodes(TransportVersion.current(), plan);
         assertThat(filter, nullValue());
     }
 
@@ -298,7 +302,7 @@ public class FilterTests extends ESTestCase {
             |WHERE to_int(to_string({})) == {}
             """, EMP_NO, eqValue), null);
 
-        var filter = filterQueryForTransportNodes(plan);
+        var filter = filterQueryForTransportNodes(TransportVersion.current(), plan);
         assertThat(filter, nullValue());
     }
 
@@ -310,10 +314,42 @@ public class FilterTests extends ESTestCase {
             |WHERE to_int(to_string({})) + 987 == {}
             """, EMP_NO, eqValue), null);
 
-        var filter = filterQueryForTransportNodes(plan);
+        var filter = filterQueryForTransportNodes(TransportVersion.current(), plan);
         assertThat(filter, nullValue());
     }
 
+    public void testLikeList() {
+        String query = LoggerMessageFormat.format(null, """
+             FROM test
+            |WHERE {} LIKE ("a+", "b+")
+            """, LAST_NAME);
+        var plan = plan(query, null);
+
+        var filter = filterQueryForTransportNodes(TransportVersion.current(), plan);
+        assertNull(filter);
+    }
+
+    /**
+     * Tests that we <strong>can</strong> extract a filter if the transport
+     * version is {@code null}. This isn't run in the "filter for transport nodes"
+     * code path. Instead, it's in the "filter for the local node" path, but
+     * we can get a quick test of that by calling this setup.
+     */
+    public void testLikeListNullTransportVersion() {
+        String query = LoggerMessageFormat.format(null, """
+             FROM test
+            |WHERE {} LIKE ("a+", "b+")
+            """, LAST_NAME);
+        var plan = plan(query, null);
+
+        SingleValueQuery.Builder filter = (SingleValueQuery.Builder) filterQueryForTransportNodes(null, plan);
+        assertEquals(LAST_NAME, filter.fieldName());
+        AutomatonQueryBuilder innerFilter = (AutomatonQueryBuilder) filter.next();
+        assertEquals(LAST_NAME, innerFilter.fieldName());
+        assertEquals("""
+            LIKE("a+", "b+"), caseInsensitive=false""", innerFilter.description());
+    }
+
     /**
      * Ugly hack to create a QueryBuilder for SingleValueQuery.
      * For some reason however the queryName is set to null on range queries when deserializing.
@@ -360,8 +396,8 @@ public class FilterTests extends ESTestCase {
         return unscore(rangeQuery(field).lt("2020-12-34"));
     }
 
-    private QueryBuilder filterQueryForTransportNodes(PhysicalPlan plan) {
-        return PlannerUtils.detectFilter(plan, EMP_NO::equals);
+    private QueryBuilder filterQueryForTransportNodes(TransportVersion minTransportVersion, PhysicalPlan plan) {
+        return PlannerUtils.detectFilter(minTransportVersion, plan, Set.of(EMP_NO, LAST_NAME)::contains);
     }
 
     @Override