|  | @@ -61,12 +61,14 @@ import org.elasticsearch.common.settings.Setting;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.settings.Settings;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.unit.ByteSizeUnit;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.unit.ByteSizeValue;
 | 
	
		
			
				|  |  | +import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.util.set.Sets;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.xcontent.XContentFactory;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.xcontent.XContentParser;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.xcontent.XContentType;
 | 
	
		
			
				|  |  | +import org.elasticsearch.index.Index;
 | 
	
		
			
				|  |  |  import org.elasticsearch.index.mapper.MapperService;
 | 
	
		
			
				|  |  |  import org.elasticsearch.index.shard.ShardId;
 | 
	
		
			
				|  |  |  import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
 | 
	
	
		
			
				|  | @@ -597,12 +599,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 | 
	
		
			
				|  |  |              listener.onResponse(null);
 | 
	
		
			
				|  |  |              return;
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  | -        final ActionListener<Void> groupedListener = new GroupedActionListener<>(ActionListener.map(listener, v -> null), indices.size());
 | 
	
		
			
				|  |  | -        for (IndexId indexId: indices) {
 | 
	
		
			
				|  |  | -            threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable<>(groupedListener) {
 | 
	
		
			
				|  |  | +        // listener to complete once all shards folders affected by this delete have been added new metadata blobs without this snapshot
 | 
	
		
			
				|  |  | +        final StepListener<Collection<ShardSnapshotMetaDeleteResult>> deleteFromMetaListener = new StepListener<>();
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -                @Override
 | 
	
		
			
				|  |  | -                protected void doRun() {
 | 
	
		
			
				|  |  | +        // Listener that flattens out the delete results for each index
 | 
	
		
			
				|  |  | +        final ActionListener<Collection<ShardSnapshotMetaDeleteResult>> deleteIndexMetaDataListener = new GroupedActionListener<>(
 | 
	
		
			
				|  |  | +            ActionListener.map(deleteFromMetaListener,
 | 
	
		
			
				|  |  | +                results -> results.stream().flatMap(Collection::stream).collect(Collectors.toList())), indices.size());
 | 
	
		
			
				|  |  | +        final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
 | 
	
		
			
				|  |  | +        for (IndexId indexId : indices) {
 | 
	
		
			
				|  |  | +            executor.execute(ActionRunnable.wrap(deleteIndexMetaDataListener,
 | 
	
		
			
				|  |  | +                deleteIdxMetaListener -> {
 | 
	
		
			
				|  |  |                      IndexMetaData indexMetaData = null;
 | 
	
		
			
				|  |  |                      try {
 | 
	
		
			
				|  |  |                          indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId);
 | 
	
	
		
			
				|  | @@ -612,20 +619,56 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 | 
	
		
			
				|  |  |                      }
 | 
	
		
			
				|  |  |                      deleteIndexMetaDataBlobIgnoringErrors(snapshotId, indexId);
 | 
	
		
			
				|  |  |                      if (indexMetaData != null) {
 | 
	
		
			
				|  |  | +                        final int shardCount = indexMetaData.getNumberOfShards();
 | 
	
		
			
				|  |  | +                        assert shardCount > 0 : "index did not have positive shard count, get [" + shardCount + "]";
 | 
	
		
			
				|  |  | +                        // Listener for collecting the results of removing the snapshot from each shard's metadata in the current index
 | 
	
		
			
				|  |  | +                        final ActionListener<ShardSnapshotMetaDeleteResult> allShardsListener =
 | 
	
		
			
				|  |  | +                            new GroupedActionListener<>(deleteIdxMetaListener, shardCount);
 | 
	
		
			
				|  |  | +                        final Index index = indexMetaData.getIndex();
 | 
	
		
			
				|  |  |                          for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) {
 | 
	
		
			
				|  |  | -                            try {
 | 
	
		
			
				|  |  | -                                deleteShardSnapshot(repositoryData, indexId, new ShardId(indexMetaData.getIndex(), shardId), snapshotId);
 | 
	
		
			
				|  |  | -                            } catch (Exception ex) {
 | 
	
		
			
				|  |  | -                                final int finalShardId = shardId;
 | 
	
		
			
				|  |  | -                                logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]",
 | 
	
		
			
				|  |  | -                                    snapshotId, indexId.getName(), finalShardId), ex);
 | 
	
		
			
				|  |  | -                            }
 | 
	
		
			
				|  |  | +                            final ShardId shard = new ShardId(index, shardId);
 | 
	
		
			
				|  |  | +                            executor.execute(new AbstractRunnable() {
 | 
	
		
			
				|  |  | +                                @Override
 | 
	
		
			
				|  |  | +                                protected void doRun() throws Exception {
 | 
	
		
			
				|  |  | +                                    allShardsListener.onResponse(
 | 
	
		
			
				|  |  | +                                        deleteShardSnapshot(repositoryData, indexId, shard, snapshotId));
 | 
	
		
			
				|  |  | +                                }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                                @Override
 | 
	
		
			
				|  |  | +                                public void onFailure(Exception ex) {
 | 
	
		
			
				|  |  | +                                    logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]",
 | 
	
		
			
				|  |  | +                                        snapshotId, indexId.getName(), shard.id()), ex);
 | 
	
		
			
				|  |  | +                                    // Just passing null here to count down the listener instead of failing it, the stale data left behind
 | 
	
		
			
				|  |  | +                                    // here will be retried in the next delete or repository cleanup
 | 
	
		
			
				|  |  | +                                    allShardsListener.onResponse(null);
 | 
	
		
			
				|  |  | +                                }
 | 
	
		
			
				|  |  | +                            });
 | 
	
		
			
				|  |  |                          }
 | 
	
		
			
				|  |  | +                    } else {
 | 
	
		
			
				|  |  | +                        // Just invoke the listener without any shard generations to count it down, this index will be cleaned up
 | 
	
		
			
				|  |  | +                        // by the stale data cleanup in the end.
 | 
	
		
			
				|  |  | +                        deleteIdxMetaListener.onResponse(null);
 | 
	
		
			
				|  |  |                      }
 | 
	
		
			
				|  |  | -                    groupedListener.onResponse(null);
 | 
	
		
			
				|  |  | -                }
 | 
	
		
			
				|  |  | -            });
 | 
	
		
			
				|  |  | +                }));
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // Delete all the now unreferenced blobs in the shard paths
 | 
	
		
			
				|  |  | +        deleteFromMetaListener.whenComplete(newGens -> {
 | 
	
		
			
				|  |  | +            final String basePath = basePath().buildAsString();
 | 
	
		
			
				|  |  | +            final int basePathLen = basePath.length();
 | 
	
		
			
				|  |  | +            blobContainer().deleteBlobsIgnoringIfNotExists(
 | 
	
		
			
				|  |  | +                newGens.stream().flatMap(shardBlob -> {
 | 
	
		
			
				|  |  | +                    final String shardPathAbs = shardContainer(shardBlob.indexId, shardBlob.shardId).path().buildAsString();
 | 
	
		
			
				|  |  | +                    assert shardPathAbs.startsWith(basePath);
 | 
	
		
			
				|  |  | +                    final String pathToShard = shardPathAbs.substring(basePathLen);
 | 
	
		
			
				|  |  | +                    return shardBlob.blobsToDelete.stream().map(blob -> pathToShard + blob);
 | 
	
		
			
				|  |  | +                }).collect(Collectors.toList())
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +            listener.onResponse(null);
 | 
	
		
			
				|  |  | +        }, e -> {
 | 
	
		
			
				|  |  | +            logger.warn(() -> new ParameterizedMessage("[{}] Failed to delete some blobs during snapshot delete", snapshotId), e);
 | 
	
		
			
				|  |  | +            listener.onResponse(null);
 | 
	
		
			
				|  |  | +        });
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      private void deleteIndexMetaDataBlobIgnoringErrors(SnapshotId snapshotId, IndexId indexId) {
 | 
	
	
		
			
				|  | @@ -730,7 +773,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      private BlobContainer shardContainer(IndexId indexId, ShardId shardId) {
 | 
	
		
			
				|  |  | -        return blobStore().blobContainer(indicesPath().add(indexId.getId()).add(Integer.toString(shardId.getId())));
 | 
	
		
			
				|  |  | +        return shardContainer(indexId, shardId.getId());
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    private BlobContainer shardContainer(IndexId indexId, int shardId) {
 | 
	
		
			
				|  |  | +        return blobStore().blobContainer(indicesPath().add(indexId.getId()).add(Integer.toString(shardId)));
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      /**
 | 
	
	
		
			
				|  | @@ -1211,8 +1258,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 | 
	
		
			
				|  |  |      /**
 | 
	
		
			
				|  |  |       * Delete shard snapshot
 | 
	
		
			
				|  |  |       */
 | 
	
		
			
				|  |  | -    private void deleteShardSnapshot(RepositoryData repositoryData, IndexId indexId, ShardId snapshotShardId, SnapshotId snapshotId)
 | 
	
		
			
				|  |  | -            throws IOException {
 | 
	
		
			
				|  |  | +    private ShardSnapshotMetaDeleteResult deleteShardSnapshot(RepositoryData repositoryData, IndexId indexId, ShardId snapshotShardId,
 | 
	
		
			
				|  |  | +                                                              SnapshotId snapshotId) throws IOException {
 | 
	
		
			
				|  |  |          final BlobContainer shardContainer = shardContainer(indexId, snapshotShardId);
 | 
	
		
			
				|  |  |          final Set<String> blobs;
 | 
	
		
			
				|  |  |          try {
 | 
	
	
		
			
				|  | @@ -1247,12 +1294,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 | 
	
		
			
				|  |  |                  indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration);
 | 
	
		
			
				|  |  |                  blobsToDelete = unusedBlobs(blobs, survivingSnapshotUUIDs, updatedSnapshots);
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  | -            try {
 | 
	
		
			
				|  |  | -                shardContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete);
 | 
	
		
			
				|  |  | -            } catch (IOException e) {
 | 
	
		
			
				|  |  | -                logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete blobs during finalization",
 | 
	
		
			
				|  |  | -                    snapshotId, snapshotShardId), e);
 | 
	
		
			
				|  |  | -            }
 | 
	
		
			
				|  |  | +            return new ShardSnapshotMetaDeleteResult(indexId, snapshotShardId.id(), blobsToDelete);
 | 
	
		
			
				|  |  |          } catch (IOException e) {
 | 
	
		
			
				|  |  |              throw new IndexShardSnapshotFailedException(snapshotShardId,
 | 
	
		
			
				|  |  |                  "Failed to finalize snapshot deletion [" + snapshotId + "] with shard index ["
 | 
	
	
		
			
				|  | @@ -1369,4 +1411,25 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * The result of removing a snapshot from a shard folder in the repository.
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    private static final class ShardSnapshotMetaDeleteResult {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // Index that the snapshot was removed from
 | 
	
		
			
				|  |  | +        private final IndexId indexId;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // Shard id that the snapshot was removed from
 | 
	
		
			
				|  |  | +        private final int shardId;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // Blob names in the shard directory that have become unreferenced in the new shard generation
 | 
	
		
			
				|  |  | +        private final Collection<String> blobsToDelete;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        ShardSnapshotMetaDeleteResult(IndexId indexId, int shardId, Collection<String> blobsToDelete) {
 | 
	
		
			
				|  |  | +            this.indexId = indexId;
 | 
	
		
			
				|  |  | +            this.shardId = shardId;
 | 
	
		
			
				|  |  | +            this.blobsToDelete = blobsToDelete;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |  }
 |