
Make data stream deletion project-aware (MP-1678)

This commit allows the data stream deletion API to function in a
multi-project setting.
Niels Bauman · 11 months ago · commit a40ad7df09
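
Editor's note: the heart of the change is that DeleteDataStreamTransportAction.removeDataStream now takes a ProjectState instead of the whole ClusterState. Below is a minimal sketch of the new call shape; it is not part of the commit, it assumes same-package access (the method is package-private), and the no-op system-data-stream validator and empty settings are purely illustrative.

package org.elasticsearch.datastreams.action;

import org.elasticsearch.action.datastreams.DeleteDataStreamAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.common.settings.Settings;

// Illustrative only: shows how a caller scopes the deletion to one project.
class ProjectAwareDeletionSketch {

    ClusterState deleteInProject(
        ClusterState state,
        ProjectResolver projectResolver,
        IndexNameExpressionResolver resolver,
        DeleteDataStreamAction.Request request
    ) {
        // Resolve the project from the request context first...
        ProjectState projectState = projectResolver.getProjectState(state);
        // ...then let removeDataStream mutate only that project's metadata.
        return DeleteDataStreamTransportAction.removeDataStream(
            resolver,
            projectState,
            request,
            dataStreamName -> { /* system data stream access check elided in this sketch */ },
            Settings.EMPTY
        );
    }
}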

+ 1 - 1
modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/DataStreamsStatsTransportAction.java

@@ -103,7 +103,7 @@ public class DataStreamsStatsTransportAction extends TransportBroadcastByNodeAct
     protected String[] resolveConcreteIndexNames(ClusterState clusterState, DataStreamsStatsAction.Request request) {
         return DataStreamsActionUtil.resolveConcreteIndexNames(
             indexNameExpressionResolver,
-            clusterState,
+            clusterState.getMetadata().getProject(),
             request.indices(),
             request.indicesOptions()
         ).toArray(String[]::new);

+ 28 - 16
modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/DeleteDataStreamTransportAction.java

@@ -18,12 +18,14 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.ProjectState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
-import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService;
+import org.elasticsearch.cluster.metadata.ProjectMetadata;
+import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.Strings;
@@ -52,6 +54,7 @@ public class DeleteDataStreamTransportAction extends AcknowledgedTransportMaster
     private static final Logger LOGGER = LogManager.getLogger(DeleteDataStreamTransportAction.class);
 
     private final SystemIndices systemIndices;
+    private final ProjectResolver projectResolver;
 
     @Inject
     public DeleteDataStreamTransportAction(
@@ -60,7 +63,8 @@ public class DeleteDataStreamTransportAction extends AcknowledgedTransportMaster
         ThreadPool threadPool,
         ActionFilters actionFilters,
         IndexNameExpressionResolver indexNameExpressionResolver,
-        SystemIndices systemIndices
+        SystemIndices systemIndices,
+        ProjectResolver projectResolver
     ) {
         super(
             DeleteDataStreamAction.NAME,
@@ -73,6 +77,7 @@ public class DeleteDataStreamTransportAction extends AcknowledgedTransportMaster
             EsExecutors.DIRECT_EXECUTOR_SERVICE
         );
         this.systemIndices = systemIndices;
+        this.projectResolver = projectResolver;
     }
 
     @Override
@@ -82,8 +87,14 @@ public class DeleteDataStreamTransportAction extends AcknowledgedTransportMaster
         ClusterState state,
         ActionListener<AcknowledgedResponse> listener
     ) throws Exception {
+        final var projectState = projectResolver.getProjectState(state);
         // resolve the names in the request early
-        List<String> names = getDataStreamNames(indexNameExpressionResolver, state, request.getNames(), request.indicesOptions());
+        List<String> names = getDataStreamNames(
+            indexNameExpressionResolver,
+            projectState.metadata(),
+            request.getNames(),
+            request.indicesOptions()
+        );
         for (String name : names) {
             systemIndices.validateDataStreamAccess(name, threadPool.getThreadContext());
         }
@@ -101,7 +112,7 @@ public class DeleteDataStreamTransportAction extends AcknowledgedTransportMaster
                 public ClusterState execute(ClusterState currentState) {
                     return removeDataStream(
                         indexNameExpressionResolver,
-                        currentState,
+                        currentState.projectState(projectState.projectId()),
                         request,
                         ds -> systemIndices.validateDataStreamAccess(ds, threadPool.getThreadContext()),
                         clusterService.getSettings()
@@ -123,26 +134,27 @@ public class DeleteDataStreamTransportAction extends AcknowledgedTransportMaster
 
     static ClusterState removeDataStream(
         IndexNameExpressionResolver indexNameExpressionResolver,
-        ClusterState currentState,
+        ProjectState projectState,
         DeleteDataStreamAction.Request request,
         Consumer<String> systemDataStreamAccessValidator,
         Settings settings
     ) {
-        List<String> names = getDataStreamNames(indexNameExpressionResolver, currentState, request.getNames(), request.indicesOptions());
+        final ProjectMetadata project = projectState.metadata();
+        List<String> names = getDataStreamNames(indexNameExpressionResolver, project, request.getNames(), request.indicesOptions());
         Set<String> dataStreams = new HashSet<>(names);
         for (String dataStreamName : dataStreams) {
             systemDataStreamAccessValidator.accept(dataStreamName);
         }
-        Set<String> snapshottingDataStreams = SnapshotsService.snapshottingDataStreams(currentState, dataStreams);
 
         if (dataStreams.isEmpty()) {
             if (request.isWildcardExpressionsOriginallySpecified()) {
-                return currentState;
+                return projectState.cluster();
             } else {
                 throw new ResourceNotFoundException("data streams " + Arrays.toString(request.getNames()) + " not found");
             }
         }
 
+        Set<String> snapshottingDataStreams = SnapshotsService.snapshottingDataStreams(projectState, dataStreams);
         if (snapshottingDataStreams.isEmpty() == false) {
             throw new SnapshotInProgressException(
                 "Cannot delete data streams that are being snapshotted: "
@@ -153,7 +165,7 @@ public class DeleteDataStreamTransportAction extends AcknowledgedTransportMaster
 
         Set<Index> backingIndicesToRemove = new HashSet<>();
         for (String dataStreamName : dataStreams) {
-            DataStream dataStream = currentState.metadata().getProject().dataStreams().get(dataStreamName);
+            DataStream dataStream = project.dataStreams().get(dataStreamName);
             assert dataStream != null;
             backingIndicesToRemove.addAll(dataStream.getIndices());
             backingIndicesToRemove.addAll(dataStream.getFailureIndices().getIndices());
@@ -163,13 +175,13 @@ public class DeleteDataStreamTransportAction extends AcknowledgedTransportMaster
         // (this to avoid data stream validation from failing when deleting an index that is part of a data stream
         // without updating the data stream)
         // TODO: change order when delete index api also updates the data stream the index to be removed is member of
-        Metadata.Builder metadata = Metadata.builder(currentState.metadata());
-        for (String ds : dataStreams) {
-            LOGGER.info("removing data stream [{}]", ds);
-            metadata.removeDataStream(ds);
-        }
-        currentState = ClusterState.builder(currentState).metadata(metadata).build();
-        return MetadataDeleteIndexService.deleteIndices(currentState, backingIndicesToRemove, settings);
+        ClusterState newState = projectState.updatedState(builder -> {
+            for (String ds : dataStreams) {
+                LOGGER.info("removing data stream [{}]", ds);
+                builder.removeDataStream(ds);
+            }
+        });
+        return MetadataDeleteIndexService.deleteIndices(newState.projectState(projectState.projectId()), backingIndicesToRemove, settings);
     }
 
     @Override
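
Editor's note on the pattern above: ProjectState.updatedState applies a mutation to the project's own metadata builder and returns a full ClusterState in which only that project has changed, which is what keeps the data stream removal and the follow-up index deletion scoped to one project. A hedged one-method sketch of the same idea, with the data stream name supplied by the caller:

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProjectState;

// Sketch: update one project's metadata without touching the other projects.
class ProjectScopedUpdateSketch {

    ClusterState removeOneDataStream(ProjectState projectState, String dataStreamName) {
        // The builder handed to the consumer is scoped to this project; every other
        // project in the returned cluster state is carried over unchanged.
        return projectState.updatedState(builder -> builder.removeDataStream(dataStreamName));
    }
}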

+ 127 - 21
modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/DeleteDataStreamTransportActionTests.java

@@ -15,6 +15,12 @@ import org.elasticsearch.cluster.SnapshotsInProgress;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.ProjectId;
+import org.elasticsearch.cluster.metadata.ProjectMetadata;
+import org.elasticsearch.cluster.project.TestProjectResolvers;
+import org.elasticsearch.cluster.routing.GlobalRoutingTable;
+import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.settings.Settings;
@@ -47,13 +53,33 @@ public class DeleteDataStreamTransportActionTests extends ESTestCase {
         final String dataStreamName = "my-data-stream";
         final List<String> otherIndices = randomSubsetOf(List.of("foo", "bar", "baz"));
 
-        ClusterState cs = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>(dataStreamName, 2)), otherIndices);
+        ClusterState initialState = DataStreamTestHelper.getClusterStateWithDataStreams(
+            List.of(new Tuple<>(dataStreamName, 2)),
+            otherIndices
+        );
+        final var projectId = initialState.metadata().projects().keySet().iterator().next();
+        final var stateWithProjects = addProjectsWithDataStreams(initialState, dataStreamName, otherIndices);
         DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { dataStreamName });
-        ClusterState newState = DeleteDataStreamTransportAction.removeDataStream(iner, cs, req, validator, Settings.EMPTY);
-        assertThat(newState.metadata().getProject().dataStreams().size(), equalTo(0));
-        assertThat(newState.metadata().getProject().indices().size(), equalTo(otherIndices.size()));
+        ClusterState newState = DeleteDataStreamTransportAction.removeDataStream(
+            iner,
+            TestProjectResolvers.singleProject(projectId).getProjectState(stateWithProjects),
+            req,
+            validator,
+            Settings.EMPTY
+        );
+        assertThat(newState.metadata().getProject(projectId).dataStreams().size(), equalTo(0));
+        assertThat(newState.metadata().getProject(projectId).indices().size(), equalTo(otherIndices.size()));
         for (String indexName : otherIndices) {
-            assertThat(newState.metadata().getProject().indices().get(indexName).getIndex().getName(), equalTo(indexName));
+            assertThat(newState.metadata().getProject(projectId).indices().get(indexName).getIndex().getName(), equalTo(indexName));
+        }
+        // Ensure the other projects did not get affected.
+        for (ProjectMetadata project : stateWithProjects.metadata().projects().values()) {
+            if (project.id().equals(projectId)) {
+                continue;
+            }
+            assertEquals(1, project.dataStreams().size());
+            // Other indices + 2 for the backing indices of the data stream.
+            assertEquals(otherIndices.size() + 2, project.indices().size());
         }
     }
 
@@ -72,12 +98,29 @@ public class DeleteDataStreamTransportActionTests extends ESTestCase {
             false,
             true
         );
+        final var projectId = cs.metadata().projects().keySet().iterator().next();
+        final var stateWithProjects = addProjectsWithDataStreams(cs, dataStreamName, otherIndices);
         DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { dataStreamName });
-        ClusterState newState = DeleteDataStreamTransportAction.removeDataStream(iner, cs, req, validator, Settings.EMPTY);
-        assertThat(newState.metadata().getProject().dataStreams().size(), equalTo(0));
-        assertThat(newState.metadata().getProject().indices().size(), equalTo(otherIndices.size()));
+        ClusterState newState = DeleteDataStreamTransportAction.removeDataStream(
+            iner,
+            TestProjectResolvers.singleProject(projectId).getProjectState(stateWithProjects),
+            req,
+            validator,
+            Settings.EMPTY
+        );
+        assertThat(newState.metadata().getProject(projectId).dataStreams().size(), equalTo(0));
+        assertThat(newState.metadata().getProject(projectId).indices().size(), equalTo(otherIndices.size()));
         for (String indexName : otherIndices) {
-            assertThat(newState.metadata().getProject().indices().get(indexName).getIndex().getName(), equalTo(indexName));
+            assertThat(newState.metadata().getProject(projectId).indices().get(indexName).getIndex().getName(), equalTo(indexName));
+        }
+        // Ensure the other projects did not get affected.
+        for (ProjectMetadata project : stateWithProjects.metadata().projects().values()) {
+            if (project.id().equals(projectId)) {
+                continue;
+            }
+            assertEquals(1, project.dataStreams().size());
+            // Other indices + 2 for the backing indices of the data stream.
+            assertEquals(otherIndices.size() + 2, project.indices().size());
         }
     }
 
@@ -92,15 +135,30 @@ public class DeleteDataStreamTransportActionTests extends ESTestCase {
             ),
             List.of()
         );
+        final var projectId = cs.metadata().projects().keySet().iterator().next();
+        final var stateWithProjects = addProjectsWithDataStreams(cs, randomFrom(dataStreamNames), List.of());
 
         DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "ba*", "eggplant" });
-        ClusterState newState = DeleteDataStreamTransportAction.removeDataStream(iner, cs, req, validator, Settings.EMPTY);
-        assertThat(newState.metadata().getProject().dataStreams().size(), equalTo(1));
-        DataStream remainingDataStream = newState.metadata().getProject().dataStreams().get(dataStreamNames[0]);
+        ClusterState newState = DeleteDataStreamTransportAction.removeDataStream(
+            iner,
+            TestProjectResolvers.singleProject(projectId).getProjectState(stateWithProjects),
+            req,
+            validator,
+            Settings.EMPTY
+        );
+        assertThat(newState.metadata().getProject(projectId).dataStreams().size(), equalTo(1));
+        DataStream remainingDataStream = newState.metadata().getProject(projectId).dataStreams().get(dataStreamNames[0]);
         assertNotNull(remainingDataStream);
-        assertThat(newState.metadata().getProject().indices().size(), equalTo(remainingDataStream.getIndices().size()));
+        assertThat(newState.metadata().getProject(projectId).indices().size(), equalTo(remainingDataStream.getIndices().size()));
         for (Index i : remainingDataStream.getIndices()) {
-            assertThat(newState.metadata().getProject().indices().get(i.getName()).getIndex(), equalTo(i));
+            assertThat(newState.metadata().getProject(projectId).indices().get(i.getName()).getIndex(), equalTo(i));
+        }
+        // Ensure the other projects did not get affected.
+        for (ProjectMetadata project : stateWithProjects.metadata().projects().values()) {
+            if (project.id().equals(projectId)) {
+                continue;
+            }
+            assertEquals(1, project.dataStreams().size());
         }
     }
 
@@ -116,11 +174,19 @@ public class DeleteDataStreamTransportActionTests extends ESTestCase {
         SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.EMPTY.withAddedEntry(createEntry(dataStreamName, "repo1", false))
             .withAddedEntry(createEntry(dataStreamName2, "repo2", true));
         ClusterState snapshotCs = ClusterState.builder(cs).putCustom(SnapshotsInProgress.TYPE, snapshotsInProgress).build();
+        final var projectId = snapshotCs.metadata().projects().keySet().iterator().next();
+        final var stateWithProjects = addProjectsWithDataStreams(snapshotCs, dataStreamName2, otherIndices);
 
         DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { dataStreamName });
         SnapshotInProgressException e = expectThrows(
             SnapshotInProgressException.class,
-            () -> DeleteDataStreamTransportAction.removeDataStream(iner, snapshotCs, req, validator, Settings.EMPTY)
+            () -> DeleteDataStreamTransportAction.removeDataStream(
+                iner,
+                TestProjectResolvers.singleProject(projectId).getProjectState(stateWithProjects),
+                req,
+                validator,
+                Settings.EMPTY
+            )
         );
 
         assertThat(
@@ -130,6 +196,13 @@ public class DeleteDataStreamTransportActionTests extends ESTestCase {
                     + "snapshot finishes or cancel the currently running snapshot."
             )
         );
+        // Ensure the other projects did not get affected.
+        for (ProjectMetadata project : stateWithProjects.metadata().projects().values()) {
+            if (project.id().equals(projectId)) {
+                continue;
+            }
+            assertEquals(1, project.dataStreams().size());
+        }
     }
 
     private SnapshotsInProgress.Entry createEntry(String dataStreamName, String repo, boolean partial) {
@@ -162,12 +235,14 @@ public class DeleteDataStreamTransportActionTests extends ESTestCase {
             ),
             List.of()
         );
+        final var projectId = cs.metadata().projects().keySet().iterator().next();
+        final var stateWithProjects = addProjectsWithDataStreams(cs, randomFrom(dataStreamNames), List.of());
 
         expectThrows(
             ResourceNotFoundException.class,
             () -> DeleteDataStreamTransportAction.removeDataStream(
                 iner,
-                cs,
+                TestProjectResolvers.singleProject(projectId).getProjectState(stateWithProjects),
                 new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { dataStreamName }),
                 validator,
                 Settings.EMPTY
@@ -178,13 +253,44 @@ public class DeleteDataStreamTransportActionTests extends ESTestCase {
             TEST_REQUEST_TIMEOUT,
             new String[] { dataStreamName + "*" }
         );
-        ClusterState newState = DeleteDataStreamTransportAction.removeDataStream(iner, cs, req, validator, Settings.EMPTY);
-        assertThat(newState, sameInstance(cs));
-        assertThat(newState.metadata().getProject().dataStreams().size(), equalTo(cs.metadata().getProject().dataStreams().size()));
+        ClusterState newState = DeleteDataStreamTransportAction.removeDataStream(
+            iner,
+            TestProjectResolvers.singleProject(projectId).getProjectState(stateWithProjects),
+            req,
+            validator,
+            Settings.EMPTY
+        );
+        assertThat(newState, sameInstance(stateWithProjects));
+        assertThat(
+            newState.metadata().getProject(projectId).dataStreams().size(),
+            equalTo(stateWithProjects.metadata().getProject(projectId).dataStreams().size())
+        );
         assertThat(
-            newState.metadata().getProject().dataStreams().keySet(),
-            containsInAnyOrder(cs.metadata().getProject().dataStreams().keySet().toArray(Strings.EMPTY_ARRAY))
+            newState.metadata().getProject(projectId).dataStreams().keySet(),
+            containsInAnyOrder(stateWithProjects.metadata().getProject(projectId).dataStreams().keySet().toArray(Strings.EMPTY_ARRAY))
         );
     }
 
+    private ClusterState addProjectsWithDataStreams(ClusterState initialState, String dataStreamName, List<String> otherIndices) {
+        final var metadataBuilder = Metadata.builder(initialState.metadata());
+        final var routingTableBuilder = GlobalRoutingTable.builder();
+        final int numberOfProjects = randomIntBetween(0, 5);
+        for (int i = 0; i < numberOfProjects; i++) {
+            final var id = new ProjectId(randomUUID());
+            var projectBuilder = ProjectMetadata.builder(id);
+            DataStreamTestHelper.getClusterStateWithDataStreams(
+                projectBuilder,
+                List.of(Tuple.tuple(dataStreamName, 2)),
+                otherIndices,
+                System.currentTimeMillis(),
+                Settings.EMPTY,
+                1,
+                false,
+                false
+            );
+            metadataBuilder.put(projectBuilder.build());
+            routingTableBuilder.put(id, RoutingTable.EMPTY_ROUTING_TABLE);
+        }
+        return ClusterState.builder(initialState).metadata(metadataBuilder.build()).routingTable(routingTableBuilder.build()).build();
+    }
 }

+ 6 - 1
server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java

@@ -59,6 +59,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.regex.Pattern;
 
 import static org.elasticsearch.cluster.metadata.IndexAbstraction.Type.ALIAS;
@@ -296,7 +297,11 @@ public class MetadataRolloverService {
         boolean isFailureStoreRollover
     ) throws Exception {
 
-        if (SnapshotsService.snapshottingDataStreams(currentState, Collections.singleton(dataStream.getName())).isEmpty() == false) {
+        Set<String> snapshottingDataStreams = SnapshotsService.snapshottingDataStreams(
+            currentState.projectState(currentState.metadata().getProject().id()),
+            Collections.singleton(dataStream.getName())
+        );
+        if (snapshottingDataStreams.isEmpty() == false) {
             // we can't roll over the snapshot concurrently because the snapshot contains the indices that existed when it was started but
             // the cluster metadata of when it completes so the new write index would not exist in the snapshot if there was a concurrent
             // rollover

+ 16 - 5
server/src/main/java/org/elasticsearch/action/datastreams/DataStreamsActionUtil.java

@@ -14,6 +14,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.metadata.ProjectMetadata;
 import org.elasticsearch.index.Index;
 
 import java.util.List;
@@ -22,6 +23,16 @@ import java.util.stream.Stream;
 
 public class DataStreamsActionUtil {
 
+    @Deprecated
+    public static List<String> getDataStreamNames(
+        IndexNameExpressionResolver indexNameExpressionResolver,
+        ClusterState currentState,
+        String[] names,
+        IndicesOptions indicesOptions
+    ) {
+        return getDataStreamNames(indexNameExpressionResolver, currentState.metadata().getProject(), names, indicesOptions);
+    }
+
     /**
      * Gets data streams names, expanding wildcards using {@link IndicesOptions} provided.
      * For data streams we only care about the hidden state (we can't have closed or open data streams),
@@ -30,12 +41,12 @@ public class DataStreamsActionUtil {
      */
     public static List<String> getDataStreamNames(
         IndexNameExpressionResolver indexNameExpressionResolver,
-        ClusterState currentState,
+        ProjectMetadata project,
         String[] names,
         IndicesOptions indicesOptions
     ) {
         indicesOptions = updateIndicesOptions(indicesOptions);
-        return indexNameExpressionResolver.dataStreamNames(currentState, indicesOptions, names);
+        return indexNameExpressionResolver.dataStreamNames(project, indicesOptions, names);
     }
 
     public static IndicesOptions updateIndicesOptions(IndicesOptions indicesOptions) {
@@ -49,12 +60,12 @@ public class DataStreamsActionUtil {
 
     public static Stream<String> resolveConcreteIndexNames(
         IndexNameExpressionResolver indexNameExpressionResolver,
-        ClusterState clusterState,
+        ProjectMetadata project,
         String[] names,
         IndicesOptions indicesOptions
     ) {
-        List<String> abstractionNames = getDataStreamNames(indexNameExpressionResolver, clusterState, names, indicesOptions);
-        SortedMap<String, IndexAbstraction> indicesLookup = clusterState.getMetadata().getProject().getIndicesLookup();
+        List<String> abstractionNames = getDataStreamNames(indexNameExpressionResolver, project, names, indicesOptions);
+        SortedMap<String, IndexAbstraction> indicesLookup = project.getIndicesLookup();
 
         return abstractionNames.stream().flatMap(abstractionName -> {
             IndexAbstraction indexAbstraction = indicesLookup.get(abstractionName);
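
Editor's note: with the ProjectMetadata overloads in place, callers can migrate off the deprecated ClusterState variants, which simply delegate to state.metadata().getProject() under the hood. A sketch of the migrated call; the "logs-*" expression and the indices options are placeholders:

import java.util.List;

import org.elasticsearch.action.datastreams.DataStreamsActionUtil;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.ProjectMetadata;

// Sketch: resolve data stream names against an explicit project instead of the cluster state.
class DataStreamNameResolutionSketch {

    List<String> resolve(ClusterState state, IndexNameExpressionResolver resolver) {
        // Old (now deprecated): DataStreamsActionUtil.getDataStreamNames(resolver, state, ...)
        // New: pass the project's metadata explicitly.
        ProjectMetadata project = state.metadata().getProject();
        return DataStreamsActionUtil.getDataStreamNames(
            resolver,
            project,
            new String[] { "logs-*" },
            IndicesOptions.lenientExpandOpen()
        );
    }
}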

+ 4 - 0
server/src/main/java/org/elasticsearch/cluster/ClusterState.java

@@ -1017,6 +1017,10 @@ public class ClusterState implements ChunkedToXContent, Diffable<ClusterState> {
             return this;
         }
 
+        public Builder putRoutingTable(ProjectId projectId, RoutingTable routingTable) {
+            return routingTable(GlobalRoutingTable.builder(this.routingTable).put(projectId, routingTable).build());
+        }
+
         public Builder metadata(Metadata.Builder metadataBuilder) {
             return metadata(metadataBuilder.build());
         }
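
Editor's note: the new putRoutingTable builder method swaps out a single project's routing table without rebuilding the whole GlobalRoutingTable by hand; MetadataDeleteIndexService relies on it below. A minimal sketch, with the project id and routing table supplied by the caller:

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.routing.RoutingTable;

// Sketch: replace the routing table of one project only.
class PerProjectRoutingTableSketch {

    ClusterState replaceRoutingTable(ClusterState state, ProjectId projectId, RoutingTable projectRoutingTable) {
        // Routing tables of all other projects are kept as-is.
        return ClusterState.builder(state)
            .putRoutingTable(projectId, projectRoutingTable)
            .build();
    }
}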

+ 1 - 0
server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java

@@ -239,6 +239,7 @@ public class IndexNameExpressionResolver {
         return concreteIndexNames(context, request.indices());
     }
 
+    @Deprecated
     public List<String> dataStreamNames(ClusterState state, IndicesOptions options, String... indexExpressions) {
         return dataStreamNames(projectResolver.getProjectMetadata(state), options, indexExpressions);
     }

+ 26 - 23
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java

@@ -17,6 +17,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateAckListener;
 import org.elasticsearch.cluster.ClusterStateTaskExecutor;
 import org.elasticsearch.cluster.ClusterStateTaskListener;
+import org.elasticsearch.cluster.ProjectState;
 import org.elasticsearch.cluster.RestoreInProgress;
 import org.elasticsearch.cluster.SimpleBatchedAckListenerTaskExecutor;
 import org.elasticsearch.cluster.block.ClusterBlocks;
@@ -141,16 +142,21 @@ public class MetadataDeleteIndexService {
         }
     }
 
+    @Deprecated
+    public static ClusterState deleteIndices(ClusterState currentState, Set<Index> indices, Settings settings) {
+        return deleteIndices(currentState.projectState(), indices, settings);
+    }
+
     /**
      * Delete some indices from the cluster state.
      */
-    public static ClusterState deleteIndices(ClusterState currentState, Set<Index> indices, Settings settings) {
-        final Metadata meta = currentState.metadata();
+    public static ClusterState deleteIndices(ProjectState projectState, Set<Index> indices, Settings settings) {
+        var project = projectState.metadata();
         final Set<Index> indicesToDelete = new HashSet<>();
         final Map<Index, DataStream> dataStreamIndices = new HashMap<>();
         for (Index index : indices) {
-            IndexMetadata im = meta.getProject().getIndexSafe(index);
-            DataStream parent = meta.getProject().getIndicesLookup().get(im.getIndex().getName()).getParentDataStream();
+            IndexMetadata im = project.getIndexSafe(index);
+            DataStream parent = project.getIndicesLookup().get(im.getIndex().getName()).getParentDataStream();
             if (parent != null) {
                 boolean isFailureStoreWriteIndex = im.getIndex().equals(parent.getFailureStoreWriteIndex());
                 if (isFailureStoreWriteIndex || im.getIndex().equals(parent.getWriteIndex())) {
@@ -171,7 +177,7 @@ public class MetadataDeleteIndexService {
         }
 
         // Check if index deletion conflicts with any running snapshots
-        Set<Index> snapshottingIndices = SnapshotsService.snapshottingIndices(currentState, indicesToDelete);
+        Set<Index> snapshottingIndices = SnapshotsService.snapshottingIndices(projectState, indicesToDelete);
         if (snapshottingIndices.isEmpty() == false) {
             throw new SnapshotInProgressException(
                 "Cannot delete indices that are being snapshotted: "
@@ -180,30 +186,30 @@ public class MetadataDeleteIndexService {
             );
         }
 
-        RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable());
-        Metadata.Builder metadataBuilder = Metadata.builder(meta);
-        ClusterBlocks.Builder clusterBlocksBuilder = ClusterBlocks.builder().blocks(currentState.blocks());
+        RoutingTable.Builder routingTableBuilder = RoutingTable.builder(projectState.routingTable());
+        ProjectMetadata.Builder projectBuilder = ProjectMetadata.builder(project);
+        ClusterBlocks.Builder clusterBlocksBuilder = ClusterBlocks.builder().blocks(projectState.cluster().blocks());
 
-        final IndexGraveyard.Builder graveyardBuilder = IndexGraveyard.builder(metadataBuilder.indexGraveyard());
+        final IndexGraveyard.Builder graveyardBuilder = IndexGraveyard.builder(projectBuilder.indexGraveyard());
         final int previousGraveyardSize = graveyardBuilder.tombstones().size();
         for (final Index index : indices) {
             String indexName = index.getName();
             logger.info("{} deleting index", index);
             routingTableBuilder.remove(indexName);
             clusterBlocksBuilder.removeIndexBlocks(indexName);
-            metadataBuilder.remove(indexName);
+            projectBuilder.remove(indexName);
             if (dataStreamIndices.containsKey(index)) {
-                DataStream parent = metadataBuilder.dataStream(dataStreamIndices.get(index).getName());
+                DataStream parent = projectBuilder.dataStream(dataStreamIndices.get(index).getName());
                 if (parent.isFailureStoreIndex(index.getName())) {
-                    metadataBuilder.put(parent.removeFailureStoreIndex(index));
+                    projectBuilder.put(parent.removeFailureStoreIndex(index));
                 } else {
-                    metadataBuilder.put(parent.removeBackingIndex(index));
+                    projectBuilder.put(parent.removeBackingIndex(index));
                 }
             }
         }
         // add tombstones to the cluster state for each deleted index
         final IndexGraveyard currentGraveyard = graveyardBuilder.addTombstones(indices).build(settings);
-        metadataBuilder.indexGraveyard(currentGraveyard); // the new graveyard set on the metadata
+        projectBuilder.indexGraveyard(currentGraveyard); // the new graveyard set on the metadata
         logger.trace(
             "{} tombstones purged from the cluster state. Previous tombstone size: {}. Current tombstone size: {}.",
             graveyardBuilder.getNumPurged(),
@@ -211,12 +217,9 @@ public class MetadataDeleteIndexService {
             currentGraveyard.getTombstones().size()
         );
 
-        Metadata newMetadata = metadataBuilder.build();
-        ClusterBlocks blocks = clusterBlocksBuilder.build();
-
         // update snapshot restore entries
-        Map<String, ClusterState.Custom> customs = currentState.getCustoms();
-        final RestoreInProgress restoreInProgress = RestoreInProgress.get(currentState);
+        Map<String, ClusterState.Custom> customs = projectState.cluster().getCustoms();
+        final RestoreInProgress restoreInProgress = RestoreInProgress.get(projectState.cluster());
         RestoreInProgress updatedRestoreInProgress = RestoreService.updateRestoreStateWithDeletedIndices(restoreInProgress, indices);
         if (updatedRestoreInProgress != restoreInProgress) {
             ImmutableOpenMap.Builder<String, ClusterState.Custom> builder = ImmutableOpenMap.builder(customs);
@@ -224,10 +227,10 @@ public class MetadataDeleteIndexService {
             customs = builder.build();
         }
 
-        return ClusterState.builder(currentState)
-            .routingTable(routingTableBuilder.build())
-            .metadata(newMetadata)
-            .blocks(blocks)
+        return ClusterState.builder(projectState.cluster())
+            .putRoutingTable(project.id(), routingTableBuilder.build())
+            .putProjectMetadata(projectBuilder.build())
+            .blocks(clusterBlocksBuilder.build())
             .customs(customs)
             .build();
     }
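
Editor's note: taken together, the delete path now stays inside one project end to end. deleteIndices consumes a ProjectState and returns a full ClusterState in which only that project's metadata, routing table, and blocks have changed. A sketch of the call a project-aware caller would make; the index set comes from the caller and the empty settings are a placeholder:

import java.util.Set;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;

// Sketch: delete backing indices within the scope of a single project.
class ProjectScopedIndexDeletionSketch {

    ClusterState deleteBackingIndices(ProjectState projectState, Set<Index> backingIndices) {
        return MetadataDeleteIndexService.deleteIndices(projectState, backingIndices, Settings.EMPTY);
    }
}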

+ 2 - 2
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java

@@ -327,7 +327,7 @@ public class MetadataIndexStateService {
         }
 
         // Check if index closing conflicts with any running snapshots
-        Set<Index> snapshottingIndices = SnapshotsService.snapshottingIndices(currentState, indicesToClose);
+        Set<Index> snapshottingIndices = SnapshotsService.snapshottingIndices(currentState.projectState(), indicesToClose);
         if (snapshottingIndices.isEmpty() == false) {
             throw new SnapshotInProgressException(
                 "Cannot close indices that are being snapshotted: "
@@ -866,7 +866,7 @@ public class MetadataIndexStateService {
                 }
 
                 // Check if index closing conflicts with any running snapshots
-                Set<Index> snapshottingIndices = SnapshotsService.snapshottingIndices(currentState, Set.of(index));
+                Set<Index> snapshottingIndices = SnapshotsService.snapshottingIndices(currentState.projectState(), Set.of(index));
                 if (snapshottingIndices.isEmpty() == false) {
                     closingResults.put(
                         result.getKey(),

+ 11 - 6
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -34,6 +34,7 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor;
 import org.elasticsearch.cluster.ClusterStateTaskListener;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.NotMasterException;
+import org.elasticsearch.cluster.ProjectState;
 import org.elasticsearch.cluster.RepositoryCleanupInProgress;
 import org.elasticsearch.cluster.RestoreInProgress;
 import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
@@ -73,6 +74,7 @@ import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.ListenableFuture;
+import org.elasticsearch.core.FixForMultiProject;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Predicates;
 import org.elasticsearch.core.SuppressForbidden;
@@ -3033,9 +3035,12 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
      * Returns the data streams that are currently being snapshotted (with partial == false) and that are contained in the
      * indices-to-check set.
      */
-    public static Set<String> snapshottingDataStreams(final ClusterState currentState, final Set<String> dataStreamsToCheck) {
-        Map<String, DataStream> dataStreams = currentState.metadata().getProject().dataStreams();
-        return SnapshotsInProgress.get(currentState)
+    @FixForMultiProject
+    public static Set<String> snapshottingDataStreams(final ProjectState projectState, final Set<String> dataStreamsToCheck) {
+        // TODO multi-project: this will behave incorrectly when there are data streams with equal names in different projects that are
+        // being snapshotted at the same time.
+        Map<String, DataStream> dataStreams = projectState.metadata().dataStreams();
+        return SnapshotsInProgress.get(projectState.cluster())
             .asStream()
             .filter(e -> e.partial() == false)
             .flatMap(e -> e.dataStreams().stream())
@@ -3046,13 +3051,13 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
     /**
      * Returns the indices that are currently being snapshotted (with partial == false) and that are contained in the indices-to-check set.
      */
-    public static Set<Index> snapshottingIndices(final ClusterState currentState, final Set<Index> indicesToCheck) {
+    public static Set<Index> snapshottingIndices(final ProjectState projectState, final Set<Index> indicesToCheck) {
         final Set<Index> indices = new HashSet<>();
-        for (List<SnapshotsInProgress.Entry> snapshotsInRepo : SnapshotsInProgress.get(currentState).entriesByRepo()) {
+        for (List<SnapshotsInProgress.Entry> snapshotsInRepo : SnapshotsInProgress.get(projectState.cluster()).entriesByRepo()) {
             for (final SnapshotsInProgress.Entry entry : snapshotsInRepo) {
                 if (entry.partial() == false && entry.isClone() == false) {
                     for (String indexName : entry.indices().keySet()) {
-                        IndexMetadata indexMetadata = currentState.metadata().getProject().index(indexName);
+                        IndexMetadata indexMetadata = projectState.metadata().index(indexName);
                         if (indexMetadata != null && indicesToCheck.contains(indexMetadata.getIndex())) {
                             indices.add(indexMetadata.getIndex());
                         }
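
Editor's note: the snapshot-conflict helpers follow the same split: data stream and index lookups are project-scoped while SnapshotsInProgress is still read at the cluster level, hence the @FixForMultiProject caveat about identically named data streams in different projects. A sketch of a project-scoped guard mirroring the one in removeDataStream; the exception message is illustrative:

import java.util.Set;

import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.snapshots.SnapshotsService;

// Sketch: refuse to delete data streams that a non-partial snapshot is still using.
class SnapshotConflictGuardSketch {

    void ensureNotSnapshotting(ProjectState projectState, Set<String> dataStreamNames) {
        Set<String> snapshotting = SnapshotsService.snapshottingDataStreams(projectState, dataStreamNames);
        if (snapshotting.isEmpty() == false) {
            throw new SnapshotInProgressException("Cannot delete data streams that are being snapshotted: " + snapshotting);
        }
    }
}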

+ 7 - 6
server/src/test/java/org/elasticsearch/action/datastreams/DataStreamsActionUtilTests.java

@@ -17,7 +17,8 @@ import org.elasticsearch.cluster.metadata.DataStreamMetadata;
 import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
-import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.ProjectId;
+import org.elasticsearch.cluster.metadata.ProjectMetadata;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.Index;
@@ -47,9 +48,10 @@ public class DataStreamsActionUtilTests extends ESTestCase {
         var dataStreamIndex3 = new Index(".ds-foo2", IndexMetadata.INDEX_UUID_NA_VALUE);
         var dataStreamIndex4 = new Index(".ds-baz1", IndexMetadata.INDEX_UUID_NA_VALUE);
 
+        var projectId = new ProjectId(randomUUID());
         ClusterState clusterState = ClusterState.builder(new ClusterName("test-cluster"))
-            .metadata(
-                Metadata.builder()
+            .putProjectMetadata(
+                ProjectMetadata.builder(projectId)
                     .putCustom(
                         DataStreamMetadata.TYPE,
                         new DataStreamMetadata(
@@ -72,19 +74,18 @@ public class DataStreamsActionUtilTests extends ESTestCase {
                             dataStreamIndex4
                         )
                     )
-                    .build()
             )
             .build();
 
         var query = new String[] { "foo*", "baz*" };
         var indexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
-        when(indexNameExpressionResolver.dataStreamNames(any(ClusterState.class), any(), eq(query))).thenReturn(
+        when(indexNameExpressionResolver.dataStreamNames((ProjectMetadata) any(), any(), eq(query))).thenReturn(
             List.of("fooDs", "foo2Ds", "bazDs")
         );
 
         var resolved = DataStreamsActionUtil.resolveConcreteIndexNames(
             indexNameExpressionResolver,
-            clusterState,
+            clusterState.getMetadata().getProject(projectId),
             query,
             IndicesOptions.builder().wildcardOptions(IndicesOptions.WildcardOptions.builder().includeHidden(true)).build()
         ).toList();

+ 4 - 1
server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java

@@ -481,7 +481,10 @@ public class SnapshotsServiceTests extends ESTestCase {
         );
 
         assertThat(
-            SnapshotsService.snapshottingIndices(clusterState, singleton(clusterState.metadata().getProject().index(indexName).getIndex())),
+            SnapshotsService.snapshottingIndices(
+                clusterState.projectState(),
+                singleton(clusterState.metadata().getProject().index(indexName).getIndex())
+            ),
             empty()
         );
     }

+ 15 - 0
test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java

@@ -458,6 +458,21 @@ public final class DataStreamTestHelper {
         int replicas,
         boolean replicated,
         boolean storeFailures
+    ) {
+        ProjectMetadata.Builder projectBuilder = ProjectMetadata.builder(Metadata.DEFAULT_PROJECT_ID);
+        getClusterStateWithDataStreams(projectBuilder, dataStreams, indexNames, currentTime, settings, replicas, replicated, storeFailures);
+        builder.put(projectBuilder);
+    }
+
+    public static void getClusterStateWithDataStreams(
+        ProjectMetadata.Builder builder,
+        List<Tuple<String, Integer>> dataStreams,
+        List<String> indexNames,
+        long currentTime,
+        Settings settings,
+        int replicas,
+        boolean replicated,
+        boolean storeFailures
     ) {
         builder.put(
             "template_1",