瀏覽代碼

REST high-level client: add synced flush API (2) (#30650)

Adds the synced flush API to the high level REST client.

Relates to #27205.
Sohaib Iftikhar 7 年之前
父節點
當前提交
2c27c58718

+ 23 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java

@@ -34,6 +34,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.admin.indices.flush.FlushResponse;
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
 import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
@@ -269,6 +270,28 @@ public final class IndicesClient {
                 listener, emptySet(), headers);
     }
 
+    /** Initiate a synced flush manually using the synced flush API
+      * <p>
+      * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-synced-flush.html">
+      *     Synced flush API on elastic.co</a>
+      */
+    public SyncedFlushResponse flushSynced(SyncedFlushRequest syncedFlushRequest, Header... headers) throws IOException {
+        return restHighLevelClient.performRequestAndParseEntity(syncedFlushRequest, RequestConverters::flushSynced,
+                SyncedFlushResponse::fromXContent, emptySet(), headers);
+    }
+
+    /**
+     * Asynchronously initiate a synced flush manually using the synced flush API
+     * <p>
+     * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-synced-flush.html">
+     *     Synced flush API on elastic.co</a>
+     */
+    public void flushSyncedAsync(SyncedFlushRequest syncedFlushRequest, ActionListener<SyncedFlushResponse> listener, Header... headers) {
+        restHighLevelClient.performRequestAsyncAndParseEntity(syncedFlushRequest, RequestConverters::flushSynced,
+                SyncedFlushResponse::fromXContent, listener, emptySet(), headers);
+    }
+
+
     /**
      * Retrieve the settings of one or more indices
      * <p>

+ 9 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java

@@ -41,6 +41,7 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
 import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
@@ -211,6 +212,14 @@ final class RequestConverters {
         return request;
     }
 
+    static Request flushSynced(SyncedFlushRequest syncedFlushRequest) {
+        String[] indices = syncedFlushRequest.indices() == null ? Strings.EMPTY_ARRAY : syncedFlushRequest.indices();
+        Request request = new Request(HttpPost.METHOD_NAME, endpoint(indices, "_flush/synced"));
+        Params parameters = new Params(request);
+        parameters.withIndicesOptions(syncedFlushRequest.indicesOptions());
+        return request;
+    }
+
     static Request forceMerge(ForceMergeRequest forceMergeRequest) {
         String[] indices = forceMergeRequest.indices() == null ? Strings.EMPTY_ARRAY : forceMergeRequest.indices();
         Request request = new Request(HttpPost.METHOD_NAME, endpoint(indices, "_forcemerge"));

+ 344 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/SyncedFlushResponse.java

@@ -0,0 +1,344 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client;
+
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.ParsingException;
+import org.elasticsearch.common.xcontent.ToXContentFragment;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentLocation;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.XContentParser.Token;
+
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
+import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Collections;
+import java.util.List;
+import java.util.ArrayList;
+
+public class SyncedFlushResponse extends ActionResponse implements ToXContentFragment {
+
+    public static final String SHARDS_FIELD = "_shards";
+
+    private ShardCounts totalCounts;
+    private Map<String, IndexResult> indexResults;
+
+    SyncedFlushResponse(ShardCounts totalCounts, Map<String, IndexResult> indexResults) {
+        this.totalCounts = new ShardCounts(totalCounts.total, totalCounts.successful, totalCounts.failed);
+        this.indexResults = Collections.unmodifiableMap(indexResults);
+    }
+
+    /**
+     * @return The total number of shard copies that were processed across all indexes
+     */
+    public int totalShards() {
+        return totalCounts.total;
+    }
+
+    /**
+     * @return The number of successful shard copies that were processed across all indexes
+     */
+    public int successfulShards() {
+        return totalCounts.successful;
+    }
+
+    /**
+     * @return The number of failed shard copies that were processed across all indexes
+     */
+    public int failedShards() {
+        return totalCounts.failed;
+    }
+
+    /**
+     * @return A map of results for each index where the keys of the map are the index names
+     *          and the values are the results encapsulated in {@link IndexResult}.
+     */
+    public Map<String, IndexResult> getIndexResults() {
+        return indexResults;
+    }
+
+    ShardCounts getShardCounts() {
+        return totalCounts;
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject(SHARDS_FIELD);
+        totalCounts.toXContent(builder, params);
+        builder.endObject();
+        for (Map.Entry<String, IndexResult> entry: indexResults.entrySet()) {
+            String indexName = entry.getKey();
+            IndexResult indexResult = entry.getValue();
+            builder.startObject(indexName);
+            indexResult.toXContent(builder, params);
+            builder.endObject();
+        }
+        return builder;
+    }
+
+    public static SyncedFlushResponse fromXContent(XContentParser parser) throws IOException {
+        ensureExpectedToken(Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
+        ShardCounts totalCounts = null;
+        Map<String, IndexResult> indexResults = new HashMap<>();
+        XContentLocation startLoc = parser.getTokenLocation();
+        while (parser.nextToken().equals(Token.FIELD_NAME)) {
+            if (parser.currentName().equals(SHARDS_FIELD)) {
+                ensureExpectedToken(Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
+                totalCounts = ShardCounts.fromXContent(parser);
+            } else {
+                String indexName = parser.currentName();
+                IndexResult indexResult = IndexResult.fromXContent(parser);
+                indexResults.put(indexName, indexResult);
+            }
+        }
+        if (totalCounts != null) {
+            return new SyncedFlushResponse(totalCounts, indexResults);
+        } else {
+            throw new ParsingException(
+                startLoc,
+                "Unable to reconstruct object. Total counts for shards couldn't be parsed."
+            );
+        }
+    }
+
+    /**
+     * Encapsulates the number of total successful and failed shard copies
+     */
+    public static final class ShardCounts implements ToXContentFragment {
+
+        public static final String TOTAL_FIELD = "total";
+        public static final String SUCCESSFUL_FIELD = "successful";
+        public static final String FAILED_FIELD = "failed";
+
+        private static final ConstructingObjectParser<ShardCounts, Void> PARSER =
+            new ConstructingObjectParser<>(
+                "shardcounts",
+                a -> new ShardCounts((Integer) a[0], (Integer) a[1], (Integer) a[2])
+            );
+        static {
+            PARSER.declareInt(constructorArg(), new ParseField(TOTAL_FIELD));
+            PARSER.declareInt(constructorArg(), new ParseField(SUCCESSFUL_FIELD));
+            PARSER.declareInt(constructorArg(), new ParseField(FAILED_FIELD));
+        }
+
+        private int total;
+        private int successful;
+        private int failed;
+
+
+        ShardCounts(int total, int successful, int failed) {
+            this.total = total;
+            this.successful = successful;
+            this.failed = failed;
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.field(TOTAL_FIELD, total);
+            builder.field(SUCCESSFUL_FIELD, successful);
+            builder.field(FAILED_FIELD, failed);
+            return builder;
+        }
+
+        public static ShardCounts fromXContent(XContentParser parser) throws IOException {
+            return PARSER.parse(parser, null);
+        }
+
+        public boolean equals(ShardCounts other) {
+            if (other != null) {
+                return
+                    other.total == this.total &&
+                    other.successful == this.successful &&
+                    other.failed == this.failed;
+            } else {
+                return false;
+            }
+        }
+
+    }
+
+    /**
+     * Description for the flush/synced results for a particular index.
+     * This includes total, successful and failed copies along with failure description for each failed copy.
+     */
+    public static final class IndexResult implements ToXContentFragment {
+
+        public static final String TOTAL_FIELD = "total";
+        public static final String SUCCESSFUL_FIELD = "successful";
+        public static final String FAILED_FIELD = "failed";
+        public static final String FAILURES_FIELD = "failures";
+
+        @SuppressWarnings("unchecked")
+        private static final ConstructingObjectParser<IndexResult, Void> PARSER =
+            new ConstructingObjectParser<>(
+                "indexresult",
+                a -> new IndexResult((Integer) a[0], (Integer) a[1], (Integer) a[2], (List<ShardFailure>)a[3])
+            );
+        static {
+            PARSER.declareInt(constructorArg(), new ParseField(TOTAL_FIELD));
+            PARSER.declareInt(constructorArg(), new ParseField(SUCCESSFUL_FIELD));
+            PARSER.declareInt(constructorArg(), new ParseField(FAILED_FIELD));
+            PARSER.declareObjectArray(optionalConstructorArg(), ShardFailure.PARSER, new ParseField(FAILURES_FIELD));
+        }
+
+        private ShardCounts counts;
+        private List<ShardFailure> failures;
+
+        IndexResult(int total, int successful, int failed, List<ShardFailure> failures) {
+            counts = new ShardCounts(total, successful, failed);
+            if (failures != null) {
+                this.failures = Collections.unmodifiableList(failures);
+            } else {
+                this.failures = Collections.unmodifiableList(new ArrayList<>());
+            }
+        }
+
+        /**
+         * @return The total number of shard copies that were processed for this index.
+         */
+        public int totalShards() {
+            return counts.total;
+        }
+
+        /**
+         * @return The number of successful shard copies that were processed for this index.
+         */
+        public int successfulShards() {
+            return counts.successful;
+        }
+
+        /**
+         * @return The number of failed shard copies that were processed for this index.
+         */
+        public int failedShards() {
+            return counts.failed;
+        }
+
+        /**
+         * @return A list of {@link ShardFailure} objects that describe each of the failed shard copies for this index.
+         */
+        public List<ShardFailure> failures() {
+            return failures;
+        }
+
+        ShardCounts getShardCounts() {
+            return counts;
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            counts.toXContent(builder, params);
+            if (failures.size() > 0) {
+                builder.startArray(FAILURES_FIELD);
+                for (ShardFailure failure : failures) {
+                    failure.toXContent(builder, params);
+                }
+                builder.endArray();
+            }
+            return builder;
+        }
+
+        public static IndexResult fromXContent(XContentParser parser) throws IOException {
+            return PARSER.parse(parser, null);
+        }
+    }
+
+    /**
+     * Description of a failed shard copy for an index.
+     */
+    public static final class ShardFailure implements ToXContentFragment {
+
+        public static String SHARD_ID_FIELD = "shard";
+        public static String FAILURE_REASON_FIELD = "reason";
+        public static String ROUTING_FIELD = "routing";
+
+        private int shardId;
+        private String failureReason;
+        private Map<String, Object> routing;
+
+        @SuppressWarnings("unchecked")
+        static ConstructingObjectParser<ShardFailure, Void> PARSER = new ConstructingObjectParser<>(
+            "shardfailure",
+            a -> new ShardFailure((Integer)a[0], (String)a[1], (Map<String, Object>)a[2])
+        );
+        static {
+            PARSER.declareInt(constructorArg(), new ParseField(SHARD_ID_FIELD));
+            PARSER.declareString(constructorArg(), new ParseField(FAILURE_REASON_FIELD));
+            PARSER.declareObject(
+                optionalConstructorArg(),
+                (parser, c) -> parser.map(),
+                new ParseField(ROUTING_FIELD)
+            );
+        }
+
+        ShardFailure(int shardId, String failureReason, Map<String, Object> routing) {
+            this.shardId = shardId;
+            this.failureReason = failureReason;
+            if (routing != null) {
+                this.routing = Collections.unmodifiableMap(routing);
+            } else {
+                this.routing = Collections.unmodifiableMap(new HashMap<>());
+            }
+        }
+
+        /**
+         * @return Id of the shard whose copy failed
+         */
+        public int getShardId() {
+            return shardId;
+        }
+
+        /**
+         * @return Reason for failure of the shard copy
+         */
+        public String getFailureReason() {
+            return failureReason;
+        }
+
+        /**
+         * @return Additional information about the failure.
+         */
+        public Map<String, Object> getRouting() {
+            return routing;
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            builder.field(SHARD_ID_FIELD, shardId);
+            builder.field(FAILURE_REASON_FIELD, failureReason);
+            if (routing.size() > 0) {
+                builder.field(ROUTING_FIELD, routing);
+            }
+            builder.endObject();
+            return builder;
+        }
+
+        public static ShardFailure fromXContent(XContentParser parser) throws IOException {
+            return PARSER.parse(parser, null);
+        }
+    }
+}

+ 34 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java

@@ -38,6 +38,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.admin.indices.flush.FlushResponse;
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
 import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
@@ -563,6 +564,39 @@ public class IndicesClientIT extends ESRestHighLevelClientTestCase {
         }
     }
 
+    public void testSyncedFlush() throws IOException {
+        {
+            String index = "index";
+            Settings settings = Settings.builder()
+                    .put("number_of_shards", 1)
+                    .put("number_of_replicas", 0)
+                    .build();
+            createIndex(index, settings);
+            SyncedFlushRequest syncedFlushRequest = new SyncedFlushRequest(index);
+            SyncedFlushResponse flushResponse =
+                    execute(syncedFlushRequest, highLevelClient().indices()::flushSynced, highLevelClient().indices()::flushSyncedAsync);
+            assertThat(flushResponse.totalShards(), equalTo(1));
+            assertThat(flushResponse.successfulShards(), equalTo(1));
+            assertThat(flushResponse.failedShards(), equalTo(0));
+        }
+        {
+            String nonExistentIndex = "non_existent_index";
+            assertFalse(indexExists(nonExistentIndex));
+            SyncedFlushRequest syncedFlushRequest = new SyncedFlushRequest(nonExistentIndex);
+            ElasticsearchException exception = expectThrows(
+                ElasticsearchException.class,
+                () ->
+                    execute(
+                        syncedFlushRequest,
+                        highLevelClient().indices()::flushSynced,
+                        highLevelClient().indices()::flushSyncedAsync
+                    )
+            );
+            assertEquals(RestStatus.NOT_FOUND, exception.status());
+        }
+    }
+
+
     public void testClearCache() throws IOException {
         {
             String index = "index";

+ 24 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java

@@ -43,6 +43,7 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
 import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
@@ -645,6 +646,29 @@ public class RequestConvertersTests extends ESTestCase {
         assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME));
     }
 
+    public void testSyncedFlush() {
+        String[] indices = randomBoolean() ? null : randomIndicesNames(0, 5);
+        SyncedFlushRequest syncedFlushRequest;
+        if (randomBoolean()) {
+            syncedFlushRequest = new SyncedFlushRequest(indices);
+        } else {
+            syncedFlushRequest = new SyncedFlushRequest();
+            syncedFlushRequest.indices(indices);
+        }
+        Map<String, String> expectedParams = new HashMap<>();
+        setRandomIndicesOptions(syncedFlushRequest::indicesOptions, syncedFlushRequest::indicesOptions, expectedParams);
+        Request request = RequestConverters.flushSynced(syncedFlushRequest);
+        StringJoiner endpoint = new StringJoiner("/", "/", "");
+        if (indices != null && indices.length > 0) {
+                endpoint.add(String.join(",", indices));
+            }
+        endpoint.add("_flush/synced");
+        assertThat(request.getEndpoint(), equalTo(endpoint.toString()));
+        assertThat(request.getParameters(), equalTo(expectedParams));
+        assertThat(request.getEntity(), nullValue());
+        assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME));
+    }
+
     public void testForceMerge() {
         String[] indices = randomBoolean() ? null : randomIndicesNames(0, 5);
         ForceMergeRequest forceMergeRequest;

+ 269 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/SyncedFlushResponseTests.java

@@ -0,0 +1,269 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashSet;
+
+import com.carrotsearch.hppc.ObjectIntHashMap;
+import com.carrotsearch.hppc.ObjectIntMap;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.TestShardRouting;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.flush.ShardsSyncedFlushResult;
+import org.elasticsearch.indices.flush.SyncedFlushService;
+import org.elasticsearch.test.ESTestCase;
+
+public class SyncedFlushResponseTests extends ESTestCase {
+
+    public void testXContentSerialization() throws IOException {
+        final XContentType xContentType = randomFrom(XContentType.values());
+        TestPlan plan = createTestPlan();
+
+        XContentBuilder serverResponsebuilder = XContentBuilder.builder(xContentType.xContent());
+        assertNotNull(plan.result);
+        serverResponsebuilder.startObject();
+        plan.result.toXContent(serverResponsebuilder, ToXContent.EMPTY_PARAMS);
+        serverResponsebuilder.endObject();
+        XContentBuilder clientResponsebuilder = XContentBuilder.builder(xContentType.xContent());
+        assertNotNull(plan.result);
+        clientResponsebuilder.startObject();
+        plan.clientResult.toXContent(clientResponsebuilder, ToXContent.EMPTY_PARAMS);
+        clientResponsebuilder.endObject();
+        Map<String, Object> serverContentMap = convertFailureListToSet(
+            serverResponsebuilder
+                .generator()
+                .contentType()
+                .xContent()
+                .createParser(
+                    xContentRegistry(),
+                    LoggingDeprecationHandler.INSTANCE,
+                    BytesReference.bytes(serverResponsebuilder).streamInput()
+                ).map()
+        );
+        Map<String, Object> clientContentMap = convertFailureListToSet(
+            clientResponsebuilder
+                .generator()
+                .contentType()
+                .xContent()
+                .createParser(
+                    xContentRegistry(),
+                    LoggingDeprecationHandler.INSTANCE,
+                    BytesReference.bytes(clientResponsebuilder).streamInput()
+                )
+                .map()
+        );
+        assertEquals(serverContentMap, clientContentMap);
+    }
+
+    public void testXContentDeserialization() throws IOException {
+        final XContentType xContentType = randomFrom(XContentType.values());
+        TestPlan plan = createTestPlan();
+        XContentBuilder builder = XContentBuilder.builder(xContentType.xContent());
+        builder.startObject();
+        plan.result.toXContent(builder, ToXContent.EMPTY_PARAMS);
+        builder.endObject();
+        XContentParser parser = builder
+            .generator()
+            .contentType()
+            .xContent()
+            .createParser(
+                xContentRegistry(), LoggingDeprecationHandler.INSTANCE, BytesReference.bytes(builder).streamInput()
+            );
+        SyncedFlushResponse originalResponse = plan.clientResult;
+        SyncedFlushResponse parsedResponse = SyncedFlushResponse.fromXContent(parser);
+        assertNotNull(parsedResponse);
+        assertShardCounts(originalResponse.getShardCounts(), parsedResponse.getShardCounts());
+        for (Map.Entry<String, SyncedFlushResponse.IndexResult> entry: originalResponse.getIndexResults().entrySet()) {
+            String index = entry.getKey();
+            SyncedFlushResponse.IndexResult responseResult = entry.getValue();
+            SyncedFlushResponse.IndexResult parsedResult = parsedResponse.getIndexResults().get(index);
+            assertNotNull(responseResult);
+            assertNotNull(parsedResult);
+            assertShardCounts(responseResult.getShardCounts(), parsedResult.getShardCounts());
+            assertEquals(responseResult.failures().size(), parsedResult.failures().size());
+            for (SyncedFlushResponse.ShardFailure responseShardFailure: responseResult.failures()) {
+                assertTrue(containsFailure(parsedResult.failures(), responseShardFailure));
+            }
+        }
+    }
+
+    static class TestPlan {
+        SyncedFlushResponse.ShardCounts totalCounts;
+        Map<String, SyncedFlushResponse.ShardCounts> countsPerIndex = new HashMap<>();
+        ObjectIntMap<String> expectedFailuresPerIndex = new ObjectIntHashMap<>();
+        org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse result;
+        SyncedFlushResponse clientResult;
+    }
+
+    TestPlan createTestPlan() throws IOException {
+        final TestPlan testPlan = new TestPlan();
+        final Map<String, List<ShardsSyncedFlushResult>> indicesResults = new HashMap<>();
+        Map<String, SyncedFlushResponse.IndexResult> indexResults = new HashMap<>();
+        final XContentType xContentType = randomFrom(XContentType.values());
+        final int indexCount = randomIntBetween(1, 10);
+        int totalShards = 0;
+        int totalSuccessful = 0;
+        int totalFailed = 0;
+        for (int i = 0; i < indexCount; i++) {
+            final String index = "index_" + i;
+            int shards = randomIntBetween(1, 4);
+            int replicas = randomIntBetween(0, 2);
+            int successful = 0;
+            int failed = 0;
+            int failures = 0;
+            List<ShardsSyncedFlushResult> shardsResults = new ArrayList<>();
+            List<SyncedFlushResponse.ShardFailure> shardFailures = new ArrayList<>();
+            for (int shard = 0; shard < shards; shard++) {
+                final ShardId shardId = new ShardId(index, "_na_", shard);
+                if (randomInt(5) < 2) {
+                    // total shard failure
+                    failed += replicas + 1;
+                    failures++;
+                    shardsResults.add(new ShardsSyncedFlushResult(shardId, replicas + 1, "simulated total failure"));
+                    shardFailures.add(
+                        new SyncedFlushResponse.ShardFailure(
+                            shardId.id(),
+                            "simulated total failure",
+                            new HashMap<>()
+                        )
+                    );
+                } else {
+                    Map<ShardRouting, SyncedFlushService.ShardSyncedFlushResponse> shardResponses = new HashMap<>();
+                    for (int copy = 0; copy < replicas + 1; copy++) {
+                        final ShardRouting shardRouting =
+                            TestShardRouting.newShardRouting(
+                                index, shard, "node_" + shardId + "_" + copy, null,
+                                copy == 0, ShardRoutingState.STARTED
+                            );
+                        if (randomInt(5) < 2) {
+                            // shard copy failure
+                            failed++;
+                            failures++;
+                            shardResponses.put(shardRouting, new SyncedFlushService.ShardSyncedFlushResponse("copy failure " + shardId));
+                            // Building the shardRouting map here.
+                            XContentBuilder builder = XContentBuilder.builder(xContentType.xContent());
+                            Map<String, Object> routing =
+                                shardRouting.toXContent(builder, ToXContent.EMPTY_PARAMS)
+                                    .generator()
+                                    .contentType()
+                                    .xContent()
+                                    .createParser(
+                                        xContentRegistry(), LoggingDeprecationHandler.INSTANCE,
+                                        BytesReference.bytes(builder).streamInput()
+                                    )
+                                    .map();
+                            shardFailures.add(
+                                new SyncedFlushResponse.ShardFailure(
+                                    shardId.id(),
+                                    "copy failure " + shardId,
+                                    routing
+                                )
+                            );
+                        } else {
+                            successful++;
+                            shardResponses.put(shardRouting, new SyncedFlushService.ShardSyncedFlushResponse());
+                        }
+                    }
+                    shardsResults.add(new ShardsSyncedFlushResult(shardId, "_sync_id_" + shard, replicas + 1, shardResponses));
+                }
+            }
+            indicesResults.put(index, shardsResults);
+            indexResults.put(
+                index,
+                new SyncedFlushResponse.IndexResult(
+                    shards * (replicas + 1),
+                    successful,
+                    failed,
+                    shardFailures
+                )
+            );
+            testPlan.countsPerIndex.put(index, new SyncedFlushResponse.ShardCounts(shards * (replicas + 1), successful, failed));
+            testPlan.expectedFailuresPerIndex.put(index, failures);
+            totalFailed += failed;
+            totalShards += shards * (replicas + 1);
+            totalSuccessful += successful;
+        }
+        testPlan.result = new org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse(indicesResults);
+        testPlan.totalCounts = new SyncedFlushResponse.ShardCounts(totalShards, totalSuccessful, totalFailed);
+        testPlan.clientResult = new SyncedFlushResponse(
+            new SyncedFlushResponse.ShardCounts(totalShards, totalSuccessful, totalFailed),
+            indexResults
+        );
+        return testPlan;
+    }
+
+    public boolean containsFailure(List<SyncedFlushResponse.ShardFailure> failures, SyncedFlushResponse.ShardFailure origFailure) {
+        for (SyncedFlushResponse.ShardFailure failure: failures) {
+            if (failure.getShardId() == origFailure.getShardId() &&
+                failure.getFailureReason().equals(origFailure.getFailureReason()) &&
+                failure.getRouting().equals(origFailure.getRouting())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+
+    public void assertShardCounts(SyncedFlushResponse.ShardCounts first, SyncedFlushResponse.ShardCounts second) {
+        if (first == null) {
+            assertNull(second);
+        } else {
+            assertTrue(first.equals(second));
+        }
+    }
+
+    public Map<String, Object> convertFailureListToSet(Map<String, Object> input) {
+        Map<String, Object> retMap = new HashMap<>();
+        for (Map.Entry<String, Object> entry: input.entrySet()) {
+            if (entry.getKey().equals(SyncedFlushResponse.SHARDS_FIELD)) {
+                retMap.put(entry.getKey(), entry.getValue());
+            } else {
+                // This was an index entry.
+                @SuppressWarnings("unchecked")
+                Map<String, Object> indexResult = (Map<String, Object>)entry.getValue();
+                Map<String, Object> retResult = new HashMap<>();
+                for (Map.Entry<String, Object> entry2: indexResult.entrySet()) {
+                    if (entry2.getKey().equals(SyncedFlushResponse.IndexResult.FAILURES_FIELD)) {
+                        @SuppressWarnings("unchecked")
+                        List<Object> failures = (List<Object>)entry2.getValue();
+                        Set<Object> retSet = new HashSet<>(failures);
+                        retResult.put(entry.getKey(), retSet);
+                    } else {
+                        retResult.put(entry2.getKey(), entry2.getValue());
+                    }
+                }
+                retMap.put(entry.getKey(), retResult);
+            }
+        }
+        return retMap;
+    }
+}

+ 85 - 4
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IndicesClientDocumentationIT.java

@@ -37,6 +37,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.admin.indices.flush.FlushResponse;
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
 import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
@@ -55,8 +56,6 @@ import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
 import org.elasticsearch.action.admin.indices.shrink.ResizeRequest;
 import org.elasticsearch.action.admin.indices.shrink.ResizeResponse;
 import org.elasticsearch.action.admin.indices.shrink.ResizeType;
-import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
-import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse;
 import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
 import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
 import org.elasticsearch.action.support.ActiveShardCount;
@@ -64,6 +63,7 @@ import org.elasticsearch.action.support.DefaultShardOperationFailedException;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.ESRestHighLevelClientTestCase;
 import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.SyncedFlushResponse;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -81,8 +81,6 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import static org.hamcrest.Matchers.equalTo;
-
 /**
  * This class is used to generate the Java Indices API documentation.
  * You need to wrap your code between two tags like:
@@ -784,6 +782,89 @@ public class IndicesClientDocumentationIT extends ESRestHighLevelClientTestCase
         }
     }
 
+    public void testSyncedFlushIndex() throws Exception {
+        RestHighLevelClient client = highLevelClient();
+
+        {
+            createIndex("index1", Settings.EMPTY);
+        }
+
+        {
+            // tag::flush-synced-request
+            SyncedFlushRequest request = new SyncedFlushRequest("index1"); // <1>
+            SyncedFlushRequest requestMultiple = new SyncedFlushRequest("index1", "index2"); // <2>
+            SyncedFlushRequest requestAll = new SyncedFlushRequest(); // <3>
+            // end::flush-synced-request
+
+            // tag::flush-synced-request-indicesOptions
+            request.indicesOptions(IndicesOptions.lenientExpandOpen()); // <1>
+            // end::flush-synced-request-indicesOptions
+
+            // tag::flush-synced-execute
+            SyncedFlushResponse flushSyncedResponse = client.indices().flushSynced(request);
+            // end::flush-synced-execute
+
+            // tag::flush-synced-response
+            int totalShards = flushSyncedResponse.totalShards(); // <1>
+            int successfulShards = flushSyncedResponse.successfulShards(); // <2>
+            int failedShards = flushSyncedResponse.failedShards(); // <3>
+
+            for (Map.Entry<String, SyncedFlushResponse.IndexResult> responsePerIndexEntry:
+                flushSyncedResponse.getIndexResults().entrySet()) {
+                String indexName = responsePerIndexEntry.getKey(); // <4>
+                SyncedFlushResponse.IndexResult indexResult = responsePerIndexEntry.getValue();
+                int totalShardsForIndex = indexResult.totalShards(); // <5>
+                int successfulShardsForIndex = indexResult.successfulShards(); // <6>
+                int failedShardsForIndex = indexResult.failedShards(); // <7>
+                if (failedShardsForIndex > 0) {
+                    for (SyncedFlushResponse.ShardFailure failureEntry: indexResult.failures()) {
+                        int shardId = failureEntry.getShardId(); // <8>
+                        String failureReason = failureEntry.getFailureReason(); // <9>
+                        Map<String, Object> routing = failureEntry.getRouting(); // <10>
+                    }
+                }
+            }
+            // end::flush-synced-response
+
+            // tag::flush-synced-execute-listener
+            ActionListener<SyncedFlushResponse> listener = new ActionListener<SyncedFlushResponse>() {
+                @Override
+                public void onResponse(SyncedFlushResponse refreshResponse) {
+                    // <1>
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    // <2>
+                }
+            };
+            // end::flush-synced-execute-listener
+
+            // Replace the empty listener by a blocking listener in test
+            final CountDownLatch latch = new CountDownLatch(1);
+            listener = new LatchedActionListener<>(listener, latch);
+
+            // tag::flush-synced-execute-async
+            client.indices().flushSyncedAsync(request, listener); // <1>
+            // end::flush-synced-execute-async
+
+            assertTrue(latch.await(30L, TimeUnit.SECONDS));
+        }
+
+        {
+            // tag::flush-synced-notfound
+            try {
+                SyncedFlushRequest request = new SyncedFlushRequest("does_not_exist");
+                client.indices().flushSynced(request);
+            } catch (ElasticsearchException exception) {
+                if (exception.status() == RestStatus.NOT_FOUND) {
+                    // <1>
+                }
+            }
+            // end::flush-synced-notfound
+        }
+    }
+
     public void testGetSettings() throws Exception {
         RestHighLevelClient client = highLevelClient();
 

+ 91 - 0
docs/java-rest/high-level/indices/flush_synced.asciidoc

@@ -0,0 +1,91 @@
+[[java-rest-high-flush]]
+=== Flush Synced API
+
+[[java-rest-high-flush-synced-request]]
+==== Flush Synced Request
+
+A `SyncedFlushRequest` can be applied to one or more indices, or even on `_all` the indices:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-request]
+--------------------------------------------------
+<1> Flush synced one index
+<2> Flush synced multiple indices
+<3> Flush synced all the indices
+
+==== Optional arguments
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-request-indicesOptions]
+--------------------------------------------------
+<1> Setting `IndicesOptions` controls how unavailable indices are resolved and
+how wildcard expressions are expanded
+
+[[java-rest-high-flush-synced-sync]]
+==== Synchronous Execution
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-execute]
+--------------------------------------------------
+
+[[java-rest-high-flush-synced-async]]
+==== Asynchronous Execution
+
+The asynchronous execution of a flush request requires both the `SyncedFlushRequest`
+instance and an `ActionListener` instance to be passed to the asynchronous
+method:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-execute-async]
+--------------------------------------------------
+<1> The `SyncedFlushRequest` to execute and the `ActionListener` to use when
+the execution completes
+
+The asynchronous method does not block and returns immediately. Once it is
+completed the `ActionListener` is called back using the `onResponse` method
+if the execution successfully completed or using the `onFailure` method if
+it failed.
+
+A typical listener for `SyncedFlushResponse` looks like:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-execute-listener]
+--------------------------------------------------
+<1> Called when the execution is successfully completed. The response is
+provided as an argument
+<2> Called in case of failure. The raised exception is provided as an argument
+
+[[java-rest-high-flush-response]]
+==== Flush Synced Response
+
+The returned `SyncedFlushResponse` allows to retrieve information about the
+executed operation as follows:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-response]
+--------------------------------------------------
+<1> Total number of shards hit by the flush request
+<2> Number of shards where the flush has succeeded
+<3> Number of shards where the flush has failed
+<4> Name of the index whose results we are about to calculate.
+<5> Total number of shards for index mentioned in 4.
+<6> Successful shards for index mentioned in 4.
+<7> Failed shards for index mentioned in 4.
+<8> One of the failed shard ids of the failed index mentioned in 4.
+<9> Reason for failure of copies of the shard mentioned in 8.
+<10> JSON represented by a Map<String, Object>. Contains shard related information like id, state, version etc.
+for the failed shard copies. If the entire shard failed then this returns an empty map.
+
+By default, if the indices were not found, an `ElasticsearchException` will be thrown:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-notfound]
+--------------------------------------------------
+<1> Do something if the indices to be flushed were not found

+ 2 - 0
docs/java-rest/high-level/supported-apis.asciidoc

@@ -67,6 +67,7 @@ Index Management::
 * <<java-rest-high-split-index>>
 * <<java-rest-high-refresh>>
 * <<java-rest-high-flush>>
+* <<java-rest-high-flush-synced>>
 * <<java-rest-high-clear-cache>>
 * <<java-rest-high-force-merge>>
 * <<java-rest-high-rollover-index>>
@@ -89,6 +90,7 @@ include::indices/shrink_index.asciidoc[]
 include::indices/split_index.asciidoc[]
 include::indices/refresh.asciidoc[]
 include::indices/flush.asciidoc[]
+include::indices/flush_synced.asciidoc[]
 include::indices/clear_cache.asciidoc[]
 include::indices/force_merge.asciidoc[]
 include::indices/rollover.asciidoc[]