Browse Source

Remove trappy timeout from `ClusterSearchShardsRequest` (#111442)

Exposes the `?master_timeout` parameter to the REST API and sets it
appropriately on internal/test requests.

Relates #107984
David Turner 1 year ago
parent
commit
586405d11f
18 changed files with 90 additions and 131 deletions
  1. 1 0
      docs/reference/search/search-shards.asciidoc
  2. 9 2
      modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java
  3. 9 6
      modules/reindex/src/main/java/org/elasticsearch/reindex/BulkByScrollParallelizationHelper.java
  4. 4 0
      rest-api-spec/src/main/resources/rest-api-spec/api/search_shards.json
  5. 1 1
      server/src/internalClusterTest/java/org/elasticsearch/action/search/SearchProgressActionListenerIT.java
  6. 1 1
      server/src/internalClusterTest/java/org/elasticsearch/action/termvectors/GetTermVectorsIT.java
  7. 1 1
      server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java
  8. 12 8
      server/src/internalClusterTest/java/org/elasticsearch/cluster/shards/ClusterSearchShardsIT.java
  9. 3 9
      server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsRequest.java
  10. 0 66
      server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsRequestBuilder.java
  11. 5 3
      server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
  12. 26 9
      server/src/main/java/org/elasticsearch/action/support/master/MasterNodeRequest.java
  13. 0 13
      server/src/main/java/org/elasticsearch/client/internal/ClusterAdminClient.java
  14. 2 1
      server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java
  15. 2 5
      server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsService.java
  16. 11 3
      server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterSearchShardsAction.java
  17. 2 2
      server/src/test/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsRequestTests.java
  18. 1 1
      x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java

+ 1 - 0
docs/reference/search/search-shards.asciidoc

@@ -63,6 +63,7 @@ include::{es-ref-dir}/rest-api/common-parms.asciidoc[tag=preference]
 
 include::{es-ref-dir}/rest-api/common-parms.asciidoc[tag=routing]
 
+include::{es-ref-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]
 
 [[search-shards-api-example]]
 ==== {api-examples-title}

+ 9 - 2
modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java

@@ -585,7 +585,9 @@ public class DataStreamIT extends ESIntegTestCase {
         verifyResolvability(dataStreamName, indicesAdmin().prepareOpen(dataStreamName), false);
         verifyResolvability(dataStreamName, indicesAdmin().prepareClose(dataStreamName), true);
         verifyResolvability(aliasToDataStream, indicesAdmin().prepareClose(aliasToDataStream), true);
-        verifyResolvability(client().execute(TransportClusterSearchShardsAction.TYPE, new ClusterSearchShardsRequest(dataStreamName)));
+        verifyResolvability(
+            client().execute(TransportClusterSearchShardsAction.TYPE, new ClusterSearchShardsRequest(TEST_REQUEST_TIMEOUT, dataStreamName))
+        );
         verifyResolvability(client().execute(TransportIndicesShardStoresAction.TYPE, new IndicesShardStoresRequest(dataStreamName)));
 
         request = new CreateDataStreamAction.Request("logs-barbaz");
@@ -629,7 +631,12 @@ public class DataStreamIT extends ESIntegTestCase {
         verifyResolvability(wildcardExpression, indicesAdmin().prepareGetIndex().addIndices(wildcardExpression), false);
         verifyResolvability(wildcardExpression, indicesAdmin().prepareOpen(wildcardExpression), false);
         verifyResolvability(wildcardExpression, indicesAdmin().prepareClose(wildcardExpression), false);
-        verifyResolvability(client().execute(TransportClusterSearchShardsAction.TYPE, new ClusterSearchShardsRequest(wildcardExpression)));
+        verifyResolvability(
+            client().execute(
+                TransportClusterSearchShardsAction.TYPE,
+                new ClusterSearchShardsRequest(TEST_REQUEST_TIMEOUT, wildcardExpression)
+            )
+        );
         verifyResolvability(client().execute(TransportIndicesShardStoresAction.TYPE, new IndicesShardStoresRequest(wildcardExpression)));
     }
 

+ 9 - 6
modules/reindex/src/main/java/org/elasticsearch/reindex/BulkByScrollParallelizationHelper.java

@@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
 import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
+import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -114,12 +115,14 @@ class BulkByScrollParallelizationHelper {
     ) {
         int configuredSlices = request.getSlices();
         if (configuredSlices == AbstractBulkByScrollRequest.AUTO_SLICES) {
-            ClusterSearchShardsRequest shardsRequest = new ClusterSearchShardsRequest();
-            shardsRequest.indices(request.getSearchRequest().indices());
-            client.admin().cluster().searchShards(shardsRequest, listener.safeMap(response -> {
-                setWorkerCount(request, task, countSlicesBasedOnShards(response));
-                return null;
-            }));
+            client.execute(
+                TransportClusterSearchShardsAction.TYPE,
+                new ClusterSearchShardsRequest(request.getTimeout(), request.getSearchRequest().indices()),
+                listener.safeMap(response -> {
+                    setWorkerCount(request, task, countSlicesBasedOnShards(response));
+                    return null;
+                })
+            );
         } else {
             setWorkerCount(request, task, configuredSlices);
             listener.onResponse(null);

+ 4 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/search_shards.json

@@ -65,6 +65,10 @@
         ],
         "default":"open",
         "description":"Whether to expand wildcard expression to concrete indices that are open, closed or both."
+      },
+      "master_timeout":{
+        "type":"time",
+        "description":"Explicit operation timeout for connection to master node"
       }
     }
   }

+ 1 - 1
server/src/internalClusterTest/java/org/elasticsearch/action/search/SearchProgressActionListenerIT.java

@@ -202,7 +202,7 @@ public class SearchProgressActionListenerIT extends ESSingleNodeTestCase {
         ClusterSearchShardsResponse resp = safeExecute(
             client,
             TransportClusterSearchShardsAction.TYPE,
-            new ClusterSearchShardsRequest("index-*")
+            new ClusterSearchShardsRequest(TEST_REQUEST_TIMEOUT, "index-*")
         );
         return Arrays.stream(resp.getGroups()).map(e -> new SearchShard(null, e.getShardId())).sorted().toList();
     }

+ 1 - 1
server/src/internalClusterTest/java/org/elasticsearch/action/termvectors/GetTermVectorsIT.java

@@ -1017,7 +1017,7 @@ public class GetTermVectorsIT extends AbstractTermVectorsTestCase {
         // Get search shards
         ClusterSearchShardsResponse searchShardsResponse = safeExecute(
             TransportClusterSearchShardsAction.TYPE,
-            new ClusterSearchShardsRequest("test")
+            new ClusterSearchShardsRequest(TEST_REQUEST_TIMEOUT, "test")
         );
         List<Integer> shardIds = Arrays.stream(searchShardsResponse.getGroups()).map(s -> s.getShardId().id()).toList();
 

+ 1 - 1
server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java

@@ -550,7 +550,7 @@ public class ShardRoutingRoleIT extends ESIntegTestCase {
             }
             // search-shards API
             for (int i = 0; i < 10; i++) {
-                final var search = new ClusterSearchShardsRequest(INDEX_NAME);
+                final var search = new ClusterSearchShardsRequest(TEST_REQUEST_TIMEOUT, INDEX_NAME);
                 switch (randomIntBetween(0, 2)) {
                     case 0 -> search.routing(randomAlphaOfLength(10));
                     case 1 -> search.routing(randomSearchPreference(routingTableWatcher.numShards, internalCluster().getNodeNames()));

+ 12 - 8
server/src/internalClusterTest/java/org/elasticsearch/cluster/shards/ClusterSearchShardsIT.java

@@ -45,7 +45,7 @@ public class ClusterSearchShardsIT extends ESIntegTestCase {
     public void testSingleShardAllocation() {
         indicesAdmin().prepareCreate("test").setSettings(indexSettings(1, 0).put("index.routing.allocation.include.tag", "A")).get();
         ensureGreen();
-        ClusterSearchShardsResponse response = safeExecute(new ClusterSearchShardsRequest("test"));
+        ClusterSearchShardsResponse response = safeExecute(new ClusterSearchShardsRequest(TEST_REQUEST_TIMEOUT, "test"));
         assertThat(response.getGroups().length, equalTo(1));
         assertThat(response.getGroups()[0].getShardId().getIndexName(), equalTo("test"));
         assertThat(response.getGroups()[0].getShardId().getId(), equalTo(0));
@@ -53,7 +53,7 @@ public class ClusterSearchShardsIT extends ESIntegTestCase {
         assertThat(response.getNodes().length, equalTo(1));
         assertThat(response.getGroups()[0].getShards()[0].currentNodeId(), equalTo(response.getNodes()[0].getId()));
 
-        response = safeExecute(new ClusterSearchShardsRequest("test").routing("A"));
+        response = safeExecute(new ClusterSearchShardsRequest(TEST_REQUEST_TIMEOUT, "test").routing("A"));
         assertThat(response.getGroups().length, equalTo(1));
         assertThat(response.getGroups()[0].getShardId().getIndexName(), equalTo("test"));
         assertThat(response.getGroups()[0].getShardId().getId(), equalTo(0));
@@ -67,16 +67,16 @@ public class ClusterSearchShardsIT extends ESIntegTestCase {
         indicesAdmin().prepareCreate("test").setSettings(indexSettings(4, 0).put("index.routing.allocation.include.tag", "A")).get();
         ensureGreen();
 
-        ClusterSearchShardsResponse response = safeExecute(new ClusterSearchShardsRequest("test"));
+        ClusterSearchShardsResponse response = safeExecute(new ClusterSearchShardsRequest(TEST_REQUEST_TIMEOUT, "test"));
         assertThat(response.getGroups().length, equalTo(4));
         assertThat(response.getGroups()[0].getShardId().getIndexName(), equalTo("test"));
         assertThat(response.getNodes().length, equalTo(1));
         assertThat(response.getGroups()[0].getShards()[0].currentNodeId(), equalTo(response.getNodes()[0].getId()));
 
-        response = safeExecute(new ClusterSearchShardsRequest("test").routing("ABC"));
+        response = safeExecute(new ClusterSearchShardsRequest(TEST_REQUEST_TIMEOUT, "test").routing("ABC"));
         assertThat(response.getGroups().length, equalTo(1));
 
-        response = safeExecute(new ClusterSearchShardsRequest("test").preference("_shards:2"));
+        response = safeExecute(new ClusterSearchShardsRequest(TEST_REQUEST_TIMEOUT, "test").preference("_shards:2"));
         assertThat(response.getGroups().length, equalTo(1));
         assertThat(response.getGroups()[0].getShardId().getId(), equalTo(2));
     }
@@ -90,7 +90,7 @@ public class ClusterSearchShardsIT extends ESIntegTestCase {
             .get();
         clusterAdmin().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().get();
 
-        ClusterSearchShardsResponse response = safeExecute(new ClusterSearchShardsRequest("routing_alias"));
+        ClusterSearchShardsResponse response = safeExecute(new ClusterSearchShardsRequest(TEST_REQUEST_TIMEOUT, "routing_alias"));
         assertThat(response.getGroups().length, equalTo(2));
         assertThat(response.getGroups()[0].getShards().length, equalTo(2));
         assertThat(response.getGroups()[1].getShards().length, equalTo(2));
@@ -132,7 +132,7 @@ public class ClusterSearchShardsIT extends ESIntegTestCase {
         )) {
             try {
                 enableIndexBlock("test-blocks", blockSetting);
-                ClusterSearchShardsResponse response = safeExecute(new ClusterSearchShardsRequest("test-blocks"));
+                ClusterSearchShardsResponse response = safeExecute(new ClusterSearchShardsRequest(TEST_REQUEST_TIMEOUT, "test-blocks"));
                 assertThat(response.getGroups().length, equalTo(numShards.numPrimaries));
             } finally {
                 disableIndexBlock("test-blocks", blockSetting);
@@ -149,7 +149,11 @@ public class ClusterSearchShardsIT extends ESIntegTestCase {
                     ExceptionsHelper.unwrapCause(
                         safeAwaitFailure(
                             ClusterSearchShardsResponse.class,
-                            l -> client().execute(TransportClusterSearchShardsAction.TYPE, new ClusterSearchShardsRequest("test-blocks"), l)
+                            l -> client().execute(
+                                TransportClusterSearchShardsAction.TYPE,
+                                new ClusterSearchShardsRequest(TEST_REQUEST_TIMEOUT, "test-blocks"),
+                                l
+                            )
                         )
                     )
                 )

+ 3 - 9
server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsRequest.java

@@ -16,6 +16,7 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.TimeValue;
 
 import java.io.IOException;
 import java.util.Objects;
@@ -31,22 +32,16 @@ public final class ClusterSearchShardsRequest extends MasterNodeReadRequest<Clus
     private String preference;
     private IndicesOptions indicesOptions = IndicesOptions.lenientExpandOpen();
 
-    public ClusterSearchShardsRequest() {
-        super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
-    }
-
-    public ClusterSearchShardsRequest(String... indices) {
-        super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
+    public ClusterSearchShardsRequest(TimeValue masterNodeTimeout, String... indices) {
+        super(masterNodeTimeout);
         indices(indices);
     }
 
     public ClusterSearchShardsRequest(StreamInput in) throws IOException {
         super(in);
         indices = in.readStringArray();
-
         routing = in.readOptionalString();
         preference = in.readOptionalString();
-
         indicesOptions = IndicesOptions.readIndicesOptions(in);
     }
 
@@ -56,7 +51,6 @@ public final class ClusterSearchShardsRequest extends MasterNodeReadRequest<Clus
         out.writeStringArray(indices);
         out.writeOptionalString(routing);
         out.writeOptionalString(preference);
-
         indicesOptions.writeIndicesOptions(out);
     }
 

+ 0 - 66
server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsRequestBuilder.java

@@ -1,66 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0 and the Server Side Public License, v 1; you may not use this file except
- * in compliance with, at your election, the Elastic License 2.0 or the Server
- * Side Public License, v 1.
- */
-
-package org.elasticsearch.action.admin.cluster.shards;
-
-import org.elasticsearch.action.support.IndicesOptions;
-import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder;
-import org.elasticsearch.client.internal.ElasticsearchClient;
-
-public class ClusterSearchShardsRequestBuilder extends MasterNodeReadOperationRequestBuilder<
-    ClusterSearchShardsRequest,
-    ClusterSearchShardsResponse,
-    ClusterSearchShardsRequestBuilder> {
-
-    public ClusterSearchShardsRequestBuilder(ElasticsearchClient client) {
-        super(client, TransportClusterSearchShardsAction.TYPE, new ClusterSearchShardsRequest());
-    }
-
-    /**
-     * Sets the indices the search will be executed on.
-     */
-    public ClusterSearchShardsRequestBuilder setIndices(String... indices) {
-        request.indices(indices);
-        return this;
-    }
-
-    /**
-     * A comma separated list of routing values to control the shards the search will be executed on.
-     */
-    public ClusterSearchShardsRequestBuilder setRouting(String routing) {
-        request.routing(routing);
-        return this;
-    }
-
-    /**
-     * The routing values to control the shards that the search will be executed on.
-     */
-    public ClusterSearchShardsRequestBuilder setRouting(String... routing) {
-        request.routing(routing);
-        return this;
-    }
-
-    /**
-     * Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
-     * {@code _local} to prefer local shards or a custom value, which guarantees that the same order
-     * will be used across different requests.
-     */
-    public ClusterSearchShardsRequestBuilder setPreference(String preference) {
-        request.preference(preference);
-        return this;
-    }
-
-    /**
-     * Specifies what type of requested indices to ignore and how to deal indices wildcard expressions.
-     * For example indices that don't exist.
-     */
-    public ClusterSearchShardsRequestBuilder setIndicesOptions(IndicesOptions indicesOptions) {
-        request().indicesOptions(indicesOptions);
-        return this;
-    }
-}

+ 5 - 3
server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

@@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShard
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
 import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.master.MasterNodeRequest;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
@@ -851,9 +852,10 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
                         );
                     } else {
                         // does not do a can-match
-                        ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices).indicesOptions(
-                            indicesOptions
-                        ).local(true).preference(preference).routing(routing);
+                        ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(
+                            MasterNodeRequest.infiniteMasterNodeTimeout(connection.getTransportVersion()),
+                            indices
+                        ).indicesOptions(indicesOptions).local(true).preference(preference).routing(routing);
                         transportService.sendRequest(
                             connection,
                             TransportClusterSearchShardsAction.TYPE.name(),

+ 26 - 9
server/src/main/java/org/elasticsearch/action/support/master/MasterNodeRequest.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.action.support.master;
 
+import org.elasticsearch.TransportVersion;
 import org.elasticsearch.TransportVersions;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -32,7 +33,8 @@ public abstract class MasterNodeRequest<Request extends MasterNodeRequest<Reques
      * timeout.
      * <p>
      * For internally-generated requests, choose an appropriate timeout. Often this will be {@link TimeValue#MAX_VALUE} (or {@link
-     * TimeValue#MINUS_ONE} which means an infinite timeout in 8.15.0 onwards) since usually we want internal requests to wait for as long
+     * TimeValue#MINUS_ONE} which means an infinite timeout in 8.14.0 onwards (see <a
+     * href="https://github.com/elastic/elasticsearch/pull/107050">#107050</a>) since usually we want internal requests to wait for as long
      * as necessary to complete.
      *
      * @deprecated all requests should specify a timeout, see <a href="https://github.com/elastic/elasticsearch/issues/107984">#107984</a>.
@@ -51,15 +53,20 @@ public abstract class MasterNodeRequest<Request extends MasterNodeRequest<Reques
 
     /**
      * @param masterNodeTimeout Specifies how long to wait when the master has not been discovered yet, or is disconnected, or is busy
-     *                          processing other tasks. The value {@link TimeValue#MINUS_ONE} means to wait forever in 8.15.0 onwards.
-     *                          <p>
-     *                          For requests which originate in the REST layer, use {@link
-     *                          org.elasticsearch.rest.RestUtils#getMasterNodeTimeout} to determine the timeout.
-     *                          <p>
-     *                          For internally-generated requests, choose an appropriate timeout. Often this will be {@link
-     *                          TimeValue#MAX_VALUE} (or {@link TimeValue#MINUS_ONE} which means an infinite timeout in 8.15.0 onwards)
-     *                          since usually we want internal requests to wait for as long as necessary to complete.
+     *                          processing other tasks:
+     *                          <ul>
+     *                          <li>
+     *                              For requests which originate in the REST layer, use
+     *                              {@link org.elasticsearch.rest.RestUtils#getMasterNodeTimeout} to determine the timeout.
+     *                          </li>
+     *                          <li>
+     *                              For internally-generated requests, choose an appropriate timeout. Often this will be an infinite
+     *                              timeout, see {@link #infiniteMasterNodeTimeout}, since it is reasonable to wait for as long as necessary
+     *                              for internal requests to complete.
+     *                          </li>
+     *                          </ul>
      */
+    // TODO forbid TimeValue#MAX_VALUE once support for version prior to 8.14 dropped
     protected MasterNodeRequest(TimeValue masterNodeTimeout) {
         this.masterNodeTimeout = Objects.requireNonNull(masterNodeTimeout);
         this.masterTerm = 0L;
@@ -127,4 +134,14 @@ public abstract class MasterNodeRequest<Request extends MasterNodeRequest<Reques
     public final long masterTerm() {
         return masterTerm;
     }
+
+    /**
+     * @return a {@link TimeValue} which represents an infinite master-node timeout, suitable for sending using the given transport version.
+     *         Versions prior to 8.14 did not reliably support {@link TimeValue#MINUS_ONE} for this purpose so for these versions we use
+     *         {@link TimeValue#MAX_VALUE} as the best available alternative.
+     * @see <a href="https://github.com/elastic/elasticsearch/pull/107050">#107050</a>
+     */
+    public static TimeValue infiniteMasterNodeTimeout(TransportVersion transportVersion) {
+        return transportVersion.onOrAfter(TransportVersions.V_8_14_0) ? TimeValue.MINUS_ONE : TimeValue.MAX_VALUE;
+    }
 }

+ 0 - 13
server/src/main/java/org/elasticsearch/client/internal/ClusterAdminClient.java

@@ -64,10 +64,6 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsActi
 import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
 import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequestBuilder;
 import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
-import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
-import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequestBuilder;
-import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
-import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
 import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequest;
 import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequestBuilder;
 import org.elasticsearch.action.admin.cluster.snapshots.clone.TransportCloneSnapshotAction;
@@ -286,15 +282,6 @@ public class ClusterAdminClient implements ElasticsearchClient {
         return new CancelTasksRequestBuilder(this).setNodesIds(nodesIds);
     }
 
-    public void searchShards(final ClusterSearchShardsRequest request, final ActionListener<ClusterSearchShardsResponse> listener) {
-        execute(TransportClusterSearchShardsAction.TYPE, request, listener);
-    }
-
-    @Deprecated(forRemoval = true) // temporary compatibility shim
-    public ClusterSearchShardsRequestBuilder prepareSearchShards(String... indices) {
-        return new ClusterSearchShardsRequestBuilder(this).setIndices(indices);
-    }
-
     public void putRepository(PutRepositoryRequest request, ActionListener<AcknowledgedResponse> listener) {
         execute(TransportPutRepositoryAction.TYPE, request, listener);
     }

+ 2 - 1
server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java

@@ -24,6 +24,7 @@ import org.elasticsearch.tasks.TaskId;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Objects;
 
 import static org.elasticsearch.action.ValidateActions.addValidationError;
 import static org.elasticsearch.core.TimeValue.timeValueMillis;
@@ -248,7 +249,7 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
      * Timeout to wait for the shards on to be available for each bulk request?
      */
     public Self setTimeout(TimeValue timeout) {
-        this.timeout = timeout;
+        this.timeout = Objects.requireNonNull(timeout);
         return self();
     }
 

+ 2 - 5
server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsService.java

@@ -18,12 +18,12 @@ import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.Lock;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.Version;
-import org.elasticsearch.TransportVersions;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.snapshots.get.shard.GetShardSnapshotRequest;
 import org.elasticsearch.action.admin.cluster.snapshots.get.shard.GetShardSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.get.shard.TransportGetShardSnapshotAction;
 import org.elasticsearch.action.support.ThreadedActionListener;
+import org.elasticsearch.action.support.master.MasterNodeRequest;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
@@ -32,7 +32,6 @@ import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
 import org.elasticsearch.index.store.StoreFileMetadata;
@@ -94,9 +93,7 @@ public class ShardSnapshotsService {
         logger.debug("Searching for peer recovery compatible snapshots in [{}]", repositories);
 
         GetShardSnapshotRequest request = GetShardSnapshotRequest.latestSnapshotInRepositories(
-            clusterService.state().getMinTransportVersion().onOrAfter(TransportVersions.SNAPSHOT_REQUEST_TIMEOUTS)
-                ? TimeValue.MINUS_ONE
-                : TimeValue.MAX_VALUE,
+            MasterNodeRequest.infiniteMasterNodeTimeout(clusterService.state().getMinTransportVersion()),
             shardId,
             repositories
         );

+ 11 - 3
server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterSearchShardsAction.java

@@ -9,11 +9,13 @@
 package org.elasticsearch.rest.action.admin.cluster;
 
 import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
+import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.RestUtils;
 import org.elasticsearch.rest.action.RestToXContentListener;
 
 import java.io.IOException;
@@ -41,12 +43,18 @@ public class RestClusterSearchShardsAction extends BaseRestHandler {
 
     @Override
     public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
-        String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
-        final ClusterSearchShardsRequest clusterSearchShardsRequest = new ClusterSearchShardsRequest(indices);
+        final ClusterSearchShardsRequest clusterSearchShardsRequest = new ClusterSearchShardsRequest(
+            RestUtils.getMasterNodeTimeout(request),
+            Strings.splitStringByCommaToArray(request.param("index"))
+        );
         clusterSearchShardsRequest.local(request.paramAsBoolean("local", clusterSearchShardsRequest.local()));
         clusterSearchShardsRequest.routing(request.param("routing"));
         clusterSearchShardsRequest.preference(request.param("preference"));
         clusterSearchShardsRequest.indicesOptions(IndicesOptions.fromRequest(request, clusterSearchShardsRequest.indicesOptions()));
-        return channel -> client.admin().cluster().searchShards(clusterSearchShardsRequest, new RestToXContentListener<>(channel));
+        return channel -> client.execute(
+            TransportClusterSearchShardsAction.TYPE,
+            clusterSearchShardsRequest,
+            new RestToXContentListener<>(channel)
+        );
     }
 }

+ 2 - 2
server/src/test/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsRequestTests.java

@@ -19,7 +19,7 @@ import org.elasticsearch.test.TransportVersionUtils;
 public class ClusterSearchShardsRequestTests extends ESTestCase {
 
     public void testSerialization() throws Exception {
-        ClusterSearchShardsRequest request = new ClusterSearchShardsRequest();
+        ClusterSearchShardsRequest request = new ClusterSearchShardsRequest(TEST_REQUEST_TIMEOUT);
         if (randomBoolean()) {
             int numIndices = randomIntBetween(1, 5);
             String[] indices = new String[numIndices];
@@ -66,7 +66,7 @@ public class ClusterSearchShardsRequestTests extends ESTestCase {
     }
 
     public void testIndicesMustNotBeNull() {
-        ClusterSearchShardsRequest request = new ClusterSearchShardsRequest();
+        ClusterSearchShardsRequest request = new ClusterSearchShardsRequest(TEST_REQUEST_TIMEOUT);
         assertNotNull(request.indices());
         expectThrows(NullPointerException.class, () -> request.indices((String[]) null));
         expectThrows(NullPointerException.class, () -> request.indices((String) null));

+ 1 - 1
x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java

@@ -329,7 +329,7 @@ public class ILMDownsampleDisruptionIT extends ESIntegTestCase {
                 final String candidateNode = safeExecute(
                     cluster.client(clientNode),
                     TransportClusterSearchShardsAction.TYPE,
-                    new ClusterSearchShardsRequest(sourceIndex)
+                    new ClusterSearchShardsRequest(TEST_REQUEST_TIMEOUT, sourceIndex)
                 ).getNodes()[0].getName();
                 logger.info("Candidate node [" + candidateNode + "]");
                 disruption.accept(candidateNode);