Преглед на файлове

Add filter support to data stream aliases (#74784)

This allows specifying a query as filter on data stream alias,
which will then always be applied when searching via this alias.

Relates #66163
Martijn van Groningen преди 4 години
родител
ревизия
3dde09a7b4
променени са 19 файла, в които са добавени 788 реда и са изтрити 174 реда
  1. 3 3
      docs/reference/indices/aliases.asciidoc
  2. 2 4
      server/src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java
  3. 5 3
      server/src/main/java/org/elasticsearch/cluster/metadata/AliasAction.java
  4. 108 11
      server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAlias.java
  5. 39 23
      server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java
  6. 11 3
      server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java
  7. 37 19
      server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesService.java
  8. 44 8
      server/src/main/java/org/elasticsearch/indices/IndicesService.java
  9. 4 0
      server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetAliasesAction.java
  10. 8 8
      server/src/test/java/org/elasticsearch/action/admin/indices/alias/get/TransportGetAliasesActionTests.java
  11. 80 30
      server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamAliasTests.java
  12. 45 5
      server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java
  13. 6 6
      server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesServiceTests.java
  14. 30 30
      server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java
  15. 92 0
      server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java
  16. 4 3
      test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java
  17. 129 15
      x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java
  18. 60 3
      x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java
  19. 81 0
      x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/140_data_stream_aliases.yml

+ 3 - 3
docs/reference/indices/aliases.asciidoc

@@ -94,7 +94,7 @@ parameter.
 // tag::alias-options[]
 `filter`::
 (Optional, <<query-dsl,Query DSL object>> Query used to limit documents the
-alias can access. Data stream aliases don't support this parameter.
+alias can access.
 // end::alias-options[]
 +
 Only the `add` action supports this parameter.
@@ -109,7 +109,7 @@ indices return an error.
 (Required*, array of strings) Data streams or indices for the action. Supports
 wildcards (`*`). If `index` is not specified, this parameter is required. For
 the `add` and `remove_index` actions, wildcard patterns that match both data
-streams and indices return an error. 
+streams and indices return an error.
 
 // tag::alias-options[]
 `index_routing`::
@@ -140,7 +140,7 @@ Only the `add` action supports this parameter.
 `must_exist`::
 (Optional, Boolean)
 If `true`, the alias must exist to perform the action. Defaults to `false`. Only
-the `remove` action supports this parameter. 
+the `remove` action supports this parameter.
 
 // tag::alias-options[]
 `routing`::

+ 2 - 4
server/src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java

@@ -21,6 +21,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.AliasAction;
+import org.elasticsearch.cluster.metadata.AliasAction.AddDataStreamAlias;
 import org.elasticsearch.cluster.metadata.AliasMetadata;
 import org.elasticsearch.cluster.metadata.DataStreamAlias;
 import org.elasticsearch.cluster.metadata.IndexAbstraction;
@@ -108,9 +109,6 @@ public class TransportIndicesAliasesAction extends AcknowledgedTransportMasterNo
                 switch (action.actionType()) {
                     case ADD:
                         // Fail if parameters are used that data stream aliases don't support:
-                        if (action.filter() != null) {
-                            throw new IllegalArgumentException("aliases that point to data streams don't support filters");
-                        }
                         if (action.routing() != null) {
                             throw new IllegalArgumentException("aliases that point to data streams don't support routing");
                         }
@@ -130,7 +128,7 @@ public class TransportIndicesAliasesAction extends AcknowledgedTransportMasterNo
                         }
                         for (String dataStreamName : concreteDataStreams) {
                             for (String alias : concreteDataStreamAliases(action, state.metadata(), dataStreamName)) {
-                                finalActions.add(new AliasAction.AddDataStreamAlias(alias, dataStreamName, action.writeIndex()));
+                                finalActions.add(new AddDataStreamAlias(alias, dataStreamName, action.writeIndex(), action.filter()));
                             }
                         }
                         continue;

+ 5 - 3
server/src/main/java/org/elasticsearch/cluster/metadata/AliasAction.java

@@ -204,12 +204,14 @@ public abstract class AliasAction {
         private final String aliasName;
         private final String dataStreamName;
         private final Boolean isWriteDataStream;
+        private final String filter;
 
-        public AddDataStreamAlias(String aliasName, String dataStreamName, Boolean isWriteDataStream) {
+        public AddDataStreamAlias(String aliasName, String dataStreamName, Boolean isWriteDataStream, String filter) {
             super(dataStreamName);
             this.aliasName = aliasName;
             this.dataStreamName = dataStreamName;
             this.isWriteDataStream = isWriteDataStream;
+            this.filter = filter;
         }
 
         public String getAliasName() {
@@ -231,8 +233,8 @@ public abstract class AliasAction {
 
         @Override
         boolean apply(NewAliasValidator aliasValidator, Metadata.Builder metadata, IndexMetadata index) {
-            aliasValidator.validate(aliasName, null, null, isWriteDataStream);
-            return metadata.put(aliasName, dataStreamName, isWriteDataStream);
+            aliasValidator.validate(aliasName, null, filter, isWriteDataStream);
+            return metadata.put(aliasName, dataStreamName, isWriteDataStream, filter);
         }
     }
 

+ 108 - 11
server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAlias.java

@@ -7,8 +7,12 @@
  */
 package org.elasticsearch.cluster.metadata;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.cluster.AbstractDiffable;
 import org.elasticsearch.cluster.Diff;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.compress.CompressedXContent;
+import org.elasticsearch.common.xcontent.ObjectParser;
 import org.elasticsearch.common.xcontent.ParseField;
 import org.elasticsearch.common.ParsingException;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -16,11 +20,15 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.ConstructingObjectParser;
 import org.elasticsearch.common.xcontent.ToXContentFragment;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentParser;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.function.Predicate;
@@ -30,34 +38,77 @@ public class DataStreamAlias extends AbstractDiffable<DataStreamAlias> implement
 
     public static final ParseField DATA_STREAMS_FIELD = new ParseField("data_streams");
     public static final ParseField WRITE_DATA_STREAM_FIELD = new ParseField("write_data_stream");
+    public static final ParseField FILTER_FIELD = new ParseField("filter");
 
     @SuppressWarnings("unchecked")
     private static final ConstructingObjectParser<DataStreamAlias, String> PARSER = new ConstructingObjectParser<>(
         "data_stream_alias",
         false,
-        (args, name) -> new DataStreamAlias(name, (List<String>) args[0], (String) args[1])
+        (args, name) -> new DataStreamAlias(name, (List<String>) args[0], (String) args[1], (CompressedXContent) args[2])
     );
 
     static {
         PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), DATA_STREAMS_FIELD);
         PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), WRITE_DATA_STREAM_FIELD);
+        PARSER.declareField(
+            ConstructingObjectParser.optionalConstructorArg(),
+            (p, c) -> {
+                if (p.currentToken() == XContentParser.Token.VALUE_EMBEDDED_OBJECT ||
+                    p.currentToken() == XContentParser.Token.VALUE_STRING) {
+                    return new CompressedXContent(p.binaryValue());
+                } else if (p.currentToken() == XContentParser.Token.START_OBJECT) {
+                    XContentBuilder builder = XContentFactory.jsonBuilder().map(p.mapOrdered());
+                    return new CompressedXContent(BytesReference.bytes(builder));
+                } else {
+                    assert false : "unexpected token [" + p.currentToken() + " ]";
+                    return null;
+                }
+            },
+            FILTER_FIELD,
+            ObjectParser.ValueType.VALUE_OBJECT_ARRAY
+        );
     }
 
     private final String name;
     private final List<String> dataStreams;
     private final String writeDataStream;
+    private final CompressedXContent filter;
 
-    public DataStreamAlias(String name, List<String> dataStreams, String writeDataStream) {
+    private DataStreamAlias(String name, List<String> dataStreams, String writeDataStream, CompressedXContent filter) {
         this.name = Objects.requireNonNull(name);
         this.dataStreams = List.copyOf(dataStreams);
         this.writeDataStream = writeDataStream;
+        this.filter = filter;
         assert writeDataStream == null || dataStreams.contains(writeDataStream);
     }
 
+    public DataStreamAlias(String name, List<String> dataStreams, String writeDataStream, Map<String, Object> filter) {
+        this(name, dataStreams, writeDataStream, compress(filter));
+    }
+
+    private static CompressedXContent compress(Map<String, Object> filterAsMap) {
+        if (filterAsMap == null) {
+            return null;
+        }
+
+        try {
+            XContentBuilder builder = XContentFactory.jsonBuilder().map(filterAsMap);
+            return new CompressedXContent(BytesReference.bytes(builder));
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private static Map<String, Object> decompress(CompressedXContent filter) {
+        String filterAsString = filter.string();
+        return XContentHelper.convertToMap(XContentFactory.xContent(filterAsString), filterAsString, true);
+    }
+
     public DataStreamAlias(StreamInput in) throws IOException {
         this.name = in.readString();
         this.dataStreams = in.readStringList();
         this.writeDataStream = in.readOptionalString();
+        this.filter = in.getVersion().onOrAfter(Version.V_8_0_0) && in.readBoolean() ? CompressedXContent.readCompressedString(in) : null;
     }
 
     /**
@@ -85,15 +136,20 @@ public class DataStreamAlias extends AbstractDiffable<DataStreamAlias> implement
         return writeDataStream;
     }
 
+    public CompressedXContent getFilter() {
+        return filter;
+    }
+
     /**
      * Returns a new {@link DataStreamAlias} instance with the provided data stream name added to it as a new member.
      * If the provided isWriteDataStream is set to <code>true</code> then the provided data stream is also set as write data stream.
      * If the provided isWriteDataStream is set to <code>false</code> and the provided data stream is also the write data stream of
      * this instance then the returned data stream alias instance's write data stream is unset.
+     * If the provided filter is the same as the filter of this alias then this instance isn't updated, otherwise it is updated.
      *
      * The same instance is returned if the attempted addition of the provided data stream didn't change this instance.
      */
-    public DataStreamAlias addDataStream(String dataStream, Boolean isWriteDataStream) {
+    public DataStreamAlias update(String dataStream, Boolean isWriteDataStream, Map<String, Object> filterAsMap) {
         String writeDataStream = this.writeDataStream;
         if (isWriteDataStream != null) {
             if (isWriteDataStream) {
@@ -105,10 +161,24 @@ public class DataStreamAlias extends AbstractDiffable<DataStreamAlias> implement
             }
         }
 
+        boolean filterUpdated;
+        CompressedXContent filter;
+        if (filterAsMap != null) {
+            filter = compress(filterAsMap);
+            if (this.filter == null) {
+                filterUpdated = true;
+            } else {
+                filterUpdated = filterAsMap.equals(decompress(this.filter)) == false;
+            }
+        } else {
+            filter = this.filter;
+            filterUpdated = false;
+        }
+
         Set<String> dataStreams = new HashSet<>(this.dataStreams);
         boolean added = dataStreams.add(dataStream);
-        if (added || Objects.equals(this.writeDataStream, writeDataStream) == false) {
-            return new DataStreamAlias(name, List.copyOf(dataStreams), writeDataStream);
+        if (added || Objects.equals(this.writeDataStream, writeDataStream) == false || filterUpdated) {
+            return new DataStreamAlias(name, List.copyOf(dataStreams), writeDataStream, filter);
         } else {
             return this;
         }
@@ -133,7 +203,7 @@ public class DataStreamAlias extends AbstractDiffable<DataStreamAlias> implement
             if (dataStream.equals(writeDataStream)) {
                 writeDataStream = null;
             }
-            return new DataStreamAlias(name, List.copyOf(dataStreams), writeDataStream);
+            return new DataStreamAlias(name, List.copyOf(dataStreams), writeDataStream, filter);
         }
     }
 
@@ -152,7 +222,7 @@ public class DataStreamAlias extends AbstractDiffable<DataStreamAlias> implement
         if (intersectingDataStreams.contains(writeDataStream) == false) {
             writeDataStream = null;
         }
-        return new DataStreamAlias(this.name, intersectingDataStreams, writeDataStream);
+        return new DataStreamAlias(this.name, intersectingDataStreams, writeDataStream, this.filter);
     }
 
     /**
@@ -171,7 +241,7 @@ public class DataStreamAlias extends AbstractDiffable<DataStreamAlias> implement
             }
         }
 
-        return new DataStreamAlias(this.name, List.copyOf(mergedDataStreams), writeDataStream);
+        return new DataStreamAlias(this.name, List.copyOf(mergedDataStreams), writeDataStream, filter);
     }
 
     /**
@@ -187,7 +257,7 @@ public class DataStreamAlias extends AbstractDiffable<DataStreamAlias> implement
         if (writeDataStream != null) {
             writeDataStream = writeDataStream.replaceAll(renamePattern, renameReplacement);
         }
-        return new DataStreamAlias(this.name, renamedDataStreams, writeDataStream);
+        return new DataStreamAlias(this.name, renamedDataStreams, writeDataStream, filter);
     }
 
     public static Diff<DataStreamAlias> readDiffFrom(StreamInput in) throws IOException {
@@ -210,6 +280,14 @@ public class DataStreamAlias extends AbstractDiffable<DataStreamAlias> implement
         if (writeDataStream != null) {
             builder.field(WRITE_DATA_STREAM_FIELD.getPreferredName(), writeDataStream);
         }
+        if (filter != null) {
+            boolean binary = params.paramAsBoolean("binary", false);
+            if (binary) {
+                builder.field("filter", filter.compressed());
+            } else {
+                builder.field("filter", XContentHelper.convertToMap(filter.uncompressed(), true).v2());
+            }
+        }
         builder.endObject();
         return builder;
     }
@@ -219,6 +297,14 @@ public class DataStreamAlias extends AbstractDiffable<DataStreamAlias> implement
         out.writeString(name);
         out.writeStringCollection(dataStreams);
         out.writeOptionalString(writeDataStream);
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            if (filter != null) {
+                out.writeBoolean(true);
+                filter.writeTo(out);
+            } else {
+                out.writeBoolean(false);
+            }
+        }
     }
 
     @Override
@@ -228,11 +314,22 @@ public class DataStreamAlias extends AbstractDiffable<DataStreamAlias> implement
         DataStreamAlias that = (DataStreamAlias) o;
         return Objects.equals(name, that.name) &&
             Objects.equals(dataStreams, that.dataStreams) &&
-            Objects.equals(writeDataStream, that.writeDataStream);
+            Objects.equals(writeDataStream, that.writeDataStream) &&
+            Objects.equals(filter, that.filter);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(name, dataStreams, writeDataStream);
+        return Objects.hash(name, dataStreams, writeDataStream, filter);
+    }
+
+    @Override
+    public String toString() {
+        return "DataStreamAlias{" +
+            "name='" + name + '\'' +
+            ", dataStreams=" + dataStreams +
+            ", writeDataStream='" + writeDataStream + '\'' +
+            ", filter=" + filter.string() +
+            '}';
     }
 }

+ 39 - 23
server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java

@@ -552,39 +552,55 @@ public class IndexNameExpressionResolver {
             return null;
         }
 
-        final ImmutableOpenMap<String, AliasMetadata> indexAliases = indexMetadata.getAliases();
-        final AliasMetadata[] aliasCandidates;
-        if (iterateIndexAliases(indexAliases.size(), resolvedExpressions.size())) {
-            // faster to iterate indexAliases
-            aliasCandidates = StreamSupport.stream(Spliterators.spliteratorUnknownSize(indexAliases.values().iterator(), 0), false)
+        IndexAbstraction ia = state.metadata().getIndicesLookup().get(index);
+        if (ia.getParentDataStream() != null) {
+            DataStream dataStream = ia.getParentDataStream().getDataStream();
+            Map<String, DataStreamAlias> dataStreamAliases = state.metadata().dataStreamAliases();
+            Stream<DataStreamAlias> stream;
+            if (iterateIndexAliases(dataStreamAliases.size(), resolvedExpressions.size())) {
+                stream = dataStreamAliases.values().stream()
+                    .filter(dataStreamAlias -> resolvedExpressions.contains(dataStreamAlias.getName()));
+            } else {
+                stream = resolvedExpressions.stream().map(dataStreamAliases::get).filter(Objects::nonNull);
+            }
+            return stream.filter(dataStreamAlias -> dataStreamAlias.getDataStreams().contains(dataStream.getName()))
+                .filter(dataStreamAlias -> dataStreamAlias.getFilter() != null)
+                .map(DataStreamAlias::getName)
+                .toArray(String[]::new);
+        } else {
+            final ImmutableOpenMap<String, AliasMetadata> indexAliases = indexMetadata.getAliases();
+            final AliasMetadata[] aliasCandidates;
+            if (iterateIndexAliases(indexAliases.size(), resolvedExpressions.size())) {
+                // faster to iterate indexAliases
+                aliasCandidates = StreamSupport.stream(Spliterators.spliteratorUnknownSize(indexAliases.values().iterator(), 0), false)
                     .map(cursor -> cursor.value)
                     .filter(aliasMetadata -> resolvedExpressions.contains(aliasMetadata.alias()))
                     .toArray(AliasMetadata[]::new);
-        } else {
-            // faster to iterate resolvedExpressions
-            aliasCandidates = resolvedExpressions.stream()
+            } else {
+                // faster to iterate resolvedExpressions
+                aliasCandidates = resolvedExpressions.stream()
                     .map(indexAliases::get)
                     .filter(Objects::nonNull)
                     .toArray(AliasMetadata[]::new);
-        }
-
-        List<String> aliases = null;
-        for (AliasMetadata aliasMetadata : aliasCandidates) {
-            if (requiredAlias.test(aliasMetadata)) {
-                // If required - add it to the list of aliases
-                if (aliases == null) {
-                    aliases = new ArrayList<>();
+            }
+            List<String> aliases = null;
+            for (AliasMetadata aliasMetadata : aliasCandidates) {
+                if (requiredAlias.test(aliasMetadata)) {
+                    // If required - add it to the list of aliases
+                    if (aliases == null) {
+                        aliases = new ArrayList<>();
+                    }
+                    aliases.add(aliasMetadata.alias());
+                } else {
+                    // If not, we have a non required alias for this index - no further checking needed
+                    return null;
                 }
-                aliases.add(aliasMetadata.alias());
-            } else {
-                // If not, we have a non required alias for this index - no further checking needed
+            }
+            if (aliases == null) {
                 return null;
             }
+            return aliases.toArray(new String[aliases.size()]);
         }
-        if (aliases == null) {
-            return null;
-        }
-        return aliases.toArray(new String[aliases.size()]);
     }
 
     /**

+ 11 - 3
server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java

@@ -32,6 +32,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentParserUtils;
+import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.UUIDs;
@@ -1178,7 +1179,7 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
             return this;
         }
 
-        public boolean put(String aliasName, String dataStream, Boolean isWriteDataStream) {
+        public boolean put(String aliasName, String dataStream, Boolean isWriteDataStream, String filter) {
             Map<String, DataStream> existingDataStream =
                 Optional.ofNullable((DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE))
                     .map(dsmd -> new HashMap<>(dsmd.dataStreams()))
@@ -1192,12 +1193,19 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
                 throw new IllegalArgumentException("alias [" + aliasName + "] refers to a non existing data stream [" + dataStream + "]");
             }
 
+            Map<String, Object> filterAsMap;
+            if (filter != null) {
+                filterAsMap = XContentHelper.convertToMap(XContentFactory.xContent(filter), filter, true);
+            } else {
+                filterAsMap = null;
+            }
+
             DataStreamAlias alias = dataStreamAliases.get(aliasName);
             if (alias == null) {
                 String writeDataStream = isWriteDataStream != null && isWriteDataStream ? dataStream : null;
-                alias = new DataStreamAlias(aliasName, List.of(dataStream), writeDataStream);
+                alias = new DataStreamAlias(aliasName, List.of(dataStream), writeDataStream, filterAsMap);
             } else {
-                DataStreamAlias copy = alias.addDataStream(dataStream, isWriteDataStream);
+                DataStreamAlias copy = alias.update(dataStream, isWriteDataStream, filterAsMap);
                 if (copy == alias) {
                     return false;
                 }

+ 37 - 19
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesService.java

@@ -130,6 +130,15 @@ public class MetadataIndexAliasesService {
                 if (dataStream != null) {
                     NewAliasValidator newAliasValidator = (alias, indexRouting, filter, writeIndex) -> {
                         aliasValidator.validateAlias(alias, action.getIndex(), indexRouting, lookup);
+                        if (Strings.hasLength(filter)) {
+                            for (Index index : dataStream.getIndices()) {
+                                IndexMetadata imd = metadata.get(index.getName());
+                                if (imd == null) {
+                                    throw new IndexNotFoundException(action.getIndex());
+                                }
+                                validateFilter(indicesToClose, indices, action, imd, alias, filter);
+                            }
+                        }
                     };
                     if (action.apply(newAliasValidator, metadata, null)) {
                         changed = true;
@@ -145,25 +154,7 @@ public class MetadataIndexAliasesService {
                 NewAliasValidator newAliasValidator = (alias, indexRouting, filter, writeIndex) -> {
                     aliasValidator.validateAlias(alias, action.getIndex(), indexRouting, lookup);
                     if (Strings.hasLength(filter)) {
-                        IndexService indexService = indices.get(index.getIndex().getName());
-                        if (indexService == null) {
-                            indexService = indicesService.indexService(index.getIndex());
-                            if (indexService == null) {
-                                // temporarily create the index and add mappings so we can parse the filter
-                                try {
-                                    indexService = indicesService.createIndex(index, emptyList(), false);
-                                    indicesToClose.add(index.getIndex());
-                                } catch (IOException e) {
-                                    throw new ElasticsearchException("Failed to create temporary index for parsing the alias", e);
-                                }
-                                indexService.mapperService().merge(index, MapperService.MergeReason.MAPPING_RECOVERY);
-                            }
-                            indices.put(action.getIndex(), indexService);
-                        }
-                        // the context is only used for validation so it's fine to pass fake values for the shard id,
-                        // but the current timestamp should be set to real value as we may use `now` in a filtered alias
-                        aliasValidator.validateAliasFilter(alias, filter, indexService.newSearchExecutionContext(0, 0,
-                                null, () -> System.currentTimeMillis(), null, emptyMap()), xContentRegistry);
+                        validateFilter(indicesToClose, indices, action, index, alias, filter);
                     }
                 };
                 if (action.apply(newAliasValidator, metadata, index)) {
@@ -198,6 +189,33 @@ public class MetadataIndexAliasesService {
         }
     }
 
+    private void validateFilter(List<Index> indicesToClose,
+                                Map<String, IndexService> indices,
+                                AliasAction action,
+                                IndexMetadata index,
+                                String alias,
+                                String filter) {
+        IndexService indexService = indices.get(index.getIndex().getName());
+        if (indexService == null) {
+            indexService = indicesService.indexService(index.getIndex());
+            if (indexService == null) {
+                // temporarily create the index and add mappings so we can parse the filter
+                try {
+                    indexService = indicesService.createIndex(index, emptyList(), false);
+                    indicesToClose.add(index.getIndex());
+                } catch (IOException e) {
+                    throw new ElasticsearchException("Failed to create temporary index for parsing the alias", e);
+                }
+                indexService.mapperService().merge(index, MapperService.MergeReason.MAPPING_RECOVERY);
+            }
+            indices.put(action.getIndex(), indexService);
+        }
+        // the context is only used for validation so it's fine to pass fake values for the shard id,
+        // but the current timestamp should be set to real value as we may use `now` in a filtered alias
+        aliasValidator.validateAliasFilter(alias, filter, indexService.newSearchExecutionContext(0, 0,
+                null, System::currentTimeMillis, null, emptyMap()), xContentRegistry);
+    }
+
     private void validateAliasTargetIsNotDSBackingIndex(ClusterState currentState, AliasAction action) {
         IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(action.getIndex());
         assert indexAbstraction != null : "invalid cluster metadata. index [" + action.getIndex() + "] was not found";

+ 44 - 8
server/src/main/java/org/elasticsearch/indices/IndicesService.java

@@ -27,17 +27,16 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.CheckedBiConsumer;
-import org.elasticsearch.core.CheckedConsumer;
-import org.elasticsearch.core.CheckedFunction;
 import org.elasticsearch.common.CheckedSupplier;
-import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
@@ -46,16 +45,13 @@ import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.core.Releasable;
 import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.Maps;
-import org.elasticsearch.core.AbstractRefCounted;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
@@ -67,6 +63,12 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.core.AbstractRefCounted;
+import org.elasticsearch.core.CheckedConsumer;
+import org.elasticsearch.core.CheckedFunction;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.env.ShardLock;
@@ -94,6 +96,7 @@ import org.elasticsearch.index.mapper.MapperRegistry;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.MappingLookup;
 import org.elasticsearch.index.merge.MergeStats;
+import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.CoordinatorRewriteContextProvider;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryRewriteContext;
@@ -135,6 +138,7 @@ import java.io.InputStream;
 import java.io.UncheckedIOException;
 import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -1532,9 +1536,41 @@ public class IndicesService extends AbstractLifecycleComponent
                 return parseInnerQueryBuilder(parser);
             }
         };
-        IndexMetadata indexMetadata = state.metadata().index(index);
         String[] aliases = indexNameExpressionResolver.filteringAliases(state, index, resolvedExpressions);
-        return new AliasFilter(ShardSearchRequest.parseAliasFilter(filterParser, indexMetadata, aliases), aliases);
+        if (aliases == null) {
+            return AliasFilter.EMPTY;
+        }
+
+        Metadata metadata = state.metadata();
+        IndexAbstraction ia = state.metadata().getIndicesLookup().get(index);
+        if (ia.getParentDataStream() != null) {
+            List<QueryBuilder> filters = Arrays.stream(aliases)
+                .map(name -> metadata.dataStreamAliases().get(name))
+                .map(dataStreamAlias -> {
+                    try {
+                        return filterParser.apply(dataStreamAlias.getFilter().uncompressed());
+                    } catch (IOException e) {
+                        throw new UncheckedIOException(e);
+                    }
+                })
+                .collect(Collectors.toList());
+            if (filters.isEmpty()) {
+                return new AliasFilter(null, aliases);
+            } else {
+                if (filters.size() == 1) {
+                    return new AliasFilter(filters.get(0), aliases);
+                } else {
+                    BoolQueryBuilder bool = new BoolQueryBuilder();
+                    for (QueryBuilder filter : filters) {
+                        bool.should(filter);
+                    }
+                    return new AliasFilter(bool, aliases);
+                }
+            }
+        } else {
+            IndexMetadata indexMetadata = metadata.index(index);
+            return new AliasFilter(ShardSearchRequest.parseAliasFilter(filterParser, indexMetadata, aliases), aliases);
+        }
     }
 
     /**

+ 4 - 0
server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetAliasesAction.java

@@ -21,6 +21,7 @@ import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.BytesRestResponse;
 import org.elasticsearch.rest.RestRequest;
@@ -163,6 +164,9 @@ public class RestGetAliasesAction extends BaseRestHandler {
                             if (entry.getKey().equals(alias.getWriteDataStream())) {
                                 builder.field("is_write_index", true);
                             }
+                            if (alias.getFilter() != null) {
+                                builder.field("filter", XContentHelper.convertToMap(alias.getFilter().uncompressed(), true).v2());
+                            }
                             builder.endObject();
                         }
                     }

+ 8 - 8
server/src/test/java/org/elasticsearch/action/admin/indices/alias/get/TransportGetAliasesActionTests.java

@@ -196,30 +196,30 @@ public class TransportGetAliasesActionTests extends ESTestCase {
         var tuples = List.of(new Tuple<>("logs-foo", 1), new Tuple<>("logs-bar", 1), new Tuple<>("logs-baz", 1));
         var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(tuples, List.of());
         var builder = Metadata.builder(clusterState.metadata());
-        builder.put("logs", "logs-foo", null);
-        builder.put("logs", "logs-bar", null);
-        builder.put("secret", "logs-bar", null);
+        builder.put("logs", "logs-foo", null, null);
+        builder.put("logs", "logs-bar", null, null);
+        builder.put("secret", "logs-bar", null, null);
         clusterState = ClusterState.builder(clusterState).metadata(builder).build();
 
         // return all all data streams with aliases
         var getAliasesRequest = new GetAliasesRequest();
         var result = TransportGetAliasesAction.postProcess(resolver, getAliasesRequest, clusterState);
         assertThat(result.keySet(), containsInAnyOrder("logs-foo", "logs-bar"));
-        assertThat(result.get("logs-foo"), contains(new DataStreamAlias("logs", List.of("logs-bar", "logs-foo"), null)));
-        assertThat(result.get("logs-bar"), containsInAnyOrder(new DataStreamAlias("logs", List.of("logs-bar", "logs-foo"), null),
-            new DataStreamAlias("secret", List.of("logs-bar"), null)));
+        assertThat(result.get("logs-foo"), contains(new DataStreamAlias("logs", List.of("logs-bar", "logs-foo"), null, null)));
+        assertThat(result.get("logs-bar"), containsInAnyOrder(new DataStreamAlias("logs", List.of("logs-bar", "logs-foo"), null, null),
+            new DataStreamAlias("secret", List.of("logs-bar"), null, null)));
 
         // filter by alias name
         getAliasesRequest = new GetAliasesRequest("secret");
         result = TransportGetAliasesAction.postProcess(resolver, getAliasesRequest, clusterState);
         assertThat(result.keySet(), containsInAnyOrder("logs-bar"));
-        assertThat(result.get("logs-bar"), contains(new DataStreamAlias("secret", List.of("logs-bar"), null)));
+        assertThat(result.get("logs-bar"), contains(new DataStreamAlias("secret", List.of("logs-bar"), null, null)));
 
         // filter by data stream:
         getAliasesRequest = new GetAliasesRequest().indices("logs-foo");
         result = TransportGetAliasesAction.postProcess(resolver, getAliasesRequest, clusterState);
         assertThat(result.keySet(), containsInAnyOrder("logs-foo"));
-        assertThat(result.get("logs-foo"), contains(new DataStreamAlias("logs", List.of("logs-bar", "logs-foo"), null)));
+        assertThat(result.get("logs-foo"), contains(new DataStreamAlias("logs", List.of("logs-bar", "logs-foo"), null, null)));
     }
 
     public void testNetNewSystemIndicesDontErrorWhenNotRequested() {

+ 80 - 30
server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamAliasTests.java

@@ -9,15 +9,18 @@
 package org.elasticsearch.cluster.metadata;
 
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.test.AbstractSerializingTestCase;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
 
@@ -30,6 +33,12 @@ public class DataStreamAliasTests extends AbstractSerializingTestCase<DataStream
         return DataStreamAlias.fromXContent(parser);
     }
 
+    @Override
+    protected ToXContent.Params getToXContentParams() {
+        return randomBoolean() ? new ToXContent.MapParams(Map.of("binary", randomBoolean() ? "true" : "false")) :
+            super.getToXContentParams();
+    }
+
     @Override
     protected Writeable.Reader<DataStreamAlias> instanceReader() {
         return DataStreamAlias::new;
@@ -40,83 +49,124 @@ public class DataStreamAliasTests extends AbstractSerializingTestCase<DataStream
         return DataStreamTestHelper.randomAliasInstance();
     }
 
-    public void testAddDataStream() {
+    public void testUpdate() {
         // Add ds-3 to alias, a different instance is returned with ds-3 as one of the data streams being referred to
-        DataStreamAlias alias = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), null);
-        DataStreamAlias result = alias.addDataStream("ds-3", null);
+        DataStreamAlias alias = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), null, null);
+        DataStreamAlias result = alias.update("ds-3", null, null);
         assertThat(result.getDataStreams(), containsInAnyOrder("ds-1", "ds-2", "ds-3"));
         assertThat(result.getWriteDataStream(), nullValue());
         // Add ds-3 to alias as write data stream, same as above but the returned instance also refers to ds-3 as write data stream
-        result = alias.addDataStream("ds-3", true);
+        result = alias.update("ds-3", true, null);
         assertThat(result.getDataStreams(), containsInAnyOrder("ds-1", "ds-2", "ds-3"));
         assertThat(result.getWriteDataStream(), equalTo("ds-3"));
         // Add ds-2 as data stream, which is already referred to by this alias. The same instance is returned to signal a noop.
-        result = alias.addDataStream("ds-2", null);
+        result = alias.update("ds-2", null, null);
         assertThat(result, sameInstance(alias));
         // Add ds-2 as non write data stream, which is already referred to by this alias. The same instance is returned to signal a noop.
-        result = alias.addDataStream("ds-2", false);
+        result = alias.update("ds-2", false, null);
         assertThat(result, sameInstance(alias));
         // Add ds-2 as write data stream, which is already referred to by this alias, but not as write data stream,
         // an updated instance should be returned.
-        result = alias.addDataStream("ds-2", true);
+        result = alias.update("ds-2", true, null);
         assertThat(result, not(sameInstance(alias)));
         assertThat(result.getWriteDataStream(), equalTo("ds-2"));
         // Add ds-2 as write non data stream, which is already referred to by this alias, but as write data stream,
         // an updated instance should be returned.
-        alias = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), "ds-2");
-        result = alias.addDataStream("ds-2", false);
+        alias = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), "ds-2", null);
+        result = alias.update("ds-2", false, null);
         assertThat(result, not(sameInstance(alias)));
         assertThat(result.getWriteDataStream(), nullValue());
     }
 
+    public void testUpdateFilter() {
+        // Add a filter:
+        {
+            DataStreamAlias alias = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), null, null);
+            DataStreamAlias result = alias.update("ds-2", null, Map.of("term", Map.of("field", "value")));
+            assertThat(result, not(sameInstance(alias)));
+            assertThat(result.getDataStreams(), containsInAnyOrder("ds-1", "ds-2"));
+            assertThat(result.getWriteDataStream(), nullValue());
+            assertThat(result.getFilter(), notNullValue());
+            assertThat(result.getFilter().string(), equalTo("{\"term\":{\"field\":\"value\"}}"));
+        }
+        // noop update to filter:
+        {
+            DataStreamAlias alias =
+                new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), null, Map.of("term", Map.of("field", "value")));
+            DataStreamAlias result = alias.update("ds-2", null, Map.of("term", Map.of("field", "value")));
+            assertThat(result, sameInstance(alias));
+        }
+        // update filter:
+        {
+            DataStreamAlias alias =
+                new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), null, Map.of("term", Map.of("field", "value")));
+            DataStreamAlias result = alias.update("ds-2", null, Map.of("term", Map.of("field", "value1")));
+            assertThat(result, not(sameInstance(alias)));
+            assertThat(result.getDataStreams(), containsInAnyOrder("ds-1", "ds-2"));
+            assertThat(result.getWriteDataStream(), nullValue());
+            assertThat(result.getFilter(), notNullValue());
+            assertThat(result.getFilter().string(), equalTo("{\"term\":{\"field\":\"value1\"}}"));
+        }
+        // Filter not specified, keep existing filter:
+        {
+            DataStreamAlias alias =
+                new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), null, Map.of("term", Map.of("field", "value")));
+            DataStreamAlias result = alias.update("ds-2", null, null);
+            assertThat(result, sameInstance(alias));
+            assertThat(result.getDataStreams(), containsInAnyOrder("ds-1", "ds-2"));
+            assertThat(result.getWriteDataStream(), nullValue());
+            assertThat(result.getFilter().string(), equalTo("{\"term\":{\"field\":\"value\"}}"));
+        }
+    }
+
     public void testRemoveDataStream() {
         // Remove a referenced data stream:
-        DataStreamAlias alias = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), null);
+        DataStreamAlias alias = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), null, null);
         DataStreamAlias result = alias.removeDataStream("ds-2");
         assertThat(result, not(sameInstance(alias)));
         assertThat(result.getDataStreams(), containsInAnyOrder("ds-1"));
         assertThat(result.getWriteDataStream(), nullValue());
         // Remove the data stream that is also referenced as write data stream:
-        alias = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), "ds-2");
+        alias = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), "ds-2", null);
         result = alias.removeDataStream("ds-2");
         assertThat(result, not(sameInstance(alias)));
         assertThat(result.getDataStreams(), containsInAnyOrder("ds-1"));
         assertThat(result.getWriteDataStream(), nullValue());
         // Removing the last referenced data stream name, return null to signal the entire alias can be removed.
-        alias = new DataStreamAlias("my-alias", List.of("ds-1"), null);
+        alias = new DataStreamAlias("my-alias", List.of("ds-1"), null, null);
         result = alias.removeDataStream("ds-1");
         assertThat(result, nullValue());
         // Removing a non referenced data stream name, signal noop, by returning the same instance
-        alias = new DataStreamAlias("my-alias", List.of("ds-1"), null);
+        alias = new DataStreamAlias("my-alias", List.of("ds-1"), null, null);
         result = alias.removeDataStream("ds-2");
         assertThat(result, sameInstance(alias));
     }
 
     public void testIntersect() {
         {
-            DataStreamAlias alias1 = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), null);
-            DataStreamAlias alias2 = new DataStreamAlias("my-alias", List.of("ds-2", "ds-3"), null);
+            DataStreamAlias alias1 = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), null, null);
+            DataStreamAlias alias2 = new DataStreamAlias("my-alias", List.of("ds-2", "ds-3"), null, null);
             DataStreamAlias result = alias1.intersect(s -> alias2.getDataStreams().contains(s));
             assertThat(result.getDataStreams(), containsInAnyOrder("ds-2"));
             assertThat(result.getWriteDataStream(), nullValue());
         }
         {
-            DataStreamAlias alias1 = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), "ds-2");
-            DataStreamAlias alias2 = new DataStreamAlias("my-alias", List.of("ds-2", "ds-3"), null);
+            DataStreamAlias alias1 = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), "ds-2", null);
+            DataStreamAlias alias2 = new DataStreamAlias("my-alias", List.of("ds-2", "ds-3"), null, null);
             DataStreamAlias result = alias1.intersect(s -> alias2.getDataStreams().contains(s));
             assertThat(result.getDataStreams(), containsInAnyOrder("ds-2"));
             assertThat(result.getWriteDataStream(), equalTo("ds-2"));
         }
         {
-            DataStreamAlias alias1 = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), null);
-            DataStreamAlias alias2 = new DataStreamAlias("my-alias", List.of("ds-2", "ds-3"), "ds-3");
+            DataStreamAlias alias1 = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), null, null);
+            DataStreamAlias alias2 = new DataStreamAlias("my-alias", List.of("ds-2", "ds-3"), "ds-3", null);
             DataStreamAlias result = alias1.intersect(s -> alias2.getDataStreams().contains(s));
             assertThat(result.getDataStreams(), containsInAnyOrder("ds-2"));
             assertThat(result.getWriteDataStream(), nullValue());
         }
         {
-            DataStreamAlias alias1 = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2", "ds-3"), "ds-3");
-            DataStreamAlias alias2 = new DataStreamAlias("my-alias", List.of("ds-2", "ds-3"), "ds-2");
+            DataStreamAlias alias1 = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2", "ds-3"), "ds-3", null);
+            DataStreamAlias alias2 = new DataStreamAlias("my-alias", List.of("ds-2", "ds-3"), "ds-2", null);
             DataStreamAlias result = alias1.intersect(s -> alias2.getDataStreams().contains(s));
             assertThat(result.getDataStreams(), containsInAnyOrder("ds-2", "ds-3"));
             assertThat(result.getWriteDataStream(), equalTo("ds-3"));
@@ -125,29 +175,29 @@ public class DataStreamAliasTests extends AbstractSerializingTestCase<DataStream
 
     public void testMerge() {
         {
-            DataStreamAlias alias1 = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), null);
-            DataStreamAlias alias2 = new DataStreamAlias("my-alias", List.of("ds-2", "ds-3"), null);
+            DataStreamAlias alias1 = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), null, null);
+            DataStreamAlias alias2 = new DataStreamAlias("my-alias", List.of("ds-2", "ds-3"), null, null);
             DataStreamAlias result = alias1.merge(alias2);
             assertThat(result.getDataStreams(), containsInAnyOrder("ds-1", "ds-2", "ds-3"));
             assertThat(result.getWriteDataStream(), nullValue());
         }
         {
-            DataStreamAlias alias1 = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), "ds-2");
-            DataStreamAlias alias2 = new DataStreamAlias("my-alias", List.of("ds-2", "ds-3"), null);
+            DataStreamAlias alias1 = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), "ds-2", null);
+            DataStreamAlias alias2 = new DataStreamAlias("my-alias", List.of("ds-2", "ds-3"), null, null);
             DataStreamAlias result = alias1.merge(alias2);
             assertThat(result.getDataStreams(), containsInAnyOrder("ds-1", "ds-2", "ds-3"));
             assertThat(result.getWriteDataStream(), equalTo("ds-2"));
         }
         {
-            DataStreamAlias alias1 = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), "ds-2");
-            DataStreamAlias alias2 = new DataStreamAlias("my-alias", List.of("ds-2", "ds-3"), "ds-3");
+            DataStreamAlias alias1 = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), "ds-2", null);
+            DataStreamAlias alias2 = new DataStreamAlias("my-alias", List.of("ds-2", "ds-3"), "ds-3", null);
             DataStreamAlias result = alias1.merge(alias2);
             assertThat(result.getDataStreams(), containsInAnyOrder("ds-1", "ds-2", "ds-3"));
             assertThat(result.getWriteDataStream(), equalTo("ds-2"));
         }
         {
-            DataStreamAlias alias1 = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), null);
-            DataStreamAlias alias2 = new DataStreamAlias("my-alias", List.of("ds-2", "ds-3"), "ds-3");
+            DataStreamAlias alias1 = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), null, null);
+            DataStreamAlias alias2 = new DataStreamAlias("my-alias", List.of("ds-2", "ds-3"), "ds-3", null);
             DataStreamAlias result = alias1.merge(alias2);
             assertThat(result.getDataStreams(), containsInAnyOrder("ds-1", "ds-2", "ds-3"));
             assertThat(result.getWriteDataStream(), equalTo("ds-3"));
@@ -155,7 +205,7 @@ public class DataStreamAliasTests extends AbstractSerializingTestCase<DataStream
     }
 
     public void testRenameDataStreams() {
-        DataStreamAlias alias = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), "ds-2");
+        DataStreamAlias alias = new DataStreamAlias("my-alias", List.of("ds-1", "ds-2"), "ds-2", null);
         DataStreamAlias result = alias.renameDataStreams("ds-2", "ds-3");
         assertThat(result.getDataStreams(), containsInAnyOrder("ds-1", "ds-3"));
         assertThat(result.getWriteDataStream(), equalTo("ds-3"));

+ 45 - 5
server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java

@@ -70,6 +70,7 @@ import static org.hamcrest.Matchers.endsWith;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 
 public class IndexNameExpressionResolverTests extends ESTestCase {
 
@@ -700,7 +701,7 @@ public class IndexNameExpressionResolverTests extends ESTestCase {
         assertThat(infe.getResourceId().toString(), equalTo("[_all]"));
     }
 
-    private static IndexMetadata.Builder indexBuilder(String index) {
+    public static IndexMetadata.Builder indexBuilder(String index) {
         return indexBuilder(index, Settings.EMPTY);
     }
 
@@ -1431,6 +1432,45 @@ public class IndexNameExpressionResolverTests extends ESTestCase {
         assertArrayEquals(null, strings);
     }
 
+    public void testIndexAliasesDataStreamAliases() {
+        final String dataStreamName1 = "logs-foobar";
+        final String dataStreamName2 = "logs-barbaz";
+        IndexMetadata backingIndex1 = createBackingIndex(dataStreamName1, 1).build();
+        IndexMetadata backingIndex2 = createBackingIndex(dataStreamName2, 1).build();
+        Metadata.Builder mdBuilder = Metadata.builder()
+            .put(backingIndex1, false)
+            .put(backingIndex2, false)
+            .put(new DataStream(dataStreamName1, createTimestampField("@timestamp"), List.of(backingIndex1.getIndex())))
+            .put(new DataStream(dataStreamName2, createTimestampField("@timestamp"), List.of(backingIndex2.getIndex())));
+        mdBuilder.put("logs_foo", dataStreamName1, null, "{ \"term\": \"foo\"}");
+        mdBuilder.put("logs", dataStreamName1, null, "{ \"term\": \"logs\"}");
+        mdBuilder.put("logs_bar", dataStreamName1, null, null);
+        mdBuilder.put("logs_baz", dataStreamName2, null, "{ \"term\": \"logs\"}");
+        mdBuilder.put("logs_baz2", dataStreamName2, null, null);
+        ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build();
+        {
+            // Only resolve aliases with with that refer to dataStreamName1
+            Set<String> resolvedExpressions = indexNameExpressionResolver.resolveExpressions(state, "l*");
+            String index = backingIndex1.getIndex().getName();
+            String[] result = indexNameExpressionResolver.indexAliases(state, index, x -> true, true, resolvedExpressions);
+            assertThat(result, arrayContainingInAnyOrder("logs_foo", "logs"));
+        }
+        {
+            // Only resolve aliases with with that refer to dataStreamName2
+            Set<String> resolvedExpressions = indexNameExpressionResolver.resolveExpressions(state, "l*");
+            String index = backingIndex2.getIndex().getName();
+            String[] result = indexNameExpressionResolver.indexAliases(state, index, x -> true, true, resolvedExpressions);
+            assertThat(result, arrayContainingInAnyOrder("logs_baz"));
+        }
+        {
+            // Null is returned, because skipping identity check and resolvedExpressions contains the backing index name
+            Set<String> resolvedExpressions = indexNameExpressionResolver.resolveExpressions(state, "l*");
+            String index = backingIndex2.getIndex().getName();
+            String[] result = indexNameExpressionResolver.indexAliases(state, index, x -> true, false, resolvedExpressions);
+            assertThat(result, nullValue());
+        }
+    }
+
     public void testIndexAliasesSkipIdentity() {
         Metadata.Builder mdBuilder = Metadata.builder()
                 .put(indexBuilder("test-0").state(State.OPEN)
@@ -2149,10 +2189,10 @@ public class IndexNameExpressionResolverTests extends ESTestCase {
             .put(new DataStream(dataStream1, createTimestampField("@timestamp"), List.of(index1.getIndex(), index2.getIndex())))
             .put(new DataStream(dataStream2, createTimestampField("@timestamp"), List.of(index3.getIndex(), index4.getIndex())))
             .put(new DataStream(dataStream3, createTimestampField("@timestamp"), List.of(index5.getIndex(), index6.getIndex())));
-        mdBuilder.put(dataStreamAlias1, dataStream1, null);
-        mdBuilder.put(dataStreamAlias1, dataStream2, true);
-        mdBuilder.put(dataStreamAlias2, dataStream2, null);
-        mdBuilder.put(dataStreamAlias3, dataStream3, null);
+        mdBuilder.put(dataStreamAlias1, dataStream1, null, null);
+        mdBuilder.put(dataStreamAlias1, dataStream2, true, null);
+        mdBuilder.put(dataStreamAlias2, dataStream2, null, null);
+        mdBuilder.put(dataStreamAlias3, dataStream3, null, "{\"term\":{\"year\":2021}}");
         ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build();
 
         {

+ 6 - 6
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesServiceTests.java

@@ -507,8 +507,8 @@ public class MetadataIndexAliasesServiceTests extends ESTestCase {
             new Tuple<>("logs-foobar", 1), new Tuple<>("metrics-foobar", 1)), List.of());
 
         ClusterState result = service.applyAliasActions(state, List.of(
-            new AliasAction.AddDataStreamAlias("foobar", "logs-foobar", null),
-            new AliasAction.AddDataStreamAlias("foobar", "metrics-foobar", null)
+            new AliasAction.AddDataStreamAlias("foobar", "logs-foobar", null, null),
+            new AliasAction.AddDataStreamAlias("foobar", "metrics-foobar", null, null)
         ));
         assertThat(result.metadata().dataStreamAliases().get("foobar"), notNullValue());
         assertThat(result.metadata().dataStreamAliases().get("foobar").getDataStreams(),
@@ -531,8 +531,8 @@ public class MetadataIndexAliasesServiceTests extends ESTestCase {
             new Tuple<>("logs-http-emea", 1), new Tuple<>("logs-http-nasa", 1)), List.of());
 
         ClusterState result = service.applyAliasActions(state, List.of(
-            new AliasAction.AddDataStreamAlias("logs-http", "logs-http-emea", true),
-            new AliasAction.AddDataStreamAlias("logs-http", "logs-http-nasa", null)
+            new AliasAction.AddDataStreamAlias("logs-http", "logs-http-emea", true, null),
+            new AliasAction.AddDataStreamAlias("logs-http", "logs-http-nasa", null, null)
         ));
         assertThat(result.metadata().dataStreamAliases().get("logs-http"), notNullValue());
         assertThat(result.metadata().dataStreamAliases().get("logs-http").getDataStreams(),
@@ -540,8 +540,8 @@ public class MetadataIndexAliasesServiceTests extends ESTestCase {
         assertThat(result.metadata().dataStreamAliases().get("logs-http").getWriteDataStream(), equalTo("logs-http-emea"));
 
         result = service.applyAliasActions(state, List.of(
-            new AliasAction.AddDataStreamAlias("logs-http", "logs-http-emea", false),
-            new AliasAction.AddDataStreamAlias("logs-http", "logs-http-nasa", true)
+            new AliasAction.AddDataStreamAlias("logs-http", "logs-http-emea", false, null),
+            new AliasAction.AddDataStreamAlias("logs-http", "logs-http-nasa", true, null)
         ));
         assertThat(result.metadata().dataStreamAliases().get("logs-http"), notNullValue());
         assertThat(result.metadata().dataStreamAliases().get("logs-http").getDataStreams(),

+ 30 - 30
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java

@@ -1236,12 +1236,12 @@ public class MetadataTests extends ESTestCase {
         Metadata.Builder mdBuilder = Metadata.builder();
 
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-eu"));
-        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-eu", null), is(true));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-eu", null, null), is(true));
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-us"));
-        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-us", null), is(true));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-us", null, null), is(true));
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-au"));
-        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-au", null), is(true));
-        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-au", null), is(false));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-au", null, null), is(true));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-au", null, null), is(false));
 
         Metadata metadata = mdBuilder.build();
         assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
@@ -1252,11 +1252,11 @@ public class MetadataTests extends ESTestCase {
     public void testDataStreamReferToNonExistingDataStream() {
         Metadata.Builder mdBuilder = Metadata.builder();
 
-        Exception e = expectThrows(IllegalArgumentException.class, () -> mdBuilder.put("logs-postgres", "logs-postgres-eu", null));
+        Exception e = expectThrows(IllegalArgumentException.class, () -> mdBuilder.put("logs-postgres", "logs-postgres-eu", null, null));
         assertThat(e.getMessage(), equalTo("alias [logs-postgres] refers to a non existing data stream [logs-postgres-eu]"));
 
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-eu"));
-        mdBuilder.put("logs-postgres", "logs-postgres-eu", null);
+        mdBuilder.put("logs-postgres", "logs-postgres-eu", null, null);
         Metadata metadata = mdBuilder.build();
         assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
         assertThat(metadata.dataStreamAliases().get("logs-postgres").getDataStreams(), containsInAnyOrder("logs-postgres-eu"));
@@ -1265,11 +1265,11 @@ public class MetadataTests extends ESTestCase {
     public void testDeleteDataStreamShouldUpdateAlias() {
         Metadata.Builder mdBuilder = Metadata.builder();
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-eu"));
-        mdBuilder.put("logs-postgres", "logs-postgres-eu", null);
+        mdBuilder.put("logs-postgres", "logs-postgres-eu", null, null);
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-us"));
-        mdBuilder.put("logs-postgres", "logs-postgres-us", null);
+        mdBuilder.put("logs-postgres", "logs-postgres-us", null, null);
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-au"));
-        mdBuilder.put("logs-postgres", "logs-postgres-au", null);
+        mdBuilder.put("logs-postgres", "logs-postgres-au", null, null);
         Metadata metadata = mdBuilder.build();
         assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
         assertThat(metadata.dataStreamAliases().get("logs-postgres").getDataStreams(),
@@ -1297,11 +1297,11 @@ public class MetadataTests extends ESTestCase {
     public void testDeleteDataStreamAlias() {
         Metadata.Builder mdBuilder = Metadata.builder();
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-eu"));
-        mdBuilder.put("logs-postgres", "logs-postgres-eu", null);
+        mdBuilder.put("logs-postgres", "logs-postgres-eu", null, null);
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-us"));
-        mdBuilder.put("logs-postgres", "logs-postgres-us", null);
+        mdBuilder.put("logs-postgres", "logs-postgres-us", null, null);
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-au"));
-        mdBuilder.put("logs-postgres", "logs-postgres-au", null);
+        mdBuilder.put("logs-postgres", "logs-postgres-au", null, null);
         Metadata metadata = mdBuilder.build();
         assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
         assertThat(metadata.dataStreamAliases().get("logs-postgres").getDataStreams(),
@@ -1329,11 +1329,11 @@ public class MetadataTests extends ESTestCase {
     public void testDeleteDataStreamAliasMustExists() {
         Metadata.Builder mdBuilder = Metadata.builder();
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-eu"));
-        mdBuilder.put("logs-postgres", "logs-postgres-eu", null);
+        mdBuilder.put("logs-postgres", "logs-postgres-eu", null, null);
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-us"));
-        mdBuilder.put("logs-postgres", "logs-postgres-us", null);
+        mdBuilder.put("logs-postgres", "logs-postgres-us", null, null);
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-au"));
-        mdBuilder.put("logs-postgres", "logs-postgres-au", null);
+        mdBuilder.put("logs-postgres", "logs-postgres-au", null, null);
         Metadata metadata = mdBuilder.build();
         assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
         assertThat(metadata.dataStreamAliases().get("logs-postgres").getDataStreams(),
@@ -1347,7 +1347,7 @@ public class MetadataTests extends ESTestCase {
     public void testDataStreamWriteAlias() {
         Metadata.Builder mdBuilder = Metadata.builder();
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-replicated"));
-        mdBuilder.put("logs-postgres", "logs-postgres-replicated", null);
+        mdBuilder.put("logs-postgres", "logs-postgres-replicated", null, null);
 
         Metadata metadata = mdBuilder.build();
         assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
@@ -1355,7 +1355,7 @@ public class MetadataTests extends ESTestCase {
         assertThat(metadata.dataStreamAliases().get("logs-postgres").getDataStreams(), containsInAnyOrder("logs-postgres-replicated"));
 
         mdBuilder = Metadata.builder(metadata);
-        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-replicated", true), is(true));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-replicated", true, null), is(true));
 
         metadata = mdBuilder.build();
         assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
@@ -1367,8 +1367,8 @@ public class MetadataTests extends ESTestCase {
         Metadata.Builder mdBuilder = Metadata.builder();
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-foobar"));
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-barbaz"));
-        mdBuilder.put("logs", "logs-foobar", true);
-        mdBuilder.put("logs", "logs-barbaz", true);
+        mdBuilder.put("logs", "logs-foobar", true, null);
+        mdBuilder.put("logs", "logs-barbaz", true, null);
 
         Metadata metadata = mdBuilder.build();
         assertThat(metadata.dataStreamAliases().get("logs"), notNullValue());
@@ -1379,7 +1379,7 @@ public class MetadataTests extends ESTestCase {
     public void testDataStreamWriteAliasUnset() {
         Metadata.Builder mdBuilder = Metadata.builder();
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-replicated"));
-        mdBuilder.put("logs-postgres", "logs-postgres-replicated", true);
+        mdBuilder.put("logs-postgres", "logs-postgres-replicated", true, null);
 
         Metadata metadata = mdBuilder.build();
         assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
@@ -1388,9 +1388,9 @@ public class MetadataTests extends ESTestCase {
 
         mdBuilder = Metadata.builder(metadata);
         // Side check: null value isn't changing anything:
-        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-replicated", null), is(false));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-replicated", null, null), is(false));
         // Unset write flag
-        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-replicated", false), is(true));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-replicated", false, null), is(true));
         metadata = mdBuilder.build();
         assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
         assertThat(metadata.dataStreamAliases().get("logs-postgres").getWriteDataStream(), nullValue());
@@ -1401,8 +1401,8 @@ public class MetadataTests extends ESTestCase {
         Metadata.Builder mdBuilder = Metadata.builder();
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-primary"));
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-replicated"));
-        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-primary", true), is(true));
-        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-replicated", null), is(true));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-primary", true, null), is(true));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-replicated", null, null), is(true));
 
         Metadata metadata = mdBuilder.build();
         assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
@@ -1412,8 +1412,8 @@ public class MetadataTests extends ESTestCase {
 
         // change write flag:
         mdBuilder = Metadata.builder(metadata);
-        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-primary", false), is(true));
-        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-replicated", true), is(true));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-primary", false, null), is(true));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-replicated", true, null), is(true));
         metadata = mdBuilder.build();
         assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
         assertThat(metadata.dataStreamAliases().get("logs-postgres").getWriteDataStream(), equalTo("logs-postgres-replicated"));
@@ -1425,8 +1425,8 @@ public class MetadataTests extends ESTestCase {
         Metadata.Builder mdBuilder = Metadata.builder();
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-primary"));
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-replicated"));
-        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-primary", true), is(true));
-        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-replicated", null), is(true));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-primary", true, null), is(true));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-replicated", null, null), is(true));
 
         Metadata metadata = mdBuilder.build();
         assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());
@@ -1446,8 +1446,8 @@ public class MetadataTests extends ESTestCase {
         Metadata.Builder mdBuilder = Metadata.builder();
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-primary"));
         mdBuilder.put(DataStreamTestHelper.randomInstance("logs-postgres-replicated"));
-        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-primary", true), is(true));
-        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-replicated", null), is(true));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-primary", true, null), is(true));
+        assertThat(mdBuilder.put("logs-postgres", "logs-postgres-replicated", null, null), is(true));
 
         Metadata metadata = mdBuilder.build();
         assertThat(metadata.dataStreamAliases().get("logs-postgres"), notNullValue());

+ 92 - 0
server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java

@@ -16,10 +16,13 @@ import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
 import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.AliasMetadata;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexGraveyard;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
@@ -41,6 +44,8 @@ import org.elasticsearch.index.engine.InternalEngineFactory;
 import org.elasticsearch.index.mapper.KeywordFieldMapper;
 import org.elasticsearch.index.mapper.Mapper;
 import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.shard.IllegalIndexShardStateException;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardState;
@@ -51,6 +56,7 @@ import org.elasticsearch.indices.IndicesService.ShardDeletionCheckResult;
 import org.elasticsearch.plugins.EnginePlugin;
 import org.elasticsearch.plugins.MapperPlugin;
 import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.search.internal.AliasFilter;
 import org.elasticsearch.test.ESSingleNodeTestCase;
 import org.elasticsearch.test.IndexSettingsModule;
 import org.elasticsearch.test.hamcrest.RegexMatcher;
@@ -64,18 +70,27 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
+import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createBackingIndex;
+import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createTimestampField;
+import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolverTests.indexBuilder;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
+import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.emptyArray;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasToString;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -554,4 +569,81 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
                 ".*multiple engine factories provided for \\[foobar/.*\\]: \\[.*FooEngineFactory\\],\\[.*BarEngineFactory\\].*";
         assertThat(e, hasToString(new RegexMatcher(pattern)));
     }
+
+    public void testBuildAliasFilter() {
+        var indicesService = getIndicesService();
+
+        Metadata.Builder mdBuilder = Metadata.builder()
+            .put(indexBuilder("test-0").state(IndexMetadata.State.OPEN)
+                .putAlias(AliasMetadata.builder("test-alias-0").filter(Strings.toString(QueryBuilders.termQuery("foo", "bar"))))
+                .putAlias(AliasMetadata.builder("test-alias-1").filter(Strings.toString(QueryBuilders.termQuery("foo", "baz"))))
+                .putAlias(AliasMetadata.builder("test-alias-non-filtering"))
+            );
+        ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build();
+        {
+            AliasFilter result = indicesService.buildAliasFilter(state, "test-0", Set.of("test-alias-0"));
+            assertThat(result.getAliases(), arrayContainingInAnyOrder("test-alias-0"));
+            assertThat(result.getQueryBuilder(), equalTo(QueryBuilders.termQuery("foo", "bar")));
+        }
+        {
+            AliasFilter result = indicesService.buildAliasFilter(state, "test-0", Set.of("test-alias-1"));
+            assertThat(result.getAliases(), arrayContainingInAnyOrder("test-alias-1"));
+            assertThat(result.getQueryBuilder(), equalTo(QueryBuilders.termQuery("foo", "baz")));
+        }
+        {
+            AliasFilter result = indicesService.buildAliasFilter(state, "test-0", Set.of("test-alias-0", "test-alias-1"));
+            assertThat(result.getAliases(), arrayContainingInAnyOrder("test-alias-0", "test-alias-1"));
+            BoolQueryBuilder filter = (BoolQueryBuilder) result.getQueryBuilder();
+            assertThat(filter.filter(), empty());
+            assertThat(filter.must(), empty());
+            assertThat(filter.mustNot(), empty());
+            assertThat(filter.should(), containsInAnyOrder(QueryBuilders.termQuery("foo", "baz"), QueryBuilders.termQuery("foo", "bar")));
+        }
+        {
+            AliasFilter result =
+                indicesService.buildAliasFilter(state, "test-0", Set.of("test-alias-0", "test-alias-1", "test-alias-non-filtering"));
+            assertThat(result.getAliases(), emptyArray());
+            assertThat(result.getQueryBuilder(), nullValue());
+        }
+    }
+
+    public void testBuildAliasFilterDataStreamAliases() {
+        var indicesService = getIndicesService();
+
+        final String dataStreamName1 = "logs-foobar";
+        IndexMetadata backingIndex1 = createBackingIndex(dataStreamName1, 1).build();
+        Metadata.Builder mdBuilder = Metadata.builder()
+            .put(backingIndex1, false)
+            .put(new DataStream(dataStreamName1, createTimestampField("@timestamp"), List.of(backingIndex1.getIndex())));
+        mdBuilder.put("logs_foo", dataStreamName1, null, Strings.toString(QueryBuilders.termQuery("foo", "bar")));
+        mdBuilder.put("logs", dataStreamName1, null, Strings.toString(QueryBuilders.termQuery("foo", "baz")));
+        mdBuilder.put("logs_bar", dataStreamName1, null, null);
+        ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build();
+        {
+            String index = backingIndex1.getIndex().getName();
+            AliasFilter result = indicesService.buildAliasFilter(state, index, Set.of("logs_foo"));
+            assertThat(result.getAliases(), arrayContainingInAnyOrder("logs_foo"));
+            assertThat(result.getQueryBuilder(), equalTo(QueryBuilders.termQuery("foo", "bar")));
+        }
+        {
+            String index = backingIndex1.getIndex().getName();
+            AliasFilter result = indicesService.buildAliasFilter(state, index, Set.of("logs_foo", "logs"));
+            assertThat(result.getAliases(), arrayContainingInAnyOrder("logs_foo", "logs"));
+            BoolQueryBuilder filter = (BoolQueryBuilder) result.getQueryBuilder();
+            assertThat(filter.filter(), empty());
+            assertThat(filter.must(), empty());
+            assertThat(filter.mustNot(), empty());
+            assertThat(filter.should(), containsInAnyOrder(QueryBuilders.termQuery("foo", "baz"), QueryBuilders.termQuery("foo", "bar")));
+        }
+        {
+            String index = backingIndex1.getIndex().getName();
+            AliasFilter result = indicesService.buildAliasFilter(state, index, Set.of("logs_foo", "logs", "logs_bar"));
+            assertThat(result.getAliases(), arrayContainingInAnyOrder("logs_foo", "logs"));
+            BoolQueryBuilder filter = (BoolQueryBuilder) result.getQueryBuilder();
+            assertThat(filter.filter(), empty());
+            assertThat(filter.must(), empty());
+            assertThat(filter.mustNot(), empty());
+            assertThat(filter.should(), containsInAnyOrder(QueryBuilders.termQuery("foo", "baz"), QueryBuilders.termQuery("foo", "bar")));
+        }
+    }
 }

+ 4 - 3
test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java

@@ -12,8 +12,8 @@ import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.common.UUIDs;
-import org.elasticsearch.core.Tuple;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.test.ESTestCase;
 import org.hamcrest.Description;
@@ -35,6 +35,7 @@ import static org.elasticsearch.test.ESTestCase.generateRandomStringArray;
 import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength;
 import static org.elasticsearch.test.ESTestCase.randomBoolean;
 import static org.elasticsearch.test.ESTestCase.randomFrom;
+import static org.elasticsearch.test.ESTestCase.randomMap;
 
 public final class DataStreamTestHelper {
 
@@ -155,8 +156,8 @@ public final class DataStreamTestHelper {
         return new DataStreamAlias(
             randomAlphaOfLength(5),
             dataStreams,
-            randomBoolean() ? randomFrom(dataStreams) : null
-        );
+            randomBoolean() ? randomFrom(dataStreams) : null,
+            randomBoolean() ? randomMap(1, 4, () -> new Tuple<>("term", Map.of("year", "2022"))) : null);
     }
 
     /**

+ 129 - 15
x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java

@@ -91,6 +91,7 @@ import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DE
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits;
 import static org.hamcrest.Matchers.anEmptyMap;
 import static org.hamcrest.Matchers.arrayWithSize;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -705,10 +706,134 @@ public class DataStreamIT extends ESIntegTestCase {
         GetAliasesResponse response = client().admin().indices().getAliases(new GetAliasesRequest()).actionGet();
         assertThat(
             response.getDataStreamAliases(),
-            equalTo(Map.of("metrics-foo", List.of(new DataStreamAlias("foo", List.of("metrics-foo"), null))))
+            equalTo(Map.of("metrics-foo", List.of(new DataStreamAlias("foo", List.of("metrics-foo"), null, null))))
         );
     }
 
+    public void testDataSteamAliasWithFilter() throws Exception {
+        putComposableIndexTemplate("id1", List.of("logs-*"));
+        String dataStreamName = "logs-foobar";
+        client().prepareIndex(dataStreamName)
+            .setId("1")
+            .setSource("{\"@timestamp\": \"2022-12-12\", \"type\": \"x\"}", XContentType.JSON)
+            .setOpType(DocWriteRequest.OpType.CREATE)
+            .get();
+        client().prepareIndex(dataStreamName)
+            .setId("2")
+            .setSource("{\"@timestamp\": \"2022-12-12\", \"type\": \"y\"}", XContentType.JSON)
+            .setOpType(DocWriteRequest.OpType.CREATE)
+            .get();
+        refresh(dataStreamName);
+
+        AliasActions addAction = new AliasActions(AliasActions.Type.ADD).index(dataStreamName)
+            .aliases("foo")
+            .filter(Map.of("term", Map.of("type", Map.of("value", "y"))));
+        IndicesAliasesRequest aliasesAddRequest = new IndicesAliasesRequest();
+        aliasesAddRequest.addAliasAction(addAction);
+        assertAcked(client().admin().indices().aliases(aliasesAddRequest).actionGet());
+        GetAliasesResponse response = client().admin().indices().getAliases(new GetAliasesRequest()).actionGet();
+        assertThat(
+            response.getDataStreamAliases(),
+            equalTo(
+                Map.of(
+                    "logs-foobar",
+                    List.of(new DataStreamAlias("foo", List.of("logs-foobar"), null, Map.of("term", Map.of("type", Map.of("value", "y")))))
+                )
+            )
+        );
+
+        // Searching the data stream directly should return all hits:
+        SearchResponse searchResponse = client().prepareSearch("logs-foobar").get();
+        assertSearchHits(searchResponse, "1", "2");
+        // Search the alias should only return document 2, because it matches with the defined filter in the alias:
+        searchResponse = client().prepareSearch("foo").get();
+        assertSearchHits(searchResponse, "2");
+
+        // Update alias:
+        addAction = new AliasActions(AliasActions.Type.ADD).index(dataStreamName)
+            .aliases("foo")
+            .filter(Map.of("term", Map.of("type", Map.of("value", "x"))));
+        aliasesAddRequest = new IndicesAliasesRequest();
+        aliasesAddRequest.addAliasAction(addAction);
+        assertAcked(client().admin().indices().aliases(aliasesAddRequest).actionGet());
+        response = client().admin().indices().getAliases(new GetAliasesRequest()).actionGet();
+        assertThat(
+            response.getDataStreamAliases(),
+            equalTo(
+                Map.of(
+                    "logs-foobar",
+                    List.of(new DataStreamAlias("foo", List.of("logs-foobar"), null, Map.of("term", Map.of("type", Map.of("value", "x")))))
+                )
+            )
+        );
+
+        // Searching the data stream directly should return all hits:
+        searchResponse = client().prepareSearch("logs-foobar").get();
+        assertSearchHits(searchResponse, "1", "2");
+        // Search the alias should only return document 1, because it matches with the defined filter in the alias:
+        searchResponse = client().prepareSearch("foo").get();
+        assertSearchHits(searchResponse, "1");
+    }
+
+    public void testRandomDataSteamAliasesUpdate() throws Exception {
+        putComposableIndexTemplate("id1", List.of("log-*"));
+
+        String alias = randomAlphaOfLength(4);
+        String[] dataStreams = Arrays.stream(generateRandomStringArray(16, 4, false, false))
+            .map(s -> "log-" + s.toLowerCase(Locale.ROOT))
+            .toArray(String[]::new);
+        for (String dataStream : dataStreams) {
+            CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStream);
+            client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
+        }
+        AliasActions addAction = new AliasActions(AliasActions.Type.ADD).aliases(alias)
+            .indices(dataStreams)
+            .filter(Map.of("term", Map.of("type", Map.of("value", "y"))));
+        assertAcked(client().admin().indices().aliases(new IndicesAliasesRequest().addAliasAction(addAction)).actionGet());
+
+        addAction = new AliasActions(AliasActions.Type.ADD).aliases(alias).indices(dataStreams[0]).writeIndex(true);
+        assertAcked(client().admin().indices().aliases(new IndicesAliasesRequest().addAliasAction(addAction)).actionGet());
+
+        GetAliasesResponse response = client().admin().indices().getAliases(new GetAliasesRequest()).actionGet();
+        assertThat(response.getDataStreamAliases().size(), equalTo(dataStreams.length));
+        List<DataStreamAlias> result = response.getDataStreamAliases()
+            .values()
+            .stream()
+            .flatMap(Collection::stream)
+            .distinct()
+            .collect(Collectors.toList());
+        assertThat(result, hasSize(1));
+        assertThat(result.get(0).getName(), equalTo(alias));
+        assertThat(result.get(0).getDataStreams(), containsInAnyOrder(dataStreams));
+        assertThat(result.get(0).getWriteDataStream(), equalTo(dataStreams[0]));
+        assertThat(result.get(0).getFilter().string(), equalTo("{\"term\":{\"type\":{\"value\":\"y\"}}}"));
+    }
+
+    public void testDataSteamAliasWithMalformedFilter() throws Exception {
+        putComposableIndexTemplate("id1", List.of("log-*"));
+
+        String alias = randomAlphaOfLength(4);
+        String dataStream = "log-" + randomAlphaOfLength(4).toLowerCase(Locale.ROOT);
+        CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStream);
+        client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
+
+        AliasActions addAction = new AliasActions(AliasActions.Type.ADD).aliases(alias).indices(dataStream);
+        if (randomBoolean()) {
+            // non existing attribute:
+            addAction.filter(Map.of("term", Map.of("foo", Map.of("value", "bar", "x", "y"))));
+        } else {
+            // Unknown query:
+            addAction.filter(Map.of("my_query", Map.of("x", "y")));
+        }
+        Exception e = expectThrows(
+            IllegalArgumentException.class,
+            () -> client().admin().indices().aliases(new IndicesAliasesRequest().addAliasAction(addAction)).actionGet()
+        );
+        assertThat(e.getMessage(), equalTo("failed to parse filter for alias [" + alias + "]"));
+        GetAliasesResponse response = client().admin().indices().getAliases(new GetAliasesRequest()).actionGet();
+        assertThat(response.getDataStreamAliases(), anEmptyMap());
+    }
+
     public void testAliasActionsFailOnDataStreamBackingIndices() throws Exception {
         putComposableIndexTemplate("id1", List.of("metrics-foo*"));
         String dataStreamName = "metrics-foo";
@@ -761,7 +886,7 @@ public class DataStreamIT extends ESIntegTestCase {
         GetAliasesResponse response = client().admin().indices().getAliases(new GetAliasesRequest()).actionGet();
         assertThat(
             response.getDataStreamAliases(),
-            equalTo(Map.of("metrics-foo", List.of(new DataStreamAlias("my-alias1", List.of("metrics-foo"), null))))
+            equalTo(Map.of("metrics-foo", List.of(new DataStreamAlias("my-alias1", List.of("metrics-foo"), null, null))))
         );
         assertThat(response.getAliases().get("metrics-myindex"), equalTo(List.of(new AliasMetadata.Builder("my-alias2").build())));
 
@@ -795,8 +920,8 @@ public class DataStreamIT extends ESIntegTestCase {
             assertThat(
                 response.getDataStreamAliases().get("metrics-foo"),
                 containsInAnyOrder(
-                    new DataStreamAlias("my-alias1", List.of("metrics-foo"), null),
-                    new DataStreamAlias("my-alias2", List.of("metrics-foo"), null)
+                    new DataStreamAlias("my-alias1", List.of("metrics-foo"), null, null),
+                    new DataStreamAlias("my-alias2", List.of("metrics-foo"), null, null)
                 )
             );
             assertThat(response.getAliases().size(), equalTo(0));
@@ -827,17 +952,6 @@ public class DataStreamIT extends ESIntegTestCase {
         String dataStreamName = "metrics-foo";
         CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
         client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
-
-        {
-            AliasActions addAction = new AliasActions(AliasActions.Type.ADD).index("metrics-*").aliases("my-alias").filter("[filter]");
-            IndicesAliasesRequest aliasesAddRequest = new IndicesAliasesRequest();
-            aliasesAddRequest.addAliasAction(addAction);
-            Exception e = expectThrows(
-                IllegalArgumentException.class,
-                () -> client().admin().indices().aliases(aliasesAddRequest).actionGet()
-            );
-            assertThat(e.getMessage(), equalTo("aliases that point to data streams don't support filters"));
-        }
         {
             AliasActions addAction = new AliasActions(AliasActions.Type.ADD).index("metrics-*").aliases("my-alias").routing("[routing]");
             IndicesAliasesRequest aliasesAddRequest = new IndicesAliasesRequest();

+ 60 - 3
x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java

@@ -30,6 +30,7 @@ import org.elasticsearch.cluster.metadata.DataStreamAlias;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
@@ -118,8 +119,15 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
         id = indexResponse.getId();
 
         IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest();
-        aliasesRequest.addAliasAction(new AliasActions(AliasActions.Type.ADD).alias("my-alias").index("ds"));
-        aliasesRequest.addAliasAction(new AliasActions(AliasActions.Type.ADD).alias("my-alias").index("other-ds").writeIndex(true));
+        aliasesRequest.addAliasAction(
+            new AliasActions(AliasActions.Type.ADD).alias("my-alias").index("ds").filter(QueryBuilders.matchAllQuery())
+        );
+        aliasesRequest.addAliasAction(
+            new AliasActions(AliasActions.Type.ADD).alias("my-alias")
+                .index("other-ds")
+                .filter(QueryBuilders.matchAllQuery())
+                .writeIndex(true)
+        );
         assertAcked(client.admin().indices().aliases(aliasesRequest).actionGet());
     }
 
@@ -178,10 +186,18 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
         assertThat(getAliasesResponse.getDataStreamAliases().keySet(), containsInAnyOrder("ds", "other-ds"));
         assertThat(getAliasesResponse.getDataStreamAliases().get("ds").size(), equalTo(1));
         assertThat(getAliasesResponse.getDataStreamAliases().get("ds").get(0).getName(), equalTo("my-alias"));
+        assertThat(
+            getAliasesResponse.getDataStreamAliases().get("ds").get(0).getFilter().string(),
+            equalTo("{\"match_all\":{\"boost\":1.0}}")
+        );
         assertThat(getAliasesResponse.getDataStreamAliases().get("ds").get(0).getWriteDataStream(), equalTo("other-ds"));
         assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").size(), equalTo(1));
         assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getName(), equalTo("my-alias"));
         assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getWriteDataStream(), equalTo("other-ds"));
+        assertThat(
+            getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getFilter().string(),
+            equalTo("{\"match_all\":{\"boost\":1.0}}")
+        );
     }
 
     public void testSnapshotAndRestoreAllDataStreamsInPlace() throws Exception {
@@ -360,7 +376,8 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
                     new DataStreamAlias(
                         "my-alias",
                         List.of(dataStreamToSnapshot),
-                        "other-ds".equals(dataStreamToSnapshot) ? "other-ds" : null
+                        "other-ds".equals(dataStreamToSnapshot) ? "other-ds" : null,
+                        Map.of("match_all", Map.of("boost", 1f))
                     )
                 )
             )
@@ -411,10 +428,18 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
         assertThat(getAliasesResponse.getDataStreamAliases().get("ds").get(0).getName(), equalTo("my-alias"));
         assertThat(getAliasesResponse.getDataStreamAliases().get("ds").get(0).getDataStreams(), containsInAnyOrder("ds", "other-ds"));
         assertThat(getAliasesResponse.getDataStreamAliases().get("ds").get(0).getWriteDataStream(), equalTo("other-ds"));
+        assertThat(
+            getAliasesResponse.getDataStreamAliases().get("ds").get(0).getFilter().string(),
+            equalTo("{\"match_all\":{\"boost\":1.0}}")
+        );
         assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").size(), equalTo(1));
         assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getName(), equalTo("my-alias"));
         assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getDataStreams(), containsInAnyOrder("ds", "other-ds"));
         assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getWriteDataStream(), equalTo("other-ds"));
+        assertThat(
+            getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getFilter().string(),
+            equalTo("{\"match_all\":{\"boost\":1.0}}")
+        );
 
         DeleteDataStreamAction.Request r = new DeleteDataStreamAction.Request(new String[] { "*" });
         assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, r).get());
@@ -461,11 +486,19 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
         assertThat(getAliasesResponse.getDataStreamAliases().get("ds").get(0).getName(), equalTo("my-alias"));
         assertThat(getAliasesResponse.getDataStreamAliases().get("ds").get(0).getWriteDataStream(), equalTo("other-ds"));
         assertThat(getAliasesResponse.getDataStreamAliases().get("ds").get(0).getDataStreams(), containsInAnyOrder("ds", "other-ds"));
+        assertThat(
+            getAliasesResponse.getDataStreamAliases().get("ds").get(0).getFilter().string(),
+            equalTo("{\"match_all\":{\"boost\":1.0}}")
+        );
 
         assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").size(), equalTo(1));
         assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getName(), equalTo("my-alias"));
         assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getWriteDataStream(), equalTo("other-ds"));
         assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getDataStreams(), containsInAnyOrder("ds", "other-ds"));
+        assertThat(
+            getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getFilter().string(),
+            equalTo("{\"match_all\":{\"boost\":1.0}}")
+        );
 
         assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "ds" })).get());
     }
@@ -550,12 +583,24 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
         assertThat(getAliasesResponse.getDataStreamAliases().keySet(), containsInAnyOrder("ds", "ds2", "other-ds"));
         assertThat(getAliasesResponse.getDataStreamAliases().get("ds2").size(), equalTo(1));
         assertThat(getAliasesResponse.getDataStreamAliases().get("ds2").get(0).getName(), equalTo("my-alias"));
+        assertThat(
+            getAliasesResponse.getDataStreamAliases().get("ds2").get(0).getFilter().string(),
+            equalTo("{\"match_all\":{\"boost\":1.0}}")
+        );
         assertThat(getAliasesResponse.getDataStreamAliases().get("ds").size(), equalTo(1));
         assertThat(getAliasesResponse.getDataStreamAliases().get("ds").get(0).getName(), equalTo("my-alias"));
         assertThat(getAliasesResponse.getDataStreamAliases().get("ds").get(0).getWriteDataStream(), equalTo("other-ds"));
+        assertThat(
+            getAliasesResponse.getDataStreamAliases().get("ds").get(0).getFilter().string(),
+            equalTo("{\"match_all\":{\"boost\":1.0}}")
+        );
         assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").size(), equalTo(1));
         assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getName(), equalTo("my-alias"));
         assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getWriteDataStream(), equalTo("other-ds"));
+        assertThat(
+            getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getFilter().string(),
+            equalTo("{\"match_all\":{\"boost\":1.0}}")
+        );
     }
 
     public void testRenameWriteDataStream() throws Exception {
@@ -592,12 +637,24 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
         assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds2").size(), equalTo(1));
         assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds2").get(0).getName(), equalTo("my-alias"));
         assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds2").get(0).getWriteDataStream(), equalTo("other-ds2"));
+        assertThat(
+            getAliasesResponse.getDataStreamAliases().get("other-ds2").get(0).getFilter().string(),
+            equalTo("{\"match_all\":{\"boost\":1.0}}")
+        );
         assertThat(getAliasesResponse.getDataStreamAliases().get("ds").size(), equalTo(1));
         assertThat(getAliasesResponse.getDataStreamAliases().get("ds").get(0).getName(), equalTo("my-alias"));
         assertThat(getAliasesResponse.getDataStreamAliases().get("ds").get(0).getWriteDataStream(), equalTo("other-ds2"));
+        assertThat(
+            getAliasesResponse.getDataStreamAliases().get("ds").get(0).getFilter().string(),
+            equalTo("{\"match_all\":{\"boost\":1.0}}")
+        );
         assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").size(), equalTo(1));
         assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getName(), equalTo("my-alias"));
         assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getWriteDataStream(), equalTo("other-ds2"));
+        assertThat(
+            getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getFilter().string(),
+            equalTo("{\"match_all\":{\"boost\":1.0}}")
+        );
     }
 
     public void testBackingIndexIsNotRenamedWhenRestoringDataStream() {

+ 81 - 0
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/140_data_stream_aliases.yml

@@ -71,3 +71,84 @@
         index: events
         body: { query: { match_all: {} } }
   - length:   { hits.hits: 2  }
+
+---
+"Create data stream alias with alias":
+  - skip:
+      version: " - 7.99.99"
+      reason: "data stream alias filter not yet backported to the 7.x branch"
+      features: allowed_warnings
+
+  - do:
+      allowed_warnings:
+        - "index template [my-template] has index patterns [log-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template] will take precedence during new index creation"
+      indices.put_index_template:
+        name: my-template
+        body:
+          index_patterns: [ log-* ]
+          template:
+            settings:
+              index.number_of_replicas: 0
+          data_stream: { }
+
+
+  - do:
+      index:
+        index: log-app1
+        refresh: true
+        body:
+          '@timestamp': '2022-12-12'
+          zone: 'a'
+
+  - do:
+      index:
+        index: log-app1
+        refresh: true
+        body:
+          '@timestamp': '2022-12-12'
+          zone: 'b'
+
+  - do:
+      indices.update_aliases:
+        body:
+          actions:
+            - add:
+                index: log-app1
+                alias: app1-zone-a
+                filter:
+                  term:
+                    zone: 'a'
+            - add:
+                index: log-app1
+                alias: app1-zone-b
+                filter:
+                  term:
+                    zone: 'b'
+            - add:
+                index: log-app1
+                alias: app1
+
+  - do:
+      indices.get_data_stream:
+        name: "*"
+  - match: { data_streams.0.name: log-app1 }
+
+  - do:
+      indices.get_alias: {}
+  - match: {log-app1.aliases.app1: {}}
+  - match: {log-app1.aliases.app1-zone-a.filter.term.zone: 'a'}
+  - match: {log-app1.aliases.app1-zone-b.filter.term.zone: 'b'}
+
+  - do:
+      search:
+        index: app1
+  - length:   { hits.hits: 2  }
+
+  - do:
+      search:
+        index: app1-zone-a
+  - length:   { hits.hits: 1  }
+  - do:
+      search:
+        index: app1-zone-b
+  - length:   { hits.hits: 1  }