Browse Source

REST high-level client: add flush API (#28852)

Relates to #27205
Luca Cavanna 7 years ago
parent
commit
184a8718d8

+ 22 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java

@@ -30,6 +30,8 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
+import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.action.admin.indices.flush.FlushResponse;
 import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
@@ -237,6 +239,26 @@ public final class IndicesClient {
                 listener, emptySet(), headers);
     }
 
+    /**
+     * Flush one or more indices using the Flush API
+     * <p>
+     * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-flush.html"> Flush API on elastic.co</a>
+     */
+    public FlushResponse flush(FlushRequest flushRequest, Header... headers) throws IOException {
+        return restHighLevelClient.performRequestAndParseEntity(flushRequest, Request::flush, FlushResponse::fromXContent,
+                emptySet(), headers);
+    }
+
+    /**
+     * Asynchronously flush one or more indices using the Flush API
+     * <p>
+     * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-flush.html"> Flush API on elastic.co</a>
+     */
+    public void flushAsync(FlushRequest flushRequest, ActionListener<FlushResponse> listener, Header... headers) {
+        restHighLevelClient.performRequestAsyncAndParseEntity(flushRequest, Request::flush, FlushResponse::fromXContent,
+                listener, emptySet(), headers);
+    }
+
     /**
      * Checks if the index (indices) exists or not.
      * <p>

+ 9 - 1
client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java

@@ -35,6 +35,7 @@ import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
 import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
 import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
@@ -219,10 +220,17 @@ public final class Request {
 
     static Request refresh(RefreshRequest refreshRequest) {
         String endpoint = endpoint(refreshRequest.indices(), "_refresh");
-
         Params parameters = Params.builder();
         parameters.withIndicesOptions(refreshRequest.indicesOptions());
+        return new Request(HttpPost.METHOD_NAME, endpoint, parameters.getParams(), null);
+    }
 
+    static Request flush(FlushRequest flushRequest) {
+        String endpoint = endpoint(flushRequest.indices(), "_flush");
+        Params parameters = Params.builder();
+        parameters.withIndicesOptions(flushRequest.indicesOptions());
+        parameters.putParam("wait_if_ongoing", Boolean.toString(flushRequest.waitIfOngoing()));
+        parameters.putParam("force", Boolean.toString(flushRequest.force()));
         return new Request(HttpPost.METHOD_NAME, endpoint, parameters.getParams(), null);
     }
 

+ 28 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java

@@ -34,6 +34,8 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
+import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.action.admin.indices.flush.FlushResponse;
 import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
@@ -410,6 +412,32 @@ public class IndicesClientIT extends ESRestHighLevelClientTestCase {
         }
     }
 
+    public void testFlush() throws IOException {
+        {
+            String index = "index";
+            Settings settings = Settings.builder()
+                    .put("number_of_shards", 1)
+                    .put("number_of_replicas", 0)
+                    .build();
+            createIndex(index, settings);
+            FlushRequest flushRequest = new FlushRequest(index);
+            FlushResponse flushResponse =
+                    execute(flushRequest, highLevelClient().indices()::flush, highLevelClient().indices()::flushAsync);
+            assertThat(flushResponse.getTotalShards(), equalTo(1));
+            assertThat(flushResponse.getSuccessfulShards(), equalTo(1));
+            assertThat(flushResponse.getFailedShards(), equalTo(0));
+            assertThat(flushResponse.getShardFailures(), equalTo(BroadcastResponse.EMPTY));
+        }
+        {
+            String nonExistentIndex = "non_existent_index";
+            assertFalse(indexExists(nonExistentIndex));
+            FlushRequest refreshRequest = new FlushRequest(nonExistentIndex);
+            ElasticsearchException exception = expectThrows(ElasticsearchException.class,
+                    () -> execute(refreshRequest, highLevelClient().indices()::flush, highLevelClient().indices()::flushAsync));
+            assertEquals(RestStatus.NOT_FOUND, exception.status());
+        }
+    }
+
     public void testExistsAlias() throws IOException {
         GetAliasesRequest getAliasesRequest = new GetAliasesRequest("alias");
         assertFalse(execute(getAliasesRequest, highLevelClient().indices()::existsAlias, highLevelClient().indices()::existsAliasAsync));

+ 34 - 5
client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java

@@ -37,6 +37,7 @@ import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
 import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
 import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
@@ -537,15 +538,43 @@ public class RequestTests extends ESTestCase {
     }
 
     public void testRefresh() {
-        String[] indices = randomIndicesNames(1, 5);
+        String[] indices = randomIndicesNames(0, 5);
         RefreshRequest refreshRequest = new RefreshRequest(indices);
-
         Map<String, String> expectedParams = new HashMap<>();
         setRandomIndicesOptions(refreshRequest::indicesOptions, refreshRequest::indicesOptions, expectedParams);
-
         Request request = Request.refresh(refreshRequest);
-        StringJoiner endpoint = new StringJoiner("/", "/", "").add(String.join(",", indices)).add("_refresh");
-        assertThat(endpoint.toString(), equalTo(request.getEndpoint()));
+        StringJoiner endpoint = new StringJoiner("/", "/", "");
+        if (indices.length > 0) {
+            endpoint.add(String.join(",", indices));
+        }
+        endpoint.add("_refresh");
+        assertThat(request.getEndpoint(), equalTo(endpoint.toString()));
+        assertThat(request.getParameters(), equalTo(expectedParams));
+        assertThat(request.getEntity(), nullValue());
+        assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME));
+    }
+
+    public void testFlush() {
+        String[] indices = randomIndicesNames(0, 5);
+        FlushRequest flushRequest = new FlushRequest(indices);
+        Map<String, String> expectedParams = new HashMap<>();
+        setRandomIndicesOptions(flushRequest::indicesOptions, flushRequest::indicesOptions, expectedParams);
+        if (randomBoolean()) {
+            flushRequest.force(randomBoolean());
+        }
+        expectedParams.put("force", Boolean.toString(flushRequest.force()));
+        if (randomBoolean()) {
+            flushRequest.waitIfOngoing(randomBoolean());
+        }
+        expectedParams.put("wait_if_ongoing", Boolean.toString(flushRequest.waitIfOngoing()));
+
+        Request request = Request.flush(flushRequest);
+        StringJoiner endpoint = new StringJoiner("/", "/", "");
+        if (indices.length > 0) {
+            endpoint.add(String.join(",", indices));
+        }
+        endpoint.add("_flush");
+        assertThat(request.getEndpoint(), equalTo(endpoint.toString()));
         assertThat(request.getParameters(), equalTo(expectedParams));
         assertThat(request.getEntity(), nullValue());
         assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME));

+ 78 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IndicesClientDocumentationIT.java

@@ -33,6 +33,8 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
+import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.action.admin.indices.flush.FlushResponse;
 import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
@@ -691,6 +693,82 @@ public class IndicesClientDocumentationIT extends ESRestHighLevelClientTestCase
         }
     }
 
+    public void testFlushIndex() throws Exception {
+        RestHighLevelClient client = highLevelClient();
+
+        {
+            createIndex("index1", Settings.EMPTY);
+        }
+
+        {
+            // tag::flush-request
+            FlushRequest request = new FlushRequest("index1"); // <1>
+            FlushRequest requestMultiple = new FlushRequest("index1", "index2"); // <2>
+            FlushRequest requestAll = new FlushRequest(); // <3>
+            // end::flush-request
+
+            // tag::flush-request-indicesOptions
+            request.indicesOptions(IndicesOptions.lenientExpandOpen()); // <1>
+            // end::flush-request-indicesOptions
+
+            // tag::flush-request-wait
+            request.waitIfOngoing(true); // <1>
+            // end::flush-request-wait
+
+            // tag::flush-request-force
+            request.force(true); // <1>
+            // end::flush-request-force
+
+            // tag::flush-execute
+            FlushResponse flushResponse = client.indices().flush(request);
+            // end::flush-execute
+
+            // tag::flush-response
+            int totalShards = flushResponse.getTotalShards(); // <1>
+            int successfulShards = flushResponse.getSuccessfulShards(); // <2>
+            int failedShards = flushResponse.getFailedShards(); // <3>
+            DefaultShardOperationFailedException[] failures = flushResponse.getShardFailures(); // <4>
+            // end::flush-response
+
+            // tag::flush-execute-listener
+            ActionListener<FlushResponse> listener = new ActionListener<FlushResponse>() {
+                @Override
+                public void onResponse(FlushResponse refreshResponse) {
+                    // <1>
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    // <2>
+                }
+            };
+            // end::flush-execute-listener
+
+            // Replace the empty listener by a blocking listener in test
+            final CountDownLatch latch = new CountDownLatch(1);
+            listener = new LatchedActionListener<>(listener, latch);
+
+            // tag::flush-execute-async
+            client.indices().flushAsync(request, listener); // <1>
+            // end::flush-execute-async
+
+            assertTrue(latch.await(30L, TimeUnit.SECONDS));
+        }
+
+        {
+            // tag::flush-notfound
+            try {
+                FlushRequest request = new FlushRequest("does_not_exist");
+                client.indices().flush(request);
+            } catch (ElasticsearchException exception) {
+                if (exception.status() == RestStatus.NOT_FOUND) {
+                    // <1>
+                }
+            }
+            // end::flush-notfound
+        }
+    }
+
     public void testCloseIndex() throws Exception {
         RestHighLevelClient client = highLevelClient();
 

+ 96 - 0
docs/java-rest/high-level/indices/flush.asciidoc

@@ -0,0 +1,96 @@
+[[java-rest-high-flush]]
+=== Flush API
+
+[[java-rest-high-flush-request]]
+==== Flush Request
+
+A `FlushRequest` can be applied to one or more indices, or even on `_all` the indices:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-request]
+--------------------------------------------------
+<1> Flush one index
+<2> Flush multiple indices
+<3> Flush all the indices
+
+==== Optional arguments
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-request-indicesOptions]
+--------------------------------------------------
+<1> Setting `IndicesOptions` controls how unavailable indices are resolved and
+how wildcard expressions are expanded
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-request-wait]
+--------------------------------------------------
+<1> Set the `wait_if_ongoing` flag to `true`
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-request-force]
+--------------------------------------------------
+<1> Set the `force` flag to `true`
+
+[[java-rest-high-flush-sync]]
+==== Synchronous Execution
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-execute]
+--------------------------------------------------
+
+[[java-rest-high-flush-async]]
+==== Asynchronous Execution
+
+The asynchronous execution of a flush request requires both the `FlushRequest`
+instance and an `ActionListener` instance to be passed to the asynchronous
+method:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-execute-async]
+--------------------------------------------------
+<1> The `FlushRequest` to execute and the `ActionListener` to use when
+the execution completes
+
+The asynchronous method does not block and returns immediately. Once it is
+completed the `ActionListener` is called back using the `onResponse` method
+if the execution successfully completed or using the `onFailure` method if
+it failed.
+
+A typical listener for `FlushResponse` looks like:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-execute-listener]
+--------------------------------------------------
+<1> Called when the execution is successfully completed. The response is
+provided as an argument
+<2> Called in case of failure. The raised exception is provided as an argument
+
+[[java-rest-high-flush-response]]
+==== Flush Response
+
+The returned `FlushResponse` allows to retrieve information about the
+executed operation as follows:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-response]
+--------------------------------------------------
+<1> Total number of shards hit by the flush request
+<2> Number of shards where the flush has succeeded
+<3> Number of shards where the flush has failed
+<4> A list of failures if the operation failed on one or more shards
+
+By default, if the indices were not found, an `ElasticsearchException` will be thrown:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-notfound]
+--------------------------------------------------
+<1> Do something if the indices to be flushed were not found

+ 2 - 0
docs/java-rest/high-level/supported-apis.asciidoc

@@ -53,6 +53,7 @@ Index Management::
 * <<java-rest-high-shrink-index>>
 * <<java-rest-high-split-index>>
 * <<java-rest-high-refresh>>
+* <<java-rest-high-flush>>
 * <<java-rest-high-rollover-index>>
 
 Mapping Management::
@@ -70,6 +71,7 @@ include::indices/close_index.asciidoc[]
 include::indices/shrink_index.asciidoc[]
 include::indices/split_index.asciidoc[]
 include::indices/refresh.asciidoc[]
+include::indices/flush.asciidoc[]
 include::indices/rollover.asciidoc[]
 include::indices/put_mapping.asciidoc[]
 include::indices/update_aliases.asciidoc[]

+ 17 - 2
server/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushResponse.java

@@ -21,16 +21,28 @@ package org.elasticsearch.action.admin.indices.flush;
 
 import org.elasticsearch.action.support.DefaultShardOperationFailedException;
 import org.elasticsearch.action.support.broadcast.BroadcastResponse;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.XContentParser;
 
+import java.util.Arrays;
 import java.util.List;
 
 /**
  * A response to flush action.
- *
- *
  */
 public class FlushResponse extends BroadcastResponse {
 
+    private static final ConstructingObjectParser<FlushResponse, Void> PARSER = new ConstructingObjectParser<>("flush", true,
+        arg -> {
+            BroadcastResponse response = (BroadcastResponse) arg[0];
+            return new FlushResponse(response.getTotalShards(), response.getSuccessfulShards(), response.getFailedShards(),
+                    Arrays.asList(response.getShardFailures()));
+        });
+
+    static {
+        declareBroadcastFields(PARSER);
+    }
+
     FlushResponse() {
 
     }
@@ -39,4 +51,7 @@ public class FlushResponse extends BroadcastResponse {
         super(totalShards, successfulShards, failedShards, shardFailures);
     }
 
+    public static FlushResponse fromXContent(XContentParser parser) {
+        return PARSER.apply(parser, null);
+    }
 }

+ 1 - 2
server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestFlushAction.java

@@ -38,7 +38,6 @@ import java.io.IOException;
 import static org.elasticsearch.rest.RestRequest.Method.GET;
 import static org.elasticsearch.rest.RestRequest.Method.POST;
 import static org.elasticsearch.rest.RestStatus.OK;
-import static org.elasticsearch.rest.action.RestActions.buildBroadcastShardsHeader;
 
 public class RestFlushAction extends BaseRestHandler {
     public RestFlushAction(Settings settings, RestController controller) {
@@ -65,7 +64,7 @@ public class RestFlushAction extends BaseRestHandler {
             @Override
             public RestResponse buildResponse(FlushResponse response, XContentBuilder builder) throws Exception {
                 builder.startObject();
-                buildBroadcastShardsHeader(builder, request, response);
+                response.toXContent(builder, request);
                 builder.endObject();
                 return new BytesRestResponse(OK, builder);
             }

+ 40 - 0
server/src/test/java/org/elasticsearch/action/admin/indices/flush/FlushResponseTests.java

@@ -0,0 +1,40 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.indices.flush;
+
+import org.elasticsearch.action.support.DefaultShardOperationFailedException;
+import org.elasticsearch.action.support.broadcast.AbstractBroadcastResponseTestCase;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.util.List;
+
+public class FlushResponseTests extends AbstractBroadcastResponseTestCase<FlushResponse> {
+
+    @Override
+    protected FlushResponse createTestInstance(int totalShards, int successfulShards, int failedShards,
+                                               List<DefaultShardOperationFailedException> failures) {
+        return new FlushResponse(totalShards, successfulShards, failedShards, failures);
+    }
+
+    @Override
+    protected FlushResponse doParseInstance(XContentParser parser) {
+        return FlushResponse.fromXContent(parser);
+    }
+}

+ 9 - 140
server/src/test/java/org/elasticsearch/action/admin/indices/refresh/RefreshResponseTests.java

@@ -19,153 +19,22 @@
 
 package org.elasticsearch.action.admin.indices.refresh;
 
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.support.DefaultShardOperationFailedException;
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.action.support.broadcast.AbstractBroadcastResponseTestCase;
 import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.index.Index;
-import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.rest.RestStatus;
-import org.elasticsearch.test.ESTestCase;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
-import static org.elasticsearch.test.XContentTestUtils.insertRandomFields;
-import static org.hamcrest.CoreMatchers.anyOf;
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.equalTo;
+public class RefreshResponseTests extends AbstractBroadcastResponseTestCase<RefreshResponse> {
 
-public class RefreshResponseTests extends ESTestCase {
-
-    public void testToXContent() {
-        RefreshResponse response = new RefreshResponse(10, 10, 0, null);
-        String output = Strings.toString(response);
-        assertEquals("{\"_shards\":{\"total\":10,\"successful\":10,\"failed\":0}}", output);
-    }
-
-    public void testToAndFromXContent() throws IOException {
-        doFromXContentTestWithRandomFields(false);
-    }
-
-    public void testFromXContentWithRandomFields() throws IOException {
-        doFromXContentTestWithRandomFields(true);
-    }
-
-    public void testFailuresDeduplication() throws IOException {
-        List<DefaultShardOperationFailedException> failures = new ArrayList<>();
-        Index index = new Index("test", "_na_");
-        ElasticsearchException exception1 = new ElasticsearchException("foo", new IllegalArgumentException("bar"));
-        exception1.setIndex(index);
-        exception1.setShard(new ShardId(index, 0));
-        ElasticsearchException exception2 = new ElasticsearchException("foo", new IllegalArgumentException("bar"));
-        exception2.setIndex(index);
-        exception2.setShard(new ShardId(index, 1));
-        ElasticsearchException exception3 = new ElasticsearchException("fizz", new IllegalStateException("buzz"));
-        exception3.setIndex(index);
-        exception3.setShard(new ShardId(index, 2));
-        failures.add(new DefaultShardOperationFailedException(exception1));
-        failures.add(new DefaultShardOperationFailedException(exception2));
-        failures.add(new DefaultShardOperationFailedException(exception3));
-
-        RefreshResponse response = new RefreshResponse(10, 7, 3, failures);
-        boolean humanReadable = randomBoolean();
-        XContentType xContentType = randomFrom(XContentType.values());
-        BytesReference bytesReference = toShuffledXContent(response, xContentType, ToXContent.EMPTY_PARAMS, humanReadable);
-        RefreshResponse parsedResponse;
-        try(XContentParser parser = createParser(xContentType.xContent(), bytesReference)) {
-            parsedResponse = RefreshResponse.fromXContent(parser);
-            assertNull(parser.nextToken());
-        }
-
-        assertThat(parsedResponse.getShardFailures().length, equalTo(2));
-        DefaultShardOperationFailedException[] parsedFailures = parsedResponse.getShardFailures();
-        assertThat(parsedFailures[0].index(), equalTo("test"));
-        assertThat(parsedFailures[0].shardId(), anyOf(equalTo(0), equalTo(1)));
-        assertThat(parsedFailures[0].status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
-        assertThat(parsedFailures[0].getCause().getMessage(), containsString("foo"));
-        assertThat(parsedFailures[1].index(), equalTo("test"));
-        assertThat(parsedFailures[1].shardId(), equalTo(2));
-        assertThat(parsedFailures[1].status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
-        assertThat(parsedFailures[1].getCause().getMessage(), containsString("fizz"));
-
-        ToXContent.Params params = new ToXContent.MapParams(Collections.singletonMap("group_shard_failures", "false"));
-        BytesReference bytesReferenceWithoutDedup = toShuffledXContent(response, xContentType, params, humanReadable);
-        try(XContentParser parser = createParser(xContentType.xContent(), bytesReferenceWithoutDedup)) {
-            parsedResponse = RefreshResponse.fromXContent(parser);
-            assertNull(parser.nextToken());
-        }
-
-        assertThat(parsedResponse.getShardFailures().length, equalTo(3));
-        parsedFailures = parsedResponse.getShardFailures();
-        for (int i = 0; i < 3; i++) {
-            if (i < 2) {
-                assertThat(parsedFailures[i].index(), equalTo("test"));
-                assertThat(parsedFailures[i].shardId(), equalTo(i));
-                assertThat(parsedFailures[i].status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
-                assertThat(parsedFailures[i].getCause().getMessage(), containsString("foo"));
-            } else {
-                assertThat(parsedFailures[i].index(), equalTo("test"));
-                assertThat(parsedFailures[i].shardId(), equalTo(i));
-                assertThat(parsedFailures[i].status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
-                assertThat(parsedFailures[i].getCause().getMessage(), containsString("fizz"));
-            }
-        }
-    }
-
-    private void doFromXContentTestWithRandomFields(boolean addRandomFields) throws IOException {
-        RefreshResponse response = createTestItem(10);
-        boolean humanReadable = randomBoolean();
-        XContentType xContentType = randomFrom(XContentType.values());
-        BytesReference bytesReference = toShuffledXContent(response, xContentType, ToXContent.EMPTY_PARAMS, humanReadable);
-        if (addRandomFields) {
-            bytesReference = insertRandomFields(xContentType, bytesReference, null, random());
-        }
-        RefreshResponse parsedResponse;
-        try(XContentParser parser = createParser(xContentType.xContent(), bytesReference)) {
-            parsedResponse = RefreshResponse.fromXContent(parser);
-            assertNull(parser.nextToken());
-        }
-
-        assertThat(response.getTotalShards(), equalTo(parsedResponse.getTotalShards()));
-        assertThat(response.getSuccessfulShards(), equalTo(parsedResponse.getSuccessfulShards()));
-        assertThat(response.getFailedShards(), equalTo(parsedResponse.getFailedShards()));
-        assertFailureEquals(response.getShardFailures(), parsedResponse.getShardFailures());
-    }
-
-    private static void assertFailureEquals(DefaultShardOperationFailedException[] original,
-                                            DefaultShardOperationFailedException[] parsedback) {
-        assertThat(original.length, equalTo(parsedback.length));
-        for (int i = 0; i < original.length; i++) {
-            assertThat(original[i].index(), equalTo(parsedback[i].index()));
-            assertThat(original[i].shardId(), equalTo(parsedback[i].shardId()));
-            assertThat(original[i].status(), equalTo(parsedback[i].status()));
-            assertThat(parsedback[i].getCause().getMessage(), containsString(original[i].getCause().getMessage()));
-        }
+    @Override
+    protected RefreshResponse createTestInstance(int totalShards, int successfulShards, int failedShards,
+                                                 List<DefaultShardOperationFailedException> failures) {
+        return new RefreshResponse(totalShards, successfulShards, failedShards, failures);
     }
 
-    private static RefreshResponse createTestItem(int totalShards) {
-        List<DefaultShardOperationFailedException> failures = null;
-        int successfulShards = randomInt(totalShards);
-        int failedShards = totalShards - successfulShards;
-        if (failedShards > 0) {
-            failures = new ArrayList<>();
-            for (int i = 0; i < failedShards; i++) {
-                ElasticsearchException exception = new ElasticsearchException("exception message " + i);
-                exception.setIndex(new Index("index" + i, "_na_"));
-                exception.setShard(new ShardId("index" + i, "_na_", i));
-                if (randomBoolean()) {
-                    failures.add(new DefaultShardOperationFailedException(exception));
-                } else {
-                    failures.add(new DefaultShardOperationFailedException("index" + i, i, new Exception("exception message " + i)));
-                }
-            }
-        }
-        return new RefreshResponse(totalShards, successfulShards, failedShards, failures);
+    @Override
+    protected RefreshResponse doParseInstance(XContentParser parser) {
+        return RefreshResponse.fromXContent(parser);
     }
 }

+ 163 - 0
server/src/test/java/org/elasticsearch/action/support/broadcast/AbstractBroadcastResponseTestCase.java

@@ -0,0 +1,163 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.support.broadcast;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.support.DefaultShardOperationFailedException;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.test.AbstractXContentTestCase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.anyOf;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+
+public abstract class AbstractBroadcastResponseTestCase<T extends BroadcastResponse> extends AbstractXContentTestCase<T> {
+
+    @Override
+    protected T createTestInstance() {
+        int totalShards = randomIntBetween(1, 10);
+        List<DefaultShardOperationFailedException> failures = null;
+        int successfulShards = randomInt(totalShards);
+        int failedShards = totalShards - successfulShards;
+        if (failedShards > 0) {
+            failures = new ArrayList<>();
+            for (int i = 0; i < failedShards; i++) {
+                ElasticsearchException exception = new ElasticsearchException("exception message " + i);
+                String index = randomAlphaOfLengthBetween(3, 10);
+                exception.setIndex(new Index(index, "_na_"));
+                exception.setShard(new ShardId(index, "_na_", i));
+                if (randomBoolean()) {
+                    failures.add(new DefaultShardOperationFailedException(exception));
+                } else {
+                    failures.add(new DefaultShardOperationFailedException(index, i, new Exception("exception message " + i)));
+                }
+            }
+        }
+        return createTestInstance(totalShards, successfulShards, failedShards, failures);
+    }
+
+    protected abstract T createTestInstance(int totalShards, int successfulShards, int failedShards,
+                                          List<DefaultShardOperationFailedException> failures);
+
+    @Override
+    protected void assertEqualInstances(T response, T parsedResponse) {
+        assertThat(response.getTotalShards(), equalTo(parsedResponse.getTotalShards()));
+        assertThat(response.getSuccessfulShards(), equalTo(parsedResponse.getSuccessfulShards()));
+        assertThat(response.getFailedShards(), equalTo(parsedResponse.getFailedShards()));
+        DefaultShardOperationFailedException[] originalFailures = response.getShardFailures();
+        DefaultShardOperationFailedException[] parsedFailures = parsedResponse.getShardFailures();
+        assertThat(originalFailures.length, equalTo(parsedFailures.length));
+        for (int i = 0; i < originalFailures.length; i++) {
+            assertThat(originalFailures[i].index(), equalTo(parsedFailures[i].index()));
+            assertThat(originalFailures[i].shardId(), equalTo(parsedFailures[i].shardId()));
+            assertThat(originalFailures[i].status(), equalTo(parsedFailures[i].status()));
+            assertThat(parsedFailures[i].getCause().getMessage(), containsString(originalFailures[i].getCause().getMessage()));
+        }
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return true;
+    }
+
+    @Override
+    protected boolean assertToXContentEquivalence() {
+        return false;
+    }
+
+    public void testFailuresDeduplication() throws IOException {
+        List<DefaultShardOperationFailedException> failures = new ArrayList<>();
+        Index index = new Index("test", "_na_");
+        ElasticsearchException exception1 = new ElasticsearchException("foo", new IllegalArgumentException("bar"));
+        exception1.setIndex(index);
+        exception1.setShard(new ShardId(index, 0));
+        ElasticsearchException exception2 = new ElasticsearchException("foo", new IllegalArgumentException("bar"));
+        exception2.setIndex(index);
+        exception2.setShard(new ShardId(index, 1));
+        ElasticsearchException exception3 = new ElasticsearchException("fizz", new IllegalStateException("buzz"));
+        exception3.setIndex(index);
+        exception3.setShard(new ShardId(index, 2));
+        failures.add(new DefaultShardOperationFailedException(exception1));
+        failures.add(new DefaultShardOperationFailedException(exception2));
+        failures.add(new DefaultShardOperationFailedException(exception3));
+
+        T response = createTestInstance(10, 7, 3, failures);
+        boolean humanReadable = randomBoolean();
+        XContentType xContentType = randomFrom(XContentType.values());
+        BytesReference bytesReference = toShuffledXContent(response, xContentType, ToXContent.EMPTY_PARAMS, humanReadable);
+        T parsedResponse;
+        try(XContentParser parser = createParser(xContentType.xContent(), bytesReference)) {
+            parsedResponse = doParseInstance(parser);
+            assertNull(parser.nextToken());
+        }
+
+        assertThat(parsedResponse.getShardFailures().length, equalTo(2));
+        DefaultShardOperationFailedException[] parsedFailures = parsedResponse.getShardFailures();
+        assertThat(parsedFailures[0].index(), equalTo("test"));
+        assertThat(parsedFailures[0].shardId(), anyOf(equalTo(0), equalTo(1)));
+        assertThat(parsedFailures[0].status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
+        assertThat(parsedFailures[0].getCause().getMessage(), containsString("foo"));
+        assertThat(parsedFailures[1].index(), equalTo("test"));
+        assertThat(parsedFailures[1].shardId(), equalTo(2));
+        assertThat(parsedFailures[1].status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
+        assertThat(parsedFailures[1].getCause().getMessage(), containsString("fizz"));
+
+        ToXContent.Params params = new ToXContent.MapParams(Collections.singletonMap("group_shard_failures", "false"));
+        BytesReference bytesReferenceWithoutDedup = toShuffledXContent(response, xContentType, params, humanReadable);
+        try(XContentParser parser = createParser(xContentType.xContent(), bytesReferenceWithoutDedup)) {
+            parsedResponse = doParseInstance(parser);
+            assertNull(parser.nextToken());
+        }
+
+        assertThat(parsedResponse.getShardFailures().length, equalTo(3));
+        parsedFailures = parsedResponse.getShardFailures();
+        for (int i = 0; i < 3; i++) {
+            if (i < 2) {
+                assertThat(parsedFailures[i].index(), equalTo("test"));
+                assertThat(parsedFailures[i].shardId(), equalTo(i));
+                assertThat(parsedFailures[i].status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
+                assertThat(parsedFailures[i].getCause().getMessage(), containsString("foo"));
+            } else {
+                assertThat(parsedFailures[i].index(), equalTo("test"));
+                assertThat(parsedFailures[i].shardId(), equalTo(i));
+                assertThat(parsedFailures[i].status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
+                assertThat(parsedFailures[i].getCause().getMessage(), containsString("fizz"));
+            }
+        }
+    }
+
+    public void testToXContent() {
+        T response = createTestInstance(10, 10, 0, null);
+        String output = Strings.toString(response);
+        assertEquals("{\"_shards\":{\"total\":10,\"successful\":10,\"failed\":0}}", output);
+    }
+}

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/test/AbstractSerializingTestCase.java

@@ -35,7 +35,7 @@ public abstract class AbstractSerializingTestCase<T extends ToXContent & Writeab
     public final void testFromXContent() throws IOException {
         AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, this::createTestInstance, supportsUnknownFields(),
                 getShuffleFieldsExceptions(), getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance,
-                this::assertEqualInstances);
+                this::assertEqualInstances, true);
     }
 
     /**

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/test/AbstractStreamableXContentTestCase.java

@@ -35,7 +35,7 @@ public abstract class AbstractStreamableXContentTestCase<T extends ToXContent &
     public final void testFromXContent() throws IOException {
         AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, this::createTestInstance, supportsUnknownFields(),
                 getShuffleFieldsExceptions(), getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance,
-                this::assertEqualInstances);
+                this::assertEqualInstances, true);
     }
 
     /**

+ 14 - 6
test/framework/src/main/java/org/elasticsearch/test/AbstractXContentTestCase.java

@@ -39,13 +39,16 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXC
 
 public abstract class AbstractXContentTestCase<T extends ToXContent> extends ESTestCase {
 
+    protected static final int NUMBER_OF_TEST_RUNS = 20;
+
     public static <T extends ToXContent> void testFromXContent(int numberOfTestRuns, Supplier<T> instanceSupplier,
                                                                boolean supportsUnknownFields, String[] shuffleFieldsExceptions,
                                                                Predicate<String> randomFieldsExcludeFilter,
                                                                CheckedBiFunction<XContent, BytesReference, XContentParser, IOException>
                                                                        createParserFunction,
                                                                CheckedFunction<XContentParser, T, IOException> parseFunction,
-                                                               BiConsumer<T, T> assertEqualsConsumer) throws IOException {
+                                                               BiConsumer<T, T> assertEqualsConsumer,
+                                                               boolean assertToXContentEquivalence) throws IOException {
         for (int runs = 0; runs < numberOfTestRuns; runs++) {
             T testInstance = instanceSupplier.get();
             XContentType xContentType = randomFrom(XContentType.values());
@@ -61,7 +64,9 @@ public abstract class AbstractXContentTestCase<T extends ToXContent> extends EST
             XContentParser parser = createParserFunction.apply(XContentFactory.xContent(xContentType), withRandomFields);
             T parsed = parseFunction.apply(parser);
             assertEqualsConsumer.accept(testInstance, parsed);
-            assertToXContentEquivalent(shuffled, XContentHelper.toXContent(parsed, xContentType, false), xContentType);
+            if (assertToXContentEquivalence) {
+                assertToXContentEquivalent(shuffled, XContentHelper.toXContent(parsed, xContentType, false), xContentType);
+            }
         }
     }
 
@@ -70,12 +75,11 @@ public abstract class AbstractXContentTestCase<T extends ToXContent> extends EST
      * both for equality and asserts equality on the two queries.
      */
     public final void testFromXContent() throws IOException {
-        testFromXContent(numberOfTestRuns(), this::createTestInstance, supportsUnknownFields(), getShuffleFieldsExceptions(),
-                getRandomFieldsExcludeFilter(), this::createParser, this::parseInstance, this::assertEqualInstances);
+        testFromXContent(NUMBER_OF_TEST_RUNS, this::createTestInstance, supportsUnknownFields(), getShuffleFieldsExceptions(),
+                getRandomFieldsExcludeFilter(), this::createParser, this::parseInstance, this::assertEqualInstances,
+                assertToXContentEquivalence());
     }
 
-    protected abstract int numberOfTestRuns();
-
     /**
      * Creates a random test instance to use in the tests. This method will be
      * called multiple times during test execution and should return a different
@@ -100,6 +104,10 @@ public abstract class AbstractXContentTestCase<T extends ToXContent> extends EST
         assertEquals(expectedInstance.hashCode(), newInstance.hashCode());
     }
 
+    protected boolean assertToXContentEquivalence() {
+        return true;
+    }
+
     /**
      * Indicates whether the parser supports unknown fields or not. In case it does, such behaviour will be tested by
      * inserting random fields before parsing and checking that they don't make parsing fail.