Browse Source

Integrate enrich plan with enrich operator (ESQL-1255)

This PR integrates the enrich plan with the operator. I believe it is
necessary to have more tests for both the planning and execution of the
enrich. While fixing a bug in Mapper, I didn't write a test because I
expect that we will be working on tests for these soon.
Nhat Nguyen 2 years ago
parent
commit
bc4d7a73c7

+ 0 - 36
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/EnrichOperator.java

@@ -1,36 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-
-package org.elasticsearch.compute.operator;
-
-import org.elasticsearch.compute.data.Page;
-
-public class EnrichOperator extends AbstractPageMappingOperator {
-    public record EnrichOperatorFactory() implements OperatorFactory {
-
-        @Override
-        public Operator get(DriverContext driverContext) {
-            return new EnrichOperator();
-        }
-
-        @Override
-        public String describe() {
-            return "EnrichOperator[]";
-        }
-    }
-
-    @Override
-    protected Page process(Page page) {
-        // TODO
-        throw new UnsupportedOperationException("Implement enrich operator!");
-    }
-
-    @Override
-    public String toString() {
-        return getClass().getSimpleName();
-    }
-}

+ 1 - 1
x-pack/plugin/esql/qa/security/roles.yml

@@ -14,7 +14,7 @@ user1:
   cluster:
     - cluster:monitor/main
   indices:
-    - names: ['index-user1', 'index' ]
+    - names: ['index-user1', 'index', "test-enrich" ]
       privileges:
         - read
         - write

+ 101 - 0
x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java

@@ -11,10 +11,12 @@ import org.elasticsearch.client.Request;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.SecureString;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.test.rest.ESRestTestCase;
+import org.elasticsearch.xcontent.json.JsonXContent;
 import org.junit.Before;
 
 import java.io.IOException;
@@ -113,6 +115,105 @@ public class EsqlSecurityIT extends ESRestTestCase {
         assertThat(respMap.get("values"), equalTo(List.of(List.of(2, 5))));
     }
 
+    public void testEnrich() throws Exception {
+        createEnrichPolicy();
+        try {
+            createIndex("test-enrich", Settings.EMPTY, """
+                "properties":{"timestamp": {"type": "long"}, "song_id": {"type": "keyword"}, "duration": {"type": "double"}}
+                """);
+            record Listen(long timestamp, String songId, double duration) {
+
+            }
+            var listens = List.of(
+                new Listen(1, "s1", 1.0),
+                new Listen(2, "s2", 2.0),
+                new Listen(3, "s1", 3.0),
+                new Listen(4, "s3", 1.0),
+                new Listen(5, "s4", 1.5),
+                new Listen(6, "s1", 2.5),
+                new Listen(7, "s1", 3.5),
+                new Listen(8, "s2", 5.0),
+                new Listen(8, "s1", 0.5),
+                new Listen(8, "s3", 0.25),
+                new Listen(8, "s4", 1.25)
+            );
+            for (int i = 0; i < listens.size(); i++) {
+                Listen listen = listens.get(i);
+                Request indexDoc = new Request("PUT", "/test-enrich/_doc/" + i);
+                String doc = Strings.toString(
+                    JsonXContent.contentBuilder()
+                        .startObject()
+                        .field("timestamp", listen.timestamp)
+                        .field("song_id", listen.songId)
+                        .field("duration", listen.duration)
+                        .endObject()
+                );
+                indexDoc.setJsonEntity(doc);
+                client().performRequest(indexDoc);
+            }
+            refresh("test-enrich");
+            Response resp = runESQLCommand(
+                "user1",
+                "FROM test-enrich | ENRICH songs ON song_id | stats total_duration = sum(duration) by artist | sort artist"
+            );
+            Map<String, Object> respMap = entityAsMap(resp);
+            assertThat(
+                respMap.get("values"),
+                equalTo(List.of(List.of(2.75, "Disturbed"), List.of(10.5, "Eagles"), List.of(8.25, "Linkin Park")))
+            );
+        } finally {
+            removeEnrichPolicy();
+        }
+    }
+
+    private void createEnrichPolicy() throws Exception {
+        createIndex("songs", Settings.EMPTY, """
+            "properties":{"song_id": {"type": "keyword"}, "title": {"type": "keyword"}, "artist": {"type": "keyword"} }
+            """);
+        record Song(String id, String title, String artist) {
+
+        }
+
+        var songs = List.of(
+            new Song("s1", "Hotel California", "Eagles"),
+            new Song("s2", "In The End", "Linkin Park"),
+            new Song("s3", "Numb", "Linkin Park"),
+            new Song("s4", "The Sound Of Silence", "Disturbed")
+        );
+        for (int i = 0; i < songs.size(); i++) {
+            var song = songs.get(i);
+            Request indexDoc = new Request("PUT", "/songs/_doc/" + i);
+            String doc = Strings.toString(
+                JsonXContent.contentBuilder()
+                    .startObject()
+                    .field("song_id", song.id)
+                    .field("title", song.title)
+                    .field("artist", song.artist)
+                    .endObject()
+            );
+            indexDoc.setJsonEntity(doc);
+            client().performRequest(indexDoc);
+        }
+        refresh("songs");
+
+        Request createEnrich = new Request("PUT", "/_enrich/policy/songs");
+        createEnrich.setJsonEntity("""
+            {
+                "match": {
+                    "indices": "songs",
+                    "match_field": "song_id",
+                    "enrich_fields": ["title", "artist"]
+                }
+            }
+            """);
+        client().performRequest(createEnrich);
+        client().performRequest(new Request("PUT", "_enrich/policy/songs/_execute"));
+    }
+
+    private void removeEnrichPolicy() throws Exception {
+        client().performRequest(new Request("DELETE", "_enrich/policy/songs"));
+    }
+
     private Response runESQLCommand(String user, String command) throws IOException {
         Request request = new Request("POST", "_esql");
         request.setJsonEntity("{\"query\":\"" + command + "\"}");

+ 42 - 45
x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/60_enrich.yml

@@ -2,42 +2,15 @@
 setup:
   - do:
       indices.create:
-        index:  test
+        index:  cities
         body:
           settings:
             number_of_shards: 5
           mappings:
             properties:
-              id:
-                type: long
-              name:
+              city_code:
                 type: keyword
               city:
-                type: long
-  - do:
-      bulk:
-        index: "test"
-        refresh: true
-        body:
-          - { "index": { } }
-          - { "id": 1, "name": "Alice", "city": 10 }
-          - { "index": { } }
-          - { "id": 2, "name": "Bob", "city": 10 }
-          - { "index": { } }
-          - { "id": 3, "name": "Mario", "city": 20 }
-          - { "index": { } }
-          - { "id": 4, "name": "Denise", "city": 50 }
-  - do:
-      indices.create:
-        index:  cities
-        body:
-          settings:
-            number_of_shards: 5
-          mappings:
-            properties:
-              id:
-                type: long
-              name:
                 type: keyword
               country:
                 type: keyword
@@ -48,9 +21,9 @@ setup:
         refresh: true
         body:
           - { "index": { } }
-          - { "id": 10, "name": "New York", "country": "USA" }
+          - { "city_code": "nyc", "city": "New York", "country": "USA" }
           - { "index": { } }
-          - { "id": 20, "name": "Rome", "country": "Italy" }
+          - { "city_code": "rom", "city": "Rome", "country": "Italy" }
 
   - do:
       enrich.put_policy:
@@ -58,29 +31,53 @@ setup:
         body:
           match:
             indices: ["cities"]
-            match_field: "id"
-            enrich_fields: ["name", "country"]
+            match_field: "city_code"
+            enrich_fields: ["city", "country"]
 
   - do:
       enrich.execute_policy:
         name: cities_policy
 
+  - do:
+      indices.create:
+        index: test
+        body:
+          mappings:
+            properties:
+              name:
+                type: keyword
+              city_code:
+                type: keyword
+  - do:
+      bulk:
+        index: "test"
+        refresh: true
+        body:
+          - { "index": { } }
+          - { "name": "Alice", "city_code": "nyc" }
+          - { "index": { } }
+          - { "name": "Bob", "city_code": "nyc" }
+          - { "index": { } }
+          - { "name": "Mario", "city_code": "rom" }
+          - { "index": { } }
+          - { "name": "Denise", "city_code": "sgn" }
 
 ---
-"Test only result columns, a false condition should be pushed down":
+"Basic":
   - do:
       esql.query:
         body:
-          query: 'from test | eval x = 1 | enrich cities_policy on city | project id, city, name, country, x | where x == 2'
+          query: 'from test | enrich cities_policy on city_code | project name, city, country | sort name'
 
-  - match: {columns.0.name: "id"}
-  - match: {columns.0.type: "long"}
-  - match: {columns.1.name: "city"}
-  - match: {columns.1.type: "long"}
-  - match: {columns.2.name: "name"}
-  - match: {columns.2.type: "keyword"}
-  - match: {columns.3.name: "country"}
-  - match: {columns.3.type: "keyword"}
-  - length: {values: 0}
+  - match: { columns.0.name: "name" }
+  - match: { columns.0.type: "keyword" }
+  - match: { columns.1.name: "city" }
+  - match: { columns.1.type: "keyword" }
+  - match: { columns.2.name: "country" }
+  - match: { columns.2.type: "keyword" }
 
-# TODO we'll need more meaningful data when Enrich evaluator is properly implemented
+  - length: { values: 4 }
+  - match: { values.0: [ "Alice", "New York", "USA" ] }
+  - match: { values.1: [ "Bob", "New York", "USA" ] }
+  - match: { values.2: [ "Denise", null, null ] }
+  - match: { values.3: [ "Mario", "Rome", "Italy" ] }

+ 42 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupOperator.java

@@ -11,6 +11,8 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.AsyncOperator;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.compute.operator.Operator;
 import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.xpack.ql.expression.Attribute;
 
@@ -26,6 +28,46 @@ public final class EnrichLookupOperator extends AsyncOperator {
     private final String matchField;
     private final List<Attribute> enrichFields;
 
+    public record Factory(
+        String sessionId,
+        CancellableTask parentTask,
+        int maxOutstandingRequests,
+        int inputChannel,
+        EnrichLookupService enrichLookupService,
+        String enrichIndex,
+        String matchType,
+        String matchField,
+        List<Attribute> enrichFields
+    ) implements OperatorFactory {
+        @Override
+        public String describe() {
+            return "EnrichOperator[index="
+                + enrichIndex
+                + " match_field="
+                + matchField
+                + " enrich_fields="
+                + enrichFields
+                + " inputChannel="
+                + inputChannel
+                + "]";
+        }
+
+        @Override
+        public Operator get(DriverContext driverContext) {
+            return new EnrichLookupOperator(
+                sessionId,
+                parentTask,
+                maxOutstandingRequests,
+                inputChannel,
+                enrichLookupService,
+                enrichIndex,
+                matchType,
+                matchField,
+                enrichFields
+            );
+        }
+    }
+
     public EnrichLookupOperator(
         String sessionId,
         CancellableTask parentTask,

+ 30 - 11
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java

@@ -9,8 +9,11 @@ package org.elasticsearch.xpack.esql.enrich;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListenerResponseHandler;
+import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.UnavailableShardsException;
 import org.elasticsearch.action.support.ChannelActionListener;
+import org.elasticsearch.action.support.ContextPreservingActionListener;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
@@ -19,6 +22,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.lucene.ValueSources;
@@ -43,6 +47,7 @@ import org.elasticsearch.transport.TransportRequestHandler;
 import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportResponse;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
 import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
 import org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry;
@@ -79,7 +84,7 @@ import static org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.PlanWriter
  * <p>
  * The positionCount of the output page must be equal to the positionCount of the input page.
  */
-public final class EnrichLookupService {
+public class EnrichLookupService {
     public static final String LOOKUP_ACTION_NAME = EsqlQueryAction.NAME + "/lookup";
 
     private final ClusterService clusterService;
@@ -125,15 +130,19 @@ public final class EnrichLookupService {
         }
         DiscoveryNode targetNode = clusterState.nodes().get(shardRouting.currentNodeId());
         LookupRequest lookupRequest = new LookupRequest(sessionId, shardIt.shardId(), matchType, matchField, inputPage, extractFields);
-        // TODO: handle retry and avoid forking for the local lookup
-        transportService.sendChildRequest(
-            targetNode,
-            LOOKUP_ACTION_NAME,
-            lookupRequest,
-            parentTask,
-            TransportRequestOptions.EMPTY,
-            new ActionListenerResponseHandler<>(listener.map(r -> r.page), LookupResponse::new)
-        );
+        ThreadContext threadContext = transportService.getThreadPool().getThreadContext();
+        listener = ContextPreservingActionListener.wrapPreservingContext(listener, threadContext);
+        try (ThreadContext.StoredContext ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) {
+            // TODO: handle retry and avoid forking for the local lookup
+            transportService.sendChildRequest(
+                targetNode,
+                LOOKUP_ACTION_NAME,
+                lookupRequest,
+                parentTask,
+                TransportRequestOptions.EMPTY,
+                new ActionListenerResponseHandler<>(listener.map(r -> r.page), LookupResponse::new)
+            );
+        }
     }
 
     private void doLookup(
@@ -226,7 +235,7 @@ public final class EnrichLookupService {
         }
     }
 
-    private static class LookupRequest extends TransportRequest {
+    private static class LookupRequest extends TransportRequest implements IndicesRequest {
         private final String sessionId;
         private final ShardId shardId;
         private final String matchType;
@@ -273,6 +282,16 @@ public final class EnrichLookupService {
             planOut.writeCollection(extractFields, writerFromPlanWriter(PlanStreamOutput::writeAttribute));
         }
 
+        @Override
+        public String[] indices() {
+            return new String[] { shardId.getIndexName() };
+        }
+
+        @Override
+        public IndicesOptions indicesOptions() {
+            return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
+        }
+
         @Override
         public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
             return new CancellableTask(id, type, action, "", parentTaskId, headers) {

+ 2 - 5
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java

@@ -35,16 +35,13 @@ public class EnrichPolicyResolver {
     public void resolvePolicy(String policyName, ActionListener<EnrichPolicyResolution> listener) {
         EnrichPolicy policy = policies().get(policyName);
         ThreadContext threadContext = threadPool.getThreadContext();
-        ActionListener<EnrichPolicyResolution> wrappedListener = new ContextPreservingActionListener<>(
-            threadContext.newRestorableContext(false),
-            listener
-        );
+        listener = ContextPreservingActionListener.wrapPreservingContext(listener, threadContext);
         try (ThreadContext.StoredContext ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) {
             indexResolver.resolveAsMergedMapping(
                 EnrichPolicy.getBaseName(policyName),
                 false,
                 Map.of(),
-                wrappedListener.map(indexResult -> new EnrichPolicyResolution(policyName, policy, indexResult))
+                listener.map(indexResult -> new EnrichPolicyResolution(policyName, policy, indexResult))
             );
         }
     }

+ 2 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/FragmentExec.java

@@ -74,9 +74,9 @@ public class FragmentExec extends LeafExec {
         sb.append(nodeName());
         sb.append("[filter=");
         sb.append(esFilter);
-        sb.append("[<>\n");
+        sb.append("[<>");
         sb.append(fragment.toString());
-        sb.append("\n<>]");
+        sb.append("<>]");
         return sb.toString();
     }
 }

+ 29 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.xpack.esql.planner;
 
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.iterable.Iterables;
 import org.elasticsearch.compute.Describable;
 import org.elasticsearch.compute.ann.Experimental;
 import org.elasticsearch.compute.data.Block;
@@ -17,7 +18,6 @@ import org.elasticsearch.compute.lucene.DataPartitioning;
 import org.elasticsearch.compute.operator.ColumnExtractOperator;
 import org.elasticsearch.compute.operator.Driver;
 import org.elasticsearch.compute.operator.DriverContext;
-import org.elasticsearch.compute.operator.EnrichOperator;
 import org.elasticsearch.compute.operator.EvalOperator.EvalOperatorFactory;
 import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator;
 import org.elasticsearch.compute.operator.FilterOperator.FilterOperatorFactory;
@@ -41,8 +41,11 @@ import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator.Exchange
 import org.elasticsearch.compute.operator.exchange.ExchangeSourceOperator.ExchangeSourceOperatorFactory;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.index.query.MatchAllQueryBuilder;
+import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
+import org.elasticsearch.xpack.esql.enrich.EnrichLookupOperator;
+import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
 import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
 import org.elasticsearch.xpack.esql.plan.physical.DissectExec;
 import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
@@ -101,24 +104,30 @@ import static org.elasticsearch.compute.operator.ProjectOperator.ProjectOperator
 public class LocalExecutionPlanner {
 
     private final String sessionId;
+    private final CancellableTask parentTask;
     private final BigArrays bigArrays;
     private final ThreadPool threadPool;
     private final EsqlConfiguration configuration;
     private final ExchangeService exchangeService;
+    private final EnrichLookupService enrichLookupService;
     private final PhysicalOperationProviders physicalOperationProviders;
 
     public LocalExecutionPlanner(
         String sessionId,
+        CancellableTask parentTask,
         BigArrays bigArrays,
         ThreadPool threadPool,
         EsqlConfiguration configuration,
         ExchangeService exchangeService,
+        EnrichLookupService enrichLookupService,
         PhysicalOperationProviders physicalOperationProviders
     ) {
         this.sessionId = sessionId;
+        this.parentTask = parentTask;
         this.bigArrays = bigArrays;
         this.threadPool = threadPool;
         this.exchangeService = exchangeService;
+        this.enrichLookupService = enrichLookupService;
         this.physicalOperationProviders = physicalOperationProviders;
         this.configuration = configuration;
     }
@@ -404,7 +413,25 @@ public class LocalExecutionPlanner {
             layoutBuilder.appendChannel(attr.id());
         }
         Layout layout = layoutBuilder.build();
-        return source.with(new EnrichOperator.EnrichOperatorFactory(), layout);
+        Set<String> indices = enrich.enrichIndex().concreteIndices();
+        if (indices.size() != 1) {
+            throw new EsqlIllegalArgumentException("Resolved enrich should have one concrete index; got " + indices);
+        }
+        String enrichIndex = Iterables.get(indices, 0);
+        return source.with(
+            new EnrichLookupOperator.Factory(
+                sessionId,
+                parentTask,
+                1, // TODO: Add a concurrent setting for enrich - also support unordered mode
+                source.layout.getChannel(enrich.matchField().id()),
+                enrichLookupService,
+                enrichIndex,
+                "match", // TODO: enrich should also resolve the match_type
+                enrich.matchField().name(),
+                enrich.enrichFields()
+            ),
+            layout
+        );
     }
 
     private Supplier<ExpressionEvaluator> toEvaluator(Expression exp, Layout layout) {

+ 4 - 9
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Mapper.java

@@ -91,15 +91,6 @@ public class Mapper {
         if (p instanceof ShowInfo showInfo) {
             return new ShowExec(showInfo.source(), showInfo.output(), showInfo.values());
         }
-        if (p instanceof Enrich enrich) {
-            return new EnrichExec(
-                enrich.source(),
-                map(enrich.child()),
-                enrich.matchField(),
-                enrich.policy().index().get(),
-                enrich.enrichFields()
-            );
-        }
 
         //
         // Unary Plan
@@ -145,6 +136,10 @@ public class Mapper {
             return new GrokExec(grok.source(), child, grok.input(), grok.parser(), grok.extractedFields());
         }
 
+        if (p instanceof Enrich enrich) {
+            return new EnrichExec(enrich.source(), child, enrich.matchField(), enrich.policy().index().get(), enrich.enrichFields());
+        }
+
         //
         // Pipeline breakers
         //

+ 8 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

@@ -47,6 +47,7 @@ import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportResponse;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
+import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
 import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
@@ -76,12 +77,14 @@ public class ComputeService {
     private final TransportService transportService;
     private final DriverTaskRunner driverRunner;
     private final ExchangeService exchangeService;
+    private final EnrichLookupService enrichLookupService;
 
     public ComputeService(
         SearchService searchService,
         ClusterService clusterService,
         TransportService transportService,
         ExchangeService exchangeService,
+        EnrichLookupService enrichLookupService,
         ThreadPool threadPool,
         BigArrays bigArrays
     ) {
@@ -98,6 +101,7 @@ public class ComputeService {
         );
         this.driverRunner = new DriverTaskRunner(transportService, threadPool.executor(ESQL_THREAD_POOL_NAME));
         this.exchangeService = exchangeService;
+        this.enrichLookupService = enrichLookupService;
     }
 
     public void execute(
@@ -175,16 +179,18 @@ public class ComputeService {
         });
     }
 
-    void runCompute(Task task, ComputeContext context, PhysicalPlan plan, ActionListener<Void> listener) {
+    void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, ActionListener<Void> listener) {
         List<Driver> drivers = new ArrayList<>();
         listener = ActionListener.releaseAfter(listener, () -> Releasables.close(drivers));
         try {
             LocalExecutionPlanner planner = new LocalExecutionPlanner(
                 context.sessionId,
+                task,
                 bigArrays,
                 threadPool,
                 context.configuration,
                 exchangeService,
+                enrichLookupService,
                 new EsPhysicalOperationProviders(context.searchContexts)
             );
 
@@ -296,7 +302,7 @@ public class ComputeService {
                 );
                 exchangeService.createSinkHandler(sessionId, request.pragmas().exchangeBufferSize());
                 runCompute(
-                    task,
+                    (CancellableTask) task,
                     new ComputeContext(sessionId, searchContexts, request.configuration()),
                     request.plan(),
                     ActionListener.releaseAfter(listener.map(unused -> new DataNodeResponse()), releasable)

+ 9 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

@@ -62,7 +62,15 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
         exchangeService.registerTransportHandler(transportService);
         this.exchangeService = exchangeService;
         this.enrichLookupService = new EnrichLookupService(clusterService, searchService, transportService);
-        this.computeService = new ComputeService(searchService, clusterService, transportService, exchangeService, threadPool, bigArrays);
+        this.computeService = new ComputeService(
+            searchService,
+            clusterService,
+            transportService,
+            exchangeService,
+            enrichLookupService,
+            threadPool,
+            bigArrays
+        );
         this.settings = settings;
     }
 

+ 6 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

@@ -22,6 +22,8 @@ import org.elasticsearch.core.Tuple;
 import org.elasticsearch.logging.LogManager;
 import org.elasticsearch.logging.Logger;
 import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.tasks.CancellableTask;
+import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.FixedExecutorBuilder;
 import org.elasticsearch.threadpool.TestThreadPool;
@@ -32,6 +34,7 @@ import org.elasticsearch.xpack.esql.analysis.Analyzer;
 import org.elasticsearch.xpack.esql.analysis.AnalyzerContext;
 import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
 import org.elasticsearch.xpack.esql.analysis.Verifier;
+import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
 import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
 import org.elasticsearch.xpack.esql.optimizer.LocalLogicalPlanOptimizer;
 import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
@@ -71,6 +74,7 @@ import org.elasticsearch.xpack.ql.util.Holder;
 import org.elasticsearch.xpack.ql.util.StringUtils;
 import org.junit.After;
 import org.junit.Before;
+import org.mockito.Mockito;
 
 import java.net.URL;
 import java.time.ZoneOffset;
@@ -275,10 +279,12 @@ public class CsvTests extends ESTestCase {
         String sessionId = "csv-test";
         LocalExecutionPlanner executionPlanner = new LocalExecutionPlanner(
             sessionId,
+            new CancellableTask(1, "transport", "esql", null, TaskId.EMPTY_TASK_ID, Map.of()),
             BigArrays.NON_RECYCLING_INSTANCE,
             threadPool,
             configuration,
             exchangeService,
+            Mockito.mock(EnrichLookupService.class),
             testOperationProviders(testDataset)
         );
         //