
Cleanup Bulk Delete Exception Logging (#41693)

* Cleanup Bulk Delete Exception Logging

* Follow-up to #41368
* Collect all failed blob deletes and add them to the exception message (see the sketch below)
* Remove logging of blob name list from caller exception logging
Armin Braun, 6 years ago
Commit 59eeaa02c0
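
The pattern the message describes can be sketched in isolation. This is a minimal illustration rather than the repository code: BlobDeleter and deleteOne are hypothetical stand-ins, and the local useOrSuppress mirrors the ExceptionsHelper.useOrSuppress helper the S3 diff below switches to.

    import java.io.IOException;
    import java.util.Collection;
    import java.util.HashSet;
    import java.util.Set;

    class BulkDeleteSketch {

        // Hypothetical single-blob delete standing in for the real store call.
        interface BlobDeleter {
            void deleteOne(String blobName) throws IOException;
        }

        static void deleteBlobs(BlobDeleter deleter, Collection<String> blobNames) throws IOException {
            final Set<String> failedBlobs = new HashSet<>();
            IOException failure = null;
            for (String blobName : blobNames) {
                try {
                    deleter.deleteOne(blobName);
                } catch (IOException e) {
                    // Keep going: record the failed name and remember the first
                    // exception, suppressing any later ones into it.
                    failedBlobs.add(blobName);
                    failure = useOrSuppress(failure, e);
                }
            }
            if (failure != null) {
                // The failed names travel in the exception message, so callers
                // no longer need to log the full blob-name list themselves.
                throw new IOException("Exception when deleting blobs [" + failedBlobs + "]", failure);
            }
            assert failedBlobs.isEmpty();
        }

        // Local equivalent of ExceptionsHelper.useOrSuppress from the S3 diff.
        private static <T extends Throwable> T useOrSuppress(T first, T second) {
            if (first == null) {
                return second;
            }
            first.addSuppressed(second);
            return first;
        }
    }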

+ 1 - 0
plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java

@@ -343,6 +343,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
         if (e != null) {
             throw new IOException("Exception when deleting blobs [" + failedBlobs + "]", e);
         }
+        assert failedBlobs.isEmpty();
     }
 
     private static String buildKey(String keyPath, String s) {
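
The single added line above is a test-time invariant: a Java assert only runs when assertions are enabled (java -ea, as Elasticsearch's test infrastructure typically does), so it documents that failedBlobs must be empty whenever no exception was recorded, at no cost in production. A standalone demonstration:

    import java.util.Collections;
    import java.util.Set;

    public class AssertDemo {
        public static void main(String[] args) {
            Set<String> failedBlobs = Collections.emptySet();
            // Fires only under -ea; skipped entirely when assertions are disabled.
            assert failedBlobs.isEmpty() : "unexpected failed blobs " + failedBlobs;
            System.out.println("invariant holds: " + failedBlobs);
        }
    }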

+ 23 - 9
plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

@@ -25,6 +25,7 @@ import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.MultiObjectDeleteException;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.PartETag;
@@ -34,6 +35,7 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
 import org.apache.lucene.util.SetOnce;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.blobstore.BlobMetaData;
@@ -50,6 +52,8 @@ import java.nio.file.NoSuchFileException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.repositories.s3.S3Repository.MAX_FILE_SIZE;
 import static org.elasticsearch.repositories.s3.S3Repository.MAX_FILE_SIZE_USING_MULTIPART;
@@ -127,12 +131,13 @@ class S3BlobContainer extends AbstractBlobContainer {
         if (blobNames.isEmpty()) {
             return;
         }
+        final Set<String> outstanding = blobNames.stream().map(this::buildKey).collect(Collectors.toSet());
         try (AmazonS3Reference clientReference = blobStore.clientReference()) {
             // S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
             final List<DeleteObjectsRequest> deleteRequests = new ArrayList<>();
             final List<String> partition = new ArrayList<>();
-            for (String blob : blobNames) {
-                partition.add(buildKey(blob));
+            for (String key : outstanding) {
+                partition.add(key);
                 if (partition.size() == MAX_BULK_DELETES ) {
                     deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
                     partition.clear();
@@ -144,23 +149,32 @@ class S3BlobContainer extends AbstractBlobContainer {
             SocketAccess.doPrivilegedVoid(() -> {
                 AmazonClientException aex = null;
                 for (DeleteObjectsRequest deleteRequest : deleteRequests) {
+                    List<String> keysInRequest =
+                        deleteRequest.getKeys().stream().map(DeleteObjectsRequest.KeyVersion::getKey).collect(Collectors.toList());
                     try {
                         clientReference.client().deleteObjects(deleteRequest);
+                        outstanding.removeAll(keysInRequest);
+                    } catch (MultiObjectDeleteException e) {
+                        // We are sending quiet mode requests so we can't use the deleted keys entry on the exception and instead
+                        // first remove all keys that were sent in the request and then add back those that ran into an exception.
+                        outstanding.removeAll(keysInRequest);
+                        outstanding.addAll(
+                            e.getErrors().stream().map(MultiObjectDeleteException.DeleteError::getKey).collect(Collectors.toSet()));
+                        aex = ExceptionsHelper.useOrSuppress(aex, e);
                     } catch (AmazonClientException e) {
-                        if (aex == null) {
-                            aex = e;
-                        } else {
-                            aex.addSuppressed(e);
-                        }
+                        // The AWS client threw an unexpected exception and did not execute the request at all, so we do
+                        // not remove any keys from the outstanding deletes set.
+                        aex = ExceptionsHelper.useOrSuppress(aex, e);
                     }
                 }
                 if (aex != null) {
                     throw aex;
                 }
             });
-        } catch (final AmazonClientException e) {
-            throw new IOException("Exception when deleting blobs [" + blobNames + "]", e);
+        } catch (Exception e) {
+            throw new IOException("Failed to delete blobs [" + outstanding + "]", e);
         }
+        assert outstanding.isEmpty();
     }
 
     private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs) {
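
The subtle part of the hunk above is the bookkeeping: the requests are sent in quiet mode, where S3 reports only the keys that failed, never the ones it deleted. On a MultiObjectDeleteException the code therefore drops the whole batch from outstanding and re-adds just the keys named in getErrors(), while a generic AmazonClientException means the request never executed and nothing is removed. A condensed sketch of that accounting, with plain types standing in for the SDK classes (PartialDelete is a hypothetical stand-in for deleteObjects):

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class QuietModeAccounting {

        // Stand-in for the SDK call: returns the keys that failed server-side
        // (empty on full success), or throws when the request never executed.
        interface PartialDelete {
            List<String> delete(List<String> batch);
        }

        // batches plays the role of the MAX_BULK_DELETES-sized (1k key) requests.
        static Set<String> run(PartialDelete s3, List<List<String>> batches) {
            final Set<String> outstanding = new HashSet<>();
            batches.forEach(outstanding::addAll);
            for (List<String> batch : batches) {
                try {
                    List<String> failedKeys = s3.delete(batch);
                    // Quiet mode only reports failures: drop the whole batch,
                    // then put the reported failures back.
                    outstanding.removeAll(batch);
                    outstanding.addAll(failedKeys);
                } catch (RuntimeException e) {
                    // Request never ran; every key in the batch stays outstanding.
                    // (The real code also records e via ExceptionsHelper.useOrSuppress.)
                }
            }
            return outstanding; // empty exactly when every key was deleted
        }
    }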

+ 6 - 6
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -1000,8 +1000,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 try {
                     blobContainer.deleteBlobsIgnoringIfNotExists(blobNames);
                 } catch (IOException e) {
-                    logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs {} during finalization",
-                        snapshotId, shardId, blobNames), e);
+                    logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs during finalization",
+                        snapshotId, shardId), e);
                     throw e;
                 }
 
@@ -1016,8 +1016,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 try {
                     blobContainer.deleteBlobsIgnoringIfNotExists(indexBlobs);
                 } catch (IOException e) {
-                    logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs {} during finalization",
-                        snapshotId, shardId, indexBlobs), e);
+                    logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs during finalization",
+                        snapshotId, shardId), e);
                     throw e;
                 }
 
@@ -1029,8 +1029,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 try {
                     blobContainer.deleteBlobsIgnoringIfNotExists(orphanedBlobs);
                 } catch (IOException e) {
-                    logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blobs {} during finalization",
-                        snapshotId, shardId, orphanedBlobs), e);
+                    logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blobs during finalization",
+                        snapshotId, shardId), e);
                 }
             } catch (IOException e) {
                 String message = "Failed to finalize " + reason + " with shard index [" + currentIndexGen + "]";
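
On the caller side, dropping the {} placeholder and the blob list from the ParameterizedMessage is safe precisely because of the container-side change: the IOException message now names exactly the blobs that failed, where the old log line dumped the entire (possibly huge) input list. A sketch of the resulting pattern (snapshotId, shardId, and deleteAndLog are placeholder names here, not the repository's):

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.apache.logging.log4j.message.ParameterizedMessage;

    class CallerLoggingSketch {

        private static final Logger logger = LogManager.getLogger(CallerLoggingSketch.class);

        void deleteAndLog(Runnable deleteBlobs, Object snapshotId, Object shardId) {
            try {
                deleteBlobs.run();
            } catch (RuntimeException e) {
                // Supplier form: the message is only built if WARN is enabled.
                // No blob list here; the cause's message already carries the
                // failed subset, so repeating the input list is redundant.
                logger.warn(() -> new ParameterizedMessage(
                    "[{}][{}] failed to delete index blobs during finalization", snapshotId, shardId), e);
                throw e;
            }
        }
    }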