|  | @@ -110,6 +110,8 @@ import static org.mockito.Mockito.when;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  public class DesiredBalanceReconcilerTests extends ESAllocationTestCase {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    private static AtomicReference<DesiredBalanceMetrics.AllocationStats> ALLOCATION_STATS_PLACEHOLDER = new AtomicReference<>();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      public void testNoChangesOnEmptyDesiredBalance() {
 | 
	
		
			
				|  |  |          final var clusterState = DesiredBalanceComputerTests.createInitialClusterState(3);
 | 
	
		
			
				|  |  |          final var routingAllocation = createRoutingAllocationFrom(clusterState);
 | 
	
	
		
			
				|  | @@ -235,8 +237,9 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase {
 | 
	
		
			
				|  |  |              (indexName, nodeId) -> indexName.equals("index-0") && nodeId.equals("node-0")
 | 
	
		
			
				|  |  |          );
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +        AtomicReference<DesiredBalanceMetrics.AllocationStats> allocationStats = new AtomicReference<>();
 | 
	
		
			
				|  |  |          final var allocationService = createTestAllocationService(
 | 
	
		
			
				|  |  | -            routingAllocation -> reconcile(routingAllocation, desiredBalance),
 | 
	
		
			
				|  |  | +            routingAllocation -> reconcile(routingAllocation, desiredBalance, allocationStats),
 | 
	
		
			
				|  |  |              new SameShardAllocationDecider(clusterSettings),
 | 
	
		
			
				|  |  |              new ReplicaAfterPrimaryActiveAllocationDecider(),
 | 
	
		
			
				|  |  |              new ThrottlingAllocationDecider(clusterSettings),
 | 
	
	
		
			
				|  | @@ -260,6 +263,8 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase {
 | 
	
		
			
				|  |  |              final var index1RoutingTable = stateWithStartedPrimary.routingTable().shardRoutingTable("index-1", 0);
 | 
	
		
			
				|  |  |              assertTrue(index1RoutingTable.primaryShard().unassigned());
 | 
	
		
			
				|  |  |              assertTrue(index1RoutingTable.replicaShards().stream().allMatch(ShardRouting::unassigned));
 | 
	
		
			
				|  |  | +            assertNotNull(allocationStats.get());
 | 
	
		
			
				|  |  | +            assertEquals(new DesiredBalanceMetrics.AllocationStats(3, 1, 0), allocationStats.get());
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          // now relax the filter so that the replica of index-0 and the primary of index-1 can both be assigned to node-1, but the throttle
 | 
	
	
		
			
				|  | @@ -273,6 +278,8 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase {
 | 
	
		
			
				|  |  |              final var index1RoutingTable = stateWithInitializingSecondPrimary.routingTable().shardRoutingTable("index-1", 0);
 | 
	
		
			
				|  |  |              assertTrue(index1RoutingTable.primaryShard().initializing());
 | 
	
		
			
				|  |  |              assertTrue(index1RoutingTable.replicaShards().stream().allMatch(ShardRouting::unassigned));
 | 
	
		
			
				|  |  | +            assertNotNull(allocationStats.get());
 | 
	
		
			
				|  |  | +            assertEquals(new DesiredBalanceMetrics.AllocationStats(2, 2, 0), allocationStats.get());
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          final var stateWithStartedPrimariesAndInitializingReplica = startInitializingShardsAndReroute(
 | 
	
	
		
			
				|  | @@ -286,6 +293,8 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase {
 | 
	
		
			
				|  |  |              final var index1RoutingTable = stateWithStartedPrimariesAndInitializingReplica.routingTable().shardRoutingTable("index-1", 0);
 | 
	
		
			
				|  |  |              assertTrue(index1RoutingTable.primaryShard().started());
 | 
	
		
			
				|  |  |              assertTrue(index1RoutingTable.replicaShards().stream().allMatch(ShardRouting::unassigned));
 | 
	
		
			
				|  |  | +            assertNotNull(allocationStats.get());
 | 
	
		
			
				|  |  | +            assertEquals(new DesiredBalanceMetrics.AllocationStats(1, 3, 0), allocationStats.get());
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -815,6 +824,9 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase {
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      public void testMoveShards() {
 | 
	
		
			
				|  |  | +        /**
 | 
	
		
			
				|  |  | +         * Set up 4 nodes and an index of 3 shards with 1 replica each (6 shard copies).
 | 
	
		
			
				|  |  | +         */
 | 
	
		
			
				|  |  |          final var discoveryNodes = discoveryNodes(4);
 | 
	
		
			
				|  |  |          final var metadata = Metadata.builder();
 | 
	
		
			
				|  |  |          final var routingTable = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY);
 | 
	
	
		
			
				|  | @@ -839,11 +851,13 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase {
 | 
	
		
			
				|  |  |              .build();
 | 
	
		
			
				|  |  |          final var clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +        // Set up overriding AllocationDecider#canAllocate decisions for a shard.
 | 
	
		
			
				|  |  |          final var canAllocateRef = new AtomicReference<>(Decision.YES);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          final var desiredBalance = new AtomicReference<>(desiredBalance(clusterState, (shardId, nodeId) -> true));
 | 
	
		
			
				|  |  | +        AtomicReference<DesiredBalanceMetrics.AllocationStats> allocationStats = new AtomicReference<>();
 | 
	
		
			
				|  |  |          final var allocationService = createTestAllocationService(
 | 
	
		
			
				|  |  | -            routingAllocation -> reconcile(routingAllocation, desiredBalance.get()),
 | 
	
		
			
				|  |  | +            routingAllocation -> reconcile(routingAllocation, desiredBalance.get(), allocationStats),
 | 
	
		
			
				|  |  |              new SameShardAllocationDecider(clusterSettings),
 | 
	
		
			
				|  |  |              new ReplicaAfterPrimaryActiveAllocationDecider(),
 | 
	
		
			
				|  |  |              new ThrottlingAllocationDecider(clusterSettings),
 | 
	
	
		
			
				|  | @@ -873,7 +887,10 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase {
 | 
	
		
			
				|  |  |              assertTrue(shardRouting.started());
 | 
	
		
			
				|  |  |              assertThat(shardRouting.currentNodeId(), oneOf("node-0", "node-1"));
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  | +        assertNotNull(allocationStats);
 | 
	
		
			
				|  |  | +        assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0), allocationStats.get());
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +        // Only allow allocation on two of the nodes, excluding the other two nodes.
 | 
	
		
			
				|  |  |          clusterSettings.applySettings(
 | 
	
		
			
				|  |  |              Settings.builder()
 | 
	
		
			
				|  |  |                  .putList(
 | 
	
	
		
			
				|  | @@ -886,6 +903,8 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop())); // all still on desired nodes, no
 | 
	
		
			
				|  |  |                                                                                                            // movement needed
 | 
	
		
			
				|  |  | +        assertNotNull(allocationStats);
 | 
	
		
			
				|  |  | +        assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0), allocationStats.get());
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          desiredBalance.set(desiredBalance(clusterState, (shardId, nodeId) -> nodeId.equals("node-2") || nodeId.equals("node-3")));
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -894,10 +913,14 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase {
 | 
	
		
			
				|  |  |          final var reroutedState = allocationService.reroute(clusterState, "test", ActionListener.noop());
 | 
	
		
			
				|  |  |          assertThat(reroutedState.getRoutingNodes().node("node-0").numberOfShardsWithState(ShardRoutingState.RELOCATING), equalTo(1));
 | 
	
		
			
				|  |  |          assertThat(reroutedState.getRoutingNodes().node("node-1").numberOfShardsWithState(ShardRoutingState.RELOCATING), equalTo(1));
 | 
	
		
			
				|  |  | +        assertNotNull(allocationStats);
 | 
	
		
			
				|  |  | +        // Total allocations counts relocating and initializing shards, so the two relocating shards will be counted twice.
 | 
	
		
			
				|  |  | +        assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 8, 4), allocationStats.get());
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          // Ensuring that we check the shortcut two-param canAllocate() method up front
 | 
	
		
			
				|  |  |          canAllocateRef.set(Decision.NO);
 | 
	
		
			
				|  |  |          assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop()));
 | 
	
		
			
				|  |  | +        assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6), allocationStats.get());
 | 
	
		
			
				|  |  |          canAllocateRef.set(Decision.YES);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          // Restore filter to default
 | 
	
	
		
			
				|  | @@ -935,6 +958,8 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase {
 | 
	
		
			
				|  |  |              "test",
 | 
	
		
			
				|  |  |              ActionListener.noop()
 | 
	
		
			
				|  |  |          );
 | 
	
		
			
				|  |  | +        assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 7, 3), allocationStats.get());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |          assertThat(shuttingDownState.getRoutingNodes().node("node-2").numberOfShardsWithState(ShardRoutingState.INITIALIZING), equalTo(1));
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -963,11 +988,13 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase {
 | 
	
		
			
				|  |  |          final var desiredBalance = new AtomicReference<>(
 | 
	
		
			
				|  |  |              desiredBalance(clusterState, (shardId, nodeId) -> nodeId.equals("node-0") || nodeId.equals("node-1"))
 | 
	
		
			
				|  |  |          );
 | 
	
		
			
				|  |  | +        AtomicReference<DesiredBalanceMetrics.AllocationStats> allocationStats = new AtomicReference<>();
 | 
	
		
			
				|  |  |          final var allocationService = createTestAllocationService(
 | 
	
		
			
				|  |  | -            routingAllocation -> reconcile(routingAllocation, desiredBalance.get()),
 | 
	
		
			
				|  |  | +            routingAllocation -> reconcile(routingAllocation, desiredBalance.get(), allocationStats),
 | 
	
		
			
				|  |  |              new SameShardAllocationDecider(clusterSettings),
 | 
	
		
			
				|  |  |              new ReplicaAfterPrimaryActiveAllocationDecider(),
 | 
	
		
			
				|  |  |              new ThrottlingAllocationDecider(clusterSettings),
 | 
	
		
			
				|  |  | +            new ConcurrentRebalanceAllocationDecider(clusterSettings),
 | 
	
		
			
				|  |  |              new AllocationDecider() {
 | 
	
		
			
				|  |  |                  @Override
 | 
	
		
			
				|  |  |                  public Decision canRebalance(RoutingAllocation allocation) {
 | 
	
	
		
			
				|  | @@ -997,24 +1024,28 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase {
 | 
	
		
			
				|  |  |              assertThat(shardRouting.currentNodeId(), oneOf("node-0", "node-1"));
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop())); // all still on desired nodes, no
 | 
	
		
			
				|  |  | -                                                                                                          // movement needed
 | 
	
		
			
				|  |  | +        // All still on desired nodes, no movement needed, cluster state remains the same.
 | 
	
		
			
				|  |  | +        assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop()));
 | 
	
		
			
				|  |  | +        assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0), allocationStats.get());
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          desiredBalance.set(desiredBalance(clusterState, (shardId, nodeId) -> nodeId.equals("node-2") || nodeId.equals("node-3")));
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          canRebalanceGlobalRef.set(Decision.NO);
 | 
	
		
			
				|  |  | -        assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop())); // rebalancing forbidden on all
 | 
	
		
			
				|  |  | -                                                                                                          // shards, no movement
 | 
	
		
			
				|  |  | +        // rebalancing forbidden on all shards, no movement allowed, cluster state remains the same.
 | 
	
		
			
				|  |  | +        assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop()));
 | 
	
		
			
				|  |  | +        // assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6), allocationStats.get());
 | 
	
		
			
				|  |  |          canRebalanceGlobalRef.set(Decision.YES);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          canRebalanceShardRef.set(Decision.NO);
 | 
	
		
			
				|  |  | -        assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop())); // rebalancing forbidden on
 | 
	
		
			
				|  |  | -                                                                                                          // specific shards, no movement
 | 
	
		
			
				|  |  | +        // rebalancing forbidden on specific shards, still no movement.
 | 
	
		
			
				|  |  | +        assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop()));
 | 
	
		
			
				|  |  | +        // assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6), allocationStats.get());
 | 
	
		
			
				|  |  |          canRebalanceShardRef.set(Decision.YES);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          canAllocateShardRef.set(Decision.NO);
 | 
	
		
			
				|  |  | -        assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop())); // allocation not possible, no
 | 
	
		
			
				|  |  | -                                                                                                          // movement
 | 
	
		
			
				|  |  | +        // allocation not possible, no movement
 | 
	
		
			
				|  |  | +        assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop()));
 | 
	
		
			
				|  |  | +        // assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6), allocationStats.get());
 | 
	
		
			
				|  |  |          canAllocateShardRef.set(Decision.YES);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          // The next reroute starts moving shards to node-2 and node-3, but interleaves the decisions between node-0 and node-1 for fairness.
 | 
	
	
		
			
				|  | @@ -1022,6 +1053,16 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase {
 | 
	
		
			
				|  |  |          final var reroutedState = allocationService.reroute(clusterState, "test", ActionListener.noop());
 | 
	
		
			
				|  |  |          assertThat(reroutedState.getRoutingNodes().node("node-0").numberOfShardsWithState(ShardRoutingState.RELOCATING), equalTo(1));
 | 
	
		
			
				|  |  |          assertThat(reroutedState.getRoutingNodes().node("node-1").numberOfShardsWithState(ShardRoutingState.RELOCATING), equalTo(1));
 | 
	
		
			
				|  |  | +        assertNotNull(allocationStats.get());
 | 
	
		
			
				|  |  | +        assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6), allocationStats.get());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // Test that the AllocationStats are still updated, even though throttling is active. The cluster state should remain unchanged
 | 
	
		
			
				|  |  | +        // because of throttling: the previous reroute request started relocating two shards and, since those reallocations have not
 | 
	
		
			
				|  |  | +        // been completed, no additional shard relocations can begin.
 | 
	
		
			
				|  |  | +        assertSame(reroutedState, allocationService.reroute(reroutedState, "test", ActionListener.noop()));
 | 
	
		
			
				|  |  | +        assertNotNull(allocationStats);
 | 
	
		
			
				|  |  | +        // Note: total allocations counts relocating and initializing shards, so the two relocating shards will be counted twice.
 | 
	
		
			
				|  |  | +        assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 8, 4), allocationStats.get());
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      public void testDoNotRebalanceToTheNodeThatNoLongerExists() {
 | 
	
	
		
			
				|  | @@ -1225,12 +1266,14 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase {
 | 
	
		
			
				|  |  |          while (true) {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              var allocation = createRoutingAllocationFrom(clusterState, deciders);
 | 
	
		
			
				|  |  | -            reconciler.reconcile(balance, allocation);
 | 
	
		
			
				|  |  | +            var allocationStats = reconciler.reconcile(balance, allocation);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              var initializing = shardsWithState(allocation.routingNodes(), ShardRoutingState.INITIALIZING);
 | 
	
		
			
				|  |  |              if (initializing.isEmpty()) {
 | 
	
		
			
				|  |  | +                assertEquals(new DesiredBalanceMetrics.AllocationStats(0, shardsPerNode * numberOfNodes, 0), allocationStats);
 | 
	
		
			
				|  |  |                  break;
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |              for (ShardRouting shardRouting : initializing) {
 | 
	
		
			
				|  |  |                  totalOutgoingMoves.get(shardRouting.relocatingNodeId()).incrementAndGet();
 | 
	
		
			
				|  |  |                  allocation.routingNodes().startShard(shardRouting, allocation.changes(), 0L);
 | 
	
	
		
			
				|  | @@ -1344,11 +1387,24 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase {
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      private static void reconcile(RoutingAllocation routingAllocation, DesiredBalance desiredBalance) {
 | 
	
		
			
				|  |  | +        reconcile(routingAllocation, desiredBalance, ALLOCATION_STATS_PLACEHOLDER);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    private static void reconcile(
 | 
	
		
			
				|  |  | +        RoutingAllocation routingAllocation,
 | 
	
		
			
				|  |  | +        DesiredBalance desiredBalance,
 | 
	
		
			
				|  |  | +        AtomicReference<DesiredBalanceMetrics.AllocationStats> allocationStatsAtomicReference
 | 
	
		
			
				|  |  | +    ) {
 | 
	
		
			
				|  |  |          final var threadPool = mock(ThreadPool.class);
 | 
	
		
			
				|  |  |          when(threadPool.relativeTimeInMillisSupplier()).thenReturn(new AtomicLong()::incrementAndGet);
 | 
	
		
			
				|  |  | -        new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool).reconcile(desiredBalance, routingAllocation);
 | 
	
		
			
				|  |  | +        allocationStatsAtomicReference.set(
 | 
	
		
			
				|  |  | +            new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool).reconcile(desiredBalance, routingAllocation)
 | 
	
		
			
				|  |  | +        );
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * Returns whether the node's shards are all desired assignments.
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  |      private static boolean isReconciled(RoutingNode node, DesiredBalance balance) {
 | 
	
		
			
				|  |  |          for (ShardRouting shardRouting : node) {
 | 
	
		
			
				|  |  |              if (balance.assignments().get(shardRouting.shardId()).nodeIds().contains(node.nodeId()) == false) {
 | 
	
	
		
			
				|  | @@ -1486,6 +1542,10 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase {
 | 
	
		
			
				|  |  |              .build();
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * Settings that limit concurrency on each node to: a single primary shard recovery from local disk; a single shard move as a source
 | 
	
		
			
				|  |  | +     * node; a single shard move as the destination node.
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  |      private static Settings throttleSettings() {
 | 
	
		
			
				|  |  |          return Settings.builder()
 | 
	
		
			
				|  |  |              .put(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 1)
 |