Browse Source

Parallelize stale index deletion (#100316)

After deleting a snapshot today we clean up all the now-dangling indices
sequentially, which can be rather slow. With this commit we parallelize
the work across the whole `SNAPSHOT` pool on the master node.

Closes #61513

Co-authored-by: Piyush Daftary <pdaftary@amazon.com>
David Turner 2 năm trước cách đây
mục cha
commit
cadcb9b5fd

+ 6 - 0
docs/changelog/100316.yaml

@@ -0,0 +1,6 @@
+pr: 100316
+summary: Parallelize stale index deletion
+area: Snapshot/Restore
+type: enhancement
+issues:
+ - 61513

+ 195 - 0
server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java

@@ -13,22 +13,38 @@ import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRe
 import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotAction;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateListener;
+import org.elasticsearch.cluster.ClusterStateTaskListener;
+import org.elasticsearch.cluster.SimpleBatchedExecutor;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.io.FileSystemUtils;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.core.Tuple;
+import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.RepositoryConflictException;
 import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.RepositoryVerificationException;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.snapshots.mockstore.MockRepository;
 import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.nio.file.Path;
 import java.util.List;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntSupplier;
+import java.util.function.ToLongFunction;
 
 import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.READONLY_SETTING_KEY;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -295,4 +311,183 @@ public class RepositoriesIT extends AbstractSnapshotIntegTestCase {
         logger.info("--> wait until snapshot deletion is finished");
         assertAcked(future.actionGet());
     }
+
+    public void testLeakedStaleIndicesAreDeletedBySubsequentDelete() throws Exception {
+        Client client = client();
+        Path repositoryPath = randomRepoPath();
+        final String repositoryName = "test-repo";
+        final String snapshot1Name = "test-snap-1";
+        final String snapshot2Name = "test-snap-2";
+
+        logger.info("-->  creating repository at {}", repositoryPath.toAbsolutePath());
+        createRepository(repositoryName, "mock", repositoryPath);
+
+        logger.info("--> creating index-1 and ingest data");
+        createIndex("test-idx-1");
+        ensureGreen();
+        for (int j = 0; j < 10; j++) {
+            indexDoc("test-idx-1", Integer.toString(10 + j), "foo", "bar" + 10 + j);
+        }
+        refresh();
+
+        logger.info("--> creating first snapshot");
+        createFullSnapshot(repositoryName, snapshot1Name);
+
+        logger.info("--> creating index-2 and ingest data");
+        createIndex("test-idx-2");
+        ensureGreen();
+        for (int j = 0; j < 10; j++) {
+            indexDoc("test-idx-2", Integer.toString(10 + j), "foo", "bar" + 10 + j);
+        }
+        refresh();
+
+        logger.info("--> creating second snapshot");
+        createFullSnapshot(repositoryName, snapshot2Name);
+
+        // Make repository throw exceptions when trying to delete stale indices
+        // This will make sure stale indices stay in repository after snapshot delete
+        final var repository = (MockRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class)
+            .repository(repositoryName);
+        repository.setFailOnDeleteContainer(true);
+
+        logger.info("--> delete the second snapshot");
+        client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot2Name).get();
+
+        // Make repository work normally
+        repository.setFailOnDeleteContainer(false);
+
+        // This snapshot should delete last snapshot's residual stale indices as well
+        logger.info("--> delete snapshot one");
+        client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot1Name).get();
+
+        logger.info("--> check no leftover files");
+        assertFileCount(repositoryPath, 2); // just the index-N and index.latest blobs
+
+        logger.info("--> done");
+    }
+
+    public void testCleanupStaleBlobsConcurrency() throws Exception {
+        // This test is verifying the detailed behaviour of cleanup tasks that are enqueued after a snapshot delete is committed to the
+        // repository, ensuring that we see exactly the right number of tasks enqueued at each stage to demonstrate that we do use all the
+        // threads available to us, but don't spam the threadpool queue with all the tasks at once, and that we submit one task that drains
+        // the queue eagerly to provide backpressure. That means this test is sensitive to changes in the breakdown of the cleanup work
+        // after a snapshot delete.
+
+        final var client = client();
+        final var repositoryPath = randomRepoPath();
+        final var repositoryName = "test-repo";
+        createRepository(repositoryName, "mock", repositoryPath);
+
+        final var threadPool = internalCluster().getCurrentMasterNodeInstance(ThreadPool.class);
+        final var snapshotPoolSize = threadPool.info(ThreadPool.Names.SNAPSHOT).getMax();
+        final var indexCount = snapshotPoolSize * 3;
+
+        for (int i = 0; i < indexCount; i++) {
+            createIndex("test-idx-" + i);
+            for (int j = 0; j < 10; j++) {
+                indexDoc("test-idx-" + i, Integer.toString(10 + j), "foo", "bar" + 10 + j);
+            }
+        }
+
+        ensureGreen();
+
+        final var snapshotName = "test-snap";
+        createFullSnapshot(repositoryName, snapshotName);
+
+        final var executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
+        final var barrier = new CyclicBarrier(snapshotPoolSize + 1);
+        final var keepBlocking = new AtomicBoolean(true);
+        final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
+        final ToLongFunction<ClusterState> repoGenFn = s -> RepositoriesMetadata.get(s).repository(repositoryName).generation();
+        final var repositoryGenerationBeforeDelete = repoGenFn.applyAsLong(clusterService.state());
+        final ClusterStateListener clusterStateListener = event -> {
+            if (repoGenFn.applyAsLong(event.previousState()) == repositoryGenerationBeforeDelete
+                && repoGenFn.applyAsLong(event.state()) > repositoryGenerationBeforeDelete) {
+                // We are updating the safe repository generation which indicates that the snapshot delete is complete. Once this cluster
+                // state update completes we will enqueue all the cleanup work on the SNAPSHOT pool. So here we prepare for that by blocking
+                // all the SNAPSHOT threads:
+
+                // All but one of the threads just repeatedly block on the barrier without picking up any new tasks
+                for (int i = 0; i < snapshotPoolSize - 1; i++) {
+                    executor.execute(() -> {
+                        while (keepBlocking.get()) {
+                            safeAwait(barrier);
+                            safeAwait(barrier);
+                        }
+                    });
+                }
+
+                // The last thread runs a task which blocks on the barrier and then enqueues itself again, at the back of the queue,
+                // so that this thread will run everything _currently_ in the queue each time the barrier is released, in the order in which
+                // it was enqueued, and will then block on the barrier again.
+                new Runnable() {
+                    @Override
+                    public void run() {
+                        executor.execute(() -> {
+                            safeAwait(barrier);
+                            safeAwait(barrier);
+                            if (keepBlocking.get()) {
+                                this.run();
+                            }
+                        });
+                    }
+                }.run();
+            }
+        };
+        clusterService.addListener(clusterStateListener);
+
+        final var deleteFuture = new PlainActionFuture<AcknowledgedResponse>();
+        client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshotName).execute(deleteFuture);
+
+        safeAwait(barrier); // wait for all the snapshot threads to be blocked
+        clusterService.removeListener(clusterStateListener);
+
+        // We must wait for all the cleanup work to be enqueued (with the throttled runner at least) so we can be sure of exactly how it
+        // will execute. The cleanup work is enqueued by the master service thread on completion of the cluster state update which increases
+        // the root blob generation in the repo metadata, so it is sufficient to wait for another no-op task to run on the master service:
+        PlainActionFuture.get(fut -> clusterService.createTaskQueue("test", Priority.NORMAL, new SimpleBatchedExecutor<>() {
+            @Override
+            public Tuple<ClusterState, Object> executeTask(ClusterStateTaskListener clusterStateTaskListener, ClusterState clusterState) {
+                return Tuple.tuple(clusterState, null);
+            }
+
+            @Override
+            public void taskSucceeded(ClusterStateTaskListener clusterStateTaskListener, Object ignored) {
+                fut.onResponse(null);
+            }
+        }).submitTask("test", e -> fail(), null), 10, TimeUnit.SECONDS);
+
+        final IntSupplier queueLength = () -> threadPool.stats()
+            .stats()
+            .stream()
+            .filter(s -> s.name().equals(ThreadPool.Names.SNAPSHOT))
+            .findFirst()
+            .orElseThrow()
+            .queue();
+
+        // There are indexCount (=3*snapshotPoolSize) index-deletion tasks, plus one for cleaning up the root metadata. However, the
+        // throttled runner only enqueues one task per SNAPSHOT thread to start with, and then the eager runner adds another one. This shows
+        // we are not spamming the threadpool with all the tasks at once, which means that other snapshot activities can run alongside this
+        // cleanup.
+        assertThat(queueLength.getAsInt(), equalTo(snapshotPoolSize + 1));
+
+        safeAwait(barrier); // unblock the barrier thread and let it process the queue
+        safeAwait(barrier); // wait for the queue to be processed
+
+        // We first ran all the one-task actions, each of which completes and puts another one-task action into the queue. Then the eager
+        // runner runs all the remaining tasks.
+        assertThat(queueLength.getAsInt(), equalTo(snapshotPoolSize));
+
+        safeAwait(barrier); // unblock the barrier thread and let it process the queue
+        safeAwait(barrier); // wait for the queue to be processed
+
+        // Since the eager runner already ran all the remaining tasks, when the enqueued actions run they add no more work to the queue.
+        assertThat(queueLength.getAsInt(), equalTo(0));
+
+        assertFileCount(repositoryPath, 2); // just the index-N and index.latest blobs
+
+        keepBlocking.set(false);
+        safeAwait(barrier); // release the threads so they can exit
+        assertTrue(deleteFuture.get(10, TimeUnit.SECONDS).isAcknowledged());
+    }
 }

+ 8 - 0
server/src/main/java/org/elasticsearch/common/blobstore/DeleteResult.java

@@ -38,4 +38,12 @@ public final class DeleteResult {
     public DeleteResult add(long blobs, long bytes) {
         return new DeleteResult(blobsDeleted + blobs, bytesDeleted + bytes);
     }
+
+    public static DeleteResult of(long blobs, long bytes) {
+        if (blobs == 0 && bytes == 0) {
+            return ZERO;
+        } else {
+            return new DeleteResult(blobs, bytes);
+        }
+    }
 }

+ 51 - 0
server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunner.java

@@ -17,6 +17,7 @@ import org.elasticsearch.core.Strings;
 
 import java.util.Queue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -155,4 +156,54 @@ public class AbstractThrottledTaskRunner<T extends ActionListener<Releasable>> {
         return runningTasks.get();
     }
 
+    /**
+     * Run a single task on the given executor which eagerly pulls tasks from the queue and executes them. This must only be used if the
+     * tasks in the queue are all synchronous, i.e. they release their ref before returning from {@code onResponse()}.
+     */
+    public void runSyncTasksEagerly(Executor executor) {
+        executor.execute(new AbstractRunnable() {
+            @Override
+            protected void doRun() {
+                final AtomicBoolean isDone = new AtomicBoolean(true);
+                final Releasable ref = () -> isDone.set(true);
+                ActionListener<Releasable> task;
+                while ((task = tasks.poll()) != null) {
+                    isDone.set(false);
+                    try {
+                        logger.trace("[{}] eagerly running task {}", taskRunnerName, task);
+                        task.onResponse(ref);
+                    } catch (Exception e) {
+                        logger.error(Strings.format("[%s] task %s failed", taskRunnerName, task), e);
+                        assert false : e;
+                        task.onFailure(e);
+                        return;
+                    }
+                    if (isDone.get() == false) {
+                        logger.error(
+                            "runSyncTasksEagerly() was called on a queue [{}] containing an async task: [{}]",
+                            taskRunnerName,
+                            task
+                        );
+                        assert false;
+                        return;
+                    }
+                }
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                logger.error("unexpected failure in runSyncTasksEagerly", e);
+                assert false : e;
+            }
+
+            @Override
+            public void onRejection(Exception e) {
+                if (e instanceof EsRejectedExecutionException) {
+                    logger.debug("runSyncTasksEagerly was rejected", e);
+                } else {
+                    onFailure(e);
+                }
+            }
+        });
+    }
 }

+ 81 - 84
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -72,6 +72,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.common.util.concurrent.ListenableFuture;
+import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
 import org.elasticsearch.common.xcontent.ChunkedToXContent;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.core.CheckedConsumer;
@@ -390,6 +391,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     private final ShardSnapshotTaskRunner shardSnapshotTaskRunner;
 
+    private final ThrottledTaskRunner staleBlobDeleteRunner;
+
     /**
      * Constructs new BlobStoreRepository
      * @param metadata   The metadata for this repository including name and settings
@@ -430,6 +433,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             this::doSnapshotShard,
             this::snapshotFile
         );
+        staleBlobDeleteRunner = new ThrottledTaskRunner(
+            "cleanupStaleBlobs",
+            threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
+            threadPool.executor(ThreadPool.Names.SNAPSHOT)
+        );
     }
 
     @Override
@@ -1150,31 +1158,65 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         RepositoryData newRepoData,
         ActionListener<DeleteResult> listener
     ) {
-        final GroupedActionListener<DeleteResult> groupedListener = new GroupedActionListener<>(2, ActionListener.wrap(deleteResults -> {
-            DeleteResult deleteResult = DeleteResult.ZERO;
-            for (DeleteResult result : deleteResults) {
-                deleteResult = deleteResult.add(result);
+        final var blobsDeleted = new AtomicLong();
+        final var bytesDeleted = new AtomicLong();
+        try (var listeners = new RefCountingListener(listener.map(ignored -> DeleteResult.of(blobsDeleted.get(), bytesDeleted.get())))) {
+
+            final List<String> staleRootBlobs = staleRootBlobs(newRepoData, rootBlobs.keySet());
+            if (staleRootBlobs.isEmpty() == false) {
+                staleBlobDeleteRunner.enqueueTask(listeners.acquire(ref -> {
+                    try (ref) {
+                        logStaleRootLevelBlobs(newRepoData.getGenId() - 1, deletedSnapshots, staleRootBlobs);
+                        deleteFromContainer(blobContainer(), staleRootBlobs.iterator());
+                        for (final var staleRootBlob : staleRootBlobs) {
+                            bytesDeleted.addAndGet(rootBlobs.get(staleRootBlob).length());
+                        }
+                        blobsDeleted.addAndGet(staleRootBlobs.size());
+                    } catch (Exception e) {
+                        logger.warn(
+                            () -> format(
+                                "[%s] The following blobs are no longer part of any snapshot [%s] but failed to remove them",
+                                metadata.name(),
+                                staleRootBlobs
+                            ),
+                            e
+                        );
+                    }
+                }));
             }
-            listener.onResponse(deleteResult);
-        }, listener::onFailure));
 
-        final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
-        final List<String> staleRootBlobs = staleRootBlobs(newRepoData, rootBlobs.keySet());
-        if (staleRootBlobs.isEmpty()) {
-            groupedListener.onResponse(DeleteResult.ZERO);
-        } else {
-            executor.execute(ActionRunnable.supply(groupedListener, () -> {
-                List<String> deletedBlobs = cleanupStaleRootFiles(newRepoData.getGenId() - 1, deletedSnapshots, staleRootBlobs);
-                return new DeleteResult(deletedBlobs.size(), deletedBlobs.stream().mapToLong(name -> rootBlobs.get(name).length()).sum());
-            }));
+            final var survivingIndexIds = newRepoData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
+            for (final var indexEntry : foundIndices.entrySet()) {
+                final var indexSnId = indexEntry.getKey();
+                if (survivingIndexIds.contains(indexSnId)) {
+                    continue;
+                }
+                staleBlobDeleteRunner.enqueueTask(listeners.acquire(ref -> {
+                    try (ref) {
+                        logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
+                        final var deleteResult = indexEntry.getValue().delete(OperationPurpose.SNAPSHOT);
+                        blobsDeleted.addAndGet(deleteResult.blobsDeleted());
+                        bytesDeleted.addAndGet(deleteResult.bytesDeleted());
+                        logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
+                    } catch (IOException e) {
+                        logger.warn(() -> format("""
+                            [%s] index %s is no longer part of any snapshot in the repository, \
+                            but failed to clean up its index folder""", metadata.name(), indexSnId), e);
+                    }
+                }));
+            }
         }
 
-        final Set<String> survivingIndexIds = newRepoData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
-        if (foundIndices.keySet().equals(survivingIndexIds)) {
-            groupedListener.onResponse(DeleteResult.ZERO);
-        } else {
-            executor.execute(ActionRunnable.supply(groupedListener, () -> cleanupStaleIndices(foundIndices, survivingIndexIds)));
-        }
+        // If we did the cleanup of stale indices purely using a throttled executor then there would be no backpressure to prevent us from
+        // falling arbitrarily far behind. But nor do we want to dedicate all the SNAPSHOT threads to stale index cleanups because that
+        // would slow down other snapshot operations in situations that do not need backpressure.
+        //
+        // The solution is to dedicate one SNAPSHOT thread to doing the cleanups eagerly, alongside the throttled executor which spreads
+        // the rest of the work across the other threads if they are free. If the eager cleanup loop doesn't finish before the next one
+        // starts then we dedicate another SNAPSHOT thread to the deletions, and so on, until eventually either we catch up or the SNAPSHOT
+        // pool is fully occupied with blob deletions, which pushes back on other snapshot operations.
+
+        staleBlobDeleteRunner.runSyncTasksEagerly(threadPool.executor(ThreadPool.Names.SNAPSHOT));
     }
 
     /**
@@ -1183,8 +1225,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
      * TODO: Add shard level cleanups
      * TODO: Add unreferenced index metadata cleanup
      * <ul>
-     *     <li>Deleting stale indices {@link #cleanupStaleIndices}</li>
-     *     <li>Deleting unreferenced root level blobs {@link #cleanupStaleRootFiles}</li>
+     *     <li>Deleting stale indices</li>
+     *     <li>Deleting unreferenced root level blobs</li>
      * </ul>
      * @param repositoryStateId     Current repository state id
      * @param repositoryMetaVersion version of the updated repository metadata to write
@@ -1262,70 +1304,25 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         }).toList();
     }
 
-    private List<String> cleanupStaleRootFiles(
-        long previousGeneration,
-        Collection<SnapshotId> deletedSnapshots,
-        List<String> blobsToDelete
-    ) {
-        if (blobsToDelete.isEmpty()) {
-            return blobsToDelete;
-        }
-        try {
-            if (logger.isInfoEnabled()) {
-                // If we're running root level cleanup as part of a snapshot delete we should not log the snapshot- and global metadata
-                // blobs associated with the just deleted snapshots as they are expected to exist and not stale. Otherwise every snapshot
-                // delete would also log a confusing INFO message about "stale blobs".
-                final Set<String> blobNamesToIgnore = deletedSnapshots.stream()
-                    .flatMap(
-                        snapshotId -> Stream.of(
-                            GLOBAL_METADATA_FORMAT.blobName(snapshotId.getUUID()),
-                            SNAPSHOT_FORMAT.blobName(snapshotId.getUUID()),
-                            INDEX_FILE_PREFIX + previousGeneration
-                        )
+    private void logStaleRootLevelBlobs(long previousGeneration, Collection<SnapshotId> deletedSnapshots, List<String> blobsToDelete) {
+        if (logger.isInfoEnabled()) {
+            // If we're running root level cleanup as part of a snapshot delete we should not log the snapshot- and global metadata
+            // blobs associated with the just deleted snapshots as they are expected to exist and not stale. Otherwise every snapshot
+            // delete would also log a confusing INFO message about "stale blobs".
+            final Set<String> blobNamesToIgnore = deletedSnapshots.stream()
+                .flatMap(
+                    snapshotId -> Stream.of(
+                        GLOBAL_METADATA_FORMAT.blobName(snapshotId.getUUID()),
+                        SNAPSHOT_FORMAT.blobName(snapshotId.getUUID()),
+                        INDEX_FILE_PREFIX + previousGeneration
                     )
-                    .collect(Collectors.toSet());
-                final List<String> blobsToLog = blobsToDelete.stream().filter(b -> blobNamesToIgnore.contains(b) == false).toList();
-                if (blobsToLog.isEmpty() == false) {
-                    logger.info("[{}] Found stale root level blobs {}. Cleaning them up", metadata.name(), blobsToLog);
-                }
-            }
-            deleteFromContainer(blobContainer(), blobsToDelete.iterator());
-            return blobsToDelete;
-        } catch (Exception e) {
-            logger.warn(
-                () -> format(
-                    "[%s] The following blobs are no longer part of any snapshot [%s] but failed to remove them",
-                    metadata.name(),
-                    blobsToDelete
-                ),
-                e
-            );
-        }
-        return Collections.emptyList();
-    }
-
-    private DeleteResult cleanupStaleIndices(Map<String, BlobContainer> foundIndices, Set<String> survivingIndexIds) {
-        DeleteResult deleteResult = DeleteResult.ZERO;
-        for (Map.Entry<String, BlobContainer> indexEntry : foundIndices.entrySet()) {
-            final String indexSnId = indexEntry.getKey();
-            try {
-                if (survivingIndexIds.contains(indexSnId) == false) {
-                    logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
-                    deleteResult = deleteResult.add(indexEntry.getValue().delete(OperationPurpose.SNAPSHOT));
-                    logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
-                }
-            } catch (Exception e) {
-                logger.warn(
-                    () -> format(
-                        "[%s] index %s is no longer part of any snapshot in the repository, " + "but failed to clean up its index folder",
-                        metadata.name(),
-                        indexSnId
-                    ),
-                    e
-                );
+                )
+                .collect(Collectors.toSet());
+            final List<String> blobsToLog = blobsToDelete.stream().filter(b -> blobNamesToIgnore.contains(b) == false).toList();
+            if (blobsToLog.isEmpty() == false) {
+                logger.info("[{}] Found stale root level blobs {}. Cleaning them up", metadata.name(), blobsToLog);
             }
         }
-        return deleteResult;
     }
 
     @Override

+ 50 - 0
server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunnerTests.java

@@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -141,6 +142,55 @@ public class AbstractThrottledTaskRunnerTests extends ESTestCase {
         assertNoRunningTasks(taskRunner);
     }
 
+    public void testRunSyncTasksEagerly() {
+        final int maxTasks = randomIntBetween(1, maxThreads);
+        final int taskCount = between(maxTasks, maxTasks * 2);
+        final var barrier = new CyclicBarrier(maxTasks + 1);
+        final var executedCountDown = new CountDownLatch(taskCount);
+        final var testThread = Thread.currentThread();
+
+        class TestTask implements ActionListener<Releasable> {
+
+            @Override
+            public void onFailure(Exception e) {
+                throw new AssertionError(e);
+            }
+
+            @Override
+            public void onResponse(Releasable releasable) {
+                try (releasable) {
+                    if (Thread.currentThread() != testThread) {
+                        safeAwait(barrier);
+                        safeAwait(barrier);
+                    }
+                } finally {
+                    executedCountDown.countDown();
+                }
+            }
+        }
+
+        final BlockingQueue<TestTask> queue = ConcurrentCollections.newBlockingQueue();
+        final AbstractThrottledTaskRunner<TestTask> taskRunner = new AbstractThrottledTaskRunner<>("test", maxTasks, executor, queue);
+        for (int i = 0; i < taskCount; i++) {
+            taskRunner.enqueueTask(new TestTask());
+        }
+
+        safeAwait(barrier);
+        assertThat(taskRunner.runningTasks(), equalTo(maxTasks)); // maxTasks tasks are running now
+        assertEquals(taskCount - maxTasks, queue.size()); // the remainder are enqueued
+
+        final var capturedTask = new AtomicReference<Runnable>();
+        taskRunner.runSyncTasksEagerly(t -> assertTrue(capturedTask.compareAndSet(null, t)));
+        assertEquals(taskCount - maxTasks, queue.size()); // hasn't run any tasks yet
+        capturedTask.get().run();
+        assertTrue(queue.isEmpty());
+
+        safeAwait(barrier);
+        safeAwait(executedCountDown);
+        assertTrue(queue.isEmpty());
+        assertNoRunningTasks(taskRunner);
+    }
+
     public void testFailsTasksOnRejectionOrShutdown() throws Exception {
         final var executor = randomBoolean()
             ? EsExecutors.newScaling("test", maxThreads, maxThreads, 0, TimeUnit.MILLISECONDS, true, threadFactory, threadContext)

+ 12 - 0
test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java

@@ -170,6 +170,8 @@ public class MockRepository extends FsRepository {
 
     private volatile boolean blocked = false;
 
+    private volatile boolean failOnDeleteContainer = false;
+
     public MockRepository(
         RepositoryMetadata metadata,
         Environment environment,
@@ -352,6 +354,13 @@ public class MockRepository extends FsRepository {
         blockOnceOnReadSnapshotInfo.set(true);
     }
 
+    /**
+     * Sets the fail-on-delete-container flag, which if {@code true} throws an exception when deleting a {@link BlobContainer}.
+     */
+    public void setFailOnDeleteContainer(boolean failOnDeleteContainer) {
+        this.failOnDeleteContainer = failOnDeleteContainer;
+    }
+
     public boolean blocked() {
         return blocked;
     }
@@ -550,6 +559,9 @@ public class MockRepository extends FsRepository {
 
             @Override
             public DeleteResult delete(OperationPurpose purpose) throws IOException {
+                if (failOnDeleteContainer) {
+                    throw new IOException("simulated delete-container failure");
+                }
                 DeleteResult deleteResult = DeleteResult.ZERO;
                 for (BlobContainer child : children(purpose).values()) {
                     deleteResult = deleteResult.add(child.delete(purpose));