浏览代码

Add wait_for_active_shards parameter to index open command (#26682)

Adds the wait_for_active_shards parameter to the index open command. Similar to the index creation command, the index open command will now, by default, wait until the primaries have been allocated.

Closes #20937
Alexander Kazakov 8 年之前
父节点
当前提交
ff737a880c
共有 17 个文件被更改,包括 278 次插入41 次删除
  1. 12 0
      core/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexClusterStateUpdateRequest.java
  2. 41 0
      core/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexRequest.java
  3. 29 0
      core/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexRequestBuilder.java
  4. 21 1
      core/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexResponse.java
  5. 6 7
      core/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java
  6. 1 1
      core/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java
  7. 26 22
      core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java
  8. 6 5
      core/src/main/java/org/elasticsearch/action/support/ActiveShardsObserver.java
  9. 39 0
      core/src/main/java/org/elasticsearch/cluster/ack/OpenIndexClusterStateUpdateResponse.java
  10. 1 1
      core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java
  11. 25 2
      core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java
  12. 9 1
      core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestOpenIndexAction.java
  13. 1 1
      core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java
  14. 26 0
      core/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexIT.java
  15. 7 0
      docs/reference/indices/open-close.asciidoc
  16. 4 0
      rest-api-spec/src/main/resources/rest-api-spec/api/indices.open.json
  17. 24 0
      rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/10_basic.yml

+ 12 - 0
core/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexClusterStateUpdateRequest.java

@@ -18,6 +18,7 @@
  */
  */
 package org.elasticsearch.action.admin.indices.open;
 package org.elasticsearch.action.admin.indices.open;
 
 
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest;
 import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest;
 
 
 /**
 /**
@@ -25,7 +26,18 @@ import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest;
  */
  */
 public class OpenIndexClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest<OpenIndexClusterStateUpdateRequest> {
 public class OpenIndexClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest<OpenIndexClusterStateUpdateRequest> {
 
 
+    private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
+
     OpenIndexClusterStateUpdateRequest() {
     OpenIndexClusterStateUpdateRequest() {
 
 
     }
     }
+
+    public ActiveShardCount waitForActiveShards() {
+        return waitForActiveShards;
+    }
+
+    public OpenIndexClusterStateUpdateRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
+        this.waitForActiveShards = waitForActiveShards;
+        return this;
+    }
 }
 }

+ 41 - 0
core/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexRequest.java

@@ -19,8 +19,10 @@
 
 
 package org.elasticsearch.action.admin.indices.open;
 package org.elasticsearch.action.admin.indices.open;
 
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.IndicesRequest;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.master.AcknowledgedRequest;
 import org.elasticsearch.action.support.master.AcknowledgedRequest;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -38,6 +40,7 @@ public class OpenIndexRequest extends AcknowledgedRequest<OpenIndexRequest> impl
 
 
     private String[] indices;
     private String[] indices;
     private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, true, false, true);
     private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, true, false, true);
+    private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
 
 
     public OpenIndexRequest() {
     public OpenIndexRequest() {
     }
     }
@@ -101,11 +104,46 @@ public class OpenIndexRequest extends AcknowledgedRequest<OpenIndexRequest> impl
         return this;
         return this;
     }
     }
 
 
+    public ActiveShardCount waitForActiveShards() {
+        return waitForActiveShards;
+    }
+
+    /**
+     * Sets the number of shard copies that should be active for indices opening to return.
+     * Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy
+     * (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to
+     * wait for all shards (primary and all replicas) to be active before returning.
+     * Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
+     * non-negative integer, up to the number of copies per shard (number of replicas + 1),
+     * to wait for the desired amount of shard copies to become active before returning.
+     * Indices opening will only wait up until the timeout value for the number of shard copies
+     * to be active before returning.  Check {@link OpenIndexResponse#isShardsAcknowledged()} to
+     * determine if the requisite shard copies were all started before returning or timing out.
+     *
+     * @param waitForActiveShards number of active shard copies to wait on
+     */
+    public OpenIndexRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
+        this.waitForActiveShards = waitForActiveShards;
+        return this;
+    }
+
+    /**
+     * A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    public OpenIndexRequest waitForActiveShards(final int waitForActiveShards) {
+        return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
+
     @Override
     @Override
     public void readFrom(StreamInput in) throws IOException {
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);
         super.readFrom(in);
         indices = in.readStringArray();
         indices = in.readStringArray();
         indicesOptions = IndicesOptions.readIndicesOptions(in);
         indicesOptions = IndicesOptions.readIndicesOptions(in);
+        if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
+            waitForActiveShards = ActiveShardCount.readFrom(in);
+        }
     }
     }
 
 
     @Override
     @Override
@@ -113,5 +151,8 @@ public class OpenIndexRequest extends AcknowledgedRequest<OpenIndexRequest> impl
         super.writeTo(out);
         super.writeTo(out);
         out.writeStringArray(indices);
         out.writeStringArray(indices);
         indicesOptions.writeIndicesOptions(out);
         indicesOptions.writeIndicesOptions(out);
+        if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
+            waitForActiveShards.writeTo(out);
+        }
     }
     }
 }
 }

+ 29 - 0
core/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexRequestBuilder.java

@@ -19,6 +19,7 @@
 
 
 package org.elasticsearch.action.admin.indices.open;
 package org.elasticsearch.action.admin.indices.open;
 
 
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
 import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
 import org.elasticsearch.client.ElasticsearchClient;
 import org.elasticsearch.client.ElasticsearchClient;
@@ -58,4 +59,32 @@ public class OpenIndexRequestBuilder extends AcknowledgedRequestBuilder<OpenInde
         request.indicesOptions(indicesOptions);
         request.indicesOptions(indicesOptions);
         return this;
         return this;
     }
     }
+
+    /**
+     * Sets the number of shard copies that should be active for indices opening to return.
+     * Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy
+     * (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to
+     * wait for all shards (primary and all replicas) to be active before returning.
+     * Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
+     * non-negative integer, up to the number of copies per shard (number of replicas + 1),
+     * to wait for the desired amount of shard copies to become active before returning.
+     * Indices opening will only wait up until the timeout value for the number of shard copies
+     * to be active before returning.  Check {@link OpenIndexResponse#isShardsAcknowledged()} to
+     * determine if the requisite shard copies were all started before returning or timing out.
+     *
+     * @param waitForActiveShards number of active shard copies to wait on
+     */
+    public OpenIndexRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
+        request.waitForActiveShards(waitForActiveShards);
+        return this;
+    }
+
+    /**
+     * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
+     * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
+     * to get the ActiveShardCount.
+     */
+    public OpenIndexRequestBuilder setWaitForActiveShards(final int waitForActiveShards) {
+        return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
+    }
 }
 }

+ 21 - 1
core/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexResponse.java

@@ -19,6 +19,7 @@
 
 
 package org.elasticsearch.action.admin.indices.open;
 package org.elasticsearch.action.admin.indices.open;
 
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -30,22 +31,41 @@ import java.io.IOException;
  */
  */
 public class OpenIndexResponse extends AcknowledgedResponse {
 public class OpenIndexResponse extends AcknowledgedResponse {
 
 
+    private boolean shardsAcknowledged;
+
     OpenIndexResponse() {
     OpenIndexResponse() {
     }
     }
 
 
-    OpenIndexResponse(boolean acknowledged) {
+    OpenIndexResponse(boolean acknowledged, boolean shardsAcknowledged) {
         super(acknowledged);
         super(acknowledged);
+        assert acknowledged || shardsAcknowledged == false; // if its not acknowledged, then shards acked should be false too
+        this.shardsAcknowledged = shardsAcknowledged;
+    }
+
+    /**
+     * Returns true if the requisite number of shards were started before
+     * returning from the indices opening operation.  If {@link #isAcknowledged()}
+     * is false, then this also returns false.
+     */
+    public boolean isShardsAcknowledged() {
+        return shardsAcknowledged;
     }
     }
 
 
     @Override
     @Override
     public void readFrom(StreamInput in) throws IOException {
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);
         super.readFrom(in);
         readAcknowledged(in);
         readAcknowledged(in);
+        if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
+            shardsAcknowledged = in.readBoolean();
+        }
     }
     }
 
 
     @Override
     @Override
     public void writeTo(StreamOutput out) throws IOException {
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
         super.writeTo(out);
         writeAcknowledged(out);
         writeAcknowledged(out);
+        if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
+            out.writeBoolean(shardsAcknowledged);
+        }
     }
     }
 }
 }

+ 6 - 7
core/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java

@@ -22,12 +22,11 @@ package org.elasticsearch.action.admin.indices.open;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.logging.log4j.util.Supplier;
 import org.apache.logging.log4j.util.Supplier;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.DestructiveOperations;
 import org.elasticsearch.action.support.DestructiveOperations;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
+import org.elasticsearch.cluster.ack.OpenIndexClusterStateUpdateResponse;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -84,18 +83,18 @@ public class TransportOpenIndexAction extends TransportMasterNodeAction<OpenInde
     protected void masterOperation(final OpenIndexRequest request, final ClusterState state, final ActionListener<OpenIndexResponse> listener) {
     protected void masterOperation(final OpenIndexRequest request, final ClusterState state, final ActionListener<OpenIndexResponse> listener) {
         final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request);
         final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request);
         if (concreteIndices == null || concreteIndices.length == 0) {
         if (concreteIndices == null || concreteIndices.length == 0) {
-            listener.onResponse(new OpenIndexResponse(true));
+            listener.onResponse(new OpenIndexResponse(true, true));
             return;
             return;
         }
         }
         OpenIndexClusterStateUpdateRequest updateRequest = new OpenIndexClusterStateUpdateRequest()
         OpenIndexClusterStateUpdateRequest updateRequest = new OpenIndexClusterStateUpdateRequest()
                 .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
                 .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
-                .indices(concreteIndices);
+                .indices(concreteIndices).waitForActiveShards(request.waitForActiveShards());
 
 
-        indexStateService.openIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
+        indexStateService.openIndex(updateRequest, new ActionListener<OpenIndexClusterStateUpdateResponse>() {
 
 
             @Override
             @Override
-            public void onResponse(ClusterStateUpdateResponse response) {
-                listener.onResponse(new OpenIndexResponse(response.isAcknowledged()));
+            public void onResponse(OpenIndexClusterStateUpdateResponse response) {
+                listener.onResponse(new OpenIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged()));
             }
             }
 
 
             @Override
             @Override

+ 1 - 1
core/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java

@@ -136,7 +136,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
                                     rolloverRequest),
                                     rolloverRequest),
                                 ActionListener.wrap(aliasClusterStateUpdateResponse -> {
                                 ActionListener.wrap(aliasClusterStateUpdateResponse -> {
                                     if (aliasClusterStateUpdateResponse.isAcknowledged()) {
                                     if (aliasClusterStateUpdateResponse.isAcknowledged()) {
-                                        activeShardsObserver.waitForActiveShards(rolloverIndexName,
+                                        activeShardsObserver.waitForActiveShards(new String[]{rolloverIndexName},
                                             rolloverRequest.getCreateIndexRequest().waitForActiveShards(),
                                             rolloverRequest.getCreateIndexRequest().waitForActiveShards(),
                                             rolloverRequest.masterNodeTimeout(),
                                             rolloverRequest.masterNodeTimeout(),
                                             isShardsAcked -> listener.onResponse(new RolloverResponse(sourceIndexName, rolloverIndexName,
                                             isShardsAcked -> listener.onResponse(new RolloverResponse(sourceIndexName, rolloverIndexName,

+ 26 - 22
core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java

@@ -138,36 +138,40 @@ public final class ActiveShardCount implements Writeable {
 
 
     /**
     /**
      * Returns true iff the given cluster state's routing table contains enough active
      * Returns true iff the given cluster state's routing table contains enough active
-     * shards for the given index to meet the required shard count represented by this instance.
+     * shards for the given indices to meet the required shard count represented by this instance.
      */
      */
-    public boolean enoughShardsActive(final ClusterState clusterState, final String indexName) {
+    public boolean enoughShardsActive(final ClusterState clusterState, final String... indices) {
         if (this == ActiveShardCount.NONE) {
         if (this == ActiveShardCount.NONE) {
             // not waiting for any active shards
             // not waiting for any active shards
             return true;
             return true;
         }
         }
-        final IndexMetaData indexMetaData = clusterState.metaData().index(indexName);
-        if (indexMetaData == null) {
-            // its possible the index was deleted while waiting for active shard copies,
-            // in this case, we'll just consider it that we have enough active shard copies
-            // and we can stop waiting
-            return true;
-        }
-        final IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(indexName);
-        assert indexRoutingTable != null;
-        if (indexRoutingTable.allPrimaryShardsActive() == false) {
-            // all primary shards aren't active yet
-            return false;
-        }
-        ActiveShardCount waitForActiveShards = this;
-        if (waitForActiveShards == ActiveShardCount.DEFAULT) {
-            waitForActiveShards = SETTING_WAIT_FOR_ACTIVE_SHARDS.get(indexMetaData.getSettings());
-        }
-        for (final IntObjectCursor<IndexShardRoutingTable> shardRouting : indexRoutingTable.getShards()) {
-            if (waitForActiveShards.enoughShardsActive(shardRouting.value) == false) {
-                // not enough active shard copies yet
+
+        for (final String indexName : indices) {
+            final IndexMetaData indexMetaData = clusterState.metaData().index(indexName);
+            if (indexMetaData == null) {
+                // its possible the index was deleted while waiting for active shard copies,
+                // in this case, we'll just consider it that we have enough active shard copies
+                // and we can stop waiting
+                continue;
+            }
+            final IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(indexName);
+            assert indexRoutingTable != null;
+            if (indexRoutingTable.allPrimaryShardsActive() == false) {
+                // all primary shards aren't active yet
                 return false;
                 return false;
             }
             }
+            ActiveShardCount waitForActiveShards = this;
+            if (waitForActiveShards == ActiveShardCount.DEFAULT) {
+                waitForActiveShards = SETTING_WAIT_FOR_ACTIVE_SHARDS.get(indexMetaData.getSettings());
+            }
+            for (final IntObjectCursor<IndexShardRoutingTable> shardRouting : indexRoutingTable.getShards()) {
+                if (waitForActiveShards.enoughShardsActive(shardRouting.value) == false) {
+                    // not enough active shard copies yet
+                    return false;
+                }
+            }
         }
         }
+
         return true;
         return true;
     }
     }
 
 

+ 6 - 5
core/src/main/java/org/elasticsearch/action/support/ActiveShardsObserver.java

@@ -29,6 +29,7 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.node.NodeClosedException;
 import org.elasticsearch.node.NodeClosedException;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 
 
+import java.util.Arrays;
 import java.util.function.Consumer;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.function.Predicate;
 
 
@@ -50,13 +51,13 @@ public class ActiveShardsObserver extends AbstractComponent {
     /**
     /**
      * Waits on the specified number of active shards to be started before executing the
      * Waits on the specified number of active shards to be started before executing the
      *
      *
-     * @param indexName the index to wait for active shards on
+     * @param indexNames the indices to wait for active shards on
      * @param activeShardCount the number of active shards to wait on before returning
      * @param activeShardCount the number of active shards to wait on before returning
      * @param timeout the timeout value
      * @param timeout the timeout value
      * @param onResult a function that is executed in response to the requisite shards becoming active or a timeout (whichever comes first)
      * @param onResult a function that is executed in response to the requisite shards becoming active or a timeout (whichever comes first)
      * @param onFailure a function that is executed in response to an error occurring during waiting for the active shards
      * @param onFailure a function that is executed in response to an error occurring during waiting for the active shards
      */
      */
-    public void waitForActiveShards(final String indexName,
+    public void waitForActiveShards(final String[] indexNames,
                                     final ActiveShardCount activeShardCount,
                                     final ActiveShardCount activeShardCount,
                                     final TimeValue timeout,
                                     final TimeValue timeout,
                                     final Consumer<Boolean> onResult,
                                     final Consumer<Boolean> onResult,
@@ -71,10 +72,10 @@ public class ActiveShardsObserver extends AbstractComponent {
 
 
         final ClusterState state = clusterService.state();
         final ClusterState state = clusterService.state();
         final ClusterStateObserver observer = new ClusterStateObserver(state, clusterService, null, logger, threadPool.getThreadContext());
         final ClusterStateObserver observer = new ClusterStateObserver(state, clusterService, null, logger, threadPool.getThreadContext());
-        if (activeShardCount.enoughShardsActive(state, indexName)) {
+        if (activeShardCount.enoughShardsActive(state, indexNames)) {
             onResult.accept(true);
             onResult.accept(true);
         } else {
         } else {
-            final Predicate<ClusterState> shardsAllocatedPredicate = newState -> activeShardCount.enoughShardsActive(newState, indexName);
+            final Predicate<ClusterState> shardsAllocatedPredicate = newState -> activeShardCount.enoughShardsActive(newState, indexNames);
 
 
             final ClusterStateObserver.Listener observerListener = new ClusterStateObserver.Listener() {
             final ClusterStateObserver.Listener observerListener = new ClusterStateObserver.Listener() {
                 @Override
                 @Override
@@ -84,7 +85,7 @@ public class ActiveShardsObserver extends AbstractComponent {
 
 
                 @Override
                 @Override
                 public void onClusterServiceClose() {
                 public void onClusterServiceClose() {
-                    logger.debug("[{}] cluster service closed while waiting for enough shards to be started.", indexName);
+                    logger.debug("[{}] cluster service closed while waiting for enough shards to be started.", Arrays.toString(indexNames));
                     onFailure.accept(new NodeClosedException(clusterService.localNode()));
                     onFailure.accept(new NodeClosedException(clusterService.localNode()));
                 }
                 }
 
 

+ 39 - 0
core/src/main/java/org/elasticsearch/cluster/ack/OpenIndexClusterStateUpdateResponse.java

@@ -0,0 +1,39 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster.ack;
+
+/**
+ * A cluster state update response with specific fields for index opening.
+ */
+public class OpenIndexClusterStateUpdateResponse extends ClusterStateUpdateResponse {
+
+    private final boolean shardsAcknowledged;
+
+    public OpenIndexClusterStateUpdateResponse(boolean acknowledged, boolean shardsAcknowledged) {
+        super(acknowledged);
+        this.shardsAcknowledged = shardsAcknowledged;
+    }
+
+    /**
+     * Returns whether the requisite number of shard copies started before the completion of the operation.
+     */
+    public boolean isShardsAcknowledged() {
+        return shardsAcknowledged;
+    }
+}

+ 1 - 1
core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java

@@ -204,7 +204,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
                             final ActionListener<CreateIndexClusterStateUpdateResponse> listener) {
                             final ActionListener<CreateIndexClusterStateUpdateResponse> listener) {
         onlyCreateIndex(request, ActionListener.wrap(response -> {
         onlyCreateIndex(request, ActionListener.wrap(response -> {
             if (response.isAcknowledged()) {
             if (response.isAcknowledged()) {
-                activeShardsObserver.waitForActiveShards(request.index(), request.waitForActiveShards(), request.ackTimeout(),
+                activeShardsObserver.waitForActiveShards(new String[]{request.index()}, request.waitForActiveShards(), request.ackTimeout(),
                     shardsAcked -> {
                     shardsAcked -> {
                         if (shardsAcked == false) {
                         if (shardsAcked == false) {
                             logger.debug("[{}] index created, but the operation timed out while waiting for " +
                             logger.debug("[{}] index created, but the operation timed out while waiting for " +

+ 25 - 2
core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java

@@ -24,9 +24,11 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
 import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
 import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest;
 import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest;
+import org.elasticsearch.action.support.ActiveShardsObserver;
 import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
 import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
 import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
+import org.elasticsearch.cluster.ack.OpenIndexClusterStateUpdateResponse;
 import org.elasticsearch.cluster.block.ClusterBlock;
 import org.elasticsearch.cluster.block.ClusterBlock;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.block.ClusterBlocks;
 import org.elasticsearch.cluster.block.ClusterBlocks;
@@ -42,6 +44,7 @@ import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.snapshots.RestoreService;
 import org.elasticsearch.snapshots.RestoreService;
 import org.elasticsearch.snapshots.SnapshotsService;
 import org.elasticsearch.snapshots.SnapshotsService;
+import org.elasticsearch.threadpool.ThreadPool;
 
 
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Arrays;
@@ -62,16 +65,18 @@ public class MetaDataIndexStateService extends AbstractComponent {
 
 
     private final MetaDataIndexUpgradeService metaDataIndexUpgradeService;
     private final MetaDataIndexUpgradeService metaDataIndexUpgradeService;
     private final IndicesService indicesService;
     private final IndicesService indicesService;
+    private final ActiveShardsObserver activeShardsObserver;
 
 
     @Inject
     @Inject
     public MetaDataIndexStateService(Settings settings, ClusterService clusterService, AllocationService allocationService,
     public MetaDataIndexStateService(Settings settings, ClusterService clusterService, AllocationService allocationService,
                                      MetaDataIndexUpgradeService metaDataIndexUpgradeService,
                                      MetaDataIndexUpgradeService metaDataIndexUpgradeService,
-                                     IndicesService indicesService) {
+                                     IndicesService indicesService, ThreadPool threadPool) {
         super(settings);
         super(settings);
         this.indicesService = indicesService;
         this.indicesService = indicesService;
         this.clusterService = clusterService;
         this.clusterService = clusterService;
         this.allocationService = allocationService;
         this.allocationService = allocationService;
         this.metaDataIndexUpgradeService = metaDataIndexUpgradeService;
         this.metaDataIndexUpgradeService = metaDataIndexUpgradeService;
+        this.activeShardsObserver = new ActiveShardsObserver(settings, clusterService, threadPool);
     }
     }
 
 
     public void closeIndex(final CloseIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
     public void closeIndex(final CloseIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
@@ -130,7 +135,25 @@ public class MetaDataIndexStateService extends AbstractComponent {
         });
         });
     }
     }
 
 
-    public void openIndex(final OpenIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
+    public void openIndex(final OpenIndexClusterStateUpdateRequest request, final ActionListener<OpenIndexClusterStateUpdateResponse> listener) {
+        onlyOpenIndex(request, ActionListener.wrap(response -> {
+            if (response.isAcknowledged()) {
+                String[] indexNames = Arrays.stream(request.indices()).map(Index::getName).toArray(String[]::new);
+                activeShardsObserver.waitForActiveShards(indexNames, request.waitForActiveShards(), request.ackTimeout(),
+                    shardsAcknowledged -> {
+                        if (shardsAcknowledged == false) {
+                            logger.debug("[{}] indices opened, but the operation timed out while waiting for " +
+                                "enough shards to be started.", Arrays.toString(indexNames));
+                        }
+                        listener.onResponse(new OpenIndexClusterStateUpdateResponse(response.isAcknowledged(), shardsAcknowledged));
+                    }, listener::onFailure);
+            } else {
+                listener.onResponse(new OpenIndexClusterStateUpdateResponse(false, false));
+            }
+        }, listener::onFailure));
+    }
+
+    private void onlyOpenIndex(final OpenIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
         if (request.indices() == null || request.indices().length == 0) {
         if (request.indices() == null || request.indices().length == 0) {
             throw new IllegalArgumentException("Index name is required");
             throw new IllegalArgumentException("Index name is required");
         }
         }

+ 9 - 1
core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestOpenIndexAction.java

@@ -21,10 +21,12 @@ package org.elasticsearch.rest.action.admin.indices;
 
 
 import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
 import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
 import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
 import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestRequest;
@@ -50,6 +52,12 @@ public class RestOpenIndexAction extends BaseRestHandler {
         openIndexRequest.timeout(request.paramAsTime("timeout", openIndexRequest.timeout()));
         openIndexRequest.timeout(request.paramAsTime("timeout", openIndexRequest.timeout()));
         openIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", openIndexRequest.masterNodeTimeout()));
         openIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", openIndexRequest.masterNodeTimeout()));
         openIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, openIndexRequest.indicesOptions()));
         openIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, openIndexRequest.indicesOptions()));
-        return channel -> client.admin().indices().open(openIndexRequest, new AcknowledgedRestListener<OpenIndexResponse>(channel));
+        openIndexRequest.waitForActiveShards(ActiveShardCount.parseString(request.param("wait_for_active_shards")));
+        return channel -> client.admin().indices().open(openIndexRequest, new AcknowledgedRestListener<OpenIndexResponse>(channel) {
+            @Override
+            protected void addCustomFields(XContentBuilder builder, OpenIndexResponse response) throws IOException {
+                builder.field("shards_acknowledged", response.isShardsAcknowledged());
+            }
+        });
     }
     }
 }
 }

+ 1 - 1
core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java

@@ -168,7 +168,7 @@ public class ClusterStateChanges extends AbstractComponent {
             }
             }
         };
         };
         MetaDataIndexStateService indexStateService = new MetaDataIndexStateService(settings, clusterService, allocationService,
         MetaDataIndexStateService indexStateService = new MetaDataIndexStateService(settings, clusterService, allocationService,
-            metaDataIndexUpgradeService, indicesService);
+            metaDataIndexUpgradeService, indicesService, threadPool);
         MetaDataDeleteIndexService deleteIndexService = new MetaDataDeleteIndexService(settings, clusterService, allocationService);
         MetaDataDeleteIndexService deleteIndexService = new MetaDataDeleteIndexService(settings, clusterService, allocationService);
         MetaDataUpdateSettingsService metaDataUpdateSettingsService = new MetaDataUpdateSettingsService(settings, clusterService,
         MetaDataUpdateSettingsService metaDataUpdateSettingsService = new MetaDataUpdateSettingsService(settings, clusterService,
             allocationService, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, indicesService, threadPool);
             allocationService, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, indicesService, threadPool);

+ 26 - 0
core/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexIT.java

@@ -30,6 +30,7 @@ import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexNotFoundException;
@@ -67,6 +68,7 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
 
 
         OpenIndexResponse openIndexResponse = client.admin().indices().prepareOpen("test1").execute().actionGet();
         OpenIndexResponse openIndexResponse = client.admin().indices().prepareOpen("test1").execute().actionGet();
         assertThat(openIndexResponse.isAcknowledged(), equalTo(true));
         assertThat(openIndexResponse.isAcknowledged(), equalTo(true));
+        assertThat(openIndexResponse.isShardsAcknowledged(), equalTo(true));
         assertIndexIsOpened("test1");
         assertIndexIsOpened("test1");
     }
     }
 
 
@@ -123,6 +125,7 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
         OpenIndexResponse openIndexResponse = client.admin().indices().prepareOpen("test1", "test2")
         OpenIndexResponse openIndexResponse = client.admin().indices().prepareOpen("test1", "test2")
                 .setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute().actionGet();
                 .setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute().actionGet();
         assertThat(openIndexResponse.isAcknowledged(), equalTo(true));
         assertThat(openIndexResponse.isAcknowledged(), equalTo(true));
+        assertThat(openIndexResponse.isShardsAcknowledged(), equalTo(true));
         assertIndexIsOpened("test1");
         assertIndexIsOpened("test1");
     }
     }
 
 
@@ -141,8 +144,10 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
 
 
         OpenIndexResponse openIndexResponse1 = client.admin().indices().prepareOpen("test1").execute().actionGet();
         OpenIndexResponse openIndexResponse1 = client.admin().indices().prepareOpen("test1").execute().actionGet();
         assertThat(openIndexResponse1.isAcknowledged(), equalTo(true));
         assertThat(openIndexResponse1.isAcknowledged(), equalTo(true));
+        assertThat(openIndexResponse1.isShardsAcknowledged(), equalTo(true));
         OpenIndexResponse openIndexResponse2 = client.admin().indices().prepareOpen("test2").execute().actionGet();
         OpenIndexResponse openIndexResponse2 = client.admin().indices().prepareOpen("test2").execute().actionGet();
         assertThat(openIndexResponse2.isAcknowledged(), equalTo(true));
         assertThat(openIndexResponse2.isAcknowledged(), equalTo(true));
+        assertThat(openIndexResponse2.isShardsAcknowledged(), equalTo(true));
         assertIndexIsOpened("test1", "test2", "test3");
         assertIndexIsOpened("test1", "test2", "test3");
     }
     }
 
 
@@ -159,6 +164,7 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
 
 
         OpenIndexResponse openIndexResponse = client.admin().indices().prepareOpen("test*").execute().actionGet();
         OpenIndexResponse openIndexResponse = client.admin().indices().prepareOpen("test*").execute().actionGet();
         assertThat(openIndexResponse.isAcknowledged(), equalTo(true));
         assertThat(openIndexResponse.isAcknowledged(), equalTo(true));
+        assertThat(openIndexResponse.isShardsAcknowledged(), equalTo(true));
         assertIndexIsOpened("test1", "test2", "a");
         assertIndexIsOpened("test1", "test2", "a");
     }
     }
 
 
@@ -174,6 +180,7 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
 
 
         OpenIndexResponse openIndexResponse = client.admin().indices().prepareOpen("_all").execute().actionGet();
         OpenIndexResponse openIndexResponse = client.admin().indices().prepareOpen("_all").execute().actionGet();
         assertThat(openIndexResponse.isAcknowledged(), equalTo(true));
         assertThat(openIndexResponse.isAcknowledged(), equalTo(true));
+        assertThat(openIndexResponse.isShardsAcknowledged(), equalTo(true));
         assertIndexIsOpened("test1", "test2", "test3");
         assertIndexIsOpened("test1", "test2", "test3");
     }
     }
 
 
@@ -189,6 +196,7 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
 
 
         OpenIndexResponse openIndexResponse = client.admin().indices().prepareOpen("*").execute().actionGet();
         OpenIndexResponse openIndexResponse = client.admin().indices().prepareOpen("*").execute().actionGet();
         assertThat(openIndexResponse.isAcknowledged(), equalTo(true));
         assertThat(openIndexResponse.isAcknowledged(), equalTo(true));
+        assertThat(openIndexResponse.isShardsAcknowledged(), equalTo(true));
         assertIndexIsOpened("test1", "test2", "test3");
         assertIndexIsOpened("test1", "test2", "test3");
     }
     }
 
 
@@ -229,6 +237,7 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
         //no problem if we try to open an index that's already in open state
         //no problem if we try to open an index that's already in open state
         OpenIndexResponse openIndexResponse1 = client.admin().indices().prepareOpen("test1").execute().actionGet();
         OpenIndexResponse openIndexResponse1 = client.admin().indices().prepareOpen("test1").execute().actionGet();
         assertThat(openIndexResponse1.isAcknowledged(), equalTo(true));
         assertThat(openIndexResponse1.isAcknowledged(), equalTo(true));
+        assertThat(openIndexResponse1.isShardsAcknowledged(), equalTo(true));
         assertIndexIsOpened("test1");
         assertIndexIsOpened("test1");
     }
     }
 
 
@@ -264,6 +273,7 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
 
 
         OpenIndexResponse openIndexResponse = client.admin().indices().prepareOpen("test1-alias").execute().actionGet();
         OpenIndexResponse openIndexResponse = client.admin().indices().prepareOpen("test1-alias").execute().actionGet();
         assertThat(openIndexResponse.isAcknowledged(), equalTo(true));
         assertThat(openIndexResponse.isAcknowledged(), equalTo(true));
+        assertThat(openIndexResponse.isShardsAcknowledged(), equalTo(true));
         assertIndexIsOpened("test1");
         assertIndexIsOpened("test1");
     }
     }
 
 
@@ -284,9 +294,24 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
 
 
         OpenIndexResponse openIndexResponse = client.admin().indices().prepareOpen("test-alias").execute().actionGet();
         OpenIndexResponse openIndexResponse = client.admin().indices().prepareOpen("test-alias").execute().actionGet();
         assertThat(openIndexResponse.isAcknowledged(), equalTo(true));
         assertThat(openIndexResponse.isAcknowledged(), equalTo(true));
+        assertThat(openIndexResponse.isShardsAcknowledged(), equalTo(true));
         assertIndexIsOpened("test1", "test2");
         assertIndexIsOpened("test1", "test2");
     }
     }
 
 
+    public void testOpenWaitingForActiveShardsFailed() {
+        Client client = client();
+        Settings settings = Settings.builder()
+            .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+            .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
+            .build();
+        assertAcked(client.admin().indices().prepareCreate("test").setSettings(settings).get());
+        assertAcked(client.admin().indices().prepareClose("test").get());
+
+        OpenIndexResponse response = client.admin().indices().prepareOpen("test").setTimeout("100ms").setWaitForActiveShards(2).get();
+        assertAcked(response);
+        assertThat(response.isShardsAcknowledged(), equalTo(false));
+    }
+
     private void assertIndexIsOpened(String... indices) {
     private void assertIndexIsOpened(String... indices) {
         checkIndexState(IndexMetaData.State.OPEN, indices);
         checkIndexState(IndexMetaData.State.OPEN, indices);
     }
     }
@@ -359,6 +384,7 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
                 // Opening an index is not blocked
                 // Opening an index is not blocked
                 OpenIndexResponse openIndexResponse = client().admin().indices().prepareOpen("test").execute().actionGet();
                 OpenIndexResponse openIndexResponse = client().admin().indices().prepareOpen("test").execute().actionGet();
                 assertAcked(openIndexResponse);
                 assertAcked(openIndexResponse);
+                assertThat(openIndexResponse.isShardsAcknowledged(), equalTo(true));
                 assertIndexIsOpened("test");
                 assertIndexIsOpened("test");
             } finally {
             } finally {
                 disableIndexBlock("test", blockSetting);
                 disableIndexBlock("test", blockSetting);

+ 7 - 0
docs/reference/indices/open-close.asciidoc

@@ -32,3 +32,10 @@ This setting can also be changed via the cluster update settings api.
 
 
 Closed indices consume a significant amount of disk-space which can cause problems in managed environments. Closing indices can be disabled via the cluster settings
 Closed indices consume a significant amount of disk-space which can cause problems in managed environments. Closing indices can be disabled via the cluster settings
 API by setting `cluster.indices.close.enable` to `false`. The default is `true`.
 API by setting `cluster.indices.close.enable` to `false`. The default is `true`.
+
+[float]
+=== Wait For Active Shards
+
+Because opening an index allocates its shards, the
+<<create-index-wait-for-active-shards,`wait_for_active_shards`>> setting on
+index creation applies to the index opening action as well.

+ 4 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/indices.open.json

@@ -34,6 +34,10 @@
            "options" : ["open","closed","none","all"],
            "options" : ["open","closed","none","all"],
            "default" : "closed",
            "default" : "closed",
            "description" : "Whether to expand wildcard expression to concrete indices that are open, closed or both."
            "description" : "Whether to expand wildcard expression to concrete indices that are open, closed or both."
+        },
+        "wait_for_active_shards": {
+          "type" : "string",
+          "description" : "Sets the number of active shards to wait for before the operation returns."
         }
         }
       }
       }
     },
     },

+ 24 - 0
rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/10_basic.yml

@@ -36,3 +36,27 @@
       search:
       search:
         index: test_index
         index: test_index
 
 
+---
+"Open index with wait_for_active_shards set to all":
+  - skip:
+      version: " - 6.99.99"
+      reason: wait_for_active_shards parameter was added in 7.0.0
+
+  - do:
+      indices.create:
+        index: test_index
+        body:
+          settings:
+            number_of_replicas: 0
+
+  - do:
+      indices.close:
+        index: test_index
+
+  - do:
+      indices.open:
+        index: test_index
+        wait_for_active_shards: all
+
+  - match: { acknowledged: true }
+  - match: { shards_acknowledged: true }