|  | @@ -9,6 +9,7 @@
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  package org.elasticsearch.cluster.routing.allocation.allocator;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +import org.apache.logging.log4j.Level;
 | 
	
		
			
				|  |  |  import org.apache.lucene.util.SetOnce;
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.ActionListener;
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.support.ActionTestUtils;
 | 
	
	
		
			
				|  | @@ -52,6 +53,7 @@ import org.elasticsearch.index.shard.ShardId;
 | 
	
		
			
				|  |  |  import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
 | 
	
		
			
				|  |  |  import org.elasticsearch.telemetry.TelemetryProvider;
 | 
	
		
			
				|  |  |  import org.elasticsearch.test.ClusterServiceUtils;
 | 
	
		
			
				|  |  | +import org.elasticsearch.test.MockLog;
 | 
	
		
			
				|  |  |  import org.elasticsearch.threadpool.TestThreadPool;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  import java.util.List;
 | 
	
	
		
			
				|  | @@ -59,11 +61,12 @@ import java.util.Map;
 | 
	
		
			
				|  |  |  import java.util.Queue;
 | 
	
		
			
				|  |  |  import java.util.Set;
 | 
	
		
			
				|  |  |  import java.util.concurrent.CountDownLatch;
 | 
	
		
			
				|  |  | +import java.util.concurrent.CyclicBarrier;
 | 
	
		
			
				|  |  |  import java.util.concurrent.TimeUnit;
 | 
	
		
			
				|  |  |  import java.util.concurrent.atomic.AtomicBoolean;
 | 
	
		
			
				|  |  |  import java.util.concurrent.atomic.AtomicInteger;
 | 
	
		
			
				|  |  | +import java.util.concurrent.atomic.AtomicLong;
 | 
	
		
			
				|  |  |  import java.util.concurrent.atomic.AtomicReference;
 | 
	
		
			
				|  |  | -import java.util.function.BiConsumer;
 | 
	
		
			
				|  |  |  import java.util.function.Consumer;
 | 
	
		
			
				|  |  |  import java.util.function.Predicate;
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -85,14 +88,19 @@ public class DesiredBalanceShardsAllocatorTests extends ESAllocationTestCase {
 | 
	
		
			
				|  |  |      public void testGatewayAllocatorPreemptsAllocation() {
 | 
	
		
			
				|  |  |          final var nodeId = randomFrom(LOCAL_NODE_ID, OTHER_NODE_ID); // node on which the stubbed gateway allocator initializes the shard
 | 
	
		
			
				|  |  |          testAllocate( // first argument: gateway allocator stub, second argument: check on the resulting routing table
 | 
	
		
			
				|  |  | -            (allocation, unassignedAllocationHandler) -> unassignedAllocationHandler.initialize(nodeId, null, 0L, allocation.changes()),
 | 
	
		
			
				|  |  | +            (shardRouting, allocation, unassignedAllocationHandler) -> unassignedAllocationHandler.initialize(
 | 
	
		
			
				|  |  | +                nodeId,
 | 
	
		
			
				|  |  | +                null,
 | 
	
		
			
				|  |  | +                0L,
 | 
	
		
			
				|  |  | +                allocation.changes()
 | 
	
		
			
				|  |  | +            ),
 | 
	
		
			
				|  |  |              routingTable -> assertEquals(nodeId, routingTable.index("test-index").shard(0).primaryShard().currentNodeId()) // the primary must be on the node picked by the gateway allocator stub
 | 
	
		
			
				|  |  |          );
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      public void testGatewayAllocatorStillFetching() {
 | 
	
		
			
				|  |  |          testAllocate(
 | 
	
		
			
				|  |  | -            (allocation, unassignedAllocationHandler) -> unassignedAllocationHandler.removeAndIgnore(
 | 
	
		
			
				|  |  | +            (shardRouting, allocation, unassignedAllocationHandler) -> unassignedAllocationHandler.removeAndIgnore(
 | 
	
		
			
				|  |  |                  UnassignedInfo.AllocationStatus.FETCHING_SHARD_DATA,
 | 
	
		
			
				|  |  |                  allocation.changes()
 | 
	
		
			
				|  |  |              ),
 | 
	
	
		
			
				|  | @@ -108,17 +116,14 @@ public class DesiredBalanceShardsAllocatorTests extends ESAllocationTestCase {
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      public void testGatewayAllocatorDoesNothing() {
 | 
	
		
			
				|  |  | -        testAllocate((allocation, unassignedAllocationHandler) -> {}, routingTable -> {
 | 
	
		
			
				|  |  | +        testAllocate((shardRouting, allocation, unassignedAllocationHandler) -> {}, routingTable -> { // the gateway allocator stub is a no-op
 | 
	
		
			
				|  |  |              var shardRouting = routingTable.shardRoutingTable("test-index", 0).primaryShard();
 | 
	
		
			
				|  |  |              assertTrue(shardRouting.assignedToNode());// the gateway allocator stub did nothing, so the shard is assigned by a follow-up reconciliation
 | 
	
		
			
				|  |  |              assertThat(shardRouting.unassignedInfo().lastAllocationStatus(), equalTo(UnassignedInfo.AllocationStatus.NO_ATTEMPT));
 | 
	
		
			
				|  |  |          });
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    public void testAllocate(
 | 
	
		
			
				|  |  | -        BiConsumer<RoutingAllocation, ExistingShardsAllocator.UnassignedAllocationHandler> allocateUnassigned,
 | 
	
		
			
				|  |  | -        Consumer<RoutingTable> verifier
 | 
	
		
			
				|  |  | -    ) {
 | 
	
		
			
				|  |  | +    public void testAllocate(AllocateUnassignedHandler allocateUnassigned, Consumer<RoutingTable> verifier) {
 | 
	
		
			
				|  |  |          var deterministicTaskQueue = new DeterministicTaskQueue();
 | 
	
		
			
				|  |  |          var threadPool = deterministicTaskQueue.getThreadPool();
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -295,7 +300,7 @@ public class DesiredBalanceShardsAllocatorTests extends ESAllocationTestCase {
 | 
	
		
			
				|  |  |          var allocationService = new AllocationService(
 | 
	
		
			
				|  |  |              new AllocationDeciders(List.of()),
 | 
	
		
			
				|  |  |              createGatewayAllocator(
 | 
	
		
			
				|  |  | -                (allocation, unassignedAllocationHandler) -> unassignedAllocationHandler.removeAndIgnore(
 | 
	
		
			
				|  |  | +                (shardRouting, allocation, unassignedAllocationHandler) -> unassignedAllocationHandler.removeAndIgnore(
 | 
	
		
			
				|  |  |                      UnassignedInfo.AllocationStatus.NO_ATTEMPT,
 | 
	
		
			
				|  |  |                      allocation.changes()
 | 
	
		
			
				|  |  |                  )
 | 
	
	
		
			
				|  | @@ -336,6 +341,157 @@ public class DesiredBalanceShardsAllocatorTests extends ESAllocationTestCase {
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * Checks that a desired balance computation which runs longer than
 | 
	
		
			
				|  |  | +     * {@code MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING} (2s here) is interrupted when an index is created
 | 
	
		
			
				|  |  | +     * whose primary can be assigned, and is not interrupted when the gateway allocator ignores the shards of the new index.
 | 
	
		
			
				|  |  | +     * Time is simulated: every call to the stubbed {@code ShardsAllocator#allocate} advances a fake clock by one second.
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    public void testIndexCreationInterruptsLongDesiredBalanceComputation() throws Exception {
 | 
	
		
			
				|  |  | +        var discoveryNode = newNode("node-0");
 | 
	
		
			
				|  |  | +        var initialState = ClusterState.builder(ClusterName.DEFAULT)
 | 
	
		
			
				|  |  | +            .nodes(DiscoveryNodes.builder().add(discoveryNode).localNodeId(discoveryNode.getId()).masterNodeId(discoveryNode.getId()))
 | 
	
		
			
				|  |  | +            .build();
 | 
	
		
			
				|  |  | +        final var ignoredIndexName = "index-ignored";
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        var threadPool = new TestThreadPool(getTestName());
 | 
	
		
			
				|  |  | +        var time = new AtomicLong(threadPool.relativeTimeInMillis());
 | 
	
		
			
				|  |  | +        var clusterService = ClusterServiceUtils.createClusterService(initialState, threadPool);
 | 
	
		
			
				|  |  | +        var allocationServiceRef = new SetOnce<AllocationService>();
 | 
	
		
			
				|  |  | +        var reconcileAction = new DesiredBalanceReconcilerAction() {
 | 
	
		
			
				|  |  | +            @Override
 | 
	
		
			
				|  |  | +            public ClusterState apply(ClusterState clusterState, RerouteStrategy routingAllocationAction) {
 | 
	
		
			
				|  |  | +                return allocationServiceRef.get().executeWithRoutingAllocation(clusterState, "reconcile", routingAllocationAction);
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        };
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // Shards of the ignored index are removed from the unassigned list and ignored; shards of any other index are left untouched.
 | 
	
		
			
				|  |  | +        var gatewayAllocator = createGatewayAllocator((shardRouting, allocation, unassignedAllocationHandler) -> {
 | 
	
		
			
				|  |  | +            if (shardRouting.getIndexName().equals(ignoredIndexName)) {
 | 
	
		
			
				|  |  | +                unassignedAllocationHandler.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes());
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        });
 | 
	
		
			
				|  |  | +        var shardsAllocator = new ShardsAllocator() {
 | 
	
		
			
				|  |  | +            @Override
 | 
	
		
			
				|  |  | +            public void allocate(RoutingAllocation allocation) {
 | 
	
		
			
				|  |  | +                // simulate a long computation: every call advances the fake clock by 1s
 | 
	
		
			
				|  |  | +                time.addAndGet(1_000);
 | 
	
		
			
				|  |  | +                var dataNodeId = allocation.nodes().getDataNodes().values().iterator().next().getId();
 | 
	
		
			
				|  |  | +                var unassignedIterator = allocation.routingNodes().unassigned().iterator();
 | 
	
		
			
				|  |  | +                while (unassignedIterator.hasNext()) {
 | 
	
		
			
				|  |  | +                    unassignedIterator.next();
 | 
	
		
			
				|  |  | +                    unassignedIterator.initialize(dataNodeId, null, 0L, allocation.changes());
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            @Override
 | 
	
		
			
				|  |  | +            public ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation) {
 | 
	
		
			
				|  |  | +                throw new AssertionError("only used for allocation explain");
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        };
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // Make sure the computation takes at least a few iterations, where each iteration advances the fake clock by 1s (see
 | 
	
		
			
				|  |  | +        // shardsAllocator#allocate above). The setting below ensures that a desired balance computation is interrupted early so
 | 
	
		
			
				|  |  | +        // that it does not delay assigning newly created primary shards. Because convergence requires at least minIterations
 | 
	
		
			
				|  |  | +        // (>= 3) iterations, the computation (>= 3s) is always longer than the configured 2s.
 | 
	
		
			
				|  |  | +        var clusterSettings = createBuiltInClusterSettings(
 | 
	
		
			
				|  |  | +            Settings.builder().put(DesiredBalanceComputer.MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING.getKey(), "2s").build()
 | 
	
		
			
				|  |  | +        );
 | 
	
		
			
				|  |  | +        final int minIterations = between(3, 10);
 | 
	
		
			
				|  |  | +        var desiredBalanceShardsAllocator = new DesiredBalanceShardsAllocator(
 | 
	
		
			
				|  |  | +            shardsAllocator,
 | 
	
		
			
				|  |  | +            threadPool,
 | 
	
		
			
				|  |  | +            clusterService,
 | 
	
		
			
				|  |  | +            new DesiredBalanceComputer(clusterSettings, shardsAllocator, time::get) {
 | 
	
		
			
				|  |  | +                @Override
 | 
	
		
			
				|  |  | +                public DesiredBalance compute(
 | 
	
		
			
				|  |  | +                    DesiredBalance previousDesiredBalance,
 | 
	
		
			
				|  |  | +                    DesiredBalanceInput desiredBalanceInput,
 | 
	
		
			
				|  |  | +                    Queue<List<MoveAllocationCommand>> pendingDesiredBalanceMoves,
 | 
	
		
			
				|  |  | +                    Predicate<DesiredBalanceInput> isFresh
 | 
	
		
			
				|  |  | +                ) {
 | 
	
		
			
				|  |  | +                    return super.compute(previousDesiredBalance, desiredBalanceInput, pendingDesiredBalanceMoves, isFresh);
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                @Override
 | 
	
		
			
				|  |  | +                boolean hasComputationConverged(boolean hasRoutingChanges, int currentIteration) {
 | 
	
		
			
				|  |  | +                    return super.hasComputationConverged(hasRoutingChanges, currentIteration) && currentIteration >= minIterations;
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            },
 | 
	
		
			
				|  |  | +            reconcileAction,
 | 
	
		
			
				|  |  | +            TelemetryProvider.NOOP
 | 
	
		
			
				|  |  | +        );
 | 
	
		
			
				|  |  | +        var allocationService = createAllocationService(desiredBalanceShardsAllocator, gatewayAllocator);
 | 
	
		
			
				|  |  | +        allocationServiceRef.set(allocationService);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // Two parties: the reroute listener of the create-index task and the test code waiting for that reroute to finish.
 | 
	
		
			
				|  |  | +        var rerouteFinished = new CyclicBarrier(2);
 | 
	
		
			
				|  |  | +        // A mock cluster state update task for creating an index
 | 
	
		
			
				|  |  | +        class CreateIndexTask extends ClusterStateUpdateTask {
 | 
	
		
			
				|  |  | +            private final String indexName;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            private CreateIndexTask(String indexName) {
 | 
	
		
			
				|  |  | +                this.indexName = indexName;
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            @Override
 | 
	
		
			
				|  |  | +            public ClusterState execute(ClusterState currentState) throws Exception {
 | 
	
		
			
				|  |  | +                var indexMetadata = createIndex(indexName);
 | 
	
		
			
				|  |  | +                var newState = ClusterState.builder(currentState)
 | 
	
		
			
				|  |  | +                    .metadata(Metadata.builder(currentState.metadata()).put(indexMetadata, true))
 | 
	
		
			
				|  |  | +                    .routingTable(
 | 
	
		
			
				|  |  | +                        RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY, currentState.routingTable())
 | 
	
		
			
				|  |  | +                            .addAsNew(indexMetadata)
 | 
	
		
			
				|  |  | +                    )
 | 
	
		
			
				|  |  | +                    .build();
 | 
	
		
			
				|  |  | +                return allocationService.reroute(
 | 
	
		
			
				|  |  | +                    newState,
 | 
	
		
			
				|  |  | +                    "test",
 | 
	
		
			
				|  |  | +                    ActionTestUtils.assertNoFailureListener(response -> safeAwait(rerouteFinished))
 | 
	
		
			
				|  |  | +                );
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            @Override
 | 
	
		
			
				|  |  | +            public void onFailure(Exception e) {
 | 
	
		
			
				|  |  | +                throw new AssertionError(e);
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        final var computationInterruptedMessage =
 | 
	
		
			
				|  |  | +            "Desired balance computation for * interrupted * in order to not delay assignment of newly created index shards *";
 | 
	
		
			
				|  |  | +        try {
 | 
	
		
			
				|  |  | +            // Create a new index which is not ignored and therefore must be considered when a desired balance
 | 
	
		
			
				|  |  | +            // computation takes longer than 2s.
 | 
	
		
			
				|  |  | +            assertThat(desiredBalanceShardsAllocator.getStats().computationExecuted(), equalTo(0L));
 | 
	
		
			
				|  |  | +            MockLog.assertThatLogger(() -> {
 | 
	
		
			
				|  |  | +                clusterService.submitUnbatchedStateUpdateTask("test", new CreateIndexTask("index-1"));
 | 
	
		
			
				|  |  | +                safeAwait(rerouteFinished);
 | 
	
		
			
				|  |  | +                assertThat(clusterService.state().getRoutingTable().index("index-1").primaryShardsUnassigned(), equalTo(0));
 | 
	
		
			
				|  |  | +            },
 | 
	
		
			
				|  |  | +                DesiredBalanceComputer.class,
 | 
	
		
			
				|  |  | +                new MockLog.SeenEventExpectation(
 | 
	
		
			
				|  |  | +                    "Should log interrupted computation",
 | 
	
		
			
				|  |  | +                    DesiredBalanceComputer.class.getCanonicalName(),
 | 
	
		
			
				|  |  | +                    Level.INFO,
 | 
	
		
			
				|  |  | +                    computationInterruptedMessage
 | 
	
		
			
				|  |  | +                )
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +            assertBusy(() -> assertFalse(desiredBalanceShardsAllocator.getStats().computationActive()));
 | 
	
		
			
				|  |  | +            // NOTE(review): presumably the interrupted computation plus the follow-up computation that completes it - confirm.
 | 
	
		
			
				|  |  | +            assertThat(desiredBalanceShardsAllocator.getStats().computationExecuted(), equalTo(2L));
 | 
	
		
			
				|  |  | +            // The computation should not get interrupted when the newly created index shard stays unassigned.
 | 
	
		
			
				|  |  | +            MockLog.assertThatLogger(() -> {
 | 
	
		
			
				|  |  | +                clusterService.submitUnbatchedStateUpdateTask("test", new CreateIndexTask(ignoredIndexName));
 | 
	
		
			
				|  |  | +                safeAwait(rerouteFinished);
 | 
	
		
			
				|  |  | +                assertThat(clusterService.state().getRoutingTable().index(ignoredIndexName).primaryShardsUnassigned(), equalTo(1));
 | 
	
		
			
				|  |  | +            },
 | 
	
		
			
				|  |  | +                DesiredBalanceComputer.class,
 | 
	
		
			
				|  |  | +                new MockLog.UnseenEventExpectation(
 | 
	
		
			
				|  |  | +                    "Should not log interrupted computation",
 | 
	
		
			
				|  |  | +                    DesiredBalanceComputer.class.getCanonicalName(),
 | 
	
		
			
				|  |  | +                    Level.INFO,
 | 
	
		
			
				|  |  | +                    computationInterruptedMessage
 | 
	
		
			
				|  |  | +                )
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +            assertBusy(() -> assertFalse(desiredBalanceShardsAllocator.getStats().computationActive()));
 | 
	
		
			
				|  |  | +            assertThat(desiredBalanceShardsAllocator.getStats().computationExecuted(), equalTo(3L));
 | 
	
		
			
				|  |  | +        } finally {
 | 
	
		
			
				|  |  | +            clusterService.close();
 | 
	
		
			
				|  |  | +            terminate(threadPool);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      public void testCallListenersOnlyAfterProducingFreshInput() throws InterruptedException {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          var reconciliations = new AtomicInteger(0);
 | 
	
	
		
			
				|  | @@ -772,13 +928,30 @@ public class DesiredBalanceShardsAllocatorTests extends ESAllocationTestCase {
 | 
	
		
			
				|  |  |          return createGatewayAllocator(DesiredBalanceShardsAllocatorTests::initialize);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    private static void initialize(RoutingAllocation allocation, ExistingShardsAllocator.UnassignedAllocationHandler handler) {
 | 
	
		
			
				|  |  | +    private static void initialize(
 | 
	
		
			
				|  |  | +        ShardRouting shardRouting, // unused; present so the method reference matches AllocateUnassignedHandler#handle
 | 
	
		
			
				|  |  | +        RoutingAllocation allocation,
 | 
	
		
			
				|  |  | +        ExistingShardsAllocator.UnassignedAllocationHandler handler
 | 
	
		
			
				|  |  | +    ) {
 | 
	
		
			
				|  |  |          handler.initialize(allocation.nodes().getLocalNodeId(), null, 0L, allocation.changes()); // initialize the unassigned shard on the local node
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    private static GatewayAllocator createGatewayAllocator(
 | 
	
		
			
				|  |  | -        BiConsumer<RoutingAllocation, ExistingShardsAllocator.UnassignedAllocationHandler> allocateUnassigned
 | 
	
		
			
				|  |  | -    ) {
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * A helper interface to simplify creating a GatewayAllocator in the tests by only requiring
 | 
	
		
			
				|  |  | +     * an implementation for {@link org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator#allocateUnassigned}.
 | 
	
		
			
				|  |  | +     * It has a single abstract method, so tests can supply it as a lambda or a method reference.
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    @FunctionalInterface
 | 
	
		
			
				|  |  | +    interface AllocateUnassignedHandler {
 | 
	
		
			
				|  |  | +        /**
 | 
	
		
			
				|  |  | +         * Called for an unassigned shard that the gateway allocator is asked to allocate.
 | 
	
		
			
				|  |  | +         *
 | 
	
		
			
				|  |  | +         * @param shardRouting the unassigned shard being handled
 | 
	
		
			
				|  |  | +         * @param allocation the current routing allocation
 | 
	
		
			
				|  |  | +         * @param unassignedAllocationHandler handler used to initialize or ignore the shard; doing nothing leaves it untouched
 | 
	
		
			
				|  |  | +         */
 | 
	
		
			
				|  |  | +        void handle(
 | 
	
		
			
				|  |  | +            ShardRouting shardRouting,
 | 
	
		
			
				|  |  | +            RoutingAllocation allocation,
 | 
	
		
			
				|  |  | +            ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler
 | 
	
		
			
				|  |  | +        );
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * Creates an implementation of GatewayAllocator that delegates its logic for allocating unassigned shards to the provided handler.
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    private static GatewayAllocator createGatewayAllocator(AllocateUnassignedHandler allocateUnassigned) {
 | 
	
		
			
				|  |  |          return new GatewayAllocator() {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              @Override
 | 
	
	
		
			
				|  | @@ -790,7 +963,7 @@ public class DesiredBalanceShardsAllocatorTests extends ESAllocationTestCase {
 | 
	
		
			
				|  |  |                  RoutingAllocation allocation,
 | 
	
		
			
				|  |  |                  UnassignedAllocationHandler unassignedAllocationHandler
 | 
	
		
			
				|  |  |              ) {
 | 
	
		
			
				|  |  | -                allocateUnassigned.accept(allocation, unassignedAllocationHandler);
 | 
	
		
			
				|  |  | +                allocateUnassigned.handle(shardRouting, allocation, unassignedAllocationHandler);
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              @Override
 |