
ESQL: Basic enrich-like lookup loading (#115667) (#115980)

This adds a super basic way to perform a lookup-style LEFT JOIN.
It's *like* ENRICH, except it can use an index_mode=lookup index rather
than an ENRICH policy. It's like a LEFT JOIN, but it can't change the
output cardinality. That's a genuinely useful thing!
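
A minimal sketch of that cardinality contract, assuming nothing about the real
implementation (which works on Pages and Blocks, not Lists): one row out per
row in, with a null wherever the key has no match.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    class LookupJoinSketch {
        /** One output value per input key; null when the lookup index has no match. */
        static List<Long> leftJoin(List<String> keys, Map<String, Long> lookupIndex) {
            List<Long> out = new ArrayList<>(keys.size());
            for (String key : keys) {
                out.add(lookupIndex.get(key)); // null on a miss, so the row count never changes
            }
            return out;
        }
    }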

This intentionally forks some portions of the ENRICH infrastructure and
shares others. I *believe* these are the right parts to fork and the
right parts to share. Namely:
* We *share* the internal implementations
* We fork the request
* We fork the configuration of what to join

This should allow us to iterate on the requests without damaging
anything in ENRICH, but any speed-ups that we build for these lookup
joins *can* be shared with ENRICH if we decide that they work.

Relies on #115143
Nik Everett 11 months ago
parent
commit
c8a3f376b5

+ 10 - 2
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java

@@ -89,7 +89,16 @@ import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 
-// TODO: Move these tests to the right test classes.
+/**
+ * This venerable test builds {@link Driver}s by hand and runs them together, simulating
+ * whole runs without needing to involve ESQL-proper. It's a wonderful place to integration
+ * test new ideas, and it held the first tests the compute engine ever had. But as we plug
+ * these things into ESQL, tests should leave here and just run in csv-spec tests, or move
+ * into unit tests for the operators themselves.
+ * <p>
+ *     TODO move any of these we can to unit tests for the operator.
+ * </p>
+ */
 public class OperatorTests extends MapperServiceTestCase {
 
     public void testQueryOperator() throws IOException {
@@ -355,7 +364,6 @@ public class OperatorTests extends MapperServiceTestCase {
         } finally {
             primesBlock.close();
         }
-
     }
 
     /**

+ 249 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java

@@ -0,0 +1,249 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.common.breaker.CircuitBreakingException;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.MockBigArrays;
+import org.elasticsearch.common.util.MockPageCacheRecycler;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.data.BytesRefVector;
+import org.elasticsearch.compute.data.ElementType;
+import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.data.LongVector;
+import org.elasticsearch.compute.lucene.DataPartitioning;
+import org.elasticsearch.compute.lucene.LuceneSourceOperator;
+import org.elasticsearch.compute.lucene.ShardContext;
+import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
+import org.elasticsearch.compute.operator.Driver;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.compute.operator.DriverRunner;
+import org.elasticsearch.compute.operator.PageConsumerOperator;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
+import org.elasticsearch.search.SearchService;
+import org.elasticsearch.search.internal.AliasFilter;
+import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.search.internal.ShardSearchRequest;
+import org.elasticsearch.tasks.CancellableTask;
+import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.async.AsyncExecutionId;
+import org.elasticsearch.xpack.esql.core.expression.Alias;
+import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
+import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.core.type.DataType;
+import org.elasticsearch.xpack.esql.enrich.LookupFromIndexOperator;
+import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
+import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
+import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
+import org.elasticsearch.xpack.esql.plugin.TransportEsqlQueryAction;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import static org.elasticsearch.test.ListMatcher.matchesList;
+import static org.elasticsearch.test.MapMatcher.assertMap;
+import static org.hamcrest.Matchers.empty;
+
+public class LookupFromIndexIT extends AbstractEsqlIntegTestCase {
+    /**
+     * Quick and dirty test for looking up data from a lookup index.
+     */
+    public void testLookupIndex() throws IOException {
+        // TODO this should *fail* if the target index isn't a lookup type index - it doesn't now.
+        int docCount = between(10, 1000);
+        List<String> expected = new ArrayList<>(docCount);
+        client().admin()
+            .indices()
+            .prepareCreate("source")
+            .setSettings(Settings.builder().put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1))
+            .setMapping("data", "type=keyword")
+            .get();
+        client().admin()
+            .indices()
+            .prepareCreate("lookup")
+            .setSettings(
+                Settings.builder()
+                    .put(IndexSettings.MODE.getKey(), "lookup")
+                    // TODO lookup index mode doesn't seem to force a single shard. That'll break the lookup command.
+                    .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+            )
+            .setMapping("data", "type=keyword", "l", "type=long")
+            .get();
+        client().admin().cluster().prepareHealth(TEST_REQUEST_TIMEOUT).setWaitForGreenStatus().get();
+
+        String[] data = new String[] { "aa", "bb", "cc", "dd" };
+        List<IndexRequestBuilder> docs = new ArrayList<>();
+        for (int i = 0; i < docCount; i++) {
+            docs.add(client().prepareIndex("source").setSource(Map.of("data", data[i % data.length])));
+            expected.add(data[i % data.length] + ":" + (i % data.length));
+        }
+        for (int i = 0; i < data.length; i++) {
+            docs.add(client().prepareIndex("lookup").setSource(Map.of("data", data[i], "l", i)));
+        }
+        Collections.sort(expected);
+        indexRandom(true, true, docs);
+
+        /*
+         * Find the data node hosting the only shard of the source index.
+         */
+        SearchService searchService = null;
+        String nodeWithShard = null;
+        ShardId shardId = null;
+        node: for (String node : internalCluster().getNodeNames()) {
+            searchService = internalCluster().getInstance(SearchService.class, node);
+            for (IndexService idx : searchService.getIndicesService()) {
+                if (idx.index().getName().equals("source")) {
+                    nodeWithShard = node;
+                    shardId = new ShardId(idx.index(), 0);
+                    break node;
+                }
+            }
+        }
+        if (nodeWithShard == null) {
+            throw new IllegalStateException("couldn't find any copy of source index");
+        }
+
+        List<String> results = new CopyOnWriteArrayList<>();
+        /*
+         * Run the Driver.
+         */
+        try (
+            SearchContext searchContext = searchService.createSearchContext(
+                new ShardSearchRequest(shardId, System.currentTimeMillis(), AliasFilter.EMPTY, null),
+                SearchService.NO_TIMEOUT
+            )
+        ) {
+            ShardContext esqlContext = new EsPhysicalOperationProviders.DefaultShardContext(
+                0,
+                searchContext.getSearchExecutionContext(),
+                AliasFilter.EMPTY
+            );
+            LuceneSourceOperator.Factory source = new LuceneSourceOperator.Factory(
+                List.of(esqlContext),
+                ctx -> new MatchAllDocsQuery(),
+                DataPartitioning.SEGMENT,
+                1,
+                10000,
+                DocIdSetIterator.NO_MORE_DOCS
+            );
+            ValuesSourceReaderOperator.Factory reader = new ValuesSourceReaderOperator.Factory(
+                List.of(
+                    new ValuesSourceReaderOperator.FieldInfo(
+                        "data",
+                        ElementType.BYTES_REF,
+                        shard -> searchContext.getSearchExecutionContext().getFieldType("data").blockLoader(null)
+                    )
+                ),
+                List.of(new ValuesSourceReaderOperator.ShardContext(searchContext.getSearchExecutionContext().getIndexReader(), () -> {
+                    throw new IllegalStateException("can't load source here");
+                })),
+                0
+            );
+            CancellableTask parentTask = new EsqlQueryTask(
+                1,
+                "test",
+                "test",
+                "test",
+                null,
+                Map.of(),
+                Map.of(),
+                new AsyncExecutionId("test", TaskId.EMPTY_TASK_ID),
+                TEST_REQUEST_TIMEOUT
+            );
+            LookupFromIndexOperator.Factory lookup = new LookupFromIndexOperator.Factory(
+                "test",
+                parentTask,
+                QueryPragmas.ENRICH_MAX_WORKERS.get(Settings.EMPTY),
+                1,
+                internalCluster().getInstance(TransportEsqlQueryAction.class, nodeWithShard).getLookupFromIndexService(),
+                DataType.KEYWORD,
+                "lookup",
+                "data",
+                List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG)))
+            );
+            DriverContext driverContext = driverContext();
+            try (
+                var driver = new Driver(
+                    driverContext,
+                    source.get(driverContext),
+                    List.of(reader.get(driverContext), lookup.get(driverContext)),
+                    new PageConsumerOperator(page -> {
+                        try {
+                            BytesRefVector dataBlock = page.<BytesRefBlock>getBlock(1).asVector();
+                            LongVector loadedBlock = page.<LongBlock>getBlock(2).asVector();
+                            for (int p = 0; p < page.getPositionCount(); p++) {
+                                results.add(dataBlock.getBytesRef(p, new BytesRef()).utf8ToString() + ":" + loadedBlock.getLong(p));
+                            }
+                        } finally {
+                            page.releaseBlocks();
+                        }
+                    }),
+                    () -> {}
+                )
+            ) {
+                PlainActionFuture<Void> future = new PlainActionFuture<>();
+                ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, nodeWithShard);
+                var driverRunner = new DriverRunner(threadPool.getThreadContext()) {
+                    @Override
+                    protected void start(Driver driver, ActionListener<Void> driverListener) {
+                        Driver.start(
+                            threadPool.getThreadContext(),
+                            threadPool.executor(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME),
+                            driver,
+                            between(1, 10000),
+                            driverListener
+                        );
+                    }
+                };
+                driverRunner.runToCompletion(List.of(driver), future);
+                future.actionGet(TimeValue.timeValueSeconds(30));
+                assertMap(results.stream().sorted().toList(), matchesList(expected));
+            }
+            assertDriverContext(driverContext);
+        }
+    }
+
+    /**
+     * Creates a {@link BigArrays} that tracks releases but doesn't throw circuit breaking exceptions.
+     */
+    private BigArrays bigArrays() {
+        return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
+    }
+
+    /**
+     * A {@link DriverContext} that won't throw {@link CircuitBreakingException}.
+     */
+    protected final DriverContext driverContext() {
+        var breaker = new MockBigArrays.LimitedBreaker("esql-test-breaker", ByteSizeValue.ofGb(1));
+        return new DriverContext(bigArrays(), BlockFactory.getInstance(breaker, bigArrays()));
+    }
+
+    public static void assertDriverContext(DriverContext driverContext) {
+        assertTrue(driverContext.isFinished());
+        assertThat(driverContext.getSnapshot().releasables(), empty());
+    }
+}
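
To make the test's expectation concrete, here is a worked example assuming
docCount = 8. The source docs cycle through four keys, the lookup index maps
each key to its position, and every output row pairs a key with the long
looked up for it:

    // source data:  aa bb cc dd aa bb cc dd
    // lookup docs:   {data: "aa", l: 0} {data: "bb", l: 1} {data: "cc", l: 2} {data: "dd", l: 3}
    // results:       "aa:0", "bb:1", "cc:2", "dd:3", "aa:0", "bb:1", "cc:2", "dd:3"  (compared sorted)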

+ 593 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java

@@ -0,0 +1,593 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.enrich;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionListenerResponseHandler;
+import org.elasticsearch.action.IndicesRequest;
+import org.elasticsearch.action.UnavailableShardsException;
+import org.elasticsearch.action.support.ChannelActionListener;
+import org.elasticsearch.action.support.ContextPreservingActionListener;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.GroupShardsIterator;
+import org.elasticsearch.cluster.routing.ShardIterator;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.CheckedBiFunction;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.BlockStreamInput;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.data.ElementType;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.data.LocalCircuitBreaker;
+import org.elasticsearch.compute.data.OrdinalBytesRefBlock;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
+import org.elasticsearch.compute.operator.Driver;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.compute.operator.Operator;
+import org.elasticsearch.compute.operator.OutputOperator;
+import org.elasticsearch.core.AbstractRefCounted;
+import org.elasticsearch.core.RefCounted;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.index.mapper.BlockLoader;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.query.SearchExecutionContext;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.search.SearchService;
+import org.elasticsearch.search.internal.AliasFilter;
+import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.search.internal.ShardSearchRequest;
+import org.elasticsearch.tasks.CancellableTask;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportChannel;
+import org.elasticsearch.transport.TransportRequestHandler;
+import org.elasticsearch.transport.TransportRequestOptions;
+import org.elasticsearch.transport.TransportResponse;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.ClientHelper;
+import org.elasticsearch.xpack.core.XPackSettings;
+import org.elasticsearch.xpack.core.security.SecurityContext;
+import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction;
+import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest;
+import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesResponse;
+import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
+import org.elasticsearch.xpack.core.security.support.Exceptions;
+import org.elasticsearch.xpack.core.security.user.User;
+import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
+import org.elasticsearch.xpack.esql.core.expression.Alias;
+import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
+import org.elasticsearch.xpack.esql.core.type.DataType;
+import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
+import org.elasticsearch.xpack.esql.planner.PlannerUtils;
+import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * {@link AbstractLookupService} performs a single-valued {@code LEFT JOIN} for a
+ * given input page against another index. This is quite similar to a nested loop
+ * join. It is restricted to indices with only a single shard.
+ * <p>
+ *     This registers a {@link TransportRequestHandler} so we can handle requests
+ *     to join data that isn't local to the node, but it is much faster if the
+ *     data is already local.
+ * </p>
+ * <p>
+ *     The join process spawns a {@link Driver} per incoming page which runs in
+ *     three stages:
+ * </p>
+ * <p>
+ *     Stage 1: Finding matching document IDs for the input page. This stage is done
+ *     by the {@link EnrichQuerySourceOperator}. The output page of this stage is
+ *     represented as {@code [DocVector, IntBlock: positions of the input terms]}.
+ * </p>
+ * <p>
+ *     Stage 2: Extracting field values for the matched document IDs. The output page
+ *     is represented as
+ *     {@code [DocVector, IntBlock: positions, Block: field1, Block: field2,...]}.
+ * </p>
+ * <p>
+ *     Stage 3: Combining the extracted values based on positions and filling nulls for
+ *     positions without matches. This is done by {@link MergePositionsOperator}. The output
+ *     page is represented as {@code [Block: field1, Block: field2,...]}.
+ * </p>
+ * <p>
+ *     The {@link Page#getPositionCount()} of the output {@link Page} is equal to the
+ *     {@link Page#getPositionCount()} of the input page. In other words, it returns
+ *     the same number of rows that it was sent, no matter how many documents match.
+ * </p>
+ */
+abstract class AbstractLookupService<R extends AbstractLookupService.Request, T extends AbstractLookupService.TransportRequest> {
+    private final String actionName;
+    private final String privilegeName;
+    private final ClusterService clusterService;
+    private final SearchService searchService;
+    private final TransportService transportService;
+    private final Executor executor;
+    private final BigArrays bigArrays;
+    private final BlockFactory blockFactory;
+    private final LocalCircuitBreaker.SizeSettings localBreakerSettings;
+
+    AbstractLookupService(
+        String actionName,
+        String privilegeName,
+        ClusterService clusterService,
+        SearchService searchService,
+        TransportService transportService,
+        BigArrays bigArrays,
+        BlockFactory blockFactory,
+        CheckedBiFunction<StreamInput, BlockFactory, T, IOException> readRequest
+    ) {
+        this.actionName = actionName;
+        this.privilegeName = privilegeName;
+        this.clusterService = clusterService;
+        this.searchService = searchService;
+        this.transportService = transportService;
+        this.executor = transportService.getThreadPool().executor(ThreadPool.Names.SEARCH);
+        this.bigArrays = bigArrays;
+        this.blockFactory = blockFactory;
+        this.localBreakerSettings = new LocalCircuitBreaker.SizeSettings(clusterService.getSettings());
+        transportService.registerRequestHandler(
+            actionName,
+            transportService.getThreadPool().executor(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME),
+            in -> readRequest.apply(in, blockFactory),
+            new TransportHandler()
+        );
+    }
+
+    /**
+     * Convert a request as sent to {@link #lookupAsync} into a transport request after
+     * preflight checks have been performed.
+     */
+    protected abstract T transportRequest(R request, ShardId shardId);
+
+    /**
+     * Build a list of queries to perform inside the actual lookup.
+     */
+    protected abstract QueryList queryList(T request, SearchExecutionContext context, Block inputBlock, DataType inputDataType);
+
+    /**
+     * Perform the actual lookup.
+     */
+    public final void lookupAsync(R request, CancellableTask parentTask, ActionListener<Page> outListener) {
+        ThreadContext threadContext = transportService.getThreadPool().getThreadContext();
+        ActionListener<Page> listener = ContextPreservingActionListener.wrapPreservingContext(outListener, threadContext);
+        hasPrivilege(listener.delegateFailureAndWrap((delegate, ignored) -> {
+            ClusterState clusterState = clusterService.state();
+            GroupShardsIterator<ShardIterator> shardIterators = clusterService.operationRouting()
+                .searchShards(clusterState, new String[] { request.index }, Map.of(), "_local");
+            if (shardIterators.size() != 1) {
+                delegate.onFailure(new EsqlIllegalArgumentException("target index {} has more than one shard", request.index));
+                return;
+            }
+            ShardIterator shardIt = shardIterators.get(0);
+            ShardRouting shardRouting = shardIt.nextOrNull();
+            ShardId shardId = shardIt.shardId();
+            if (shardRouting == null) {
+                delegate.onFailure(new UnavailableShardsException(shardId, "target index is not available"));
+                return;
+            }
+            DiscoveryNode targetNode = clusterState.nodes().get(shardRouting.currentNodeId());
+            T transportRequest = transportRequest(request, shardId);
+            // TODO: handle retry and avoid forking for the local lookup
+            try (ThreadContext.StoredContext unused = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) {
+                transportService.sendChildRequest(
+                    targetNode,
+                    actionName,
+                    transportRequest,
+                    parentTask,
+                    TransportRequestOptions.EMPTY,
+                    new ActionListenerResponseHandler<>(
+                        delegate.map(LookupResponse::takePage),
+                        in -> new LookupResponse(in, blockFactory),
+                        executor
+                    )
+                );
+            }
+        }));
+    }
+
+    private void hasPrivilege(ActionListener<Void> outListener) {
+        final Settings settings = clusterService.getSettings();
+        if (settings.hasValue(XPackSettings.SECURITY_ENABLED.getKey()) == false || XPackSettings.SECURITY_ENABLED.get(settings) == false) {
+            outListener.onResponse(null);
+            return;
+        }
+        final ThreadContext threadContext = transportService.getThreadPool().getThreadContext();
+        final SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext);
+        final User user = securityContext.getUser();
+        if (user == null) {
+            outListener.onFailure(new IllegalStateException("missing or unable to read authentication info on request"));
+            return;
+        }
+        HasPrivilegesRequest request = new HasPrivilegesRequest();
+        request.username(user.principal());
+        request.clusterPrivileges(privilegeName);
+        request.indexPrivileges(new RoleDescriptor.IndicesPrivileges[0]);
+        request.applicationPrivileges(new RoleDescriptor.ApplicationResourcePrivileges[0]);
+        ActionListener<HasPrivilegesResponse> listener = outListener.delegateFailureAndWrap((l, resp) -> {
+            if (resp.isCompleteMatch()) {
+                l.onResponse(null);
+                return;
+            }
+            String detailed = resp.getClusterPrivileges()
+                .entrySet()
+                .stream()
+                .filter(e -> e.getValue() == false)
+                .map(e -> "privilege [" + e.getKey() + "] is missing")
+                .collect(Collectors.joining(", "));
+            String message = "user ["
+                + user.principal()
+                + "] doesn't have "
+                + "sufficient privileges to perform enrich lookup: "
+                + detailed;
+            l.onFailure(Exceptions.authorizationError(message));
+        });
+        transportService.sendRequest(
+            transportService.getLocalNode(),
+            HasPrivilegesAction.NAME,
+            request,
+            TransportRequestOptions.EMPTY,
+            new ActionListenerResponseHandler<>(listener, HasPrivilegesResponse::new, executor)
+        );
+    }
+
+    private void doLookup(T request, CancellableTask task, ActionListener<Page> listener) {
+        Block inputBlock = request.inputPage.getBlock(0);
+        if (inputBlock.areAllValuesNull()) {
+            listener.onResponse(createNullResponse(request.inputPage.getPositionCount(), request.extractFields));
+            return;
+        }
+        final List<Releasable> releasables = new ArrayList<>(6);
+        boolean started = false;
+        try {
+            final ShardSearchRequest shardSearchRequest = new ShardSearchRequest(request.shardId, 0, AliasFilter.EMPTY);
+            final SearchContext searchContext = searchService.createSearchContext(shardSearchRequest, SearchService.NO_TIMEOUT);
+            releasables.add(searchContext);
+            final LocalCircuitBreaker localBreaker = new LocalCircuitBreaker(
+                blockFactory.breaker(),
+                localBreakerSettings.overReservedBytes(),
+                localBreakerSettings.maxOverReservedBytes()
+            );
+            releasables.add(localBreaker);
+            final DriverContext driverContext = new DriverContext(bigArrays, blockFactory.newChildFactory(localBreaker));
+            final ElementType[] mergingTypes = new ElementType[request.extractFields.size()];
+            for (int i = 0; i < request.extractFields.size(); i++) {
+                mergingTypes[i] = PlannerUtils.toElementType(request.extractFields.get(i).dataType());
+            }
+            final int[] mergingChannels = IntStream.range(0, request.extractFields.size()).map(i -> i + 2).toArray();
+            final MergePositionsOperator mergePositionsOperator;
+            final OrdinalBytesRefBlock ordinalsBytesRefBlock;
+            if (inputBlock instanceof BytesRefBlock bytesRefBlock && (ordinalsBytesRefBlock = bytesRefBlock.asOrdinals()) != null) {
+                inputBlock = ordinalsBytesRefBlock.getDictionaryVector().asBlock();
+                var selectedPositions = ordinalsBytesRefBlock.getOrdinalsBlock();
+                mergePositionsOperator = new MergePositionsOperator(
+                    1,
+                    mergingChannels,
+                    mergingTypes,
+                    selectedPositions,
+                    driverContext.blockFactory()
+                );
+
+            } else {
+                try (var selectedPositions = IntVector.range(0, inputBlock.getPositionCount(), blockFactory).asBlock()) {
+                    mergePositionsOperator = new MergePositionsOperator(
+                        1,
+                        mergingChannels,
+                        mergingTypes,
+                        selectedPositions,
+                        driverContext.blockFactory()
+                    );
+                }
+            }
+            releasables.add(mergePositionsOperator);
+            SearchExecutionContext searchExecutionContext = searchContext.getSearchExecutionContext();
+            QueryList queryList = queryList(request, searchExecutionContext, inputBlock, request.inputDataType);
+            var queryOperator = new EnrichQuerySourceOperator(
+                driverContext.blockFactory(),
+                EnrichQuerySourceOperator.DEFAULT_MAX_PAGE_SIZE,
+                queryList,
+                searchExecutionContext.getIndexReader()
+            );
+            releasables.add(queryOperator);
+            var extractFieldsOperator = extractFieldsOperator(searchContext, driverContext, request.extractFields);
+            releasables.add(extractFieldsOperator);
+
+            AtomicReference<Page> result = new AtomicReference<>();
+            OutputOperator outputOperator = new OutputOperator(List.of(), Function.identity(), result::set);
+            releasables.add(outputOperator);
+            Driver driver = new Driver(
+                "enrich-lookup:" + request.sessionId,
+                System.currentTimeMillis(),
+                System.nanoTime(),
+                driverContext,
+                request::toString,
+                queryOperator,
+                List.of(extractFieldsOperator, mergePositionsOperator),
+                outputOperator,
+                Driver.DEFAULT_STATUS_INTERVAL,
+                Releasables.wrap(searchContext, localBreaker)
+            );
+            task.addListener(() -> {
+                String reason = Objects.requireNonNullElse(task.getReasonCancelled(), "task was cancelled");
+                driver.cancel(reason);
+            });
+            var threadContext = transportService.getThreadPool().getThreadContext();
+            Driver.start(threadContext, executor, driver, Driver.DEFAULT_MAX_ITERATIONS, listener.map(ignored -> {
+                Page out = result.get();
+                if (out == null) {
+                    out = createNullResponse(request.inputPage.getPositionCount(), request.extractFields);
+                }
+                return out;
+            }));
+            started = true;
+        } catch (Exception e) {
+            listener.onFailure(e);
+        } finally {
+            if (started == false) {
+                Releasables.close(releasables);
+            }
+        }
+    }
+
+    private static Operator extractFieldsOperator(
+        SearchContext searchContext,
+        DriverContext driverContext,
+        List<NamedExpression> extractFields
+    ) {
+        EsPhysicalOperationProviders.ShardContext shardContext = new EsPhysicalOperationProviders.DefaultShardContext(
+            0,
+            searchContext.getSearchExecutionContext(),
+            searchContext.request().getAliasFilter()
+        );
+        List<ValuesSourceReaderOperator.FieldInfo> fields = new ArrayList<>(extractFields.size());
+        for (NamedExpression extractField : extractFields) {
+            BlockLoader loader = shardContext.blockLoader(
+                extractField instanceof Alias a ? ((NamedExpression) a.child()).name() : extractField.name(),
+                extractField.dataType() == DataType.UNSUPPORTED,
+                MappedFieldType.FieldExtractPreference.NONE
+            );
+            fields.add(
+                new ValuesSourceReaderOperator.FieldInfo(
+                    extractField.name(),
+                    PlannerUtils.toElementType(extractField.dataType()),
+                    shardIdx -> {
+                        if (shardIdx != 0) {
+                            throw new IllegalStateException("only one shard");
+                        }
+                        return loader;
+                    }
+                )
+            );
+        }
+        return new ValuesSourceReaderOperator(
+            driverContext.blockFactory(),
+            fields,
+            List.of(new ValuesSourceReaderOperator.ShardContext(searchContext.searcher().getIndexReader(), searchContext::newSourceLoader)),
+            0
+        );
+    }
+
+    private Page createNullResponse(int positionCount, List<NamedExpression> extractFields) {
+        final Block[] blocks = new Block[extractFields.size()];
+        try {
+            for (int i = 0; i < extractFields.size(); i++) {
+                blocks[i] = blockFactory.newConstantNullBlock(positionCount);
+            }
+            return new Page(blocks);
+        } finally {
+            if (blocks[blocks.length - 1] == null) {
+                Releasables.close(blocks);
+            }
+        }
+    }
+
+    private class TransportHandler implements TransportRequestHandler<T> {
+        @Override
+        public void messageReceived(T request, TransportChannel channel, Task task) {
+            request.incRef();
+            ActionListener<LookupResponse> listener = ActionListener.runBefore(new ChannelActionListener<>(channel), request::decRef);
+            doLookup(
+                request,
+                (CancellableTask) task,
+                listener.delegateFailureAndWrap(
+                    (l, outPage) -> ActionListener.respondAndRelease(l, new LookupResponse(outPage, blockFactory))
+                )
+            );
+        }
+    }
+
+    abstract static class Request {
+        final String sessionId;
+        final String index;
+        final DataType inputDataType;
+        final Page inputPage;
+        final List<NamedExpression> extractFields;
+
+        Request(String sessionId, String index, DataType inputDataType, Page inputPage, List<NamedExpression> extractFields) {
+            this.sessionId = sessionId;
+            this.index = index;
+            this.inputDataType = inputDataType;
+            this.inputPage = inputPage;
+            this.extractFields = extractFields;
+        }
+    }
+
+    abstract static class TransportRequest extends org.elasticsearch.transport.TransportRequest implements IndicesRequest {
+        final String sessionId;
+        final ShardId shardId;
+        final DataType inputDataType;
+        final Page inputPage;
+        final List<NamedExpression> extractFields;
+        // TODO: Remove this workaround once we have Block RefCount
+        final Page toRelease;
+        final RefCounted refs = AbstractRefCounted.of(this::releasePage);
+
+        TransportRequest(
+            String sessionId,
+            ShardId shardId,
+            DataType inputDataType,
+            Page inputPage,
+            Page toRelease,
+            List<NamedExpression> extractFields
+        ) {
+            this.sessionId = sessionId;
+            this.shardId = shardId;
+            this.inputDataType = inputDataType;
+            this.inputPage = inputPage;
+            this.toRelease = toRelease;
+            this.extractFields = extractFields;
+        }
+
+        @Override
+        public final String[] indices() {
+            return new String[] { shardId.getIndexName() };
+        }
+
+        @Override
+        public final IndicesOptions indicesOptions() {
+            return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
+        }
+
+        @Override
+        public final Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+            return new CancellableTask(id, type, action, "", parentTaskId, headers) {
+                @Override
+                public String getDescription() {
+                    return this.toString();
+                }
+            };
+        }
+
+        private void releasePage() {
+            if (toRelease != null) {
+                Releasables.closeExpectNoException(toRelease::releaseBlocks);
+            }
+        }
+
+        @Override
+        public final void incRef() {
+            refs.incRef();
+        }
+
+        @Override
+        public final boolean tryIncRef() {
+            return refs.tryIncRef();
+        }
+
+        @Override
+        public final boolean decRef() {
+            return refs.decRef();
+        }
+
+        @Override
+        public final boolean hasReferences() {
+            return refs.hasReferences();
+        }
+
+        @Override
+        public final String toString() {
+            return "LOOKUP("
+                + " session="
+                + sessionId
+                + " ,shard="
+                + shardId
+                + " ,input_type="
+                + inputDataType
+                + " ,extract_fields="
+                + extractFields
+                + " ,positions="
+                + inputPage.getPositionCount()
+                + extraDescription()
+                + ")";
+        }
+
+        protected abstract String extraDescription();
+    }
+
+    private static class LookupResponse extends TransportResponse {
+        private final RefCounted refs = AbstractRefCounted.of(this::releasePage);
+        private final BlockFactory blockFactory;
+        private Page page;
+        private long reservedBytes = 0;
+
+        LookupResponse(Page page, BlockFactory blockFactory) {
+            this.page = page;
+            this.blockFactory = blockFactory;
+        }
+
+        LookupResponse(StreamInput in, BlockFactory blockFactory) throws IOException {
+            try (BlockStreamInput bsi = new BlockStreamInput(in, blockFactory)) {
+                this.page = new Page(bsi);
+            }
+            this.blockFactory = blockFactory;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            long bytes = page.ramBytesUsedByBlocks();
+            blockFactory.breaker().addEstimateBytesAndMaybeBreak(bytes, "serialize enrich lookup response");
+            reservedBytes += bytes;
+            page.writeTo(out);
+        }
+
+        Page takePage() {
+            var p = page;
+            page = null;
+            return p;
+        }
+
+        private void releasePage() {
+            blockFactory.breaker().addWithoutBreaking(-reservedBytes);
+            if (page != null) {
+                Releasables.closeExpectNoException(page::releaseBlocks);
+            }
+        }
+
+        @Override
+        public void incRef() {
+            refs.incRef();
+        }
+
+        @Override
+        public boolean tryIncRef() {
+            return refs.tryIncRef();
+        }
+
+        @Override
+        public boolean decRef() {
+            return refs.decRef();
+        }
+
+        @Override
+        public boolean hasReferences() {
+            return refs.hasReferences();
+        }
+    }
+}
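
To make the three stages described in the Javadoc above concrete, here is a
hedged walk-through of a single input page. The page layouts come from the
Javadoc; the row values are hypothetical:

    // Input page, one key column:        ["aa", "zz", "bb"]            (3 positions)
    // Stage 1 (EnrichQuerySourceOperator):
    //     [DocVector: docs matching "aa" and "bb", IntBlock positions: 0, 2]
    // Stage 2 (ValuesSourceReaderOperator):
    //     [DocVector, IntBlock positions: 0, 2, LongBlock l: 10, 20]
    // Stage 3 (MergePositionsOperator):
    //     [LongBlock l: 10, null, 20]                                  (3 positions, null for "zz")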

+ 3 - 4
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupOperator.java

@@ -109,17 +109,16 @@ public final class EnrichLookupOperator extends AsyncOperator {
     protected void performAsync(Page inputPage, ActionListener<Page> listener) {
         final Block inputBlock = inputPage.getBlock(inputChannel);
         totalTerms += inputBlock.getTotalValueCount();
-        enrichLookupService.lookupAsync(
+        EnrichLookupService.Request request = new EnrichLookupService.Request(
             sessionId,
-            parentTask,
             enrichIndex,
             inputDataType,
             matchType,
             matchField,
-            enrichFields,
             new Page(inputBlock),
-            listener.map(inputPage::appendPage)
+            enrichFields
         );
+        enrichLookupService.lookupAsync(request, parentTask, listener.map(inputPage::appendPage));
     }
 
     @Override

+ 76 - 535
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java

@@ -8,116 +8,39 @@
 package org.elasticsearch.xpack.esql.enrich;
 
 import org.elasticsearch.TransportVersions;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ActionListenerResponseHandler;
-import org.elasticsearch.action.IndicesRequest;
-import org.elasticsearch.action.UnavailableShardsException;
-import org.elasticsearch.action.support.ChannelActionListener;
-import org.elasticsearch.action.support.ContextPreservingActionListener;
-import org.elasticsearch.action.support.IndicesOptions;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.routing.GroupShardsIterator;
-import org.elasticsearch.cluster.routing.ShardIterator;
-import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BlockStreamInput;
-import org.elasticsearch.compute.data.BytesRefBlock;
-import org.elasticsearch.compute.data.ElementType;
-import org.elasticsearch.compute.data.IntVector;
-import org.elasticsearch.compute.data.LocalCircuitBreaker;
-import org.elasticsearch.compute.data.OrdinalBytesRefBlock;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
-import org.elasticsearch.compute.operator.Driver;
-import org.elasticsearch.compute.operator.DriverContext;
-import org.elasticsearch.compute.operator.Operator;
-import org.elasticsearch.compute.operator.OutputOperator;
-import org.elasticsearch.core.AbstractRefCounted;
-import org.elasticsearch.core.RefCounted;
-import org.elasticsearch.core.Releasable;
-import org.elasticsearch.core.Releasables;
-import org.elasticsearch.index.mapper.BlockLoader;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.query.SearchExecutionContext;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.search.SearchService;
-import org.elasticsearch.search.internal.AliasFilter;
-import org.elasticsearch.search.internal.SearchContext;
-import org.elasticsearch.search.internal.ShardSearchRequest;
-import org.elasticsearch.tasks.CancellableTask;
-import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskId;
-import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.TransportChannel;
-import org.elasticsearch.transport.TransportRequest;
-import org.elasticsearch.transport.TransportRequestHandler;
-import org.elasticsearch.transport.TransportRequestOptions;
-import org.elasticsearch.transport.TransportResponse;
 import org.elasticsearch.transport.TransportService;
-import org.elasticsearch.xpack.core.ClientHelper;
-import org.elasticsearch.xpack.core.XPackSettings;
-import org.elasticsearch.xpack.core.security.SecurityContext;
-import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction;
-import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest;
-import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesResponse;
-import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
 import org.elasticsearch.xpack.core.security.authz.privilege.ClusterPrivilegeResolver;
-import org.elasticsearch.xpack.core.security.support.Exceptions;
-import org.elasticsearch.xpack.core.security.user.User;
 import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
 import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
-import org.elasticsearch.xpack.esql.core.expression.Alias;
 import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
 import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
-import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
-import org.elasticsearch.xpack.esql.planner.PlannerUtils;
-import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 /**
- * {@link EnrichLookupService} performs enrich lookup for a given input page. The lookup process consists of three stages:
- * - Stage 1: Finding matching document IDs for the input page. This stage is done by the {@link EnrichQuerySourceOperator} or its variants.
- * The output page of this stage is represented as [DocVector, IntBlock: positions of the input terms].
- * <p>
- * - Stage 2: Extracting field values for the matched document IDs. The output page is represented as
- * [DocVector, IntBlock: positions, Block: field1, Block: field2,...].
- * <p>
- * - Stage 3: Combining the extracted values based on positions and filling nulls for positions without matches.
- * This is done by {@link MergePositionsOperator}. The output page is represented as [Block: field1, Block: field2,...].
- * <p>
- * The positionCount of the output page must be equal to the positionCount of the input page.
+ * {@link EnrichLookupService} performs enrich lookup for a given input page.
+ * See {@link AbstractLookupService} for how it works; it refers to this
+ * process as a {@code LEFT JOIN}, which it mostly is.
  */
-public class EnrichLookupService {
+public class EnrichLookupService extends AbstractLookupService<EnrichLookupService.Request, EnrichLookupService.TransportRequest> {
     public static final String LOOKUP_ACTION_NAME = EsqlQueryAction.NAME + "/lookup";
 
-    private final ClusterService clusterService;
-    private final SearchService searchService;
-    private final TransportService transportService;
-    private final Executor executor;
-    private final BigArrays bigArrays;
-    private final BlockFactory blockFactory;
-    private final LocalCircuitBreaker.SizeSettings localBreakerSettings;
-
     public EnrichLookupService(
         ClusterService clusterService,
         SearchService searchService,
@@ -125,353 +48,107 @@ public class EnrichLookupService {
         BigArrays bigArrays,
         BlockFactory blockFactory
     ) {
-        this.clusterService = clusterService;
-        this.searchService = searchService;
-        this.transportService = transportService;
-        this.executor = transportService.getThreadPool().executor(ThreadPool.Names.SEARCH);
-        this.bigArrays = bigArrays;
-        this.blockFactory = blockFactory;
-        this.localBreakerSettings = new LocalCircuitBreaker.SizeSettings(clusterService.getSettings());
-        transportService.registerRequestHandler(
+        super(
             LOOKUP_ACTION_NAME,
-            transportService.getThreadPool().executor(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME),
-            in -> new LookupRequest(in, blockFactory),
-            new TransportHandler()
+            ClusterPrivilegeResolver.MONITOR_ENRICH.name(),
+            clusterService,
+            searchService,
+            transportService,
+            bigArrays,
+            blockFactory,
+            TransportRequest::readFrom
         );
     }
 
-    public void lookupAsync(
-        String sessionId,
-        CancellableTask parentTask,
-        String index,
-        DataType inputDataType,
-        String matchType,
-        String matchField,
-        List<NamedExpression> extractFields,
-        Page inputPage,
-        ActionListener<Page> outListener
-    ) {
-        ThreadContext threadContext = transportService.getThreadPool().getThreadContext();
-        ActionListener<Page> listener = ContextPreservingActionListener.wrapPreservingContext(outListener, threadContext);
-        hasEnrichPrivilege(listener.delegateFailureAndWrap((delegate, ignored) -> {
-            ClusterState clusterState = clusterService.state();
-            GroupShardsIterator<ShardIterator> shardIterators = clusterService.operationRouting()
-                .searchShards(clusterState, new String[] { index }, Map.of(), "_local");
-            if (shardIterators.size() != 1) {
-                delegate.onFailure(new EsqlIllegalArgumentException("target index {} has more than one shard", index));
-                return;
-            }
-            ShardIterator shardIt = shardIterators.get(0);
-            ShardRouting shardRouting = shardIt.nextOrNull();
-            ShardId shardId = shardIt.shardId();
-            if (shardRouting == null) {
-                delegate.onFailure(new UnavailableShardsException(shardId, "enrich index is not available"));
-                return;
-            }
-            DiscoveryNode targetNode = clusterState.nodes().get(shardRouting.currentNodeId());
-            var lookupRequest = new LookupRequest(sessionId, shardId, inputDataType, matchType, matchField, inputPage, extractFields);
-            // TODO: handle retry and avoid forking for the local lookup
-            try (ThreadContext.StoredContext unused = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) {
-                transportService.sendChildRequest(
-                    targetNode,
-                    LOOKUP_ACTION_NAME,
-                    lookupRequest,
-                    parentTask,
-                    TransportRequestOptions.EMPTY,
-                    new ActionListenerResponseHandler<>(
-                        delegate.map(LookupResponse::takePage),
-                        in -> new LookupResponse(in, blockFactory),
-                        executor
-                    )
-                );
-            }
-        }));
-    }
-
-    private void hasEnrichPrivilege(ActionListener<Void> outListener) {
-        final Settings settings = clusterService.getSettings();
-        if (settings.hasValue(XPackSettings.SECURITY_ENABLED.getKey()) == false || XPackSettings.SECURITY_ENABLED.get(settings) == false) {
-            outListener.onResponse(null);
-            return;
-        }
-        final ThreadContext threadContext = transportService.getThreadPool().getThreadContext();
-        final SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext);
-        final User user = securityContext.getUser();
-        if (user == null) {
-            outListener.onFailure(new IllegalStateException("missing or unable to read authentication info on request"));
-            return;
-        }
-        HasPrivilegesRequest request = new HasPrivilegesRequest();
-        request.username(user.principal());
-        request.clusterPrivileges(ClusterPrivilegeResolver.MONITOR_ENRICH.name());
-        request.indexPrivileges(new RoleDescriptor.IndicesPrivileges[0]);
-        request.applicationPrivileges(new RoleDescriptor.ApplicationResourcePrivileges[0]);
-        ActionListener<HasPrivilegesResponse> listener = outListener.delegateFailureAndWrap((l, resp) -> {
-            if (resp.isCompleteMatch()) {
-                l.onResponse(null);
-                return;
-            }
-            String detailed = resp.getClusterPrivileges()
-                .entrySet()
-                .stream()
-                .filter(e -> e.getValue() == false)
-                .map(e -> "privilege [" + e.getKey() + "] is missing")
-                .collect(Collectors.joining(", "));
-            String message = "user ["
-                + user.principal()
-                + "] doesn't have "
-                + "sufficient privileges to perform enrich lookup: "
-                + detailed;
-            l.onFailure(Exceptions.authorizationError(message));
-        });
-        transportService.sendRequest(
-            transportService.getLocalNode(),
-            HasPrivilegesAction.NAME,
-            request,
-            TransportRequestOptions.EMPTY,
-            new ActionListenerResponseHandler<>(listener, HasPrivilegesResponse::new, executor)
+    @Override
+    protected TransportRequest transportRequest(EnrichLookupService.Request request, ShardId shardId) {
+        return new TransportRequest(
+            request.sessionId,
+            shardId,
+            request.inputDataType,
+            request.matchType,
+            request.matchField,
+            request.inputPage,
+            null,
+            request.extractFields
         );
     }
 
-    private void doLookup(
-        String sessionId,
-        CancellableTask task,
-        ShardId shardId,
-        DataType inputDataType,
-        String matchType,
-        String matchField,
-        Page inputPage,
-        List<NamedExpression> extractFields,
-        ActionListener<Page> listener
-    ) {
-        Block inputBlock = inputPage.getBlock(0);
-        if (inputBlock.areAllValuesNull()) {
-            listener.onResponse(createNullResponse(inputPage.getPositionCount(), extractFields));
-            return;
-        }
-        final List<Releasable> releasables = new ArrayList<>(6);
-        boolean started = false;
-        try {
-            final ShardSearchRequest shardSearchRequest = new ShardSearchRequest(shardId, 0, AliasFilter.EMPTY);
-            final SearchContext searchContext = searchService.createSearchContext(shardSearchRequest, SearchService.NO_TIMEOUT);
-            releasables.add(searchContext);
-            final LocalCircuitBreaker localBreaker = new LocalCircuitBreaker(
-                blockFactory.breaker(),
-                localBreakerSettings.overReservedBytes(),
-                localBreakerSettings.maxOverReservedBytes()
-            );
-            releasables.add(localBreaker);
-            final DriverContext driverContext = new DriverContext(bigArrays, blockFactory.newChildFactory(localBreaker));
-            final ElementType[] mergingTypes = new ElementType[extractFields.size()];
-            for (int i = 0; i < extractFields.size(); i++) {
-                mergingTypes[i] = PlannerUtils.toElementType(extractFields.get(i).dataType());
-            }
-            final int[] mergingChannels = IntStream.range(0, extractFields.size()).map(i -> i + 2).toArray();
-            final MergePositionsOperator mergePositionsOperator;
-            final OrdinalBytesRefBlock ordinalsBytesRefBlock;
-            if (inputBlock instanceof BytesRefBlock bytesRefBlock && (ordinalsBytesRefBlock = bytesRefBlock.asOrdinals()) != null) {
-                inputBlock = ordinalsBytesRefBlock.getDictionaryVector().asBlock();
-                var selectedPositions = ordinalsBytesRefBlock.getOrdinalsBlock();
-                mergePositionsOperator = new MergePositionsOperator(
-                    1,
-                    mergingChannels,
-                    mergingTypes,
-                    selectedPositions,
-                    driverContext.blockFactory()
-                );
-
-            } else {
-                try (var selectedPositions = IntVector.range(0, inputBlock.getPositionCount(), blockFactory).asBlock()) {
-                    mergePositionsOperator = new MergePositionsOperator(
-                        1,
-                        mergingChannels,
-                        mergingTypes,
-                        selectedPositions,
-                        driverContext.blockFactory()
-                    );
-                }
-            }
-            releasables.add(mergePositionsOperator);
-            SearchExecutionContext searchExecutionContext = searchContext.getSearchExecutionContext();
-            MappedFieldType fieldType = searchExecutionContext.getFieldType(matchField);
-            var queryList = switch (matchType) {
-                case "match", "range" -> QueryList.termQueryList(fieldType, searchExecutionContext, inputBlock, inputDataType);
-                case "geo_match" -> QueryList.geoShapeQuery(fieldType, searchExecutionContext, inputBlock, inputDataType);
-                default -> throw new EsqlIllegalArgumentException("illegal match type " + matchType);
-            };
-            var queryOperator = new EnrichQuerySourceOperator(
-                driverContext.blockFactory(),
-                EnrichQuerySourceOperator.DEFAULT_MAX_PAGE_SIZE,
-                queryList,
-                searchExecutionContext.getIndexReader()
-            );
-            releasables.add(queryOperator);
-            var extractFieldsOperator = extractFieldsOperator(searchContext, driverContext, extractFields);
-            releasables.add(extractFieldsOperator);
-
-            AtomicReference<Page> result = new AtomicReference<>();
-            OutputOperator outputOperator = new OutputOperator(List.of(), Function.identity(), result::set);
-            releasables.add(outputOperator);
-            Driver driver = new Driver(
-                "enrich-lookup:" + sessionId,
-                System.currentTimeMillis(),
-                System.nanoTime(),
-                driverContext,
-                () -> lookupDescription(
-                    sessionId,
-                    shardId,
-                    inputDataType,
-                    matchType,
-                    matchField,
-                    extractFields,
-                    inputPage.getPositionCount()
-                ),
-                queryOperator,
-                List.of(extractFieldsOperator, mergePositionsOperator),
-                outputOperator,
-                Driver.DEFAULT_STATUS_INTERVAL,
-                Releasables.wrap(searchContext, localBreaker)
-            );
-            task.addListener(() -> {
-                String reason = Objects.requireNonNullElse(task.getReasonCancelled(), "task was cancelled");
-                driver.cancel(reason);
-            });
-            var threadContext = transportService.getThreadPool().getThreadContext();
-            Driver.start(threadContext, executor, driver, Driver.DEFAULT_MAX_ITERATIONS, listener.map(ignored -> {
-                Page out = result.get();
-                if (out == null) {
-                    out = createNullResponse(inputPage.getPositionCount(), extractFields);
-                }
-                return out;
-            }));
-            started = true;
-        } catch (Exception e) {
-            listener.onFailure(e);
-        } finally {
-            if (started == false) {
-                Releasables.close(releasables);
-            }
-        }
+    @Override
+    protected QueryList queryList(TransportRequest request, SearchExecutionContext context, Block inputBlock, DataType inputDataType) {
+        MappedFieldType fieldType = context.getFieldType(request.matchField);
+        return switch (request.matchType) {
+            case "match", "range" -> QueryList.termQueryList(fieldType, context, inputBlock, inputDataType);
+            case "geo_match" -> QueryList.geoShapeQuery(fieldType, context, inputBlock, inputDataType);
+            default -> throw new EsqlIllegalArgumentException("illegal match type " + request.matchType);
+        };
     }
 
-    private static Operator extractFieldsOperator(
-        SearchContext searchContext,
-        DriverContext driverContext,
-        List<NamedExpression> extractFields
-    ) {
-        EsPhysicalOperationProviders.ShardContext shardContext = new EsPhysicalOperationProviders.DefaultShardContext(
-            0,
-            searchContext.getSearchExecutionContext(),
-            searchContext.request().getAliasFilter()
-        );
-        List<ValuesSourceReaderOperator.FieldInfo> fields = new ArrayList<>(extractFields.size());
-        for (NamedExpression extractField : extractFields) {
-            BlockLoader loader = shardContext.blockLoader(
-                extractField instanceof Alias a ? ((NamedExpression) a.child()).name() : extractField.name(),
-                extractField.dataType() == DataType.UNSUPPORTED,
-                MappedFieldType.FieldExtractPreference.NONE
-            );
-            fields.add(
-                new ValuesSourceReaderOperator.FieldInfo(
-                    extractField.name(),
-                    PlannerUtils.toElementType(extractField.dataType()),
-                    shardIdx -> {
-                        if (shardIdx != 0) {
-                            throw new IllegalStateException("only one shard");
-                        }
-                        return loader;
-                    }
-                )
-            );
-        }
-        return new ValuesSourceReaderOperator(
-            driverContext.blockFactory(),
-            fields,
-            List.of(new ValuesSourceReaderOperator.ShardContext(searchContext.searcher().getIndexReader(), searchContext::newSourceLoader)),
-            0
-        );
-    }
-
-    private Page createNullResponse(int positionCount, List<NamedExpression> extractFields) {
-        final Block[] blocks = new Block[extractFields.size()];
-        try {
-            for (int i = 0; i < extractFields.size(); i++) {
-                blocks[i] = blockFactory.newConstantNullBlock(positionCount);
-            }
-            return new Page(blocks);
-        } finally {
-            if (blocks[blocks.length - 1] == null) {
-                Releasables.close(blocks);
-            }
-        }
-    }
+    public static class Request extends AbstractLookupService.Request {
+        private final String matchType;
+        private final String matchField;
 
-    private class TransportHandler implements TransportRequestHandler<LookupRequest> {
-        @Override
-        public void messageReceived(LookupRequest request, TransportChannel channel, Task task) {
-            request.incRef();
-            ActionListener<LookupResponse> listener = ActionListener.runBefore(new ChannelActionListener<>(channel), request::decRef);
-            doLookup(
-                request.sessionId,
-                (CancellableTask) task,
-                request.shardId,
-                request.inputDataType,
-                request.matchType,
-                request.matchField,
-                request.inputPage,
-                request.extractFields,
-                listener.delegateFailureAndWrap(
-                    (l, outPage) -> ActionListener.respondAndRelease(l, new LookupResponse(outPage, blockFactory))
-                )
-            );
+        Request(
+            String sessionId,
+            String index,
+            DataType inputDataType,
+            String matchType,
+            String matchField,
+            Page inputPage,
+            List<NamedExpression> extractFields
+        ) {
+            super(sessionId, index, inputDataType, inputPage, extractFields);
+            this.matchType = matchType;
+            this.matchField = matchField;
         }
     }
 
-    private static class LookupRequest extends TransportRequest implements IndicesRequest {
-        private final String sessionId;
-        private final ShardId shardId;
-        private final DataType inputDataType;
+    protected static class TransportRequest extends AbstractLookupService.TransportRequest {
         private final String matchType;
         private final String matchField;
-        private final Page inputPage;
-        private final List<NamedExpression> extractFields;
-        // TODO: Remove this workaround once we have Block RefCount
-        private final Page toRelease;
-        private final RefCounted refs = AbstractRefCounted.of(this::releasePage);
 
-        LookupRequest(
+        TransportRequest(
             String sessionId,
             ShardId shardId,
             DataType inputDataType,
             String matchType,
             String matchField,
             Page inputPage,
+            Page toRelease,
             List<NamedExpression> extractFields
         ) {
-            this.sessionId = sessionId;
-            this.shardId = shardId;
-            this.inputDataType = inputDataType;
+            super(sessionId, shardId, inputDataType, inputPage, toRelease, extractFields);
             this.matchType = matchType;
             this.matchField = matchField;
-            this.inputPage = inputPage;
-            this.toRelease = null;
-            this.extractFields = extractFields;
         }
 
-        LookupRequest(StreamInput in, BlockFactory blockFactory) throws IOException {
-            super(in);
-            this.sessionId = in.readString();
-            this.shardId = new ShardId(in);
-            String inputDataType = (in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) ? in.readString() : "unknown";
-            this.inputDataType = DataType.fromTypeName(inputDataType);
-            this.matchType = in.readString();
-            this.matchField = in.readString();
+        static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) throws IOException {
+            TaskId parentTaskId = TaskId.readFromStream(in);
+            String sessionId = in.readString();
+            ShardId shardId = new ShardId(in);
+            DataType inputDataType = DataType.fromTypeName(
+                (in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) ? in.readString() : "unknown"
+            );
+            String matchType = in.readString();
+            String matchField = in.readString();
+            Page inputPage;
             try (BlockStreamInput bsi = new BlockStreamInput(in, blockFactory)) {
-                this.inputPage = new Page(bsi);
+                inputPage = new Page(bsi);
             }
-            this.toRelease = inputPage;
             PlanStreamInput planIn = new PlanStreamInput(in, in.namedWriteableRegistry(), null);
-            this.extractFields = planIn.readNamedWriteableCollectionAsList(NamedExpression.class);
+            List<NamedExpression> extractFields = planIn.readNamedWriteableCollectionAsList(NamedExpression.class);
+            TransportRequest result = new TransportRequest(
+                sessionId,
+                shardId,
+                inputDataType,
+                matchType,
+                matchField,
+                inputPage,
+                inputPage,
+                extractFields
+            );
+            result.setParentTask(parentTaskId);
+            return result;
         }
 
         @Override
@@ -490,144 +167,8 @@ public class EnrichLookupService {
         }
 
         @Override
-        public String[] indices() {
-            return new String[] { shardId.getIndexName() };
-        }
-
-        @Override
-        public IndicesOptions indicesOptions() {
-            return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
-        }
-
-        @Override
-        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
-            return new CancellableTask(id, type, action, "", parentTaskId, headers) {
-                @Override
-                public String getDescription() {
-                    return lookupDescription(
-                        sessionId,
-                        shardId,
-                        inputDataType,
-                        matchType,
-                        matchField,
-                        extractFields,
-                        inputPage.getPositionCount()
-                    );
-                }
-            };
-        }
-
-        private void releasePage() {
-            if (toRelease != null) {
-                Releasables.closeExpectNoException(toRelease::releaseBlocks);
-            }
-        }
-
-        @Override
-        public void incRef() {
-            refs.incRef();
-        }
-
-        @Override
-        public boolean tryIncRef() {
-            return refs.tryIncRef();
-        }
-
-        @Override
-        public boolean decRef() {
-            return refs.decRef();
-        }
-
-        @Override
-        public boolean hasReferences() {
-            return refs.hasReferences();
-        }
-    }
-
-    private static String lookupDescription(
-        String sessionId,
-        ShardId shardId,
-        DataType inputDataType,
-        String matchType,
-        String matchField,
-        List<NamedExpression> extractFields,
-        int positionCount
-    ) {
-        return "ENRICH_LOOKUP("
-            + " session="
-            + sessionId
-            + " ,shard="
-            + shardId
-            + " ,input_type="
-            + inputDataType
-            + " ,match_type="
-            + matchType
-            + " ,match_field="
-            + matchField
-            + " ,extract_fields="
-            + extractFields
-            + " ,positions="
-            + positionCount
-            + ")";
-    }
-
-    private static class LookupResponse extends TransportResponse {
-        private Page page;
-        private final RefCounted refs = AbstractRefCounted.of(this::releasePage);
-        private final BlockFactory blockFactory;
-        private long reservedBytes = 0;
-
-        LookupResponse(Page page, BlockFactory blockFactory) {
-            this.page = page;
-            this.blockFactory = blockFactory;
-        }
-
-        LookupResponse(StreamInput in, BlockFactory blockFactory) throws IOException {
-            try (BlockStreamInput bsi = new BlockStreamInput(in, blockFactory)) {
-                this.page = new Page(bsi);
-            }
-            this.blockFactory = blockFactory;
-        }
-
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            long bytes = page.ramBytesUsedByBlocks();
-            blockFactory.breaker().addEstimateBytesAndMaybeBreak(bytes, "serialize enrich lookup response");
-            reservedBytes += bytes;
-            page.writeTo(out);
-        }
-
-        Page takePage() {
-            var p = page;
-            page = null;
-            return p;
-        }
-
-        private void releasePage() {
-            blockFactory.breaker().addWithoutBreaking(-reservedBytes);
-            if (page != null) {
-                Releasables.closeExpectNoException(page::releaseBlocks);
-            }
-        }
-
-        @Override
-        public void incRef() {
-            refs.incRef();
-        }
-
-        @Override
-        public boolean tryIncRef() {
-            return refs.tryIncRef();
-        }
-
-        @Override
-        public boolean decRef() {
-            return refs.decRef();
-        }
-
-        @Override
-        public boolean hasReferences() {
-            return refs.hasReferences();
+        protected String extraDescription() {
+            return " ,match_type=" + matchType + " ,match_field=" + matchField;
         }
     }
 }
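
Worth noting: both request forks keep the `toRelease` workaround from the old `LookupRequest` (see the removed "TODO: Remove this workaround once we have Block RefCount"). A locally built request passes `null` because the caller still owns the page, while `readFrom` passes the freshly deserialized page so the request releases it when its ref count drops. A minimal sketch of that convention, with all arguments assumed to be in scope:

    // Sender side: the caller owns inputPage, so nothing for the request to release.
    TransportRequest local = new TransportRequest(
        sessionId, shardId, inputDataType, matchType, matchField,
        inputPage, /* toRelease */ null, extractFields);

    // Receiver side (as in readFrom above): the deserialized page belongs to the
    // request, so it doubles as toRelease until Blocks get proper ref counting.
    TransportRequest remote = new TransportRequest(
        sessionId, shardId, inputDataType, matchType, matchField,
        wirePage, /* toRelease */ wirePage, extractFields);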

+ 200 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java

@@ -0,0 +1,200 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.enrich;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.operator.AsyncOperator;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.compute.operator.Operator;
+import org.elasticsearch.tasks.CancellableTask;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
+import org.elasticsearch.xpack.esql.core.type.DataType;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+// TODO rename package
+public final class LookupFromIndexOperator extends AsyncOperator {
+    public record Factory(
+        String sessionId,
+        CancellableTask parentTask,
+        int maxOutstandingRequests,
+        int inputChannel,
+        LookupFromIndexService lookupService,
+        DataType inputDataType,
+        String lookupIndex,
+        String matchField,
+        List<NamedExpression> loadFields
+    ) implements OperatorFactory {
+        @Override
+        public String describe() {
+            return "LookupOperator[index="
+                + lookupIndex
+                + " match_field="
+                + matchField
+                + " load_fields="
+                + loadFields
+                + " inputChannel="
+                + inputChannel
+                + "]";
+        }
+
+        @Override
+        public Operator get(DriverContext driverContext) {
+            return new LookupFromIndexOperator(
+                sessionId,
+                driverContext,
+                parentTask,
+                maxOutstandingRequests,
+                inputChannel,
+                lookupService,
+                inputDataType,
+                lookupIndex,
+                matchField,
+                loadFields
+            );
+        }
+    }
+
+    private final LookupFromIndexService lookupService;
+    private final String sessionId;
+    private final CancellableTask parentTask;
+    private final int inputChannel;
+    private final DataType inputDataType;
+    private final String lookupIndex;
+    private final String matchField;
+    private final List<NamedExpression> loadFields;
+    private long totalTerms = 0L;
+
+    public LookupFromIndexOperator(
+        String sessionId,
+        DriverContext driverContext,
+        CancellableTask parentTask,
+        int maxOutstandingRequests,
+        int inputChannel,
+        LookupFromIndexService lookupService,
+        DataType inputDataType,
+        String lookupIndex,
+        String matchField,
+        List<NamedExpression> loadFields
+    ) {
+        super(driverContext, maxOutstandingRequests);
+        this.sessionId = sessionId;
+        this.parentTask = parentTask;
+        this.inputChannel = inputChannel;
+        this.lookupService = lookupService;
+        this.inputDataType = inputDataType;
+        this.lookupIndex = lookupIndex;
+        this.matchField = matchField;
+        this.loadFields = loadFields;
+    }
+
+    @Override
+    protected void performAsync(Page inputPage, ActionListener<Page> listener) {
+        final Block inputBlock = inputPage.getBlock(inputChannel);
+        totalTerms += inputBlock.getTotalValueCount();
+        LookupFromIndexService.Request request = new LookupFromIndexService.Request(
+            sessionId,
+            lookupIndex,
+            inputDataType,
+            matchField,
+            new Page(inputBlock),
+            loadFields
+        );
+        lookupService.lookupAsync(request, parentTask, listener.map(inputPage::appendPage));
+    }
+
+    @Override
+    public String toString() {
+        return "LookupOperator[index="
+            + lookupIndex
+            + " input_type="
+            + inputDataType
+            + " match_field="
+            + matchField
+            + " load_fields="
+            + loadFields
+            + " inputChannel="
+            + inputChannel
+            + "]";
+    }
+
+    @Override
+    protected void doClose() {
+        // TODO: Maybe create a sub-task as the parent task of all the lookup tasks
+        // then cancel it when this operator terminates early (e.g., when it has enough results).
+    }
+
+    @Override
+    protected Operator.Status status(long receivedPages, long completedPages, long totalTimeInMillis) {
+        return new LookupFromIndexOperator.Status(receivedPages, completedPages, totalTimeInMillis, totalTerms);
+    }
+
+    public static class Status extends AsyncOperator.Status {
+        public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
+            Operator.Status.class,
+            "lookup",
+            Status::new
+        );
+
+        final long totalTerms;
+
+        Status(long receivedPages, long completedPages, long totalTimeInMillis, long totalTerms) {
+            super(receivedPages, completedPages, totalTimeInMillis);
+            this.totalTerms = totalTerms;
+        }
+
+        Status(StreamInput in) throws IOException {
+            super(in);
+            this.totalTerms = in.readVLong();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeVLong(totalTerms);
+        }
+
+        @Override
+        public String getWriteableName() {
+            return ENTRY.name;
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            innerToXContent(builder);
+            builder.field("total_terms", totalTerms);
+            return builder.endObject();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass() || super.equals(o) == false) {
+                return false;
+            }
+            Status status = (Status) o;
+            return totalTerms == status.totalTerms;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(super.hashCode(), totalTerms);
+        }
+    }
+}
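
Each `performAsync` call slices the join-key block out of the incoming page, ships it through the service, and appends the looked-up columns back onto the same page, so the output keeps the input's position count. A hedged sketch of wiring the factory into a driver; every literal below (channel, limit, index and field names) is illustrative:

    LookupFromIndexOperator.Factory lookup = new LookupFromIndexOperator.Factory(
        sessionId,          // ESQL session id
        parentTask,         // CancellableTask for the enclosing query
        4,                  // maxOutstandingRequests: concurrent lookups in flight
        0,                  // inputChannel holding the join-key block
        lookupService,      // the LookupFromIndexService built by the plugin
        DataType.KEYWORD,   // type of the join key
        "hosts_lookup",     // an index_mode=lookup index (hypothetical name)
        "host",             // matchField (hypothetical)
        loadFields          // NamedExpressions to fetch from the lookup index
    );
    Operator op = lookup.get(driverContext);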

+ 154 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java

@@ -0,0 +1,154 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.enrich;
+
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.BlockStreamInput;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.query.SearchExecutionContext;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.search.SearchService;
+import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.security.authz.privilege.ClusterPrivilegeResolver;
+import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
+import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
+import org.elasticsearch.xpack.esql.core.type.DataType;
+import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
+import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link LookupFromIndexService} performs lookups against a lookup index for
+ * a given input page. See {@link AbstractLookupService} for how it works,
+ * where it refers to this process as a {@code LEFT JOIN}, which it mostly is.
+ */
+public class LookupFromIndexService extends AbstractLookupService<LookupFromIndexService.Request, LookupFromIndexService.TransportRequest> {
+    public static final String LOOKUP_ACTION_NAME = EsqlQueryAction.NAME + "/lookup_from_index";
+
+    public LookupFromIndexService(
+        ClusterService clusterService,
+        SearchService searchService,
+        TransportService transportService,
+        BigArrays bigArrays,
+        BlockFactory blockFactory
+    ) {
+        super(
+            LOOKUP_ACTION_NAME,
+            ClusterPrivilegeResolver.MONITOR_ENRICH.name(), // TODO some other privilege
+            clusterService,
+            searchService,
+            transportService,
+            bigArrays,
+            blockFactory,
+            TransportRequest::readFrom
+        );
+    }
+
+    @Override
+    protected TransportRequest transportRequest(LookupFromIndexService.Request request, ShardId shardId) {
+        return new TransportRequest(
+            request.sessionId,
+            shardId,
+            request.inputDataType,
+            request.inputPage,
+            null,
+            request.extractFields,
+            request.matchField
+        );
+    }
+
+    @Override
+    protected QueryList queryList(TransportRequest request, SearchExecutionContext context, Block inputBlock, DataType inputDataType) {
+        MappedFieldType fieldType = context.getFieldType(request.matchField);
+        return QueryList.termQueryList(fieldType, context, inputBlock, inputDataType);
+    }
+
+    public static class Request extends AbstractLookupService.Request {
+        private final String matchField;
+
+        Request(
+            String sessionId,
+            String index,
+            DataType inputDataType,
+            String matchField,
+            Page inputPage,
+            List<NamedExpression> extractFields
+        ) {
+            super(sessionId, index, inputDataType, inputPage, extractFields);
+            this.matchField = matchField;
+        }
+    }
+
+    protected static class TransportRequest extends AbstractLookupService.TransportRequest {
+        private final String matchField;
+
+        TransportRequest(
+            String sessionId,
+            ShardId shardId,
+            DataType inputDataType,
+            Page inputPage,
+            Page toRelease,
+            List<NamedExpression> extractFields,
+            String matchField
+        ) {
+            super(sessionId, shardId, inputDataType, inputPage, toRelease, extractFields);
+            this.matchField = matchField;
+        }
+
+        static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) throws IOException {
+            TaskId parentTaskId = TaskId.readFromStream(in);
+            String sessionId = in.readString();
+            ShardId shardId = new ShardId(in);
+            DataType inputDataType = DataType.fromTypeName(in.readString());
+            Page inputPage;
+            try (BlockStreamInput bsi = new BlockStreamInput(in, blockFactory)) {
+                inputPage = new Page(bsi);
+            }
+            PlanStreamInput planIn = new PlanStreamInput(in, in.namedWriteableRegistry(), null);
+            List<NamedExpression> extractFields = planIn.readNamedWriteableCollectionAsList(NamedExpression.class);
+            String matchField = in.readString();
+            TransportRequest result = new TransportRequest(
+                sessionId,
+                shardId,
+                inputDataType,
+                inputPage,
+                inputPage,
+                extractFields,
+                matchField
+            );
+            result.setParentTask(parentTaskId);
+            return result;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeString(sessionId);
+            out.writeWriteable(shardId);
+            out.writeString(inputDataType.typeName());
+            out.writeWriteable(inputPage);
+            PlanStreamOutput planOut = new PlanStreamOutput(out, null);
+            planOut.writeNamedWriteableCollection(extractFields);
+            out.writeString(matchField);
+        }
+
+        @Override
+        protected String extraDescription() {
+            return " ,match_field=" + matchField;
+        }
+    }
+}
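
On the wire, `writeTo` and `readFrom` must agree field for field; note that `matchField` travels last, after the extract fields. From the operator's side the service is a one-call API: build a `Request` around a single-block page of join keys and hand it to `lookupAsync`, which answers with a page of the extracted columns. A sketch under those assumptions (identifiers like `joinKeys` are illustrative):

    LookupFromIndexService.Request request = new LookupFromIndexService.Request(
        sessionId,
        "hosts_lookup",        // the lookup index (hypothetical name)
        DataType.KEYWORD,
        "host",                // field in the lookup index to match on
        new Page(joinKeys),    // one block of join-key values
        loadFields             // columns to pull back per match
    );
    lookupService.lookupAsync(request, parentTask, ActionListener.wrap(
        page -> { /* one row of extracted fields per input position */ },
        failure -> { /* circuit breaking, shard failures, ... */ }
    ));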

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/QueryList.java

@@ -40,7 +40,7 @@ import static org.elasticsearch.xpack.esql.core.type.DataType.IP;
 /**
  * Generates a list of Lucene queries based on the input block.
  */
-abstract class QueryList {
+public abstract class QueryList {
     protected final Block block;
 
     protected QueryList(Block block) {
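
The only change here is visibility: `QueryList` graduates from package-private to public, presumably so callers outside the enrich package (such as the new integration test) can build query lists directly. Usage stays as in the services above; a hedged one-liner with an illustrative field name:

    MappedFieldType fieldType = context.getFieldType("host"); // hypothetical field
    QueryList queries = QueryList.termQueryList(fieldType, context, inputBlock, DataType.KEYWORD);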

+ 7 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

@@ -38,6 +38,7 @@ import org.elasticsearch.xpack.esql.action.EsqlQueryTask;
 import org.elasticsearch.xpack.esql.core.async.AsyncTaskManagementService;
 import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
 import org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolver;
+import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService;
 import org.elasticsearch.xpack.esql.execution.PlanExecutor;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.session.Configuration;
@@ -65,6 +66,7 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
     private final Executor requestExecutor;
     private final EnrichPolicyResolver enrichPolicyResolver;
     private final EnrichLookupService enrichLookupService;
+    private final LookupFromIndexService lookupFromIndexService;
     private final AsyncTaskManagementService<EsqlQueryRequest, EsqlQueryResponse, EsqlQueryTask> asyncTaskManagementService;
     private final RemoteClusterService remoteClusterService;
 
@@ -94,6 +96,7 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
         this.exchangeService = exchangeService;
         this.enrichPolicyResolver = new EnrichPolicyResolver(clusterService, transportService, planExecutor.indexResolver());
         this.enrichLookupService = new EnrichLookupService(clusterService, searchService, transportService, bigArrays, blockFactory);
+        this.lookupFromIndexService = new LookupFromIndexService(clusterService, searchService, transportService, bigArrays, blockFactory);
         this.computeService = new ComputeService(
             searchService,
             transportService,
@@ -278,4 +281,8 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
     private static boolean requestIsAsync(EsqlQueryRequest request) {
         return request.async();
     }
+
+    public LookupFromIndexService getLookupFromIndexService() {
+        return lookupFromIndexService;
+    }
 }