瀏覽代碼

Fix counting skipped shards with filters (#131737) (#131769)

* Fix counting skipped shards with filters

(cherry picked from commit a345f561e150c8943c1b16a71a00bd46d15e0a4f)

# Conflicts:
#	x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java
#	x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
Stanislav Malyshev 3 月之前
父節點
當前提交
7be2bc48a1

+ 0 - 3
muted-tests.yml

@@ -330,9 +330,6 @@ tests:
 - class: org.elasticsearch.xpack.search.CrossClusterAsyncSearchIT
   method: testCCSClusterDetailsWhereAllShardsSkippedInCanMatch
   issue: https://github.com/elastic/elasticsearch/issues/128418
-- class: org.elasticsearch.xpack.esql.action.CrossClusterQueryWithFiltersIT
-  method: testTimestampFilterFromQuery
-  issue: https://github.com/elastic/elasticsearch/issues/127332
 - class: org.elasticsearch.xpack.esql.plugin.DataNodeRequestSenderIT
   method: testSearchWhileRelocating
   issue: https://github.com/elastic/elasticsearch/issues/128500

+ 26 - 3
x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java

@@ -72,6 +72,7 @@ public class MultiClustersIT extends ESRestTestCase {
     List<Doc> localDocs = List.of();
     final String remoteIndex = "test-remote-index";
     List<Doc> remoteDocs = List.of();
+    private Boolean shouldCheckShardCounts = null;
 
     @Before
     public void setUpIndices() throws Exception {
@@ -164,6 +165,17 @@ public class MultiClustersIT extends ESRestTestCase {
         }
     }
 
+    private boolean checkShardCounts() {
+        if (shouldCheckShardCounts == null) {
+            try {
+                shouldCheckShardCounts = capabilitiesSupportedNewAndOld(List.of("correct_skipped_shard_count"));
+            } catch (IOException e) {
+                shouldCheckShardCounts = false;
+            }
+        }
+        return shouldCheckShardCounts;
+    }
+
     private <C, V> void assertResultMapForLike(
         boolean includeCCSMetadata,
         Map<String, Object> result,
@@ -295,11 +307,16 @@ public class MultiClustersIT extends ESRestTestCase {
         assertThat(
             remoteClusterShards,
             matchesMap().entry("total", greaterThanOrEqualTo(0))
-                .entry("successful", remoteClusterShards.get("total"))
+                .entry("successful", greaterThanOrEqualTo(0))
                 .entry("skipped", greaterThanOrEqualTo(0))
                 .entry("failed", 0)
         );
-
+        if (checkShardCounts()) {
+            assertThat(
+                (int) remoteClusterShards.get("successful") + (int) remoteClusterShards.get("skipped"),
+                equalTo(remoteClusterShards.get("total"))
+            );
+        }
         if (remoteOnly == false) {
             @SuppressWarnings("unchecked")
             Map<String, Object> localCluster = (Map<String, Object>) details.get("(local)");
@@ -313,10 +330,16 @@ public class MultiClustersIT extends ESRestTestCase {
             assertThat(
                 localClusterShards,
                 matchesMap().entry("total", greaterThanOrEqualTo(0))
-                    .entry("successful", localClusterShards.get("total"))
+                    .entry("successful", greaterThanOrEqualTo(0))
                     .entry("skipped", greaterThanOrEqualTo(0))
                     .entry("failed", 0)
             );
+            if (checkShardCounts()) {
+                assertThat(
+                    (int) localClusterShards.get("successful") + (int) localClusterShards.get("skipped"),
+                    equalTo(localClusterShards.get("total"))
+                );
+            }
         }
     }
 

+ 5 - 3
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java

@@ -62,8 +62,10 @@ public class CrossClusterQueryWithFiltersIT extends AbstractCrossClusterTestCase
     protected void assertClusterMetadataSuccess(EsqlExecutionInfo.Cluster clusterMetatata, int shards, long took, String indexExpression) {
         assertClusterMetadata(clusterMetatata, took, indexExpression, Status.SUCCESSFUL);
         assertThat(clusterMetatata.getTotalShards(), equalTo(shards));
-        assertThat(clusterMetatata.getSuccessfulShards(), equalTo(shards));
-        assertThat(clusterMetatata.getSkippedShards(), equalTo(0));
+        // We should have at least one successful shard for data
+        assertThat(clusterMetatata.getSuccessfulShards(), greaterThanOrEqualTo(1));
+        // Some shards may be skipped, but total sum of the shards should match up
+        assertThat(clusterMetatata.getSkippedShards() + clusterMetatata.getSuccessfulShards(), equalTo(shards));
     }
 
     protected void assertClusterMetadataNoShards(EsqlExecutionInfo.Cluster clusterMetatata, long took, String indexExpression) {
@@ -81,7 +83,7 @@ public class CrossClusterQueryWithFiltersIT extends AbstractCrossClusterTestCase
     ) {
         assertClusterMetadata(clusterMetatata, took, indexExpression, Status.SUCCESSFUL);
         assertThat(clusterMetatata.getTotalShards(), equalTo(shards));
-        assertThat(clusterMetatata.getSuccessfulShards(), equalTo(shards));
+        assertThat(clusterMetatata.getSuccessfulShards(), equalTo(0));
         assertThat(clusterMetatata.getSkippedShards(), equalTo(shards));
     }
 

+ 6 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

@@ -1250,7 +1250,12 @@ public class EsqlCapabilities {
          * Forbid usage of brackets in unquoted index and enrich policy names
          * https://github.com/elastic/elasticsearch/issues/130378
          */
-        NO_BRACKETS_IN_UNQUOTED_INDEX_NAMES;
+        NO_BRACKETS_IN_UNQUOTED_INDEX_NAMES,
+
+        /**
+         * Support correct counting of skipped shards.
+         */
+        CORRECT_SKIPPED_SHARDS_COUNT;
 
         private final boolean enabled;
 

+ 10 - 7
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

@@ -134,17 +134,20 @@ abstract class DataNodeRequestSender {
                 var computeListener = new ComputeListener(
                     transportService.getThreadPool(),
                     runOnTaskFailure,
-                    listener.map(
-                        completionInfo -> new ComputeResponse(
+                    listener.map(completionInfo -> {
+                        final int totalSkipShards = targetShards.skippedShards() + skippedShards.get();
+                        final int failedShards = shardFailures.size();
+                        final int successfulShards = targetShards.totalShards() - totalSkipShards - failedShards;
+                        return new ComputeResponse(
                             completionInfo,
                             timeValueNanos(System.nanoTime() - startTimeInNanos),
                             targetShards.totalShards(),
-                            targetShards.totalShards() - shardFailures.size() - skippedShards.get(),
-                            targetShards.skippedShards() + skippedShards.get(),
-                            shardFailures.size(),
+                            successfulShards,
+                            totalSkipShards,
+                            failedShards,
                             selectFailures()
-                        )
-                    )
+                        );
+                    })
                 )
             ) {
                 pendingShardIds.addAll(order(targetShards));