Browse Source

Move monitoring collection timeouts to coordinator (#67084)

With #66993 there is now support for coordinator-side timeouts on a
`BroadcastRequest`, which includes requests for node stats and
recoveries. This commit adjusts Monitoring to use these coordinator-side
timeouts where applicable, which will prevent partial stats responses
from accumulating on the master while one or more nodes are not
responding quickly enough. It also enhances the message logged on a
timeout to include the IDs of the nodes which did not respond in time.

Closes #60188.
David Turner 4 năm trước cách đây
mục cha
commit
1d2462e691
15 tập tin đã thay đổi với 359 bổ sung27 xóa
  1. 8 1
      server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastOperationRequestBuilder.java
  2. 1 1
      server/src/main/java/org/elasticsearch/action/support/nodes/NodesOperationRequestBuilder.java
  3. 3 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java
  4. 1 1
      x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/Collector.java
  5. 103 0
      x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/TimeoutUtils.java
  6. 4 4
      x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsCollector.java
  7. 6 2
      x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexRecoveryCollector.java
  8. 7 2
      x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollector.java
  9. 5 3
      x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsCollector.java
  10. 6 2
      x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsCollector.java
  11. 62 3
      x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsCollectorTests.java
  12. 49 1
      x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexRecoveryCollectorTests.java
  13. 40 2
      x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollectorTests.java
  14. 33 2
      x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsCollectorTests.java
  15. 31 2
      x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsCollectorTests.java

+ 8 - 1
server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastOperationRequestBuilder.java

@@ -19,10 +19,11 @@
 
 package org.elasticsearch.action.support.broadcast;
 
-import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.ActionRequestBuilder;
+import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.ElasticsearchClient;
+import org.elasticsearch.common.unit.TimeValue;
 
 public abstract class BroadcastOperationRequestBuilder<
             Request extends BroadcastRequest<Request>,
@@ -45,4 +46,10 @@ public abstract class BroadcastOperationRequestBuilder<
         request.indicesOptions(indicesOptions);
         return (RequestBuilder) this;
     }
+
+    @SuppressWarnings("unchecked")
+    public RequestBuilder setTimeout(TimeValue timeout) {
+        request.timeout(timeout);
+        return (RequestBuilder) this;
+    }
 }

+ 1 - 1
server/src/main/java/org/elasticsearch/action/support/nodes/NodesOperationRequestBuilder.java

@@ -39,7 +39,7 @@ public abstract class NodesOperationRequestBuilder<Request extends BaseNodesRequ
     }
 
     @SuppressWarnings("unchecked")
-    public final RequestBuilder setTimeout(TimeValue timeout) {
+    public RequestBuilder setTimeout(TimeValue timeout) {
         request.timeout(timeout);
         return (RequestBuilder) this;
     }

+ 3 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java

@@ -127,7 +127,9 @@ public class GetJobsStatsAction extends ActionType<GetJobsStatsAction.Response>
                 return false;
             }
             Request other = (Request) obj;
-            return Objects.equals(jobId, other.jobId) && Objects.equals(allowNoMatch, other.allowNoMatch);
+            return Objects.equals(jobId, other.jobId)
+                    && Objects.equals(allowNoMatch, other.allowNoMatch)
+                    && Objects.equals(getTimeout(), other.getTimeout());
         }
     }
 

+ 1 - 1
x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/Collector.java

@@ -84,7 +84,7 @@ public abstract class Collector {
                 return doCollect(convertNode(timestamp, clusterService.localNode()), interval, clusterState);
             }
         } catch (ElasticsearchTimeoutException e) {
-            logger.error((Supplier<?>) () -> new ParameterizedMessage("collector [{}] timed out when collecting data", name()));
+            logger.error("collector [{}] timed out when collecting data: {}", name(), e.getMessage());
         } catch (Exception e) {
             logger.error((Supplier<?>) () -> new ParameterizedMessage("collector [{}] failed to collect data", name()), e);
         }

+ 103 - 0
x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/TimeoutUtils.java

@@ -0,0 +1,103 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.monitoring.collector;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.support.DefaultShardOperationFailedException;
+import org.elasticsearch.action.support.broadcast.BroadcastResponse;
+import org.elasticsearch.action.support.nodes.BaseNodeResponse;
+import org.elasticsearch.action.support.nodes.BaseNodesResponse;
+import org.elasticsearch.action.support.tasks.BaseTasksResponse;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.transport.ReceiveTimeoutTransportException;
+
+import java.util.HashSet;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Utilities for identifying timeouts in responses to collection requests, since we prefer to fail the whole collection attempt if any of
+ * the involved nodes times out.
+ */
+public final class TimeoutUtils {
+    private TimeoutUtils() {
+    }
+
+    /**
+     * @throws ElasticsearchTimeoutException iff the {@code response} contains any node-level timeout. The exception message identifies the
+     *                                       nodes that timed out and mentions {@code collectionTimeout}.
+     */
+    public static <T extends BaseNodeResponse> void ensureNoTimeouts(TimeValue collectionTimeout, BaseNodesResponse<T> response) {
+        HashSet<String> timedOutNodeIds = null;
+        for (FailedNodeException failedNodeException : response.failures()) {
+            if (isTimeoutFailure(failedNodeException)) {
+                if (timedOutNodeIds == null) {
+                    timedOutNodeIds = new HashSet<>();
+                }
+                timedOutNodeIds.add(failedNodeException.nodeId());
+            }
+        }
+        ensureNoTimeouts(collectionTimeout, timedOutNodeIds);
+    }
+
+    /**
+     * @throws ElasticsearchTimeoutException iff the {@code response} contains any node-level timeout. The exception message identifies the
+     *                                       nodes that timed out and mentions {@code collectionTimeout}.
+     */
+    public static void ensureNoTimeouts(TimeValue collectionTimeout, BaseTasksResponse response) {
+        HashSet<String> timedOutNodeIds = null;
+        for (ElasticsearchException nodeFailure : response.getNodeFailures()) {
+            if (nodeFailure instanceof FailedNodeException) {
+                FailedNodeException failedNodeException = (FailedNodeException) nodeFailure;
+                if (isTimeoutFailure(failedNodeException)) {
+                    if (timedOutNodeIds == null) {
+                        timedOutNodeIds = new HashSet<>();
+                    }
+                    timedOutNodeIds.add(failedNodeException.nodeId());
+                }
+            }
+        }
+        ensureNoTimeouts(collectionTimeout, timedOutNodeIds);
+    }
+
+    /**
+     * @throws ElasticsearchTimeoutException iff the {@code response} contains any node-level timeout. The exception message identifies the
+     *                                       nodes that timed out and mentions {@code collectionTimeout}.
+     */
+    public static void ensureNoTimeouts(TimeValue collectionTimeout, BroadcastResponse response) {
+        HashSet<String> timedOutNodeIds = null;
+        for (DefaultShardOperationFailedException shardFailure : response.getShardFailures()) {
+            final Throwable shardFailureCause = shardFailure.getCause();
+            if (shardFailureCause instanceof FailedNodeException) {
+                FailedNodeException failedNodeException = (FailedNodeException) shardFailureCause;
+                if (isTimeoutFailure(failedNodeException)) {
+                    if (timedOutNodeIds == null) {
+                        timedOutNodeIds = new HashSet<>();
+                    }
+                    timedOutNodeIds.add(failedNodeException.nodeId());
+                }
+            }
+        }
+        ensureNoTimeouts(collectionTimeout, timedOutNodeIds);
+    }
+
+    private static boolean isTimeoutFailure(FailedNodeException failedNodeException) {
+        final Throwable cause = failedNodeException.getCause();
+        return cause instanceof ElasticsearchTimeoutException
+                || cause instanceof TimeoutException
+                || cause instanceof ReceiveTimeoutTransportException;
+    }
+
+    private static void ensureNoTimeouts(TimeValue collectionTimeout, HashSet<String> timedOutNodeIds) {
+        if (timedOutNodeIds != null) {
+            throw new ElasticsearchTimeoutException((timedOutNodeIds.size() == 1 ? "node " : "nodes ") + timedOutNodeIds +
+                    " did not respond within [" + collectionTimeout + "]");
+        }
+    }
+
+}

+ 4 - 4
x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsCollector.java

@@ -37,6 +37,7 @@ import java.util.Objects;
 
 import static org.elasticsearch.xpack.core.XPackSettings.SECURITY_ENABLED;
 import static org.elasticsearch.xpack.core.XPackSettings.TRANSPORT_SSL_ENABLED;
+import static org.elasticsearch.xpack.monitoring.collector.TimeoutUtils.ensureNoTimeouts;
 
 /**
  * Collector for cluster stats.
@@ -82,13 +83,12 @@ public class ClusterStatsCollector extends Collector {
     @Override
     protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node,
                                                   final long interval,
-                                                  final ClusterState clusterState) throws Exception {
-        final Supplier<ClusterStatsResponse> clusterStatsSupplier =
-                () -> client.admin().cluster().prepareClusterStats().get(getCollectionTimeout());
+                                                  final ClusterState clusterState) {
         final Supplier<List<XPackFeatureSet.Usage>> usageSupplier =
                 () -> new XPackUsageRequestBuilder(client).get().getUsages();
 
-        final ClusterStatsResponse clusterStats = clusterStatsSupplier.get();
+        final ClusterStatsResponse clusterStats = client.admin().cluster().prepareClusterStats().setTimeout(getCollectionTimeout()).get();
+        ensureNoTimeouts(getCollectionTimeout(), clusterStats);
 
         final String clusterName = clusterService.getClusterName().value();
         final String clusterUuid = clusterUuid(clusterState);

+ 6 - 2
x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexRecoveryCollector.java

@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Objects;
 
 import static org.elasticsearch.common.settings.Setting.boolSetting;
+import static org.elasticsearch.xpack.monitoring.collector.TimeoutUtils.ensureNoTimeouts;
 
 /**
  * Collector for the Recovery API.
@@ -64,13 +65,16 @@ public class IndexRecoveryCollector extends Collector {
     @Override
     protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node,
                                                   final long interval,
-                                                  final ClusterState clusterState) throws Exception {
+                                                  final ClusterState clusterState) {
         List<MonitoringDoc> results = new ArrayList<>(1);
         RecoveryResponse recoveryResponse = client.admin().indices().prepareRecoveries()
                 .setIndices(getCollectionIndices())
                 .setIndicesOptions(IndicesOptions.lenientExpandOpen())
                 .setActiveOnly(getActiveRecoveriesOnly())
-                .get(getCollectionTimeout());
+                .setTimeout(getCollectionTimeout())
+                .get();
+
+        ensureNoTimeouts(getCollectionTimeout(), recoveryResponse);
 
         if (recoveryResponse.hasRecoveries()) {
             final String clusterUuid = clusterUuid(clusterState);

+ 7 - 2
x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollector.java

@@ -24,6 +24,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
+import static org.elasticsearch.xpack.monitoring.collector.TimeoutUtils.ensureNoTimeouts;
+
 /**
  * Collector for indices and singular index statistics.
  * <p>
@@ -54,7 +56,7 @@ public class IndexStatsCollector extends Collector {
     @Override
     protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node,
                                                   final long interval,
-                                                  final ClusterState clusterState) throws Exception {
+                                                  final ClusterState clusterState) {
         final List<MonitoringDoc> results = new ArrayList<>();
         final IndicesStatsResponse indicesStatsResponse = client.admin().indices().prepareStats()
                 .setIndices(getCollectionIndices())
@@ -71,7 +73,10 @@ public class IndexStatsCollector extends Collector {
                 .setQueryCache(true)
                 .setRequestCache(true)
                 .setBulk(true)
-                .get(getCollectionTimeout());
+                .setTimeout(getCollectionTimeout())
+                .get();
+
+        ensureNoTimeouts(getCollectionTimeout(), indicesStatsResponse);
 
         final long timestamp = timestamp();
         final String clusterUuid = clusterUuid(clusterState);

+ 5 - 3
x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsCollector.java

@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.xpack.core.ClientHelper.MONITORING_ORIGIN;
+import static org.elasticsearch.xpack.monitoring.collector.TimeoutUtils.ensureNoTimeouts;
 
 /**
  * Collector for Machine Learning Job Stats.
@@ -71,9 +72,10 @@ public class JobStatsCollector extends Collector {
                                             final ClusterState clusterState) throws Exception {
         // fetch details about all jobs
         try (ThreadContext.StoredContext ignore = threadContext.stashWithOrigin(MONITORING_ORIGIN)) {
-            final GetJobsStatsAction.Response jobs =
-                    client.execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(Metadata.ALL))
-                            .actionGet(getCollectionTimeout());
+            final GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(Metadata.ALL).setTimeout(getCollectionTimeout());
+            final GetJobsStatsAction.Response jobs = client.execute(GetJobsStatsAction.INSTANCE, request).actionGet();
+
+            ensureNoTimeouts(getCollectionTimeout(), jobs);
 
             final long timestamp = timestamp();
             final String clusterUuid = clusterUuid(clusterState);

+ 6 - 2
x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsCollector.java

@@ -23,6 +23,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Objects;
 
+import static org.elasticsearch.xpack.monitoring.collector.TimeoutUtils.ensureNoTimeouts;
+
 /**
  * Collector for nodes statistics.
  * <p>
@@ -65,7 +67,7 @@ public class NodeStatsCollector extends Collector {
     @Override
     protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node,
                                                   final long interval,
-                                                  final ClusterState clusterState) throws Exception {
+                                                  final ClusterState clusterState) {
         NodesStatsRequest request = new NodesStatsRequest("_local");
         request.indices(FLAGS);
         request.addMetrics(
@@ -74,8 +76,10 @@ public class NodeStatsCollector extends Collector {
             NodesStatsRequest.Metric.PROCESS.metricName(),
             NodesStatsRequest.Metric.THREAD_POOL.metricName(),
             NodesStatsRequest.Metric.FS.metricName());
+        request.timeout(getCollectionTimeout());
 
-        final NodesStatsResponse response = client.admin().cluster().nodesStats(request).actionGet(getCollectionTimeout());
+        final NodesStatsResponse response = client.admin().cluster().nodesStats(request).actionGet();
+        ensureNoTimeouts(getCollectionTimeout(), response);
 
         // if there's a failure, then we failed to work with the
         // _local node (guaranteed a single exception)

+ 62 - 3
x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsCollectorTests.java

@@ -5,8 +5,10 @@
  */
 package org.elasticsearch.xpack.monitoring.collector.cluster;
 
+import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.FailedNodeException;
 import org.elasticsearch.action.admin.cluster.stats.ClusterStatsIndices;
 import org.elasticsearch.action.admin.cluster.stats.ClusterStatsNodes;
 import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequestBuilder;
@@ -37,6 +39,7 @@ import org.elasticsearch.xpack.monitoring.MonitoringTestUtils;
 import org.junit.Assert;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Locale;
 import java.util.UUID;
 
@@ -189,7 +192,8 @@ public class ClusterStatsCollectorTests extends BaseCollectorTestCase {
         when(mockClusterStatsResponse.getIndicesStats()).thenReturn(mockClusterStatsIndices);
 
         final ClusterStatsRequestBuilder clusterStatsRequestBuilder = mock(ClusterStatsRequestBuilder.class);
-        when(clusterStatsRequestBuilder.get(eq(timeout))).thenReturn(mockClusterStatsResponse);
+        when(clusterStatsRequestBuilder.setTimeout(eq(timeout))).thenReturn(clusterStatsRequestBuilder);
+        when(clusterStatsRequestBuilder.get()).thenReturn(mockClusterStatsResponse);
 
         final ClusterAdminClient clusterAdminClient = mock(ClusterAdminClient.class);
         when(clusterAdminClient.prepareClusterStats()).thenReturn(clusterStatsRequestBuilder);
@@ -280,7 +284,7 @@ public class ClusterStatsCollectorTests extends BaseCollectorTestCase {
         {
             indexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
             when(indexNameExpressionResolver.concreteIndices(clusterState, IndicesOptions.lenientExpandOpen(), "apm-*"))
-                .thenReturn(new Index[0]);
+                .thenReturn(Index.EMPTY_ARRAY);
         }
 
         final Client client = mock(Client.class);
@@ -296,7 +300,8 @@ public class ClusterStatsCollectorTests extends BaseCollectorTestCase {
             when(mockClusterStatsResponse.getIndicesStats()).thenReturn(mockClusterStatsIndices);
 
             final ClusterStatsRequestBuilder clusterStatsRequestBuilder = mock(ClusterStatsRequestBuilder.class);
-            when(clusterStatsRequestBuilder.get(eq(timeout))).thenReturn(mockClusterStatsResponse);
+            when(clusterStatsRequestBuilder.setTimeout(eq(timeout))).thenReturn(clusterStatsRequestBuilder);
+            when(clusterStatsRequestBuilder.get()).thenReturn(mockClusterStatsResponse);
 
             final ClusterAdminClient clusterAdminClient = mock(ClusterAdminClient.class);
             when(clusterAdminClient.prepareClusterStats()).thenReturn(clusterStatsRequestBuilder);
@@ -325,4 +330,58 @@ public class ClusterStatsCollectorTests extends BaseCollectorTestCase {
         final ClusterStatsMonitoringDoc doc = (ClusterStatsMonitoringDoc) results.iterator().next();
         assertThat(doc.getLicense(), nullValue());
     }
+
+    public void testDoCollectThrowsTimeoutException() throws Exception {
+        final TimeValue timeout;
+        {
+            final String clusterName = randomAlphaOfLength(10);
+            whenClusterStateWithName(clusterName);
+            final String clusterUUID = UUID.randomUUID().toString();
+            whenClusterStateWithUUID(clusterUUID);
+            timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 120));
+            withCollectionTimeout(ClusterStatsCollector.CLUSTER_STATS_TIMEOUT, timeout);
+        }
+        final IndexNameExpressionResolver indexNameExpressionResolver;
+        {
+            indexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
+            when(indexNameExpressionResolver.concreteIndices(clusterState, IndicesOptions.lenientExpandOpen(), "apm-*"))
+                    .thenReturn(Index.EMPTY_ARRAY);
+        }
+
+        final Client client = mock(Client.class);
+        {
+            final ClusterStatsResponse mockClusterStatsResponse = mock(ClusterStatsResponse.class);
+            final ClusterHealthStatus clusterStatus = randomFrom(ClusterHealthStatus.values());
+            when(mockClusterStatsResponse.getStatus()).thenReturn(clusterStatus);
+            when(mockClusterStatsResponse.getNodesStats()).thenReturn(mock(ClusterStatsNodes.class));
+            when(mockClusterStatsResponse.failures()).thenReturn(List.of(new FailedNodeException("node", "msg",
+                    new ElasticsearchTimeoutException("timed out"))));
+
+            final ClusterStatsIndices mockClusterStatsIndices = mock(ClusterStatsIndices.class);
+
+            when(mockClusterStatsIndices.getIndexCount()).thenReturn(0);
+            when(mockClusterStatsResponse.getIndicesStats()).thenReturn(mockClusterStatsIndices);
+
+            final ClusterStatsRequestBuilder clusterStatsRequestBuilder = mock(ClusterStatsRequestBuilder.class);
+            when(clusterStatsRequestBuilder.setTimeout(eq(timeout))).thenReturn(clusterStatsRequestBuilder);
+            when(clusterStatsRequestBuilder.get()).thenReturn(mockClusterStatsResponse);
+
+            final ClusterAdminClient clusterAdminClient = mock(ClusterAdminClient.class);
+            when(clusterAdminClient.prepareClusterStats()).thenReturn(clusterStatsRequestBuilder);
+
+            final AdminClient adminClient = mock(AdminClient.class);
+            when(adminClient.cluster()).thenReturn(clusterAdminClient);
+            when(client.admin()).thenReturn(adminClient);
+        }
+
+        final long interval = randomNonNegativeLong();
+        final Settings.Builder settings = Settings.builder();
+        final MonitoringDoc.Node node = MonitoringTestUtils.randomMonitoringNode(random());
+
+        final ClusterStatsCollector collector =
+                new ClusterStatsCollector(settings.build(), clusterService, licenseState,
+                        client, licenseService, indexNameExpressionResolver);
+        expectThrows(ElasticsearchTimeoutException.class, () -> collector.doCollect(node, interval, clusterState));
+    }
+
 }

+ 49 - 1
x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexRecoveryCollectorTests.java

@@ -5,9 +5,12 @@
  */
 package org.elasticsearch.xpack.monitoring.collector.indices;
 
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.action.FailedNodeException;
 import org.elasticsearch.action.admin.indices.recovery.RecoveryAction;
 import org.elasticsearch.action.admin.indices.recovery.RecoveryRequestBuilder;
 import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
+import org.elasticsearch.action.support.DefaultShardOperationFailedException;
 import org.elasticsearch.client.AdminClient;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.ElasticsearchClient;
@@ -31,6 +34,7 @@ import java.util.Map;
 import java.util.UUID;
 
 import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
 import static org.elasticsearch.xpack.monitoring.MonitoringTestUtils.randomMonitoringNode;
 import static org.hamcrest.Matchers.equalTo;
@@ -109,7 +113,7 @@ public class IndexRecoveryCollectorTests extends BaseCollectorTestCase {
 
         final RecoveryRequestBuilder recoveryRequestBuilder =
                 spy(new RecoveryRequestBuilder(mock(ElasticsearchClient.class), RecoveryAction.INSTANCE));
-        doReturn(recoveryResponse).when(recoveryRequestBuilder).get(eq(timeout));
+        doReturn(recoveryResponse).when(recoveryRequestBuilder).get();
 
         final IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
         when(indicesAdminClient.prepareRecoveries()).thenReturn(recoveryRequestBuilder);
@@ -140,6 +144,7 @@ public class IndexRecoveryCollectorTests extends BaseCollectorTestCase {
             verify(clusterState).metadata();
             verify(metadata).clusterUUID();
         }
+        verify(recoveryRequestBuilder).setTimeout(eq(timeout));
 
         if (nbRecoveries == 0) {
             assertEquals(0, results.size());
@@ -164,4 +169,47 @@ public class IndexRecoveryCollectorTests extends BaseCollectorTestCase {
             assertThat(recoveries.shardRecoveryStates().size(), equalTo(nbRecoveries));
         }
     }
+
+    public void testDoCollectThrowsTimeoutException() throws Exception {
+        final TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 120));
+        withCollectionTimeout(IndexRecoveryCollector.INDEX_RECOVERY_TIMEOUT, timeout);
+
+        whenLocalNodeElectedMaster(true);
+
+        final String clusterName = randomAlphaOfLength(10);
+        whenClusterStateWithName(clusterName);
+
+        final String clusterUUID = UUID.randomUUID().toString();
+        whenClusterStateWithUUID(clusterUUID);
+
+        final DiscoveryNode localNode = localNode(randomAlphaOfLength(5));
+        when(clusterService.localNode()).thenReturn(localNode);
+
+        final MonitoringDoc.Node node = randomMonitoringNode(random());
+
+        final RecoveryResponse recoveryResponse =
+                new RecoveryResponse(randomInt(), randomInt(), randomInt(), emptyMap(), List.of(new DefaultShardOperationFailedException(
+                        "test", 0, new FailedNodeException(node.getUUID(), "msg", new ElasticsearchTimeoutException("test timeout")))));
+
+        final RecoveryRequestBuilder recoveryRequestBuilder =
+                spy(new RecoveryRequestBuilder(mock(ElasticsearchClient.class), RecoveryAction.INSTANCE));
+        doReturn(recoveryResponse).when(recoveryRequestBuilder).get();
+
+        final IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
+        when(indicesAdminClient.prepareRecoveries()).thenReturn(recoveryRequestBuilder);
+
+        final AdminClient adminClient = mock(AdminClient.class);
+        when(adminClient.indices()).thenReturn(indicesAdminClient);
+
+        final Client client = mock(Client.class);
+        when(client.admin()).thenReturn(adminClient);
+
+        final IndexRecoveryCollector collector = new IndexRecoveryCollector(clusterService, licenseState, client);
+        assertEquals(timeout, collector.getCollectionTimeout());
+
+        final long interval = randomNonNegativeLong();
+
+        expectThrows(ElasticsearchTimeoutException.class, () -> collector.doCollect(node, interval, clusterState));
+    }
+
 }

+ 40 - 2
x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollectorTests.java

@@ -5,10 +5,13 @@
  */
 package org.elasticsearch.xpack.monitoring.collector.indices;
 
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.action.FailedNodeException;
 import org.elasticsearch.action.admin.indices.stats.IndexStats;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
+import org.elasticsearch.action.support.DefaultShardOperationFailedException;
 import org.elasticsearch.client.AdminClient;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.ElasticsearchClient;
@@ -33,7 +36,6 @@ import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -109,13 +111,14 @@ public class IndexStatsCollectorTests extends BaseCollectorTestCase {
                 }
             }
         }
+        when(indicesStatsResponse.getShardFailures()).thenReturn(new DefaultShardOperationFailedException[0]);
 
         final String[] indexNames = indicesMetadata.keySet().toArray(new String[0]);
         when(metadata.getConcreteAllIndices()).thenReturn(indexNames);
 
         final IndicesStatsRequestBuilder indicesStatsRequestBuilder =
                 spy(new IndicesStatsRequestBuilder(mock(ElasticsearchClient.class), IndicesStatsAction.INSTANCE));
-        doReturn(indicesStatsResponse).when(indicesStatsRequestBuilder).get(eq(timeout));
+        doReturn(indicesStatsResponse).when(indicesStatsRequestBuilder).get();
 
         final IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
         when(indicesAdminClient.prepareStats()).thenReturn(indicesStatsRequestBuilder);
@@ -133,6 +136,7 @@ public class IndexStatsCollectorTests extends BaseCollectorTestCase {
 
         final Collection<MonitoringDoc> results = collector.doCollect(node, interval, clusterState);
         verify(indicesAdminClient).prepareStats();
+        verify(indicesStatsRequestBuilder).setTimeout(timeout);
 
         verify(indicesStatsResponse, times(existingIndices + deletedIndices)).getIndex(anyString());
         verify(metadata, times(existingIndices)).index(anyString());
@@ -166,4 +170,38 @@ public class IndexStatsCollectorTests extends BaseCollectorTestCase {
             }
         }
     }
+
+    public void testDoCollectThrowsTimeoutException() throws Exception {
+        final TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 120));
+        withCollectionTimeout(IndexStatsCollector.INDEX_STATS_TIMEOUT, timeout);
+
+        whenLocalNodeElectedMaster(true);
+
+        final IndicesStatsResponse indicesStatsResponse = mock(IndicesStatsResponse.class);
+        final MonitoringDoc.Node node = randomMonitoringNode(random());
+
+        when(indicesStatsResponse.getShardFailures()).thenReturn(new DefaultShardOperationFailedException[] {
+                new DefaultShardOperationFailedException("test", 0,
+                        new FailedNodeException(node.getUUID(), "msg", new ElasticsearchTimeoutException("test timeout")))
+        });
+
+        final IndicesStatsRequestBuilder indicesStatsRequestBuilder =
+                spy(new IndicesStatsRequestBuilder(mock(ElasticsearchClient.class), IndicesStatsAction.INSTANCE));
+        doReturn(indicesStatsResponse).when(indicesStatsRequestBuilder).get();
+
+        final IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
+        when(indicesAdminClient.prepareStats()).thenReturn(indicesStatsRequestBuilder);
+
+        final AdminClient adminClient = mock(AdminClient.class);
+        when(adminClient.indices()).thenReturn(indicesAdminClient);
+
+        final Client client = mock(Client.class);
+        when(client.admin()).thenReturn(adminClient);
+
+        final IndexStatsCollector collector = new IndexStatsCollector(clusterService, licenseState, client);
+        final long interval = randomNonNegativeLong();
+
+        expectThrows(ElasticsearchTimeoutException.class, () -> collector.doCollect(node, interval, clusterState));
+    }
+
 }

+ 33 - 2
x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsCollectorTests.java

@@ -5,7 +5,9 @@
  */
 package org.elasticsearch.xpack.monitoring.collector.ml;
 
+import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.FailedNodeException;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.settings.Settings;
@@ -113,8 +115,8 @@ public class JobStatsCollectorTests extends BaseCollectorTestCase {
         final ActionFuture<Response> future = (ActionFuture<Response>)mock(ActionFuture.class);
         final Response response = new Response(new QueryPage<>(jobStats, jobStats.size(), Job.RESULTS_FIELD));
 
-        when(client.execute(eq(GetJobsStatsAction.INSTANCE), eq(new Request(Metadata.ALL)))).thenReturn(future);
-        when(future.actionGet(timeout)).thenReturn(response);
+        when(client.execute(eq(GetJobsStatsAction.INSTANCE), eq(new Request(Metadata.ALL).setTimeout(timeout)))).thenReturn(future);
+        when(future.actionGet()).thenReturn(response);
 
         final long interval = randomNonNegativeLong();
 
@@ -140,6 +142,35 @@ public class JobStatsCollectorTests extends BaseCollectorTestCase {
         }
     }
 
+    public void testDoCollectThrowsTimeoutException() throws Exception {
+        final String clusterUuid = randomAlphaOfLength(5);
+        whenClusterStateWithUUID(clusterUuid);
+
+        final MonitoringDoc.Node node = randomMonitoringNode(random());
+        final Client client = mock(Client.class);
+        final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
+
+        final TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 120));
+        withCollectionTimeout(JobStatsCollector.JOB_STATS_TIMEOUT, timeout);
+
+        final JobStatsCollector collector = new JobStatsCollector(Settings.EMPTY, clusterService, licenseState, client, threadContext);
+        assertEquals(timeout, collector.getCollectionTimeout());
+
+        final List<JobStats> jobStats = mockJobStats();
+
+        @SuppressWarnings("unchecked")
+        final ActionFuture<Response> future = (ActionFuture<Response>)mock(ActionFuture.class);
+        final Response response = new Response(List.of(), List.of(new FailedNodeException("node", "msg",
+                new ElasticsearchTimeoutException("test timeout"))), new QueryPage<>(jobStats, jobStats.size(), Job.RESULTS_FIELD));
+
+        when(client.execute(eq(GetJobsStatsAction.INSTANCE), eq(new Request(Metadata.ALL).setTimeout(timeout)))).thenReturn(future);
+        when(future.actionGet()).thenReturn(response);
+
+        final long interval = randomNonNegativeLong();
+
+        expectThrows(ElasticsearchTimeoutException.class, () -> collector.doCollect(node, interval, clusterState));
+    }
+
     private List<JobStats> mockJobStats() {
         final int jobs = randomIntBetween(1, 5);
         final List<JobStats> jobStats = new ArrayList<>(jobs);

+ 31 - 2
x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsCollectorTests.java

@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.monitoring.collector.node;
 
+import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.FailedNodeException;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@@ -21,6 +22,7 @@ import org.elasticsearch.xpack.monitoring.BaseCollectorTestCase;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.UUID;
 
 import static org.elasticsearch.xpack.monitoring.MonitoringTestUtils.randomMonitoringNode;
@@ -29,7 +31,6 @@ import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -118,10 +119,38 @@ public class NodeStatsCollectorTests extends BaseCollectorTestCase {
         assertThat(document.isMlockall(), equalTo(BootstrapInfo.isMemoryLocked()));
     }
 
+    public void testDoCollectThrowsTimeout() throws Exception {
+        final TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 120));
+        withCollectionTimeout(NodeStatsCollector.NODE_STATS_TIMEOUT, timeout);
+
+        final boolean isMaster = randomBoolean();
+        whenLocalNodeElectedMaster(isMaster);
+
+        final String clusterUUID = UUID.randomUUID().toString();
+        whenClusterStateWithUUID(clusterUUID);
+
+        final MonitoringDoc.Node node = randomMonitoringNode(random());
+
+        final NodesStatsResponse nodesStatsResponse = mock(NodesStatsResponse.class);
+        when(nodesStatsResponse.hasFailures()).thenReturn(true);
+        when(nodesStatsResponse.failures()).thenReturn(List.of(new FailedNodeException("node", "msg",
+                new ElasticsearchTimeoutException("test"))));
+
+        final Client client = mock(Client.class);
+        thenReturnNodeStats(client, timeout, nodesStatsResponse);
+
+        final NodeStatsCollector collector = new NodeStatsCollector(clusterService, licenseState, client);
+        assertEquals(timeout, collector.getCollectionTimeout());
+
+        final long interval = randomNonNegativeLong();
+
+        expectThrows(ElasticsearchTimeoutException.class, () -> collector.doCollect(node, interval, clusterState));
+    }
+
     private void thenReturnNodeStats(final Client client, final TimeValue timeout, final NodesStatsResponse nodesStatsResponse) {
         @SuppressWarnings("unchecked")
         final ActionFuture<NodesStatsResponse> future = (ActionFuture<NodesStatsResponse>) mock(ActionFuture.class);
-        when(future.actionGet(eq(timeout))).thenReturn(nodesStatsResponse);
+        when(future.actionGet()).thenReturn(nodesStatsResponse);
 
         final ClusterAdminClient clusterAdminClient = mock(ClusterAdminClient.class);
         when(clusterAdminClient.nodesStats(any(NodesStatsRequest.class))).thenReturn(future);