Browse Source

Add custom metadata support to data steams. (#63991)

Composable index template may hold custom metadata. This change adds behaviour that
when a data stream gets created the custom metadata of the matching composable index
template is copied to new data stream. The get data stream api can then be used to
view the custom metadata.

Example:

```
PUT /_index_template/my-logs-template
{
  "index_patterns": [ "logs-*" ],
  "data_stream": { },
  "template": {
      "settings": {
          "index.number_of_replicas": 0
      }
  },
  "_meta": {
      "managed": true
  }
}

PUT /_data_stream/logs-myapp

GET /_data_stream
```

The get data stream api then yields the following response:

```
{
    "data_streams": [
        {
            "name": "logs-myapp",
            "timestamp_field": {
                "name": "@timestamp"
            },
            "indices": [
                {
                    "index_name": ".ds-logs-myapp-000001",
                    "index_uuid": "3UaBxM3mQXuHR6qx0IDVCw"
                }
            ],
            "generation": 1,
            "_meta": {
                "managed": true
            },
            "status": "GREEN",
            "template": "my-logs-template"
        }
    ]
}
```

Closes #59195
Martijn van Groningen 5 years ago
parent
commit
5dace550d9
16 changed files with 156 additions and 60 deletions
  1. 15 4
      client/rest-high-level/src/main/java/org/elasticsearch/client/indices/DataStream.java
  2. 4 22
      client/rest-high-level/src/test/java/org/elasticsearch/client/indices/GetDataStreamResponseTests.java
  3. 30 9
      server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
  4. 3 1
      server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java
  5. 2 1
      server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
  6. 2 2
      server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java
  7. 5 7
      server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java
  8. 9 2
      test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java
  9. 3 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/GetDataStreamAction.java
  10. 1 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RolloverStepTests.java
  11. 1 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UpdateRolloverLifecycleDateStepTests.java
  12. 1 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForActiveShardsTests.java
  13. 1 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForRolloverReadyStepTests.java
  14. 36 7
      x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java
  15. 1 1
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java
  16. 42 0
      x-pack/plugin/src/test/resources/rest-api-spec/test/data_stream/10_basic.yml

+ 15 - 4
client/rest-high-level/src/main/java/org/elasticsearch/client/indices/DataStream.java

@@ -41,9 +41,11 @@ public final class DataStream {
     String indexTemplate;
     @Nullable
     String ilmPolicyName;
+    @Nullable
+    private final Map<String, Object> metadata;
 
     public DataStream(String name, String timeStampField, List<String> indices, long generation, ClusterHealthStatus dataStreamStatus,
-                      @Nullable String indexTemplate, @Nullable String ilmPolicyName) {
+                      @Nullable String indexTemplate, @Nullable String ilmPolicyName, @Nullable  Map<String, Object> metadata) {
         this.name = name;
         this.timeStampField = timeStampField;
         this.indices = indices;
@@ -51,6 +53,7 @@ public final class DataStream {
         this.dataStreamStatus = dataStreamStatus;
         this.indexTemplate = indexTemplate;
         this.ilmPolicyName = ilmPolicyName;
+        this.metadata = metadata;
     }
 
     public String getName() {
@@ -81,6 +84,10 @@ public final class DataStream {
         return ilmPolicyName;
     }
 
+    public Map<String, Object> getMetadata() {
+        return metadata;
+    }
+
     public static final ParseField NAME_FIELD = new ParseField("name");
     public static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field");
     public static final ParseField INDICES_FIELD = new ParseField("indices");
@@ -88,6 +95,7 @@ public final class DataStream {
     public static final ParseField STATUS_FIELD = new ParseField("status");
     public static final ParseField INDEX_TEMPLATE_FIELD = new ParseField("template");
     public static final ParseField ILM_POLICY_FIELD = new ParseField("ilm_policy");
+    public static final ParseField METADATA_FIELD = new ParseField("_meta");
 
     @SuppressWarnings("unchecked")
     private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream",
@@ -101,7 +109,8 @@ public final class DataStream {
             ClusterHealthStatus status = ClusterHealthStatus.fromString(statusStr);
             String indexTemplate = (String) args[5];
             String ilmPolicy = (String) args[6];
-            return new DataStream(dataStreamName, timeStampField, indices, generation, status, indexTemplate, ilmPolicy);
+            Map<String, Object> metadata = (Map<String, Object>) args[7];
+            return new DataStream(dataStreamName, timeStampField, indices, generation, status, indexTemplate, ilmPolicy, metadata);
         });
 
     static {
@@ -112,6 +121,7 @@ public final class DataStream {
         PARSER.declareString(ConstructingObjectParser.constructorArg(), STATUS_FIELD);
         PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), INDEX_TEMPLATE_FIELD);
         PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), ILM_POLICY_FIELD);
+        PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA_FIELD);
     }
 
     public static DataStream fromXContent(XContentParser parser) throws IOException {
@@ -129,11 +139,12 @@ public final class DataStream {
             indices.equals(that.indices) &&
             dataStreamStatus == that.dataStreamStatus &&
             Objects.equals(indexTemplate, that.indexTemplate) &&
-            Objects.equals(ilmPolicyName, that.ilmPolicyName);
+            Objects.equals(ilmPolicyName, that.ilmPolicyName) &&
+            Objects.equals(metadata, that.metadata);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(name, timeStampField, indices, generation, dataStreamStatus, indexTemplate, ilmPolicyName);
+        return Objects.hash(name, timeStampField, indices, generation, dataStreamStatus, indexTemplate, ilmPolicyName, metadata);
     }
 }

+ 4 - 22
client/rest-high-level/src/test/java/org/elasticsearch/client/indices/GetDataStreamResponseTests.java

@@ -19,43 +19,25 @@
 
 package org.elasticsearch.client.indices;
 
-import org.elasticsearch.xpack.core.action.GetDataStreamAction;
-import org.elasticsearch.xpack.core.action.GetDataStreamAction.Response.DataStreamInfo;
 import org.elasticsearch.client.AbstractResponseTestCase;
+import org.elasticsearch.cluster.DataStreamTestHelper;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.cluster.metadata.DataStream;
-import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.xpack.core.action.GetDataStreamAction;
+import org.elasticsearch.xpack.core.action.GetDataStreamAction.Response.DataStreamInfo;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
 import java.util.stream.Collectors;
 
-import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
-import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName;
-
 public class GetDataStreamResponseTests extends AbstractResponseTestCase<GetDataStreamAction.Response, GetDataStreamResponse> {
 
-    private static List<Index> randomIndexInstances() {
-        int numIndices = randomIntBetween(0, 128);
-        List<Index> indices = new ArrayList<>(numIndices);
-        for (int i = 0; i < numIndices; i++) {
-            indices.add(new Index(randomAlphaOfLength(10).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())));
-        }
-        return indices;
-    }
-
     private static DataStreamInfo randomInstance() {
-        List<Index> indices = randomIndexInstances();
-        long generation = indices.size() + randomLongBetween(1, 128);
-        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
-        indices.add(new Index(getDefaultBackingIndexName(dataStreamName, generation), UUIDs.randomBase64UUID(random())));
-        DataStream dataStream = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation);
+        DataStream dataStream = DataStreamTestHelper.randomInstance();
         return new DataStreamInfo(dataStream, ClusterHealthStatus.YELLOW, randomAlphaOfLengthBetween(2, 10),
             randomAlphaOfLengthBetween(2, 10));
     }

+ 30 - 9
server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

@@ -18,8 +18,10 @@
  */
 package org.elasticsearch.cluster.metadata;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.cluster.AbstractDiffable;
 import org.elasticsearch.cluster.Diff;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -35,6 +37,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Objects;
 
 public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject {
@@ -45,18 +48,20 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
     private final TimestampField timeStampField;
     private final List<Index> indices;
     private final long generation;
+    private final Map<String, Object> metadata;
 
-    public DataStream(String name, TimestampField timeStampField, List<Index> indices, long generation) {
+    public DataStream(String name, TimestampField timeStampField, List<Index> indices, long generation, Map<String, Object> metadata) {
         this.name = name;
         this.timeStampField = timeStampField;
         this.indices = Collections.unmodifiableList(indices);
         this.generation = generation;
+        this.metadata = metadata;
         assert indices.size() > 0;
         assert indices.get(indices.size() - 1).getName().equals(getDefaultBackingIndexName(name, generation));
     }
 
     public DataStream(String name, TimestampField timeStampField, List<Index> indices) {
-        this(name, timeStampField, indices, indices.size());
+        this(name, timeStampField, indices, indices.size(), null);
     }
 
     public String getName() {
@@ -75,6 +80,11 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
         return generation;
     }
 
+    @Nullable
+    public Map<String, Object> getMetadata() {
+        return metadata;
+    }
+
     /**
      * Performs a rollover on a {@code DataStream} instance and returns a new instance containing
      * the updated list of backing indices and incremented generation.
@@ -87,7 +97,7 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
         assert newWriteIndex.getName().equals(getDefaultBackingIndexName(name, generation + 1));
         List<Index> backingIndices = new ArrayList<>(indices);
         backingIndices.add(newWriteIndex);
-        return new DataStream(name, timeStampField, backingIndices, generation + 1);
+        return new DataStream(name, timeStampField, backingIndices, generation + 1, metadata);
     }
 
     /**
@@ -101,7 +111,7 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
         List<Index> backingIndices = new ArrayList<>(indices);
         backingIndices.remove(index);
         assert backingIndices.size() == indices.size() - 1;
-        return new DataStream(name, timeStampField, backingIndices, generation);
+        return new DataStream(name, timeStampField, backingIndices, generation, metadata);
     }
 
     /**
@@ -126,7 +136,7 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
                 "it is the write index", existingBackingIndex.getName(), name));
         }
         backingIndices.set(backingIndexPosition, newBackingIndex);
-        return new DataStream(name, timeStampField, backingIndices, generation);
+        return new DataStream(name, timeStampField, backingIndices, generation, metadata);
     }
 
     /**
@@ -142,7 +152,8 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
     }
 
     public DataStream(StreamInput in) throws IOException {
-        this(in.readString(), new TimestampField(in), in.readList(Index::new), in.readVLong());
+        this(in.readString(), new TimestampField(in), in.readList(Index::new), in.readVLong(),
+            in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readMap(): null);
     }
 
     public static Diff<DataStream> readDiffFrom(StreamInput in) throws IOException {
@@ -155,22 +166,28 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
         timeStampField.writeTo(out);
         out.writeList(indices);
         out.writeVLong(generation);
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            out.writeMap(metadata);
+        }
     }
 
     public static final ParseField NAME_FIELD = new ParseField("name");
     public static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field");
     public static final ParseField INDICES_FIELD = new ParseField("indices");
     public static final ParseField GENERATION_FIELD = new ParseField("generation");
+    public static final ParseField METADATA_FIELD = new ParseField("_meta");
 
     @SuppressWarnings("unchecked")
     private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream",
-        args -> new DataStream((String) args[0], (TimestampField) args[1], (List<Index>) args[2], (Long) args[3]));
+        args -> new DataStream((String) args[0], (TimestampField) args[1], (List<Index>) args[2], (Long) args[3],
+            (Map<String, Object>) args[4]));
 
     static {
         PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD);
         PARSER.declareObject(ConstructingObjectParser.constructorArg(), TimestampField.PARSER, TIMESTAMP_FIELD_FIELD);
         PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> Index.fromXContent(p), INDICES_FIELD);
         PARSER.declareLong(ConstructingObjectParser.constructorArg(), GENERATION_FIELD);
+        PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA_FIELD);
     }
 
     public static DataStream fromXContent(XContentParser parser) throws IOException {
@@ -184,6 +201,9 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
         builder.field(TIMESTAMP_FIELD_FIELD.getPreferredName(), timeStampField);
         builder.field(INDICES_FIELD.getPreferredName(), indices);
         builder.field(GENERATION_FIELD.getPreferredName(), generation);
+        if (metadata != null) {
+            builder.field(METADATA_FIELD.getPreferredName(), metadata);
+        }
         builder.endObject();
         return builder;
     }
@@ -196,12 +216,13 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
         return name.equals(that.name) &&
             timeStampField.equals(that.timeStampField) &&
             indices.equals(that.indices) &&
-            generation == that.generation;
+            generation == that.generation &&
+            Objects.equals(metadata, that.metadata);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(name, timeStampField, indices, generation);
+        return Objects.hash(name, timeStampField, indices, generation, metadata);
     }
 
     public static final class TimestampField implements Writeable, ToXContentObject {

+ 3 - 1
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java

@@ -158,7 +158,9 @@ public class MetadataCreateDataStreamService {
 
         String fieldName = template.getDataStreamTemplate().getTimestampField();
         DataStream.TimestampField timestampField = new DataStream.TimestampField(fieldName);
-        DataStream newDataStream = new DataStream(request.name, timestampField, List.of(firstBackingIndex.getIndex()));
+        DataStream newDataStream =
+            new DataStream(request.name, timestampField, List.of(firstBackingIndex.getIndex()), 1L,
+                template.metadata() != null ? Map.copyOf(template.metadata()) : null);
         Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream);
         logger.info("adding data stream [{}]", request.name);
         return ClusterState.builder(currentState).metadata(builder).build();

+ 2 - 1
server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

@@ -608,7 +608,8 @@ public class RestoreService implements ClusterStateApplier {
         List<Index> updatedIndices = dataStream.getIndices().stream()
             .map(i -> metadata.get(renameIndex(i.getName(), request, true)).getIndex())
             .collect(Collectors.toList());
-        return new DataStream(dataStreamName, dataStream.getTimeStampField(), updatedIndices, dataStream.getGeneration());
+        return new DataStream(dataStreamName, dataStream.getTimeStampField(), updatedIndices, dataStream.getGeneration(),
+            dataStream.getMetadata());
     }
 
     public static RestoreInProgress updateRestoreStateWithDeletedIndices(RestoreInProgress oldRestore, Set<Index> deletedIndices) {

+ 2 - 2
server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java

@@ -1727,7 +1727,7 @@ public class IndexNameExpressionResolverTests extends ESTestCase {
 
         Metadata.Builder mdBuilder = Metadata.builder()
             .put(backingIndex, false)
-            .put(new DataStream(dataStreamName, createTimestampField("@timestamp"), List.of(backingIndex.getIndex()), 1));
+            .put(new DataStream(dataStreamName, createTimestampField("@timestamp"), List.of(backingIndex.getIndex())));
         ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build();
 
         {
@@ -1916,7 +1916,7 @@ public class IndexNameExpressionResolverTests extends ESTestCase {
         Metadata.Builder mdBuilder = Metadata.builder()
             .put(index1, false)
             .put(index2, false)
-            .put(new DataStream(dataStreamName, createTimestampField("@timestamp"), List.of(index1.getIndex(), index2.getIndex()), 2));
+            .put(new DataStream(dataStreamName, createTimestampField("@timestamp"), List.of(index1.getIndex(), index2.getIndex())));
         ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build();
 
         {

+ 5 - 7
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java

@@ -1016,7 +1016,7 @@ public class MetadataTests extends ESTestCase {
             backingIndices.add(im.getIndex());
         }
 
-        b.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), backingIndices, lastBackingIndexNum));
+        b.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), backingIndices, lastBackingIndexNum, null));
         Metadata metadata = b.build();
         assertThat(metadata.dataStreams().size(), equalTo(1));
         assertThat(metadata.dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
@@ -1034,7 +1034,7 @@ public class MetadataTests extends ESTestCase {
                 indices.add(idx.getIndex());
                 b.put(idx, true);
             }
-            b.put(new DataStream(name, createTimestampField("@timestamp"), indices, indices.size()));
+            b.put(new DataStream(name, createTimestampField("@timestamp"), indices));
         }
 
         Metadata metadata = b.build();
@@ -1100,8 +1100,7 @@ public class MetadataTests extends ESTestCase {
         DataStream dataStream = new DataStream(
             dataStreamName,
             createTimestampField("@timestamp"),
-            backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()),
-            backingIndices.size()
+            backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList())
         );
 
         IndexAbstraction.DataStream dataStreamAbstraction = new IndexAbstraction.DataStream(dataStream, backingIndices);
@@ -1174,8 +1173,7 @@ public class MetadataTests extends ESTestCase {
         DataStream dataStream = new DataStream(
             dataStreamName,
             createTimestampField("@timestamp"),
-            backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()),
-            backingIndices.size()
+            backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList())
         );
 
         IndexAbstraction.DataStream dataStreamAbstraction = new IndexAbstraction.DataStream(dataStream, backingIndices);
@@ -1275,7 +1273,7 @@ public class MetadataTests extends ESTestCase {
             b.put(im, false);
             backingIndices.add(im.getIndex());
         }
-        b.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), backingIndices, lastBackingIndexNum));
+        b.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), backingIndices, lastBackingIndexNum, null));
         return new CreateIndexResult(indices, backingIndices, b.build());
     }
 

+ 9 - 2
test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java

@@ -32,11 +32,13 @@ import org.elasticsearch.test.ESTestCase;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID;
 import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength;
+import static org.elasticsearch.test.ESTestCase.randomBoolean;
 
 public final class DataStreamTestHelper {
 
@@ -103,7 +105,11 @@ public final class DataStreamTestHelper {
         long generation = indices.size() + ESTestCase.randomLongBetween(1, 128);
         String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
         indices.add(new Index(getDefaultBackingIndexName(dataStreamName, generation), UUIDs.randomBase64UUID(LuceneTestCase.random())));
-        return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation);
+        Map<String, Object> metadata = null;
+        if (randomBoolean()) {
+            metadata = Map.of("key", "value");
+        }
+        return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation, metadata);
     }
 
     /**
@@ -127,7 +133,8 @@ public final class DataStreamTestHelper {
                 dsTuple.v1(),
                 createTimestampField("@timestamp"),
                 backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()),
-                dsTuple.v2()
+                dsTuple.v2(),
+                null
             );
             builder.put(ds);
         }

+ 3 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/GetDataStreamAction.java

@@ -159,6 +159,9 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
                 builder.field(DataStream.TIMESTAMP_FIELD_FIELD.getPreferredName(), dataStream.getTimeStampField());
                 builder.field(DataStream.INDICES_FIELD.getPreferredName(), dataStream.getIndices());
                 builder.field(DataStream.GENERATION_FIELD.getPreferredName(), dataStream.getGeneration());
+                if (dataStream.getMetadata() != null) {
+                    builder.field(DataStream.METADATA_FIELD.getPreferredName(), dataStream.getMetadata());
+                }
                 builder.field(STATUS_FIELD.getPreferredName(), dataStreamStatus);
                 if (indexTemplate != null) {
                     builder.field(INDEX_TEMPLATE_FIELD.getPreferredName(), indexTemplate);

+ 1 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RolloverStepTests.java

@@ -136,7 +136,7 @@ public class RolloverStepTests extends AbstractStepMasterTimeoutTestCase<Rollove
             .metadata(
                 Metadata.builder()
                     .put(new DataStream(dataStreamName, createTimestampField("@timestamp"),
-                        List.of(indexMetadata.getIndex()), 1L))
+                        List.of(indexMetadata.getIndex())))
                     .put(indexMetadata, true)
             )
             .build();

+ 1 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UpdateRolloverLifecycleDateStepTests.java

@@ -98,7 +98,7 @@ public class UpdateRolloverLifecycleDateStepTests extends AbstractStepTestCase<U
             .metadata(
                 Metadata.builder()
                     .put(new DataStream(dataStreamName, createTimestampField("@timestamp"),
-                        List.of(originalIndexMeta.getIndex(), rolledIndexMeta.getIndex()), 2L))
+                        List.of(originalIndexMeta.getIndex(), rolledIndexMeta.getIndex())))
                     .put(originalIndexMeta, true)
                     .put(rolledIndexMeta, true)
             ).build();

+ 1 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForActiveShardsTests.java

@@ -169,7 +169,7 @@ public class WaitForActiveShardsTests extends AbstractStepTestCase<WaitForActive
             .metadata(
                 Metadata.builder()
                     .put(new DataStream(dataStreamName, createTimestampField("@timestamp"),
-                        List.of(originalIndexMeta.getIndex(), rolledIndexMeta.getIndex()), 2L))
+                        List.of(originalIndexMeta.getIndex(), rolledIndexMeta.getIndex())))
                     .put(originalIndexMeta, true)
                     .put(rolledIndexMeta, true)
             )

+ 1 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForRolloverReadyStepTests.java

@@ -152,7 +152,7 @@ public class WaitForRolloverReadyStepTests extends AbstractStepTestCase<WaitForR
         SetOnce<Boolean> conditionsMet = new SetOnce<>();
         Metadata metadata = Metadata.builder().put(indexMetadata, true)
             .put(new DataStream(dataStreamName, createTimestampField("@timestamp"),
-                List.of(indexMetadata.getIndex()), 1L))
+                List.of(indexMetadata.getIndex())))
             .build();
         step.evaluateCondition(metadata, indexMetadata.getIndex(), new AsyncWaitStep.Listener() {
 

+ 36 - 7
x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java

@@ -56,6 +56,7 @@ import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.xpack.core.action.CreateDataStreamAction;
 import org.elasticsearch.xpack.core.action.DeleteDataStreamAction;
 import org.elasticsearch.xpack.core.action.GetDataStreamAction;
+import org.elasticsearch.xpack.core.action.GetDataStreamAction.Response.DataStreamInfo;
 import org.elasticsearch.xpack.datastreams.DataStreamsPlugin;
 import org.junit.After;
 
@@ -724,7 +725,7 @@ public class DataStreamIT extends ESIntegTestCase {
             + "        }\n"
             + "      }\n"
             + "    }";
-        putComposableIndexTemplate("id1", mapping, List.of("logs-foo*"), null);
+        putComposableIndexTemplate("id1", mapping, List.of("logs-foo*"), null, null);
 
         CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("logs-foobar");
         client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
@@ -908,7 +909,7 @@ public class DataStreamIT extends ESIntegTestCase {
 
     public void testGetDataStream() throws Exception {
         Settings settings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, maximumNumberOfReplicas() + 2).build();
-        putComposableIndexTemplate("template_for_foo", null, List.of("metrics-foo*"), settings);
+        putComposableIndexTemplate("template_for_foo", null, List.of("metrics-foo*"), settings, null);
 
         int numDocsFoo = randomIntBetween(2, 16);
         indexDocs("metrics-foo", numDocsFoo);
@@ -918,7 +919,7 @@ public class DataStreamIT extends ESIntegTestCase {
             new GetDataStreamAction.Request(new String[] { "metrics-foo" })
         ).actionGet();
         assertThat(response.getDataStreams().size(), is(1));
-        GetDataStreamAction.Response.DataStreamInfo metricsFooDataStream = response.getDataStreams().get(0);
+        DataStreamInfo metricsFooDataStream = response.getDataStreams().get(0);
         assertThat(metricsFooDataStream.getDataStream().getName(), is("metrics-foo"));
         assertThat(metricsFooDataStream.getDataStreamStatus(), is(ClusterHealthStatus.YELLOW));
         assertThat(metricsFooDataStream.getIndexTemplate(), is("template_for_foo"));
@@ -1113,6 +1114,29 @@ public class DataStreamIT extends ESIntegTestCase {
         assertThat(searchResponse.getHits().getTotalHits().relation, equalTo(TotalHits.Relation.EQUAL_TO));
     }
 
+    public void testDataStreamMetadata() throws Exception {
+        Settings settings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build();
+        putComposableIndexTemplate("id1", null, List.of("logs-*"), settings, Map.of("managed_by", "core-features"));
+        CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("logs-foobar");
+        client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
+
+        GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { "*" });
+        GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
+            .actionGet();
+        getDataStreamResponse.getDataStreams().sort(Comparator.comparing(dataStreamInfo -> dataStreamInfo.getDataStream().getName()));
+        assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
+        DataStreamInfo info = getDataStreamResponse.getDataStreams().get(0);
+        assertThat(info.getIndexTemplate(), equalTo("id1"));
+        assertThat(info.getDataStreamStatus(), equalTo(ClusterHealthStatus.GREEN));
+        assertThat(info.getIlmPolicy(), nullValue());
+        DataStream dataStream = info.getDataStream();
+        assertThat(dataStream.getName(), equalTo("logs-foobar"));
+        assertThat(dataStream.getTimeStampField().getName(), equalTo("@timestamp"));
+        assertThat(dataStream.getIndices().size(), equalTo(1));
+        assertThat(dataStream.getIndices().get(0).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 1)));
+        assertThat(dataStream.getMetadata(), equalTo(Map.of("managed_by", "core-features")));
+    }
+
     private static void verifyResolvability(String dataStream, ActionRequestBuilder<?, ?> requestBuilder, boolean fail) {
         verifyResolvability(dataStream, requestBuilder, fail, 0);
     }
@@ -1186,11 +1210,16 @@ public class DataStreamIT extends ESIntegTestCase {
     }
 
     public static void putComposableIndexTemplate(String id, List<String> patterns) throws IOException {
-        putComposableIndexTemplate(id, null, patterns, null);
+        putComposableIndexTemplate(id, null, patterns, null, null);
     }
 
-    static void putComposableIndexTemplate(String id, @Nullable String mappings, List<String> patterns, @Nullable Settings settings)
-        throws IOException {
+    static void putComposableIndexTemplate(
+        String id,
+        @Nullable String mappings,
+        List<String> patterns,
+        @Nullable Settings settings,
+        @Nullable Map<String, Object> metadata
+    ) throws IOException {
         PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request(id);
         request.indexTemplate(
             new ComposableIndexTemplate(
@@ -1199,7 +1228,7 @@ public class DataStreamIT extends ESIntegTestCase {
                 null,
                 null,
                 null,
-                null,
+                metadata,
                 new ComposableIndexTemplate.DataStreamTemplate(),
                 null
             )

+ 1 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java

@@ -523,7 +523,7 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
 
         clusterState = ClusterState.builder(new ClusterName("cluster_name"))
             .metadata(new Metadata.Builder()
-                .put(new DataStream(dataStreamName, createTimestampField("@timestamp"), Collections.singletonList(index), 1L))
+                .put(new DataStream(dataStreamName, createTimestampField("@timestamp"), Collections.singletonList(index)))
                 .putCustom(PersistentTasksCustomMetadata.TYPE, tasks)
                 .putCustom(MlMetadata.TYPE, mlMetadata)
                 .put(indexMetadata, false))

+ 42 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/data_stream/10_basic.yml

@@ -400,6 +400,48 @@ setup:
         name: logs-foobar
   - is_true: acknowledged
 
+---
+"Include metadata in a data stream":
+  - skip:
+      version: " - 7.99.99"
+      reason: "re-enable in 7.11 when backported"
+      features: allowed_warnings
+
+  - do:
+      allowed_warnings:
+        - "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
+      indices.put_index_template:
+        name: generic_logs_template
+        body:
+          index_patterns: logs-*
+          _meta:
+            managed_by: 'core-features'
+            managed: true
+          template:
+            settings:
+              number_of_replicas: 0
+          data_stream: {}
+
+  - do:
+      indices.create_data_stream:
+        name: logs-foobar
+  - is_true: acknowledged
+
+  - do:
+      indices.get_data_stream:
+        name: "*"
+  - length: { data_streams: 1 }
+  - match: { data_streams.0.name: 'logs-foobar' }
+  - match: { data_streams.0.timestamp_field.name: '@timestamp' }
+  - match: { data_streams.0.generation: 1 }
+  - length: { data_streams.0.indices: 1 }
+  - match: { data_streams.0.indices.0.index_name: '.ds-logs-foobar-000001' }
+  - match: { data_streams.0.status: 'GREEN' }
+  - match: { data_streams.0.template: 'generic_logs_template' }
+  - length: { data_streams.0._meta: 2 }
+  - match: { data_streams.0._meta.managed: true }
+  - match: { data_streams.0._meta.managed_by: 'core-features' }
+
 ---
 "Create index into a namespace that is governed by a data stream template":
   - skip: