[8.x] Backport two PRs (#117246) (#117843) (#117967)

* LOOKUP JOIN using field-caps for field mapping (#117246)

* LOOKUP JOIN using field-caps for field mapping

Removes the hard-coded hack for languages_lookup and instead performs a field-caps check against the real join index.

* Update docs/changelog/117246.yaml

* Some code review comments

* Enhance LOOKUP JOIN csv-spec tests to cover more cases and fix several bugs found (#117843)

Adds several more tests to lookup-join.csv-spec, and fixes the following bugs:

* The field-caps request for the right-hand side now ignores the fieldNames method and simply uses "*", because the fieldNames search cannot yet handle lookup fields with aliases (to be fixed in a follow-up PR).
* Stop using the lookup index in the ComputeService (so we don't get data from both indices arriving on the left-hand side, among other odd behaviour).
* Ignore failing SearchStats checks on fields from the right-hand side in the logical planner (so it does not plan EVAL field = null for all right-hand fields). This should be fixed properly with the correct updates to TransportSearchShardsAction (or rather by making multiple uses of it, one for each branch of the execution model).

* Don't load indices with mode:lookup due to cluster state errors in mixed clusters

* Disable all lookup-join tests on 8.x, due to issues with cluster state

* Spotless apply
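
For context, a representative query shape exercised by the new lookup-join.csv-spec tests in this diff (see the spec file further down) is roughly:

    FROM sample_data
    | LOOKUP JOIN message_types_lookup ON message
    | KEEP message, type
    | STATS count = count(message) BY type
    | SORT count DESC, type ASC

The right-hand index (message_types_lookup here) must be created with "index.mode": "lookup", which is what the new *_lookup-settings.json fixtures provide.
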
Craig Taverner · 10 months ago · commit 5afbfda213
24 changed files with 494 additions and 122 deletions
  1. docs/changelog/117246.yaml (+5 -0)
  2. x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java (+2 -2)
  3. x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java (+17 -7)
  4. x-pack/plugin/esql/qa/testFixtures/src/main/resources/clientips_lookup-settings.json (+5 -0)
  5. x-pack/plugin/esql/qa/testFixtures/src/main/resources/languages.csv (+1 -1)
  6. x-pack/plugin/esql/qa/testFixtures/src/main/resources/languages_lookup-settings.json (+5 -0)
  7. x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec (+214 -10)
  8. x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-clientips.json (+8 -8)
  9. x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-languages.json (+1 -1)
  10. x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-message_types.json (+10 -0)
  11. x-pack/plugin/esql/qa/testFixtures/src/main/resources/message_types.csv (+6 -0)
  12. x-pack/plugin/esql/qa/testFixtures/src/main/resources/message_types_lookup-settings.json (+5 -0)
  13. x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java (+1 -1)
  14. x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java (+11 -26)
  15. x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/AnalyzerContext.java (+13 -1)
  16. x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java (+11 -0)
  17. x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceMissingFieldWithNull.java (+12 -1)
  18. x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/InsertFieldExtraction.java (+8 -7)
  19. x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LookupJoinExec.java (+1 -1)
  20. x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java (+1 -0)
  21. x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java (+9 -2)
  22. x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java (+49 -5)
  23. x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java (+98 -48)
  24. x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java (+1 -1)

+ 5 - 0
docs/changelog/117246.yaml

@@ -0,0 +1,5 @@
+pr: 117246
+summary: LOOKUP JOIN using field-caps for field mapping
+area: ES|QL
+type: enhancement
+issues: []

+ 2 - 2
x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

@@ -48,7 +48,7 @@ import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.ENRICH_SOURCE_INDI
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources;
 import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS;
 import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2;
-import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V3;
+import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V4;
 import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_PLANNING_V1;
 import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.METADATA_FIELDS_REMOTE_TEST;
 import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.Mode.SYNC;
@@ -124,7 +124,7 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
         assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS.capabilityName()));
         assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName()));
         assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName()));
-        assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V3.capabilityName()));
+        assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V4.capabilityName()));
     }
 
     private TestFeatureService remoteFeaturesService() throws IOException {

+ 17 - 7
x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java

@@ -56,6 +56,8 @@ public class CsvTestsDataLoader {
     private static final TestsDataset APPS = new TestsDataset("apps");
     private static final TestsDataset APPS_SHORT = APPS.withIndex("apps_short").withTypeMapping(Map.of("id", "short"));
     private static final TestsDataset LANGUAGES = new TestsDataset("languages");
+    // private static final TestsDataset LANGUAGES_LOOKUP = LANGUAGES.withIndex("languages_lookup")
+    // .withSetting("languages_lookup-settings.json");
     private static final TestsDataset ALERTS = new TestsDataset("alerts");
     private static final TestsDataset UL_LOGS = new TestsDataset("ul_logs");
     private static final TestsDataset SAMPLE_DATA = new TestsDataset("sample_data");
@@ -70,6 +72,11 @@ public class CsvTestsDataLoader {
         .withTypeMapping(Map.of("@timestamp", "date_nanos"));
     private static final TestsDataset MISSING_IP_SAMPLE_DATA = new TestsDataset("missing_ip_sample_data");
     private static final TestsDataset CLIENT_IPS = new TestsDataset("clientips");
+    // private static final TestsDataset CLIENT_IPS_LOOKUP = CLIENT_IPS.withIndex("clientips_lookup")
+    // .withSetting("clientips_lookup-settings.json");
+    private static final TestsDataset MESSAGE_TYPES = new TestsDataset("message_types");
+    // private static final TestsDataset MESSAGE_TYPES_LOOKUP = MESSAGE_TYPES.withIndex("message_types_lookup")
+    // .withSetting("message_types_lookup-settings.json");
     private static final TestsDataset CLIENT_CIDR = new TestsDataset("client_cidr");
     private static final TestsDataset AGES = new TestsDataset("ages");
     private static final TestsDataset HEIGHTS = new TestsDataset("heights");
@@ -94,14 +101,13 @@ public class CsvTestsDataLoader {
     private static final TestsDataset BOOKS = new TestsDataset("books");
     private static final TestsDataset SEMANTIC_TEXT = new TestsDataset("semantic_text").withInferenceEndpoint(true);
 
-    private static final String LOOKUP_INDEX_SUFFIX = "_lookup";
-
     public static final Map<String, TestsDataset> CSV_DATASET_MAP = Map.ofEntries(
         Map.entry(EMPLOYEES.indexName, EMPLOYEES),
         Map.entry(HOSTS.indexName, HOSTS),
         Map.entry(APPS.indexName, APPS),
         Map.entry(APPS_SHORT.indexName, APPS_SHORT),
         Map.entry(LANGUAGES.indexName, LANGUAGES),
+        // Map.entry(LANGUAGES_LOOKUP.indexName, LANGUAGES_LOOKUP),
         Map.entry(UL_LOGS.indexName, UL_LOGS),
         Map.entry(SAMPLE_DATA.indexName, SAMPLE_DATA),
         Map.entry(MV_SAMPLE_DATA.indexName, MV_SAMPLE_DATA),
@@ -111,6 +117,9 @@ public class CsvTestsDataLoader {
         Map.entry(SAMPLE_DATA_TS_NANOS.indexName, SAMPLE_DATA_TS_NANOS),
         Map.entry(MISSING_IP_SAMPLE_DATA.indexName, MISSING_IP_SAMPLE_DATA),
         Map.entry(CLIENT_IPS.indexName, CLIENT_IPS),
+        // Map.entry(CLIENT_IPS_LOOKUP.indexName, CLIENT_IPS_LOOKUP),
+        Map.entry(MESSAGE_TYPES.indexName, MESSAGE_TYPES),
+        // Map.entry(MESSAGE_TYPES_LOOKUP.indexName, MESSAGE_TYPES_LOOKUP),
         Map.entry(CLIENT_CIDR.indexName, CLIENT_CIDR),
         Map.entry(AGES.indexName, AGES),
         Map.entry(HEIGHTS.indexName, HEIGHTS),
@@ -132,9 +141,7 @@ public class CsvTestsDataLoader {
         Map.entry(DISTANCES.indexName, DISTANCES),
         Map.entry(ADDRESSES.indexName, ADDRESSES),
         Map.entry(BOOKS.indexName, BOOKS),
-        Map.entry(SEMANTIC_TEXT.indexName, SEMANTIC_TEXT),
-        // JOIN LOOKUP alias
-        Map.entry(LANGUAGES.indexName + LOOKUP_INDEX_SUFFIX, LANGUAGES.withIndex(LANGUAGES.indexName + LOOKUP_INDEX_SUFFIX))
+        Map.entry(SEMANTIC_TEXT.indexName, SEMANTIC_TEXT)
     );
 
     private static final EnrichConfig LANGUAGES_ENRICH = new EnrichConfig("languages_policy", "enrich-policy-languages.json");
@@ -174,13 +181,14 @@ public class CsvTestsDataLoader {
      * </p>
      * <p>
      * Accepts an URL as first argument, eg. http://localhost:9200 or http://user:pass@localhost:9200
-     *</p>
+     * </p>
      * <p>
      * If no arguments are specified, the default URL is http://localhost:9200 without authentication
      * </p>
      * <p>
      * It also supports HTTPS
      * </p>
+     *
      * @param args the URL to connect
      * @throws IOException
      */
@@ -270,7 +278,9 @@ public class CsvTestsDataLoader {
         }
     }
 
-    /** The semantic_text mapping type require an inference endpoint that needs to be setup before creating the index. */
+    /**
+     * The semantic_text mapping type require an inference endpoint that needs to be setup before creating the index.
+     */
     public static void createInferenceEndpoint(RestClient client) throws IOException {
         Request request = new Request("PUT", "_inference/sparse_embedding/test_sparse_inference");
         request.setJsonEntity("""

+ 5 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/clientips_lookup-settings.json

@@ -0,0 +1,5 @@
+{
+  "index": {
+    "mode": "lookup"
+  }
+}

+ 1 - 1
x-pack/plugin/esql/qa/testFixtures/src/main/resources/languages.csv

@@ -1,4 +1,4 @@
-language_code:keyword,language_name:keyword
+language_code:integer,language_name:keyword
 1,English
 2,French
 3,Spanish

+ 5 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/languages_lookup-settings.json

@@ -0,0 +1,5 @@
+{
+  "index": {
+    "mode": "lookup"
+  }
+}

+ 214 - 10
x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

@@ -4,8 +4,8 @@
 //
 
 //TODO: this sometimes returns null instead of the looked up value (likely related to the execution order)
-basicOnTheDataNode-Ignore
-required_capability: join_lookup_v3
+basicOnTheDataNode
+required_capability: join_lookup_v4
 
 FROM employees
 | EVAL language_code = languages
@@ -21,19 +21,19 @@ emp_no:integer | language_code:integer | language_name:keyword
 10093          | 3                     | Spanish
 ;
 
-basicRow-Ignore
-required_capability: join_lookup_v3
+basicRow
+required_capability: join_lookup_v4
 
 ROW language_code = 1
 | LOOKUP JOIN languages_lookup ON language_code
 ;
 
-language_code:keyword  | language_name:keyword
+language_code:integer  | language_name:keyword
 1                      | English
 ;
 
 basicOnTheCoordinator
-required_capability: join_lookup_v3
+required_capability: join_lookup_v4
 
 FROM employees
 | SORT emp_no
@@ -49,9 +49,8 @@ emp_no:integer | language_code:integer | language_name:keyword
 10003          | 4                     | German
 ;
 
-//TODO: this sometimes returns null instead of the looked up value (likely related to the execution order)
-subsequentEvalOnTheDataNode-Ignore
-required_capability: join_lookup_v3
+subsequentEvalOnTheDataNode
+required_capability: join_lookup_v4
 
 FROM employees
 | EVAL language_code = languages
@@ -69,7 +68,7 @@ emp_no:integer | language_code:integer | language_name:keyword | language_code_x
 ;
 
 subsequentEvalOnTheCoordinator
-required_capability: join_lookup_v3
+required_capability: join_lookup_v4
 
 FROM employees
 | SORT emp_no
@@ -85,3 +84,208 @@ emp_no:integer | language_code:integer | language_name:keyword | language_code_x
 10002          | 5                     | null                  |                       10
 10003          | 4                     | german                |                        8
 ;
+
+lookupIPFromRow
+required_capability: join_lookup_v4
+
+ROW left = "left", client_ip = "172.21.0.5", right = "right"
+| LOOKUP JOIN clientips_lookup ON client_ip
+;
+
+left:keyword | client_ip:keyword | right:keyword | env:keyword
+left         | 172.21.0.5        | right         | Development
+;
+
+lookupIPFromRowWithShadowing
+required_capability: join_lookup_v4
+
+ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right"
+| LOOKUP JOIN clientips_lookup ON client_ip
+;
+
+left:keyword | client_ip:keyword | right:keyword | env:keyword
+left         | 172.21.0.5        | right         | Development
+;
+
+lookupIPFromRowWithShadowingKeep
+required_capability: join_lookup_v4
+
+ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right"
+| EVAL client_ip = client_ip::keyword
+| LOOKUP JOIN clientips_lookup ON client_ip
+| KEEP left, client_ip, right, env
+;
+
+left:keyword | client_ip:keyword | right:keyword | env:keyword
+left         | 172.21.0.5        | right         | Development
+;
+
+lookupIPFromIndex
+required_capability: join_lookup_v4
+
+FROM sample_data
+| EVAL client_ip = client_ip::keyword
+| LOOKUP JOIN clientips_lookup ON client_ip
+;
+
+@timestamp:date          | event_duration:long | message:keyword       | client_ip:keyword | env:keyword
+2023-10-23T13:55:01.543Z | 1756467             | Connected to 10.1.0.1 | 172.21.3.15       | Production
+2023-10-23T13:53:55.832Z | 5033755             | Connection error      | 172.21.3.15       | Production
+2023-10-23T13:52:55.015Z | 8268153             | Connection error      | 172.21.3.15       | Production
+2023-10-23T13:51:54.732Z | 725448              | Connection error      | 172.21.3.15       | Production
+2023-10-23T13:33:34.937Z | 1232382             | Disconnected          | 172.21.0.5        | Development
+2023-10-23T12:27:28.948Z | 2764889             | Connected to 10.1.0.2 | 172.21.2.113      | QA
+2023-10-23T12:15:03.360Z | 3450233             | Connected to 10.1.0.3 | 172.21.2.162      | QA
+;
+
+lookupIPFromIndexKeep
+required_capability: join_lookup_v4
+
+FROM sample_data
+| EVAL client_ip = client_ip::keyword
+| LOOKUP JOIN clientips_lookup ON client_ip
+| KEEP @timestamp, client_ip, event_duration, message, env
+;
+
+@timestamp:date          | client_ip:keyword | event_duration:long | message:keyword       | env:keyword
+2023-10-23T13:55:01.543Z | 172.21.3.15       | 1756467             | Connected to 10.1.0.1 | Production
+2023-10-23T13:53:55.832Z | 172.21.3.15       | 5033755             | Connection error      | Production
+2023-10-23T13:52:55.015Z | 172.21.3.15       | 8268153             | Connection error      | Production
+2023-10-23T13:51:54.732Z | 172.21.3.15       | 725448              | Connection error      | Production
+2023-10-23T13:33:34.937Z | 172.21.0.5        | 1232382             | Disconnected          | Development
+2023-10-23T12:27:28.948Z | 172.21.2.113      | 2764889             | Connected to 10.1.0.2 | QA
+2023-10-23T12:15:03.360Z | 172.21.2.162      | 3450233             | Connected to 10.1.0.3 | QA
+;
+
+lookupIPFromIndexStats
+required_capability: join_lookup_v4
+
+FROM sample_data
+| EVAL client_ip = client_ip::keyword
+| LOOKUP JOIN clientips_lookup ON client_ip
+| STATS count = count(client_ip) BY env
+| SORT count DESC, env ASC
+;
+
+count:long | env:keyword
+4          | Production
+2          | QA
+1          | Development
+;
+
+lookupIPFromIndexStatsKeep
+required_capability: join_lookup_v4
+
+FROM sample_data
+| EVAL client_ip = client_ip::keyword
+| LOOKUP JOIN clientips_lookup ON client_ip
+| KEEP client_ip, env
+| STATS count = count(client_ip) BY env
+| SORT count DESC, env ASC
+;
+
+count:long | env:keyword
+4          | Production
+2          | QA
+1          | Development
+;
+
+lookupMessageFromRow
+required_capability: join_lookup_v4
+
+ROW left = "left", message = "Connected to 10.1.0.1", right = "right"
+| LOOKUP JOIN message_types_lookup ON message
+;
+
+left:keyword | message:keyword       | right:keyword | type:keyword
+left         | Connected to 10.1.0.1 | right         | Success
+;
+
+lookupMessageFromRowWithShadowing
+required_capability: join_lookup_v4
+
+ROW left = "left", message = "Connected to 10.1.0.1", type = "unknown", right = "right"
+| LOOKUP JOIN message_types_lookup ON message
+;
+
+left:keyword | message:keyword       | right:keyword | type:keyword
+left         | Connected to 10.1.0.1 | right         | Success
+;
+
+lookupMessageFromRowWithShadowingKeep
+required_capability: join_lookup_v4
+
+ROW left = "left", message = "Connected to 10.1.0.1", type = "unknown", right = "right"
+| LOOKUP JOIN message_types_lookup ON message
+| KEEP left, message, right, type
+;
+
+left:keyword | message:keyword       | right:keyword | type:keyword
+left         | Connected to 10.1.0.1 | right         | Success
+;
+
+lookupMessageFromIndex
+required_capability: join_lookup_v4
+
+FROM sample_data
+| LOOKUP JOIN message_types_lookup ON message
+;
+
+@timestamp:date          | client_ip:ip | event_duration:long | message:keyword       | type:keyword
+2023-10-23T13:55:01.543Z | 172.21.3.15  | 1756467             | Connected to 10.1.0.1 | Success
+2023-10-23T13:53:55.832Z | 172.21.3.15  | 5033755             | Connection error      | Error
+2023-10-23T13:52:55.015Z | 172.21.3.15  | 8268153             | Connection error      | Error
+2023-10-23T13:51:54.732Z | 172.21.3.15  | 725448              | Connection error      | Error
+2023-10-23T13:33:34.937Z | 172.21.0.5   | 1232382             | Disconnected          | Disconnected
+2023-10-23T12:27:28.948Z | 172.21.2.113 | 2764889             | Connected to 10.1.0.2 | Success
+2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233             | Connected to 10.1.0.3 | Success
+;
+
+lookupMessageFromIndexKeep
+required_capability: join_lookup_v4
+
+FROM sample_data
+| LOOKUP JOIN message_types_lookup ON message
+| KEEP @timestamp, client_ip, event_duration, message, type
+;
+
+@timestamp:date          | client_ip:ip | event_duration:long | message:keyword       | type:keyword
+2023-10-23T13:55:01.543Z | 172.21.3.15  | 1756467             | Connected to 10.1.0.1 | Success
+2023-10-23T13:53:55.832Z | 172.21.3.15  | 5033755             | Connection error      | Error
+2023-10-23T13:52:55.015Z | 172.21.3.15  | 8268153             | Connection error      | Error
+2023-10-23T13:51:54.732Z | 172.21.3.15  | 725448              | Connection error      | Error
+2023-10-23T13:33:34.937Z | 172.21.0.5   | 1232382             | Disconnected          | Disconnected
+2023-10-23T12:27:28.948Z | 172.21.2.113 | 2764889             | Connected to 10.1.0.2 | Success
+2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233             | Connected to 10.1.0.3 | Success
+;
+
+lookupMessageFromIndexStats
+required_capability: join_lookup_v4
+
+FROM sample_data
+| LOOKUP JOIN message_types_lookup ON message
+| STATS count = count(message) BY type
+| SORT count DESC, type ASC
+;
+
+count:long | type:keyword
+3          | Error
+3          | Success
+1          | Disconnected
+;
+
+lookupMessageFromIndexStatsKeep
+required_capability: join_lookup_v4
+
+FROM sample_data
+| LOOKUP JOIN message_types_lookup ON message
+| KEEP message, type
+| STATS count = count(message) BY type
+| SORT count DESC, type ASC
+;
+
+count:long | type:keyword
+3          | Error
+3          | Success
+1          | Disconnected
+;

+ 8 - 8
x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-clientips.json

@@ -1,10 +1,10 @@
 {
-    "properties": {
-      "client_ip": {
-        "type": "keyword"
-      },
-      "env": {
-        "type": "keyword"
-      }
+  "properties": {
+    "client_ip": {
+      "type": "keyword"
+    },
+    "env": {
+      "type": "keyword"
     }
-  }
+  }
+}

+ 1 - 1
x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-languages.json

@@ -1,7 +1,7 @@
 {
     "properties" : {
         "language_code" : {
-            "type" : "keyword"
+            "type" : "integer"
         },
         "language_name" : {
             "type" : "keyword"

+ 10 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-message_types.json

@@ -0,0 +1,10 @@
+{
+  "properties": {
+    "message": {
+      "type": "keyword"
+    },
+    "type": {
+      "type": "keyword"
+    }
+  }
+}

+ 6 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/message_types.csv

@@ -0,0 +1,6 @@
+message:keyword,type:keyword
+Connection error,Error
+Disconnected,Disconnected
+Connected to 10.1.0.1,Success
+Connected to 10.1.0.2,Success
+Connected to 10.1.0.3,Success

+ 5 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/message_types_lookup-settings.json

@@ -0,0 +1,5 @@
+{
+  "index": {
+    "mode": "lookup"
+  }
+}

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

@@ -518,7 +518,7 @@ public class EsqlCapabilities {
         /**
          * LOOKUP JOIN
          */
-        JOIN_LOOKUP_V3(Build.current().isSnapshot()),
+        JOIN_LOOKUP_V4(false && Build.current().isSnapshot()),
 
         /**
          * Fix for https://github.com/elastic/elasticsearch/issues/117054

+ 11 - 26
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java

@@ -61,6 +61,7 @@ import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Dat
 import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.EsqlArithmeticOperation;
 import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.In;
 import org.elasticsearch.xpack.esql.index.EsIndex;
+import org.elasticsearch.xpack.esql.index.IndexResolution;
 import org.elasticsearch.xpack.esql.parser.ParsingException;
 import org.elasticsearch.xpack.esql.plan.TableIdentifier;
 import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
@@ -105,7 +106,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
@@ -198,11 +198,12 @@ public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerCon
 
         @Override
         protected LogicalPlan rule(UnresolvedRelation plan, AnalyzerContext context) {
-            if (plan.indexMode().equals(IndexMode.LOOKUP)) {
-                return hackLookupMapping(plan);
-            }
-            if (context.indexResolution().isValid() == false) {
-                return plan.unresolvedMessage().equals(context.indexResolution().toString())
+            return resolveIndex(plan, plan.indexMode().equals(IndexMode.LOOKUP) ? context.lookupResolution() : context.indexResolution());
+        }
+
+        private LogicalPlan resolveIndex(UnresolvedRelation plan, IndexResolution indexResolution) {
+            if (indexResolution.isValid() == false) {
+                return plan.unresolvedMessage().equals(indexResolution.toString())
                     ? plan
                     : new UnresolvedRelation(
                         plan.source(),
@@ -210,12 +211,12 @@ public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerCon
                         plan.frozen(),
                         plan.metadataFields(),
                         plan.indexMode(),
-                        context.indexResolution().toString(),
+                        indexResolution.toString(),
                         plan.commandName()
                     );
             }
             TableIdentifier table = plan.table();
-            if (context.indexResolution().matches(table.index()) == false) {
+            if (indexResolution.matches(table.index()) == false) {
                 // TODO: fix this (and tests), or drop check (seems SQL-inherited, where's also defective)
                 new UnresolvedRelation(
                     plan.source(),
@@ -223,32 +224,16 @@ public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerCon
                     plan.frozen(),
                     plan.metadataFields(),
                     plan.indexMode(),
-                    "invalid [" + table + "] resolution to [" + context.indexResolution() + "]",
+                    "invalid [" + table + "] resolution to [" + indexResolution + "]",
                     plan.commandName()
                 );
             }
 
-            EsIndex esIndex = context.indexResolution().get();
+            EsIndex esIndex = indexResolution.get();
             var attributes = mappingAsAttributes(plan.source(), esIndex.mapping());
             attributes.addAll(plan.metadataFields());
             return new EsRelation(plan.source(), esIndex, attributes.isEmpty() ? NO_FIELDS : attributes, plan.indexMode());
         }
-
-        private LogicalPlan hackLookupMapping(UnresolvedRelation plan) {
-            if (plan.table().index().toLowerCase(Locale.ROOT).equals("languages_lookup")) {
-                EsIndex esIndex = new EsIndex(
-                    "languages_lookup",
-                    Map.ofEntries(
-                        Map.entry("language_code", new EsField("language_code", DataType.LONG, Map.of(), true)),
-                        Map.entry("language_name", new EsField("language", DataType.KEYWORD, Map.of(), true))
-                    ),
-                    Map.of("languages_lookup", IndexMode.LOOKUP)
-                );
-                var attributes = mappingAsAttributes(plan.source(), esIndex.mapping());
-                return new EsRelation(plan.source(), esIndex, attributes.isEmpty() ? NO_FIELDS : attributes, plan.indexMode());
-            }
-            return plan;
-        }
     }
 
     /**

+ 13 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/AnalyzerContext.java

@@ -15,5 +15,17 @@ public record AnalyzerContext(
     Configuration configuration,
     EsqlFunctionRegistry functionRegistry,
     IndexResolution indexResolution,
+    IndexResolution lookupResolution,
     EnrichResolution enrichResolution
-) {}
+) {
+    // Currently for tests only, since most do not test lookups
+    // TODO: make this even simpler, remove the enrichResolution for tests that do not require it (most tests)
+    public AnalyzerContext(
+        Configuration configuration,
+        EsqlFunctionRegistry functionRegistry,
+        IndexResolution indexResolution,
+        EnrichResolution enrichResolution
+    ) {
+        this(configuration, functionRegistry, indexResolution, IndexResolution.invalid("<none>"), enrichResolution);
+    }
+}

+ 11 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java

@@ -24,6 +24,7 @@ import org.elasticsearch.search.SearchService;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.security.authz.privilege.ClusterPrivilegeResolver;
+import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
 import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
 import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
 import org.elasticsearch.xpack.esql.core.tree.Source;
@@ -78,9 +79,19 @@ public class LookupFromIndexService extends AbstractLookupService<LookupFromInde
     @Override
     protected QueryList queryList(TransportRequest request, SearchExecutionContext context, Block inputBlock, DataType inputDataType) {
         MappedFieldType fieldType = context.getFieldType(request.matchField);
+        validateTypes(request.inputDataType, fieldType);
         return termQueryList(fieldType, context, inputBlock, inputDataType);
     }
 
+    private static void validateTypes(DataType inputDataType, MappedFieldType fieldType) {
+        // TODO: consider supporting implicit type conversion as done in ENRICH for some types
+        if (fieldType.typeName().equals(inputDataType.typeName()) == false) {
+            throw new EsqlIllegalArgumentException(
+                "LOOKUP JOIN match and input types are incompatible: match[" + fieldType.typeName() + "], input[" + inputDataType + "]"
+            );
+        }
+    }
+
     public static class Request extends AbstractLookupService.Request {
         private final String matchField;
 

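To make the new check concrete, here is a minimal sketch (not from the commit itself) of a query that should now be rejected, assuming the left-hand join key resolves to keyword while the lookup index maps language_code as integer (as the updated languages mapping in this diff does):

    ROW language_code = "1"
    | LOOKUP JOIN languages_lookup ON language_code

With mismatched types, queryList() should hit validateTypes() and fail with the "LOOKUP JOIN match and input types are incompatible" error.
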
+ 12 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceMissingFieldWithNull.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.esql.optimizer.rules.logical.local;
 
 import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.xpack.esql.core.expression.Alias;
+import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
 import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
 import org.elasticsearch.xpack.esql.core.expression.Literal;
 import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
@@ -23,6 +24,7 @@ import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
 import org.elasticsearch.xpack.esql.plan.logical.Project;
 import org.elasticsearch.xpack.esql.plan.logical.RegexExtract;
 import org.elasticsearch.xpack.esql.plan.logical.TopN;
+import org.elasticsearch.xpack.esql.plan.logical.join.Join;
 import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
 import org.elasticsearch.xpack.esql.rule.ParameterizedRule;
 import org.elasticsearch.xpack.esql.stats.SearchStats;
@@ -56,10 +58,13 @@ public class ReplaceMissingFieldWithNull extends ParameterizedRule<LogicalPlan,
             var projections = project.projections();
             List<NamedExpression> newProjections = new ArrayList<>(projections.size());
             Map<DataType, Alias> nullLiteral = Maps.newLinkedHashMapWithExpectedSize(DataType.types().size());
+            AttributeSet joinAttributes = joinAttributes(project);
 
             for (NamedExpression projection : projections) {
                 // Do not use the attribute name, this can deviate from the field name for union types.
-                if (projection instanceof FieldAttribute f && stats.exists(f.fieldName()) == false) {
+                if (projection instanceof FieldAttribute f && stats.exists(f.fieldName()) == false && joinAttributes.contains(f) == false) {
+                    // TODO: Should do a searchStats lookup for join attributes instead of just ignoring them here
+                    // See TransportSearchShardsAction
                     DataType dt = f.dataType();
                     Alias nullAlias = nullLiteral.get(f.dataType());
                     // save the first field as null (per datatype)
@@ -96,4 +101,10 @@ public class ReplaceMissingFieldWithNull extends ParameterizedRule<LogicalPlan,
 
         return plan;
     }
+
+    private AttributeSet joinAttributes(Project project) {
+        var attributes = new AttributeSet();
+        project.forEachDown(Join.class, j -> j.right().forEachDown(EsRelation.class, p -> attributes.addAll(p.output())));
+        return attributes;
+    }
 }

+ 8 - 7
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/InsertFieldExtraction.java

@@ -23,14 +23,12 @@ import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.rule.Rule;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 
 /**
- *
  * Materialize the concrete fields that need to be extracted from the storage until the last possible moment.
  * Expects the local plan to already have a projection containing the fields needed upstream.
  * <p>
@@ -102,15 +100,18 @@ public class InsertFieldExtraction extends Rule<PhysicalPlan, PhysicalPlan> {
 
     private static Set<Attribute> missingAttributes(PhysicalPlan p) {
         var missing = new LinkedHashSet<Attribute>();
-        var inputSet = p.inputSet();
+        var input = p.inputSet();
 
-        // TODO: We need to extract whatever fields are missing from the left hand side.
-        // skip the lookup join since the right side is always materialized and a projection
+        // For LOOKUP JOIN we only need field-extraction on left fields used to match, since the right side is always materialized
         if (p instanceof LookupJoinExec join) {
-            return Collections.emptySet();
+            join.leftFields().forEach(f -> {
+                if (input.contains(f) == false) {
+                    missing.add(f);
+                }
+            });
+            return missing;
         }
 
-        var input = inputSet;
         // collect field attributes used inside expressions
         // TODO: Rather than going over all expressions manually, this should just call .references()
         p.forEachExpression(TypedAttribute.class, f -> {

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LookupJoinExec.java

@@ -102,7 +102,7 @@ public class LookupJoinExec extends BinaryExec implements EstimatesRowSize {
 
     @Override
     public PhysicalPlan estimateRowSize(State state) {
-        state.add(false, output());
+        state.add(false, addedFields);
         return this;
     }
 

+ 1 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

@@ -565,6 +565,7 @@ public class LocalExecutionPlanner {
 
     private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlannerContext context) {
         PhysicalOperation source = plan(join.left(), context);
+        // TODO: The source builder includes incoming fields including the ones we're going to drop
         Layout.Builder layoutBuilder = source.layout.builder();
         for (Attribute f : join.addedFields()) {
             layoutBuilder.append(f);

+ 9 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

@@ -14,6 +14,7 @@ import org.elasticsearch.compute.aggregation.AggregatorMode;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.core.Tuple;
+import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.SearchExecutionContext;
@@ -25,6 +26,7 @@ import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.core.util.Holder;
 import org.elasticsearch.xpack.esql.core.util.Queries;
+import org.elasticsearch.xpack.esql.index.EsIndex;
 import org.elasticsearch.xpack.esql.optimizer.LocalLogicalOptimizerContext;
 import org.elasticsearch.xpack.esql.optimizer.LocalLogicalPlanOptimizer;
 import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
@@ -117,12 +119,17 @@ public class PlannerUtils {
         var indices = new LinkedHashSet<String>();
         plan.forEachUp(
             FragmentExec.class,
-            f -> f.fragment()
-                .forEachUp(EsRelation.class, r -> indices.addAll(asList(Strings.commaDelimitedListToStringArray(r.index().name()))))
+            f -> f.fragment().forEachUp(EsRelation.class, r -> addOriginalIndexIfNotLookup(indices, r.index()))
         );
         return indices.toArray(String[]::new);
     }
 
+    private static void addOriginalIndexIfNotLookup(Set<String> indices, EsIndex index) {
+        if (index.indexNameWithModes().get(index.name()) != IndexMode.LOOKUP) {
+            indices.addAll(asList(Strings.commaDelimitedListToStringArray(index.name())));
+        }
+    }
+
     public static PhysicalPlan localPlan(List<SearchExecutionContext> searchContexts, Configuration configuration, PhysicalPlan plan) {
         return localPlan(configuration, plan, SearchContextStats.from(searchContexts));
     }

+ 49 - 5
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

@@ -63,8 +63,12 @@ import org.elasticsearch.xpack.esql.action.EsqlSearchShardsAction;
 import org.elasticsearch.xpack.esql.core.expression.Attribute;
 import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
 import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService;
+import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
+import org.elasticsearch.xpack.esql.plan.logical.join.Join;
 import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
 import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
+import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
+import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
 import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
@@ -77,6 +81,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -162,9 +167,11 @@ public class ComputeService {
         Map<String, OriginalIndices> clusterToConcreteIndices = transportService.getRemoteClusterService()
             .groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, PlannerUtils.planConcreteIndices(physicalPlan).toArray(String[]::new));
         QueryPragmas queryPragmas = configuration.pragmas();
+        Set<String> lookupIndexNames = findLookupIndexNames(physicalPlan);
+        Set<String> concreteIndexNames = selectConcreteIndices(clusterToConcreteIndices, lookupIndexNames);
         if (dataNodePlan == null) {
-            if (clusterToConcreteIndices.values().stream().allMatch(v -> v.indices().length == 0) == false) {
-                String error = "expected no concrete indices without data node plan; got " + clusterToConcreteIndices;
+            if (concreteIndexNames.isEmpty() == false) {
+                String error = "expected no concrete indices without data node plan; got " + concreteIndexNames;
                 assert false : error;
                 listener.onFailure(new IllegalStateException(error));
                 return;
@@ -187,7 +194,7 @@ public class ComputeService {
                 return;
             }
         } else {
-            if (clusterToConcreteIndices.values().stream().allMatch(v -> v.indices().length == 0)) {
+            if (concreteIndexNames.isEmpty()) {
                 var error = "expected concrete indices with data node plan but got empty; data node plan " + dataNodePlan;
                 assert false : error;
                 listener.onFailure(new IllegalStateException(error));
@@ -261,6 +268,42 @@ public class ComputeService {
         }
     }
 
+    private Set<String> selectConcreteIndices(Map<String, OriginalIndices> clusterToConcreteIndices, Set<String> indexesToIgnore) {
+        Set<String> concreteIndexNames = new HashSet<>();
+        clusterToConcreteIndices.forEach((clusterAlias, concreteIndices) -> {
+            for (String index : concreteIndices.indices()) {
+                if (indexesToIgnore.contains(index) == false) {
+                    concreteIndexNames.add(index);
+                }
+            }
+        });
+        return concreteIndexNames;
+    }
+
+    private Set<String> findLookupIndexNames(PhysicalPlan physicalPlan) {
+        Set<String> lookupIndexNames = new HashSet<>();
+        // When planning JOIN on the coordinator node: "LookupJoinExec.lookup()->FragmentExec.fragment()->EsRelation.index()"
+        physicalPlan.forEachDown(
+            LookupJoinExec.class,
+            lookupJoinExec -> lookupJoinExec.lookup()
+                .forEachDown(
+                    FragmentExec.class,
+                    frag -> frag.fragment().forEachDown(EsRelation.class, esRelation -> lookupIndexNames.add(esRelation.index().name()))
+                )
+        );
+        // When planning JOIN on the data node: "FragmentExec.fragment()->Join.right()->EsRelation.index()"
+        // TODO this only works for LEFT join, so we still need to support RIGHT join
+        physicalPlan.forEachDown(
+            FragmentExec.class,
+            fragmentExec -> fragmentExec.fragment()
+                .forEachDown(
+                    Join.class,
+                    join -> join.right().forEachDown(EsRelation.class, esRelation -> lookupIndexNames.add(esRelation.index().name()))
+                )
+        );
+        return lookupIndexNames;
+    }
+
     // For queries like: FROM logs* | LIMIT 0 (including cross-cluster LIMIT 0 queries)
     private static void updateShardCountForCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) {
         if (execInfo.isCrossClusterSearch()) {
@@ -569,8 +612,9 @@ public class ComputeService {
     /**
      * Result from lookupDataNodes where can_match is performed to determine what shards can be skipped
      * and which target nodes are needed for running the ES|QL query
-     * @param dataNodes list of DataNode to perform the ES|QL query on
-     * @param totalShards Total number of shards (from can_match phase), including skipped shards
+     *
+     * @param dataNodes     list of DataNode to perform the ES|QL query on
+     * @param totalShards   Total number of shards (from can_match phase), including skipped shards
      * @param skippedShards Number of skipped shards (from can_match phase)
      */
     record DataNodeResult(List<DataNode> dataNodes, int totalShards, int skippedShards) {}

+ 98 - 48
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

@@ -12,6 +12,7 @@ import org.elasticsearch.action.OriginalIndices;
 import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.TriFunction;
 import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.compute.data.Block;
@@ -62,6 +63,8 @@ import org.elasticsearch.xpack.esql.plan.logical.Project;
 import org.elasticsearch.xpack.esql.plan.logical.RegexExtract;
 import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation;
 import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
+import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
+import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
 import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
 import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
 import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
@@ -76,7 +79,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
@@ -176,7 +178,7 @@ public class EsqlSession {
         executeSubPlans(physicalPlan, planRunner, executionInfo, request, listener);
     }
 
-    private record PlanTuple(PhysicalPlan physical, LogicalPlan logical) {};
+    private record PlanTuple(PhysicalPlan physical, LogicalPlan logical) {}
 
     private void executeSubPlans(
         PhysicalPlan physicalPlan,
@@ -272,9 +274,12 @@ public class EsqlSession {
             return;
         }
 
-        preAnalyze(parsed, executionInfo, (indices, policies) -> {
+        preAnalyze(parsed, executionInfo, (indices, lookupIndices, policies) -> {
             planningMetrics.gatherPreAnalysisMetrics(parsed);
-            Analyzer analyzer = new Analyzer(new AnalyzerContext(configuration, functionRegistry, indices, policies), verifier);
+            Analyzer analyzer = new Analyzer(
+                new AnalyzerContext(configuration, functionRegistry, indices, lookupIndices, policies),
+                verifier
+            );
             var plan = analyzer.analyze(parsed);
             plan.setAnalyzed();
             LOGGER.debug("Analyzed plan:\n{}", plan);
@@ -285,7 +290,7 @@ public class EsqlSession {
     private <T> void preAnalyze(
         LogicalPlan parsed,
         EsqlExecutionInfo executionInfo,
-        BiFunction<IndexResolution, EnrichResolution, T> action,
+        TriFunction<IndexResolution, IndexResolution, EnrichResolution, T> action,
         ActionListener<T> listener
     ) {
         PreAnalyzer.PreAnalysis preAnalysis = preAnalyzer.preAnalyze(parsed);
@@ -299,63 +304,81 @@ public class EsqlSession {
         ).keySet();
         enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, listener.delegateFailureAndWrap((l, enrichResolution) -> {
             // first we need the match_fields names from enrich policies and THEN, with an updated list of fields, we call field_caps API
-            var matchFields = enrichResolution.resolvedEnrichPolicies()
+            var enrichMatchFields = enrichResolution.resolvedEnrichPolicies()
                 .stream()
                 .map(ResolvedEnrichPolicy::matchField)
                 .collect(Collectors.toSet());
-            Map<String, Exception> unavailableClusters = enrichResolution.getUnavailableClusters();
-            preAnalyzeIndices(parsed, executionInfo, unavailableClusters, l.delegateFailureAndWrap((ll, indexResolution) -> {
-                // TODO in follow-PR (for skip_unavailble handling of missing concrete indexes) add some tests for invalid index
-                // resolution to updateExecutionInfo
-                if (indexResolution.isValid()) {
-                    EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
-                    EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.unavailableClusters());
-                    if (executionInfo.isCrossClusterSearch()
-                        && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) {
-                        // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel
-                        // Exception to let the LogicalPlanActionListener decide how to proceed
-                        ll.onFailure(new NoClustersToSearchException());
-                        return;
-                    }
-
-                    Set<String> newClusters = enrichPolicyResolver.groupIndicesPerCluster(
-                        indexResolution.get().concreteIndices().toArray(String[]::new)
-                    ).keySet();
-                    // If new clusters appear when resolving the main indices, we need to resolve the enrich policies again
-                    // or exclude main concrete indices. Since this is rare, it's simpler to resolve the enrich policies again.
-                    // TODO: add a test for this
-                    if (targetClusters.containsAll(newClusters) == false
-                        // do not bother with a re-resolution if only remotes were requested and all were offline
-                        && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) > 0) {
-                        enrichPolicyResolver.resolvePolicies(
-                            newClusters,
-                            unresolvedPolicies,
-                            ll.map(newEnrichResolution -> action.apply(indexResolution, newEnrichResolution))
-                        );
-                        return;
-                    }
-                }
-                ll.onResponse(action.apply(indexResolution, enrichResolution));
-            }), matchFields);
+            // get the field names from the parsed plan combined with the ENRICH match fields from the ENRICH policy
+            var fieldNames = fieldNames(parsed, enrichMatchFields);
+            // First resolve the lookup indices, then the main indices
+            preAnalyzeLookupIndices(
+                preAnalysis.lookupIndices,
+                Set.of("*"), // Current LOOKUP JOIN syntax does not allow for field selection
+                l.delegateFailureAndWrap(
+                    (lx, lookupIndexResolution) -> preAnalyzeIndices(
+                        indices,
+                        executionInfo,
+                        enrichResolution.getUnavailableClusters(),
+                        fieldNames,
+                        lx.delegateFailureAndWrap((ll, indexResolution) -> {
+                            // TODO in follow-PR (for skip_unavailble handling of missing concrete indexes) add some tests for invalid
+                            // index resolution to updateExecutionInfo
+                            if (indexResolution.isValid()) {
+                                EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
+                                EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(
+                                    executionInfo,
+                                    indexResolution.unavailableClusters()
+                                );
+                                if (executionInfo.isCrossClusterSearch()
+                                    && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) {
+                                    // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel
+                                    // Exception to let the LogicalPlanActionListener decide how to proceed
+                                    ll.onFailure(new NoClustersToSearchException());
+                                    return;
+                                }
+
+                                Set<String> newClusters = enrichPolicyResolver.groupIndicesPerCluster(
+                                    indexResolution.get().concreteIndices().toArray(String[]::new)
+                                ).keySet();
+                                // If new clusters appear when resolving the main indices, we need to resolve the enrich policies again
+                                // or exclude main concrete indices. Since this is rare, it's simpler to resolve the enrich policies
+                                // again.
+                                // TODO: add a test for this
+                                if (targetClusters.containsAll(newClusters) == false
+                                    // do not bother with a re-resolution if only remotes were requested and all were offline
+                                    && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) > 0) {
+                                    enrichPolicyResolver.resolvePolicies(
+                                        newClusters,
+                                        unresolvedPolicies,
+                                        ll.map(
+                                            newEnrichResolution -> action.apply(indexResolution, lookupIndexResolution, newEnrichResolution)
+                                        )
+                                    );
+                                    return;
+                                }
+                            }
+                            ll.onResponse(action.apply(indexResolution, lookupIndexResolution, enrichResolution));
+                        })
+                    )
+                )
+            );
         }));
     }
 
     private void preAnalyzeIndices(
-        LogicalPlan parsed,
+        List<TableInfo> indices,
         EsqlExecutionInfo executionInfo,
         Map<String, Exception> unavailableClusters,  // known to be unavailable from the enrich policy API call
-        ActionListener<IndexResolution> listener,
-        Set<String> enrichPolicyMatchFields
+        Set<String> fieldNames,
+        ActionListener<IndexResolution> listener
     ) {
-        PreAnalyzer.PreAnalysis preAnalysis = new PreAnalyzer().preAnalyze(parsed);
         // TODO we plan to support joins in the future when possible, but for now we'll just fail early if we see one
-        if (preAnalysis.indices.size() > 1) {
+        if (indices.size() > 1) {
             // Note: JOINs are not supported but we detect them when
             listener.onFailure(new MappingException("Queries with multiple indices are not supported"));
-        } else if (preAnalysis.indices.size() == 1) {
-            TableInfo tableInfo = preAnalysis.indices.get(0);
+        } else if (indices.size() == 1) {
+            TableInfo tableInfo = indices.get(0);
             TableIdentifier table = tableInfo.id();
-            var fieldNames = fieldNames(parsed, enrichPolicyMatchFields);
 
             Map<String, OriginalIndices> clusterIndices = indicesExpressionGrouper.groupIndices(IndicesOptions.DEFAULT, table.index());
             for (Map.Entry<String, OriginalIndices> entry : clusterIndices.entrySet()) {
@@ -401,6 +424,25 @@ public class EsqlSession {
         }
     }
 
+    private void preAnalyzeLookupIndices(List<TableInfo> indices, Set<String> fieldNames, ActionListener<IndexResolution> listener) {
+        if (indices.size() > 1) {
+            // Note: JOINs on more than one index are not yet supported
+            listener.onFailure(new MappingException("More than one LOOKUP JOIN is not supported"));
+        } else if (indices.size() == 1) {
+            TableInfo tableInfo = indices.get(0);
+            TableIdentifier table = tableInfo.id();
+            // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types
+            indexResolver.resolveAsMergedMapping(table.index(), fieldNames, listener);
+        } else {
+            try {
+                // No lookup indices specified
+                listener.onResponse(IndexResolution.invalid("[none specified]"));
+            } catch (Exception ex) {
+                listener.onFailure(ex);
+            }
+        }
+    }
+
     static Set<String> fieldNames(LogicalPlan parsed, Set<String> enrichPolicyMatchFields) {
         if (false == parsed.anyMatch(plan -> plan instanceof Aggregate || plan instanceof Project)) {
             // no explicit columns selection, for example "from employees"
@@ -422,6 +464,7 @@ public class EsqlSession {
         // "keep" attributes are special whenever a wildcard is used in their name
         // ie "from test | eval lang = languages + 1 | keep *l" should consider both "languages" and "*l" as valid fields to ask for
         AttributeSet keepCommandReferences = new AttributeSet();
+        AttributeSet keepJoinReferences = new AttributeSet();
 
         parsed.forEachDown(p -> {// go over each plan top-down
             if (p instanceof RegexExtract re) { // for Grok and Dissect
@@ -438,6 +481,11 @@ public class EsqlSession {
                 // The exact name of the field will be added later as part of enrichPolicyMatchFields Set
                 enrichRefs.removeIf(attr -> attr instanceof EmptyAttribute);
                 references.addAll(enrichRefs);
+            } else if (p instanceof LookupJoin join) {
+                keepJoinReferences.addAll(join.config().matchFields());  // TODO: why is this empty
+                if (join.config().type() instanceof JoinTypes.UsingJoinType usingJoinType) {
+                    keepJoinReferences.addAll(usingJoinType.columns());
+                }
             } else {
                 references.addAll(p.references());
                 if (p instanceof UnresolvedRelation ur && ur.indexMode() == IndexMode.TIME_SERIES) {
@@ -470,6 +518,8 @@ public class EsqlSession {
                 references.removeIf(attr -> matchByName(attr, alias.name(), keepCommandReferences.contains(attr)));
             });
         });
+        // Add JOIN ON column references afterward to avoid Alias removal
+        references.addAll(keepJoinReferences);
 
         // remove valid metadata attributes because they will be filtered out by the IndexResolver anyway
         // otherwise, in some edge cases, we will fail to ask for "*" (all fields) instead

+ 1 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

@@ -260,7 +260,7 @@ public class CsvTests extends ESTestCase {
             );
             assumeFalse(
                 "lookup join disabled for csv tests",
-                testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.JOIN_LOOKUP_V3.capabilityName())
+                testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.JOIN_LOOKUP_V4.capabilityName())
             );
             if (Build.current().isSnapshot()) {
                 assertThat(