
Add support for Lookup Join on Multiple Fields (#131559)

Add support for Lookup Join on Multiple Fields

FROM index1
| LOOKUP JOIN lookup_index on field1, field2
Removed the checks that previously restricted lookup join to a single join field.
Added a new interface, LookupEnrichQueryGenerator, which exposes the total number of queries and the query at each position. The rest of the methods on QueryList are not needed by AbstractLookupService.

That allowed the creation of a new class, ExpressionQueryList, which implements LookupEnrichQueryGenerator and builds the AND query across the different join fields. We will likely need to enhance it in the future to support expressions that include OR and NOT as well.
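A minimal sketch of how the generator could be chosen, assuming a hypothetical helper in the lookup service layer (only the ExpressionQueryList constructor contract comes from this change; the helper name and wiring are illustrative):

```java
import java.util.List;

import org.elasticsearch.compute.operator.lookup.ExpressionQueryList;
import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator;
import org.elasticsearch.compute.operator.lookup.QueryList;

class QueryGeneratorSelectionSketch {
    /**
     * Hypothetical helper: one match field keeps the plain QueryList path, while two or more
     * are AND-combined per position by ExpressionQueryList (its constructor rejects lists
     * with fewer than two entries).
     */
    static LookupEnrichQueryGenerator queryGenerator(List<QueryList> queryLists) {
        if (queryLists.isEmpty()) {
            throw new IllegalArgumentException("at least one match field is required");
        }
        return queryLists.size() == 1 ? queryLists.get(0) : new ExpressionQueryList(queryLists);
    }
}
```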

TransportRequest is enhanced to carry List&lt;MatchConfig&gt; matchFields instead of a single String matchField; this is how the match fields are passed around now. If we are communicating with a cluster that does not support lookup join on multiple fields and the query requires it, the query fails. This can happen during a rolling upgrade or with CCS.
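A hedged sketch of what the version-gated serialization could look like; the method shape and the MatchConfig accessor are assumptions, while the ESQL_LOOKUP_JOIN_ON_MANY_FIELDS transport version is part of this change:

```java
import java.io.IOException;
import java.util.List;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.enrich.MatchConfig;

class MatchFieldsWireFormatSketch {
    // Sketch only: assumes MatchConfig implements Writeable and exposes a hypothetical
    // fieldName() accessor for the legacy single-field wire format.
    static void writeMatchFields(StreamOutput out, List<MatchConfig> matchFields) throws IOException {
        if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_ON_MANY_FIELDS)) {
            // Newer nodes receive the full list of match fields.
            out.writeCollection(matchFields);
        } else if (matchFields.size() == 1) {
            // Older nodes only understand a single match field name.
            out.writeString(matchFields.get(0).fieldName());
        } else {
            // A multi-field join cannot be expressed on the old wire format, so the query fails,
            // e.g. during a rolling upgrade or a cross-cluster search against an older cluster.
            throw new IllegalArgumentException("LOOKUP JOIN on multiple fields requires all nodes to support it");
        }
    }
}
```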
Julian Kiryakov committed 2 months ago · commit a995a1208a
52 changed files with 1313 additions and 307 deletions
  1. +10 -0   docs/changelog/131559.yaml
  2. +12 -3   docs/reference/query-languages/esql/_snippets/commands/layout/lookup-join.md
  3. +8 -5    docs/reference/query-languages/esql/esql-lookup-join.md
  4. +1 -0    server/src/main/java/org/elasticsearch/TransportVersions.java
  5. +88 -31  test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java
  6. +1 -1    test/framework/src/main/java/org/elasticsearch/test/AbstractWireSerializingTestCase.java
  7. +2 -2    x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java
  8. +61 -0   x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/ExpressionQueryList.java
  9. +30 -0   x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java
  10. +5 -3   x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java
  11. +1 -1   x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctLongGroupingAggregatorFunctionTests.java
  12. +1 -1   x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountGroupingAggregatorFunctionTests.java
  13. +1 -1   x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstLongByTimestampAggregatorFunctionTests.java
  14. +1 -1   x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastLongByTimestampAggregatorFunctionTests.java
  15. +1 -1   x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MaxLongGroupingAggregatorFunctionTests.java
  16. +1 -1   x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationLongGroupingAggregatorFunctionTests.java
  17. +1 -1   x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MinLongGroupingAggregatorFunctionTests.java
  18. +1 -1   x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/PercentileLongGroupingAggregatorFunctionTests.java
  19. +1 -1   x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumLongGroupingAggregatorFunctionTests.java
  20. +1 -1   x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/ValuesLongGroupingAggregatorFunctionTests.java
  21. +1 -0   x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/EvalOperatorTests.java
  22. +1 -0   x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FilterOperatorTests.java
  23. +1 -0   x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java
  24. +1 -0   x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ProjectOperatorTests.java
  25. +1 -0   x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/RowInTableLookupOperatorTests.java
  26. +1 -0   x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TupleDocLongBlockSourceOperator.java
  27. +2 -2   x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java
  28. +1 -2   x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/TupleAbstractBlockSourceOperator.java
  29. +1 -1   x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/TupleLongLongBlockSourceOperator.java
  30. +2 -0   x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java
  31. +14 -3  x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeRestTest.java
  32. +43 -6  x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/command/pipe/LookupJoinGenerator.java
  33. +13 -1  x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java
  34. +18 -0  x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/multi_column_joinable.csv
  35. +19 -0  x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/multi_column_joinable_lookup.csv
  36. +278 -0 x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec
  37. +22 -0  x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-multi_column_joinable.json
  38. +22 -0  x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-multi_column_joinable_lookup.json
  39. +150 -44 x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java
  40. +4 -0   x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
  41. +13 -26 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java
  42. +11 -4  x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java
  43. +60 -56 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java
  44. +65 -27 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java
  45. +78 -0  x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/MatchConfig.java
  46. +11 -1  x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java
  47. +2 -16  x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java
  48. +2 -10  x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/ParsingTests.java
  49. +2 -2   x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorStatusTests.java
  50. +54 -22 x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java
  51. +90 -0  x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/MatchConfigSerializationTests.java
  52. +102 -29 x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml

+ 10 - 0
docs/changelog/131559.yaml

@@ -0,0 +1,10 @@
+pr: 131559
+summary: Add support for LOOKUP JOIN on multiple fields
+area: ES|QL
+type: enhancement
+issues: [ ]
+highlight:
+  title: Add support for Lookup Join on Multiple Fields
+  body: "Add support for Lookup Join on Multiple Fields e.g. FROM index1\n| LOOKUP\
+    \ JOIN lookup_index on field1, field2"
+  notable: true

+ 12 - 3
docs/reference/query-languages/esql/_snippets/commands/layout/lookup-join.md

@@ -17,13 +17,22 @@ FROM <source_index>
 | LOOKUP JOIN <lookup_index> ON <field_name>
 ```
 
+```esql
+FROM <source_index>
+| LOOKUP JOIN <lookup_index> ON <field_name1>, <field_name2>, <field_name3>
+```
+
 **Parameters**
 
 `<lookup_index>`
 :   The name of the lookup index. This must be a specific index name - wildcards, aliases, and remote cluster references are not supported. Indices used for lookups must be configured with the [`lookup` index mode](/reference/elasticsearch/index-settings/index-modules.md#index-mode-setting).
 
-`<field_name>`
-:   The field to join on. This field must exist in both your current query results and in the lookup index. If the field contains multi-valued entries, those entries will not match anything (the added fields will contain `null` for those rows).
+`<field_name>` or `<field_name1>, <field_name2>, <field_name3>`
+:   The field(s) to join on. Can be either:
+  * A single field name
+  * A comma-separated list of field names {applies_to}`stack: ga 9.2`
+:   These fields must exist in both your current query results and in the lookup index. If the fields contains multi-valued entries, those entries will not match anything (the added fields will contain `null` for those rows).
+
 
 **Description**
 
@@ -32,7 +41,7 @@ results table by finding documents in a lookup index that share the same
 join field value as your result rows.
 
 For each row in your results table that matches a document in the lookup
-index based on the join field, all fields from the matching document are
+index based on the join fields, all fields from the matching document are
 added as new columns to that row.
 
 If multiple documents in the lookup index match a single row in your

+ 8 - 5
docs/reference/query-languages/esql/esql-lookup-join.md

@@ -33,11 +33,14 @@ For example, you can use `LOOKUP JOIN` to:
 The `LOOKUP JOIN` command adds fields from the lookup index as new columns to your results table based on matching values in the join field.
 
 The command requires two parameters:
-- The name of the lookup index (which must have the `lookup` [`index.mode setting`](/reference/elasticsearch/index-settings/index-modules.md#index-mode-setting))
-- The name of the field to join on
-
+* The name of the lookup index (which must have the `lookup` [`index.mode setting`](/reference/elasticsearch/index-settings/index-modules.md#index-mode-setting))
+* The field(s) to join on. Can be either:
+  * A single field name
+  * A comma-separated list of field names {applies_to}`stack: ga 9.2`
+  
 ```esql
-LOOKUP JOIN <lookup_index> ON <field_name>
+LOOKUP JOIN <lookup_index> ON <field_name>  # Join on a single field
+LOOKUP JOIN <lookup_index> ON <field_name1>, <field_name2>, <field_name3>  # Join on multiple fields
 ```
 
 :::{image} ../images/esql-lookup-join.png
@@ -200,7 +203,7 @@ The following are the current limitations with `LOOKUP JOIN`:
 * Indices in [`lookup` mode](/reference/elasticsearch/index-settings/index-modules.md#index-mode-setting) are always single-sharded.
 * Cross cluster search is unsupported initially. Both source and lookup indices must be local.
 * Currently, only matching on equality is supported.
-* `LOOKUP JOIN` can only use a single match field and a single index. Wildcards are not supported.
+* In Stack versions `9.0-9.1`,`LOOKUP JOIN` can only use a single match field and a single index. Wildcards are not supported.
   * Aliases, datemath, and datastreams are supported, as long as the index pattern matches a single concrete index {applies_to}`stack: ga 9.1.0`.
 * The name of the match field in `LOOKUP JOIN lu_idx ON match_field` must match an existing field in the query. This may require `RENAME`s or `EVAL`s to achieve.
 * The query will circuit break if there are too many matching documents in the lookup index, or if the documents are too large. More precisely, `LOOKUP JOIN` works in batches of, normally, about 10,000 rows; a large amount of heap space is needed if the matching documents from the lookup index for a batch are multiple megabytes or larger. This is roughly the same as for `ENRICH`.

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -363,6 +363,7 @@ public class TransportVersions {
     public static final TransportVersion INDEX_TEMPLATE_TRACKING_INFO = def(9_136_0_00);
     public static final TransportVersion EXTENDED_SNAPSHOT_STATS_IN_NODE_INFO = def(9_137_0_00);
     public static final TransportVersion SIMULATE_INGEST_MAPPING_MERGE_TYPE = def(9_138_0_00);
+    public static final TransportVersion ESQL_LOOKUP_JOIN_ON_MANY_FIELDS = def(9_139_0_00);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 88 - 31
test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java

@@ -13,6 +13,7 @@ import org.apache.http.HttpHost;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.util.EntityUtils;
 import org.apache.lucene.tests.util.TimeUnits;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.Response;
@@ -697,13 +698,26 @@ public class HeapAttackIT extends ESRestTestCase {
     public void testLookupExplosion() throws IOException {
         int sensorDataCount = 400;
         int lookupEntries = 10000;
-        Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries);
+        Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1);
         assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
     }
 
+    public void testLookupExplosionManyFields() throws IOException {
+        int sensorDataCount = 400;
+        int lookupEntries = 1000;
+        int joinFieldsCount = 990;
+        Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount);
+        assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
+    }
+
+    public void testLookupExplosionManyMatchesManyFields() throws IOException {
+        // 1500, 10000 is enough locally, but some CI machines need more.
+        assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000, 30));
+    }
+
     public void testLookupExplosionManyMatches() throws IOException {
         // 1500, 10000 is enough locally, but some CI machines need more.
-        assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000));
+        assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000, 1));
     }
 
     public void testLookupExplosionNoFetch() throws IOException {
@@ -730,11 +744,18 @@ public class HeapAttackIT extends ESRestTestCase {
         assertCircuitBreaks(attempt -> lookupExplosionBigString(attempt * 500, 1));
     }
 
-    private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntries) throws IOException {
+    private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntries, int joinFieldsCount) throws IOException {
         try {
-            lookupExplosionData(sensorDataCount, lookupEntries);
+            lookupExplosionData(sensorDataCount, lookupEntries, joinFieldsCount);
             StringBuilder query = startQuery();
-            query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id | STATS COUNT(location)\"}");
+            query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON ");
+            for (int i = 0; i < joinFieldsCount; i++) {
+                if (i != 0) {
+                    query.append(",");
+                }
+                query.append("id").append(i);
+            }
+            query.append(" | STATS COUNT(location)\"}");
             return responseAsMap(query(query.toString(), null));
         } finally {
             deleteIndex("sensor_data");
@@ -744,9 +765,9 @@ public class HeapAttackIT extends ESRestTestCase {
 
     private Map<String, Object> lookupExplosionNoFetch(int sensorDataCount, int lookupEntries) throws IOException {
         try {
-            lookupExplosionData(sensorDataCount, lookupEntries);
+            lookupExplosionData(sensorDataCount, lookupEntries, 1);
             StringBuilder query = startQuery();
-            query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id | STATS COUNT(*)\"}");
+            query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id0 | STATS COUNT(*)\"}");
             return responseAsMap(query(query.toString(), null));
         } finally {
             deleteIndex("sensor_data");
@@ -754,14 +775,14 @@ public class HeapAttackIT extends ESRestTestCase {
         }
     }
 
-    private void lookupExplosionData(int sensorDataCount, int lookupEntries) throws IOException {
-        initSensorData(sensorDataCount, 1);
-        initSensorLookup(lookupEntries, 1, i -> "73.9857 40.7484");
+    private void lookupExplosionData(int sensorDataCount, int lookupEntries, int joinFieldCount) throws IOException {
+        initSensorData(sensorDataCount, 1, joinFieldCount);
+        initSensorLookup(lookupEntries, 1, i -> "73.9857 40.7484", joinFieldCount);
     }
 
     private Map<String, Object> lookupExplosionBigString(int sensorDataCount, int lookupEntries) throws IOException {
         try {
-            initSensorData(sensorDataCount, 1);
+            initSensorData(sensorDataCount, 1, 1);
             initSensorLookupString(lookupEntries, 1, i -> {
                 int target = Math.toIntExact(ByteSizeValue.ofMb(1).getBytes());
                 StringBuilder str = new StringBuilder(Math.toIntExact(ByteSizeValue.ofMb(2).getBytes()));
@@ -772,7 +793,7 @@ public class HeapAttackIT extends ESRestTestCase {
                 return str.toString();
             });
             StringBuilder query = startQuery();
-            query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id | STATS COUNT(string)\"}");
+            query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id0 | STATS COUNT(string)\"}");
             return responseAsMap(query(query.toString(), null));
         } finally {
             deleteIndex("sensor_data");
@@ -794,11 +815,11 @@ public class HeapAttackIT extends ESRestTestCase {
 
     private Map<String, Object> enrichExplosion(int sensorDataCount, int lookupEntries) throws IOException {
         try {
-            initSensorData(sensorDataCount, 1);
+            initSensorData(sensorDataCount, 1, 1);
             initSensorEnrich(lookupEntries, 1, i -> "73.9857 40.7484");
             try {
                 StringBuilder query = startQuery();
-                query.append("FROM sensor_data | ENRICH sensor ON id | STATS COUNT(*)\"}");
+                query.append("FROM sensor_data | ENRICH sensor ON id0 | STATS COUNT(*)\"}");
                 return responseAsMap(query(query.toString(), null));
             } finally {
                 Request delete = new Request("DELETE", "/_enrich/policy/sensor");
@@ -958,16 +979,30 @@ public class HeapAttackIT extends ESRestTestCase {
         initIndex("mv_longs", bulk.toString());
     }
 
-    private void initSensorData(int docCount, int sensorCount) throws IOException {
+    private void initSensorData(int docCount, int sensorCount, int joinFieldCount) throws IOException {
         logger.info("loading sensor data");
-        createIndex("sensor_data", Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(), """
-            {
-                "properties": {
-                    "@timestamp": { "type": "date" },
-                    "id": { "type": "long" },
+        // We cannot go over 1000 fields, due to failed on parsing mappings on index creation
+        // [sensor_data] java.lang.IllegalArgumentException: Limit of total fields [1000] has been exceeded
+        assertTrue("Too many columns, it will throw an exception later", joinFieldCount <= 990);
+        StringBuilder createIndexBuilder = new StringBuilder();
+        createIndexBuilder.append("""
+             {
+                 "properties": {
+                     "@timestamp": { "type": "date" },
+            """);
+        for (int i = 0; i < joinFieldCount; i++) {
+            createIndexBuilder.append("\"id").append(i).append("\": { \"type\": \"long\" },");
+        }
+        createIndexBuilder.append("""
                     "value": { "type": "double" }
                 }
             }""");
+        CreateIndexResponse response = createIndex(
+            "sensor_data",
+            Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(),
+            createIndexBuilder.toString()
+        );
+        assertTrue(response.isAcknowledged());
         int docsPerBulk = 1000;
         long firstDate = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2025-01-01T00:00:00Z");
 
@@ -975,8 +1010,11 @@ public class HeapAttackIT extends ESRestTestCase {
         for (int i = 0; i < docCount; i++) {
             data.append(String.format(Locale.ROOT, """
                 {"create":{}}
-                {"timestamp":"%s", "id": %d, "value": %f}
-                """, DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(i * 10L + firstDate), i % sensorCount, i * 1.1));
+                {"timestamp":"%s",""", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(i * 10L + firstDate)));
+            for (int j = 0; j < joinFieldCount; j++) {
+                data.append(String.format(Locale.ROOT, "\"id%d\":%d, ", j, i % sensorCount));
+            }
+            data.append(String.format(Locale.ROOT, "\"value\": %f}\n", i * 1.1));
             if (i % docsPerBulk == docsPerBulk - 1) {
                 bulk("sensor_data", data.toString());
                 data.setLength(0);
@@ -985,23 +1023,42 @@ public class HeapAttackIT extends ESRestTestCase {
         initIndex("sensor_data", data.toString());
     }
 
-    private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<String> location) throws IOException {
+    private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<String> location, int joinFieldsCount)
+        throws IOException {
         logger.info("loading sensor lookup");
-        createIndex("sensor_lookup", Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(), """
+        // cannot go over 1000 fields, due to failed on parsing mappings on index creation
+        // [sensor_data] java.lang.IllegalArgumentException: Limit of total fields [1000] has been exceeded
+        assertTrue("Too many join on fields, it will throw an exception later", joinFieldsCount <= 990);
+        StringBuilder createIndexBuilder = new StringBuilder();
+        createIndexBuilder.append("""
             {
                 "properties": {
-                    "id": { "type": "long" },
+            """);
+        for (int i = 0; i < joinFieldsCount; i++) {
+            createIndexBuilder.append("\"id").append(i).append("\": { \"type\": \"long\" },");
+        }
+        createIndexBuilder.append("""
                     "location": { "type": "geo_point" }
                 }
             }""");
+        CreateIndexResponse response = createIndex(
+            "sensor_lookup",
+            Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(),
+            createIndexBuilder.toString()
+        );
+        assertTrue(response.isAcknowledged());
         int docsPerBulk = 1000;
         StringBuilder data = new StringBuilder();
         for (int i = 0; i < lookupEntries; i++) {
             int sensor = i % sensorCount;
             data.append(String.format(Locale.ROOT, """
                 {"create":{}}
-                {"id": %d, "location": "POINT(%s)"}
-                """, sensor, location.apply(sensor)));
+                {"""));
+            for (int j = 0; j < joinFieldsCount; j++) {
+                data.append(String.format(Locale.ROOT, "\"id%d\":%d, ", j, sensor));
+            }
+            data.append(String.format(Locale.ROOT, """
+                "location": "POINT(%s)"}\n""", location.apply(sensor)));
             if (i % docsPerBulk == docsPerBulk - 1) {
                 bulk("sensor_lookup", data.toString());
                 data.setLength(0);
@@ -1015,7 +1072,7 @@ public class HeapAttackIT extends ESRestTestCase {
         createIndex("sensor_lookup", Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(), """
             {
                 "properties": {
-                    "id": { "type": "long" },
+                    "id0": { "type": "long" },
                     "string": { "type": "text" }
                 }
             }""");
@@ -1025,7 +1082,7 @@ public class HeapAttackIT extends ESRestTestCase {
             int sensor = i % sensorCount;
             data.append(String.format(Locale.ROOT, """
                 {"create":{}}
-                {"id": %d, "string": "%s"}
+                {"id0": %d, "string": "%s"}
                 """, sensor, string.apply(sensor)));
             if (i % docsPerBulk == docsPerBulk - 1) {
                 bulk("sensor_lookup", data.toString());
@@ -1036,7 +1093,7 @@ public class HeapAttackIT extends ESRestTestCase {
     }
 
     private void initSensorEnrich(int lookupEntries, int sensorCount, IntFunction<String> location) throws IOException {
-        initSensorLookup(lookupEntries, sensorCount, location);
+        initSensorLookup(lookupEntries, sensorCount, location, 1);
         logger.info("loading sensor enrich");
 
         Request create = new Request("PUT", "/_enrich/policy/sensor");
@@ -1044,7 +1101,7 @@ public class HeapAttackIT extends ESRestTestCase {
             {
               "match": {
                 "indices": "sensor_lookup",
-                "match_field": "id",
+                "match_field": "id0",
                 "enrich_fields": ["location"]
               }
             }

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/test/AbstractWireSerializingTestCase.java

@@ -37,7 +37,7 @@ public abstract class AbstractWireSerializingTestCase<T extends Writeable> exten
      * Copy the {@link Writeable} by round tripping it through {@linkplain StreamInput} and {@linkplain StreamOutput}.
      */
     @Override
-    protected final T copyInstance(T instance, TransportVersion version) throws IOException {
+    protected T copyInstance(T instance, TransportVersion version) throws IOException {
         return copyInstance(instance, getNamedWriteableRegistry(), instanceWriter(), instanceReader(), version);
     }
 }

+ 2 - 2
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java

@@ -37,7 +37,7 @@ import java.io.UncheckedIOException;
  */
 public final class EnrichQuerySourceOperator extends SourceOperator {
     private final BlockFactory blockFactory;
-    private final QueryList queryList;
+    private final LookupEnrichQueryGenerator queryList;
     private int queryPosition = -1;
     private final ShardContext shardContext;
     private final IndexReader indexReader;
@@ -51,7 +51,7 @@ public final class EnrichQuerySourceOperator extends SourceOperator {
     public EnrichQuerySourceOperator(
         BlockFactory blockFactory,
         int maxPageSize,
-        QueryList queryList,
+        LookupEnrichQueryGenerator queryList,
         ShardContext shardContext,
         Warnings warnings
     ) {

+ 61 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/ExpressionQueryList.java

@@ -0,0 +1,61 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator.lookup;
+
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.Query;
+
+import java.util.List;
+
+/**
+ * A {@link LookupEnrichQueryGenerator} that combines multiple {@link QueryList}s into a single query.
+ * Each query in the resulting query will be a conjunction of all queries from the input lists at the same position.
+ * In the future we can extend this to support more complex expressions, such as disjunctions or negations.
+ */
+public class ExpressionQueryList implements LookupEnrichQueryGenerator {
+    private final List<QueryList> queryLists;
+
+    public ExpressionQueryList(List<QueryList> queryLists) {
+        if (queryLists.size() < 2) {
+            throw new IllegalArgumentException("ExpressionQueryList must have at least two QueryLists");
+        }
+        this.queryLists = queryLists;
+    }
+
+    @Override
+    public Query getQuery(int position) {
+        BooleanQuery.Builder builder = new BooleanQuery.Builder();
+        for (QueryList queryList : queryLists) {
+            Query q = queryList.getQuery(position);
+            if (q == null) {
+                // if any of the matchFields are null, it means there is no match for this position
+                // A AND NULL is always NULL, so we can skip this position
+                return null;
+            }
+            builder.add(q, BooleanClause.Occur.FILTER);
+        }
+        return builder.build();
+    }
+
+    @Override
+    public int getPositionCount() {
+        int positionCount = queryLists.get(0).getPositionCount();
+        for (QueryList queryList : queryLists) {
+            if (queryList.getPositionCount() != positionCount) {
+                throw new IllegalStateException(
+                    "All QueryLists must have the same position count, expected: "
+                        + positionCount
+                        + ", but got: "
+                        + queryList.getPositionCount()
+                );
+            }
+        }
+        return positionCount;
+    }
+}

+ 30 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java

@@ -0,0 +1,30 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator.lookup;
+
+import org.apache.lucene.search.Query;
+import org.elasticsearch.core.Nullable;
+
+/**
+ * An interface to generates queries for the lookup and enrich operators.
+ * This interface is used to retrieve queries based on a position index.
+ */
+public interface LookupEnrichQueryGenerator {
+
+    /**
+     * Returns the query at the given position.
+     */
+    @Nullable
+    Query getQuery(int position);
+
+    /**
+     * Returns the number of queries in this generator
+     */
+    int getPositionCount();
+
+}

+ 5 - 3
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java

@@ -49,7 +49,7 @@ import java.util.function.IntFunction;
 /**
  * Generates a list of Lucene queries based on the input block.
  */
-public abstract class QueryList {
+public abstract class QueryList implements LookupEnrichQueryGenerator {
     protected final SearchExecutionContext searchExecutionContext;
     protected final AliasFilter aliasFilter;
     protected final MappedFieldType field;
@@ -74,7 +74,8 @@ public abstract class QueryList {
     /**
      * Returns the number of positions in this query list
      */
-    int getPositionCount() {
+    @Override
+    public int getPositionCount() {
         return block.getPositionCount();
     }
 
@@ -87,7 +88,8 @@ public abstract class QueryList {
      */
     public abstract QueryList onlySingleValues(Warnings warnings, String multiValueWarningMessage);
 
-    final Query getQuery(int position) {
+    @Override
+    public final Query getQuery(int position) {
         final int valueCount = block.getValueCount(position);
         if (onlySingleValueParams != null && valueCount != 1) {
             if (valueCount > 1) {

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctLongGroupingAggregatorFunctionTests.java

@@ -14,7 +14,7 @@ import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.SourceOperator;
-import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator;
+import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator;
 import org.elasticsearch.core.Tuple;
 
 import java.util.List;

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountGroupingAggregatorFunctionTests.java

@@ -15,7 +15,7 @@ import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.LongDoubleTupleBlockSourceOperator;
 import org.elasticsearch.compute.operator.SourceOperator;
-import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator;
+import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator;
 import org.elasticsearch.core.Tuple;
 
 import java.util.List;

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstLongByTimestampAggregatorFunctionTests.java

@@ -14,7 +14,7 @@ import org.elasticsearch.compute.data.BlockUtils;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.SourceOperator;
-import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator;
+import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator;
 import org.elasticsearch.core.Tuple;
 
 import java.util.List;

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastLongByTimestampAggregatorFunctionTests.java

@@ -14,7 +14,7 @@ import org.elasticsearch.compute.data.BlockUtils;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.SourceOperator;
-import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator;
+import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator;
 import org.elasticsearch.core.Tuple;
 
 import java.util.List;

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MaxLongGroupingAggregatorFunctionTests.java

@@ -12,7 +12,7 @@ import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.SourceOperator;
-import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator;
+import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator;
 import org.elasticsearch.core.Tuple;
 
 import java.util.List;

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationLongGroupingAggregatorFunctionTests.java

@@ -13,7 +13,7 @@ import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.DoubleBlock;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.SourceOperator;
-import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator;
+import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator;
 import org.elasticsearch.core.Tuple;
 
 import java.util.ArrayList;

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MinLongGroupingAggregatorFunctionTests.java

@@ -12,7 +12,7 @@ import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.SourceOperator;
-import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator;
+import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator;
 import org.elasticsearch.core.Tuple;
 
 import java.util.List;

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/PercentileLongGroupingAggregatorFunctionTests.java

@@ -13,7 +13,7 @@ import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.DoubleBlock;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.SourceOperator;
-import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator;
+import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.search.aggregations.metrics.TDigestState;
 import org.junit.Before;

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumLongGroupingAggregatorFunctionTests.java

@@ -12,7 +12,7 @@ import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.SourceOperator;
-import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator;
+import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator;
 import org.elasticsearch.core.Tuple;
 
 import java.util.List;

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/ValuesLongGroupingAggregatorFunctionTests.java

@@ -12,7 +12,7 @@ import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BlockUtils;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.SourceOperator;
-import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator;
+import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator;
 import org.elasticsearch.core.Tuple;
 
 import java.util.Arrays;

+ 1 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/EvalOperatorTests.java

@@ -15,6 +15,7 @@ import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.EvalOperator.EvalOperatorFactory;
 import org.elasticsearch.compute.test.CannedSourceOperator;
 import org.elasticsearch.compute.test.OperatorTestCase;
+import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator;
 import org.elasticsearch.core.Tuple;
 import org.hamcrest.Matcher;
 

+ 1 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FilterOperatorTests.java

@@ -16,6 +16,7 @@ import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.test.CannedSourceOperator;
 import org.elasticsearch.compute.test.OperatorTestCase;
+import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator;
 import org.elasticsearch.core.Tuple;
 import org.hamcrest.Matcher;
 

+ 1 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java

@@ -22,6 +22,7 @@ import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.test.BlockTestUtils;
+import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator;
 import org.elasticsearch.core.Tuple;
 import org.hamcrest.Matcher;
 

+ 1 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ProjectOperatorTests.java

@@ -13,6 +13,7 @@ import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.test.OperatorTestCase;
+import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator;
 import org.elasticsearch.core.Tuple;
 import org.hamcrest.Matcher;
 

+ 1 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/RowInTableLookupOperatorTests.java

@@ -16,6 +16,7 @@ import org.elasticsearch.compute.test.CannedSourceOperator;
 import org.elasticsearch.compute.test.OperatorTestCase;
 import org.elasticsearch.compute.test.SequenceLongBlockSourceOperator;
 import org.elasticsearch.compute.test.TestBlockFactory;
+import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator;
 import org.elasticsearch.core.Tuple;
 import org.hamcrest.Matcher;
 

+ 1 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TupleDocLongBlockSourceOperator.java

@@ -11,6 +11,7 @@ import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BlockUtils;
 import org.elasticsearch.compute.data.DocBlock;
+import org.elasticsearch.compute.test.TupleAbstractBlockSourceOperator;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.mapper.BlockLoader;
 

+ 2 - 2
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java

@@ -30,15 +30,15 @@ import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.Operator;
 import org.elasticsearch.compute.operator.PageConsumerOperator;
 import org.elasticsearch.compute.operator.SourceOperator;
-import org.elasticsearch.compute.operator.TupleAbstractBlockSourceOperator;
 import org.elasticsearch.compute.operator.TupleDocLongBlockSourceOperator;
-import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator;
 import org.elasticsearch.compute.test.CannedSourceOperator;
 import org.elasticsearch.compute.test.OperatorTestCase;
 import org.elasticsearch.compute.test.SequenceLongBlockSourceOperator;
 import org.elasticsearch.compute.test.TestBlockBuilder;
 import org.elasticsearch.compute.test.TestBlockFactory;
 import org.elasticsearch.compute.test.TestDriverFactory;
+import org.elasticsearch.compute.test.TupleAbstractBlockSourceOperator;
+import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator;
 import org.elasticsearch.core.RefCounted;
 import org.elasticsearch.core.SimpleRefCounted;
 import org.elasticsearch.core.Tuple;

+ 1 - 2
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TupleAbstractBlockSourceOperator.java → x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/TupleAbstractBlockSourceOperator.java

@@ -5,13 +5,12 @@
  * 2.0.
  */
 
-package org.elasticsearch.compute.operator;
+package org.elasticsearch.compute.test;
 
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.test.AbstractBlockSourceOperator;
 import org.elasticsearch.core.Tuple;
 
 import java.util.List;

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TupleLongLongBlockSourceOperator.java → x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/TupleLongLongBlockSourceOperator.java

@@ -5,7 +5,7 @@
  * 2.0.
  */
 
-package org.elasticsearch.compute.operator;
+package org.elasticsearch.compute.test;
 
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;

+ 2 - 0
x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

@@ -113,6 +113,8 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
         "SortEvalBeforeLookup",
         "SortBeforeAndAfterMultipleJoinAndMvExpand",
         "LookupJoinAfterTopNAndRemoteEnrich",
+        "LookupJoinOnTwoFieldsAfterTop",
+        "LookupJoinOnTwoFieldsMultipleTimes",
         // Lookup join after LIMIT is not supported in CCS yet
         "LookupJoinAfterLimitAndRemoteEnrich"
     );

+ 14 - 3
x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeRestTest.java

@@ -240,13 +240,24 @@ public abstract class GenerativeRestTest extends ESRestTestCase {
             .toList();
     }
 
-    public record LookupIdx(String idxName, String key, String keyType) {}
+    public record LookupIdxColumn(String name, String type) {}
+
+    public record LookupIdx(String idxName, List<LookupIdxColumn> keys) {}
 
     private List<LookupIdx> lookupIndices() {
         List<LookupIdx> result = new ArrayList<>();
         // we don't have key info from the dataset loader, let's hardcode it for now
-        result.add(new LookupIdx("languages_lookup", "language_code", "integer"));
-        result.add(new LookupIdx("message_types_lookup", "message", "keyword"));
+        result.add(new LookupIdx("languages_lookup", List.of(new LookupIdxColumn("language_code", "integer"))));
+        result.add(new LookupIdx("message_types_lookup", List.of(new LookupIdxColumn("message", "keyword"))));
+        List<LookupIdxColumn> multiColumnJoinableLookupKeys = List.of(
+            new LookupIdxColumn("id_int", "integer"),
+            new LookupIdxColumn("name_str", "keyword"),
+            new LookupIdxColumn("is_active_bool", "boolean"),
+            new LookupIdxColumn("ip_addr", "ip"),
+            new LookupIdxColumn("other1", "keyword"),
+            new LookupIdxColumn("other2", "integer")
+        );
+        result.add(new LookupIdx("multi_column_joinable_lookup", multiColumnJoinableLookupKeys));
         return result;
     }
 

+ 43 - 6
x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/command/pipe/LookupJoinGenerator.java

@@ -11,10 +11,15 @@ import org.elasticsearch.xpack.esql.qa.rest.generative.EsqlQueryGenerator;
 import org.elasticsearch.xpack.esql.qa.rest.generative.GenerativeRestTest;
 import org.elasticsearch.xpack.esql.qa.rest.generative.command.CommandGenerator;
 
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.elasticsearch.test.ESTestCase.randomFrom;
+import static org.elasticsearch.test.ESTestCase.randomInt;
+import static org.elasticsearch.test.ESTestCase.randomSubsetOf;
 
 public class LookupJoinGenerator implements CommandGenerator {
 
@@ -29,15 +34,47 @@ public class LookupJoinGenerator implements CommandGenerator {
     ) {
         GenerativeRestTest.LookupIdx lookupIdx = randomFrom(schema.lookupIndices());
         String lookupIdxName = lookupIdx.idxName();
-        String idxKey = lookupIdx.key();
-        String keyType = lookupIdx.keyType();
+        int joinColumnsCount = randomInt(lookupIdx.keys().size() - 1) + 1; // at least one column must be used for the join
+        List<GenerativeRestTest.LookupIdxColumn> joinColumns = randomSubsetOf(joinColumnsCount, lookupIdx.keys());
+        List<String> keyNames = new ArrayList<>();
+        List<String> joinOn = new ArrayList<>();
+        Set<String> usedColumns = new HashSet<>();
+        for (GenerativeRestTest.LookupIdxColumn joinColumn : joinColumns) {
+            String idxKey = joinColumn.name();
+            String keyType = joinColumn.type();
 
-        var candidateKeys = previousOutput.stream().filter(x -> x.type().equals(keyType)).toList();
-        if (candidateKeys.isEmpty()) {
+            var candidateKeys = previousOutput.stream().filter(x -> x.type().equals(keyType)).toList();
+            if (candidateKeys.isEmpty()) {
+                continue; // no candidate keys of the right type, skip this column
+            }
+            EsqlQueryGenerator.Column key = randomFrom(candidateKeys);
+            if (usedColumns.contains(key.name()) || usedColumns.contains(idxKey)) {
+                continue; // already used this column from the lookup index, or will discard the main index column by RENAME'ing below, skip
+            } else {
+                usedColumns.add(key.name());
+                usedColumns.add(idxKey);
+            }
+            keyNames.add(key.name());
+            joinOn.add(idxKey);
+        }
+        if (keyNames.isEmpty()) {
             return EMPTY_DESCRIPTION;
         }
-        EsqlQueryGenerator.Column key = randomFrom(candidateKeys);
-        String cmdString = "| rename " + key.name() + " as " + idxKey + " | lookup join " + lookupIdxName + " on " + idxKey;
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = 0; i < keyNames.size(); i++) {
+            stringBuilder.append("| rename ");
+            stringBuilder.append(keyNames.get(i));
+            stringBuilder.append(" as ");
+            stringBuilder.append(joinOn.get(i));
+        }
+        stringBuilder.append(" | lookup join ").append(lookupIdxName).append(" on ");
+        for (int i = 0; i < keyNames.size(); i++) {
+            stringBuilder.append(joinOn.get(i));
+            if (i < keyNames.size() - 1) {
+                stringBuilder.append(", ");
+            }
+        }
+        String cmdString = stringBuilder.toString();
         return new CommandDescription(LOOKUP_JOIN, this, cmdString, Map.of());
     }
 

+ 13 - 1
x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java

@@ -63,6 +63,16 @@ public class CsvTestsDataLoader {
     private static final TestDataset HOSTS = new TestDataset("hosts");
     private static final TestDataset APPS = new TestDataset("apps");
     private static final TestDataset APPS_SHORT = APPS.withIndex("apps_short").withTypeMapping(Map.of("id", "short"));
+    private static final TestDataset MULTI_COLUMN_JOINABLE = new TestDataset(
+        "multi_column_joinable",
+        "mapping-multi_column_joinable.json",
+        "multi_column_joinable.csv"
+    );
+    private static final TestDataset MULTI_COLUMN_JOINABLE_LOOKUP = new TestDataset(
+        "multi_column_joinable_lookup",
+        "mapping-multi_column_joinable_lookup.json",
+        "multi_column_joinable_lookup.csv"
+    ).withSetting("lookup-settings.json");
     private static final TestDataset LANGUAGES = new TestDataset("languages");
     private static final TestDataset LANGUAGES_LOOKUP = LANGUAGES.withIndex("languages_lookup").withSetting("lookup-settings.json");
     private static final TestDataset LANGUAGES_LOOKUP_NON_UNIQUE_KEY = LANGUAGES_LOOKUP.withIndex("languages_lookup_non_unique_key")
@@ -219,7 +229,9 @@ public class CsvTestsDataLoader {
         Map.entry(LOGS.indexName, LOGS),
         Map.entry(MV_TEXT.indexName, MV_TEXT),
         Map.entry(DENSE_VECTOR.indexName, DENSE_VECTOR),
-        Map.entry(COLORS.indexName, COLORS)
+        Map.entry(COLORS.indexName, COLORS),
+        Map.entry(MULTI_COLUMN_JOINABLE.indexName, MULTI_COLUMN_JOINABLE),
+        Map.entry(MULTI_COLUMN_JOINABLE_LOOKUP.indexName, MULTI_COLUMN_JOINABLE_LOOKUP)
     );
 
     private static final EnrichConfig LANGUAGES_ENRICH = new EnrichConfig("languages_policy", "enrich-policy-languages.json");

+ 18 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/multi_column_joinable.csv

@@ -0,0 +1,18 @@
+id_int,name_str,is_active_bool,ip_addr,extra1,extra2
+1,Alice,true,192.168.1.1,foo,100
+2,Bob,false,192.168.1.2,bar,200
+3,Charlie,true,192.168.1.3,baz,300
+4,David,false,192.168.1.4,qux,400
+5,Eve,true,192.168.1.5,quux,500
+6,,true,192.168.1.6,corge,600
+7,Grace,false,,grault,700
+8,Hank,true,192.168.1.8,garply,800
+9,Ivy,false,192.168.1.9,waldo,900
+10,John,true,192.168.1.10,fred,1000
+,Kate,false,192.168.1.11,plugh,1100
+[12],Liam,true,192.168.1.12,xyzzy,1200
+13,Mia,false,192.168.1.13,thud,1300
+[14],Nina,true,192.168.1.14,foo2,1400
+15,Oscar,false,192.168.1.15,bar2,1500
+[17,18],Olivia,true,192.168.1.17,xyz,17000
+[1,19,21],Sophia,true,192.168.1.21,zyx,21000

+ 19 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/multi_column_joinable_lookup.csv

@@ -0,0 +1,19 @@
+id_int,name_str,is_active_bool,ip_addr,other1,other2
+1,Alice,true,192.168.1.1,alpha,1000
+1,Alice,true,192.168.1.2,beta,2000
+2,Bob,false,192.168.1.3,gamma,3000
+3,Charlie,true,192.168.1.3,delta,4000
+3,Charlie,false,192.168.1.3,epsilon,5000
+4,David,false,192.168.1.4,zeta,6000
+5,Eve,true,192.168.1.5,eta,7000
+5,Eve,true,192.168.1.5,theta,8000
+6,,true,192.168.1.6,iota,9000
+7,Grace,false,,kappa,10000
+8,Hank,true,192.168.1.8,lambda,11000
+,Kate,false,192.168.1.11,mu,12000
+12,Liam,true,192.168.1.12,nu,13000
+13,Mia,false,192.168.1.13,xi,14000
+[14],Nina,true,192.168.1.14,omicron,15000
+16,Paul,true,192.168.1.16,pi,16000
+[17,18],Olivia,true,192.168.1.17,rho,17000
+[1,19,20],Sophia,true,192.168.1.21,sigma,21000

+ 278 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

@@ -4871,3 +4871,281 @@ Connected to 10.1.0.1 | English                      | English               | U
 Connected to 10.1.0.1 | English                      | English               | null
 Connected to 10.1.0.1 | English                      | null                  | United Kingdom
 ;
+
+lookupJoinOnTwoFields
+required_capability: join_lookup_v12
+required_capability: lookup_join_on_multiple_fields
+
+FROM multi_column_joinable
+| LOOKUP JOIN multi_column_joinable_lookup ON id_int, name_str
+| KEEP id_int, name_str, extra1, other1, other2
+| SORT id_int, name_str, extra1, other1, other2
+;
+
+warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, name_str] failed, treating result as null. Only first 20 failures recorded.
+warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value
+
+id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer
+1              | Alice            | foo            | alpha          | 1000
+1              | Alice            | foo            | beta           | 2000
+[1, 19, 21]    | Sophia           | zyx            | null           | null
+2              | Bob              | bar            | gamma          | 3000
+3              | Charlie          | baz            | delta          | 4000
+3              | Charlie          | baz            | epsilon        | 5000
+4              | David            | qux            | zeta           | 6000
+5              | Eve              | quux           | eta            | 7000
+5              | Eve              | quux           | theta          | 8000
+6              | null             | corge          | null           | null
+7              | Grace            | grault         | kappa          | 10000
+8              | Hank             | garply         | lambda         | 11000
+9              | Ivy              | waldo          | null           | null
+10             | John             | fred           | null           | null
+12             | Liam             | xyzzy          | nu             | 13000
+13             | Mia              | thud           | xi             | 14000
+14             | Nina             | foo2           | omicron        | 15000
+15             | Oscar            | bar2           | null           | null
+[17, 18]       | Olivia           | xyz            | null           | null
+null           | Kate             | plugh          | null           | null
+;
+
+lookupJoinOnTwoFieldsSelfJoin
+required_capability: join_lookup_v12
+required_capability: lookup_join_on_multiple_fields
+
+FROM multi_column_joinable_lookup
+| LOOKUP JOIN multi_column_joinable_lookup ON id_int, name_str
+| KEEP id_int, name_str, other1, other2
+| SORT id_int, name_str, other1, other2
+;
+
+warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, name_str] failed, treating result as null. Only first 20 failures recorded.
+warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value
+
+id_int:integer | name_str:keyword | other1:keyword | other2:integer
+1              | Alice            | alpha          | 1000
+1              | Alice            | alpha          | 1000
+1              | Alice            | beta           | 2000
+1              | Alice            | beta           | 2000
+[1, 19, 20]    | Sophia           | null           | null
+2              | Bob              | gamma          | 3000
+3              | Charlie          | delta          | 4000
+3              | Charlie          | delta          | 4000
+3              | Charlie          | epsilon        | 5000
+3              | Charlie          | epsilon        | 5000
+4              | David            | zeta           | 6000
+5              | Eve              | eta            | 7000
+5              | Eve              | eta            | 7000
+5              | Eve              | theta          | 8000
+5              | Eve              | theta          | 8000
+6              | null             | null           | null
+7              | Grace            | kappa          | 10000
+8              | Hank             | lambda         | 11000
+12             | Liam             | nu             | 13000
+13             | Mia              | xi             | 14000
+14             | Nina             | omicron        | 15000
+16             | Paul             | pi             | 16000
+[17, 18]       | Olivia           | null           | null
+null           | Kate             | null           | null
+;
+
+lookupJoinOnThreeFields
+required_capability: join_lookup_v12
+required_capability: lookup_join_on_multiple_fields
+
+FROM multi_column_joinable
+| LOOKUP JOIN multi_column_joinable_lookup ON id_int, name_str, is_active_bool
+| KEEP id_int, name_str, extra1, other1, other2
+| SORT id_int, name_str, extra1, other1, other2
+;
+
+warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, name_str, is_active_bool] failed, treating result as null. Only first 20 failures recorded.
+warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value
+
+id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer
+1              | Alice            | foo            | alpha          | 1000
+1              | Alice            | foo            | beta           | 2000
+[1, 19, 21]    | Sophia           | zyx            | null           | null
+2              | Bob              | bar            | gamma          | 3000
+3              | Charlie          | baz            | delta          | 4000
+4              | David            | qux            | zeta           | 6000
+5              | Eve              | quux           | eta            | 7000
+5              | Eve              | quux           | theta          | 8000
+6              | null             | corge          | null           | null
+7              | Grace            | grault         | kappa          | 10000
+8              | Hank             | garply         | lambda         | 11000
+9              | Ivy              | waldo          | null           | null
+10             | John             | fred           | null           | null
+12             | Liam             | xyzzy          | nu             | 13000
+13             | Mia              | thud           | xi             | 14000
+14             | Nina             | foo2           | omicron        | 15000
+15             | Oscar            | bar2           | null           | null
+[17, 18]       | Olivia           | xyz            | null           | null
+null           | Kate             | plugh          | null           | null
+;
+
+
+lookupJoinOnFourFields
+required_capability: join_lookup_v12
+required_capability: lookup_join_on_multiple_fields
+
+FROM multi_column_joinable
+| LOOKUP JOIN multi_column_joinable_lookup ON id_int, name_str, is_active_bool, ip_addr
+| KEEP id_int, name_str, extra1, other1, other2
+| SORT id_int, name_str, extra1, other1, other2
+;
+
+warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, name_str, is_active_bool, ip_addr] failed, treating result as null. Only first 20 failures recorded.
+warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value
+
+id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer
+1              | Alice            | foo            | alpha          | 1000
+[1, 19, 21]    | Sophia           | zyx            | null           | null
+2              | Bob              | bar            | null           | null
+3              | Charlie          | baz            | delta          | 4000
+4              | David            | qux            | zeta           | 6000
+5              | Eve              | quux           | eta            | 7000
+5              | Eve              | quux           | theta          | 8000
+6              | null             | corge          | null           | null
+7              | Grace            | grault         | null           | null
+8              | Hank             | garply         | lambda         | 11000
+9              | Ivy              | waldo          | null           | null
+10             | John             | fred           | null           | null
+12             | Liam             | xyzzy          | nu             | 13000
+13             | Mia              | thud           | xi             | 14000
+14             | Nina             | foo2           | omicron        | 15000
+15             | Oscar            | bar2           | null           | null
+[17, 18]       | Olivia           | xyz            | null           | null
+null           | Kate             | plugh          | null           | null
+;
+
+
+lookupJoinOnTwoOtherFields
+required_capability: join_lookup_v12
+required_capability: lookup_join_on_multiple_fields
+
+FROM multi_column_joinable
+| LOOKUP JOIN multi_column_joinable_lookup ON is_active_bool, ip_addr
+| KEEP id_int, name_str, extra1, other1, other2, is_active_bool, ip_addr
+| SORT id_int, name_str, extra1, other1, other2, is_active_bool, ip_addr
+;
+
+id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer | is_active_bool:boolean | ip_addr:ip
+1              | Alice            | foo            | alpha          | 1000           | true                   | 192.168.1.1
+[1, 19, 20]    | Sophia           | zyx            | sigma          | 21000          | true                   | 192.168.1.21
+3              | Charlie          | baz            | delta          | 4000           | true                   | 192.168.1.3
+4              | David            | qux            | zeta           | 6000           | false                  | 192.168.1.4
+5              | Eve              | quux           | eta            | 7000           | true                   | 192.168.1.5
+5              | Eve              | quux           | theta          | 8000           | true                   | 192.168.1.5
+6              | null             | corge          | iota           | 9000           | true                   | 192.168.1.6
+8              | Hank             | garply         | lambda         | 11000          | true                   | 192.168.1.8
+12             | Liam             | xyzzy          | nu             | 13000          | true                   | 192.168.1.12
+13             | Mia              | thud           | xi             | 14000          | false                  | 192.168.1.13
+14             | Nina             | foo2           | omicron        | 15000          | true                   | 192.168.1.14
+[17, 18]       | Olivia           | xyz            | rho            | 17000          | true                   | 192.168.1.17
+null           | Kate             | plugh          | mu             | 12000          | false                  | 192.168.1.11
+null           | null             | bar            | null           | null           | false                  | 192.168.1.2
+null           | null             | bar2           | null           | null           | false                  | 192.168.1.15
+null           | null             | fred           | null           | null           | true                   | 192.168.1.10
+null           | null             | grault         | null           | null           | false                  | null
+null           | null             | waldo          | null           | null           | false                  | 192.168.1.9
+;
+
+lookupJoinOnTwoFieldsWithEval
+required_capability: join_lookup_v12
+required_capability: lookup_join_on_multiple_fields
+
+FROM multi_column_joinable
+| EVAL id_int = id_int + 5
+| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool
+| KEEP id_int, name_str, extra1, other1, other2
+| SORT id_int, name_str, extra1, other1, other2
+;
+
+warning:Line 2:17: evaluation of [id_int + 5] failed, treating result as null. Only first 20 failures recorded.
+warning:Line 2:17: java.lang.IllegalArgumentException: single-value function encountered multi-value
+warning:Line 3:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded.
+warning:Line 3:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value
+
+id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer
+6              | null             | foo            | iota           | 9000
+7              | Grace            | bar            | kappa          | 10000
+8              | Hank             | baz            | lambda         | 11000
+9              | null             | qux            | null           | null
+10             | null             | quux           | null           | null
+11             | null             | corge          | null           | null
+12             | null             | grault         | null           | null
+13             | null             | garply         | null           | null
+14             | null             | waldo          | null           | null
+15             | null             | fred           | null           | null
+17             | null             | xyzzy          | null           | null
+18             | null             | thud           | null           | null
+19             | null             | foo2           | null           | null
+20             | null             | bar2           | null           | null
+null           | null             | plugh          | null           | null
+null           | null             | xyz            | null           | null
+null           | null             | zyx            | null           | null
+;
+
+
+lookupJoinOnTwoFieldsAfterTop
+required_capability: join_lookup_v12
+required_capability: lookup_join_on_multiple_fields
+
+FROM multi_column_joinable
+| SORT extra1
+| LIMIT 10
+| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool
+| KEEP id_int, name_str, extra1, other1, other2
+| SORT id_int, name_str, extra1, other1, other2
+;
+
+warning:Line 4:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded.
+warning:Line 4:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value
+
+id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer
+1              | Alice            | foo            | alpha          | 1000
+1              | Alice            | foo            | beta           | 2000
+2              | Bob              | bar            | gamma          | 3000
+3              | Charlie          | baz            | delta          | 4000
+6              | null             | corge          | iota           | 9000
+7              | Grace            | grault         | kappa          | 10000
+8              | Hank             | garply         | lambda         | 11000
+10             | null             | fred           | null           | null
+14             | Nina             | foo2           | omicron        | 15000
+15             | null             | bar2           | null           | null
+null           | null             | plugh          | null           | null
+;
+
+
+lookupJoinOnTwoFieldsMultipleTimes
+required_capability: join_lookup_v12
+required_capability: lookup_join_on_multiple_fields
+
+FROM multi_column_joinable
+| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool
+| SORT extra1
+| LIMIT 10
+| LOOKUP JOIN multi_column_joinable_lookup ON name_str, is_active_bool
+| KEEP id_int, name_str, extra1, other1, other2
+| SORT id_int, name_str, extra1, other1, other2
+;
+
+warning:Line 2:3: evaluation of [LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool] failed, treating result as null. Only first 20 failures recorded.
+warning:Line 2:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value
+
+id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer
+1              | Alice            | foo            | alpha          | 1000
+1              | Alice            | foo            | alpha          | 1000
+1              | Alice            | foo            | beta           | 2000
+1              | Alice            | foo            | beta           | 2000
+2              | Bob              | bar            | gamma          | 3000
+3              | Charlie          | baz            | delta          | 4000
+7              | Grace            | grault         | kappa          | 10000
+8              | Hank             | garply         | lambda         | 11000
+14             | Nina             | foo2           | omicron        | 15000
+null           | null             | bar2           | null           | null
+null           | null             | corge          | null           | null
+null           | null             | fred           | null           | null
+;

+ 22 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-multi_column_joinable.json

@@ -0,0 +1,22 @@
+{
+  "properties": {
+    "id_int": {
+      "type": "integer"
+    },
+    "name_str": {
+      "type": "keyword"
+    },
+    "is_active_bool": {
+      "type": "boolean"
+    },
+    "ip_addr": {
+      "type": "ip"
+    },
+    "extra1": {
+      "type": "keyword"
+    },
+    "extra2": {
+      "type": "integer"
+    }
+  }
+}

+ 22 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-multi_column_joinable_lookup.json

@@ -0,0 +1,22 @@
+{
+  "properties": {
+    "id_int": {
+      "type": "integer"
+    },
+    "name_str": {
+      "type": "keyword"
+    },
+    "is_active_bool": {
+      "type": "boolean"
+    },
+    "ip_addr": {
+      "type": "ip"
+    },
+    "other1": {
+      "type": "keyword"
+    },
+    "other2": {
+      "type": "integer"
+    }
+  }
+}

+ 150 - 44
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java

@@ -59,6 +59,7 @@ import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.enrich.LookupFromIndexOperator;
+import org.elasticsearch.xpack.esql.enrich.MatchConfig;
 import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
 import org.elasticsearch.xpack.esql.planner.PhysicalSettings;
 import org.elasticsearch.xpack.esql.planner.PlannerUtils;
@@ -82,18 +83,69 @@ import static org.hamcrest.Matchers.hasSize;
 
 public class LookupFromIndexIT extends AbstractEsqlIntegTestCase {
     public void testKeywordKey() throws IOException {
-        runLookup(DataType.KEYWORD, new UsingSingleLookupTable(new String[] { "aa", "bb", "cc", "dd" }));
+        runLookup(List.of(DataType.KEYWORD), new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "cc", "dd" } }));
+    }
+
+    public void testJoinOnTwoKeys() throws IOException {
+        runLookup(
+            List.of(DataType.KEYWORD, DataType.LONG),
+            new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "cc", "dd" }, new Long[] { 12L, 33L, 1L, 42L } })
+        );
+    }
+
+    public void testJoinOnThreeKeys() throws IOException {
+        runLookup(
+            List.of(DataType.KEYWORD, DataType.LONG, DataType.KEYWORD),
+            new UsingSingleLookupTable(
+                new Object[][] {
+                    new String[] { "aa", "bb", "cc", "dd" },
+                    new Long[] { 12L, 33L, 1L, 42L },
+                    new String[] { "one", "two", "three", "four" }, }
+            )
+        );
+    }
+
+    public void testJoinOnFourKeys() throws IOException {
+        runLookup(
+            List.of(DataType.KEYWORD, DataType.LONG, DataType.KEYWORD, DataType.INTEGER),
+            new UsingSingleLookupTable(
+                new Object[][] {
+                    new String[] { "aa", "bb", "cc", "dd" },
+                    new Long[] { 12L, 33L, 1L, 42L },
+                    new String[] { "one", "two", "three", "four" },
+                    new Integer[] { 1, 2, 3, 4 }, }
+            )
+        );
     }
 
     public void testLongKey() throws IOException {
-        runLookup(DataType.LONG, new UsingSingleLookupTable(new Long[] { 12L, 33L, 1L }));
+        runLookup(List.of(DataType.LONG), new UsingSingleLookupTable(new Object[][] { new Long[] { 12L, 33L, 1L } }));
     }
 
     /**
      * LOOKUP multiple results match.
      */
     public void testLookupIndexMultiResults() throws IOException {
-        runLookup(DataType.KEYWORD, new UsingSingleLookupTable(new String[] { "aa", "bb", "bb", "dd" }));
+        runLookup(List.of(DataType.KEYWORD), new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" } }));
+    }
+
+    public void testJoinOnTwoKeysMultiResults() throws IOException {
+        runLookup(
+            List.of(DataType.KEYWORD, DataType.LONG),
+            new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" }, new Long[] { 12L, 1L, 1L, 42L } })
+        );
+    }
+
+    public void testJoinOnThreeKeysMultiResults() throws IOException {
+        runLookup(
+            List.of(DataType.KEYWORD, DataType.LONG, DataType.KEYWORD),
+            new UsingSingleLookupTable(
+                new Object[][] {
+                    new String[] { "aa", "bb", "bb", "dd" },
+                    new Long[] { 12L, 1L, 1L, 42L },
+                    new String[] { "one", "two", "two", "four" } }
+            )
+        );
     }
 
     interface PopulateIndices {
@@ -101,52 +153,87 @@ public class LookupFromIndexIT extends AbstractEsqlIntegTestCase {
     }
 
     class UsingSingleLookupTable implements PopulateIndices {
-        private final Map<Object, List<Integer>> matches = new HashMap<>();
-        private final Object[] lookupData;
+        private final Map<List<Object>, List<Integer>> matches = new HashMap<>();
+        private final Object[][] lookupData;
 
-        UsingSingleLookupTable(Object[] lookupData) {
+        // Accepts an array of arrays; each sub-array holds the values for one key field.
+        // All sub-arrays must have the same length.
+        UsingSingleLookupTable(Object[][] lookupData) {
             this.lookupData = lookupData;
-            for (int i = 0; i < lookupData.length; i++) {
-                matches.computeIfAbsent(lookupData[i], k -> new ArrayList<>()).add(i);
+            int numRows = lookupData[0].length;
+            for (int i = 0; i < numRows; i++) {
+                List<Object> key = new ArrayList<>();
+                for (Object[] col : lookupData) {
+                    key.add(col[i]);
+                }
+                matches.computeIfAbsent(key, k -> new ArrayList<>()).add(i);
             }
         }
 
         @Override
         public void populate(int docCount, List<String> expected) {
             List<IndexRequestBuilder> docs = new ArrayList<>();
+            int numFields = lookupData.length;
+            int numRows = lookupData[0].length;
             for (int i = 0; i < docCount; i++) {
-                Object key = lookupData[i % lookupData.length];
-                docs.add(client().prepareIndex("source").setSource(Map.of("key", key)));
+                List<Object> key = new ArrayList<>();
+                Map<String, Object> sourceDoc = new HashMap<>();
+                for (int f = 0; f < numFields; f++) {
+                    Object val = lookupData[f][i % numRows];
+                    key.add(val);
+                    sourceDoc.put("key" + f, val);
+                }
+                docs.add(client().prepareIndex("source").setSource(sourceDoc));
+                String keyString;
+                if (key.size() == 1) {
+                    keyString = String.valueOf(key.get(0));
+                } else {
+                    keyString = String.join(",", key.stream().map(String::valueOf).toArray(String[]::new));
+                }
                 for (Integer match : matches.get(key)) {
-                    expected.add(key + ":" + match);
+                    expected.add(keyString + ":" + match);
                 }
             }
-            for (int i = 0; i < lookupData.length; i++) {
-                docs.add(client().prepareIndex("lookup").setSource(Map.of("key", lookupData[i], "l", i)));
+            for (int i = 0; i < numRows; i++) {
+                Map<String, Object> lookupDoc = new HashMap<>();
+                for (int f = 0; f < numFields; f++) {
+                    lookupDoc.put("key" + f, lookupData[f][i]);
+                }
+                lookupDoc.put("l", i);
+                docs.add(client().prepareIndex("lookup").setSource(lookupDoc));
             }
             Collections.sort(expected);
             indexRandom(true, true, docs);
         }
     }
 
-    private void runLookup(DataType keyType, PopulateIndices populateIndices) throws IOException {
+    private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices) throws IOException {
+        String[] fieldMappers = new String[keyTypes.size() * 2];
+        for (int i = 0; i < keyTypes.size(); i++) {
+            fieldMappers[2 * i] = "key" + i;
+            fieldMappers[2 * i + 1] = "type=" + keyTypes.get(i).esType();
+        }
         client().admin()
             .indices()
             .prepareCreate("source")
             .setSettings(Settings.builder().put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1))
-            .setMapping("key", "type=" + keyType.esType())
-            .get();
-        client().admin()
-            .indices()
-            .prepareCreate("lookup")
-            .setSettings(
-                Settings.builder()
-                    .put(IndexSettings.MODE.getKey(), "lookup")
-                    // TODO lookup index mode doesn't seem to force a single shard. That'll break the lookup command.
-                    .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
-            )
-            .setMapping("key", "type=" + keyType.esType(), "l", "type=long")
+            .setMapping(fieldMappers)
             .get();
+
+        Settings.Builder lookupSettings = Settings.builder()
+            .put(IndexSettings.MODE.getKey(), "lookup")
+            .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1);
+
+        String[] lookupMappers = new String[keyTypes.size() * 2 + 2];
+        int lookupMappersCounter = 0;
+        for (; lookupMappersCounter < keyTypes.size(); lookupMappersCounter++) {
+            lookupMappers[2 * lookupMappersCounter] = "key" + lookupMappersCounter;
+            lookupMappers[2 * lookupMappersCounter + 1] = "type=" + keyTypes.get(lookupMappersCounter).esType();
+        }
+        lookupMappers[2 * lookupMappersCounter] = "l";
+        lookupMappers[2 * lookupMappersCounter + 1] = "type=long";
+        client().admin().indices().prepareCreate("lookup").setSettings(lookupSettings).setMapping(lookupMappers).get();
+
         client().admin().cluster().prepareHealth(TEST_REQUEST_TIMEOUT).setWaitForGreenStatus().get();
 
         int docCount = between(10, 1000);
@@ -198,15 +285,20 @@ public class LookupFromIndexIT extends AbstractEsqlIntegTestCase {
                 DocIdSetIterator.NO_MORE_DOCS,
                 false // no scoring
             );
-            ValuesSourceReaderOperator.Factory reader = new ValuesSourceReaderOperator.Factory(
-                PhysicalSettings.VALUES_LOADING_JUMBO_SIZE.getDefault(Settings.EMPTY),
-                List.of(
+            List<ValuesSourceReaderOperator.FieldInfo> fieldInfos = new ArrayList<>();
+            for (int i = 0; i < keyTypes.size(); i++) {
+                final int idx = i;
+                fieldInfos.add(
                     new ValuesSourceReaderOperator.FieldInfo(
-                        "key",
-                        PlannerUtils.toElementType(keyType),
-                        shard -> searchContext.getSearchExecutionContext().getFieldType("key").blockLoader(blContext())
+                        "key" + idx,
+                        PlannerUtils.toElementType(keyTypes.get(idx)),
+                        shard -> searchContext.getSearchExecutionContext().getFieldType("key" + idx).blockLoader(blContext())
                     )
-                ),
+                );
+            }
+            ValuesSourceReaderOperator.Factory reader = new ValuesSourceReaderOperator.Factory(
+                PhysicalSettings.VALUES_LOADING_JUMBO_SIZE.getDefault(Settings.EMPTY),
+                fieldInfos,
                 List.of(new ValuesSourceReaderOperator.ShardContext(searchContext.getSearchExecutionContext().getIndexReader(), () -> {
                     throw new IllegalStateException("can't load source here");
                 }, EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY))),
@@ -224,16 +316,18 @@ public class LookupFromIndexIT extends AbstractEsqlIntegTestCase {
                 TEST_REQUEST_TIMEOUT
             );
             final String finalNodeWithShard = nodeWithShard;
+            List<MatchConfig> matchFields = new ArrayList<>();
+            for (int i = 0; i < keyTypes.size(); i++) {
+                matchFields.add(new MatchConfig(new FieldAttribute.FieldName("key" + i), i + 1, keyTypes.get(i)));
+            }
             LookupFromIndexOperator.Factory lookup = new LookupFromIndexOperator.Factory(
+                matchFields,
                 "test",
                 parentTask,
                 QueryPragmas.ENRICH_MAX_WORKERS.get(Settings.EMPTY),
-                1,
                 ctx -> internalCluster().getInstance(TransportEsqlQueryAction.class, finalNodeWithShard).getLookupFromIndexService(),
-                keyType,
                 "lookup",
                 "lookup",
-                new FieldAttribute.FieldName("key"),
                 List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG))),
                 Source.EMPTY
             );
@@ -245,16 +339,28 @@ public class LookupFromIndexIT extends AbstractEsqlIntegTestCase {
                     List.of(reader.get(driverContext), lookup.get(driverContext)),
                     new PageConsumerOperator(page -> {
                         try {
-                            Block keyBlock = page.getBlock(1);
-                            LongVector loadedBlock = page.<LongBlock>getBlock(2).asVector();
+                            List<Block> keyBlocks = new ArrayList<>();
+                            for (int i = 0; i < keyTypes.size(); i++) {
+                                keyBlocks.add(page.getBlock(i + 1));
+                            }
+                            LongVector loadedBlock = page.<LongBlock>getBlock(keyTypes.size() + 1).asVector();
                             for (int p = 0; p < page.getPositionCount(); p++) {
-                                List<Object> key = BlockTestUtils.valuesAtPositions(keyBlock, p, p + 1).get(0);
-                                assertThat(key, hasSize(1));
-                                Object keyValue = key.get(0);
-                                if (keyValue instanceof BytesRef b) {
-                                    keyValue = b.utf8ToString();
+                                StringBuilder result = new StringBuilder();
+                                for (int j = 0; j < keyBlocks.size(); j++) {
+                                    List<Object> key = BlockTestUtils.valuesAtPositions(keyBlocks.get(j), p, p + 1).get(0);
+                                    assertThat(key, hasSize(1));
+                                    Object keyValue = key.get(0);
+                                    if (keyValue instanceof BytesRef b) {
+                                        keyValue = b.utf8ToString();
+                                    }
+                                    result.append(keyValue);
+                                    if (j < keyBlocks.size() - 1) {
+                                        result.append(",");
+                                    }
+
                                 }
-                                results.add(keyValue + ":" + loadedBlock.getLong(p));
+                                result.append(":" + loadedBlock.getLong(p));
+                                results.add(result.toString());
                             }
                         } finally {
                             page.releaseBlocks();
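
The UsingSingleLookupTable change above switches from a flat Object[] to a column-oriented Object[][], keying the expected matches by the tuple of values taken across all key columns for a row. A minimal standalone sketch of that keying idea (class and method names here are illustrative only, not part of the change):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CompositeKeyIndex {
    // Maps a composite key (one value per key column) to the row indices that carry it.
    static Map<List<Object>, List<Integer>> index(Object[][] columns) {
        Map<List<Object>, List<Integer>> matches = new HashMap<>();
        int numRows = columns[0].length;
        for (int row = 0; row < numRows; row++) {
            List<Object> key = new ArrayList<>();
            for (Object[] column : columns) {
                key.add(column[row]);
            }
            matches.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        }
        return matches;
    }

    public static void main(String[] args) {
        Object[][] columns = { new String[] { "aa", "bb", "bb" }, new Long[] { 12L, 1L, 1L } };
        // duplicate composite keys collect multiple row indices, e.g. {[aa, 12]=[0], [bb, 1]=[1, 2]}
        System.out.println(index(columns));
    }
}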

+ 4 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

@@ -1317,6 +1317,10 @@ public class EsqlCapabilities {
          */
         FIXED_PROFILE_SERIALIZATION,
 
+        /**
+         * Support for lookup join on multiple fields.
+         */
+        LOOKUP_JOIN_ON_MULTIPLE_FIELDS,
         /**
          * Dot product vector similarity function
          */

+ 13 - 26
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java

@@ -43,6 +43,7 @@ import org.elasticsearch.compute.operator.OutputOperator;
 import org.elasticsearch.compute.operator.ProjectOperator;
 import org.elasticsearch.compute.operator.Warnings;
 import org.elasticsearch.compute.operator.lookup.EnrichQuerySourceOperator;
+import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator;
 import org.elasticsearch.compute.operator.lookup.MergePositionsOperator;
 import org.elasticsearch.compute.operator.lookup.QueryList;
 import org.elasticsearch.core.AbstractRefCounted;
@@ -191,12 +192,11 @@ public abstract class AbstractLookupService<R extends AbstractLookupService.Requ
     /**
      * Build a list of queries to perform inside the actual lookup.
      */
-    protected abstract QueryList queryList(
+    protected abstract LookupEnrichQueryGenerator queryList(
         T request,
         SearchExecutionContext context,
         AliasFilter aliasFilter,
         Block inputBlock,
-        @Nullable DataType inputDataType,
         Warnings warnings
     );
 
@@ -271,13 +271,15 @@ public abstract class AbstractLookupService<R extends AbstractLookupService.Requ
     }
 
     private void doLookup(T request, CancellableTask task, ActionListener<List<Page>> listener) {
-        Block inputBlock = request.inputPage.getBlock(0);
-        if (inputBlock.areAllValuesNull()) {
-            List<Page> nullResponse = mergePages
-                ? List.of(createNullResponse(request.inputPage.getPositionCount(), request.extractFields))
-                : List.of();
-            listener.onResponse(nullResponse);
-            return;
+        for (int j = 0; j < request.inputPage.getBlockCount(); j++) {
+            Block inputBlock = request.inputPage.getBlock(j);
+            if (inputBlock.areAllValuesNull()) {
+                List<Page> nullResponse = mergePages
+                    ? List.of(createNullResponse(request.inputPage.getPositionCount(), request.extractFields))
+                    : List.of();
+                listener.onResponse(nullResponse);
+                return;
+            }
         }
         final List<Releasable> releasables = new ArrayList<>(6);
         boolean started = false;
@@ -305,6 +307,7 @@ public abstract class AbstractLookupService<R extends AbstractLookupService.Requ
             final int[] mergingChannels = IntStream.range(0, request.extractFields.size()).map(i -> i + 2).toArray();
             final Operator finishPages;
             final OrdinalBytesRefBlock ordinalsBytesRefBlock;
+            Block inputBlock = request.inputPage.getBlock(0);
             if (mergePages  // TODO fix this optimization for Lookup.
                 && inputBlock instanceof BytesRefBlock bytesRefBlock
                 && (ordinalsBytesRefBlock = bytesRefBlock.asOrdinals()) != null) {
@@ -334,14 +337,7 @@ public abstract class AbstractLookupService<R extends AbstractLookupService.Requ
                 request.source.source().getColumnNumber(),
                 request.source.text()
             );
-            QueryList queryList = queryList(
-                request,
-                shardContext.executionContext,
-                aliasFilter,
-                inputBlock,
-                request.inputDataType,
-                warnings
-            );
+            LookupEnrichQueryGenerator queryList = queryList(request, shardContext.executionContext, aliasFilter, inputBlock, warnings);
             var queryOperator = new EnrichQuerySourceOperator(
                 driverContext.blockFactory(),
                 EnrichQuerySourceOperator.DEFAULT_MAX_PAGE_SIZE,
@@ -536,11 +532,6 @@ public abstract class AbstractLookupService<R extends AbstractLookupService.Requ
         final String sessionId;
         final ShardId shardId;
         final String indexPattern;
-        /**
-         * For mixed clusters with nodes &lt;8.14, this will be null.
-         */
-        @Nullable
-        final DataType inputDataType;
         final Page inputPage;
         final List<NamedExpression> extractFields;
         final Source source;
@@ -552,7 +543,6 @@ public abstract class AbstractLookupService<R extends AbstractLookupService.Requ
             String sessionId,
             ShardId shardId,
             String indexPattern,
-            DataType inputDataType,
             Page inputPage,
             Page toRelease,
             List<NamedExpression> extractFields,
@@ -561,7 +551,6 @@ public abstract class AbstractLookupService<R extends AbstractLookupService.Requ
             this.sessionId = sessionId;
             this.shardId = shardId;
             this.indexPattern = indexPattern;
-            this.inputDataType = inputDataType;
             this.inputPage = inputPage;
             this.toRelease = toRelease;
             this.extractFields = extractFields;
@@ -621,8 +610,6 @@ public abstract class AbstractLookupService<R extends AbstractLookupService.Requ
                 + sessionId
                 + " ,shard="
                 + shardId
-                + " ,input_type="
-                + inputDataType
                 + " ,extract_fields="
                 + extractFields
                 + " ,positions="
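
queryList(...) now returns the new LookupEnrichQueryGenerator interface rather than QueryList. The interface body is not part of this hunk; the following is only a rough sketch of the shape implied by how EnrichQuerySourceOperator consumes it, and the method names below are assumptions, not the committed signatures:

import org.apache.lucene.search.Query;

// Assumed shape only -- the committed interface lives in
// compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java.
public interface LookupEnrichQueryGenerator {
    // Lucene query to run for the input row at this position, or null if the row has no usable key.
    Query getQuery(int position);

    // Total number of input positions covered by this generator.
    int getPositionCount();
}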

+ 11 - 4
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java

@@ -25,6 +25,7 @@ import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BlockStreamInput;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.Warnings;
+import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator;
 import org.elasticsearch.compute.operator.lookup.QueryList;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasables;
@@ -110,14 +111,14 @@ public class EnrichLookupService extends AbstractLookupService<EnrichLookupServi
     }
 
     @Override
-    protected QueryList queryList(
+    protected LookupEnrichQueryGenerator queryList(
         TransportRequest request,
         SearchExecutionContext context,
         AliasFilter aliasFilter,
         Block inputBlock,
-        @Nullable DataType inputDataType,
         Warnings warnings
     ) {
+        DataType inputDataType = request.inputDataType;
         MappedFieldType fieldType = context.getFieldType(request.matchField);
         validateTypes(inputDataType, fieldType);
         return switch (request.matchType) {
@@ -190,6 +191,11 @@ public class EnrichLookupService extends AbstractLookupService<EnrichLookupServi
     protected static class TransportRequest extends AbstractLookupService.TransportRequest {
         private final String matchType;
         private final String matchField;
+        /**
+         * For mixed clusters with nodes &lt;8.14, this will be null.
+         */
+        @Nullable
+        final DataType inputDataType;
 
         TransportRequest(
             String sessionId,
@@ -202,9 +208,10 @@ public class EnrichLookupService extends AbstractLookupService<EnrichLookupServi
             List<NamedExpression> extractFields,
             Source source
         ) {
-            super(sessionId, shardId, shardId.getIndexName(), inputDataType, inputPage, toRelease, extractFields, source);
+            super(sessionId, shardId, shardId.getIndexName(), inputPage, toRelease, extractFields, source);
             this.matchType = matchType;
             this.matchField = matchField;
+            this.inputDataType = inputDataType;
         }
 
         static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) throws IOException {
@@ -261,7 +268,7 @@ public class EnrichLookupService extends AbstractLookupService<EnrichLookupServi
 
         @Override
         protected String extraDescription() {
-            return " ,match_type=" + matchType + " ,match_field=" + matchField;
+            return " ,input_type=" + inputDataType + " ,match_type=" + matchType + " ,match_field=" + matchField;
         }
     }
 

+ 60 - 56
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java

@@ -23,12 +23,11 @@ import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.xcontent.XContentBuilder;
-import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
 import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
 import org.elasticsearch.xpack.esql.core.tree.Source;
-import org.elasticsearch.xpack.esql.core.type.DataType;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
@@ -37,47 +36,45 @@ import java.util.function.Function;
 
 // TODO rename package
 public final class LookupFromIndexOperator extends AsyncOperator<LookupFromIndexOperator.OngoingJoin> {
+
     public record Factory(
+        List<MatchConfig> matchFields,
         String sessionId,
         CancellableTask parentTask,
         int maxOutstandingRequests,
-        int inputChannel,
         Function<DriverContext, LookupFromIndexService> lookupService,
-        DataType inputDataType,
         String lookupIndexPattern,
         String lookupIndex,
-        FieldAttribute.FieldName matchField,
         List<NamedExpression> loadFields,
         Source source
     ) implements OperatorFactory {
         @Override
         public String describe() {
-            return "LookupOperator[index="
-                + lookupIndex
-                + " input_type="
-                + inputDataType
-                + " match_field="
-                + matchField.string()
-                + " load_fields="
-                + loadFields
-                + " inputChannel="
-                + inputChannel
-                + "]";
+            StringBuilder stringBuilder = new StringBuilder();
+            stringBuilder.append("LookupOperator[index=").append(lookupIndex).append(" load_fields=").append(loadFields);
+            for (MatchConfig matchField : matchFields) {
+                stringBuilder.append(" input_type=")
+                    .append(matchField.type())
+                    .append(" match_field=")
+                    .append(matchField.fieldName().string())
+                    .append(" inputChannel=")
+                    .append(matchField.channel());
+            }
+            stringBuilder.append("]");
+            return stringBuilder.toString();
         }
 
         @Override
         public Operator get(DriverContext driverContext) {
             return new LookupFromIndexOperator(
+                matchFields,
                 sessionId,
                 driverContext,
                 parentTask,
                 maxOutstandingRequests,
-                inputChannel,
                 lookupService.apply(driverContext),
-                inputDataType,
                 lookupIndexPattern,
                 lookupIndex,
-                matchField.string(),
                 loadFields,
                 source
             );
@@ -87,14 +84,12 @@ public final class LookupFromIndexOperator extends AsyncOperator<LookupFromIndex
     private final LookupFromIndexService lookupService;
     private final String sessionId;
     private final CancellableTask parentTask;
-    private final int inputChannel;
-    private final DataType inputDataType;
     private final String lookupIndexPattern;
     private final String lookupIndex;
-    private final String matchField;
     private final List<NamedExpression> loadFields;
     private final Source source;
-    private long totalTerms = 0L;
+    private long totalRows = 0L;
+    private List<MatchConfig> matchFields;
     /**
      * Total number of pages emitted by this {@link Operator}.
      */
@@ -105,43 +100,51 @@ public final class LookupFromIndexOperator extends AsyncOperator<LookupFromIndex
     private OngoingJoin ongoing = null;
 
     public LookupFromIndexOperator(
+        List<MatchConfig> matchFields,
         String sessionId,
         DriverContext driverContext,
         CancellableTask parentTask,
         int maxOutstandingRequests,
-        int inputChannel,
         LookupFromIndexService lookupService,
-        DataType inputDataType,
         String lookupIndexPattern,
         String lookupIndex,
-        String matchField,
         List<NamedExpression> loadFields,
         Source source
     ) {
         super(driverContext, lookupService.getThreadContext(), maxOutstandingRequests);
+        this.matchFields = matchFields;
         this.sessionId = sessionId;
         this.parentTask = parentTask;
-        this.inputChannel = inputChannel;
         this.lookupService = lookupService;
-        this.inputDataType = inputDataType;
         this.lookupIndexPattern = lookupIndexPattern;
         this.lookupIndex = lookupIndex;
-        this.matchField = matchField;
         this.loadFields = loadFields;
         this.source = source;
     }
 
     @Override
     protected void performAsync(Page inputPage, ActionListener<OngoingJoin> listener) {
-        final Block inputBlock = inputPage.getBlock(inputChannel);
-        totalTerms += inputBlock.getTotalValueCount();
+        Block[] inputBlockArray = new Block[matchFields.size()];
+        List<MatchConfig> newMatchFields = new ArrayList<>();
+        for (int i = 0; i < matchFields.size(); i++) {
+            MatchConfig matchField = matchFields.get(i);
+            int inputChannel = matchField.channel();
+            final Block inputBlock = inputPage.getBlock(inputChannel);
+            inputBlockArray[i] = inputBlock;
+            // The matchFields we receive are indexed by input channel on the left-hand side of the join.
+            // Create a new MatchConfig that keeps the field name and type from the original matchField
+            // but uses the block's new channel index within inputBlockArray.
+            newMatchFields.add(new MatchConfig(matchField.fieldName(), i, matchField.type()));
+        }
+        // Rows are counted only once per page, so the first block's value count is sufficient.
+        totalRows += inputPage.getBlock(0).getTotalValueCount();
+
         LookupFromIndexService.Request request = new LookupFromIndexService.Request(
             sessionId,
             lookupIndex,
             lookupIndexPattern,
-            inputDataType,
-            matchField,
-            new Page(inputBlock),
+            newMatchFields,
+            new Page(inputBlockArray),
             loadFields,
             source
         );
@@ -190,17 +193,18 @@ public final class LookupFromIndexOperator extends AsyncOperator<LookupFromIndex
 
     @Override
     public String toString() {
-        return "LookupOperator[index="
-            + lookupIndex
-            + " input_type="
-            + inputDataType
-            + " match_field="
-            + matchField
-            + " load_fields="
-            + loadFields
-            + " inputChannel="
-            + inputChannel
-            + "]";
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append("LookupOperator[index=").append(lookupIndex).append(" load_fields=").append(loadFields);
+        for (MatchConfig matchField : matchFields) {
+            stringBuilder.append(" input_type=")
+                .append(matchField.type())
+                .append(" match_field=")
+                .append(matchField.fieldName().string())
+                .append(" inputChannel=")
+                .append(matchField.channel());
+        }
+        stringBuilder.append("]");
+        return stringBuilder.toString();
     }
 
     @Override
@@ -225,7 +229,7 @@ public final class LookupFromIndexOperator extends AsyncOperator<LookupFromIndex
 
     @Override
     protected Operator.Status status(long receivedPages, long completedPages, long processNanos) {
-        return new LookupFromIndexOperator.Status(receivedPages, completedPages, processNanos, totalTerms, emittedPages);
+        return new LookupFromIndexOperator.Status(receivedPages, completedPages, processNanos, totalRows, emittedPages);
     }
 
     public static class Status extends AsyncOperator.Status {
@@ -235,28 +239,28 @@ public final class LookupFromIndexOperator extends AsyncOperator<LookupFromIndex
             Status::new
         );
 
-        private final long totalTerms;
+        private final long totalRows;
         /**
          * Total number of pages emitted by this {@link Operator}.
          */
         private final long emittedPages;
 
-        Status(long receivedPages, long completedPages, long totalTimeInMillis, long totalTerms, long emittedPages) {
+        Status(long receivedPages, long completedPages, long totalTimeInMillis, long totalRows, long emittedPages) {
             super(receivedPages, completedPages, totalTimeInMillis);
-            this.totalTerms = totalTerms;
+            this.totalRows = totalRows;
             this.emittedPages = emittedPages;
         }
 
         Status(StreamInput in) throws IOException {
             super(in);
-            this.totalTerms = in.readVLong();
+            this.totalRows = in.readVLong();
             this.emittedPages = in.readVLong();
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
-            out.writeVLong(totalTerms);
+            out.writeVLong(totalRows);
             out.writeVLong(emittedPages);
         }
 
@@ -269,8 +273,8 @@ public final class LookupFromIndexOperator extends AsyncOperator<LookupFromIndex
             return emittedPages;
         }
 
-        public long totalTerms() {
-            return totalTerms;
+        public long totalRows() {
+            return totalRows;
         }
 
         @Override
@@ -278,7 +282,7 @@ public final class LookupFromIndexOperator extends AsyncOperator<LookupFromIndex
             builder.startObject();
             super.innerToXContent(builder);
             builder.field("emitted_pages", emittedPages());
-            builder.field("total_terms", totalTerms());
+            builder.field("total_rows", totalRows());
             return builder.endObject();
         }
 
@@ -291,12 +295,12 @@ public final class LookupFromIndexOperator extends AsyncOperator<LookupFromIndex
                 return false;
             }
             Status status = (Status) o;
-            return totalTerms == status.totalTerms && emittedPages == status.emittedPages;
+            return totalRows == status.totalRows && emittedPages == status.emittedPages;
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(super.hashCode(), totalTerms, emittedPages);
+            return Objects.hash(super.hashCode(), totalRows, emittedPages);
         }
     }
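
In performAsync above, the key blocks are copied out of the incoming page and the MatchConfigs are re-based so their channels point into that smaller request page. A standalone sketch of the re-basing, using a hypothetical KeyRef record as a stand-in for MatchConfig:

import java.util.ArrayList;
import java.util.List;

class ChannelRebase {
    // Stand-in for MatchConfig: field name plus the channel its key block occupies.
    record KeyRef(String field, int channel) {}

    // The i-th key block is copied into slot i of the request page, so its channel becomes i.
    static List<KeyRef> rebase(List<KeyRef> matchFields) {
        List<KeyRef> rebased = new ArrayList<>();
        for (int i = 0; i < matchFields.size(); i++) {
            rebased.add(new KeyRef(matchFields.get(i).field(), i));
        }
        return rebased;
    }

    public static void main(String[] args) {
        // Left-hand page channels: name=0, extra=1, id=2; the join keys are id and name.
        System.out.println(rebase(List.of(new KeyRef("id", 2), new KeyRef("name", 0))));
        // [KeyRef[field=id, channel=0], KeyRef[field=name, channel=1]]
    }
}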
 

+ 65 - 27
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java

@@ -20,8 +20,9 @@ import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BlockStreamInput;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.Warnings;
+import org.elasticsearch.compute.operator.lookup.ExpressionQueryList;
+import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator;
 import org.elasticsearch.compute.operator.lookup.QueryList;
-import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.index.query.SearchExecutionContext;
 import org.elasticsearch.index.shard.ShardId;
@@ -31,6 +32,7 @@ import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
 import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
+import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
 import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.DataType;
@@ -38,8 +40,10 @@ import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
 import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * {@link LookupFromIndexService} performs lookup against a Lookup index for
@@ -80,28 +84,38 @@ public class LookupFromIndexService extends AbstractLookupService<LookupFromInde
             request.sessionId,
             shardId,
             request.indexPattern,
-            request.inputDataType,
             request.inputPage,
             null,
             request.extractFields,
-            request.matchField,
+            request.matchFields,
             request.source
         );
     }
 
     @Override
-    protected QueryList queryList(
+    protected LookupEnrichQueryGenerator queryList(
         TransportRequest request,
         SearchExecutionContext context,
         AliasFilter aliasFilter,
         Block inputBlock,
-        @Nullable DataType inputDataType,
         Warnings warnings
     ) {
-        return termQueryList(context.getFieldType(request.matchField), context, aliasFilter, inputBlock, inputDataType).onlySingleValues(
-            warnings,
-            "LOOKUP JOIN encountered multi-value"
-        );
+        List<QueryList> queryLists = new ArrayList<>();
+        for (int i = 0; i < request.matchFields.size(); i++) {
+            MatchConfig matchField = request.matchFields.get(i);
+            QueryList q = termQueryList(
+                context.getFieldType(matchField.fieldName().string()),
+                context,
+                aliasFilter,
+                request.inputPage.getBlock(matchField.channel()),
+                matchField.type()
+            ).onlySingleValues(warnings, "LOOKUP JOIN encountered multi-value");
+            queryLists.add(q);
+        }
+        if (queryLists.size() == 1) {
+            return queryLists.getFirst();
+        }
+        return new ExpressionQueryList(queryLists);
     }
 
     @Override
@@ -115,39 +129,39 @@ public class LookupFromIndexService extends AbstractLookupService<LookupFromInde
     }
 
     public static class Request extends AbstractLookupService.Request {
-        private final String matchField;
+        private final List<MatchConfig> matchFields;
 
         Request(
             String sessionId,
             String index,
             String indexPattern,
-            DataType inputDataType,
-            String matchField,
+            List<MatchConfig> matchFields,
             Page inputPage,
             List<NamedExpression> extractFields,
             Source source
         ) {
-            super(sessionId, index, indexPattern, inputDataType, inputPage, extractFields, source);
-            this.matchField = matchField;
+            super(sessionId, index, indexPattern, matchFields.get(0).type(), inputPage, extractFields, source);
+            this.matchFields = matchFields;
         }
     }
 
     protected static class TransportRequest extends AbstractLookupService.TransportRequest {
-        private final String matchField;
+        private final List<MatchConfig> matchFields;
 
+        // Right now we assume that the page contains the same number of blocks as matchFields and that the blocks are in the same order.
+        // The channel information inside each MatchConfig should be consistent with that ordering.
         TransportRequest(
             String sessionId,
             ShardId shardId,
             String indexPattern,
-            DataType inputDataType,
             Page inputPage,
             Page toRelease,
             List<NamedExpression> extractFields,
-            String matchField,
+            List<MatchConfig> matchFields,
             Source source
         ) {
-            super(sessionId, shardId, indexPattern, inputDataType, inputPage, toRelease, extractFields, source);
-            this.matchField = matchField;
+            super(sessionId, shardId, indexPattern, inputPage, toRelease, extractFields, source);
+            this.matchFields = matchFields;
         }
 
         static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) throws IOException {
@@ -163,14 +177,26 @@ public class LookupFromIndexService extends AbstractLookupService<LookupFromInde
                 indexPattern = shardId.getIndexName();
             }
 
-            DataType inputDataType = DataType.fromTypeName(in.readString());
+            DataType inputDataType = null;
+            if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_ON_MANY_FIELDS) == false) {
+                inputDataType = DataType.fromTypeName(in.readString());
+            }
+
             Page inputPage;
             try (BlockStreamInput bsi = new BlockStreamInput(in, blockFactory)) {
                 inputPage = new Page(bsi);
             }
             PlanStreamInput planIn = new PlanStreamInput(in, in.namedWriteableRegistry(), null);
             List<NamedExpression> extractFields = planIn.readNamedWriteableCollectionAsList(NamedExpression.class);
-            String matchField = in.readString();
+            List<MatchConfig> matchFields = null;
+            if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_ON_MANY_FIELDS)) {
+                matchFields = planIn.readCollectionAsList(MatchConfig::new);
+            } else {
+                String matchField = in.readString();
+                // For older versions, we only support a single match field.
+                matchFields = new ArrayList<>(1);
+                matchFields.add(new MatchConfig(new FieldAttribute.FieldName(matchField), 0, inputDataType));
+            }
             var source = Source.EMPTY;
             if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_17_0)) {
                 source = Source.readFrom(planIn);
@@ -185,11 +211,10 @@ public class LookupFromIndexService extends AbstractLookupService<LookupFromInde
                 sessionId,
                 shardId,
                 indexPattern,
-                inputDataType,
                 inputPage,
                 inputPage,
                 extractFields,
-                matchField,
+                matchFields,
                 source
             );
             result.setParentTask(parentTaskId);
@@ -208,12 +233,25 @@ public class LookupFromIndexService extends AbstractLookupService<LookupFromInde
             } else if (indexPattern.equals(shardId.getIndexName()) == false) {
                 throw new EsqlIllegalArgumentException("Aliases and index patterns are not allowed for LOOKUP JOIN [{}]", indexPattern);
             }
-
-            out.writeString(inputDataType.typeName());
+            if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_ON_MANY_FIELDS) == false) {
+                // only write this for old versions
+                // older versions only support a single match field
+                if (matchFields.size() > 1) {
+                    throw new EsqlIllegalArgumentException("LOOKUP JOIN on multiple fields is not supported on remote node");
+                }
+                out.writeString(matchFields.get(0).type().typeName());
+            }
             out.writeWriteable(inputPage);
             PlanStreamOutput planOut = new PlanStreamOutput(out, null);
             planOut.writeNamedWriteableCollection(extractFields);
-            out.writeString(matchField);
+            if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_ON_MANY_FIELDS)) {
+                // serialize all match fields for new versions
+                planOut.writeCollection(matchFields, (o, matchConfig) -> matchConfig.writeTo(o));
+            } else {
+                // Older versions only support a single match field; this was already verified above when writing the data type.
+                // Send the field name of the first and only match field here.
+                out.writeString(matchFields.get(0).fieldName().string());
+            }
             if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_17_0)) {
                 source.writeTo(planOut);
             }
@@ -224,7 +262,7 @@ public class LookupFromIndexService extends AbstractLookupService<LookupFromInde
 
         @Override
         protected String extraDescription() {
-            return " ,match_field=" + matchField;
+            return " ,match_fields=" + matchFields.stream().map(x -> x.fieldName().string()).collect(Collectors.joining(", "));
         }
     }
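
The serialization above boils down to a version gate: peers that understand ESQL_LOOKUP_JOIN_ON_MANY_FIELDS exchange a list of MatchConfig entries, older peers exchange a single data type and field name, and a multi-field join sent to an older peer fails fast. The following stand-alone sketch illustrates just that gate; it uses a plain DataOutputStream and a boolean flag in place of Elasticsearch's StreamOutput and TransportVersions check, and it omits the page/plan payload written in between.

```java
// Illustrative sketch only, not the Elasticsearch wire code.
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;

final class MatchFieldVersionGateSketch {
    record Match(String fieldName, String typeName) {}

    static void writeMatchFields(DataOutputStream out, List<Match> matchFields, boolean peerSupportsManyFields) throws IOException {
        if (peerSupportsManyFields) {
            // New wire format: a length-prefixed list, one entry per join key.
            out.writeInt(matchFields.size());
            for (Match m : matchFields) {
                out.writeUTF(m.fieldName());
                out.writeUTF(m.typeName());
            }
        } else {
            // Old wire format: a single data type and a single field name.
            // A multi-field join cannot be expressed, so fail the request early.
            if (matchFields.size() > 1) {
                throw new IllegalArgumentException("LOOKUP JOIN on multiple fields is not supported by the remote node");
            }
            out.writeUTF(matchFields.get(0).typeName());
            out.writeUTF(matchFields.get(0).fieldName());
        }
    }
}
```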
 

+ 78 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/MatchConfig.java

@@ -0,0 +1,78 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.enrich;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
+import org.elasticsearch.xpack.esql.core.type.DataType;
+import org.elasticsearch.xpack.esql.planner.Layout;
+
+import java.io.IOException;
+import java.util.Objects;
+
+public final class MatchConfig implements Writeable {
+    private final FieldAttribute.FieldName fieldName;
+    private final int channel;
+    private final DataType type;
+
+    public MatchConfig(FieldAttribute.FieldName fieldName, int channel, DataType type) {
+        this.fieldName = fieldName;
+        this.channel = channel;
+        this.type = type;
+    }
+
+    public MatchConfig(FieldAttribute match, Layout.ChannelAndType input) {
+        // TODO: Using exactAttribute was supposed to handle TEXT fields with KEYWORD subfields - but we don't allow these in lookup
+        // indices, so the call to exactAttribute looks redundant now.
+        this(match.exactAttribute().fieldName(), input.channel(), input.type());
+    }
+
+    public MatchConfig(StreamInput in) throws IOException {
+        this(new FieldAttribute.FieldName(in.readString()), in.readInt(), DataType.readFrom(in));
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeString(fieldName.string());
+        out.writeInt(channel);
+        type.writeTo(out);
+    }
+
+    public FieldAttribute.FieldName fieldName() {
+        return fieldName;
+    }
+
+    public int channel() {
+        return channel;
+    }
+
+    public DataType type() {
+        return type;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) return true;
+        if (obj == null || obj.getClass() != this.getClass()) return false;
+        var that = (MatchConfig) obj;
+        return Objects.equals(this.fieldName, that.fieldName) && this.channel == that.channel && Objects.equals(this.type, that.type);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(fieldName, channel, type);
+    }
+
+    @Override
+    public String toString() {
+        return "MatchConfig[" + "fieldName=" + fieldName + ", " + "channel=" + channel + ", " + "type=" + type + ']';
+    }
+
+}
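
MatchConfig is a value triple (field name, input channel, data type) with a fixed wire layout: string, int, then the data type. As a rough, self-contained illustration of that layout and of the value semantics the hand-written equals/hashCode provide, here is a hypothetical record-based stand-in that round-trips through plain java.io streams; it is not the Elasticsearch class, and it reduces the data type to its type-name string.

```java
// Hypothetical stand-in for MatchConfig, for illustration only.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

record MatchConfigSketch(String fieldName, int channel, String typeName) {
    void writeTo(DataOutputStream out) throws IOException {
        out.writeUTF(fieldName);
        out.writeInt(channel);
        out.writeUTF(typeName);
    }

    static MatchConfigSketch readFrom(DataInputStream in) throws IOException {
        return new MatchConfigSketch(in.readUTF(), in.readInt(), in.readUTF());
    }
}

class MatchConfigSketchDemo {
    public static void main(String[] args) throws IOException {
        var original = new MatchConfigSketch("field1", 0, "keyword");
        var bytes = new ByteArrayOutputStream();
        original.writeTo(new DataOutputStream(bytes));
        var copy = MatchConfigSketch.readFrom(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        // Records give value-based equals/hashCode, matching the hand-written ones in MatchConfig.
        System.out.println(original.equals(copy)); // true
    }
}
```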

+ 11 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java

@@ -638,7 +638,17 @@ public class LogicalPlanBuilder extends ExpressionBuilder {
 
         var matchFieldsCount = joinFields.size();
         if (matchFieldsCount > 1) {
-            throw new ParsingException(source, "JOIN ON clause only supports one field at the moment, found [{}]", matchFieldsCount);
+            Set<String> matchFieldNames = new LinkedHashSet<>();
+            for (Attribute field : joinFields) {
+                if (matchFieldNames.add(field.name()) == false) {
+                    throw new ParsingException(
+                        field.source(),
+                        "JOIN ON clause does not support multiple fields with the same name, found multiple instances of [{}]",
+                        field.name()
+                    );
+                }
+
+            }
         }
 
         return p -> {
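
The duplicate check above only runs when the ON clause lists more than one field. Below is a stand-alone sketch of the same idea, with plain strings in place of the parser's Attribute objects and a generic exception in place of ParsingException; LinkedHashSet preserves insertion order, so the first repeated name is the one reported.

```java
// Stand-alone sketch of the duplicate-name check, for illustration only.
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class JoinFieldDuplicateCheck {
    static void checkNoDuplicates(List<String> joinFieldNames) {
        Set<String> seen = new LinkedHashSet<>();
        for (String name : joinFieldNames) {
            if (seen.add(name) == false) {
                throw new IllegalArgumentException(
                    "JOIN ON clause does not support multiple fields with the same name, found multiple instances of [" + name + "]"
                );
            }
        }
    }

    public static void main(String[] args) {
        checkNoDuplicates(List.of("gender", "languages"));    // ok: two distinct fields
        checkNoDuplicates(List.of("languages", "languages")); // throws on the second "languages"
    }
}
```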

+ 2 - 16
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

@@ -83,6 +83,7 @@ import org.elasticsearch.xpack.esql.enrich.EnrichLookupOperator;
 import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
 import org.elasticsearch.xpack.esql.enrich.LookupFromIndexOperator;
 import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService;
+import org.elasticsearch.xpack.esql.enrich.MatchConfig;
 import org.elasticsearch.xpack.esql.evaluator.EvalMapper;
 import org.elasticsearch.xpack.esql.evaluator.command.GrokEvaluatorExtracter;
 import org.elasticsearch.xpack.esql.expression.Order;
@@ -779,23 +780,16 @@ public class LocalExecutionPlanner {
             }
             matchFields.add(new MatchConfig(right, input));
         }
-        if (matchFields.size() != 1) {
-            throw new IllegalArgumentException("can't plan [" + join + "]: multiple join predicates are not supported");
-        }
-        // TODO support multiple match fields, and support more than equality predicates
-        MatchConfig matchConfig = matchFields.getFirst();
 
         return source.with(
             new LookupFromIndexOperator.Factory(
+                matchFields,
                 sessionId,
                 parentTask,
                 context.queryPragmas().enrichMaxWorkers(),
-                matchConfig.channel(),
                 ctx -> lookupFromIndexService,
-                matchConfig.type(),
                 localSourceExec.indexPattern(),
                 indexName,
-                matchConfig.fieldName(),
                 join.addedFields().stream().map(f -> (NamedExpression) f).toList(),
                 join.source()
             ),
@@ -803,14 +797,6 @@ public class LocalExecutionPlanner {
         );
     }
 
-    private record MatchConfig(FieldAttribute.FieldName fieldName, int channel, DataType type) {
-        private MatchConfig(FieldAttribute match, Layout.ChannelAndType input) {
-            // TODO: Using exactAttribute was supposed to handle TEXT fields with KEYWORD subfields - but we don't allow these in lookup
-            // indices, so the call to exactAttribute looks redundant now.
-            this(match.exactAttribute().fieldName(), input.channel(), input.type());
-        }
-    }
-
     private PhysicalOperation planLocal(LocalSourceExec localSourceExec, LocalExecutionPlannerContext context) {
         Layout.Builder layout = new Layout.Builder();
         layout.append(localSourceExec.output());

+ 2 - 10
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/ParsingTests.java

@@ -135,18 +135,10 @@ public class ParsingTests extends ESTestCase {
         );
     }
 
-    public void testJoinOnMultipleFields() {
-        assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled());
-        assertEquals(
-            "1:35: JOIN ON clause only supports one field at the moment, found [2]",
-            error("row languages = 1, gender = \"f\" | lookup join test on gender, languages")
-        );
-    }
-
     public void testJoinTwiceOnTheSameField() {
         assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled());
         assertEquals(
-            "1:35: JOIN ON clause only supports one field at the moment, found [2]",
+            "1:66: JOIN ON clause does not support multiple fields with the same name, found multiple instances of [languages]",
             error("row languages = 1, gender = \"f\" | lookup join test on languages, languages")
         );
     }
@@ -154,7 +146,7 @@ public class ParsingTests extends ESTestCase {
     public void testJoinTwiceOnTheSameField_TwoLookups() {
         assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled());
         assertEquals(
-            "1:80: JOIN ON clause only supports one field at the moment, found [2]",
+            "1:108: JOIN ON clause does not support multiple fields with the same name, found multiple instances of [gender]",
             error("row languages = 1, gender = \"f\" | lookup join test on languages | eval x = 1 | lookup join test on gender, gender")
         );
     }

+ 2 - 2
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorStatusTests.java

@@ -39,7 +39,7 @@ public class LookupFromIndexOperatorStatusTests extends AbstractWireSerializingT
         long receivedPages = in.receivedPages();
         long completedPages = in.completedPages();
         long procesNanos = in.procesNanos();
-        long totalTerms = in.totalTerms();
+        long totalTerms = in.totalRows();
         long emittedPages = in.emittedPages();
         switch (randomIntBetween(0, 4)) {
             case 0 -> receivedPages = randomValueOtherThan(receivedPages, ESTestCase::randomNonNegativeLong);
@@ -62,7 +62,7 @@ public class LookupFromIndexOperatorStatusTests extends AbstractWireSerializingT
               "received_pages" : 100,
               "completed_pages" : 50,
               "emitted_pages" : 88,
-              "total_terms" : 120
+              "total_rows" : 120
             }"""));
     }
 }

+ 54 - 22
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java

@@ -11,6 +11,7 @@ import org.apache.lucene.document.Field;
 import org.apache.lucene.document.IntField;
 import org.apache.lucene.document.LongField;
 import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.tests.index.RandomIndexWriter;
 import org.apache.lucene.util.BytesRef;
@@ -39,9 +40,11 @@ import org.elasticsearch.compute.operator.SourceOperator;
 import org.elasticsearch.compute.test.NoOpReleasable;
 import org.elasticsearch.compute.test.OperatorTestCase;
 import org.elasticsearch.compute.test.SequenceLongBlockSourceOperator;
+import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.mapper.KeywordFieldMapper;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.MapperServiceTestCase;
@@ -86,25 +89,37 @@ public class LookupFromIndexOperatorTests extends OperatorTestCase {
     private final ThreadPool threadPool = threadPool();
     private final Directory lookupIndexDirectory = newDirectory();
     private final List<Releasable> releasables = new ArrayList<>();
+    private int numberOfJoinColumns; // we only allow 1 or 2 columns due to simpleInput() implementation
 
     @Before
     public void buildLookupIndex() throws IOException {
+        numberOfJoinColumns = 1 + randomInt(1); // 1 or 2 join columns
         try (RandomIndexWriter writer = new RandomIndexWriter(random(), lookupIndexDirectory)) {
             for (int i = 0; i < LOOKUP_SIZE; i++) {
-                writer.addDocument(
-                    List.of(
-                        new LongField("match", i, Field.Store.NO),
-                        new KeywordFieldMapper.KeywordField("lkwd", new BytesRef("l" + i), KeywordFieldMapper.Defaults.FIELD_TYPE),
-                        new IntField("lint", -i, Field.Store.NO)
-                    )
-                );
+                List<IndexableField> fields = new ArrayList<>();
+                fields.add(new LongField("match0", i, Field.Store.NO));
+                if (numberOfJoinColumns == 2) {
+                    fields.add(new LongField("match1", i + 1, Field.Store.NO));
+                }
+                fields.add(new KeywordFieldMapper.KeywordField("lkwd", new BytesRef("l" + i), KeywordFieldMapper.Defaults.FIELD_TYPE));
+                fields.add(new IntField("lint", -i, Field.Store.NO));
+                writer.addDocument(fields);
             }
         }
     }
 
     @Override
     protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
-        return new SequenceLongBlockSourceOperator(blockFactory, LongStream.range(0, size).map(l -> l % LOOKUP_SIZE));
+        if (numberOfJoinColumns == 1) {
+            return new SequenceLongBlockSourceOperator(blockFactory, LongStream.range(0, size).map(l -> l % LOOKUP_SIZE));
+        } else if (numberOfJoinColumns == 2) {
+            return new TupleLongLongBlockSourceOperator(
+                blockFactory,
+                LongStream.range(0, size).mapToObj(l -> Tuple.tuple(l % LOOKUP_SIZE, l % LOOKUP_SIZE + 1))
+            );
+        } else {
+            throw new IllegalStateException("numberOfJoinColumns must be 1 or 2, got: " + numberOfJoinColumns);
+        }
     }
 
     @Override
@@ -117,10 +132,10 @@ public class LookupFromIndexOperatorTests extends OperatorTestCase {
         int count = results.stream().mapToInt(Page::getPositionCount).sum();
         assertThat(count, equalTo(input.stream().mapToInt(Page::getPositionCount).sum()));
         for (Page r : results) {
-            assertThat(r.getBlockCount(), equalTo(3));
+            assertThat(r.getBlockCount(), equalTo(numberOfJoinColumns + 2));
             LongVector match = r.<LongBlock>getBlock(0).asVector();
-            BytesRefVector lkwd = r.<BytesRefBlock>getBlock(1).asVector();
-            IntVector lint = r.<IntBlock>getBlock(2).asVector();
+            BytesRefVector lkwd = r.<BytesRefBlock>getBlock(numberOfJoinColumns).asVector();
+            IntVector lint = r.<IntBlock>getBlock(numberOfJoinColumns + 1).asVector();
             for (int p = 0; p < r.getPositionCount(); p++) {
                 long m = match.getLong(p);
                 assertThat(lkwd.getBytesRef(p, new BytesRef()).utf8ToString(), equalTo("l" + m));
@@ -134,24 +149,26 @@ public class LookupFromIndexOperatorTests extends OperatorTestCase {
         String sessionId = "test";
         CancellableTask parentTask = new CancellableTask(0, "test", "test", "test", TaskId.EMPTY_TASK_ID, Map.of());
         int maxOutstandingRequests = 1;
-        int inputChannel = 0;
         DataType inputDataType = DataType.LONG;
         String lookupIndex = "idx";
-        FieldAttribute.FieldName matchField = new FieldAttribute.FieldName("match");
         List<NamedExpression> loadFields = List.of(
             new ReferenceAttribute(Source.EMPTY, "lkwd", DataType.KEYWORD),
             new ReferenceAttribute(Source.EMPTY, "lint", DataType.INTEGER)
         );
+
+        List<MatchConfig> matchFields = new ArrayList<>();
+        for (int i = 0; i < numberOfJoinColumns; i++) {
+            FieldAttribute.FieldName matchField = new FieldAttribute.FieldName("match" + i);
+            matchFields.add(new MatchConfig(matchField, i, inputDataType));
+        }
         return new LookupFromIndexOperator.Factory(
+            matchFields,
             sessionId,
             parentTask,
             maxOutstandingRequests,
-            inputChannel,
             this::lookupService,
-            inputDataType,
             lookupIndex,
             lookupIndex,
-            matchField,
             loadFields,
             Source.EMPTY
         );
@@ -164,9 +181,13 @@ public class LookupFromIndexOperatorTests extends OperatorTestCase {
 
     @Override
     protected Matcher<String> expectedToStringOfSimple() {
-        return matchesPattern(
-            "LookupOperator\\[index=idx input_type=LONG match_field=match load_fields=\\[lkwd\\{r}#\\d+, lint\\{r}#\\d+] inputChannel=0]"
-        );
+        StringBuilder sb = new StringBuilder();
+        sb.append("LookupOperator\\[index=idx load_fields=\\[lkwd\\{r}#\\d+, lint\\{r}#\\d+]");
+        for (int i = 0; i < numberOfJoinColumns; i++) {
+            sb.append(" input_type=LONG match_field=match").append(i).append(" inputChannel=").append(i);
+        }
+        sb.append("]");
+        return matchesPattern(sb.toString());
     }
 
     private LookupFromIndexService lookupService(DriverContext mainContext) {
@@ -240,14 +261,25 @@ public class LookupFromIndexOperatorTests extends OperatorTestCase {
         return shardId -> {
             MapperServiceTestCase mapperHelper = new MapperServiceTestCase() {
             };
-            MapperService mapperService = mapperHelper.createMapperService("""
+            StringBuilder stringBuilder = new StringBuilder();
+            stringBuilder.append("""
                 {
                     "doc": { "properties": {
-                        "match": { "type": "long" },
+                        "match0": { "type": "long" },
+                """);
+            if (numberOfJoinColumns == 2) {
+                stringBuilder.append("""
+                        "match1": { "type": "long" },
+                    """);
+            }
+            stringBuilder.append("""
                         "lkwd": { "type": "keyword" },
                         "lint": { "type": "integer" }
                     }}
-                }""");
+                }
+                """);
+
+            MapperService mapperService = mapperHelper.createMapperService(stringBuilder.toString());
             DirectoryReader reader = DirectoryReader.open(lookupIndexDirectory);
             SearchExecutionContext executionCtx = mapperHelper.createSearchExecutionContext(mapperService, newSearcher(reader));
             var ctx = new EsPhysicalOperationProviders.DefaultShardContext(0, new NoOpReleasable(), executionCtx, AliasFilter.EMPTY);

+ 81 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/MatchConfigSerializationTests.java

@@ -0,0 +1,81 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.enrich;
+
+import org.elasticsearch.TransportVersion;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
+import org.elasticsearch.xpack.esql.core.type.DataType;
+import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
+import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
+import org.elasticsearch.xpack.esql.session.Configuration;
+import org.junit.Before;
+
+import java.io.IOException;
+
+import static org.elasticsearch.xpack.esql.ConfigurationTestUtils.randomConfiguration;
+
+public class MatchConfigSerializationTests extends AbstractWireSerializingTestCase<MatchConfig> {
+
+    private Configuration config;
+
+    @Before
+    public void initConfig() {
+        this.config = randomConfiguration();
+    }
+
+    @Override
+    protected Writeable.Reader<MatchConfig> instanceReader() {
+        return MatchConfig::new;
+    }
+
+    @Override
+    protected MatchConfig createTestInstance() {
+        return randomMatchConfig();
+    }
+
+    private MatchConfig randomMatchConfig() {
+        // Implement logic to create a random MatchConfig instance
+        String name = randomAlphaOfLengthBetween(1, 100);
+        int channel = randomInt();
+        DataType type = randomFrom(DataType.types());
+        return new MatchConfig(new FieldAttribute.FieldName(name), channel, type);
+    }
+
+    @Override
+    protected MatchConfig mutateInstance(MatchConfig instance) {
+        return mutateMatchConfig(instance);
+    }
+
+    private MatchConfig mutateMatchConfig(MatchConfig instance) {
+        int i = randomIntBetween(1, 3);
+        return switch (i) {
+            case 1 -> {
+                String name = randomValueOtherThan(instance.fieldName().string(), () -> randomAlphaOfLengthBetween(1, 100));
+                yield new MatchConfig(new FieldAttribute.FieldName(name), instance.channel(), instance.type());
+            }
+            case 2 -> {
+                int channel = randomValueOtherThan(instance.channel(), () -> randomInt());
+                yield new MatchConfig(instance.fieldName(), channel, instance.type());
+            }
+            default -> {
+                DataType type = randomValueOtherThan(instance.type(), () -> randomFrom(DataType.types()));
+                yield new MatchConfig(instance.fieldName(), instance.channel(), type);
+            }
+        };
+    }
+
+    @Override
+    protected MatchConfig copyInstance(MatchConfig instance, TransportVersion version) throws IOException {
+        return copyInstance(instance, getNamedWriteableRegistry(), (out, v) -> v.writeTo(new PlanStreamOutput(out, config)), in -> {
+            PlanStreamInput pin = new PlanStreamInput(in, in.namedWriteableRegistry(), config);
+            return new MatchConfig(pin);
+        }, version);
+    }
+}

+ 102 - 29
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml

@@ -1,12 +1,12 @@
 ---
 setup:
   - requires:
-      test_runner_features: [capabilities, contains, allowed_warnings]
+      test_runner_features: [ capabilities, contains, allowed_warnings ]
       capabilities:
         - method: POST
           path: /_query
-          parameters: []
-          capabilities: [join_lookup_v12]
+          parameters: [ ]
+          capabilities: [ join_lookup_v12 ]
       reason: "uses LOOKUP JOIN"
   - do:
       indices.create:
@@ -80,6 +80,31 @@ setup:
                 type: long
               color:
                 type: keyword
+  - do:
+      indices.create:
+        index: test-lookup-color-match
+        body:
+          settings:
+            index:
+              mode: lookup
+          mappings:
+            properties:
+              key:
+                type: long
+              color:
+                type: keyword
+              shade:
+                type: keyword
+  - do:
+      indices.create:
+        index: test2
+        body:
+          mappings:
+            properties:
+              key:
+                type: long
+              color:
+                type: keyword
   - do:
       bulk:
         index: "test"
@@ -108,7 +133,7 @@ setup:
           - { "index": { } }
           - { "key": 2, "color": "blue" }
           - { "index": { } }
-          - { "key": [0, 1, 2], "color": null }
+          - { "key": [ 0, 1, 2 ], "color": null }
   - do:
       bulk:
         index: "test-lookup-mv"
@@ -119,7 +144,7 @@ setup:
           - { "index": { } }
           - { "key": 2, "color": "yellow" }
           - { "index": { } }
-          - { "key": [0, 1, 2], "color": "green" }
+          - { "key": [ 0, 1, 2 ], "color": "green" }
   - do:
       bulk:
         index: "test-lookup-no-key"
@@ -129,7 +154,28 @@ setup:
           - { "no-key": 1, "color": "cyan" }
           - { "index": { } }
           - { "no-key": 2, "color": "yellow" }
-
+  - do:
+      bulk:
+        index: "test-lookup-color-match"
+        refresh: true
+        body:
+          - { "index": { } }
+          - { "key": 10, "color": "red" , shade: "light" }
+          - { "index": { } }
+          - { "key": 10, "color": "red" , shade: "reddish" }
+          - { "index": { } }
+          - { "key": 20, "color": "blue" , shade: "dark" }
+          - { "index": { } }
+          - { "key": 30, "color": "pink" , shade: "hot" }
+  - do:
+      bulk:
+        index: "test2"
+        refresh: true
+        body:
+          - { "index": { } }
+          - { "key": 10, "color": "red" }
+          - { "index": { } }
+          - { "key": 20, "color": "yellow" }
 ---
 basic:
   - do:
@@ -137,12 +183,12 @@ basic:
         body:
           query: 'FROM test | SORT key | LOOKUP JOIN test-lookup-1 ON key | LIMIT 3'
 
-  - match: {columns.0.name: "key"}
-  - match: {columns.0.type: "long"}
-  - match: {columns.1.name: "color"}
-  - match: {columns.1.type: "keyword"}
-  - match: {values.0: [1, "cyan"]}
-  - match: {values.1: [2, "yellow"]}
+  - match: { columns.0.name: "key" }
+  - match: { columns.0.type: "long" }
+  - match: { columns.1.name: "color" }
+  - match: { columns.1.type: "keyword" }
+  - match: { values.0: [ 1, "cyan" ] }
+  - match: { values.1: [ 2, "yellow" ] }
 
 ---
 fails with non-lookup index v2:
@@ -150,8 +196,8 @@ fails with non-lookup index v2:
       capabilities:
         - method: POST
           path: /_query
-          parameters: []
-          capabilities: [enable_lookup_join_on_remote]
+          parameters: [ ]
+          capabilities: [ enable_lookup_join_on_remote ]
       reason: "checks updated error messages"
   - do:
       esql.query:
@@ -195,12 +241,12 @@ mv-on-lookup:
         - "Line 1:24: evaluation of [LOOKUP JOIN test-lookup-mv ON key] failed, treating result as null. Only first 20 failures recorded."
         - "Line 1:24: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value"
 
-  - match: {columns.0.name: "key"}
-  - match: {columns.0.type: "long"}
-  - match: {columns.1.name: "color"}
-  - match: {columns.1.type: "keyword"}
-  - match: {values.0: [1, "cyan"]}
-  - match: {values.1: [2, "yellow"]}
+  - match: { columns.0.name: "key" }
+  - match: { columns.0.type: "long" }
+  - match: { columns.1.name: "color" }
+  - match: { columns.1.type: "keyword" }
+  - match: { values.0: [ 1, "cyan" ] }
+  - match: { values.1: [ 2, "yellow" ] }
 
 ---
 mv-on-query:
@@ -212,21 +258,48 @@ mv-on-query:
         - "Line 1:27: evaluation of [LOOKUP JOIN test-lookup-1 ON key] failed, treating result as null. Only first 20 failures recorded."
         - "Line 1:27: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value"
 
-  - match: {columns.0.name: "key"}
-  - match: {columns.0.type: "long"}
-  - match: {columns.1.name: "color"}
-  - match: {columns.1.type: "keyword"}
-  - match: {values.0: [[0, 1, 2], null]}
-  - match: {values.1: [1, "cyan"]}
-  - match: {values.2: [2, "yellow"]}
+  - match: { columns.0.name: "key" }
+  - match: { columns.0.type: "long" }
+  - match: { columns.1.name: "color" }
+  - match: { columns.1.type: "keyword" }
+  - match: { values.0: [ [ 0, 1, 2 ], null ] }
+  - match: { values.1: [ 1, "cyan" ] }
+  - match: { values.2: [ 2, "yellow" ] }
 
 ---
 lookup-no-key:
   - do:
       esql.query:
-          body:
-            query: 'FROM test | LOOKUP JOIN test-lookup-no-key ON key | KEEP key, color'
+        body:
+          query: 'FROM test | LOOKUP JOIN test-lookup-no-key ON key | KEEP key, color'
       catch: "bad_request"
 
   - match: { error.type: "verification_exception" }
   - contains: { error.reason: "Unknown column [key] in right side of join" }
+
+---
+basic join on two fields:
+  - requires:
+      capabilities:
+        - method: POST
+          path: /_query
+          parameters: [ ]
+          capabilities: [ lookup_join_on_multiple_fields ]
+      reason: "uses LOOKUP JOIN on two fields"
+  - do:
+      esql.query:
+        body:
+          query: 'FROM test2 | LOOKUP JOIN test-lookup-color-match ON key, color | KEEP key, color, shade | SORT key, color, shade | LIMIT 10'
+  - length: { values: 3 }
+  - match: { columns.0.name: "key" }
+  - match: { columns.0.type: "long" }
+  - match: { columns.1.name: "color" }
+  - match: { columns.1.type: "keyword" }
+  - match: { columns.2.name: "shade" }
+  - match: { columns.2.type: "keyword" }
+  # For key 10 and color "red", two lookup rows match.
+  - match: { values.0: [ 10, "red", "light" ] }
+  - match: { values.1: [ 10, "red", "reddish" ] }
+  # For key 20 and color "yellow", no lookup rows match, but the left row is kept because LOOKUP JOIN is a left join.
+  - match: { values.2: [ 20, "yellow", null ] }
+
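The assertions in the "basic join on two fields" test follow from left-join semantics over both keys: each row from test2 is matched on (key, color), every matching row in test-lookup-color-match produces an output row, and a left row with no match keeps null for the looked-up columns. A small stand-alone sketch of those semantics over the same fixture data (a naive nested loop, not the actual ES|QL execution path):

```java
// Naive illustration of LOOKUP JOIN ... ON key, color semantics; not ES|QL code.
import java.util.Arrays;
import java.util.List;

class TwoFieldLookupJoinSketch {
    record Left(long key, String color) {}
    record Lookup(long key, String color, String shade) {}

    public static void main(String[] args) {
        List<Left> left = List.of(new Left(10, "red"), new Left(20, "yellow"));
        List<Lookup> lookup = List.of(
            new Lookup(10, "red", "light"),
            new Lookup(10, "red", "reddish"),
            new Lookup(20, "blue", "dark"),
            new Lookup(30, "pink", "hot")
        );
        for (Left l : left) {
            boolean matched = false;
            for (Lookup r : lookup) {
                if (r.key() == l.key() && r.color().equals(l.color())) { // AND over both join keys
                    matched = true;
                    System.out.println(List.of(l.key(), l.color(), r.shade()));
                }
            }
            if (matched == false) {
                System.out.println(Arrays.asList(l.key(), l.color(), null)); // unmatched left row kept with null shade
            }
        }
        // Prints: [10, red, light], [10, red, reddish], [20, yellow, null]
    }
}
```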