Browse Source

Reset allocation failure counter on condition (#108987)

Occasionally shard allocation might fail. Consistent failures will
increment `failedAllocations` counter.  The `MaxRetryAllocationDecider`
together with `index.allocation.max_retry(default=5)` will decide when
to stop trying to allocate. We also persist failures counter in cluster
state, restarting individual nodes wont reset counter.

Reaching max_retry count can create a problem when underlying issue for
allocation is resolved, re-allocation will not happen. Currently we
don't reset failures count automatically, only through REST API. 

In this change I introduce a new service that listens to cluster updates
when new node joins, and triggers reroute with retry's reset.
Mikhail Berezovskiy 1 year ago
parent
commit
2448859a0a

+ 84 - 0
server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/AllocationFailuresResetIT.java

@@ -0,0 +1,84 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.cluster.routing.allocation;
+
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.shard.IndexEventListener;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
+import org.elasticsearch.test.ESIntegTestCase.Scope;
+import org.elasticsearch.test.MockIndexEventListener;
+
+import java.util.List;
+
+@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
+public class AllocationFailuresResetIT extends ESIntegTestCase {
+
+    private static final String INDEX = "index-1";
+    private static final int SHARD = 0;
+
+    @Override
+    protected List<Class<? extends Plugin>> nodePlugins() {
+        return List.of(MockIndexEventListener.TestPlugin.class);
+    }
+
+    private void injectAllocationFailures(String node) {
+        internalCluster().getInstance(MockIndexEventListener.TestEventListener.class, node).setNewDelegate(new IndexEventListener() {
+            @Override
+            public void beforeIndexShardCreated(ShardRouting routing, Settings indexSettings) {
+                throw new RuntimeException("shard allocation failure");
+            }
+        });
+    }
+
+    private void removeAllocationFailuresInjection(String node) {
+        internalCluster().getInstance(MockIndexEventListener.TestEventListener.class, node).setNewDelegate(new IndexEventListener() {
+        });
+    }
+
+    private void awaitShardAllocMaxRetries() throws Exception {
+        var maxRetries = MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.get(internalCluster().getDefaultSettings());
+        assertBusy(() -> {
+            var state = clusterAdmin().prepareState().get().getState();
+            var index = state.getRoutingTable().index(INDEX);
+            assertNotNull(index);
+            var shard = index.shard(SHARD).primaryShard();
+            assertNotNull(shard);
+            var unassigned = shard.unassignedInfo();
+            assertNotNull(unassigned);
+            assertEquals(maxRetries.intValue(), unassigned.failedAllocations());
+        });
+    }
+
+    private void awaitShardAllocSucceed() throws Exception {
+        assertBusy(() -> {
+            var state = clusterAdmin().prepareState().get().getState();
+            var index = state.getRoutingTable().index(INDEX);
+            assertNotNull(index);
+            var shard = index.shard(SHARD).primaryShard();
+            assertNotNull(shard);
+            assertTrue(shard.assignedToNode());
+            assertTrue(shard.started());
+        });
+    }
+
+    public void testResetFailuresOnNodeJoin() throws Exception {
+        var node1 = internalCluster().startNode();
+        injectAllocationFailures(node1);
+        prepareCreate(INDEX, indexSettings(1, 0)).execute();
+        awaitShardAllocMaxRetries();
+        removeAllocationFailuresInjection(node1);
+        internalCluster().startNode();
+        awaitShardAllocSucceed();
+    }
+
+}

+ 1 - 0
server/src/main/java/org/elasticsearch/cluster/ClusterModule.java

@@ -157,6 +157,7 @@ public class ClusterModule extends AbstractModule {
             snapshotsInfoService,
             shardRoutingRoleStrategy
         );
+        this.allocationService.addAllocFailuresResetListenerTo(clusterService);
         this.metadataDeleteIndexService = new MetadataDeleteIndexService(settings, clusterService, allocationService);
         this.allocationStatsService = new AllocationStatsService(clusterService, clusterInfoService, shardsAllocator, writeLoadForecaster);
         this.telemetryProvider = telemetryProvider;

+ 9 - 0
server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java

@@ -1276,6 +1276,15 @@ public class RoutingNodes implements Iterable<RoutingNode> {
         }
     }
 
+    public boolean hasAllocationFailures() {
+        return unassignedShards.stream().anyMatch((shardRouting -> {
+            if (shardRouting.unassignedInfo() == null) {
+                return false;
+            }
+            return shardRouting.unassignedInfo().failedAllocations() > 0;
+        }));
+    }
+
     public void resetFailedCounter(RoutingChangesObserver routingChangesObserver) {
         final var unassignedIterator = unassigned().iterator();
         while (unassignedIterator.hasNext()) {

+ 22 - 0
server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

@@ -35,6 +35,9 @@ import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
 import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
 import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
 import org.elasticsearch.cluster.routing.allocation.decider.Decision;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.cluster.service.MasterService;
+import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.logging.ESLogMessage;
@@ -559,6 +562,25 @@ public class AllocationService {
         }
     }
 
+    /**
+     * Creates a cluster state listener that resets allocation failures. For example, reset when a new node joins a cluster. Resetting
+     * counter on new node join covers a variety of use cases, such as rolling update, version change, node restarts.
+     */
+    public void addAllocFailuresResetListenerTo(ClusterService clusterService) {
+        // batched cluster update executor, runs reroute once per batch
+        // set retryFailed=true to trigger failures reset during reroute
+        var taskQueue = clusterService.createTaskQueue("reset-allocation-failures", Priority.NORMAL, (batchCtx) -> {
+            batchCtx.taskContexts().forEach((taskCtx) -> taskCtx.success(() -> {}));
+            return reroute(batchCtx.initialState(), new AllocationCommands(), false, true, false, ActionListener.noop()).clusterState();
+        });
+
+        clusterService.addListener((changeEvent) -> {
+            if (changeEvent.nodesAdded() && changeEvent.state().getRoutingNodes().hasAllocationFailures()) {
+                taskQueue.submitTask("reset-allocation-failures", (e) -> { assert MasterService.isPublishFailureException(e); }, null);
+            }
+        });
+    }
+
     private static void disassociateDeadNodes(RoutingAllocation allocation) {
         for (Iterator<RoutingNode> it = allocation.routingNodes().mutableIterator(); it.hasNext();) {
             RoutingNode node = it.next();

+ 169 - 0
server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationFailuresResetTests.java

@@ -0,0 +1,169 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.cluster.routing.allocation;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.EmptyClusterInfoService;
+import org.elasticsearch.cluster.TestShardRoutingRoleStrategies;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
+import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
+import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.IndexVersion;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.snapshots.EmptySnapshotsInfoService;
+import org.elasticsearch.test.ClusterServiceUtils;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.gateway.TestGatewayAllocator;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.List;
+import java.util.Set;
+
+public class AllocationFailuresResetTests extends ESTestCase {
+
+    private ThreadPool threadPool;
+    private ClusterService clusterService;
+
+    private static ClusterState addNode(ClusterState state, String name) {
+        var nodes = DiscoveryNodes.builder(state.nodes()).add(DiscoveryNodeUtils.create(name));
+        return ClusterState.builder(state).nodes(nodes).build();
+    }
+
+    private static ClusterState removeNode(ClusterState state, String name) {
+        var nodes = DiscoveryNodes.builder();
+        state.nodes().stream().filter((node) -> node.getId() != name).forEach(nodes::add);
+        return ClusterState.builder(state).nodes(nodes).build();
+    }
+
+    private static ClusterState addShardWithFailures(ClusterState state) {
+        var index = "index-1";
+        var shard = 0;
+
+        var indexMeta = new IndexMetadata.Builder(index).settings(
+            Settings.builder().put(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()).build())
+        ).numberOfShards(1).numberOfReplicas(0).build();
+
+        var meta = Metadata.builder(state.metadata()).put(indexMeta, false).build();
+
+        var shardId = new ShardId(indexMeta.getIndex(), shard);
+        var nonZeroFailures = 5;
+        var unassignedInfo = new UnassignedInfo(
+            UnassignedInfo.Reason.ALLOCATION_FAILED,
+            null,
+            null,
+            nonZeroFailures,
+            0,
+            0,
+            false,
+            UnassignedInfo.AllocationStatus.NO_ATTEMPT,
+            Set.of(),
+            null
+        );
+
+        var shardRouting = ShardRouting.newUnassigned(
+            shardId,
+            true,
+            new RecoverySource.EmptyStoreRecoverySource(),
+            unassignedInfo,
+            ShardRouting.Role.DEFAULT
+        );
+
+        var routingTable = new RoutingTable.Builder().add(
+            new IndexRoutingTable.Builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY, indexMeta.getIndex()).initializeAsNew(
+                meta.index(index)
+            ).addIndexShard(IndexShardRoutingTable.builder(shardId).addShard(shardRouting)).build()
+        ).build();
+
+        return ClusterState.builder(state).metadata(meta).routingTable(routingTable).build();
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        threadPool = new TestThreadPool("reset-alloc-failures");
+        clusterService = ClusterServiceUtils.createClusterService(threadPool);
+        var allocationService = new AllocationService(
+            new AllocationDeciders(List.of(new MaxRetryAllocationDecider())),
+            new TestGatewayAllocator(),
+            new BalancedShardsAllocator(Settings.EMPTY),
+            EmptyClusterInfoService.INSTANCE,
+            EmptySnapshotsInfoService.INSTANCE,
+            TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY
+        );
+        allocationService.addAllocFailuresResetListenerTo(clusterService);
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        clusterService.stop();
+        threadPool.shutdownNow();
+    }
+
+    /**
+     * Create state with two nodes and allocation failures, and does <b>not</b> reset counter after node removal
+     */
+    public void testRemoveNodeDoesNotResetCounter() throws Exception {
+        var initState = clusterService.state();
+        var stateWithNewNode = addNode(initState, "node-2");
+        clusterService.getClusterApplierService().onNewClusterState("add node", () -> stateWithNewNode, ActionListener.noop());
+
+        var stateWithFailures = addShardWithFailures(stateWithNewNode);
+        clusterService.getClusterApplierService().onNewClusterState("add failures", () -> stateWithFailures, ActionListener.noop());
+
+        assertBusy(() -> {
+            var resultState = clusterService.state();
+            assertEquals(2, resultState.nodes().size());
+            assertEquals(1, resultState.getRoutingTable().allShards().count());
+            assertTrue(resultState.getRoutingNodes().hasAllocationFailures());
+        });
+
+        var stateWithRemovedNode = removeNode(stateWithFailures, "node-2");
+        clusterService.getClusterApplierService().onNewClusterState("remove node", () -> stateWithRemovedNode, ActionListener.noop());
+        assertBusy(() -> {
+            var resultState = clusterService.state();
+            assertEquals(1, resultState.nodes().size());
+            assertEquals(1, resultState.getRoutingTable().allShards().count());
+            assertTrue(resultState.getRoutingNodes().hasAllocationFailures());
+        });
+    }
+
+    /**
+     * Create state with one node and allocation failures, and reset counter after node addition
+     */
+    public void testAddNodeResetsCounter() throws Exception {
+        var initState = clusterService.state();
+        var stateWithFailures = addShardWithFailures(initState);
+        clusterService.getClusterApplierService().onNewClusterState("add failures", () -> stateWithFailures, ActionListener.noop());
+
+        var stateWithNewNode = addNode(stateWithFailures, "node-2");
+        clusterService.getClusterApplierService().onNewClusterState("add node", () -> stateWithNewNode, ActionListener.noop());
+
+        assertBusy(() -> {
+            var resultState = clusterService.state();
+            assertEquals(2, resultState.nodes().size());
+            assertEquals(1, resultState.getRoutingTable().allShards().count());
+            assertFalse(resultState.getRoutingNodes().hasAllocationFailures());
+        });
+    }
+}