Ver Fonte

Merge pull request #15396 from brwe/java-api-for-synced-flush

Add java API for synced flush

closes #12812
Britta Weber há 10 anos atrás
pai
commit
db357f078a
19 ficheiros alterados com 696 adições e 130 exclusões
  1. 3 0
      core/src/main/java/org/elasticsearch/action/ActionModule.java
  2. 44 0
      core/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushAction.java
  3. 64 0
      core/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushRequest.java
  4. 41 0
      core/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushRequestBuilder.java
  5. 83 14
      core/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponse.java
  6. 52 0
      core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportSyncedFlushAction.java
  7. 28 2
      core/src/main/java/org/elasticsearch/client/IndicesAdminClient.java
  8. 13 1
      core/src/main/java/org/elasticsearch/client/Requests.java
  9. 19 0
      core/src/main/java/org/elasticsearch/client/support/AbstractClient.java
  10. 47 9
      core/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java
  11. 46 40
      core/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java
  12. 7 10
      core/src/main/java/org/elasticsearch/rest/action/admin/indices/flush/RestSyncedFlushAction.java
  13. 68 16
      core/src/test/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushUnitTests.java
  14. 1 1
      core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java
  15. 161 0
      core/src/test/java/org/elasticsearch/gateway/ReusePeerRecoverySharedTest.java
  16. 5 4
      core/src/test/java/org/elasticsearch/indices/flush/FlushIT.java
  17. 2 2
      core/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java
  18. 4 20
      core/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java
  19. 8 11
      test-framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

+ 3 - 0
core/src/main/java/org/elasticsearch/action/ActionModule.java

@@ -107,6 +107,8 @@ import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction;
 import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
 import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction;
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushAction;
+import org.elasticsearch.action.admin.indices.flush.TransportSyncedFlushAction;
 import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateAction;
 import org.elasticsearch.action.admin.indices.template.delete.TransportDeleteIndexTemplateAction;
 import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesAction;
@@ -293,6 +295,7 @@ public class ActionModule extends AbstractModule {
         registerAction(ValidateQueryAction.INSTANCE, TransportValidateQueryAction.class);
         registerAction(RefreshAction.INSTANCE, TransportRefreshAction.class);
         registerAction(FlushAction.INSTANCE, TransportFlushAction.class);
+        registerAction(SyncedFlushAction.INSTANCE, TransportSyncedFlushAction.class);
         registerAction(ForceMergeAction.INSTANCE, TransportForceMergeAction.class);
         registerAction(UpgradeAction.INSTANCE, TransportUpgradeAction.class);
         registerAction(UpgradeStatusAction.INSTANCE, TransportUpgradeStatusAction.class);

+ 44 - 0
core/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushAction.java

@@ -0,0 +1,44 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.indices.flush;
+
+import org.elasticsearch.action.Action;
+import org.elasticsearch.client.ElasticsearchClient;
+
+
+public class SyncedFlushAction extends Action<SyncedFlushRequest, SyncedFlushResponse, SyncedFlushRequestBuilder> {
+
+    public static final SyncedFlushAction INSTANCE = new SyncedFlushAction();
+    public static final String NAME = "indices:admin/synced_flush";
+
+    private SyncedFlushAction() {
+        super(NAME);
+    }
+
+    @Override
+    public SyncedFlushResponse newResponse() {
+        return new SyncedFlushResponse();
+    }
+
+    @Override
+    public SyncedFlushRequestBuilder newRequestBuilder(ElasticsearchClient client) {
+        return new SyncedFlushRequestBuilder(client, this);
+    }
+}

+ 64 - 0
core/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushRequest.java

@@ -0,0 +1,64 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.indices.flush;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.support.broadcast.BroadcastRequest;
+
+import java.util.Arrays;
+
+/**
+ * A synced flush request to sync flush one or more indices. The synced flush process of an index performs a flush
+ * and writes the same sync id to primary and all copies.
+ *
+ * <p>Best created with {@link org.elasticsearch.client.Requests#syncedFlushRequest(String...)}. </p>
+ *
+ * @see org.elasticsearch.client.Requests#flushRequest(String...)
+ * @see org.elasticsearch.client.IndicesAdminClient#syncedFlush(SyncedFlushRequest)
+ * @see SyncedFlushResponse
+ */
+public class SyncedFlushRequest extends BroadcastRequest<SyncedFlushRequest> {
+
+    public SyncedFlushRequest() {
+    }
+
+    /**
+     * Copy constructor that creates a new synced flush request that is a copy of the one provided as an argument.
+     * The new request will inherit though headers and context from the original request that caused it.
+     */
+    public SyncedFlushRequest(ActionRequest originalRequest) {
+        super(originalRequest);
+    }
+
+    /**
+     * Constructs a new synced flush request against one or more indices. If nothing is provided, all indices will
+     * be sync flushed.
+     */
+    public SyncedFlushRequest(String... indices) {
+        super(indices);
+    }
+
+
+    @Override
+    public String toString() {
+        return "SyncedFlushRequest{" +
+            "indices=" + Arrays.toString(indices) + "}";
+    }
+}

+ 41 - 0
core/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushRequestBuilder.java

@@ -0,0 +1,41 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.indices.flush;
+
+import org.elasticsearch.action.ActionRequestBuilder;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.client.ElasticsearchClient;
+
+public class SyncedFlushRequestBuilder extends ActionRequestBuilder<SyncedFlushRequest, SyncedFlushResponse, SyncedFlushRequestBuilder> {
+
+    public SyncedFlushRequestBuilder(ElasticsearchClient client, SyncedFlushAction action) {
+        super(client, action, new SyncedFlushRequest());
+    }
+
+    public SyncedFlushRequestBuilder setIndices(String[] indices) {
+        super.request().indices(indices);
+        return this;
+    }
+
+    public SyncedFlushRequestBuilder setIndicesOptions(IndicesOptions indicesOptions) {
+        super.request().indicesOptions(indicesOptions);
+        return this;
+    }
+}

+ 83 - 14
core/src/main/java/org/elasticsearch/indices/flush/IndicesSyncedFlushResult.java → core/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponse.java

@@ -16,16 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.elasticsearch.indices.flush;
+package org.elasticsearch.action.admin.indices.flush;
 
+import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Streamable;
 import org.elasticsearch.common.util.iterable.Iterables;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentBuilderString;
+import org.elasticsearch.indices.flush.ShardsSyncedFlushResult;
+import org.elasticsearch.indices.flush.SyncedFlushService;
 import org.elasticsearch.rest.RestStatus;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -34,13 +43,16 @@ import static java.util.Collections.unmodifiableMap;
 /**
  * The result of performing a sync flush operation on all shards of multiple indices
  */
-public class IndicesSyncedFlushResult implements ToXContent {
+public class SyncedFlushResponse extends ActionResponse implements ToXContent {
 
-    final Map<String, List<ShardsSyncedFlushResult>> shardsResultPerIndex;
-    final ShardCounts shardCounts;
+    Map<String, List<ShardsSyncedFlushResult>> shardsResultPerIndex;
+    ShardCounts shardCounts;
 
+    SyncedFlushResponse() {
 
-    public IndicesSyncedFlushResult(Map<String, List<ShardsSyncedFlushResult>> shardsResultPerIndex) {
+    }
+
+    public SyncedFlushResponse(Map<String, List<ShardsSyncedFlushResult>> shardsResultPerIndex) {
         // shardsResultPerIndex is never modified after it is passed to this
         // constructor so this is safe even though shardsResultPerIndex is a
         // ConcurrentHashMap
@@ -48,17 +60,23 @@ public class IndicesSyncedFlushResult implements ToXContent {
         this.shardCounts = calculateShardCounts(Iterables.flatten(shardsResultPerIndex.values()));
     }
 
-    /** total number shards, including replicas, both assigned and unassigned */
+    /**
+     * total number shards, including replicas, both assigned and unassigned
+     */
     public int totalShards() {
         return shardCounts.total;
     }
 
-    /** total number of shards for which the operation failed */
+    /**
+     * total number of shards for which the operation failed
+     */
     public int failedShards() {
         return shardCounts.failed;
     }
 
-    /** total number of shards which were successfully sync-flushed */
+    /**
+     * total number of shards which were successfully sync-flushed
+     */
     public int successfulShards() {
         return shardCounts.successful;
     }
@@ -91,8 +109,8 @@ public class IndicesSyncedFlushResult implements ToXContent {
                         builder.endObject();
                         continue;
                     }
-                    Map<ShardRouting, SyncedFlushService.SyncedFlushResponse> failedShards = shardResults.failedShards();
-                    for (Map.Entry<ShardRouting, SyncedFlushService.SyncedFlushResponse> shardEntry : failedShards.entrySet()) {
+                    Map<ShardRouting, SyncedFlushService.ShardSyncedFlushResponse> failedShards = shardResults.failedShards();
+                    for (Map.Entry<ShardRouting, SyncedFlushService.ShardSyncedFlushResponse> shardEntry : failedShards.entrySet()) {
                         builder.startObject();
                         builder.field(Fields.SHARD, shardResults.shardId().id());
                         builder.field(Fields.REASON, shardEntry.getValue().failureReason());
@@ -123,11 +141,11 @@ public class IndicesSyncedFlushResult implements ToXContent {
         return new ShardCounts(total, successful, failed);
     }
 
-    static final class ShardCounts implements ToXContent {
+    static final class ShardCounts implements ToXContent, Streamable {
 
-        public final int total;
-        public final int successful;
-        public final int failed;
+        public int total;
+        public int successful;
+        public int failed;
 
         ShardCounts(int total, int successful, int failed) {
             this.total = total;
@@ -135,6 +153,10 @@ public class IndicesSyncedFlushResult implements ToXContent {
             this.failed = failed;
         }
 
+        ShardCounts() {
+
+        }
+
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.field(Fields.TOTAL, total);
@@ -142,6 +164,20 @@ public class IndicesSyncedFlushResult implements ToXContent {
             builder.field(Fields.FAILED, failed);
             return builder;
         }
+
+        @Override
+        public void readFrom(StreamInput in) throws IOException {
+            total = in.readInt();
+            successful = in.readInt();
+            failed = in.readInt();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeInt(total);
+            out.writeInt(successful);
+            out.writeInt(failed);
+        }
     }
 
     static final class Fields {
@@ -154,4 +190,37 @@ public class IndicesSyncedFlushResult implements ToXContent {
         static final XContentBuilderString ROUTING = new XContentBuilderString("routing");
         static final XContentBuilderString REASON = new XContentBuilderString("reason");
     }
+
+    @Override
+    public void readFrom(StreamInput in) throws IOException {
+        super.readFrom(in);
+        shardCounts = new ShardCounts();
+        shardCounts.readFrom(in);
+        Map<String, List<ShardsSyncedFlushResult>> tmpShardsResultPerIndex = new HashMap<>();
+        int numShardsResults = in.readInt();
+        for (int i =0 ; i< numShardsResults; i++) {
+            String index = in.readString();
+            List<ShardsSyncedFlushResult> shardsSyncedFlushResults = new ArrayList<>();
+            int numShards = in.readInt();
+            for (int j =0; j< numShards; j++) {
+                shardsSyncedFlushResults.add(ShardsSyncedFlushResult.readShardsSyncedFlushResult(in));
+            }
+            tmpShardsResultPerIndex.put(index, shardsSyncedFlushResults);
+        }
+        shardsResultPerIndex = Collections.unmodifiableMap(tmpShardsResultPerIndex);
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        shardCounts.writeTo(out);
+        out.writeInt(shardsResultPerIndex.size());
+        for (Map.Entry<String, List<ShardsSyncedFlushResult>> entry : shardsResultPerIndex.entrySet()) {
+            out.writeString(entry.getKey());
+            out.writeInt(entry.getValue().size());
+            for (ShardsSyncedFlushResult shardsSyncedFlushResult : entry.getValue()) {
+                shardsSyncedFlushResult.writeTo(out);
+            }
+        }
+    }
 }

+ 52 - 0
core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportSyncedFlushAction.java

@@ -0,0 +1,52 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.indices.flush;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.HandledTransportAction;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.indices.flush.SyncedFlushService;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+
+/**
+ * Synced flush Action.
+ */
+public class TransportSyncedFlushAction extends HandledTransportAction<SyncedFlushRequest, SyncedFlushResponse> {
+
+    SyncedFlushService syncedFlushService;
+
+    @Inject
+    public TransportSyncedFlushAction(Settings settings, ThreadPool threadPool,
+                                      TransportService transportService, ActionFilters actionFilters,
+                                      IndexNameExpressionResolver indexNameExpressionResolver,
+                                      SyncedFlushService syncedFlushService) {
+        super(settings, SyncedFlushAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SyncedFlushRequest::new);
+        this.syncedFlushService = syncedFlushService;
+    }
+
+    @Override
+    protected void doExecute(SyncedFlushRequest request, ActionListener<SyncedFlushResponse> listener) {
+        syncedFlushService.attemptSyncedFlush(request.indices(), request.indicesOptions(), listener);
+    }
+}

+ 28 - 2
core/src/main/java/org/elasticsearch/client/IndicesAdminClient.java

@@ -53,8 +53,8 @@ import org.elasticsearch.action.admin.indices.exists.types.TypesExistsResponse;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.admin.indices.flush.FlushRequestBuilder;
 import org.elasticsearch.action.admin.indices.flush.FlushResponse;
-import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequestBuilder;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
+import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequestBuilder;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
 import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
 import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder;
@@ -82,11 +82,14 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest
 import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder;
 import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
 import org.elasticsearch.action.admin.indices.shards.IndicesShardStoreRequestBuilder;
-import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
 import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresRequest;
+import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequestBuilder;
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
 import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
 import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequestBuilder;
 import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse;
@@ -390,6 +393,29 @@ public interface IndicesAdminClient extends ElasticsearchClient {
      */
     FlushRequestBuilder prepareFlush(String... indices);
 
+    /**
+     * Explicitly sync flush one or more indices (write sync id to shards for faster recovery).
+     *
+     * @param request The sync flush request
+     * @return A result future
+     * @see org.elasticsearch.client.Requests#syncedFlushRequest(String...)
+     */
+    ActionFuture<SyncedFlushResponse> syncedFlush(SyncedFlushRequest request);
+
+    /**
+     * Explicitly sync flush one or more indices (write sync id to shards for faster recovery).
+     *
+     * @param request  The sync flush request
+     * @param listener A listener to be notified with a result
+     * @see org.elasticsearch.client.Requests#syncedFlushRequest(String...)
+     */
+    void syncedFlush(SyncedFlushRequest request, ActionListener <SyncedFlushResponse> listener);
+
+    /**
+     * Explicitly sync flush one or more indices (write sync id to shards for faster recovery).
+     */
+    SyncedFlushRequestBuilder prepareSyncedFlush(String... indices);
+
     /**
      * Explicitly force merge one or more indices into a the number of segments.
      *

+ 13 - 1
core/src/main/java/org/elasticsearch/client/Requests.java

@@ -50,6 +50,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
 import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
 import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresRequest;
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
 import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.delete.DeleteRequest;
@@ -131,7 +132,7 @@ public class Requests {
     public static SuggestRequest suggestRequest(String... indices) {
         return new SuggestRequest(indices);
     }
-    
+
     /**
      * Creates a search request against one or more indices. Note, the search source must be set either using the
      * actual JSON search source, or the {@link org.elasticsearch.search.builder.SearchSourceBuilder}.
@@ -265,6 +266,17 @@ public class Requests {
         return new FlushRequest(indices);
     }
 
+    /**
+     * Creates a synced flush indices request.
+     *
+     * @param indices The indices to sync flush. Use <tt>null</tt> or <tt>_all</tt> to execute against all indices
+     * @return The synced flush request
+     * @see org.elasticsearch.client.IndicesAdminClient#syncedFlush(SyncedFlushRequest)
+     */
+    public static SyncedFlushRequest syncedFlushRequest(String... indices) {
+        return new SyncedFlushRequest(indices);
+    }
+
     /**
      * Creates a force merge request.
      *

+ 19 - 0
core/src/main/java/org/elasticsearch/client/support/AbstractClient.java

@@ -188,6 +188,10 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushAction;
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequestBuilder;
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
 import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateAction;
 import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
 import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequestBuilder;
@@ -1315,6 +1319,21 @@ public abstract class AbstractClient extends AbstractComponent implements Client
             return new FlushRequestBuilder(this, FlushAction.INSTANCE).setIndices(indices);
         }
 
+        @Override
+        public ActionFuture<SyncedFlushResponse> syncedFlush(SyncedFlushRequest request) {
+            return execute(SyncedFlushAction.INSTANCE, request);
+        }
+
+        @Override
+        public void syncedFlush(SyncedFlushRequest request, ActionListener<SyncedFlushResponse> listener) {
+            execute(SyncedFlushAction.INSTANCE, request, listener);
+        }
+
+        @Override
+        public SyncedFlushRequestBuilder prepareSyncedFlush(String... indices) {
+            return new SyncedFlushRequestBuilder(this, SyncedFlushAction.INSTANCE).setIndices(indices);
+        }
+
         @Override
         public void getMappings(GetMappingsRequest request, ActionListener<GetMappingsResponse> listener) {
             execute(GetMappingsAction.INSTANCE, request, listener);

+ 47 - 9
core/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java

@@ -19,8 +19,12 @@
 package org.elasticsearch.indices.flush;
 
 import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Streamable;
 import org.elasticsearch.index.shard.ShardId;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -30,15 +34,15 @@ import static java.util.Collections.unmodifiableMap;
 /**
  * Result for all copies of a shard
  */
-public class ShardsSyncedFlushResult {
+public class ShardsSyncedFlushResult implements Streamable {
     private String failureReason;
-    private Map<ShardRouting, SyncedFlushService.SyncedFlushResponse> shardResponses;
+    private Map<ShardRouting, SyncedFlushService.ShardSyncedFlushResponse> shardResponses;
     private String syncId;
     private ShardId shardId;
     // some shards may be unassigned, so we need this as state
     private int totalShards;
 
-    public ShardsSyncedFlushResult() {
+    private ShardsSyncedFlushResult() {
     }
 
     public ShardId getShardId() {
@@ -59,7 +63,7 @@ public class ShardsSyncedFlushResult {
     /**
      * success constructor
      */
-    public ShardsSyncedFlushResult(ShardId shardId, String syncId, int totalShards, Map<ShardRouting, SyncedFlushService.SyncedFlushResponse> shardResponses) {
+    public ShardsSyncedFlushResult(ShardId shardId, String syncId, int totalShards, Map<ShardRouting, SyncedFlushService.ShardSyncedFlushResponse> shardResponses) {
         this.failureReason = null;
         this.shardResponses = unmodifiableMap(new HashMap<>(shardResponses));
         this.syncId = syncId;
@@ -98,7 +102,7 @@ public class ShardsSyncedFlushResult {
      */
     public int successfulShards() {
         int i = 0;
-        for (SyncedFlushService.SyncedFlushResponse result : shardResponses.values()) {
+        for (SyncedFlushService.ShardSyncedFlushResponse result : shardResponses.values()) {
             if (result.success()) {
                 i++;
             }
@@ -109,9 +113,9 @@ public class ShardsSyncedFlushResult {
     /**
      * @return an array of shard failures
      */
-    public Map<ShardRouting, SyncedFlushService.SyncedFlushResponse> failedShards() {
-        Map<ShardRouting, SyncedFlushService.SyncedFlushResponse> failures = new HashMap<>();
-        for (Map.Entry<ShardRouting, SyncedFlushService.SyncedFlushResponse> result : shardResponses.entrySet()) {
+    public Map<ShardRouting, SyncedFlushService.ShardSyncedFlushResponse> failedShards() {
+        Map<ShardRouting, SyncedFlushService.ShardSyncedFlushResponse> failures = new HashMap<>();
+        for (Map.Entry<ShardRouting, SyncedFlushService.ShardSyncedFlushResponse> result : shardResponses.entrySet()) {
             if (result.getValue().success() == false) {
                 failures.put(result.getKey(), result.getValue());
             }
@@ -123,11 +127,45 @@ public class ShardsSyncedFlushResult {
      * @return Individual responses for each shard copy with a detailed failure message if the copy failed to perform the synced flush.
      * Empty if synced flush failed before step three.
      */
-    public Map<ShardRouting, SyncedFlushService.SyncedFlushResponse> shardResponses() {
+    public Map<ShardRouting, SyncedFlushService.ShardSyncedFlushResponse> shardResponses() {
         return shardResponses;
     }
 
     public ShardId shardId() {
         return shardId;
     }
+
+    @Override
+    public void readFrom(StreamInput in) throws IOException {
+        failureReason = in.readOptionalString();
+        int numResponses = in.readInt();
+        shardResponses = new HashMap<>();
+        for (int i = 0; i < numResponses; i++) {
+            ShardRouting shardRouting = ShardRouting.readShardRoutingEntry(in);
+            SyncedFlushService.ShardSyncedFlushResponse response = SyncedFlushService.ShardSyncedFlushResponse.readSyncedFlushResponse(in);
+            shardResponses.put(shardRouting, response);
+        }
+        syncId = in.readOptionalString();
+        shardId = ShardId.readShardId(in);
+        totalShards = in.readInt();
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeOptionalString(failureReason);
+        out.writeInt(shardResponses.size());
+        for (Map.Entry<ShardRouting, SyncedFlushService.ShardSyncedFlushResponse> entry : shardResponses.entrySet()) {
+            entry.getKey().writeTo(out);
+            entry.getValue().writeTo(out);
+        }
+        out.writeOptionalString(syncId);
+        shardId.writeTo(out);
+        out.writeInt(totalShards);
+    }
+
+    public static ShardsSyncedFlushResult readShardsSyncedFlushResult(StreamInput in) throws IOException {
+        ShardsSyncedFlushResult shardsSyncedFlushResult = new ShardsSyncedFlushResult();
+        shardsSyncedFlushResult.readFrom(in);
+        return shardsSyncedFlushResult;
+    }
 }

+ 46 - 40
core/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java

@@ -21,6 +21,7 @@ package org.elasticsearch.indices.flush;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.ClusterState;
@@ -81,9 +82,8 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
         this.clusterService = clusterService;
         this.transportService = transportService;
         this.indexNameExpressionResolver = indexNameExpressionResolver;
-
-        transportService.registerRequestHandler(PRE_SYNCED_FLUSH_ACTION_NAME, PreSyncedFlushRequest::new, ThreadPool.Names.FLUSH, new PreSyncedFlushTransportHandler());
-        transportService.registerRequestHandler(SYNCED_FLUSH_ACTION_NAME, SyncedFlushRequest::new, ThreadPool.Names.FLUSH, new SyncedFlushTransportHandler());
+        transportService.registerRequestHandler(PRE_SYNCED_FLUSH_ACTION_NAME, PreShardSyncedFlushRequest::new, ThreadPool.Names.FLUSH, new PreSyncedFlushTransportHandler());
+        transportService.registerRequestHandler(SYNCED_FLUSH_ACTION_NAME, ShardSyncedFlushRequest::new, ThreadPool.Names.FLUSH, new SyncedFlushTransportHandler());
         transportService.registerRequestHandler(IN_FLIGHT_OPS_ACTION_NAME, InFlightOpsRequest::new, ThreadPool.Names.SAME, new InFlightOpCountTransportHandler());
     }
 
@@ -109,7 +109,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
      * a utility method to perform a synced flush for all shards of multiple indices. see {@link #attemptSyncedFlush(ShardId, ActionListener)}
      * for more details.
      */
-    public void attemptSyncedFlush(final String[] aliasesOrIndices, IndicesOptions indicesOptions, final ActionListener<IndicesSyncedFlushResult> listener) {
+    public void attemptSyncedFlush(final String[] aliasesOrIndices, IndicesOptions indicesOptions, final ActionListener<SyncedFlushResponse> listener) {
         final ClusterState state = clusterService.state();
         final String[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, indicesOptions, aliasesOrIndices);
         final Map<String, List<ShardsSyncedFlushResult>> results = ConcurrentCollections.newConcurrentMap();
@@ -123,7 +123,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
 
         }
         if (numberOfShards == 0) {
-            listener.onResponse(new IndicesSyncedFlushResult(results));
+            listener.onResponse(new SyncedFlushResponse(results));
             return;
         }
         final int finalTotalNumberOfShards = totalNumberOfShards;
@@ -138,7 +138,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
                     public void onResponse(ShardsSyncedFlushResult syncedFlushResult) {
                         results.get(index).add(syncedFlushResult);
                         if (countDown.countDown()) {
-                            listener.onResponse(new IndicesSyncedFlushResult(results));
+                            listener.onResponse(new SyncedFlushResponse(results));
                         }
                     }
 
@@ -147,7 +147,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
                         logger.debug("{} unexpected error while executing synced flush", shardId);
                         results.get(index).add(new ShardsSyncedFlushResult(shardId, finalTotalNumberOfShards, e.getMessage()));
                         if (countDown.countDown()) {
-                            listener.onResponse(new IndicesSyncedFlushResult(results));
+                            listener.onResponse(new SyncedFlushResponse(results));
                         }
                     }
                 });
@@ -297,33 +297,33 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
     void sendSyncRequests(final String syncId, final List<ShardRouting> shards, ClusterState state, Map<String, Engine.CommitId> expectedCommitIds,
                           final ShardId shardId, final int totalShards, final ActionListener<ShardsSyncedFlushResult> listener) {
         final CountDown countDown = new CountDown(shards.size());
-        final Map<ShardRouting, SyncedFlushResponse> results = ConcurrentCollections.newConcurrentMap();
+        final Map<ShardRouting, ShardSyncedFlushResponse> results = ConcurrentCollections.newConcurrentMap();
         for (final ShardRouting shard : shards) {
             final DiscoveryNode node = state.nodes().get(shard.currentNodeId());
             if (node == null) {
                 logger.trace("{} is assigned to an unknown node. skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
-                results.put(shard, new SyncedFlushResponse("unknown node"));
+                results.put(shard, new ShardSyncedFlushResponse("unknown node"));
                 contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
                 continue;
             }
             final Engine.CommitId expectedCommitId = expectedCommitIds.get(shard.currentNodeId());
             if (expectedCommitId == null) {
                 logger.trace("{} can't resolve expected commit id for {}, skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
-                results.put(shard, new SyncedFlushResponse("no commit id from pre-sync flush"));
+                results.put(shard, new ShardSyncedFlushResponse("no commit id from pre-sync flush"));
                 contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
                 continue;
             }
             logger.trace("{} sending synced flush request to {}. sync id [{}].", shardId, shard, syncId);
-            transportService.sendRequest(node, SYNCED_FLUSH_ACTION_NAME, new SyncedFlushRequest(shard.shardId(), syncId, expectedCommitId),
-                    new BaseTransportResponseHandler<SyncedFlushResponse>() {
+            transportService.sendRequest(node, SYNCED_FLUSH_ACTION_NAME, new ShardSyncedFlushRequest(shard.shardId(), syncId, expectedCommitId),
+                    new BaseTransportResponseHandler<ShardSyncedFlushResponse>() {
                         @Override
-                        public SyncedFlushResponse newInstance() {
-                            return new SyncedFlushResponse();
+                        public ShardSyncedFlushResponse newInstance() {
+                            return new ShardSyncedFlushResponse();
                         }
 
                         @Override
-                        public void handleResponse(SyncedFlushResponse response) {
-                            SyncedFlushResponse existing = results.put(shard, response);
+                        public void handleResponse(ShardSyncedFlushResponse response) {
+                            ShardSyncedFlushResponse existing = results.put(shard, response);
                             assert existing == null : "got two answers for node [" + node + "]";
                             // count after the assert so we won't decrement twice in handleException
                             contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
@@ -332,7 +332,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
                         @Override
                         public void handleException(TransportException exp) {
                             logger.trace("{} error while performing synced flush on [{}], skipping", exp, shardId, shard);
-                            results.put(shard, new SyncedFlushResponse(exp.getMessage()));
+                            results.put(shard, new ShardSyncedFlushResponse(exp.getMessage()));
                             contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
                         }
 
@@ -346,7 +346,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
     }
 
     private void contDownAndSendResponseIfDone(String syncId, List<ShardRouting> shards, ShardId shardId, int totalShards,
-            ActionListener<ShardsSyncedFlushResult> listener, CountDown countDown, Map<ShardRouting, SyncedFlushResponse> results) {
+            ActionListener<ShardsSyncedFlushResult> listener, CountDown countDown, Map<ShardRouting, ShardSyncedFlushResponse> results) {
         if (countDown.countDown()) {
             assert results.size() == shards.size();
             listener.onResponse(new ShardsSyncedFlushResult(shardId, syncId, totalShards, results));
@@ -369,7 +369,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
                 }
                 continue;
             }
-            transportService.sendRequest(node, PRE_SYNCED_FLUSH_ACTION_NAME, new PreSyncedFlushRequest(shard.shardId()), new BaseTransportResponseHandler<PreSyncedFlushResponse>() {
+            transportService.sendRequest(node, PRE_SYNCED_FLUSH_ACTION_NAME, new PreShardSyncedFlushRequest(shard.shardId()), new BaseTransportResponseHandler<PreSyncedFlushResponse>() {
                 @Override
                 public PreSyncedFlushResponse newInstance() {
                     return new PreSyncedFlushResponse();
@@ -401,7 +401,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
         }
     }
 
-    private PreSyncedFlushResponse performPreSyncedFlush(PreSyncedFlushRequest request) {
+    private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest request) {
         IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
         FlushRequest flushRequest = new FlushRequest().force(false).waitIfOngoing(true);
         logger.trace("{} performing pre sync flush", request.shardId());
@@ -410,7 +410,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
         return new PreSyncedFlushResponse(commitId);
     }
 
-    private SyncedFlushResponse performSyncedFlush(SyncedFlushRequest request) {
+    private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) {
         IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
         IndexShard indexShard = indexService.getShard(request.shardId().id());
         logger.trace("{} performing sync flush. sync id [{}], expected commit id {}", request.shardId(), request.syncId(), request.expectedCommitId());
@@ -418,11 +418,11 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
         logger.trace("{} sync flush done. sync id [{}], result [{}]", request.shardId(), request.syncId(), result);
         switch (result) {
             case SUCCESS:
-                return new SyncedFlushResponse();
+                return new ShardSyncedFlushResponse();
             case COMMIT_MISMATCH:
-                return new SyncedFlushResponse("commit has changed");
+                return new ShardSyncedFlushResponse("commit has changed");
             case PENDING_OPERATIONS:
-                return new SyncedFlushResponse("pending operations");
+                return new ShardSyncedFlushResponse("pending operations");
             default:
                 throw new ElasticsearchException("unknown synced flush result [" + result + "]");
         }
@@ -439,19 +439,19 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
         return new InFlightOpsResponse(opCount);
     }
 
-    public final static class PreSyncedFlushRequest extends TransportRequest {
+    public final static class PreShardSyncedFlushRequest extends TransportRequest {
         private ShardId shardId;
 
-        public PreSyncedFlushRequest() {
+        public PreShardSyncedFlushRequest() {
         }
 
-        public PreSyncedFlushRequest(ShardId shardId) {
+        public PreShardSyncedFlushRequest(ShardId shardId) {
             this.shardId = shardId;
         }
 
         @Override
         public String toString() {
-            return "PreSyncedFlushRequest{" +
+            return "PreShardSyncedFlushRequest{" +
                     "shardId=" + shardId +
                     '}';
         }
@@ -504,16 +504,16 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
         }
     }
 
-    public static final class SyncedFlushRequest extends TransportRequest {
+    public static final class ShardSyncedFlushRequest extends TransportRequest {
 
         private String syncId;
         private Engine.CommitId expectedCommitId;
         private ShardId shardId;
 
-        public SyncedFlushRequest() {
+        public ShardSyncedFlushRequest() {
         }
 
-        public SyncedFlushRequest(ShardId shardId, String syncId, Engine.CommitId expectedCommitId) {
+        public ShardSyncedFlushRequest(ShardId shardId, String syncId, Engine.CommitId expectedCommitId) {
             this.expectedCommitId = expectedCommitId;
             this.shardId = shardId;
             this.syncId = syncId;
@@ -549,7 +549,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
 
         @Override
         public String toString() {
-            return "SyncedFlushRequest{" +
+            return "ShardSyncedFlushRequest{" +
                     "shardId=" + shardId +
                     ",syncId='" + syncId + '\'' +
                     '}';
@@ -559,18 +559,18 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
     /**
      * Response for third step of synced flush (writing the sync id) for one shard copy
      */
-    public static final class SyncedFlushResponse extends TransportResponse {
+    public static final class ShardSyncedFlushResponse extends TransportResponse {
 
         /**
          * a non null value indicates a failure to sync flush. null means success
          */
         String failureReason;
 
-        public SyncedFlushResponse() {
+        public ShardSyncedFlushResponse() {
             failureReason = null;
         }
 
-        public SyncedFlushResponse(String failureReason) {
+        public ShardSyncedFlushResponse(String failureReason) {
             this.failureReason = failureReason;
         }
 
@@ -596,11 +596,17 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
 
         @Override
         public String toString() {
-            return "SyncedFlushResponse{" +
+            return "ShardSyncedFlushResponse{" +
                     "success=" + success() +
                     ", failureReason='" + failureReason + '\'' +
                     '}';
         }
+
+        public static ShardSyncedFlushResponse readSyncedFlushResponse(StreamInput in) throws IOException {
+            ShardSyncedFlushResponse shardSyncedFlushResponse = new ShardSyncedFlushResponse();
+            shardSyncedFlushResponse.readFrom(in);
+            return shardSyncedFlushResponse;
+        }
     }
 
 
@@ -677,18 +683,18 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
         }
     }
 
-    private final class PreSyncedFlushTransportHandler implements TransportRequestHandler<PreSyncedFlushRequest> {
+    private final class PreSyncedFlushTransportHandler implements TransportRequestHandler<PreShardSyncedFlushRequest> {
 
         @Override
-        public void messageReceived(PreSyncedFlushRequest request, TransportChannel channel) throws Exception {
+        public void messageReceived(PreShardSyncedFlushRequest request, TransportChannel channel) throws Exception {
             channel.sendResponse(performPreSyncedFlush(request));
         }
     }
 
-    private final class SyncedFlushTransportHandler implements TransportRequestHandler<SyncedFlushRequest> {
+    private final class SyncedFlushTransportHandler implements TransportRequestHandler<ShardSyncedFlushRequest> {
 
         @Override
-        public void messageReceived(SyncedFlushRequest request, TransportChannel channel) throws Exception {
+        public void messageReceived(ShardSyncedFlushRequest request, TransportChannel channel) throws Exception {
             channel.sendResponse(performSyncedFlush(request));
         }
     }

+ 7 - 10
core/src/main/java/org/elasticsearch/rest/action/admin/indices/flush/RestSyncedFlushAction.java

@@ -19,14 +19,14 @@
 
 package org.elasticsearch.rest.action.admin.indices.flush;
 
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.indices.flush.IndicesSyncedFlushResult;
-import org.elasticsearch.indices.flush.SyncedFlushService;
 import org.elasticsearch.rest.*;
 import org.elasticsearch.rest.action.support.RestBuilderListener;
 
@@ -38,12 +38,9 @@ import static org.elasticsearch.rest.RestRequest.Method.POST;
  */
 public class RestSyncedFlushAction extends BaseRestHandler {
 
-    private final SyncedFlushService syncedFlushService;
-
     @Inject
-    public RestSyncedFlushAction(Settings settings, RestController controller, Client client, SyncedFlushService syncedFlushService) {
+    public RestSyncedFlushAction(Settings settings, RestController controller, Client client) {
         super(settings, controller, client);
-        this.syncedFlushService = syncedFlushService;
         controller.registerHandler(POST, "/_flush/synced", this);
         controller.registerHandler(POST, "/{index}/_flush/synced", this);
 
@@ -53,12 +50,12 @@ public class RestSyncedFlushAction extends BaseRestHandler {
 
     @Override
     public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
-        String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
         IndicesOptions indicesOptions = IndicesOptions.fromRequest(request, IndicesOptions.lenientExpandOpen());
-
-        syncedFlushService.attemptSyncedFlush(indices, indicesOptions, new RestBuilderListener<IndicesSyncedFlushResult>(channel) {
+        SyncedFlushRequest syncedFlushRequest = new SyncedFlushRequest(Strings.splitStringByCommaToArray(request.param("index")));
+        syncedFlushRequest.indicesOptions(indicesOptions);
+        client.admin().indices().syncedFlush(syncedFlushRequest, new RestBuilderListener<SyncedFlushResponse>(channel) {
             @Override
-            public RestResponse buildResponse(IndicesSyncedFlushResult results, XContentBuilder builder) throws Exception {
+            public RestResponse buildResponse(SyncedFlushResponse results, XContentBuilder builder) throws Exception {
                 builder.startObject();
                 results.toXContent(builder, request);
                 builder.endObject();

+ 68 - 16
core/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUnitTests.java → core/src/test/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushUnitTests.java

@@ -17,16 +17,20 @@
  * under the License.
  */
 
-package org.elasticsearch.indices.flush;
+package org.elasticsearch.action.admin.indices.flush;
 
 import com.carrotsearch.hppc.ObjectIntHashMap;
 import com.carrotsearch.hppc.ObjectIntMap;
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse.ShardCounts;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.TestShardRouting;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.indices.flush.IndicesSyncedFlushResult.ShardCounts;
-import org.elasticsearch.indices.flush.SyncedFlushService.SyncedFlushResponse;
+import org.elasticsearch.indices.flush.ShardsSyncedFlushResult;
+import org.elasticsearch.indices.flush.SyncedFlushService;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.ESTestCase;
 
@@ -42,14 +46,11 @@ import static org.hamcrest.Matchers.hasSize;
 
 public class SyncedFlushUnitTests extends ESTestCase {
 
-
     private static class TestPlan {
-        public ShardCounts totalCounts;
-        public Map<String, ShardCounts> countsPerIndex = new HashMap<>();
+        public SyncedFlushResponse.ShardCounts totalCounts;
+        public Map<String, SyncedFlushResponse.ShardCounts> countsPerIndex = new HashMap<>();
         public ObjectIntMap<String> expectedFailuresPerIndex = new ObjectIntHashMap<>();
-
-        public IndicesSyncedFlushResult result;
-
+        public SyncedFlushResponse result;
     }
 
     public void testIndicesSyncedFlushResult() throws IOException {
@@ -76,6 +77,56 @@ public class SyncedFlushUnitTests extends ESTestCase {
         }
     }
 
+    public void testResponseStreaming() throws IOException {
+        final TestPlan testPlan = createTestPlan();
+        assertThat(testPlan.result.totalShards(), equalTo(testPlan.totalCounts.total));
+        assertThat(testPlan.result.successfulShards(), equalTo(testPlan.totalCounts.successful));
+        assertThat(testPlan.result.failedShards(), equalTo(testPlan.totalCounts.failed));
+        assertThat(testPlan.result.restStatus(), equalTo(testPlan.totalCounts.failed > 0 ? RestStatus.CONFLICT : RestStatus.OK));
+        BytesStreamOutput out = new BytesStreamOutput();
+        testPlan.result.writeTo(out);
+        StreamInput in = StreamInput.wrap(out.bytes());
+        SyncedFlushResponse readResponse = new SyncedFlushResponse();
+        readResponse.readFrom(in);
+        assertThat(readResponse.totalShards(), equalTo(testPlan.totalCounts.total));
+        assertThat(readResponse.successfulShards(), equalTo(testPlan.totalCounts.successful));
+        assertThat(readResponse.failedShards(), equalTo(testPlan.totalCounts.failed));
+        assertThat(readResponse.restStatus(), equalTo(testPlan.totalCounts.failed > 0 ? RestStatus.CONFLICT : RestStatus.OK));
+        assertThat(readResponse.shardsResultPerIndex.size(), equalTo(testPlan.result.getShardsResultPerIndex().size()));
+        for (Map.Entry<String, List<ShardsSyncedFlushResult>> entry : readResponse.getShardsResultPerIndex().entrySet()) {
+            List<ShardsSyncedFlushResult> originalShardsResults = testPlan.result.getShardsResultPerIndex().get(entry.getKey());
+            assertNotNull(originalShardsResults);
+            List<ShardsSyncedFlushResult> readShardsResults = entry.getValue();
+            assertThat(readShardsResults.size(), equalTo(originalShardsResults.size()));
+            for (int i = 0; i < readShardsResults.size(); i++) {
+                ShardsSyncedFlushResult originalShardResult = originalShardsResults.get(i);
+                ShardsSyncedFlushResult readShardResult = readShardsResults.get(i);
+                assertThat(originalShardResult.failureReason(), equalTo(readShardResult.failureReason()));
+                assertThat(originalShardResult.failed(), equalTo(readShardResult.failed()));
+                assertThat(originalShardResult.getShardId(), equalTo(readShardResult.getShardId()));
+                assertThat(originalShardResult.successfulShards(), equalTo(readShardResult.successfulShards()));
+                assertThat(originalShardResult.syncId(), equalTo(readShardResult.syncId()));
+                assertThat(originalShardResult.totalShards(), equalTo(readShardResult.totalShards()));
+                assertThat(originalShardResult.failedShards().size(), equalTo(readShardResult.failedShards().size()));
+                for (Map.Entry<ShardRouting, SyncedFlushService.ShardSyncedFlushResponse> shardEntry : originalShardResult.failedShards().entrySet()) {
+                    SyncedFlushService.ShardSyncedFlushResponse readShardResponse = readShardResult.failedShards().get(shardEntry.getKey());
+                    assertNotNull(readShardResponse);
+                    SyncedFlushService.ShardSyncedFlushResponse originalShardResponse = shardEntry.getValue();
+                    assertThat(originalShardResponse.failureReason(), equalTo(readShardResponse.failureReason()));
+                    assertThat(originalShardResponse.success(), equalTo(readShardResponse.success()));
+                }
+                assertThat(originalShardResult.shardResponses().size(), equalTo(readShardResult.shardResponses().size()));
+                for (Map.Entry<ShardRouting, SyncedFlushService.ShardSyncedFlushResponse> shardEntry : originalShardResult.shardResponses().entrySet()) {
+                    SyncedFlushService.ShardSyncedFlushResponse readShardResponse = readShardResult.shardResponses().get(shardEntry.getKey());
+                    assertNotNull(readShardResponse);
+                    SyncedFlushService.ShardSyncedFlushResponse originalShardResponse = shardEntry.getValue();
+                    assertThat(originalShardResponse.failureReason(), equalTo(readShardResponse.failureReason()));
+                    assertThat(originalShardResponse.success(), equalTo(readShardResponse.success()));
+                }
+            }
+        }
+    }
+
     private void assertShardCount(String name, Map<String, Object> header, ShardCounts expectedCounts) {
         assertThat(name + " has unexpected total count", (Integer) header.get("total"), equalTo(expectedCounts.total));
         assertThat(name + " has unexpected successful count", (Integer) header.get("successful"), equalTo(expectedCounts.successful));
@@ -105,32 +156,33 @@ public class SyncedFlushUnitTests extends ESTestCase {
                     failures++;
                     shardsResults.add(new ShardsSyncedFlushResult(shardId, replicas + 1, "simulated total failure"));
                 } else {
-                    Map<ShardRouting, SyncedFlushResponse> shardResponses = new HashMap<>();
+                    Map<ShardRouting, SyncedFlushService.ShardSyncedFlushResponse> shardResponses = new HashMap<>();
                     for (int copy = 0; copy < replicas + 1; copy++) {
                         final ShardRouting shardRouting = TestShardRouting.newShardRouting(index, shard, "node_" + shardId + "_" + copy, null,
-                                copy == 0, ShardRoutingState.STARTED, 0);
+                            copy == 0, ShardRoutingState.STARTED, 0);
                         if (randomInt(5) < 2) {
                             // shard copy failure
                             failed++;
                             failures++;
-                            shardResponses.put(shardRouting, new SyncedFlushResponse("copy failure " + shardId));
+                            shardResponses.put(shardRouting, new SyncedFlushService.ShardSyncedFlushResponse("copy failure " + shardId));
                         } else {
                             successful++;
-                            shardResponses.put(shardRouting, new SyncedFlushResponse());
+                            shardResponses.put(shardRouting, new SyncedFlushService.ShardSyncedFlushResponse());
                         }
                     }
                     shardsResults.add(new ShardsSyncedFlushResult(shardId, "_sync_id_" + shard, replicas + 1, shardResponses));
                 }
             }
             indicesResults.put(index, shardsResults);
-            testPlan.countsPerIndex.put(index, new ShardCounts(shards * (replicas + 1), successful, failed));
+            testPlan.countsPerIndex.put(index, new SyncedFlushResponse.ShardCounts(shards * (replicas + 1), successful, failed));
             testPlan.expectedFailuresPerIndex.put(index, failures);
             totalFailed += failed;
             totalShards += shards * (replicas + 1);
             totalSuccesful += successful;
         }
-        testPlan.result = new IndicesSyncedFlushResult(indicesResults);
-        testPlan.totalCounts = new ShardCounts(totalShards, totalSuccesful, totalFailed);
+        testPlan.result = new SyncedFlushResponse(indicesResults);
+        testPlan.totalCounts = new SyncedFlushResponse.ShardCounts(totalShards, totalSuccesful, totalFailed);
         return testPlan;
     }
+
 }

+ 1 - 1
core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java

@@ -369,7 +369,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
             ensureGreen();
         } else {
             logger.info("--> trying to sync flush");
-            assertEquals(SyncedFlushUtil.attemptSyncedFlush(internalCluster(), "test").failedShards(), 0);
+            assertEquals(client().admin().indices().prepareSyncedFlush("test").get().failedShards(), 0);
             assertSyncIdsNotNull();
         }
 

+ 161 - 0
core/src/test/java/org/elasticsearch/gateway/ReusePeerRecoverySharedTest.java

@@ -0,0 +1,161 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.gateway;
+
+import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
+import org.elasticsearch.action.admin.indices.stats.IndexStats;
+import org.elasticsearch.action.admin.indices.stats.ShardStats;
+import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.indices.flush.SyncedFlushUtil;
+import org.elasticsearch.indices.recovery.RecoveryState;
+
+import static org.elasticsearch.common.settings.Settings.settingsBuilder;
+import static org.elasticsearch.test.ESIntegTestCase.client;
+import static org.elasticsearch.test.ESIntegTestCase.internalCluster;
+import static org.elasticsearch.test.ESTestCase.randomBoolean;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test of file reuse on recovery shared between integration tests and backwards
+ * compatibility tests.
+ */
+public class ReusePeerRecoverySharedTest {
+    /**
+     * Test peer reuse on recovery. This is shared between RecoverFromGatewayIT
+     * and RecoveryBackwardsCompatibilityIT.
+     *
+     * @param indexSettings
+     *            settings for the index to test
+     * @param restartCluster
+     *            runnable that will restart the cluster under test
+     * @param logger
+     *            logger for logging
+     * @param useSyncIds
+     *            should this use synced flush? can't use synced from in the bwc
+     *            tests
+     */
+    public static void testCase(Settings indexSettings, Runnable restartCluster, ESLogger logger, boolean useSyncIds) {
+        /*
+         * prevent any rebalance actions during the peer recovery if we run into
+         * a relocation the reuse count will be 0 and this fails the test. We
+         * are testing here if we reuse the files on disk after full restarts
+         * for replicas.
+         */
+        assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder().put(indexSettings)
+                .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE)));
+        client().admin().cluster().prepareHealth().setWaitForGreenStatus().setTimeout("30s").get();
+        logger.info("--> indexing docs");
+        for (int i = 0; i < 1000; i++) {
+            client().prepareIndex("test", "type").setSource("field", "value").execute().actionGet();
+            if ((i % 200) == 0) {
+                client().admin().indices().prepareFlush().execute().actionGet();
+            }
+        }
+        if (randomBoolean()) {
+            client().admin().indices().prepareFlush().execute().actionGet();
+        }
+        logger.info("--> running cluster health");
+        client().admin().cluster().prepareHealth().setWaitForGreenStatus().setTimeout("30s").get();
+        // just wait for merges
+        client().admin().indices().prepareForceMerge("test").setMaxNumSegments(100).get();
+        client().admin().indices().prepareFlush().setWaitIfOngoing(true).setForce(true).get();
+
+        if (useSyncIds == false) {
+            logger.info("--> disabling allocation while the cluster is shut down");
+
+            // Disable allocations while we are closing nodes
+            client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder()
+                    .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, EnableAllocationDecider.Allocation.NONE)).get();
+            logger.info("--> full cluster restart");
+            restartCluster.run();
+
+            logger.info("--> waiting for cluster to return to green after first shutdown");
+            client().admin().cluster().prepareHealth().setWaitForGreenStatus().setTimeout("30s").get();
+        } else {
+            logger.info("--> trying to sync flush");
+            assertEquals(client().admin().indices().prepareSyncedFlush("test").get().failedShards(), 0);
+            assertSyncIdsNotNull();
+        }
+
+        logger.info("--> disabling allocation while the cluster is shut down", useSyncIds ? "" : " a second time");
+        // Disable allocations while we are closing nodes
+        client().admin().cluster().prepareUpdateSettings().setTransientSettings(
+                settingsBuilder().put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, EnableAllocationDecider.Allocation.NONE))
+                .get();
+        logger.info("--> full cluster restart");
+        restartCluster.run();
+
+        logger.info("--> waiting for cluster to return to green after {}shutdown", useSyncIds ? "" : "second ");
+        client().admin().cluster().prepareHealth().setWaitForGreenStatus().setTimeout("30s").get();
+
+        if (useSyncIds) {
+            assertSyncIdsNotNull();
+        }
+        RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").get();
+        for (RecoveryState recoveryState : recoveryResponse.shardRecoveryStates().get("test")) {
+            long recovered = 0;
+            for (RecoveryState.File file : recoveryState.getIndex().fileDetails()) {
+                if (file.name().startsWith("segments")) {
+                    recovered += file.length();
+                }
+            }
+            if (!recoveryState.getPrimary() && (useSyncIds == false)) {
+                logger.info("--> replica shard {} recovered from {} to {}, recovered {}, reuse {}", recoveryState.getShardId().getId(),
+                        recoveryState.getSourceNode().name(), recoveryState.getTargetNode().name(),
+                        recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes());
+                assertThat("no bytes should be recovered", recoveryState.getIndex().recoveredBytes(), equalTo(recovered));
+                assertThat("data should have been reused", recoveryState.getIndex().reusedBytes(), greaterThan(0l));
+                // we have to recover the segments file since we commit the translog ID on engine startup
+                assertThat("all bytes should be reused except of the segments file", recoveryState.getIndex().reusedBytes(),
+                        equalTo(recoveryState.getIndex().totalBytes() - recovered));
+                assertThat("no files should be recovered except of the segments file", recoveryState.getIndex().recoveredFileCount(),
+                        equalTo(1));
+                assertThat("all files should be reused except of the segments file", recoveryState.getIndex().reusedFileCount(),
+                        equalTo(recoveryState.getIndex().totalFileCount() - 1));
+                assertThat("> 0 files should be reused", recoveryState.getIndex().reusedFileCount(), greaterThan(0));
+            } else {
+                if (useSyncIds && !recoveryState.getPrimary()) {
+                    logger.info("--> replica shard {} recovered from {} to {} using sync id, recovered {}, reuse {}",
+                            recoveryState.getShardId().getId(), recoveryState.getSourceNode().name(), recoveryState.getTargetNode().name(),
+                            recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes());
+                }
+                assertThat(recoveryState.getIndex().recoveredBytes(), equalTo(0l));
+                assertThat(recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes()));
+                assertThat(recoveryState.getIndex().recoveredFileCount(), equalTo(0));
+                assertThat(recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount()));
+            }
+        }
+    }
+
+    public static void assertSyncIdsNotNull() {
+        IndexStats indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
+        for (ShardStats shardStats : indexStats.getShards()) {
+            assertNotNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
+        }
+    }
+}

+ 5 - 4
core/src/test/java/org/elasticsearch/indices/flush/FlushIT.java

@@ -22,6 +22,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushResponse;
 import org.elasticsearch.action.admin.indices.stats.IndexStats;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.ShardRouting;
@@ -99,7 +100,7 @@ public class FlushIT extends ESIntegTestCase {
             result = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), new ShardId("test", 0));
         } else {
             logger.info("--> sync flushing index [test]");
-            IndicesSyncedFlushResult indicesResult = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), "test");
+            SyncedFlushResponse indicesResult = client().admin().indices().prepareSyncedFlush("test").get();
             result = indicesResult.getShardsResultPerIndex().get("test").get(0);
         }
         assertFalse(result.failed());
@@ -171,7 +172,7 @@ public class FlushIT extends ESIntegTestCase {
             assertNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
         }
         logger.info("--> trying sync flush");
-        IndicesSyncedFlushResult syncedFlushResult = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), "test");
+        SyncedFlushResponse syncedFlushResult = client().admin().indices().prepareSyncedFlush("test").get();
         logger.info("--> sync flush done");
         stop.set(true);
         indexingThread.join();
@@ -191,7 +192,7 @@ public class FlushIT extends ESIntegTestCase {
         for (final ShardStats shardStats : shardsStats) {
             for (final ShardsSyncedFlushResult shardResult : syncedFlushResults) {
                 if (shardStats.getShardRouting().getId() == shardResult.shardId().getId()) {
-                    for (Map.Entry<ShardRouting, SyncedFlushService.SyncedFlushResponse> singleResponse : shardResult.shardResponses().entrySet()) {
+                    for (Map.Entry<ShardRouting, SyncedFlushService.ShardSyncedFlushResponse> singleResponse : shardResult.shardResponses().entrySet()) {
                         if (singleResponse.getKey().currentNodeId().equals(shardStats.getShardRouting().currentNodeId())) {
                             if (singleResponse.getValue().success()) {
                                 logger.info("{} sync flushed on node {}", singleResponse.getKey().shardId(), singleResponse.getKey().currentNodeId());
@@ -212,7 +213,7 @@ public class FlushIT extends ESIntegTestCase {
         prepareCreate("test").setSettings(Settings.builder().put("index.routing.allocation.include._name", "nonexistent")).get();
 
         // this should not hang but instead immediately return with empty result set
-        List<ShardsSyncedFlushResult> shardsResult = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), "test").getShardsResultPerIndex().get("test");
+        List<ShardsSyncedFlushResult> shardsResult = client().admin().indices().prepareSyncedFlush("test").get().getShardsResultPerIndex().get("test");
         // just to make sure the test actually tests the right thing
         int numShards = client().admin().indices().prepareGetSettings("test").get().getIndexToSettings().get("test").getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS, -1);
         assertThat(shardsResult.size(), equalTo(numShards));

+ 2 - 2
core/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java

@@ -98,7 +98,7 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
         assertNotNull(syncedFlushResult);
         assertEquals(1, syncedFlushResult.successfulShards());
         assertEquals(1, syncedFlushResult.totalShards());
-        SyncedFlushService.SyncedFlushResponse response = syncedFlushResult.shardResponses().values().iterator().next();
+        SyncedFlushService.ShardSyncedFlushResponse response = syncedFlushResult.shardResponses().values().iterator().next();
         assertTrue(response.success());
     }
 
@@ -157,7 +157,7 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
         assertNull(listener.result);
         assertEquals("no such index", listener.error.getMessage());
     }
-    
+
     public void testFailAfterIntermediateCommit() throws InterruptedException {
         createIndex("test");
         client().prepareIndex("test", "test", "1").setSource("{}").get();

+ 4 - 20
core/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java

@@ -20,6 +20,7 @@ package org.elasticsearch.indices.flush;
 
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.routing.ShardRouting;
@@ -31,6 +32,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
+import static org.elasticsearch.test.ESIntegTestCase.client;
+import static org.elasticsearch.test.ESTestCase.randomBoolean;
+
 /** Utils for SyncedFlush */
 public class SyncedFlushUtil {
 
@@ -38,25 +42,6 @@ public class SyncedFlushUtil {
 
     }
 
-    /**
-     * Blocking single index version of {@link SyncedFlushService#attemptSyncedFlush(String[], IndicesOptions, ActionListener)}
-     */
-    public static IndicesSyncedFlushResult attemptSyncedFlush(InternalTestCluster cluster, String index) {
-        SyncedFlushService service = cluster.getInstance(SyncedFlushService.class);
-        LatchedListener<IndicesSyncedFlushResult> listener = new LatchedListener();
-        service.attemptSyncedFlush(new String[]{index}, IndicesOptions.lenientExpandOpen(), listener);
-        try {
-            listener.latch.await();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-        if (listener.error != null) {
-            throw ExceptionsHelper.convertToElastic(listener.error);
-        }
-        return listener.result;
-    }
-
-
     /**
      * Blocking version of {@link SyncedFlushService#attemptSyncedFlush(ShardId, ActionListener)}
      */
@@ -109,5 +94,4 @@ public class SyncedFlushUtil {
         }
         return listener.result;
     }
-
 }

+ 8 - 11
test-framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

@@ -20,15 +20,12 @@ package org.elasticsearch.test;
 
 import com.carrotsearch.randomizedtesting.RandomizedContext;
 import com.carrotsearch.randomizedtesting.RandomizedTest;
-import com.carrotsearch.randomizedtesting.Randomness;
 import com.carrotsearch.randomizedtesting.annotations.TestGroup;
 import com.carrotsearch.randomizedtesting.generators.RandomInts;
 import com.carrotsearch.randomizedtesting.generators.RandomPicks;
-
 import org.apache.http.impl.client.HttpClients;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.LuceneTestCase;
-import org.apache.lucene.util.SuppressForbidden;
 import org.apache.lucene.util.TestUtil;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
@@ -36,7 +33,6 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ShardOperationFailedException;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
@@ -44,8 +40,9 @@ import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
 import org.elasticsearch.action.admin.indices.flush.FlushResponse;
-import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
 import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
 import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
 import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
@@ -63,6 +60,7 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
@@ -137,9 +135,7 @@ import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
 import java.net.URL;
-import java.net.UnknownHostException;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -162,6 +158,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BooleanSupplier;
 
+import static org.elasticsearch.client.Requests.syncedFlushRequest;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
 import static org.elasticsearch.common.settings.Settings.settingsBuilder;
@@ -1499,13 +1496,13 @@ public abstract class ESIntegTestCase extends ESTestCase {
                 if (randomBoolean()) {
                     client().admin().indices().prepareFlush(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute(
                             new LatchedActionListener<>(newLatch(inFlightAsyncOperations)));
-                } else if (isInternalCluster()) {
-                    internalCluster().getInstance(SyncedFlushService.class).attemptSyncedFlush(indices, IndicesOptions.lenientExpandOpen(),
-                            new LatchedActionListener<>(newLatch(inFlightAsyncOperations)));
+                } else {
+                    client().admin().indices().syncedFlush(syncedFlushRequest(indices).indicesOptions(IndicesOptions.lenientExpandOpen()),
+                        new LatchedActionListener<>(newLatch(inFlightAsyncOperations)));
                 }
             } else if (rarely()) {
                 client().admin().indices().prepareForceMerge(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).setMaxNumSegments(between(1, 10)).setFlush(maybeFlush && randomBoolean()).execute(
-                        new LatchedActionListener<ForceMergeResponse>(newLatch(inFlightAsyncOperations)));
+                        new LatchedActionListener<>(newLatch(inFlightAsyncOperations)));
             }
         }
         while (inFlightAsyncOperations.size() > MAX_IN_FLIGHT_ASYNC_INDEXES) {