Преглед на файлове

Improve Partial Snapshot Rollover Behavior (#69364)

Using new reconciliation functionality to not needlessly drop rolling over
data streams from the final snapshot.

closes #68536
Armin Braun преди 4 години
родител
ревизия
d334e3bef1

+ 9 - 4
server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

@@ -24,6 +24,7 @@ import org.elasticsearch.index.Index;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -177,15 +178,19 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
      * stream definitions that do not reference backing indices not contained in the snapshot.
      *
      * @param indicesInSnapshot List of indices in the snapshot
-     * @return Reconciled {@link DataStream} instance
+     * @return Reconciled {@link DataStream} instance or {@code null} if no reconciled version of this data stream could be built from the
+     *         given indices
      */
-    public DataStream snapshot(List<String> indicesInSnapshot) {
+    @Nullable
+    public DataStream snapshot(Collection<String> indicesInSnapshot) {
         // do not include indices not available in the snapshot
         List<Index> reconciledIndices = new ArrayList<>(this.indices);
-        reconciledIndices.removeIf(x -> indicesInSnapshot.contains(x.getName()) == false);
+        if (reconciledIndices.removeIf(x -> indicesInSnapshot.contains(x.getName()) == false) == false) {
+            return this;
+        }
 
         if (reconciledIndices.size() == 0) {
-            throw new IllegalArgumentException("cannot reconcile data stream without at least one backing index");
+            return null;
         }
 
         return new DataStream(

+ 3 - 2
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -792,8 +792,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                         break;
                     }
                 }
-                if (missingIndex == false) {
-                    dataStreams.put(dataStreamName, dataStream);
+                final DataStream reconciled = missingIndex ? dataStream.snapshot(indicesInSnapshot) : dataStream;
+                if (reconciled != null) {
+                    dataStreams.put(dataStreamName, reconciled);
                 }
             }
         }

+ 2 - 8
server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java

@@ -24,7 +24,6 @@ import java.util.Locale;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
-import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.hasItems;
@@ -221,12 +220,7 @@ public class DataStreamTests extends AbstractSerializingTestCase<DataStream> {
             preSnapshotDataStream.isReplicated()
         );
 
-        IllegalArgumentException e = expectThrows(
-            IllegalArgumentException.class,
-            () -> postSnapshotDataStream.snapshot(
-                preSnapshotDataStream.getIndices().stream().map(Index::getName).collect(Collectors.toList())
-            )
-        );
-        assertThat(e.getMessage(), containsString("cannot reconcile data stream without at least one backing index"));
+        assertNull(postSnapshotDataStream.snapshot(
+                preSnapshotDataStream.getIndices().stream().map(Index::getName).collect(Collectors.toList())));
     }
 }

+ 59 - 15
x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java

@@ -51,7 +51,9 @@ import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
 
 public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
@@ -541,22 +543,64 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
         unblockAllDataNodes(repoName);
         final SnapshotInfo snapshotInfo = assertSuccessful(snapshotFuture);
 
-        if (snapshotInfo.dataStreams().contains("ds")) {
-            assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "ds" })).get());
+        assertThat(snapshotInfo.dataStreams(), hasItems("ds"));
+        assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "ds" })).get());
 
-            RestoreInfo restoreSnapshotResponse = client().admin()
-                .cluster()
-                .prepareRestoreSnapshot(repoName, snapshotName)
-                .setWaitForCompletion(true)
-                .setIndices("ds")
-                .get()
-                .getRestoreInfo();
+        RestoreInfo restoreSnapshotResponse = client().admin()
+            .cluster()
+            .prepareRestoreSnapshot(repoName, snapshotName)
+            .setWaitForCompletion(true)
+            .setIndices("ds")
+            .get()
+            .getRestoreInfo();
 
-            assertEquals(restoreSnapshotResponse.successfulShards(), restoreSnapshotResponse.totalShards());
-            assertEquals(restoreSnapshotResponse.failedShards(), 0);
-            assertFalse(partial);
-        } else {
-            assertTrue(partial);
-        }
+        assertEquals(restoreSnapshotResponse.successfulShards(), restoreSnapshotResponse.totalShards());
+        assertEquals(restoreSnapshotResponse.failedShards(), 0);
+    }
+
+    public void testSnapshotDSDuringRolloverAndDeleteOldIndex() throws Exception {
+        // repository consistency check requires at least one snapshot per registered repository
+        createFullSnapshot(REPO, "snap-so-repo-checks-pass");
+        final String repoName = "mock-repo";
+        createRepository(repoName, "mock");
+        blockAllDataNodes(repoName);
+        final String snapshotName = "ds-snap";
+        final ActionFuture<CreateSnapshotResponse> snapshotFuture = client().admin()
+            .cluster()
+            .prepareCreateSnapshot(repoName, snapshotName)
+            .setWaitForCompletion(true)
+            .setPartial(true)
+            .setIncludeGlobalState(randomBoolean())
+            .execute();
+        waitForBlockOnAnyDataNode(repoName);
+        awaitNumberOfSnapshotsInProgress(1);
+        final RolloverResponse rolloverResponse = client().admin().indices().rolloverIndex(new RolloverRequest("ds", null)).get();
+        assertTrue(rolloverResponse.isRolledOver());
+
+        logger.info("--> deleting former write index");
+        assertAcked(client().admin().indices().prepareDelete(rolloverResponse.getOldIndex()));
+
+        unblockAllDataNodes(repoName);
+        final SnapshotInfo snapshotInfo = assertSuccessful(snapshotFuture);
+
+        assertThat(
+            "snapshot should not contain 'ds' since none of its indices existed both at the start and at the end of the snapshot",
+            snapshotInfo.dataStreams(),
+            not(hasItems("ds"))
+        );
+        assertAcked(
+            client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "other-ds" })).get()
+        );
+
+        RestoreInfo restoreSnapshotResponse = client().admin()
+            .cluster()
+            .prepareRestoreSnapshot(repoName, snapshotName)
+            .setWaitForCompletion(true)
+            .setIndices("other-ds")
+            .get()
+            .getRestoreInfo();
+
+        assertEquals(restoreSnapshotResponse.successfulShards(), restoreSnapshotResponse.totalShards());
+        assertEquals(restoreSnapshotResponse.failedShards(), 0);
     }
 }