Browse Source

Data streams service to handle modification requests (#78033)

Dan Hermann 4 years ago
parent
commit
ed8d79f5e1

+ 11 - 4
server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

@@ -186,12 +186,19 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
         int backingIndexPosition = indices.indexOf(index);
 
         if (backingIndexPosition == -1) {
-            throw new IllegalArgumentException(String.format(Locale.ROOT, "index [%s] is not part of data stream [%s]",
-                index.getName(), name));
+            throw new IllegalArgumentException(String.format(
+                Locale.ROOT,
+                "index [%s] is not part of data stream [%s]",
+                index.getName(), name
+            ));
         }
         if (generation == (backingIndexPosition + 1)) {
-            throw new IllegalArgumentException(String.format(Locale.ROOT, "cannot remove backing index [%s] of data stream [%s] because " +
-                "it is the write index", index.getName(), name));
+            throw new IllegalArgumentException(String.format(
+                Locale.ROOT,
+                "cannot remove backing index [%s] of data stream [%s] because it is the write index",
+                index.getName(),
+                name
+            ));
         }
 
         List<Index> backingIndices = new ArrayList<>(indices);

+ 81 - 0
server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAction.java

@@ -0,0 +1,81 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.cluster.metadata;
+
+import org.elasticsearch.common.Strings;
+
+/**
+ * Operations on data streams
+ */
+public abstract class DataStreamAction {
+    private final String dataStream;
+
+    public static DataStreamAction addBackingIndex(String dataStream, String index) {
+        return new DataStreamAction.AddBackingIndex(dataStream, index);
+    }
+
+    public static DataStreamAction removeBackingIndex(String dataStream, String index) {
+        return new DataStreamAction.RemoveBackingIndex(dataStream, index);
+    }
+
+    private DataStreamAction(String dataStream) {
+        if (false == Strings.hasText(dataStream)) {
+            throw new IllegalArgumentException("[data_stream] is required");
+        }
+        this.dataStream = dataStream;
+    }
+
+    /**
+     * Data stream on which the operation should act
+     */
+    public String getDataStream() {
+        return dataStream;
+    }
+
+    public static class AddBackingIndex extends DataStreamAction {
+
+        private final String index;
+
+        private AddBackingIndex(String dataStream, String index) {
+            super(dataStream);
+
+            if (false == Strings.hasText(index)) {
+                throw new IllegalArgumentException("[index] is required");
+            }
+
+            this.index = index;
+        }
+
+        public String getIndex() {
+            return index;
+        }
+
+    }
+
+    public static class RemoveBackingIndex extends DataStreamAction {
+
+        private final String index;
+
+        private RemoveBackingIndex(String dataStream, String index) {
+            super(dataStream);
+
+            if (false == Strings.hasText(index)) {
+                throw new IllegalArgumentException("[index] is required");
+            }
+
+            this.index = index;
+        }
+
+        public String getIndex() {
+            return index;
+        }
+
+    }
+
+}

+ 170 - 0
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java

@@ -0,0 +1,170 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.cluster.metadata;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.indices.IndicesService;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Handles data stream modification requests.
+ */
+public class MetadataDataStreamsService {
+
+    private final ClusterService clusterService;
+    private final IndicesService indicesService;
+
+    public MetadataDataStreamsService(ClusterService clusterService, IndicesService indicesService) {
+        this.clusterService = clusterService;
+        this.indicesService = indicesService;
+    }
+
+    public void updateBackingIndices(final ModifyDataStreamRequest request,
+                                     final ActionListener<AcknowledgedResponse> listener) {
+        if (request.actions().size() == 0) {
+            listener.onResponse(AcknowledgedResponse.TRUE);
+        } else {
+            clusterService.submitStateUpdateTask("update-backing-indices",
+                new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) {
+                    @Override
+                    public ClusterState execute(ClusterState currentState) {
+                        return modifyDataStream(
+                            currentState,
+                            request.actions(),
+                            indexMetadata -> {
+                                try {
+                                    return indicesService.createIndexMapperService(indexMetadata);
+                                } catch (IOException e) {
+                                    throw new IllegalStateException(e);
+                                }
+                            }
+                        );
+                    }
+                });
+        }
+    }
+
+    /**
+     * Computes the resulting cluster state after applying all requested data stream modifications in order.
+     *
+     * @param currentState current cluster state
+     * @param actions ordered list of modifications to perform
+     * @return resulting cluster state after all modifications have been performed
+     */
+    static ClusterState modifyDataStream(
+        ClusterState currentState,
+        Iterable<DataStreamAction> actions,
+        Function<IndexMetadata, MapperService> mapperSupplier
+    ) {
+        Metadata updatedMetadata = currentState.metadata();
+
+        for (var action : actions) {
+            Metadata.Builder builder = Metadata.builder(updatedMetadata);
+            if (action instanceof DataStreamAction.AddBackingIndex) {
+                addBackingIndex(
+                    updatedMetadata,
+                    builder,
+                    mapperSupplier,
+                    action.getDataStream(),
+                    ((DataStreamAction.AddBackingIndex) action).getIndex()
+                );
+            } else if (action instanceof DataStreamAction.RemoveBackingIndex) {
+                removeBackingIndex(
+                    updatedMetadata,
+                    builder,
+                    action.getDataStream(),
+                    ((DataStreamAction.RemoveBackingIndex) action).getIndex()
+                );
+            } else {
+                throw new IllegalStateException("unsupported data stream action type [" + action.getClass().getName() + "]");
+            }
+            updatedMetadata = builder.build();
+        }
+
+        return ClusterState.builder(currentState).metadata(updatedMetadata).build();
+    }
+
+    private static void addBackingIndex(
+        Metadata metadata,
+        Metadata.Builder builder,
+        Function<IndexMetadata, MapperService> mapperSupplier,
+        String dataStreamName,
+        String indexName
+    ) {
+        var dataStream = validateDataStream(metadata, dataStreamName);
+        var index = validateIndex(metadata, indexName);
+
+        try {
+            MetadataMigrateToDataStreamService.prepareBackingIndex(
+                builder,
+                index.getWriteIndex(),
+                dataStreamName,
+                mapperSupplier,
+                false);
+        } catch (IOException e) {
+            throw new IllegalArgumentException("unable to prepare backing index", e);
+        }
+
+        // add index to data stream
+        builder.put(dataStream.getDataStream().addBackingIndex(metadata, index.getWriteIndex().getIndex()));
+    }
+
+    private static void removeBackingIndex(Metadata metadata, Metadata.Builder builder, String dataStreamName, String indexName) {
+        var dataStream = validateDataStream(metadata, dataStreamName);
+        var index = validateIndex(metadata, indexName);
+        builder.put(dataStream.getDataStream().removeBackingIndex(index.getWriteIndex().getIndex()));
+
+        // un-hide index
+        builder.put(IndexMetadata.builder(index.getWriteIndex())
+            .settings(Settings.builder().put(index.getWriteIndex().getSettings()).put("index.hidden", "false").build())
+            .settingsVersion(index.getWriteIndex().getSettingsVersion() + 1));
+    }
+
+    private static IndexAbstraction.DataStream validateDataStream(Metadata metadata, String dataStreamName) {
+        IndexAbstraction dataStream = metadata.getIndicesLookup().get(dataStreamName);
+        if (dataStream == null || dataStream.getType() != IndexAbstraction.Type.DATA_STREAM) {
+            throw new IllegalArgumentException("data stream [" + dataStreamName + "] not found");
+        }
+        return (IndexAbstraction.DataStream) dataStream;
+    }
+
+    private static IndexAbstraction.Index validateIndex(Metadata metadata, String indexName) {
+        IndexAbstraction index = metadata.getIndicesLookup().get(indexName);
+        if (index == null || index.getType() != IndexAbstraction.Type.CONCRETE_INDEX) {
+            throw new IllegalArgumentException("index [" + indexName + "] not found");
+        }
+        return (IndexAbstraction.Index) index;
+    }
+
+    public static final class ModifyDataStreamRequest extends ClusterStateUpdateRequest<ModifyDataStreamRequest> {
+
+        private final List<DataStreamAction> actions;
+
+        public ModifyDataStreamRequest(List<DataStreamAction> actions) {
+            this.actions = Collections.unmodifiableList(actions);
+        }
+
+        public List<DataStreamAction> actions() {
+            return actions;
+        }
+    }
+
+}

+ 11 - 8
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamService.java

@@ -100,7 +100,6 @@ public class MetadataMigrateToDataStreamService {
                             }
                         },
                         request,
-                        threadContext,
                         metadataCreateIndexService);
                     writeIndexRef.set(clusterState.metadata().dataStreams().get(request.aliasName).getWriteIndex().getName());
                     return clusterState;
@@ -111,7 +110,6 @@ public class MetadataMigrateToDataStreamService {
     static ClusterState migrateToDataStream(ClusterState currentState,
                                             Function<IndexMetadata, MapperService> mapperSupplier,
                                             MigrateToDataStreamClusterStateUpdateRequest request,
-                                            ThreadContext threadContext,
                                             MetadataCreateIndexService metadataCreateIndexService) throws Exception {
         validateRequest(currentState, request);
         IndexAbstraction.Alias alias = (IndexAbstraction.Alias) currentState.metadata().getIndicesLookup().get(request.aliasName);
@@ -119,7 +117,7 @@ public class MetadataMigrateToDataStreamService {
         validateBackingIndices(currentState, request.aliasName);
         Metadata.Builder mb = Metadata.builder(currentState.metadata());
         for (IndexMetadata im : alias.getIndices()) {
-            prepareBackingIndex(mb, im, request.aliasName, mapperSupplier);
+            prepareBackingIndex(mb, im, request.aliasName, mapperSupplier, true);
         }
         currentState = ClusterState.builder(currentState).metadata(mb).build();
 
@@ -153,12 +151,13 @@ public class MetadataMigrateToDataStreamService {
         }
     }
 
-    private static void prepareBackingIndex(
+    // hides the index, optionally removes the alias, and adds data stream timestamp field mapper
+    static void prepareBackingIndex(
         Metadata.Builder b,
         IndexMetadata im,
         String dataStreamName,
-        Function<IndexMetadata, MapperService> mapperSupplier) throws IOException {
-        // hides the index, removes the original alias, and adds data stream timestamp field mapper
+        Function<IndexMetadata, MapperService> mapperSupplier,
+        boolean removeAlias) throws IOException {
         MappingMetadata mm = im.mapping();
         if (mm == null) {
             throw new IllegalArgumentException("backing index [" + im.getIndex().getName() + "] must have mappings for a timestamp field");
@@ -170,8 +169,12 @@ public class MetadataMigrateToDataStreamService {
             MapperService.MergeReason.MAPPING_UPDATE);
         DocumentMapper mapper = mapperService.documentMapper();
 
-        b.put(IndexMetadata.builder(im)
-            .removeAlias(dataStreamName)
+        var imb = IndexMetadata.builder(im);
+        if (removeAlias) {
+            imb.removeAlias(dataStreamName);
+        }
+
+        b.put(imb
             .settings(Settings.builder().put(im.getSettings()).put("index.hidden", "true").build())
             .settingsVersion(im.getSettingsVersion() + 1)
             .mappingVersion(im.getMappingVersion() + 1)

+ 361 - 0
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java

@@ -0,0 +1,361 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.cluster.metadata;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.mapper.MapperServiceTestCase;
+import org.elasticsearch.plugins.Plugin;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createTimestampField;
+import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.generateMapping;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
+
+public class MetadataDataStreamsServiceTests extends MapperServiceTestCase {
+
+    public void testAddBackingIndex() {
+        final long epochMillis = System.currentTimeMillis();
+        final int numBackingIndices = randomIntBetween(1, 4);
+        final String dataStreamName = randomAlphaOfLength(5);
+        IndexMetadata[] backingIndices = new IndexMetadata[numBackingIndices];
+        Metadata.Builder mb = Metadata.builder();
+        for (int k = 0; k < numBackingIndices; k++) {
+            backingIndices[k] =
+                IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, k + 1, epochMillis))
+                    .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT))
+                    .numberOfShards(1)
+                    .numberOfReplicas(0)
+                    .putMapping(generateMapping("@timestamp"))
+                    .build();
+            mb.put(backingIndices[k], false);
+        }
+
+        mb.put(new DataStream(
+            dataStreamName,
+            createTimestampField("@timestamp"),
+            Arrays.stream(backingIndices).map(IndexMetadata::getIndex).collect(Collectors.toList())
+            )
+        );
+
+        final IndexMetadata indexToAdd = IndexMetadata.builder(randomAlphaOfLength(5))
+            .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT))
+            .numberOfShards(1)
+            .numberOfReplicas(0)
+            .putMapping(generateMapping("@timestamp"))
+            .build();
+        mb.put(indexToAdd, false);
+
+        ClusterState originalState = ClusterState.builder(new ClusterName("dummy")).metadata(mb.build()).build();
+        ClusterState newState = MetadataDataStreamsService.modifyDataStream(
+            originalState,
+            List.of(DataStreamAction.addBackingIndex(dataStreamName, indexToAdd.getIndex().getName())),
+            this::getMapperService
+        );
+
+        IndexAbstraction ds = newState.metadata().getIndicesLookup().get(dataStreamName);
+        assertThat(ds, notNullValue());
+        assertThat(ds.getType(), equalTo(IndexAbstraction.Type.DATA_STREAM));
+        assertThat(ds.getIndices().size(), equalTo(numBackingIndices + 1));
+        List<String> backingIndexNames = ds.getIndices()
+            .stream()
+            .filter(x -> x.getIndex().getName().startsWith(".ds-"))
+            .map(x -> x.getIndex().getName())
+            .collect(Collectors.toList());
+        assertThat(backingIndexNames, containsInAnyOrder(
+            Arrays.stream(backingIndices)
+                .map(IndexMetadata::getIndex)
+                .map(Index::getName)
+                .collect(Collectors.toList())
+                .toArray(Strings.EMPTY_ARRAY)
+            )
+        );
+        IndexMetadata zeroIndex = ds.getIndices().get(0);
+        assertThat(zeroIndex.getIndex(), equalTo(indexToAdd.getIndex()));
+        assertThat(zeroIndex.getSettings().get("index.hidden"), equalTo("true"));
+        assertThat(zeroIndex.getAliases().size(), equalTo(0));
+    }
+
+    public void testRemoveBackingIndex() {
+        final long epochMillis = System.currentTimeMillis();
+        final int numBackingIndices = randomIntBetween(2, 4);
+        final String dataStreamName = randomAlphaOfLength(5);
+        IndexMetadata[] backingIndices = new IndexMetadata[numBackingIndices];
+        Metadata.Builder mb = Metadata.builder();
+        for (int k = 0; k < numBackingIndices; k++) {
+            backingIndices[k] =
+                IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, k + 1, epochMillis))
+                    .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT))
+                    .numberOfShards(1)
+                    .numberOfReplicas(0)
+                    .putMapping(generateMapping("@timestamp"))
+                    .build();
+            mb.put(backingIndices[k], false);
+        }
+
+        mb.put(new DataStream(
+                dataStreamName,
+                createTimestampField("@timestamp"),
+                Arrays.stream(backingIndices).map(IndexMetadata::getIndex).collect(Collectors.toList())
+            )
+        );
+
+        final IndexMetadata indexToRemove = backingIndices[randomIntBetween(0, numBackingIndices - 2)];
+        ClusterState originalState = ClusterState.builder(new ClusterName("dummy")).metadata(mb.build()).build();
+        ClusterState newState = MetadataDataStreamsService.modifyDataStream(
+            originalState,
+            List.of(DataStreamAction.removeBackingIndex(dataStreamName, indexToRemove.getIndex().getName())),
+            this::getMapperService
+        );
+
+        IndexAbstraction ds = newState.metadata().getIndicesLookup().get(dataStreamName);
+        assertThat(ds, notNullValue());
+        assertThat(ds.getType(), equalTo(IndexAbstraction.Type.DATA_STREAM));
+        assertThat(ds.getIndices().size(), equalTo(numBackingIndices - 1));
+
+        List<Index> expectedBackingIndices = ds.getIndices()
+            .stream()
+            .map(IndexMetadata::getIndex)
+            .filter(x -> x.getName().equals(indexToRemove.getIndex().getName()) == false)
+            .collect(Collectors.toList());
+        assertThat(expectedBackingIndices, containsInAnyOrder(ds.getIndices().stream().map(IndexMetadata::getIndex).toArray()));
+
+        IndexMetadata removedIndex = newState.metadata().getIndices().get(indexToRemove.getIndex().getName());
+        assertThat(removedIndex, notNullValue());
+        assertThat(removedIndex.getSettings().get("index.hidden"), equalTo("false"));
+        assertNull(newState.metadata().getIndicesLookup().get(indexToRemove.getIndex().getName()).getParentDataStream());
+    }
+
+    public void testAddRemoveAddRoundtripInSingleRequest() {
+        final long epochMillis = System.currentTimeMillis();
+        final int numBackingIndices = randomIntBetween(1, 4);
+        final String dataStreamName = randomAlphaOfLength(5);
+        IndexMetadata[] backingIndices = new IndexMetadata[numBackingIndices];
+        Metadata.Builder mb = Metadata.builder();
+        for (int k = 0; k < numBackingIndices; k++) {
+            backingIndices[k] =
+                IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, k + 1, epochMillis))
+                    .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT))
+                    .numberOfShards(1)
+                    .numberOfReplicas(0)
+                    .putMapping(generateMapping("@timestamp"))
+                    .build();
+            mb.put(backingIndices[k], false);
+        }
+
+        mb.put(new DataStream(
+                dataStreamName,
+                createTimestampField("@timestamp"),
+                Arrays.stream(backingIndices).map(IndexMetadata::getIndex).collect(Collectors.toList())
+            )
+        );
+
+        final IndexMetadata indexToAdd = IndexMetadata.builder(randomAlphaOfLength(5))
+            .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT))
+            .numberOfShards(1)
+            .numberOfReplicas(0)
+            .putMapping(generateMapping("@timestamp"))
+            .build();
+        mb.put(indexToAdd, false);
+
+        ClusterState originalState = ClusterState.builder(new ClusterName("dummy")).metadata(mb.build()).build();
+        ClusterState newState = MetadataDataStreamsService.modifyDataStream(
+            originalState,
+            List.of(
+                DataStreamAction.addBackingIndex(dataStreamName, indexToAdd.getIndex().getName()),
+                DataStreamAction.removeBackingIndex(dataStreamName, indexToAdd.getIndex().getName()),
+                DataStreamAction.addBackingIndex(dataStreamName, indexToAdd.getIndex().getName())
+            ),
+            this::getMapperService
+        );
+
+        IndexAbstraction ds = newState.metadata().getIndicesLookup().get(dataStreamName);
+        assertThat(ds, notNullValue());
+        assertThat(ds.getType(), equalTo(IndexAbstraction.Type.DATA_STREAM));
+        assertThat(ds.getIndices().size(), equalTo(numBackingIndices + 1));
+        List<String> backingIndexNames = ds.getIndices()
+            .stream()
+            .filter(x -> x.getIndex().getName().startsWith(".ds-"))
+            .map(x -> x.getIndex().getName())
+            .collect(Collectors.toList());
+        assertThat(backingIndexNames, containsInAnyOrder(
+                Arrays.stream(backingIndices)
+                    .map(IndexMetadata::getIndex)
+                    .map(Index::getName)
+                    .collect(Collectors.toList())
+                    .toArray(Strings.EMPTY_ARRAY)
+            )
+        );
+        IndexMetadata zeroIndex = ds.getIndices().get(0);
+        assertThat(zeroIndex.getIndex(), equalTo(indexToAdd.getIndex()));
+        assertThat(zeroIndex.getSettings().get("index.hidden"), equalTo("true"));
+        assertThat(zeroIndex.getAliases().size(), equalTo(0));
+    }
+
+    public void testAddRemoveAddRoundtripInSeparateRequests() {
+        final long epochMillis = System.currentTimeMillis();
+        final int numBackingIndices = randomIntBetween(1, 4);
+        final String dataStreamName = randomAlphaOfLength(5);
+        IndexMetadata[] backingIndices = new IndexMetadata[numBackingIndices];
+        Metadata.Builder mb = Metadata.builder();
+        for (int k = 0; k < numBackingIndices; k++) {
+            backingIndices[k] =
+                IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, k + 1, epochMillis))
+                    .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT))
+                    .numberOfShards(1)
+                    .numberOfReplicas(0)
+                    .putMapping(generateMapping("@timestamp"))
+                    .build();
+            mb.put(backingIndices[k], false);
+        }
+
+        mb.put(new DataStream(
+                dataStreamName,
+                createTimestampField("@timestamp"),
+                Arrays.stream(backingIndices).map(IndexMetadata::getIndex).collect(Collectors.toList())
+            )
+        );
+
+        final IndexMetadata indexToAdd = IndexMetadata.builder(randomAlphaOfLength(5))
+            .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT))
+            .numberOfShards(1)
+            .numberOfReplicas(0)
+            .putMapping(generateMapping("@timestamp"))
+            .build();
+        mb.put(indexToAdd, false);
+
+        ClusterState originalState = ClusterState.builder(new ClusterName("dummy")).metadata(mb.build()).build();
+        ClusterState newState = MetadataDataStreamsService.modifyDataStream(
+            originalState,
+            List.of(DataStreamAction.addBackingIndex(dataStreamName, indexToAdd.getIndex().getName())),
+            this::getMapperService
+        );
+        newState = MetadataDataStreamsService.modifyDataStream(
+            newState,
+            List.of(DataStreamAction.removeBackingIndex(dataStreamName, indexToAdd.getIndex().getName())),
+            this::getMapperService
+        );
+        newState = MetadataDataStreamsService.modifyDataStream(
+            newState,
+            List.of(DataStreamAction.addBackingIndex(dataStreamName, indexToAdd.getIndex().getName())),
+            this::getMapperService
+        );
+
+        IndexAbstraction ds = newState.metadata().getIndicesLookup().get(dataStreamName);
+        assertThat(ds, notNullValue());
+        assertThat(ds.getType(), equalTo(IndexAbstraction.Type.DATA_STREAM));
+        assertThat(ds.getIndices().size(), equalTo(numBackingIndices + 1));
+        List<String> backingIndexNames = ds.getIndices()
+            .stream()
+            .filter(x -> x.getIndex().getName().startsWith(".ds-"))
+            .map(x -> x.getIndex().getName())
+            .collect(Collectors.toList());
+        assertThat(backingIndexNames, containsInAnyOrder(
+                Arrays.stream(backingIndices)
+                    .map(IndexMetadata::getIndex)
+                    .map(Index::getName)
+                    .collect(Collectors.toList())
+                    .toArray(Strings.EMPTY_ARRAY)
+            )
+        );
+        IndexMetadata zeroIndex = ds.getIndices().get(0);
+        assertThat(zeroIndex.getIndex(), equalTo(indexToAdd.getIndex()));
+        assertThat(zeroIndex.getSettings().get("index.hidden"), equalTo("true"));
+        assertThat(zeroIndex.getAliases().size(), equalTo(0));
+    }
+
+    public void testMissingDataStream() {
+        Metadata.Builder mb = Metadata.builder();
+        final IndexMetadata indexToAdd = IndexMetadata.builder(randomAlphaOfLength(5))
+            .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT))
+            .numberOfShards(1)
+            .numberOfReplicas(0)
+            .putMapping(generateMapping("@timestamp"))
+            .build();
+        mb.put(indexToAdd, false);
+        final String missingDataStream = randomAlphaOfLength(5);
+
+        ClusterState originalState = ClusterState.builder(new ClusterName("dummy")).metadata(mb.build()).build();
+
+        IllegalArgumentException e = expectThrows(
+            IllegalArgumentException.class,
+            () -> MetadataDataStreamsService.modifyDataStream(
+                originalState,
+                List.of(DataStreamAction.addBackingIndex(missingDataStream, indexToAdd.getIndex().getName())),
+                this::getMapperService
+            )
+        );
+
+        assertThat(e.getMessage(), equalTo("data stream [" + missingDataStream + "] not found"));
+    }
+
+    public void testMissingIndex() {
+        final long epochMillis = System.currentTimeMillis();
+        final int numBackingIndices = randomIntBetween(1, 4);
+        final String dataStreamName = randomAlphaOfLength(5);
+        IndexMetadata[] backingIndices = new IndexMetadata[numBackingIndices];
+        Metadata.Builder mb = Metadata.builder();
+        for (int k = 0; k < numBackingIndices; k++) {
+            backingIndices[k] =
+                IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, k + 1, epochMillis))
+                    .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT))
+                    .numberOfShards(1)
+                    .numberOfReplicas(0)
+                    .putMapping(generateMapping("@timestamp"))
+                    .build();
+            mb.put(backingIndices[k], false);
+        }
+
+        mb.put(new DataStream(
+                dataStreamName,
+                createTimestampField("@timestamp"),
+                Arrays.stream(backingIndices).map(IndexMetadata::getIndex).collect(Collectors.toList())
+            )
+        );
+
+        final String missingIndex = randomAlphaOfLength(5);
+        ClusterState originalState = ClusterState.builder(new ClusterName("dummy")).metadata(mb.build()).build();
+        IllegalArgumentException e = expectThrows(
+            IllegalArgumentException.class,
+            () -> MetadataDataStreamsService.modifyDataStream(
+                originalState,
+                List.of(DataStreamAction.addBackingIndex(dataStreamName, missingIndex)),
+                this::getMapperService
+            )
+        );
+
+        assertThat(e.getMessage(), equalTo("index [" + missingIndex + "] not found"));
+    }
+
+    private MapperService getMapperService(IndexMetadata im) {
+        try {
+            String mapping = im.mapping().source().toString();
+            return createMapperService(mapping);
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    protected Collection<? extends Plugin> getPlugins() {
+        return List.of(new MetadataIndexTemplateServiceTests.DummyPlugin());
+    }
+}

+ 0 - 4
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamServiceTests.java

@@ -14,7 +14,6 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.MapperServiceTestCase;
 import org.elasticsearch.indices.EmptySystemIndices;
@@ -212,7 +211,6 @@ public class MetadataMigrateToDataStreamServiceTests extends MapperServiceTestCa
             new MetadataMigrateToDataStreamService.MigrateToDataStreamClusterStateUpdateRequest(dataStreamName,
                 TimeValue.ZERO,
                 TimeValue.ZERO),
-            new ThreadContext(Settings.EMPTY),
             getMetadataCreateIndexService()
         );
         IndexAbstraction ds = newState.metadata().getIndicesLookup().get(dataStreamName);
@@ -257,7 +255,6 @@ public class MetadataMigrateToDataStreamServiceTests extends MapperServiceTestCa
             new MetadataMigrateToDataStreamService.MigrateToDataStreamClusterStateUpdateRequest(dataStreamName,
                 TimeValue.ZERO,
                 TimeValue.ZERO),
-            new ThreadContext(Settings.EMPTY),
             getMetadataCreateIndexService()
         );
         IndexAbstraction ds = newState.metadata().getIndicesLookup().get(dataStreamName);
@@ -309,7 +306,6 @@ public class MetadataMigrateToDataStreamServiceTests extends MapperServiceTestCa
                     new MetadataMigrateToDataStreamService.MigrateToDataStreamClusterStateUpdateRequest(dataStreamName,
                         TimeValue.ZERO,
                         TimeValue.ZERO),
-                    new ThreadContext(Settings.EMPTY),
                     getMetadataCreateIndexService()));
         assertThat(e.getMessage(), containsString("alias [" + dataStreamName + "] must specify a write index"));
     }

+ 8 - 6
test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java

@@ -95,13 +95,15 @@ public final class DataStreamTestHelper {
     }
 
     public static String generateMapping(String timestampFieldName) {
-        return "{\n" +
-            "      \"properties\": {\n" +
-            "        \"" + timestampFieldName + "\": {\n" +
-            "          \"type\": \"date\"\n" +
-            "        }\n" +
+        return "{" +
+            "     \"_doc\":{\n" +
+            "        \"properties\": {\n" +
+            "          \"" + timestampFieldName + "\": {\n" +
+            "            \"type\": \"date\"\n" +
+            "          }\n" +
             "      }\n" +
-            "    }";
+            "    }" +
+            "}";
     }
 
     public static String generateMapping(String timestampFieldName, String type) {