Browse Source

Add support for is_write_index flag to data stream aliases. (#73462)

This allows indexing documents into a data stream alias.
The ingestion is that forwarded to the write index of the data stream
that is marked as write data stream.
The `is_write_index` parameter can be used to indicate what the write data stream is,
when updating / adding a data steam alias.

Relates to #66163
Martijn van Groningen 4 years ago
parent
commit
afc17bdb74
18 changed files with 604 additions and 110 deletions
  1. 3 3
      docs/reference/alias.asciidoc
  2. 1 4
      server/src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java
  3. 9 3
      server/src/main/java/org/elasticsearch/cluster/metadata/AliasAction.java
  4. 19 6
      server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAlias.java
  5. 5 2
      server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java
  6. 29 7
      server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java
  7. 3 0
      server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetAliasesAction.java
  8. 17 3
      server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
  9. 5 1
      server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
  10. 8 8
      server/src/test/java/org/elasticsearch/action/admin/indices/alias/get/TransportGetAliasesActionTests.java
  11. 11 4
      server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java
  12. 37 2
      server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesServiceTests.java
  13. 134 15
      server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java
  14. 47 1
      test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java
  15. 47 16
      x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java
  16. 61 0
      x-pack/plugin/data-streams/qa/rest/src/javaRestTest/java/org/elasticsearch/xpack/datastreams/DataStreamsRestIT.java
  17. 1 13
      x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java
  18. 167 22
      x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java

+ 3 - 3
docs/reference/alias.asciidoc

@@ -210,7 +210,8 @@ GET _alias/logs
 === Write index
 
 If an alias points to multiple indices, you can use `is_write_index` to specify
-a write index. {es} routes any write requests for the alias to this index.
+a write index or data stream. {es} routes any write requests for the alias to this
+index or data stream.
 
 [source,console]
 ----
@@ -242,8 +243,7 @@ with a write index instead. See
 
 If an alias points to multiple indices with no write index, the alias rejects
 write requests. If an alias points to one index and `is_write_index` is not set,
-the index automatically acts as the write index. Data stream aliases do not
-support `is_write_index`.
+the index automatically acts as the write index.
 
 [discrete]
 [[filter-alias]]

+ 1 - 4
server/src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java

@@ -107,9 +107,6 @@ public class TransportIndicesAliasesAction extends AcknowledgedTransportMasterNo
                 if (action.searchRouting() != null) {
                     throw new IllegalArgumentException("aliases that point to data streams don't support search_routing");
                 }
-                if (action.writeIndex() != null) {
-                    throw new IllegalArgumentException("aliases that point to data streams don't support is_write_index");
-                }
                 if (action.isHidden() != null) {
                     throw new IllegalArgumentException("aliases that point to data streams don't support is_hidden");
                 }
@@ -129,7 +126,7 @@ public class TransportIndicesAliasesAction extends AcknowledgedTransportMasterNo
                 switch (action.actionType()) {
                     case ADD:
                         for (String dataStreamName : concreteDataStreams) {
-                            finalActions.add(new AliasAction.AddDataStreamAlias(action.aliases()[0], dataStreamName));
+                            finalActions.add(new AliasAction.AddDataStreamAlias(action.aliases()[0], dataStreamName, action.writeIndex()));
                         }
                         break;
                     case REMOVE:

+ 9 - 3
server/src/main/java/org/elasticsearch/cluster/metadata/AliasAction.java

@@ -203,11 +203,13 @@ public abstract class AliasAction {
 
         private final String aliasName;
         private final String dataStreamName;
+        private final Boolean isWriteDataStream;
 
-        public AddDataStreamAlias(String aliasName, String dataStreamName) {
+        public AddDataStreamAlias(String aliasName, String dataStreamName, Boolean isWriteDataStream) {
             super(dataStreamName);
             this.aliasName = aliasName;
             this.dataStreamName = dataStreamName;
+            this.isWriteDataStream = isWriteDataStream;
         }
 
         public String getAliasName() {
@@ -218,6 +220,10 @@ public abstract class AliasAction {
             return dataStreamName;
         }
 
+        public Boolean getWriteDataStream() {
+            return isWriteDataStream;
+        }
+
         @Override
         boolean removeIndex() {
             return false;
@@ -225,8 +231,8 @@ public abstract class AliasAction {
 
         @Override
         boolean apply(NewAliasValidator aliasValidator, Metadata.Builder metadata, IndexMetadata index) {
-            aliasValidator.validate(aliasName, null, null, null);
-            return metadata.put(aliasName, dataStreamName);
+            aliasValidator.validate(aliasName, null, null, isWriteDataStream);
+            return metadata.put(aliasName, dataStreamName, isWriteDataStream);
         }
     }
 

+ 19 - 6
server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAlias.java

@@ -25,29 +25,34 @@ import java.util.Objects;
 public class DataStreamAlias extends AbstractDiffable<DataStreamAlias> implements ToXContentObject {
 
     public static final ParseField DATA_STREAMS_FIELD = new ParseField("data_streams");
+    public static final ParseField WRITE_DATA_STREAM_FIELD = new ParseField("write_data_stream");
 
     @SuppressWarnings("unchecked")
     private static final ConstructingObjectParser<DataStreamAlias, String> PARSER = new ConstructingObjectParser<>(
         "data_stream_alias",
         false,
-        (args, name) -> new DataStreamAlias(name, (List<String>) args[0])
+        (args, name) -> new DataStreamAlias(name, (List<String>) args[0], (String) args[1])
     );
 
     static {
         PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), DATA_STREAMS_FIELD);
+        PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), WRITE_DATA_STREAM_FIELD);
     }
 
     private final String name;
     private final List<String> dataStreams;
+    private final String writeDataStream;
 
-    public DataStreamAlias(String name, List<String> dataStreams) {
+    public DataStreamAlias(String name, List<String> dataStreams, String writeDataStream) {
         this.name = Objects.requireNonNull(name);
         this.dataStreams = List.copyOf(dataStreams);
+        this.writeDataStream = writeDataStream;
     }
 
     public DataStreamAlias(StreamInput in) throws IOException {
         this.name = in.readString();
         this.dataStreams = in.readStringList();
+        this.writeDataStream = in.readOptionalString();
     }
 
     public String getName() {
@@ -58,6 +63,10 @@ public class DataStreamAlias extends AbstractDiffable<DataStreamAlias> implement
         return dataStreams;
     }
 
+    public String getWriteDataStream() {
+        return writeDataStream;
+    }
+
     public static Diff<DataStreamAlias> readDiffFrom(StreamInput in) throws IOException {
         return readDiffFrom(DataStreamAlias::new, in);
     }
@@ -68,14 +77,16 @@ public class DataStreamAlias extends AbstractDiffable<DataStreamAlias> implement
             throw new ParsingException(parser.getTokenLocation(), "unexpected token");
         }
         String name = parser.currentName();
-        DataStreamAlias alias = PARSER.parse(parser, name);
-        return alias;
+        return PARSER.parse(parser, name);
     }
 
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject(name);
         builder.field(DATA_STREAMS_FIELD.getPreferredName(), dataStreams);
+        if (writeDataStream != null) {
+            builder.field(WRITE_DATA_STREAM_FIELD.getPreferredName(), writeDataStream);
+        }
         builder.endObject();
         return builder;
     }
@@ -84,6 +95,7 @@ public class DataStreamAlias extends AbstractDiffable<DataStreamAlias> implement
     public void writeTo(StreamOutput out) throws IOException {
         out.writeString(name);
         out.writeStringCollection(dataStreams);
+        out.writeOptionalString(writeDataStream);
     }
 
     @Override
@@ -92,11 +104,12 @@ public class DataStreamAlias extends AbstractDiffable<DataStreamAlias> implement
         if (o == null || getClass() != o.getClass()) return false;
         DataStreamAlias that = (DataStreamAlias) o;
         return Objects.equals(name, that.name) &&
-            Objects.equals(dataStreams, that.dataStreams);
+            Objects.equals(dataStreams, that.dataStreams) &&
+            Objects.equals(writeDataStream, that.writeDataStream);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(name, dataStreams);
+        return Objects.hash(name, dataStreams, writeDataStream);
     }
 }

+ 5 - 2
server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java

@@ -336,11 +336,14 @@ public interface IndexAbstraction {
 
         private final org.elasticsearch.cluster.metadata.DataStreamAlias dataStreamAlias;
         private final List<IndexMetadata> indicesOfAllDataStreams;
+        private final IndexMetadata writeIndexOfWriteDataStream;
 
         public DataStreamAlias(org.elasticsearch.cluster.metadata.DataStreamAlias dataStreamAlias,
-                               List<IndexMetadata> indicesOfAllDataStreams) {
+                               List<IndexMetadata> indicesOfAllDataStreams,
+                               IndexMetadata writeIndexOfWriteDataStream) {
             this.dataStreamAlias = dataStreamAlias;
             this.indicesOfAllDataStreams = indicesOfAllDataStreams;
+            this.writeIndexOfWriteDataStream = writeIndexOfWriteDataStream;
         }
 
         @Override
@@ -360,7 +363,7 @@ public interface IndexAbstraction {
 
         @Override
         public IndexMetadata getWriteIndex() {
-            return null;
+            return writeIndexOfWriteDataStream;
         }
 
         @Override

+ 29 - 7
server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java

@@ -1177,7 +1177,7 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
             return this;
         }
 
-        public boolean put(String aliasName, String dataStream) {
+        public boolean put(String aliasName, String dataStream, Boolean isWriteDataStream) {
             Map<String, DataStream> existingDataStream =
                 Optional.ofNullable((DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE))
                     .map(dsmd -> new HashMap<>(dsmd.dataStreams()))
@@ -1193,14 +1193,23 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
 
             DataStreamAlias alias = dataStreamAliases.get(aliasName);
             if (alias == null) {
-                alias = new DataStreamAlias(aliasName, List.of(dataStream));
+                String writeDataStream = isWriteDataStream != null && isWriteDataStream ? dataStream : null;
+                alias = new DataStreamAlias(aliasName, List.of(dataStream), writeDataStream);
             } else {
                 Set<String> dataStreams = new HashSet<>(alias.getDataStreams());
+                String writeDataStream = alias.getWriteDataStream();
+                if (isWriteDataStream == null || isWriteDataStream == false) {
+                    if (dataStream.equals(writeDataStream)) {
+                        writeDataStream = null;
+                    }
+                } else if (isWriteDataStream) {
+                    writeDataStream = dataStream;
+                }
                 boolean added = dataStreams.add(dataStream);
-                if (added == false) {
+                if (added == false && Objects.equals(alias.getWriteDataStream(), writeDataStream)) {
                     return false;
                 }
-                alias = new DataStreamAlias(aliasName, List.copyOf(dataStreams));
+                alias = new DataStreamAlias(aliasName, List.copyOf(dataStreams), writeDataStream);
             }
             dataStreamAliases.put(aliasName, alias);
 
@@ -1229,7 +1238,11 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
                     if (dataStreams.isEmpty()) {
                         aliasesToDelete.add(alias.getName());
                     } else {
-                        aliasesToUpdate.add(new DataStreamAlias(alias.getName(), List.copyOf(dataStreams)));
+                        String writeDataStream = alias.getWriteDataStream();
+                        if (dataStreams.contains(writeDataStream) == false) {
+                            writeDataStream = null;
+                        }
+                        aliasesToUpdate.add(new DataStreamAlias(alias.getName(), List.copyOf(dataStreams), writeDataStream));
                     }
                 }
             }
@@ -1261,8 +1274,12 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
             if (dataStreams.isEmpty()) {
                 dataStreamAliases.remove(aliasName);
             } else {
+                String writeDataStream = existing.getWriteDataStream();
+                if (dataStreamName.equals(writeDataStream)) {
+                    writeDataStream = null;
+                }
                 dataStreamAliases.put(aliasName,
-                    new DataStreamAlias(existing.getName(), List.copyOf(dataStreams)));
+                    new DataStreamAlias(existing.getName(), List.copyOf(dataStreams), writeDataStream));
             }
 
             Map<String, DataStream> existingDataStream =
@@ -1521,8 +1538,13 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
                         .flatMap(ds -> ds.getIndices().stream())
                         .map(index -> indices.get(index.getName()))
                         .collect(Collectors.toList());
+                    IndexMetadata writeIndexOfWriteDataStream = null;
+                    if (alias.getWriteDataStream() != null) {
+                        DataStream writeDataStream = dataStreamMetadata.dataStreams().get(alias.getWriteDataStream());
+                        writeIndexOfWriteDataStream = indices.get(writeDataStream.getWriteIndex().getName());
+                    }
                     IndexAbstraction existing = indicesLookup.put(alias.getName(),
-                        new IndexAbstraction.DataStreamAlias(alias, allIndicesOfAllDataStreams));
+                        new IndexAbstraction.DataStreamAlias(alias, allIndicesOfAllDataStreams, writeIndexOfWriteDataStream));
                     assert existing == null : "duplicate data stream alias for " + alias.getName();
                 }
             }

+ 3 - 0
server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetAliasesAction.java

@@ -160,6 +160,9 @@ public class RestGetAliasesAction extends BaseRestHandler {
                     {
                         for (DataStreamAlias alias : entry.getValue()) {
                             builder.startObject(alias.getName());
+                            if (entry.getKey().equals(alias.getWriteDataStream())) {
+                                builder.field("is_write_data_stream", true);
+                            }
                             builder.endObject();
                         }
                     }

+ 17 - 3
server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

@@ -468,8 +468,13 @@ public class RestoreService implements ClusterStateApplier {
                 List<String> intersectingDataStreams = alias.getDataStreams().stream()
                     .filter(requestedDataStreams::contains)
                     .collect(Collectors.toList());
+                String writeDateStream = alias.getWriteDataStream();
+                if (intersectingDataStreams.contains(writeDateStream) == false) {
+                    writeDateStream = null;
+                }
                 if (intersectingDataStreams.isEmpty() == false) {
-                    dataStreamAliases.put(alias.getName(), new DataStreamAlias(alias.getName(), intersectingDataStreams));
+                    DataStreamAlias copy = new DataStreamAlias(alias.getName(), intersectingDataStreams, writeDateStream);
+                    dataStreamAliases.put(alias.getName(), copy);
                 }
             }
         }
@@ -1193,7 +1198,7 @@ public class RestoreService implements ClusterStateApplier {
                         List<String> renamedDataStreams = alias.getDataStreams().stream()
                             .map(s -> s.replaceAll(request.renamePattern(), request.renameReplacement()))
                             .collect(Collectors.toList());
-                        return new DataStreamAlias(alias.getName(), renamedDataStreams);
+                        return new DataStreamAlias(alias.getName(), renamedDataStreams, alias.getWriteDataStream());
                     } else {
                         return alias;
                     }
@@ -1203,7 +1208,16 @@ public class RestoreService implements ClusterStateApplier {
                         // Merge data stream alias from snapshot with an existing data stream aliases in target cluster:
                         Set<String> mergedDataStreams = new HashSet<>(current.getDataStreams());
                         mergedDataStreams.addAll(alias.getDataStreams());
-                        DataStreamAlias newInstance = new DataStreamAlias(alias.getName(), List.copyOf(mergedDataStreams));
+
+                        String writeDataStream = alias.getWriteDataStream();
+                        if (writeDataStream == null) {
+                            if (current.getWriteDataStream() != null && mergedDataStreams.contains(current.getWriteDataStream())) {
+                                writeDataStream = current.getWriteDataStream();
+                            }
+                        }
+
+                        DataStreamAlias newInstance =
+                            new DataStreamAlias(alias.getName(), List.copyOf(mergedDataStreams), writeDataStream);
                         updatedDataStreamAliases.put(alias.getName(), newInstance);
                     }
                 });

+ 5 - 1
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -2430,7 +2430,11 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                 List<String> intersectingDataStreams = alias.getDataStreams().stream()
                     .filter(dataStreams::containsKey)
                     .collect(Collectors.toList());
-                return new DataStreamAlias(alias.getName(), intersectingDataStreams);
+                String writeDataStream = alias.getWriteDataStream();
+                if (intersectingDataStreams.contains(writeDataStream) == false) {
+                    writeDataStream = null;
+                }
+                return new DataStreamAlias(alias.getName(), intersectingDataStreams, writeDataStream);
             }).collect(Collectors.toMap(DataStreamAlias::getName, Function.identity()));
     }
 

+ 8 - 8
server/src/test/java/org/elasticsearch/action/admin/indices/alias/get/TransportGetAliasesActionTests.java

@@ -195,30 +195,30 @@ public class TransportGetAliasesActionTests extends ESTestCase {
         var tuples = List.of(new Tuple<>("logs-foo", 1), new Tuple<>("logs-bar", 1), new Tuple<>("logs-baz", 1));
         var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(tuples, List.of());
         var builder = Metadata.builder(clusterState.metadata());
-        builder.put("logs", "logs-foo");
-        builder.put("logs", "logs-bar");
-        builder.put("secret", "logs-bar");
+        builder.put("logs", "logs-foo", null);
+        builder.put("logs", "logs-bar", null);
+        builder.put("secret", "logs-bar", null);
         clusterState = ClusterState.builder(clusterState).metadata(builder).build();
 
         // return all all data streams with aliases
         var getAliasesRequest = new GetAliasesRequest();
         var result = TransportGetAliasesAction.postProcess(resolver, getAliasesRequest, clusterState);
         assertThat(result.keySet(), containsInAnyOrder("logs-foo", "logs-bar"));
-        assertThat(result.get("logs-foo"), contains(new DataStreamAlias("logs", List.of("logs-bar", "logs-foo"))));
-        assertThat(result.get("logs-bar"), containsInAnyOrder(new DataStreamAlias("logs", List.of("logs-bar", "logs-foo")),
-            new DataStreamAlias("secret", List.of("logs-bar"))));
+        assertThat(result.get("logs-foo"), contains(new DataStreamAlias("logs", List.of("logs-bar", "logs-foo"), null)));
+        assertThat(result.get("logs-bar"), containsInAnyOrder(new DataStreamAlias("logs", List.of("logs-bar", "logs-foo"), null),
+            new DataStreamAlias("secret", List.of("logs-bar"), null)));
 
         // filter by alias name
         getAliasesRequest = new GetAliasesRequest("secret");
         result = TransportGetAliasesAction.postProcess(resolver, getAliasesRequest, clusterState);
         assertThat(result.keySet(), containsInAnyOrder("logs-bar"));
-        assertThat(result.get("logs-bar"), contains(new DataStreamAlias("secret", List.of("logs-bar"))));
+        assertThat(result.get("logs-bar"), contains(new DataStreamAlias("secret", List.of("logs-bar"), null)));
 
         // filter by data stream:
         getAliasesRequest = new GetAliasesRequest().indices("logs-foo");
         result = TransportGetAliasesAction.postProcess(resolver, getAliasesRequest, clusterState);
         assertThat(result.keySet(), containsInAnyOrder("logs-foo"));
-        assertThat(result.get("logs-foo"), contains(new DataStreamAlias("logs", List.of("logs-bar", "logs-foo"))));
+        assertThat(result.get("logs-foo"), contains(new DataStreamAlias("logs", List.of("logs-bar", "logs-foo"), null)));
     }
 
     public ClusterState systemIndexTestClusterState() {

+ 11 - 4
server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java

@@ -51,6 +51,7 @@ import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
 import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createBackingIndex;
 import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createTimestampField;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_HIDDEN_SETTING;
@@ -2131,10 +2132,10 @@ public class IndexNameExpressionResolverTests extends ESTestCase {
             .put(new DataStream(dataStream1, createTimestampField("@timestamp"), List.of(index1.getIndex(), index2.getIndex())))
             .put(new DataStream(dataStream2, createTimestampField("@timestamp"), List.of(index3.getIndex(), index4.getIndex())))
             .put(new DataStream(dataStream3, createTimestampField("@timestamp"), List.of(index5.getIndex(), index6.getIndex())));
-        mdBuilder.put(dataStreamAlias1, dataStream1);
-        mdBuilder.put(dataStreamAlias1, dataStream2);
-        mdBuilder.put(dataStreamAlias2, dataStream2);
-        mdBuilder.put(dataStreamAlias3, dataStream3);
+        mdBuilder.put(dataStreamAlias1, dataStream1, null);
+        mdBuilder.put(dataStreamAlias1, dataStream2, true);
+        mdBuilder.put(dataStreamAlias2, dataStream2, null);
+        mdBuilder.put(dataStreamAlias3, dataStream3, null);
         ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build();
 
         {
@@ -2181,6 +2182,12 @@ public class IndexNameExpressionResolverTests extends ESTestCase {
             Index[] result = indexNameExpressionResolver.concreteIndices(state, indicesOptions, false, "my-alias*");
             assertThat(result, arrayWithSize(0));
         }
+        {
+            IndicesOptions indicesOptions = IndicesOptions.STRICT_EXPAND_OPEN;
+            Index result = indexNameExpressionResolver.concreteWriteIndex(state, indicesOptions, dataStreamAlias1, false, true);
+            assertThat(result, notNullValue());
+            assertThat(result.getName(), backingIndexEqualTo(dataStream2, 2));
+        }
     }
 
     public void testDataStreamsWithWildcardExpression() {

+ 37 - 2
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesServiceTests.java

@@ -507,8 +507,8 @@ public class MetadataIndexAliasesServiceTests extends ESTestCase {
             new Tuple<>("logs-foobar", 1), new Tuple<>("metrics-foobar", 1)), List.of());
 
         ClusterState result = service.applyAliasActions(state, List.of(
-            new AliasAction.AddDataStreamAlias("foobar", "logs-foobar"),
-            new AliasAction.AddDataStreamAlias("foobar", "metrics-foobar")
+            new AliasAction.AddDataStreamAlias("foobar", "logs-foobar", null),
+            new AliasAction.AddDataStreamAlias("foobar", "metrics-foobar", null)
         ));
         assertThat(result.metadata().dataStreamAliases().get("foobar"), notNullValue());
         assertThat(result.metadata().dataStreamAliases().get("foobar").getDataStreams(),
@@ -526,6 +526,41 @@ public class MetadataIndexAliasesServiceTests extends ESTestCase {
         assertThat(result.metadata().dataStreamAliases().get("foobar"), nullValue());
     }
 
+    public void testDataStreamAliasesWithWriteFlag() {
+        ClusterState state = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(
+            new Tuple<>("logs-http-emea", 1), new Tuple<>("logs-http-nasa", 1)), List.of());
+
+        ClusterState result = service.applyAliasActions(state, List.of(
+            new AliasAction.AddDataStreamAlias("logs-http", "logs-http-emea", true),
+            new AliasAction.AddDataStreamAlias("logs-http", "logs-http-nasa", null)
+        ));
+        assertThat(result.metadata().dataStreamAliases().get("logs-http"), notNullValue());
+        assertThat(result.metadata().dataStreamAliases().get("logs-http").getDataStreams(),
+            containsInAnyOrder("logs-http-nasa", "logs-http-emea"));
+        assertThat(result.metadata().dataStreamAliases().get("logs-http").getWriteDataStream(), equalTo("logs-http-emea"));
+
+        result = service.applyAliasActions(state, List.of(
+            new AliasAction.AddDataStreamAlias("logs-http", "logs-http-emea", false),
+            new AliasAction.AddDataStreamAlias("logs-http", "logs-http-nasa", true)
+        ));
+        assertThat(result.metadata().dataStreamAliases().get("logs-http"), notNullValue());
+        assertThat(result.metadata().dataStreamAliases().get("logs-http").getDataStreams(),
+            containsInAnyOrder("logs-http-nasa", "logs-http-emea"));
+        assertThat(result.metadata().dataStreamAliases().get("logs-http").getWriteDataStream(), equalTo("logs-http-nasa"));
+
+        result = service.applyAliasActions(result, List.of(
+            new AliasAction.RemoveDataStreamAlias("logs-http", "logs-http-emea", null)
+        ));
+        assertThat(result.metadata().dataStreamAliases().get("logs-http"), notNullValue());
+        assertThat(result.metadata().dataStreamAliases().get("logs-http").getDataStreams(), contains("logs-http-nasa"));
+        assertThat(result.metadata().dataStreamAliases().get("logs-http").getWriteDataStream(), equalTo("logs-http-nasa"));
+
+        result = service.applyAliasActions(result, List.of(
+            new AliasAction.RemoveDataStreamAlias("logs-http", "logs-http-nasa", null)
+        ));
+        assertThat(result.metadata().dataStreamAliases().get("logs-http"), nullValue());
+    }
+
     private ClusterState applyHiddenAliasMix(ClusterState before, Boolean isHidden1, Boolean isHidden2) {
         return service.applyAliasActions(before, Arrays.asList(
             new AliasAction.Add("test", "alias", null, null, null, null, isHidden1),

+ 134 - 15
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java

@@ -50,6 +50,7 @@ import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createBack
 import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createFirstBackingIndex;
 import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createTimestampField;
 import static org.elasticsearch.cluster.metadata.Metadata.Builder.validateDataStreams;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -1235,12 +1236,12 @@ public class MetadataTests extends ESTestCase {
         Metadata.Builder mdBuilder = Metadata.builder();
 
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-eu"));
-        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-eu"), is(true));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-eu", null), is(true));
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-us"));
-        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-us"), is(true));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-us", null), is(true));
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-au"));
-        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-au"), is(true));
-        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-au"), is(false));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-au", null), is(true));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-au", null), is(false));
 
         Metadata metadata = mdBuilder.build();
         assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
@@ -1251,11 +1252,11 @@ public class MetadataTests extends ESTestCase {
     public void testDataStreamReferToNonExistingDataStream() {
         Metadata.Builder mdBuilder = Metadata.builder();
 
-        Exception e = expectThrows(IllegalArgumentException.class, () -> mdBuilder.put("logs-postgres", "logs-postgres-eu"));
+        Exception e = expectThrows(IllegalArgumentException.class, () -> mdBuilder.put("logs-postgres", "logs-postgres-eu", null));
         assertThat(e.getMessage(), equalTo("alias [logs-postgres] refers to a non existing data stream [logs-postgres-eu]"));
 
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-eu"));
-        mdBuilder.put("logs-postgres", "logs-postgres-eu");
+        mdBuilder.put("logs-postgres", "logs-postgres-eu", null);
         Metadata metadata = mdBuilder.build();
         assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
         assertThat(metadata.dataStreamAliases().get("logs-postgres").getDataStreams(), containsInAnyOrder("logs-postgres-eu"));
@@ -1264,11 +1265,11 @@ public class MetadataTests extends ESTestCase {
     public void testDeleteDataStreamShouldUpdateAlias() {
         Metadata.Builder mdBuilder = Metadata.builder();
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-eu"));
-        mdBuilder.put("logs-postgres", "logs-postgres-eu");
+        mdBuilder.put("logs-postgres", "logs-postgres-eu", null);
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-us"));
-        mdBuilder.put("logs-postgres", "logs-postgres-us");
+        mdBuilder.put("logs-postgres", "logs-postgres-us", null);
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-au"));
-        mdBuilder.put("logs-postgres", "logs-postgres-au");
+        mdBuilder.put("logs-postgres", "logs-postgres-au", null);
         Metadata metadata = mdBuilder.build();
         assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
         assertThat(metadata.dataStreamAliases().get("logs-postgres").getDataStreams(),
@@ -1296,11 +1297,11 @@ public class MetadataTests extends ESTestCase {
     public void testDeleteDataStreamAlias() {
         Metadata.Builder mdBuilder = Metadata.builder();
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-eu"));
-        mdBuilder.put("logs-postgres", "logs-postgres-eu");
+        mdBuilder.put("logs-postgres", "logs-postgres-eu", null);
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-us"));
-        mdBuilder.put("logs-postgres", "logs-postgres-us");
+        mdBuilder.put("logs-postgres", "logs-postgres-us", null);
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-au"));
-        mdBuilder.put("logs-postgres", "logs-postgres-au");
+        mdBuilder.put("logs-postgres", "logs-postgres-au", null);
         Metadata metadata = mdBuilder.build();
         assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
         assertThat(metadata.dataStreamAliases().get("logs-postgres").getDataStreams(),
@@ -1328,11 +1329,11 @@ public class MetadataTests extends ESTestCase {
     public void testDeleteDataStreamAliasMustExists() {
         Metadata.Builder mdBuilder = Metadata.builder();
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-eu"));
-        mdBuilder.put("logs-postgres", "logs-postgres-eu");
+        mdBuilder.put("logs-postgres", "logs-postgres-eu", null);
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-us"));
-        mdBuilder.put("logs-postgres", "logs-postgres-us");
+        mdBuilder.put("logs-postgres", "logs-postgres-us", null);
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-au"));
-        mdBuilder.put("logs-postgres", "logs-postgres-au");
+        mdBuilder.put("logs-postgres", "logs-postgres-au", null);
         Metadata metadata = mdBuilder.build();
         assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
         assertThat(metadata.dataStreamAliases().get("logs-postgres").getDataStreams(),
@@ -1343,6 +1344,124 @@ public class MetadataTests extends ESTestCase {
         assertThat(mdBuilder2.removeDataStreamAlias("logs-mysql", "logs-postgres-us", false), is(false));
     }
 
+    public void testDataStreamWriteAlias() {
+        Metadata.Builder mdBuilder = Metadata.builder();
+        mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-replicated"));
+        mdBuilder.put("logs-postgres", "logs-postgres-replicated", null);
+
+        Metadata metadata = mdBuilder.build();
+        assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
+        assertThat(metadata.dataStreamAliases().get("logs-postgres").getWriteDataStream(), nullValue());
+        assertThat(metadata.dataStreamAliases().get("logs-postgres").getDataStreams(), containsInAnyOrder("logs-postgres-replicated"));
+
+        mdBuilder = Metadata.builder(metadata);
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-replicated", true), is(true));
+
+        metadata = mdBuilder.build();
+        assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
+        assertThat(metadata.dataStreamAliases().get("logs-postgres").getWriteDataStream(), equalTo("logs-postgres-replicated"));
+        assertThat(metadata.dataStreamAliases().get("logs-postgres").getDataStreams(), containsInAnyOrder("logs-postgres-replicated"));
+    }
+
+    public void testDataStreamMultipleWriteAlias() {
+        Metadata.Builder mdBuilder = Metadata.builder();
+        mdBuilder.put(DataStreamTestHelper.randomInstance("logs-foobar"));
+        mdBuilder.put(DataStreamTestHelper.randomInstance("logs-barbaz"));
+        mdBuilder.put("logs", "logs-foobar", true);
+        mdBuilder.put("logs", "logs-barbaz", true);
+
+        Metadata metadata = mdBuilder.build();
+        assertThat(metadata.dataStreamAliases().get("logs"), notNullValue());
+        assertThat(metadata.dataStreamAliases().get("logs").getWriteDataStream(), equalTo("logs-barbaz"));
+        assertThat(metadata.dataStreamAliases().get("logs").getDataStreams(), containsInAnyOrder("logs-foobar", "logs-barbaz"));
+    }
+
+    public void testDataStreamWriteAliasUnset() {
+        Metadata.Builder mdBuilder = Metadata.builder();
+        mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-replicated"));
+        mdBuilder.put("logs-postgres", "logs-postgres-replicated", true);
+
+        Metadata metadata = mdBuilder.build();
+        assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
+        assertThat(metadata.dataStreamAliases().get("logs-postgres").getWriteDataStream(), equalTo("logs-postgres-replicated"));
+        assertThat(metadata.dataStreamAliases().get("logs-postgres").getDataStreams(), containsInAnyOrder("logs-postgres-replicated"));
+
+        // Unset write flag
+        mdBuilder = Metadata.builder(metadata);
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-replicated", randomBoolean() ? null : false), is(true));
+        metadata = mdBuilder.build();
+        assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
+        assertThat(metadata.dataStreamAliases().get("logs-postgres").getWriteDataStream(), nullValue());
+        assertThat(metadata.dataStreamAliases().get("logs-postgres").getDataStreams(), containsInAnyOrder("logs-postgres-replicated"));
+    }
+
+    public void testDataStreamWriteAliasChange() {
+        Metadata.Builder mdBuilder = Metadata.builder();
+        mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-primary"));
+        mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-replicated"));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-primary", true), is(true));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-replicated", null), is(true));
+
+        Metadata metadata = mdBuilder.build();
+        assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
+        assertThat(metadata.dataStreamAliases().get("logs-postgres").getWriteDataStream(), equalTo("logs-postgres-primary"));
+        assertThat(metadata.dataStreamAliases().get("logs-postgres").getDataStreams(),
+            containsInAnyOrder("logs-postgres-primary", "logs-postgres-replicated"));
+
+        // change write flag:
+        mdBuilder = Metadata.builder(metadata);
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-primary", randomBoolean() ? null : false), is(true));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-replicated", true), is(true));
+        metadata = mdBuilder.build();
+        assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
+        assertThat(metadata.dataStreamAliases().get("logs-postgres").getWriteDataStream(), equalTo("logs-postgres-replicated"));
+        assertThat(metadata.dataStreamAliases().get("logs-postgres").getDataStreams(),
+            containsInAnyOrder("logs-postgres-primary", "logs-postgres-replicated"));
+    }
+
+    public void testDataStreamWriteRemoveAlias() {
+        Metadata.Builder mdBuilder = Metadata.builder();
+        mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-primary"));
+        mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-replicated"));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-primary", true), is(true));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-replicated", null), is(true));
+
+        Metadata metadata = mdBuilder.build();
+        assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
+        assertThat(metadata.dataStreamAliases().get("logs-postgres").getWriteDataStream(), equalTo("logs-postgres-primary"));
+        assertThat(metadata.dataStreamAliases().get("logs-postgres").getDataStreams(),
+            containsInAnyOrder("logs-postgres-primary", "logs-postgres-replicated"));
+
+        mdBuilder = Metadata.builder(metadata);
+        assertThat(mdBuilder.removeDataStreamAlias("logs-postgres", "logs-postgres-primary", randomBoolean()), is(true));
+        metadata = mdBuilder.build();
+        assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
+        assertThat(metadata.dataStreamAliases().get("logs-postgres").getWriteDataStream(), nullValue());
+        assertThat(metadata.dataStreamAliases().get("logs-postgres").getDataStreams(), containsInAnyOrder("logs-postgres-replicated"));
+    }
+
+    public void testDataStreamWriteRemoveDataStream() {
+        Metadata.Builder mdBuilder = Metadata.builder();
+        mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-primary"));
+        mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-replicated"));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-primary", true), is(true));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-replicated", null), is(true));
+
+        Metadata metadata = mdBuilder.build();
+        assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
+        assertThat(metadata.dataStreamAliases().get("logs-postgres").getWriteDataStream(), equalTo("logs-postgres-primary"));
+        assertThat(metadata.dataStreamAliases().get("logs-postgres").getDataStreams(),
+            containsInAnyOrder("logs-postgres-primary", "logs-postgres-replicated"));
+
+        mdBuilder = Metadata.builder(metadata);
+        mdBuilder.removeDataStream("logs-postgres-primary");
+        metadata = mdBuilder.build();
+        assertThat(metadata.dataStreams().keySet(), contains("logs-postgres-replicated"));
+        assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
+        assertThat(metadata.dataStreamAliases().get("logs-postgres").getWriteDataStream(), nullValue());
+        assertThat(metadata.dataStreamAliases().get("logs-postgres").getDataStreams(), containsInAnyOrder("logs-postgres-replicated"));
+    }
+
     public static Metadata randomMetadata() {
         return randomMetadata(1);
     }

+ 47 - 1
test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java

@@ -16,6 +16,9 @@ import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.test.ESTestCase;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -31,6 +34,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUI
 import static org.elasticsearch.test.ESTestCase.generateRandomStringArray;
 import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength;
 import static org.elasticsearch.test.ESTestCase.randomBoolean;
+import static org.elasticsearch.test.ESTestCase.randomFrom;
 
 public final class DataStreamTestHelper {
 
@@ -147,9 +151,11 @@ public final class DataStreamTestHelper {
     }
 
     public static DataStreamAlias randomAliasInstance() {
+        List<String> dataStreams = List.of(generateRandomStringArray(5, 5, false, false));
         return new DataStreamAlias(
             randomAlphaOfLength(5),
-            List.of(generateRandomStringArray(5, 5, false))
+            dataStreams,
+            randomBoolean() ? randomFrom(dataStreams) : null
         );
     }
 
@@ -213,4 +219,44 @@ public final class DataStreamTestHelper {
         return String.format(Locale.ROOT, "\\.ds-%s-(\\d{4}\\.\\d{2}\\.\\d{2}-)?%06d",dataStreamName, generation);
     }
 
+    public static Matcher<String> backingIndexEqualTo(String dataStreamName, int generation) {
+        return new TypeSafeMatcher<>() {
+
+            @Override
+            protected boolean matchesSafely(String backingIndexName) {
+                if (backingIndexName == null) {
+                    return false;
+                }
+
+                int indexOfLastDash = backingIndexName.lastIndexOf('-');
+                String actualDataStreamName = parseDataStreamName(backingIndexName, indexOfLastDash);
+                int actualGeneration = parseGeneration(backingIndexName, indexOfLastDash);
+                return actualDataStreamName.equals(dataStreamName) && actualGeneration == generation;
+            }
+
+            @Override
+            protected void describeMismatchSafely(String backingIndexName, Description mismatchDescription) {
+                int indexOfLastDash = backingIndexName.lastIndexOf('-');
+                String dataStreamName = parseDataStreamName(backingIndexName, indexOfLastDash);
+                int generation = parseGeneration(backingIndexName, indexOfLastDash);
+                mismatchDescription.appendText(" was data stream name ").appendValue(dataStreamName)
+                    .appendText(" and generation ").appendValue(generation);
+            }
+
+            @Override
+            public void describeTo(Description description) {
+                description.appendText("expected data stream name ").appendValue(dataStreamName)
+                    .appendText(" and expected generation ").appendValue(generation);
+            }
+
+            private String parseDataStreamName(String backingIndexName, int indexOfLastDash) {
+                return backingIndexName.substring(4, backingIndexName.lastIndexOf('-', indexOfLastDash - 1));
+            }
+
+            private int parseGeneration(String backingIndexName, int indexOfLastDash) {
+                return Integer.parseInt(backingIndexName.substring(indexOfLastDash + 1));
+            }
+        };
+    }
+
 }

+ 47 - 16
x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java

@@ -566,6 +566,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
         int initialNumberOfSuccessfulFollowedIndicesInFollowCluster = getNumberOfSuccessfulFollowedIndices();
         int initialNumberOfSuccessfulFollowedIndicesInLeaderCluster;
 
+        var aliasName = "logs-http";
         var leaderDataStreamName = "logs-http-eu";
         var followerDataStreamName = "logs-http-na";
 
@@ -617,8 +618,18 @@ public class AutoFollowIT extends ESCCRRestTestCase {
             var numDocs = 128;
             // Create data stream in leader cluster and ensure it is followed in follow cluster
             try (var leaderClient = buildLeaderClient()) {
+                // Setting up data stream and alias with write flag in leader cluster:
+                Request createDataStreamRequest = new Request("PUT", "/_data_stream/" + leaderDataStreamName);
+                assertOK(leaderClient.performRequest(createDataStreamRequest));
+                Request updateAliasesRequest = new Request("POST", "/_aliases");
+                updateAliasesRequest.setJsonEntity("{\"actions\":[" +
+                    "{\"add\":{\"index\":\"" + leaderDataStreamName + "\",\"alias\":\"logs-http\",\"is_write_index\":true}}" +
+                    "]}"
+                );
+                assertOK(leaderClient.performRequest(updateAliasesRequest));
+
                 for (int i = 0; i < numDocs; i++) {
-                    Request indexRequest = new Request("POST", "/" + leaderDataStreamName + "/_doc");
+                    Request indexRequest = new Request("POST", "/" + aliasName + "/_doc");
                     indexRequest.addParameter("refresh", "true");
                     indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
                     assertOK(leaderClient.performRequest(indexRequest));
@@ -632,13 +643,35 @@ public class AutoFollowIT extends ESCCRRestTestCase {
                 ensureYellow(leaderDataStreamName);
                 verifyDocuments(client(), leaderDataStreamName, numDocs);
             });
+            // Setting up data stream and alias with write flag in follower cluster:
+            Request createDataStreamRequest = new Request("PUT", "/_data_stream/" + followerDataStreamName);
+            assertOK(client().performRequest(createDataStreamRequest));
+            Request updateAliasesRequest = new Request("POST", "/_aliases");
+            updateAliasesRequest.setJsonEntity("{\"actions\":[" +
+                "{\"add\":{\"index\":\"" + followerDataStreamName + "\",\"alias\":\"logs-http\",\"is_write_index\":true}}" +
+                "]}"
+            );
+            assertOK(client().performRequest(updateAliasesRequest));
+
             for (int i = 0; i < numDocs; i++) {
-                var indexRequest = new Request("POST", "/" + followerDataStreamName + "/_doc");
+                var indexRequest = new Request("POST", "/" + aliasName + "/_doc");
                 indexRequest.addParameter("refresh", "true");
                 indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
                 assertOK(client().performRequest(indexRequest));
             }
             verifyDocuments(client(), followerDataStreamName, numDocs);
+
+            // TODO: Don't update logs-http alias in follower cluster when data streams are automatically replicated
+            //  from leader to follower cluster:
+            // (only set the write flag to logs-http-na)
+            // Create alias in follower cluster that point to leader and follower data streams:
+            updateAliasesRequest = new Request("POST", "/_aliases");
+            updateAliasesRequest.setJsonEntity("{\"actions\":[" +
+                "{\"add\":{\"index\":\"" + leaderDataStreamName + "\",\"alias\":\"logs-http\"}}" +
+                "]}"
+            );
+            assertOK(client().performRequest(updateAliasesRequest));
+
             try (var leaderClient = buildLeaderClient()) {
                 assertBusy(() -> {
                     assertThat(getNumberOfSuccessfulFollowedIndices(leaderClient),
@@ -647,17 +680,18 @@ public class AutoFollowIT extends ESCCRRestTestCase {
                     ensureYellow(followerDataStreamName);
                     verifyDocuments(leaderClient, followerDataStreamName, numDocs);
                 });
+                updateAliasesRequest = new Request("POST", "/_aliases");
+                updateAliasesRequest.setJsonEntity("{\"actions\":[" +
+                    "{\"add\":{\"index\":\"" + followerDataStreamName + "\",\"alias\":\"logs-http\"}}" +
+                    "]}"
+                );
+                assertOK(leaderClient.performRequest(updateAliasesRequest));
             }
 
-            // TODO: Replace these verifyDocuments(...) assertions with searches via 'logs-http' alias and
-            // writes via 'logs-http' alias (ensuring write goes to write data stream).
-            // Currently aliases can't refer to data streams, so we can't fully test the bi-direction replication scenario.
-            // See: https://github.com/elastic/elasticsearch/pull/64710#discussion_r537210322
-
             // See all eu and na logs in leader and follower cluster:
-            verifyDocuments(client(), "logs-http*", numDocs * 2);
+            verifyDocuments(client(), aliasName, numDocs * 2);
             try (var leaderClient = buildLeaderClient()) {
-                verifyDocuments(leaderClient, "logs-http*", numDocs * 2);
+                verifyDocuments(leaderClient, aliasName, numDocs * 2);
             }
 
             int moreDocs = 48;
@@ -665,7 +699,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
             {
                 try (var leaderClient = buildLeaderClient()) {
                     for (int i = 0; i < moreDocs; i++) {
-                        var indexRequest = new Request("POST", "/" + leaderDataStreamName + "/_doc");
+                        var indexRequest = new Request("POST", "/" + aliasName + "/_doc");
                         indexRequest.addParameter("refresh", "true");
                         indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
                         assertOK(leaderClient.performRequest(indexRequest));
@@ -679,7 +713,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
             // Index more docs into follower cluster
             {
                 for (int i = 0; i < moreDocs; i++) {
-                    var indexRequest = new Request("POST", "/" + followerDataStreamName + "/_doc");
+                    var indexRequest = new Request("POST", "/" + aliasName + "/_doc");
                     indexRequest.addParameter("refresh", "true");
                     indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
                     assertOK(client().performRequest(indexRequest));
@@ -692,13 +726,10 @@ public class AutoFollowIT extends ESCCRRestTestCase {
                 }
             }
 
-            // TODO: Replace these verifyDocuments(...) assertions with searches via 'logs-http' alias and writes via 'logs-http'
-            // (see previous TODO)
-
             // See all eu and na logs in leader and follower cluster:
-            verifyDocuments(client(), "logs-http*", (numDocs + moreDocs) * 2);
+            verifyDocuments(client(), aliasName, (numDocs + moreDocs) * 2);
             try (RestClient leaderClient = buildLeaderClient()) {
-                verifyDocuments(leaderClient, "logs-http*", (numDocs + moreDocs) * 2);
+                verifyDocuments(leaderClient, aliasName, (numDocs + moreDocs) * 2);
             }
         } finally {
             cleanUpFollower(

+ 61 - 0
x-pack/plugin/data-streams/qa/rest/src/javaRestTest/java/org/elasticsearch/xpack/datastreams/DataStreamsRestIT.java

@@ -21,6 +21,7 @@ import java.util.Map;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.startsWith;
 
 public class DataStreamsRestIT extends ESRestTestCase {
 
@@ -236,4 +237,64 @@ public class DataStreamsRestIT extends ESRestTestCase {
         assertEquals(404, getAliasesResponse.get("status"));
     }
 
+    public void testDataStreamWriteAlias() throws IOException {
+        // Create a template
+        Request putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1");
+        putComposableIndexTemplateRequest.setJsonEntity("{\"index_patterns\": [\"logs-*\"], \"data_stream\": {}}");
+        assertOK(client().performRequest(putComposableIndexTemplateRequest));
+
+        Request createDocRequest = new Request("POST", "/logs-emea/_doc?refresh=true");
+        createDocRequest.setJsonEntity("{ \"@timestamp\": \"2022-12-12\"}");
+        assertOK(client().performRequest(createDocRequest));
+
+        createDocRequest = new Request("POST", "/logs-nasa/_doc?refresh=true");
+        createDocRequest.setJsonEntity("{ \"@timestamp\": \"2022-12-12\"}");
+        assertOK(client().performRequest(createDocRequest));
+
+        Request updateAliasesRequest = new Request("POST", "/_aliases");
+        updateAliasesRequest.setJsonEntity(
+            "{\"actions\":[{\"add\":{\"index\":\"logs-emea\",\"alias\":\"logs\",\"is_write_index\":true}}," +
+                "{\"add\":{\"index\":\"logs-nasa\",\"alias\":\"logs\"}}]}");
+        assertOK(client().performRequest(updateAliasesRequest));
+
+        Request getAliasesRequest = new Request("GET", "/_aliases");
+        Map<String, Object> getAliasesResponse = entityAsMap(client().performRequest(getAliasesRequest));
+        assertEquals(
+            Map.of("logs", Map.of("is_write_data_stream", true)),
+            XContentMapValues.extractValue("logs-emea.aliases", getAliasesResponse)
+        );
+        assertEquals(Map.of("logs", Map.of()), XContentMapValues.extractValue("logs-nasa.aliases", getAliasesResponse));
+
+        Request searchRequest = new Request("GET", "/logs/_search");
+        Map<String, Object> searchResponse = entityAsMap(client().performRequest(searchRequest));
+        assertEquals(2, XContentMapValues.extractValue("hits.total.value", searchResponse));
+
+        createDocRequest = new Request("POST", "/logs/_doc?refresh=true");
+        createDocRequest.setJsonEntity("{ \"@timestamp\": \"2022-12-12\"}");
+        Response createDocResponse = client().performRequest(createDocRequest);
+        assertOK(createDocResponse);
+        assertThat((String) entityAsMap(createDocResponse).get("_index"), startsWith(".ds-logs-emea"));
+
+        updateAliasesRequest = new Request("POST", "/_aliases");
+        updateAliasesRequest.setJsonEntity("{\"actions\":[" +
+            "{\"add\":{\"index\":\"logs-emea\",\"alias\":\"logs\",\"is_write_index\":false}}," +
+            "{\"add\":{\"index\":\"logs-nasa\",\"alias\":\"logs\",\"is_write_index\":true}}" +
+            "]}");
+        assertOK(client().performRequest(updateAliasesRequest));
+
+        createDocRequest = new Request("POST", "/logs/_doc?refresh=true");
+        createDocRequest.setJsonEntity("{ \"@timestamp\": \"2022-12-12\"}");
+        createDocResponse = client().performRequest(createDocRequest);
+        assertOK(createDocResponse);
+        assertThat((String) entityAsMap(createDocResponse).get("_index"), startsWith(".ds-logs-nasa"));
+
+        getAliasesRequest = new Request("GET", "/_aliases");
+        getAliasesResponse = entityAsMap(client().performRequest(getAliasesRequest));
+        assertEquals(Map.of("logs", Map.of()), XContentMapValues.extractValue("logs-emea.aliases", getAliasesResponse));
+        assertEquals(
+            Map.of("logs", Map.of("is_write_data_stream", true)),
+            XContentMapValues.extractValue("logs-nasa.aliases", getAliasesResponse)
+        );
+    }
+
 }

+ 1 - 13
x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java

@@ -701,7 +701,7 @@ public class DataStreamIT extends ESIntegTestCase {
         GetAliasesResponse response = client().admin().indices().getAliases(new GetAliasesRequest()).actionGet();
         assertThat(
             response.getDataStreamAliases(),
-            equalTo(Map.of("metrics-foo", List.of(new DataStreamAlias("foo", List.of("metrics-foo")))))
+            equalTo(Map.of("metrics-foo", List.of(new DataStreamAlias("foo", List.of("metrics-foo"), null))))
         );
     }
 
@@ -793,18 +793,6 @@ public class DataStreamIT extends ESIntegTestCase {
             );
             assertThat(e.getMessage(), equalTo("aliases that point to data streams don't support search_routing"));
         }
-        {
-            AliasActions addAction = new AliasActions(AliasActions.Type.ADD).index("metrics-*")
-                .aliases("my-alias")
-                .writeIndex(randomBoolean());
-            IndicesAliasesRequest aliasesAddRequest = new IndicesAliasesRequest();
-            aliasesAddRequest.addAliasAction(addAction);
-            Exception e = expectThrows(
-                IllegalArgumentException.class,
-                () -> client().admin().indices().aliases(aliasesAddRequest).actionGet()
-            );
-            assertThat(e.getMessage(), equalTo("aliases that point to data streams don't support is_write_index"));
-        }
         {
             AliasActions addAction = new AliasActions(AliasActions.Type.ADD).index("metrics-*")
                 .aliases("my-alias")

+ 167 - 22
x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java

@@ -10,7 +10,9 @@ import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions;
@@ -74,6 +76,7 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
     private Client client;
 
     private String dsBackingIndexName;
+    private String otherDsBackingIndexName;
     private String ds2BackingIndexName;
     private String id;
 
@@ -103,6 +106,7 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
         GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { "*" });
         GetDataStreamAction.Response getDataStreamResponse = client.execute(GetDataStreamAction.INSTANCE, getDataStreamRequest).actionGet();
         dsBackingIndexName = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices().get(0).getName();
+        otherDsBackingIndexName = getDataStreamResponse.getDataStreams().get(1).getDataStream().getIndices().get(0).getName();
         // Will be used in some tests, to test renaming while restoring a snapshot:
         ds2BackingIndexName = dsBackingIndexName.replace("-ds-", "-ds2-");
 
@@ -112,7 +116,7 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
 
         IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest();
         aliasesRequest.addAliasAction(new AliasActions(AliasActions.Type.ADD).alias("my-alias").index("ds"));
-        aliasesRequest.addAliasAction(new AliasActions(AliasActions.Type.ADD).alias("my-alias").index("other-ds"));
+        aliasesRequest.addAliasAction(new AliasActions(AliasActions.Type.ADD).alias("my-alias").index("other-ds").writeIndex(true));
         assertAcked(client.admin().indices().aliases(aliasesRequest).actionGet());
     }
 
@@ -171,8 +175,10 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
         assertThat(getAliasesResponse.getDataStreamAliases().keySet(), containsInAnyOrder("ds", "other-ds"));
         assertThat(getAliasesResponse.getDataStreamAliases().get("ds").size(), equalTo(1));
         assertThat(getAliasesResponse.getDataStreamAliases().get("ds").get(0).getName(), equalTo("my-alias"));
+        assertThat(getAliasesResponse.getDataStreamAliases().get("ds").get(0).getWriteDataStream(), equalTo("other-ds"));
         assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").size(), equalTo(1));
         assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getName(), equalTo("my-alias"));
+        assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getWriteDataStream(), equalTo("other-ds"));
     }
 
     public void testSnapshotAndRestoreAllDataStreamsInPlace() throws Exception {
@@ -216,11 +222,10 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
         List<Index> backingIndices = ds.getDataStreams().get(0).getDataStream().getIndices();
         assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(dsBackingIndexName));
         backingIndices = ds.getDataStreams().get(1).getDataStream().getIndices();
-        String expectedBackingIndexName = DataStream.getDefaultBackingIndexName("other-ds", 1);
-        assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(expectedBackingIndexName));
+        assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(otherDsBackingIndexName));
     }
 
-    public void testSnapshotAndRestoreInPlace() throws Exception {
+    public void testSnapshotAndRestoreInPlace() {
         CreateSnapshotResponse createSnapshotResponse = client.admin()
             .cluster()
             .prepareCreateSnapshot(REPO, SNAPSHOT)
@@ -278,32 +283,110 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
         assertThat(rolloverResponse.getNewIndex(), equalTo(DataStream.getDefaultBackingIndexName("ds", 3)));
     }
 
-    public void testSnapshotAndRestoreAll() throws Exception {
-        CreateSnapshotResponse createSnapshotResponse = client.admin()
-            .cluster()
-            .prepareCreateSnapshot(REPO, SNAPSHOT)
-            .setWaitForCompletion(true)
-            .setIndices("ds")
-            .setIncludeGlobalState(false)
+    public void testSnapshotAndRestoreAllIncludeSpecificDataStream() throws Exception {
+        IndexResponse indexResponse = client.prepareIndex("other-ds")
+            .setOpType(DocWriteRequest.OpType.CREATE)
+            .setSource(DOCUMENT_SOURCE)
             .get();
+        assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
+        String id2 = indexResponse.getId();
+
+        String id;
+        String dataStreamToSnapshot;
+        String backingIndexName;
+        if (randomBoolean()) {
+            dataStreamToSnapshot = "ds";
+            id = this.id;
+            backingIndexName = this.dsBackingIndexName;
+        } else {
+            dataStreamToSnapshot = "other-ds";
+            id = id2;
+            backingIndexName = this.otherDsBackingIndexName;
+        }
+        boolean filterDuringSnapshotting = randomBoolean();
+
+        CreateSnapshotRequest createSnapshotRequest = new CreateSnapshotRequest(REPO, SNAPSHOT);
+        createSnapshotRequest.waitForCompletion(true);
+        if (filterDuringSnapshotting) {
+            createSnapshotRequest.indices(dataStreamToSnapshot);
+        }
+        createSnapshotRequest.includeGlobalState(false);
+        CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().createSnapshot(createSnapshotRequest).actionGet();
 
         RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
         assertEquals(RestStatus.OK, status);
 
-        assertEquals(Collections.singletonList(dsBackingIndexName), getSnapshot(REPO, SNAPSHOT).indices());
+        if (filterDuringSnapshotting) {
+            assertThat(getSnapshot(REPO, SNAPSHOT).indices(), containsInAnyOrder(backingIndexName));
+        } else {
+            assertThat(getSnapshot(REPO, SNAPSHOT).indices(), containsInAnyOrder(dsBackingIndexName, otherDsBackingIndexName));
+        }
 
         assertAcked(client.execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "*" })).get());
         assertAcked(client.admin().indices().prepareDelete("*").setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN));
 
-        RestoreSnapshotResponse restoreSnapshotResponse = client.admin()
-            .cluster()
-            .prepareRestoreSnapshot(REPO, SNAPSHOT)
-            .setWaitForCompletion(true)
-            .setRestoreGlobalState(true)
-            .get();
+        RestoreSnapshotRequest restoreSnapshotRequest = new RestoreSnapshotRequest(REPO, SNAPSHOT);
+        restoreSnapshotRequest.waitForCompletion(true);
+        restoreSnapshotRequest.includeGlobalState(true);
+        if (filterDuringSnapshotting == false) {
+            restoreSnapshotRequest.indices(dataStreamToSnapshot);
+        }
+        RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().restoreSnapshot(restoreSnapshotRequest).actionGet();
 
         assertEquals(1, restoreSnapshotResponse.getRestoreInfo().successfulShards());
 
+        assertEquals(DOCUMENT_SOURCE, client.prepareGet(backingIndexName, id).get().getSourceAsMap());
+        SearchHit[] hits = client.prepareSearch(backingIndexName).get().getHits().getHits();
+        assertEquals(1, hits.length);
+        assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap());
+
+        GetDataStreamAction.Response ds = client.execute(
+            GetDataStreamAction.INSTANCE,
+            new GetDataStreamAction.Request(new String[] { dataStreamToSnapshot })
+        ).get();
+        assertEquals(1, ds.getDataStreams().size());
+        assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size());
+        assertEquals(backingIndexName, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName());
+
+        GetAliasesResponse getAliasesResponse = client.admin().indices().getAliases(new GetAliasesRequest("my-alias")).actionGet();
+        assertThat(getAliasesResponse.getDataStreamAliases().keySet(), contains(dataStreamToSnapshot));
+        assertThat(
+            getAliasesResponse.getDataStreamAliases().get(dataStreamToSnapshot),
+            equalTo(
+                List.of(
+                    new DataStreamAlias(
+                        "my-alias",
+                        List.of(dataStreamToSnapshot),
+                        "other-ds".equals(dataStreamToSnapshot) ? "other-ds" : null
+                    )
+                )
+            )
+        );
+
+        DeleteDataStreamAction.Request r = new DeleteDataStreamAction.Request(new String[] { dataStreamToSnapshot });
+        assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, r).get());
+    }
+
+    public void testSnapshotAndRestoreReplaceAll() throws Exception {
+        CreateSnapshotRequest createSnapshotRequest = new CreateSnapshotRequest(REPO, SNAPSHOT);
+        createSnapshotRequest.waitForCompletion(true);
+        createSnapshotRequest.includeGlobalState(false);
+        CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().createSnapshot(createSnapshotRequest).actionGet();
+
+        RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
+        assertEquals(RestStatus.OK, status);
+        assertThat(getSnapshot(REPO, SNAPSHOT).indices(), containsInAnyOrder(dsBackingIndexName, otherDsBackingIndexName));
+
+        assertAcked(client.execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "*" })).get());
+        assertAcked(client.admin().indices().prepareDelete("*").setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN));
+
+        RestoreSnapshotRequest restoreSnapshotRequest = new RestoreSnapshotRequest(REPO, SNAPSHOT);
+        restoreSnapshotRequest.waitForCompletion(true);
+        restoreSnapshotRequest.includeGlobalState(true);
+        RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().restoreSnapshot(restoreSnapshotRequest).actionGet();
+
+        assertEquals(2, restoreSnapshotResponse.getRestoreInfo().successfulShards());
+
         assertEquals(DOCUMENT_SOURCE, client.prepareGet(dsBackingIndexName, id).get().getSourceAsMap());
         SearchHit[] hits = client.prepareSearch("ds").get().getHits().getHits();
         assertEquals(1, hits.length);
@@ -311,15 +394,75 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
 
         GetDataStreamAction.Response ds = client.execute(
             GetDataStreamAction.INSTANCE,
-            new GetDataStreamAction.Request(new String[] { "ds" })
+            new GetDataStreamAction.Request(new String[] { "*" })
         ).get();
-        assertEquals(1, ds.getDataStreams().size());
+        assertEquals(2, ds.getDataStreams().size());
+        assertThat(
+            ds.getDataStreams().stream().map(i -> i.getDataStream().getName()).collect(Collectors.toList()),
+            containsInAnyOrder("ds", "other-ds")
+        );
+
+        GetAliasesResponse getAliasesResponse = client.admin().indices().getAliases(new GetAliasesRequest("my-alias")).actionGet();
+        assertThat(getAliasesResponse.getDataStreamAliases().keySet(), containsInAnyOrder("ds", "other-ds"));
+        assertThat(getAliasesResponse.getDataStreamAliases().get("ds").size(), equalTo(1));
+        assertThat(getAliasesResponse.getDataStreamAliases().get("ds").get(0).getName(), equalTo("my-alias"));
+        assertThat(getAliasesResponse.getDataStreamAliases().get("ds").get(0).getDataStreams(), containsInAnyOrder("ds", "other-ds"));
+        assertThat(getAliasesResponse.getDataStreamAliases().get("ds").get(0).getWriteDataStream(), equalTo("other-ds"));
+        assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").size(), equalTo(1));
+        assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getName(), equalTo("my-alias"));
+        assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getDataStreams(), containsInAnyOrder("ds", "other-ds"));
+        assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getWriteDataStream(), equalTo("other-ds"));
+
+        DeleteDataStreamAction.Request r = new DeleteDataStreamAction.Request(new String[] { "*" });
+        assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, r).get());
+    }
+
+    public void testSnapshotAndRestoreAll() throws Exception {
+        CreateSnapshotRequest createSnapshotRequest = new CreateSnapshotRequest(REPO, SNAPSHOT);
+        createSnapshotRequest.waitForCompletion(true);
+        createSnapshotRequest.includeGlobalState(false);
+        CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().createSnapshot(createSnapshotRequest).actionGet();
+
+        RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
+        assertEquals(RestStatus.OK, status);
+        assertThat(getSnapshot(REPO, SNAPSHOT).indices(), containsInAnyOrder(dsBackingIndexName, otherDsBackingIndexName));
+
+        assertAcked(client.execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "*" })).get());
+        assertAcked(client.admin().indices().prepareDelete("*").setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN));
+
+        RestoreSnapshotRequest restoreSnapshotRequest = new RestoreSnapshotRequest(REPO, SNAPSHOT);
+        restoreSnapshotRequest.waitForCompletion(true);
+        restoreSnapshotRequest.includeGlobalState(true);
+        RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().restoreSnapshot(restoreSnapshotRequest).actionGet();
+        assertEquals(2, restoreSnapshotResponse.getRestoreInfo().successfulShards());
+
+        assertEquals(DOCUMENT_SOURCE, client.prepareGet(dsBackingIndexName, id).get().getSourceAsMap());
+        SearchHit[] hits = client.prepareSearch("ds").get().getHits().getHits();
+        assertEquals(1, hits.length);
+        assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap());
+
+        GetDataStreamAction.Response ds = client.execute(
+            GetDataStreamAction.INSTANCE,
+            new GetDataStreamAction.Request(new String[] { "*" })
+        ).get();
+        assertEquals(2, ds.getDataStreams().size());
         assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size());
         assertEquals(dsBackingIndexName, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName());
+        assertEquals(1, ds.getDataStreams().get(1).getDataStream().getIndices().size());
+        assertEquals(otherDsBackingIndexName, ds.getDataStreams().get(1).getDataStream().getIndices().get(0).getName());
 
         GetAliasesResponse getAliasesResponse = client.admin().indices().getAliases(new GetAliasesRequest("my-alias")).actionGet();
-        assertThat(getAliasesResponse.getDataStreamAliases().keySet(), containsInAnyOrder("ds"));
-        assertThat(getAliasesResponse.getDataStreamAliases().get("ds"), equalTo(List.of(new DataStreamAlias("my-alias", List.of("ds")))));
+        assertThat(getAliasesResponse.getDataStreamAliases().keySet(), containsInAnyOrder("ds", "other-ds"));
+
+        assertThat(getAliasesResponse.getDataStreamAliases().get("ds").size(), equalTo(1));
+        assertThat(getAliasesResponse.getDataStreamAliases().get("ds").get(0).getName(), equalTo("my-alias"));
+        assertThat(getAliasesResponse.getDataStreamAliases().get("ds").get(0).getWriteDataStream(), equalTo("other-ds"));
+        assertThat(getAliasesResponse.getDataStreamAliases().get("ds").get(0).getDataStreams(), containsInAnyOrder("ds", "other-ds"));
+
+        assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").size(), equalTo(1));
+        assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getName(), equalTo("my-alias"));
+        assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getWriteDataStream(), equalTo("other-ds"));
+        assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getDataStreams(), containsInAnyOrder("ds", "other-ds"));
 
         assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "ds" })).get());
     }
@@ -366,8 +509,10 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
         assertThat(getAliasesResponse.getDataStreamAliases().get("ds2").get(0).getName(), equalTo("my-alias"));
         assertThat(getAliasesResponse.getDataStreamAliases().get("ds").size(), equalTo(1));
         assertThat(getAliasesResponse.getDataStreamAliases().get("ds").get(0).getName(), equalTo("my-alias"));
+        assertThat(getAliasesResponse.getDataStreamAliases().get("ds").get(0).getWriteDataStream(), equalTo("other-ds"));
         assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").size(), equalTo(1));
         assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getName(), equalTo("my-alias"));
+        assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getWriteDataStream(), equalTo("other-ds"));
     }
 
     public void testBackingIndexIsNotRenamedWhenRestoringDataStream() {