|
@@ -19,23 +19,17 @@ import org.elasticsearch.cluster.ClusterName;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
|
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
|
|
import org.elasticsearch.cluster.ESAllocationTestCase;
|
|
|
-import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
|
|
|
-import org.elasticsearch.cluster.NodeUsageStatsForThreadPools.ThreadPoolUsageStats;
|
|
|
import org.elasticsearch.cluster.TestShardRoutingRoleStrategies;
|
|
|
import org.elasticsearch.cluster.block.ClusterBlocks;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|
|
import org.elasticsearch.cluster.metadata.Metadata;
|
|
|
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
|
|
|
-import org.elasticsearch.cluster.metadata.ProjectId;
|
|
|
-import org.elasticsearch.cluster.metadata.ProjectMetadata;
|
|
|
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
|
|
|
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Type;
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
|
|
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
|
|
-import org.elasticsearch.cluster.routing.RecoverySource;
|
|
|
import org.elasticsearch.cluster.routing.RoutingTable;
|
|
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
|
|
-import org.elasticsearch.cluster.routing.ShardRoutingHelper;
|
|
|
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
|
|
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
|
|
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
|
@@ -62,7 +56,6 @@ import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
|
|
|
import org.elasticsearch.test.ClusterServiceUtils;
|
|
|
import org.elasticsearch.test.MockLog;
|
|
|
import org.elasticsearch.threadpool.TestThreadPool;
|
|
|
-import org.elasticsearch.threadpool.ThreadPool;
|
|
|
|
|
|
import java.util.Collections;
|
|
|
import java.util.List;
|
|
@@ -783,262 +776,6 @@ public class DesiredBalanceShardsAllocatorTests extends ESAllocationTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void testSimulateStartedShardsWithinASingleClusterInfoPolling() {
|
|
|
- final var firstNode = newNode("node-1");
|
|
|
- final var secondNode = newNode("node-2");
|
|
|
- final var thirdNode = newNode("node-3");
|
|
|
- final DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder().add(firstNode).add(secondNode).add(thirdNode);
|
|
|
- discoveryNodesBuilder.localNodeId(firstNode.getId()).masterNodeId(firstNode.getId());
|
|
|
-
|
|
|
- final ProjectMetadata.Builder projectBuilder = ProjectMetadata.builder(ProjectId.DEFAULT);
|
|
|
- final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY);
|
|
|
- final var indexMetadata = IndexMetadata.builder("test-index").settings(indexSettings(IndexVersion.current(), 2, 0)).build();
|
|
|
- final var shardId0 = new ShardId(indexMetadata.getIndex(), 0);
|
|
|
- final ShardRouting shardRouting0 = ShardRoutingHelper.moveToStarted(
|
|
|
- ShardRoutingHelper.initialize(
|
|
|
- ShardRouting.newUnassigned(
|
|
|
- shardId0,
|
|
|
- true,
|
|
|
- RecoverySource.EmptyStoreRecoverySource.INSTANCE,
|
|
|
- new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "new index"),
|
|
|
- ShardRouting.Role.DEFAULT
|
|
|
- ),
|
|
|
- firstNode.getId()
|
|
|
- )
|
|
|
- );
|
|
|
-
|
|
|
- final ShardId shardId1 = new ShardId(indexMetadata.getIndex(), 1);
|
|
|
- final ShardRouting shardRouting1 = ShardRoutingHelper.initialize(
|
|
|
- ShardRouting.newUnassigned(
|
|
|
- shardId1,
|
|
|
- true,
|
|
|
- RecoverySource.EmptyStoreRecoverySource.INSTANCE,
|
|
|
- new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "new index"),
|
|
|
- ShardRouting.Role.DEFAULT
|
|
|
- ),
|
|
|
- thirdNode.getId()
|
|
|
- );
|
|
|
- routingTableBuilder.add(IndexRoutingTable.builder(shardId0.getIndex()).addShard(shardRouting0).addShard(shardRouting1).build());
|
|
|
- projectBuilder.put(
|
|
|
- IndexMetadata.builder(indexMetadata).putInSyncAllocationIds(0, Set.of(shardRouting0.allocationId().getId())).build(),
|
|
|
- false
|
|
|
- );
|
|
|
-
|
|
|
- var clusterState = ClusterState.builder(ClusterName.DEFAULT)
|
|
|
- .nodes(discoveryNodesBuilder)
|
|
|
- .metadata(Metadata.builder().put(projectBuilder))
|
|
|
- .putRoutingTable(ProjectId.DEFAULT, routingTableBuilder.build())
|
|
|
- .build();
|
|
|
-
|
|
|
- var threadPool = new TestThreadPool(getTestName());
|
|
|
- var clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool);
|
|
|
-
|
|
|
- final var relocated = new AtomicBoolean(false);
|
|
|
- final AtomicReference<ClusterInfo> clusterInfoUsedByAllocator = new AtomicReference<>();
|
|
|
- var delegateAllocator = new ShardsAllocator() {
|
|
|
- @Override
|
|
|
- public void allocate(RoutingAllocation allocation) {
|
|
|
- allocation.routingNodes().setBalanceWeightStatsPerNode(Map.of());
|
|
|
- clusterInfoUsedByAllocator.set(allocation.clusterInfo());
|
|
|
- if (relocated.compareAndSet(false, true)) {
|
|
|
- logger.info("--> relocating shard [{}]", shardId0);
|
|
|
- final ShardRouting shardRouting = allocation.routingTable(ProjectId.DEFAULT).shardRoutingTable(shardId0).primaryShard();
|
|
|
- allocation.routingNodes().relocateShard(shardRouting, secondNode.getId(), 100, "test", allocation.changes());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation) {
|
|
|
- throw new AssertionError("only used for allocation explain");
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
- final var allocationServiceRef = new AtomicReference<AllocationService>();
|
|
|
- var clusterSettings = createBuiltInClusterSettings();
|
|
|
- var desiredBalanceComputer = new DesiredBalanceComputer(clusterSettings, threadPool, delegateAllocator, TEST_ONLY_EXPLAINER);
|
|
|
- var desiredBalanceShardsAllocator = new DesiredBalanceShardsAllocator(
|
|
|
- delegateAllocator,
|
|
|
- threadPool,
|
|
|
- clusterService,
|
|
|
- desiredBalanceComputer,
|
|
|
- (reconcilerClusterState, rerouteStrategy) -> allocationServiceRef.get()
|
|
|
- .executeWithRoutingAllocation(reconcilerClusterState, "reconcile-desired-balance", rerouteStrategy),
|
|
|
- EMPTY_NODE_ALLOCATION_STATS,
|
|
|
- DesiredBalanceMetrics.NOOP
|
|
|
- ) {
|
|
|
- @Override
|
|
|
- protected void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) {
|
|
|
- logger.info("--> reconcile called");
|
|
|
- super.reconcile(desiredBalance, allocation);
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
- // A fixed ClusterInfo from one polling cycle with hotspot on first node
|
|
|
- final var zeroUsageStats = new ThreadPoolUsageStats(10, 0.0f, 0);
|
|
|
- final Map<String, NodeUsageStatsForThreadPools> initialThreadPoolStats = Map.of(
|
|
|
- firstNode.getId(),
|
|
|
- new NodeUsageStatsForThreadPools(firstNode.getId(), Map.of(ThreadPool.Names.WRITE, new ThreadPoolUsageStats(10, 10.0f, 30))),
|
|
|
- secondNode.getId(),
|
|
|
- new NodeUsageStatsForThreadPools(secondNode.getId(), Map.of(ThreadPool.Names.WRITE, zeroUsageStats)),
|
|
|
- thirdNode.getId(),
|
|
|
- new NodeUsageStatsForThreadPools(secondNode.getId(), Map.of(ThreadPool.Names.WRITE, zeroUsageStats))
|
|
|
- );
|
|
|
-
|
|
|
- final var clusterInfoRef = new AtomicReference<>(
|
|
|
- ClusterInfo.builder()
|
|
|
- .dataPath(Map.of(ClusterInfo.NodeAndShard.from(shardRouting0), "/data/path0"))
|
|
|
- .shardWriteLoads(Map.of(shardId0, 10.0d, shardId1, 10.0d))
|
|
|
- .nodeUsageStatsForThreadPools(initialThreadPoolStats)
|
|
|
- .build()
|
|
|
- );
|
|
|
-
|
|
|
- var service = new AllocationService(
|
|
|
- new AllocationDeciders(List.of()),
|
|
|
- createGatewayAllocator(),
|
|
|
- desiredBalanceShardsAllocator,
|
|
|
- clusterInfoRef::get,
|
|
|
- () -> SnapshotShardSizeInfo.EMPTY,
|
|
|
- TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY
|
|
|
- );
|
|
|
- allocationServiceRef.set(service);
|
|
|
-
|
|
|
- try {
|
|
|
- // 1. Initial reroute should produce a desired balance which moves the shard from first node to second node.
|
|
|
- rerouteAndWait(service, clusterState, "first-reroute");
|
|
|
- final DesiredBalance desiredBalance = desiredBalanceShardsAllocator.getDesiredBalance();
|
|
|
- assertNotNull(desiredBalance.assignments().get(shardId0));
|
|
|
-
|
|
|
- // When compute ends, the last ClusterInfo it uses is an updated one different from the initial one because the simulation
|
|
|
- // starts all initializing shards on their target nodes. Thread pool stats are updated to reflect the shard movements.
|
|
|
- final ClusterInfo updatedClusterInfo = clusterInfoUsedByAllocator.get();
|
|
|
- assertThat(updatedClusterInfo, not(equalTo(clusterInfoRef.get())));
|
|
|
-
|
|
|
- // First node has reduced utilization and zero latency since shard0 has moved away
|
|
|
- final var firstNodeUpdatedStats = updatedClusterInfo.getNodeUsageStatsForThreadPools()
|
|
|
- .get(firstNode.getId())
|
|
|
- .threadPoolUsageStatsMap()
|
|
|
- .get(ThreadPool.Names.WRITE);
|
|
|
- assertThat(
|
|
|
- firstNodeUpdatedStats.averageThreadPoolUtilization(),
|
|
|
- equalTo(
|
|
|
- initialThreadPoolStats.get(firstNode.getId())
|
|
|
- .threadPoolUsageStatsMap()
|
|
|
- .get(ThreadPool.Names.WRITE)
|
|
|
- .averageThreadPoolUtilization() - 1.0f
|
|
|
- )
|
|
|
- );
|
|
|
- assertThat(firstNodeUpdatedStats.maxThreadPoolQueueLatencyMillis(), equalTo(0L));
|
|
|
-
|
|
|
- // Second node has increased utilization since shard0 moved onto it. Latency does not change for incoming shards
|
|
|
- final var secondNodeUpdatedStats = updatedClusterInfo.getNodeUsageStatsForThreadPools()
|
|
|
- .get(secondNode.getId())
|
|
|
- .threadPoolUsageStatsMap()
|
|
|
- .get(ThreadPool.Names.WRITE);
|
|
|
- assertThat(secondNodeUpdatedStats.averageThreadPoolUtilization(), equalTo(1.0f));
|
|
|
- assertThat(secondNodeUpdatedStats.maxThreadPoolQueueLatencyMillis(), equalTo(0L));
|
|
|
-
|
|
|
- // Third node has increased utilization since shard1 started on it
|
|
|
- final var thirdNodeUpdatedStats = updatedClusterInfo.getNodeUsageStatsForThreadPools()
|
|
|
- .get(thirdNode.getId())
|
|
|
- .threadPoolUsageStatsMap()
|
|
|
- .get(ThreadPool.Names.WRITE);
|
|
|
- assertThat(thirdNodeUpdatedStats.averageThreadPoolUtilization(), equalTo(1.0f));
|
|
|
-
|
|
|
- // 2. Reroute again and the simulated ClusterInfo remains the same due to no new change
|
|
|
- rerouteAndWait(service, clusterService.state(), "reroute-with-desired-shard-movements");
|
|
|
- assertThat(clusterInfoUsedByAllocator.get(), equalTo(updatedClusterInfo));
|
|
|
-
|
|
|
- // 3. Wait until reconciliation is completed for the moved shard. This means shard0 is now relocating in the
|
|
|
- // actual cluster state, i.e. not just in the desired balance assignments.
|
|
|
- ClusterServiceUtils.addTemporaryStateListener(
|
|
|
- clusterService,
|
|
|
- state -> state.routingTable(ProjectId.DEFAULT).shardRoutingTable(shardId0).primaryShard().relocating()
|
|
|
- );
|
|
|
- // Reroute again and the ClusterInfo simulation result remains unchanged and correctly accounts for all shard movements
|
|
|
- // (reconciled or not)
|
|
|
- rerouteAndWait(service, clusterService.state(), "reroute-with-reconciled-shard-movement");
|
|
|
- assertThat(clusterInfoUsedByAllocator.get(), equalTo(updatedClusterInfo));
|
|
|
-
|
|
|
- // 4. Actually start the relocating shard0 on the target node-2, i.e. action to honour the reconciliation result
|
|
|
- ClusterServiceUtils.setState(
|
|
|
- clusterService,
|
|
|
- service.applyStartedShards(
|
|
|
- clusterService.state(),
|
|
|
- List.of(
|
|
|
- clusterService.state()
|
|
|
- .routingTable(ProjectId.DEFAULT)
|
|
|
- .shardRoutingTable(shardId0)
|
|
|
- .primaryShard()
|
|
|
- .getTargetRelocatingShard()
|
|
|
- )
|
|
|
- )
|
|
|
- );
|
|
|
- assertThat(
|
|
|
- clusterService.state().routingTable(ProjectId.DEFAULT).shardRoutingTable(shardId0).primaryShard().currentNodeId(),
|
|
|
- equalTo(secondNode.getId())
|
|
|
- );
|
|
|
- // The actually started shard0 is still accounted for when simulating ClusterInfo
|
|
|
- rerouteAndWait(service, clusterService.state(), "reroute-with-actual-relocating-shard-started-event");
|
|
|
- assertThat(clusterInfoUsedByAllocator.get(), equalTo(updatedClusterInfo));
|
|
|
-
|
|
|
- // 5. Also start the initializing shard1 on the target node-3
|
|
|
- ClusterServiceUtils.setState(
|
|
|
- clusterService,
|
|
|
- service.applyStartedShards(
|
|
|
- clusterService.state(),
|
|
|
- List.of(clusterService.state().routingTable(ProjectId.DEFAULT).shardRoutingTable(shardId1).primaryShard())
|
|
|
- )
|
|
|
- );
|
|
|
- assertThat(
|
|
|
- clusterService.state().routingTable(ProjectId.DEFAULT).shardRoutingTable(shardId1).primaryShard().currentNodeId(),
|
|
|
- equalTo(thirdNode.getId())
|
|
|
- );
|
|
|
- // The actually started shard1 is accounted for when simulating ClusterInfo
|
|
|
- rerouteAndWait(service, clusterService.state(), "reroute-with-actual-initializing-shard-started-event");
|
|
|
- assertThat(clusterInfoUsedByAllocator.get(), equalTo(updatedClusterInfo));
|
|
|
-
|
|
|
- // 6. A new ClusterInfo is polled
|
|
|
- clusterInfoRef.set(
|
|
|
- ClusterInfo.builder()
|
|
|
- .dataPath(
|
|
|
- Map.of(
|
|
|
- new ClusterInfo.NodeAndShard(secondNode.getId(), shardId0),
|
|
|
- "/data/path0",
|
|
|
- new ClusterInfo.NodeAndShard(thirdNode.getId(), shardId1),
|
|
|
- "/data/path1"
|
|
|
- )
|
|
|
- )
|
|
|
- .shardWriteLoads(Map.of(shardId0, 10.0d, shardId1, 1.0d))
|
|
|
- .nodeUsageStatsForThreadPools(
|
|
|
- Map.of(
|
|
|
- firstNode.getId(),
|
|
|
- new NodeUsageStatsForThreadPools(
|
|
|
- firstNode.getId(),
|
|
|
- Map.of(ThreadPool.Names.WRITE, new ThreadPoolUsageStats(10, 5.0f, 30))
|
|
|
- ),
|
|
|
- secondNode.getId(),
|
|
|
- new NodeUsageStatsForThreadPools(
|
|
|
- secondNode.getId(),
|
|
|
- Map.of(ThreadPool.Names.WRITE, new ThreadPoolUsageStats(10, 5.0f, 30))
|
|
|
- ),
|
|
|
- thirdNode.getId(),
|
|
|
- new NodeUsageStatsForThreadPools(
|
|
|
- secondNode.getId(),
|
|
|
- Map.of(ThreadPool.Names.WRITE, new ThreadPoolUsageStats(10, 1.0f, 5))
|
|
|
- )
|
|
|
- )
|
|
|
- )
|
|
|
- .build()
|
|
|
- );
|
|
|
- // ClusterInfo is updated from the new polling and no adjustment is applied onto the new ClusterInfo due to no movement since
|
|
|
- rerouteAndWait(service, clusterService.state(), "reroute-after-new-cluster-info-polled");
|
|
|
- assertThat(clusterInfoUsedByAllocator.get(), equalTo(clusterInfoRef.get()));
|
|
|
- } finally {
|
|
|
- clusterService.close();
|
|
|
- terminate(threadPool);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
public void testResetDesiredBalanceOnNoLongerMaster() {
|
|
|
|
|
|
var node1 = newNode(LOCAL_NODE_ID);
|