|
@@ -11,16 +11,16 @@ package org.elasticsearch.cluster.routing.allocation.allocator;
|
|
|
import org.apache.lucene.util.SetOnce;
|
|
|
import org.elasticsearch.Version;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
+import org.elasticsearch.action.support.PlainActionFuture;
|
|
|
import org.elasticsearch.cluster.ClusterInfo;
|
|
|
import org.elasticsearch.cluster.ClusterName;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
|
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
|
|
+import org.elasticsearch.cluster.ESAllocationTestCase;
|
|
|
import org.elasticsearch.cluster.TestShardRoutingRoleStrategies;
|
|
|
import org.elasticsearch.cluster.block.ClusterBlocks;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|
|
import org.elasticsearch.cluster.metadata.Metadata;
|
|
|
-import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
-import org.elasticsearch.cluster.node.DiscoveryNodeRole;
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
|
|
import org.elasticsearch.cluster.routing.RoutingTable;
|
|
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
|
@@ -29,28 +29,30 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
|
|
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
|
|
|
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
|
|
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
|
|
|
+import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator.DesiredBalanceReconcilerAction;
|
|
|
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
|
|
|
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
|
|
import org.elasticsearch.cluster.service.ClusterApplierService;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.cluster.service.FakeThreadPoolMasterService;
|
|
|
+import org.elasticsearch.common.UUIDs;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
|
|
|
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
|
|
|
import org.elasticsearch.gateway.GatewayAllocator;
|
|
|
+import org.elasticsearch.index.shard.ShardId;
|
|
|
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
|
|
|
import org.elasticsearch.test.ClusterServiceUtils;
|
|
|
-import org.elasticsearch.test.ESTestCase;
|
|
|
import org.elasticsearch.threadpool.TestThreadPool;
|
|
|
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Queue;
|
|
|
-import java.util.Set;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+import java.util.concurrent.atomic.AtomicReference;
|
|
|
import java.util.function.BiConsumer;
|
|
|
import java.util.function.Consumer;
|
|
|
import java.util.function.Predicate;
|
|
@@ -62,7 +64,7 @@ import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClu
|
|
|
import static org.hamcrest.Matchers.equalTo;
|
|
|
import static org.hamcrest.Matchers.hasItem;
|
|
|
|
|
|
-public class DesiredBalanceShardsAllocatorTests extends ESTestCase {
|
|
|
+public class DesiredBalanceShardsAllocatorTests extends ESAllocationTestCase {
|
|
|
|
|
|
private static final String LOCAL_NODE_ID = "node-1";
|
|
|
private static final String OTHER_NODE_ID = "node-2";
|
|
@@ -107,8 +109,8 @@ public class DesiredBalanceShardsAllocatorTests extends ESTestCase {
|
|
|
var deterministicTaskQueue = new DeterministicTaskQueue();
|
|
|
var threadPool = deterministicTaskQueue.getThreadPool();
|
|
|
|
|
|
- var localNode = createDiscoveryNode(LOCAL_NODE_ID);
|
|
|
- var otherNode = createDiscoveryNode(OTHER_NODE_ID);
|
|
|
+ var localNode = newNode(LOCAL_NODE_ID);
|
|
|
+ var otherNode = newNode(OTHER_NODE_ID);
|
|
|
var initialState = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName()))
|
|
|
.nodes(DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()))
|
|
|
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK)
|
|
@@ -135,7 +137,7 @@ public class DesiredBalanceShardsAllocatorTests extends ESTestCase {
|
|
|
clusterService.start();
|
|
|
|
|
|
var allocationServiceRef = new SetOnce<AllocationService>();
|
|
|
- var reconcileAction = new DesiredBalanceShardsAllocator.DesiredBalanceReconcilerAction() {
|
|
|
+ var reconcileAction = new DesiredBalanceReconcilerAction() {
|
|
|
@Override
|
|
|
public ClusterState apply(ClusterState clusterState, Consumer<RoutingAllocation> routingAllocationAction) {
|
|
|
return allocationServiceRef.get().executeWithRoutingAllocation(clusterState, "reconcile", routingAllocationAction);
|
|
@@ -200,7 +202,7 @@ public class DesiredBalanceShardsAllocatorTests extends ESTestCase {
|
|
|
var listenersCalled = new CountDownLatch(2);
|
|
|
var clusterStateUpdatesExecuted = new CountDownLatch(2);
|
|
|
|
|
|
- var discoveryNode = createDiscoveryNode("node-0");
|
|
|
+ var discoveryNode = newNode("node-0");
|
|
|
var initialState = ClusterState.builder(ClusterName.DEFAULT)
|
|
|
.nodes(DiscoveryNodes.builder().add(discoveryNode).localNodeId(discoveryNode.getId()).masterNodeId(discoveryNode.getId()))
|
|
|
.build();
|
|
@@ -208,7 +210,7 @@ public class DesiredBalanceShardsAllocatorTests extends ESTestCase {
|
|
|
var threadPool = new TestThreadPool(getTestName());
|
|
|
var clusterService = ClusterServiceUtils.createClusterService(initialState, threadPool);
|
|
|
var allocationServiceRef = new SetOnce<AllocationService>();
|
|
|
- var reconcileAction = new DesiredBalanceShardsAllocator.DesiredBalanceReconcilerAction() {
|
|
|
+ var reconcileAction = new DesiredBalanceReconcilerAction() {
|
|
|
@Override
|
|
|
public ClusterState apply(ClusterState clusterState, Consumer<RoutingAllocation> routingAllocationAction) {
|
|
|
reconciliations.incrementAndGet();
|
|
@@ -301,8 +303,8 @@ public class DesiredBalanceShardsAllocatorTests extends ESTestCase {
|
|
|
var newMasterElected = new CountDownLatch(1);
|
|
|
var clusterStateUpdatesExecuted = new CountDownLatch(1);
|
|
|
|
|
|
- var node1 = createDiscoveryNode(LOCAL_NODE_ID);
|
|
|
- var node2 = createDiscoveryNode(OTHER_NODE_ID);
|
|
|
+ var node1 = newNode(LOCAL_NODE_ID);
|
|
|
+ var node2 = newNode(OTHER_NODE_ID);
|
|
|
var initial = ClusterState.builder(ClusterName.DEFAULT)
|
|
|
.nodes(DiscoveryNodes.builder().add(node1).add(node2).localNodeId(node1.getId()).masterNodeId(node1.getId()))
|
|
|
.build();
|
|
@@ -310,7 +312,7 @@ public class DesiredBalanceShardsAllocatorTests extends ESTestCase {
|
|
|
var threadPool = new TestThreadPool(getTestName());
|
|
|
var clusterService = ClusterServiceUtils.createClusterService(initial, threadPool);
|
|
|
var allocationServiceRef = new SetOnce<AllocationService>();
|
|
|
- var reconcileAction = new DesiredBalanceShardsAllocator.DesiredBalanceReconcilerAction() {
|
|
|
+ var reconcileAction = new DesiredBalanceReconcilerAction() {
|
|
|
@Override
|
|
|
public ClusterState apply(ClusterState clusterState, Consumer<RoutingAllocation> routingAllocationAction) {
|
|
|
return allocationServiceRef.get().executeWithRoutingAllocation(clusterState, "reconcile", routingAllocationAction);
|
|
@@ -391,15 +393,72 @@ public class DesiredBalanceShardsAllocatorTests extends ESTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static DiscoveryNode createDiscoveryNode(String nodeId) {
|
|
|
- return new DiscoveryNode(
|
|
|
- nodeId,
|
|
|
- nodeId,
|
|
|
- buildNewFakeTransportAddress(),
|
|
|
- Map.of(),
|
|
|
- Set.of(DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE),
|
|
|
- Version.CURRENT
|
|
|
+ public void testResetDesiredBalance() {
|
|
|
+
|
|
|
+ var node1 = newNode(LOCAL_NODE_ID);
|
|
|
+ var node2 = newNode(OTHER_NODE_ID);
|
|
|
+
|
|
|
+ var shardId = new ShardId("test-index", UUIDs.randomBase64UUID(), 0);
|
|
|
+ var index = createIndex(shardId.getIndexName());
|
|
|
+ var clusterState = ClusterState.builder(ClusterName.DEFAULT)
|
|
|
+ .nodes(DiscoveryNodes.builder().add(node1).add(node2).localNodeId(node1.getId()).masterNodeId(node1.getId()))
|
|
|
+ .metadata(Metadata.builder().put(index, false).build())
|
|
|
+ .routingTable(RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY).addAsNew(index).build())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ var threadPool = new TestThreadPool(getTestName());
|
|
|
+ var clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool);
|
|
|
+
|
|
|
+ var delegateAllocator = createShardsAllocator();
|
|
|
+
|
|
|
+ var desiredBalanceComputer = new DesiredBalanceComputer(createBuiltInClusterSettings(), threadPool, delegateAllocator) {
|
|
|
+
|
|
|
+ final AtomicReference<DesiredBalance> lastComputationInput = new AtomicReference<>();
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public DesiredBalance compute(
|
|
|
+ DesiredBalance previousDesiredBalance,
|
|
|
+ DesiredBalanceInput desiredBalanceInput,
|
|
|
+ Queue<List<MoveAllocationCommand>> pendingDesiredBalanceMoves,
|
|
|
+ Predicate<DesiredBalanceInput> isFresh
|
|
|
+ ) {
|
|
|
+ lastComputationInput.set(previousDesiredBalance);
|
|
|
+ return super.compute(previousDesiredBalance, desiredBalanceInput, pendingDesiredBalanceMoves, isFresh);
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ var desiredBalanceShardsAllocator = new DesiredBalanceShardsAllocator(
|
|
|
+ delegateAllocator,
|
|
|
+ threadPool,
|
|
|
+ clusterService,
|
|
|
+ desiredBalanceComputer,
|
|
|
+ (reconcilerClusterState, routingAllocationAction) -> reconcilerClusterState
|
|
|
);
|
|
|
+
|
|
|
+ var service = createAllocationService(desiredBalanceShardsAllocator, createGatewayAllocator());
|
|
|
+
|
|
|
+ try {
|
|
|
+ // initial computation is based on DesiredBalance.INITIAL
|
|
|
+ rerouteAndWait(service, clusterState, "initial-allocation");
|
|
|
+ assertThat(desiredBalanceComputer.lastComputationInput.get(), equalTo(DesiredBalance.INITIAL));
|
|
|
+
|
|
|
+ // any next computation is based on current desired balance
|
|
|
+ var current = desiredBalanceShardsAllocator.getDesiredBalance();
|
|
|
+ rerouteAndWait(service, clusterState, "next-allocation");
|
|
|
+ assertThat(desiredBalanceComputer.lastComputationInput.get(), equalTo(current));
|
|
|
+
|
|
|
+ // when desired balance is resetted then computation is based on balance with no previous assignments
|
|
|
+ desiredBalanceShardsAllocator.resetDesiredBalance();
|
|
|
+ current = desiredBalanceShardsAllocator.getDesiredBalance();
|
|
|
+ rerouteAndWait(service, clusterState, "reset-desired-balance");
|
|
|
+ assertThat(
|
|
|
+ desiredBalanceComputer.lastComputationInput.get(),
|
|
|
+ equalTo(new DesiredBalance(current.lastConvergedIndex(), Map.of()))
|
|
|
+ );
|
|
|
+ } finally {
|
|
|
+ clusterService.close();
|
|
|
+ terminate(threadPool);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private static IndexMetadata createIndex(String name) {
|
|
@@ -475,4 +534,8 @@ public class DesiredBalanceShardsAllocatorTests extends ESTestCase {
|
|
|
}
|
|
|
};
|
|
|
}
|
|
|
+
|
|
|
+ private static void rerouteAndWait(AllocationService service, ClusterState clusterState, String reason) {
|
|
|
+ PlainActionFuture.<Void, RuntimeException>get(f -> service.reroute(clusterState, reason, f), 10, TimeUnit.SECONDS);
|
|
|
+ }
|
|
|
}
|