Browse code

Avoid atomic write of large blobs in repo analyzer (#69960)

Today we randomly perform an atomic write even if there are no early
reads, but we should only do so if the blob is small enough to write
atomically.
David Turner 4 years ago
parent
commit
421df6c797

+ 32 - 0
x-pack/plugin/snapshot-repo-test-kit/qa/rest/src/test/resources/rest-api-spec/test/10_analyze.yml

@@ -18,6 +18,15 @@ setup:
             readonly: true
             location: "test_repo_loc"
 
+  - do:
+      snapshot.create_repository:
+        repository: test_repo_slow
+        body:
+          type: fs
+          settings:
+            max_snapshot_bytes_per_sec: "1b"
+            location: "test_repo_loc"
+
 ---
 "Analysis fails on readonly repositories":
   - skip:
@@ -145,3 +154,26 @@ setup:
   - is_false: summary.read.max_wait
   - is_false: summary.read.total_throttled
   - is_false: summary.read.total_elapsed
+
+---
+"Timeout with large blobs":
+  - skip:
+      version: "- 7.11.99"
+      reason: "introduced in 7.12"
+
+  - do:
+      catch: request
+      snapshot.repository_analyze:
+        repository: test_repo_slow
+        blob_count: 1
+        concurrency: 1
+        max_blob_size: 2gb
+        max_total_data_size: 2gb
+        detailed: false
+        human: false
+        timeout: 1s
+
+  - match: { status: 500 }
+  - match: { error.type: repository_verification_exception }
+  - match: { error.reason: "/.*test_repo_slow..analysis.failed.*/" }
+  - match: { error.root_cause.0.type: receive_timeout_transport_exception }

+ 17 - 5
x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/BlobAnalyzeAction.java

@@ -25,6 +25,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.InputStreamStreamInput;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -188,6 +189,12 @@ public class BlobAnalyzeAction extends ActionType<BlobAnalyzeAction.Response> {
         }
     }
 
+    /**
+     * The atomic write API is based around a {@link BytesReference} which uses {@code int} for lengths and offsets and so on, so we can
+     * only use it to write a blob with size at most {@link Integer#MAX_VALUE}.
+     */
+    static final long MAX_ATOMIC_WRITE_SIZE = Integer.MAX_VALUE;
+
     /**
      * Analysis on a single blob, performing the write(s) and orchestrating the read(s).
      */
@@ -265,10 +272,15 @@ public class BlobAnalyzeAction extends ActionType<BlobAnalyzeAction.Response> {
         }
 
         void run() {
-            writeRandomBlob(request.readEarly || random.nextBoolean(), true, this::doReadBeforeWriteComplete, write1Step);
+            writeRandomBlob(
+                request.readEarly || (request.targetLength <= MAX_ATOMIC_WRITE_SIZE && random.nextBoolean()),
+                true,
+                this::doReadBeforeWriteComplete,
+                write1Step
+            );
 
             if (request.writeAndOverwrite) {
-                assert request.targetLength <= Integer.MAX_VALUE : "oversized atomic write";
+                assert request.targetLength <= MAX_ATOMIC_WRITE_SIZE : "oversized atomic write";
                 write1Step.whenComplete(ignored -> writeRandomBlob(true, false, this::doReadAfterWrite, write2Step), ignored -> {});
             } else {
                 write2Step.onResponse(null);
@@ -277,7 +289,7 @@ public class BlobAnalyzeAction extends ActionType<BlobAnalyzeAction.Response> {
         }
 
         private void writeRandomBlob(boolean atomic, boolean failIfExists, Runnable onLastRead, StepListener<WriteDetails> stepListener) {
-            assert atomic == false || request.targetLength <= Integer.MAX_VALUE : "oversized atomic write";
+            assert atomic == false || request.targetLength <= MAX_ATOMIC_WRITE_SIZE : "oversized atomic write";
             final RandomBlobContent content = new RandomBlobContent(
                 request.getRepositoryName(),
                 random.nextLong(),
@@ -296,7 +308,7 @@ public class BlobAnalyzeAction extends ActionType<BlobAnalyzeAction.Response> {
                 // E.g. for S3 blob containers we would like to choose somewhat more randomly between single-part and multi-part uploads,
                 // rather than relying on the usual distinction based on the size of the blob.
 
-                if (atomic || (request.targetLength <= Integer.MAX_VALUE && random.nextBoolean())) {
+                if (atomic || (request.targetLength <= MAX_ATOMIC_WRITE_SIZE && random.nextBoolean())) {
                     final RandomBlobContentBytesReference bytesReference = new RandomBlobContentBytesReference(
                         content,
                         Math.toIntExact(request.getTargetLength())
@@ -613,7 +625,7 @@ public class BlobAnalyzeAction extends ActionType<BlobAnalyzeAction.Response> {
             boolean writeAndOverwrite
         ) {
             assert 0 < targetLength;
-            assert targetLength <= Integer.MAX_VALUE || (readEarly == false && writeAndOverwrite == false) : "oversized atomic write";
+            assert targetLength <= MAX_ATOMIC_WRITE_SIZE || (readEarly == false && writeAndOverwrite == false) : "oversized atomic write";
             this.repositoryName = repositoryName;
             this.blobPath = blobPath;
             this.blobName = blobName;

+ 2 - 1
x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalyzeAction.java

@@ -69,6 +69,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.LongSupplier;
 import java.util.stream.IntStream;
 
+import static org.elasticsearch.repositories.blobstore.testkit.BlobAnalyzeAction.MAX_ATOMIC_WRITE_SIZE;
 import static org.elasticsearch.repositories.blobstore.testkit.SnapshotRepositoryTestKit.humanReadableNanos;
 
 /**
@@ -451,7 +452,7 @@ public class RepositoryAnalyzeAction extends ActionType<RepositoryAnalyzeAction.
 
             for (int i = 0; i < request.getBlobCount(); i++) {
                 final long targetLength = blobSizes.get(i);
-                final boolean smallBlob = targetLength <= Integer.MAX_VALUE; // avoid the atomic API for larger blobs
+                final boolean smallBlob = targetLength <= MAX_ATOMIC_WRITE_SIZE; // avoid the atomic API for larger blobs
                 final VerifyBlobTask verifyBlobTask = new VerifyBlobTask(
                     nodes.get(random.nextInt(nodes.size())),
                     new BlobAnalyzeAction.Request(