Browse Source

Bug Fix: System Data Streams Should Be Restorable (#124651)

This PR adds a new MetadataDeleteDataStreamService that allows us to delete system data streams prior to a restore operation.  This fixes a bug where system data streams were previously un-restorable.
John Verwolf 7 months ago
parent
commit
cb3c35783b

+ 5 - 0
docs/changelog/124651.yaml

@@ -0,0 +1,5 @@
+pr: 124651
+summary: "Fix system data streams to be restorable from a snapshot"
+area: Infra/Core
+type: bug
+issues: [89261]

+ 7 - 33
modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/DeleteDataStreamTransportAction.java

@@ -21,9 +21,8 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.ProjectState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
-import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
-import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService;
+import org.elasticsearch.cluster.metadata.MetadataDataStreamsService;
 import org.elasticsearch.cluster.metadata.ProjectMetadata;
 import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -32,11 +31,8 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.SuppressForbidden;
-import org.elasticsearch.index.Index;
 import org.elasticsearch.indices.SystemIndices;
 import org.elasticsearch.injection.guice.Inject;
-import org.elasticsearch.snapshots.SnapshotInProgressException;
-import org.elasticsearch.snapshots.SnapshotsService;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -46,6 +42,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.action.datastreams.DataStreamsActionUtil.getDataStreamNames;
 
@@ -155,34 +152,11 @@ public class DeleteDataStreamTransportAction extends AcknowledgedTransportMaster
             }
         }
 
-        Set<String> snapshottingDataStreams = SnapshotsService.snapshottingDataStreams(projectState, dataStreams);
-        if (snapshottingDataStreams.isEmpty() == false) {
-            throw new SnapshotInProgressException(
-                "Cannot delete data streams that are being snapshotted: "
-                    + snapshottingDataStreams
-                    + ". Try again after snapshot finishes or cancel the currently running snapshot."
-            );
-        }
-
-        Set<Index> backingIndicesToRemove = new HashSet<>();
-        for (String dataStreamName : dataStreams) {
-            DataStream dataStream = project.dataStreams().get(dataStreamName);
-            assert dataStream != null;
-            backingIndicesToRemove.addAll(dataStream.getIndices());
-            backingIndicesToRemove.addAll(dataStream.getFailureIndices());
-        }
-
-        // first delete the data streams and then the indices:
-        // (this to avoid data stream validation from failing when deleting an index that is part of a data stream
-        // without updating the data stream)
-        // TODO: change order when delete index api also updates the data stream the index to be removed is member of
-        ClusterState newState = projectState.updatedState(builder -> {
-            for (String ds : dataStreams) {
-                LOGGER.info("removing data stream [{}]", ds);
-                builder.removeDataStream(ds);
-            }
-        });
-        return MetadataDeleteIndexService.deleteIndices(newState.projectState(projectState.projectId()), backingIndicesToRemove, settings);
+        return MetadataDataStreamsService.deleteDataStreams(
+            projectState,
+            dataStreams.stream().map(project.dataStreams()::get).collect(Collectors.toSet()),
+            settings
+        );
     }
 
     @Override

+ 1 - 0
server/build.gradle

@@ -80,6 +80,7 @@ dependencies {
   }
   internalClusterTestImplementation(project(':modules:reindex'))
   internalClusterTestImplementation(project(':modules:mapper-extras'))
+  internalClusterTestImplementation(project(':modules:data-streams'))
 }
 
 spotless {

+ 2 - 2
server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/AutoCreateSystemIndexIT.java

@@ -28,7 +28,7 @@ import org.elasticsearch.indices.TestSystemIndexDescriptorAllowsTemplates;
 import org.elasticsearch.indices.TestSystemIndexPlugin;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.plugins.SystemIndexPlugin;
-import org.elasticsearch.snapshots.SystemIndicesSnapshotIT;
+import org.elasticsearch.snapshots.SystemResourceSnapshotIT;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.xcontent.XContentType;
 import org.junit.After;
@@ -300,7 +300,7 @@ public class AutoCreateSystemIndexIT extends ESIntegTestCase {
 
         @Override
         public String getFeatureName() {
-            return SystemIndicesSnapshotIT.SystemIndexTestPlugin.class.getSimpleName();
+            return SystemResourceSnapshotIT.SystemIndexTestPlugin.class.getSimpleName();
         }
 
         @Override

+ 464 - 32
server/src/internalClusterTest/java/org/elasticsearch/snapshots/SystemIndicesSnapshotIT.java → server/src/internalClusterTest/java/org/elasticsearch/snapshots/SystemResourceSnapshotIT.java

@@ -10,12 +10,21 @@
 package org.elasticsearch.snapshots;
 
 import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
+import org.elasticsearch.action.datastreams.DeleteDataStreamAction;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
+import org.elasticsearch.cluster.metadata.Template;
+import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.datastreams.DataStreamsPlugin;
 import org.elasticsearch.indices.AssociatedIndexDescriptor;
+import org.elasticsearch.indices.ExecutorNames;
+import org.elasticsearch.indices.SystemDataStreamDescriptor;
 import org.elasticsearch.indices.SystemIndexDescriptor;
 import org.elasticsearch.indices.SystemIndexDescriptorUtils;
 import org.elasticsearch.plugins.Plugin;
@@ -24,10 +33,12 @@ import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.junit.Before;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -41,9 +52,10 @@ import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.in;
 import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.oneOf;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
-public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
+public class SystemResourceSnapshotIT extends AbstractSnapshotIntegTestCase {
 
     public static final String REPO_NAME = "test-repo";
 
@@ -55,6 +67,11 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
         plugins.add(SystemIndexTestPlugin.class);
         plugins.add(AnotherSystemIndexTestPlugin.class);
         plugins.add(AssociatedIndicesTestPlugin.class);
+        plugins.add(DataStreamsPlugin.class);
+        plugins.add(AnotherSystemDataStreamTestPlugin.class);
+        plugins.add(SystemDataStreamTestPlugin.class);
+        plugins.add(SystemDataStreamManyShardsTestPlugin.class);
+        plugins.add(AssociatedIndicesSystemDSTestPlugin.class);
         return plugins;
     }
 
@@ -70,16 +87,18 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
      */
     public void testRestoreSystemIndicesAsGlobalState() {
         createRepository(REPO_NAME, "fs");
-        // put a document in a system index
+        // put a document in a system index and data stream
         indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "1", "purpose", "pre-snapshot doc");
-        refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME);
+        indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc");
+        refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME);
 
         // run a snapshot including global state
         createFullSnapshot(REPO_NAME, "test-snap");
 
-        // add another document
+        // add another document to each system resource
         indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "2", "purpose", "post-snapshot doc");
-        refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME);
+        indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "2", "purpose", "post-snapshot doc");
+        refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME);
 
         assertThat(getDocCount(SystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(2L));
 
@@ -91,8 +110,9 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
         ).setWaitForCompletion(true).setRestoreGlobalState(true).get();
         assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
 
-        // verify only the original document is restored
+        // verify only the original documents are restored
         assertThat(getDocCount(SystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(1L));
+        assertThat(getDocCount(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(1L));
     }
 
     /**
@@ -101,6 +121,7 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
     public void testSnapshotWithoutGlobalState() {
         createRepository(REPO_NAME, "fs");
         indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "1", "purpose", "system index doc");
+        indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc");
         indexDoc("not-a-system-index", "1", "purpose", "non system index doc");
 
         // run a snapshot without global state
@@ -122,6 +143,7 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
 
         assertThat("not-a-system-index", in(snapshottedIndices));
         assertThat(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, not(in(snapshottedIndices)));
+        assertThat(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, not(in(snapshottedIndices)));
     }
 
     /**
@@ -131,23 +153,44 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
         createRepository(REPO_NAME, "fs");
         indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "1", "purpose", "pre-snapshot doc");
         indexDoc(AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME, "1", "purpose", "pre-snapshot doc");
-        refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME);
+        indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc");
+        indexDataStream(AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc");
+        refresh(
+            SystemIndexTestPlugin.SYSTEM_INDEX_NAME,
+            AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME,
+            SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME,
+            AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME
+        );
 
         // snapshot by feature
         CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO_NAME, "test-snap")
             .setIncludeGlobalState(true)
             .setWaitForCompletion(true)
-            .setFeatureStates(SystemIndexTestPlugin.class.getSimpleName(), AnotherSystemIndexTestPlugin.class.getSimpleName())
+            .setFeatureStates(
+                SystemIndexTestPlugin.class.getSimpleName(),
+                AnotherSystemIndexTestPlugin.class.getSimpleName(),
+                SystemDataStreamTestPlugin.class.getSimpleName(),
+                AnotherSystemDataStreamTestPlugin.class.getSimpleName()
+            )
             .get();
         assertSnapshotSuccess(createSnapshotResponse);
 
         // add some other documents
         indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "2", "purpose", "post-snapshot doc");
         indexDoc(AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME, "2", "purpose", "post-snapshot doc");
-        refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME);
+        indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "2", "purpose", "post-snapshot doc");
+        indexDataStream(AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "2", "purpose", "post-snapshot doc");
+        refresh(
+            SystemIndexTestPlugin.SYSTEM_INDEX_NAME,
+            AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME,
+            SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME,
+            AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME
+        );
 
         assertThat(getDocCount(SystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(2L));
         assertThat(getDocCount(AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(2L));
+        assertThat(getDocCount(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(2L));
+        assertThat(getDocCount(AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(2L));
 
         // restore indices as global state without closing the index
         RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(
@@ -160,6 +203,8 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
         // verify only the original document is restored
         assertThat(getDocCount(SystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(1L));
         assertThat(getDocCount(SystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(1L));
+        assertThat(getDocCount(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(1L));
+        assertThat(getDocCount(AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(1L));
     }
 
     /**
@@ -175,7 +220,8 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
 
         indexDoc(regularIndex, "1", "purpose", "create an index that can be restored");
         indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "1", "purpose", "pre-snapshot doc");
-        refresh(regularIndex, SystemIndexTestPlugin.SYSTEM_INDEX_NAME);
+        indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc");
+        refresh(regularIndex, SystemIndexTestPlugin.SYSTEM_INDEX_NAME, SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME);
 
         // snapshot including global state
         CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO_NAME, "test-snap")
@@ -193,7 +239,11 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
         assertThat(restoreResponse.getRestoreInfo().totalShards(), greaterThan(0));
         assertThat(
             restoreResponse.getRestoreInfo().indices(),
-            allOf(hasItem(regularIndex), not(hasItem(SystemIndexTestPlugin.SYSTEM_INDEX_NAME)))
+            allOf(
+                hasItem(regularIndex),
+                not(hasItem(SystemIndexTestPlugin.SYSTEM_INDEX_NAME)),
+                not(hasItem(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME))
+            )
         );
     }
 
@@ -207,7 +257,15 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
         indexDoc(regularIndex, "1", "purpose", "create an index that can be restored");
         indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "1", "purpose", "pre-snapshot doc");
         indexDoc(AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME, "1", "purpose", "pre-snapshot doc");
-        refresh(regularIndex, SystemIndexTestPlugin.SYSTEM_INDEX_NAME, AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME);
+        indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc");
+        indexDataStream(AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc");
+        refresh(
+            regularIndex,
+            SystemIndexTestPlugin.SYSTEM_INDEX_NAME,
+            AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME,
+            SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME,
+            AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME
+        );
 
         // snapshot including global state
         CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO_NAME, "test-snap")
@@ -219,10 +277,19 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
         // add some other documents
         indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "2", "purpose", "post-snapshot doc");
         indexDoc(AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME, "2", "purpose", "post-snapshot doc");
-        refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME);
-
+        indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "2", "purpose", "post-snapshot doc");
+        indexDataStream(AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "2", "purpose", "post-snapshot doc");
+        refresh(
+            regularIndex,
+            SystemIndexTestPlugin.SYSTEM_INDEX_NAME,
+            AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME,
+            SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME,
+            AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME
+        );
         assertThat(getDocCount(SystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(2L));
         assertThat(getDocCount(AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(2L));
+        assertThat(getDocCount(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(2L));
+        assertThat(getDocCount(AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(2L));
 
         // Delete the regular index so we can restore it
         assertAcked(cluster().client().admin().indices().prepareDelete(regularIndex));
@@ -232,14 +299,16 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
             TEST_REQUEST_TIMEOUT,
             REPO_NAME,
             "test-snap"
-        ).setWaitForCompletion(true).setFeatureStates("SystemIndexTestPlugin").get();
+        ).setWaitForCompletion(true).setFeatureStates("SystemIndexTestPlugin", "SystemDataStreamTestPlugin").get();
         assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
 
-        // verify that the restored system index has only one document
+        // verify that the restored system index and data stream each only have one document
         assertThat(getDocCount(SystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(1L));
+        assertThat(getDocCount(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(1L));
 
-        // but the non-requested feature should still have its new document
+        // but the non-requested features should still have their new documents
         assertThat(getDocCount(AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(2L));
+        assertThat(getDocCount(AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(2L));
     }
 
     /**
@@ -254,36 +323,58 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
         indexDoc(regularIndex, "1", "purpose", "pre-snapshot doc");
         indexDoc(AssociatedIndicesTestPlugin.SYSTEM_INDEX_NAME, "1", "purpose", "pre-snapshot doc");
         indexDoc(AssociatedIndicesTestPlugin.ASSOCIATED_INDEX_NAME, "1", "purpose", "pre-snapshot doc");
-        refresh(regularIndex, AssociatedIndicesTestPlugin.SYSTEM_INDEX_NAME, AssociatedIndicesTestPlugin.ASSOCIATED_INDEX_NAME);
+        indexDataStream(AssociatedIndicesSystemDSTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc");
+        indexDoc(AssociatedIndicesSystemDSTestPlugin.ASSOCIATED_INDEX_NAME, "1", "purpose", "pre-snapshot doc");
+
+        refresh(
+            regularIndex,
+            AssociatedIndicesTestPlugin.SYSTEM_INDEX_NAME,
+            AssociatedIndicesTestPlugin.ASSOCIATED_INDEX_NAME,
+            AssociatedIndicesSystemDSTestPlugin.SYSTEM_DATASTREAM_NAME,
+            AssociatedIndicesSystemDSTestPlugin.ASSOCIATED_INDEX_NAME
+        );
 
         // snapshot
         CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO_NAME, "test-snap")
-            .setFeatureStates(AssociatedIndicesTestPlugin.class.getSimpleName())
+            .setFeatureStates(AssociatedIndicesTestPlugin.class.getSimpleName(), AssociatedIndicesSystemDSTestPlugin.class.getSimpleName())
             .setWaitForCompletion(true)
             .get();
         assertSnapshotSuccess(createSnapshotResponse);
 
         // verify the correctness of the snapshot
-        Set<String> snapshottedIndices = clusterAdmin().prepareGetSnapshots(TEST_REQUEST_TIMEOUT, REPO_NAME)
-            .get()
-            .getSnapshots()
+        var snapshotsResponse = clusterAdmin().prepareGetSnapshots(TEST_REQUEST_TIMEOUT, REPO_NAME).get();
+        Set<String> snapshottedIndices = snapshotsResponse.getSnapshots()
             .stream()
             .map(SnapshotInfo::indices)
             .flatMap(Collection::stream)
             .collect(Collectors.toSet());
+        Set<String> snapshottedDataStreams = snapshotsResponse.getSnapshots()
+            .stream()
+            .map(SnapshotInfo::dataStreams)
+            .flatMap(Collection::stream)
+            .collect(Collectors.toSet());
         assertThat(snapshottedIndices, hasItem(AssociatedIndicesTestPlugin.SYSTEM_INDEX_NAME));
         assertThat(snapshottedIndices, hasItem(AssociatedIndicesTestPlugin.ASSOCIATED_INDEX_NAME));
+        assertThat(snapshottedDataStreams, hasItem(AssociatedIndicesSystemDSTestPlugin.SYSTEM_DATASTREAM_NAME));
+        assertThat(snapshottedIndices, hasItem(AssociatedIndicesSystemDSTestPlugin.ASSOCIATED_INDEX_NAME));
 
         // add some other documents
         indexDoc(regularIndex, "2", "purpose", "post-snapshot doc");
         indexDoc(AssociatedIndicesTestPlugin.SYSTEM_INDEX_NAME, "2", "purpose", "post-snapshot doc");
-        refresh(regularIndex, AssociatedIndicesTestPlugin.SYSTEM_INDEX_NAME);
+        indexDataStream(AssociatedIndicesSystemDSTestPlugin.SYSTEM_DATASTREAM_NAME, "2", "purpose", "post-snapshot doc");
+        refresh(regularIndex, AssociatedIndicesTestPlugin.SYSTEM_INDEX_NAME, AssociatedIndicesSystemDSTestPlugin.SYSTEM_DATASTREAM_NAME);
 
         assertThat(getDocCount(regularIndex), equalTo(2L));
         assertThat(getDocCount(AssociatedIndicesTestPlugin.SYSTEM_INDEX_NAME), equalTo(2L));
+        assertThat(getDocCount(AssociatedIndicesSystemDSTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(2L));
 
         // And delete the associated index so we can restore it
-        assertAcked(indicesAdmin().prepareDelete(AssociatedIndicesTestPlugin.ASSOCIATED_INDEX_NAME).get());
+        assertAcked(
+            indicesAdmin().prepareDelete(
+                AssociatedIndicesTestPlugin.ASSOCIATED_INDEX_NAME,
+                AssociatedIndicesSystemDSTestPlugin.ASSOCIATED_INDEX_NAME
+            ).get()
+        );
 
         // restore the feature state and its associated index
         RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(
@@ -291,15 +382,17 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
             REPO_NAME,
             "test-snap"
         )
-            .setIndices(AssociatedIndicesTestPlugin.ASSOCIATED_INDEX_NAME)
+            .setIndices(AssociatedIndicesTestPlugin.ASSOCIATED_INDEX_NAME, AssociatedIndicesSystemDSTestPlugin.ASSOCIATED_INDEX_NAME)
             .setWaitForCompletion(true)
-            .setFeatureStates(AssociatedIndicesTestPlugin.class.getSimpleName())
+            .setFeatureStates(AssociatedIndicesTestPlugin.class.getSimpleName(), AssociatedIndicesSystemDSTestPlugin.class.getSimpleName())
             .get();
         assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
 
         // verify only the original document is restored
         assertThat(getDocCount(AssociatedIndicesTestPlugin.SYSTEM_INDEX_NAME), equalTo(1L));
         assertThat(getDocCount(AssociatedIndicesTestPlugin.ASSOCIATED_INDEX_NAME), equalTo(1L));
+        assertThat(getDocCount(AssociatedIndicesSystemDSTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(1L));
+        assertThat(getDocCount(AssociatedIndicesSystemDSTestPlugin.ASSOCIATED_INDEX_NAME), equalTo(1L));
     }
 
     /**
@@ -308,7 +401,8 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
     public void testRestoreFeatureNotInSnapshot() {
         createRepository(REPO_NAME, "fs");
         indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "1", "purpose", "pre-snapshot doc");
-        refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME);
+        indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc");
+        refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME);
 
         // snapshot including global state
         CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO_NAME, "test-snap")
@@ -322,7 +416,7 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
             SnapshotRestoreException.class,
             clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, REPO_NAME, "test-snap")
                 .setWaitForCompletion(true)
-                .setFeatureStates("SystemIndexTestPlugin", fakeFeatureStateName)
+                .setFeatureStates("SystemIndexTestPlugin", "SystemDataStreamTestPlugin", fakeFeatureStateName)
         );
 
         assertThat(
@@ -438,7 +532,8 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
     public void testRestoreSystemIndicesAsGlobalStateWithDefaultFeatureStateList() {
         createRepository(REPO_NAME, "fs");
         indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "1", "purpose", "pre-snapshot doc");
-        refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME);
+        indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc");
+        refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME);
 
         // run a snapshot including global state
         CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO_NAME, "test-snap")
@@ -449,9 +544,11 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
 
         // add another document
         indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "2", "purpose", "post-snapshot doc");
-        refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME);
+        indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "2", "purpose", "post-snapshot doc");
+        refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME);
 
         assertThat(getDocCount(SystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(2L));
+        assertThat(getDocCount(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(2L));
 
         // restore indices as global state a null list of feature states
         RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(
@@ -463,6 +560,7 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
 
         // verify that the system index is destroyed
         assertThat(getDocCount(SystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(1L));
+        assertThat(getDocCount(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(1L));
     }
 
     /**
@@ -473,8 +571,9 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
         createRepository(REPO_NAME, "fs");
         String regularIndex = "my-index";
         indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "1", "purpose", "pre-snapshot doc");
+        indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc");
         indexDoc(regularIndex, "1", "purpose", "pre-snapshot doc");
-        refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, regularIndex);
+        refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, regularIndex);
 
         // run a snapshot including global state
         CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO_NAME, "test-snap")
@@ -485,10 +584,12 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
 
         // add another document
         indexDoc(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, "2", "purpose", "post-snapshot doc");
-        refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME);
+        indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "2", "purpose", "post-snapshot doc");
+        refresh(SystemIndexTestPlugin.SYSTEM_INDEX_NAME, SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME);
 
         assertAcked(indicesAdmin().prepareDelete(regularIndex).get());
         assertThat(getDocCount(SystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(2L));
+        assertThat(getDocCount(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(2L));
 
         // restore with global state and all indices but explicitly no feature states.
         RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(
@@ -500,6 +601,7 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
 
         // verify that the system index still has the updated document, i.e. has not been restored
         assertThat(getDocCount(SystemIndexTestPlugin.SYSTEM_INDEX_NAME), equalTo(2L));
+        assertThat(getDocCount(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(2L));
         // And the regular index has been restored
         assertThat(getDocCount(regularIndex), equalTo(1L));
     }
@@ -564,6 +666,8 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
         // Create a system index
         final String systemIndexName = SystemIndexTestPlugin.SYSTEM_INDEX_NAME + "-1";
         indexDoc(systemIndexName, "1", "purpose", "pre-snapshot doc");
+        // Create a system data stream
+        indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc");
 
         // And a regular index
         // And a regular index so we can avoid matching all indices on the restore
@@ -608,6 +712,109 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
 
     }
 
+    public void testSystemDataStreamAliasesAreAlwaysRestored() {
+        createRepository(REPO_NAME, "fs");
+        // Create a system data stream
+        indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc");
+
+        // And a regular index so we can avoid matching all indices on the restore
+        final String regularIndex = "regular-index";
+        final String regularAlias = "regular-alias";
+        indexDoc(regularIndex, "1", "purpose", "pre-snapshot doc");
+
+        // And make sure they both have aliases
+        final String systemDataStreamAlias = SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME + "-alias";
+        assertAcked(
+            indicesAdmin().prepareAliases(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
+                .addAlias(regularIndex, regularAlias)
+                .addAlias(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, systemDataStreamAlias, true)
+                .get()
+        );
+
+        // And add a doc to ensure the alias works
+        indexDataStream(systemDataStreamAlias, "2", "purpose", "post-alias doc");
+
+        // Run a snapshot including global state
+        CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO_NAME, "test-snap")
+            .setWaitForCompletion(true)
+            .setIncludeGlobalState(true)
+            .get();
+        assertSnapshotSuccess(createSnapshotResponse);
+
+        // And delete the regular index and system data stream
+        assertAcked(cluster().client().admin().indices().prepareDelete(regularIndex));
+        assertAcked(
+            client().execute(
+                DeleteDataStreamAction.INSTANCE,
+                new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME)
+            ).actionGet()
+        );
+
+        // Now restore the snapshot with no aliases
+        RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(
+            TEST_REQUEST_TIMEOUT,
+            REPO_NAME,
+            "test-snap"
+        )
+            .setFeatureStates("SystemDataStreamTestPlugin")
+            .setWaitForCompletion(true)
+            .setRestoreGlobalState(false)
+            .setIncludeAliases(false)
+            .get();
+        assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
+
+        // The regular index should exist
+        assertTrue(indexExists(regularIndex));
+        assertFalse(indexExists(regularAlias));
+
+        // And the system data stream, queried by alias, should have 2 docs
+        assertTrue(indexExists(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME));
+        assertTrue(indexExists(systemDataStreamAlias));
+        assertThat(getDocCount(systemDataStreamAlias), equalTo(2L));
+    }
+
+    public void testDeletedDatastreamIsRestorable() {
+        createRepository(REPO_NAME, "fs");
+        // Create a system data stream
+        indexDataStream(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME, "1", "purpose", "pre-snapshot doc");
+
+        // And a regular index so we can avoid matching all indices on the restore
+        final String regularIndex = "regular-index";
+        indexDoc(regularIndex, "1", "purpose", "pre-snapshot doc");
+
+        // Run a snapshot including global state
+        CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO_NAME, "test-snap")
+            .setWaitForCompletion(true)
+            .setIncludeGlobalState(true)
+            .get();
+        assertSnapshotSuccess(createSnapshotResponse);
+
+        // And delete the regular index and system data stream
+        assertAcked(cluster().client().admin().indices().prepareDelete(regularIndex));
+        assertAcked(
+            client().execute(
+                DeleteDataStreamAction.INSTANCE,
+                new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME)
+            ).actionGet()
+        );
+
+        // Now restore the snapshot with no aliases
+        RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(
+            TEST_REQUEST_TIMEOUT,
+            REPO_NAME,
+            "test-snap"
+        )
+            .setFeatureStates("SystemDataStreamTestPlugin")
+            .setWaitForCompletion(true)
+            .setRestoreGlobalState(false)
+            .setIncludeAliases(false)
+            .get();
+
+        // And the system data stream, queried by alias, should have 2 docs
+        assertTrue(indexExists(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME));
+        assertThat(getDocCount(SystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME), equalTo(1L));
+    }
+
     /**
      * Tests that the special "none" feature state name cannot be combined with other
      * feature state names, and an error occurs if it's tried.
@@ -691,6 +898,7 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
         assertThat(snapshottedIndices, allOf(hasItem(regularIndex), not(hasItem(SystemIndexTestPlugin.SYSTEM_INDEX_NAME))));
     }
 
+    // TODO, Do we need to test this for Datastreams?
     /**
      * Ensures that if we can only capture a partial snapshot of a system index, then the feature state associated with that index is
      * not included in the snapshot, because it would not be safe to restore that feature state.
@@ -744,6 +952,60 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
         });
     }
 
+    /**
+     * Ensures that if we can only capture a partial snapshot of a system data stream, then the feature state associated
+     * with that data stream is not included in the snapshot, because it would not be safe to restore that feature state.
+     */
+    public void testPartialSnapshotsOfSystemDataStreamRemovesFeatureState() throws Exception {
+        final String partialIndexName = SystemDataStreamManyShardsTestPlugin.SYSTEM_DATASTREAM_NAME;
+        final String fullIndexName = AnotherSystemDataStreamTestPlugin.SYSTEM_DATASTREAM_NAME;
+
+        createRepositoryNoVerify(REPO_NAME, "mock");
+
+        // Create the index that we'll get a partial snapshot of with a bunch of shards
+        indexDataStream(partialIndexName, "1", "purpose", "pre-snapshot doc");
+        // And another one with the default
+        indexDataStream(fullIndexName, "1", "purpose", "pre-snapshot doc");
+        ensureGreen();
+
+        // Stop a random data node so we lose a shard from the partial index
+        internalCluster().stopRandomDataNode();
+        assertBusy(() -> {
+            var status = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT).get().getStatus();
+            assertThat(status, oneOf(ClusterHealthStatus.YELLOW, ClusterHealthStatus.RED));
+        }, 30, TimeUnit.SECONDS);
+
+        // Get ready to block
+        blockMasterFromFinalizingSnapshotOnIndexFile(REPO_NAME);
+
+        // Start a snapshot and wait for it to hit the block, then kill the master to force a failover
+        final String partialSnapName = "test-partial-snap";
+        CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(
+            TEST_REQUEST_TIMEOUT,
+            REPO_NAME,
+            partialSnapName
+        ).setIncludeGlobalState(true).setWaitForCompletion(false).setPartial(true).get();
+        assertThat(createSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED));
+        waitForBlock(internalCluster().getMasterName(), REPO_NAME);
+        internalCluster().stopCurrentMasterNode();
+
+        // Now get the snapshot and do our checks
+        assertBusy(() -> {
+            GetSnapshotsResponse snapshotsStatusResponse = clusterAdmin().prepareGetSnapshots(TEST_REQUEST_TIMEOUT, REPO_NAME)
+                .setSnapshots(partialSnapName)
+                .get();
+            SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
+            assertNotNull(snapshotInfo);
+            assertThat(snapshotInfo.failedShards(), lessThan(snapshotInfo.totalShards()));
+            List<String> statesInSnapshot = snapshotInfo.featureStates().stream().map(SnapshotFeatureInfo::getPluginName).toList();
+            assertThat(statesInSnapshot, not(hasItem((new SystemDataStreamManyShardsTestPlugin()).getFeatureName())));
+            assertThat(statesInSnapshot, hasItem((new AnotherSystemDataStreamTestPlugin()).getFeatureName()));
+        }, 5L, TimeUnit.SECONDS);
+
+        // Cleanup to prevent unrelated shutdown failures
+        internalCluster().startDataOnlyNode();
+    }
+
     public void testParallelIndexDeleteRemovesFeatureState() throws Exception {
         final String indexToBeDeleted = SystemIndexTestPlugin.SYSTEM_INDEX_NAME;
         final String fullIndexName = AnotherSystemIndexTestPlugin.SYSTEM_INDEX_NAME;
@@ -819,6 +1081,14 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
         return indicesAdmin().prepareStats(indexName).get().getPrimaries().getDocs().getCount();
     }
 
+    private DocWriteResponse indexDataStream(String index, String id, String... source) {
+        var sourceWithTimestamp = new String[source.length + 2];
+        sourceWithTimestamp[0] = "@timestamp";
+        sourceWithTimestamp[1] = Long.toString(System.currentTimeMillis());
+        System.arraycopy(source, 0, sourceWithTimestamp, 2, source.length);
+        return prepareIndex(index).setId(id).setSource((Object[]) sourceWithTimestamp).setOpType(DocWriteRequest.OpType.CREATE).get();
+    }
+
     public static class SystemIndexTestPlugin extends Plugin implements SystemIndexPlugin {
 
         public static final String SYSTEM_INDEX_NAME = ".test-system-idx";
@@ -863,6 +1133,123 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
         }
     }
 
+    public static class SystemDataStreamTestPlugin extends Plugin implements SystemIndexPlugin {
+
+        public static final String SYSTEM_DATASTREAM_NAME = ".test-system-data-stream";
+
+        @Override
+        public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
+            try {
+                CompressedXContent mappings = new CompressedXContent("{\"properties\":{\"name\":{\"type\":\"keyword\"}}}");
+                return Collections.singletonList(
+                    new SystemDataStreamDescriptor(
+                        SYSTEM_DATASTREAM_NAME,
+                        "system data stream test",
+                        SystemDataStreamDescriptor.Type.EXTERNAL,
+                        ComposableIndexTemplate.builder()
+                            .indexPatterns(List.of(SYSTEM_DATASTREAM_NAME)) // TODO is this correct?
+                            .template(new Template(Settings.EMPTY, mappings, null))
+                            .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
+                            .build(),
+                        Map.of(),
+                        List.of("product"),
+                        ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
+                    )
+                );
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public String getFeatureName() {
+            return SystemDataStreamTestPlugin.class.getSimpleName();
+        }
+
+        @Override
+        public String getFeatureDescription() {
+            return "A simple test plugin for data streams";
+        }
+    }
+
+    public static class SystemDataStreamManyShardsTestPlugin extends Plugin implements SystemIndexPlugin {
+
+        public static final String SYSTEM_DATASTREAM_NAME = ".test-system-data-stream-many-shards";
+
+        @Override
+        public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
+            try {
+                CompressedXContent mappings = new CompressedXContent("{\"properties\":{\"name\":{\"type\":\"keyword\"}}}");
+                return Collections.singletonList(
+                    new SystemDataStreamDescriptor(
+                        SYSTEM_DATASTREAM_NAME,
+                        "system data stream test",
+                        SystemDataStreamDescriptor.Type.EXTERNAL,
+                        ComposableIndexTemplate.builder()
+                            .indexPatterns(List.of(SYSTEM_DATASTREAM_NAME)) // TODO is this correct?
+                            .template(new Template(indexSettings(6, 0).build(), mappings, null))
+                            .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
+                            .build(),
+                        Map.of(),
+                        List.of("product"),
+                        ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
+                    )
+                );
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public String getFeatureName() {
+            return SystemDataStreamTestPlugin.class.getSimpleName();
+        }
+
+        @Override
+        public String getFeatureDescription() {
+            return "A simple test plugin for data streams";
+        }
+    }
+
+    public static class AnotherSystemDataStreamTestPlugin extends Plugin implements SystemIndexPlugin {
+
+        public static final String SYSTEM_DATASTREAM_NAME = ".another-test-system-data-stream";
+
+        @Override
+        public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
+            try {
+                CompressedXContent mappings = new CompressedXContent("{\"properties\":{\"name\":{\"type\":\"keyword\"}}}");
+                return Collections.singletonList(
+                    new SystemDataStreamDescriptor(
+                        SYSTEM_DATASTREAM_NAME,
+                        "another system data stream test",
+                        SystemDataStreamDescriptor.Type.EXTERNAL,
+                        ComposableIndexTemplate.builder()
+                            .indexPatterns(List.of(SYSTEM_DATASTREAM_NAME)) // TODO is this correct?
+                            .template(new Template(Settings.EMPTY, mappings, null))
+                            .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
+                            .build(),
+                        Map.of(),
+                        List.of("product"),
+                        ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
+                    )
+                );
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public String getFeatureName() {
+            return AnotherSystemDataStreamTestPlugin.class.getSimpleName();
+        }
+
+        @Override
+        public String getFeatureDescription() {
+            return "Another simple test plugin for data streams";
+        }
+    }
+
     public static class AssociatedIndicesTestPlugin extends Plugin implements SystemIndexPlugin {
 
         public static final String SYSTEM_INDEX_NAME = ".third-test-system-idx";
@@ -890,4 +1277,49 @@ public class SystemIndicesSnapshotIT extends AbstractSnapshotIntegTestCase {
             return "Another simple test plugin";
         }
     }
+
+    public static class AssociatedIndicesSystemDSTestPlugin extends Plugin implements SystemIndexPlugin {
+
+        public static final String SYSTEM_DATASTREAM_NAME = ".test-system-data-stream-two";
+        public static final String ASSOCIATED_INDEX_NAME = ".associated-idx2";
+
+        @Override
+        public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
+            try {
+                CompressedXContent mappings = new CompressedXContent("{\"properties\":{\"name\":{\"type\":\"keyword\"}}}");
+                return Collections.singletonList(
+                    new SystemDataStreamDescriptor(
+                        SYSTEM_DATASTREAM_NAME,
+                        "system data stream test",
+                        SystemDataStreamDescriptor.Type.EXTERNAL,
+                        ComposableIndexTemplate.builder()
+                            .indexPatterns(List.of(SYSTEM_DATASTREAM_NAME)) // TODO is this correct?
+                            .template(new Template(Settings.EMPTY, mappings, null))
+                            .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
+                            .build(),
+                        Map.of(),
+                        List.of("product"),
+                        ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
+                    )
+                );
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public Collection<AssociatedIndexDescriptor> getAssociatedIndexDescriptors() {
+            return Collections.singletonList(new AssociatedIndexDescriptor(ASSOCIATED_INDEX_NAME, "Associated indices"));
+        }
+
+        @Override
+        public String getFeatureName() {
+            return AssociatedIndicesSystemDSTestPlugin.class.getSimpleName();
+        }
+
+        @Override
+        public String getFeatureDescription() {
+            return "Another simple test plugin";
+        }
+    }
 }

+ 55 - 1
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java

@@ -9,6 +9,7 @@
 
 package org.elasticsearch.cluster.metadata;
 
+import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -31,16 +32,23 @@ import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
+import org.elasticsearch.snapshots.SnapshotInProgressException;
+import org.elasticsearch.snapshots.SnapshotsService;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * Handles data stream modification requests.
  */
 public class MetadataDataStreamsService {
-
+    private static final Logger LOGGER = LogManager.getLogger(MetadataDataStreamsService.class);
     private final ClusterService clusterService;
     private final IndicesService indicesService;
     private final DataStreamGlobalRetentionSettings globalRetentionSettings;
@@ -432,6 +440,52 @@ public class MetadataDataStreamsService {
         return index;
     }
 
+    /**
+     * Removes the given data stream and their backing indices from the Project State.
+     *
+     * @param projectState The project state
+     * @param dataStreams  The data streams to remove
+     * @param settings     The settings
+     * @return The updated Project State
+     */
+    public static ClusterState deleteDataStreams(ProjectState projectState, Set<DataStream> dataStreams, Settings settings) {
+        if (dataStreams.isEmpty()) {
+            return projectState.cluster();
+        }
+
+        Set<String> dataStreamNames = dataStreams.stream().map(DataStream::getName).collect(Collectors.toSet());
+        Set<String> snapshottingDataStreams = SnapshotsService.snapshottingDataStreams(projectState, dataStreamNames);
+        if (snapshottingDataStreams.isEmpty() == false) {
+            throw new SnapshotInProgressException(
+                "Cannot delete data streams that are being snapshotted: ["
+                    + String.join(", ", snapshottingDataStreams)
+                    + "]. Try again after snapshot finishes or cancel the currently running snapshot."
+            );
+        }
+
+        Set<Index> backingIndicesToRemove = new HashSet<>();
+        for (DataStream dataStream : dataStreams) {
+            assert dataStream != null;
+            if (projectState.metadata().dataStreams().get(dataStream.getName()) == null) {
+                throw new ResourceNotFoundException("data stream [" + dataStream.getName() + "] not found");
+            }
+            backingIndicesToRemove.addAll(dataStream.getIndices());
+            backingIndicesToRemove.addAll(dataStream.getFailureIndices());
+        }
+
+        // first delete the data streams and then the indices:
+        // (this to avoid data stream validation from failing when deleting an index that is part of a data stream
+        // without updating the data stream)
+        // TODO: change order when "delete index api" also updates the data stream the "index to be removed" is a member of
+        ClusterState newState = projectState.updatedState(builder -> {
+            for (DataStream ds : dataStreams) {
+                LOGGER.info("removing data stream [{}]", ds.getName());
+                builder.removeDataStream(ds.getName());
+            }
+        });
+        return MetadataDeleteIndexService.deleteIndices(newState.projectState(projectState.projectId()), backingIndicesToRemove, settings);
+    }
+
     /**
      * A cluster state update task that consists of the cluster state request and the listeners that need to be notified upon completion.
      */

+ 42 - 8
server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

@@ -35,6 +35,7 @@ import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
 import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
+import org.elasticsearch.cluster.metadata.MetadataDataStreamsService;
 import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService;
 import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
 import org.elasticsearch.cluster.metadata.ProjectMetadata;
@@ -620,7 +621,7 @@ public final class RestoreService implements ClusterStateApplier {
         Collection<String> featureStateDataStreams,
         boolean includeAliases
     ) {
-        Map<String, DataStream> dataStreams;
+        Map<String, DataStream> allDataStreams;
         Map<String, DataStreamAlias> dataStreamAliases;
         List<String> requestedDataStreams = filterIndices(
             snapshotInfo.dataStreams(),
@@ -628,20 +629,21 @@ public final class RestoreService implements ClusterStateApplier {
             IndicesOptions.lenientExpand()
         );
         if (requestedDataStreams.isEmpty()) {
-            dataStreams = Map.of();
+            allDataStreams = Map.of();
             dataStreamAliases = Map.of();
         } else {
             if (globalMetadata == null) {
                 globalMetadata = repository.getSnapshotGlobalMetadata(snapshotId);
             }
             final Map<String, DataStream> dataStreamsInSnapshot = globalMetadata.getProject().dataStreams();
-            dataStreams = Maps.newMapWithExpectedSize(requestedDataStreams.size());
+            allDataStreams = Maps.newMapWithExpectedSize(requestedDataStreams.size());
+            Map<String, DataStream> systemDataStreams = new HashMap<>();
             for (String requestedDataStream : requestedDataStreams) {
                 final DataStream dataStreamInSnapshot = dataStreamsInSnapshot.get(requestedDataStream);
                 assert dataStreamInSnapshot != null : "DataStream [" + requestedDataStream + "] not found in snapshot";
 
                 if (dataStreamInSnapshot.isSystem() == false) {
-                    dataStreams.put(requestedDataStream, dataStreamInSnapshot);
+                    allDataStreams.put(requestedDataStream, dataStreamInSnapshot);
                 } else if (requestIndices.contains(requestedDataStream)) {
                     throw new IllegalArgumentException(
                         format(
@@ -650,7 +652,8 @@ public final class RestoreService implements ClusterStateApplier {
                         )
                     );
                 } else if (featureStateDataStreams.contains(requestedDataStream)) {
-                    dataStreams.put(requestedDataStream, dataStreamInSnapshot);
+                    allDataStreams.put(requestedDataStream, dataStreamInSnapshot);
+                    systemDataStreams.put(requestedDataStream, dataStreamInSnapshot);
                 } else {
                     logger.debug(
                         "omitting system data stream [{}] from snapshot restoration because its feature state was not requested",
@@ -658,11 +661,12 @@ public final class RestoreService implements ClusterStateApplier {
                     );
                 }
             }
-            if (includeAliases) {
+            if (includeAliases || systemDataStreams.isEmpty() == false) {
                 dataStreamAliases = new HashMap<>();
                 final Map<String, DataStreamAlias> dataStreamAliasesInSnapshot = globalMetadata.getProject().dataStreamAliases();
+                Map<String, DataStream> dataStreamsWithAliases = includeAliases ? allDataStreams : systemDataStreams;
                 for (DataStreamAlias alias : dataStreamAliasesInSnapshot.values()) {
-                    DataStreamAlias copy = alias.intersect(dataStreams.keySet()::contains);
+                    DataStreamAlias copy = alias.intersect(dataStreamsWithAliases.keySet()::contains);
                     if (copy.getDataStreams().isEmpty() == false) {
                         dataStreamAliases.put(alias.getName(), copy);
                     }
@@ -671,7 +675,7 @@ public final class RestoreService implements ClusterStateApplier {
                 dataStreamAliases = Map.of();
             }
         }
-        return new Tuple<>(dataStreams, dataStreamAliases);
+        return new Tuple<>(allDataStreams, dataStreamAliases);
     }
 
     private Map<String, List<String>> getFeatureStatesToRestore(
@@ -765,6 +769,29 @@ public final class RestoreService implements ClusterStateApplier {
             .collect(Collectors.toUnmodifiableSet());
     }
 
+    /**
+     * Resolves a set of datastream names that currently exist in the cluster that are part of a feature state which is
+     * about to be restored, and should therefore be removed prior to restoring those feature states from the snapshot.
+     *
+     * @param currentState           The current cluster state
+     * @param featureStatesToRestore A set of feature state names that are about to be restored
+     * @return A set of datastream names that should be removed based on the feature states being restored
+     */
+    private Set<DataStream> resolveSystemDataStreamsToDelete(ClusterState currentState, Set<String> featureStatesToRestore) {
+        if (featureStatesToRestore == null) {
+            return Collections.emptySet();
+        }
+
+        return featureStatesToRestore.stream()
+            .map(systemIndices::getFeature)
+            .filter(Objects::nonNull) // Features that aren't present on this node will be warned about in `getFeatureStatesToRestore`
+            .flatMap(feature -> feature.getDataStreamDescriptors().stream())
+            .map(SystemDataStreamDescriptor::getDataStreamName)
+            .filter(datastreamName -> currentState.metadata().getProject().dataStreams().containsKey(datastreamName))
+            .map(dataStreamName -> currentState.metadata().getProject().dataStreams().get(dataStreamName))
+            .collect(Collectors.toUnmodifiableSet());
+    }
+
     // visible for testing
     static DataStream updateDataStream(DataStream dataStream, Metadata.Builder metadata, RestoreSnapshotRequest request) {
         String dataStreamName = dataStream.getName();
@@ -1336,6 +1363,13 @@ public final class RestoreService implements ClusterStateApplier {
                 settings
             );
 
+            // Clear out all existing system data streams
+            currentState = MetadataDataStreamsService.deleteDataStreams(
+                currentState.projectState(),
+                resolveSystemDataStreamsToDelete(currentState, featureStatesToRestore),
+                settings
+            );
+
             // List of searchable snapshots indices to restore
             final Set<Index> searchableSnapshotsIndices = new HashSet<>();
 

+ 67 - 0
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java

@@ -9,7 +9,11 @@
 
 package org.elasticsearch.cluster.metadata;
 
+import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ProjectState;
+import org.elasticsearch.cluster.SnapshotsInProgress;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.ClusterSettings;
@@ -20,11 +24,19 @@ import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.MapperServiceTestCase;
 import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.snapshots.Snapshot;
+import org.elasticsearch.snapshots.SnapshotId;
+import org.elasticsearch.snapshots.SnapshotInProgressException;
+import org.elasticsearch.snapshots.SnapshotInfoTestUtils;
+import org.elasticsearch.test.index.IndexVersionUtils;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
 
 import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.generateMapping;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -466,6 +478,61 @@ public class MetadataDataStreamsServiceTests extends MapperServiceTestCase {
         assertThat(updatedDataStream.getDataStreamOptions(), equalTo(DataStreamOptions.EMPTY));
     }
 
+    public void testDeleteMissing() {
+        DataStream dataStream = DataStreamTestHelper.randomInstance();
+        final var projectId = randomProjectIdOrDefault();
+        ProjectState state = ClusterState.builder(ClusterName.DEFAULT)
+            .putProjectMetadata(ProjectMetadata.builder(projectId))
+            .build()
+            .projectState(projectId);
+
+        ResourceNotFoundException e = expectThrows(
+            ResourceNotFoundException.class,
+            () -> MetadataDataStreamsService.deleteDataStreams(state, Set.of(dataStream), Settings.EMPTY)
+        );
+        assertThat(e.getMessage(), containsString(dataStream.getName()));
+    }
+
+    public void testDeleteSnapshotting() {
+        String dataStreamName = randomAlphaOfLength(5);
+        Snapshot snapshot = new Snapshot("doesn't matter", new SnapshotId("snapshot name", "snapshot uuid"));
+        SnapshotsInProgress snaps = SnapshotsInProgress.EMPTY.withAddedEntry(
+            SnapshotsInProgress.Entry.snapshot(
+                snapshot,
+                true,
+                false,
+                SnapshotsInProgress.State.INIT,
+                Collections.emptyMap(),
+                List.of(dataStreamName),
+                Collections.emptyList(),
+                System.currentTimeMillis(),
+                (long) randomIntBetween(0, 1000),
+                Map.of(),
+                null,
+                SnapshotInfoTestUtils.randomUserMetadata(),
+                IndexVersionUtils.randomVersion()
+            )
+        );
+        final DataStream dataStream = DataStreamTestHelper.randomInstance(dataStreamName);
+        var projectId = randomProjectIdOrDefault();
+        ProjectState state = ClusterState.builder(ClusterName.DEFAULT)
+            .putCustom(SnapshotsInProgress.TYPE, snaps)
+            .putProjectMetadata(ProjectMetadata.builder(projectId).put(dataStream))
+            .build()
+            .projectState(projectId);
+        Exception e = expectThrows(
+            SnapshotInProgressException.class,
+            () -> MetadataDataStreamsService.deleteDataStreams(state, Set.of(dataStream), Settings.EMPTY)
+        );
+        assertEquals(
+            "Cannot delete data streams that are being snapshotted: ["
+                + dataStreamName
+                + "]. Try again after snapshot finishes "
+                + "or cancel the currently running snapshot.",
+            e.getMessage()
+        );
+    }
+
     private MapperService getMapperService(IndexMetadata im) {
         try {
             String mapping = im.mapping().source().toString();