Răsfoiți Sursa

Implement failIfAlreadyExists in S3 repositories (#133030)

Today, S3-backed repositories ignore the `failIfAlreadyExists` flag and
may therefore overwrite a blob which already exists, potentially
corrupting a repository subject to concurrent writes, rather than
failing the second write.

AWS S3 now supports writes conditional on the non-existence of an object
via the `If-None-Match: *` HTTP header. This commit adjusts the
S3-backed repository implementation to respect the `failIfAlreadyExists`
flag using these conditional writes, eliminating the possibility of
overwriting blobs which should not be overwritten.

Relates #128565
Kris Van Erum 1 lună în urmă
părinte
comite
025396bf0a

+ 6 - 0
docs/changelog/133030.yaml

@@ -0,0 +1,6 @@
+pr: 133030
+summary: Implement `failIfAlreadyExists` in S3 repositories
+area: Snapshot/Restore
+type: enhancement
+issues:
+  - 128565

+ 52 - 2
modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java

@@ -83,8 +83,11 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -105,6 +108,7 @@ import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.startsWith;
 
 @SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
 // Need to set up a new cluster for each test because cluster settings use randomized authentication settings
@@ -425,7 +429,7 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
         if (randomBoolean()) {
             repository.blobStore()
                 .blobContainer(repository.basePath())
-                .writeBlobAtomic(randomNonDataPurpose(), getRepositoryDataBlobName(modifiedRepositoryData.getGenId()), serialized, true);
+                .writeBlobAtomic(randomNonDataPurpose(), getRepositoryDataBlobName(modifiedRepositoryData.getGenId()), serialized, false);
         } else {
             repository.blobStore()
                 .blobContainer(repository.basePath())
@@ -434,7 +438,7 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
                     getRepositoryDataBlobName(modifiedRepositoryData.getGenId()),
                     serialized.streamInput(),
                     serialized.length(),
-                    true
+                    false
                 );
         }
 
@@ -568,6 +572,52 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
         }
     }
 
+    public void testFailIfAlreadyExists() throws IOException, InterruptedException {
+        try (BlobStore store = newBlobStore()) {
+            final BlobContainer container = store.blobContainer(BlobPath.EMPTY);
+            final String blobName = randomAlphaOfLengthBetween(8, 12);
+
+            final byte[] data;
+            if (randomBoolean()) {
+                // single upload
+                data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
+            } else {
+                // multipart upload
+                int thresholdInBytes = Math.toIntExact(((S3BlobContainer) container).getLargeBlobThresholdInBytes());
+                data = randomBytes(randomIntBetween(thresholdInBytes, thresholdInBytes + scaledRandomIntBetween(1024, 1 << 16)));
+            }
+
+            // initial write blob
+            AtomicInteger exceptionCount = new AtomicInteger(0);
+            try (var executor = Executors.newFixedThreadPool(2)) {
+                for (int i = 0; i < 2; i++) {
+                    executor.submit(() -> {
+                        try {
+                            writeBlob(container, blobName, new BytesArray(data), true);
+                        } catch (IOException e) {
+                            exceptionCount.incrementAndGet();
+                        }
+                    });
+                }
+                executor.shutdown();
+                var done = executor.awaitTermination(1, TimeUnit.SECONDS);
+                assertTrue(done);
+            }
+
+            assertEquals(1, exceptionCount.get());
+
+            // overwrite if failIfAlreadyExists is set to false
+            writeBlob(container, blobName, new BytesArray(data), false);
+
+            // throw exception if failIfAlreadyExists is set to true
+            var exception = expectThrows(IOException.class, () -> writeBlob(container, blobName, new BytesArray(data), true));
+
+            assertThat(exception.getMessage(), startsWith("Unable to upload"));
+
+            container.delete(randomPurpose());
+        }
+    }
+
     /**
      * S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload.
      */

+ 20 - 10
modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

@@ -137,18 +137,15 @@ class S3BlobContainer extends AbstractBlobContainer {
         return ByteSizeValue.of(32, ByteSizeUnit.MB).getBytes();
     }
 
-    /**
-     * This implementation ignores the failIfAlreadyExists flag as the S3 API has no way to enforce this due to its weak consistency model.
-     */
     @Override
     public void writeBlob(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
         throws IOException {
         assert BlobContainer.assertPurposeConsistency(purpose, blobName);
         assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
         if (blobSize <= getLargeBlobThresholdInBytes()) {
-            executeSingleUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize);
+            executeSingleUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
         } else {
-            executeMultipartUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize);
+            executeMultipartUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
         }
     }
 
@@ -545,7 +542,8 @@ class S3BlobContainer extends AbstractBlobContainer {
         final S3BlobStore s3BlobStore,
         final String blobName,
         final InputStream input,
-        final long blobSize
+        final long blobSize,
+        final boolean failIfAlreadyExists
     ) throws IOException {
         try (var clientReference = s3BlobStore.clientReference()) {
             // Extra safety checks
@@ -565,6 +563,9 @@ class S3BlobContainer extends AbstractBlobContainer {
             if (s3BlobStore.serverSideEncryption()) {
                 putRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
             }
+            if (failIfAlreadyExists) {
+                putRequestBuilder.ifNoneMatch("*");
+            }
             S3BlobStore.configureRequestForMetrics(putRequestBuilder, blobStore, Operation.PUT_OBJECT, purpose);
 
             final var putRequest = putRequestBuilder.build();
@@ -586,7 +587,8 @@ class S3BlobContainer extends AbstractBlobContainer {
         final String blobName,
         final long partSize,
         final long blobSize,
-        final PartOperation partOperation
+        final PartOperation partOperation,
+        final boolean failIfAlreadyExists
     ) throws IOException {
 
         ensureMultiPartUploadSize(blobSize);
@@ -639,6 +641,11 @@ class S3BlobContainer extends AbstractBlobContainer {
                 .key(blobName)
                 .uploadId(uploadId)
                 .multipartUpload(b -> b.parts(parts));
+
+            if (failIfAlreadyExists) {
+                completeMultipartUploadRequestBuilder.ifNoneMatch("*");
+            }
+
             S3BlobStore.configureRequestForMetrics(completeMultipartUploadRequestBuilder, blobStore, operation, purpose);
             final var completeMultipartUploadRequest = completeMultipartUploadRequestBuilder.build();
             try (var clientReference = s3BlobStore.clientReference()) {
@@ -663,7 +670,8 @@ class S3BlobContainer extends AbstractBlobContainer {
         final S3BlobStore s3BlobStore,
         final String blobName,
         final InputStream input,
-        final long blobSize
+        final long blobSize,
+        final boolean failIfAlreadyExists
     ) throws IOException {
         executeMultipart(
             purpose,
@@ -680,7 +688,8 @@ class S3BlobContainer extends AbstractBlobContainer {
                         .uploadPart(uploadRequest, RequestBody.fromInputStream(input, partSize));
                     return CompletedPart.builder().partNumber(partNum).eTag(uploadResponse.eTag()).build();
                 }
-            }
+            },
+            failIfAlreadyExists
         );
     }
 
@@ -727,7 +736,8 @@ class S3BlobContainer extends AbstractBlobContainer {
                     final var uploadPartCopyResponse = clientReference.client().uploadPartCopy(uploadPartCopyRequest);
                     return CompletedPart.builder().partNumber(partNum).eTag(uploadPartCopyResponse.copyPartResult().eTag()).build();
                 }
-            })
+            }),
+            false
         );
     }
 

+ 48 - 7
modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java

@@ -69,7 +69,14 @@ public class S3BlobStoreContainerTests extends ESTestCase {
 
         final IllegalArgumentException e = expectThrows(
             IllegalArgumentException.class,
-            () -> blobContainer.executeSingleUpload(randomPurpose(), blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize)
+            () -> blobContainer.executeSingleUpload(
+                randomPurpose(),
+                blobStore,
+                randomAlphaOfLengthBetween(1, 10),
+                null,
+                blobSize,
+                randomBoolean()
+            )
         );
         assertEquals("Upload request size [" + blobSize + "] can't be larger than 5gb", e.getMessage());
     }
@@ -88,7 +95,8 @@ public class S3BlobStoreContainerTests extends ESTestCase {
                 blobStore,
                 blobName,
                 new ByteArrayInputStream(new byte[0]),
-                ByteSizeUnit.MB.toBytes(2)
+                ByteSizeUnit.MB.toBytes(2),
+                randomBoolean()
             )
         );
         assertEquals("Upload request size [2097152] can't be larger than buffer size", e.getMessage());
@@ -123,6 +131,8 @@ public class S3BlobStoreContainerTests extends ESTestCase {
             when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList);
         }
 
+        final boolean failIfAlreadyExists = randomBoolean();
+
         final S3Client client = configureMockClient(blobStore);
 
         final ArgumentCaptor<PutObjectRequest> requestCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);
@@ -131,7 +141,7 @@ public class S3BlobStoreContainerTests extends ESTestCase {
         when(client.putObject(requestCaptor.capture(), bodyCaptor.capture())).thenReturn(PutObjectResponse.builder().build());
 
         final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[blobSize]);
-        blobContainer.executeSingleUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize);
+        blobContainer.executeSingleUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, failIfAlreadyExists);
 
         final PutObjectRequest request = requestCaptor.getValue();
         assertEquals(bucketName, request.bucket());
@@ -147,6 +157,10 @@ public class S3BlobStoreContainerTests extends ESTestCase {
             );
         }
 
+        if (failIfAlreadyExists) {
+            assertEquals("*", request.ifNoneMatch());
+        }
+
         final RequestBody requestBody = bodyCaptor.getValue();
         try (var contentStream = requestBody.contentStreamProvider().newStream()) {
             assertEquals(inputStream.available(), blobSize);
@@ -164,7 +178,14 @@ public class S3BlobStoreContainerTests extends ESTestCase {
 
         final IllegalArgumentException e = expectThrows(
             IllegalArgumentException.class,
-            () -> blobContainer.executeMultipartUpload(randomPurpose(), blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize)
+            () -> blobContainer.executeMultipartUpload(
+                randomPurpose(),
+                blobStore,
+                randomAlphaOfLengthBetween(1, 10),
+                null,
+                blobSize,
+                randomBoolean()
+            )
         );
         assertEquals("Multipart upload request size [" + blobSize + "] can't be larger than 5tb", e.getMessage());
     }
@@ -176,7 +197,14 @@ public class S3BlobStoreContainerTests extends ESTestCase {
 
         final IllegalArgumentException e = expectThrows(
             IllegalArgumentException.class,
-            () -> blobContainer.executeMultipartUpload(randomPurpose(), blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize)
+            () -> blobContainer.executeMultipartUpload(
+                randomPurpose(),
+                blobStore,
+                randomAlphaOfLengthBetween(1, 10),
+                null,
+                blobSize,
+                randomBoolean()
+            )
         );
         assertEquals("Multipart upload request size [" + blobSize + "] can't be smaller than 5mb", e.getMessage());
     }
@@ -225,6 +253,8 @@ public class S3BlobStoreContainerTests extends ESTestCase {
             when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList);
         }
 
+        final boolean failIfAlreadyExists = doCopy ? false : randomBoolean();
+
         final S3Client client = configureMockClient(blobStore);
 
         final var uploadId = randomIdentifier();
@@ -273,7 +303,7 @@ public class S3BlobStoreContainerTests extends ESTestCase {
         if (doCopy) {
             blobContainer.executeMultipartCopy(randomPurpose(), sourceContainer, sourceBlobName, blobName, blobSize);
         } else {
-            blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize);
+            blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, failIfAlreadyExists);
         }
 
         final CreateMultipartUploadRequest initRequest = createMultipartUploadRequestCaptor.getValue();
@@ -340,6 +370,10 @@ public class S3BlobStoreContainerTests extends ESTestCase {
         assertEquals(blobPath.buildAsString() + blobName, compRequest.key());
         assertEquals(uploadId, compRequest.uploadId());
 
+        if (failIfAlreadyExists) {
+            assertEquals("*", compRequest.ifNoneMatch());
+        }
+
         final List<String> actualETags = compRequest.multipartUpload()
             .parts()
             .stream()
@@ -419,7 +453,14 @@ public class S3BlobStoreContainerTests extends ESTestCase {
 
         final IOException e = expectThrows(IOException.class, () -> {
             final S3BlobContainer blobContainer = new S3BlobContainer(BlobPath.EMPTY, blobStore);
-            blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, new ByteArrayInputStream(new byte[0]), blobSize);
+            blobContainer.executeMultipartUpload(
+                randomPurpose(),
+                blobStore,
+                blobName,
+                new ByteArrayInputStream(new byte[0]),
+                blobSize,
+                randomBoolean()
+            );
         });
 
         assertEquals("Unable to upload or copy object [" + blobName + "] using multipart upload", e.getMessage());

+ 5 - 0
plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsRepositoryTests.java

@@ -62,4 +62,9 @@ public class HdfsRepositoryTests extends AbstractThirdPartyRepositoryTestCase {
             assertThat(response.result().blobs(), equalTo(0L));
         }
     }
+
+    @Override
+    public void testFailIfAlreadyExists() {
+        // HDFS does not implement failIfAlreadyExists correctly
+    }
 }

+ 67 - 15
test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java

@@ -188,8 +188,9 @@ public class S3HttpHandler implements HttpHandler {
 
             } else if (request.isCompleteMultipartUploadRequest()) {
                 final byte[] responseBody;
+                boolean preconditionFailed = false;
                 synchronized (uploads) {
-                    final var upload = removeUpload(request.getQueryParamOnce("uploadId"));
+                    final var upload = getUpload(request.getQueryParamOnce("uploadId"));
                     if (upload == null) {
                         if (Randomness.get().nextBoolean()) {
                             responseBody = null;
@@ -205,19 +206,35 @@ public class S3HttpHandler implements HttpHandler {
                         }
                     } else {
                         final var blobContents = upload.complete(extractPartEtags(Streams.readFully(exchange.getRequestBody())));
-                        blobs.put(request.path(), blobContents);
-                        responseBody = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
-                            + "<CompleteMultipartUploadResult>\n"
-                            + "<Bucket>"
-                            + bucket
-                            + "</Bucket>\n"
-                            + "<Key>"
-                            + request.path()
-                            + "</Key>\n"
-                            + "</CompleteMultipartUploadResult>").getBytes(StandardCharsets.UTF_8);
+
+                        if (isProtectOverwrite(exchange)) {
+                            var previousValue = blobs.putIfAbsent(request.path(), blobContents);
+                            if (previousValue != null) {
+                                preconditionFailed = true;
+                            }
+                        } else {
+                            blobs.put(request.path(), blobContents);
+                        }
+
+                        if (preconditionFailed == false) {
+                            responseBody = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+                                + "<CompleteMultipartUploadResult>\n"
+                                + "<Bucket>"
+                                + bucket
+                                + "</Bucket>\n"
+                                + "<Key>"
+                                + request.path()
+                                + "</Key>\n"
+                                + "</CompleteMultipartUploadResult>").getBytes(StandardCharsets.UTF_8);
+                            removeUpload(upload.getUploadId());
+                        } else {
+                            responseBody = null;
+                        }
                     }
                 }
-                if (responseBody == null) {
+                if (preconditionFailed) {
+                    exchange.sendResponseHeaders(RestStatus.PRECONDITION_FAILED.getStatus(), -1);
+                } else if (responseBody == null) {
                     exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
                 } else {
                     exchange.getResponseHeaders().add("Content-Type", "application/xml");
@@ -232,6 +249,10 @@ public class S3HttpHandler implements HttpHandler {
                 // a copy request is a put request with an X-amz-copy-source header
                 final var copySource = copySourceName(exchange);
                 if (copySource != null) {
+                    if (isProtectOverwrite(exchange)) {
+                        throw new AssertionError("If-None-Match: * header is not supported here");
+                    }
+
                     var sourceBlob = blobs.get(copySource);
                     if (sourceBlob == null) {
                         exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
@@ -247,9 +268,22 @@ public class S3HttpHandler implements HttpHandler {
                     }
                 } else {
                     final Tuple<String, BytesReference> blob = parseRequestBody(exchange);
-                    blobs.put(request.path(), blob.v2());
-                    exchange.getResponseHeaders().add("ETag", blob.v1());
-                    exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
+                    boolean preconditionFailed = false;
+                    if (isProtectOverwrite(exchange)) {
+                        var previousValue = blobs.putIfAbsent(request.path(), blob.v2());
+                        if (previousValue != null) {
+                            preconditionFailed = true;
+                        }
+                    } else {
+                        blobs.put(request.path(), blob.v2());
+                    }
+
+                    if (preconditionFailed) {
+                        exchange.sendResponseHeaders(RestStatus.PRECONDITION_FAILED.getStatus(), -1);
+                    } else {
+                        exchange.getResponseHeaders().add("ETag", blob.v1());
+                        exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
+                    }
                 }
 
             } else if (request.isListObjectsRequest()) {
@@ -539,6 +573,24 @@ public class S3HttpHandler implements HttpHandler {
         return parseRangeHeader(sourceRangeHeaders.getFirst());
     }
 
+    private static boolean isProtectOverwrite(final HttpExchange exchange) {
+        final var ifNoneMatch = exchange.getRequestHeaders().get("If-None-Match");
+
+        if (ifNoneMatch == null) {
+            return false;
+        }
+
+        if (ifNoneMatch.size() != 1) {
+            throw new AssertionError("multiple If-None-Match headers found: " + ifNoneMatch);
+        }
+
+        if (ifNoneMatch.getFirst().equals("*")) {
+            return true;
+        }
+
+        throw new AssertionError("invalid If-None-Match header: " + ifNoneMatch);
+    }
+
     MultipartUpload putUpload(String path) {
         final var upload = new MultipartUpload(UUIDs.randomBase64UUID(), path);
         synchronized (uploads) {

+ 95 - 0
test/fixtures/s3-fixture/src/test/java/fixture/s3/S3HttpHandlerTests.java

@@ -32,10 +32,14 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
 
 public class S3HttpHandlerTests extends ESTestCase {
 
@@ -383,6 +387,91 @@ public class S3HttpHandlerTests extends ESTestCase {
 
     }
 
+    public void testPreventObjectOverwrite() throws InterruptedException {
+        final var handler = new S3HttpHandler("bucket", "path");
+
+        var tasks = List.of(
+            createPutObjectTask(handler),
+            createPutObjectTask(handler),
+            createMultipartUploadTask(handler),
+            createMultipartUploadTask(handler)
+        );
+
+        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
+            tasks.forEach(task -> executor.submit(task.consumer));
+            executor.shutdown();
+            var done = executor.awaitTermination(SAFE_AWAIT_TIMEOUT.seconds(), TimeUnit.SECONDS);
+            assertTrue(done);
+        }
+
+        List<TestWriteTask> successfulTasks = tasks.stream().filter(task -> task.status == RestStatus.OK).toList();
+        assertThat(successfulTasks, hasSize(1));
+
+        tasks.stream().filter(task -> task.uploadId != null).forEach(task -> {
+            if (task.status == RestStatus.PRECONDITION_FAILED) {
+                assertNotNull(handler.getUpload(task.uploadId));
+            } else {
+                assertNull(handler.getUpload(task.uploadId));
+            }
+        });
+
+        assertEquals(
+            new TestHttpResponse(RestStatus.OK, successfulTasks.getFirst().body, TestHttpExchange.EMPTY_HEADERS),
+            handleRequest(handler, "GET", "/bucket/path/blob")
+        );
+    }
+
+    private static TestWriteTask createPutObjectTask(S3HttpHandler handler) {
+        return new TestWriteTask(
+            (task) -> task.status = handleRequest(handler, "PUT", "/bucket/path/blob", task.body, ifNoneMatchHeader()).status()
+        );
+    }
+
+    private static TestWriteTask createMultipartUploadTask(S3HttpHandler handler) {
+        final var multipartUploadTask = new TestWriteTask(
+            (task) -> task.status = handleRequest(
+                handler,
+                "POST",
+                "/bucket/path/blob?uploadId=" + task.uploadId,
+                new BytesArray(Strings.format("""
+                    <?xml version="1.0" encoding="UTF-8"?>
+                    <CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+                       <Part>
+                          <ETag>%s</ETag>
+                          <PartNumber>1</PartNumber>
+                       </Part>
+                    </CompleteMultipartUpload>""", task.etag)),
+                ifNoneMatchHeader()
+            ).status()
+        );
+
+        final var createUploadResponse = handleRequest(handler, "POST", "/bucket/path/blob?uploads");
+        multipartUploadTask.uploadId = getUploadId(createUploadResponse.body());
+
+        final var uploadPart1Response = handleRequest(
+            handler,
+            "PUT",
+            "/bucket/path/blob?uploadId=" + multipartUploadTask.uploadId + "&partNumber=1",
+            multipartUploadTask.body
+        );
+        multipartUploadTask.etag = Objects.requireNonNull(uploadPart1Response.etag());
+
+        return multipartUploadTask;
+    }
+
+    private static class TestWriteTask {
+        final BytesReference body;
+        final Runnable consumer;
+        String uploadId;
+        String etag;
+        RestStatus status;
+
+        TestWriteTask(Consumer<TestWriteTask> consumer) {
+            this.body = randomBytesReference(50);
+            this.consumer = () -> consumer.accept(this);
+        }
+    }
+
     private void runExtractPartETagsTest(String body, String... expectedTags) {
         assertEquals(List.of(expectedTags), S3HttpHandler.extractPartEtags(new BytesArray(body.getBytes(StandardCharsets.UTF_8))));
     }
@@ -467,6 +556,12 @@ public class S3HttpHandlerTests extends ESTestCase {
         return headers;
     }
 
+    private static Headers ifNoneMatchHeader() {
+        var headers = new Headers();
+        headers.put("If-None-Match", List.of("*"));
+        return headers;
+    }
+
     private static class TestHttpExchange extends HttpExchange {
 
         private static final Headers EMPTY_HEADERS = new Headers();

+ 58 - 1
test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java

@@ -346,6 +346,55 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
         }
     }
 
+    public void testFailIfAlreadyExists() {
+        final var blobName = randomIdentifier();
+        final int blobLength = randomIntBetween(100, 2_000);
+        final var initialBlobBytes = randomBytesReference(blobLength);
+        final var overwriteBlobBytes = randomBytesReference(blobLength);
+
+        final var repository = getRepository();
+
+        CheckedFunction<BlobContainer, Void, IOException> initialWrite = blobStore -> {
+            blobStore.writeBlobAtomic(randomPurpose(), blobName, initialBlobBytes, true);
+            return null;
+        };
+
+        // initial write blob
+        var initialWrite1 = submitOnBlobStore(repository, initialWrite);
+        var initialWrite2 = submitOnBlobStore(repository, initialWrite);
+
+        Exception ex1 = null;
+        Exception ex2 = null;
+
+        try {
+            initialWrite1.actionGet();
+        } catch (Exception e) {
+            ex1 = e;
+        }
+
+        try {
+            initialWrite2.actionGet();
+        } catch (Exception e) {
+            ex2 = e;
+        }
+
+        assertTrue("Exactly one of the writes must succeed", (ex1 == null) != (ex2 == null));
+
+        // overwrite if failIfAlreadyExists is set to false
+        executeOnBlobStore(repository, blobStore -> {
+            blobStore.writeBlob(randomPurpose(), blobName, overwriteBlobBytes, false);
+            return null;
+        });
+
+        assertEquals(overwriteBlobBytes, readBlob(repository, blobName, 0, overwriteBlobBytes.length()));
+
+        // throw exception if failIfAlreadyExists is set to true
+        executeOnBlobStore(repository, blobStore -> {
+            expectThrows(Exception.class, () -> blobStore.writeBlob(randomPurpose(), blobName, initialBlobBytes, true));
+            return null;
+        });
+    }
+
     protected void testReadFromPositionLargerThanBlobLength(Predicate<RequestedRangeNotSatisfiedException> responseCodeChecker) {
         final var blobName = randomIdentifier();
         final var blobBytes = randomBytesReference(randomIntBetween(100, 2_000));
@@ -381,12 +430,20 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
         assertThat(responseCodeChecker.test(rangeNotSatisfiedException), is(true));
     }
 
-    protected static <T> T executeOnBlobStore(BlobStoreRepository repository, CheckedFunction<BlobContainer, T, IOException> fn) {
+    protected static <T> PlainActionFuture<T> submitOnBlobStore(
+        BlobStoreRepository repository,
+        CheckedFunction<BlobContainer, T, IOException> fn
+    ) {
         final var future = new PlainActionFuture<T>();
         repository.threadPool().generic().execute(ActionRunnable.supply(future, () -> {
             var blobContainer = repository.blobStore().blobContainer(repository.basePath());
             return fn.apply(blobContainer);
         }));
+        return future;
+    }
+
+    protected static <T> T executeOnBlobStore(BlobStoreRepository repository, CheckedFunction<BlobContainer, T, IOException> fn) {
+        final var future = submitOnBlobStore(repository, fn);
         return future.actionGet();
     }