Browse Source

Adding rest actions for getting and updating data stream mappings (#130241)

Keith Massey 3 months ago
parent
commit
ff52007996

+ 6 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java

@@ -75,11 +75,13 @@ import org.elasticsearch.datastreams.options.rest.RestPutDataStreamOptionsAction
 import org.elasticsearch.datastreams.rest.RestCreateDataStreamAction;
 import org.elasticsearch.datastreams.rest.RestDataStreamsStatsAction;
 import org.elasticsearch.datastreams.rest.RestDeleteDataStreamAction;
+import org.elasticsearch.datastreams.rest.RestGetDataStreamMappingsAction;
 import org.elasticsearch.datastreams.rest.RestGetDataStreamSettingsAction;
 import org.elasticsearch.datastreams.rest.RestGetDataStreamsAction;
 import org.elasticsearch.datastreams.rest.RestMigrateToDataStreamAction;
 import org.elasticsearch.datastreams.rest.RestModifyDataStreamsAction;
 import org.elasticsearch.datastreams.rest.RestPromoteDataStreamAction;
+import org.elasticsearch.datastreams.rest.RestUpdateDataStreamMappingsAction;
 import org.elasticsearch.datastreams.rest.RestUpdateDataStreamSettingsAction;
 import org.elasticsearch.features.NodeFeature;
 import org.elasticsearch.health.HealthIndicatorService;
@@ -292,6 +294,10 @@ public class DataStreamsPlugin extends Plugin implements ActionPlugin, HealthPlu
         handlers.add(new RestDeleteDataStreamOptionsAction());
         handlers.add(new RestGetDataStreamSettingsAction());
         handlers.add(new RestUpdateDataStreamSettingsAction());
+        if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
+            handlers.add(new RestGetDataStreamMappingsAction());
+            handlers.add(new RestUpdateDataStreamMappingsAction());
+        }
         return handlers;
     }
 

+ 51 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestGetDataStreamMappingsAction.java

@@ -0,0 +1,51 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.datastreams.rest;
+
+import org.elasticsearch.action.datastreams.GetDataStreamMappingsAction;
+import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.RestUtils;
+import org.elasticsearch.rest.Scope;
+import org.elasticsearch.rest.ServerlessScope;
+import org.elasticsearch.rest.action.RestCancellableNodeClient;
+import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.elasticsearch.rest.RestRequest.Method.GET;
+
+@ServerlessScope(Scope.PUBLIC)
+public class RestGetDataStreamMappingsAction extends BaseRestHandler {
+    @Override
+    public String getName() {
+        return "get_data_stream_mappings_action";
+    }
+
+    @Override
+    public List<Route> routes() {
+        return List.of(new Route(GET, "/_data_stream/{name}/_mappings"));
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
+        GetDataStreamMappingsAction.Request getDataStreamRequest = new GetDataStreamMappingsAction.Request(
+            RestUtils.getMasterNodeTimeout(request)
+        ).indices(Strings.splitStringByCommaToArray(request.param("name")));
+        return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
+            GetDataStreamMappingsAction.INSTANCE,
+            getDataStreamRequest,
+            new RestRefCountedChunkedToXContentListener<>(channel)
+        );
+    }
+}

+ 64 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestUpdateDataStreamMappingsAction.java

@@ -0,0 +1,64 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+package org.elasticsearch.datastreams.rest;
+
+import org.elasticsearch.action.datastreams.UpdateDataStreamMappingsAction;
+import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.cluster.metadata.Template;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.compress.CompressedXContent;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.RestUtils;
+import org.elasticsearch.rest.Scope;
+import org.elasticsearch.rest.ServerlessScope;
+import org.elasticsearch.rest.action.RestCancellableNodeClient;
+import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
+import org.elasticsearch.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.elasticsearch.rest.RestRequest.Method.PUT;
+
+@ServerlessScope(Scope.PUBLIC)
+public class RestUpdateDataStreamMappingsAction extends BaseRestHandler {
+
+    @Override
+    public String getName() {
+        return "update_data_stream_mappings_action";
+    }
+
+    @Override
+    public List<Route> routes() {
+        return List.of(new Route(PUT, "/_data_stream/{name}/_mappings"));
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
+        CompressedXContent mappings;
+        try (XContentParser parser = request.contentParser()) {
+            parser.nextToken(); // advance the parser to the expected location
+            mappings = Template.parseMappings(parser);
+        }
+        boolean dryRun = request.paramAsBoolean("dry_run", false);
+
+        UpdateDataStreamMappingsAction.Request updateDataStreamMappingsRequest = new UpdateDataStreamMappingsAction.Request(
+            mappings,
+            dryRun,
+            RestUtils.getMasterNodeTimeout(request),
+            RestUtils.getAckTimeout(request)
+        ).indices(Strings.splitStringByCommaToArray(request.param("name")));
+        return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
+            UpdateDataStreamMappingsAction.INSTANCE,
+            updateDataStreamMappingsRequest,
+            new RestRefCountedChunkedToXContentListener<>(channel)
+        );
+    }
+}

+ 2 - 0
modules/data-streams/src/yamlRestTest/java/org/elasticsearch/datastreams/DataStreamsClientYamlTestSuiteIT.java

@@ -14,6 +14,7 @@ import org.elasticsearch.common.settings.SecureString;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.test.cluster.ElasticsearchCluster;
+import org.elasticsearch.test.cluster.FeatureFlag;
 import org.elasticsearch.test.cluster.local.LocalClusterSpecBuilder;
 import org.elasticsearch.test.cluster.local.distribution.DistributionType;
 import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
@@ -47,6 +48,7 @@ public class DataStreamsClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase
             .setting("xpack.security.enabled", "true")
             .keystore("bootstrap.password", "x-pack-test-password")
             .user("x_pack_rest_user", "x-pack-test-password")
+            .feature(FeatureFlag.LOGS_STREAM)
             .systemProperty("es.queryable_built_in_roles_enabled", "false");
         if (initTestSeed().nextBoolean()) {
             clusterBuilder.setting("xpack.license.self_generated.type", "trial");

+ 94 - 0
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/250_data_stream_mappings.yml

@@ -0,0 +1,94 @@
+setup:
+  - skip:
+      features: allowed_warnings
+
+---
+"Test single data stream":
+  - requires:
+      cluster_features: [ "logs_stream" ]
+      reason: requires setting 'logs_stream' to get or set data stream settings
+  - do:
+      allowed_warnings:
+        - "index template [my-template] has index patterns [my-data-stream-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template] will take precedence during new index creation"
+      indices.put_index_template:
+        name: my-template
+        body:
+          index_patterns: [ my-data-stream-* ]
+          data_stream: { }
+          template:
+            settings:
+              number_of_replicas: 0
+            mappings:
+              properties:
+                field1:
+                  type: keyword
+
+  - do:
+      indices.create_data_stream:
+        name: my-data-stream-1
+
+  - do:
+      cluster.health:
+        index: "my-data-stream-1"
+        wait_for_status: green
+
+  - do:
+      indices.get_data_stream_mappings:
+        name: my-data-stream-1
+  - match: { data_streams.0.name: my-data-stream-1 }
+  - match: { data_streams.0.mappings: {} }
+  - length: { data_streams.0.effective_mappings.properties: 1 }
+
+  - do:
+      indices.get_data_stream:
+        name: my-data-stream-1
+  - match: { data_streams.0.name: my-data-stream-1 }
+  - match: { data_streams.0.mappings: {} }
+  - match: { data_streams.0.effective_mappings: null }
+
+  - do:
+      indices.put_data_stream_mappings:
+        name: my-data-stream-1
+        body:
+          properties:
+            name:
+              type: keyword
+              fields:
+                english:
+                  type: text
+  - match: { data_streams.0.name: my-data-stream-1 }
+  - match: { data_streams.0.applied_to_data_stream: true }
+  - match: { data_streams.0.mappings.properties.name.type: "keyword" }
+  - match: { data_streams.0.effective_mappings.properties.name.type: "keyword" }
+
+  - do:
+      indices.rollover:
+        alias: "my-data-stream-1"
+
+  - do:
+      cluster.health:
+        index: "my-data-stream-1"
+        wait_for_status: green
+
+  - do:
+      indices.get_data_stream_mappings:
+        name: my-data-stream-1
+  - match: { data_streams.0.name: my-data-stream-1 }
+  - length: { data_streams.0.effective_mappings.properties: 2 }
+  - match: { data_streams.0.mappings.properties.name.type: "keyword" }
+  - match: { data_streams.0.effective_mappings.properties.name.type: "keyword" }
+
+  - do:
+      indices.get_data_stream:
+        name: my-data-stream-1
+  - match: { data_streams.0.name: my-data-stream-1 }
+  - match: { data_streams.0.mappings.properties.name.type: "keyword" }
+  - match: { data_streams.0.effective_mappings: null }
+  - set: { data_streams.0.indices.0.index_name: oldIndexName }
+  - set: { data_streams.0.indices.1.index_name: newIndexName }
+
+  - do:
+      indices.get_mapping:
+        index: my-data-stream-1
+  - match: { .$oldIndexName.mappings.properties.name: null }
+  - match: { .$newIndexName.mappings.properties.name.type: "keyword" }

+ 36 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_data_stream_mappings.json

@@ -0,0 +1,36 @@
+{
+  "indices.get_data_stream_mappings":{
+    "documentation":{
+      "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html",
+      "description":"Gets a data stream's mappings"
+    },
+    "stability":"stable",
+    "visibility": "feature_flag",
+    "feature_flag": "logs_stream",
+    "headers":{
+      "accept": [ "application/json"]
+    },
+    "url":{
+      "paths":[
+        {
+          "path":"/_data_stream/{name}/_mappings",
+          "methods":[
+            "GET"
+          ],
+          "parts":{
+            "name":{
+              "type":"string",
+              "description":"Comma-separated list of data streams or data stream patterns"
+            }
+          }
+        }
+      ]
+    },
+    "params":{
+      "master_timeout":{
+        "type":"time",
+        "description":"Period to wait for a connection to the master node"
+      }
+    }
+  }
+}

+ 49 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/indices.put_data_stream_mappings.json

@@ -0,0 +1,49 @@
+{
+  "indices.put_data_stream_mappings":{
+    "documentation":{
+      "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html",
+      "description":"Updates a data stream's mappings"
+    },
+    "stability":"stable",
+    "visibility": "feature_flag",
+    "feature_flag": "logs_stream",
+    "headers":{
+      "accept": [ "application/json"]
+    },
+    "url":{
+      "paths":[
+        {
+          "path":"/_data_stream/{name}/_mappings",
+          "methods":[
+            "PUT"
+          ],
+          "parts":{
+            "name":{
+              "type":"string",
+              "description":"Comma-separated list of data streams or data stream patterns"
+            }
+          }
+        }
+      ]
+    },
+    "params":{
+      "dry_run":{
+        "type":"boolean",
+        "description":"Whether this request should only be a dry run rather than actually applying mappings",
+        "default":false
+      },
+      "timeout":{
+        "type":"time",
+        "description":"Period to wait for a response"
+      },
+      "master_timeout":{
+        "type":"time",
+        "description":"Period to wait for a connection to the master node"
+      }
+    },
+    "body":{
+      "description":"The data stream mappings to be updated",
+      "required":true
+    }
+  }
+}

+ 1 - 1
server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamMappingsAction.java

@@ -129,7 +129,7 @@ public class GetDataStreamMappingsAction extends ActionType<GetDataStreamMapping
             Map<String, Object> uncompressedEffectiveMappings = XContentHelper.convertToMap(
                 effectiveMappings.uncompressed(),
                 true,
-                builder.contentType()
+                XContentType.JSON
             ).v2();
             builder.field("effective_mappings");
             builder.map(uncompressedEffectiveMappings);

+ 6 - 14
server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

@@ -52,7 +52,6 @@ import org.elasticsearch.xcontent.ObjectParser;
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.ToXContentObject;
 import org.elasticsearch.xcontent.XContentBuilder;
-import org.elasticsearch.xcontent.XContentFactory;
 import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.xcontent.XContentParserConfiguration;
 import org.elasticsearch.xcontent.XContentType;
@@ -61,7 +60,6 @@ import java.io.IOException;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
-import java.util.Base64;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -1488,18 +1486,12 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             DATA_STREAM_OPTIONS_FIELD
         );
         PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> Settings.fromXContent(p), SETTINGS_FIELD);
-        PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> {
-            XContentParser.Token token = p.currentToken();
-            if (token == XContentParser.Token.VALUE_STRING) {
-                return new CompressedXContent(Base64.getDecoder().decode(p.text()));
-            } else if (token == XContentParser.Token.VALUE_EMBEDDED_OBJECT) {
-                return new CompressedXContent(p.binaryValue());
-            } else if (token == XContentParser.Token.START_OBJECT) {
-                return new CompressedXContent(Strings.toString(XContentFactory.jsonBuilder().map(p.mapOrdered())));
-            } else {
-                throw new IllegalArgumentException("Unexpected token: " + token);
-            }
-        }, MAPPINGS_FIELD, ObjectParser.ValueType.VALUE_OBJECT_ARRAY);
+        PARSER.declareField(
+            ConstructingObjectParser.optionalConstructorArg(),
+            (p, c) -> { return Template.parseMappings(p); },
+            MAPPINGS_FIELD,
+            ObjectParser.ValueType.VALUE_OBJECT_ARRAY
+        );
     }
 
     public static DataStream fromXContent(XContentParser parser) throws IOException {

+ 19 - 12
server/src/main/java/org/elasticsearch/cluster/metadata/Template.java

@@ -67,18 +67,12 @@ public class Template implements SimpleDiffable<Template>, ToXContentObject {
 
     static {
         PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> Settings.fromXContent(p), SETTINGS);
-        PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> {
-            XContentParser.Token token = p.currentToken();
-            if (token == XContentParser.Token.VALUE_STRING) {
-                return new CompressedXContent(Base64.getDecoder().decode(p.text()));
-            } else if (token == XContentParser.Token.VALUE_EMBEDDED_OBJECT) {
-                return new CompressedXContent(p.binaryValue());
-            } else if (token == XContentParser.Token.START_OBJECT) {
-                return new CompressedXContent(Strings.toString(XContentFactory.jsonBuilder().map(p.mapOrdered())));
-            } else {
-                throw new IllegalArgumentException("Unexpected token: " + token);
-            }
-        }, MAPPINGS, ObjectParser.ValueType.VALUE_OBJECT_ARRAY);
+        PARSER.declareField(
+            ConstructingObjectParser.optionalConstructorArg(),
+            (p, c) -> { return parseMappings(p); },
+            MAPPINGS,
+            ObjectParser.ValueType.VALUE_OBJECT_ARRAY
+        );
         PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> {
             Map<String, AliasMetadata> aliasMap = new HashMap<>();
             XContentParser.Token token;
@@ -104,6 +98,19 @@ public class Template implements SimpleDiffable<Template>, ToXContentObject {
         );
     }
 
+    public static CompressedXContent parseMappings(XContentParser parser) throws IOException {
+        XContentParser.Token token = parser.currentToken();
+        if (token == XContentParser.Token.VALUE_STRING) {
+            return new CompressedXContent(Base64.getDecoder().decode(parser.text()));
+        } else if (token == XContentParser.Token.VALUE_EMBEDDED_OBJECT) {
+            return new CompressedXContent(parser.binaryValue());
+        } else if (token == XContentParser.Token.START_OBJECT) {
+            return new CompressedXContent(Strings.toString(XContentFactory.jsonBuilder().map(parser.mapOrdered())));
+        } else {
+            throw new IllegalArgumentException("Unexpected token: " + token);
+        }
+    }
+
     @Nullable
     private final Settings settings;
     @Nullable