Przeglądaj źródła

Reduce Memory Use of Parallel Azure Blob Deletes (#71330)

1. Limit the number of blob deletes to execute in parallel to `100`.
2. Use flat listing when deleting a directory to require fewer listings
in between deletes and keep the code simpler.

closes #71267
Armin Braun 4 lat temu
rodzic
commit
821383378e

+ 48 - 61
plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

@@ -33,6 +33,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
+import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.blobstore.BlobContainer;
@@ -58,7 +59,6 @@ import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.ByteBuffer;
 import java.nio.file.FileAlreadyExistsException;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Base64;
 import java.util.Collections;
@@ -66,12 +66,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Queue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.function.BiPredicate;
-import java.util.function.Supplier;
+import java.util.function.Function;
 
 public class AzureBlobStore implements BlobStore {
     private static final Logger logger = LogManager.getLogger(AzureBlobStore.class);
@@ -153,8 +152,7 @@ public class AzureBlobStore implements BlobStore {
     private boolean isListRequest(String httpMethod, URL url) {
         return httpMethod.equals("GET") &&
             url.getQuery() != null &&
-            url.getQuery().contains("comp=list") &&
-            url.getQuery().contains("delimiter=");
+            url.getQuery().contains("comp=list");
     }
 
     // https://docs.microsoft.com/en-us/rest/api/storageservices/put-block
@@ -216,49 +214,54 @@ public class AzureBlobStore implements BlobStore {
         }
     }
 
+    // number of concurrent blob delete requests to use while bulk deleting
+    private static final int CONCURRENT_DELETES = 100;
+
     public DeleteResult deleteBlobDirectory(String path) throws IOException {
         final AtomicInteger blobsDeleted = new AtomicInteger(0);
         final AtomicLong bytesDeleted = new AtomicLong(0);
 
-        final BlobServiceClient client = client();
         SocketAccess.doPrivilegedVoidException(() -> {
-            final BlobContainerClient blobContainerClient = client.getBlobContainerClient(container);
             final BlobContainerAsyncClient blobContainerAsyncClient = asyncClient().getBlobContainerAsyncClient(container);
-            final Queue<String> directories = new ArrayDeque<>();
-            directories.offer(path);
-            String directoryName;
-            List<Mono<Void>> deleteTasks = new ArrayList<>();
-            while ((directoryName = directories.poll()) != null) {
-                final BlobListDetails blobListDetails = new BlobListDetails()
-                    .setRetrieveMetadata(true);
-
-                final ListBlobsOptions options = new ListBlobsOptions()
-                    .setPrefix(directoryName)
-                    .setDetails(blobListDetails);
-
-                for (BlobItem blobItem : blobContainerClient.listBlobsByHierarchy("/", options, null)) {
+            final ListBlobsOptions options = new ListBlobsOptions()
+                    .setPrefix(path)
+                    .setDetails(new BlobListDetails().setRetrieveMetadata(true));
+            try {
+                blobContainerAsyncClient.listBlobs(options, null).flatMap(blobItem -> {
                     if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
-                        directories.offer(blobItem.getName());
+                        return Mono.empty();
                     } else {
-                        BlobAsyncClient blobAsyncClient = blobContainerAsyncClient.getBlobAsyncClient(blobItem.getName());
-                        final Mono<Void> deleteTask = blobAsyncClient.delete()
-                            // Ignore not found blobs, as it's possible that due to network errors a request
-                            // for an already deleted blob is retried, causing an error.
-                            .onErrorResume(this::isNotFoundError, throwable -> Mono.empty())
-                            .onErrorMap(throwable -> new IOException("Error deleting blob " + blobItem.getName(), throwable));
-                        deleteTasks.add(deleteTask);
+                        final String blobName = blobItem.getName();
+                        BlobAsyncClient blobAsyncClient = blobContainerAsyncClient.getBlobAsyncClient(blobName);
+                        final Mono<Void> deleteTask = getDeleteTask(blobName, blobAsyncClient);
                         bytesDeleted.addAndGet(blobItem.getProperties().getContentLength());
                         blobsDeleted.incrementAndGet();
+                        return deleteTask;
                     }
-                }
+                }, CONCURRENT_DELETES).then().block();
+            } catch (Exception e) {
+                filterDeleteExceptionsAndRethrow(e, new IOException("Deleting directory [" + path + "] failed"));
             }
-
-            executeDeleteTasks(deleteTasks, () -> "Deleting directory [" + path + "] failed");
         });
 
         return new DeleteResult(blobsDeleted.get(), bytesDeleted.get());
     }
 
+    private static void filterDeleteExceptionsAndRethrow(Exception e, IOException exception) throws IOException {
+        int suppressedCount = 0;
+        for (Throwable suppressed : e.getSuppressed()) {
+            // We're only interested about the blob deletion exceptions and not in the reactor internals exceptions
+            if (suppressed instanceof IOException) {
+                exception.addSuppressed(suppressed);
+                suppressedCount++;
+                if (suppressedCount > 10) {
+                    break;
+                }
+            }
+        }
+        throw exception;
+    }
+
     void deleteBlobList(List<String> blobs) throws IOException {
         if (blobs.isEmpty()) {
             return;
@@ -266,41 +269,25 @@ public class AzureBlobStore implements BlobStore {
 
         BlobServiceAsyncClient asyncClient = asyncClient();
         SocketAccess.doPrivilegedVoidException(() -> {
-            List<Mono<Void>> deleteTasks = new ArrayList<>(blobs.size());
             final BlobContainerAsyncClient blobContainerClient = asyncClient.getBlobContainerAsyncClient(container);
-            for (String blob : blobs) {
-                final Mono<Void> deleteTask = blobContainerClient.getBlobAsyncClient(blob)
-                    .delete()
-                    // Ignore not found blobs
-                    .onErrorResume(this::isNotFoundError, throwable -> Mono.empty())
-                    .onErrorMap(throwable -> new IOException("Error deleting blob " + blob, throwable));
-
-                deleteTasks.add(deleteTask);
+            try {
+                Flux.fromIterable(blobs).flatMap(blob ->
+                        getDeleteTask(blob, blobContainerClient.getBlobAsyncClient(blob)), CONCURRENT_DELETES).then().block();
+            } catch (Exception e) {
+                filterDeleteExceptionsAndRethrow(e,
+                        new IOException("Unable to delete blobs "
+                                + AllocationService.firstListElementsToCommaDelimitedString(blobs, Function.identity(), false)));
             }
-
-            executeDeleteTasks(deleteTasks, () -> "Unable to delete blobs " + blobs);
         });
     }
 
-    private boolean isNotFoundError(Throwable e) {
-        return e instanceof BlobStorageException && ((BlobStorageException) e).getStatusCode() == 404;
-    }
-
-    private void executeDeleteTasks(List<Mono<Void>> deleteTasks, Supplier<String> errorMessageSupplier) throws IOException {
-        try {
-            // zipDelayError executes all tasks in parallel and delays
-            // error propagation until all tasks have finished.
-            Mono.zipDelayError(deleteTasks, results -> null).block();
-        } catch (Exception e) {
-            final IOException exception = new IOException(errorMessageSupplier.get());
-            for (Throwable suppressed : e.getSuppressed()) {
-                // We're only interested about the blob deletion exceptions and not in the reactor internals exceptions
-                if (suppressed instanceof IOException) {
-                    exception.addSuppressed(suppressed);
-                }
-            }
-            throw exception;
-        }
+    private static Mono<Void> getDeleteTask(String blobName, BlobAsyncClient blobAsyncClient) {
+        return blobAsyncClient.delete()
+                // Ignore not found blobs, as it's possible that due to network errors a request
+                // for an already deleted blob is retried, causing an error.
+                .onErrorResume(e ->
+                        e instanceof BlobStorageException && ((BlobStorageException) e).getStatusCode() == 404, throwable -> Mono.empty())
+                .onErrorMap(throwable -> new IOException("Error deleting blob " + blobName, throwable));
     }
 
     public InputStream getInputStream(String blob, long position, final @Nullable Long length) throws IOException {