Browse Source

Remove pending tasks helpers from client API (#103451)

There's no need for special helpers in `AdminClient` for this transport
action, we only use it in a few integ tests.
David Turner 1 year ago
parent
commit
b3c1637e45
14 changed files with 53 additions and 94 deletions
  1. 3 3
      server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/tasks/PendingTasksBlocksIT.java
  2. 2 2
      server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java
  3. 2 10
      server/src/internalClusterTest/java/org/elasticsearch/index/store/CorruptedFileIT.java
  4. 0 22
      server/src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksRequestBuilder.java
  5. 0 15
      server/src/main/java/org/elasticsearch/client/internal/ClusterAdminClient.java
  6. 0 14
      server/src/main/java/org/elasticsearch/client/internal/support/AbstractClient.java
  7. 6 3
      server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestPendingClusterTasksAction.java
  8. 7 4
      server/src/main/java/org/elasticsearch/rest/action/cat/RestPendingClusterTasksAction.java
  9. 25 9
      test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
  10. 1 1
      test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java
  11. 2 2
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java
  12. 1 5
      x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java
  13. 3 3
      x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleClusterDisruptionIT.java
  14. 1 1
      x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java

+ 3 - 3
server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/tasks/PendingTasksBlocksIT.java

@@ -37,7 +37,7 @@ public class PendingTasksBlocksIT extends ESIntegTestCase {
         )) {
             try {
                 enableIndexBlock("test", blockSetting);
-                PendingClusterTasksResponse response = clusterAdmin().preparePendingClusterTasks().get();
+                PendingClusterTasksResponse response = getClusterPendingTasks();
                 assertNotNull(response.pendingTasks());
             } finally {
                 disableIndexBlock("test", blockSetting);
@@ -53,7 +53,7 @@ public class PendingTasksBlocksIT extends ESIntegTestCase {
 
         try {
             setClusterReadOnly(true);
-            PendingClusterTasksResponse response = clusterAdmin().preparePendingClusterTasks().get();
+            PendingClusterTasksResponse response = getClusterPendingTasks();
             assertNotNull(response.pendingTasks());
         } finally {
             setClusterReadOnly(false);
@@ -80,7 +80,7 @@ public class PendingTasksBlocksIT extends ESIntegTestCase {
             }
         });
 
-        assertNotNull(clusterAdmin().preparePendingClusterTasks().get().pendingTasks());
+        assertNotNull(getClusterPendingTasks().pendingTasks());
 
         // starting one more node allows the cluster to recover
         internalCluster().startNode();

+ 2 - 2
server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java

@@ -357,7 +357,7 @@ public class ClusterServiceIT extends ESIntegTestCase {
         assertTrue(controlSources.isEmpty());
 
         controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"));
-        PendingClusterTasksResponse response = internalCluster().coordOnlyNodeClient().admin().cluster().preparePendingClusterTasks().get();
+        PendingClusterTasksResponse response = getClusterPendingTasks(internalCluster().coordOnlyNodeClient());
         assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(10));
         assertThat(response.pendingTasks().get(0).getSource().string(), equalTo("1"));
         assertThat(response.pendingTasks().get(0).isExecuting(), equalTo(true));
@@ -419,7 +419,7 @@ public class ClusterServiceIT extends ESIntegTestCase {
             }
             assertTrue(controlSources.isEmpty());
 
-            response = internalCluster().coordOnlyNodeClient().admin().cluster().preparePendingClusterTasks().get();
+            response = getClusterPendingTasks(internalCluster().coordOnlyNodeClient());
             assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(5));
             controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5"));
             for (PendingClusterTask task : response.pendingTasks()) {

+ 2 - 10
server/src/internalClusterTest/java/org/elasticsearch/index/store/CorruptedFileIT.java

@@ -182,11 +182,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
                 .waitForNoRelocatingShards(true)
         ).actionGet();
         if (health.isTimedOut()) {
-            logger.info(
-                "cluster state:\n{}\n{}",
-                clusterAdmin().prepareState().get().getState(),
-                clusterAdmin().preparePendingClusterTasks().get()
-            );
+            logger.info("cluster state:\n{}\n{}", clusterAdmin().prepareState().get().getState(), getClusterPendingTasks());
             assertThat("timed out waiting for green state", health.isTimedOut(), equalTo(false));
         }
         assertThat(health.getStatus(), equalTo(ClusterHealthStatus.GREEN));
@@ -295,11 +291,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
 
         if (response.getStatus() != ClusterHealthStatus.RED) {
             logger.info("Cluster turned red in busy loop: {}", didClusterTurnRed);
-            logger.info(
-                "cluster state:\n{}\n{}",
-                clusterAdmin().prepareState().get().getState(),
-                clusterAdmin().preparePendingClusterTasks().get()
-            );
+            logger.info("cluster state:\n{}\n{}", clusterAdmin().prepareState().get().getState(), getClusterPendingTasks());
         }
         assertThat(response.getStatus(), is(ClusterHealthStatus.RED));
         ClusterState state = clusterAdmin().prepareState().get().getState();

+ 0 - 22
server/src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksRequestBuilder.java

@@ -1,22 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0 and the Server Side Public License, v 1; you may not use this file except
- * in compliance with, at your election, the Elastic License 2.0 or the Server
- * Side Public License, v 1.
- */
-
-package org.elasticsearch.action.admin.cluster.tasks;
-
-import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder;
-import org.elasticsearch.client.internal.ElasticsearchClient;
-
-public class PendingClusterTasksRequestBuilder extends MasterNodeReadOperationRequestBuilder<
-    PendingClusterTasksRequest,
-    PendingClusterTasksResponse,
-    PendingClusterTasksRequestBuilder> {
-
-    public PendingClusterTasksRequestBuilder(ElasticsearchClient client) {
-        super(client, TransportPendingClusterTasksAction.TYPE, new PendingClusterTasksRequest());
-    }
-}

+ 0 - 15
server/src/main/java/org/elasticsearch/client/internal/ClusterAdminClient.java

@@ -85,9 +85,6 @@ import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptReque
 import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptResponse;
 import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequest;
 import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequestBuilder;
-import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
-import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequestBuilder;
-import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
 import org.elasticsearch.action.admin.indices.dangling.delete.DeleteDanglingIndexRequest;
 import org.elasticsearch.action.admin.indices.dangling.import_index.ImportDanglingIndexRequest;
 import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesRequest;
@@ -441,18 +438,6 @@ public interface ClusterAdminClient extends ElasticsearchClient {
      */
     RestoreSnapshotRequestBuilder prepareRestoreSnapshot(String repository, String snapshot);
 
-    /**
-     * Returns a list of the pending cluster tasks, that are scheduled to be executed. This includes operations
-     * that update the cluster state (for example, a create index operation)
-     */
-    void pendingClusterTasks(PendingClusterTasksRequest request, ActionListener<PendingClusterTasksResponse> listener);
-
-    /**
-     * Returns a list of the pending cluster tasks, that are scheduled to be executed. This includes operations
-     * that update the cluster state (for example, a create index operation)
-     */
-    PendingClusterTasksRequestBuilder preparePendingClusterTasks();
-
     /**
      * Get snapshot status.
      */

+ 0 - 14
server/src/main/java/org/elasticsearch/client/internal/support/AbstractClient.java

@@ -118,10 +118,6 @@ import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptReque
 import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequestBuilder;
 import org.elasticsearch.action.admin.cluster.storedscripts.TransportDeleteStoredScriptAction;
 import org.elasticsearch.action.admin.cluster.storedscripts.TransportPutStoredScriptAction;
-import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
-import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequestBuilder;
-import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
-import org.elasticsearch.action.admin.cluster.tasks.TransportPendingClusterTasksAction;
 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
 import org.elasticsearch.action.admin.indices.alias.TransportIndicesAliasesAction;
@@ -851,16 +847,6 @@ public abstract class AbstractClient implements Client {
             return new ClusterSearchShardsRequestBuilder(this).setIndices(indices);
         }
 
-        @Override
-        public PendingClusterTasksRequestBuilder preparePendingClusterTasks() {
-            return new PendingClusterTasksRequestBuilder(this);
-        }
-
-        @Override
-        public void pendingClusterTasks(PendingClusterTasksRequest request, ActionListener<PendingClusterTasksResponse> listener) {
-            execute(TransportPendingClusterTasksAction.TYPE, request, listener);
-        }
-
         @Override
         public void putRepository(PutRepositoryRequest request, ActionListener<AcknowledgedResponse> listener) {
             execute(TransportPutRepositoryAction.TYPE, request, listener);

+ 6 - 3
server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestPendingClusterTasksAction.java

@@ -9,6 +9,7 @@
 package org.elasticsearch.rest.action.admin.cluster;
 
 import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
+import org.elasticsearch.action.admin.cluster.tasks.TransportPendingClusterTasksAction;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
@@ -39,8 +40,10 @@ public class RestPendingClusterTasksAction extends BaseRestHandler {
         PendingClusterTasksRequest pendingClusterTasksRequest = new PendingClusterTasksRequest();
         pendingClusterTasksRequest.masterNodeTimeout(request.paramAsTime("master_timeout", pendingClusterTasksRequest.masterNodeTimeout()));
         pendingClusterTasksRequest.local(request.paramAsBoolean("local", pendingClusterTasksRequest.local()));
-        return channel -> client.admin()
-            .cluster()
-            .pendingClusterTasks(pendingClusterTasksRequest, new RestChunkedToXContentListener<>(channel));
+        return channel -> client.execute(
+            TransportPendingClusterTasksAction.TYPE,
+            pendingClusterTasksRequest,
+            new RestChunkedToXContentListener<>(channel)
+        );
     }
 }

+ 7 - 4
server/src/main/java/org/elasticsearch/rest/action/cat/RestPendingClusterTasksAction.java

@@ -10,6 +10,7 @@ package org.elasticsearch.rest.action.cat;
 
 import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
 import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
+import org.elasticsearch.action.admin.cluster.tasks.TransportPendingClusterTasksAction;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.cluster.service.PendingClusterTask;
 import org.elasticsearch.common.Table;
@@ -46,15 +47,17 @@ public class RestPendingClusterTasksAction extends AbstractCatAction {
         PendingClusterTasksRequest pendingClusterTasksRequest = new PendingClusterTasksRequest();
         pendingClusterTasksRequest.masterNodeTimeout(request.paramAsTime("master_timeout", pendingClusterTasksRequest.masterNodeTimeout()));
         pendingClusterTasksRequest.local(request.paramAsBoolean("local", pendingClusterTasksRequest.local()));
-        return channel -> client.admin()
-            .cluster()
-            .pendingClusterTasks(pendingClusterTasksRequest, new RestResponseListener<PendingClusterTasksResponse>(channel) {
+        return channel -> client.execute(
+            TransportPendingClusterTasksAction.TYPE,
+            pendingClusterTasksRequest,
+            new RestResponseListener<>(channel) {
                 @Override
                 public RestResponse buildResponse(PendingClusterTasksResponse pendingClusterTasks) throws Exception {
                     Table tab = buildTable(request, pendingClusterTasks);
                     return RestTable.buildResponse(tab, channel);
                 }
-            });
+            }
+        );
     }
 
     @Override

+ 25 - 9
test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

@@ -29,7 +29,9 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
 import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
+import org.elasticsearch.action.admin.cluster.tasks.TransportPendingClusterTasksAction;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
 import org.elasticsearch.action.admin.indices.flush.FlushResponse;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
@@ -860,7 +862,10 @@ public abstract class ESIntegTestCase extends ESTestCase {
             for (Client client : clients()) {
                 ClusterHealthResponse clusterHealth = client.admin().cluster().prepareHealth().setLocal(true).get();
                 assertThat("client " + client + " still has in flight fetch", clusterHealth.getNumberOfInFlightFetch(), equalTo(0));
-                PendingClusterTasksResponse pendingTasks = client.admin().cluster().preparePendingClusterTasks().setLocal(true).get();
+                PendingClusterTasksResponse pendingTasks = client.execute(
+                    TransportPendingClusterTasksAction.TYPE,
+                    new PendingClusterTasksRequest().local(true)
+                ).get();
                 assertThat(
                     "client " + client + " still has pending tasks " + pendingTasks,
                     pendingTasks.pendingTasks(),
@@ -977,8 +982,11 @@ public abstract class ESIntegTestCase extends ESTestCase {
             try (var listeners = new RefCountingListener(detailsFuture)) {
                 clusterAdmin().prepareAllocationExplain().execute(listeners.acquire(allocationExplainRef::set));
                 clusterAdmin().prepareState().execute(listeners.acquire(clusterStateRef::set));
-                clusterAdmin().preparePendingClusterTasks().execute(listeners.acquire(pendingTasksRef::set));
-
+                client().execute(
+                    TransportPendingClusterTasksAction.TYPE,
+                    new PendingClusterTasksRequest(),
+                    listeners.acquire(pendingTasksRef::set)
+                );
                 try (var writer = new StringWriter()) {
                     new HotThreads().busiestThreads(9999).ignoreIdleThreads(false).detect(writer);
                     hotThreadsRef.set(writer.toString());
@@ -1040,7 +1048,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
                 "waitForRelocation timed out (status={}), cluster state:\n{}\n{}",
                 status,
                 clusterAdmin().prepareState().get().getState(),
-                clusterAdmin().preparePendingClusterTasks().get()
+                getClusterPendingTasks()
             );
             assertThat("timed out waiting for relocation", actionGet.isTimedOut(), equalTo(false));
         }
@@ -1050,6 +1058,18 @@ public abstract class ESIntegTestCase extends ESTestCase {
         return actionGet.getStatus();
     }
 
+    public static PendingClusterTasksResponse getClusterPendingTasks() {
+        return getClusterPendingTasks(client());
+    }
+
+    public static PendingClusterTasksResponse getClusterPendingTasks(Client client) {
+        try {
+            return client.execute(TransportPendingClusterTasksAction.TYPE, new PendingClusterTasksRequest()).get(10, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            return fail(e);
+        }
+    }
+
     /**
      * Waits until at least a give number of document is visible for searchers
      *
@@ -1146,11 +1166,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
      * Prints the current cluster state as debug logging.
      */
     public void logClusterState() {
-        logger.debug(
-            "cluster state:\n{}\n{}",
-            clusterAdmin().prepareState().get().getState(),
-            clusterAdmin().preparePendingClusterTasks().get()
-        );
+        logger.debug("cluster state:\n{}\n{}", clusterAdmin().prepareState().get().getState(), getClusterPendingTasks());
     }
 
     protected void ensureClusterSizeConsistency() {

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java

@@ -421,7 +421,7 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
             logger.info(
                 "ensureGreen timed out, cluster state:\n{}\n{}",
                 clusterAdmin().prepareState().get().getState(),
-                clusterAdmin().preparePendingClusterTasks().get()
+                ESIntegTestCase.getClusterPendingTasks(client())
             );
             assertThat("timed out waiting for green state", actionGet.isTimedOut(), equalTo(false));
         }

+ 2 - 2
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java

@@ -431,9 +431,9 @@ public abstract class CcrIntegTestCase extends ESTestCase {
                     {}""",
                 method,
                 leaderClient().admin().cluster().prepareState().get().getState(),
-                leaderClient().admin().cluster().preparePendingClusterTasks().get(),
+                ESIntegTestCase.getClusterPendingTasks(leaderClient()),
                 followerClient().admin().cluster().prepareState().get().getState(),
-                followerClient().admin().cluster().preparePendingClusterTasks().get()
+                ESIntegTestCase.getClusterPendingTasks(followerClient())
             );
             HotThreads.logLocalHotThreads(logger, Level.INFO, "hot threads at timeout", ReferenceDocs.LOGGING);
             fail("timed out waiting for " + color + " state");

+ 1 - 5
x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java

@@ -126,11 +126,7 @@ public class DataStreamLifecycleDownsampleDisruptionIT extends ESIntegTestCase {
             }
         })).start();
 
-        waitUntil(
-            () -> cluster.client().admin().cluster().preparePendingClusterTasks().get().pendingTasks().isEmpty(),
-            60,
-            TimeUnit.SECONDS
-        );
+        waitUntil(() -> getClusterPendingTasks(cluster.client()).pendingTasks().isEmpty(), 60, TimeUnit.SECONDS);
         ensureStableCluster(cluster.numDataAndMasterNodes());
 
         final String targetIndex = "downsample-5m-" + sourceIndex;

+ 3 - 3
x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleClusterDisruptionIT.java

@@ -203,7 +203,7 @@ public class DownsampleClusterDisruptionIT extends ESIntegTestCase {
             }
         })).start();
         startDownsampleTaskDuringDisruption(sourceIndex, targetIndex, config, disruptionStart, disruptionEnd);
-        waitUntil(() -> cluster.client().admin().cluster().preparePendingClusterTasks().get().pendingTasks().isEmpty());
+        waitUntil(() -> getClusterPendingTasks(cluster.client()).pendingTasks().isEmpty());
         ensureStableCluster(cluster.numDataAndMasterNodes());
         assertTargetIndex(cluster, sourceIndex, targetIndex, indexedDocs);
     }
@@ -265,7 +265,7 @@ public class DownsampleClusterDisruptionIT extends ESIntegTestCase {
         })).start();
 
         startDownsampleTaskDuringDisruption(sourceIndex, targetIndex, config, disruptionStart, disruptionEnd);
-        waitUntil(() -> cluster.client().admin().cluster().preparePendingClusterTasks().get().pendingTasks().isEmpty());
+        waitUntil(() -> getClusterPendingTasks(cluster.client()).pendingTasks().isEmpty());
         ensureStableCluster(cluster.numDataAndMasterNodes());
         assertTargetIndex(cluster, sourceIndex, targetIndex, indexedDocs);
     }
@@ -354,7 +354,7 @@ public class DownsampleClusterDisruptionIT extends ESIntegTestCase {
         })).start();
 
         startDownsampleTaskDuringDisruption(sourceIndex, downsampleIndex, config, disruptionStart, disruptionEnd);
-        waitUntil(() -> cluster.client().admin().cluster().preparePendingClusterTasks().get().pendingTasks().isEmpty());
+        waitUntil(() -> getClusterPendingTasks(cluster.client()).pendingTasks().isEmpty());
         ensureStableCluster(cluster.numDataAndMasterNodes());
         assertTargetIndex(cluster, sourceIndex, downsampleIndex, indexedDocs);
     }

+ 1 - 1
x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java

@@ -194,7 +194,7 @@ public class ILMDownsampleDisruptionIT extends ESIntegTestCase {
 
         final String targetIndex = "downsample-1h-" + sourceIndex;
         startDownsampleTaskViaIlm(sourceIndex, targetIndex, disruptionStart, disruptionEnd);
-        waitUntil(() -> cluster.client().admin().cluster().preparePendingClusterTasks().get().pendingTasks().isEmpty());
+        waitUntil(() -> getClusterPendingTasks(cluster.client()).pendingTasks().isEmpty(), 60, TimeUnit.SECONDS);
         ensureStableCluster(cluster.numDataAndMasterNodes());
         assertTargetIndex(cluster, targetIndex, indexedDocs);
     }