Bläddra i källkod

Add Bulk Delete Api to BlobStore (#40322)

* Adds Bulk delete API to blob container
* Implement bulk delete API for S3
* Adjust S3Fixture to accept both path styles for bulk deletes since the S3 SDK uses both during our ITs
* Closes #40250
Armin Braun 6 år sedan
förälder
incheckning
b1e2cec55b

+ 52 - 0
plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

@@ -23,6 +23,7 @@ import com.amazonaws.AmazonClientException;
 import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.ObjectMetadata;
@@ -56,6 +57,12 @@ import static org.elasticsearch.repositories.s3.S3Repository.MIN_PART_SIZE_USING
 
 class S3BlobContainer extends AbstractBlobContainer {
 
+    /**
+     * Maximum number of deletes in a {@link DeleteObjectsRequest}.
+     * @see <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html">S3 Documentation</a>.
+     */
+    private static final int MAX_BULK_DELETES = 1000;
+
     private final S3BlobStore blobStore;
     private final String keyPath;
 
@@ -118,6 +125,51 @@ class S3BlobContainer extends AbstractBlobContainer {
         deleteBlobIgnoringIfNotExists(blobName);
     }
 
+    @Override
+    public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
+        if (blobNames.isEmpty()) {
+            return;
+        }
+        try (AmazonS3Reference clientReference = blobStore.clientReference()) {
+            // S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
+            final List<DeleteObjectsRequest> deleteRequests = new ArrayList<>();
+            final List<String> partition = new ArrayList<>();
+            for (String blob : blobNames) {
+                partition.add(buildKey(blob));
+                if (partition.size() == MAX_BULK_DELETES ) {
+                    deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
+                    partition.clear();
+                }
+            }
+            if (partition.isEmpty() == false) {
+                deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
+            }
+            SocketAccess.doPrivilegedVoid(() -> {
+                AmazonClientException aex = null;
+                for (DeleteObjectsRequest deleteRequest : deleteRequests) {
+                    try {
+                        clientReference.client().deleteObjects(deleteRequest);
+                    } catch (AmazonClientException e) {
+                        if (aex == null) {
+                            aex = e;
+                        } else {
+                            aex.addSuppressed(e);
+                        }
+                    }
+                }
+                if (aex != null) {
+                    throw aex;
+                }
+            });
+        } catch (final AmazonClientException e) {
+            throw new IOException("Exception when deleting blobs [" + blobNames + "]", e);
+        }
+    }
+
+    private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs) {
+        return new DeleteObjectsRequest(bucket).withKeys(blobs.toArray(Strings.EMPTY_ARRAY)).withQuiet(true);
+    }
+
     @Override
     public void deleteBlobIgnoringIfNotExists(String blobName) throws IOException {
         try (AmazonS3Reference clientReference = blobStore.clientReference()) {

+ 4 - 3
plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/AmazonS3Fixture.java

@@ -324,7 +324,7 @@ public class AmazonS3Fixture extends AbstractHttpFixture {
         // Delete Multiple Objects
         //
         // https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html
-        handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/"), (request) -> {
+        final RequestHandler bulkDeleteHandler = request -> {
             final List<String> deletes = new ArrayList<>();
             final List<String> errors = new ArrayList<>();
 
@@ -344,7 +344,6 @@ public class AmazonS3Fixture extends AbstractHttpFixture {
                             if (closingOffset != -1) {
                                 offset = offset + startMarker.length();
                                 final String objectName = requestBody.substring(offset, closingOffset);
-
                                 boolean found = false;
                                 for (Bucket bucket : buckets.values()) {
                                     if (bucket.objects.containsKey(objectName)) {
@@ -369,7 +368,9 @@ public class AmazonS3Fixture extends AbstractHttpFixture {
                 }
             }
             return newInternalError(request.getId(), "Something is wrong with this POST multiple deletes request");
-        });
+        };
+        handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/"), bulkDeleteHandler);
+        handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/{bucket}"), bulkDeleteHandler);
 
         // non-authorized requests
 

+ 1 - 5
plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/MockAmazonS3.java

@@ -158,11 +158,7 @@ class MockAmazonS3 extends AbstractAmazonS3 {
 
         final List<DeleteObjectsResult.DeletedObject> deletions = new ArrayList<>();
         for (DeleteObjectsRequest.KeyVersion key : request.getKeys()) {
-            if (blobs.remove(key.getKey()) == null) {
-                AmazonS3Exception exception = new AmazonS3Exception("[" + key + "] does not exist.");
-                exception.setStatusCode(404);
-                throw exception;
-            } else {
+            if (blobs.remove(key.getKey()) != null) {
                 DeleteObjectsResult.DeletedObject deletion = new DeleteObjectsResult.DeletedObject();
                 deletion.setKey(key.getKey());
                 deletions.add(deletion);

+ 30 - 1
server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java

@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.NoSuchFileException;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -96,8 +97,9 @@ public interface BlobContainer {
      * @throws  IOException if the input stream could not be read, or the target blob could not be written to.
      */
     void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;
+
     /**
-     * Deletes a blob with giving name, if the blob exists. If the blob does not exist,
+     * Deletes the blob with the given name, if the blob exists. If the blob does not exist,
      * this method throws a NoSuchFileException.
      *
      * @param   blobName
@@ -107,6 +109,33 @@ public interface BlobContainer {
      */
     void deleteBlob(String blobName) throws IOException;
 
+    /**
+     * Deletes the blobs with given names. Unlike {@link #deleteBlob(String)} this method will not throw an exception
+     * when one or multiple of the given blobs don't exist and simply ignore this case.
+     *
+     * @param   blobNames  The names of the blob to delete.
+     * @throws  IOException if a subset of blob exists but could not be deleted.
+     */
+    default void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
+        IOException ioe = null;
+        for (String blobName : blobNames) {
+            try {
+                deleteBlob(blobName);
+            } catch (NoSuchFileException e) {
+                // ignored
+            } catch (IOException e) {
+                if (ioe == null) {
+                    ioe = e;
+                } else {
+                    ioe.addSuppressed(e);
+                }
+            }
+        }
+        if (ioe != null) {
+            throw ioe;
+        }
+    }
+
     /**
      * Deletes a blob with giving name, ignoring if the blob does not exist.
      *

+ 31 - 42
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -101,7 +101,6 @@ import org.elasticsearch.threadpool.ThreadPool;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.file.DirectoryNotEmptyException;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.NoSuchFileException;
 import java.util.ArrayList;
@@ -466,22 +465,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             final Collection<IndexId> indicesToCleanUp = Sets.newHashSet(repositoryData.getIndices().values());
             indicesToCleanUp.removeAll(updatedRepositoryData.getIndices().values());
             final BlobContainer indicesBlobContainer = blobStore().blobContainer(basePath().add("indices"));
-            for (final IndexId indexId : indicesToCleanUp) {
                 try {
-                    indicesBlobContainer.deleteBlobIgnoringIfNotExists(indexId.getId());
-                } catch (DirectoryNotEmptyException dnee) {
-                    // if the directory isn't empty for some reason, it will fail to clean up;
-                    // we'll ignore that and accept that cleanup didn't fully succeed.
-                    // since we are using UUIDs for path names, this won't be an issue for
-                    // snapshotting indices of the same name
-                    logger.warn(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " +
-                        "but failed to clean up its index folder due to the directory not being empty.", metadata.name(), indexId), dnee);
+                    indicesBlobContainer.deleteBlobsIgnoringIfNotExists(
+                        indicesToCleanUp.stream().map(IndexId::getId).collect(Collectors.toList()));
                 } catch (IOException ioe) {
                     // a different IOException occurred while trying to delete - will just log the issue for now
-                    logger.warn(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " +
-                        "but failed to clean up its index folder.", metadata.name(), indexId), ioe);
+                    logger.warn(() ->
+                        new ParameterizedMessage(
+                            "[{}] indices {} are no longer part of any snapshots in the repository, " +
+                        "but failed to clean up their index folders.", metadata.name(), indicesToCleanUp), ioe);
                 }
-            }
         } catch (IOException | ResourceNotFoundException ex) {
             throw new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex);
         }
@@ -1018,16 +1011,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             try {
                 // Delete temporary index files first, as we might otherwise fail in the next step creating the new index file if an earlier
                 // attempt to write an index file with this generation failed mid-way after creating the temporary file.
-                for (final String blobName : blobs.keySet()) {
-                    if (FsBlobContainer.isTempBlobName(blobName)) {
-                        try {
-                            blobContainer.deleteBlobIgnoringIfNotExists(blobName);
-                        } catch (IOException e) {
-                            logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blob [{}] during finalization",
-                                snapshotId, shardId, blobName), e);
-                            throw e;
-                        }
-                    }
+                final List<String> blobNames =
+                    blobs.keySet().stream().filter(FsBlobContainer::isTempBlobName).collect(Collectors.toList());
+                try {
+                    blobContainer.deleteBlobsIgnoringIfNotExists(blobNames);
+                } catch (IOException e) {
+                    logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs {} during finalization",
+                        snapshotId, shardId, blobNames), e);
+                    throw e;
                 }
 
                 // If we deleted all snapshots, we don't need to create a new index file
@@ -1036,28 +1027,26 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 }
 
                 // Delete old index files
-                for (final String blobName : blobs.keySet()) {
-                    if (blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) {
-                        try {
-                            blobContainer.deleteBlobIgnoringIfNotExists(blobName);
-                        } catch (IOException e) {
-                            logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blob [{}] during finalization",
-                                snapshotId, shardId, blobName), e);
-                            throw e;
-                        }
-                    }
+                final List<String> indexBlobs =
+                    blobs.keySet().stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList());
+                try {
+                    blobContainer.deleteBlobsIgnoringIfNotExists(indexBlobs);
+                } catch (IOException e) {
+                    logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs {} during finalization",
+                        snapshotId, shardId, indexBlobs), e);
+                    throw e;
                 }
 
                 // Delete all blobs that don't exist in a snapshot
-                for (final String blobName : blobs.keySet()) {
-                    if (blobName.startsWith(DATA_BLOB_PREFIX) && (updatedSnapshots.findNameFile(canonicalName(blobName)) == null)) {
-                        try {
-                            blobContainer.deleteBlobIgnoringIfNotExists(blobName);
-                        } catch (IOException e) {
-                            logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blob [{}] during finalization",
-                                snapshotId, shardId, blobName), e);
-                        }
-                    }
+                final List<String> orphanedBlobs = blobs.keySet().stream()
+                    .filter(blobName ->
+                        blobName.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blobName)) == null)
+                    .collect(Collectors.toList());
+                try {
+                    blobContainer.deleteBlobsIgnoringIfNotExists(orphanedBlobs);
+                } catch (IOException e) {
+                    logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blobs {} during finalization",
+                        snapshotId, shardId, orphanedBlobs), e);
                 }
             } catch (IOException e) {
                 String message = "Failed to finalize " + reason + " with shard index [" + currentIndexGen + "]";

+ 18 - 0
test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java

@@ -33,6 +33,7 @@ import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.NoSuchFileException;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.elasticsearch.repositories.ESBlobStoreTestCase.randomBytes;
@@ -136,6 +137,23 @@ public abstract class ESBlobStoreContainerTestCase extends ESTestCase {
         }
     }
 
+    public void testDeleteBlobs() throws IOException {
+        try (BlobStore store = newBlobStore()) {
+            final List<String> blobNames = Arrays.asList("foobar", "barfoo");
+            final BlobContainer container = store.blobContainer(new BlobPath());
+            container.deleteBlobsIgnoringIfNotExists(blobNames); // does not raise when blobs don't exist
+            byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
+            final BytesArray bytesArray = new BytesArray(data);
+            for (String blobName : blobNames) {
+                writeBlob(container, blobName, bytesArray, randomBoolean());
+            }
+            assertEquals(container.listBlobs().size(), 2);
+            container.deleteBlobsIgnoringIfNotExists(blobNames);
+            assertTrue(container.listBlobs().isEmpty());
+            container.deleteBlobsIgnoringIfNotExists(blobNames); // does not raise when blobs don't exist
+        }
+    }
+
     public void testDeleteBlobIgnoringIfNotExists() throws IOException {
         try (BlobStore store = newBlobStore()) {
             BlobPath blobPath = new BlobPath();