Browse Source

Fix privileges for system index migration WRITE block (#121329)

This PR removes a potential cause of data loss when migrating system indices. It does this by changing the way we set a "write-block" on the system index being migrated: we now use a dedicated transport request rather than a settings update. Furthermore, we no longer remove the write-block prior to deleting the old index, as this was another potential source of data loss. Additionally, we now remove the block if the migration fails.
John Verwolf 8 months ago
parent
commit
3b238be259

+ 0 - 5
docs/changelog/121120.yaml

@@ -1,5 +0,0 @@
-pr: 121120
-summary: Revert "Reduce Data Loss in System Indices Migration 8x"
-area: Infra/Core
-type: bug
-issues: []

+ 36 - 1
modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/AbstractFeatureMigrationIntegTest.java

@@ -9,14 +9,17 @@
 
 package org.elasticsearch.migration;
 
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.admin.cluster.migration.TransportGetFeatureUpgradeStatusAction;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.stats.IndexStats;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.support.ActionFilter;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterState;
@@ -28,6 +31,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.indices.AssociatedIndexDescriptor;
 import org.elasticsearch.indices.SystemIndexDescriptor;
+import org.elasticsearch.plugins.ActionPlugin;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.plugins.PluginsService;
 import org.elasticsearch.plugins.SystemIndexPlugin;
@@ -50,6 +54,10 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.unmodifiableSet;
+import static org.elasticsearch.common.util.set.Sets.newHashSet;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.endsWith;
 import static org.hamcrest.Matchers.equalTo;
@@ -267,12 +275,18 @@ public abstract class AbstractFeatureMigrationIntegTest extends ESIntegTestCase
         assertThat(thisIndexStats.getTotal().getDocs().getCount(), is((long) INDEX_DOC_COUNT));
     }
 
-    public static class TestPlugin extends Plugin implements SystemIndexPlugin {
+    public static class TestPlugin extends Plugin implements SystemIndexPlugin, ActionPlugin {
         public final AtomicReference<Function<ClusterState, Map<String, Object>>> preMigrationHook = new AtomicReference<>();
         public final AtomicReference<BiConsumer<ClusterState, Map<String, Object>>> postMigrationHook = new AtomicReference<>();
+        private final BlockingActionFilter blockingActionFilter;
 
         public TestPlugin() {
+            blockingActionFilter = new BlockingActionFilter();
+        }
 
+        @Override
+        public List<ActionFilter> getActionFilters() {
+            return singletonList(blockingActionFilter);
         }
 
         @Override
@@ -311,5 +325,26 @@ public abstract class AbstractFeatureMigrationIntegTest extends ESIntegTestCase
             postMigrationHook.get().accept(clusterService.state(), preUpgradeMetadata);
             listener.onResponse(true);
         }
+
+        /**
+         * Test-only action filter that fails every action whose name has been registered through
+         * {@link #blockActions(String...)} by throwing an {@link ElasticsearchException}.
+         */
+        public static class BlockingActionFilter extends org.elasticsearch.action.support.ActionFilter.Simple {
+            // volatile: the set is replaced by the test thread via blockActions() while apply() is invoked by the
+            // action framework (presumably on other threads), so the new reference must be safely published.
+            private volatile Set<String> blockedActions = emptySet();
+
+            /**
+             * Rejects the action when its name is currently blocked.
+             *
+             * @return {@code true} when the action is not blocked
+             * @throws ElasticsearchException when the action name is contained in the blocked set
+             */
+            @Override
+            protected boolean apply(String action, ActionRequest request, ActionListener<?> listener) {
+                if (blockedActions.contains(action)) {
+                    throw new ElasticsearchException("force exception on [" + action + "]");
+                }
+                return true;
+            }
+
+            @Override
+            public int order() {
+                return 0;
+            }
+
+            /**
+             * Replaces the set of blocked action names with the given ones. Calling this method without
+             * arguments clears the set, so no action is blocked afterwards.
+             */
+            public void blockActions(String... actions) {
+                blockedActions = unmodifiableSet(newHashSet(actions));
+            }
+        }
     }
 }

+ 59 - 0
modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/FeatureMigrationIT.java

@@ -17,11 +17,14 @@ import org.elasticsearch.action.admin.cluster.migration.PostFeatureUpgradeAction
 import org.elasticsearch.action.admin.cluster.migration.PostFeatureUpgradeRequest;
 import org.elasticsearch.action.admin.cluster.migration.PostFeatureUpgradeResponse;
 import org.elasticsearch.action.admin.indices.alias.Alias;
+import org.elasticsearch.action.admin.indices.alias.TransportIndicesAliasesAction;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
 import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
 import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.support.ActionFilter;
+import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
@@ -36,10 +39,12 @@ import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.indices.SystemIndexDescriptor;
+import org.elasticsearch.migration.AbstractFeatureMigrationIntegTest.TestPlugin.BlockingActionFilter;
 import org.elasticsearch.painless.PainlessPlugin;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.plugins.SystemIndexPlugin;
 import org.elasticsearch.reindex.ReindexPlugin;
+import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.upgrades.FeatureMigrationResults;
 import org.elasticsearch.upgrades.SingleFeatureMigrationResult;
 
@@ -274,6 +279,60 @@ public class FeatureMigrationIT extends AbstractFeatureMigrationIntegTest {
         });
     }
 
+    /**
+     * Verifies that a migration which fails at the alias request leaves the old index without a write block, and that
+     * retrying the migration succeeds once the alias request is no longer blocked.
+     */
+    @AwaitsFix(bugUrl = "ES-10666") // This test uncovered an existing issue
+    public void testIndexBlockIsRemovedWhenAliasRequestFails() throws Exception {
+        createSystemIndexForDescriptor(INTERNAL_UNMANAGED);
+        ensureGreen();
+
+        // Block the alias request to simulate a failure
+        InternalTestCluster internalTestCluster = internalCluster();
+        ActionFilters actionFilters = internalTestCluster.getInstance(ActionFilters.class, internalTestCluster.getMasterName());
+        BlockingActionFilter blockingActionFilter = null;
+        for (ActionFilter filter : actionFilters.filters()) {
+            if (filter instanceof BlockingActionFilter) {
+                blockingActionFilter = (BlockingActionFilter) filter;
+                break;
+            }
+        }
+        assertNotNull("BlockingActionFilter should exist", blockingActionFilter);
+        blockingActionFilter.blockActions(TransportIndicesAliasesAction.NAME);
+
+        try {
+            // Start the migration
+            client().execute(PostFeatureUpgradeAction.INSTANCE, new PostFeatureUpgradeRequest(TEST_REQUEST_TIMEOUT)).get();
+
+            // Wait till the migration fails
+            assertBusy(() -> {
+                GetFeatureUpgradeStatusResponse statusResp = client().execute(
+                    GetFeatureUpgradeStatusAction.INSTANCE,
+                    new GetFeatureUpgradeStatusRequest(TEST_REQUEST_TIMEOUT)
+                ).get();
+                logger.info(Strings.toString(statusResp));
+                assertThat(statusResp.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.ERROR));
+            });
+
+            // Get the settings to see if the write block was removed
+            var allsettings = client().admin()
+                .indices()
+                .prepareGetSettings(INTERNAL_UNMANAGED.getIndexPattern())
+                .get()
+                .getIndexToSettings();
+            var internalUnmanagedOldIndexSettings = allsettings.get(".int-unman-old");
+            // Fail with a clear message instead of a NullPointerException if the old index is missing from the response
+            assertNotNull("Settings for old index [.int-unman-old] should be returned", internalUnmanagedOldIndexSettings);
+            var writeBlock = internalUnmanagedOldIndexSettings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey());
+            assertThat("Write block on old index should be removed on migration ERROR status", writeBlock, equalTo("false"));
+
+            // Unblock the alias request
+            blockingActionFilter.blockActions();
+
+            // Retry the migration
+            client().execute(PostFeatureUpgradeAction.INSTANCE, new PostFeatureUpgradeRequest(TEST_REQUEST_TIMEOUT)).get();
+
+            // Ensure that the migration is successful after the alias request is unblocked
+            assertBusy(() -> {
+                GetFeatureUpgradeStatusResponse statusResp = client().execute(
+                    GetFeatureUpgradeStatusAction.INSTANCE,
+                    new GetFeatureUpgradeStatusRequest(TEST_REQUEST_TIMEOUT)
+                ).get();
+                logger.info(Strings.toString(statusResp));
+                assertThat(statusResp.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.NO_MIGRATION_NEEDED));
+            });
+        } finally {
+            // Always lift the block, so a failed assertion above cannot leave the filter rejecting alias requests
+            // for whatever runs against this node afterwards.
+            blockingActionFilter.blockActions();
+        }
+    }
+
     public void testMigrationWillRunAfterError() throws Exception {
         createSystemIndexForDescriptor(INTERNAL_MANAGED);
 

+ 18 - 0
server/src/main/java/org/elasticsearch/action/admin/indices/alias/IndicesAliasesResponse.java

@@ -77,6 +77,17 @@ public class IndicesAliasesResponse extends AcknowledgedResponse {
         return errors;
     }
 
+    /**
+     * Returns the error of every action result that carries one. The list is empty when this response is not
+     * flagged as having errors.
+     */
+    public List<ElasticsearchException> getErrors() {
+        if (errors) {
+            return actionResults.stream().map(AliasActionResult::getError).filter(error -> error != null).toList();
+        }
+        return List.of();
+    }
+
     /**
      *  Build a response from a list of action results. Sets the errors boolean based
      *  on whether any of the individual results contain an error.
@@ -165,6 +176,13 @@ public class IndicesAliasesResponse extends AcknowledgedResponse {
             return new AliasActionResult(indices, action, null);
         }
 
+        /**
+         * The error result if the action failed, null if the action succeeded.
+         */
+        public ElasticsearchException getError() {
+            return error;
+        }
+
         private int getStatus() {
             return error == null ? 200 : error.status().getStatus();
         }

+ 62 - 30
server/src/main/java/org/elasticsearch/upgrades/SystemIndexMigrator.java

@@ -15,7 +15,9 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
+import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
 import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
+import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest;
 import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -31,7 +33,6 @@ import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
 import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
 import org.elasticsearch.cluster.metadata.MetadataUpdateSettingsService;
 import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.CheckedBiConsumer;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Settings;
@@ -58,6 +59,7 @@ import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.action.admin.cluster.migration.TransportGetFeatureUpgradeStatusAction.NO_UPGRADE_REQUIRED_INDEX_VERSION;
+import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.State.CLOSE;
 import static org.elasticsearch.core.Strings.format;
 
@@ -447,12 +449,33 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
                                     logAndThrowExceptionForFailures(bulkByScrollResponse)
                                 );
                             } else {
-                                // Successful completion of reindexing - remove read only and delete old index
-                                setWriteBlock(
-                                    oldIndex,
-                                    false,
-                                    delegate2.delegateFailureAndWrap(setAliasAndRemoveOldIndex(migrationInfo, bulkByScrollResponse))
-                                );
+                                // Successful completion of reindexing. Now we need to set the alias and remove the old index.
+                                setAliasAndRemoveOldIndex(migrationInfo, ActionListener.wrap(aliasesResponse -> {
+                                    if (aliasesResponse.hasErrors()) {
+                                        var e = new ElasticsearchException("Aliases request had errors");
+                                        for (var error : aliasesResponse.getErrors()) {
+                                            e.addSuppressed(error);
+                                        }
+                                        throw e;
+                                    }
+                                    logger.info(
+                                        "Successfully migrated old index [{}] to new index [{}] from feature [{}]",
+                                        oldIndexName,
+                                        migrationInfo.getNextIndexName(),
+                                        migrationInfo.getFeatureName()
+                                    );
+                                    delegate2.onResponse(bulkByScrollResponse);
+                                }, e -> {
+                                    logger.error(
+                                        () -> format(
+                                            "An error occurred while changing aliases and removing the old index [%s] from feature [%s]",
+                                            oldIndexName,
+                                            migrationInfo.getFeatureName()
+                                        ),
+                                        e
+                                    );
+                                    removeReadOnlyBlockOnReindexFailure(oldIndex, delegate2, e);
+                                }));
                             }
                         }, e -> {
                             logger.error(
@@ -504,10 +527,7 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
         metadataCreateIndexService.createIndex(TimeValue.MINUS_ONE, TimeValue.ZERO, null, createRequest, listener);
     }
 
-    private CheckedBiConsumer<ActionListener<BulkByScrollResponse>, AcknowledgedResponse, Exception> setAliasAndRemoveOldIndex(
-        SystemIndexMigrationInfo migrationInfo,
-        BulkByScrollResponse bulkByScrollResponse
-    ) {
+    private void setAliasAndRemoveOldIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<IndicesAliasesResponse> listener) {
         final IndicesAliasesRequestBuilder aliasesRequest = migrationInfo.createClient(baseClient).admin().indices().prepareAliases();
         aliasesRequest.removeIndex(migrationInfo.getCurrentIndexName());
         aliasesRequest.addAlias(migrationInfo.getNextIndexName(), migrationInfo.getCurrentIndexName());
@@ -526,30 +546,42 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
             );
         });
 
-        // Technically this callback might have a different cluster state, but it shouldn't matter - these indices shouldn't be changing
-        // while we're trying to migrate them.
-        return (listener, unsetReadOnlyResponse) -> aliasesRequest.execute(
-            listener.delegateFailureAndWrap((l, deleteIndexResponse) -> l.onResponse(bulkByScrollResponse))
-        );
+        aliasesRequest.execute(listener);
     }
 
     /**
-     * Makes the index readonly if it's not set as a readonly yet
+     * Sets the write block on the index to the given value.
      */
     private void setWriteBlock(Index index, boolean readOnlyValue, ActionListener<AcknowledgedResponse> listener) {
-        final Settings readOnlySettings = Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), readOnlyValue).build();
-
-        metadataUpdateSettingsService.updateSettings(
-            new UpdateSettingsClusterStateUpdateRequest(
-                TimeValue.MINUS_ONE,
-                TimeValue.ZERO,
-                readOnlySettings,
-                UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
-                UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REJECT,
-                index
-            ),
-            listener
-        );
+        if (readOnlyValue) {
+            // Setting the Block with an AddIndexBlockRequest ensures all shards have accounted for the block and all
+            // in-flight writes are completed before returning.
+            baseClient.admin()
+                .indices()
+                .addBlock(
+                    new AddIndexBlockRequest(WRITE, index.getName()).masterNodeTimeout(TimeValue.MINUS_ONE),
+                    listener.delegateFailureAndWrap((l, response) -> {
+                        if (response.isAcknowledged() == false) {
+                            throw new ElasticsearchException("Failed to acknowledge read-only block index request");
+                        }
+                        l.onResponse(response);
+                    })
+                );
+        } else {
+            // The only way to remove a Block is via a settings update.
+            final Settings readOnlySettings = Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), false).build();
+            metadataUpdateSettingsService.updateSettings(
+                new UpdateSettingsClusterStateUpdateRequest(
+                    TimeValue.MINUS_ONE,
+                    TimeValue.ZERO,
+                    readOnlySettings,
+                    UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
+                    UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REJECT,
+                    index
+                ),
+                listener
+            );
+        }
     }
 
     private void reindex(SystemIndexMigrationInfo migrationInfo, ActionListener<BulkByScrollResponse> listener) {

+ 3 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java

@@ -6,6 +6,7 @@
  */
 package org.elasticsearch.xpack.core.security.authz.privilege;
 
+import org.elasticsearch.action.admin.indices.readonly.TransportAddIndexBlockAction;
 import org.elasticsearch.action.search.TransportSearchShardsAction;
 import org.elasticsearch.index.seqno.RetentionLeaseActions;
 import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction;
@@ -38,12 +39,13 @@ public final class SystemPrivilege extends Privilege {
         RetentionLeaseActions.ADD.name() + "*", // needed for CCR to add retention leases
         RetentionLeaseActions.REMOVE.name() + "*", // needed for CCR to remove retention leases
         RetentionLeaseActions.RENEW.name() + "*", // needed for CCR to renew retention leases
-        "indices:admin/settings/update", // needed for DiskThresholdMonitor.markIndicesReadOnly
+        "indices:admin/settings/update", // needed for: DiskThresholdMonitor.markIndicesReadOnly, SystemIndexMigrator
         CompletionPersistentTaskAction.NAME, // needed for ShardFollowTaskCleaner
         "indices:data/write/*", // needed for SystemIndexMigrator
         "indices:data/read/*", // needed for SystemIndexMigrator
         "indices:admin/refresh", // needed for SystemIndexMigrator
         "indices:admin/aliases", // needed for SystemIndexMigrator
+        TransportAddIndexBlockAction.TYPE.name() + "*", // needed for SystemIndexMigrator
         TransportSearchShardsAction.TYPE.name(), // added so this API can be called with the system user by other APIs
         ActionTypes.RELOAD_REMOTE_CLUSTER_CREDENTIALS_ACTION.name() // needed for Security plugin reload of remote cluster credentials
     );