Browse Source

Merge pull request #19454 from abeyad/remove-write-consistency-level

Removes write consistency level across replication action APIs in favor of wait_for_active_shards
Ali Beyad 9 years ago
parent
commit
3d2a105825
61 changed files with 523 additions and 443 deletions
  1. 1 2
      buildSrc/src/main/resources/checkstyle_suppressions.xml
  2. 0 70
      core/src/main/java/org/elasticsearch/action/WriteConsistencyLevel.java
  3. 9 0
      core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java
  4. 9 0
      core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java
  5. 2 0
      core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java
  6. 0 5
      core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java
  7. 4 1
      core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java
  8. 1 5
      core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java
  9. 9 0
      core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java
  10. 9 0
      core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequestBuilder.java
  11. 9 0
      core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java
  12. 9 0
      core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java
  13. 20 9
      core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
  14. 15 4
      core/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java
  15. 1 1
      core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
  16. 1 0
      core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java
  17. 1 0
      core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java
  18. 1 1
      core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java
  19. 27 37
      core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java
  20. 32 43
      core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java
  21. 27 9
      core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java
  22. 15 4
      core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestBuilder.java
  23. 8 15
      core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
  24. 3 3
      core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java
  25. 20 9
      core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java
  26. 15 4
      core/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java
  27. 31 2
      core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java
  28. 6 1
      core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java
  29. 1 0
      core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java
  30. 4 4
      core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java
  31. 4 4
      core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java
  32. 4 4
      core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java
  33. 4 4
      core/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java
  34. 31 0
      core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java
  35. 10 0
      core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java
  36. 19 53
      core/src/test/java/org/elasticsearch/action/support/ActiveShardCountTests.java
  37. 5 4
      core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java
  38. 17 19
      core/src/test/java/org/elasticsearch/action/support/WaitActiveShardCountIT.java
  39. 21 37
      core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java
  40. 35 6
      core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
  41. 5 1
      core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
  42. 1 0
      core/src/test/java/org/elasticsearch/indexing/IndexActionIT.java
  43. 2 6
      core/src/test/java/org/elasticsearch/search/basic/SearchWhileCreatingIndexIT.java
  44. 1 2
      core/src/test/java/org/elasticsearch/search/basic/TransportSearchFailuresIT.java
  45. 2 5
      core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
  46. 1 1
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java
  47. 4 4
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java
  48. 21 11
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java
  49. 6 4
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequestBuilder.java
  50. 3 4
      modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java
  51. 4 4
      modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/50_wait_for_active_shards.yaml
  52. 4 4
      modules/reindex/src/test/resources/rest-api-spec/test/reindex/60_wait_for_active_shards.yaml
  53. 4 4
      modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/50_consistency.yaml
  54. 3 4
      rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json
  55. 3 4
      rest-api-spec/src/main/resources/rest-api-spec/api/delete.json
  56. 3 4
      rest-api-spec/src/main/resources/rest-api-spec/api/delete_by_query.json
  57. 3 4
      rest-api-spec/src/main/resources/rest-api-spec/api/index.json
  58. 3 4
      rest-api-spec/src/main/resources/rest-api-spec/api/reindex.json
  59. 3 4
      rest-api-spec/src/main/resources/rest-api-spec/api/update.json
  60. 3 4
      rest-api-spec/src/main/resources/rest-api-spec/api/update_by_query.json
  61. 4 5
      test/framework/src/test/java/org/elasticsearch/test/rest/yaml/restspec/ClientYamlSuiteRestApiParserTests.java

+ 1 - 2
buildSrc/src/main/resources/checkstyle_suppressions.xml

@@ -681,6 +681,7 @@
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]AutoCreateIndexTests.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]IndicesOptionsTests.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]TransportActionFilterChainTests.java" checks="LineLength" />
+  <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]WaitActiveShardCountIT.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]broadcast[/\\]node[/\\]TransportBroadcastByNodeActionTests.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]master[/\\]TransportMasterNodeActionTests.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]replication[/\\]BroadcastReplicationTests.java" checks="LineLength" />
@@ -799,7 +800,6 @@
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]xcontent[/\\]cbor[/\\]JsonVsCborTests.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]xcontent[/\\]smile[/\\]JsonVsSmileTests.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]xcontent[/\\]support[/\\]filtering[/\\]FilterPathGeneratorFilteringTests.java" checks="LineLength" />
-  <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]consistencylevel[/\\]WriteConsistencyLevelIT.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]deps[/\\]joda[/\\]SimpleJodaTests.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]BlockingClusterStatePublishResponseHandlerTests.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]DiscoveryWithServiceDisruptionsIT.java" checks="LineLength" />
@@ -1025,7 +1025,6 @@
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]basic[/\\]SearchWhileRelocatingIT.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]basic[/\\]SearchWithRandomExceptionsIT.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]basic[/\\]SearchWithRandomIOExceptionsIT.java" checks="LineLength" />
-  <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]basic[/\\]TransportSearchFailuresIT.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]basic[/\\]TransportTwoNodesSearchIT.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]child[/\\]ChildQuerySearchIT.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]child[/\\]ParentFieldLoadingIT.java" checks="LineLength" />

+ 0 - 70
core/src/main/java/org/elasticsearch/action/WriteConsistencyLevel.java

@@ -1,70 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.action;
-
-
-/**
- * Write Consistency Level control how many replicas should be active for a write operation to occur (a write operation
- * can be index, or delete).
- *
- *
- */
-public enum WriteConsistencyLevel {
-    DEFAULT((byte) 0),
-    ONE((byte) 1),
-    QUORUM((byte) 2),
-    ALL((byte) 3);
-
-    private final byte id;
-
-    WriteConsistencyLevel(byte id) {
-        this.id = id;
-    }
-
-    public byte id() {
-        return id;
-    }
-
-    public static WriteConsistencyLevel fromId(byte value) {
-        if (value == 0) {
-            return DEFAULT;
-        } else if (value == 1) {
-            return ONE;
-        } else if (value == 2) {
-            return QUORUM;
-        } else if (value == 3) {
-            return ALL;
-        }
-        throw new IllegalArgumentException("No write consistency match [" + value + "]");
-    }
-
-    public static WriteConsistencyLevel fromString(String value) {
-        if (value.equals("default")) {
-            return DEFAULT;
-        } else if (value.equals("one")) {
-            return ONE;
-        } else if (value.equals("quorum")) {
-            return QUORUM;
-        } else if (value.equals("all")) {
-            return ALL;
-        }
-        throw new IllegalArgumentException("No write consistency match [" + value + "]");
-    }
-}

+ 9 - 0
core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java

@@ -466,6 +466,15 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>
         return this;
     }
 
+    /**
+     * A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    public CreateIndexRequest waitForActiveShards(final int waitForActiveShards) {
+        return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
+
 
     @Override
     public void readFrom(StreamInput in) throws IOException {

+ 9 - 0
core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java

@@ -269,4 +269,13 @@ public class CreateIndexRequestBuilder extends AcknowledgedRequestBuilder<Create
         request.waitForActiveShards(waitForActiveShards);
         return this;
     }
+
+    /**
+     * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    public CreateIndexRequestBuilder setWaitForActiveShards(final int waitForActiveShards) {
+        return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
 }

+ 2 - 0
core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.action.admin.indices.flush;
 
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.replication.ReplicationRequest;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -33,6 +34,7 @@ public class ShardFlushRequest extends ReplicationRequest<ShardFlushRequest> {
     public ShardFlushRequest(FlushRequest request, ShardId shardId) {
         super(shardId);
         this.request = request;
+        this.waitForActiveShards = ActiveShardCount.NONE; // don't wait for any active shards before proceeding, by default
     }
 
     public ShardFlushRequest() {

+ 0 - 5
core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java

@@ -69,11 +69,6 @@ public class TransportShardFlushAction extends TransportReplicationAction<ShardF
         return new ReplicaResult();
     }
 
-    @Override
-    protected boolean checkWriteConsistency() {
-        return false;
-    }
-
     @Override
     protected ClusterBlockLevel globalBlockLevel() {
         return ClusterBlockLevel.METADATA_WRITE;

+ 4 - 1
core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java

@@ -21,6 +21,7 @@ package org.elasticsearch.action.admin.indices.refresh;
 
 import org.elasticsearch.action.ShardOperationFailedException;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.replication.BasicReplicationRequest;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.action.support.replication.TransportBroadcastReplicationAction;
@@ -54,7 +55,9 @@ public class TransportRefreshAction extends TransportBroadcastReplicationAction<
 
     @Override
     protected BasicReplicationRequest newShardRequest(RefreshRequest request, ShardId shardId) {
-        return new BasicReplicationRequest(shardId);
+        BasicReplicationRequest replicationRequest = new BasicReplicationRequest(shardId);
+        replicationRequest.waitForActiveShards(ActiveShardCount.NONE);
+        return replicationRequest;
     }
 
     @Override

+ 1 - 5
core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java

@@ -35,6 +35,7 @@ import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
+
 public class TransportShardRefreshAction
         extends TransportReplicationAction<BasicReplicationRequest, BasicReplicationRequest, ReplicationResponse> {
 
@@ -70,11 +71,6 @@ public class TransportShardRefreshAction
         return new ReplicaResult();
     }
 
-    @Override
-    protected boolean checkWriteConsistency() {
-        return false;
-    }
-
     @Override
     protected ClusterBlockLevel globalBlockLevel() {
         return ClusterBlockLevel.METADATA_WRITE;

+ 9 - 0
core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java

@@ -225,4 +225,13 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
         this.createIndexRequest.waitForActiveShards(waitForActiveShards);
     }
 
+    /**
+     * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    public void setWaitForActiveShards(final int waitForActiveShards) {
+        setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
+
 }

+ 9 - 0
core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequestBuilder.java

@@ -90,4 +90,13 @@ public class RolloverRequestBuilder extends MasterNodeOperationRequestBuilder<Ro
         this.request.setWaitForActiveShards(waitForActiveShards);
         return this;
     }
+
+    /**
+     * A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    public RolloverRequestBuilder waitForActiveShards(final int waitForActiveShards) {
+        return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
 }

+ 9 - 0
core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java

@@ -144,6 +144,15 @@ public class ShrinkRequest extends AcknowledgedRequest<ShrinkRequest> implements
         this.getShrinkIndexRequest().waitForActiveShards(waitForActiveShards);
     }
 
+    /**
+     * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    public void setWaitForActiveShards(final int waitForActiveShards) {
+        setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
+
     public void source(BytesReference source) {
         XContentType xContentType = XContentFactory.xContentType(source);
         if (xContentType != null) {

+ 9 - 0
core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java

@@ -64,4 +64,13 @@ public class ShrinkRequestBuilder extends AcknowledgedRequestBuilder<ShrinkReque
         this.request.setWaitForActiveShards(waitForActiveShards);
         return this;
     }
+
+    /**
+     * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    public ShrinkRequestBuilder setWaitForActiveShards(final int waitForActiveShards) {
+        return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
 }

+ 20 - 9
core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java

@@ -23,10 +23,11 @@ import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.CompositeIndicesRequest;
 import org.elasticsearch.action.IndicesRequest;
-import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.action.support.replication.ReplicationRequest;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Strings;
@@ -68,7 +69,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
     List<Object> payloads = null;
 
     protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT;
-    private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
+    private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
     private RefreshPolicy refreshPolicy = RefreshPolicy.NONE;
 
     private long sizeInBytes = 0;
@@ -432,15 +433,25 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
     }
 
     /**
-     * Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
+     * Sets the number of shard copies that must be active before proceeding with the write.
+     * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
      */
-    public BulkRequest consistencyLevel(WriteConsistencyLevel consistencyLevel) {
-        this.consistencyLevel = consistencyLevel;
+    public BulkRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
+        this.waitForActiveShards = waitForActiveShards;
         return this;
     }
 
-    public WriteConsistencyLevel consistencyLevel() {
-        return this.consistencyLevel;
+    /**
+     * A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    public BulkRequest waitForActiveShards(final int waitForActiveShards) {
+        return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
+
+    public ActiveShardCount waitForActiveShards() {
+        return this.waitForActiveShards;
     }
 
     @Override
@@ -525,7 +536,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
     @Override
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);
-        consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
+        waitForActiveShards = ActiveShardCount.readFrom(in);
         int size = in.readVInt();
         for (int i = 0; i < size; i++) {
             byte type = in.readByte();
@@ -550,7 +561,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
-        out.writeByte(consistencyLevel.id());
+        waitForActiveShards.writeTo(out);
         out.writeVInt(requests.size());
         for (ActionRequest<?> request : requests) {
             if (request instanceof IndexRequest) {

+ 15 - 4
core/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java

@@ -20,12 +20,13 @@
 package org.elasticsearch.action.bulk;
 
 import org.elasticsearch.action.ActionRequestBuilder;
-import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.delete.DeleteRequestBuilder;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.WriteRequestBuilder;
+import org.elasticsearch.action.support.replication.ReplicationRequest;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.action.update.UpdateRequestBuilder;
 import org.elasticsearch.client.ElasticsearchClient;
@@ -111,13 +112,23 @@ public class BulkRequestBuilder extends ActionRequestBuilder<BulkRequest, BulkRe
     }
 
     /**
-     * Sets the consistency level. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}.
+     * Sets the number of shard copies that must be active before proceeding with the write.
+     * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
      */
-    public BulkRequestBuilder setConsistencyLevel(WriteConsistencyLevel consistencyLevel) {
-        request.consistencyLevel(consistencyLevel);
+    public BulkRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
+        request.waitForActiveShards(waitForActiveShards);
         return this;
     }
 
+    /**
+     * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    public BulkRequestBuilder setWaitForActiveShards(final int waitForActiveShards) {
+        return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
+
     /**
      * A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
      */

+ 1 - 1
core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

@@ -339,7 +339,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
             final List<BulkItemRequest> requests = entry.getValue();
             BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(),
                     requests.toArray(new BulkItemRequest[requests.size()]));
-            bulkShardRequest.consistencyLevel(bulkRequest.consistencyLevel());
+            bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
             bulkShardRequest.timeout(bulkRequest.timeout());
             if (task != null) {
                 bulkShardRequest.setParentTask(nodeId, task.getId());

+ 1 - 0
core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java

@@ -93,6 +93,7 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
 
     @Override
     protected void resolveRequest(final MetaData metaData, IndexMetaData indexMetaData, DeleteRequest request) {
+        super.resolveRequest(metaData, indexMetaData, request);
         resolveAndValidateRouting(metaData, indexMetaData.getIndex().getName(), request);
         ShardId shardId = clusterService.operationRouting().shardId(clusterService.state(),
             indexMetaData.getIndex().getName(), request.id(), request.routing());

+ 1 - 0
core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java

@@ -121,6 +121,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
 
     @Override
     protected void resolveRequest(MetaData metaData, IndexMetaData indexMetaData, IndexRequest request) {
+        super.resolveRequest(metaData, indexMetaData, request);
         MappingMetaData mappingMd =indexMetaData.mappingOrDefault(request.type());
         request.resolveRouting(metaData);
         request.process(mappingMd, allowIdGeneration, indexMetaData.getIndex().getName());

+ 1 - 1
core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java

@@ -163,7 +163,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
             } else {
                 BulkRequest modifiedBulkRequest = new BulkRequest();
                 modifiedBulkRequest.setRefreshPolicy(bulkRequest.getRefreshPolicy());
-                modifiedBulkRequest.consistencyLevel(bulkRequest.consistencyLevel());
+                modifiedBulkRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
                 modifiedBulkRequest.timeout(bulkRequest.timeout());
 
                 int slot = 0;

+ 27 - 37
core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java

@@ -30,6 +30,8 @@ import org.elasticsearch.common.io.stream.Writeable;
 
 import java.io.IOException;
 
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS;
+
 /**
  * A class whose instances represent a value for counting the number
  * of active shard copies for a given shard in an index.
@@ -63,8 +65,16 @@ public final class ActiveShardCount implements Writeable {
         return get(value);
     }
 
+    /**
+     * Validates that the instance is valid for the given number of replicas in an index.
+     */
+    public boolean validate(final int numberOfReplicas) {
+        assert numberOfReplicas >= 0;
+        return value <= numberOfReplicas + 1;
+    }
+
     private static ActiveShardCount get(final int value) {
-        switch (validateValue(value)) {
+        switch (value) {
             case ACTIVE_SHARD_COUNT_DEFAULT:
                 return DEFAULT;
             case ALL_ACTIVE_SHARDS:
@@ -74,6 +84,7 @@ public final class ActiveShardCount implements Writeable {
             case 0:
                 return NONE;
             default:
+                assert value > 1;
                 return new ActiveShardCount(value);
         }
     }
@@ -87,29 +98,6 @@ public final class ActiveShardCount implements Writeable {
         return get(in.readInt());
     }
 
-    private static int validateValue(final int value) {
-        if (value < 0 && value != ACTIVE_SHARD_COUNT_DEFAULT && value != ALL_ACTIVE_SHARDS) {
-            throw new IllegalArgumentException("Invalid ActiveShardCount[" + value + "]");
-        }
-        return value;
-    }
-
-    /**
-     * Resolve this instance to an actual integer value for the number of active shard counts.
-     * If {@link ActiveShardCount#ALL} is specified, then the given {@link IndexMetaData} is
-     * used to determine what the actual active shard count should be.  The default value indicates
-     * one active shard.
-     */
-    public int resolve(final IndexMetaData indexMetaData) {
-        if (this == ActiveShardCount.DEFAULT) {
-            return 1;
-        } else if (this == ActiveShardCount.ALL) {
-            return indexMetaData.getNumberOfReplicas() + 1;
-        } else {
-            return value;
-        }
-    }
-
     /**
      * Parses the active shard count from the given string.  Valid values are "all" for
      * all shard copies, null for the default value (which defaults to one shard copy),
@@ -154,8 +142,12 @@ public final class ActiveShardCount implements Writeable {
             // all primary shards aren't active yet
             return false;
         }
+        ActiveShardCount waitForActiveShards = this;
+        if (waitForActiveShards == ActiveShardCount.DEFAULT) {
+            waitForActiveShards = SETTING_WAIT_FOR_ACTIVE_SHARDS.get(indexMetaData.getSettings());
+        }
         for (final IntObjectCursor<IndexShardRoutingTable> shardRouting : indexRoutingTable.getShards()) {
-            if (enoughShardsActive(shardRouting.value, indexMetaData) == false) {
+            if (waitForActiveShards.enoughShardsActive(shardRouting.value) == false) {
                 // not enough active shard copies yet
                 return false;
             }
@@ -167,12 +159,14 @@ public final class ActiveShardCount implements Writeable {
      * Returns true iff the active shard count in the shard routing table is enough
      * to meet the required shard count represented by this instance.
      */
-    public boolean enoughShardsActive(final IndexShardRoutingTable shardRoutingTable, final IndexMetaData indexMetaData) {
-        if (shardRoutingTable.activeShards().size() < resolve(indexMetaData)) {
-            // not enough active shard copies yet
-            return false;
+    public boolean enoughShardsActive(final IndexShardRoutingTable shardRoutingTable) {
+        if (this == ActiveShardCount.ALL) {
+            return shardRoutingTable.allShardsStarted();
+        } else if (this == ActiveShardCount.DEFAULT) {
+            return shardRoutingTable.primaryShard().started();
+        } else {
+            return shardRoutingTable.activeShards().size() >= value;
         }
-        return true;
     }
 
     @Override
@@ -194,18 +188,14 @@ public final class ActiveShardCount implements Writeable {
 
     @Override
     public String toString() {
-        final String valStr;
         switch (value) {
             case ALL_ACTIVE_SHARDS:
-                valStr = "ALL";
-                break;
+                return "ALL";
             case ACTIVE_SHARD_COUNT_DEFAULT:
-                valStr = "DEFAULT";
-                break;
+                return "DEFAULT";
             default:
-                valStr = Integer.toString(value);
+                return Integer.toString(value);
         }
-        return "ActiveShardCount[" + valStr + "]";
     }
 
 }

+ 32 - 43
core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

@@ -22,7 +22,7 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.UnavailableShardsException;
-import org.elasticsearch.action.WriteConsistencyLevel;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.TransportActions;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
@@ -68,7 +68,6 @@ public class ReplicationOperation<
     private final AtomicInteger pendingShards = new AtomicInteger();
     private final AtomicInteger successfulShards = new AtomicInteger();
     private final boolean executeOnReplicas;
-    private final boolean checkWriteConsistency;
     private final Primary<Request, ReplicaRequest, PrimaryResultT> primary;
     private final Replicas<ReplicaRequest> replicasProxy;
     private final AtomicBoolean finished = new AtomicBoolean();
@@ -80,10 +79,8 @@ public class ReplicationOperation<
 
     public ReplicationOperation(Request request, Primary<Request, ReplicaRequest, PrimaryResultT> primary,
                                 ActionListener<PrimaryResultT> listener,
-                                boolean executeOnReplicas, boolean checkWriteConsistency,
-                                Replicas<ReplicaRequest> replicas,
+                                boolean executeOnReplicas, Replicas<ReplicaRequest> replicas,
                                 Supplier<ClusterState> clusterStateSupplier, ESLogger logger, String opType) {
-        this.checkWriteConsistency = checkWriteConsistency;
         this.executeOnReplicas = executeOnReplicas;
         this.replicasProxy = replicas;
         this.primary = primary;
@@ -95,12 +92,12 @@ public class ReplicationOperation<
     }
 
     public void execute() throws Exception {
-        final String writeConsistencyFailure = checkWriteConsistency ? checkWriteConsistency() : null;
+        final String activeShardCountFailure = checkActiveShardCount();
         final ShardRouting primaryRouting = primary.routingEntry();
         final ShardId primaryId = primaryRouting.shardId();
-        if (writeConsistencyFailure != null) {
+        if (activeShardCountFailure != null) {
             finishAsFailed(new UnavailableShardsException(primaryId,
-                "{} Timeout: [{}], request: [{}]", writeConsistencyFailure, request.timeout(), request));
+                "{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request));
             return;
         }
 
@@ -190,46 +187,38 @@ public class ReplicationOperation<
     }
 
     /**
-     * checks whether we can perform a write based on the write consistency setting
-     * returns **null* if OK to proceed, or a string describing the reason to stop
+     * Checks whether we can perform a write based on the required active shard count setting.
+     * Returns **null* if OK to proceed, or a string describing the reason to stop
      */
-    String checkWriteConsistency() {
-        assert request.consistencyLevel() != WriteConsistencyLevel.DEFAULT : "consistency level should be set";
+    protected String checkActiveShardCount() {
         final ShardId shardId = primary.routingEntry().shardId();
+        final String indexName = shardId.getIndexName();
         final ClusterState state = clusterStateSupplier.get();
-        final WriteConsistencyLevel consistencyLevel = request.consistencyLevel();
-        final int sizeActive;
-        final int requiredNumber;
-        IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(shardId.getIndexName());
-        if (indexRoutingTable != null) {
-            IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId.getId());
-            if (shardRoutingTable != null) {
-                sizeActive = shardRoutingTable.activeShards().size();
-                if (consistencyLevel == WriteConsistencyLevel.QUORUM && shardRoutingTable.getSize() > 2) {
-                    // only for more than 2 in the number of shardIt it makes sense, otherwise its 1 shard with 1 replica,
-                    // quorum is 1 (which is what it is initialized to)
-                    requiredNumber = (shardRoutingTable.getSize() / 2) + 1;
-                } else if (consistencyLevel == WriteConsistencyLevel.ALL) {
-                    requiredNumber = shardRoutingTable.getSize();
-                } else {
-                    requiredNumber = 1;
-                }
-            } else {
-                sizeActive = 0;
-                requiredNumber = 1;
-            }
-        } else {
-            sizeActive = 0;
-            requiredNumber = 1;
+        assert state != null : "replication operation must have access to the cluster state";
+        final ActiveShardCount waitForActiveShards = request.waitForActiveShards();
+        if (waitForActiveShards == ActiveShardCount.NONE) {
+            return null;  // not waiting for any shards
         }
-
-        if (sizeActive < requiredNumber) {
-            logger.trace("[{}] not enough active copies to meet write consistency of [{}] (have {}, needed {}), scheduling a retry." +
-                " op [{}], request [{}]", shardId, consistencyLevel, sizeActive, requiredNumber, opType, request);
-            return "Not enough active copies to meet write consistency of [" + consistencyLevel + "] (have " + sizeActive + ", needed "
-                + requiredNumber + ").";
-        } else {
+        IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(indexName);
+        if (indexRoutingTable == null) {
+            logger.trace("[{}] index not found in the routing table", shardId);
+            return "Index " + indexName + " not found in the routing table";
+        }
+        IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId.getId());
+        if (shardRoutingTable == null) {
+            logger.trace("[{}] shard not found in the routing table", shardId);
+            return "Shard " + shardId + " not found in the routing table";
+        }
+        if (waitForActiveShards.enoughShardsActive(shardRoutingTable)) {
             return null;
+        } else {
+            final String resolvedShards = waitForActiveShards == ActiveShardCount.ALL ? Integer.toString(shardRoutingTable.shards().size())
+                                              : waitForActiveShards.toString();
+            logger.trace("[{}] not enough active copies to meet shard count of [{}] (have {}, needed {}), scheduling a retry. op [{}], " +
+                         "request [{}]", shardId, waitForActiveShards, shardRoutingTable.activeShards().size(),
+                         resolvedShards, opType, request);
+            return "Not enough active copies to meet shard count of [" + waitForActiveShards + "] (have " +
+                       shardRoutingTable.activeShards().size() + ", needed " + resolvedShards + ").";
         }
     }
 

+ 27 - 9
core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java

@@ -22,9 +22,9 @@ package org.elasticsearch.action.support.replication;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.IndicesRequest;
-import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -60,7 +60,10 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
     protected TimeValue timeout = DEFAULT_TIMEOUT;
     protected String index;
 
-    private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
+    /**
+     * The number of shard copies that must be active before proceeding with the replication action.
+     */
+    protected ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
 
     private long routedBasedOnClusterVersion = 0;
 
@@ -116,8 +119,8 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
         return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
     }
 
-    public WriteConsistencyLevel consistencyLevel() {
-        return this.consistencyLevel;
+    public ActiveShardCount waitForActiveShards() {
+        return this.waitForActiveShards;
     }
 
     /**
@@ -130,14 +133,29 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
     }
 
     /**
-     * Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
+     * Sets the number of shard copies that must be active before proceeding with the replication
+     * operation. Defaults to {@link ActiveShardCount#DEFAULT}, which requires one shard copy
+     * (the primary) to be active. Set this value to {@link ActiveShardCount#ALL} to
+     * wait for all shards (primary and all replicas) to be active. Otherwise, use
+     * {@link ActiveShardCount#from(int)} to set this value to any non-negative integer, up to the
+     * total number of shard copies (number of replicas + 1).
      */
     @SuppressWarnings("unchecked")
-    public final Request consistencyLevel(WriteConsistencyLevel consistencyLevel) {
-        this.consistencyLevel = consistencyLevel;
+    public final Request waitForActiveShards(ActiveShardCount waitForActiveShards) {
+        this.waitForActiveShards = waitForActiveShards;
         return (Request) this;
     }
 
+    /**
+     * A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    @SuppressWarnings("unchecked")
+    public final Request waitForActiveShards(final int waitForActiveShards) {
+        return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
+
     /**
      * Sets the minimum version of the cluster state that is required on the next node before we redirect to another primary.
      * Used to prevent redirect loops, see also {@link TransportReplicationAction.ReroutePhase#doRun()}
@@ -179,7 +197,7 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
         } else {
             shardId = null;
         }
-        consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
+        waitForActiveShards = ActiveShardCount.readFrom(in);
         timeout = new TimeValue(in);
         index = in.readString();
         routedBasedOnClusterVersion = in.readVLong();
@@ -195,7 +213,7 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
         } else {
             out.writeBoolean(false);
         }
-        out.writeByte(consistencyLevel.id());
+        waitForActiveShards.writeTo(out);
         timeout.writeTo(out);
         out.writeString(index);
         out.writeVLong(routedBasedOnClusterVersion);

+ 15 - 4
core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestBuilder.java

@@ -22,7 +22,7 @@ package org.elasticsearch.action.support.replication;
 import org.elasticsearch.action.Action;
 import org.elasticsearch.action.ActionRequestBuilder;
 import org.elasticsearch.action.ActionResponse;
-import org.elasticsearch.action.WriteConsistencyLevel;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.client.ElasticsearchClient;
 import org.elasticsearch.common.unit.TimeValue;
 
@@ -60,11 +60,22 @@ public abstract class ReplicationRequestBuilder<Request extends ReplicationReque
     }
 
     /**
-     * Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
+     * Sets the number of shard copies that must be active before proceeding with the write.
+     * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
      */
     @SuppressWarnings("unchecked")
-    public RequestBuilder setConsistencyLevel(WriteConsistencyLevel consistencyLevel) {
-        request.consistencyLevel(consistencyLevel);
+    public RequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
+        request.waitForActiveShards(waitForActiveShards);
         return (RequestBuilder) this;
     }
+
+    /**
+     * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    @SuppressWarnings("unchecked")
+    public RequestBuilder setWaitForActiveShards(final int waitForActiveShards) {
+        return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
 }

+ 8 - 15
core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

@@ -23,8 +23,8 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.UnavailableShardsException;
-import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.TransportAction;
 import org.elasticsearch.action.support.TransportActions;
 import org.elasticsearch.client.transport.NoNodeAvailableException;
@@ -90,7 +90,6 @@ public abstract class TransportReplicationAction<
     protected final ClusterService clusterService;
     protected final IndicesService indicesService;
     private final ShardStateAction shardStateAction;
-    private final WriteConsistencyLevel defaultWriteConsistencyLevel;
     private final TransportRequestOptions transportOptions;
     private final String executor;
 
@@ -122,8 +121,6 @@ public abstract class TransportReplicationAction<
 
         this.transportOptions = transportOptions();
 
-        this.defaultWriteConsistencyLevel = WriteConsistencyLevel.fromString(settings.get("action.write_consistency", "quorum"));
-
         this.replicasProxy = new ReplicasProxy();
     }
 
@@ -149,6 +146,11 @@ public abstract class TransportReplicationAction<
      * @param request       the request to resolve
      */
     protected void resolveRequest(MetaData metaData, IndexMetaData indexMetaData, Request request) {
+        if (request.waitForActiveShards() == ActiveShardCount.DEFAULT) {
+            // if the wait for active shard count has not been set in the request,
+            // resolve it from the index settings
+            request.waitForActiveShards(indexMetaData.getWaitForActiveShards());
+        }
     }
 
     /**
@@ -164,13 +166,6 @@ public abstract class TransportReplicationAction<
      */
     protected abstract ReplicaResult shardOperationOnReplica(ReplicaRequest shardRequest);
 
-    /**
-     * True if write consistency should be checked for an implementation
-     */
-    protected boolean checkWriteConsistency() {
-        return true;
-    }
-
     /**
      * Cluster level block to check before request execution
      */
@@ -353,7 +348,7 @@ public abstract class TransportReplicationAction<
             Request request, ActionListener<PrimaryResult> listener,
             PrimaryShardReference primaryShardReference, boolean executeOnReplicas) {
             return new ReplicationOperation<>(request, primaryShardReference, listener,
-                executeOnReplicas, checkWriteConsistency(), replicasProxy, clusterService::state, logger, actionName
+                executeOnReplicas, replicasProxy, clusterService::state, logger, actionName
             );
         }
     }
@@ -566,11 +561,9 @@ public abstract class TransportReplicationAction<
             }
 
             // resolve all derived request fields, so we can route and apply it
-            if (request.consistencyLevel() == WriteConsistencyLevel.DEFAULT) {
-                request.consistencyLevel(defaultWriteConsistencyLevel);
-            }
             resolveRequest(state.metaData(), indexMetaData, request);
             assert request.shardId() != null : "request shardId must be set in resolveRequest";
+            assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest";
 
             final ShardRouting primary = primary(state);
             if (retryIfUnavailable(state, primary)) {

+ 3 - 3
core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java

@@ -131,7 +131,7 @@ public class UpdateHelper extends AbstractComponent {
                     .setRefreshPolicy(request.getRefreshPolicy())
                     .routing(request.routing())
                     .parent(request.parent())
-                    .consistencyLevel(request.consistencyLevel());
+                    .waitForActiveShards(request.waitForActiveShards());
             if (request.versionType() != VersionType.INTERNAL) {
                 // in all but the internal versioning mode, we want to create the new document using the given version.
                 indexRequest.version(request.version()).versionType(request.versionType());
@@ -224,14 +224,14 @@ public class UpdateHelper extends AbstractComponent {
             final IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
                     .source(updatedSourceAsMap, updateSourceContentType)
                     .version(updateVersion).versionType(request.versionType())
-                    .consistencyLevel(request.consistencyLevel())
+                    .waitForActiveShards(request.waitForActiveShards())
                     .timestamp(timestamp).ttl(ttl)
                     .setRefreshPolicy(request.getRefreshPolicy());
             return new Result(indexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
         } else if ("delete".equals(operation)) {
             DeleteRequest deleteRequest = Requests.deleteRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
                     .version(updateVersion).versionType(request.versionType())
-                    .consistencyLevel(request.consistencyLevel())
+                    .waitForActiveShards(request.waitForActiveShards())
                     .setRefreshPolicy(request.getRefreshPolicy());
             return new Result(deleteRequest, DocWriteResponse.Result.DELETED, updatedSourceAsMap, updateSourceContentType);
         } else if ("none".equals(operation)) {

+ 20 - 9
core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java

@@ -21,9 +21,10 @@ package org.elasticsearch.action.update;
 
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.DocumentRequest;
-import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.action.support.replication.ReplicationRequest;
 import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.ParseFieldMatcher;
@@ -74,7 +75,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
 
     private RefreshPolicy refreshPolicy = RefreshPolicy.NONE;
 
-    private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
+    private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
 
     private IndexRequest upsertRequest;
 
@@ -433,18 +434,28 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
         return refreshPolicy;
     }
 
-    public WriteConsistencyLevel consistencyLevel() {
-        return this.consistencyLevel;
+    public ActiveShardCount waitForActiveShards() {
+        return this.waitForActiveShards;
     }
 
     /**
-     * Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
+     * Sets the number of shard copies that must be active before proceeding with the write.
+     * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
      */
-    public UpdateRequest consistencyLevel(WriteConsistencyLevel consistencyLevel) {
-        this.consistencyLevel = consistencyLevel;
+    public UpdateRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
+        this.waitForActiveShards = waitForActiveShards;
         return this;
     }
 
+    /**
+     * A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    public UpdateRequest waitForActiveShards(final int waitForActiveShards) {
+        return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
+
     /**
      * Sets the doc to use for updates when a script is not specified.
      */
@@ -703,7 +714,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
     @Override
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);
-        consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
+        waitForActiveShards = ActiveShardCount.readFrom(in);
         type = in.readString();
         id = in.readString();
         routing = in.readOptionalString();
@@ -738,7 +749,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
-        out.writeByte(consistencyLevel.id());
+        waitForActiveShards.writeTo(out);
         out.writeString(type);
         out.writeString(id);
         out.writeOptionalString(routing);

+ 15 - 4
core/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java

@@ -19,9 +19,10 @@
 
 package org.elasticsearch.action.update;
 
-import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.WriteRequestBuilder;
+import org.elasticsearch.action.support.replication.ReplicationRequest;
 import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequestBuilder;
 import org.elasticsearch.client.ElasticsearchClient;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -122,13 +123,23 @@ public class UpdateRequestBuilder extends InstanceShardOperationRequestBuilder<U
     }
 
     /**
-     * Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
+     * Sets the number of shard copies that must be active before proceeding with the write.
+     * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
      */
-    public UpdateRequestBuilder setConsistencyLevel(WriteConsistencyLevel consistencyLevel) {
-        request.consistencyLevel(consistencyLevel);
+    public UpdateRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
+        request.waitForActiveShards(waitForActiveShards);
         return this;
     }
 
+    /**
+     * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    public UpdateRequestBuilder setWaitForActiveShards(final int waitForActiveShards) {
+        return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
+
     /**
      * Sets the doc to use for updates when a script is not specified.
      */

+ 31 - 2
core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java

@@ -25,6 +25,7 @@ import com.carrotsearch.hppc.cursors.ObjectCursor;
 import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 
 import org.elasticsearch.Version;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.cluster.Diff;
 import org.elasticsearch.cluster.Diffable;
 import org.elasticsearch.cluster.DiffableUtils;
@@ -219,6 +220,16 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
     public static final Setting<Settings> INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING =
         Setting.groupSetting("index.routing.allocation.initial_recovery."); // this is only setable internally not a registered setting!!
 
+    /**
+     * The number of active shard copies to check for before proceeding with a write operation.
+     */
+    public static final Setting<ActiveShardCount> SETTING_WAIT_FOR_ACTIVE_SHARDS =
+        new Setting<>("index.write.wait_for_active_shards",
+                      "1",
+                      ActiveShardCount::parseString,
+                      Setting.Property.Dynamic,
+                      Setting.Property.IndexScope);
+
     public static final IndexMetaData PROTO = IndexMetaData.builder("")
             .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
             .numberOfShards(1).numberOfReplicas(0).build();
@@ -266,12 +277,14 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
     private final Version indexUpgradedVersion;
     private final org.apache.lucene.util.Version minimumCompatibleLuceneVersion;
 
+    private final ActiveShardCount waitForActiveShards;
+
     private IndexMetaData(Index index, long version, long[] primaryTerms, State state, int numberOfShards, int numberOfReplicas, Settings settings,
                           ImmutableOpenMap<String, MappingMetaData> mappings, ImmutableOpenMap<String, AliasMetaData> aliases,
                           ImmutableOpenMap<String, Custom> customs, ImmutableOpenIntMap<Set<String>> activeAllocationIds,
                           DiscoveryNodeFilters requireFilters, DiscoveryNodeFilters initialRecoveryFilters, DiscoveryNodeFilters includeFilters, DiscoveryNodeFilters excludeFilters,
                           Version indexCreatedVersion, Version indexUpgradedVersion, org.apache.lucene.util.Version minimumCompatibleLuceneVersion,
-                          int routingNumShards) {
+                          int routingNumShards, ActiveShardCount waitForActiveShards) {
 
         this.index = index;
         this.version = version;
@@ -295,6 +308,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
         this.minimumCompatibleLuceneVersion = minimumCompatibleLuceneVersion;
         this.routingNumShards = routingNumShards;
         this.routingFactor = routingNumShards / numberOfShards;
+        this.waitForActiveShards = waitForActiveShards;
         assert numberOfShards * routingFactor == routingNumShards :  routingNumShards + " must be a multiple of " + numberOfShards;
     }
 
@@ -378,6 +392,14 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
         return totalNumberOfShards;
     }
 
+    /**
+     * Returns the configured {@link #SETTING_WAIT_FOR_ACTIVE_SHARDS}, which defaults
+     * to an active shard count of 1 if not specified.
+     */
+    public ActiveShardCount getWaitForActiveShards() {
+        return waitForActiveShards;
+    }
+
     public Settings getSettings() {
         return settings;
     }
@@ -973,10 +995,17 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
                     + "] but should be equal to number of shards [" + numberOfShards() + "]");
             }
 
+            final ActiveShardCount waitForActiveShards = SETTING_WAIT_FOR_ACTIVE_SHARDS.get(settings);
+            if (waitForActiveShards.validate(numberOfReplicas) == false) {
+                throw new IllegalArgumentException("invalid " + SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey() +
+                                                   "[" + waitForActiveShards + "]: cannot be greater than " +
+                                                   "number of shard copies [" + (numberOfReplicas + 1) + "]");
+            }
+
             final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE);
             return new IndexMetaData(new Index(index, uuid), version, primaryTerms, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(),
                 tmpAliases.build(), customs.build(), filledActiveAllocationIds.build(), requireFilters, initialRecoveryFilters, includeFilters, excludeFilters,
-                indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion, getRoutingNumShards());
+                indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion, getRoutingNumShards(), waitForActiveShards);
         }
 
         public static void toXContent(IndexMetaData indexMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException {

+ 6 - 1
core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java

@@ -27,6 +27,7 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.alias.Alias;
 import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.ActiveShardsObserver;
 import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
 import org.elasticsearch.cluster.ClusterState;
@@ -347,7 +348,11 @@ public class MetaDataCreateIndexService extends AbstractComponent {
                                 .setRoutingNumShards(routingNumShards);
                             // Set up everything, now locally create the index to see that things are ok, and apply
                             final IndexMetaData tmpImd = tmpImdBuilder.settings(actualIndexSettings).build();
-                            if (request.waitForActiveShards().resolve(tmpImd) > tmpImd.getNumberOfReplicas() + 1) {
+                            ActiveShardCount waitForActiveShards = request.waitForActiveShards();
+                            if (waitForActiveShards == ActiveShardCount.DEFAULT) {
+                                waitForActiveShards = tmpImd.getWaitForActiveShards();
+                            }
+                            if (waitForActiveShards.validate(tmpImd.getNumberOfReplicas()) == false) {
                                 throw new IllegalArgumentException("invalid wait_for_active_shards[" + request.waitForActiveShards() +
                                                                    "]: cannot be greater than number of shard copies [" +
                                                                    (tmpImd.getNumberOfReplicas() + 1) + "]");

+ 1 - 0
core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

@@ -140,6 +140,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
         PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS_SETTING,
         FsDirectoryService.INDEX_LOCK_FACTOR_SETTING,
         EngineConfig.INDEX_CODEC_SETTING,
+        IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS,
         // validate that built-in similarities don't get redefined
         Setting.groupSetting("index.similarity.", (s) -> {
             Map<String, Settings> groups = s.getAsGroups();

+ 4 - 4
core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java

@@ -19,11 +19,11 @@
 
 package org.elasticsearch.rest.action.bulk;
 
-import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.bulk.BulkShardRequest;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.common.Strings;
@@ -79,9 +79,9 @@ public class RestBulkAction extends BaseRestHandler {
         String defaultPipeline = request.param("pipeline");
         String[] defaultFields = fieldsParam != null ? Strings.commaDelimitedListToStringArray(fieldsParam) : null;
 
-        String consistencyLevel = request.param("consistency");
-        if (consistencyLevel != null) {
-            bulkRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
+        String waitForActiveShards = request.param("wait_for_active_shards");
+        if (waitForActiveShards != null) {
+            bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
         }
         bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
         bulkRequest.setRefreshPolicy(request.param("refresh"));

+ 4 - 4
core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java

@@ -19,8 +19,8 @@
 
 package org.elasticsearch.rest.action.delete;
 
-import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
@@ -55,9 +55,9 @@ public class RestDeleteAction extends BaseRestHandler {
         deleteRequest.version(RestActions.parseVersion(request));
         deleteRequest.versionType(VersionType.fromString(request.param("version_type"), deleteRequest.versionType()));
 
-        String consistencyLevel = request.param("consistency");
-        if (consistencyLevel != null) {
-            deleteRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
+        String waitForActiveShards = request.param("wait_for_active_shards");
+        if (waitForActiveShards != null) {
+            deleteRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
         }
 
         client.delete(deleteRequest, new RestStatusToXContentListener<>(channel));

+ 4 - 4
core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java

@@ -19,8 +19,8 @@
 
 package org.elasticsearch.rest.action.index;
 
-import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
@@ -98,9 +98,9 @@ public class RestIndexAction extends BaseRestHandler {
                 }
             }
         }
-        String consistencyLevel = request.param("consistency");
-        if (consistencyLevel != null) {
-            indexRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
+        String waitForActiveShards = request.param("wait_for_active_shards");
+        if (waitForActiveShards != null) {
+            indexRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
         }
         client.index(indexRequest, new RestStatusToXContentListener<>(channel, r -> r.getLocation(indexRequest.routing())));
     }

+ 4 - 4
core/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java

@@ -19,8 +19,8 @@
 
 package org.elasticsearch.rest.action.update;
 
-import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.common.Strings;
@@ -53,9 +53,9 @@ public class RestUpdateAction extends BaseRestHandler {
         updateRequest.parent(request.param("parent"));
         updateRequest.timeout(request.paramAsTime("timeout", updateRequest.timeout()));
         updateRequest.setRefreshPolicy(request.param("refresh"));
-        String consistencyLevel = request.param("consistency");
-        if (consistencyLevel != null) {
-            updateRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
+        String waitForActiveShards = request.param("wait_for_active_shards");
+        if (waitForActiveShards != null) {
+            updateRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
         }
         updateRequest.docAsUpsert(request.paramAsBoolean("doc_as_upsert", updateRequest.docAsUpsert()));
         String sField = request.param("fields");

+ 31 - 0
core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java

@@ -50,6 +50,7 @@ import java.util.HashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
@@ -475,4 +476,34 @@ public class CreateIndexIT extends ESIntegTestCase {
         ensureGreen();
         assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
     }
+
+    /**
+     * This test ensures that index creation adheres to the {@link IndexMetaData#SETTING_WAIT_FOR_ACTIVE_SHARDS}.
+     */
+    public void testDefaultWaitForActiveShardsUsesIndexSetting() throws Exception {
+        final String indexName = "test";
+        final int numReplicas = internalCluster().numDataNodes();
+        Settings settings = Settings.builder()
+                                .put(SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), Integer.toString(numReplicas))
+                                .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+                                .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), numReplicas)
+                                .build();
+        assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(settings).get());
+        assertAcked(client().admin().indices().prepareDelete(indexName));
+
+        // all should fail
+        settings = Settings.builder()
+                       .put(settings)
+                       .put(SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), "all")
+                       .build();
+        assertFalse(client().admin().indices().prepareCreate(indexName).setSettings(settings).setTimeout("100ms").get().isShardsAcked());
+        assertAcked(client().admin().indices().prepareDelete(indexName));
+
+        // the numeric equivalent of all should also fail
+        settings = Settings.builder()
+                       .put(settings)
+                       .put(SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), Integer.toString(numReplicas + 1))
+                       .build();
+        assertFalse(client().admin().indices().prepareCreate(indexName).setSettings(settings).setTimeout("100ms").get().isShardsAcked());
+    }
 }

+ 10 - 0
core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java

@@ -19,6 +19,7 @@
 package org.elasticsearch.action.index;
 
 import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.test.ESTestCase;
@@ -131,4 +132,13 @@ public class IndexRequestTests extends ESTestCase {
         assertThat(validate, notNullValue());
         assertThat(validate.getMessage(), containsString("ttl must not be negative"));
     }
+
+    public void testWaitForActiveShards() {
+        IndexRequest request = new IndexRequest("index", "type");
+        final int count = randomIntBetween(0, 10);
+        request.waitForActiveShards(ActiveShardCount.from(count));
+        assertEquals(request.waitForActiveShards(), ActiveShardCount.from(count));
+        // test negative shard count value not allowed
+        expectThrows(IllegalArgumentException.class, () -> request.waitForActiveShards(ActiveShardCount.from(randomIntBetween(-10, -1))));
+    }
 }

+ 19 - 53
core/src/test/java/org/elasticsearch/action/support/ActiveShardCountTests.java

@@ -37,8 +37,6 @@ import org.elasticsearch.test.ESTestCase;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import static org.hamcrest.Matchers.equalTo;
-
 /**
  * Tests for the {@link ActiveShardCount} class
  */
@@ -47,41 +45,10 @@ public class ActiveShardCountTests extends ESTestCase {
     public void testFromIntValue() {
         assertSame(ActiveShardCount.from(0), ActiveShardCount.NONE);
         final int value = randomIntBetween(1, 50);
-        IndexMetaData indexMetaData = IndexMetaData.builder("test")
-                                                   .settings(settings(Version.CURRENT))
-                                                   .numberOfShards(1)
-                                                   .numberOfReplicas(0)
-                                                   .build();
-        assertEquals(ActiveShardCount.from(value).resolve(indexMetaData), value);
+        assertEquals(ActiveShardCount.from(value).toString(), Integer.toString(value));
         expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.from(randomIntBetween(-10, -1)));
     }
 
-    public void testResolve() {
-        // one shard
-        IndexMetaData indexMetaData = IndexMetaData.builder("test")
-                                                   .settings(settings(Version.CURRENT))
-                                                   .numberOfShards(1)
-                                                   .numberOfReplicas(0)
-                                                   .build();
-        assertThat(ActiveShardCount.ALL.resolve(indexMetaData), equalTo(1));
-        assertThat(ActiveShardCount.DEFAULT.resolve(indexMetaData), equalTo(1));
-        assertThat(ActiveShardCount.NONE.resolve(indexMetaData), equalTo(0));
-        final int value = randomIntBetween(2, 20);
-        assertThat(ActiveShardCount.from(value).resolve(indexMetaData), equalTo(value));
-
-        // more than one shard
-        final int numNewShards = randomIntBetween(1, 20);
-        indexMetaData = IndexMetaData.builder("test")
-                                     .settings(settings(Version.CURRENT))
-                                     .numberOfShards(1)
-                                     .numberOfReplicas(numNewShards)
-                                     .build();
-        assertThat(ActiveShardCount.ALL.resolve(indexMetaData), equalTo(numNewShards + 1));
-        assertThat(ActiveShardCount.DEFAULT.resolve(indexMetaData), equalTo(1));
-        assertThat(ActiveShardCount.NONE.resolve(indexMetaData), equalTo(0));
-        assertThat(ActiveShardCount.from(value).resolve(indexMetaData), equalTo(value));
-    }
-
     public void testSerialization() throws IOException {
         doWriteRead(ActiveShardCount.ALL);
         doWriteRead(ActiveShardCount.DEFAULT);
@@ -101,6 +68,14 @@ public class ActiveShardCountTests extends ESTestCase {
         expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.parseString(randomIntBetween(-10, -3) + ""));
     }
 
+    public void testValidate() {
+        assertTrue(ActiveShardCount.parseString("all").validate(randomIntBetween(0, 10)));
+        final int numReplicas = randomIntBetween(0, 10);
+        assertTrue(ActiveShardCount.from(randomIntBetween(0, numReplicas + 1)).validate(numReplicas));
+        // invalid values shouldn't validate
+        assertFalse(ActiveShardCount.from(numReplicas + randomIntBetween(2, 10)).validate(numReplicas));
+    }
+
     private void doWriteRead(ActiveShardCount activeShardCount) throws IOException {
         final BytesStreamOutput out = new BytesStreamOutput();
         activeShardCount.writeTo(out);
@@ -119,15 +94,11 @@ public class ActiveShardCountTests extends ESTestCase {
         final String indexName = "test-idx";
         final int numberOfShards = randomIntBetween(1, 5);
         final int numberOfReplicas = randomIntBetween(4, 7);
-        final ActiveShardCount waitForActiveShards = ActiveShardCount.from(0);
+        final ActiveShardCount waitForActiveShards = ActiveShardCount.NONE;
         ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas);
         assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
         clusterState = startPrimaries(clusterState, indexName);
         assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
-        clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards);
-        assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
-        clusterState = startWaitOnShards(clusterState, indexName, waitForActiveShards);
-        assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
         clusterState = startAllShards(clusterState, indexName);
         assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
     }
@@ -145,14 +116,15 @@ public class ActiveShardCountTests extends ESTestCase {
         final String indexName = "test-idx";
         final int numberOfShards = randomIntBetween(1, 5);
         final int numberOfReplicas = randomIntBetween(4, 7);
-        final ActiveShardCount waitForActiveShards = ActiveShardCount.from(randomIntBetween(2, numberOfReplicas));
+        final int activeShardCount = randomIntBetween(2, numberOfReplicas);
+        final ActiveShardCount waitForActiveShards = ActiveShardCount.from(activeShardCount);
         ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas);
         assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
         clusterState = startPrimaries(clusterState, indexName);
         assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
-        clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards);
+        clusterState = startLessThanWaitOnShards(clusterState, indexName, activeShardCount - 2);
         assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
-        clusterState = startWaitOnShards(clusterState, indexName, waitForActiveShards);
+        clusterState = startWaitOnShards(clusterState, indexName, activeShardCount - 1);
         assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
         clusterState = startAllShards(clusterState, indexName);
         assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
@@ -168,7 +140,7 @@ public class ActiveShardCountTests extends ESTestCase {
         assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
         clusterState = startPrimaries(clusterState, indexName);
         assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
-        clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards);
+        clusterState = startLessThanWaitOnShards(clusterState, indexName, numberOfReplicas - randomIntBetween(1, numberOfReplicas));
         assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
         clusterState = startAllShards(clusterState, indexName);
         assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
@@ -184,10 +156,6 @@ public class ActiveShardCountTests extends ESTestCase {
         assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
         clusterState = startPrimaries(clusterState, indexName);
         assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
-        clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards);
-        assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
-        clusterState = startWaitOnShards(clusterState, indexName, waitForActiveShards);
-        assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
         clusterState = startAllShards(clusterState, indexName);
         assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
     }
@@ -223,16 +191,15 @@ public class ActiveShardCountTests extends ESTestCase {
         return ClusterState.builder(clusterState).routingTable(routingTable).build();
     }
 
-    private ClusterState startLessThanWaitOnShards(final ClusterState clusterState, final String indexName,
-                                                   final ActiveShardCount waitForActiveShards) {
+    private ClusterState startLessThanWaitOnShards(final ClusterState clusterState, final String indexName, final int numShardsToStart) {
         RoutingTable routingTable = clusterState.routingTable();
         IndexRoutingTable indexRoutingTable = routingTable.index(indexName);
         IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex());
         for (final ObjectCursor<IndexShardRoutingTable> shardEntry : indexRoutingTable.getShards().values()) {
             final IndexShardRoutingTable shardRoutingTable = shardEntry.value;
             assert shardRoutingTable.getSize() > 2;
+            int numToStart = numShardsToStart;
             // want less than half, and primary is already started
-            int numToStart = waitForActiveShards.resolve(clusterState.metaData().index(indexName)) - 2;
             for (ShardRouting shardRouting : shardRoutingTable.getShards()) {
                 if (shardRouting.primary()) {
                     assertTrue(shardRouting.active());
@@ -250,15 +217,14 @@ public class ActiveShardCountTests extends ESTestCase {
         return ClusterState.builder(clusterState).routingTable(routingTable).build();
     }
 
-    private ClusterState startWaitOnShards(final ClusterState clusterState, final String indexName,
-                                           final ActiveShardCount waitForActiveShards) {
+    private ClusterState startWaitOnShards(final ClusterState clusterState, final String indexName, final int numShardsToStart) {
         RoutingTable routingTable = clusterState.routingTable();
         IndexRoutingTable indexRoutingTable = routingTable.index(indexName);
         IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex());
         for (final ObjectCursor<IndexShardRoutingTable> shardEntry : indexRoutingTable.getShards().values()) {
             final IndexShardRoutingTable shardRoutingTable = shardEntry.value;
             assert shardRoutingTable.getSize() > 2;
-            int numToStart = waitForActiveShards.resolve(clusterState.metaData().index(indexName)) - 1; // primary is already started
+            int numToStart = numShardsToStart;
             for (ShardRouting shardRouting : shardRoutingTable.getShards()) {
                 if (shardRouting.primary()) {
                     assertTrue(shardRouting.active());

+ 5 - 4
core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java

@@ -67,7 +67,7 @@ public class ActiveShardsObserverIT extends ESIntegTestCase {
         Settings settings = settingsBuilder.build();
         CreateIndexResponse response = prepareCreate("test-idx")
                                            .setSettings(settings)
-                                           .setWaitForActiveShards(ActiveShardCount.from(0))
+                                           .setWaitForActiveShards(ActiveShardCount.NONE)
                                            .get();
         assertTrue(response.isAcknowledged());
     }
@@ -83,7 +83,7 @@ public class ActiveShardsObserverIT extends ESIntegTestCase {
         final String indexName = "test-idx";
         assertFalse(prepareCreate(indexName)
                        .setSettings(settings)
-                       .setWaitForActiveShards(ActiveShardCount.from(randomIntBetween(numDataNodes + 1, numReplicas + 1)))
+                       .setWaitForActiveShards(randomIntBetween(numDataNodes + 1, numReplicas + 1))
                        .setTimeout("100ms")
                        .get()
                        .isShardsAcked());
@@ -97,8 +97,9 @@ public class ActiveShardsObserverIT extends ESIntegTestCase {
                                 .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 5))
                                 .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), internalCluster().numDataNodes() + randomIntBetween(0, 3))
                                 .build();
-        ActiveShardCount waitForActiveShards = ActiveShardCount.from(randomIntBetween(0, internalCluster().numDataNodes()));
-        assertAcked(prepareCreate(indexName).setSettings(settings).setWaitForActiveShards(waitForActiveShards).get());
+        assertAcked(prepareCreate(indexName).setSettings(settings)
+                        .setWaitForActiveShards(randomIntBetween(0, internalCluster().numDataNodes()))
+                        .get());
     }
 
     public void testCreateIndexWaitsForAllActiveShards() throws Exception {

+ 17 - 19
core/src/test/java/org/elasticsearch/consistencylevel/WriteConsistencyLevelIT.java → core/src/test/java/org/elasticsearch/action/support/WaitActiveShardCountIT.java

@@ -17,10 +17,9 @@
  * under the License.
  */
 
-package org.elasticsearch.consistencylevel;
+package org.elasticsearch.action.support;
 
 import org.elasticsearch.action.UnavailableShardsException;
-import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
@@ -35,26 +34,25 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke
 import static org.hamcrest.Matchers.equalTo;
 
 /**
- *
+ * Tests setting the active shard count for replication operations (e.g. index) operates correctly.
  */
-public class WriteConsistencyLevelIT extends ESIntegTestCase {
-    public void testWriteConsistencyLevelReplication2() throws Exception {
+public class WaitActiveShardCountIT extends ESIntegTestCase {
+    public void testReplicationWaitsForActiveShardCount() throws Exception {
         CreateIndexResponse createIndexResponse =
-            prepareCreate("test", 1, Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 2))
-                .get();
+            prepareCreate("test", 1, Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 2)).get();
 
         assertAcked(createIndexResponse);
 
-        // indexing, by default, will work (ONE consistency level)
-        client().prepareIndex("test", "type1", "1").setSource(source("1", "test")).setConsistencyLevel(WriteConsistencyLevel.ONE).execute().actionGet();
+        // indexing, by default, will work (waiting for one shard copy only)
+        client().prepareIndex("test", "type1", "1").setSource(source("1", "test")).execute().actionGet();
         try {
             client().prepareIndex("test", "type1", "1").setSource(source("1", "test"))
-                    .setConsistencyLevel(WriteConsistencyLevel.QUORUM)
+                    .setWaitForActiveShards(2) // wait for 2 active shard copies
                     .setTimeout(timeValueMillis(100)).execute().actionGet();
-            fail("can't index, does not match consistency");
+            fail("can't index, does not enough active shard copies");
         } catch (UnavailableShardsException e) {
             assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
-            assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet write consistency of [QUORUM] (have 1, needed 2). Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]"));
+            assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [2] (have 1, needed 2). Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]"));
             // but really, all is well
         }
 
@@ -71,19 +69,19 @@ public class WriteConsistencyLevelIT extends ESIntegTestCase {
         assertThat(clusterHealth.isTimedOut(), equalTo(false));
         assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
 
-        // this should work, since we now have
+        // this should work, since we now have two
         client().prepareIndex("test", "type1", "1").setSource(source("1", "test"))
-                .setConsistencyLevel(WriteConsistencyLevel.QUORUM)
+                .setWaitForActiveShards(2)
                 .setTimeout(timeValueSeconds(1)).execute().actionGet();
 
         try {
             client().prepareIndex("test", "type1", "1").setSource(source("1", "test"))
-                    .setConsistencyLevel(WriteConsistencyLevel.ALL)
+                    .setWaitForActiveShards(ActiveShardCount.ALL)
                     .setTimeout(timeValueMillis(100)).execute().actionGet();
-            fail("can't index, does not match consistency");
+            fail("can't index, not enough active shard copies");
         } catch (UnavailableShardsException e) {
             assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
-            assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet write consistency of [ALL] (have 2, needed 3). Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]"));
+            assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [" + ActiveShardCount.ALL + "] (have 2, needed 3). Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]"));
             // but really, all is well
         }
 
@@ -93,9 +91,9 @@ public class WriteConsistencyLevelIT extends ESIntegTestCase {
         assertThat(clusterHealth.isTimedOut(), equalTo(false));
         assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
 
-        // this should work, since we now have
+        // this should work, since we now have all shards started
         client().prepareIndex("test", "type1", "1").setSource(source("1", "test"))
-                .setConsistencyLevel(WriteConsistencyLevel.ALL)
+                .setWaitForActiveShards(ActiveShardCount.ALL)
                 .setTimeout(timeValueSeconds(1)).execute().actionGet();
     }
 

+ 21 - 37
core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java

@@ -22,7 +22,7 @@ import org.apache.lucene.index.CorruptIndexException;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.UnavailableShardsException;
-import org.elasticsearch.action.WriteConsistencyLevel;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
 import org.elasticsearch.cluster.ClusterState;
@@ -136,7 +136,7 @@ public class ReplicationOperationTests extends ESTestCase {
         Request request = new Request(shardId);
         PlainActionFuture<TestPrimary.Result> listener = new PlainActionFuture<>();
         final TestReplicationOperation op = new TestReplicationOperation(request,
-            new TestPrimary(primaryShard, primaryTerm), listener, false, false,
+            new TestPrimary(primaryShard, primaryTerm), listener, false,
             new TestReplicaProxy(), () -> state, logger, "test");
         op.execute();
         assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
@@ -251,34 +251,17 @@ public class ReplicationOperationTests extends ESTestCase {
         assertThat(request.processedOnReplicas, equalTo(expectedReplicas));
     }
 
-    public void testWriteConsistency() throws Exception {
+    public void testWaitForActiveShards() throws Exception {
         final String index = "test";
         final ShardId shardId = new ShardId(index, "_na_", 0);
         final int assignedReplicas = randomInt(2);
         final int unassignedReplicas = randomInt(2);
         final int totalShards = 1 + assignedReplicas + unassignedReplicas;
-        final boolean passesWriteConsistency;
-        Request request = new Request(shardId).consistencyLevel(randomFrom(WriteConsistencyLevel.values()));
-        switch (request.consistencyLevel()) {
-            case ONE:
-                passesWriteConsistency = true;
-                break;
-            case DEFAULT:
-            case QUORUM:
-                if (totalShards <= 2) {
-                    passesWriteConsistency = true; // primary is enough
-                } else {
-                    passesWriteConsistency = assignedReplicas + 1 >= (totalShards / 2) + 1;
-                }
-                // we have to reset default (as the transport replication action will do)
-                request.consistencyLevel(WriteConsistencyLevel.QUORUM);
-                break;
-            case ALL:
-                passesWriteConsistency = unassignedReplicas == 0;
-                break;
-            default:
-                throw new RuntimeException("unknown consistency level [" + request.consistencyLevel() + "]");
-        }
+        final int activeShardCount = randomIntBetween(0, totalShards);
+        Request request = new Request(shardId).waitForActiveShards(
+            activeShardCount == totalShards ? ActiveShardCount.ALL : ActiveShardCount.from(activeShardCount));
+        final boolean passesActiveShardCheck = activeShardCount <= assignedReplicas + 1;
+
         ShardRoutingState[] replicaStates = new ShardRoutingState[assignedReplicas + unassignedReplicas];
         for (int i = 0; i < assignedReplicas; i++) {
             replicaStates[i] = randomFrom(ShardRoutingState.STARTED, ShardRoutingState.RELOCATING);
@@ -288,10 +271,10 @@ public class ReplicationOperationTests extends ESTestCase {
         }
 
         final ClusterState state = state(index, true, ShardRoutingState.STARTED, replicaStates);
-        logger.debug("using consistency level of [{}], assigned shards [{}], total shards [{}]." +
+        logger.debug("using active shard count of [{}], assigned shards [{}], total shards [{}]." +
                 " expecting op to [{}]. using state: \n{}",
-            request.consistencyLevel(), 1 + assignedReplicas, 1 + assignedReplicas + unassignedReplicas,
-            passesWriteConsistency ? "succeed" : "retry",
+            request.waitForActiveShards(), 1 + assignedReplicas, 1 + assignedReplicas + unassignedReplicas,
+            passesActiveShardCheck ? "succeed" : "retry",
             state.prettyPrint());
         final long primaryTerm = state.metaData().index(index).primaryTerm(shardId.id());
         final IndexShardRoutingTable shardRoutingTable = state.routingTable().index(index).shard(shardId.id());
@@ -299,17 +282,17 @@ public class ReplicationOperationTests extends ESTestCase {
         final ShardRouting primaryShard = shardRoutingTable.primaryShard();
         final TestReplicationOperation op = new TestReplicationOperation(request,
             new TestPrimary(primaryShard, primaryTerm),
-            listener, randomBoolean(), true, new TestReplicaProxy(), () -> state, logger, "test");
+            listener, randomBoolean(), new TestReplicaProxy(), () -> state, logger, "test");
 
-        if (passesWriteConsistency) {
-            assertThat(op.checkWriteConsistency(), nullValue());
+        if (passesActiveShardCheck) {
+            assertThat(op.checkActiveShardCount(), nullValue());
             op.execute();
-            assertTrue("operations should have been performed, consistency level is met",
+            assertTrue("operations should have been performed, active shard count is met",
                 request.processedOnPrimary.get());
         } else {
-            assertThat(op.checkWriteConsistency(), notNullValue());
+            assertThat(op.checkActiveShardCount(), notNullValue());
             op.execute();
-            assertFalse("operations should not have been perform, consistency level is *NOT* met",
+            assertFalse("operations should not have been perform, active shard count is *NOT* met",
                 request.processedOnPrimary.get());
             assertListenerThrows("should throw exception to trigger retry", listener, UnavailableShardsException.class);
         }
@@ -347,6 +330,7 @@ public class ReplicationOperationTests extends ESTestCase {
             this();
             this.shardId = shardId;
             this.index = shardId.getIndexName();
+            this.waitForActiveShards = ActiveShardCount.NONE;
             // keep things simple
         }
 
@@ -458,13 +442,13 @@ public class ReplicationOperationTests extends ESTestCase {
     class TestReplicationOperation extends ReplicationOperation<Request, Request, TestPrimary.Result> {
         public TestReplicationOperation(Request request, Primary<Request, Request, TestPrimary.Result> primary,
                 ActionListener<TestPrimary.Result> listener, Replicas<Request> replicas, Supplier<ClusterState> clusterStateSupplier) {
-            this(request, primary, listener, true, false, replicas, clusterStateSupplier, ReplicationOperationTests.this.logger, "test");
+            this(request, primary, listener, true, replicas, clusterStateSupplier, ReplicationOperationTests.this.logger, "test");
         }
 
         public TestReplicationOperation(Request request, Primary<Request, Request, TestPrimary.Result> primary,
-                ActionListener<TestPrimary.Result> listener, boolean executeOnReplicas, boolean checkWriteConsistency,
+                ActionListener<TestPrimary.Result> listener, boolean executeOnReplicas,
                 Replicas<Request> replicas, Supplier<ClusterState> clusterStateSupplier, ESLogger logger, String opType) {
-            super(request, primary, listener, executeOnReplicas, checkWriteConsistency, replicas, clusterStateSupplier, logger, opType);
+            super(request, primary, listener, executeOnReplicas, replicas, clusterStateSupplier, logger, opType);
         }
     }
 

+ 35 - 6
core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

@@ -22,6 +22,7 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.UnavailableShardsException;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.client.transport.NoNodeAvailableException;
 import org.elasticsearch.cluster.ClusterState;
@@ -84,6 +85,7 @@ import java.util.stream.Collectors;
 
 import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
 import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS;
 import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
 import static org.elasticsearch.test.ClusterServiceUtils.setState;
 import static org.hamcrest.CoreMatchers.containsString;
@@ -677,6 +679,37 @@ public class TransportReplicationActionTests extends ESTestCase {
         assertIndexShardCounter(0);
     }
 
+    /**
+     * This test ensures that replication operations adhere to the {@link IndexMetaData#SETTING_WAIT_FOR_ACTIVE_SHARDS} setting
+     * when the request is using the default value for waitForActiveShards.
+     */
+    public void testDefaultWaitForActiveShardsUsesIndexSetting() throws Exception {
+        final String indexName = "test";
+        final ShardId shardId = new ShardId(indexName, "_na_", 0);
+
+        // test wait_for_active_shards index setting used when the default is set on the request
+        int numReplicas = randomIntBetween(0, 5);
+        int idxSettingWaitForActiveShards = randomIntBetween(0, numReplicas + 1);
+        ClusterState state = stateWithActivePrimary(indexName, randomBoolean(), numReplicas);
+        IndexMetaData indexMetaData = state.metaData().index(indexName);
+        Settings indexSettings = Settings.builder().put(indexMetaData.getSettings())
+                                     .put(SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), Integer.toString(idxSettingWaitForActiveShards))
+                                     .build();
+        MetaData.Builder metaDataBuilder = MetaData.builder(state.metaData())
+                                               .put(IndexMetaData.builder(indexMetaData).settings(indexSettings).build(), true);
+        state = ClusterState.builder(state).metaData(metaDataBuilder).build();
+        setState(clusterService, state);
+        Request request = new Request(shardId).waitForActiveShards(ActiveShardCount.DEFAULT); // set to default so index settings are used
+        action.resolveRequest(state.metaData(), state.metaData().index(indexName), request);
+        assertEquals(ActiveShardCount.from(idxSettingWaitForActiveShards), request.waitForActiveShards());
+
+        // test wait_for_active_shards when default not set on the request (request value should be honored over index setting)
+        int requestWaitForActiveShards = randomIntBetween(0, numReplicas + 1);
+        request = new Request(shardId).waitForActiveShards(ActiveShardCount.from(requestWaitForActiveShards));
+        action.resolveRequest(state.metaData(), state.metaData().index(indexName), request);
+        assertEquals(ActiveShardCount.from(requestWaitForActiveShards), request.waitForActiveShards());
+    }
+
     private void assertIndexShardCounter(int expected) {
         assertThat(count.get(), equalTo(expected));
     }
@@ -719,6 +752,7 @@ public class TransportReplicationActionTests extends ESTestCase {
             this();
             this.shardId = shardId;
             this.index = shardId.getIndexName();
+            this.waitForActiveShards = ActiveShardCount.NONE;
             // keep things simple
         }
 
@@ -765,11 +799,6 @@ public class TransportReplicationActionTests extends ESTestCase {
             return new ReplicaResult();
         }
 
-        @Override
-        protected boolean checkWriteConsistency() {
-            return false;
-        }
-
         @Override
         protected boolean resolveIndex() {
             return false;
@@ -815,7 +844,7 @@ public class TransportReplicationActionTests extends ESTestCase {
 
     class NoopReplicationOperation extends ReplicationOperation<Request, Request, Action.PrimaryResult> {
         public NoopReplicationOperation(Request request, ActionListener<Action.PrimaryResult> listener) {
-            super(request, null, listener, true, true, null, null, TransportReplicationActionTests.this.logger, "noop");
+            super(request, null, listener, true, null, null, TransportReplicationActionTests.this.logger, "noop");
         }
 
         @Override

+ 5 - 1
core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

@@ -398,7 +398,7 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
         private final ReplicationGroup replicationGroup;
 
         public IndexingOp(IndexRequest request, ActionListener<IndexingResult> listener, ReplicationGroup replicationGroup) {
-            super(request, new PrimaryRef(replicationGroup), listener, true, false, new ReplicasRef(replicationGroup),
+            super(request, new PrimaryRef(replicationGroup), listener, true, new ReplicasRef(replicationGroup),
                 () -> null, logger, "indexing");
             this.replicationGroup = replicationGroup;
             request.process(null, true, request.index());
@@ -409,6 +409,10 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
             return replicationGroup.shardRoutings();
         }
 
+        @Override
+        protected String checkActiveShardCount() {
+            return null;
+        }
     }
 
     private static class PrimaryRef implements ReplicationOperation.Primary<IndexRequest, IndexRequest, IndexingResult> {

+ 1 - 0
core/src/test/java/org/elasticsearch/indexing/IndexActionIT.java

@@ -226,4 +226,5 @@ public class IndexActionIT extends ESIntegTestCase {
                     e.getMessage().contains("Invalid index name [..], must not be \'.\' or '..'"), equalTo(true));
         }
     }
+
 }

+ 2 - 6
core/src/test/java/org/elasticsearch/search/basic/SearchWhileCreatingIndexIT.java

@@ -54,12 +54,8 @@ public class SearchWhileCreatingIndexIT extends ESIntegTestCase {
 
     private void searchWhileCreatingIndex(boolean createIndex, int numberOfReplicas) throws Exception {
 
-        // make sure we have enough nodes to guaranty default QUORUM consistency.
-        // TODO: add a smarter choice based on actual consistency (when that is randomized)
-        int shardsNo = numberOfReplicas + 1;
-        int neededNodes = shardsNo <= 2 ? 1 : shardsNo / 2 + 1;
-        internalCluster().ensureAtLeastNumDataNodes(randomIntBetween(neededNodes, shardsNo));
-
+        // TODO: randomize the wait for active shards value on index creation and ensure the appropriate
+        // number of data nodes are started for the randomized active shard count value
         String id = randomAsciiOfLength(5);
         // we will go the primary or the replica, but in a
         // randomized re-creatable manner

+ 1 - 2
core/src/test/java/org/elasticsearch/search/basic/TransportSearchFailuresIT.java

@@ -20,7 +20,6 @@
 package org.elasticsearch.search.basic;
 
 import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
@@ -118,7 +117,7 @@ public class TransportSearchFailuresIT extends ESIntegTestCase {
     }
 
     private void index(Client client, String id, String nameValue, int age) throws IOException {
-        client.index(Requests.indexRequest("test").type("type1").id(id).source(source(id, nameValue, age)).consistencyLevel(WriteConsistencyLevel.ONE)).actionGet();
+        client.index(Requests.indexRequest("test").type("type1").id(id).source(source(id, nameValue, age))).actionGet();
     }
 
     private XContentBuilder source(String id, String nameValue, int age) throws IOException {

+ 2 - 5
core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java

@@ -29,7 +29,6 @@ import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.client.Client;
@@ -47,7 +46,6 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.discovery.zen.ZenDiscovery;
 import org.elasticsearch.discovery.zen.elect.ElectMasterService;
-import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.store.IndexStore;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.indices.ttl.IndicesTTLService;
@@ -690,11 +688,10 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
     @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/12621")
     public void testChaosSnapshot() throws Exception {
         final List<String> indices = new CopyOnWriteArrayList<>();
-        Settings settings = Settings.builder().put("action.write_consistency", "one").build();
         int initialNodes = between(1, 3);
         logger.info("--> start {} nodes", initialNodes);
         for (int i = 0; i < initialNodes; i++) {
-            internalCluster().startNode(settings);
+            internalCluster().startNode();
         }
 
         logger.info("-->  creating repository");
@@ -713,7 +710,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
 
         int asyncNodes = between(0, 5);
         logger.info("--> start {} additional nodes asynchronously", asyncNodes);
-        InternalTestCluster.Async<List<String>> asyncNodesFuture = internalCluster().startNodesAsync(asyncNodes, settings);
+        InternalTestCluster.Async<List<String>> asyncNodesFuture = internalCluster().startNodesAsync(asyncNodes);
 
         int asyncIndices = between(0, 10);
         logger.info("--> create {} additional indices asynchronously", asyncIndices);

+ 1 - 1
modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java

@@ -215,7 +215,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
             return;
         }
         request.timeout(mainRequest.getTimeout());
-        request.consistencyLevel(mainRequest.getConsistency());
+        request.waitForActiveShards(mainRequest.getWaitForActiveShards());
         if (logger.isDebugEnabled()) {
             logger.debug("sending [{}] entry, [{}] bulk request", request.requests().size(),
                     new ByteSizeValue(request.estimatedSizeInBytes()));

+ 4 - 4
modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java

@@ -21,7 +21,7 @@ package org.elasticsearch.index.reindex;
 
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.GenericAction;
-import org.elasticsearch.action.WriteConsistencyLevel;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
@@ -108,9 +108,9 @@ public abstract class AbstractBaseReindexRestHandler<
         request.setRefresh(restRequest.paramAsBoolean("refresh", request.isRefresh()));
         request.setTimeout(restRequest.paramAsTime("timeout", request.getTimeout()));
 
-        String consistency = restRequest.param("consistency");
-        if (consistency != null) {
-            request.setConsistency(WriteConsistencyLevel.fromString(consistency));
+        String waitForActiveShards = restRequest.param("wait_for_active_shards");
+        if (waitForActiveShards != null) {
+            request.setWaitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
         }
 
         Float requestsPerSecond = parseRequestsPerSecond(restRequest);

+ 21 - 11
modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java

@@ -21,8 +21,8 @@ package org.elasticsearch.index.reindex;
 
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionRequestValidationException;
-import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.replication.ReplicationRequest;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -71,9 +71,9 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
     private TimeValue timeout = ReplicationRequest.DEFAULT_TIMEOUT;
 
     /**
-     * Consistency level for write requests.
+     * The number of shard copies that must be active before proceeding with the write.
      */
-    private WriteConsistencyLevel consistency = WriteConsistencyLevel.DEFAULT;
+    private ActiveShardCount activeShardCount = ActiveShardCount.DEFAULT;
 
     /**
      * Initial delay after a rejection before retrying a bulk request. With the default maxRetries the total backoff for retrying rejections
@@ -223,20 +223,30 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
     }
 
     /**
-     * Consistency level for write requests.
+     * The number of shard copies that must be active before proceeding with the write.
      */
-    public WriteConsistencyLevel getConsistency() {
-        return consistency;
+    public ActiveShardCount getWaitForActiveShards() {
+        return activeShardCount;
     }
 
     /**
-     * Consistency level for write requests.
+     * Sets the number of shard copies that must be active before proceeding with the write.
+     * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
      */
-    public Self setConsistency(WriteConsistencyLevel consistency) {
-        this.consistency = consistency;
+    public Self setWaitForActiveShards(ActiveShardCount activeShardCount) {
+        this.activeShardCount = activeShardCount;
         return self();
     }
 
+    /**
+     * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    public Self setWaitForActiveShards(final int waitForActiveShards) {
+        return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
+
     /**
      * Initial delay after a rejection before retrying request.
      */
@@ -317,7 +327,7 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
         size = in.readVInt();
         refresh = in.readBoolean();
         timeout = new TimeValue(in);
-        consistency = WriteConsistencyLevel.fromId(in.readByte());
+        activeShardCount = ActiveShardCount.readFrom(in);
         retryBackoffInitialTime = new TimeValue(in);
         maxRetries = in.readVInt();
         requestsPerSecond = in.readFloat();
@@ -331,7 +341,7 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
         out.writeVInt(size);
         out.writeBoolean(refresh);
         timeout.writeTo(out);
-        out.writeByte(consistency.id());
+        activeShardCount.writeTo(out);
         retryBackoffInitialTime.writeTo(out);
         out.writeVInt(maxRetries);
         out.writeFloat(requestsPerSecond);

+ 6 - 4
modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequestBuilder.java

@@ -21,8 +21,9 @@ package org.elasticsearch.index.reindex;
 
 import org.elasticsearch.action.Action;
 import org.elasticsearch.action.ActionRequestBuilder;
-import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.support.ActiveShardCount;
+import org.elasticsearch.action.support.replication.ReplicationRequest;
 import org.elasticsearch.client.ElasticsearchClient;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.query.QueryBuilder;
@@ -98,10 +99,11 @@ public abstract class AbstractBulkByScrollRequestBuilder<
     }
 
     /**
-     * Consistency level for write requests.
+     * The number of shard copies that must be active before proceeding with the write.
+     * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
      */
-    public Self consistency(WriteConsistencyLevel consistency) {
-        request.setConsistency(consistency);
+    public Self waitForActiveShards(ActiveShardCount activeShardCount) {
+        request.setWaitForActiveShards(activeShardCount);
         return self();
     }
 

+ 3 - 4
modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java

@@ -20,7 +20,6 @@
 package org.elasticsearch.index.reindex;
 
 import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequest;
@@ -104,7 +103,7 @@ public class RoundTripTests extends ESTestCase {
         request.setAbortOnVersionConflict(random().nextBoolean());
         request.setRefresh(rarely());
         request.setTimeout(TimeValue.parseTimeValue(randomTimeValue(), null, "test"));
-        request.setConsistency(randomFrom(WriteConsistencyLevel.values()));
+        request.setWaitForActiveShards(randomIntBetween(0, 10));
         request.setScript(random().nextBoolean() ? null : randomScript());
         request.setRequestsPerSecond(between(0, Integer.MAX_VALUE));
     }
@@ -116,7 +115,7 @@ public class RoundTripTests extends ESTestCase {
         assertEquals(request.isAbortOnVersionConflict(), tripped.isAbortOnVersionConflict());
         assertEquals(request.isRefresh(), tripped.isRefresh());
         assertEquals(request.getTimeout(), tripped.getTimeout());
-        assertEquals(request.getConsistency(), tripped.getConsistency());
+        assertEquals(request.getWaitForActiveShards(), tripped.getWaitForActiveShards());
         assertEquals(request.getScript(), tripped.getScript());
         assertEquals(request.getRetryBackoffInitialTime(), tripped.getRetryBackoffInitialTime());
         assertEquals(request.getMaxRetries(), tripped.getMaxRetries());
@@ -234,7 +233,7 @@ public class RoundTripTests extends ESTestCase {
             assertEquals(expectedFailure.getReason().getClass(), actualFailure.getReason().getClass());
             assertEquals(expectedFailure.getReason().getMessage(), actualFailure.getReason().getMessage());
         }
-        
+
     }
 
     private void assertTaskStatusEquals(BulkByScrollTask.Status expected, BulkByScrollTask.Status actual) {

+ 4 - 4
modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/50_consistency.yaml → modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/50_wait_for_active_shards.yaml

@@ -1,5 +1,5 @@
 ---
-"can override consistency":
+"can override wait_for_active_shards":
   - do:
       indices.create:
           index: test
@@ -12,7 +12,6 @@
         type:        test
         id:          1
         body:        {"text": "test"}
-        consistency: one
   - do:
       indices.refresh: {}
 
@@ -21,12 +20,13 @@
       delete_by_query:
         index: test
         timeout: 1s
+        wait_for_active_shards: 4
         body:
           query:
             match_all: {}
 
   - match:
-      failures.0.cause.reason: /Not.enough.active.copies.to.meet.write.consistency.of.\[QUORUM\].\(have.1,.needed.4\)..Timeout\:.\[1s\],.request:.\[BulkShardRequest.to.\[test\].containing.\[1\].requests\]/
+      failures.0.cause.reason: /Not.enough.active.copies.to.meet.shard.count.of.\[4\].\(have.1,.needed.4\)..Timeout\:.\[1s\],.request:.\[BulkShardRequest.to.\[test\].containing.\[1\].requests\]/
 
   - do:
       indices.refresh: {}
@@ -40,7 +40,7 @@
   - do:
       delete_by_query:
         index: test
-        consistency: one
+        wait_for_active_shards: 1
         body:
           query:
             match_all: {}

+ 4 - 4
modules/reindex/src/test/resources/rest-api-spec/test/reindex/60_consistency.yaml → modules/reindex/src/test/resources/rest-api-spec/test/reindex/60_wait_for_active_shards.yaml

@@ -1,5 +1,5 @@
 ---
-"can override consistency":
+"can override wait_for_active_shards":
   - do:
       indices.create:
           index: dest
@@ -12,7 +12,6 @@
         type:        test
         id:          1
         body:        {"text": "test"}
-        consistency: one
   - do:
       indices.refresh: {}
 
@@ -20,17 +19,18 @@
       catch: unavailable
       reindex:
         timeout: 1s
+        wait_for_active_shards: 4
         body:
           source:
             index: src
           dest:
             index: dest
   - match:
-      failures.0.cause.reason: /Not.enough.active.copies.to.meet.write.consistency.of.\[QUORUM\].\(have.1,.needed.4\)\..Timeout\:.\[1s\],.request:.\[BulkShardRequest.to.\[dest\].containing.\[1\].requests\]/
+      failures.0.cause.reason: /Not.enough.active.copies.to.meet.shard.count.of.\[4\].\(have.1,.needed.4\)\..Timeout\:.\[1s\],.request:.\[BulkShardRequest.to.\[dest\].containing.\[1\].requests\]/
 
   - do:
       reindex:
-        consistency: one
+        wait_for_active_shards: 1
         body:
           source:
             index: src

+ 4 - 4
modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/50_consistency.yaml

@@ -1,5 +1,5 @@
 ---
-"can override consistency":
+"can override wait_for_active_shards":
   - do:
       indices.create:
           index: test
@@ -12,7 +12,6 @@
         type:        test
         id:          1
         body:        {"text": "test"}
-        consistency: one
   - do:
       indices.refresh: {}
 
@@ -20,14 +19,15 @@
       catch: unavailable
       update_by_query:
         index: test
+        wait_for_active_shards: 4
         timeout: 1s
   - match:
-      failures.0.cause.reason: /Not.enough.active.copies.to.meet.write.consistency.of.\[QUORUM\].\(have.1,.needed.4\)..Timeout\:.\[1s\],.request:.\[BulkShardRequest.to.\[test\].containing.\[1\].requests\]/
+      failures.0.cause.reason: /Not.enough.active.copies.to.meet.shard.count.of.\[4\].\(have.1,.needed.4\)..Timeout\:.\[1s\],.request:.\[BulkShardRequest.to.\[test\].containing.\[1\].requests\]/
 
   - do:
       update_by_query:
         index: test
-        consistency: one
+        wait_for_active_shards: 1
   - match: {failures: []}
   - match: {updated: 1}
   - match: {version_conflicts: 0}

+ 3 - 4
rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json

@@ -16,10 +16,9 @@
         }
       },
       "params": {
-        "consistency": {
-          "type" : "enum",
-          "options" : ["one", "quorum", "all"],
-          "description" : "Explicit write consistency setting for the operation"
+        "wait_for_active_shards": {
+          "type" : "string",
+          "description" : "Sets the number of shard copies that must be active before proceeding with the bulk operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)" 
         },
         "refresh": {
           "type" : "enum",

+ 3 - 4
rest-api-spec/src/main/resources/rest-api-spec/api/delete.json

@@ -23,10 +23,9 @@
         }
       },
       "params": {
-        "consistency": {
-          "type" : "enum",
-          "options" : ["one", "quorum", "all"],
-          "description" : "Specific write consistency setting for the operation"
+        "wait_for_active_shards": {
+          "type" : "string",
+          "description" : "Sets the number of shard copies that must be active before proceeding with the delete operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)"
         },
         "parent": {
           "type" : "string",

+ 3 - 4
rest-api-spec/src/main/resources/rest-api-spec/api/delete_by_query.json

@@ -177,10 +177,9 @@
           "default": "1m",
           "description" : "Time each individual bulk request should wait for shards that are unavailable."
         },
-        "consistency": {
-          "type" : "enum",
-          "options" : ["one", "quorum", "all"],
-          "description" : "Explicit write consistency setting for the operation"
+        "wait_for_active_shards": {
+          "type" : "string",
+          "description" : "Sets the number of shard copies that must be active before proceeding with the delete by query operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)"
         },
         "scroll_size": {
           "type": "integer",

+ 3 - 4
rest-api-spec/src/main/resources/rest-api-spec/api/index.json

@@ -22,10 +22,9 @@
         }
       },
       "params": {
-        "consistency": {
-          "type" : "enum",
-          "options" : ["one", "quorum", "all"],
-          "description" : "Explicit write consistency setting for the operation"
+        "wait_for_active_shards": {
+          "type" : "string",
+          "description" : "Sets the number of shard copies that must be active before proceeding with the index operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)"
         },
         "op_type": {
           "type" : "enum",

+ 3 - 4
rest-api-spec/src/main/resources/rest-api-spec/api/reindex.json

@@ -16,10 +16,9 @@
           "default": "1m",
           "description" : "Time each individual bulk request should wait for shards that are unavailable."
         },
-        "consistency": {
-          "type" : "enum",
-          "options" : ["one", "quorum", "all"],
-          "description" : "Explicit write consistency setting for the operation"
+        "wait_for_active_shards": {
+          "type" : "string",
+          "description" : "Sets the number of shard copies that must be active before proceeding with the reindex operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)"
         },
         "wait_for_completion": {
           "type" : "boolean",

+ 3 - 4
rest-api-spec/src/main/resources/rest-api-spec/api/update.json

@@ -23,10 +23,9 @@
         }
       },
       "params": {
-        "consistency": {
-          "type": "enum",
-          "options": ["one", "quorum", "all"],
-          "description": "Explicit write consistency setting for the operation"
+        "wait_for_active_shards": {
+          "type": "string",
+          "description": "Sets the number of shard copies that must be active before proceeding with the update operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)"
         },
         "fields": {
           "type": "list",

+ 3 - 4
rest-api-spec/src/main/resources/rest-api-spec/api/update_by_query.json

@@ -185,10 +185,9 @@
           "default": "1m",
           "description" : "Time each individual bulk request should wait for shards that are unavailable."
         },
-        "consistency": {
-          "type" : "enum",
-          "options" : ["one", "quorum", "all"],
-          "description" : "Explicit write consistency setting for the operation"
+        "wait_for_active_shards": {
+          "type" : "string",
+          "description" : "Sets the number of shard copies that must be active before proceeding with the update by query operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)"
         },
         "scroll_size": {
           "type": "integer",

+ 4 - 5
test/framework/src/test/java/org/elasticsearch/test/rest/yaml/restspec/ClientYamlSuiteRestApiParserTests.java

@@ -45,7 +45,7 @@ public class ClientYamlSuiteRestApiParserTests extends AbstractParserTestCase {
         assertThat(restApi.getPathParts().get(1), equalTo("index"));
         assertThat(restApi.getPathParts().get(2), equalTo("type"));
         assertThat(restApi.getParams().size(), equalTo(4));
-        assertThat(restApi.getParams(), contains("consistency", "op_type", "parent", "refresh"));
+        assertThat(restApi.getParams(), contains("wait_for_active_shards", "op_type", "parent", "refresh"));
         assertThat(restApi.isBodySupported(), equalTo(true));
         assertThat(restApi.isBodyRequired(), equalTo(true));
     }
@@ -163,10 +163,9 @@ public class ClientYamlSuiteRestApiParserTests extends AbstractParserTestCase {
             "        }\n" +
             "      }   ,\n" +
             "      \"params\": {\n" +
-            "        \"consistency\": {\n" +
-            "          \"type\" : \"enum\",\n" +
-            "          \"options\" : [\"one\", \"quorum\", \"all\"],\n" +
-            "          \"description\" : \"Explicit write consistency setting for the operation\"\n" +
+            "        \"wait_for_active_shards\": {\n" +
+            "          \"type\" : \"string\",\n" +
+            "          \"description\" : \"The number of active shard copies required to perform the operation\"\n" +
             "        },\n" +
             "        \"op_type\": {\n" +
             "          \"type\" : \"enum\",\n" +