浏览代码

Do not set fatal exception when shard follow task is stopped. (#37603)

When shard follow task is cancelled while fetching operations then
the fatal exception field should not be set.
Martijn van Groningen 6 年之前
父节点
当前提交
88f4b0a326

+ 9 - 6
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

@@ -418,12 +418,15 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
 
     private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable task) {
         assert e != null;
-        if (shouldRetry(params.getRemoteCluster(), e) && isStopped() == false) {
-            int currentRetry = retryCounter.incrementAndGet();
-            LOGGER.debug(new ParameterizedMessage("{} error during follow shard task, retrying [{}]",
-                params.getFollowShardId(), currentRetry), e);
-            long delay = computeDelay(currentRetry, params.getReadPollTimeout().getMillis());
-            scheduler.accept(TimeValue.timeValueMillis(delay), task);
+        if (shouldRetry(params.getRemoteCluster(), e)) {
+            if (isStopped() == false) {
+                // Only retry is the shard follow task is not stopped.
+                int currentRetry = retryCounter.incrementAndGet();
+                LOGGER.debug(new ParameterizedMessage("{} error during follow shard task, retrying [{}]",
+                    params.getFollowShardId(), currentRetry), e);
+                long delay = computeDelay(currentRetry, params.getReadPollTimeout().getMillis());
+                scheduler.accept(TimeValue.timeValueMillis(delay), task);
+            }
         } else {
             fatalException = ExceptionsHelper.convertToElastic(e);
             LOGGER.warn("shard follow task encounter non-retryable error", e);

+ 31 - 0
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java

@@ -40,7 +40,9 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
 
 public class ShardFollowNodeTaskTests extends ESTestCase {
@@ -287,6 +289,35 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
         assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
     }
 
+    public void testFatalExceptionNotSetWhenStoppingWhileFetchingOps() {
+        ShardFollowTaskParams params = new ShardFollowTaskParams();
+        params.maxReadRequestOperationCount = 64;
+        params.maxOutstandingReadRequests = 1;
+        params.maxOutstandingWriteRequests = 1;
+        ShardFollowNodeTask task = createShardFollowTask(params);
+        startTask(task, 63, -1);
+
+        readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
+
+        mappingVersions.add(1L);
+        leaderGlobalCheckpoints.add(63L);
+        maxSeqNos.add(63L);
+        responseSizes.add(64);
+        simulateResponse.set(true);
+        beforeSendShardChangesRequest = status -> {
+            // Cancel just before attempting to fetch operations:
+            task.onCancelled();
+        };
+        task.coordinateReads();
+
+        assertThat(task.isStopped(), is(true));
+        ShardFollowNodeTaskStatus status = task.getStatus();
+        assertThat(status.getFatalException(), nullValue());
+        assertThat(status.failedReadRequests(), equalTo(1L));
+        assertThat(status.successfulReadRequests(), equalTo(0L));
+        assertThat(status.readExceptions().size(), equalTo(1));
+    }
+
     public void testEmptyShardChangesResponseShouldClearFetchException() {
         ShardFollowTaskParams params = new ShardFollowTaskParams();
         params.maxReadRequestOperationCount = 64;