Browse Source

Introduce lookup runtime fields (#82385)

This PR introduces the lookup runtime fields which are used to retrieve 
data from the related indices. The below search request enriches its
search hits with the location of each IP address from the `ip_location`
index.

```
POST logs/_search
{
  "runtime_mappings": {
    "location": {
      "type": "lookup",
      "lookup_index": "ip_location",
      "query_type": "term",
      "query_input_field": "ip",
      "query_target_field": "_id",
      "fetch_fields": [
        "country",
        "city"
      ]
    }
  },
  "fields": [
    "timestamp",
    "message",
    "location"
  ]
}
```

Response:

```
{
  "hits": {
    "hits": [
      {
        "_index": "logs",
        "_id": "1",
        "fields": {
          "location": [
            {
              "city": [ "Montreal" ],
              "country": [ "Canada" ]
            }
          ],
          "message": [ "the first message" ]
        }
      }
    ]
  }
}
```
Nhat Nguyen 3 years ago
parent
commit
31d703f24c
23 changed files with 1786 additions and 43 deletions
  1. 5 0
      docs/changelog/82385.yaml
  2. 117 0
      docs/reference/mapping/runtime.asciidoc
  3. 1 0
      docs/reference/search/search-your-data/retrieve-selected-fields.asciidoc
  4. 51 0
      rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search/390_lookup_fields.yml
  5. 252 0
      server/src/internalClusterTest/java/org/elasticsearch/action/search/LookupRuntimeFieldIT.java
  6. 141 0
      server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CrossClusterSearchIT.java
  7. 10 7
      server/src/main/java/org/elasticsearch/action/search/ExpandSearchPhase.java
  8. 135 0
      server/src/main/java/org/elasticsearch/action/search/FetchLookupFieldsPhase.java
  9. 5 1
      server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java
  10. 40 4
      server/src/main/java/org/elasticsearch/common/document/DocumentField.java
  11. 248 0
      server/src/main/java/org/elasticsearch/index/mapper/LookupRuntimeFieldType.java
  12. 37 13
      server/src/main/java/org/elasticsearch/index/mapper/ValueFetcher.java
  13. 2 0
      server/src/main/java/org/elasticsearch/indices/IndicesModule.java
  14. 38 0
      server/src/main/java/org/elasticsearch/search/SearchHit.java
  15. 3 3
      server/src/main/java/org/elasticsearch/search/fetch/subphase/FieldAndFormat.java
  16. 3 5
      server/src/main/java/org/elasticsearch/search/fetch/subphase/FieldFetcher.java
  17. 47 0
      server/src/main/java/org/elasticsearch/search/fetch/subphase/LookupField.java
  18. 30 5
      server/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java
  19. 209 0
      server/src/test/java/org/elasticsearch/action/search/FetchLookupFieldsPhaseTests.java
  20. 148 0
      server/src/test/java/org/elasticsearch/index/mapper/LookupRuntimeFieldTypeTests.java
  21. 119 0
      server/src/test/java/org/elasticsearch/search/fetch/subphase/LookupFieldTests.java
  22. 132 1
      x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/FieldLevelSecurityTests.java
  23. 13 4
      x-pack/qa/runtime-fields/core-with-search/src/yamlRestTest/java/org/elasticsearch/xpack/runtimefields/test/search/CoreTestsWithSearchRuntimeFieldsIT.java

+ 5 - 0
docs/changelog/82385.yaml

@@ -0,0 +1,5 @@
+pr: 82385
+summary: Introduce lookup runtime fields
+area: Search
+type: feature
+issues: []

+ 117 - 0
docs/reference/mapping/runtime.asciidoc

@@ -157,11 +157,16 @@ The `runtime` section can be any of these data types:
 * `ip`
 * `keyword`
 * `long`
+* <<lookup-runtime-fields, `lookup`>>
 // end::runtime-data-types[]
 
 Runtime fields with a `type` of `date` can accept the
 <<mapping-date-format,`format`>> parameter exactly as the `date` field type.
 
+Runtime fields with a `type` of `lookup` allow retrieving fields from
+related indices. See <<lookup-runtime-fields, `retrieve fields from related indices`>>.
+
+
 If <<dynamic-field-mapping,dynamic field mapping>> is enabled where the
 `dynamic` parameter is set to `runtime`, new fields are automatically added to
 the index mapping as runtime fields:
@@ -803,6 +808,118 @@ address.
 // TESTRESPONSE[s/"_id" : "oWs5KXYB-XyJbifr9mrz"/"_id": $body.hits.hits.0._id/]
 // TESTRESPONSE[s/"day_of_week" : \[\n\s+"Sunday"\n\s\]/"day_of_week": $body.hits.hits.0.fields.day_of_week/]
 
+
+[[lookup-runtime-fields]]
+==== Retrieve fields from related indices
+
+experimental[]
+
+The <<search-fields,`fields`>> parameter on the `_search` API can also be used to retrieve fields from
+the related indices via runtime fields with a type of `lookup`.
+
+[source,console]
+----
+POST ip_location/_doc?refresh
+{
+  "ip": "192.168.1.1",
+  "country": "Canada",
+  "city": "Montreal"
+}
+
+PUT logs/_doc/1?refresh
+{
+  "host": "192.168.1.1",
+  "message": "the first message"
+}
+
+PUT logs/_doc/2?refresh
+{
+  "host": "192.168.1.2",
+  "message": "the second message"
+}
+
+POST logs/_search
+{
+  "runtime_mappings": {
+    "location": {
+        "type": "lookup", <1>
+        "target_index": "ip_location", <2>
+        "input_field": "host", <3>
+        "target_field": "ip", <4>
+        "fetch_fields": ["country", "city"] <5>
+    }
+  },
+  "fields": [
+    "host",
+    "message",
+    "location"
+  ],
+  "_source": false
+}
+----
+<1> Define a runtime field in the main search request with a type of `lookup` that retrieves fields from the target index using the <<query-dsl-term-query, `term`>> queries.
+<2> The target index where the lookup query executes against
+<3> A field on the main index whose values are used as the input values of the lookup term query
+<4> A field on the lookup index which the lookup query searches against
+<5> A list of fields to retrieve from the lookup index. See the <<search-fields, `fields`>> parameter of a search request.
+
+The above search returns the country and city from the `ip_location` index
+for each ip address of the returned search hits.
+
+[source,console-result]
+----
+{
+  "took": 3,
+  "timed_out": false,
+  "_shards": {
+    "total": 1,
+    "successful": 1,
+    "skipped": 0,
+    "failed": 0
+  },
+  "hits": {
+    "total": {
+      "value": 2,
+      "relation": "eq"
+    },
+    "max_score": 1.0,
+    "hits": [
+      {
+        "_index": "logs",
+        "_id": "1",
+        "_score": 1.0,
+        "fields": {
+          "host": [ "192.168.1.1" ],
+          "location": [
+            {
+              "city": [ "Montreal" ],
+              "country": [ "Canada" ]
+            }
+          ],
+          "message": [ "the first message" ]
+        }
+      },
+      {
+        "_index": "logs",
+        "_id": "2",
+        "_score": 1.0,
+        "fields": {
+          "host": [ "192.168.1.2" ],
+          "message": [ "the second message" ]
+        }
+      }
+    ]
+  }
+}
+----
+// TESTRESPONSE[s/"took": 3/"took": $body.took/]
+
+The response of lookup fields are grouped to maintain the independence
+of each document from the lookup index. The lookup query for each input
+value is expected to match at most one document on the lookup index.
+If the lookup query matches more than one documents, then a random document
+will be selected.
+
 [[runtime-indexed]]
 === Index a runtime field
 Runtime fields are defined by the context where they run. For example, you

+ 1 - 0
docs/reference/search/search-your-data/retrieve-selected-fields.asciidoc

@@ -32,6 +32,7 @@ parameter:
 * Formats dates and spatial data types
 * Retrieves <<runtime-retrieving-fields,runtime field values>>
 * Returns fields calculated by a script at index time
+* Returns fields from related indices using <<lookup-runtime-fields, lookup runtime fields>>
 // end::fields-param-desc[]
 
 Other mapping options are also respected, including

+ 51 - 0
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search/390_lookup_fields.yml

@@ -0,0 +1,51 @@
+setup:
+  - do:
+      index:
+        index: ip_locations
+        id: 192.168.1.1
+        body: { country: 'Canada', city: 'Montreal' }
+
+  - do:
+      index:
+        index: ip_locations
+        id: 192.168.1.3
+        body: { country: 'Canada', city: 'Toronto' }
+
+  - do:
+      index:
+        index: logs
+        body: { ip: '192.168.1.1', msg: 'The first message', ord: 1 }
+  - do:
+      index:
+        index: logs
+        body: { ip: '192.168.1.2', msg: 'The second message', ord: 2 }
+
+  - do:
+      indices.refresh:
+        index: "ip_locations,logs"
+
+---
+"Retrieve lookup fields":
+  - skip:
+      version: " - 8.1.99"
+      reason: "Lookup fields are introduced in 8.2"
+  - do:
+      search:
+        index: logs
+        body:
+          runtime_mappings:
+            location:
+              type: lookup
+              target_index: ip_locations
+              input_field: ip
+              target_field: _id
+              fetch_fields: [ "city", "country" ]
+          sort: ord
+          fields:
+            - msg
+            - location
+
+  - match: { hits.hits.0.fields.msg: [ 'The first message' ] }
+  - match: { hits.hits.0.fields.location: [ { country: [ 'Canada' ], city: [ 'Montreal' ] } ] }
+  - match: { hits.hits.1.fields.msg: [ 'The second message' ] }
+  - match: { hits.hits.1.fields.location: null }

+ 252 - 0
server/src/internalClusterTest/java/org/elasticsearch/action/search/LookupRuntimeFieldIT.java

@@ -0,0 +1,252 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.search;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
+import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xcontent.json.JsonXContent;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class LookupRuntimeFieldIT extends ESIntegTestCase {
+
+    @Before
+    public void populateIndex() throws Exception {
+        client().admin()
+            .indices()
+            .prepareCreate("authors")
+            .setMapping("author", "type=keyword", "joined", "type=date,format=yyyy-MM-dd")
+            .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)))
+            .get();
+        List<Map<String, String>> authors = List.of(
+            Map.of("author", "john", "first_name", "John", "last_name", "New York", "joined", "2020-03-01"),
+            Map.of("author", "mike", "first_name", "Mike", "last_name", "Boston", "joined", "2010-06-20"),
+            Map.of("author", "jack", "first_name", "Jack", "last_name", "Austin", "joined", "1999-11-03")
+        );
+        for (Map<String, String> author : authors) {
+            client().prepareIndex("authors").setSource(author).setRefreshPolicy(randomFrom(WriteRequest.RefreshPolicy.values())).get();
+        }
+        client().admin().indices().prepareRefresh("authors").get();
+
+        client().admin()
+            .indices()
+            .prepareCreate("publishers")
+            .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)))
+            .get();
+        client().prepareBulk("publishers")
+            .add(new IndexRequest().id("p1").source("name", "The first publisher", "city", List.of("Montreal", "Vancouver")))
+            .add(new IndexRequest().id("p2").source("name", "The second publisher", "city", "Toronto"))
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .get();
+        client().admin()
+            .indices()
+            .prepareCreate("books")
+            .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1))
+            .setMapping("""
+                {
+                    "properties": {
+                        "title": {"type": "text"},
+                        "author_id": {"type": "keyword"},
+                        "genre": {"type": "keyword"},
+                        "published_date": {
+                            "type": "date",
+                            "format": "yyyy-MM-dd"
+                        }
+                    },
+                    "runtime": {
+                        "author": {
+                            "type": "lookup",
+                            "target_index": "authors",
+                            "input_field": "author_id",
+                            "target_field": "author",
+                            "fetch_fields": ["first_name", "last_name"]
+                        }
+                    }
+                }
+                """)
+            .get();
+        List<Map<String, Object>> books = List.of(
+            Map.of(
+                "title",
+                "the first book",
+                "genre",
+                "fiction",
+                "author_id",
+                "john",
+                "publisher_id",
+                "p1",
+                "published_date",
+                "2020-01-05"
+            ),
+            Map.of(
+                "title",
+                "the second book",
+                "genre",
+                "science",
+                "author_id",
+                "mike",
+                "publisher_id",
+                "p2",
+                "published_date",
+                "2020-02-10"
+            ),
+            Map.of(
+                "title",
+                "the third book",
+                "genre",
+                "science",
+                "author_id",
+                List.of("mark", "mike"),
+                "publisher_id",
+                "p1",
+                "published_date",
+                "2021-04-20"
+            ),
+            Map.of(
+                "title",
+                "the forth book",
+                "genre",
+                "fiction",
+                "author_id",
+                List.of("mike", "jack"),
+                "publisher_id",
+                "p1",
+                "published_date",
+                "2021-05-11"
+            ),
+            Map.of("title", "the fifth book", "genre", "science", "author_id", "mike", "publisher_id", "p2", "published_date", "2021-06-30")
+        );
+        for (Map<String, Object> book : books) {
+            client().prepareIndex("books").setSource(book).setRefreshPolicy(randomFrom(WriteRequest.RefreshPolicy.values())).get();
+        }
+        client().admin().indices().prepareRefresh("books").get();
+    }
+
+    public void testBasic() {
+        SearchResponse searchResponse = client().prepareSearch("books")
+            .addFetchField("author")
+            .addFetchField("title")
+            .addSort("published_date", SortOrder.DESC)
+            .setSize(3)
+            .get();
+        ElasticsearchAssertions.assertNoFailures(searchResponse);
+        ElasticsearchAssertions.assertHitCount(searchResponse, 5);
+
+        SearchHit hit0 = searchResponse.getHits().getHits()[0];
+        assertThat(hit0.field("title").getValues(), equalTo(List.of("the fifth book")));
+        assertThat(
+            hit0.field("author").getValues(),
+            equalTo(List.of(Map.of("first_name", List.of("Mike"), "last_name", List.of("Boston"))))
+        );
+
+        SearchHit hit1 = searchResponse.getHits().getHits()[1];
+        assertThat(hit1.field("title").getValues(), equalTo(List.of("the forth book")));
+        assertThat(
+            hit1.field("author").getValues(),
+            equalTo(
+                List.of(
+                    Map.of("first_name", List.of("Mike"), "last_name", List.of("Boston")),
+                    Map.of("first_name", List.of("Jack"), "last_name", List.of("Austin"))
+                )
+            )
+        );
+
+        SearchHit hit2 = searchResponse.getHits().getHits()[2];
+        assertThat(hit2.field("title").getValues(), equalTo(List.of("the third book")));
+        assertThat(
+            hit2.field("author").getValues(),
+            equalTo(List.of(Map.of("first_name", List.of("Mike"), "last_name", List.of("Boston"))))
+        );
+    }
+
+    public void testLookupMultipleIndices() throws IOException {
+        SearchResponse searchResponse = client().prepareSearch("books")
+            .setRuntimeMappings(parseMapping("""
+                {
+                    "publisher": {
+                        "type": "lookup",
+                        "target_index": "publishers",
+                        "input_field": "publisher_id",
+                        "target_field": "_id",
+                        "fetch_fields": ["name", "city"]
+                    }
+                }
+                """))
+            .setFetchSource(false)
+            .addFetchField("title")
+            .addFetchField("author")
+            .addFetchField("publisher")
+            .addSort("published_date", SortOrder.DESC)
+            .setSize(2)
+            .get();
+        SearchHit hit0 = searchResponse.getHits().getHits()[0];
+        assertThat(hit0.field("title").getValues(), equalTo(List.of("the fifth book")));
+        assertThat(
+            hit0.field("author").getValues(),
+            equalTo(List.of(Map.of("first_name", List.of("Mike"), "last_name", List.of("Boston"))))
+        );
+        assertThat(
+            hit0.field("publisher").getValues(),
+            equalTo(List.of(Map.of("name", List.of("The second publisher"), "city", List.of("Toronto"))))
+        );
+
+        SearchHit hit1 = searchResponse.getHits().getHits()[1];
+        assertThat(hit1.field("title").getValues(), equalTo(List.of("the forth book")));
+        assertThat(
+            hit1.field("author").getValues(),
+            equalTo(
+                List.of(
+                    Map.of("first_name", List.of("Mike"), "last_name", List.of("Boston")),
+                    Map.of("first_name", List.of("Jack"), "last_name", List.of("Austin"))
+                )
+            )
+        );
+        assertThat(
+            hit1.field("publisher").getValues(),
+            equalTo(List.of(Map.of("name", List.of("The first publisher"), "city", List.of("Montreal", "Vancouver"))))
+        );
+    }
+
+    public void testFetchField() throws Exception {
+        SearchResponse searchResponse = client().prepareSearch("books").setRuntimeMappings(parseMapping("""
+            {
+                "author": {
+                    "type": "lookup",
+                    "target_index": "authors",
+                    "input_field": "author_id",
+                    "target_field": "author",
+                    "fetch_fields": ["first_name", {"field": "joined", "format": "MM/yyyy"}]
+                }
+            }
+            """)).addFetchField("author").addFetchField("title").addSort("published_date", SortOrder.ASC).setSize(1).get();
+        ElasticsearchAssertions.assertNoFailures(searchResponse);
+        SearchHit hit0 = searchResponse.getHits().getHits()[0];
+        // "author", "john", "first_name", "John", "last_name", "New York", "joined", "2020-03-01"
+        assertThat(hit0.field("title").getValues(), equalTo(List.of("the first book")));
+        assertThat(hit0.field("author").getValues(), equalTo(List.of(Map.of("first_name", List.of("John"), "joined", List.of("03/2020")))));
+    }
+
+    private Map<String, Object> parseMapping(String mapping) throws IOException {
+        try (XContentParser parser = createParser(JsonXContent.jsonXContent, mapping)) {
+            return parser.map();
+        }
+    }
+}

+ 141 - 0
server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CrossClusterSearchIT.java

@@ -11,10 +11,12 @@ package org.elasticsearch.search.ccs;
 import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
 import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchAction;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -24,13 +26,16 @@ import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexModule;
 import org.elasticsearch.index.query.MatchAllQueryBuilder;
+import org.elasticsearch.index.query.TermQueryBuilder;
 import org.elasticsearch.index.shard.SearchOperationListener;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.internal.LegacyReaderContext;
 import org.elasticsearch.search.internal.ReaderContext;
 import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.search.sort.FieldSortBuilder;
 import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.tasks.TaskInfo;
 import org.elasticsearch.test.AbstractMultiClustersTestCase;
@@ -38,11 +43,14 @@ import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.test.NodeRoles;
 import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xcontent.json.JsonXContent;
 import org.hamcrest.Matchers;
 import org.junit.Before;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -50,8 +58,11 @@ import java.util.stream.Collectors;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
 
 public class CrossClusterSearchIT extends AbstractMultiClustersTestCase {
 
@@ -241,6 +252,136 @@ public class CrossClusterSearchIT extends AbstractMultiClustersTestCase {
         });
     }
 
+    /**
+     * Makes sure that lookup fields are resolved using the lookup index on each cluster.
+     */
+    public void testLookupFields() throws Exception {
+        cluster("cluster_a").client()
+            .admin()
+            .indices()
+            .prepareCreate("users")
+            .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)))
+            .get();
+        cluster("cluster_a").client()
+            .prepareBulk("users")
+            .add(new IndexRequest().id("a").source("name", "Remote A"))
+            .add(new IndexRequest().id("b").source("name", "Remote B"))
+            .add(new IndexRequest().id("c").source("name", "Remote C"))
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .get();
+
+        client().admin()
+            .indices()
+            .prepareCreate("users")
+            .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)))
+            .get();
+        client().prepareBulk("users")
+            .add(new IndexRequest().id("a").source("name", "Local A"))
+            .add(new IndexRequest().id("b").source("name", "Local B"))
+            .add(new IndexRequest().id("c").source("name", "Local C"))
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .get();
+
+        // Setup calls on the local cluster
+        client().admin()
+            .indices()
+            .prepareCreate("local_calls")
+            .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)))
+            .setMapping("from_user", "type=keyword", "to_user", "type=keyword")
+            .get();
+        client().prepareBulk("local_calls")
+            .add(new IndexRequest().source("from_user", "a", "to_user", List.of("b", "c"), "duration", 95))
+            .add(new IndexRequest().source("from_user", "a", "to_user", "b", "duration", 25))
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .get();
+
+        // Setup calls on the remote cluster
+        cluster("cluster_a").client()
+            .admin()
+            .indices()
+            .prepareCreate("remote_calls")
+            .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)))
+            .setMapping("from_user", "type=keyword", "to_user", "type=keyword")
+            .get();
+        cluster("cluster_a").client()
+            .prepareBulk("remote_calls")
+            .add(new IndexRequest().source("from_user", "a", "to_user", "b", "duration", 45))
+            .add(new IndexRequest().source("from_user", "unknown_caller", "to_user", "c", "duration", 50))
+            .add(new IndexRequest().source("from_user", List.of("a", "b"), "to_user", "c", "duration", 60))
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .get();
+
+        final String runtimeMappingSource = """
+            {
+                "from": {
+                    "type": "lookup",
+                    "target_index": "users",
+                    "input_field": "from_user",
+                    "target_field": "_id",
+                    "fetch_fields": ["name"]
+                },
+                "to": {
+                    "type": "lookup",
+                    "target_index": "users",
+                    "input_field": "to_user",
+                    "target_field": "_id",
+                    "fetch_fields": ["name"]
+                }
+            }
+            """;
+        final Map<String, Object> runtimeMappings;
+        try (XContentParser parser = createParser(JsonXContent.jsonXContent, runtimeMappingSource)) {
+            runtimeMappings = parser.map();
+        }
+        // Search on the remote cluster only
+        {
+            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(new TermQueryBuilder("to_user", "c"))
+                .runtimeMappings(runtimeMappings)
+                .sort(new FieldSortBuilder("duration"))
+                .fetchField("from")
+                .fetchField("to");
+            SearchRequest request = new SearchRequest("cluster_a:remote_calls").source(searchSourceBuilder);
+            request.setCcsMinimizeRoundtrips(randomBoolean());
+            SearchResponse searchResponse = client().search(request).actionGet();
+            ElasticsearchAssertions.assertHitCount(searchResponse, 2);
+            SearchHit hit0 = searchResponse.getHits().getHits()[0];
+            assertThat(hit0.getIndex(), equalTo("remote_calls"));
+            assertThat(hit0.field("from"), nullValue());
+            assertThat(hit0.field("to").getValues(), contains(Map.of("name", List.of("Remote C"))));
+
+            SearchHit hit1 = searchResponse.getHits().getHits()[1];
+            assertThat(hit1.getIndex(), equalTo("remote_calls"));
+            assertThat(hit1.field("from").getValues(), contains(Map.of("name", List.of("Remote A")), Map.of("name", List.of("Remote B"))));
+            assertThat(hit1.field("to").getValues(), contains(Map.of("name", List.of("Remote C"))));
+        }
+        // Search on both clusters
+        {
+            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(new TermQueryBuilder("to_user", "c"))
+                .runtimeMappings(runtimeMappings)
+                .sort(new FieldSortBuilder("duration"))
+                .fetchField("from")
+                .fetchField("to");
+            SearchRequest request = new SearchRequest("local_calls", "cluster_a:remote_calls").source(searchSourceBuilder);
+            request.setCcsMinimizeRoundtrips(randomBoolean());
+            SearchResponse searchResponse = client().search(request).actionGet();
+            ElasticsearchAssertions.assertHitCount(searchResponse, 3);
+            SearchHit hit0 = searchResponse.getHits().getHits()[0];
+            assertThat(hit0.getIndex(), equalTo("remote_calls"));
+            assertThat(hit0.field("from"), nullValue());
+            assertThat(hit0.field("to").getValues(), contains(Map.of("name", List.of("Remote C"))));
+
+            SearchHit hit1 = searchResponse.getHits().getHits()[1];
+            assertThat(hit1.getIndex(), equalTo("remote_calls"));
+            assertThat(hit1.field("from").getValues(), contains(Map.of("name", List.of("Remote A")), Map.of("name", List.of("Remote B"))));
+            assertThat(hit1.field("to").getValues(), contains(Map.of("name", List.of("Remote C"))));
+
+            SearchHit hit2 = searchResponse.getHits().getHits()[2];
+            assertThat(hit2.getIndex(), equalTo("local_calls"));
+            assertThat(hit2.field("from").getValues(), contains(Map.of("name", List.of("Local A"))));
+            assertThat(hit2.field("to").getValues(), contains(Map.of("name", List.of("Local B")), Map.of("name", List.of("Local C"))));
+        }
+    }
+
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
         if (clusterAlias.equals(LOCAL_CLUSTER)) {

+ 10 - 7
server/src/main/java/org/elasticsearch/action/search/ExpandSearchPhase.java

@@ -10,20 +10,19 @@ package org.elasticsearch.action.search;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.util.Maps;
-import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.InnerHitBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
-import org.elasticsearch.search.SearchPhaseResult;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.collapse.CollapseBuilder;
 import org.elasticsearch.search.internal.InternalSearchResponse;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.function.Supplier;
 
 /**
  * This search phase is an optional phase that will be executed once all hits are fetched from the shards that executes
@@ -33,13 +32,13 @@ import java.util.List;
 final class ExpandSearchPhase extends SearchPhase {
     private final SearchPhaseContext context;
     private final InternalSearchResponse searchResponse;
-    private final AtomicArray<SearchPhaseResult> queryResults;
+    private final Supplier<SearchPhase> nextPhase;
 
-    ExpandSearchPhase(SearchPhaseContext context, InternalSearchResponse searchResponse, AtomicArray<SearchPhaseResult> queryResults) {
+    ExpandSearchPhase(SearchPhaseContext context, InternalSearchResponse searchResponse, Supplier<SearchPhase> nextPhase) {
         super("expand");
         this.context = context;
         this.searchResponse = searchResponse;
-        this.queryResults = queryResults;
+        this.nextPhase = nextPhase;
     }
 
     /**
@@ -100,10 +99,10 @@ final class ExpandSearchPhase extends SearchPhase {
                         hit.getInnerHits().put(innerHitBuilder.getName(), innerHits);
                     }
                 }
-                context.sendSearchResponse(searchResponse, queryResults);
+                onPhaseDone();
             }, context::onFailure));
         } else {
-            context.sendSearchResponse(searchResponse, queryResults);
+            onPhaseDone();
         }
     }
 
@@ -147,4 +146,8 @@ final class ExpandSearchPhase extends SearchPhase {
         }
         return groupSource;
     }
+
+    private void onPhaseDone() {
+        context.executeNextPhase(this, nextPhase.get());
+    }
 }

+ 135 - 0
server/src/main/java/org/elasticsearch/action/search/FetchLookupFieldsPhase.java

@@ -0,0 +1,135 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.search;
+
+import org.apache.logging.log4j.util.Strings;
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.common.document.DocumentField;
+import org.elasticsearch.common.util.Maps;
+import org.elasticsearch.common.util.concurrent.AtomicArray;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchPhaseResult;
+import org.elasticsearch.search.fetch.subphase.LookupField;
+import org.elasticsearch.search.internal.InternalSearchResponse;
+import org.elasticsearch.transport.RemoteClusterService;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Asynchronously resolves {@link LookupField} that are specified {@link DocumentField#getLookupFields()}
+ *
+ * @see org.elasticsearch.index.mapper.LookupRuntimeFieldType
+ */
+final class FetchLookupFieldsPhase extends SearchPhase {
+    private final SearchPhaseContext context;
+    private final InternalSearchResponse searchResponse;
+    private final AtomicArray<SearchPhaseResult> queryResults;
+
+    FetchLookupFieldsPhase(SearchPhaseContext context, InternalSearchResponse searchResponse, AtomicArray<SearchPhaseResult> queryResults) {
+        super("fetch_lookup_fields");
+        this.context = context;
+        this.searchResponse = searchResponse;
+        this.queryResults = queryResults;
+    }
+
+    private record Cluster(String clusterAlias, List<SearchHit> hitsWithLookupFields, List<LookupField> lookupFields) {
+
+    }
+
+    private static List<Cluster> groupLookupFieldsByClusterAlias(InternalSearchResponse response) {
+        final Map<String, List<SearchHit>> perClusters = new HashMap<>();
+        for (SearchHit hit : response.hits.getHits()) {
+            String clusterAlias = hit.getClusterAlias() != null ? hit.getClusterAlias() : RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY;
+            if (hit.hasLookupFields()) {
+                perClusters.computeIfAbsent(clusterAlias, k -> new ArrayList<>()).add(hit);
+            }
+        }
+        final List<Cluster> clusters = new ArrayList<>(perClusters.size());
+        for (Map.Entry<String, List<SearchHit>> e : perClusters.entrySet()) {
+            final List<LookupField> lookupFields = e.getValue()
+                .stream()
+                .flatMap(h -> h.getDocumentFields().values().stream())
+                .flatMap(doc -> doc.getLookupFields().stream())
+                .distinct()
+                .toList();
+            clusters.add(new Cluster(e.getKey(), e.getValue(), lookupFields));
+        }
+        return clusters;
+    }
+
+    @Override
+    public void run() {
+        final List<Cluster> clusters = groupLookupFieldsByClusterAlias(searchResponse);
+        if (clusters.isEmpty()) {
+            context.sendSearchResponse(searchResponse, queryResults);
+            return;
+        }
+        final MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
+        for (Cluster cluster : clusters) {
+            // Do not prepend the clusterAlias to the targetIndex if the search request is already on the remote cluster.
+            final String clusterAlias = context.getRequest().getLocalClusterAlias() == null ? cluster.clusterAlias : null;
+            assert Strings.isEmpty(clusterAlias) || TransportSearchAction.shouldMinimizeRoundtrips(context.getRequest()) == false
+                : "lookup across clusters only if [ccs_minimize_roundtrips] is disabled";
+            for (LookupField lookupField : cluster.lookupFields) {
+                final SearchRequest searchRequest = lookupField.toSearchRequest(clusterAlias);
+                searchRequest.setCcsMinimizeRoundtrips(false);
+                multiSearchRequest.add(searchRequest);
+            }
+        }
+        context.getSearchTransport().sendExecuteMultiSearch(multiSearchRequest, context.getTask(), new ActionListener<>() {
+            @Override
+            public void onResponse(MultiSearchResponse items) {
+                Exception failure = null;
+                int index = 0;
+                for (Cluster cluster : clusters) {
+                    final Map<LookupField, List<Object>> lookupResults = Maps.newMapWithExpectedSize(cluster.lookupFields.size());
+                    for (LookupField lookupField : cluster.lookupFields) {
+                        final MultiSearchResponse.Item item = items.getResponses()[index];
+                        if (item.isFailure()) {
+                            failure = ExceptionsHelper.useOrSuppress(failure, item.getFailure());
+                        } else if (failure == null) {
+                            final List<Object> fetchedValues = new ArrayList<>();
+                            for (SearchHit rightHit : item.getResponse().getHits()) {
+                                final Map<String, List<Object>> fetchedFields = rightHit.getDocumentFields()
+                                    .values()
+                                    .stream()
+                                    .collect(Collectors.toMap(DocumentField::getName, DocumentField::getValues));
+                                if (fetchedFields.isEmpty() == false) {
+                                    fetchedValues.add(fetchedFields);
+                                }
+                            }
+                            lookupResults.put(lookupField, fetchedValues);
+                        }
+                        index++;
+                    }
+                    if (failure == null) {
+                        for (SearchHit hit : cluster.hitsWithLookupFields) {
+                            hit.resolveLookupFields(lookupResults);
+                        }
+                    }
+                }
+                if (failure != null) {
+                    context.onPhaseFailure(FetchLookupFieldsPhase.this, "failed to fetch lookup fields", failure);
+                } else {
+                    context.sendSearchResponse(searchResponse, queryResults);
+                }
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                context.onPhaseFailure(FetchLookupFieldsPhase.this, "failed to fetch lookup fields", e);
+            }
+        });
+    }
+}

+ 5 - 1
server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java

@@ -56,7 +56,11 @@ final class FetchSearchPhase extends SearchPhase {
             searchPhaseController,
             aggregatedDfs,
             context,
-            (response, queryPhaseResults) -> new ExpandSearchPhase(context, response, queryPhaseResults)
+            (response, queryPhaseResults) -> new ExpandSearchPhase(
+                context,
+                response,
+                () -> new FetchLookupFieldsPhase(context, response, queryPhaseResults)
+            )
         );
     }
 

+ 40 - 4
server/src/main/java/org/elasticsearch/common/document/DocumentField.java

@@ -14,6 +14,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.index.get.GetResult;
 import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.fetch.subphase.LookupField;
 import org.elasticsearch.xcontent.ToXContentFragment;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentParser;
@@ -38,7 +39,8 @@ public class DocumentField implements Writeable, Iterable<Object> {
 
     private final String name;
     private final List<Object> values;
-    private List<Object> ignoredValues;
+    private final List<Object> ignoredValues;
+    private final List<LookupField> lookupFields;
 
     public DocumentField(StreamInput in) throws IOException {
         name = in.readString();
@@ -48,6 +50,11 @@ public class DocumentField implements Writeable, Iterable<Object> {
         } else {
             ignoredValues = Collections.emptyList();
         }
+        if (in.getVersion().onOrAfter(Version.V_8_2_0)) {
+            lookupFields = in.readList(LookupField::new);
+        } else {
+            lookupFields = List.of();
+        }
     }
 
     public DocumentField(String name, List<Object> values) {
@@ -55,9 +62,16 @@ public class DocumentField implements Writeable, Iterable<Object> {
     }
 
     public DocumentField(String name, List<Object> values, List<Object> ignoredValues) {
+        this(name, values, ignoredValues, Collections.emptyList());
+    }
+
+    public DocumentField(String name, List<Object> values, List<Object> ignoredValues, List<LookupField> lookupFields) {
         this.name = Objects.requireNonNull(name, "name must not be null");
         this.values = Objects.requireNonNull(values, "values must not be null");
         this.ignoredValues = Objects.requireNonNull(ignoredValues, "ignoredValues must not be null");
+        this.lookupFields = Objects.requireNonNull(lookupFields, "lookupFields must not be null");
+        assert lookupFields.isEmpty() || (values.isEmpty() && ignoredValues.isEmpty())
+            : "DocumentField can't have both lookup fields and values";
     }
 
     /**
@@ -104,7 +118,18 @@ public class DocumentField implements Writeable, Iterable<Object> {
         if (out.getVersion().onOrAfter(Version.V_7_16_0)) {
             out.writeCollection(ignoredValues, StreamOutput::writeGenericValue);
         }
+        if (out.getVersion().onOrAfter(Version.V_8_2_0)) {
+            out.writeList(lookupFields);
+        } else {
+            if (lookupFields.isEmpty() == false) {
+                assert false : "Lookup fields require all nodes be on 8.2 or later";
+                throw new IllegalStateException("Lookup fields require all nodes be on 8.2 or later");
+            }
+        }
+    }
 
+    public List<LookupField> getLookupFields() {
+        return lookupFields;
     }
 
     public ToXContentFragment getValidValuesWriter() {
@@ -163,17 +188,28 @@ public class DocumentField implements Writeable, Iterable<Object> {
         DocumentField objects = (DocumentField) o;
         return Objects.equals(name, objects.name)
             && Objects.equals(values, objects.values)
-            && Objects.equals(ignoredValues, objects.ignoredValues);
+            && Objects.equals(ignoredValues, objects.ignoredValues)
+            && Objects.equals(lookupFields, objects.lookupFields);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(name, values, ignoredValues);
+        return Objects.hash(name, values, ignoredValues, lookupFields);
     }
 
     @Override
     public String toString() {
-        return "DocumentField{" + "name='" + name + '\'' + ", values=" + values + ", ignoredValues=" + ignoredValues + '}';
+        return "DocumentField{"
+            + "name='"
+            + name
+            + '\''
+            + ", values="
+            + values
+            + ", ignoredValues="
+            + ignoredValues
+            + ", lookupFields="
+            + lookupFields
+            + '}';
     }
 
 }

+ 248 - 0
server/src/main/java/org/elasticsearch/index/mapper/LookupRuntimeFieldType.java

@@ -0,0 +1,248 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.index.mapper;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.Query;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.document.DocumentField;
+import org.elasticsearch.index.query.QueryShardException;
+import org.elasticsearch.index.query.SearchExecutionContext;
+import org.elasticsearch.index.query.TermQueryBuilder;
+import org.elasticsearch.script.CompositeFieldScript;
+import org.elasticsearch.search.fetch.subphase.FieldAndFormat;
+import org.elasticsearch.search.fetch.subphase.LookupField;
+import org.elasticsearch.search.lookup.SearchLookup;
+import org.elasticsearch.search.lookup.SourceLookup;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.elasticsearch.search.SearchService.ALLOW_EXPENSIVE_QUERIES;
+
+/**
+ * A runtime field that retrieves fields from related indices.
+ * <pre>
+ * {
+ *     "type": "lookup",
+ *     "target_index": "an_external_index",
+ *     "input_field": "ip_address",
+ *     "target_field": "host_ip",
+ *     "fetch_fields": [
+ *         "field-1",
+ *         "field-2"
+ *     ]
+ * }
+ * </pre>
+ */
+public final class LookupRuntimeFieldType extends MappedFieldType {
+
+    public static final RuntimeField.Parser PARSER = new RuntimeField.Parser(Builder::new);
+    public static final String CONTENT_TYPE = "lookup";
+
+    private static class Builder extends RuntimeField.Builder {
+        private final FieldMapper.Parameter<String> targetIndex = FieldMapper.Parameter.stringParam(
+            "target_index",
+            false,
+            RuntimeField.initializerNotSupported(),
+            null
+        ).addValidator(v -> {
+            if (Strings.isEmpty(v)) {
+                throw new IllegalArgumentException("[target_index] parameter must be specified");
+            }
+        });
+
+        private final FieldMapper.Parameter<String> inputField = FieldMapper.Parameter.stringParam(
+            "input_field",
+            false,
+            RuntimeField.initializerNotSupported(),
+            null
+        ).addValidator(inputField -> {
+            if (Strings.isEmpty(inputField)) {
+                throw new IllegalArgumentException("[input_field] parameter must be specified");
+            }
+            if (inputField.equals(name)) {
+                throw new IllegalArgumentException("lookup field [" + name + "] can't use input from itself");
+            }
+        });
+
+        private final FieldMapper.Parameter<String> targetField = FieldMapper.Parameter.stringParam(
+            "target_field",
+            false,
+            RuntimeField.initializerNotSupported(),
+            null
+        ).addValidator(targetField -> {
+            if (Strings.isEmpty(targetField)) {
+                throw new IllegalArgumentException("[target_field] parameter must be specified");
+            }
+        });
+
+        private static FieldMapper.Parameter<List<FieldAndFormat>> newFetchFields() {
+            final FieldMapper.Parameter<List<FieldAndFormat>> fetchFields = new FieldMapper.Parameter<>(
+                "fetch_fields",
+                false,
+                List::of,
+                (s, ctx, o) -> parseFetchFields(o),
+                RuntimeField.initializerNotSupported(),
+                XContentBuilder::field,
+                Object::toString
+            );
+            fetchFields.addValidator(fields -> {
+                if (fields.isEmpty()) {
+                    throw new MapperParsingException("[fetch_fields] parameter must not be empty");
+                }
+            });
+            return fetchFields;
+        }
+
+        @SuppressWarnings("rawtypes")
+        private static List<FieldAndFormat> parseFetchFields(Object o) {
+            final List<?> values = (List<?>) o;
+            return values.stream().map(v -> {
+                if (v instanceof Map m) {
+                    final String field = (String) m.get(FieldAndFormat.FIELD_FIELD.getPreferredName());
+                    final String format = (String) m.get(FieldAndFormat.FORMAT_FIELD.getPreferredName());
+                    if (field == null) {
+                        throw new MapperParsingException("[field] parameter of [fetch_fields] must be provided");
+                    }
+                    return new FieldAndFormat(field, format);
+                } else if (v instanceof String s) {
+                    return new FieldAndFormat(s, null);
+                } else {
+                    throw new MapperParsingException("unexpected value [" + v + "] for [fetch_fields] parameter");
+                }
+            }).toList();
+        }
+
+        private final FieldMapper.Parameter<List<FieldAndFormat>> fetchFields = newFetchFields();
+
+        Builder(String name) {
+            super(name);
+
+        }
+
+        @Override
+        protected List<FieldMapper.Parameter<?>> getParameters() {
+            final List<FieldMapper.Parameter<?>> parameters = new ArrayList<>(super.getParameters());
+            parameters.add(targetIndex);
+            parameters.add(inputField);
+            parameters.add(targetField);
+            parameters.add(fetchFields);
+            return parameters;
+        }
+
+        @Override
+        protected RuntimeField createRuntimeField(MappingParserContext parserContext) {
+            final LookupRuntimeFieldType ft = new LookupRuntimeFieldType(
+                name,
+                meta(),
+                targetIndex.get(),
+                inputField.get(),
+                targetField.get(),
+                fetchFields.get()
+            );
+            return new LeafRuntimeField(name, ft, getParameters());
+        }
+
+        @Override
+        protected RuntimeField createChildRuntimeField(
+            MappingParserContext parserContext,
+            String parentName,
+            Function<SearchLookup, CompositeFieldScript.LeafFactory> parentScriptFactory
+        ) {
+            return createRuntimeField(parserContext);
+        }
+    }
+
+    private final String lookupIndex;
+    private final String inputField;
+    private final String targetField;
+    private final List<FieldAndFormat> fetchFields;
+
+    private LookupRuntimeFieldType(
+        String name,
+        Map<String, String> meta,
+        String lookupIndex,
+        String inputField,
+        String targetField,
+        List<FieldAndFormat> fetchFields
+    ) {
+        super(name, false, false, false, TextSearchInfo.NONE, meta);
+        this.lookupIndex = lookupIndex;
+        this.inputField = inputField;
+        this.targetField = targetField;
+        this.fetchFields = fetchFields;
+    }
+
+    @Override
+    public ValueFetcher valueFetcher(SearchExecutionContext context, String format) {
+        if (context.allowExpensiveQueries() == false) {
+            throw new ElasticsearchException(
+                "cannot be executed against lookup field ["
+                    + name()
+                    + "] while ["
+                    + ALLOW_EXPENSIVE_QUERIES.getKey()
+                    + "] is set to [false]."
+            );
+        }
+        return new LookupFieldValueFetcher(context);
+    }
+
+    @Override
+    public String typeName() {
+        return CONTENT_TYPE;
+    }
+
+    @Override
+    public Query termQuery(Object value, SearchExecutionContext context) {
+        throw new IllegalArgumentException("Cannot search on field [" + name() + "] since it is a lookup field.");
+    }
+
+    private class LookupFieldValueFetcher implements ValueFetcher {
+        private final ValueFetcher inputFieldValueFetcher;
+
+        LookupFieldValueFetcher(SearchExecutionContext context) {
+            final MappedFieldType inputFieldType = context.getFieldType(inputField);
+            // do not allow unmapped field
+            if (inputFieldType == null) {
+                throw new QueryShardException(context, "No field mapping can be found for the field with name [{}]", inputField);
+            }
+            this.inputFieldValueFetcher = inputFieldType.valueFetcher(context, null);
+        }
+
+        @Override
+        public List<Object> fetchValues(SourceLookup lookup, List<Object> ignoredValues) throws IOException {
+            assert false : "call #fetchDocumentField() instead";
+            throw new UnsupportedOperationException("call #fetchDocumentField() instead");
+        }
+
+        @Override
+        public DocumentField fetchDocumentField(String docName, SourceLookup lookup) throws IOException {
+            final DocumentField inputDoc = inputFieldValueFetcher.fetchDocumentField(inputField, lookup);
+            if (inputDoc == null || inputDoc.getValues().isEmpty()) {
+                return null;
+            }
+            final List<LookupField> lookupFields = inputDoc.getValues().stream().map(input -> {
+                final TermQueryBuilder query = new TermQueryBuilder(targetField, input.toString());
+                return new LookupField(lookupIndex, query, fetchFields, 1);
+            }).toList();
+            return new DocumentField(docName, List.of(), List.of(), lookupFields);
+        }
+
+        @Override
+        public void setNextReader(LeafReaderContext context) {
+            inputFieldValueFetcher.setNextReader(context);
+        }
+    }
+}

+ 37 - 13
server/src/main/java/org/elasticsearch/index/mapper/ValueFetcher.java

@@ -9,10 +9,13 @@
 package org.elasticsearch.index.mapper;
 
 import org.apache.lucene.index.LeafReaderContext;
+import org.elasticsearch.common.document.DocumentField;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.search.fetch.subphase.FetchFieldsPhase;
 import org.elasticsearch.search.lookup.SourceLookup;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -21,21 +24,42 @@ import java.util.List;
  */
 public interface ValueFetcher {
     /**
-    * Given access to a document's _source, return this field's values.
-    *
-    * In addition to pulling out the values, they will be parsed into a standard form.
-    * For example numeric field mappers make sure to parse the source value into a number
-    * of the right type.
-    *
-    * Note that for array values, the order in which values are returned is undefined and
-    * should not be relied on.
-    *
-    * @param lookup a lookup structure over the document's source.
-    * @param ignoredValues a mutable list to collect any ignored values as they were originally presented in source
-    * @return a list a standardized field values.
-    */
+     * This method is consumed by {@link #fetchDocumentField(String, SourceLookup)}.
+     *
+     * Given access to a document's _source, return this field's values.
+     * <p>
+     * In addition to pulling out the values, they will be parsed into a standard form.
+     * For example numeric field mappers make sure to parse the source value into a number
+     * of the right type.
+     * <p>
+     * Note that for array values, the order in which values are returned is undefined and
+     * should not be relied on.
+     *
+     * @param lookup        a lookup structure over the document's source.
+     * @param ignoredValues a mutable list to collect any ignored values as they were originally presented in source
+     * @return a list a standardized field values.
+     */
     List<Object> fetchValues(SourceLookup lookup, List<Object> ignoredValues) throws IOException;
 
+    /**
+     * Prefer implementing {@link #fetchValues(SourceLookup, List)}, which is simpler, when possible instead of this method.
+     * The default implementation creates a {@link DocumentField} using the values from {@link #fetchValues(SourceLookup, List)}
+     *
+     * @param docName the name of the document field
+     * @param lookup  a lookup structure over the document's source.
+     * @return a document field if this fetcher has values; otherwise returns null
+     */
+    @Nullable
+    default DocumentField fetchDocumentField(String docName, SourceLookup lookup) throws IOException {
+        final List<Object> ignoredValues = new ArrayList<>();
+        final List<Object> values = fetchValues(lookup, ignoredValues);
+        if (values.isEmpty() && ignoredValues.isEmpty()) {
+            return null;
+        } else {
+            return new DocumentField(docName, values, ignoredValues);
+        }
+    }
+
     /**
      * Update the leaf reader used to fetch values.
      */

+ 2 - 0
server/src/main/java/org/elasticsearch/indices/IndicesModule.java

@@ -39,6 +39,7 @@ import org.elasticsearch.index.mapper.IpScriptFieldType;
 import org.elasticsearch.index.mapper.KeywordFieldMapper;
 import org.elasticsearch.index.mapper.KeywordScriptFieldType;
 import org.elasticsearch.index.mapper.LongScriptFieldType;
+import org.elasticsearch.index.mapper.LookupRuntimeFieldType;
 import org.elasticsearch.index.mapper.Mapper;
 import org.elasticsearch.index.mapper.MapperRegistry;
 import org.elasticsearch.index.mapper.MetadataFieldMapper;
@@ -177,6 +178,7 @@ public class IndicesModule extends AbstractModule {
         runtimeParsers.put(KeywordFieldMapper.CONTENT_TYPE, KeywordScriptFieldType.PARSER);
         runtimeParsers.put(GeoPointFieldMapper.CONTENT_TYPE, GeoPointScriptFieldType.PARSER);
         runtimeParsers.put(CompositeRuntimeField.CONTENT_TYPE, CompositeRuntimeField.PARSER);
+        runtimeParsers.put(LookupRuntimeFieldType.CONTENT_TYPE, LookupRuntimeFieldType.PARSER);
 
         for (MapperPlugin mapperPlugin : mapperPlugins) {
             for (Map.Entry<String, RuntimeField.Parser> entry : mapperPlugin.getRuntimeFields().entrySet()) {

+ 38 - 0
server/src/main/java/org/elasticsearch/search/SearchHit.java

@@ -29,6 +29,7 @@ import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.SourceFieldMapper;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.search.fetch.subphase.LookupField;
 import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
 import org.elasticsearch.search.lookup.SourceLookup;
 import org.elasticsearch.transport.RemoteClusterAware;
@@ -482,6 +483,43 @@ public final class SearchHit implements Writeable, ToXContentObject, Iterable<Do
         }
     }
 
+    /**
+     * Whether this search hit has any lookup fields
+     */
+    public boolean hasLookupFields() {
+        return getDocumentFields().values().stream().anyMatch(doc -> doc.getLookupFields().isEmpty() == false);
+    }
+
+    /**
+     * Resolve the lookup fields with the given results and merge them as regular fetch fields.
+     */
+    public void resolveLookupFields(Map<LookupField, List<Object>> lookupResults) {
+        if (lookupResults.isEmpty()) {
+            return;
+        }
+        final List<String> fields = new ArrayList<>(documentFields.keySet());
+        for (String field : fields) {
+            documentFields.computeIfPresent(field, (k, docField) -> {
+                if (docField.getLookupFields().isEmpty()) {
+                    return docField;
+                }
+                final List<Object> newValues = new ArrayList<>(docField.getValues());
+                for (LookupField lookupField : docField.getLookupFields()) {
+                    final List<Object> resolvedValues = lookupResults.get(lookupField);
+                    if (resolvedValues != null) {
+                        newValues.addAll(resolvedValues);
+                    }
+                }
+                if (newValues.isEmpty() && docField.getIgnoredValues().isEmpty()) {
+                    return null;
+                } else {
+                    return new DocumentField(docField.getName(), newValues, docField.getIgnoredValues());
+                }
+            });
+        }
+        assert hasLookupFields() == false : "Some lookup fields are not resolved";
+    }
+
     /**
      * A map of highlighted fields.
      */

+ 3 - 3
server/src/main/java/org/elasticsearch/search/fetch/subphase/FieldAndFormat.java

@@ -38,9 +38,9 @@ public final class FieldAndFormat implements Writeable, ToXContentObject {
     private static final String USE_DEFAULT_FORMAT = "use_field_mapping";
     private static final DeprecationLogger DEPRECATION_LOGGER = DeprecationLogger.getLogger(FetchDocValuesPhase.class);
 
-    private static final ParseField FIELD_FIELD = new ParseField("field");
-    private static final ParseField FORMAT_FIELD = new ParseField("format");
-    private static final ParseField INCLUDE_UNMAPPED_FIELD = new ParseField("include_unmapped");
+    public static final ParseField FIELD_FIELD = new ParseField("field");
+    public static final ParseField FORMAT_FIELD = new ParseField("format");
+    public static final ParseField INCLUDE_UNMAPPED_FIELD = new ParseField("include_unmapped");
 
     private static final ConstructingObjectParser<FieldAndFormat, Void> PARSER = new ConstructingObjectParser<>(
         "fetch_field_and_format",

+ 3 - 5
server/src/main/java/org/elasticsearch/search/fetch/subphase/FieldFetcher.java

@@ -166,12 +166,10 @@ public class FieldFetcher {
         Map<String, DocumentField> documentFields = new HashMap<>();
         for (FieldContext context : fieldContexts.values()) {
             String field = context.fieldName;
-
             ValueFetcher valueFetcher = context.valueFetcher;
-            List<Object> ignoredValues = new ArrayList<>();
-            List<Object> parsedValues = valueFetcher.fetchValues(sourceLookup, ignoredValues);
-            if (parsedValues.isEmpty() == false || ignoredValues.isEmpty() == false) {
-                documentFields.put(field, new DocumentField(field, parsedValues, ignoredValues));
+            final DocumentField docField = valueFetcher.fetchDocumentField(field, sourceLookup);
+            if (docField != null) {
+                documentFields.put(field, docField);
             }
         }
         collectUnmapped(documentFields, sourceLookup, "", 0);

+ 47 - 0
server/src/main/java/org/elasticsearch/search/fetch/subphase/LookupField.java

@@ -0,0 +1,47 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.fetch.subphase;
+
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.transport.RemoteClusterAware;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A {@link LookupField} is an **unresolved** fetch field whose values will be resolved later
+ * in the fetch phase on the coordinating node.
+ *
+ * @see org.elasticsearch.index.mapper.LookupRuntimeFieldType
+ */
+public record LookupField(String targetIndex, QueryBuilder query, List<FieldAndFormat> fetchFields, int size) implements Writeable {
+
+    public LookupField(StreamInput in) throws IOException {
+        this(in.readString(), in.readNamedWriteable(QueryBuilder.class), in.readList(FieldAndFormat::new), in.readVInt());
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeString(targetIndex);
+        out.writeNamedWriteable(query);
+        out.writeCollection(fetchFields);
+        out.writeVInt(size);
+    }
+
+    public SearchRequest toSearchRequest(String clusterAlias) {
+        final SearchSourceBuilder source = new SearchSourceBuilder().query(query).trackScores(false).size(size).fetchSource(false);
+        fetchFields.forEach(source::fetchField);
+        return new SearchRequest().source(source).indices(RemoteClusterAware.buildRemoteIndexName(clusterAlias, targetIndex));
+    }
+}

+ 30 - 5
server/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java

@@ -125,7 +125,12 @@ public class ExpandSearchPhaseTests extends ESTestCase {
                 1.0F
             );
             InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
-            ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, null);
+            ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, () -> new SearchPhase("test") {
+                @Override
+                public void run() {
+                    mockSearchPhaseContext.sendSearchResponse(internalSearchResponse, null);
+                }
+            });
 
             phase.run();
             mockSearchPhaseContext.assertNoFailure();
@@ -203,7 +208,12 @@ public class ExpandSearchPhaseTests extends ESTestCase {
             1.0F
         );
         InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
-        ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, null);
+        ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, () -> new SearchPhase("test") {
+            @Override
+            public void run() {
+                mockSearchPhaseContext.sendSearchResponse(internalSearchResponse, null);
+            }
+        });
         phase.run();
         assertThat(mockSearchPhaseContext.phaseFailure.get(), Matchers.instanceOf(RuntimeException.class));
         assertEquals("boom", mockSearchPhaseContext.phaseFailure.get().getMessage());
@@ -238,7 +248,12 @@ public class ExpandSearchPhaseTests extends ESTestCase {
             1.0F
         );
         InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
-        ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, null);
+        ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, () -> new SearchPhase("test") {
+            @Override
+            public void run() {
+                mockSearchPhaseContext.sendSearchResponse(internalSearchResponse, null);
+            }
+        });
         phase.run();
         mockSearchPhaseContext.assertNoFailure();
         assertNotNull(mockSearchPhaseContext.searchResponse.get());
@@ -261,7 +276,12 @@ public class ExpandSearchPhaseTests extends ESTestCase {
 
         SearchHits hits = new SearchHits(new SearchHit[0], new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0f);
         InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
-        ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, null);
+        ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, () -> new SearchPhase("test") {
+            @Override
+            public void run() {
+                mockSearchPhaseContext.sendSearchResponse(internalSearchResponse, null);
+            }
+        });
         phase.run();
         mockSearchPhaseContext.assertNoFailure();
         assertNotNull(mockSearchPhaseContext.searchResponse.get());
@@ -299,7 +319,12 @@ public class ExpandSearchPhaseTests extends ESTestCase {
 
         SearchHits hits = new SearchHits(new SearchHit[0], new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0f);
         InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
-        ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, null);
+        ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, () -> new SearchPhase("test") {
+            @Override
+            public void run() throws IOException {
+                mockSearchPhaseContext.sendSearchResponse(internalSearchResponse, null);
+            }
+        });
         phase.run();
         mockSearchPhaseContext.assertNoFailure();
         assertNotNull(mockSearchPhaseContext.searchResponse.get());

+ 209 - 0
server/src/test/java/org/elasticsearch/action/search/FetchLookupFieldsPhaseTests.java

@@ -0,0 +1,209 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.search;
+
+import org.apache.lucene.search.TotalHits;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.common.document.DocumentField;
+import org.elasticsearch.index.query.TermQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHitTests;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.fetch.subphase.FieldAndFormat;
+import org.elasticsearch.search.fetch.subphase.LookupField;
+import org.elasticsearch.search.internal.InternalSearchResponse;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+
+public class FetchLookupFieldsPhaseTests extends ESTestCase {
+
+    public void testNoLookupField() {
+        MockSearchPhaseContext searchPhaseContext = new MockSearchPhaseContext(1);
+        searchPhaseContext.searchTransport = new SearchTransportService(null, null, null) {
+            @Override
+            void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener<MultiSearchResponse> listener) {
+                throw new AssertionError("No lookup field");
+            }
+        };
+        int numHits = randomIntBetween(0, 10);
+        SearchHit[] searchHits = new SearchHit[randomIntBetween(0, 10)];
+        for (int i = 0; i < searchHits.length; i++) {
+            searchHits[i] = SearchHitTests.createTestItem(randomBoolean(), randomBoolean());
+        }
+        SearchHits hits = new SearchHits(searchHits, new TotalHits(numHits, TotalHits.Relation.EQUAL_TO), 1.0f);
+        InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
+        FetchLookupFieldsPhase phase = new FetchLookupFieldsPhase(searchPhaseContext, searchResponse, null);
+        phase.run();
+        searchPhaseContext.assertNoFailure();
+        assertNotNull(searchPhaseContext.searchResponse.get());
+    }
+
+    public void testBasic() {
+        MockSearchPhaseContext searchPhaseContext = new MockSearchPhaseContext(1);
+        final AtomicBoolean requestSent = new AtomicBoolean();
+        searchPhaseContext.searchTransport = new SearchTransportService(null, null, null) {
+            @Override
+            void sendExecuteMultiSearch(
+                MultiSearchRequest multiSearchRequest,
+                SearchTask task,
+                ActionListener<MultiSearchResponse> listener
+            ) {
+                assertTrue(requestSent.compareAndSet(false, true));
+                // send 4 requests for term_1, term_2, term_3, and unknown
+                assertThat(multiSearchRequest.requests(), hasSize(4));
+                for (SearchRequest r : multiSearchRequest.requests()) {
+                    assertNotNull(r.source());
+                    assertThat(r.source().query(), instanceOf(TermQueryBuilder.class));
+                    assertThat(r.source().size(), equalTo(1));
+                }
+                final List<String> queryTerms = multiSearchRequest.requests().stream().map(r -> {
+                    final TermQueryBuilder query = (TermQueryBuilder) r.source().query();
+                    return query.value().toString();
+                }).sorted().toList();
+                assertThat(queryTerms, equalTo(List.of("term_1", "term_2", "term_3", "xyz")));
+                final MultiSearchResponse.Item[] responses = new MultiSearchResponse.Item[multiSearchRequest.requests().size()];
+                for (int i = 0; i < responses.length; i++) {
+                    final SearchRequest r = multiSearchRequest.requests().get(i);
+                    final TermQueryBuilder query = (TermQueryBuilder) r.source().query();
+                    final Map<String, List<Object>> fields = switch (query.value().toString()) {
+                        case "term_1" -> Map.of("field_a", List.of("a1", "a2"), "field_b", List.of("b2"));
+                        case "term_2" -> Map.of("field_a", List.of("a2", "a3"), "field_b", List.of("b1"));
+                        case "term_3" -> Map.of("field_a", List.of("a2"), "field_b", List.of("b1", "b2"));
+                        case "xyz" -> null;
+                        default -> throw new AssertionError("unknown term value");
+                    };
+                    final SearchHits searchHits;
+                    if (fields != null) {
+                        final SearchHit hit = new SearchHit(randomInt(1000));
+                        fields.forEach((f, values) -> hit.setDocumentField(f, new DocumentField(f, values, List.of())));
+                        searchHits = new SearchHits(new SearchHit[] { hit }, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0f);
+                    } else {
+                        searchHits = new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 1.0f);
+                    }
+                    InternalSearchResponse internalSearchResponse = new InternalSearchResponse(
+                        searchHits,
+                        null,
+                        null,
+                        null,
+                        false,
+                        null,
+                        1
+                    );
+                    responses[i] = new MultiSearchResponse.Item(
+                        new SearchResponse(
+                            internalSearchResponse,
+                            null,
+                            1,
+                            1,
+                            0,
+                            randomNonNegativeLong(),
+                            ShardSearchFailure.EMPTY_ARRAY,
+                            SearchResponseTests.randomClusters(),
+                            null
+                        ),
+                        null
+                    );
+                }
+                listener.onResponse(new MultiSearchResponse(responses, randomNonNegativeLong()));
+            }
+        };
+
+        SearchHit leftHit0 = new SearchHit(randomInt(100));
+        final List<FieldAndFormat> fetchFields = List.of(new FieldAndFormat(randomAlphaOfLength(10), null));
+        {
+            leftHit0.setDocumentField(
+                "lookup_field_1",
+                new DocumentField(
+                    "lookup_field_1",
+                    List.of(),
+                    List.of(),
+                    List.of(
+                        new LookupField("test_index", new TermQueryBuilder("test_field", "term_1"), fetchFields, 1),
+                        new LookupField("test_index", new TermQueryBuilder("test_field", "term_2"), fetchFields, 1)
+                    )
+                )
+            );
+            leftHit0.setDocumentField(
+                "lookup_field_2",
+                new DocumentField(
+                    "lookup_field_2",
+                    List.of(),
+                    List.of(),
+                    List.of(new LookupField("test_index", new TermQueryBuilder("test_field", "term_2"), fetchFields, 1))
+                )
+            );
+        }
+
+        SearchHit leftHit1 = new SearchHit(randomInt(100));
+        {
+            leftHit1.setDocumentField(
+                "lookup_field_2",
+                new DocumentField(
+                    "lookup_field_2",
+                    List.of(),
+                    List.of(),
+                    List.of(
+                        new LookupField("test_index", new TermQueryBuilder("test_field", "term_2"), fetchFields, 1),
+                        new LookupField("test_index", new TermQueryBuilder("test_field", "xyz"), fetchFields, 1)
+                    )
+                )
+            );
+            leftHit1.setDocumentField(
+                "lookup_field_3",
+                new DocumentField(
+                    "lookup_field_3",
+                    List.of(),
+                    List.of(),
+                    List.of(new LookupField("test_index", new TermQueryBuilder("test_field", "term_3"), fetchFields, 1))
+                )
+            );
+        }
+        SearchHits searchHits = new SearchHits(new SearchHit[] { leftHit0, leftHit1 }, new TotalHits(2, TotalHits.Relation.EQUAL_TO), 1.0f);
+        InternalSearchResponse searchResponse = new InternalSearchResponse(searchHits, null, null, null, false, null, 1);
+        FetchLookupFieldsPhase phase = new FetchLookupFieldsPhase(searchPhaseContext, searchResponse, null);
+        phase.run();
+        assertTrue(requestSent.get());
+        searchPhaseContext.assertNoFailure();
+        assertNotNull(searchPhaseContext.searchResponse.get());
+        assertSame(searchPhaseContext.searchResponse.get().getHits().getHits()[0], leftHit0);
+        assertSame(searchPhaseContext.searchResponse.get().getHits().getHits()[1], leftHit1);
+        assertFalse(leftHit0.hasLookupFields());
+        assertThat(
+            leftHit0.field("lookup_field_1").getValues(),
+            containsInAnyOrder(
+                Map.of("field_a", List.of("a1", "a2"), "field_b", List.of("b2")),
+                Map.of("field_a", List.of("a2", "a3"), "field_b", List.of("b1"))
+            )
+        );
+        assertThat(
+            leftHit0.field("lookup_field_2").getValues(),
+            contains(Map.of("field_a", List.of("a2", "a3"), "field_b", List.of("b1")))
+        );
+
+        assertFalse(leftHit1.hasLookupFields());
+        assertThat(
+            leftHit1.field("lookup_field_2").getValues(),
+            contains(Map.of("field_a", List.of("a2", "a3"), "field_b", List.of("b1")))
+        );
+        assertThat(
+            leftHit1.field("lookup_field_3").getValues(),
+            contains(Map.of("field_a", List.of("a2"), "field_b", List.of("b1", "b2")))
+        );
+    }
+
+}

+ 148 - 0
server/src/test/java/org/elasticsearch/index/mapper/LookupRuntimeFieldTypeTests.java

@@ -0,0 +1,148 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.index.mapper;
+
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.document.DocumentField;
+import org.elasticsearch.index.query.QueryShardException;
+import org.elasticsearch.index.query.SearchExecutionContext;
+import org.elasticsearch.index.query.TermQueryBuilder;
+import org.elasticsearch.search.fetch.subphase.FieldAndFormat;
+import org.elasticsearch.search.fetch.subphase.LookupField;
+import org.elasticsearch.search.lookup.SourceLookup;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+
+public class LookupRuntimeFieldTypeTests extends MapperServiceTestCase {
+
+    public void testFetchValues() throws IOException {
+        String mapping = """
+            {
+              "_doc": {
+                "properties" : {
+                  "foo" : {
+                    "type" : "keyword"
+                  }
+                },
+                "runtime": {
+                    "foo_lookup_field": {
+                        "type": "lookup",
+                        "target_index": "my_index",
+                        "input_field": "foo",
+                        "target_field": "term_field_foo",
+                        "fetch_fields": [
+                            "remote_field_*",
+                            {"field": "created", "format": "YYYY-dd-MM"}
+                        ]
+                    }
+                }
+              }
+            }
+            """;
+        var mapperService = createMapperService(mapping);
+        XContentBuilder source = XContentFactory.jsonBuilder().startObject().field("foo", List.of("f1", "f2")).endObject();
+        SourceLookup sourceLookup = new SourceLookup();
+        sourceLookup.setSource(BytesReference.bytes(source));
+        MappedFieldType fieldType = mapperService.fieldType("foo_lookup_field");
+        ValueFetcher valueFetcher = fieldType.valueFetcher(createSearchExecutionContext(mapperService), null);
+        DocumentField doc = valueFetcher.fetchDocumentField("foo_lookup_field", sourceLookup);
+        assertNotNull(doc);
+        assertThat(doc.getName(), equalTo("foo_lookup_field"));
+        assertThat(doc.getValues(), empty());
+        assertThat(doc.getIgnoredValues(), empty());
+        assertThat(
+            doc.getLookupFields(),
+            contains(
+                new LookupField(
+                    "my_index",
+                    new TermQueryBuilder("term_field_foo", "f1"),
+                    List.of(new FieldAndFormat("remote_field_*", null), new FieldAndFormat("created", "YYYY-dd-MM")),
+                    1
+                ),
+                new LookupField(
+                    "my_index",
+                    new TermQueryBuilder("term_field_foo", "f2"),
+                    List.of(new FieldAndFormat("remote_field_*", null), new FieldAndFormat("created", "YYYY-dd-MM")),
+                    1
+                )
+            )
+        );
+    }
+
+    public void testEmptyInputField() throws IOException {
+        String mapping = """
+            {
+              "_doc": {
+                "properties" : {
+                  "foo" : {
+                    "type" : "keyword"
+                  }
+                },
+                "runtime": {
+                    "foo_lookup_field": {
+                        "type": "lookup",
+                        "target_index": "my_index",
+                        "input_field": "foo",
+                        "target_field": "term_field_foo",
+                        "fetch_fields": ["remote_field_*"]
+                    }
+                }
+              }
+            }
+            """;
+        var mapperService = createMapperService(mapping);
+        XContentBuilder source = XContentFactory.jsonBuilder();
+        source.startObject();
+        if (randomBoolean()) {
+            source.field("foo", List.of());
+        }
+        source.endObject();
+        SourceLookup sourceLookup = new SourceLookup();
+        sourceLookup.setSource(BytesReference.bytes(source));
+        MappedFieldType fieldType = mapperService.fieldType("foo_lookup_field");
+        ValueFetcher valueFetcher = fieldType.valueFetcher(createSearchExecutionContext(mapperService), null);
+        DocumentField doc = valueFetcher.fetchDocumentField("foo_lookup_field", sourceLookup);
+        assertNull(doc);
+    }
+
+    public void testInputFieldDoesNotExist() throws IOException {
+        String mapping = """
+            {
+              "_doc": {
+                "runtime": {
+                    "foo_lookup_field": {
+                        "type": "lookup",
+                        "target_index": "my_index",
+                        "input_field": "barbaz",
+                        "target_field": "term_field_foo",
+                        "fetch_fields": ["field-1", "field-2"]
+                    }
+                }
+              }
+            }
+            """;
+        var mapperService = createMapperService(mapping);
+        MappedFieldType fieldType = mapperService.fieldType("foo_lookup_field");
+        // fails if unmapped_fields is not
+        QueryShardException error = expectThrows(QueryShardException.class, () -> {
+            SearchExecutionContext context = createSearchExecutionContext(mapperService);
+            context.setAllowUnmappedFields(randomBoolean());
+            fieldType.valueFetcher(context, null);
+        });
+        assertThat(error.getMessage(), containsString("No field mapping can be found for the field with name [barbaz]"));
+    }
+}

+ 119 - 0
server/src/test/java/org/elasticsearch/search/fetch/subphase/LookupFieldTests.java

@@ -0,0 +1,119 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.fetch.subphase;
+
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.indices.IndicesModule;
+import org.elasticsearch.search.SearchModule;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.xcontent.NamedXContentRegistry;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class LookupFieldTests extends AbstractWireSerializingTestCase<LookupField> {
+
+    private NamedWriteableRegistry namedWriteableRegistry;
+    private NamedXContentRegistry xContentRegistry;
+
+    @Before
+    public void setupXContentRegistry() {
+        SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList());
+        List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();
+        entries.addAll(IndicesModule.getNamedWriteables());
+        entries.addAll(searchModule.getNamedWriteables());
+        namedWriteableRegistry = new NamedWriteableRegistry(entries);
+        xContentRegistry = new NamedXContentRegistry(searchModule.getNamedXContents());
+    }
+
+    @Override
+    protected NamedXContentRegistry xContentRegistry() {
+        return xContentRegistry;
+    }
+
+    @Override
+    protected NamedWriteableRegistry getNamedWriteableRegistry() {
+        return namedWriteableRegistry;
+    }
+
+    @Override
+    protected Writeable.Reader<LookupField> instanceReader() {
+        return LookupField::new;
+    }
+
+    public static LookupField randomInstance() {
+        final String lookupIndex = randomAlphaOfLength(10);
+        final QueryBuilder queryBuilder = QueryBuilders.termQuery(randomAlphaOfLengthBetween(5, 20), randomAlphaOfLengthBetween(5, 20));
+        return new LookupField(lookupIndex, queryBuilder, randomFetchFields(), randomIntBetween(1, 10));
+    }
+
+    private static List<FieldAndFormat> randomFetchFields() {
+        final int numFetchFields = randomIntBetween(1, 10);
+        final List<FieldAndFormat> fetchFields = new ArrayList<>();
+        for (int i = 0; i < numFetchFields; i++) {
+            String format = randomBoolean() ? randomAlphaOfLength(5) : null;
+            String field = randomAlphaOfLength(10);
+            fetchFields.add(new FieldAndFormat(field, format));
+        }
+        return fetchFields;
+    }
+
+    @Override
+    protected LookupField mutateInstance(LookupField old) throws IOException {
+        String lookupIndex = old.targetIndex();
+        QueryBuilder query = old.query();
+        List<FieldAndFormat> fetchFields = old.fetchFields();
+        int size = old.size();
+        int iterations = iterations(1, 5);
+        for (int i = 0; i < iterations; i++) {
+            switch (randomIntBetween(0, 3)) {
+                case 0 -> lookupIndex = randomValueOtherThan(old.targetIndex(), () -> randomAlphaOfLength(10));
+                case 1 -> query = randomValueOtherThan(
+                    old.query(),
+                    () -> QueryBuilders.termQuery(randomAlphaOfLengthBetween(5, 20), randomAlphaOfLengthBetween(5, 20))
+                );
+                case 2 -> fetchFields = randomValueOtherThan(old.fetchFields(), LookupFieldTests::randomFetchFields);
+                case 3 -> size = randomValueOtherThan(old.size(), () -> randomIntBetween(1, 10));
+                default -> throw new AssertionError();
+            }
+        }
+        return new LookupField(lookupIndex, query, fetchFields, size);
+    }
+
+    @Override
+    protected LookupField createTestInstance() {
+        return randomInstance();
+    }
+
+    public void testToSearchRequest() {
+        LookupField lookupField = createTestInstance();
+        final SearchRequest localRequest = lookupField.toSearchRequest(randomBoolean() ? null : "");
+        assertThat(localRequest.source().query(), equalTo(lookupField.query()));
+        assertThat(localRequest.source().size(), equalTo(lookupField.size()));
+        assertThat(localRequest.indices(), equalTo(new String[] { lookupField.targetIndex() }));
+        assertThat(localRequest.source().fetchFields(), equalTo(lookupField.fetchFields()));
+
+        final String clusterAlias = randomAlphaOfLength(10);
+        final SearchRequest remoteRequest = lookupField.toSearchRequest(clusterAlias);
+        assertThat(remoteRequest.source().query(), equalTo(lookupField.query()));
+        assertThat(remoteRequest.source().size(), equalTo(lookupField.size()));
+        assertThat(remoteRequest.indices(), equalTo(new String[] { clusterAlias + ":" + lookupField.targetIndex() }));
+        assertThat(remoteRequest.source().fetchFields(), equalTo(lookupField.fetchFields()));
+    }
+}

+ 132 - 1
x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/FieldLevelSecurityTests.java

@@ -18,6 +18,7 @@ import org.elasticsearch.action.search.MultiSearchResponse;
 import org.elasticsearch.action.search.OpenPointInTimeAction;
 import org.elasticsearch.action.search.OpenPointInTimeRequest;
 import org.elasticsearch.action.search.OpenPointInTimeResponse;
+import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.WriteRequest;
@@ -45,6 +46,7 @@ import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 import org.elasticsearch.search.builder.PointInTimeBuilder;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
 import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.test.InternalSettingsPlugin;
@@ -67,6 +69,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
 import static org.elasticsearch.index.query.QueryBuilders.constantScoreQuery;
@@ -177,7 +180,7 @@ public class FieldLevelSecurityTests extends SecurityIntegTestCase {
                   - names: '*'
                     privileges: [ ALL ]
                     field_security:
-                       grant: [ field2, query* ]
+                       grant: [ field2, query*]
             role4:
               cluster: [ all ]
               indices:
@@ -2065,4 +2068,132 @@ public class FieldLevelSecurityTests extends SecurityIntegTestCase {
         assertHitCount(response, 0);
     }
 
+    public void testLookupRuntimeFields() throws Exception {
+        assertAcked(
+            client().admin()
+                .indices()
+                .prepareCreate("hosts")
+                .setMapping("field1", "type=keyword", "field2", "type=text", "field3", "type=text")
+        );
+        client().prepareIndex("hosts")
+            .setId("1")
+            .setSource("field1", "192.168.1.1", "field2", "windows", "field3", "canada")
+            .setRefreshPolicy(IMMEDIATE)
+            .get();
+        client().prepareIndex("hosts")
+            .setId("2")
+            .setSource("field1", "192.168.1.2", "field2", "macos", "field3", "us")
+            .setRefreshPolicy(IMMEDIATE)
+            .get();
+
+        assertAcked(
+            client().admin()
+                .indices()
+                .prepareCreate("logs")
+                .setMapping("field1", "type=keyword", "field2", "type=text", "field3", "type=date,format=yyyy-MM-dd")
+        );
+
+        client().prepareIndex("logs")
+            .setId("1")
+            .setSource("field1", "192.168.1.1", "field2", "out of memory", "field3", "2021-01-20")
+            .setRefreshPolicy(IMMEDIATE)
+            .get();
+        client().prepareIndex("logs")
+            .setId("2")
+            .setSource("field1", "192.168.1.2", "field2", "authentication fails", "field3", "2021-01-21")
+            .setRefreshPolicy(IMMEDIATE)
+            .get();
+        Map<String, Object> lookupField = Map.of(
+            "type",
+            "lookup",
+            "target_index",
+            "hosts",
+            "input_field",
+            "field1",
+            "target_field",
+            "field1",
+            "fetch_fields",
+            List.of("field1", "field2", "field3")
+        );
+        SearchRequest request = new SearchRequest("logs").source(
+            new SearchSourceBuilder().fetchSource(false)
+                .fetchField("field1")
+                .fetchField("field2")
+                .fetchField("field3")
+                .fetchField("host")
+                .sort("field1")
+                .runtimeMappings(Map.of("host", lookupField))
+        );
+        SearchResponse response;
+        // user1 has access to field1
+        response = client().filterWithHeader(Map.of(BASIC_AUTH_HEADER, basicAuthHeaderValue("user1", USERS_PASSWD)))
+            .search(request)
+            .actionGet();
+        assertHitCount(response, 2);
+        {
+            SearchHit hit0 = response.getHits().getHits()[0];
+            assertThat(hit0.getDocumentFields().keySet(), equalTo(Set.of("field1", "host")));
+            assertThat(hit0.field("field1").getValues(), equalTo(List.of("192.168.1.1")));
+            assertThat(hit0.field("host").getValues(), equalTo(List.of(Map.of("field1", List.of("192.168.1.1")))));
+        }
+        {
+            SearchHit hit1 = response.getHits().getHits()[1];
+            assertThat(hit1.getDocumentFields().keySet(), equalTo(Set.of("field1", "host")));
+            assertThat(hit1.field("field1").getValues(), equalTo(List.of("192.168.1.2")));
+            assertThat(hit1.field("host").getValues(), equalTo(List.of(Map.of("field1", List.of("192.168.1.2")))));
+        }
+        // user3 has access to field1, field2
+        response = client().filterWithHeader(Map.of(BASIC_AUTH_HEADER, basicAuthHeaderValue("user3", USERS_PASSWD)))
+            .search(request)
+            .actionGet();
+        assertHitCount(response, 2);
+        {
+            SearchHit hit0 = response.getHits().getHits()[0];
+            assertThat(hit0.getDocumentFields().keySet(), equalTo(Set.of("field1", "field2", "host")));
+            assertThat(hit0.field("field1").getValues(), equalTo(List.of("192.168.1.1")));
+            assertThat(hit0.field("field2").getValues(), equalTo(List.of("out of memory")));
+            assertThat(
+                hit0.field("host").getValues(),
+                equalTo(List.of(Map.of("field1", List.of("192.168.1.1"), "field2", List.of("windows"))))
+            );
+        }
+        {
+            SearchHit hit1 = response.getHits().getHits()[1];
+            assertThat(hit1.getDocumentFields().keySet(), equalTo(Set.of("field1", "field2", "host")));
+            assertThat(hit1.field("field1").getValues(), equalTo(List.of("192.168.1.2")));
+            assertThat(hit1.field("field2").getValues(), equalTo(List.of("authentication fails")));
+            assertThat(
+                hit1.field("host").getValues(),
+                equalTo(List.of(Map.of("field1", List.of("192.168.1.2"), "field2", List.of("macos"))))
+            );
+        }
+        // user6 has access to field1, field2, and field3
+        response = client().filterWithHeader(Map.of(BASIC_AUTH_HEADER, basicAuthHeaderValue("user6", USERS_PASSWD)))
+            .search(request)
+            .actionGet();
+        assertHitCount(response, 2);
+        {
+            SearchHit hit0 = response.getHits().getHits()[0];
+            assertThat(hit0.getDocumentFields().keySet(), equalTo(Set.of("field1", "field2", "field3", "host")));
+            assertThat(hit0.field("field1").getValues(), equalTo(List.of("192.168.1.1")));
+            assertThat(hit0.field("field2").getValues(), equalTo(List.of("out of memory")));
+            assertThat(hit0.field("field3").getValues(), equalTo(List.of("2021-01-20")));
+            assertThat(
+                hit0.field("host").getValues(),
+                equalTo(List.of(Map.of("field1", List.of("192.168.1.1"), "field2", List.of("windows"), "field3", List.of("canada"))))
+            );
+        }
+        {
+            SearchHit hit1 = response.getHits().getHits()[1];
+            assertThat(hit1.getDocumentFields().keySet(), equalTo(Set.of("field1", "field2", "field3", "host")));
+            assertThat(hit1.field("field1").getValues(), equalTo(List.of("192.168.1.2")));
+            assertThat(hit1.field("field2").getValues(), equalTo(List.of("authentication fails")));
+            assertThat(hit1.field("field3").getValues(), equalTo(List.of("2021-01-21")));
+            assertThat(
+                hit1.field("host").getValues(),
+                equalTo(List.of(Map.of("field1", List.of("192.168.1.2"), "field2", List.of("macos"), "field3", List.of("us"))))
+            );
+        }
+    }
+
 }

+ 13 - 4
x-pack/qa/runtime-fields/core-with-search/src/yamlRestTest/java/org/elasticsearch/xpack/runtimefields/test/search/CoreTestsWithSearchRuntimeFieldsIT.java

@@ -98,14 +98,23 @@ public class CoreTestsWithSearchRuntimeFieldsIT extends ESClientYamlSuiteTestCas
                     if (search.getBodies().isEmpty()) {
                         search.addBody(new HashMap<>());
                     }
+                    boolean changed = false;
                     for (Map<String, Object> body : search.getBodies()) {
                         Map<?, ?> runtimeMapping = runtimeMappings(search.getParams().get("index"));
-                        if (runtimeMapping == null) {
-                            return false;
+                        if (runtimeMapping != null) {
+                            changed = true;
+                            body.compute("runtime_mappings", (k, curr) -> {
+                                if (curr == null) {
+                                    return runtimeMapping;
+                                } else {
+                                    Map<Object, Object> mergedMappings = new HashMap<>((Map<?, ?>) curr);
+                                    mergedMappings.putAll(runtimeMapping);
+                                    return mergedMappings;
+                                }
+                            });
                         }
-                        body.put("runtime_mappings", runtimeMapping);
                     }
-                    return true;
+                    return changed;
                 }
 
                 private Map<?, ?> runtimeMappings(String index) {