|
@@ -60,17 +60,21 @@ import java.util.Set;
|
|
|
import java.util.SortedSet;
|
|
|
import java.util.TreeSet;
|
|
|
import java.util.function.Consumer;
|
|
|
+import java.util.function.Predicate;
|
|
|
import java.util.stream.Collectors;
|
|
|
import java.util.stream.IntStream;
|
|
|
import java.util.stream.Stream;
|
|
|
import java.util.stream.StreamSupport;
|
|
|
|
|
|
+import static java.util.Collections.emptySortedSet;
|
|
|
import static java.util.function.Predicate.not;
|
|
|
+import static java.util.stream.Collectors.toSet;
|
|
|
import static java.util.stream.Collectors.toUnmodifiableMap;
|
|
|
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_HOT_NODE_ROLE;
|
|
|
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_WARM_NODE_ROLE;
|
|
|
import static org.elasticsearch.cluster.routing.ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE;
|
|
|
import static org.elasticsearch.common.util.set.Sets.haveNonEmptyIntersection;
|
|
|
+import static org.elasticsearch.xpack.autoscaling.storage.ReactiveStorageDeciderService.AllocationState.MAX_AMOUNT_OF_SHARD_DECISIONS;
|
|
|
import static org.hamcrest.Matchers.equalTo;
|
|
|
import static org.hamcrest.Matchers.is;
|
|
|
import static org.hamcrest.Matchers.nullValue;
|
|
@@ -149,18 +153,35 @@ public class ReactiveStorageDeciderDecisionTests extends AutoscalingTestCase {
|
|
|
long numPrevents = allocatableShards.numOfShards();
|
|
|
assert round != 0 || numPrevents > 0 : "must have shards that can be allocated on first round";
|
|
|
|
|
|
+ SortedSet<ShardId> expectedShardIds = allocatableShards.shardIds();
|
|
|
verify(
|
|
|
ReactiveStorageDeciderService.AllocationState::storagePreventsAllocation,
|
|
|
- new ShardsSize(numPrevents, allocatableShards.shardIds()),
|
|
|
+ numPrevents,
|
|
|
+ expectedShardIds,
|
|
|
+ shardNodeDecisions -> {
|
|
|
+ if (numPrevents > 0) {
|
|
|
+ assertEquals(cappedShardIds(expectedShardIds), shardNodeDecisions.keySet());
|
|
|
+ assertEquals(
|
|
|
+ Decision.single(Decision.Type.NO, "disk_threshold", "test"),
|
|
|
+ firstNoDecision(shardNodeDecisions.get(expectedShardIds.first()).canAllocateDecisions().get(0))
|
|
|
+ );
|
|
|
+ assertNull(shardNodeDecisions.get(expectedShardIds.first()).canRemainDecision());
|
|
|
+ } else {
|
|
|
+ assertTrue(shardNodeDecisions.isEmpty());
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ },
|
|
|
mockCanAllocateDiskDecider
|
|
|
);
|
|
|
verify(
|
|
|
ReactiveStorageDeciderService.AllocationState::storagePreventsAllocation,
|
|
|
- emptyShardsSize(),
|
|
|
+ 0,
|
|
|
+ emptySortedSet(),
|
|
|
+ Map::isEmpty,
|
|
|
mockCanAllocateDiskDecider,
|
|
|
CAN_ALLOCATE_NO_DECIDER
|
|
|
);
|
|
|
- verify(ReactiveStorageDeciderService.AllocationState::storagePreventsAllocation, emptyShardsSize());
|
|
|
+ verify(ReactiveStorageDeciderService.AllocationState::storagePreventsAllocation, 0, emptySortedSet(), Map::isEmpty);
|
|
|
// verify empty tier (no cold nodes) are always assumed a storage reason.
|
|
|
SortedSet<ShardId> unassignedShardIds = state.getRoutingNodes()
|
|
|
.unassigned()
|
|
@@ -170,22 +191,41 @@ public class ReactiveStorageDeciderDecisionTests extends AutoscalingTestCase {
|
|
|
verify(
|
|
|
moveToCold(allIndices()),
|
|
|
ReactiveStorageDeciderService.AllocationState::storagePreventsAllocation,
|
|
|
- new ShardsSize(state.getRoutingNodes().unassigned().size(), unassignedShardIds),
|
|
|
+ state.getRoutingNodes().unassigned().size(),
|
|
|
+ unassignedShardIds,
|
|
|
+ Map::isEmpty,
|
|
|
DiscoveryNodeRole.DATA_COLD_NODE_ROLE
|
|
|
);
|
|
|
verify(
|
|
|
ReactiveStorageDeciderService.AllocationState::storagePreventsAllocation,
|
|
|
- emptyShardsSize(),
|
|
|
+ 0,
|
|
|
+ emptySortedSet(),
|
|
|
+ Map::isEmpty,
|
|
|
DiscoveryNodeRole.DATA_COLD_NODE_ROLE
|
|
|
);
|
|
|
if (numPrevents > 0) {
|
|
|
- verifyScale(numPrevents, "not enough storage available, needs " + numPrevents + "b", mockCanAllocateDiskDecider);
|
|
|
+ verifyScale(numPrevents, "not enough storage available, needs " + numPrevents + "b", Map::isEmpty, shardIdNodeDecisions -> {
|
|
|
+ assertEquals(cappedShardIds(expectedShardIds), shardIdNodeDecisions.keySet());
|
|
|
+ assertEquals(
|
|
|
+ Decision.single(Decision.Type.NO, "disk_threshold", "test"),
|
|
|
+ firstNoDecision(shardIdNodeDecisions.get(expectedShardIds.first()).canAllocateDecisions().get(0))
|
|
|
+ );
|
|
|
+ assertNull(shardIdNodeDecisions.get(expectedShardIds.first()).canRemainDecision());
|
|
|
+ return true;
|
|
|
+ }, mockCanAllocateDiskDecider);
|
|
|
} else {
|
|
|
- verifyScale(0, "storage ok", mockCanAllocateDiskDecider);
|
|
|
+ verifyScale(0, "storage ok", Map::isEmpty, Map::isEmpty, mockCanAllocateDiskDecider);
|
|
|
}
|
|
|
- verifyScale(0, "storage ok", mockCanAllocateDiskDecider, CAN_ALLOCATE_NO_DECIDER);
|
|
|
- verifyScale(0, "storage ok");
|
|
|
- verifyScale(addDataNodes(DATA_HOT_NODE_ROLE, "additional", state, hotNodes), 0, "storage ok", mockCanAllocateDiskDecider);
|
|
|
+ verifyScale(0, "storage ok", Map::isEmpty, Map::isEmpty, mockCanAllocateDiskDecider, CAN_ALLOCATE_NO_DECIDER);
|
|
|
+ verifyScale(0, "storage ok", Map::isEmpty, Map::isEmpty);
|
|
|
+ verifyScale(
|
|
|
+ addDataNodes(DATA_HOT_NODE_ROLE, "additional", state, hotNodes),
|
|
|
+ 0,
|
|
|
+ "storage ok",
|
|
|
+ Map::isEmpty,
|
|
|
+ Map::isEmpty,
|
|
|
+ mockCanAllocateDiskDecider
|
|
|
+ );
|
|
|
lastState = state;
|
|
|
startRandomShards();
|
|
|
++round;
|
|
@@ -248,30 +288,55 @@ public class ReactiveStorageDeciderDecisionTests extends AutoscalingTestCase {
|
|
|
)
|
|
|
);
|
|
|
|
|
|
+ SortedSet<ShardId> expectedShardIds = RoutingNodesHelper.shardsWithState(state.getRoutingNodes(), ShardRoutingState.STARTED)
|
|
|
+ .stream()
|
|
|
+ .map(ShardRouting::shardId)
|
|
|
+ .filter(subjectShards::contains)
|
|
|
+ .collect(Collectors.toCollection(TreeSet::new));
|
|
|
verify(
|
|
|
ReactiveStorageDeciderService.AllocationState::storagePreventsRemainOrMove,
|
|
|
- new ShardsSize(
|
|
|
- subjectShards.size(),
|
|
|
- RoutingNodesHelper.shardsWithState(state.getRoutingNodes(), ShardRoutingState.STARTED)
|
|
|
- .stream()
|
|
|
- .map(ShardRouting::shardId)
|
|
|
- .filter(sr -> subjectShards.contains(sr))
|
|
|
- .collect(Collectors.toCollection(TreeSet::new))
|
|
|
- ),
|
|
|
+ subjectShards.size(),
|
|
|
+ expectedShardIds,
|
|
|
+ shardIdNodeDecisions -> {
|
|
|
+ assertEquals(cappedShardIds(expectedShardIds), shardIdNodeDecisions.keySet());
|
|
|
+ assertEquals(
|
|
|
+ Decision.single(Decision.Type.NO, "disk_threshold", "test"),
|
|
|
+ firstNoDecision(shardIdNodeDecisions.get(expectedShardIds.first()).canAllocateDecisions().get(0))
|
|
|
+ );
|
|
|
+ assertDebugNoDecision(shardIdNodeDecisions.get(expectedShardIds.first()).canRemainDecision().decision());
|
|
|
+ return true;
|
|
|
+ },
|
|
|
mockCanAllocateDiskDecider
|
|
|
);
|
|
|
verify(
|
|
|
ReactiveStorageDeciderService.AllocationState::storagePreventsRemainOrMove,
|
|
|
- emptyShardsSize(),
|
|
|
+ 0,
|
|
|
+ emptySortedSet(),
|
|
|
+ Map::isEmpty,
|
|
|
mockCanAllocateDiskDecider,
|
|
|
CAN_ALLOCATE_NO_DECIDER
|
|
|
);
|
|
|
- verify(ReactiveStorageDeciderService.AllocationState::storagePreventsRemainOrMove, emptyShardsSize());
|
|
|
-
|
|
|
- verifyScale(subjectShards.size(), "not enough storage available, needs " + subjectShards.size() + "b", mockCanAllocateDiskDecider);
|
|
|
- verifyScale(0, "storage ok", mockCanAllocateDiskDecider, CAN_ALLOCATE_NO_DECIDER);
|
|
|
- verifyScale(0, "storage ok");
|
|
|
- verifyScale(addDataNodes(DATA_HOT_NODE_ROLE, "additional", state, hotNodes), 0, "storage ok", mockCanAllocateDiskDecider);
|
|
|
+ verify(ReactiveStorageDeciderService.AllocationState::storagePreventsRemainOrMove, 0, emptySortedSet(), Map::isEmpty);
|
|
|
+
|
|
|
+ verifyScale(subjectShards.size(), "not enough storage available, needs " + subjectShards.size() + "b", shardIdNodeDecisions -> {
|
|
|
+ assertEquals(cappedShardIds(expectedShardIds), shardIdNodeDecisions.keySet());
|
|
|
+ assertDebugNoDecision(shardIdNodeDecisions.get(expectedShardIds.first()).canRemainDecision().decision());
|
|
|
+ assertEquals(
|
|
|
+ Decision.single(Decision.Type.NO, "disk_threshold", "test"),
|
|
|
+ firstNoDecision(shardIdNodeDecisions.values().iterator().next().canAllocateDecisions().get(0))
|
|
|
+ );
|
|
|
+ return true;
|
|
|
+ }, Map::isEmpty, mockCanAllocateDiskDecider);
|
|
|
+ verifyScale(0, "storage ok", Map::isEmpty, Map::isEmpty, mockCanAllocateDiskDecider, CAN_ALLOCATE_NO_DECIDER);
|
|
|
+ verifyScale(0, "storage ok", Map::isEmpty, Map::isEmpty);
|
|
|
+ verifyScale(
|
|
|
+ addDataNodes(DATA_HOT_NODE_ROLE, "additional", state, hotNodes),
|
|
|
+ 0,
|
|
|
+ "storage ok",
|
|
|
+ Map::isEmpty,
|
|
|
+ Map::isEmpty,
|
|
|
+ mockCanAllocateDiskDecider
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
public void testMoveToEmpty() {
|
|
@@ -290,7 +355,9 @@ public class ReactiveStorageDeciderDecisionTests extends AutoscalingTestCase {
|
|
|
|
|
|
verify(
|
|
|
ReactiveStorageDeciderService.AllocationState::storagePreventsRemainOrMove,
|
|
|
- emptyShardsSize(),
|
|
|
+ 0,
|
|
|
+ emptySortedSet(),
|
|
|
+ Map::isEmpty,
|
|
|
DiscoveryNodeRole.DATA_COLD_NODE_ROLE
|
|
|
);
|
|
|
|
|
@@ -305,7 +372,17 @@ public class ReactiveStorageDeciderDecisionTests extends AutoscalingTestCase {
|
|
|
verify(
|
|
|
moveToCold(candidates),
|
|
|
ReactiveStorageDeciderService.AllocationState::storagePreventsRemainOrMove,
|
|
|
- new ShardsSize(allocatedCandidateShards, allocatedShardIds),
|
|
|
+ allocatedCandidateShards,
|
|
|
+ allocatedShardIds,
|
|
|
+ shardNodeDecisions -> {
|
|
|
+ assertEquals(cappedShardIds(allocatedShardIds), shardNodeDecisions.keySet());
|
|
|
+ if (allocatedShardIds.size() > 0) {
|
|
|
+ NodeDecisions nodeDecisions = shardNodeDecisions.get(allocatedShardIds.first());
|
|
|
+ assertTrue(nodeDecisions.canAllocateDecisions().isEmpty());
|
|
|
+ assertNotNull(nodeDecisions.canRemainDecision());
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ },
|
|
|
DiscoveryNodeRole.DATA_COLD_NODE_ROLE
|
|
|
);
|
|
|
}
|
|
@@ -360,54 +437,118 @@ public class ReactiveStorageDeciderDecisionTests extends AutoscalingTestCase {
|
|
|
.filter(o -> subjectShards.contains(o))
|
|
|
.collect(Collectors.toCollection(TreeSet::new));
|
|
|
|
|
|
+ verify(ReactiveStorageDeciderService.AllocationState::storagePreventsRemainOrMove, nodes, shardIds, shardNodeDecisions -> {
|
|
|
+ assertEquals(cappedShardIds(shardIds), shardNodeDecisions.keySet());
|
|
|
+ List<NodeDecision> canAllocateDecisions = shardNodeDecisions.get(shardIds.first()).canAllocateDecisions();
|
|
|
+ assertEquals(hotNodes - 1, canAllocateDecisions.size());
|
|
|
+ if (canAllocateDecisions.size() > 0) {
|
|
|
+ assertDebugNoDecision(canAllocateDecisions.get(0).decision());
|
|
|
+ }
|
|
|
+
|
|
|
+ assertEquals(
|
|
|
+ Decision.single(Decision.Type.NO, "disk_threshold", "test"),
|
|
|
+ firstNoDecision(shardNodeDecisions.get(shardIds.first()).canRemainDecision())
|
|
|
+ );
|
|
|
+ return true;
|
|
|
+ }, mockCanRemainDiskDecider, CAN_ALLOCATE_NO_DECIDER);
|
|
|
verify(
|
|
|
ReactiveStorageDeciderService.AllocationState::storagePreventsRemainOrMove,
|
|
|
- new ShardsSize(nodes, shardIds),
|
|
|
- mockCanRemainDiskDecider,
|
|
|
- CAN_ALLOCATE_NO_DECIDER
|
|
|
- );
|
|
|
- verify(
|
|
|
- ReactiveStorageDeciderService.AllocationState::storagePreventsRemainOrMove,
|
|
|
- emptyShardsSize(),
|
|
|
+ 0,
|
|
|
+ emptySortedSet(),
|
|
|
+ Map::isEmpty,
|
|
|
mockCanRemainDiskDecider,
|
|
|
CAN_REMAIN_NO_DECIDER,
|
|
|
CAN_ALLOCATE_NO_DECIDER
|
|
|
);
|
|
|
- verify(ReactiveStorageDeciderService.AllocationState::storagePreventsRemainOrMove, emptyShardsSize());
|
|
|
+ verify(ReactiveStorageDeciderService.AllocationState::storagePreventsRemainOrMove, 0, emptySortedSet(), Map::isEmpty);
|
|
|
|
|
|
// only consider it once (move case) if both cannot remain and cannot allocate.
|
|
|
- verify(
|
|
|
- ReactiveStorageDeciderService.AllocationState::storagePreventsRemainOrMove,
|
|
|
- new ShardsSize(nodes, shardIds),
|
|
|
- mockCanAllocateDiskDecider,
|
|
|
- mockCanRemainDiskDecider
|
|
|
- );
|
|
|
+ verify(ReactiveStorageDeciderService.AllocationState::storagePreventsRemainOrMove, nodes, shardIds, shardNodeDecisions -> {
|
|
|
+ assertEquals(cappedShardIds(shardIds), shardNodeDecisions.keySet());
|
|
|
+ List<NodeDecision> canAllocateDecisions = shardNodeDecisions.get(shardIds.first()).canAllocateDecisions();
|
|
|
+ assertEquals(hotNodes - 1, canAllocateDecisions.size());
|
|
|
+ if (canAllocateDecisions.size() > 0) {
|
|
|
+ assertDebugNoDecision(canAllocateDecisions.get(0).decision());
|
|
|
+ }
|
|
|
|
|
|
- verifyScale(nodes, "not enough storage available, needs " + nodes + "b", mockCanRemainDiskDecider, CAN_ALLOCATE_NO_DECIDER);
|
|
|
- verifyScale(0, "storage ok", mockCanRemainDiskDecider, CAN_REMAIN_NO_DECIDER, CAN_ALLOCATE_NO_DECIDER);
|
|
|
- verifyScale(0, "storage ok");
|
|
|
+ assertEquals(
|
|
|
+ Decision.single(Decision.Type.NO, "disk_threshold", "test"),
|
|
|
+ firstNoDecision(shardNodeDecisions.get(shardIds.first()).canRemainDecision())
|
|
|
+ );
|
|
|
+ return true;
|
|
|
+ }, mockCanAllocateDiskDecider, mockCanRemainDiskDecider);
|
|
|
+
|
|
|
+ verifyScale(nodes, "not enough storage available, needs " + nodes + "b", shardNodeDecisions -> {
|
|
|
+ assertEquals(cappedShardIds(shardIds), shardNodeDecisions.keySet());
|
|
|
+ List<NodeDecision> canAllocateDecisions = shardNodeDecisions.get(shardIds.first()).canAllocateDecisions();
|
|
|
+ assertEquals(hotNodes - 1, canAllocateDecisions.size());
|
|
|
+ if (canAllocateDecisions.size() > 0) {
|
|
|
+ assertDebugNoDecision(canAllocateDecisions.get(0).decision());
|
|
|
+ }
|
|
|
+
|
|
|
+ assertEquals(
|
|
|
+ Decision.single(Decision.Type.NO, "disk_threshold", "test"),
|
|
|
+ firstNoDecision(shardNodeDecisions.get(shardIds.first()).canRemainDecision())
|
|
|
+ );
|
|
|
+ return true;
|
|
|
+ }, Map::isEmpty, mockCanRemainDiskDecider, CAN_ALLOCATE_NO_DECIDER);
|
|
|
+ verifyScale(0, "storage ok", Map::isEmpty, Map::isEmpty, mockCanRemainDiskDecider, CAN_REMAIN_NO_DECIDER, CAN_ALLOCATE_NO_DECIDER);
|
|
|
+ verifyScale(0, "storage ok", Map::isEmpty, Map::isEmpty);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void assertDebugNoDecision(Decision canAllocateDecision) {
|
|
|
+ assertEquals(Decision.Type.NO, canAllocateDecision.type());
|
|
|
+ assertTrue(canAllocateDecision.getDecisions().stream().anyMatch(d -> d.type() == Decision.Type.NO));
|
|
|
+ assertTrue(canAllocateDecision.getDecisions().stream().anyMatch(d -> d.type() == Decision.Type.YES));
|
|
|
+ }
|
|
|
+
|
|
|
+ private static SortedSet<ShardId> cappedShardIds(SortedSet<ShardId> shardIds) {
|
|
|
+ return shardIds.stream().limit(MAX_AMOUNT_OF_SHARD_DECISIONS).collect(Collectors.toCollection(TreeSet::new));
|
|
|
}
|
|
|
|
|
|
- private static ShardsSize emptyShardsSize() {
|
|
|
- return new ShardsSize(0, Collections.emptySortedSet());
|
|
|
+ private static Decision firstNoDecision(NodeDecision nodeAllocationResult) {
|
|
|
+ List<Decision> noDecisions = nodeAllocationResult.decision()
|
|
|
+ .getDecisions()
|
|
|
+ .stream()
|
|
|
+ .filter(d -> d.type() == Decision.Type.NO)
|
|
|
+ .toList();
|
|
|
+ if (noDecisions.isEmpty()) {
|
|
|
+ throw new IllegalStateException("Unable to find NO can_remain decision");
|
|
|
+ }
|
|
|
+ return noDecisions.get(0);
|
|
|
}
|
|
|
|
|
|
private interface VerificationSubject {
|
|
|
- ShardsSize invoke(ReactiveStorageDeciderService.AllocationState state);
|
|
|
+ ShardsAllocationResults invoke(ReactiveStorageDeciderService.AllocationState state);
|
|
|
}
|
|
|
|
|
|
- private void verify(VerificationSubject subject, ShardsSize expected, AllocationDecider... allocationDeciders) {
|
|
|
- verify(subject, expected, DATA_HOT_NODE_ROLE, allocationDeciders);
|
|
|
+ private void verify(
|
|
|
+ VerificationSubject subject,
|
|
|
+ long expectedSizeInBytes,
|
|
|
+ SortedSet<ShardId> expectedShardIds,
|
|
|
+ Predicate<Map<ShardId, NodeDecisions>> nodeDecisionsCheck,
|
|
|
+ AllocationDecider... allocationDeciders
|
|
|
+ ) {
|
|
|
+ verify(subject, expectedSizeInBytes, expectedShardIds, nodeDecisionsCheck, DATA_HOT_NODE_ROLE, allocationDeciders);
|
|
|
}
|
|
|
|
|
|
- private void verify(VerificationSubject subject, ShardsSize expected, DiscoveryNodeRole role, AllocationDecider... allocationDeciders) {
|
|
|
- verify(this.state, subject, expected, role, allocationDeciders);
|
|
|
+ private void verify(
|
|
|
+ VerificationSubject subject,
|
|
|
+ long expectedSizeInBytes,
|
|
|
+ SortedSet<ShardId> expectedShardIds,
|
|
|
+ Predicate<Map<ShardId, NodeDecisions>> nodeDecisionsCheck,
|
|
|
+ DiscoveryNodeRole role,
|
|
|
+ AllocationDecider... allocationDeciders
|
|
|
+ ) {
|
|
|
+ verify(this.state, subject, expectedSizeInBytes, expectedShardIds, nodeDecisionsCheck, role, allocationDeciders);
|
|
|
}
|
|
|
|
|
|
private static void verify(
|
|
|
ClusterState state,
|
|
|
VerificationSubject subject,
|
|
|
- ShardsSize expected,
|
|
|
+ long expectedSizeInBytes,
|
|
|
+ SortedSet<ShardId> expectedShardIds,
|
|
|
+ Predicate<Map<ShardId, NodeDecisions>> nodeDecisionsCheck,
|
|
|
DiscoveryNodeRole role,
|
|
|
AllocationDecider... allocationDeciders
|
|
|
) {
|
|
@@ -417,14 +558,30 @@ public class ReactiveStorageDeciderDecisionTests extends AutoscalingTestCase {
|
|
|
createAllocationDeciders(allocationDeciders),
|
|
|
TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY
|
|
|
);
|
|
|
- assertThat(subject.invoke(allocationState), equalTo(expected));
|
|
|
+ ShardsAllocationResults shardsAllocationResults = subject.invoke(allocationState);
|
|
|
+ assertThat(shardsAllocationResults.sizeInBytes(), equalTo(expectedSizeInBytes));
|
|
|
+ assertThat(shardsAllocationResults.shardIds(), equalTo(expectedShardIds));
|
|
|
+ assertTrue("failed canAllocate decisions check", nodeDecisionsCheck.test(shardsAllocationResults.shardNodeDecisions()));
|
|
|
}
|
|
|
|
|
|
- private void verifyScale(long expectedDifference, String reason, AllocationDecider... allocationDeciders) {
|
|
|
- verifyScale(state, expectedDifference, reason, allocationDeciders);
|
|
|
+ private void verifyScale(
|
|
|
+ long expectedDifference,
|
|
|
+ String reason,
|
|
|
+ Predicate<Map<ShardId, NodeDecisions>> assignedNodeDecisions,
|
|
|
+ Predicate<Map<ShardId, NodeDecisions>> unassignedNodeDecisions,
|
|
|
+ AllocationDecider... allocationDeciders
|
|
|
+ ) {
|
|
|
+ verifyScale(state, expectedDifference, reason, assignedNodeDecisions, unassignedNodeDecisions, allocationDeciders);
|
|
|
}
|
|
|
|
|
|
- private static void verifyScale(ClusterState state, long expectedDifference, String reason, AllocationDecider... allocationDeciders) {
|
|
|
+ private static void verifyScale(
|
|
|
+ ClusterState state,
|
|
|
+ long expectedDifference,
|
|
|
+ String reason,
|
|
|
+ Predicate<Map<ShardId, NodeDecisions>> assignedNodeDecisions,
|
|
|
+ Predicate<Map<ShardId, NodeDecisions>> unassignedNodeDecisions,
|
|
|
+ AllocationDecider... allocationDeciders
|
|
|
+ ) {
|
|
|
ReactiveStorageDeciderService decider = new ReactiveStorageDeciderService(
|
|
|
Settings.EMPTY,
|
|
|
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
|
|
@@ -443,11 +600,15 @@ public class ReactiveStorageDeciderDecisionTests extends AutoscalingTestCase {
|
|
|
assertThat(resultReason.summary(), equalTo(reason));
|
|
|
assertThat(resultReason.unassignedShardIds(), equalTo(decider.allocationState(context).storagePreventsAllocation().shardIds()));
|
|
|
assertThat(resultReason.assignedShardIds(), equalTo(decider.allocationState(context).storagePreventsRemainOrMove().shardIds()));
|
|
|
+ assertTrue("failed assigned decisions check", assignedNodeDecisions.test(resultReason.assignedNodeDecisions()));
|
|
|
+ assertTrue("failed unassigned decisions check", unassignedNodeDecisions.test(resultReason.unassignedNodeDecisions()));
|
|
|
} else {
|
|
|
assertThat(result.requiredCapacity(), is(nullValue()));
|
|
|
assertThat(resultReason.summary(), equalTo("current capacity not available"));
|
|
|
assertThat(resultReason.unassignedShardIds(), equalTo(Set.of()));
|
|
|
assertThat(resultReason.assignedShardIds(), equalTo(Set.of()));
|
|
|
+ assertThat(resultReason.unassignedNodeDecisions(), equalTo(Map.of()));
|
|
|
+ assertThat(resultReason.assignedNodeDecisions(), equalTo(Map.of()));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -458,16 +619,18 @@ public class ReactiveStorageDeciderDecisionTests extends AutoscalingTestCase {
|
|
|
RoutingAllocation allocation = createRoutingAllocation(state, deciders);
|
|
|
// There could be duplicated of shard ids, and numOfShards is calculated based on them,
|
|
|
// so we can't just collect to the shard ids to `TreeSet`
|
|
|
- List<ShardId> allocatableShards = state.getRoutingNodes()
|
|
|
+ List<ShardRouting> allocatableShards = state.getRoutingNodes()
|
|
|
.unassigned()
|
|
|
.stream()
|
|
|
.filter(shard -> subjectShards.contains(shard.shardId()))
|
|
|
.filter(
|
|
|
shard -> allocation.routingNodes().stream().anyMatch(node -> deciders.canAllocate(shard, node, allocation) != Decision.NO)
|
|
|
)
|
|
|
- .map(ShardRouting::shardId)
|
|
|
.toList();
|
|
|
- return new AllocatableShards(allocatableShards.size(), new TreeSet<>(allocatableShards));
|
|
|
+ return new AllocatableShards(
|
|
|
+ allocatableShards.size(),
|
|
|
+ allocatableShards.stream().map(ShardRouting::shardId).collect(Collectors.toCollection(TreeSet::new))
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
private boolean hasStartedSubjectShard() {
|
|
@@ -545,7 +708,7 @@ public class ReactiveStorageDeciderDecisionTests extends AutoscalingTestCase {
|
|
|
Set<RoutingNode> candidates = allocation.routingNodes()
|
|
|
.stream()
|
|
|
.filter(n -> allocation.deciders().canAllocate(toMove, n, allocation) == Decision.YES)
|
|
|
- .collect(Collectors.toSet());
|
|
|
+ .collect(toSet());
|
|
|
if (candidates.isEmpty() == false) {
|
|
|
allocation.routingNodes().relocateShard(toMove, randomFrom(candidates).nodeId(), 0L, allocation.changes());
|
|
|
}
|
|
@@ -582,7 +745,7 @@ public class ReactiveStorageDeciderDecisionTests extends AutoscalingTestCase {
|
|
|
private TestAutoscalingDeciderContext(ClusterState state, Set<DiscoveryNodeRole> roles, AutoscalingCapacity currentCapacity) {
|
|
|
this.state = state;
|
|
|
this.currentCapacity = currentCapacity;
|
|
|
- this.nodes = state.nodes().stream().filter(n -> roles.stream().anyMatch(n.getRoles()::contains)).collect(Collectors.toSet());
|
|
|
+ this.nodes = state.nodes().stream().filter(n -> roles.stream().anyMatch(n.getRoles()::contains)).collect(toSet());
|
|
|
this.roles = roles;
|
|
|
this.info = createClusterInfo(state);
|
|
|
}
|
|
@@ -697,11 +860,10 @@ public class ReactiveStorageDeciderDecisionTests extends AutoscalingTestCase {
|
|
|
}
|
|
|
|
|
|
private static String randomNodeId(RoutingNodes routingNodes, DiscoveryNodeRole role) {
|
|
|
- return randomFrom(routingNodes.stream().map(RoutingNode::node).filter(n -> n.getRoles().contains(role)).collect(Collectors.toSet()))
|
|
|
- .getId();
|
|
|
+ return randomFrom(routingNodes.stream().map(RoutingNode::node).filter(n -> n.getRoles().contains(role)).collect(toSet())).getId();
|
|
|
}
|
|
|
|
|
|
private static Set<ShardId> shardIds(Iterable<ShardRouting> candidateShards) {
|
|
|
- return StreamSupport.stream(candidateShards.spliterator(), false).map(ShardRouting::shardId).collect(Collectors.toSet());
|
|
|
+ return StreamSupport.stream(candidateShards.spliterator(), false).map(ShardRouting::shardId).collect(toSet());
|
|
|
}
|
|
|
}
|