Ver código fonte

Move experimental frozen to frozen shard limit (#71781)

Frozen indices created on 7.12 would not belong to the frozen shard
limit group, now we convert them when last node is upgraded.

Relates #71392
Henning Andersen 4 anos atrás
pai
commit
3c7c0c62f3

+ 2 - 0
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java

@@ -101,6 +101,7 @@ import org.elasticsearch.xpack.searchablesnapshots.cache.full.PersistentCache;
 import org.elasticsearch.xpack.searchablesnapshots.rest.RestClearSearchableSnapshotsCacheAction;
 import org.elasticsearch.xpack.searchablesnapshots.rest.RestMountSearchableSnapshotAction;
 import org.elasticsearch.xpack.searchablesnapshots.rest.RestSearchableSnapshotsStatsAction;
+import org.elasticsearch.xpack.searchablesnapshots.upgrade.SearchableSnapshotIndexMetadataUpgrader;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -348,6 +349,7 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
         this.allocator.set(new SearchableSnapshotAllocator(client, clusterService.getRerouteService(), frozenCacheInfoService));
         components.add(new FrozenCacheServiceSupplier(frozenCacheService.get()));
         components.add(new CacheServiceSupplier(cacheService.get()));
+        new SearchableSnapshotIndexMetadataUpgrader(clusterService, threadPool).initialize();
         return Collections.unmodifiableList(components);
     }
 

+ 131 - 0
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/upgrade/SearchableSnapshotIndexMetadataUpgrader.java

@@ -0,0 +1,131 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.searchablesnapshots.upgrade;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateListener;
+import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.indices.ShardLimitValidator;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class upgrades frozen indices to apply the index.shard_limit.group=frozen setting after all nodes have been upgraded to 7.13+
+ */
+public class SearchableSnapshotIndexMetadataUpgrader {
+    private static final Logger logger = LogManager.getLogger(SearchableSnapshotIndexMetadataUpgrader.class);
+
+    private final ClusterService clusterService;
+    private final ThreadPool threadPool;
+    private final AtomicBoolean upgraded = new AtomicBoolean();
+    private final ClusterStateListener listener = this::clusterChanged;
+
+    public SearchableSnapshotIndexMetadataUpgrader(ClusterService clusterService, ThreadPool threadPool) {
+        this.clusterService = clusterService;
+        this.threadPool = threadPool;
+    }
+
+    public void initialize() {
+        clusterService.addListener(listener);
+    }
+
+    private void clusterChanged(ClusterChangedEvent event) {
+        if (upgraded.get()) {
+            return;
+        }
+
+        if (event.localNodeMaster() && event.state().nodes().getMinNodeVersion().onOrAfter(Version.V_7_13_0)) {
+            // only want one doing this at a time, assume it succeeds and reset if not.
+            if (upgraded.compareAndSet(false, true)) {
+                final Executor executor = threadPool.generic();
+                executor.execute(() -> maybeUpgradeIndices(event.state()));
+            }
+        }
+    }
+
+    private void maybeUpgradeIndices(ClusterState state) {
+        // 99% of the time, this will be a noop, so precheck that before adding a cluster state update.
+        if (needsUpgrade(state)) {
+            logger.info("Upgrading partial searchable snapshots to use frozen shard limit group");
+            clusterService.submitStateUpdateTask("searchable-snapshot-index-upgrader", new ClusterStateUpdateTask() {
+                @Override
+                public ClusterState execute(ClusterState currentState) throws Exception {
+                    return upgradeIndices(currentState);
+                }
+
+                @Override
+                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                    clusterService.removeListener(listener);
+                }
+
+                @Override
+                public void onFailure(String source, Exception e) {
+                    logger.warn(
+                        "upgrading frozen indices to have frozen shard limit group failed, will retry on the next cluster state update",
+                        e
+                    );
+                    // let us try again later.
+                    upgraded.set(false);
+                }
+            });
+        } else {
+            clusterService.removeListener(listener);
+        }
+    }
+
+    static boolean needsUpgrade(ClusterState state) {
+        return StreamSupport.stream(state.metadata().spliterator(), false)
+            .filter(imd -> imd.getCreationVersion().onOrAfter(Version.V_7_12_0) && imd.getCreationVersion().before(Version.V_7_13_0))
+            .map(IndexMetadata::getSettings)
+            .filter(SearchableSnapshotsConstants::isPartialSearchableSnapshotIndex)
+            .anyMatch(SearchableSnapshotIndexMetadataUpgrader::notFrozenShardLimitGroup);
+    }
+
+    static ClusterState upgradeIndices(ClusterState currentState) {
+        if (needsUpgrade(currentState) == false) {
+            return currentState;
+        }
+        Metadata.Builder builder = Metadata.builder(currentState.metadata());
+        StreamSupport.stream(currentState.metadata().spliterator(), false)
+            .filter(imd -> imd.getCreationVersion().onOrAfter(Version.V_7_12_0) && imd.getCreationVersion().before(Version.V_8_0_0))
+            .filter(
+                imd -> SearchableSnapshotsConstants.isPartialSearchableSnapshotIndex(imd.getSettings())
+                    && notFrozenShardLimitGroup(imd.getSettings())
+            )
+            .map(SearchableSnapshotIndexMetadataUpgrader::setShardLimitGroupFrozen)
+            .forEach(imd -> builder.put(imd, true));
+        return ClusterState.builder(currentState).metadata(builder).build();
+    }
+
+    private static boolean notFrozenShardLimitGroup(org.elasticsearch.common.settings.Settings settings) {
+        return ShardLimitValidator.FROZEN_GROUP.equals(ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.get(settings)) == false;
+    }
+
+    private static IndexMetadata setShardLimitGroupFrozen(IndexMetadata indexMetadata) {
+        return IndexMetadata.builder(indexMetadata)
+            .settings(
+                Settings.builder()
+                    .put(indexMetadata.getSettings())
+                    .put(ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.getKey(), ShardLimitValidator.FROZEN_GROUP)
+            )
+            .settingsVersion(indexMetadata.getSettingsVersion() + 1)
+            .build();
+    }
+}

+ 159 - 0
x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/upgrade/SearchableSnapshotIndexMetadataUpgraderTests.java

@@ -0,0 +1,159 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.searchablesnapshots.upgrade;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.IndexModule;
+import org.elasticsearch.indices.ShardLimitValidator;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.VersionUtils;
+import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;
+
+import java.util.stream.StreamSupport;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.sameInstance;
+
+public class SearchableSnapshotIndexMetadataUpgraderTests extends ESTestCase {
+
+    public void testNoUpgradeNeeded() {
+        Metadata.Builder metadataBuilder = randomMetadata(normal(), full(), partial_7_13plus(), shardLimitGroupFrozen(partial_7_12()));
+        assertThat(needsUpgrade(metadataBuilder), is(false));
+    }
+
+    public void testNeedsUpgrade() {
+        Metadata.Builder metadataBuilder = addIndex(
+            partial_7_12(),
+            randomMetadata(normal(), full(), partial_7_13plus(), partial_7_12(), shardLimitGroupFrozen(partial_7_12()))
+        );
+        assertThat(needsUpgrade(metadataBuilder), is(true));
+    }
+
+    public void testUpgradeIndices() {
+        Metadata.Builder metadataBuilder = addIndex(
+            partial_7_12(),
+            randomMetadata(normal(), full(), partial_7_13plus(), partial_7_12(), shardLimitGroupFrozen(partial_7_12()))
+        );
+
+        ClusterState originalState = clusterState(metadataBuilder);
+        ClusterState upgradedState = SearchableSnapshotIndexMetadataUpgrader.upgradeIndices(originalState);
+
+        assertThat(upgradedState, not(sameInstance(originalState)));
+        assertThat(upgradedState.metadata().indices().size(), equalTo(originalState.metadata().indices().size()));
+
+        assertTrue(StreamSupport.stream(upgradedState.metadata().spliterator(), false).anyMatch(upgraded -> {
+            IndexMetadata original = originalState.metadata().index(upgraded.getIndex());
+            assertThat(original, notNullValue());
+            if (isPartial(upgraded) == false
+                || ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.get(original.getSettings())
+                    .equals(ShardLimitValidator.FROZEN_GROUP)) {
+                assertThat(upgraded, sameInstance(original));
+                return false;
+            } else {
+                assertThat(isPartial(upgraded), is(isPartial(original)));
+                assertThat(upgraded.getNumberOfShards(), equalTo(original.getNumberOfShards()));
+                assertThat(upgraded.getNumberOfReplicas(), equalTo(original.getNumberOfReplicas()));
+                assertThat(
+                    ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.get(upgraded.getSettings()),
+                    equalTo(ShardLimitValidator.FROZEN_GROUP)
+                );
+                assertThat(upgraded.getSettingsVersion(), equalTo(original.getSettingsVersion() + 1));
+                return true;
+            }
+        }));
+    }
+
+    public void testNoopUpgrade() {
+        Metadata.Builder metadataBuilder = randomMetadata(normal(), full(), partial_7_13plus(), shardLimitGroupFrozen(partial_7_12()));
+        ClusterState originalState = clusterState(metadataBuilder);
+        ClusterState upgradedState = SearchableSnapshotIndexMetadataUpgrader.upgradeIndices(originalState);
+        assertThat(upgradedState, sameInstance(originalState));
+    }
+
+    private Settings normal() {
+        return settings(VersionUtils.randomVersion(random())).build();
+    }
+
+    private Settings partial_7_12() {
+        return searchableSnapshotSettings(VersionUtils.randomVersionBetween(random(), Version.V_7_12_0, Version.V_7_12_1), true);
+    }
+
+    private Settings partial_7_13plus() {
+        Settings settings = searchableSnapshotSettings(
+            VersionUtils.randomVersionBetween(random(), Version.V_7_13_0, Version.CURRENT),
+            true
+        );
+        if (randomBoolean()) {
+            return shardLimitGroupFrozen(settings);
+        } else {
+            return settings;
+        }
+    }
+
+    private Settings full() {
+        return searchableSnapshotSettings(VersionUtils.randomVersion(random()), false);
+    }
+
+    private Settings searchableSnapshotSettings(Version version, boolean partial) {
+        Settings.Builder settings = settings(version);
+        settings.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY);
+        if (partial || randomBoolean()) {
+            settings.put(SearchableSnapshotsConstants.SNAPSHOT_PARTIAL_SETTING.getKey(), partial);
+        }
+        return settings.build();
+    }
+
+    private Settings shardLimitGroupFrozen(Settings settings) {
+        return Settings.builder()
+            .put(settings)
+            .put(ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.getKey(), ShardLimitValidator.FROZEN_GROUP)
+            .build();
+    }
+
+    private Metadata.Builder addIndex(Settings settings, Metadata.Builder builder) {
+        builder.put(
+            IndexMetadata.builder(randomAlphaOfLength(10))
+                .settings(settings)
+                .numberOfShards(between(1, 10))
+                .numberOfReplicas(between(0, 10))
+                .build(),
+            false
+        );
+        return builder;
+    }
+
+    private Metadata.Builder randomMetadata(Settings... indexSettingsList) {
+        Metadata.Builder builder = new Metadata.Builder();
+        for (Settings settings : indexSettingsList) {
+            for (int i = 0; i < between(0, 10); ++i) {
+                addIndex(settings, builder);
+            }
+        }
+        return builder;
+    }
+
+    private boolean needsUpgrade(Metadata.Builder metadataBuilder) {
+        return SearchableSnapshotIndexMetadataUpgrader.needsUpgrade(clusterState(metadataBuilder));
+    }
+
+    private ClusterState clusterState(Metadata.Builder metadataBuilder) {
+        return ClusterState.builder(ClusterName.DEFAULT).metadata(metadataBuilder).build();
+    }
+
+    private boolean isPartial(IndexMetadata upgraded) {
+        return SearchableSnapshotsConstants.isPartialSearchableSnapshotIndex(upgraded.getSettings());
+    }
+}

+ 10 - 0
x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/SearchableSnapshotsRollingUpgradeIT.java

@@ -19,6 +19,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
+import org.elasticsearch.indices.ShardLimitValidator;
 import org.elasticsearch.repositories.fs.FsRepository;
 import org.elasticsearch.rest.RestStatus;
 import org.hamcrest.Matcher;
@@ -31,6 +32,7 @@ import java.util.stream.Collectors;
 
 import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.notNullValue;
 
 public class SearchableSnapshotsRollingUpgradeIT extends AbstractUpgradeTestCase {
@@ -46,6 +48,14 @@ public class SearchableSnapshotsRollingUpgradeIT extends AbstractUpgradeTestCase
         final Storage storage = Storage.SHARED_CACHE;
         assumeVersion(Version.V_7_12_0, Storage.SHARED_CACHE);
 
+        if (CLUSTER_TYPE.equals(ClusterType.UPGRADED)) {
+            assertBusy(() -> {
+                Map<String, Object> settings = getIndexSettingsAsMap("mounted_index_shared_cache");
+                assertThat(settings,
+                    hasEntry(ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.getKey(), ShardLimitValidator.FROZEN_GROUP));
+            });
+        }
+
         executeMountAndRecoversCorrectlyTestCase(storage, 5678L);
     }