@@ -21,6 +21,8 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.block.ClusterBlocks;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
+import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -411,6 +413,154 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
         assertNull(indicesToRelease.get());
     }
 
+    public void testNoAutoReleaseOfIndicesOnReplacementNodes() {
+        AtomicReference<Set<String>> indicesToMarkReadOnly = new AtomicReference<>();
+        AtomicReference<Set<String>> indicesToRelease = new AtomicReference<>();
+        AtomicReference<ClusterState> currentClusterState = new AtomicReference<>();
+        AllocationService allocation = createAllocationService(Settings.builder()
+            .put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());
+        Metadata metadata = Metadata.builder()
+            .put(IndexMetadata.builder("test_1").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(1))
+            .put(IndexMetadata.builder("test_2").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(1))
+            .build();
+        RoutingTable routingTable = RoutingTable.builder()
+            .addAsNew(metadata.index("test_1"))
+            .addAsNew(metadata.index("test_2"))
+            .build();
+        final ClusterState clusterState = applyStartedShardsUntilNoChange(
+            ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+                .metadata(metadata).routingTable(routingTable)
+                .nodes(DiscoveryNodes.builder().add(newNormalNode("node1", "my-node1"))
+                    .add(newNormalNode("node2", "my-node2"))).build(), allocation);
+        assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(8));
+
+        final ImmutableOpenMap.Builder<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpacesBuilder
+            = ImmutableOpenMap.builder();
+        final int reservedSpaceNode1 = between(0, 10);
+        reservedSpacesBuilder.put(new ClusterInfo.NodeAndPath("node1", "/foo/bar"),
+            new ClusterInfo.ReservedSpace.Builder().add(new ShardId("", "", 0), reservedSpaceNode1).build());
+        final int reservedSpaceNode2 = between(0, 10);
+        reservedSpacesBuilder.put(new ClusterInfo.NodeAndPath("node2", "/foo/bar"),
+            new ClusterInfo.ReservedSpace.Builder().add(new ShardId("", "", 0), reservedSpaceNode2).build());
+        ImmutableOpenMap<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpaces = reservedSpacesBuilder.build();
+
+        currentClusterState.set(clusterState);
+
+        final DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, currentClusterState::get,
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, () -> 0L,
+            (reason, priority, listener) -> {
+                assertNotNull(listener);
+                assertThat(priority, equalTo(Priority.HIGH));
+                listener.onResponse(currentClusterState.get());
+            }) {
+            @Override
+            protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener<Void> listener, boolean readOnly) {
+                if (readOnly) {
+                    assertTrue(indicesToMarkReadOnly.compareAndSet(null, indicesToUpdate));
+                } else {
+                    assertTrue(indicesToRelease.compareAndSet(null, indicesToUpdate));
+                }
+                listener.onResponse(null);
+            }
+        };
+        indicesToMarkReadOnly.set(null);
+        indicesToRelease.set(null);
+        ImmutableOpenMap.Builder<String, DiskUsage> builder = ImmutableOpenMap.builder();
+        builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4)));
+        builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4)));
+        monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
+        assertEquals(new HashSet<>(Arrays.asList("test_1", "test_2")), indicesToMarkReadOnly.get());
+        assertNull(indicesToRelease.get());
+
+        // Reserved space is ignored when applying the block
+        indicesToMarkReadOnly.set(null);
+        indicesToRelease.set(null);
+        builder = ImmutableOpenMap.builder();
+        builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(5, 90)));
+        builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(5, 90)));
+        monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
+        assertNull(indicesToMarkReadOnly.get());
+        assertNull(indicesToRelease.get());
+
+        // Change cluster state so that "test_2" index is blocked (read only)
+        IndexMetadata indexMetadata = IndexMetadata.builder(clusterState.metadata().index("test_2")).settings(Settings.builder()
+            .put(clusterState.metadata()
+                .index("test_2").getSettings())
+            .put(IndexMetadata.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING.getKey(), true)).build();
+
+        final String sourceNode;
+        final String targetNode;
+        if (randomBoolean()) {
+            sourceNode = "node1";
+            targetNode = "my-node2";
+        } else {
+            sourceNode = "node2";
+            targetNode = "my-node1";
+        }
+
+        final ClusterState clusterStateWithBlocks = ClusterState.builder(clusterState)
+            .metadata(Metadata.builder(clusterState.metadata())
+                .put(indexMetadata, true)
+                .putCustom(NodesShutdownMetadata.TYPE,
+                    new NodesShutdownMetadata(Collections.singletonMap(sourceNode,
+                        SingleNodeShutdownMetadata.builder()
+                            .setNodeId(sourceNode)
+                            .setReason("testing")
+                            .setType(SingleNodeShutdownMetadata.Type.REPLACE)
+                            .setTargetNodeName(targetNode)
+                            .setStartedAtMillis(randomNonNegativeLong())
+                            .build())))
+                .build())
+            .blocks(ClusterBlocks.builder().addBlocks(indexMetadata).build())
+            .build();
+
+        assertTrue(clusterStateWithBlocks.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));
+
+        currentClusterState.set(clusterStateWithBlocks);
+
+        // When free disk on node1 or node2 drops below the 5% flood-stage watermark, apply the block to indices that lack it
+        indicesToMarkReadOnly.set(null);
+        indicesToRelease.set(null);
+        builder = ImmutableOpenMap.builder();
+        builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 100)));
+        builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4)));
+        monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
+        assertThat(indicesToMarkReadOnly.get(), contains("test_1"));
+        assertNull(indicesToRelease.get());
+
+        // While the REPLACE shutdown is ongoing, the block is not removed from the index
+        indicesToMarkReadOnly.set(null);
+        indicesToRelease.set(null);
+        builder = ImmutableOpenMap.builder();
+        builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(10, 100)));
+        builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(10, 100)));
+        monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
+        assertNull(indicesToMarkReadOnly.get());
+        assertNull(indicesToRelease.get());
+
+        final ClusterState clusterStateNoShutdown = ClusterState.builder(clusterState)
+            .metadata(Metadata.builder(clusterState.metadata())
+                .put(indexMetadata, true)
+                .removeCustom(NodesShutdownMetadata.TYPE)
+                .build())
+            .blocks(ClusterBlocks.builder().addBlocks(indexMetadata).build())
+            .build();
+
+        assertTrue(clusterStateNoShutdown.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));
+
+        currentClusterState.set(clusterStateNoShutdown);
+
+        // Now that the REPLACE shutdown is gone, the block on the index can be auto-released
+        indicesToMarkReadOnly.set(null);
+        indicesToRelease.set(null);
+        builder = ImmutableOpenMap.builder();
+        builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(10, 100)));
+        builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(10, 100)));
+        monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
+        assertNull(indicesToMarkReadOnly.get());
+        assertThat(indicesToRelease.get(), contains("test_2"));
+    }
+
     @TestLogging(value="org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor:INFO", reason="testing INFO/WARN logging")
     public void testDiskMonitorLogging() throws IllegalAccessException {
         final ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
@@ -656,12 +806,16 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
         return newNode(nodeId, Sets.union(Set.of(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE), irrelevantRoles));
     }
 
-    private static DiscoveryNode newNormalNode(String nodeId) {
+    private static DiscoveryNode newNormalNode(String nodeId, String nodeName) {
         Set<DiscoveryNodeRole> randomRoles =
             new HashSet<>(randomSubsetOf(DiscoveryNodeRole.roles()));
         Set<DiscoveryNodeRole> roles = Sets.union(randomRoles, Set.of(randomFrom(DiscoveryNodeRole.DATA_ROLE,
             DiscoveryNodeRole.DATA_CONTENT_NODE_ROLE, DiscoveryNodeRole.DATA_HOT_NODE_ROLE, DiscoveryNodeRole.DATA_WARM_NODE_ROLE,
             DiscoveryNodeRole.DATA_COLD_NODE_ROLE)));
-        return newNode(nodeId, roles);
+        return newNode(nodeName, nodeId, roles);
+    }
+
+    private static DiscoveryNode newNormalNode(String nodeId) {
+        return newNormalNode(nodeId, "");
     }
 }