瀏覽代碼

Simplify all shards iteration (#94453)

Ievgen Degtiarenko 2 年之前
父節點
當前提交
ffc87137cf

+ 2 - 5
server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java

@@ -43,7 +43,6 @@ import org.hamcrest.Matchers;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -182,8 +181,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
 
         ClusterService clusterService = internalTestCluster.getInstance(ClusterService.class, internalTestCluster.getMasterName());
         ClusterState state = clusterService.state();
-        for (Iterator<ShardRouting> iterator = state.routingTable().allShards().iterator(); iterator.hasNext();) {
-            ShardRouting shard = iterator.next();
+        for (ShardRouting shard : state.routingTable().allShardsIterator()) {
             String dataPath = info.getDataPath(shard);
             assertNotNull(dataPath);
 
@@ -327,8 +325,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
         }
 
         RoutingTable routingTable = client().admin().cluster().prepareState().clear().setRoutingTable(true).get().getState().routingTable();
-        for (Iterator<ShardRouting> iterator = routingTable.allShards().iterator(); iterator.hasNext();) {
-            ShardRouting shard = iterator.next();
+        for (ShardRouting shard : routingTable.allShardsIterator()) {
             assertTrue(
                 infoAfterRecovery.getReservedSpace(shard.currentNodeId(), infoAfterRecovery.getDataPath(shard))
                     .containsShardId(shard.shardId())

+ 3 - 3
server/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java

@@ -200,11 +200,11 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
      * @return All the shards
      */
     public Stream<ShardRouting> allShards() {
-        return indicesRouting.values().stream().flatMap(this::allShards);
+        return indicesRouting.values().stream().flatMap(IndexRoutingTable::allShards).flatMap(IndexShardRoutingTable::allShards);
     }
 
-    public Stream<ShardRouting> allShards(IndexRoutingTable indexRoutingTable) {
-        return indexRoutingTable.allShards().flatMap(IndexShardRoutingTable::allShards);
+    public Iterable<ShardRouting> allShardsIterator() {
+        return () -> allShards().iterator();
     }
 
     /**

+ 1 - 3
server/src/main/java/org/elasticsearch/rest/action/cat/RestAllocationAction.java

@@ -28,7 +28,6 @@ import org.elasticsearch.rest.action.RestActionListener;
 import org.elasticsearch.rest.action.RestResponseListener;
 
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -100,8 +99,7 @@ public class RestAllocationAction extends AbstractCatAction {
     private Table buildTable(RestRequest request, final ClusterStateResponse state, final NodesStatsResponse stats) {
         final Map<String, Integer> allocs = new HashMap<>();
 
-        for (Iterator<ShardRouting> iterator = state.getState().routingTable().allShards().iterator(); iterator.hasNext();) {
-            ShardRouting shard = iterator.next();
+        for (ShardRouting shard : state.getState().routingTable().allShardsIterator()) {
             String nodeId = "UNASSIGNED";
             if (shard.assignedToNode()) {
                 nodeId = shard.currentNodeId();

+ 1 - 3
server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java

@@ -46,7 +46,6 @@ import org.elasticsearch.rest.action.RestResponseListener;
 import org.elasticsearch.search.suggest.completion.CompletionStats;
 
 import java.time.Instant;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.function.Function;
@@ -261,8 +260,7 @@ public class RestShardsAction extends AbstractCatAction {
     // package private for testing
     Table buildTable(RestRequest request, ClusterStateResponse state, IndicesStatsResponse stats) {
         Table table = getTableWithHeader(request);
-        for (Iterator<ShardRouting> iterator = state.getState().routingTable().allShards().iterator(); iterator.hasNext();) {
-            ShardRouting shard = iterator.next();
+        for (ShardRouting shard : state.getState().routingTable().allShardsIterator()) {
             ShardStats shardStats = stats.asMap().get(shard);
             CommonStats commonStats = null;
             CommitStats commitStats = null;

+ 1 - 3
server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java

@@ -41,7 +41,6 @@ import org.elasticsearch.index.shard.ShardId;
 import org.hamcrest.Matchers;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.IntStream;
@@ -319,8 +318,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
             assertSame(clusterState, resultingState);
         }
 
-        for (Iterator<ShardRouting> iterator = resultingState.getRoutingTable().allShards().iterator(); iterator.hasNext();) {
-            final ShardRouting shard = iterator.next();
+        for (ShardRouting shard : resultingState.getRoutingTable().allShardsIterator()) {
             if (shard.assignedToNode()) {
                 for (final var task : tasks) {
                     assertFalse(

+ 2 - 5
server/src/test/java/org/elasticsearch/cluster/routing/allocation/AwarenessAllocationTests.java

@@ -35,7 +35,6 @@ import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
@@ -955,16 +954,14 @@ public class AwarenessAllocationTests extends ESAllocationTestCase {
         AllocationCommands commands = new AllocationCommands();
         final var unusedNodes = clusterState.nodes().stream().map(DiscoveryNode::getId).collect(Collectors.toSet());
         // Cancel all initializing shards
-        for (Iterator<ShardRouting> iterator = clusterState.routingTable().allShards().iterator(); iterator.hasNext();) {
-            ShardRouting routing = iterator.next();
+        for (ShardRouting routing : clusterState.routingTable().allShardsIterator()) {
             unusedNodes.remove(routing.currentNodeId());
             if (routing.initializing()) {
                 commands.add(new CancelAllocationCommand(routing.shardId().getIndexName(), routing.id(), routing.currentNodeId(), false));
             }
         }
         // Move started primary to another node.
-        for (Iterator<ShardRouting> iterator = clusterState.routingTable().allShards().iterator(); iterator.hasNext();) {
-            ShardRouting routing = iterator.next();
+        for (ShardRouting routing : clusterState.routingTable().allShardsIterator()) {
             if (routing.primary()) {
                 var currentNodeId = routing.currentNodeId();
                 unusedNodes.remove(currentNodeId);

+ 1 - 3
server/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java

@@ -47,7 +47,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 
@@ -416,8 +415,7 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase {
         if (snapshotIndices.isEmpty() == false) {
             // Some indices are restored from snapshot, the RestoreInProgress must be set accordingly
             Map<ShardId, RestoreInProgress.ShardRestoreStatus> restoreShards = new HashMap<>();
-            for (Iterator<ShardRouting> iterator = routingTable.allShards().iterator(); iterator.hasNext();) {
-                ShardRouting shard = iterator.next();
+            for (ShardRouting shard : routingTable.allShardsIterator()) {
                 if (shard.primary() && shard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) {
                     final ShardId shardId = shard.shardId();
                     restoreShards.put(shardId, new RestoreInProgress.ShardRestoreStatus(node1.getId(), RestoreInProgress.State.INIT));

+ 2 - 5
server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java

@@ -64,7 +64,6 @@ import org.junit.BeforeClass;
 
 import java.util.Comparator;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -860,8 +859,7 @@ public class DesiredBalanceReconcilerTests extends ESTestCase {
             changed = newState != clusterState;
             clusterState = newState;
         } while (changed);
-        for (Iterator<ShardRouting> iterator = clusterState.routingTable().allShards().iterator(); iterator.hasNext();) {
-            ShardRouting shardRouting = iterator.next();
+        for (ShardRouting shardRouting : clusterState.routingTable().allShardsIterator()) {
             assertTrue(shardRouting.started());
             assertThat(shardRouting.currentNodeId(), oneOf("node-0", "node-1"));
         }
@@ -983,8 +981,7 @@ public class DesiredBalanceReconcilerTests extends ESTestCase {
             changed = newState != clusterState;
             clusterState = newState;
         } while (changed);
-        for (Iterator<ShardRouting> iterator = clusterState.routingTable().allShards().iterator(); iterator.hasNext();) {
-            ShardRouting shardRouting = iterator.next();
+        for (ShardRouting shardRouting : clusterState.routingTable().allShardsIterator()) {
             assertTrue(shardRouting.started());
             assertThat(shardRouting.currentNodeId(), oneOf("node-0", "node-1"));
         }

+ 1 - 1
server/src/test/java/org/elasticsearch/rest/action/cat/RestShardsActionTests.java

@@ -76,7 +76,7 @@ public class RestShardsActionTests extends ESTestCase {
 
         ClusterStateResponse state = mock(ClusterStateResponse.class);
         RoutingTable routingTable = mock(RoutingTable.class);
-        when(routingTable.allShards()).thenReturn(shardRoutings.stream());
+        when(routingTable.allShardsIterator()).thenReturn(shardRoutings);
         ClusterState clusterState = mock(ClusterState.class);
         when(clusterState.routingTable()).thenReturn(routingTable);
         when(clusterState.nodes()).thenReturn(discoveryNodes);