Переглянути джерело

Submit GCS delete batch requests incrementally (#80540)

Today we use a single batch instance during blob deletions, internally
the SDK splits these requests into 100 element batches but it waits
until the entire batch is submitted. This works well in most scenarios,
but when the number of blobs to delete is large it can cause memory
consumption problems, since each request in the SDK has a non negligible
size.
This commits fixes this problem by sending these batch requests early
and creating a new batch client once the previous batch has been sent.
Francisco Fernández Castaño 4 роки тому
батько
коміт
c9303e273d

+ 12 - 2
plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java

@@ -71,6 +71,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
     // called "resumable upload")
     // https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
     public static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE;
+    public static final int MAX_DELETES_PER_BATCH = 1000;
 
     static {
         final String key = "es.repository_gcs.large_blob_threshold_byte_size";
@@ -542,7 +543,8 @@ class GoogleCloudStorageBlobStore implements BlobStore {
         try {
             SocketAccess.doPrivilegedVoidIOException(() -> {
                 final AtomicReference<StorageException> ioe = new AtomicReference<>();
-                final StorageBatch batch = client().batch();
+                StorageBatch batch = client().batch();
+                int pendingDeletesInBatch = 0;
                 while (blobIdsToDelete.hasNext()) {
                     BlobId blob = blobIdsToDelete.next();
                     batch.delete(blob).notify(new BatchResult.Callback<>() {
@@ -562,8 +564,16 @@ class GoogleCloudStorageBlobStore implements BlobStore {
                             }
                         }
                     });
+                    pendingDeletesInBatch++;
+                    if (pendingDeletesInBatch % MAX_DELETES_PER_BATCH == 0) {
+                        batch.submit();
+                        batch = client().batch();
+                        pendingDeletesInBatch = 0;
+                    }
+                }
+                if (pendingDeletesInBatch > 0) {
+                    batch.submit();
                 }
-                batch.submit();
 
                 final StorageException exception = ioe.get();
                 if (exception != null) {

+ 63 - 0
plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java

@@ -47,6 +47,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
@@ -61,6 +62,7 @@ import static fixture.gcs.GoogleCloudStorageHttpHandler.getContentRangeStart;
 import static fixture.gcs.GoogleCloudStorageHttpHandler.parseMultipartRequestBody;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes;
+import static org.elasticsearch.repositories.gcs.GoogleCloudStorageBlobStore.MAX_DELETES_PER_BATCH;
 import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING;
 import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING;
 import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.READ_TIMEOUT_SETTING;
@@ -72,6 +74,7 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.notNullValue;
 
 @SuppressForbidden(reason = "use a http server")
@@ -392,6 +395,66 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
         assertThat(allow410Gone.get(), is(false));
     }
 
+    public void testDeleteBatchesAreSentIncrementally() throws Exception {
+        // See com.google.cloud.storage.spi.v1.HttpStorageRpc.DefaultRpcBatch.MAX_BATCH_SIZE
+        final int sdkMaxBatchSize = 100;
+        final AtomicInteger receivedBatchRequests = new AtomicInteger();
+
+        final int totalDeletes = randomIntBetween(MAX_DELETES_PER_BATCH - 1, MAX_DELETES_PER_BATCH * 2);
+        final AtomicInteger pendingDeletes = new AtomicInteger();
+        final Iterator<String> blobNamesIterator = new Iterator<>() {
+            int totalDeletesSent = 0;
+
+            @Override
+            public boolean hasNext() {
+                return totalDeletesSent < totalDeletes;
+            }
+
+            @Override
+            public String next() {
+                if (pendingDeletes.get() == MAX_DELETES_PER_BATCH) {
+                    // Check that once MAX_DELETES_PER_BATCH deletes are enqueued the pending batch requests are sent
+                    assertThat(receivedBatchRequests.get(), is(greaterThan(0)));
+                    assertThat(receivedBatchRequests.get(), is(lessThanOrEqualTo(MAX_DELETES_PER_BATCH / sdkMaxBatchSize)));
+                    receivedBatchRequests.set(0);
+                    pendingDeletes.set(0);
+                }
+
+                pendingDeletes.incrementAndGet();
+                return Integer.toString(totalDeletesSent++);
+            }
+        };
+        final BlobContainer blobContainer = createBlobContainer(1, null, null, null);
+        httpServer.createContext("/batch/storage/v1", safeHandler(exchange -> {
+            assert pendingDeletes.get() <= MAX_DELETES_PER_BATCH;
+
+            receivedBatchRequests.incrementAndGet();
+            final StringBuilder batch = new StringBuilder();
+            for (String line : Streams.readAllLines(exchange.getRequestBody())) {
+                if (line.length() == 0 || line.startsWith("--") || line.toLowerCase(Locale.ROOT).startsWith("content")) {
+                    batch.append(line).append("\r\n");
+                } else if (line.startsWith("DELETE")) {
+                    batch.append("HTTP/1.1 204 NO_CONTENT").append("\r\n");
+                    batch.append("\r\n");
+                }
+            }
+            byte[] response = batch.toString().getBytes(UTF_8);
+            exchange.getResponseHeaders().add("Content-Type", exchange.getRequestHeaders().getFirst("Content-Type"));
+            exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
+            exchange.getResponseBody().write(response);
+        }));
+
+        blobContainer.deleteBlobsIgnoringIfNotExists(blobNamesIterator);
+
+        // Ensure that the remaining deletes are sent in the last batch
+        if (pendingDeletes.get() > 0) {
+            assertThat(receivedBatchRequests.get(), is(greaterThan(0)));
+            assertThat(receivedBatchRequests.get(), is(lessThanOrEqualTo(MAX_DELETES_PER_BATCH / sdkMaxBatchSize)));
+
+            assertThat(pendingDeletes.get(), is(lessThanOrEqualTo(MAX_DELETES_PER_BATCH)));
+        }
+    }
+
     private HttpHandler safeHandler(HttpHandler handler) {
         final HttpHandler loggingHandler = ESMockAPIBasedRepositoryIntegTestCase.wrap(handler, logger);
         return exchange -> {