
Add periodic maintenance task to clean up unused blob store cache docs (#78438)

In #77686 we added a service to clean up blob store
cache docs once a searchable snapshot is no longer
used. We noticed situations where cache docs could
still remain in the system index: when the system
index is not available at the time the searchable
snapshot index is deleted; when the system index is
restored from a backup; or when the searchable
snapshot index was deleted on a version before #77686.

This commit introduces a maintenance task that
periodically scans the system index and cleans up
unused blob cache docs. The task is scheduled to run
every hour on the data node that contains the blob
store cache primary shard, and it iterates over the
index using a point-in-time context with search_after.
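
To make the mechanism concrete, below is a minimal, synchronous sketch of that point-in-time plus search_after loop, written against the same client calls that appear in BlobStoreCacheMaintenanceService further down in this diff. It is an illustration only, not the committed code: the real PeriodicMaintenanceTask is asynchronous and listener-based, and the isObsolete() helper here is a hypothetical placeholder for the repository/snapshot/index-id and retention checks it performs.

    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.delete.DeleteRequest;
    import org.elasticsearch.action.search.ClosePointInTimeAction;
    import org.elasticsearch.action.search.ClosePointInTimeRequest;
    import org.elasticsearch.action.search.OpenPointInTimeAction;
    import org.elasticsearch.action.search.OpenPointInTimeRequest;
    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.core.TimeValue;
    import org.elasticsearch.search.SearchHit;
    import org.elasticsearch.search.builder.PointInTimeBuilder;
    import org.elasticsearch.search.builder.SearchSourceBuilder;
    import org.elasticsearch.search.sort.ShardDocSortField;

    class BlobCacheCleanupSketch {

        /** Scans the blob cache index batch by batch and bulk-deletes obsolete docs. */
        static void cleanUp(Client client, String systemIndexName, TimeValue keepAlive, int batchSize) {
            // 1. Open a point-in-time context so the scan sees a consistent view of the index
            final OpenPointInTimeRequest openRequest = new OpenPointInTimeRequest(systemIndexName);
            openRequest.keepAlive(keepAlive);
            final String pitId = client.execute(OpenPointInTimeAction.INSTANCE, openRequest).actionGet().getPointInTimeId();
            try {
                Object[] searchAfter = null;
                while (true) {
                    // 2. Fetch the next batch, sorted by _shard_doc and paged with search_after
                    final PointInTimeBuilder pit = new PointInTimeBuilder(pitId);
                    pit.setKeepAlive(keepAlive);
                    final SearchSourceBuilder source = new SearchSourceBuilder();
                    source.pointInTimeBuilder(pit);
                    source.sort(ShardDocSortField.NAME);
                    source.size(batchSize);
                    if (searchAfter != null) {
                        source.searchAfter(searchAfter);
                    }
                    final SearchResponse response = client.search(new SearchRequest().source(source)).actionGet();
                    final SearchHit[] hits = response.getHits().getHits();
                    if (hits == null || hits.length == 0) {
                        return; // every document has been verified, we are done
                    }
                    // 3. Bulk-delete the documents of this batch that no longer belong to a mounted snapshot
                    final BulkRequest bulkRequest = new BulkRequest();
                    for (SearchHit hit : hits) {
                        searchAfter = hit.getSortValues();
                        if (isObsolete(hit)) {
                            bulkRequest.add(new DeleteRequest().index(hit.getIndex()).id(hit.getId()));
                        }
                    }
                    if (bulkRequest.numberOfActions() > 0) {
                        client.bulk(bulkRequest).actionGet();
                    }
                }
            } finally {
                // 4. Always release the point-in-time context
                client.execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(pitId)).actionGet();
            }
        }

        private static boolean isObsolete(SearchHit hit) {
            return false; // placeholder: the real task checks repository, snapshot and index ids plus a retention period
        }
    }

The _shard_doc sort combined with search_after keeps the scan cheap and resumable across batches without holding a scroll context.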
Tanguy Leroux, 4 years ago
Commit 63d663e220

+ 28 - 0
docs/reference/searchable-snapshots/index.asciidoc

@@ -198,6 +198,34 @@ IMPORTANT: You can only configure these settings on nodes with the
 <<data-frozen-node,`data_frozen`>> role. Additionally, nodes with a shared
 cache can only have a single <<path-settings,data path>>.
 
+{es} also uses a dedicated system index named `.snapshot-blob-cache` to speed
+up the recoveries of {search-snap} shards. This index is used as an additional
+caching layer on top of the partially or fully mounted data and contains the
+minimal required data to start the {search-snap} shards. {es} automatically
+deletes documents from this index once they are no longer used. This periodic
+cleanup can be tuned using the following settings:
+
+`searchable_snapshots.blob_cache.periodic_cleanup.interval`::
+(<<dynamic-cluster-setting,Dynamic>>)
+The interval at which the periodic cleanup of the `.snapshot-blob-cache`
+index is scheduled. Defaults to every hour (`1h`).
+
+`searchable_snapshots.blob_cache.periodic_cleanup.retention_period`::
+(<<dynamic-cluster-setting,Dynamic>>)
+The retention period to keep obsolete documents in the `.snapshot-blob-cache`
+index. Defaults to one hour (`1h`).
+
+`searchable_snapshots.blob_cache.periodic_cleanup.batch_size`::
+(<<dynamic-cluster-setting,Dynamic>>)
+The number of documents that are searched for and bulk-deleted at once during
+the periodic cleanup of the `.snapshot-blob-cache` index. Defaults to `100`.
+
+`searchable_snapshots.blob_cache.periodic_cleanup.pit_keep_alive`::
+(<<dynamic-cluster-setting,Dynamic>>)
+The value used for the <<point-in-time-keep-alive,point-in-time keep alive>>
+requests executed during the periodic cleanup of the `.snapshot-blob-cache`
+index. Defaults to `10m`.
+
 [discrete]
 [[searchable-snapshots-costs]]
 === Reduce costs with {search-snaps}
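
Since all four settings are dynamic, they can be adjusted at runtime without a restart. As a hedged sketch (mirroring the prepareUpdateSettings pattern used by the integration test below; the class name and values are illustrative, not recommendations), an update from Java could look like this:

    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.core.TimeValue;

    class BlobCacheCleanupSettingsExample {

        /** Tightens the periodic cleanup: run every 30 minutes, keep obsolete docs for 2 hours. */
        static void tunePeriodicCleanup(Client client) {
            // Note: values below are illustrative; the defaults are 1h / 1h / 100.
            client.admin()
                .cluster()
                .prepareUpdateSettings()
                .setPersistentSettings(
                    Settings.builder()
                        .put("searchable_snapshots.blob_cache.periodic_cleanup.interval", TimeValue.timeValueMinutes(30))
                        .put("searchable_snapshots.blob_cache.periodic_cleanup.retention_period", TimeValue.timeValueHours(2))
                        .put("searchable_snapshots.blob_cache.periodic_cleanup.batch_size", 500)
                )
                .get();
        }
    }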

+ 247 - 65
x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java

@@ -7,29 +7,49 @@
 
 package org.elasticsearch.xpack.searchablesnapshots.cache.blob;
 
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.OriginSettingClient;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.store.LuceneFilesExtensions;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.reindex.ReindexPlugin;
+import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.fs.FsRepository;
+import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest.Storage;
 import org.elasticsearch.xpack.searchablesnapshots.BaseFrozenSearchableSnapshotsIntegTestCase;
 import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots;
+import org.elasticsearch.xpack.searchablesnapshots.cache.common.ByteRange;
 
+import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -57,71 +77,17 @@ public class SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests extends Base
     /**
      * Test that snapshot blob cache entries are deleted from the system index after the corresponding searchable snapshot index is deleted
      */
-    public void testMaintenance() throws Exception {
+    public void testCleanUpAfterIndicesAreDeleted() throws Exception {
         final String repositoryName = "repository";
         createRepository(repositoryName, FsRepository.TYPE);
 
-        final int nbIndices = randomIntBetween(3, 10);
-
-        logger.info("--> generating [{}] indices with cached entries in system index...", nbIndices);
-        final Map<String, Long> mountedIndices = new HashMap<>();
-        final Map<String, Settings> mountedIndicesSettings = new HashMap<>();
-
-        int i = 0;
-        long previousNumberOfCachedEntries = 0;
-        while (mountedIndices.size() < nbIndices) {
-            final String indexName = "index-" + i;
-            createIndex(indexName);
-
-            final List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
-            for (int n = 100; n > 0; n--) {
-                indexRequestBuilders.add(
-                    client().prepareIndex(indexName)
-                        .setSource(
-                            XContentFactory.smileBuilder()
-                                .startObject()
-                                .field("text", randomRealisticUnicodeOfCodepointLength(10))
-                                .endObject()
-                        )
-                );
-            }
-            indexRandom(true, indexRequestBuilders);
-
-            createSnapshot(repositoryName, "snapshot-" + i, List.of(indexName));
-            assertAcked(client().admin().indices().prepareDelete(indexName));
-
-            final String mountedIndex = "mounted-index-" + i;
-            mountSnapshot(repositoryName, "snapshot-" + i, "index-" + i, mountedIndex, Settings.EMPTY, randomFrom(Storage.values()));
-
-            ensureGreen(mountedIndex);
-            assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
-            assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME);
-            waitForBlobCacheFillsToComplete();
-
-            refreshSystemIndex(false);
-
-            final long numberOfEntriesInCache = numberOfEntriesInCache();
-            if (numberOfEntriesInCache > previousNumberOfCachedEntries) {
-                final long nbEntries = numberOfEntriesInCache - previousNumberOfCachedEntries;
-                logger.info("--> mounted index [{}] has [{}] entries in cache", mountedIndex, nbEntries);
-                mountedIndices.put(mountedIndex, nbEntries);
-                mountedIndicesSettings.put(mountedIndex, getIndexSettings(mountedIndex));
-
-            } else {
-                logger.info("--> mounted index [{}] did not generate any entry in cache, skipping", mountedIndex);
-                assertAcked(client().admin().indices().prepareDelete(mountedIndex));
-            }
-
-            previousNumberOfCachedEntries = numberOfEntriesInCache;
-            i += 1;
-        }
-
+        final Map<String, Tuple<Settings, Long>> mountedIndices = mountRandomIndicesWithCache(repositoryName, 3, 10);
         ensureYellow(SNAPSHOT_BLOB_CACHE_INDEX);
         refreshSystemIndex(true);
 
         final long numberOfEntriesInCache = numberOfEntriesInCache();
         logger.info("--> found [{}] entries in snapshot blob cache", numberOfEntriesInCache);
-        assertThat(numberOfEntriesInCache, equalTo(mountedIndices.values().stream().mapToLong(l -> l).sum()));
+        assertThat(numberOfEntriesInCache, equalTo(mountedIndices.values().stream().mapToLong(Tuple::v2).sum()));
 
         final List<String> indicesToDelete = randomSubsetOf(randomIntBetween(1, mountedIndices.size()), mountedIndices.keySet());
         assertAcked(client().admin().indices().prepareDelete(indicesToDelete.toArray(String[]::new)));
@@ -129,7 +95,7 @@ public class SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests extends Base
         final long expectedDeletedEntriesInCache = mountedIndices.entrySet()
             .stream()
             .filter(e -> indicesToDelete.contains(e.getKey()))
-            .mapToLong(Map.Entry::getValue)
+            .mapToLong(entry -> entry.getValue().v2())
             .sum();
         logger.info("--> deleting indices [{}] with [{}] entries in snapshot blob cache", indicesToDelete, expectedDeletedEntriesInCache);
 
@@ -138,7 +104,7 @@ public class SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests extends Base
             assertThat(numberOfEntriesInCache(), equalTo(numberOfEntriesInCache - expectedDeletedEntriesInCache));
 
             for (String mountedIndex : mountedIndices.keySet()) {
-                final Settings indexSettings = mountedIndicesSettings.get(mountedIndex);
+                final Settings indexSettings = mountedIndices.get(mountedIndex).v1();
                 assertHitCount(
                     systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX)
                         .setQuery(
@@ -150,7 +116,7 @@ public class SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests extends Base
                         )
                         .setSize(0)
                         .get(),
-                    indicesToDelete.contains(mountedIndex) ? 0L : mountedIndices.get(mountedIndex)
+                    indicesToDelete.contains(mountedIndex) ? 0L : mountedIndices.get(mountedIndex).v2()
                 );
             }
         });
@@ -178,13 +144,12 @@ public class SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests extends Base
                 Settings.EMPTY,
                 randomFrom(Storage.values())
             );
-
             ensureGreen(remainingMountedIndex);
-            mountedIndicesSettings.put(remainingMountedIndex, getIndexSettings(remainingMountedIndex));
 
             assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
             assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME);
             waitForBlobCacheFillsToComplete();
+            ensureClusterStateConsistency();
 
             logger.info(
                 "--> deleting more mounted indices [{}] with snapshot [{}/{}] of index [{}] is still mounted as index [{}]",
@@ -200,7 +165,7 @@ public class SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests extends Base
                 refreshSystemIndex(true);
 
                 for (String mountedIndex : mountedIndices.keySet()) {
-                    final Settings indexSettings = mountedIndicesSettings.get(mountedIndex);
+                    final Settings indexSettings = mountedIndices.get(mountedIndex).v1();
 
                     final long remainingEntriesInCache = systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX)
                         .setQuery(
@@ -218,11 +183,11 @@ public class SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests extends Base
                     if (indicesToDelete.contains(mountedIndex)) {
                         assertThat(remainingEntriesInCache, equalTo(0L));
                     } else if (snapshotId.equals(SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings))) {
-                        assertThat(remainingEntriesInCache, greaterThanOrEqualTo(mountedIndices.get(randomMountedIndex)));
+                        assertThat(remainingEntriesInCache, greaterThanOrEqualTo(mountedIndices.get(randomMountedIndex).v2()));
                     } else if (moreIndicesToDelete.contains(mountedIndex)) {
                         assertThat(remainingEntriesInCache, equalTo(0L));
                     } else {
-                        assertThat(remainingEntriesInCache, equalTo(mountedIndices.get(mountedIndex)));
+                        assertThat(remainingEntriesInCache, equalTo(mountedIndices.get(mountedIndex).v2()));
                     }
                 }
             });
@@ -236,6 +201,130 @@ public class SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests extends Base
         });
     }
 
+    /**
+     * Test that obsolete blob cache entries are deleted from the system index by the periodic maintenance task.
+     */
+    public void testPeriodicMaintenance() throws Exception {
+        ensureStableCluster(internalCluster().getNodeNames().length, TimeValue.timeValueSeconds(60L));
+
+        createRepository("repo", FsRepository.TYPE);
+        Map<String, Tuple<Settings, Long>> mountedIndices = mountRandomIndicesWithCache("repo", 1, 3);
+        ensureYellow(SNAPSHOT_BLOB_CACHE_INDEX);
+
+        final long nbEntriesInCacheForMountedIndices = mountedIndices.values().stream().mapToLong(Tuple::v2).sum();
+        refreshSystemIndex(true);
+        assertThat(numberOfEntriesInCache(), equalTo(nbEntriesInCacheForMountedIndices));
+
+        createRepository("other", FsRepository.TYPE);
+        Map<String, Tuple<Settings, Long>> otherMountedIndices = mountRandomIndicesWithCache("other", 1, 3);
+
+        final long nbEntriesInCacheForOtherIndices = otherMountedIndices.values().stream().mapToLong(Tuple::v2).sum();
+        refreshSystemIndex(true);
+        assertThat(numberOfEntriesInCache(), equalTo(nbEntriesInCacheForMountedIndices + nbEntriesInCacheForOtherIndices));
+
+        if (randomBoolean()) {
+            final int oldDocsInCache = indexRandomDocsInCache(1, 50, Instant.now().minus(Duration.ofDays(7L)).toEpochMilli());
+            refreshSystemIndex(true);
+            assertThat(
+                numberOfEntriesInCache(),
+                equalTo(nbEntriesInCacheForMountedIndices + nbEntriesInCacheForOtherIndices + oldDocsInCache)
+            );
+        }
+
+        // creates a backup of the system index cache to be restored later
+        createRepository("backup", FsRepository.TYPE);
+        createSnapshot("backup", "backup", List.of(SNAPSHOT_BLOB_CACHE_INDEX));
+
+        final Set<String> indicesToDelete = new HashSet<>(mountedIndices.keySet());
+        indicesToDelete.add(randomFrom(otherMountedIndices.keySet()));
+
+        assertAcked(systemClient().admin().indices().prepareDelete(SNAPSHOT_BLOB_CACHE_INDEX));
+        assertAcked(client().admin().indices().prepareDelete(indicesToDelete.toArray(String[]::new)));
+        assertAcked(client().admin().cluster().prepareDeleteRepository("repo"));
+        ensureClusterStateConsistency();
+
+        assertThat(numberOfEntriesInCache(), equalTo(0L));
+
+        assertAcked(
+            client().admin()
+                .cluster()
+                .prepareUpdateSettings()
+                .setTransientSettings(
+                    Settings.builder()
+                        .put(
+                            BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING.getKey(),
+                            TimeValue.timeValueSeconds(1L)
+                        )
+                )
+        );
+        try {
+            // restores the .snapshot-blob-cache index with - now obsolete - documents
+            final RestoreSnapshotResponse restoreResponse = client().admin()
+                .cluster()
+                .prepareRestoreSnapshot("backup", "backup")
+                .setIndices(SNAPSHOT_BLOB_CACHE_INDEX)
+                .setWaitForCompletion(true)
+                .get();
+            assertThat(restoreResponse.getRestoreInfo().successfulShards(), equalTo(1));
+            assertThat(restoreResponse.getRestoreInfo().failedShards(), equalTo(0));
+
+            final int recentDocsInCache;
+            if (randomBoolean()) {
+                // recent as in the future, actually
+                recentDocsInCache = indexRandomDocsInCache(1, 50, Instant.now().plus(Duration.ofDays(10L)).toEpochMilli());
+            } else {
+                recentDocsInCache = 0;
+            }
+
+            // only very old docs should have been deleted
+            assertBusy(() -> {
+                refreshSystemIndex(true);
+                assertThat(
+                    numberOfEntriesInCache(),
+                    equalTo(nbEntriesInCacheForMountedIndices + nbEntriesInCacheForOtherIndices + recentDocsInCache)
+                );
+            }, 30L, TimeUnit.SECONDS);
+
+            // update the retention period from the default 1h to immediate
+            assertAcked(
+                client().admin()
+                    .cluster()
+                    .prepareUpdateSettings()
+                    .setTransientSettings(
+                        Settings.builder()
+                            .put(
+                                BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_RETENTION_PERIOD.getKey(),
+                                TimeValue.timeValueSeconds(0L)
+                            )
+                    )
+            );
+
+            // only used documents should remain
+            final long expectNumberOfRemainingCacheEntries = otherMountedIndices.entrySet()
+                .stream()
+                .filter(e -> indicesToDelete.contains(e.getKey()) == false)
+                .mapToLong(e -> e.getValue().v2())
+                .sum();
+
+            assertBusy(() -> {
+                refreshSystemIndex(true);
+                assertThat(numberOfEntriesInCache(), equalTo(expectNumberOfRemainingCacheEntries + recentDocsInCache));
+            }, 30L, TimeUnit.SECONDS);
+
+        } finally {
+            assertAcked(
+                client().admin()
+                    .cluster()
+                    .prepareUpdateSettings()
+                    .setTransientSettings(
+                        Settings.builder()
+                            .putNull(BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING.getKey())
+                            .putNull(BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_RETENTION_PERIOD.getKey())
+                    )
+            );
+        }
+    }
+
     /**
      * @return a {@link Client} that can be used to query the blob store cache system index
      */
@@ -270,4 +359,97 @@ public class SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests extends Base
     private Settings getIndexSettings(String indexName) {
         return client().admin().indices().prepareGetSettings(indexName).get().getIndexToSettings().get(indexName);
     }
+
+    private Map<String, Tuple<Settings, Long>> mountRandomIndicesWithCache(String repositoryName, int min, int max) throws Exception {
+        refreshSystemIndex(false);
+        long previousNumberOfCachedEntries = numberOfEntriesInCache();
+
+        final int nbIndices = randomIntBetween(min, max);
+        logger.info("--> generating [{}] indices with cached entries in system index...", nbIndices);
+        final Map<String, Tuple<Settings, Long>> mountedIndices = new HashMap<>();
+
+        int i = 0;
+        while (mountedIndices.size() < nbIndices) {
+            final String indexName = "index-" + i;
+            createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
+
+            while (true) {
+                final List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
+                for (int n = 500; n > 0; n--) {
+                    final XContentBuilder builder = XContentFactory.jsonBuilder();
+                    builder.startObject();
+                    for (int j = 0; j < 10; j++) {
+                        builder.field("text_" + j, randomRealisticUnicodeOfCodepointLength(10));
+                        builder.field("int_" + j, randomInt());
+                    }
+                    builder.endObject();
+                    indexRequestBuilders.add(client().prepareIndex(indexName).setSource(builder));
+                }
+                indexRandom(true, indexRequestBuilders);
+
+                final String snapshot = "snapshot-" + i;
+                createSnapshot(repositoryName, snapshot, List.of(indexName));
+
+                final String mountedIndex = "mounted-" + indexName + "-in-" + repositoryName;
+                mountSnapshot(repositoryName, snapshot, indexName, mountedIndex, Settings.EMPTY, randomFrom(Storage.values()));
+
+                ensureGreen(mountedIndex);
+                assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
+                assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME);
+                waitForBlobCacheFillsToComplete();
+
+                refreshSystemIndex(false);
+                final long numberOfEntriesInCache = numberOfEntriesInCache();
+                if (numberOfEntriesInCache > previousNumberOfCachedEntries) {
+                    final long nbEntries = numberOfEntriesInCache - previousNumberOfCachedEntries;
+                    logger.info("--> mounted index [{}] has [{}] entries in cache", mountedIndex, nbEntries);
+                    mountedIndices.put(mountedIndex, Tuple.tuple(getIndexSettings(mountedIndex), nbEntries));
+                    previousNumberOfCachedEntries = numberOfEntriesInCache;
+                    break;
+
+                } else {
+                    logger.info("--> mounted index [{}] did not generate any entry in cache", mountedIndex);
+                    assertAcked(client().admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot).get());
+                    assertAcked(client().admin().indices().prepareDelete(mountedIndex));
+                }
+            }
+            assertAcked(client().admin().indices().prepareDelete(indexName));
+            i += 1;
+        }
+        return Collections.unmodifiableMap(mountedIndices);
+    }
+
+    private int indexRandomDocsInCache(final int minDocs, final int maxDocs, final long creationTimeInEpochMillis) {
+        final int nbDocs = randomIntBetween(minDocs, maxDocs);
+        final CountDownLatch latch = new CountDownLatch(nbDocs);
+
+        String repository = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
+        SnapshotId snapshotId = new SnapshotId("snap", UUIDs.randomBase64UUID());
+        IndexId indexId = new IndexId("index", UUIDs.randomBase64UUID());
+        ShardId shardId = new ShardId("index", UUIDs.randomBase64UUID(), randomInt(5));
+        String fileName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT) + '.' + randomFrom(LuceneFilesExtensions.values()).getExtension();
+        byte[] bytes = randomByteArrayOfLength(randomIntBetween(1, 64));
+
+        final BlobStoreCacheService blobStoreCacheService = internalCluster().getDataNodeInstance(BlobStoreCacheService.class);
+        for (int i = 0; i < nbDocs; i++) {
+            int length = randomIntBetween(1, Math.max(1, bytes.length - 1));
+            blobStoreCacheService.putAsync(
+                repository,
+                snapshotId,
+                indexId,
+                shardId,
+                fileName,
+                ByteRange.of(i, i + length),
+                new BytesArray(bytes, 0, length),
+                creationTimeInEpochMillis,
+                ActionListener.wrap(latch::countDown)
+            );
+        }
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            throw new AssertionError(e);
+        }
+        return nbDocs;
+    }
 }

+ 9 - 4
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java

@@ -314,7 +314,11 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
             FrozenCacheService.FROZEN_CACHE_RECOVERY_RANGE_SIZE_SETTING,
             FrozenCacheService.SNAPSHOT_CACHE_MAX_FREQ_SETTING,
             FrozenCacheService.SNAPSHOT_CACHE_DECAY_INTERVAL_SETTING,
-            FrozenCacheService.SNAPSHOT_CACHE_MIN_TIME_DELTA_SETTING
+            FrozenCacheService.SNAPSHOT_CACHE_MIN_TIME_DELTA_SETTING,
+            BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING,
+            BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_KEEP_ALIVE_SETTING,
+            BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_BATCH_SIZE_SETTING,
+            BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_RETENTION_PERIOD
         );
     }
 
@@ -345,11 +349,12 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
             final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService(
                 clusterService,
                 client,
-                SNAPSHOT_BLOB_CACHE_INDEX,
-                threadPool::absoluteTimeInMillis
+                SNAPSHOT_BLOB_CACHE_INDEX
             );
             this.blobStoreCacheService.set(blobStoreCacheService);
-            clusterService.addListener(new BlobStoreCacheMaintenanceService(threadPool, client, SNAPSHOT_BLOB_CACHE_INDEX));
+            clusterService.addListener(
+                new BlobStoreCacheMaintenanceService(settings, clusterService, threadPool, client, SNAPSHOT_BLOB_CACHE_INDEX)
+            );
             components.add(blobStoreCacheService);
         } else {
             PersistentCache.cleanUp(settings, nodeEnvironment);

+ 498 - 32
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java

@@ -11,17 +11,44 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.bulk.BulkAction;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.search.ClosePointInTimeAction;
+import org.elasticsearch.action.search.ClosePointInTimeRequest;
+import org.elasticsearch.action.search.ClosePointInTimeResponse;
+import org.elasticsearch.action.search.OpenPointInTimeAction;
+import org.elasticsearch.action.search.OpenPointInTimeRequest;
+import org.elasticsearch.action.search.OpenPointInTimeResponse;
+import org.elasticsearch.action.search.SearchAction;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.TransportActions;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.OriginSettingClient;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
+import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.document.DocumentField;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.query.QueryBuilder;
@@ -29,24 +56,36 @@ import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.elasticsearch.index.reindex.DeleteByQueryAction;
 import org.elasticsearch.index.reindex.DeleteByQueryRequest;
-import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.PointInTimeBuilder;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.fetch.subphase.FieldAndFormat;
+import org.elasticsearch.search.sort.ShardDocSortField;
 import org.elasticsearch.snapshots.SearchableSnapshotsSettings;
-import org.elasticsearch.snapshots.SnapshotId;
+import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.threadpool.ThreadPool;
 
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
 import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN;
+import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_BLOB_CACHE_INDEX;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
-import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
-import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;
 
 /**
  * A service that deletes documents in the snapshot blob cache index when they are not required anymore.
@@ -60,14 +99,80 @@ public class BlobStoreCacheMaintenanceService implements ClusterStateListener {
 
     private static final Logger logger = LogManager.getLogger(BlobStoreCacheMaintenanceService.class);
 
+    /**
+     * The interval at which the periodic cleanup of the blob store cache index is scheduled.
+     */
+    public static final Setting<TimeValue> SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING = Setting.timeSetting(
+        "searchable_snapshots.blob_cache.periodic_cleanup.interval",
+        TimeValue.timeValueHours(1),
+        TimeValue.ZERO,
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
+    /**
+     * The keep alive value for the internal point-in-time requests executed during the periodic cleanup.
+     */
+    public static final Setting<TimeValue> SNAPSHOT_SNAPSHOT_CLEANUP_KEEP_ALIVE_SETTING = Setting.timeSetting(
+        "searchable_snapshots.blob_cache.periodic_cleanup.pit_keep_alive",
+        TimeValue.timeValueMinutes(10L),
+        TimeValue.timeValueSeconds(30L),
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
+    /**
+     * The number of documents that are searched for and bulk-deleted at once during the periodic cleanup.
+     */
+    public static final Setting<Integer> SNAPSHOT_SNAPSHOT_CLEANUP_BATCH_SIZE_SETTING = Setting.intSetting(
+        "searchable_snapshots.blob_cache.periodic_cleanup.batch_size",
+        100,
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
+
+    /**
+     * The retention period to keep obsolete documents in the blob store cache index. This duration is used during the periodic cleanup in
+     * order to avoid deleting documents belonging to concurrently mounted searchable snapshots. Defaults to 1h.
+     */
+    public static final Setting<TimeValue> SNAPSHOT_SNAPSHOT_CLEANUP_RETENTION_PERIOD = Setting.timeSetting(
+        "searchable_snapshots.blob_cache.periodic_cleanup.retention_period",
+        TimeValue.timeValueHours(1L),
+        TimeValue.ZERO,
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
+
+    private final ClusterService clusterService;
     private final Client clientWithOrigin;
     private final String systemIndexName;
     private final ThreadPool threadPool;
 
-    public BlobStoreCacheMaintenanceService(ThreadPool threadPool, Client client, String systemIndexName) {
+    private volatile Scheduler.Cancellable periodicTask;
+    private volatile TimeValue periodicTaskInterval;
+    private volatile TimeValue periodicTaskKeepAlive;
+    private volatile TimeValue periodicTaskRetention;
+    private volatile int periodicTaskBatchSize;
+    private volatile boolean schedulePeriodic;
+
+    public BlobStoreCacheMaintenanceService(
+        Settings settings,
+        ClusterService clusterService,
+        ThreadPool threadPool,
+        Client client,
+        String systemIndexName
+    ) {
         this.clientWithOrigin = new OriginSettingClient(Objects.requireNonNull(client), SEARCHABLE_SNAPSHOTS_ORIGIN);
         this.systemIndexName = Objects.requireNonNull(systemIndexName);
+        this.clusterService = Objects.requireNonNull(clusterService);
         this.threadPool = Objects.requireNonNull(threadPool);
+        this.periodicTaskInterval = SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING.get(settings);
+        this.periodicTaskKeepAlive = SNAPSHOT_SNAPSHOT_CLEANUP_KEEP_ALIVE_SETTING.get(settings);
+        this.periodicTaskBatchSize = SNAPSHOT_SNAPSHOT_CLEANUP_BATCH_SIZE_SETTING.get(settings);
+        this.periodicTaskRetention = SNAPSHOT_SNAPSHOT_CLEANUP_RETENTION_PERIOD.get(settings);
+        final ClusterSettings clusterSettings = clusterService.getClusterSettings();
+        clusterSettings.addSettingsUpdateConsumer(SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING, this::setPeriodicTaskInterval);
+        clusterSettings.addSettingsUpdateConsumer(SNAPSHOT_SNAPSHOT_CLEANUP_KEEP_ALIVE_SETTING, this::setPeriodicTaskKeepAlive);
+        clusterSettings.addSettingsUpdateConsumer(SNAPSHOT_SNAPSHOT_CLEANUP_BATCH_SIZE_SETTING, this::setPeriodicTaskBatchSize);
+        clusterSettings.addSettingsUpdateConsumer(SNAPSHOT_SNAPSHOT_CLEANUP_RETENTION_PERIOD, this::setPeriodicTaskRetention);
     }
 
     @Override
@@ -77,11 +182,63 @@ public class BlobStoreCacheMaintenanceService implements ClusterStateListener {
             return; // state not fully recovered
         }
         final ShardRouting primary = systemIndexPrimaryShard(state);
-        if (primary == null || Objects.equals(state.nodes().getLocalNodeId(), primary.currentNodeId()) == false) {
-            return; // system index primary shard does not exist or is not assigned to this data node
+        if (primary == null
+            || primary.active() == false
+            || Objects.equals(state.nodes().getLocalNodeId(), primary.currentNodeId()) == false) {
+            // system index primary shard does not exist or is not assigned to this data node
+            stopPeriodicTask();
+            return;
         }
         if (event.indicesDeleted().isEmpty() == false) {
-            threadPool.generic().execute(new MaintenanceTask(event));
+            threadPool.generic().execute(new DeletedIndicesMaintenanceTask(event));
+        }
+        if (periodicTask == null || periodicTask.isCancelled()) {
+            schedulePeriodic = true;
+            startPeriodicTask();
+        }
+    }
+
+    private synchronized void setPeriodicTaskInterval(TimeValue interval) {
+        this.periodicTaskInterval = interval;
+    }
+
+    private void setPeriodicTaskKeepAlive(TimeValue keepAlive) {
+        this.periodicTaskKeepAlive = keepAlive;
+    }
+
+    public void setPeriodicTaskRetention(TimeValue retention) {
+        this.periodicTaskRetention = retention;
+    }
+
+    public void setPeriodicTaskBatchSize(int batchSize) {
+        this.periodicTaskBatchSize = batchSize;
+    }
+
+    private synchronized void startPeriodicTask() {
+        if (schedulePeriodic) {
+            try {
+                final TimeValue delay = periodicTaskInterval;
+                if (delay.getMillis() > 0L) {
+                    final PeriodicMaintenanceTask task = new PeriodicMaintenanceTask(periodicTaskKeepAlive, periodicTaskBatchSize);
+                    periodicTask = threadPool.schedule(task, delay, ThreadPool.Names.GENERIC);
+                } else {
+                    periodicTask = null;
+                }
+            } catch (EsRejectedExecutionException e) {
+                if (e.isExecutorShutdown()) {
+                    logger.debug("failed to schedule next periodic maintenance task for blob store cache, node is shutting down", e);
+                } else {
+                    throw e;
+                }
+            }
+        }
+    }
+
+    private synchronized void stopPeriodicTask() {
+        schedulePeriodic = false;
+        if (periodicTask != null && periodicTask.isCancelled() == false) {
+            periodicTask.cancel();
+            periodicTask = null;
         }
     }
 
@@ -97,28 +254,34 @@ public class BlobStoreCacheMaintenanceService implements ClusterStateListener {
         return null;
     }
 
-    private static boolean hasSearchableSnapshotWith(final ClusterState state, final SnapshotId snapshotId, final IndexId indexId) {
+    private static boolean hasSearchableSnapshotWith(final ClusterState state, final String snapshotId, final String indexId) {
         for (IndexMetadata indexMetadata : state.metadata()) {
             final Settings indexSettings = indexMetadata.getSettings();
             if (SearchableSnapshotsSettings.isSearchableSnapshotStore(indexSettings)) {
-                final SnapshotId otherSnapshotId = new SnapshotId(
-                    SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings),
-                    SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings)
-                );
-                if (Objects.equals(snapshotId, otherSnapshotId)) {
-                    final IndexId otherIndexId = new IndexId(
-                        SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings),
-                        SNAPSHOT_INDEX_ID_SETTING.get(indexSettings)
-                    );
-                    if (Objects.equals(indexId, otherIndexId)) {
-                        return true;
-                    }
+                if (Objects.equals(snapshotId, SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings))
+                    && Objects.equals(indexId, SNAPSHOT_INDEX_ID_SETTING.get(indexSettings))) {
+                    return true;
                 }
             }
         }
         return false;
     }
 
+    private static Map<String, Set<String>> listSearchableSnapshots(final ClusterState state) {
+        Map<String, Set<String>> snapshots = null;
+        for (IndexMetadata indexMetadata : state.metadata()) {
+            final Settings indexSettings = indexMetadata.getSettings();
+            if (SearchableSnapshotsSettings.isSearchableSnapshotStore(indexSettings)) {
+                if (snapshots == null) {
+                    snapshots = new HashMap<>();
+                }
+                snapshots.computeIfAbsent(SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings), s -> new HashSet<>())
+                    .add(SNAPSHOT_INDEX_ID_SETTING.get(indexSettings));
+            }
+        }
+        return snapshots != null ? Collections.unmodifiableMap(snapshots) : Collections.emptyMap();
+    }
+
     static QueryBuilder buildDeleteByQuery(int numberOfShards, String snapshotUuid, String indexUuid) {
         final Set<String> paths = IntStream.range(0, numberOfShards)
             .mapToObj(shard -> String.join("/", snapshotUuid, indexUuid, String.valueOf(shard)))
@@ -127,11 +290,14 @@ public class BlobStoreCacheMaintenanceService implements ClusterStateListener {
         return QueryBuilders.termsQuery("blob.path", paths);
     }
 
-    private class MaintenanceTask extends AbstractRunnable {
+    /**
+     * A maintenance task that cleans up the blob store cache index after searchable snapshot indices are deleted
+     */
+    private class DeletedIndicesMaintenanceTask extends AbstractRunnable {
 
         private final ClusterChangedEvent event;
 
-        MaintenanceTask(ClusterChangedEvent event) {
+        DeletedIndicesMaintenanceTask(ClusterChangedEvent event) {
             assert event.indicesDeleted().isEmpty() == false;
             this.event = Objects.requireNonNull(event);
         }
@@ -150,14 +316,8 @@ public class BlobStoreCacheMaintenanceService implements ClusterStateListener {
                     if (SearchableSnapshotsSettings.isSearchableSnapshotStore(indexSetting)) {
                         assert state.metadata().hasIndex(deletedIndex) == false;
 
-                        final SnapshotId snapshotId = new SnapshotId(
-                            SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSetting),
-                            SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSetting)
-                        );
-                        final IndexId indexId = new IndexId(
-                            SNAPSHOT_INDEX_NAME_SETTING.get(indexSetting),
-                            SNAPSHOT_INDEX_ID_SETTING.get(indexSetting)
-                        );
+                        final String snapshotId = SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSetting);
+                        final String indexId = SNAPSHOT_INDEX_ID_SETTING.get(indexSetting);
 
                         // we should do nothing if the current cluster state contains another
                         // searchable snapshot index that uses the same index snapshot
@@ -171,7 +331,7 @@ public class BlobStoreCacheMaintenanceService implements ClusterStateListener {
                         }
 
                         final DeleteByQueryRequest request = new DeleteByQueryRequest(systemIndexName);
-                        request.setQuery(buildDeleteByQuery(indexMetadata.getNumberOfShards(), snapshotId.getUUID(), indexId.getId()));
+                        request.setQuery(buildDeleteByQuery(indexMetadata.getNumberOfShards(), snapshotId, indexId));
                         request.setRefresh(queue.isEmpty());
 
                         queue.add(Tuple.tuple(request, new ActionListener<>() {
@@ -237,4 +397,310 @@ public class BlobStoreCacheMaintenanceService implements ClusterStateListener {
             );
         }
     }
+
+    /**
+     * A maintenance task that periodically cleans up unused cache entries from the blob store cache index.
+     *
+     * This task first opens a point-in-time (PIT) context on the blob store cache system index and uses it to iterate over all
+     * documents. For each document found, the task verifies whether it belongs to an existing searchable snapshot index; if it does
+     * not, the document is deleted as part of a bulk request. Once the bulk request has executed, the next batch of documents is
+     * searched for. Once all documents from the PIT have been verified, the task closes the PIT and completes itself.
+     *
+     * The task executes every step (PIT opening, searches, bulk deletes, PIT closing) using the generic thread pool.
+     * The same task instance is used for all the steps and makes sure that a closed instance is not executed again.
+     */
+    private class PeriodicMaintenanceTask implements Runnable, Releasable {
+
+        private final TimeValue keepAlive;
+        private final int batchSize;
+
+        private final AtomicReference<Exception> error = new AtomicReference<>();
+        private final AtomicBoolean closed = new AtomicBoolean();
+        private final AtomicLong deletes = new AtomicLong();
+        private final AtomicLong total = new AtomicLong();
+
+        private volatile Map<String, Set<String>> existingSnapshots;
+        private volatile Set<String> existingRepositories;
+        private volatile SearchResponse searchResponse;
+        private volatile Instant expirationTime;
+        private volatile String pointIntTimeId;
+        private volatile Object[] searchAfter;
+
+        PeriodicMaintenanceTask(TimeValue keepAlive, int batchSize) {
+            this.keepAlive = keepAlive;
+            this.batchSize = batchSize;
+        }
+
+        @Override
+        public void run() {
+            assert assertGenericThread();
+            try {
+                ensureOpen();
+                if (pointIntTimeId == null) {
+                    final OpenPointInTimeRequest openRequest = new OpenPointInTimeRequest(SNAPSHOT_BLOB_CACHE_INDEX);
+                    openRequest.keepAlive(keepAlive);
+                    clientWithOrigin.execute(OpenPointInTimeAction.INSTANCE, openRequest, new ActionListener<>() {
+                        @Override
+                        public void onResponse(OpenPointInTimeResponse response) {
+                            logger.trace("periodic maintenance task initialized with point-in-time id [{}]", response.getPointInTimeId());
+                            PeriodicMaintenanceTask.this.pointIntTimeId = response.getPointInTimeId();
+                            executeNext(PeriodicMaintenanceTask.this);
+                        }
+
+                        @Override
+                        public void onFailure(Exception e) {
+                            if (TransportActions.isShardNotAvailableException(e)) {
+                                complete(null);
+                            } else {
+                                complete(e);
+                            }
+                        }
+                    });
+                    return;
+                }
+
+                final String pitId = pointIntTimeId;
+                assert Strings.hasLength(pitId);
+
+                if (searchResponse == null) {
+                    final SearchSourceBuilder searchSource = new SearchSourceBuilder();
+                    searchSource.fetchField(new FieldAndFormat(CachedBlob.CREATION_TIME_FIELD, "epoch_millis"));
+                    searchSource.fetchSource(false);
+                    searchSource.trackScores(false);
+                    searchSource.sort(ShardDocSortField.NAME);
+                    searchSource.size(batchSize);
+                    if (searchAfter != null) {
+                        searchSource.searchAfter(searchAfter);
+                        searchSource.trackTotalHits(false);
+                    } else {
+                        searchSource.trackTotalHits(true);
+                    }
+                    final PointInTimeBuilder pointInTime = new PointInTimeBuilder(pitId);
+                    pointInTime.setKeepAlive(keepAlive);
+                    searchSource.pointInTimeBuilder(pointInTime);
+                    final SearchRequest searchRequest = new SearchRequest();
+                    searchRequest.source(searchSource);
+                    clientWithOrigin.execute(SearchAction.INSTANCE, searchRequest, new ActionListener<>() {
+                        @Override
+                        public void onResponse(SearchResponse response) {
+                            if (searchAfter == null) {
+                                assert PeriodicMaintenanceTask.this.total.get() == 0L;
+                                PeriodicMaintenanceTask.this.total.set(response.getHits().getTotalHits().value);
+                            }
+                            PeriodicMaintenanceTask.this.searchResponse = response;
+                            PeriodicMaintenanceTask.this.searchAfter = null;
+                            executeNext(PeriodicMaintenanceTask.this);
+                        }
+
+                        @Override
+                        public void onFailure(Exception e) {
+                            complete(e);
+                        }
+                    });
+                    return;
+                }
+
+                final SearchHit[] searchHits = searchResponse.getHits().getHits();
+                if (searchHits != null && searchHits.length > 0) {
+                    if (expirationTime == null) {
+                        final TimeValue retention = periodicTaskRetention;
+                        expirationTime = Instant.ofEpochMilli(threadPool.absoluteTimeInMillis())
+                            .minus(retention.duration(), retention.timeUnit().toChronoUnit());
+
+                        final ClusterState state = clusterService.state();
+                        // compute the list of existing searchable snapshots and repositories once
+                        existingSnapshots = listSearchableSnapshots(state);
+                        existingRepositories = state.metadata()
+                            .custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY)
+                            .repositories()
+                            .stream()
+                            .map(RepositoryMetadata::name)
+                            .collect(Collectors.toSet());
+                    }
+
+                    final BulkRequest bulkRequest = new BulkRequest();
+                    final Map<String, Set<String>> knownSnapshots = existingSnapshots;
+                    assert knownSnapshots != null;
+                    final Set<String> knownRepositories = existingRepositories;
+                    assert knownRepositories != null;
+                    final Instant expirationTime = this.expirationTime;
+                    assert expirationTime != null;
+
+                    Object[] lastSortValues = null;
+                    for (SearchHit searchHit : searchHits) {
+                        lastSortValues = searchHit.getSortValues();
+                        assert searchHit.getId() != null;
+                        try {
+                            boolean delete = false;
+
+                            // See {@link BlobStoreCacheService#generateId}
+                            // doc id = {repository name}/{snapshot id}/{snapshot index id}/{shard id}/{file name}/@{file offset}
+                            final String[] parts = Objects.requireNonNull(searchHit.getId()).split("/");
+                            assert parts.length == 6 : Arrays.toString(parts) + " vs " + searchHit.getId();
+
+                            final String repositoryName = parts[0];
+                            if (knownRepositories.contains(repositoryName) == false) {
+                                logger.trace("deleting blob store cache entry with id [{}]: repository does not exist", searchHit.getId());
+                                delete = true;
+                            } else {
+                                final Set<String> knownIndexIds = knownSnapshots.get(parts[1]);
+                                if (knownIndexIds == null || knownIndexIds.contains(parts[2]) == false) {
+                                    logger.trace("deleting blob store cache entry with id [{}]: not used", searchHit.getId());
+                                    delete = true;
+                                }
+                            }
+                            if (delete) {
+                                final Instant creationTime = getCreationTime(searchHit);
+                                if (creationTime.isAfter(expirationTime)) {
+                                    logger.trace(
+                                        "blob store cache entry with id [{}] was created recently, skipping deletion",
+                                        searchHit.getId()
+                                    );
+                                    continue;
+                                }
+                                bulkRequest.add(new DeleteRequest().index(searchHit.getIndex()).id(searchHit.getId()));
+                            }
+                        } catch (Exception e) {
+                            logger.warn(
+                                () -> new ParameterizedMessage(
+                                    "exception when parsing blob store cache entry with id [{}], skipping",
+                                    searchHit.getId()
+                                ),
+                                e
+                            );
+                        }
+                    }
+
+                    assert lastSortValues != null;
+                    if (bulkRequest.numberOfActions() == 0) {
+                        this.searchResponse = null;
+                        this.searchAfter = lastSortValues;
+                        executeNext(this);
+                        return;
+                    }
+
+                    final Object[] finalSearchAfter = lastSortValues;
+                    clientWithOrigin.execute(BulkAction.INSTANCE, bulkRequest, new ActionListener<>() {
+                        @Override
+                        public void onResponse(BulkResponse response) {
+                            for (BulkItemResponse itemResponse : response.getItems()) {
+                                if (itemResponse.isFailed() == false) {
+                                    assert itemResponse.getResponse() instanceof DeleteResponse;
+                                    PeriodicMaintenanceTask.this.deletes.incrementAndGet();
+                                }
+                            }
+                            PeriodicMaintenanceTask.this.searchResponse = null;
+                            PeriodicMaintenanceTask.this.searchAfter = finalSearchAfter;
+                            executeNext(PeriodicMaintenanceTask.this);
+                        }
+
+                        @Override
+                        public void onFailure(Exception e) {
+                            complete(e);
+                        }
+                    });
+                    return;
+                }
+                // we're done, complete the task
+                complete(null);
+            } catch (Exception e) {
+                complete(e);
+            }
+        }
+
+        public boolean isClosed() {
+            return closed.get();
+        }
+
+        private void ensureOpen() {
+            if (isClosed()) {
+                assert false : "should not use periodic task after close";
+                throw new IllegalStateException("Periodic maintenance task is closed");
+            }
+        }
+
+        @Override
+        public void close() {
+            if (closed.compareAndSet(false, true)) {
+                final Exception e = error.get();
+                if (e != null) {
+                    logger.warn(
+                        () -> new ParameterizedMessage(
+                            "periodic maintenance task completed with failure ({} deleted documents out of a total of {})",
+                            deletes.get(),
+                            total.get()
+                        ),
+                        e
+                    );
+                } else {
+                    logger.info(
+                        () -> new ParameterizedMessage(
+                            "periodic maintenance task completed ({} deleted documents out of a total of {})",
+                            deletes.get(),
+                            total.get()
+                        )
+                    );
+                }
+            }
+        }
+
+        private void complete(@Nullable Exception failure) {
+            assert isClosed() == false;
+            final Releasable releasable = () -> {
+                try {
+                    final Exception previous = error.getAndSet(failure);
+                    assert previous == null : "periodic maintenance task already failed: " + previous;
+                    close();
+                } finally {
+                    startPeriodicTask();
+                }
+            };
+            boolean waitForRelease = false;
+            try {
+                final String pitId = pointIntTimeId;
+                if (Strings.hasLength(pitId)) {
+                    final ClosePointInTimeRequest closeRequest = new ClosePointInTimeRequest(pitId);
+                    clientWithOrigin.execute(ClosePointInTimeAction.INSTANCE, closeRequest, ActionListener.runAfter(new ActionListener<>() {
+                        @Override
+                        public void onResponse(ClosePointInTimeResponse response) {
+                            if (response.isSucceeded()) {
+                                logger.debug("periodic maintenance task successfully closed point-in-time id [{}]", pitId);
+                            } else {
+                                logger.debug("point-in-time id [{}] not found", pitId);
+                            }
+                        }
+
+                        @Override
+                        public void onFailure(Exception e) {
+                            logger.warn(() -> new ParameterizedMessage("failed to close point-in-time id [{}]", pitId), e);
+                        }
+                    }, () -> Releasables.close(releasable)));
+                    waitForRelease = true;
+                }
+            } finally {
+                if (waitForRelease == false) {
+                    Releasables.close(releasable);
+                }
+            }
+        }
+    }
+
+    private void executeNext(PeriodicMaintenanceTask maintenanceTask) {
+        threadPool.generic().execute(maintenanceTask);
+    }
+
+    private static boolean assertGenericThread() {
+        final String threadName = Thread.currentThread().getName();
+        assert threadName.contains(ThreadPool.Names.GENERIC) : threadName;
+        return true;
+    }
+
+    private static Instant getCreationTime(SearchHit searchHit) {
+        final DocumentField creationTimeField = searchHit.field(CachedBlob.CREATION_TIME_FIELD);
+        assert creationTimeField != null;
+        final Object creationTimeValue = creationTimeField.getValue();
+        assert creationTimeValue != null;
+        assert creationTimeValue instanceof String : "expect a java.lang.String but got " + creationTimeValue.getClass();
+        return Instant.ofEpochMilli(Long.parseLong(creationTimeField.getValue()));
+    }
 }
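
For reference, the cleanup loop implemented above pages through the .snapshot-blob-cache index with a point-in-time context and search_after, bulk-deleting documents that are no longer referenced by a mounted searchable snapshot. The following is a minimal, synchronous sketch of that control flow only; the SearchClient, Page and Hit types are hypothetical stand-ins rather than the Elasticsearch client API, and the index name, keep-alive and retention check are illustrative.

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical abstractions standing in for the search, bulk and point-in-time APIs the task uses.
interface Hit {
    String id();
    Instant creationTime();           // parsed from the creation_time field
    Object[] sortValues();            // becomes the next search_after cursor
    boolean usedByMountedSnapshot();  // true if a mounted searchable snapshot still references it
}

interface Page {
    List<Hit> hits();
}

interface SearchClient {
    String openPointInTime(String index, String keepAlive);
    Page searchAfter(String pitId, Object[] searchAfter, int batchSize);
    void bulkDelete(String index, List<String> ids);
    void closePointInTime(String pitId);
}

public final class BlobCacheCleanupSketch {

    public static void cleanUp(SearchClient client, Instant expirationTime, int batchSize) {
        // open a point-in-time so that paging sees a consistent view of the index
        final String pitId = client.openPointInTime(".snapshot-blob-cache", "10m");
        try {
            Object[] searchAfter = null;
            while (true) {
                final Page page = client.searchAfter(pitId, searchAfter, batchSize);
                final List<Hit> hits = page.hits();
                if (hits.isEmpty()) {
                    return; // no more documents to examine, the run is complete
                }
                final List<String> toDelete = new ArrayList<>();
                for (Hit hit : hits) {
                    // only delete documents that are unused and older than the retention period
                    if (hit.usedByMountedSnapshot() == false && hit.creationTime().isBefore(expirationTime)) {
                        toDelete.add(hit.id());
                    }
                }
                if (toDelete.isEmpty() == false) {
                    client.bulkDelete(".snapshot-blob-cache", toDelete);
                }
                // advance the cursor to the sort values of the last hit of the page
                searchAfter = hits.get(hits.size() - 1).sortValues();
            }
        } finally {
            client.closePointInTime(pitId); // always release the point-in-time context
        }
    }
}

Unlike this synchronous loop, the actual task is a self-rescheduling Runnable: each search or bulk response hands control back through executeNext(...) on the generic thread pool, and complete(...) closes the point-in-time and logs the deletion totals whether the run succeeded or failed.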

+ 3 - 5
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheService.java

@@ -49,7 +49,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Supplier;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN;
@@ -72,17 +71,15 @@ public class BlobStoreCacheService extends AbstractLifecycleComponent {
 
     private final ClusterService clusterService;
     private final Semaphore inFlightCacheFills;
-    private final Supplier<Long> timeSupplier;
     private final AtomicBoolean closed;
     private final Client client;
     private final String index;
 
-    public BlobStoreCacheService(ClusterService clusterService, Client client, String index, Supplier<Long> timeSupplier) {
+    public BlobStoreCacheService(ClusterService clusterService, Client client, String index) {
         this.client = new OriginSettingClient(client, SEARCHABLE_SNAPSHOTS_ORIGIN);
         this.inFlightCacheFills = new Semaphore(MAX_IN_FLIGHT_CACHE_FILLS);
         this.closed = new AtomicBoolean(false);
         this.clusterService = clusterService;
-        this.timeSupplier = timeSupplier;
         this.index = index;
     }
 
@@ -242,12 +239,13 @@ public class BlobStoreCacheService extends AbstractLifecycleComponent {
         final String name,
         final ByteRange range,
         final BytesReference bytes,
+        final long timeInEpochMillis,
         final ActionListener<Void> listener
     ) {
         final String id = generateId(repository, snapshotId, indexId, shardId, name, range);
         try {
             final CachedBlob cachedBlob = new CachedBlob(
-                Instant.ofEpochMilli(timeSupplier.get()),
+                Instant.ofEpochMilli(timeInEpochMillis),
                 Version.CURRENT,
                 repository,
                 name,

+ 11 - 2
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/CachedBlob.java

@@ -31,6 +31,7 @@ public class CachedBlob implements ToXContent {
     public static final CachedBlob CACHE_MISS = new CachedBlob(null, null, null, "CACHE_MISS", null, BytesArray.EMPTY, 0L, 0L);
 
     private static final String TYPE = "blob";
+    public static final String CREATION_TIME_FIELD = "creation_time";
 
     private final Instant creationTime;
     private final Version version;
@@ -80,7 +81,7 @@ public class CachedBlob implements ToXContent {
         builder.startObject();
         {
             builder.field("type", TYPE);
-            builder.field("creation_time", creationTime.toEpochMilli());
+            builder.field(CREATION_TIME_FIELD, creationTime.toEpochMilli());
             builder.field("version", version.id);
             builder.field("repository", repository);
             builder.startObject("blob");
@@ -117,9 +118,17 @@ public class CachedBlob implements ToXContent {
         return bytes;
     }
 
+    public Version version() {
+        return version;
+    }
+
+    public Instant creationTime() {
+        return creationTime;
+    }
+
     @SuppressWarnings("unchecked")
     public static CachedBlob fromSource(final Map<String, Object> source) {
-        final Long creationTimeEpochMillis = (Long) source.get("creation_time");
+        final Long creationTimeEpochMillis = (Long) source.get(CREATION_TIME_FIELD);
         if (creationTimeEpochMillis == null) {
             throw new IllegalStateException("cached blob document does not have the [creation_time] field");
         }
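
The creation_time field is written as epoch milliseconds (see the toXContent change above). When the maintenance task fetches it back as a document field, the value arrives as a string, which is why getCreationTime parses it with Long.parseLong before building an Instant. A small self-contained illustration of that round trip; the class and variable names here are illustrative only.

import java.time.Instant;
import java.time.temporal.ChronoUnit;

public final class CreationTimeRoundTrip {
    public static void main(String[] args) {
        // millisecond precision, matching what toEpochMilli() can represent
        final Instant creationTime = Instant.now().truncatedTo(ChronoUnit.MILLIS);

        // write side: the value stored in the creation_time field of the cache document
        final long storedValue = creationTime.toEpochMilli();

        // read side: the field value is handled as a string, hence the explicit parse
        final String fieldValue = Long.toString(storedValue);
        final Instant parsed = Instant.ofEpochMilli(Long.parseLong(fieldValue));

        System.out.println("stored " + storedValue + " -> parsed back to " + parsed);
    }
}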

+ 11 - 1
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java

@@ -704,7 +704,17 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
     }
 
     public void putCachedBlob(String name, ByteRange range, BytesReference content, ActionListener<Void> listener) {
-        blobStoreCacheService.putAsync(repository, snapshotId, indexId, shardId, name, range, content, listener);
+        blobStoreCacheService.putAsync(
+            repository,
+            snapshotId,
+            indexId,
+            shardId,
+            name,
+            range,
+            content,
+            threadPool.absoluteTimeInMillis(),
+            listener
+        );
     }
 
     public FrozenCacheFile getFrozenCacheFile(String fileName, long length) {

+ 6 - 6
x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheServiceTests.java

@@ -100,7 +100,7 @@ public class BlobStoreCacheServiceTests extends ESTestCase {
             return null;
         }).when(mockClient).execute(eq(GetAction.INSTANCE), any(GetRequest.class), any(ActionListener.class));
 
-        BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX, () -> 0L);
+        BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX);
         blobCacheService.start();
 
         PlainActionFuture<CachedBlob> future = PlainActionFuture.newFuture();
@@ -132,17 +132,17 @@ public class BlobStoreCacheServiceTests extends ESTestCase {
             return null;
         }).when(mockClient).execute(eq(IndexAction.INSTANCE), any(IndexRequest.class), any(ActionListener.class));
 
-        BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX, () -> 0L);
+        BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX);
         blobCacheService.start();
 
         PlainActionFuture<Void> future = PlainActionFuture.newFuture();
-        blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, future);
+        blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, 0L, future);
         assertThat(future.actionGet(), nullValue());
 
         blobCacheService.stop();
 
         future = PlainActionFuture.newFuture();
-        blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, future);
+        blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, 0L, future);
         IllegalStateException exception = expectThrows(IllegalStateException.class, future::actionGet);
         assertThat(exception.getMessage(), containsString("Blob cache service is closed"));
     }
@@ -170,7 +170,7 @@ public class BlobStoreCacheServiceTests extends ESTestCase {
             return null;
         }).when(mockClient).execute(eq(IndexAction.INSTANCE), any(IndexRequest.class), any(ActionListener.class));
 
-        final BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX, () -> 0L);
+        final BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX);
         blobCacheService.start();
 
         assertThat(blobCacheService.getInFlightCacheFills(), equalTo(0));
@@ -180,7 +180,7 @@ public class BlobStoreCacheServiceTests extends ESTestCase {
             final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
             threadPool.generic()
                 .execute(
-                    () -> blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, future)
+                    () -> blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, 0L, future)
                 );
             futures.add(future);
         }

+ 2 - 2
x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java

@@ -316,7 +316,7 @@ public final class TestUtils {
     public static class NoopBlobStoreCacheService extends BlobStoreCacheService {
 
         public NoopBlobStoreCacheService() {
-            super(null, mock(Client.class), SNAPSHOT_BLOB_CACHE_INDEX, () -> 0L);
+            super(null, mock(Client.class), SNAPSHOT_BLOB_CACHE_INDEX);
         }
 
         @Override
@@ -345,7 +345,7 @@ public final class TestUtils {
         private final ConcurrentHashMap<String, BytesArray> blobs = new ConcurrentHashMap<>();
 
         public SimpleBlobStoreCacheService() {
-            super(null, mock(Client.class), SNAPSHOT_BLOB_CACHE_INDEX, System::currentTimeMillis);
+            super(null, mock(Client.class), SNAPSHOT_BLOB_CACHE_INDEX);
         }
 
         @Override