Explorar el Código

[ES|QL] Make numberOfChannels consistent with layout map by removing duplicated ChannelSet (#125636)

* make numberOfChannels consistent with layout
Fang Xing hace 7 meses
padre
commit
80125a4bac

+ 6 - 0
docs/changelog/125636.yaml

@@ -0,0 +1,6 @@
+pr: 125636
+summary: Make `numberOfChannels` consistent with layout map by removing duplicated
+  `ChannelSet`
+area: ES|QL
+type: bug
+issues: []

+ 33 - 0
x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java

@@ -994,6 +994,32 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
         assertEquals(List.of(List.of(false, 9.1), List.of(true, 8.1)), result.get("values"));
     }
 
+    public void testMultipleBatchesWithLookupJoin() throws IOException {
+        assumeTrue(
+            "Makes numberOfChannels consistent with layout map for join with multiple batches",
+            EsqlCapabilities.Cap.MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT.isEnabled()
+        );
+        // Create more than 10 indices to trigger multiple batches of data node execution.
+        // The sort field should be missing on some indices to reproduce NullPointerException caused by duplicated items in layout
+        for (int i = 1; i <= 20; i++) {
+            createIndex("idx" + i, randomBoolean(), "\"mappings\": {\"properties\" : {\"a\" : {\"type\" : \"keyword\"}}}");
+        }
+        bulkLoadTestDataLookupMode(10);
+        // lookup join with and without sort
+        for (String sort : List.of("", "| sort integer")) {
+            var query = requestObjectBuilder().query(format(null, "from * | lookup join {} on integer {}", testIndexName(), sort));
+            Map<String, Object> result = runEsql(query);
+            var columns = as(result.get("columns"), List.class);
+            assertEquals(21, columns.size());
+            var values = as(result.get("values"), List.class);
+            assertEquals(10, values.size());
+        }
+        // clean up
+        for (int i = 1; i <= 20; i++) {
+            assertThat(deleteIndex("idx" + i).isAcknowledged(), is(true));
+        }
+    }
+
     public void testErrorMessageForLiteralDateMathOverflow() throws IOException {
         List<String> dateMathOverflowExpressions = List.of(
             "2147483647 day + 1 day",
@@ -1668,6 +1694,13 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
         return "[" + value + ", " + value + "]";
     }
 
+    private static void createIndex(String indexName, boolean lookupMode, String mapping) throws IOException {
+        Request request = new Request("PUT", "/" + indexName);
+        String settings = "\"settings\" : {\"mode\" : \"lookup\"}, ";
+        request.setJsonEntity("{" + (lookupMode ? settings : "") + mapping + "}");
+        assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());
+    }
+
     public static RequestObjectBuilder requestObjectBuilder() throws IOException {
         return new RequestObjectBuilder();
     }

+ 87 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

@@ -1452,3 +1452,90 @@ emp_no:integer | language_code:integer | language_name:keyword
 10092          | 1                     | English
 10093          | 3                     | Spanish
 ;
+
+multipleBatchesWithSort
+required_capability: join_lookup_v12
+required_capability: remove_redundant_sort
+required_capability: make_number_of_channels_consistent_with_layout
+
+from *
+| rename city.country.continent.planet.name as message
+| lookup join message_types_lookup on message
+| sort language_code, birth_date
+| keep language_code
+| limit 1
+;
+
+language_code:integer
+1
+;
+
+multipleBatchesWithMvExpand
+required_capability: join_lookup_v12
+required_capability: remove_redundant_sort
+required_capability: make_number_of_channels_consistent_with_layout
+
+from *
+| rename city.country.continent.planet.name as message
+| lookup join message_types_lookup on message
+| keep birth_date, language_code
+| mv_expand birth_date
+| sort birth_date, language_code
+| limit 1
+;
+
+birth_date:datetime      |language_code:integer
+1952-02-27T00:00:00.000Z |null
+;
+
+multipleBatchesWithAggregate1
+required_capability: join_lookup_v12
+required_capability: remove_redundant_sort
+required_capability: make_number_of_channels_consistent_with_layout
+
+from *
+| rename city.country.continent.planet.name as message
+| lookup join message_types_lookup on message
+| keep birth_date, language_code
+| stats x=max(birth_date), y=min(language_code)
+;
+
+x:datetime               |y:integer
+1965-01-03T00:00:00.000Z |1
+;
+
+multipleBatchesWithAggregate2
+required_capability: join_lookup_v12
+required_capability: remove_redundant_sort
+required_capability: make_number_of_channels_consistent_with_layout
+
+from *
+| rename city.country.continent.planet.name as message
+| lookup join message_types_lookup on message
+| keep birth_date, language_code
+| stats m=min(birth_date) by language_code
+| sort language_code
+| limit 1
+;
+
+m:datetime |language_code:integer
+null       |1
+;
+
+multipleBatchesWithAggregate3
+required_capability: join_lookup_v12
+required_capability: remove_redundant_sort
+required_capability: make_number_of_channels_consistent_with_layout
+
+from *
+| rename city.country.continent.planet.name as message
+| lookup join message_types_lookup on message
+| keep birth_date, language_code
+| stats m=min(language_code) by birth_date
+| sort birth_date
+| limit 1
+;
+
+m:integer |birth_date:datetime
+null      |1952-02-27T00:00:00.000Z
+;

+ 6 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

@@ -932,7 +932,12 @@ public class EsqlCapabilities {
         /**
          * Index component selector syntax (my-data-stream-name::failures)
          */
-        INDEX_COMPONENT_SELECTORS(DataStream.isFailureStoreFeatureFlagEnabled());
+        INDEX_COMPONENT_SELECTORS(DataStream.isFailureStoreFeatureFlagEnabled()),
+
+        /**
+         * Make numberOfChannels consistent with layout in DefaultLayout by removing duplicated ChannelSet.
+         */
+        MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT;
 
         private final boolean enabled;
 

+ 13 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Layout.java

@@ -107,8 +107,20 @@ public interface Layout {
             Map<NameId, ChannelAndType> layout = new HashMap<>();
             int numberOfChannels = 0;
             for (ChannelSet set : channels) {
-                int channel = numberOfChannels++;
+                boolean createNewChannel = true;
+                int channel = 0;
                 for (NameId id : set.nameIds) {
+                    if (layout.containsKey(id)) {
+                        // If a NameId already exists in the map, do not increase the numberOfChannels, it can cause inverse() to create
+                        // a null in the list of channels, and NullPointerException when build() is called.
+                        // TODO avoid adding duplicated attributes with the same id in the plan, ReplaceMissingFieldWithNull may add nulls
+                        // with the same ids as the missing field ids.
+                        continue;
+                    }
+                    if (createNewChannel) {
+                        channel = numberOfChannels++;
+                        createNewChannel = false;
+                    }
                     ChannelAndType next = new ChannelAndType(channel, set.type);
                     ChannelAndType prev = layout.put(id, next);
                     // Do allow multiple name to point to the same channel - see https://github.com/elastic/elasticsearch/pull/100238