Explorar el Código

ESQL: introduce a pre-mapping logical plan processing step (#121260)

This adds a pre-mapping logical plan processing step, occurring after the logical optimisation, but before mapping it to a physical plan. This step can perform async actions, if needed, and involves using a new `TransportActionServices` record with all available services.

Furthermore, the query rewriting step part of the `FullTextFunction`s planning (occurring on the coordinator only) is refactored a bit to update the queries in-place.
The verification done by `Match` and `Term` involving checking on the argument type is also now pulled back from post-optimisation to post-analysis. Their respective tests are moved accordingly as well.
Bogdan Pintea hace 8 meses
padre
commit
0393e56fa7
Se han modificado 21 ficheros con 300 adiciones y 310 borrados
  1. 5 0
      docs/changelog/121260.yaml
  2. 10 0
      x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/Holder.java
  3. 14 2
      x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java
  4. 0 30
      x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/MockQueryBuilderResolver.java
  5. 1 1
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/MatchFunctionIT.java
  6. 1 1
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/MatchOperatorIT.java
  7. 3 3
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java
  8. 2 6
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java
  9. 35 30
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/Match.java
  10. 95 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/QueryBuilderResolver.java
  11. 21 14
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/Term.java
  12. 40 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/premapper/PreMapper.java
  13. 24 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java
  14. 11 4
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java
  15. 11 13
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
  16. 0 167
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/QueryBuilderResolver.java
  17. 1 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java
  18. 21 2
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java
  19. 0 32
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java
  20. 2 2
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java
  21. 3 2
      x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/queries/SemanticQueryBuilder.java

+ 5 - 0
docs/changelog/121260.yaml

@@ -0,0 +1,5 @@
+pr: 121260
+summary: Introduce a pre-mapping logical plan processing step
+area: ES|QL
+type: enhancement
+issues: []

+ 10 - 0
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/Holder.java

@@ -26,6 +26,16 @@ public class Holder<T> {
         this.value = value;
     }
 
+    /**
+     * Sets a value in the holder, but only if none has already been set.
+     * @param value the new value to set.
+     */
+    public void setIfAbsent(T value) {
+        if (this.value == null) {
+            this.value = value;
+        }
+    }
+
     public T get() {
         return value;
     }

+ 14 - 2
x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

@@ -11,6 +11,8 @@ import org.apache.lucene.document.InetAddressPoint;
 import org.apache.lucene.sandbox.document.HalfFloatPoint;
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.breaker.NoopCircuitBreaker;
@@ -31,9 +33,11 @@ import org.elasticsearch.geo.GeometryTestUtils;
 import org.elasticsearch.geo.ShapeTestUtils;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.search.SearchService;
 import org.elasticsearch.tasks.TaskCancelledException;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.transport.RemoteTransportException;
+import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xcontent.json.JsonXContent;
 import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
 import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
@@ -72,8 +76,8 @@ import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
 import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
 import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
 import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
+import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
 import org.elasticsearch.xpack.esql.session.Configuration;
-import org.elasticsearch.xpack.esql.session.QueryBuilderResolver;
 import org.elasticsearch.xpack.esql.stats.SearchStats;
 import org.elasticsearch.xpack.esql.telemetry.Metrics;
 import org.elasticsearch.xpack.versionfield.Version;
@@ -140,6 +144,7 @@ import static org.elasticsearch.xpack.esql.parser.ParserUtils.ParamClassificatio
 import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
 
 public final class EsqlTestUtils {
 
@@ -360,7 +365,14 @@ public final class EsqlTestUtils {
 
     public static final Verifier TEST_VERIFIER = new Verifier(new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L));
 
-    public static final QueryBuilderResolver MOCK_QUERY_BUILDER_RESOLVER = new MockQueryBuilderResolver();
+    public static final TransportActionServices MOCK_TRANSPORT_ACTION_SERVICES = new TransportActionServices(
+        mock(TransportService.class),
+        mock(SearchService.class),
+        null,
+        mock(ClusterService.class),
+        mock(IndexNameExpressionResolver.class),
+        null
+    );
 
     private EsqlTestUtils() {}
 

+ 0 - 30
x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/MockQueryBuilderResolver.java

@@ -1,30 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-
-package org.elasticsearch.xpack.esql;
-
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
-import org.elasticsearch.xpack.esql.session.QueryBuilderResolver;
-import org.elasticsearch.xpack.esql.session.Result;
-
-import java.util.function.BiConsumer;
-
-public class MockQueryBuilderResolver extends QueryBuilderResolver {
-    public MockQueryBuilderResolver() {
-        super(null, null, null, null);
-    }
-
-    @Override
-    public void resolveQueryBuilders(
-        LogicalPlan plan,
-        ActionListener<Result> listener,
-        BiConsumer<LogicalPlan, ActionListener<Result>> callback
-    ) {
-        callback.accept(plan, listener);
-    }
-}

+ 1 - 1
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/MatchFunctionIT.java

@@ -246,7 +246,7 @@ public class MatchFunctionIT extends AbstractEsqlIntegTestCase {
         var error = expectThrows(ElasticsearchException.class, () -> run(query));
         assertThat(
             error.getMessage(),
-            containsString("[MATCH] function cannot operate on [\"a brown fox\"], which is not a field from an index mapping")
+            containsString("line 2:15: [MATCH] function cannot operate on [content], which is not a field from an index mapping")
         );
     }
 

+ 1 - 1
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/MatchOperatorIT.java

@@ -230,7 +230,7 @@ public class MatchOperatorIT extends AbstractEsqlIntegTestCase {
         var error = expectThrows(ElasticsearchException.class, () -> run(query));
         assertThat(
             error.getMessage(),
-            containsString("[:] operator cannot operate on [\"a brown fox\"], which is not a field from an index mapping")
+            containsString("line 2:9: [:] operator cannot operate on [content], which is not a field from an index mapping")
         );
     }
 

+ 3 - 3
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java

@@ -21,10 +21,10 @@ import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
 import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
 import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
 import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
+import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
 import org.elasticsearch.xpack.esql.session.Configuration;
 import org.elasticsearch.xpack.esql.session.EsqlSession;
 import org.elasticsearch.xpack.esql.session.IndexResolver;
-import org.elasticsearch.xpack.esql.session.QueryBuilderResolver;
 import org.elasticsearch.xpack.esql.session.Result;
 import org.elasticsearch.xpack.esql.telemetry.Metrics;
 import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;
@@ -62,7 +62,7 @@ public class PlanExecutor {
         EsqlExecutionInfo executionInfo,
         IndicesExpressionGrouper indicesExpressionGrouper,
         EsqlSession.PlanRunner planRunner,
-        QueryBuilderResolver queryBuilderResolver,
+        TransportActionServices services,
         ActionListener<Result> listener
     ) {
         final PlanTelemetry planTelemetry = new PlanTelemetry(functionRegistry);
@@ -78,7 +78,7 @@ public class PlanExecutor {
             verifier,
             planTelemetry,
             indicesExpressionGrouper,
-            queryBuilderResolver
+            services
         );
         QueryMetric clientId = QueryMetric.fromString("rest");
         metrics.total(clientId);

+ 2 - 6
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java

@@ -7,7 +7,7 @@
 
 package org.elasticsearch.xpack.esql.expression.function.fulltext;
 
-import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.lucene.BytesRefs;
 import org.elasticsearch.compute.lucene.LuceneQueryExpressionEvaluator;
 import org.elasticsearch.compute.lucene.LuceneQueryExpressionEvaluator.ShardConfig;
 import org.elasticsearch.compute.operator.EvalOperator;
@@ -110,11 +110,7 @@ public abstract class FullTextFunction extends Function implements TranslationAw
      */
     public Object queryAsObject() {
         Object queryAsObject = query().fold(FoldContext.small() /* TODO remove me */);
-        if (queryAsObject instanceof BytesRef bytesRef) {
-            return bytesRef.utf8ToString();
-        }
-
-        return queryAsObject;
+        return BytesRefs.toString(queryAsObject);
     }
 
     @Override

+ 35 - 30
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/Match.java

@@ -14,7 +14,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.unit.Fuzziness;
 import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.xpack.esql.capabilities.PostOptimizationVerificationAware;
+import org.elasticsearch.xpack.esql.capabilities.PostAnalysisPlanVerificationAware;
 import org.elasticsearch.xpack.esql.common.Failure;
 import org.elasticsearch.xpack.esql.common.Failures;
 import org.elasticsearch.xpack.esql.core.InvalidArgumentException;
@@ -30,6 +30,7 @@ import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.core.type.DataTypeConverter;
 import org.elasticsearch.xpack.esql.core.type.MultiTypeEsField;
+import org.elasticsearch.xpack.esql.core.util.Check;
 import org.elasticsearch.xpack.esql.core.util.NumericUtils;
 import org.elasticsearch.xpack.esql.expression.function.Example;
 import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
@@ -38,6 +39,7 @@ import org.elasticsearch.xpack.esql.expression.function.OptionalArgument;
 import org.elasticsearch.xpack.esql.expression.function.Param;
 import org.elasticsearch.xpack.esql.expression.function.scalar.convert.AbstractConvertFunction;
 import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
+import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
 import org.elasticsearch.xpack.esql.planner.TranslatorHandler;
 import org.elasticsearch.xpack.esql.querydsl.query.MatchQuery;
 import org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter;
@@ -48,6 +50,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.function.BiConsumer;
 
 import static java.util.Map.entry;
 import static org.elasticsearch.common.logging.LoggerMessageFormat.format;
@@ -88,7 +91,7 @@ import static org.elasticsearch.xpack.esql.expression.predicate.operator.compari
 /**
  * Full text function that performs a {@link org.elasticsearch.xpack.esql.querydsl.query.MatchQuery} .
  */
-public class Match extends FullTextFunction implements OptionalArgument, PostOptimizationVerificationAware {
+public class Match extends FullTextFunction implements OptionalArgument, PostAnalysisPlanVerificationAware {
 
     public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Match", Match::readFrom);
     public static final Set<DataType> FIELD_DATA_TYPES = Set.of(
@@ -429,23 +432,23 @@ public class Match extends FullTextFunction implements OptionalArgument, PostOpt
     }
 
     @Override
-    public void postOptimizationVerification(Failures failures) {
-        Expression fieldExpression = field();
-        // Field may be converted to other data type (field_name :: data_type), so we need to check the original field
-        if (fieldExpression instanceof AbstractConvertFunction convertFunction) {
-            fieldExpression = convertFunction.field();
-        }
-        if (fieldExpression instanceof FieldAttribute == false) {
-            failures.add(
-                Failure.fail(
-                    field,
-                    "[{}] {} cannot operate on [{}], which is not a field from an index mapping",
-                    functionName(),
-                    functionType(),
-                    field.sourceText()
-                )
-            );
-        }
+    public BiConsumer<LogicalPlan, Failures> postAnalysisPlanVerification() {
+        return (plan, failures) -> {
+            super.postAnalysisPlanVerification().accept(plan, failures);
+            plan.forEachExpression(Match.class, m -> {
+                if (m.fieldAsFieldAttribute() == null) {
+                    failures.add(
+                        Failure.fail(
+                            m.field(),
+                            "[{}] {} cannot operate on [{}], which is not a field from an index mapping",
+                            functionName(),
+                            functionType(),
+                            m.field().sourceText()
+                        )
+                    );
+                }
+            });
+        };
     }
 
     @Override
@@ -476,22 +479,24 @@ public class Match extends FullTextFunction implements OptionalArgument, PostOpt
 
     @Override
     protected Query translate(TranslatorHandler handler) {
+        var fieldAttribute = fieldAsFieldAttribute();
+        Check.notNull(fieldAttribute, "Match must have a field attribute as the first argument");
+        String fieldName = fieldAttribute.name();
+        if (fieldAttribute.field() instanceof MultiTypeEsField multiTypeEsField) {
+            // If we have multiple field types, we allow the query to be done, but getting the underlying field name
+            fieldName = multiTypeEsField.getName();
+        }
+        // Make query lenient so mixed field types can be queried when a field type is incompatible with the value provided
+        return new MatchQuery(source(), fieldName, queryAsObject(), matchQueryOptions());
+    }
+
+    private FieldAttribute fieldAsFieldAttribute() {
         Expression fieldExpression = field;
         // Field may be converted to other data type (field_name :: data_type), so we need to check the original field
         if (fieldExpression instanceof AbstractConvertFunction convertFunction) {
             fieldExpression = convertFunction.field();
         }
-        if (fieldExpression instanceof FieldAttribute fieldAttribute) {
-            String fieldName = fieldAttribute.name();
-            if (fieldAttribute.field() instanceof MultiTypeEsField multiTypeEsField) {
-                // If we have multiple field types, we allow the query to be done, but getting the underlying field name
-                fieldName = multiTypeEsField.getName();
-            }
-            // Make query lenient so mixed field types can be queried when a field type is incompatible with the value provided
-            return new MatchQuery(source(), fieldName, queryAsObject(), matchQueryOptions());
-        }
-
-        throw new IllegalArgumentException("Match must have a field attribute as the first argument");
+        return fieldExpression instanceof FieldAttribute fieldAttribute ? fieldAttribute : null;
     }
 
     @Override

+ 95 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/QueryBuilderResolver.java

@@ -0,0 +1,95 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.expression.function.fulltext;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ResolvedIndices;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryRewriteContext;
+import org.elasticsearch.index.query.Rewriteable;
+import org.elasticsearch.xpack.esql.core.util.Holder;
+import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
+import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.esql.planner.TranslatorHandler;
+import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
+import org.elasticsearch.xpack.esql.session.IndexResolver;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Some {@link FullTextFunction} implementations such as {@link org.elasticsearch.xpack.esql.expression.function.fulltext.Match}
+ * will be translated to a {@link QueryBuilder} that require a rewrite phase on the coordinator.
+ * {@link QueryBuilderResolver#resolveQueryBuilders(LogicalPlan, TransportActionServices, ActionListener)} will rewrite the plan by
+ * replacing {@link FullTextFunction} expression with new ones that hold rewritten {@link QueryBuilder}s.
+ */
+public final class QueryBuilderResolver {
+
+    private QueryBuilderResolver() {}
+
+    public static void resolveQueryBuilders(LogicalPlan plan, TransportActionServices services, ActionListener<LogicalPlan> listener) {
+        var hasFullTextFunctions = plan.anyMatch(p -> {
+            Holder<Boolean> hasFullTextFunction = new Holder<>(false);
+            p.forEachExpression(FullTextFunction.class, unused -> hasFullTextFunction.set(true));
+            return hasFullTextFunction.get();
+        });
+        if (hasFullTextFunctions) {
+            Rewriteable.rewriteAndFetch(
+                new FullTextFunctionsRewritable(plan),
+                queryRewriteContext(services, indexNames(plan)),
+                listener.delegateFailureAndWrap((l, r) -> l.onResponse(r.plan))
+            );
+        } else {
+            listener.onResponse(plan);
+        }
+    }
+
+    private static QueryRewriteContext queryRewriteContext(TransportActionServices services, Set<String> indexNames) {
+        ResolvedIndices resolvedIndices = ResolvedIndices.resolveWithIndexNamesAndOptions(
+            indexNames.toArray(String[]::new),
+            IndexResolver.FIELD_CAPS_INDICES_OPTIONS,
+            services.clusterService().state(),
+            services.indexNameExpressionResolver(),
+            services.transportService().getRemoteClusterService(),
+            System.currentTimeMillis()
+        );
+
+        return services.searchService().getRewriteContext(System::currentTimeMillis, resolvedIndices, null);
+    }
+
+    private static Set<String> indexNames(LogicalPlan plan) {
+        Set<String> indexNames = new HashSet<>();
+        plan.forEachDown(EsRelation.class, esRelation -> indexNames.addAll(esRelation.concreteIndices()));
+        return indexNames;
+    }
+
+    private record FullTextFunctionsRewritable(LogicalPlan plan) implements Rewriteable<QueryBuilderResolver.FullTextFunctionsRewritable> {
+        @Override
+        public FullTextFunctionsRewritable rewrite(QueryRewriteContext ctx) throws IOException {
+            Holder<IOException> exceptionHolder = new Holder<>();
+            Holder<Boolean> updated = new Holder<>(false);
+            LogicalPlan newPlan = plan.transformExpressionsDown(FullTextFunction.class, f -> {
+                QueryBuilder builder = f.queryBuilder(), initial = builder;
+                builder = builder == null ? f.asQuery(TranslatorHandler.TRANSLATOR_HANDLER).asBuilder() : builder;
+                try {
+                    builder = builder.rewrite(ctx);
+                } catch (IOException e) {
+                    exceptionHolder.setIfAbsent(e);
+                }
+                var rewritten = builder != initial;
+                updated.set(updated.get() || rewritten);
+                return rewritten ? f.replaceQueryBuilder(builder) : f;
+            });
+            if (exceptionHolder.get() != null) {
+                throw exceptionHolder.get();
+            }
+            return updated.get() ? new FullTextFunctionsRewritable(newPlan) : this;
+        }
+    }
+}

+ 21 - 14
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/Term.java

@@ -12,7 +12,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.xpack.esql.capabilities.PostOptimizationVerificationAware;
+import org.elasticsearch.xpack.esql.capabilities.PostAnalysisPlanVerificationAware;
 import org.elasticsearch.xpack.esql.common.Failure;
 import org.elasticsearch.xpack.esql.common.Failures;
 import org.elasticsearch.xpack.esql.core.expression.Expression;
@@ -26,10 +26,12 @@ import org.elasticsearch.xpack.esql.expression.function.Example;
 import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
 import org.elasticsearch.xpack.esql.expression.function.Param;
 import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
+import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
 import org.elasticsearch.xpack.esql.planner.TranslatorHandler;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.function.BiConsumer;
 
 import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.FIRST;
 import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.SECOND;
@@ -39,7 +41,7 @@ import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isStr
 /**
  * Full text function that performs a {@link TermQuery} .
  */
-public class Term extends FullTextFunction implements PostOptimizationVerificationAware {
+public class Term extends FullTextFunction implements PostAnalysisPlanVerificationAware {
 
     public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Term", Term::readFrom);
 
@@ -104,18 +106,23 @@ public class Term extends FullTextFunction implements PostOptimizationVerificati
     }
 
     @Override
-    public void postOptimizationVerification(Failures failures) {
-        if (field instanceof FieldAttribute == false) {
-            failures.add(
-                Failure.fail(
-                    field,
-                    "[{}] {} cannot operate on [{}], which is not a field from an index mapping",
-                    functionName(),
-                    functionType(),
-                    field.sourceText()
-                )
-            );
-        }
+    public BiConsumer<LogicalPlan, Failures> postAnalysisPlanVerification() {
+        return (plan, failures) -> {
+            super.postAnalysisPlanVerification().accept(plan, failures);
+            plan.forEachExpression(Term.class, t -> {
+                if (t.field() instanceof FieldAttribute == false) { // TODO: is a conversion possible, similar to Match's case?
+                    failures.add(
+                        Failure.fail(
+                            t.field(),
+                            "[{}] {} cannot operate on [{}], which is not a field from an index mapping",
+                            t.functionName(),
+                            t.functionType(),
+                            t.field().sourceText()
+                        )
+                    );
+                }
+            });
+        };
     }
 
     @Override

+ 40 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/premapper/PreMapper.java

@@ -0,0 +1,40 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.planner.premapper;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.xpack.esql.expression.function.fulltext.QueryBuilderResolver;
+import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
+
+/**
+ * The class is responsible for invoking any premapping steps that need to be applied to the logical plan,
+ * before this is being mapped to a physical one.
+ */
+public class PreMapper {
+
+    private final TransportActionServices services;
+
+    public PreMapper(TransportActionServices services) {
+        this.services = services;
+    }
+
+    /**
+     * Invokes any premapping steps that need to be applied to the logical plan, before this is being mapped to a physical one.
+     */
+    public void preMapper(LogicalPlan plan, ActionListener<LogicalPlan> listener) {
+        queryRewrite(plan, listener.delegateFailureAndWrap((l, p) -> {
+            p.setOptimized();
+            l.onResponse(p);
+        }));
+    }
+
+    private void queryRewrite(LogicalPlan plan, ActionListener<LogicalPlan> listener) {
+        QueryBuilderResolver.resolveQueryBuilders(plan, services, listener);
+    }
+}

+ 24 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java

@@ -0,0 +1,24 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.plugin;
+
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.compute.operator.exchange.ExchangeService;
+import org.elasticsearch.search.SearchService;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.usage.UsageService;
+
+public record TransportActionServices(
+    TransportService transportService,
+    SearchService searchService,
+    ExchangeService exchangeService,
+    ClusterService clusterService,
+    IndexNameExpressionResolver indexNameExpressionResolver,
+    UsageService usageService
+) {}

+ 11 - 4
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

@@ -53,7 +53,6 @@ import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService;
 import org.elasticsearch.xpack.esql.execution.PlanExecutor;
 import org.elasticsearch.xpack.esql.session.Configuration;
 import org.elasticsearch.xpack.esql.session.EsqlSession.PlanRunner;
-import org.elasticsearch.xpack.esql.session.QueryBuilderResolver;
 import org.elasticsearch.xpack.esql.session.Result;
 
 import java.io.IOException;
@@ -81,8 +80,8 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
     private final LookupFromIndexService lookupFromIndexService;
     private final AsyncTaskManagementService<EsqlQueryRequest, EsqlQueryResponse, EsqlQueryTask> asyncTaskManagementService;
     private final RemoteClusterService remoteClusterService;
-    private final QueryBuilderResolver queryBuilderResolver;
     private final UsageService usageService;
+    private final TransportActionServices services;
     // Listeners for active async queries, key being the async task execution ID
     private final Map<String, EsqlQueryListener> asyncListeners = ConcurrentCollections.newConcurrentMap();
 
@@ -153,8 +152,16 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
             bigArrays
         );
         this.remoteClusterService = transportService.getRemoteClusterService();
-        this.queryBuilderResolver = new QueryBuilderResolver(searchService, clusterService, transportService, indexNameExpressionResolver);
         this.usageService = usageService;
+
+        this.services = new TransportActionServices(
+            transportService,
+            searchService,
+            exchangeService,
+            clusterService,
+            indexNameExpressionResolver,
+            usageService
+        );
     }
 
     @Override
@@ -258,7 +265,7 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
             executionInfo,
             remoteClusterService,
             planRunner,
-            queryBuilderResolver,
+            services,
             ActionListener.wrap(result -> {
                 recordCCSTelemetry(task, executionInfo, request, null);
                 listener.onResponse(toResponse(task, request, configuration, result));

+ 11 - 13
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

@@ -73,6 +73,8 @@ import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
 import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
+import org.elasticsearch.xpack.esql.planner.premapper.PreMapper;
+import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
 import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;
 
 import java.util.ArrayList;
@@ -109,12 +111,12 @@ public class EsqlSession {
     private final Verifier verifier;
     private final EsqlFunctionRegistry functionRegistry;
     private final LogicalPlanOptimizer logicalPlanOptimizer;
+    private final PreMapper preMapper;
 
     private final Mapper mapper;
     private final PhysicalPlanOptimizer physicalPlanOptimizer;
     private final PlanTelemetry planTelemetry;
     private final IndicesExpressionGrouper indicesExpressionGrouper;
-    private final QueryBuilderResolver queryBuilderResolver;
 
     public EsqlSession(
         String sessionId,
@@ -128,7 +130,7 @@ public class EsqlSession {
         Verifier verifier,
         PlanTelemetry planTelemetry,
         IndicesExpressionGrouper indicesExpressionGrouper,
-        QueryBuilderResolver queryBuilderResolver
+        TransportActionServices services
     ) {
         this.sessionId = sessionId;
         this.configuration = configuration;
@@ -142,7 +144,7 @@ public class EsqlSession {
         this.physicalPlanOptimizer = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(configuration));
         this.planTelemetry = planTelemetry;
         this.indicesExpressionGrouper = indicesExpressionGrouper;
-        this.queryBuilderResolver = queryBuilderResolver;
+        this.preMapper = new PreMapper(services);
     }
 
     public String sessionId() {
@@ -162,16 +164,12 @@ public class EsqlSession {
             new EsqlSessionCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) {
                 @Override
                 public void onResponse(LogicalPlan analyzedPlan) {
-                    try {
-                        var optimizedPlan = optimizedPlan(analyzedPlan);
-                        queryBuilderResolver.resolveQueryBuilders(
-                            optimizedPlan,
-                            listener,
-                            (newPlan, next) -> executeOptimizedPlan(request, executionInfo, planRunner, newPlan, next)
-                        );
-                    } catch (Exception e) {
-                        listener.onFailure(e);
-                    }
+                    preMapper.preMapper(
+                        analyzedPlan,
+                        listener.delegateFailureAndWrap(
+                            (l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, optimizedPlan(p), l)
+                        )
+                    );
                 }
             }
         );

+ 0 - 167
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/QueryBuilderResolver.java

@@ -1,167 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-
-package org.elasticsearch.xpack.esql.session;
-
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ResolvedIndices;
-import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
-import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryRewriteContext;
-import org.elasticsearch.index.query.Rewriteable;
-import org.elasticsearch.search.SearchService;
-import org.elasticsearch.transport.TransportService;
-import org.elasticsearch.xpack.esql.core.util.Holder;
-import org.elasticsearch.xpack.esql.expression.function.fulltext.FullTextFunction;
-import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
-import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.BiConsumer;
-
-import static org.elasticsearch.xpack.esql.planner.TranslatorHandler.TRANSLATOR_HANDLER;
-
-/**
- * Some {@link FullTextFunction} implementations such as {@link org.elasticsearch.xpack.esql.expression.function.fulltext.Match}
- * will be translated to a {@link QueryBuilder} that require a rewrite phase on the coordinator.
- * {@link QueryBuilderResolver#resolveQueryBuilders(LogicalPlan, ActionListener, BiConsumer)} will rewrite the plan by replacing
- * {@link FullTextFunction} expression with new ones that hold rewritten {@link QueryBuilder}s.
- */
-public class QueryBuilderResolver {
-    private final SearchService searchService;
-    private final ClusterService clusterService;
-    private final TransportService transportService;
-    private final IndexNameExpressionResolver indexNameExpressionResolver;
-
-    public QueryBuilderResolver(
-        SearchService searchService,
-        ClusterService clusterService,
-        TransportService transportService,
-        IndexNameExpressionResolver indexNameExpressionResolver
-    ) {
-        this.searchService = searchService;
-        this.clusterService = clusterService;
-        this.transportService = transportService;
-        this.indexNameExpressionResolver = indexNameExpressionResolver;
-    }
-
-    public void resolveQueryBuilders(
-        LogicalPlan plan,
-        ActionListener<Result> listener,
-        BiConsumer<LogicalPlan, ActionListener<Result>> callback
-    ) {
-        if (plan.optimized() == false) {
-            listener.onFailure(new IllegalStateException("Expected optimized plan before query builder rewrite."));
-            return;
-        }
-
-        Set<FullTextFunction> unresolved = fullTextFunctions(plan);
-        Set<String> indexNames = indexNames(plan);
-
-        if (indexNames == null || indexNames.isEmpty() || unresolved.isEmpty()) {
-            callback.accept(plan, listener);
-            return;
-        }
-        QueryRewriteContext ctx = queryRewriteContext(indexNames);
-        FullTextFunctionsRewritable rewritable = new FullTextFunctionsRewritable(unresolved);
-        Rewriteable.rewriteAndFetch(rewritable, ctx, new ActionListener<FullTextFunctionsRewritable>() {
-            @Override
-            public void onResponse(FullTextFunctionsRewritable fullTextFunctionsRewritable) {
-                try {
-                    LogicalPlan newPlan = planWithResolvedQueryBuilders(plan, fullTextFunctionsRewritable.results());
-                    callback.accept(newPlan, listener);
-                } catch (Exception e) {
-                    onFailure(e);
-                }
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                listener.onFailure(e);
-            }
-        });
-    }
-
-    private Set<FullTextFunction> fullTextFunctions(LogicalPlan plan) {
-        Set<FullTextFunction> functions = new HashSet<>();
-        plan.forEachExpressionDown(FullTextFunction.class, func -> functions.add(func));
-        return functions;
-    }
-
-    public Set<String> indexNames(LogicalPlan plan) {
-        Holder<Set<String>> indexNames = new Holder<>();
-        plan.forEachDown(EsRelation.class, esRelation -> indexNames.set(esRelation.concreteIndices()));
-        return indexNames.get();
-    }
-
-    public LogicalPlan planWithResolvedQueryBuilders(LogicalPlan plan, Map<FullTextFunction, QueryBuilder> newQueryBuilders) {
-        LogicalPlan newPlan = plan.transformExpressionsDown(FullTextFunction.class, m -> {
-            if (newQueryBuilders.keySet().contains(m)) {
-                return m.replaceQueryBuilder(newQueryBuilders.get(m));
-            }
-            return m;
-        });
-        // The given plan was already analyzed and optimized, so we set the resulted plan to optimized as well.
-        newPlan.setOptimized();
-        return newPlan;
-    }
-
-    private QueryRewriteContext queryRewriteContext(Set<String> indexNames) {
-        ResolvedIndices resolvedIndices = ResolvedIndices.resolveWithIndexNamesAndOptions(
-            indexNames.toArray(String[]::new),
-            IndexResolver.FIELD_CAPS_INDICES_OPTIONS,
-            clusterService.state(),
-            indexNameExpressionResolver,
-            transportService.getRemoteClusterService(),
-            System.currentTimeMillis()
-        );
-
-        return searchService.getRewriteContext(() -> System.currentTimeMillis(), resolvedIndices, null);
-    }
-
-    private class FullTextFunctionsRewritable implements Rewriteable<FullTextFunctionsRewritable> {
-
-        private final Map<FullTextFunction, QueryBuilder> queryBuilderMap;
-
-        FullTextFunctionsRewritable(Map<FullTextFunction, QueryBuilder> queryBuilderMap) {
-            this.queryBuilderMap = queryBuilderMap;
-        }
-
-        FullTextFunctionsRewritable(Set<FullTextFunction> functions) {
-            this.queryBuilderMap = new HashMap<>();
-
-            for (FullTextFunction func : functions) {
-                queryBuilderMap.put(func, TRANSLATOR_HANDLER.asQuery(func).asBuilder());
-            }
-        }
-
-        @Override
-        public FullTextFunctionsRewritable rewrite(QueryRewriteContext ctx) throws IOException {
-            Map<FullTextFunction, QueryBuilder> results = new HashMap<>();
-
-            boolean hasChanged = false;
-            for (var entry : queryBuilderMap.entrySet()) {
-                var initial = entry.getValue();
-                var rewritten = initial.rewrite(ctx);
-                hasChanged |= rewritten != initial;
-
-                results.put(entry.getKey(), rewritten);
-            }
-
-            return hasChanged ? new FullTextFunctionsRewritable(results) : this;
-        }
-
-        public Map<FullTextFunction, QueryBuilder> results() {
-            return queryBuilderMap;
-        }
-    }
-}

+ 1 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

@@ -516,7 +516,7 @@ public class CsvTests extends ESTestCase {
             TEST_VERIFIER,
             new PlanTelemetry(functionRegistry),
             null,
-            EsqlTestUtils.MOCK_QUERY_BUILDER_RESOLVER
+            EsqlTestUtils.MOCK_TRANSPORT_ACTION_SERVICES
         );
         TestPhysicalOperationProviders physicalOperationProviders = testOperationProviders(foldCtx, testDatasets);
 

+ 21 - 2
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java

@@ -1186,9 +1186,9 @@ public class VerifierTests extends ESTestCase {
 
     public void testMatchInsideEval() throws Exception {
         assumeTrue("Match operator is available just for snapshots", Build.current().isSnapshot());
-
         assertEquals(
-            "1:36: [:] operator is only supported in WHERE commands",
+            "1:36: [:] operator is only supported in WHERE commands\n"
+                + "line 1:36: [:] operator cannot operate on [title], which is not a field from an index mapping",
             error("row title = \"brown fox\" | eval x = title:\"fox\" ")
         );
     }
@@ -1217,6 +1217,25 @@ public class VerifierTests extends ESTestCase {
         assertEquals("1:24: [:] operator cannot be used after LIMIT", error("from test | limit 10 | where first_name : \"Anna\""));
     }
 
+    // These should pass eventually once we lift some restrictions on match function
+    public void testMatchWithNonIndexedColumnCurrentlyUnsupported() {
+        assertEquals(
+            "1:67: [MATCH] function cannot operate on [initial], which is not a field from an index mapping",
+            error("from test | eval initial = substring(first_name, 1) | where match(initial, \"A\")")
+        );
+        assertEquals(
+            "1:67: [MATCH] function cannot operate on [text], which is not a field from an index mapping",
+            error("from test | eval text=concat(first_name, last_name) | where match(text, \"cat\")")
+        );
+    }
+
+    public void testMatchFunctionIsNotNullable() {
+        assertEquals(
+            "1:48: [MATCH] function cannot operate on [text::keyword], which is not a field from an index mapping",
+            error("row n = null | eval text = n + 5 | where match(text::keyword, \"Anna\")")
+        );
+    }
+
     public void testQueryStringFunctionsNotAllowedAfterCommands() throws Exception {
         // Source commands
         assertEquals("1:13: [QSTR] function cannot be used after SHOW", error("show info | where qstr(\"8.16.0\")"));

+ 0 - 32
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java

@@ -7240,38 +7240,6 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
         assertEquals("1:60: argument of [to_timeduration(x)] must be a constant, received [x]", e.getMessage().substring(header.length()));
     }
 
-    // These should pass eventually once we lift some restrictions on match function
-    public void testMatchWithNonIndexedColumnCurrentlyUnsupported() {
-        final String header = "Found 1 problem\nline ";
-        VerificationException e = expectThrows(VerificationException.class, () -> plan("""
-            from test | eval initial = substring(first_name, 1) | where match(initial, "A")"""));
-        assertTrue(e.getMessage().startsWith("Found "));
-        assertEquals(
-            "1:67: [MATCH] function cannot operate on [initial], which is not a field from an index mapping",
-            e.getMessage().substring(header.length())
-        );
-
-        e = expectThrows(VerificationException.class, () -> plan("""
-            from test | eval text=concat(first_name, last_name) | where match(text, "cat")"""));
-        assertTrue(e.getMessage().startsWith("Found "));
-        assertEquals(
-            "1:67: [MATCH] function cannot operate on [text], which is not a field from an index mapping",
-            e.getMessage().substring(header.length())
-        );
-    }
-
-    public void testMatchFunctionIsNotNullable() {
-        String queryText = """
-            row n = null | eval text = n + 5 | where match(text::keyword, "Anna")
-            """;
-
-        VerificationException ve = expectThrows(VerificationException.class, () -> plan(queryText));
-        assertThat(
-            ve.getMessage(),
-            containsString("[MATCH] function cannot operate on [text::keyword], which is not a field from an index mapping")
-        );
-    }
-
     public void testWhereNull() {
         var plan = plan("""
             from test

+ 2 - 2
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java

@@ -125,7 +125,7 @@ public class PlanExecutorMetricsTests extends ESTestCase {
             new EsqlExecutionInfo(randomBoolean()),
             groupIndicesByCluster,
             runPhase,
-            EsqlTestUtils.MOCK_QUERY_BUILDER_RESOLVER,
+            EsqlTestUtils.MOCK_TRANSPORT_ACTION_SERVICES,
             new ActionListener<>() {
                 @Override
                 public void onResponse(Result result) {
@@ -156,7 +156,7 @@ public class PlanExecutorMetricsTests extends ESTestCase {
             new EsqlExecutionInfo(randomBoolean()),
             groupIndicesByCluster,
             runPhase,
-            EsqlTestUtils.MOCK_QUERY_BUILDER_RESOLVER,
+            EsqlTestUtils.MOCK_TRANSPORT_ACTION_SERVICES,
             new ActionListener<>() {
                 @Override
                 public void onResponse(Result result) {}

+ 3 - 2
x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/queries/SemanticQueryBuilder.java

@@ -332,11 +332,12 @@ public class SemanticQueryBuilder extends AbstractQueryBuilder<SemanticQueryBuil
     protected boolean doEquals(SemanticQueryBuilder other) {
         return Objects.equals(fieldName, other.fieldName)
             && Objects.equals(query, other.query)
-            && Objects.equals(inferenceResults, other.inferenceResults);
+            && Objects.equals(inferenceResults, other.inferenceResults)
+            && Objects.equals(inferenceResultsSupplier, other.inferenceResultsSupplier);
     }
 
     @Override
     protected int doHashCode() {
-        return Objects.hash(fieldName, query, inferenceResults);
+        return Objects.hash(fieldName, query, inferenceResults, inferenceResultsSupplier);
     }
 }