@@ -24,9 +24,11 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
+import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
 import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
@@ -53,6 +55,8 @@ import org.elasticsearch.action.search.TransportSearchAction;
 import org.elasticsearch.action.support.DefaultShardOperationFailedException;
 import org.elasticsearch.action.support.DestructiveOperations;
 import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.RefCountingListener;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.client.internal.AdminClient;
@@ -177,6 +181,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -961,32 +966,54 @@ public abstract class ESIntegTestCase extends ESTestCase {
             // been removed by the master so that the health check applies to the set of nodes we expect to be part of the cluster.
             .waitForNodes(Integer.toString(cluster().size()));
 
-        ClusterHealthResponse actionGet = clusterAdmin().health(healthRequest).actionGet();
-        if (actionGet.isTimedOut()) {
-            final String hotThreads = clusterAdmin().prepareNodesHotThreads()
-                .setThreads(99999)
-                .setIgnoreIdleThreads(false)
-                .get()
-                .getNodes()
-                .stream()
-                .map(NodeHotThreads::getHotThreads)
-                .collect(Collectors.joining("\n"));
+        final ClusterHealthResponse clusterHealthResponse = clusterAdmin().health(healthRequest).actionGet();
+        if (clusterHealthResponse.isTimedOut()) {
+            final var allocationExplainRef = new AtomicReference<ClusterAllocationExplainResponse>();
+            final var clusterStateRef = new AtomicReference<ClusterStateResponse>();
+            final var pendingTasksRef = new AtomicReference<PendingClusterTasksResponse>();
+            final var hotThreadsRef = new AtomicReference<NodesHotThreadsResponse>();
+
+            final var detailsFuture = new PlainActionFuture<Void>();
+            try (var listeners = new RefCountingListener(detailsFuture)) {
+                clusterAdmin().prepareAllocationExplain().execute(listeners.acquire(allocationExplainRef::set));
+                clusterAdmin().prepareState().execute(listeners.acquire(clusterStateRef::set));
+                clusterAdmin().preparePendingClusterTasks().execute(listeners.acquire(pendingTasksRef::set));
+                clusterAdmin().prepareNodesHotThreads()
+                    .setThreads(9999)
+                    .setIgnoreIdleThreads(false)
+                    .execute(listeners.acquire(hotThreadsRef::set));
+            }
+
+            try {
+                detailsFuture.get(60, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                logger.error("failed to get full debug details within 60s timeout", e);
+            }
+
             logger.info(
-                "{} timed out, cluster state:\n{}\npending tasks:\n{}\nhot threads:\n{}\n",
+                "{} timed out\nallocation explain:\n{}\ncluster state:\n{}\npending tasks:\n{}\nhot threads:\n{}\n",
                 method,
-                clusterAdmin().prepareState().get().getState(),
-                clusterAdmin().preparePendingClusterTasks().get(),
-                hotThreads
+                safeFormat(allocationExplainRef.get(), r -> Strings.toString(r.getExplanation(), true, true)),
+                safeFormat(clusterStateRef.get(), r -> r.getState().toString()),
+                safeFormat(pendingTasksRef.get(), r -> Strings.toString(r, true, true)),
+                safeFormat(
+                    hotThreadsRef.get(),
+                    r -> r.getNodes().stream().map(NodeHotThreads::getHotThreads).collect(Collectors.joining("\n"))
+                )
             );
             fail("timed out waiting for " + color + " state");
         }
         assertThat(
-            "Expected at least " + clusterHealthStatus + " but got " + actionGet.getStatus(),
-            actionGet.getStatus().value(),
+            "Expected at least " + clusterHealthStatus + " but got " + clusterHealthResponse.getStatus(),
+            clusterHealthResponse.getStatus().value(),
             lessThanOrEqualTo(clusterHealthStatus.value())
         );
         logger.debug("indices {} are {}", indices.length == 0 ? "[_all]" : indices, color);
-        return actionGet.getStatus();
+        return clusterHealthResponse.getStatus();
+    }
+
+    private static <T> String safeFormat(@Nullable T value, Function<T, String> formatter) {
+        return value == null ? null : formatter.apply(value);
     }
 
     /**
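
Note: the core of this change is the fan-out/join idiom around RefCountingListener, so a minimal standalone sketch of that pattern may help reviewers. The names client, SomeAction, SomeResponse, and request below are hypothetical placeholders, not part of this change; any async call that accepts an ActionListener fits the same shape.

    // Sketch only: fork several async actions, collect each result, and
    // complete a single future once every action has responded.
    final var resultRef = new AtomicReference<SomeResponse>();
    final var allDone = new PlainActionFuture<Void>();
    try (var listeners = new RefCountingListener(allDone)) {
        // each acquire() increments the ref count; the returned listener
        // decrements it when the action responds, success or failure
        client.execute(SomeAction.INSTANCE, request, listeners.acquire(resultRef::set));
        // ... further acquire() calls for the other in-flight actions ...
    } // close() releases the initial ref; allDone completes at ref count zero
    allDone.get(60, TimeUnit.SECONDS); // bounded wait, as in the change above

Because each response lands in its own AtomicReference, a sub-request that never responds within the bounded wait simply leaves its reference null, which is why the logging side null-checks via safeFormat instead of assuming every response arrived.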