Browse Source

Makes the index.write.wait_for_active_shards setting index-level and
dynamically updatable for both index creation and write operations.

Ali Beyad 9 years ago
parent
commit
6a7d005081
34 changed files with 284 additions and 46 deletions
  1. 9 0
      core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java
  2. 9 0
      core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java
  3. 4 1
      core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java
  4. 1 8
      core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java
  5. 9 0
      core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java
  6. 9 0
      core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequestBuilder.java
  7. 9 0
      core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java
  8. 9 0
      core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java
  9. 9 0
      core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
  10. 9 0
      core/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java
  11. 1 0
      core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java
  12. 1 0
      core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java
  13. 9 1
      core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java
  14. 2 1
      core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java
  15. 10 0
      core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java
  16. 10 0
      core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestBuilder.java
  17. 6 7
      core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
  18. 9 0
      core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java
  19. 9 0
      core/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java
  20. 31 2
      core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java
  21. 6 1
      core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java
  22. 1 1
      core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java
  23. 0 11
      core/src/main/java/org/elasticsearch/index/IndexSettings.java
  24. 31 0
      core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java
  25. 10 0
      core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java
  26. 8 0
      core/src/test/java/org/elasticsearch/action/support/ActiveShardCountTests.java
  27. 5 4
      core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java
  28. 3 4
      core/src/test/java/org/elasticsearch/action/support/WaitActiveShardCountIT.java
  29. 2 1
      core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java
  30. 37 0
      core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
  31. 5 2
      core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
  32. 1 0
      core/src/test/java/org/elasticsearch/indexing/IndexActionIT.java
  33. 9 0
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java
  34. 1 2
      modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java

+ 9 - 0
core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java

@@ -466,6 +466,15 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>
         return this;
     }
 
+    /**
+     * A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    public CreateIndexRequest waitForActiveShards(final int waitForActiveShards) {
+        return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
+
 
     @Override
     public void readFrom(StreamInput in) throws IOException {

+ 9 - 0
core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java

@@ -269,4 +269,13 @@ public class CreateIndexRequestBuilder extends AcknowledgedRequestBuilder<Create
         request.waitForActiveShards(waitForActiveShards);
         return this;
     }
+
+    /**
+     * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    public CreateIndexRequestBuilder setWaitForActiveShards(final int waitForActiveShards) {
+        return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
 }

+ 4 - 1
core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java

@@ -21,6 +21,7 @@ package org.elasticsearch.action.admin.indices.refresh;
 
 import org.elasticsearch.action.ShardOperationFailedException;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.replication.BasicReplicationRequest;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.action.support.replication.TransportBroadcastReplicationAction;
@@ -54,7 +55,9 @@ public class TransportRefreshAction extends TransportBroadcastReplicationAction<
 
     @Override
     protected BasicReplicationRequest newShardRequest(RefreshRequest request, ShardId shardId) {
-        return new BasicReplicationRequest(shardId);
+        BasicReplicationRequest replicationRequest = new BasicReplicationRequest(shardId);
+        replicationRequest.waitForActiveShards(ActiveShardCount.NONE);
+        return replicationRequest;
     }
 
     @Override

+ 1 - 8
core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java

@@ -20,7 +20,6 @@
 package org.elasticsearch.action.admin.indices.refresh;
 
 import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.replication.BasicReplicationRequest;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.action.support.replication.TransportReplicationAction;
@@ -36,24 +35,18 @@ import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
-import java.util.function.Supplier;
 
 public class TransportShardRefreshAction
         extends TransportReplicationAction<BasicReplicationRequest, BasicReplicationRequest, ReplicationResponse> {
 
     public static final String NAME = RefreshAction.NAME + "[s]";
-    private static final Supplier<BasicReplicationRequest> requestSupplier = () -> {
-        BasicReplicationRequest req = new BasicReplicationRequest();
-        req.waitForActiveShards(ActiveShardCount.NONE);
-        return req;
-    };
 
     @Inject
     public TransportShardRefreshAction(Settings settings, TransportService transportService, ClusterService clusterService,
                                        IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
                                        ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
         super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
-                indexNameExpressionResolver, requestSupplier, requestSupplier, ThreadPool.Names.REFRESH);
+                indexNameExpressionResolver, BasicReplicationRequest::new, BasicReplicationRequest::new, ThreadPool.Names.REFRESH);
     }
 
     @Override

+ 9 - 0
core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java

@@ -225,4 +225,13 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
         this.createIndexRequest.waitForActiveShards(waitForActiveShards);
     }
 
+    /**
+     * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    public void setWaitForActiveShards(final int waitForActiveShards) {
+        setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
+
 }

+ 9 - 0
core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequestBuilder.java

@@ -90,4 +90,13 @@ public class RolloverRequestBuilder extends MasterNodeOperationRequestBuilder<Ro
         this.request.setWaitForActiveShards(waitForActiveShards);
         return this;
     }
+
+    /**
+     * A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    public RolloverRequestBuilder waitForActiveShards(final int waitForActiveShards) {
+        return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
 }

+ 9 - 0
core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java

@@ -144,6 +144,15 @@ public class ShrinkRequest extends AcknowledgedRequest<ShrinkRequest> implements
         this.getShrinkIndexRequest().waitForActiveShards(waitForActiveShards);
     }
 
+    /**
+     * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    public void setWaitForActiveShards(final int waitForActiveShards) {
+        setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
+
     public void source(BytesReference source) {
         XContentType xContentType = XContentFactory.xContentType(source);
         if (xContentType != null) {

+ 9 - 0
core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java

@@ -64,4 +64,13 @@ public class ShrinkRequestBuilder extends AcknowledgedRequestBuilder<ShrinkReque
         this.request.setWaitForActiveShards(waitForActiveShards);
         return this;
     }
+
+    /**
+     * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    public ShrinkRequestBuilder setWaitForActiveShards(final int waitForActiveShards) {
+        return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
 }

+ 9 - 0
core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java

@@ -441,6 +441,15 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
         return this;
     }
 
+    /**
+     * A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    public BulkRequest waitForActiveShards(final int waitForActiveShards) {
+        return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
+
     public ActiveShardCount waitForActiveShards() {
         return this.waitForActiveShards;
     }

+ 9 - 0
core/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java

@@ -120,6 +120,15 @@ public class BulkRequestBuilder extends ActionRequestBuilder<BulkRequest, BulkRe
         return this;
     }
 
+    /**
+     * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    public BulkRequestBuilder setWaitForActiveShards(final int waitForActiveShards) {
+        return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
+
     /**
      * A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
      */

+ 1 - 0
core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java

@@ -93,6 +93,7 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
 
     @Override
     protected void resolveRequest(final MetaData metaData, IndexMetaData indexMetaData, DeleteRequest request) {
+        super.resolveRequest(metaData, indexMetaData, request);
         resolveAndValidateRouting(metaData, indexMetaData.getIndex().getName(), request);
         ShardId shardId = clusterService.operationRouting().shardId(clusterService.state(),
             indexMetaData.getIndex().getName(), request.id(), request.routing());

+ 1 - 0
core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java

@@ -121,6 +121,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
 
     @Override
     protected void resolveRequest(MetaData metaData, IndexMetaData indexMetaData, IndexRequest request) {
+        super.resolveRequest(metaData, indexMetaData, request);
         MappingMetaData mappingMd =indexMetaData.mappingOrDefault(request.type());
         request.resolveRouting(metaData);
         request.process(mappingMd, allowIdGeneration, indexMetaData.getIndex().getName());

+ 9 - 1
core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java

@@ -30,6 +30,8 @@ import org.elasticsearch.common.io.stream.Writeable;
 
 import java.io.IOException;
 
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS;
+
 /**
  * A class whose instances represent a value for counting the number
  * of active shard copies for a given shard in an index.
@@ -67,6 +69,7 @@ public final class ActiveShardCount implements Writeable {
      * Validates that the instance is valid for the given number of replicas in an index.
      */
     public boolean validate(final int numberOfReplicas) {
+        assert numberOfReplicas >= 0;
         return value <= numberOfReplicas + 1;
     }
 
@@ -81,6 +84,7 @@ public final class ActiveShardCount implements Writeable {
             case 0:
                 return NONE;
             default:
+                assert value > 1;
                 return new ActiveShardCount(value);
         }
     }
@@ -138,8 +142,12 @@ public final class ActiveShardCount implements Writeable {
             // all primary shards aren't active yet
             return false;
         }
+        ActiveShardCount waitForActiveShards = this;
+        if (waitForActiveShards == ActiveShardCount.DEFAULT) {
+            waitForActiveShards = SETTING_WAIT_FOR_ACTIVE_SHARDS.get(indexMetaData.getSettings());
+        }
         for (final IntObjectCursor<IndexShardRoutingTable> shardRouting : indexRoutingTable.getShards()) {
-            if (enoughShardsActive(shardRouting.value) == false) {
+            if (waitForActiveShards.enoughShardsActive(shardRouting.value) == false) {
                 // not enough active shard copies yet
                 return false;
             }

+ 2 - 1
core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

@@ -190,10 +190,11 @@ public class ReplicationOperation<
      * Checks whether we can perform a write based on the required active shard count setting.
      * Returns **null* if OK to proceed, or a string describing the reason to stop
      */
-    String checkActiveShardCount() {
+    protected String checkActiveShardCount() {
         final ShardId shardId = primary.routingEntry().shardId();
         final String indexName = shardId.getIndexName();
         final ClusterState state = clusterStateSupplier.get();
+        assert state != null : "replication operation must have access to the cluster state";
         final ActiveShardCount waitForActiveShards = request.waitForActiveShards();
         if (waitForActiveShards == ActiveShardCount.NONE) {
             return null;  // not waiting for any shards

+ 10 - 0
core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java

@@ -146,6 +146,16 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
         return (Request) this;
     }
 
+    /**
+     * A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    @SuppressWarnings("unchecked")
+    public final Request waitForActiveShards(final int waitForActiveShards) {
+        return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
+
     /**
      * Sets the minimum version of the cluster state that is required on the next node before we redirect to another primary.
      * Used to prevent redirect loops, see also {@link TransportReplicationAction.ReroutePhase#doRun()}

+ 10 - 0
core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestBuilder.java

@@ -68,4 +68,14 @@ public abstract class ReplicationRequestBuilder<Request extends ReplicationReque
         request.waitForActiveShards(waitForActiveShards);
         return (RequestBuilder) this;
     }
+
+    /**
+     * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    @SuppressWarnings("unchecked")
+    public RequestBuilder setWaitForActiveShards(final int waitForActiveShards) {
+        return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
 }

+ 6 - 7
core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

@@ -49,7 +49,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexService;
-import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardState;
 import org.elasticsearch.index.shard.ShardId;
@@ -91,7 +90,6 @@ public abstract class TransportReplicationAction<
     protected final ClusterService clusterService;
     protected final IndicesService indicesService;
     private final ShardStateAction shardStateAction;
-    private final ActiveShardCount defaultWaitForActiveShards;
     private final TransportRequestOptions transportOptions;
     private final String executor;
 
@@ -123,8 +121,6 @@ public abstract class TransportReplicationAction<
 
         this.transportOptions = transportOptions();
 
-        this.defaultWaitForActiveShards = IndexSettings.WAIT_FOR_ACTIVE_SHARDS_SETTING.get(settings);
-
         this.replicasProxy = new ReplicasProxy();
     }
 
@@ -150,6 +146,11 @@ public abstract class TransportReplicationAction<
      * @param request       the request to resolve
      */
     protected void resolveRequest(MetaData metaData, IndexMetaData indexMetaData, Request request) {
+        if (request.waitForActiveShards() == ActiveShardCount.DEFAULT) {
+            // if the wait for active shard count has not been set in the request,
+            // resolve it from the index settings
+            request.waitForActiveShards(indexMetaData.getWaitForActiveShards());
+        }
     }
 
     /**
@@ -560,11 +561,9 @@ public abstract class TransportReplicationAction<
             }
 
             // resolve all derived request fields, so we can route and apply it
-            if (request.waitForActiveShards() == ActiveShardCount.DEFAULT) {
-                request.waitForActiveShards(defaultWaitForActiveShards);
-            }
             resolveRequest(state.metaData(), indexMetaData, request);
             assert request.shardId() != null : "request shardId must be set in resolveRequest";
+            assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest";
 
             final ShardRouting primary = primary(state);
             if (retryIfUnavailable(state, primary)) {

+ 9 - 0
core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java

@@ -447,6 +447,15 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
         return this;
     }
 
+    /**
+     * A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    public UpdateRequest waitForActiveShards(final int waitForActiveShards) {
+        return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
+
     /**
      * Sets the doc to use for updates when a script is not specified.
      */

+ 9 - 0
core/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java

@@ -131,6 +131,15 @@ public class UpdateRequestBuilder extends InstanceShardOperationRequestBuilder<U
         return this;
     }
 
+    /**
+     * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    public UpdateRequestBuilder setWaitForActiveShards(final int waitForActiveShards) {
+        return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
+
     /**
      * Sets the doc to use for updates when a script is not specified.
      */

+ 31 - 2
core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java

@@ -25,6 +25,7 @@ import com.carrotsearch.hppc.cursors.ObjectCursor;
 import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 
 import org.elasticsearch.Version;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.cluster.Diff;
 import org.elasticsearch.cluster.Diffable;
 import org.elasticsearch.cluster.DiffableUtils;
@@ -219,6 +220,16 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
     public static final Setting<Settings> INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING =
         Setting.groupSetting("index.routing.allocation.initial_recovery."); // this is only setable internally not a registered setting!!
 
+    /**
+     * The number of active shard copies to check for before proceeding with a write operation.
+     */
+    public static final Setting<ActiveShardCount> SETTING_WAIT_FOR_ACTIVE_SHARDS =
+        new Setting<>("index.write.wait_for_active_shards",
+                      "1",
+                      ActiveShardCount::parseString,
+                      Setting.Property.Dynamic,
+                      Setting.Property.IndexScope);
+
     public static final IndexMetaData PROTO = IndexMetaData.builder("")
             .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
             .numberOfShards(1).numberOfReplicas(0).build();
@@ -266,12 +277,14 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
     private final Version indexUpgradedVersion;
     private final org.apache.lucene.util.Version minimumCompatibleLuceneVersion;
 
+    private final ActiveShardCount waitForActiveShards;
+
     private IndexMetaData(Index index, long version, long[] primaryTerms, State state, int numberOfShards, int numberOfReplicas, Settings settings,
                           ImmutableOpenMap<String, MappingMetaData> mappings, ImmutableOpenMap<String, AliasMetaData> aliases,
                           ImmutableOpenMap<String, Custom> customs, ImmutableOpenIntMap<Set<String>> activeAllocationIds,
                           DiscoveryNodeFilters requireFilters, DiscoveryNodeFilters initialRecoveryFilters, DiscoveryNodeFilters includeFilters, DiscoveryNodeFilters excludeFilters,
                           Version indexCreatedVersion, Version indexUpgradedVersion, org.apache.lucene.util.Version minimumCompatibleLuceneVersion,
-                          int routingNumShards) {
+                          int routingNumShards, ActiveShardCount waitForActiveShards) {
 
         this.index = index;
         this.version = version;
@@ -295,6 +308,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
         this.minimumCompatibleLuceneVersion = minimumCompatibleLuceneVersion;
         this.routingNumShards = routingNumShards;
         this.routingFactor = routingNumShards / numberOfShards;
+        this.waitForActiveShards = waitForActiveShards;
         assert numberOfShards * routingFactor == routingNumShards :  routingNumShards + " must be a multiple of " + numberOfShards;
     }
 
@@ -378,6 +392,14 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
         return totalNumberOfShards;
     }
 
+    /**
+     * Returns the configured {@link #SETTING_WAIT_FOR_ACTIVE_SHARDS}, which defaults
+     * to an active shard count of 1 if not specified.
+     */
+    public ActiveShardCount getWaitForActiveShards() {
+        return waitForActiveShards;
+    }
+
     public Settings getSettings() {
         return settings;
     }
@@ -973,10 +995,17 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
                     + "] but should be equal to number of shards [" + numberOfShards() + "]");
             }
 
+            final ActiveShardCount waitForActiveShards = SETTING_WAIT_FOR_ACTIVE_SHARDS.get(settings);
+            if (waitForActiveShards.validate(numberOfReplicas) == false) {
+                throw new IllegalArgumentException("invalid " + SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey() +
+                                                   "[" + waitForActiveShards + "]: cannot be greater than " +
+                                                   "number of shard copies [" + (numberOfReplicas + 1) + "]");
+            }
+
             final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE);
             return new IndexMetaData(new Index(index, uuid), version, primaryTerms, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(),
                 tmpAliases.build(), customs.build(), filledActiveAllocationIds.build(), requireFilters, initialRecoveryFilters, includeFilters, excludeFilters,
-                indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion, getRoutingNumShards());
+                indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion, getRoutingNumShards(), waitForActiveShards);
         }
 
         public static void toXContent(IndexMetaData indexMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException {

+ 6 - 1
core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java

@@ -27,6 +27,7 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.alias.Alias;
 import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.ActiveShardsObserver;
 import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
 import org.elasticsearch.cluster.ClusterState;
@@ -347,7 +348,11 @@ public class MetaDataCreateIndexService extends AbstractComponent {
                                 .setRoutingNumShards(routingNumShards);
                             // Set up everything, now locally create the index to see that things are ok, and apply
                             final IndexMetaData tmpImd = tmpImdBuilder.settings(actualIndexSettings).build();
-                            if (request.waitForActiveShards().validate(tmpImd.getNumberOfReplicas()) == false) {
+                            ActiveShardCount waitForActiveShards = request.waitForActiveShards();
+                            if (waitForActiveShards == ActiveShardCount.DEFAULT) {
+                                waitForActiveShards = tmpImd.getWaitForActiveShards();
+                            }
+                            if (waitForActiveShards.validate(tmpImd.getNumberOfReplicas()) == false) {
                                 throw new IllegalArgumentException("invalid wait_for_active_shards[" + request.waitForActiveShards() +
                                                                    "]: cannot be greater than number of shard copies [" +
                                                                    (tmpImd.getNumberOfReplicas() + 1) + "]");

+ 1 - 1
core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

@@ -140,7 +140,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
         PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS_SETTING,
         FsDirectoryService.INDEX_LOCK_FACTOR_SETTING,
         EngineConfig.INDEX_CODEC_SETTING,
-        IndexSettings.WAIT_FOR_ACTIVE_SHARDS_SETTING,
+        IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS,
         // validate that built-in similarities don't get redefined
         Setting.groupSetting("index.similarity.", (s) -> {
             Map<String, Settings> groups = s.getAsGroups();

+ 0 - 11
core/src/main/java/org/elasticsearch/index/IndexSettings.java

@@ -20,7 +20,6 @@ package org.elasticsearch.index;
 
 import org.apache.lucene.index.MergePolicy;
 import org.elasticsearch.Version;
-import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.ParseFieldMatcher;
 import org.elasticsearch.common.logging.ESLogger;
@@ -129,16 +128,6 @@ public final class IndexSettings {
     public static final Setting<Integer> MAX_SLICES_PER_SCROLL = Setting.intSetting("index.max_slices_per_scroll",
         1024, 1, Property.Dynamic, Property.IndexScope);
 
-    /**
-     * The number of active shard copies required for a write operation.
-     */
-    public static final Setting<ActiveShardCount> WAIT_FOR_ACTIVE_SHARDS_SETTING =
-        new Setting<>("index.write.wait_for_active_shards",
-                      "1",
-                      ActiveShardCount::parseString,
-                      Setting.Property.Dynamic,
-                      Setting.Property.IndexScope);
-
     private final Index index;
     private final Version version;
     private final ESLogger logger;

+ 31 - 0
core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java

@@ -50,6 +50,7 @@ import java.util.HashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
@@ -475,4 +476,34 @@ public class CreateIndexIT extends ESIntegTestCase {
         ensureGreen();
         assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
     }
+
+    /**
+     * This test ensures that index creation adheres to the {@link IndexMetaData#SETTING_WAIT_FOR_ACTIVE_SHARDS}.
+     */
+    public void testChangeWaitForActiveShardsSetting() throws Exception {
+        final String indexName = "test";
+        final int numReplicas = internalCluster().numDataNodes();
+        Settings settings = Settings.builder()
+                                .put(SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), Integer.toString(numReplicas))
+                                .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+                                .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), numReplicas)
+                                .build();
+        assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(settings).get());
+        assertAcked(client().admin().indices().prepareDelete(indexName));
+
+        // all should fail
+        settings = Settings.builder()
+                       .put(settings)
+                       .put(SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), "all")
+                       .build();
+        assertFalse(client().admin().indices().prepareCreate(indexName).setSettings(settings).setTimeout("100ms").get().isShardsAcked());
+        assertAcked(client().admin().indices().prepareDelete(indexName));
+
+        // the numeric equivalent of all should also fail
+        settings = Settings.builder()
+                       .put(settings)
+                       .put(SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), Integer.toString(numReplicas + 1))
+                       .build();
+        assertFalse(client().admin().indices().prepareCreate(indexName).setSettings(settings).setTimeout("100ms").get().isShardsAcked());
+    }
 }

+ 10 - 0
core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java

@@ -19,6 +19,7 @@
 package org.elasticsearch.action.index;
 
 import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.test.ESTestCase;
@@ -131,4 +132,13 @@ public class IndexRequestTests extends ESTestCase {
         assertThat(validate, notNullValue());
         assertThat(validate.getMessage(), containsString("ttl must not be negative"));
     }
+
+    public void testWaitForActiveShards() {
+        IndexRequest request = new IndexRequest("index", "type");
+        final int count = randomIntBetween(0, 10);
+        request.waitForActiveShards(ActiveShardCount.from(count));
+        assertEquals(request.waitForActiveShards(), ActiveShardCount.from(count));
+        // test negative shard count value not allowed
+        expectThrows(IllegalArgumentException.class, () -> request.waitForActiveShards(ActiveShardCount.from(randomIntBetween(-10, -1))));
+    }
 }

+ 8 - 0
core/src/test/java/org/elasticsearch/action/support/ActiveShardCountTests.java

@@ -68,6 +68,14 @@ public class ActiveShardCountTests extends ESTestCase {
         expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.parseString(randomIntBetween(-10, -3) + ""));
     }
 
+    public void testValidate() {
+        assertTrue(ActiveShardCount.parseString("all").validate(randomIntBetween(0, 10)));
+        final int numReplicas = randomIntBetween(0, 10);
+        assertTrue(ActiveShardCount.from(randomIntBetween(0, numReplicas + 1)).validate(numReplicas));
+        // invalid values shouldn't validate
+        assertFalse(ActiveShardCount.from(numReplicas + randomIntBetween(2, 10)).validate(numReplicas));
+    }
+
     private void doWriteRead(ActiveShardCount activeShardCount) throws IOException {
         final BytesStreamOutput out = new BytesStreamOutput();
         activeShardCount.writeTo(out);

+ 5 - 4
core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java

@@ -67,7 +67,7 @@ public class ActiveShardsObserverIT extends ESIntegTestCase {
         Settings settings = settingsBuilder.build();
         CreateIndexResponse response = prepareCreate("test-idx")
                                            .setSettings(settings)
-                                           .setWaitForActiveShards(ActiveShardCount.from(0))
+                                           .setWaitForActiveShards(ActiveShardCount.NONE)
                                            .get();
         assertTrue(response.isAcknowledged());
     }
@@ -83,7 +83,7 @@ public class ActiveShardsObserverIT extends ESIntegTestCase {
         final String indexName = "test-idx";
         assertFalse(prepareCreate(indexName)
                        .setSettings(settings)
-                       .setWaitForActiveShards(ActiveShardCount.from(randomIntBetween(numDataNodes + 1, numReplicas + 1)))
+                       .setWaitForActiveShards(randomIntBetween(numDataNodes + 1, numReplicas + 1))
                        .setTimeout("100ms")
                        .get()
                        .isShardsAcked());
@@ -97,8 +97,9 @@ public class ActiveShardsObserverIT extends ESIntegTestCase {
                                 .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 5))
                                 .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), internalCluster().numDataNodes() + randomIntBetween(0, 3))
                                 .build();
-        ActiveShardCount waitForActiveShards = ActiveShardCount.from(randomIntBetween(0, internalCluster().numDataNodes()));
-        assertAcked(prepareCreate(indexName).setSettings(settings).setWaitForActiveShards(waitForActiveShards).get());
+        assertAcked(prepareCreate(indexName).setSettings(settings)
+                        .setWaitForActiveShards(randomIntBetween(0, internalCluster().numDataNodes()))
+                        .get());
     }
 
     public void testCreateIndexWaitsForAllActiveShards() throws Exception {

+ 3 - 4
core/src/test/java/org/elasticsearch/action/support/WaitActiveShardCountIT.java

@@ -45,15 +45,14 @@ public class WaitActiveShardCountIT extends ESIntegTestCase {
 
         // indexing, by default, will work (waiting for one shard copy only)
         client().prepareIndex("test", "type1", "1").setSource(source("1", "test")).execute().actionGet();
-        ActiveShardCount activeShardCount = ActiveShardCount.from(2); // wait for two active shard copies
         try {
             client().prepareIndex("test", "type1", "1").setSource(source("1", "test"))
-                    .setWaitForActiveShards(activeShardCount)
+                    .setWaitForActiveShards(2) // wait for 2 active shard copies
                     .setTimeout(timeValueMillis(100)).execute().actionGet();
             fail("can't index, does not enough active shard copies");
         } catch (UnavailableShardsException e) {
             assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
-            assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [" + activeShardCount + "] (have 1, needed 2). Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]"));
+            assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [2] (have 1, needed 2). Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]"));
             // but really, all is well
         }
 
@@ -72,7 +71,7 @@ public class WaitActiveShardCountIT extends ESIntegTestCase {
 
         // this should work, since we now have two
         client().prepareIndex("test", "type1", "1").setSource(source("1", "test"))
-                .setWaitForActiveShards(activeShardCount)
+                .setWaitForActiveShards(2)
                 .setTimeout(timeValueSeconds(1)).execute().actionGet();
 
         try {

+ 2 - 1
core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java

@@ -258,7 +258,8 @@ public class ReplicationOperationTests extends ESTestCase {
         final int unassignedReplicas = randomInt(2);
         final int totalShards = 1 + assignedReplicas + unassignedReplicas;
         final int activeShardCount = randomIntBetween(0, totalShards);
-        Request request = new Request(shardId).waitForActiveShards(ActiveShardCount.from(activeShardCount));
+        Request request = new Request(shardId).waitForActiveShards(
+            activeShardCount == totalShards ? ActiveShardCount.ALL : ActiveShardCount.from(activeShardCount));
         final boolean passesActiveShardCheck = activeShardCount <= assignedReplicas + 1;
 
         ShardRoutingState[] replicaStates = new ShardRoutingState[assignedReplicas + unassignedReplicas];

+ 37 - 0
core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

@@ -85,6 +85,7 @@ import java.util.stream.Collectors;
 
 import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
 import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS;
 import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
 import static org.elasticsearch.test.ClusterServiceUtils.setState;
 import static org.hamcrest.CoreMatchers.containsString;
@@ -678,6 +679,42 @@ public class TransportReplicationActionTests extends ESTestCase {
         assertIndexShardCounter(0);
     }
 
+    /**
+     * This test ensures that replication operations adhere to the {@link IndexMetaData#SETTING_WAIT_FOR_ACTIVE_SHARDS} setting
+     * when the request is using the default value for waitForActiveShards.
+     */
+    public void testChangeWaitForActiveShardsSetting() throws Exception {
+        final String indexName = "test";
+        final ShardId shardId = new ShardId(indexName, "_na_", 0);
+
+        // test wait_for_active_shards index setting used when the default is set on the request
+        int numReplicas = randomIntBetween(0, 5);
+        int idxSettingWaitForActiveShards = randomIntBetween(0, numReplicas + 1);
+        ClusterState state = changeWaitForActiveShardsSetting(indexName,
+            stateWithActivePrimary(indexName, randomBoolean(), numReplicas),
+            idxSettingWaitForActiveShards);
+        setState(clusterService, state);
+        Request request = new Request(shardId).waitForActiveShards(ActiveShardCount.DEFAULT); // set to default so index settings are used
+        action.resolveRequest(state.metaData(), state.metaData().index(indexName), request);
+        assertEquals(ActiveShardCount.from(idxSettingWaitForActiveShards), request.waitForActiveShards());
+
+        // test wait_for_active_shards when default not set on the request (request value should be honored over index setting)
+        int requestWaitForActiveShards = randomIntBetween(0, numReplicas + 1);
+        request = new Request(shardId).waitForActiveShards(ActiveShardCount.from(requestWaitForActiveShards));
+        action.resolveRequest(state.metaData(), state.metaData().index(indexName), request);
+        assertEquals(ActiveShardCount.from(requestWaitForActiveShards), request.waitForActiveShards());
+    }
+
+    private ClusterState changeWaitForActiveShardsSetting(String indexName, ClusterState state, int waitForActiveShards) {
+        IndexMetaData indexMetaData = state.metaData().index(indexName);
+        Settings indexSettings = Settings.builder().put(indexMetaData.getSettings())
+                                     .put(SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), Integer.toString(waitForActiveShards))
+                                     .build();
+        MetaData.Builder metaDataBuilder = MetaData.builder(state.metaData())
+                                               .put(IndexMetaData.builder(indexMetaData).settings(indexSettings).build(), true);
+        return ClusterState.builder(state).metaData(metaDataBuilder).build();
+    }
+
     private void assertIndexShardCounter(int expected) {
         assertThat(count.get(), equalTo(expected));
     }

+ 5 - 2
core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

@@ -33,7 +33,6 @@ import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.index.TransportIndexAction;
-import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.replication.ReplicationOperation;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
@@ -252,7 +251,7 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
         public int indexDocs(final int numOfDoc) throws Exception {
             for (int doc = 0; doc < numOfDoc; doc++) {
                 final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", Integer.toString(docId.incrementAndGet()))
-                    .source("{}").waitForActiveShards(ActiveShardCount.NONE);
+                    .source("{}");
                 final IndexResponse response = index(indexRequest);
                 assertEquals(DocWriteResponse.Result.CREATED, response.getResult());
             }
@@ -410,6 +409,10 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
             return replicationGroup.shardRoutings();
         }
 
+        @Override
+        protected String checkActiveShardCount() {
+            return null;
+        }
     }
 
     private static class PrimaryRef implements ReplicationOperation.Primary<IndexRequest, IndexRequest, IndexingResult> {

+ 1 - 0
core/src/test/java/org/elasticsearch/indexing/IndexActionIT.java

@@ -226,4 +226,5 @@ public class IndexActionIT extends ESIntegTestCase {
                     e.getMessage().contains("Invalid index name [..], must not be \'.\' or '..'"), equalTo(true));
         }
     }
+
 }

+ 9 - 0
modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java

@@ -238,6 +238,15 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
         return self();
     }
 
+    /**
+     * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    public Self setWaitForActiveShards(final int waitForActiveShards) {
+        return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
+
     /**
      * Initial delay after a rejection before retrying request.
      */

+ 1 - 2
modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java

@@ -23,7 +23,6 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -104,7 +103,7 @@ public class RoundTripTests extends ESTestCase {
         request.setAbortOnVersionConflict(random().nextBoolean());
         request.setRefresh(rarely());
         request.setTimeout(TimeValue.parseTimeValue(randomTimeValue(), null, "test"));
-        request.setWaitForActiveShards(ActiveShardCount.from(randomIntBetween(0, 10)));
+        request.setWaitForActiveShards(randomIntBetween(0, 10));
         request.setScript(random().nextBoolean() ? null : randomScript());
         request.setRequestsPerSecond(between(0, Integer.MAX_VALUE));
     }