
Promptly fail recovery from snapshot (#96421)

Recovering a blob from snapshot may take some time. If the recovery is
cancelled during this time, there is no point in continuing, and indeed
we must give up as soon as possible to avoid holding the shard lock for
longer than necessary. This commit checks for cancellation on every read
from the blob store, rather than just between blobs.

Closes #95525
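
The key idea is to wrap the snapshot blob's InputStream so that cancellation is observed before each read, instead of only after a whole blob has been consumed. The following is a minimal, self-contained sketch of that wrapping pattern; it is not the production code below (which calls cancellableThreads.checkForCancel() inside RecoveryTarget), and the CancellableInputStream name and BooleanSupplier flag are illustrative assumptions:

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.BooleanSupplier;

// Wraps a blob-store stream so that every read first checks a cancellation flag,
// failing fast instead of waiting until the current blob has been fully downloaded.
final class CancellableInputStream extends FilterInputStream {
    private final BooleanSupplier isCancelled; // stands in for CancellableThreads#checkForCancel

    CancellableInputStream(InputStream in, BooleanSupplier isCancelled) {
        super(in);
        this.isCancelled = isCancelled;
    }

    private void checkForCancel() throws IOException {
        if (isCancelled.getAsBoolean()) {
            throw new IOException("recovery cancelled");
        }
    }

    @Override
    public int read() throws IOException {
        checkForCancel();
        return super.read();
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        checkForCancel();
        return super.read(b, off, len);
    }
}

Because every chunk read by the file writer passes through the wrapper, a cancelled recovery aborts within one buffer-sized read rather than after the remainder of the blob, which is what releases the shard lock promptly.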
David Turner committed 2 years ago · commit 651ec81d04

+ 6 - 0
docs/changelog/96421.yaml

@@ -0,0 +1,6 @@
+pr: 96421
+summary: Promptly fail recovery from snapshot
+area: Recovery
+type: bug
+issues:
+ - 95525

+ 5 - 0
server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

@@ -356,6 +356,11 @@ public class PeerRecoveryTargetService implements IndexEventListener {
         return recoverySettings.tryAcquireSnapshotDownloadPermits();
     }
 
+    // Visible for testing
+    public int ongoingRecoveryCount() {
+        return onGoingRecoveries.size();
+    }
+
     /**
      * Prepare the start recovery request.
      *

+ 14 - 1
server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

@@ -42,6 +42,7 @@ import org.elasticsearch.index.store.StoreFileMetadata;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.repositories.IndexId;
 
+import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Path;
@@ -586,7 +587,19 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTargetHandler {
         ) {
             StoreFileMetadata metadata = fileInfo.metadata();
             int readSnapshotFileBufferSize = snapshotFilesProvider.getReadSnapshotFileBufferSizeForRepo(repository);
-            multiFileWriter.writeFile(metadata, readSnapshotFileBufferSize, inputStream);
+            multiFileWriter.writeFile(metadata, readSnapshotFileBufferSize, new FilterInputStream(inputStream) {
+                @Override
+                public int read() throws IOException {
+                    cancellableThreads.checkForCancel();
+                    return super.read();
+                }
+
+                @Override
+                public int read(byte[] b, int off, int len) throws IOException {
+                    cancellableThreads.checkForCancel();
+                    return super.read(b, off, len);
+                }
+            });
             listener.onResponse(null);
         } catch (Exception e) {
             logger.debug(() -> format("Unable to recover snapshot file %s from repository %s", fileInfo, repository), e);

+ 68 - 0
x-pack/plugin/snapshot-based-recoveries/src/internalClusterTest/java/org/elasticsearch/xpack/snapshotbasedrecoveries/recovery/SnapshotBasedIndexRecoveryIT.java

@@ -662,6 +662,74 @@ public class SnapshotBasedIndexRecoveryIT extends AbstractSnapshotIntegTestCase
         }
     }
 
+    public void testCancelledRecoveryAbortsDownloadPromptly() throws Exception {
+        updateSetting(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.getKey(), "1");
+
+        try {
+            internalCluster().ensureAtLeastNumDataNodes(2);
+
+            String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+            createIndex(
+                indexName,
+                Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
+            );
+            ensureGreen(indexName);
+
+            int numDocs = randomIntBetween(1, 1000);
+            indexDocs(indexName, numDocs, numDocs);
+
+            String repoName = "repo";
+            createRepo(repoName, TestRepositoryPlugin.FILTER_TYPE);
+            createSnapshot(repoName, "snap", Collections.singletonList(indexName));
+
+            final AtomicBoolean isCancelled = new AtomicBoolean();
+            final CountDownLatch readFromBlobCalledLatch = new CountDownLatch(1);
+            final CountDownLatch readFromBlobRespondLatch = new CountDownLatch(1);
+
+            FilterFsRepository.wrapReadBlobMethod((blobName, stream) -> {
+                if (blobName.startsWith("__")) {
+                    return new FilterInputStream(stream) {
+                        @Override
+                        public int read() throws IOException {
+                            beforeRead();
+                            return super.read();
+                        }
+
+                        @Override
+                        public int read(byte[] b, int off, int len) throws IOException {
+                            beforeRead();
+                            return super.read(b, off, len);
+                        }
+
+                        private void beforeRead() {
+                            assertFalse(isCancelled.get()); // should have no further reads once the index is deleted
+                            readFromBlobCalledLatch.countDown();
+                            safeAwait(readFromBlobRespondLatch);
+                        }
+                    };
+                } else {
+                    return stream;
+                }
+            });
+
+            updateIndexSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1), indexName);
+            safeAwait(readFromBlobCalledLatch);
+
+            assertAcked(client().admin().indices().prepareDelete(indexName).get());
+            // cancellation flag is set when applying the cluster state that deletes the index, so no further waiting is necessary
+            isCancelled.set(true);
+            readFromBlobRespondLatch.countDown();
+
+            assertThat(indexExists(indexName), is(equalTo(false)));
+            assertBusy(
+                () -> internalCluster().getInstances(PeerRecoveryTargetService.class)
+                    .forEach(peerRecoveryTargetService -> assertEquals(0, peerRecoveryTargetService.ongoingRecoveryCount()))
+            );
+        } finally {
+            updateSetting(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.getKey(), null);
+        }
+    }
+
     public void testRecoveryAfterRestoreUsesSnapshots() throws Exception {
         String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
         createIndex(