Pārlūkot izejas kodu

Fix Snapshot + DataStream RollOver Bugs (#68258)

There were a number of issues around data streams rolling over during
a snapshot that are fixed in this PR:

1. If partial snapshots cause a data stream to not be fully snapshotted
because an index gets concurrently deleted we must not add it to the
resulting cluster state (otherwise we trip assertions on either snapshot itself
or a later restore, depending on whether or not the complete global state is
snapshotted).
2. When a non-partial snapshot is running we must not allow a datastream rollover,
otherwise we cannot finish the datastream snapshot correctly because the newly
created write index has not become part of the snapshot.
3. If any part of a datastream fails snapshotting or gets concurrently deleted during
a partial snapshot we must not add it to `SnapshotInfo` same as we do not add concurrently
deleted indices to it.
Armin Braun 4 gadi atpakaļ
vecāks
revīzija
0abb33091b

+ 15 - 0
server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java

@@ -39,8 +39,11 @@ import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.snapshots.SnapshotInProgressException;
+import org.elasticsearch.snapshots.SnapshotsService;
 import org.elasticsearch.threadpool.ThreadPool;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -207,6 +210,18 @@ public class MetadataRolloverService {
     private RolloverResult rolloverDataStream(ClusterState currentState, IndexAbstraction.DataStream dataStream, String dataStreamName,
                                               CreateIndexRequest createIndexRequest, List<Condition<?>> metConditions,
                                               boolean silent, boolean onlyValidate) throws Exception {
+
+        if (SnapshotsService.snapshottingDataStreams(currentState, Collections.singleton(dataStream.getName())).isEmpty() == false) {
+            // we can't roll over the snapshot concurrently because the snapshot contains the indices that existed when it was started but
+            // the cluster metadata of when it completes so the new write index would not exist in the snapshot if there was a concurrent
+            // rollover
+            throw new SnapshotInProgressException(
+                    "Cannot roll over data stream that is being snapshotted: "
+                            + dataStream.getName()
+                            + ". Try again after snapshot finishes or cancel the currently running snapshot."
+            );
+        }
+
         lookupTemplateForDataStream(dataStreamName, currentState.metadata());
 
         final Version minNodeVersion = currentState.nodes().getMinNodeVersion();

+ 41 - 21
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -678,15 +678,29 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         } else {
             builder = Metadata.builder(metadata);
         }
-        // Only keep those data streams in the metadata that were actually requested by the initial snapshot create operation
-        Map<String, DataStream> dataStreams = new HashMap<>();
+        // Only keep those data streams in the metadata that were actually requested by the initial snapshot create operation and that have
+        // all their indices contained in the snapshot
+        final Map<String, DataStream> dataStreams = new HashMap<>();
+        final Set<String> indicesInSnapshot = snapshot.indices().stream().map(IndexId::getName).collect(Collectors.toSet());
         for (String dataStreamName : snapshot.dataStreams()) {
             DataStream dataStream = metadata.dataStreams().get(dataStreamName);
             if (dataStream == null) {
                 assert snapshot.partial() : "Data stream [" + dataStreamName +
                         "] was deleted during a snapshot but snapshot was not partial.";
             } else {
-                dataStreams.put(dataStreamName, dataStream);
+                boolean missingIndex = false;
+                for (Index index : dataStream.getIndices()) {
+                    final String indexName = index.getName();
+                    if (builder.get(indexName) == null || indicesInSnapshot.contains(indexName) == false) {
+                        assert snapshot.partial() : "Data stream [" + dataStreamName +
+                                "] is missing index [" + index + "] but snapshot was not partial.";
+                        missingIndex = true;
+                        break;
+                    }
+                }
+                if (missingIndex == false) {
+                    dataStreams.put(dataStreamName, dataStream);
+                }
             }
         }
         return builder.dataStreams(dataStreams).build();
@@ -1158,12 +1172,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
             }
             final ShardGenerations shardGenerations = buildGenerations(entry, metadata);
             final String repository = snapshot.getRepository();
-            final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshot.getSnapshotId(),
-                shardGenerations.indices().stream().map(IndexId::getName).collect(Collectors.toList()),
-                entry.dataStreams(),
-                entry.startTime(), failure, threadPool.absoluteTimeInMillis(),
-                entry.partial() ? shardGenerations.totalShards() : entry.shards().size(), shardFailures,
-                entry.includeGlobalState(), entry.userMetadata());
             final StepListener<Metadata> metadataListener = new StepListener<>();
             final Repository repo = repositoriesService.repository(snapshot.getRepository());
             if (entry.isClone()) {
@@ -1190,18 +1198,30 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
             } else {
                 metadataListener.onResponse(metadata);
             }
-            metadataListener.whenComplete(meta -> repo.finalizeSnapshot(
-                    shardGenerations,
-                    repositoryData.getGenId(),
-                    metadataForSnapshot(entry, meta),
-                    snapshotInfo,
-                    entry.version(),
-                    state -> stateWithoutSnapshot(state, snapshot),
-                    ActionListener.wrap(newRepoData -> {
-                        completeListenersIgnoringException(endAndGetListenersToResolve(snapshot), Tuple.tuple(newRepoData, snapshotInfo));
-                        logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
-                        runNextQueuedOperation(newRepoData, repository, true);
-                    }, e -> handleFinalizationFailure(e, entry, repositoryData))),
+            metadataListener.whenComplete(meta -> {
+                        final Metadata metaForSnapshot = metadataForSnapshot(entry, meta);
+                        final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshot.getSnapshotId(),
+                                shardGenerations.indices().stream().map(IndexId::getName).collect(Collectors.toList()),
+                                entry.partial() ? entry.dataStreams().stream()
+                                        .filter(metaForSnapshot.dataStreams()::containsKey)
+                                        .collect(Collectors.toList()) : entry.dataStreams(),
+                                entry.startTime(), failure, threadPool.absoluteTimeInMillis(),
+                                entry.partial() ? shardGenerations.totalShards() : entry.shards().size(), shardFailures,
+                                entry.includeGlobalState(), entry.userMetadata());
+                        repo.finalizeSnapshot(
+                                shardGenerations,
+                                repositoryData.getGenId(),
+                                metaForSnapshot,
+                                snapshotInfo,
+                                entry.version(),
+                                state -> stateWithoutSnapshot(state, snapshot),
+                                ActionListener.wrap(newRepoData -> {
+                                    completeListenersIgnoringException(
+                                            endAndGetListenersToResolve(snapshot), Tuple.tuple(newRepoData, snapshotInfo));
+                                    logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
+                                    runNextQueuedOperation(newRepoData, repository, true);
+                                }, e -> handleFinalizationFailure(e, entry, repositoryData)));
+                    },
                     e -> handleFinalizationFailure(e, entry, repositoryData));
         } catch (Exception e) {
             assert false : new AssertionError(e);

+ 53 - 4
x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java

@@ -11,9 +11,16 @@ import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
+import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
+import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
+import org.elasticsearch.snapshots.RestoreInfo;
 import org.elasticsearch.snapshots.SnapshotInProgressException;
+import org.elasticsearch.snapshots.SnapshotInfo;
+import org.elasticsearch.snapshots.SnapshotRestoreException;
+import org.elasticsearch.snapshots.SnapshotState;
 import org.elasticsearch.xpack.core.action.CreateDataStreamAction;
 import org.elasticsearch.xpack.core.action.DeleteDataStreamAction;
 import org.elasticsearch.xpack.core.action.GetDataStreamAction;
@@ -25,10 +32,6 @@ import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
-import org.elasticsearch.snapshots.SnapshotInfo;
-import org.elasticsearch.snapshots.SnapshotRestoreException;
-import org.elasticsearch.snapshots.SnapshotState;
 import org.elasticsearch.snapshots.mockstore.MockRepository;
 import org.elasticsearch.xpack.datastreams.DataStreamsPlugin;
 import org.hamcrest.Matchers;
@@ -489,6 +492,52 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
                 .setIndices(indexWithoutDataStream)
                 .get()
         );
+    }
+
+    public void testSnapshotDSDuringRollover() throws Exception {
+        // repository consistency check requires at least one snapshot per registered repository
+        createFullSnapshot(REPO, "snap-so-repo-checks-pass");
+        final String repoName = "mock-repo";
+        createRepository(repoName, "mock");
+        final boolean partial = randomBoolean();
+        blockAllDataNodes(repoName);
+        final String snapshotName = "ds-snap";
+        final ActionFuture<CreateSnapshotResponse> snapshotFuture = client().admin()
+            .cluster()
+            .prepareCreateSnapshot(repoName, snapshotName)
+            .setWaitForCompletion(true)
+            .setPartial(partial)
+            .setIncludeGlobalState(randomBoolean())
+            .execute();
+        waitForBlockOnAnyDataNode(repoName);
+        awaitNumberOfSnapshotsInProgress(1);
+        final ActionFuture<RolloverResponse> rolloverResponse = client().admin().indices().rolloverIndex(new RolloverRequest("ds", null));
+
+        if (partial) {
+            assertTrue(rolloverResponse.get().isRolledOver());
+        } else {
+            SnapshotInProgressException e = expectThrows(SnapshotInProgressException.class, rolloverResponse::actionGet);
+            assertThat(e.getMessage(), containsString("Cannot roll over data stream that is being snapshotted:"));
+        }
+        unblockAllDataNodes(repoName);
+        final SnapshotInfo snapshotInfo = assertSuccessful(snapshotFuture);
+
+        if (snapshotInfo.dataStreams().contains("ds")) {
+            assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "ds" })).get());
 
+            RestoreInfo restoreSnapshotResponse = client().admin()
+                .cluster()
+                .prepareRestoreSnapshot(repoName, snapshotName)
+                .setWaitForCompletion(true)
+                .setIndices("ds")
+                .get()
+                .getRestoreInfo();
+
+            assertEquals(restoreSnapshotResponse.successfulShards(), restoreSnapshotResponse.totalShards());
+            assertEquals(restoreSnapshotResponse.failedShards(), 0);
+            assertFalse(partial);
+        } else {
+            assertTrue(partial);
+        }
     }
 }