Browse Source

ESQL: Fix async operator warnings not always sent when blocking (#132744)

Fixes https://github.com/elastic/elasticsearch/issues/128030
Fixes https://github.com/elastic/elasticsearch/issues/130296
Fixes https://github.com/elastic/elasticsearch/issues/130642
Fixes https://github.com/elastic/elasticsearch/issues/131148
Fixes https://github.com/elastic/elasticsearch/issues/132554
Fixes https://github.com/elastic/elasticsearch/issues/132555
Fixes https://github.com/elastic/elasticsearch/issues/132604
Fixes https://github.com/elastic/elasticsearch/issues/131563
Fixes https://github.com/elastic/elasticsearch/issues/132778

Extracted from https://github.com/elastic/elasticsearch/pull/132738

An AsyncOperator listener misordering caused the warnings collection and status metrics updates to be executed after the `onSeqNoCompleted()`>`notifyIfBlocked()`>`future.onResponse(null)`, which ends the processing in some cases.
Iván Cea Fontenla 2 months ago
parent
commit
4e3602d0c6

+ 14 - 0
docs/changelog/132744.yaml

@@ -0,0 +1,14 @@
+pr: 132744
+summary: Fix async operator warnings not always sent when blocking
+area: ES|QL
+type: bug
+issues:
+ - 130642
+ - 132554
+ - 132778
+ - 130296
+ - 132555
+ - 131563
+ - 131148
+ - 132604
+ - 128030

+ 0 - 21
muted-tests.yml

@@ -284,9 +284,6 @@ tests:
 - class: org.elasticsearch.xpack.esql.action.CrossClusterQueryWithPartialResultsIT
   method: testOneRemoteClusterPartial
   issue: https://github.com/elastic/elasticsearch/issues/124055
-- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT
-  method: test {csv-spec:lookup-join.MvJoinKeyOnTheLookupIndex}
-  issue: https://github.com/elastic/elasticsearch/issues/128030
 - class: org.elasticsearch.packaging.test.EnrollmentProcessTests
   method: test20DockerAutoFormCluster
   issue: https://github.com/elastic/elasticsearch/issues/128113
@@ -348,18 +345,12 @@ tests:
 - class: org.elasticsearch.index.IndexingPressureIT
   method: testWriteCanRejectOnPrimaryBasedOnMaxOperationSize
   issue: https://github.com/elastic/elasticsearch/issues/130281
-- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT
-  method: test {csv-spec:lookup-join.MvJoinKeyOnFrom}
-  issue: https://github.com/elastic/elasticsearch/issues/130296
 - class: org.elasticsearch.xpack.esql.inference.bulk.BulkInferenceExecutorTests
   method: testSuccessfulExecution
   issue: https://github.com/elastic/elasticsearch/issues/130306
 - class: org.elasticsearch.gradle.LoggedExecFuncTest
   method: failed tasks output logged to console when spooling true
   issue: https://github.com/elastic/elasticsearch/issues/119509
-- class: org.elasticsearch.xpack.esql.qa.single_node.EsqlSpecIT
-  method: test {csv-spec:lookup-join.MvJoinKeyFromRow}
-  issue: https://github.com/elastic/elasticsearch/issues/130642
 - class: org.elasticsearch.indices.stats.IndexStatsIT
   method: testFilterCacheStats
   issue: https://github.com/elastic/elasticsearch/issues/124447
@@ -369,9 +360,6 @@ tests:
 - class: org.elasticsearch.search.SearchWithRejectionsIT
   method: testOpenContextsAfterRejections
   issue: https://github.com/elastic/elasticsearch/issues/130821
-- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT
-  method: test {csv-spec:lookup-join.MvJoinKeyOnFromAfterStats}
-  issue: https://github.com/elastic/elasticsearch/issues/131148
 - class: org.elasticsearch.xpack.esql.ccq.MultiClustersIT
   method: testLookupJoinAliases
   issue: https://github.com/elastic/elasticsearch/issues/131166
@@ -462,12 +450,6 @@ tests:
 - class: org.elasticsearch.xpack.esql.inference.completion.CompletionOperatorTests
   method: testSimpleCircuitBreaking
   issue: https://github.com/elastic/elasticsearch/issues/132382
-- class: org.elasticsearch.xpack.esql.ccq.MultiClusterSpecIT
-  method: test {csv-spec:lookup-join.MvJoinKeyOnFrom}
-  issue: https://github.com/elastic/elasticsearch/issues/132554
-- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT
-  method: test {csv-spec:lookup-join.MvJoinKeyFromRow}
-  issue: https://github.com/elastic/elasticsearch/issues/132555
 - class: org.elasticsearch.xpack.esql.qa.single_node.EsqlSpecIT
   method: test {csv-spec:spatial.ConvertFromStringParseError}
   issue: https://github.com/elastic/elasticsearch/issues/132558
@@ -480,9 +462,6 @@ tests:
 - class: org.elasticsearch.xpack.logsdb.qa.StoredSourceLogsDbVersusReindexedLogsDbChallengeRestIT
   method: testEsqlSource
   issue: https://github.com/elastic/elasticsearch/issues/132602
-- class: org.elasticsearch.xpack.esql.qa.mixed.MixedClusterEsqlSpecIT
-  method: test {csv-spec:lookup-join.MvJoinKeyFromRowExpanded}
-  issue: https://github.com/elastic/elasticsearch/issues/132604
 - class: org.elasticsearch.xpack.ml.integration.RevertModelSnapshotIT
   method: testRevertModelSnapshot_DeleteInterveningResults
   issue: https://github.com/elastic/elasticsearch/issues/132349

+ 3 - 6
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java

@@ -95,19 +95,16 @@ public abstract class AsyncOperator<Fetched> implements Operator {
         driverContext.addAsyncAction();
         boolean success = false;
         try {
-            final ActionListener<Fetched> listener = ActionListener.wrap(output -> {
-                buffers.put(seqNo, output);
-                onSeqNoCompleted(seqNo);
-            }, e -> {
+            final ActionListener<Fetched> listener = ActionListener.wrap(output -> buffers.put(seqNo, output), e -> {
                 releasePageOnAnyThread(input);
                 failureCollector.unwrapAndCollect(e);
-                onSeqNoCompleted(seqNo);
             });
             final long startNanos = System.nanoTime();
             performAsync(input, ActionListener.runAfter(listener, () -> {
                 responseHeadersCollector.collect();
-                driverContext.removeAsyncAction();
                 processNanos.add(System.nanoTime() - startNanos);
+                onSeqNoCompleted(seqNo);
+                driverContext.removeAsyncAction();
             }));
             success = true;
         } finally {

+ 6 - 6
x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

@@ -429,7 +429,7 @@ emp_no:integer | language_code:integer | language_name:keyword
 
 mvJoinKeyOnTheLookupIndex
 required_capability: join_lookup_v12
-required_capability: join_lookup_skip_mv_warnings
+required_capability: async_operator_warnings_fix
 
 FROM employees
 | WHERE 10003 < emp_no AND emp_no < 10008
@@ -451,7 +451,7 @@ emp_no:integer | language_code:integer | language_name:keyword
 
 mvJoinKeyOnFrom
 required_capability: join_lookup_v12
-required_capability: join_lookup_skip_mv_warnings
+required_capability: async_operator_warnings_fix
 
 FROM employees
 | WHERE emp_no < 10006
@@ -474,7 +474,7 @@ emp_no:integer | language_code:integer | language_name:keyword
 
 mvJoinKeyOnTheLookupIndexAfterStats
 required_capability: join_lookup_v12
-required_capability: join_lookup_skip_mv_warnings
+required_capability: async_operator_warnings_fix
 
 FROM employees
 | WHERE 10003 < emp_no AND emp_no < 10008
@@ -497,7 +497,7 @@ emp_no:integer | language_code:integer | language_name:keyword
 
 mvJoinKeyOnFromAfterStats
 required_capability: join_lookup_v12
-required_capability: join_lookup_skip_mv_warnings
+required_capability: async_operator_warnings_fix
 
 FROM employees
 | WHERE emp_no < 10006
@@ -521,7 +521,7 @@ emp_no:integer | language_code:integer | language_name:keyword
 
 mvJoinKeyFromRow
 required_capability: join_lookup_v12
-required_capability: join_lookup_skip_mv_warnings
+required_capability: async_operator_warnings_fix
 
 ROW language_code = [4, 5, 6, 7]
 | LOOKUP JOIN languages_lookup_non_unique_key ON language_code
@@ -538,7 +538,7 @@ language_code:integer | language_name:keyword | country:text
 
 mvJoinKeyFromRowExpanded
 required_capability: join_lookup_v12
-required_capability: join_lookup_skip_mv_warnings
+required_capability: async_operator_warnings_fix
 
 ROW language_code = [4, 5, 6, 7, 8]
 | MV_EXPAND language_code

+ 5 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

@@ -825,6 +825,11 @@ public class EsqlCapabilities {
          */
         JOIN_LOOKUP_SKIP_MV_WARNINGS(JOIN_LOOKUP_V12.isEnabled()),
 
+        /**
+         * Fix for async operator sometimes completing the driver without emitting the stored warnings
+         */
+        ASYNC_OPERATOR_WARNINGS_FIX,
+
         /**
          * Fix pushing down LIMIT past LOOKUP JOIN in case of multiple matching join keys.
          */