Selaa lähdekoodia

HLRC support for data streams (#58106)

This change adds high level REST client support for data streams

Relates to #53100
Przemko Robakowski 5 vuotta sitten
vanhempi
commit
16e075e107

+ 108 - 4
client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java

@@ -45,25 +45,29 @@ import org.elasticsearch.client.indices.AnalyzeRequest;
 import org.elasticsearch.client.indices.AnalyzeResponse;
 import org.elasticsearch.client.indices.CloseIndexRequest;
 import org.elasticsearch.client.indices.CloseIndexResponse;
+import org.elasticsearch.client.indices.ComposableIndexTemplateExistRequest;
+import org.elasticsearch.client.indices.CreateDataStreamRequest;
 import org.elasticsearch.client.indices.CreateIndexRequest;
 import org.elasticsearch.client.indices.CreateIndexResponse;
 import org.elasticsearch.client.indices.DeleteAliasRequest;
 import org.elasticsearch.client.indices.DeleteComposableIndexTemplateRequest;
+import org.elasticsearch.client.indices.DeleteDataStreamRequest;
 import org.elasticsearch.client.indices.FreezeIndexRequest;
+import org.elasticsearch.client.indices.GetComposableIndexTemplateRequest;
+import org.elasticsearch.client.indices.GetComposableIndexTemplatesResponse;
+import org.elasticsearch.client.indices.GetDataStreamRequest;
+import org.elasticsearch.client.indices.GetDataStreamResponse;
 import org.elasticsearch.client.indices.GetFieldMappingsRequest;
 import org.elasticsearch.client.indices.GetFieldMappingsResponse;
 import org.elasticsearch.client.indices.GetIndexRequest;
 import org.elasticsearch.client.indices.GetIndexResponse;
-import org.elasticsearch.client.indices.GetComposableIndexTemplateRequest;
 import org.elasticsearch.client.indices.GetIndexTemplatesRequest;
 import org.elasticsearch.client.indices.GetIndexTemplatesResponse;
-import org.elasticsearch.client.indices.GetComposableIndexTemplatesResponse;
 import org.elasticsearch.client.indices.GetMappingsRequest;
 import org.elasticsearch.client.indices.GetMappingsResponse;
-import org.elasticsearch.client.indices.ComposableIndexTemplateExistRequest;
 import org.elasticsearch.client.indices.IndexTemplatesExistRequest;
-import org.elasticsearch.client.indices.PutIndexTemplateRequest;
 import org.elasticsearch.client.indices.PutComposableIndexTemplateRequest;
+import org.elasticsearch.client.indices.PutIndexTemplateRequest;
 import org.elasticsearch.client.indices.PutMappingRequest;
 import org.elasticsearch.client.indices.ReloadAnalyzersRequest;
 import org.elasticsearch.client.indices.ReloadAnalyzersResponse;
@@ -155,6 +159,106 @@ public final class IndicesClient {
             CreateIndexResponse::fromXContent, listener, emptySet());
     }
 
+    /**
+     * Creates a data stream using the Create Data Stream API.
+     * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-data-streams.html">
+     * Data Streams API on elastic.co</a>
+     *
+     * @param createDataStreamRequest the request
+     * @param options                 the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be
+     *                                customized
+     * @return the response
+     * @throws IOException in case there is a problem sending the request or parsing back the response
+     */
+    public AcknowledgedResponse createDataStream(CreateDataStreamRequest createDataStreamRequest,
+                                                 RequestOptions options) throws IOException {
+        return restHighLevelClient.performRequestAndParseEntity(createDataStreamRequest, IndicesRequestConverters::putDataStream, options,
+            AcknowledgedResponse::fromXContent, emptySet());
+    }
+
+    /**
+     * Asynchronously creates a data stream using the Create Data Stream API.
+     * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-data-streams.html">
+     * Data Streams API on elastic.co</a>
+     *
+     * @param createDataStreamRequest the request
+     * @param options                 the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be
+     *                                customized
+     * @param listener                the listener to be notified upon request completion
+     * @return cancellable that may be used to cancel the request
+     */
+    public Cancellable createDataStreamAsync(CreateDataStreamRequest createDataStreamRequest,
+                                             RequestOptions options,
+                                             ActionListener<AcknowledgedResponse> listener) {
+        return restHighLevelClient.performRequestAsyncAndParseEntity(createDataStreamRequest, IndicesRequestConverters::putDataStream,
+            options, AcknowledgedResponse::fromXContent, listener, emptySet());
+    }
+
+    /**
+     * Deletes a data stream using the Delete Data Stream API.
+     * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-data-streams.html">
+     * Data Streams API on elastic.co</a>
+     *
+     * @param deleteDataStreamRequest the request
+     * @param options                 the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be
+     *                                customized
+     * @return the response
+     * @throws IOException in case there is a problem sending the request or parsing back the response
+     */
+    public AcknowledgedResponse deleteDataStream(DeleteDataStreamRequest deleteDataStreamRequest,
+                                                 RequestOptions options) throws IOException {
+        return restHighLevelClient.performRequestAndParseEntity(deleteDataStreamRequest, IndicesRequestConverters::deleteDataStream,
+            options, AcknowledgedResponse::fromXContent, emptySet());
+    }
+
+    /**
+     * Asynchronously deletes a data stream using the Delete Data Stream API.
+     * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-data-streams.html">
+     * Data Streams API on elastic.co</a>
+     *
+     * @param deleteDataStreamRequest the request
+     * @param options                 the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be
+     *                                customized
+     * @param listener                the listener to be notified upon request completion
+     * @return cancellable that may be used to cancel the request
+     */
+    public Cancellable deleteDataStreamAsync(DeleteDataStreamRequest deleteDataStreamRequest, RequestOptions options,
+                                             ActionListener<AcknowledgedResponse> listener) {
+        return restHighLevelClient.performRequestAsyncAndParseEntity(deleteDataStreamRequest, IndicesRequestConverters::deleteDataStream,
+            options, AcknowledgedResponse::fromXContent, listener, emptySet());
+    }
+
+    /**
+     * Gets one or more data streams using the Get Data Stream API.
+     * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-aliases.html"> Data Streams API on
+     * elastic.co</a>
+     *
+     * @param dataStreamRequest the request
+     * @param options           the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+     * @return the response
+     * @throws IOException in case there is a problem sending the request or parsing back the response
+     */
+    public GetDataStreamResponse getDataStream(GetDataStreamRequest dataStreamRequest, RequestOptions options) throws IOException {
+        return restHighLevelClient.performRequestAndParseEntity(dataStreamRequest, IndicesRequestConverters::getDataStreams, options,
+            GetDataStreamResponse::fromXContent, emptySet());
+    }
+
+    /**
+     * Asynchronously gets one or more data streams using the Get Data Stream API.
+     * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-aliases.html"> Data Streams API on
+     * elastic.co</a>
+     *
+     * @param dataStreamRequest the request
+     * @param options           the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+     * @param listener          the listener to be notified upon request completion
+     * @return cancellable that may be used to cancel the request
+     */
+    public Cancellable getDataStreamAsync(GetDataStreamRequest dataStreamRequest, RequestOptions options,
+                                          ActionListener<GetDataStreamResponse> listener) {
+        return restHighLevelClient.performRequestAsyncAndParseEntity(dataStreamRequest, IndicesRequestConverters::getDataStreams, options,
+            GetDataStreamResponse::fromXContent, listener, emptySet());
+    }
+
     /**
      * Updates the mappings on an index using the Put Mapping API.
      * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html">

+ 25 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesRequestConverters.java

@@ -39,9 +39,12 @@ import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplat
 import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryRequest;
 import org.elasticsearch.client.indices.AnalyzeRequest;
 import org.elasticsearch.client.indices.CloseIndexRequest;
+import org.elasticsearch.client.indices.CreateDataStreamRequest;
 import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.GetDataStreamRequest;
 import org.elasticsearch.client.indices.DeleteAliasRequest;
 import org.elasticsearch.client.indices.DeleteComposableIndexTemplateRequest;
+import org.elasticsearch.client.indices.DeleteDataStreamRequest;
 import org.elasticsearch.client.indices.FreezeIndexRequest;
 import org.elasticsearch.client.indices.GetFieldMappingsRequest;
 import org.elasticsearch.client.indices.GetIndexRequest;
@@ -68,6 +71,28 @@ final class IndicesRequestConverters {
 
     private IndicesRequestConverters() {}
 
+    static Request putDataStream(CreateDataStreamRequest createDataStreamRequest) {
+        String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_data_stream")
+            .addPathPart(createDataStreamRequest.getName()).build();
+        Request request = new Request(HttpPut.METHOD_NAME, endpoint);
+        return request;
+    }
+
+    static Request deleteDataStream(DeleteDataStreamRequest deleteDataStreamRequest) {
+        String name = deleteDataStreamRequest.getName();
+        String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_data_stream").addPathPart(name).build();
+        Request request = new Request(HttpDelete.METHOD_NAME, endpoint);
+        return request;
+    }
+
+    static Request getDataStreams(GetDataStreamRequest dataStreamRequest) {
+        final String endpoint = new RequestConverters.EndpointBuilder()
+            .addPathPartAsIs("_data_stream")
+            .addPathPart(dataStreamRequest.getName())
+            .build();
+        return new Request(HttpGet.METHOD_NAME, endpoint);
+    }
+
     static Request deleteIndex(DeleteIndexRequest deleteIndexRequest) {
         String endpoint = RequestConverters.endpoint(deleteIndexRequest.indices());
         Request request = new Request(HttpDelete.METHOD_NAME, endpoint);

+ 38 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/indices/CreateDataStreamRequest.java

@@ -0,0 +1,38 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.indices;
+
+import org.elasticsearch.client.Validatable;
+
+public class CreateDataStreamRequest implements Validatable {
+
+    private final String name;
+
+    public CreateDataStreamRequest(String name) {
+        if (name == null) {
+            throw new IllegalArgumentException("The data stream name cannot be null.");
+        }
+        this.name = name;
+    }
+
+    public String getName() {
+        return name;
+    }
+}

+ 113 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/indices/DataStream.java

@@ -0,0 +1,113 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client.indices;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public final class DataStream implements ToXContentObject {
+
+    private final String name;
+    private final String timeStampField;
+    private final List<String> indices;
+    private long generation;
+
+    public DataStream(String name, String timeStampField, List<String> indices, long generation) {
+        this.name = name;
+        this.timeStampField = timeStampField;
+        this.indices = indices;
+        this.generation = generation;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getTimeStampField() {
+        return timeStampField;
+    }
+
+    public List<String> getIndices() {
+        return indices;
+    }
+
+    public long getGeneration() {
+        return generation;
+    }
+
+    public static final ParseField NAME_FIELD = new ParseField("name");
+    public static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field");
+    public static final ParseField INDICES_FIELD = new ParseField("indices");
+    public static final ParseField GENERATION_FIELD = new ParseField("generation");
+
+    @SuppressWarnings("unchecked")
+    private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream",
+        args -> {
+            List<String> indices =
+                ((List<Map<String, String>>) args[2]).stream().map(m -> m.get("index_name")).collect(Collectors.toList());
+            return new DataStream((String) args[0], (String) args[1], indices, (Long) args[3]);
+        });
+
+    static {
+        PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD);
+        PARSER.declareString(ConstructingObjectParser.constructorArg(), TIMESTAMP_FIELD_FIELD);
+        PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), INDICES_FIELD);
+        PARSER.declareLong(ConstructingObjectParser.constructorArg(), GENERATION_FIELD);
+    }
+
+    public static DataStream fromXContent(XContentParser parser) throws IOException {
+        return PARSER.parse(parser, null);
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        builder.field(NAME_FIELD.getPreferredName(), name);
+        builder.field(TIMESTAMP_FIELD_FIELD.getPreferredName(), timeStampField);
+        builder.field(INDICES_FIELD.getPreferredName(), indices);
+        builder.field(GENERATION_FIELD.getPreferredName(), generation);
+        builder.endObject();
+        return builder;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        DataStream that = (DataStream) o;
+        return name.equals(that.name) &&
+            timeStampField.equals(that.timeStampField) &&
+            indices.equals(that.indices) &&
+            generation == that.generation;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, timeStampField, indices, generation);
+    }
+}

+ 38 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/indices/DeleteDataStreamRequest.java

@@ -0,0 +1,38 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.indices;
+
+import org.elasticsearch.client.Validatable;
+
+public class DeleteDataStreamRequest implements Validatable {
+
+    private final String name;
+
+    public DeleteDataStreamRequest(String name) {
+        if (name == null) {
+            throw new IllegalArgumentException("The data stream name cannot be null.");
+        }
+        this.name = name;
+    }
+
+    public String getName() {
+        return name;
+    }
+}

+ 35 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/indices/GetDataStreamRequest.java

@@ -0,0 +1,35 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.indices;
+
+import org.elasticsearch.client.Validatable;
+
+public class GetDataStreamRequest implements Validatable {
+
+    private final String name;
+
+    public GetDataStreamRequest(String name) {
+        this.name = name;
+    }
+
+    public String getName() {
+        return name;
+    }
+}

+ 79 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/indices/GetDataStreamResponse.java

@@ -0,0 +1,79 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client.indices;
+
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+
+
+public class GetDataStreamResponse {
+
+    private final List<DataStream> dataStreams;
+
+    GetDataStreamResponse(List<DataStream> dataStreams) {
+        this.dataStreams = dataStreams;
+    }
+
+    public List<DataStream> getDataStreams() {
+        return dataStreams;
+    }
+
+    public static GetDataStreamResponse fromXContent(XContentParser parser) throws IOException {
+        final List<DataStream> templates = new ArrayList<>();
+        for (XContentParser.Token token = parser.nextToken(); token != XContentParser.Token.END_ARRAY; token = parser.nextToken()) {
+            if (token == XContentParser.Token.START_OBJECT) {
+                templates.add(DataStream.fromXContent(parser));
+            }
+        }
+        return new GetDataStreamResponse(templates);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(new HashSet<>(this.dataStreams));
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        GetDataStreamResponse other = (GetDataStreamResponse) obj;
+        return Objects.equals(new HashSet<>(this.dataStreams), new HashSet<>(other.dataStreams));
+    }
+
+    @Override
+    public String toString() {
+        List<DataStream> thisList = new ArrayList<>(this.dataStreams);
+        thisList.sort(Comparator.comparing(DataStream::getName));
+        return "GetDataStreamResponse [dataStreams=" + thisList + "]";
+    }
+}

+ 64 - 5
client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java

@@ -58,26 +58,31 @@ import org.elasticsearch.client.indices.AnalyzeRequest;
 import org.elasticsearch.client.indices.AnalyzeResponse;
 import org.elasticsearch.client.indices.CloseIndexRequest;
 import org.elasticsearch.client.indices.CloseIndexResponse;
+import org.elasticsearch.client.indices.ComposableIndexTemplateExistRequest;
+import org.elasticsearch.client.indices.CreateDataStreamRequest;
 import org.elasticsearch.client.indices.CreateIndexRequest;
 import org.elasticsearch.client.indices.CreateIndexResponse;
+import org.elasticsearch.client.indices.DataStream;
 import org.elasticsearch.client.indices.DeleteAliasRequest;
 import org.elasticsearch.client.indices.DeleteComposableIndexTemplateRequest;
+import org.elasticsearch.client.indices.DeleteDataStreamRequest;
 import org.elasticsearch.client.indices.FreezeIndexRequest;
+import org.elasticsearch.client.indices.GetComposableIndexTemplateRequest;
+import org.elasticsearch.client.indices.GetComposableIndexTemplatesResponse;
+import org.elasticsearch.client.indices.GetDataStreamRequest;
+import org.elasticsearch.client.indices.GetDataStreamResponse;
 import org.elasticsearch.client.indices.GetFieldMappingsRequest;
 import org.elasticsearch.client.indices.GetFieldMappingsResponse;
 import org.elasticsearch.client.indices.GetIndexRequest;
 import org.elasticsearch.client.indices.GetIndexResponse;
-import org.elasticsearch.client.indices.GetComposableIndexTemplateRequest;
 import org.elasticsearch.client.indices.GetIndexTemplatesRequest;
 import org.elasticsearch.client.indices.GetIndexTemplatesResponse;
-import org.elasticsearch.client.indices.GetComposableIndexTemplatesResponse;
 import org.elasticsearch.client.indices.GetMappingsRequest;
 import org.elasticsearch.client.indices.GetMappingsResponse;
 import org.elasticsearch.client.indices.IndexTemplateMetadata;
-import org.elasticsearch.client.indices.ComposableIndexTemplateExistRequest;
 import org.elasticsearch.client.indices.IndexTemplatesExistRequest;
-import org.elasticsearch.client.indices.PutIndexTemplateRequest;
 import org.elasticsearch.client.indices.PutComposableIndexTemplateRequest;
+import org.elasticsearch.client.indices.PutIndexTemplateRequest;
 import org.elasticsearch.client.indices.PutMappingRequest;
 import org.elasticsearch.client.indices.ReloadAnalyzersRequest;
 import org.elasticsearch.client.indices.ReloadAnalyzersResponse;
@@ -87,8 +92,8 @@ import org.elasticsearch.client.indices.UnfreezeIndexRequest;
 import org.elasticsearch.client.indices.rollover.RolloverRequest;
 import org.elasticsearch.client.indices.rollover.RolloverResponse;
 import org.elasticsearch.cluster.metadata.AliasMetadata;
-import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.cluster.metadata.Template;
 import org.elasticsearch.common.Strings;
@@ -1565,6 +1570,60 @@ public class IndicesClientIT extends ESRestHighLevelClientTestCase {
         assertThat(aliasExists(index, alias2), equalTo(true));
     }
 
+    public void testDataStreams() throws Exception {
+        String dataStreamName = "data-stream";
+
+        CompressedXContent mappings = new CompressedXContent("{\"properties\":{\"@timestamp\":{\"type\":\"date\"}}}");
+        Template template = new Template(null, mappings, null);
+        ComposableIndexTemplate indexTemplate = new ComposableIndexTemplate(Collections.singletonList(dataStreamName), template,
+            Collections.emptyList(), 1L, 1L, new HashMap<>(), new ComposableIndexTemplate.DataStreamTemplate("@timestamp"));
+        PutComposableIndexTemplateRequest putComposableIndexTemplateRequest =
+            new PutComposableIndexTemplateRequest().name("ds-template").create(true).indexTemplate(indexTemplate);
+        AcknowledgedResponse response = execute(putComposableIndexTemplateRequest,
+            highLevelClient().indices()::putIndexTemplate, highLevelClient().indices()::putIndexTemplateAsync);
+        assertThat(response.isAcknowledged(), equalTo(true));
+
+        CreateDataStreamRequest createDataStreamRequest = new CreateDataStreamRequest(dataStreamName);
+        IndicesClient indices = highLevelClient().indices();
+        response = execute(createDataStreamRequest, indices::createDataStream, indices::createDataStreamAsync);
+        assertThat(response.isAcknowledged(), equalTo(true));
+
+        GetDataStreamRequest getDataStreamRequest = new GetDataStreamRequest(dataStreamName);
+        GetDataStreamResponse getDataStreamResponse = execute(getDataStreamRequest, indices::getDataStream, indices::getDataStreamAsync);
+        List<DataStream> dataStreams = getDataStreamResponse.getDataStreams();
+        assertThat(dataStreams, hasSize(1));
+        DataStream dataStream = dataStreams.get(0);
+        assertThat(dataStream.getName(), equalTo(dataStreamName));
+        assertThat(dataStream.getGeneration(), equalTo(1L));
+        assertThat(dataStream.getTimeStampField(), equalTo("@timestamp"));
+        assertThat(dataStream.getIndices(), hasSize(1));
+
+        getDataStreamRequest = new GetDataStreamRequest(null);
+        getDataStreamResponse = execute(getDataStreamRequest, indices::getDataStream, indices::getDataStreamAsync);
+        dataStreams = getDataStreamResponse.getDataStreams();
+        assertThat(dataStreams, hasSize(1));
+        dataStream = dataStreams.get(0);
+        assertThat(dataStream.getName(), equalTo(dataStreamName));
+        assertThat(dataStream.getGeneration(), equalTo(1L));
+        assertThat(dataStream.getTimeStampField(), equalTo("@timestamp"));
+        assertThat(dataStream.getIndices(), hasSize(1));
+
+        DeleteDataStreamRequest deleteDataStreamRequest = new DeleteDataStreamRequest(dataStreamName);
+        response = execute(deleteDataStreamRequest, indices::deleteDataStream, indices::deleteDataStreamAsync);
+        assertThat(response.isAcknowledged(), equalTo(true));
+
+        getDataStreamRequest = new GetDataStreamRequest(null);
+        getDataStreamResponse = execute(getDataStreamRequest, indices::getDataStream, indices::getDataStreamAsync);
+        dataStreams = getDataStreamResponse.getDataStreams();
+        assertThat(dataStreams, hasSize(0));
+
+        getDataStreamRequest = new GetDataStreamRequest(dataStreamName);
+        GetDataStreamRequest finalGetDataStreamRequest = getDataStreamRequest;
+        ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> execute(finalGetDataStreamRequest,
+            indices::getDataStream, indices::getDataStreamAsync));
+        assertThat(e.status(), equalTo(RestStatus.NOT_FOUND));
+    }
+
     public void testIndexTemplates() throws Exception {
         String templateName = "my-template";
         Settings settings = Settings.builder().put("index.number_of_shards", 1).build();

+ 30 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesRequestConvertersTests.java

@@ -43,8 +43,11 @@ import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryReques
 import org.elasticsearch.action.support.master.AcknowledgedRequest;
 import org.elasticsearch.client.indices.AnalyzeRequest;
 import org.elasticsearch.client.indices.CloseIndexRequest;
+import org.elasticsearch.client.indices.CreateDataStreamRequest;
 import org.elasticsearch.client.indices.CreateIndexRequest;
 import org.elasticsearch.client.indices.DeleteAliasRequest;
+import org.elasticsearch.client.indices.DeleteDataStreamRequest;
+import org.elasticsearch.client.indices.GetDataStreamRequest;
 import org.elasticsearch.client.indices.GetFieldMappingsRequest;
 import org.elasticsearch.client.indices.GetIndexRequest;
 import org.elasticsearch.client.indices.GetIndexTemplatesRequest;
@@ -256,6 +259,33 @@ public class IndicesRequestConvertersTests extends ESTestCase {
         Assert.assertThat(HttpGet.METHOD_NAME, equalTo(request.getMethod()));
     }
 
+    public void testPutDataStream() {
+        String name = randomAlphaOfLength(10);
+        CreateDataStreamRequest createDataStreamRequest = new CreateDataStreamRequest(name);
+        Request request = IndicesRequestConverters.putDataStream(createDataStreamRequest);
+        Assert.assertEquals("/_data_stream/" + name, request.getEndpoint());
+        Assert.assertEquals(HttpPut.METHOD_NAME, request.getMethod());
+        Assert.assertNull(request.getEntity());
+    }
+
+    public void testGetDataStream() {
+        String name = randomAlphaOfLength(10);
+        GetDataStreamRequest getDataStreamRequest = new GetDataStreamRequest(name);
+        Request request = IndicesRequestConverters.getDataStreams(getDataStreamRequest);
+        Assert.assertEquals("/_data_stream/" + name, request.getEndpoint());
+        Assert.assertEquals(HttpGet.METHOD_NAME, request.getMethod());
+        Assert.assertNull(request.getEntity());
+    }
+
+    public void testDeleteDataStream() {
+        String name = randomAlphaOfLength(10);
+        DeleteDataStreamRequest deleteDataStreamRequest = new DeleteDataStreamRequest(name);
+        Request request = IndicesRequestConverters.deleteDataStream(deleteDataStreamRequest);
+        Assert.assertEquals("/_data_stream/" + name, request.getEndpoint());
+        Assert.assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
+        Assert.assertNull(request.getEntity());
+    }
+
     public void testDeleteIndex() {
         String[] indices = RequestConvertersTests.randomIndicesNames(0, 5);
         DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indices);

+ 0 - 3
client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java

@@ -801,9 +801,6 @@ public class RestHighLevelClientTests extends ESTestCase {
             "indices.put_alias",
             "render_search_template",
             "scripts_painless_execute",
-            "indices.create_data_stream",
-            "indices.get_data_stream",
-            "indices.delete_data_stream",
             "indices.simulate_template",
             "indices.resolve_index"
         };

+ 93 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/indices/GetDataStreamResponseTests.java

@@ -0,0 +1,93 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.indices;
+
+import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction;
+import org.elasticsearch.client.AbstractResponseTestCase;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.Index;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName;
+
+public class GetDataStreamResponseTests extends AbstractResponseTestCase<GetDataStreamAction.Response, GetDataStreamResponse> {
+
+    private static List<Index> randomIndexInstances() {
+        int numIndices = randomIntBetween(0, 128);
+        List<Index> indices = new ArrayList<>(numIndices);
+        for (int i = 0; i < numIndices; i++) {
+            indices.add(new Index(randomAlphaOfLength(10).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())));
+        }
+        return indices;
+    }
+
+    private static DataStream randomInstance() {
+        List<Index> indices = randomIndexInstances();
+        long generation = indices.size() + randomLongBetween(1, 128);
+        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        indices.add(new Index(getDefaultBackingIndexName(dataStreamName, generation), UUIDs.randomBase64UUID(random())));
+        return new DataStream(dataStreamName, randomAlphaOfLength(10), indices, generation);
+    }
+
+    private static GetDataStreamResponse fromXContent(XContentParser parser) throws IOException {
+        parser.nextToken();
+        return GetDataStreamResponse.fromXContent(parser);
+    }
+
+    @Override
+    protected GetDataStreamAction.Response createServerTestInstance(XContentType xContentType) {
+        ArrayList<DataStream> dataStreams = new ArrayList<>();
+        int count = randomInt(10);
+        for (int i = 0; i < count; i++) {
+            dataStreams.add(randomInstance());
+        }
+        return new GetDataStreamAction.Response(dataStreams);
+    }
+
+    @Override
+    protected GetDataStreamResponse doParseToClientInstance(XContentParser parser) throws IOException {
+        return GetDataStreamResponse.fromXContent(parser);
+    }
+
+    @Override
+    protected void assertInstances(GetDataStreamAction.Response serverTestInstance, GetDataStreamResponse clientInstance) {
+        assertEquals(serverTestInstance.getDataStreams().size(), clientInstance.getDataStreams().size());
+        Iterator<DataStream> serverIt = serverTestInstance.getDataStreams().iterator();
+
+        Iterator<org.elasticsearch.client.indices.DataStream> clientIt = clientInstance.getDataStreams().iterator();
+        while (serverIt.hasNext()) {
+            org.elasticsearch.client.indices.DataStream client = clientIt.next();
+            DataStream server = serverIt.next();
+            assertEquals(server.getName(), client.getName());
+            assertEquals(server.getIndices().stream().map(Index::getName).collect(Collectors.toList()), client.getIndices());
+            assertEquals(server.getTimeStampField(), client.getTimeStampField());
+            assertEquals(server.getGeneration(), client.getGeneration());
+        }
+    }
+}