Browse Source

REST high-level client: add put ingest pipeline API (#30793)

REST high-level client: add put ingest pipeline API

Adds the put ingest pipeline API to the high level rest client.
Sohaib Iftikhar 7 years ago
parent
commit
5a97423b7a

+ 24 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/ClusterClient.java

@@ -25,6 +25,8 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
 import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
 import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
 import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
+import org.elasticsearch.action.ingest.PutPipelineRequest;
+import org.elasticsearch.action.ingest.PutPipelineResponse;
 
 import java.io.IOException;
 
@@ -87,4 +89,26 @@ public final class ClusterClient {
         restHighLevelClient.performRequestAsyncAndParseEntity(request, RequestConverters::listTasks, ListTasksResponse::fromXContent,
                 listener, emptySet(), headers);
     }
+
+    /**
+     * Add a pipeline or update an existing pipeline in the cluster
+     * <p>
+     * See
+     * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/put-pipeline-api.html"> Put Pipeline API on elastic.co</a>
+     */
+    public PutPipelineResponse putPipeline(PutPipelineRequest request, Header... headers) throws IOException {
+        return restHighLevelClient.performRequestAndParseEntity( request, RequestConverters::putPipeline,
+            PutPipelineResponse::fromXContent, emptySet(), headers);
+    }
+
+    /**
+     * Asynchronously add a pipeline or update an existing pipeline in the cluster
+     * <p>
+     * See
+     * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/put-pipeline-api.html"> Put Pipeline API on elastic.co</a>
+     */
+    public void putPipelineAsync(PutPipelineRequest request, ActionListener<PutPipelineResponse> listener, Header... headers) {
+        restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::putPipeline,
+            PutPipelineResponse::fromXContent, listener, emptySet(), headers);
+    }
 }

+ 16 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java

@@ -58,6 +58,7 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.get.MultiGetRequest;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.ingest.PutPipelineRequest;
 import org.elasticsearch.action.search.ClearScrollRequest;
 import org.elasticsearch.action.search.MultiSearchRequest;
 import org.elasticsearch.action.search.SearchRequest;
@@ -609,6 +610,21 @@ final class RequestConverters {
         return request;
     }
 
+    static Request putPipeline(PutPipelineRequest putPipelineRequest) throws IOException {
+        String endpoint = new EndpointBuilder()
+            .addPathPartAsIs("_ingest/pipeline")
+            .addPathPart(putPipelineRequest.getId())
+            .build();
+        Request request = new Request(HttpPut.METHOD_NAME, endpoint);
+
+        Params parameters = new Params(request);
+        parameters.withTimeout(putPipelineRequest.timeout());
+        parameters.withMasterTimeout(putPipelineRequest.masterNodeTimeout());
+
+        request.setEntity(createEntity(putPipelineRequest, REQUEST_BODY_CONTENT_TYPE));
+        return request;
+    }
+
     static Request listTasks(ListTasksRequest listTaskRequest) {
         if (listTaskRequest.getTaskId() != null && listTaskRequest.getTaskId().isSet()) {
             throw new IllegalArgumentException("TaskId cannot be used for list tasks request");

+ 42 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java

@@ -25,12 +25,17 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
 import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
 import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
 import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
+import org.elasticsearch.action.ingest.PutPipelineRequest;
+import org.elasticsearch.action.ingest.PutPipelineResponse;
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.elasticsearch.indices.recovery.RecoverySettings;
+import org.elasticsearch.ingest.Pipeline;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.tasks.TaskInfo;
 
@@ -136,4 +141,41 @@ public class ClusterClientIT extends ESRestHighLevelClientTestCase {
         }
         assertTrue("List tasks were not found", listTasksFound);
     }
+
+    public void testPutPipeline() throws IOException {
+        String id = "some_pipeline_id";
+        XContentType xContentType = randomFrom(XContentType.values());
+        XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
+        pipelineBuilder.startObject();
+        {
+            pipelineBuilder.field(Pipeline.DESCRIPTION_KEY, "some random set of processors");
+            pipelineBuilder.startArray(Pipeline.PROCESSORS_KEY);
+            {
+                pipelineBuilder.startObject().startObject("set");
+                {
+                    pipelineBuilder
+                        .field("field", "foo")
+                        .field("value", "bar");
+                }
+                pipelineBuilder.endObject().endObject();
+                pipelineBuilder.startObject().startObject("convert");
+                {
+                    pipelineBuilder
+                        .field("field", "rank")
+                        .field("type", "integer");
+                }
+                pipelineBuilder.endObject().endObject();
+            }
+            pipelineBuilder.endArray();
+        }
+        pipelineBuilder.endObject();
+        PutPipelineRequest request = new PutPipelineRequest(
+            id,
+            BytesReference.bytes(pipelineBuilder),
+            pipelineBuilder.contentType());
+
+        PutPipelineResponse putPipelineResponse =
+            execute(request, highLevelClient().cluster()::putPipeline, highLevelClient().cluster()::putPipelineAsync);
+        assertTrue(putPipelineResponse.isAcknowledged());
+    }
 }

+ 23 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java

@@ -61,6 +61,7 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.get.MultiGetRequest;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.ingest.PutPipelineRequest;
 import org.elasticsearch.action.search.ClearScrollRequest;
 import org.elasticsearch.action.search.MultiSearchRequest;
 import org.elasticsearch.action.search.SearchRequest;
@@ -91,6 +92,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.index.RandomCreateIndexGenerator;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.query.TermQueryBuilder;
@@ -119,6 +121,7 @@ import org.elasticsearch.test.RandomObjects;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -1402,6 +1405,26 @@ public class RequestConvertersTests extends ESTestCase {
         assertEquals(expectedParams, expectedRequest.getParameters());
     }
 
+    public void testPutPipeline() throws IOException {
+        String pipelineId = "some_pipeline_id";
+        PutPipelineRequest request = new PutPipelineRequest(
+            "some_pipeline_id",
+            new BytesArray("{}".getBytes(StandardCharsets.UTF_8)),
+            XContentType.JSON
+        );
+        Map<String, String> expectedParams = new HashMap<>();
+        setRandomMasterTimeout(request, expectedParams);
+        setRandomTimeout(request::timeout, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, expectedParams);
+
+        Request expectedRequest = RequestConverters.putPipeline(request);
+        StringJoiner endpoint = new StringJoiner("/", "/", "");
+        endpoint.add("_ingest/pipeline");
+        endpoint.add(pipelineId);
+        assertEquals(endpoint.toString(), expectedRequest.getEndpoint());
+        assertEquals(HttpPut.METHOD_NAME, expectedRequest.getMethod());
+        assertEquals(expectedParams, expectedRequest.getParameters());
+    }
+
     public void testRollover() throws IOException {
         RolloverRequest rolloverRequest = new RolloverRequest(randomAlphaOfLengthBetween(3, 10),
                 randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10));

+ 88 - 9
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ClusterClientDocumentationIT.java

@@ -21,7 +21,6 @@ package org.elasticsearch.client.documentation;
 
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.FailedNodeException;
 import org.elasticsearch.action.LatchedActionListener;
 import org.elasticsearch.action.TaskOperationFailure;
 import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
@@ -29,9 +28,12 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
 import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
 import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
 import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
+import org.elasticsearch.action.ingest.PutPipelineRequest;
+import org.elasticsearch.action.ingest.PutPipelineResponse;
 import org.elasticsearch.client.ESRestHighLevelClientTestCase;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.TimeValue;
@@ -41,6 +43,7 @@ import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.tasks.TaskInfo;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -80,19 +83,19 @@ public class ClusterClientDocumentationIT extends ESRestHighLevelClientTestCase
         // end::put-settings-request
 
         // tag::put-settings-create-settings
-        String transientSettingKey = 
+        String transientSettingKey =
                 RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey();
         int transientSettingValue = 10;
-        Settings transientSettings = 
+        Settings transientSettings =
                 Settings.builder()
                 .put(transientSettingKey, transientSettingValue, ByteSizeUnit.BYTES)
                 .build(); // <1>
 
-        String persistentSettingKey = 
+        String persistentSettingKey =
                 EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey();
-        String persistentSettingValue = 
+        String persistentSettingValue =
                 EnableAllocationDecider.Allocation.NONE.name();
-        Settings persistentSettings = 
+        Settings persistentSettings =
                 Settings.builder()
                 .put(persistentSettingKey, persistentSettingValue)
                 .build(); // <2>
@@ -105,9 +108,9 @@ public class ClusterClientDocumentationIT extends ESRestHighLevelClientTestCase
 
         {
             // tag::put-settings-settings-builder
-            Settings.Builder transientSettingsBuilder = 
+            Settings.Builder transientSettingsBuilder =
                     Settings.builder()
-                    .put(transientSettingKey, transientSettingValue, ByteSizeUnit.BYTES); 
+                    .put(transientSettingKey, transientSettingValue, ByteSizeUnit.BYTES);
             request.transientSettings(transientSettingsBuilder); // <1>
             // end::put-settings-settings-builder
         }
@@ -164,7 +167,7 @@ public class ClusterClientDocumentationIT extends ESRestHighLevelClientTestCase
             ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
 
             // tag::put-settings-execute-listener
-            ActionListener<ClusterUpdateSettingsResponse> listener = 
+            ActionListener<ClusterUpdateSettingsResponse> listener =
                     new ActionListener<ClusterUpdateSettingsResponse>() {
                 @Override
                 public void onResponse(ClusterUpdateSettingsResponse response) {
@@ -272,4 +275,80 @@ public class ClusterClientDocumentationIT extends ESRestHighLevelClientTestCase
             assertTrue(latch.await(30L, TimeUnit.SECONDS));
         }
     }
+
+    public void testPutPipeline() throws IOException {
+        RestHighLevelClient client = highLevelClient();
+
+        {
+            // tag::put-pipeline-request
+            String source =
+                "{\"description\":\"my set of processors\"," +
+                    "\"processors\":[{\"set\":{\"field\":\"foo\",\"value\":\"bar\"}}]}";
+            PutPipelineRequest request = new PutPipelineRequest(
+                "my-pipeline-id", // <1>
+                new BytesArray(source.getBytes(StandardCharsets.UTF_8)), // <2>
+                XContentType.JSON // <3>
+            );
+            // end::put-pipeline-request
+
+            // tag::put-pipeline-request-timeout
+            request.timeout(TimeValue.timeValueMinutes(2)); // <1>
+            request.timeout("2m"); // <2>
+            // end::put-pipeline-request-timeout
+
+            // tag::put-pipeline-request-masterTimeout
+            request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); // <1>
+            request.masterNodeTimeout("1m"); // <2>
+            // end::put-pipeline-request-masterTimeout
+
+            // tag::put-pipeline-execute
+            PutPipelineResponse response = client.cluster().putPipeline(request); // <1>
+            // end::put-pipeline-execute
+
+            // tag::put-pipeline-response
+            boolean acknowledged = response.isAcknowledged(); // <1>
+            // end::put-pipeline-response
+            assertTrue(acknowledged);
+        }
+    }
+
+    public void testPutPipelineAsync() throws Exception {
+        RestHighLevelClient client = highLevelClient();
+
+        {
+            String source =
+                "{\"description\":\"my set of processors\"," +
+                    "\"processors\":[{\"set\":{\"field\":\"foo\",\"value\":\"bar\"}}]}";
+            PutPipelineRequest request = new PutPipelineRequest(
+                "my-pipeline-id",
+                new BytesArray(source.getBytes(StandardCharsets.UTF_8)),
+                XContentType.JSON
+            );
+
+            // tag::put-pipeline-execute-listener
+            ActionListener<PutPipelineResponse> listener =
+                new ActionListener<PutPipelineResponse>() {
+                    @Override
+                    public void onResponse(PutPipelineResponse response) {
+                        // <1>
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        // <2>
+                    }
+                };
+            // end::put-pipeline-execute-listener
+
+            // Replace the empty listener by a blocking listener in test
+            final CountDownLatch latch = new CountDownLatch(1);
+            listener = new LatchedActionListener<>(listener, latch);
+
+            // tag::put-pipeline-execute-async
+            client.cluster().putPipelineAsync(request, listener); // <1>
+            // end::put-pipeline-execute-async
+
+            assertTrue(latch.await(30L, TimeUnit.SECONDS));
+        }
+    }
 }

+ 83 - 0
docs/java-rest/high-level/cluster/put_pipeline.asciidoc

@@ -0,0 +1,83 @@
+[[java-rest-high-cluster-put-pipeline]]
+=== Put Pipeline API
+
+[[java-rest-high-cluster-put-pipeline-request]]
+==== Put Pipeline Request
+
+A `PutPipelineRequest` requires an `id` argument, a source and a `XContentType`. The source consists
+of a description and a list of `Processor` objects.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[put-pipeline-request]
+--------------------------------------------------
+<1> The pipeline id
+<2> The source for the pipeline as a `ByteArray`.
+<3> The XContentType for the pipeline source supplied above.
+
+==== Optional arguments
+The following arguments can optionally be provided:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[put-pipeline-request-timeout]
+--------------------------------------------------
+<1> Timeout to wait for the all the nodes to acknowledge the index creation as a `TimeValue`
+<2> Timeout to wait for the all the nodes to acknowledge the index creation as a `String`
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[put-pipeline-request-masterTimeout]
+--------------------------------------------------
+<1> Timeout to connect to the master node as a `TimeValue`
+<2> Timeout to connect to the master node as a `String`
+
+[[java-rest-high-cluster-put-pipeline-sync]]
+==== Synchronous Execution
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[put-pipeline-execute]
+--------------------------------------------------
+<1> Execute the request and get back the response in a PutPipelineResponse object.
+
+[[java-rest-high-cluster-put-pipeline-async]]
+==== Asynchronous Execution
+
+The asynchronous execution of a put pipeline request requires both the `PutPipelineRequest`
+instance and an `ActionListener` instance to be passed to the asynchronous
+method:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[put-pipeline-execute-async]
+--------------------------------------------------
+<1> The `PutPipelineRequest` to execute and the `ActionListener` to use when
+the execution completes
+
+The asynchronous method does not block and returns immediately. Once it is
+completed the `ActionListener` is called back using the `onResponse` method
+if the execution successfully completed or using the `onFailure` method if
+it failed.
+
+A typical listener for `PutPipelineResponse` looks like:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[put-pipeline-execute-listener]
+--------------------------------------------------
+<1> Called when the execution is successfully completed. The response is
+provided as an argument
+<2> Called in case of failure. The raised exception is provided as an argument
+
+[[java-rest-high-cluster-put-pipeline-response]]
+==== Put Pipeline Response
+
+The returned `PutPipelineResponse` allows to retrieve information about the executed
+ operation as follows:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[put-pipeline-response]
+--------------------------------------------------
+<1> Indicates whether all of the nodes have acknowledged the request

+ 2 - 0
docs/java-rest/high-level/supported-apis.asciidoc

@@ -105,9 +105,11 @@ The Java High Level REST Client supports the following Cluster APIs:
 
 * <<java-rest-high-cluster-put-settings>>
 * <<java-rest-high-cluster-list-tasks>>
+* <<java-rest-high-cluster-put-pipeline>>
 
 include::cluster/put_settings.asciidoc[]
 include::cluster/list_tasks.asciidoc[]
+include::cluster/put_pipeline.asciidoc[]
 
 == Snapshot APIs
 

+ 13 - 1
server/src/main/java/org/elasticsearch/action/ingest/PutPipelineRequest.java

@@ -25,13 +25,15 @@ import org.elasticsearch.action.support.master.AcknowledgedRequest;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentType;
 
 import java.io.IOException;
 import java.util.Objects;
 
-public class PutPipelineRequest extends AcknowledgedRequest<PutPipelineRequest> {
+public class PutPipelineRequest extends AcknowledgedRequest<PutPipelineRequest> implements ToXContentObject {
 
     private String id;
     private BytesReference source;
@@ -96,4 +98,14 @@ public class PutPipelineRequest extends AcknowledgedRequest<PutPipelineRequest>
             out.writeEnum(xContentType);
         }
     }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        if (source != null) {
+            builder.rawValue(source.streamInput(), xContentType);
+        } else {
+            builder.startObject().endObject();
+        }
+        return builder;
+    }
 }

+ 62 - 0
server/src/main/java/org/elasticsearch/action/ingest/PutPipelineResponse.java

@@ -0,0 +1,62 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.ingest;
+
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+
+public class PutPipelineResponse extends AcknowledgedResponse implements ToXContentObject {
+
+    private static final ConstructingObjectParser<PutPipelineResponse, Void> PARSER = new ConstructingObjectParser<>(
+        "cluster_put_pipeline", true, args -> new PutPipelineResponse((boolean) args[0]));
+
+    static {
+        declareAcknowledgedField(PARSER);
+    }
+
+    public PutPipelineResponse() {
+    }
+
+    public PutPipelineResponse(boolean acknowledged) {
+        super(acknowledged);
+    }
+
+    @Override
+    public void readFrom(StreamInput in) throws IOException {
+        super.readFrom(in);
+        readAcknowledged(in);
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        writeAcknowledged(out);
+    }
+
+    public static PutPipelineResponse fromXContent(XContentParser parser) {
+        return PARSER.apply(parser, null);
+    }
+}

+ 4 - 4
server/src/main/java/org/elasticsearch/ingest/Pipeline.java

@@ -32,10 +32,10 @@ import java.util.Map;
  */
 public final class Pipeline {
 
-    static final String DESCRIPTION_KEY = "description";
-    static final String PROCESSORS_KEY = "processors";
-    static final String VERSION_KEY = "version";
-    static final String ON_FAILURE_KEY = "on_failure";
+    public static final String DESCRIPTION_KEY = "description";
+    public static final String PROCESSORS_KEY = "processors";
+    public static final String VERSION_KEY = "version";
+    public static final String ON_FAILURE_KEY = "on_failure";
 
     private final String id;
     @Nullable

+ 25 - 0
server/src/test/java/org/elasticsearch/action/ingest/PutPipelineRequestTests.java

@@ -20,9 +20,13 @@
 package org.elasticsearch.action.ingest;
 
 import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.ingest.Pipeline;
 import org.elasticsearch.test.ESTestCase;
 
 import java.io.IOException;
@@ -43,4 +47,25 @@ public class PutPipelineRequestTests extends ESTestCase {
         assertEquals(XContentType.JSON, serialized.getXContentType());
         assertEquals("{}", serialized.getSource().utf8ToString());
     }
+
+    public void testToXContent() throws IOException {
+        XContentType xContentType = randomFrom(XContentType.values());
+        XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
+        pipelineBuilder.startObject().field(Pipeline.DESCRIPTION_KEY, "some random set of processors");
+        pipelineBuilder.startArray(Pipeline.PROCESSORS_KEY);
+        //Start first processor
+        pipelineBuilder.startObject();
+        pipelineBuilder.startObject("set");
+        pipelineBuilder.field("field", "foo");
+        pipelineBuilder.field("value", "bar");
+        pipelineBuilder.endObject();
+        pipelineBuilder.endObject();
+        //End first processor
+        pipelineBuilder.endArray();
+        pipelineBuilder.endObject();
+        PutPipelineRequest request = new PutPipelineRequest("1", BytesReference.bytes(pipelineBuilder), xContentType);
+        XContentBuilder requestBuilder = XContentBuilder.builder(xContentType.xContent());
+        BytesReference actualRequestBody = BytesReference.bytes(request.toXContent(requestBuilder, ToXContent.EMPTY_PARAMS));
+        assertEquals(BytesReference.bytes(pipelineBuilder), actualRequestBody);
+    }
 }

+ 53 - 0
server/src/test/java/org/elasticsearch/action/ingest/PutPipelineResponseTests.java

@@ -0,0 +1,53 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.ingest;
+
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractStreamableXContentTestCase;
+
+public class PutPipelineResponseTests extends AbstractStreamableXContentTestCase<PutPipelineResponse> {
+
+    public void testToXContent() {
+        PutPipelineResponse response = new PutPipelineResponse(true);
+        String output = Strings.toString(response);
+        assertEquals("{\"acknowledged\":true}", output);
+    }
+
+    @Override
+    protected PutPipelineResponse doParseInstance(XContentParser parser) {
+        return PutPipelineResponse.fromXContent(parser);
+    }
+
+    @Override
+    protected PutPipelineResponse createTestInstance() {
+        return new PutPipelineResponse(randomBoolean());
+    }
+
+    @Override
+    protected PutPipelineResponse createBlankInstance() {
+        return new PutPipelineResponse();
+    }
+
+    @Override
+    protected PutPipelineResponse mutateInstance(PutPipelineResponse response) {
+        return new PutPipelineResponse(response.isAcknowledged() == false);
+    }
+}