|
|
@@ -69,6 +69,7 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
|
|
|
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
|
|
|
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
|
|
|
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
|
|
|
+import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings;
|
|
|
import static org.hamcrest.Matchers.aMapWithSize;
|
|
|
import static org.hamcrest.Matchers.allOf;
|
|
|
import static org.hamcrest.Matchers.equalTo;
|
|
|
@@ -387,8 +388,7 @@ public class DesiredBalanceComputerTests extends ESTestCase {
|
|
|
|
|
|
var allocateCalled = new AtomicBoolean();
|
|
|
var desiredBalanceComputer = new DesiredBalanceComputer(
|
|
|
- Settings.EMPTY,
|
|
|
- new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
|
|
|
+ createBuiltInClusterSettings(),
|
|
|
mock(ThreadPool.class),
|
|
|
new ShardsAllocator() {
|
|
|
@Override
|
|
|
@@ -661,8 +661,7 @@ public class DesiredBalanceComputerTests extends ESTestCase {
|
|
|
List.of()
|
|
|
);
|
|
|
var desiredBalance = new DesiredBalanceComputer(
|
|
|
- Settings.EMPTY,
|
|
|
- new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
|
|
|
+ createBuiltInClusterSettings(),
|
|
|
mock(ThreadPool.class),
|
|
|
new BalancedShardsAllocator(Settings.EMPTY)
|
|
|
).compute(DesiredBalance.INITIAL, input, queue(), ignored -> iteration.incrementAndGet() < 1000);
|
|
|
@@ -827,8 +826,7 @@ public class DesiredBalanceComputerTests extends ESTestCase {
|
|
|
);
|
|
|
|
|
|
var desiredBalance = new DesiredBalanceComputer(
|
|
|
- Settings.EMPTY,
|
|
|
- new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
|
|
|
+ createBuiltInClusterSettings(),
|
|
|
mock(ThreadPool.class),
|
|
|
new BalancedShardsAllocator(settings)
|
|
|
).compute(
|
|
|
@@ -890,38 +888,33 @@ public class DesiredBalanceComputerTests extends ESTestCase {
|
|
|
var currentTime = new AtomicLong(0L);
|
|
|
when(mockThreadPool.relativeTimeInMillis()).thenAnswer(invocation -> currentTime.addAndGet(eachIterationDuration));
|
|
|
|
|
|
- var desiredBalanceComputer = new DesiredBalanceComputer(
|
|
|
- Settings.EMPTY,
|
|
|
- new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
|
|
|
- mockThreadPool,
|
|
|
- new ShardsAllocator() {
|
|
|
- @Override
|
|
|
- public void allocate(RoutingAllocation allocation) {
|
|
|
- final var unassignedIterator = allocation.routingNodes().unassigned().iterator();
|
|
|
- while (unassignedIterator.hasNext()) {
|
|
|
- final var shardRouting = unassignedIterator.next();
|
|
|
- if (shardRouting.primary()) {
|
|
|
- unassignedIterator.initialize("node-0", null, 0L, allocation.changes());
|
|
|
- } else {
|
|
|
- unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // move shard on each iteration
|
|
|
- for (var shard : allocation.routingNodes().node("node-0").shardsWithState(STARTED).toList()) {
|
|
|
- allocation.routingNodes().relocateShard(shard, "node-1", 0L, allocation.changes());
|
|
|
- }
|
|
|
- for (var shard : allocation.routingNodes().node("node-1").shardsWithState(STARTED).toList()) {
|
|
|
- allocation.routingNodes().relocateShard(shard, "node-0", 0L, allocation.changes());
|
|
|
+ var desiredBalanceComputer = new DesiredBalanceComputer(createBuiltInClusterSettings(), mockThreadPool, new ShardsAllocator() {
|
|
|
+ @Override
|
|
|
+ public void allocate(RoutingAllocation allocation) {
|
|
|
+ final var unassignedIterator = allocation.routingNodes().unassigned().iterator();
|
|
|
+ while (unassignedIterator.hasNext()) {
|
|
|
+ final var shardRouting = unassignedIterator.next();
|
|
|
+ if (shardRouting.primary()) {
|
|
|
+ unassignedIterator.initialize("node-0", null, 0L, allocation.changes());
|
|
|
+ } else {
|
|
|
+ unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation) {
|
|
|
- throw new AssertionError("only used for allocation explain");
|
|
|
+ // move shard on each iteration
|
|
|
+ for (var shard : allocation.routingNodes().node("node-0").shardsWithState(STARTED).toList()) {
|
|
|
+ allocation.routingNodes().relocateShard(shard, "node-1", 0L, allocation.changes());
|
|
|
+ }
|
|
|
+ for (var shard : allocation.routingNodes().node("node-1").shardsWithState(STARTED).toList()) {
|
|
|
+ allocation.routingNodes().relocateShard(shard, "node-0", 0L, allocation.changes());
|
|
|
}
|
|
|
}
|
|
|
- );
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation) {
|
|
|
+ throw new AssertionError("only used for allocation explain");
|
|
|
+ }
|
|
|
+ });
|
|
|
|
|
|
MockLogAppender mockAppender = new MockLogAppender();
|
|
|
mockAppender.start();
|
|
|
@@ -1036,39 +1029,31 @@ public class DesiredBalanceComputerTests extends ESTestCase {
|
|
|
* @return a {@link DesiredBalanceComputer} which allocates unassigned primaries to node-0 and unassigned replicas to node-1
|
|
|
*/
|
|
|
private static DesiredBalanceComputer createDesiredBalanceComputer() {
|
|
|
- return new DesiredBalanceComputer(
|
|
|
- Settings.EMPTY,
|
|
|
- new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
|
|
|
- mock(ThreadPool.class),
|
|
|
- new ShardsAllocator() {
|
|
|
- @Override
|
|
|
- public void allocate(RoutingAllocation allocation) {
|
|
|
- final var unassignedIterator = allocation.routingNodes().unassigned().iterator();
|
|
|
- while (unassignedIterator.hasNext()) {
|
|
|
- final var shardRouting = unassignedIterator.next();
|
|
|
- if (shardRouting.primary()) {
|
|
|
- unassignedIterator.initialize("node-0", null, 0L, allocation.changes());
|
|
|
- } else if (isCorrespondingPrimaryStarted(shardRouting, allocation)) {
|
|
|
- unassignedIterator.initialize("node-1", null, 0L, allocation.changes());
|
|
|
- } else {
|
|
|
- unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes());
|
|
|
- }
|
|
|
+ return new DesiredBalanceComputer(createBuiltInClusterSettings(), mock(ThreadPool.class), new ShardsAllocator() {
|
|
|
+ @Override
|
|
|
+ public void allocate(RoutingAllocation allocation) {
|
|
|
+ final var unassignedIterator = allocation.routingNodes().unassigned().iterator();
|
|
|
+ while (unassignedIterator.hasNext()) {
|
|
|
+ final var shardRouting = unassignedIterator.next();
|
|
|
+ if (shardRouting.primary()) {
|
|
|
+ unassignedIterator.initialize("node-0", null, 0L, allocation.changes());
|
|
|
+ } else if (isCorrespondingPrimaryStarted(shardRouting, allocation)) {
|
|
|
+ unassignedIterator.initialize("node-1", null, 0L, allocation.changes());
|
|
|
+ } else {
|
|
|
+ unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes());
|
|
|
}
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- private static boolean isCorrespondingPrimaryStarted(ShardRouting shardRouting, RoutingAllocation allocation) {
|
|
|
- return allocation.routingNodes()
|
|
|
- .assignedShards(shardRouting.shardId())
|
|
|
- .stream()
|
|
|
- .anyMatch(r -> r.primary() && r.started());
|
|
|
- }
|
|
|
+ private static boolean isCorrespondingPrimaryStarted(ShardRouting shardRouting, RoutingAllocation allocation) {
|
|
|
+ return allocation.routingNodes().assignedShards(shardRouting.shardId()).stream().anyMatch(r -> r.primary() && r.started());
|
|
|
+ }
|
|
|
|
|
|
- @Override
|
|
|
- public ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation) {
|
|
|
- throw new AssertionError("only used for allocation explain");
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation) {
|
|
|
+ throw new AssertionError("only used for allocation explain");
|
|
|
}
|
|
|
- );
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
private static void assertDesiredAssignments(DesiredBalance desiredBalance, Map<ShardId, ShardAssignment> expected) {
|