|
@@ -13,6 +13,7 @@ import org.apache.logging.log4j.Level;
|
|
|
import org.apache.lucene.util.SetOnce;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.action.support.ActionTestUtils;
|
|
|
+import org.elasticsearch.action.support.PlainActionFuture;
|
|
|
import org.elasticsearch.cluster.ClusterInfo;
|
|
|
import org.elasticsearch.cluster.ClusterName;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
@@ -56,6 +57,7 @@ import org.elasticsearch.test.ClusterServiceUtils;
|
|
|
import org.elasticsearch.test.MockLog;
|
|
|
import org.elasticsearch.threadpool.TestThreadPool;
|
|
|
|
|
|
+import java.util.Collections;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Queue;
|
|
@@ -78,7 +80,9 @@ import static org.hamcrest.Matchers.empty;
|
|
|
import static org.hamcrest.Matchers.equalTo;
|
|
|
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
|
|
import static org.hamcrest.Matchers.hasItem;
|
|
|
+import static org.hamcrest.Matchers.is;
|
|
|
import static org.hamcrest.Matchers.not;
|
|
|
+import static org.hamcrest.Matchers.sameInstance;
|
|
|
|
|
|
public class DesiredBalanceShardsAllocatorTests extends ESAllocationTestCase {
|
|
|
|
|
@@ -906,6 +910,76 @@ public class DesiredBalanceShardsAllocatorTests extends ESAllocationTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public void testNotReconcileEagerlyForEmptyRoutingTable() {
|
|
|
+ final var threadPool = new TestThreadPool(getTestName());
|
|
|
+ final var clusterService = ClusterServiceUtils.createClusterService(ClusterState.EMPTY_STATE, threadPool);
|
|
|
+ final var clusterSettings = createBuiltInClusterSettings();
|
|
|
+ final var shardsAllocator = createShardsAllocator();
|
|
|
+ final var reconciliationTaskSubmitted = new AtomicBoolean();
|
|
|
+ final var desiredBalanceShardsAllocator = new DesiredBalanceShardsAllocator(
|
|
|
+ shardsAllocator,
|
|
|
+ threadPool,
|
|
|
+ clusterService,
|
|
|
+ new DesiredBalanceComputer(clusterSettings, threadPool, shardsAllocator) {
|
|
|
+ @Override
|
|
|
+ public DesiredBalance compute(
|
|
|
+ DesiredBalance previousDesiredBalance,
|
|
|
+ DesiredBalanceInput desiredBalanceInput,
|
|
|
+ Queue<List<MoveAllocationCommand>> pendingDesiredBalanceMoves,
|
|
|
+ Predicate<DesiredBalanceInput> isFresh
|
|
|
+ ) {
|
|
|
+ assertThat(previousDesiredBalance, sameInstance(DesiredBalance.INITIAL));
|
|
|
+ return new DesiredBalance(desiredBalanceInput.index(), Map.of());
|
|
|
+ }
|
|
|
+ },
|
|
|
+ (clusterState, rerouteStrategy) -> null,
|
|
|
+ TelemetryProvider.NOOP
|
|
|
+ ) {
|
|
|
+
|
|
|
+ private ActionListener<Void> lastListener;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void allocate(RoutingAllocation allocation, ActionListener<Void> listener) {
|
|
|
+ lastListener = listener;
|
|
|
+ super.allocate(allocation, listener);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) {
|
|
|
+ fail("should not call reconcile");
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void submitReconcileTask(DesiredBalance desiredBalance) {
|
|
|
+ assertThat(desiredBalance.lastConvergedIndex(), equalTo(0L));
|
|
|
+ reconciliationTaskSubmitted.set(true);
|
|
|
+ lastListener.onResponse(null);
|
|
|
+ }
|
|
|
+ };
|
|
|
+ assertThat(desiredBalanceShardsAllocator.getDesiredBalance(), sameInstance(DesiredBalance.INITIAL));
|
|
|
+ try {
|
|
|
+ final PlainActionFuture<Void> future = new PlainActionFuture<>();
|
|
|
+ desiredBalanceShardsAllocator.allocate(
|
|
|
+ new RoutingAllocation(
|
|
|
+ new AllocationDeciders(Collections.emptyList()),
|
|
|
+ clusterService.state(),
|
|
|
+ null,
|
|
|
+ null,
|
|
|
+ randomNonNegativeLong()
|
|
|
+ ),
|
|
|
+ future
|
|
|
+ );
|
|
|
+ safeGet(future);
|
|
|
+ assertThat(desiredBalanceShardsAllocator.getStats().computationSubmitted(), equalTo(1L));
|
|
|
+ assertThat(desiredBalanceShardsAllocator.getStats().computationExecuted(), equalTo(1L));
|
|
|
+ assertThat(reconciliationTaskSubmitted.get(), is(true));
|
|
|
+ assertThat(desiredBalanceShardsAllocator.getDesiredBalance().lastConvergedIndex(), equalTo(0L));
|
|
|
+ } finally {
|
|
|
+ clusterService.close();
|
|
|
+ terminate(threadPool);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private static IndexMetadata createIndex(String name) {
|
|
|
return IndexMetadata.builder(name).settings(indexSettings(IndexVersion.current(), 1, 0)).build();
|
|
|
}
|