|
@@ -9,13 +9,14 @@
|
|
|
package org.elasticsearch.health;
|
|
|
|
|
|
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
|
|
|
+import org.elasticsearch.client.internal.Client;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.core.Nullable;
|
|
|
import org.elasticsearch.core.TimeValue;
|
|
|
import org.elasticsearch.health.node.DiskHealthInfo;
|
|
|
-import org.elasticsearch.health.node.HealthInfoCache;
|
|
|
+import org.elasticsearch.health.node.FetchHealthInfoCacheAction;
|
|
|
import org.elasticsearch.health.node.LocalHealthMonitor;
|
|
|
import org.elasticsearch.health.node.selection.HealthNode;
|
|
|
import org.elasticsearch.test.ESIntegTestCase;
|
|
@@ -23,6 +24,7 @@ import org.elasticsearch.test.InternalTestCluster;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.Collection;
|
|
|
+import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
@@ -40,14 +42,7 @@ public class UpdateHealthInfoCacheIT extends ESIntegTestCase {
|
|
|
String[] nodeIds = getNodes(internalCluster).keySet().toArray(new String[0]);
|
|
|
DiscoveryNode healthNode = waitAndGetHealthNode(internalCluster);
|
|
|
assertThat(healthNode, notNullValue());
|
|
|
- assertBusy(() -> {
|
|
|
- Map<String, DiskHealthInfo> healthInfoCache = internalCluster.getInstance(HealthInfoCache.class, healthNode.getName())
|
|
|
- .getDiskHealthInfo();
|
|
|
- assertThat(healthInfoCache.size(), equalTo(nodeIds.length));
|
|
|
- for (String nodeId : nodeIds) {
|
|
|
- assertThat(healthInfoCache.get(nodeId), equalTo(GREEN));
|
|
|
- }
|
|
|
- });
|
|
|
+ assertBusy(() -> assertResultsCanBeFetched(internalCluster, healthNode, List.of(nodeIds), null));
|
|
|
} catch (IOException e) {
|
|
|
throw new RuntimeException("Failed to close internal cluster: " + e.getMessage(), e);
|
|
|
}
|
|
@@ -66,18 +61,14 @@ public class UpdateHealthInfoCacheIT extends ESIntegTestCase {
|
|
|
return isMaster == false && isHealthNode == false;
|
|
|
}).findAny().orElseThrow();
|
|
|
internalCluster.stopNode(nodeToLeave.getName());
|
|
|
- assertBusy(() -> {
|
|
|
- Map<String, DiskHealthInfo> healthInfoCache = internalCluster.getInstance(HealthInfoCache.class, healthNode.getName())
|
|
|
- .getDiskHealthInfo();
|
|
|
- assertThat(healthInfoCache.size(), equalTo(nodes.size() - 1));
|
|
|
- for (DiscoveryNode node : nodes) {
|
|
|
- if (node.getId().equals(nodeToLeave.getId())) {
|
|
|
- assertThat(healthInfoCache.containsKey(node.getId()), equalTo(false));
|
|
|
- } else {
|
|
|
- assertThat(healthInfoCache.get(node.getId()), equalTo(GREEN));
|
|
|
- }
|
|
|
- }
|
|
|
- });
|
|
|
+ assertBusy(
|
|
|
+ () -> assertResultsCanBeFetched(
|
|
|
+ internalCluster,
|
|
|
+ healthNode,
|
|
|
+ nodes.stream().filter(node -> node.equals(nodeToLeave) == false).map(DiscoveryNode::getId).toList(),
|
|
|
+ nodeToLeave.getId()
|
|
|
+ )
|
|
|
+ );
|
|
|
} catch (IOException e) {
|
|
|
throw new RuntimeException("Failed to close internal cluster: " + e.getMessage(), e);
|
|
|
}
|
|
@@ -94,14 +85,7 @@ public class UpdateHealthInfoCacheIT extends ESIntegTestCase {
|
|
|
DiscoveryNode newHealthNode = waitAndGetHealthNode(internalCluster);
|
|
|
assertThat(newHealthNode, notNullValue());
|
|
|
logger.info("Previous health node {}, new health node {}.", healthNodeToBeShutDown, newHealthNode);
|
|
|
- assertBusy(() -> {
|
|
|
- Map<String, DiskHealthInfo> healthInfoCache = internalCluster.getInstance(HealthInfoCache.class, newHealthNode.getName())
|
|
|
- .getDiskHealthInfo();
|
|
|
- assertThat(healthInfoCache.size(), equalTo(nodeIds.length));
|
|
|
- for (String nodeId : nodeIds) {
|
|
|
- assertThat(healthInfoCache.get(nodeId), equalTo(GREEN));
|
|
|
- }
|
|
|
- });
|
|
|
+ assertBusy(() -> assertResultsCanBeFetched(internalCluster, newHealthNode, List.of(nodeIds), null));
|
|
|
} catch (IOException e) {
|
|
|
throw new RuntimeException("Failed to close internal cluster: " + e.getMessage(), e);
|
|
|
}
|
|
@@ -119,14 +103,7 @@ public class UpdateHealthInfoCacheIT extends ESIntegTestCase {
|
|
|
ensureStableCluster(nodeIds.length);
|
|
|
DiscoveryNode newHealthNode = waitAndGetHealthNode(internalCluster);
|
|
|
assertThat(newHealthNode, notNullValue());
|
|
|
- assertBusy(() -> {
|
|
|
- Map<String, DiskHealthInfo> healthInfoCache = internalCluster.getInstance(HealthInfoCache.class, newHealthNode.getName())
|
|
|
- .getDiskHealthInfo();
|
|
|
- assertThat(healthInfoCache.size(), equalTo(nodeIds.length));
|
|
|
- for (String nodeId : nodeIds) {
|
|
|
- assertThat(healthInfoCache.get(nodeId), equalTo(GREEN));
|
|
|
- }
|
|
|
- });
|
|
|
+ assertBusy(() -> assertResultsCanBeFetched(internalCluster, newHealthNode, List.of(nodeIds), null));
|
|
|
} catch (IOException e) {
|
|
|
throw new RuntimeException("Failed to close internal cluster: " + e.getMessage(), e);
|
|
|
}
|
|
@@ -151,6 +128,40 @@ public class UpdateHealthInfoCacheIT extends ESIntegTestCase {
|
|
|
return healthNode[0];
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * This method fetches the health data using FetchHealthInfoCacheAction. It does this using both a random non-health-node client and
|
|
|
+ * a health node client. It asserts that all expected nodeIds are there with GREEN status, and that the notExpectedNodeId (if not
|
|
|
+ * null) is not in the results.
|
|
|
+ * @param internalCluster The cluster to use to get clients
|
|
|
+ * @param healthNode The health node
|
|
|
+ * @param expectedNodeIds The list of nodeIds we expect to see in the results (with a GREEN status)
|
|
|
+ * @param notExpectedNodeId A nullable nodeId that we do _not_ expect to see in the results
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ private void assertResultsCanBeFetched(
|
|
|
+ InternalTestCluster internalCluster,
|
|
|
+ DiscoveryNode healthNode,
|
|
|
+ Iterable<String> expectedNodeIds,
|
|
|
+ @Nullable String notExpectedNodeId
|
|
|
+ ) throws Exception {
|
|
|
+ Client nonHealthNodeClient = internalCluster.client(randomValueOtherThan(healthNode.getName(), internalCluster::getRandomNodeName));
|
|
|
+ FetchHealthInfoCacheAction.Response healthResponse = nonHealthNodeClient.execute(
|
|
|
+ FetchHealthInfoCacheAction.INSTANCE,
|
|
|
+ new FetchHealthInfoCacheAction.Request()
|
|
|
+ ).get();
|
|
|
+ for (String nodeId : expectedNodeIds) {
|
|
|
+ assertThat(healthResponse.getHealthInfo().diskInfoByNode().get(nodeId), equalTo(GREEN));
|
|
|
+ }
|
|
|
+ if (notExpectedNodeId != null) {
|
|
|
+ assertThat(healthResponse.getHealthInfo().diskInfoByNode().containsKey(notExpectedNodeId), equalTo(false));
|
|
|
+ }
|
|
|
+ Client healthNodeClient = internalCluster.client(healthNode.getName());
|
|
|
+ healthResponse = healthNodeClient.execute(FetchHealthInfoCacheAction.INSTANCE, new FetchHealthInfoCacheAction.Request()).get();
|
|
|
+ for (String nodeId : expectedNodeIds) {
|
|
|
+ assertThat(healthResponse.getHealthInfo().diskInfoByNode().get(nodeId), equalTo(GREEN));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private void decreasePollingInterval(InternalTestCluster internalCluster) {
|
|
|
internalCluster.client()
|
|
|
.admin()
|