|
@@ -44,6 +44,7 @@ import org.elasticsearch.test.junit.annotations.TestLogging;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Collections;
|
|
|
import java.util.HashSet;
|
|
|
+import java.util.Optional;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
@@ -130,10 +131,16 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
|
|
|
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, 4));
|
|
|
builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, 30));
|
|
|
builder.put("frozen", new DiskUsage("frozen", "frozen", "/foo/bar", 100, between(0, 100)));
|
|
|
- monitor.onNewInfo(clusterInfo(builder.build()));
|
|
|
- assertFalse(reroute.get());
|
|
|
+ final ClusterInfo initialClusterInfo = clusterInfo(builder.build());
|
|
|
+ monitor.onNewInfo(initialClusterInfo);
|
|
|
+ assertTrue(reroute.get()); // reroute on new nodes
|
|
|
assertEquals(new HashSet<>(Arrays.asList("test_1", "test_2")), indices.get());
|
|
|
|
|
|
+ indices.set(null);
|
|
|
+ reroute.set(false);
|
|
|
+ monitor.onNewInfo(initialClusterInfo);
|
|
|
+ assertFalse(reroute.get()); // no reroute if no change
|
|
|
+
|
|
|
indices.set(null);
|
|
|
builder = ImmutableOpenMap.builder();
|
|
|
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, 4));
|
|
@@ -230,13 +237,31 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
|
|
|
oneDiskAboveWatermarkBuilder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, 50));
|
|
|
final ImmutableOpenMap<String, DiskUsage> oneDiskAboveWatermark = oneDiskAboveWatermarkBuilder.build();
|
|
|
|
|
|
- // should not reroute when all disks are ok
|
|
|
+ // should reroute when receiving info about previously-unknown nodes
|
|
|
currentTime.addAndGet(randomLongBetween(0, 120000));
|
|
|
monitor.onNewInfo(clusterInfo(allDisksOk));
|
|
|
- assertNull(listenerReference.get());
|
|
|
+ assertNotNull(listenerReference.get());
|
|
|
+ listenerReference.getAndSet(null).onResponse(clusterState);
|
|
|
|
|
|
- // should reroute when one disk goes over the watermark
|
|
|
+ // should not reroute when all disks are ok and no new info received
|
|
|
currentTime.addAndGet(randomLongBetween(0, 120000));
|
|
|
+ monitor.onNewInfo(clusterInfo(allDisksOk));
|
|
|
+ assertNull(listenerReference.get());
|
|
|
+
|
|
|
+ // might or might not reroute when a node crosses a watermark, depending on whether the reroute interval has elapsed or not
|
|
|
+ if (randomBoolean()) {
|
|
|
+ currentTime.addAndGet(randomLongBetween(0, 120000));
|
|
|
+ monitor.onNewInfo(clusterInfo(oneDiskAboveWatermark));
|
|
|
+ Optional.ofNullable(listenerReference.getAndSet(null)).ifPresent(l -> l.onResponse(clusterState));
|
|
|
+ }
|
|
|
+
|
|
|
+ // however once the reroute interval has elapsed then we must reroute again
|
|
|
+ currentTime.addAndGet(
|
|
|
+ randomLongBetween(
|
|
|
+ DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis(),
|
|
|
+ 120000
|
|
|
+ )
|
|
|
+ );
|
|
|
monitor.onNewInfo(clusterInfo(oneDiskAboveWatermark));
|
|
|
assertNotNull(listenerReference.get());
|
|
|
listenerReference.getAndSet(null).onResponse(clusterState);
|
|
@@ -672,7 +697,7 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
|
|
|
.nodes(DiscoveryNodes.builder().add(newNormalNode("node1")).add(newFrozenOnlyNode("frozen")))
|
|
|
.build();
|
|
|
final AtomicReference<ClusterState> clusterStateRef = new AtomicReference<>(clusterState);
|
|
|
- final AtomicBoolean advanceTime = new AtomicBoolean(randomBoolean());
|
|
|
+ final AtomicBoolean advanceTime = new AtomicBoolean(true);
|
|
|
|
|
|
final LongSupplier timeSupplier = new LongSupplier() {
|
|
|
long time;
|
|
@@ -764,6 +789,9 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
|
|
|
);
|
|
|
final ImmutableOpenMap<String, DiskUsage> frozenAboveFloodStageMaxHeadroom = frozenAboveFloodStageMaxHeadroomBuilder.build();
|
|
|
|
|
|
+ advanceTime.set(true); // first check sees new nodes and triggers a reroute
|
|
|
+ assertNoLogging(monitor, allDisksOk);
|
|
|
+ advanceTime.set(randomBoolean()); // no new nodes so no reroute delay needed
|
|
|
assertNoLogging(monitor, allDisksOk);
|
|
|
|
|
|
assertSingleInfoMessage(
|