Browse Source

ActiveShardCount should wait for searchable shards to appear (#92980)

Ievgen Degtiarenko 2 years ago
parent
commit
5bce3f2ceb

+ 7 - 9
server/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java

@@ -153,8 +153,7 @@ public record ActiveShardCount(int value) implements Writeable {
                 waitForActiveShards = SETTING_WAIT_FOR_ACTIVE_SHARDS.get(indexMetadata.getSettings());
             }
             for (int i = 0; i < indexRoutingTable.size(); i++) {
-                IndexShardRoutingTable shardRouting = indexRoutingTable.shard(i);
-                if (waitForActiveShards.enoughShardsActive(shardRouting) == false) {
+                if (waitForActiveShards.enoughShardsActive(indexRoutingTable.shard(i)) == false) {
                     // not enough active shard copies yet
                     return false;
                 }
@@ -171,13 +170,13 @@ public record ActiveShardCount(int value) implements Writeable {
     public boolean enoughShardsActive(final IndexShardRoutingTable shardRoutingTable) {
         final int activeShardCount = shardRoutingTable.activeShards().size();
         if (this == ActiveShardCount.ALL) {
-            // adding 1 for the primary in addition to the total number of replicas,
-            // which gives us the total number of shard copies
-            return activeShardCount == shardRoutingTable.replicaShards().size() + 1;
-        } else if (this == ActiveShardCount.DEFAULT) {
-            return activeShardCount >= 1;
+            return activeShardCount == shardRoutingTable.size();
+        } else if (value == 0) {
+            return true;
+        } else if (value == 1) {
+            return shardRoutingTable.hasSearchShards() ? shardRoutingTable.getActiveSearchShardCount() >= 1 : activeShardCount >= 1;
         } else {
-            return activeShardCount >= value;
+            return shardRoutingTable.getActiveSearchShardCount() >= value;
         }
     }
 
@@ -189,5 +188,4 @@ public record ActiveShardCount(int value) implements Writeable {
             default -> Integer.toString(value);
         };
     }
-
 }

+ 34 - 6
server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java

@@ -45,23 +45,23 @@ public class IndexShardRoutingTable {
 
     final ShardShuffler shuffler;
     final ShardId shardId;
-
+    final ShardRouting[] shards;
     final ShardRouting primary;
     final List<ShardRouting> replicas;
-    final ShardRouting[] shards;
     final List<ShardRouting> activeShards;
     final List<ShardRouting> assignedShards;
-    final boolean allShardsStarted;
-
     /**
      * The initializing list, including ones that are initializing on a target node because of relocation.
      * If we can come up with a better variable name, it would be nice...
      */
     final List<ShardRouting> allInitializingShards;
+    final boolean allShardsStarted;
+    final int activeSearchShardCount;
+    final int totalSearchShardCount;
 
     IndexShardRoutingTable(ShardId shardId, List<ShardRouting> shards) {
-        this.shardId = shardId;
         this.shuffler = new RotationShardShuffler(Randomness.get().nextInt());
+        this.shardId = shardId;
         this.shards = shards.toArray(ShardRouting[]::new);
 
         ShardRouting primary = null;
@@ -70,6 +70,8 @@ public class IndexShardRoutingTable {
         List<ShardRouting> assignedShards = new ArrayList<>();
         List<ShardRouting> allInitializingShards = new ArrayList<>();
         boolean allShardsStarted = true;
+        int activeSearchShardCount = 0;
+        int totalSearchShardCount = 0;
         for (ShardRouting shard : this.shards) {
             if (shard.primary()) {
                 assert primary == null : "duplicate primary: " + primary + " vs " + shard;
@@ -79,6 +81,12 @@ public class IndexShardRoutingTable {
             }
             if (shard.active()) {
                 activeShards.add(shard);
+                if (shard.role().isSearchable()) {
+                    activeSearchShardCount++;
+                }
+            }
+            if (shard.role().isSearchable()) {
+                totalSearchShardCount++;
             }
             if (shard.initializing()) {
                 allInitializingShards.add(shard);
@@ -97,12 +105,14 @@ public class IndexShardRoutingTable {
                 allShardsStarted = false;
             }
         }
-        this.allShardsStarted = allShardsStarted;
         this.primary = primary;
         this.replicas = CollectionUtils.wrapUnmodifiableOrEmptySingleton(replicas);
         this.activeShards = CollectionUtils.wrapUnmodifiableOrEmptySingleton(activeShards);
         this.assignedShards = CollectionUtils.wrapUnmodifiableOrEmptySingleton(assignedShards);
         this.allInitializingShards = CollectionUtils.wrapUnmodifiableOrEmptySingleton(allInitializingShards);
+        this.allShardsStarted = allShardsStarted;
+        this.activeSearchShardCount = activeSearchShardCount;
+        this.totalSearchShardCount = totalSearchShardCount;
     }
 
     /**
@@ -461,6 +471,24 @@ public class IndexShardRoutingTable {
         return allShardsStarted;
     }
 
+    /**
+     * @return the count of active searchable shards
+     */
+    public int getActiveSearchShardCount() {
+        return activeSearchShardCount;
+    }
+
+    /**
+     * @return the total count of searchable shards
+     */
+    public int getTotalSearchShardCount() {
+        return totalSearchShardCount;
+    }
+
+    public boolean hasSearchShards() {
+        return totalSearchShardCount > 0;
+    }
+
     @Nullable
     public ShardRouting getByAllocationId(String allocationId) {
         for (ShardRouting shardRouting : assignedShards()) {

+ 76 - 7
server/src/test/java/org/elasticsearch/action/support/ActiveShardCountTests.java

@@ -18,6 +18,7 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardRoutingRoleStrategy;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -102,6 +103,73 @@ public class ActiveShardCountTests extends ESTestCase {
         runTestForOneActiveShard(ActiveShardCount.DEFAULT);
     }
 
+    public void testEnoughShardsActiveLevelDefaultWithSearchOnlyRole() {
+        final String indexName = "test-idx";
+        final int numberOfShards = randomIntBetween(1, 5);
+        final int numberOfReplicas = randomIntBetween(4, 7);
+        final ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
+        ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas, createCustomRoleStrategy(1));
+        assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
+        clusterState = startPrimaries(clusterState, indexName);
+        assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
+        clusterState = startLessThanWaitOnShards(clusterState, indexName, 1);
+        assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
+        clusterState = startAllShards(clusterState, indexName);
+        assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
+    }
+
+    public void testEnoughShardsActiveCustomLevelWithSearchOnlyRole() {
+        final String indexName = "test-idx";
+        final int numberOfShards = randomIntBetween(1, 5);
+        final int numberOfReplicas = randomIntBetween(4, 7);
+        final int activeShardCount = randomIntBetween(2, numberOfReplicas);
+        final ActiveShardCount waitForActiveShards = ActiveShardCount.from(activeShardCount);
+        ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas, createCustomRoleStrategy(1));
+        assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
+        clusterState = startPrimaries(clusterState, indexName);
+        assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
+        clusterState = startLessThanWaitOnShards(clusterState, indexName, activeShardCount - 2);
+        assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
+        clusterState = startWaitOnShards(clusterState, indexName, activeShardCount);
+        assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
+        clusterState = startAllShards(clusterState, indexName);
+        assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
+    }
+
+    public void testEnoughShardsActiveWithNoSearchOnlyRoles() {
+        final String indexName = "test-idx";
+        final int numberOfShards = randomIntBetween(1, 5);
+        final int numberOfReplicas = randomIntBetween(4, 7);
+        final ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
+        ClusterState clusterState = initializeWithNewIndex(
+            indexName,
+            numberOfShards,
+            numberOfReplicas,
+            createCustomRoleStrategy(numberOfReplicas + 1)
+        );
+        assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
+        clusterState = startPrimaries(clusterState, indexName);
+        assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
+        clusterState = startLessThanWaitOnShards(clusterState, indexName, 1);
+        assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
+        clusterState = startAllShards(clusterState, indexName);
+        assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
+    }
+
+    private static ShardRoutingRoleStrategy createCustomRoleStrategy(int indexShardCount) {
+        return new ShardRoutingRoleStrategy() {
+            @Override
+            public ShardRouting.Role newEmptyRole(int copyIndex) {
+                return copyIndex < indexShardCount ? ShardRouting.Role.INDEX_ONLY : ShardRouting.Role.SEARCH_ONLY;
+            }
+
+            @Override
+            public ShardRouting.Role newReplicaRole() {
+                return ShardRouting.Role.SEARCH_ONLY;
+            }
+        };
+    }
+
     public void testEnoughShardsActiveRandom() {
         final String indexName = "test-idx";
         final int numberOfShards = randomIntBetween(1, 5);
@@ -166,12 +234,11 @@ public class ActiveShardCountTests extends ESTestCase {
         }
     }
 
-    private void runTestForOneActiveShard(final ActiveShardCount activeShardCount) {
+    private void runTestForOneActiveShard(final ActiveShardCount waitForActiveShards) {
         final String indexName = "test-idx";
         final int numberOfShards = randomIntBetween(1, 5);
         final int numberOfReplicas = randomIntBetween(4, 7);
-        assert activeShardCount == ActiveShardCount.ONE || activeShardCount == ActiveShardCount.DEFAULT;
-        final ActiveShardCount waitForActiveShards = activeShardCount;
+        assert waitForActiveShards == ActiveShardCount.ONE || waitForActiveShards == ActiveShardCount.DEFAULT;
         ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas);
         assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
         clusterState = startPrimaries(clusterState, indexName);
@@ -180,7 +247,11 @@ public class ActiveShardCountTests extends ESTestCase {
         assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
     }
 
-    private ClusterState initializeWithNewIndex(final String indexName, final int numShards, final int numReplicas) {
+    private ClusterState initializeWithNewIndex(String indexName, int numShards, int numReplicas) {
+        return initializeWithNewIndex(indexName, numShards, numReplicas, TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY);
+    }
+
+    private ClusterState initializeWithNewIndex(String indexName, int numShards, int numReplicas, ShardRoutingRoleStrategy strategy) {
         // initial index creation and new routing table info
         final IndexMetadata indexMetadata = IndexMetadata.builder(indexName)
             .settings(settings(Version.CURRENT).put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()))
@@ -188,9 +259,7 @@ public class ActiveShardCountTests extends ESTestCase {
             .numberOfReplicas(numReplicas)
             .build();
         final Metadata metadata = Metadata.builder().put(indexMetadata, true).build();
-        final RoutingTable routingTable = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY)
-            .addAsNew(indexMetadata)
-            .build();
+        final RoutingTable routingTable = RoutingTable.builder(strategy).addAsNew(indexMetadata).build();
         return ClusterState.builder(new ClusterName("test_cluster")).metadata(metadata).routingTable(routingTable).build();
     }