浏览代码

Optimize Snapshot Finalization (#42723)

* Optimize Snapshot Finalization

* Delete index-N blobs and segement blobs in one single bulk delete instead of in separate ones to save RPC calls on implementations that have bulk deletes implemented
* Don't fail snapshot because deleting old index-N failed, this results in needlessly logging finalization failures and makes analysis of failures harder going forward as well as incorrect index.latest blobs
Armin Braun 6 年之前
父节点
当前提交
8101fc3849
共有 1 个文件被更改,包括 22 次插入40 次删除
  1. 22 40
      server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

+ 22 - 40
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -698,12 +698,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
         logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
         writeAtomic(indexBlob, snapshotsBytes, true);
-        // delete the N-2 index file if it exists, keep the previous one around as a backup
-        if (isReadOnly() == false && newGen - 2 >= 0) {
-            final String oldSnapshotIndexFile = INDEX_FILE_PREFIX + Long.toString(newGen - 2);
-            blobContainer().deleteBlobIgnoringIfNotExists(oldSnapshotIndexFile);
-        }
-
         // write the current generation to the index-latest file
         final BytesReference genBytes;
         try (BytesStreamOutput bStream = new BytesStreamOutput()) {
@@ -712,6 +706,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         }
         logger.debug("Repository [{}] updating index.latest with generation [{}]", metadata.name(), newGen);
         writeAtomic(INDEX_LATEST_BLOB, genBytes, false);
+        // delete the N-2 index file if it exists, keep the previous one around as a backup
+        if (newGen - 2 >= 0) {
+            final String oldSnapshotIndexFile = INDEX_FILE_PREFIX + Long.toString(newGen - 2);
+            try {
+                blobContainer().deleteBlobIgnoringIfNotExists(oldSnapshotIndexFile);
+            } catch (IOException e) {
+                logger.warn("Failed to clean up old index blob [{}]", oldSnapshotIndexFile);
+            }
+        }
     }
 
     /**
@@ -964,45 +967,24 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                                 final Map<String, BlobMetaData> blobs,
                                 final String reason) {
             final String indexGeneration = Integer.toString(fileListGeneration);
-            final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(snapshots);
             try {
-                // Delete temporary index files first, as we might otherwise fail in the next step creating the new index file if an earlier
-                // attempt to write an index file with this generation failed mid-way after creating the temporary file.
-                final List<String> blobNames =
-                    blobs.keySet().stream().filter(FsBlobContainer::isTempBlobName).collect(Collectors.toList());
-                try {
-                    blobContainer.deleteBlobsIgnoringIfNotExists(blobNames);
-                } catch (IOException e) {
-                    logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs during finalization",
-                        snapshotId, shardId), e);
-                    throw e;
-                }
-
-                // If we deleted all snapshots, we don't need to create a new index file
-                if (snapshots.size() > 0) {
+                final List<String> blobsToDelete;
+                if (snapshots.isEmpty()) {
+                    // If we deleted all snapshots, we don't need to create a new index file and simply delete all the blobs we found
+                    blobsToDelete = List.copyOf(blobs.keySet());
+                } else {
+                    final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(snapshots);
                     indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, blobContainer, indexGeneration);
+                    // Delete all previous index-N, data-blobs that are not referenced by the new index-N and temporary blobs
+                    blobsToDelete = blobs.keySet().stream().filter(blob ->
+                        blob.startsWith(SNAPSHOT_INDEX_PREFIX)
+                            || blob.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null
+                            || FsBlobContainer.isTempBlobName(blob)).collect(Collectors.toList());
                 }
-
-                // Delete old index files
-                final List<String> indexBlobs =
-                    blobs.keySet().stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList());
-                try {
-                    blobContainer.deleteBlobsIgnoringIfNotExists(indexBlobs);
-                } catch (IOException e) {
-                    logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs during finalization",
-                        snapshotId, shardId), e);
-                    throw e;
-                }
-
-                // Delete all blobs that don't exist in a snapshot
-                final List<String> orphanedBlobs = blobs.keySet().stream()
-                    .filter(blobName ->
-                        blobName.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blobName)) == null)
-                    .collect(Collectors.toList());
                 try {
-                    blobContainer.deleteBlobsIgnoringIfNotExists(orphanedBlobs);
+                    blobContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete);
                 } catch (IOException e) {
-                    logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blobs during finalization",
+                    logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete blobs during finalization",
                         snapshotId, shardId), e);
                 }
             } catch (IOException e) {