浏览代码

Fix request processing scheduling (#127464)

Ievgen Degtiarenko 5 月之前
父节点
当前提交
c922e522df

+ 0 - 3
muted-tests.yml

@@ -399,9 +399,6 @@ tests:
 - class: org.elasticsearch.packaging.test.DockerTests
   method: test026InstallBundledRepositoryPluginsViaConfigFile
   issue: https://github.com/elastic/elasticsearch/issues/127158
-- class: org.elasticsearch.xpack.esql.plugin.DataNodeRequestSenderTests
-  method: testRetryOnlyMovedShards
-  issue: https://github.com/elastic/elasticsearch/issues/127168
 - class: org.elasticsearch.xpack.esql.plugin.DataNodeRequestSenderIT
   method: testSearchWhileRelocating
   issue: https://github.com/elastic/elasticsearch/issues/127188

+ 14 - 25
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

@@ -189,6 +189,20 @@ abstract class DataNodeRequestSender {
                     if (changed.compareAndSet(true, false) == false) {
                         break;
                     }
+                    var pendingRetries = new HashSet<ShardId>();
+                    for (ShardId shardId : pendingShardIds) {
+                        if (targetShards.getShard(shardId).remainingNodes.isEmpty()) {
+                            var failure = shardFailures.get(shardId);
+                            if (failure != null && failure.fatal == false && failure.failure instanceof NoShardAvailableActionException) {
+                                pendingRetries.add(shardId);
+                            }
+                        }
+                    }
+                    if (pendingRetries.isEmpty() == false && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) {
+                        for (var entry : resolveShards(pendingRetries).entrySet()) {
+                            targetShards.getShard(entry.getKey()).remainingNodes.addAll(entry.getValue());
+                        }
+                    }
                     for (ShardId shardId : pendingShardIds) {
                         if (targetShards.getShard(shardId).remainingNodes.isEmpty()) {
                             shardFailures.compute(
@@ -257,26 +271,11 @@ abstract class DataNodeRequestSender {
         final ActionListener<DriverCompletionInfo> listener = computeListener.acquireCompute();
         sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() {
 
-            private final Set<ShardId> pendingRetries = new HashSet<>();
-
             void onAfter(DriverCompletionInfo info) {
                 nodePermits.get(request.node).release();
                 if (concurrentRequests != null) {
                     concurrentRequests.release();
                 }
-
-                if (pendingRetries.isEmpty() == false && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) {
-                    try {
-                        sendingLock.lock();
-                        var resolutions = resolveShards(pendingRetries);
-                        for (var entry : resolutions.entrySet()) {
-                            targetShards.shards.get(entry.getKey()).remainingNodes.addAll(entry.getValue());
-                        }
-                    } finally {
-                        sendingLock.unlock();
-                    }
-                }
-
                 trySendingRequestsForPendingShards(targetShards, computeListener);
                 listener.onResponse(info);
             }
@@ -293,7 +292,6 @@ abstract class DataNodeRequestSender {
                     final ShardId shardId = entry.getKey();
                     trackShardLevelFailure(shardId, false, entry.getValue());
                     pendingShardIds.add(shardId);
-                    maybeScheduleRetry(shardId, false, entry.getValue());
                 }
                 onAfter(response.completionInfo());
             }
@@ -303,7 +301,6 @@ abstract class DataNodeRequestSender {
                 for (ShardId shardId : request.shardIds) {
                     trackShardLevelFailure(shardId, receivedData, e);
                     pendingShardIds.add(shardId);
-                    maybeScheduleRetry(shardId, receivedData, e);
                 }
                 onAfter(DriverCompletionInfo.EMPTY);
             }
@@ -317,14 +314,6 @@ abstract class DataNodeRequestSender {
                     onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()));
                 }
             }
-
-            private void maybeScheduleRetry(ShardId shardId, boolean receivedData, Exception e) {
-                if (receivedData == false
-                    && targetShards.getShard(shardId).remainingNodes.isEmpty()
-                    && unwrapFailure(shardId, e) instanceof NoShardAvailableActionException) {
-                    pendingRetries.add(shardId);
-                }
-            }
         });
     }
 

+ 40 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java

@@ -501,6 +501,46 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
         assertThat("Must retry only affected shards", resolvedShards, contains(shard2));
     }
 
+    public void testRetryUnassignedShardWithoutPartialResults() {
+        var attempt = new AtomicInteger(0);
+        var future = sendRequests(false, -1, List.of(targetShard(shard1, node1), targetShard(shard2, node2)), shardIds -> {
+            attempt.incrementAndGet();
+            return Map.of(shard1, List.of());
+        },
+            (node, shardIds, aliasFilters, listener) -> runWithDelay(
+                () -> listener.onResponse(
+                    Objects.equals(shardIds, List.of(shard2))
+                        ? new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())
+                        : new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1)))
+                )
+            )
+
+        );
+        expectThrows(NoShardAvailableActionException.class, containsString("no such shard"), future::actionGet);
+    }
+
+    public void testRetryUnassignedShardWithPartialResults() {
+        var response = safeGet(
+            sendRequests(
+                true,
+                -1,
+                List.of(targetShard(shard1, node1), targetShard(shard2, node2)),
+                shardIds -> Map.of(shard1, List.of()),
+                (node, shardIds, aliasFilters, listener) -> runWithDelay(
+                    () -> listener.onResponse(
+                        Objects.equals(shardIds, List.of(shard2))
+                            ? new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())
+                            : new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1)))
+                    )
+                )
+            )
+        );
+        assertThat(response.totalShards, equalTo(2));
+        assertThat(response.successfulShards, equalTo(1));
+        assertThat(response.skippedShards, equalTo(0));
+        assertThat(response.failedShards, equalTo(1));
+    }
+
     static DataNodeRequestSender.TargetShard targetShard(ShardId shardId, DiscoveryNode... nodes) {
         return new DataNodeRequestSender.TargetShard(shardId, new ArrayList<>(Arrays.asList(nodes)), null);
     }