|
@@ -22,10 +22,13 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|
|
import org.elasticsearch.cluster.metadata.Metadata;
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
|
|
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
|
|
import org.elasticsearch.cluster.routing.RoutingTable;
|
|
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
|
|
+import org.elasticsearch.cluster.routing.ShardRoutingState;
|
|
|
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
|
|
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
|
|
+import org.elasticsearch.cluster.routing.allocation.AllocationService.RerouteStrategy;
|
|
|
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
|
|
|
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
|
|
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
|
|
@@ -39,6 +42,7 @@ import org.elasticsearch.common.UUIDs;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
|
|
|
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
|
|
|
+import org.elasticsearch.core.TimeValue;
|
|
|
import org.elasticsearch.gateway.GatewayAllocator;
|
|
|
import org.elasticsearch.index.shard.ShardId;
|
|
|
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
|
|
@@ -48,6 +52,7 @@ import org.elasticsearch.threadpool.TestThreadPool;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Queue;
|
|
|
+import java.util.Set;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
@@ -57,6 +62,9 @@ import java.util.function.BiConsumer;
|
|
|
import java.util.function.Consumer;
|
|
|
import java.util.function.Predicate;
|
|
|
|
|
|
+import static org.elasticsearch.cluster.routing.AllocationId.newInitializing;
|
|
|
+import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
|
|
|
+import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
|
|
|
import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings;
|
|
|
import static org.hamcrest.Matchers.equalTo;
|
|
|
import static org.hamcrest.Matchers.hasItem;
|
|
@@ -118,9 +126,9 @@ public class DesiredBalanceShardsAllocatorTests extends ESAllocationTestCase {
|
|
|
var clusterSettings = createBuiltInClusterSettings(settings);
|
|
|
var clusterService = new ClusterService(
|
|
|
settings,
|
|
|
- createBuiltInClusterSettings(settings),
|
|
|
+ clusterSettings,
|
|
|
new FakeThreadPoolMasterService(LOCAL_NODE_ID, threadPool, deterministicTaskQueue::scheduleNow),
|
|
|
- new ClusterApplierService(LOCAL_NODE_ID, Settings.EMPTY, clusterSettings, threadPool) {
|
|
|
+ new ClusterApplierService(LOCAL_NODE_ID, settings, clusterSettings, threadPool) {
|
|
|
@Override
|
|
|
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
|
|
|
return deterministicTaskQueue.getPrioritizedEsThreadPoolExecutor();
|
|
@@ -137,7 +145,7 @@ public class DesiredBalanceShardsAllocatorTests extends ESAllocationTestCase {
|
|
|
var allocationServiceRef = new SetOnce<AllocationService>();
|
|
|
var reconcileAction = new DesiredBalanceReconcilerAction() {
|
|
|
@Override
|
|
|
- public ClusterState apply(ClusterState clusterState, Consumer<RoutingAllocation> routingAllocationAction) {
|
|
|
+ public ClusterState apply(ClusterState clusterState, RerouteStrategy routingAllocationAction) {
|
|
|
return allocationServiceRef.get().executeWithRoutingAllocation(clusterState, "reconcile", routingAllocationAction);
|
|
|
}
|
|
|
};
|
|
@@ -194,6 +202,112 @@ public class DesiredBalanceShardsAllocatorTests extends ESAllocationTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public void testShouldNotRemoveAllocationDelayMarkersOnReconcile() {
|
|
|
+
|
|
|
+ var localNode = newNode(LOCAL_NODE_ID);
|
|
|
+ var otherNode = newNode(OTHER_NODE_ID);
|
|
|
+
|
|
|
+ var unassignedTimeNanos = System.nanoTime();
|
|
|
+ var computationTime = unassignedTimeNanos;
|
|
|
+ var reconciliationTime = unassignedTimeNanos + INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(Settings.EMPTY).nanos();
|
|
|
+
|
|
|
+ var delayedUnasssignedInfo = new UnassignedInfo(
|
|
|
+ UnassignedInfo.Reason.NODE_RESTARTING,
|
|
|
+ null,
|
|
|
+ null,
|
|
|
+ 0,
|
|
|
+ unassignedTimeNanos,
|
|
|
+ TimeValue.nsecToMSec(unassignedTimeNanos),
|
|
|
+ true,
|
|
|
+ UnassignedInfo.AllocationStatus.NO_ATTEMPT,
|
|
|
+ Set.of(),
|
|
|
+ "node-3"
|
|
|
+ );
|
|
|
+
|
|
|
+ var inSyncAllocationId = UUIDs.randomBase64UUID();
|
|
|
+ var index = IndexMetadata.builder("test")
|
|
|
+ .settings(indexSettings(Version.CURRENT, 1, 1))
|
|
|
+ .putInSyncAllocationIds(0, Set.of(inSyncAllocationId))
|
|
|
+ .build();
|
|
|
+ var shardId = new ShardId(index.getIndex(), 0);
|
|
|
+ var indexRoutingTable = IndexRoutingTable.builder(index.getIndex())
|
|
|
+ .addShard(newShardRouting(shardId, LOCAL_NODE_ID, null, true, ShardRoutingState.STARTED, newInitializing(inSyncAllocationId)))
|
|
|
+ .addShard(newShardRouting(shardId, null, null, false, ShardRoutingState.UNASSIGNED, delayedUnasssignedInfo))
|
|
|
+ .build();
|
|
|
+
|
|
|
+ var initialState = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName()))
|
|
|
+ .nodes(DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()))
|
|
|
+ .metadata(Metadata.builder().put(index, false).build())
|
|
|
+ .routingTable(RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY).add(indexRoutingTable).build())
|
|
|
+ .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK)
|
|
|
+ .build();
|
|
|
+
|
|
|
+ var threadPool = new TestThreadPool(getTestName());
|
|
|
+ var clusterService = ClusterServiceUtils.createClusterService(initialState, threadPool);
|
|
|
+ var allocationServiceRef = new SetOnce<AllocationService>();
|
|
|
+ var reconciledStateRef = new AtomicReference<ClusterState>();
|
|
|
+ var reconcileAction = new DesiredBalanceReconcilerAction() {
|
|
|
+ @Override
|
|
|
+ public ClusterState apply(ClusterState clusterState, RerouteStrategy routingAllocationAction) {
|
|
|
+ ClusterState reconciled = allocationServiceRef.get()
|
|
|
+ .executeWithRoutingAllocation(clusterState, "reconcile", routingAllocationAction);
|
|
|
+ reconciledStateRef.set(reconciled);
|
|
|
+ return reconciled;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ var clusterSettings = createBuiltInClusterSettings();
|
|
|
+ var desiredBalanceShardsAllocator = new DesiredBalanceShardsAllocator(
|
|
|
+ clusterSettings,
|
|
|
+ createShardsAllocator(),
|
|
|
+ threadPool,
|
|
|
+ clusterService,
|
|
|
+ reconcileAction
|
|
|
+ );
|
|
|
+ var allocationService = new AllocationService(
|
|
|
+ new AllocationDeciders(List.of()),
|
|
|
+ createGatewayAllocator(
|
|
|
+ (allocation, unassignedAllocationHandler) -> unassignedAllocationHandler.removeAndIgnore(
|
|
|
+ UnassignedInfo.AllocationStatus.NO_ATTEMPT,
|
|
|
+ allocation.changes()
|
|
|
+ )
|
|
|
+ ),
|
|
|
+ desiredBalanceShardsAllocator,
|
|
|
+ () -> ClusterInfo.EMPTY,
|
|
|
+ () -> SnapshotShardSizeInfo.EMPTY,
|
|
|
+ TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY
|
|
|
+ ) {
|
|
|
+
|
|
|
+ int call = 0;
|
|
|
+ long[] timeToReturn = new long[] { computationTime, reconciliationTime };
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected long currentNanoTime() {
|
|
|
+ return timeToReturn[call++];
|
|
|
+ }
|
|
|
+ };
|
|
|
+ allocationServiceRef.set(allocationService);
|
|
|
+
|
|
|
+ try {
|
|
|
+ rerouteAndWait(allocationService, initialState, "test");
|
|
|
+
|
|
|
+ var reconciledState = reconciledStateRef.get();
|
|
|
+
|
|
|
+ // Desired balance computation could be performed _before_ delayed allocation is expired
|
|
|
+ // while corresponding reconciliation might happen _after_.
|
|
|
+ // In such case reconciliation will not allocate delayed shard as its balance is not computed yet
|
|
|
+ // and must NOT clear delayed flag so that a followup reroute is scheduled for the shard.
|
|
|
+ var unassigned = reconciledState.getRoutingNodes().unassigned();
|
|
|
+ assertThat(unassigned.size(), equalTo(1));
|
|
|
+ var unassignedShard = unassigned.iterator().next();
|
|
|
+ assertThat(unassignedShard.unassignedInfo().isDelayed(), equalTo(true));
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ clusterService.close();
|
|
|
+ terminate(threadPool);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public void testCallListenersOnlyAfterProducingFreshInput() throws InterruptedException {
|
|
|
|
|
|
var reconciliations = new AtomicInteger(0);
|
|
@@ -210,7 +324,7 @@ public class DesiredBalanceShardsAllocatorTests extends ESAllocationTestCase {
|
|
|
var allocationServiceRef = new SetOnce<AllocationService>();
|
|
|
var reconcileAction = new DesiredBalanceReconcilerAction() {
|
|
|
@Override
|
|
|
- public ClusterState apply(ClusterState clusterState, Consumer<RoutingAllocation> routingAllocationAction) {
|
|
|
+ public ClusterState apply(ClusterState clusterState, RerouteStrategy routingAllocationAction) {
|
|
|
reconciliations.incrementAndGet();
|
|
|
return allocationServiceRef.get().executeWithRoutingAllocation(clusterState, "reconcile", routingAllocationAction);
|
|
|
}
|
|
@@ -312,7 +426,7 @@ public class DesiredBalanceShardsAllocatorTests extends ESAllocationTestCase {
|
|
|
var allocationServiceRef = new SetOnce<AllocationService>();
|
|
|
var reconcileAction = new DesiredBalanceReconcilerAction() {
|
|
|
@Override
|
|
|
- public ClusterState apply(ClusterState clusterState, Consumer<RoutingAllocation> routingAllocationAction) {
|
|
|
+ public ClusterState apply(ClusterState clusterState, RerouteStrategy routingAllocationAction) {
|
|
|
return allocationServiceRef.get().executeWithRoutingAllocation(clusterState, "reconcile", routingAllocationAction);
|
|
|
}
|
|
|
};
|
|
@@ -430,7 +544,7 @@ public class DesiredBalanceShardsAllocatorTests extends ESAllocationTestCase {
|
|
|
threadPool,
|
|
|
clusterService,
|
|
|
desiredBalanceComputer,
|
|
|
- (reconcilerClusterState, routingAllocationAction) -> reconcilerClusterState
|
|
|
+ (reconcilerClusterState, rerouteStrategy) -> reconcilerClusterState
|
|
|
);
|
|
|
|
|
|
var service = createAllocationService(desiredBalanceShardsAllocator, createGatewayAllocator());
|
|
@@ -482,7 +596,7 @@ public class DesiredBalanceShardsAllocatorTests extends ESAllocationTestCase {
|
|
|
threadPool,
|
|
|
clusterService,
|
|
|
desiredBalanceComputer,
|
|
|
- (reconcilerClusterState, routingAllocationAction) -> reconcilerClusterState
|
|
|
+ (reconcilerClusterState, rerouteStrategy) -> reconcilerClusterState
|
|
|
);
|
|
|
|
|
|
var service = createAllocationService(desiredBalanceShardsAllocator, createGatewayAllocator());
|