1
0
Эх сурвалжийг харах

Use the system index descriptor in the snapshot blob cache cleanup task (#120937) (#121053)

Cleanup of the `.snapshot-blob-cache*` system index is done only on the
node that hosts the primary of shard 0 of that index. When the index
is migrated as part of an upgrade test, e.g. v7 -> v8, the index is
reindexed into a new index, `.snapshot-blob-cache-reindexed-for-9`. The
code scheduling this cleanup task is then unable to locate the shard and
never triggers a cleanup after the upgrade. This change uses the
system index descriptor to find the matching shard, which will also work
for future versions.

Closes https://github.com/elastic/elasticsearch/issues/120518
Pooya Salehi 8 сар өмнө
parent
commit
604c015305

+ 6 - 0
docs/changelog/120937.yaml

@@ -0,0 +1,6 @@
+pr: 120937
+summary: Use the system index descriptor in the snapshot blob cache cleanup task
+area: Snapshot/Restore
+type: bug
+issues:
+ - 120518

+ 45 - 1
x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java

@@ -23,8 +23,11 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.reindex.ReindexAction;
+import org.elasticsearch.index.reindex.ReindexRequest;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.store.LuceneFilesExtensions;
+import org.elasticsearch.indices.SystemIndices;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.reindex.ReindexPlugin;
 import org.elasticsearch.repositories.IndexId;
@@ -63,9 +66,11 @@ import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SN
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
 
 public class SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests extends BaseFrozenSearchableSnapshotsIntegTestCase {
 
@@ -194,7 +199,6 @@ public class SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests extends Base
                 }
             });
         }
-
         logger.info("--> deleting indices, maintenance service should clean up snapshot blob cache index");
         assertAcked(indicesAdmin().prepareDelete("mounted-*"));
         assertBusy(() -> {
@@ -311,6 +315,46 @@ public class SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests extends Base
         }
     }
 
+    public void testCleanUpMigratedSystemIndexAfterIndicesAreDeleted() throws Exception {
+        final String repositoryName = "repository";
+        createRepository(repositoryName, FsRepository.TYPE);
+
+        final Map<String, Tuple<Settings, Long>> mountedIndices = mountRandomIndicesWithCache(repositoryName, 3, 10);
+        ensureYellow(SNAPSHOT_BLOB_CACHE_INDEX);
+        refreshSystemIndex(true);
+
+        final long numberOfEntriesInCache = numberOfEntriesInCache(); // baseline taken before the migration
+        logger.info("--> found [{}] entries in snapshot blob cache", numberOfEntriesInCache);
+        assertThat(numberOfEntriesInCache, equalTo(mountedIndices.values().stream().mapToLong(Tuple::v2).sum()));
+
+        migrateTheSystemIndex(); // from here on the cache index name is an alias pointing at the reindexed index
+
+        logger.info("--> deleting indices, maintenance service should clean up snapshot blob cache index");
+        assertAcked(indicesAdmin().prepareDelete("mounted-*"));
+        assertBusy(() -> { // passes once a search through the original cache index name returns no documents
+            refreshSystemIndex(true);
+            assertHitCount(systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0), 0L);
+        });
+    }
+
+    /**
+     *  Mimics the migration of the {@link SearchableSnapshots#SNAPSHOT_BLOB_CACHE_INDEX} system index as done in
+     *  {@link org.elasticsearch.upgrades.SystemIndexMigrator}: reindex into a new index, then swap the old index for an alias.
+     */
+    private void migrateTheSystemIndex() {
+        final var migratedSnapshotBlobCache = SNAPSHOT_BLOB_CACHE_INDEX + SystemIndices.UPGRADED_INDEX_SUFFIX;
+        logger.info("--> migrating {} system index to {}", SNAPSHOT_BLOB_CACHE_INDEX, migratedSnapshotBlobCache);
+        var reindexRequest = new ReindexRequest().setSourceIndices(SNAPSHOT_BLOB_CACHE_INDEX)
+            .setDestIndex(migratedSnapshotBlobCache)
+            .setRefresh(true);
+        var resp = safeGet(client().execute(ReindexAction.INSTANCE, reindexRequest));
+        assertThat(resp.getBulkFailures(), is(empty())); // every cache entry must have been copied
+        indicesAdmin().prepareAliases()
+            .removeIndex(SNAPSHOT_BLOB_CACHE_INDEX) // drop the original index ...
+            .addAlias(migratedSnapshotBlobCache, SNAPSHOT_BLOB_CACHE_INDEX) // ... and point its name at the migrated index
+            .get();
+    }
+
     /**
      * @return a {@link Client} that can be used to query the blob store cache system index
      */

+ 8 - 1
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java

@@ -339,7 +339,14 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
             final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService(client, SNAPSHOT_BLOB_CACHE_INDEX);
             this.blobStoreCacheService.set(blobStoreCacheService);
             clusterService.addListener(
-                new BlobStoreCacheMaintenanceService(settings, clusterService, threadPool, client, SNAPSHOT_BLOB_CACHE_INDEX)
+                new BlobStoreCacheMaintenanceService(
+                    settings,
+                    clusterService,
+                    threadPool,
+                    client,
+                    services.systemIndices(),
+                    SNAPSHOT_BLOB_CACHE_INDEX
+                )
             );
             components.add(blobStoreCacheService);
         } else {

+ 18 - 14
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java

@@ -36,7 +36,6 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
-import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.document.DocumentField;
@@ -47,7 +46,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
 import org.elasticsearch.core.AbstractRefCounted;
-import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.RefCounted;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
@@ -57,6 +55,8 @@ import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.elasticsearch.index.reindex.DeleteByQueryAction;
 import org.elasticsearch.index.reindex.DeleteByQueryRequest;
+import org.elasticsearch.indices.SystemIndexDescriptor;
+import org.elasticsearch.indices.SystemIndices;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.builder.PointInTimeBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -145,6 +145,7 @@ public class BlobStoreCacheMaintenanceService implements ClusterStateListener {
     private final Client clientWithOrigin;
     private final String systemIndexName;
     private final ThreadPool threadPool;
+    private final SystemIndexDescriptor systemIndexDescriptor;
 
     private volatile Scheduler.Cancellable periodicTask;
     private volatile TimeValue periodicTaskInterval;
@@ -158,10 +159,12 @@ public class BlobStoreCacheMaintenanceService implements ClusterStateListener {
         ClusterService clusterService,
         ThreadPool threadPool,
         Client client,
+        SystemIndices systemIndices,
         String systemIndexName
     ) {
         this.clientWithOrigin = new OriginSettingClient(Objects.requireNonNull(client), SEARCHABLE_SNAPSHOTS_ORIGIN);
         this.systemIndexName = Objects.requireNonNull(systemIndexName);
+        this.systemIndexDescriptor = Objects.requireNonNull(systemIndices.findMatchingDescriptor(systemIndexName));
         this.clusterService = Objects.requireNonNull(clusterService);
         this.threadPool = Objects.requireNonNull(threadPool);
         this.periodicTaskInterval = SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING.get(settings);
@@ -181,10 +184,7 @@ public class BlobStoreCacheMaintenanceService implements ClusterStateListener {
         if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) {
             return; // state not fully recovered
         }
-        final ShardRouting primary = systemIndexPrimaryShard(state);
-        if (primary == null
-            || primary.active() == false
-            || Objects.equals(state.nodes().getLocalNodeId(), primary.currentNodeId()) == false) {
+        if (systemIndexPrimaryShardActiveAndAssignedToLocalNode(state) == false) {
             // system index primary shard does not exist or is not assigned to this data node
             stopPeriodicTask();
             return;
@@ -242,16 +242,20 @@ public class BlobStoreCacheMaintenanceService implements ClusterStateListener {
         }
     }
 
-    @Nullable
-    private ShardRouting systemIndexPrimaryShard(final ClusterState state) {
-        final IndexMetadata indexMetadata = state.metadata().index(systemIndexName);
-        if (indexMetadata != null) {
-            final IndexRoutingTable indexRoutingTable = state.routingTable().index(indexMetadata.getIndex());
-            if (indexRoutingTable != null) {
-                return indexRoutingTable.shard(0).primaryShard();
+    private boolean systemIndexPrimaryShardActiveAndAssignedToLocalNode(final ClusterState state) {
+        for (IndexMetadata indexMetadata : state.metadata()) { // scan all indices: a migrated system index has a different name
+            if (indexMetadata.isSystem() && systemIndexDescriptor.matchesIndexPattern(indexMetadata.getIndex().getName())) {
+                final IndexRoutingTable indexRoutingTable = state.routingTable().index(indexMetadata.getIndex());
+                if (indexRoutingTable == null || indexRoutingTable.shard(0) == null) {
+                    continue; // no routing entry for shard 0 of this candidate, check the next matching index
+                }
+                final var primary = indexRoutingTable.shard(0).primaryShard();
+                if (primary != null && primary.active() && Objects.equals(state.nodes().getLocalNodeId(), primary.currentNodeId())) {
+                    return true; // an active shard 0 primary of a matching system index is on the local node
+                }
             }
         }
-        return null;
+        return false; // no matching system index has an active shard 0 primary on the local node
     }
 
     private static boolean hasSearchableSnapshotWith(final ClusterState state, final String snapshotId, final String indexId) {