Browse Source

Remove dependency on AbstractCollection on set of classes. (#95872)

This change remove inheritance of AbstractCollection on set of classes to
prevent exposing mutation methods such as add/remove that are implemented with
default throw new UnsupportedOpertaionException().
Ievgen Degtiarenko 2 years ago
parent
commit
a6fa7a2b17
16 changed files with 120 additions and 73 deletions
  1. 5 3
      server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java
  2. 10 3
      server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java
  3. 5 3
      server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
  4. 1 1
      server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java
  5. 1 1
      server/src/main/java/org/elasticsearch/health/node/DiskHealthIndicatorService.java
  6. 3 3
      server/src/test/java/org/elasticsearch/action/admin/indices/diskusage/TransportAnalyzeIndexDiskUsageActionTests.java
  7. 2 2
      server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java
  8. 9 11
      server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java
  9. 1 1
      x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityServiceTests.java
  10. 1 1
      x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderServiceTests.java
  11. 1 1
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutorAssignmentTests.java
  12. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java
  13. 5 5
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java
  14. 23 23
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java
  15. 11 3
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java
  16. 41 11
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java

+ 5 - 3
server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java

@@ -60,7 +60,6 @@ import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentParser;
 
 import java.io.IOException;
-import java.util.AbstractCollection;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -96,7 +95,7 @@ import static org.elasticsearch.index.IndexSettings.PREFER_ILM_SETTING;
  * The details of how this is persisted are covered in {@link org.elasticsearch.gateway.PersistedClusterStateService}.
  * </p>
  */
-public class Metadata extends AbstractCollection<IndexMetadata> implements Diffable<Metadata>, ChunkedToXContent {
+public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, ChunkedToXContent {
 
     private static final Logger logger = LogManager.getLogger(Metadata.class);
 
@@ -1347,7 +1346,10 @@ public class Metadata extends AbstractCollection<IndexMetadata> implements Diffa
         return indices.values().iterator();
     }
 
-    @Override
+    public Stream<IndexMetadata> stream() {
+        return indices.values().stream();
+    }
+
     public int size() {
         return indices.size();
     }

+ 10 - 3
server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java

@@ -22,8 +22,8 @@ import org.elasticsearch.core.Booleans;
 import org.elasticsearch.core.Nullable;
 
 import java.io.IOException;
-import java.util.AbstractCollection;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -41,7 +41,7 @@ import java.util.stream.Stream;
  * This class holds all {@link DiscoveryNode} in the cluster and provides convenience methods to
  * access, modify merge / diff discovery nodes.
  */
-public class DiscoveryNodes extends AbstractCollection<DiscoveryNode> implements SimpleDiffable<DiscoveryNodes> {
+public class DiscoveryNodes implements Iterable<DiscoveryNode>, SimpleDiffable<DiscoveryNodes> {
 
     public static final DiscoveryNodes EMPTY_NODES = builder().build();
 
@@ -103,7 +103,14 @@ public class DiscoveryNodes extends AbstractCollection<DiscoveryNode> implements
         return nodes.values().iterator();
     }
 
-    @Override
+    public Stream<DiscoveryNode> stream() {
+        return nodes.values().stream();
+    }
+
+    public Collection<DiscoveryNode> getAllNodes() {
+        return nodes.values();
+    }
+
     public int size() {
         return nodes.size();
     }

+ 5 - 3
server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java

@@ -24,7 +24,6 @@ import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
 
-import java.util.AbstractCollection;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -58,7 +57,7 @@ import java.util.stream.StreamSupport;
  * <li> {@link #failShard} fails/cancels an assigned shard.
  * </ul>
  */
-public class RoutingNodes extends AbstractCollection<RoutingNode> {
+public class RoutingNodes implements Iterable<RoutingNode> {
 
     private final Map<String, RoutingNode> nodesToShards;
 
@@ -254,6 +253,10 @@ public class RoutingNodes extends AbstractCollection<RoutingNode> {
         return Collections.unmodifiableCollection(nodesToShards.values()).iterator();
     }
 
+    public Stream<RoutingNode> stream() {
+        return nodesToShards.values().stream();
+    }
+
     public Iterator<RoutingNode> mutableIterator() {
         ensureMutable();
         return nodesToShards.values().iterator();
@@ -863,7 +866,6 @@ public class RoutingNodes extends AbstractCollection<RoutingNode> {
     /**
      * Returns the number of routing nodes
      */
-    @Override
     public int size() {
         return nodesToShards.size();
     }

+ 1 - 1
server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java

@@ -86,7 +86,7 @@ public class DesiredBalanceComputer {
         final var knownNodeIds = routingAllocation.nodes().stream().map(DiscoveryNode::getId).collect(toSet());
         final var clusterInfoSimulator = new ClusterInfoSimulator(routingAllocation.clusterInfo());
 
-        if (routingNodes.isEmpty()) {
+        if (routingNodes.size() == 0) {
             return new DesiredBalance(desiredBalanceInput.index(), Map.of());
         }
 

+ 1 - 1
server/src/main/java/org/elasticsearch/health/node/DiskHealthIndicatorService.java

@@ -119,7 +119,7 @@ public class DiskHealthIndicatorService implements HealthIndicatorService {
     private void logNodesMissingHealthInfo(Map<String, DiskHealthInfo> diskHealthInfoMap, ClusterState clusterState) {
         if (logger.isDebugEnabled()) {
             String nodesMissingHealthInfo = getSortedUniqueValuesString(
-                clusterState.getNodes(),
+                clusterState.getNodes().getAllNodes(),
                 node -> diskHealthInfoMap.containsKey(node.getId()) == false,
                 HealthIndicatorDisplayValues::getNodeName
             );

+ 3 - 3
server/src/test/java/org/elasticsearch/action/admin/indices/diskusage/TransportAnalyzeIndexDiskUsageActionTests.java

@@ -88,7 +88,7 @@ public class TransportAnalyzeIndexDiskUsageActionTests extends ESTestCase {
         Map<ShardId, List<ShardRouting>> groupShardRoutings = new HashMap<>();
         for (int i = 0; i < numberOfShards; i++) {
             ShardId shardId = new ShardId("test_index", "n/a", i);
-            DiscoveryNode node = randomFrom(nodes);
+            DiscoveryNode node = randomFrom(nodes.getAllNodes());
             ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, node.getId(), randomBoolean(), ShardRoutingState.STARTED);
             groupShardRoutings.put(shardId, List.of(shardRouting));
             nodeToShards.computeIfAbsent(node, k -> new LinkedList<>()).add(shardRouting);
@@ -145,7 +145,7 @@ public class TransportAnalyzeIndexDiskUsageActionTests extends ESTestCase {
         Map<ShardId, List<ShardRouting>> shardToRoutings = new HashMap<>();
         for (int i = 0; i < numberOfShards; i++) {
             ShardId shardId = new ShardId("test_index", "n/a", i);
-            List<ShardRouting> shardRoutings = randomSubsetOf(between(1, nodes.size()), nodes).stream()
+            List<ShardRouting> shardRoutings = randomSubsetOf(between(1, nodes.size()), nodes.getAllNodes()).stream()
                 .map(node -> TestShardRouting.newShardRouting(shardId, node.getId(), randomBoolean(), ShardRoutingState.STARTED))
                 .toList();
             shardToRoutings.put(shardId, shardRoutings);
@@ -229,7 +229,7 @@ public class TransportAnalyzeIndexDiskUsageActionTests extends ESTestCase {
         Map<ShardId, List<ShardRouting>> shardToRoutings = new HashMap<>();
         for (int i = 0; i < numberOfShards; i++) {
             ShardId shardId = new ShardId("test_index", "n/a", i);
-            List<ShardRouting> shardRoutings = randomSubsetOf(between(1, discoNodes.size()), discoNodes).stream()
+            List<ShardRouting> shardRoutings = randomSubsetOf(between(1, discoNodes.size()), discoNodes.getAllNodes()).stream()
                 .map(node -> TestShardRouting.newShardRouting(shardId, node.getId(), randomBoolean(), ShardRoutingState.STARTED))
                 .toList();
             shardToRoutings.put(shardId, shardRoutings);

+ 2 - 2
server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java

@@ -1369,7 +1369,7 @@ public class TransportSearchActionTests extends ESTestCase {
             } else {
                 // relocated or no longer assigned
                 relocatedContexts.add(new ShardId(indexMetadata.getIndex(), shardId));
-                targetNode = randomFrom(clusterState.nodes()).getId();
+                targetNode = randomFrom(clusterState.nodes().getAllNodes()).getId();
             }
             contexts.put(
                 new ShardId(indexMetadata.getIndex(), shardId),
@@ -1423,7 +1423,7 @@ public class TransportSearchActionTests extends ESTestCase {
             anotherShardId,
             new SearchContextIdForNode(
                 null,
-                randomFrom(clusterState.nodes()).getId(),
+                randomFrom(clusterState.nodes().getAllNodes()).getId(),
                 new ShardSearchContextId(UUIDs.randomBase64UUID(), randomNonNegativeLong(), null)
             )
         );

+ 9 - 11
server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java

@@ -45,9 +45,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
@@ -59,6 +57,7 @@ import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singleton;
+import static java.util.stream.Collectors.toMap;
 import static org.elasticsearch.persistent.PersistentTasksClusterService.needsReassignment;
 import static org.elasticsearch.persistent.PersistentTasksClusterService.persistentTasksChanged;
 import static org.elasticsearch.persistent.PersistentTasksExecutor.NO_NODE_FOUND;
@@ -116,7 +115,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
         final PersistentTasksClusterService service = createService(
             (params, candidateNodes, clusterState) -> "never_assign".equals(((TestParams) params).getTestParam())
                 ? NO_NODE_FOUND
-                : randomNodeAssignment(clusterState.nodes())
+                : randomNodeAssignment(clusterState.nodes().getAllNodes())
         );
 
         int numberOfIterations = randomIntBetween(1, 30);
@@ -184,7 +183,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
         final ClusterChangedEvent event = new ClusterChangedEvent("test", current, previous);
 
         final PersistentTasksClusterService service = createService(
-            (params, candidateNodes, clusterState) -> randomNodeAssignment(clusterState.nodes())
+            (params, candidateNodes, clusterState) -> randomNodeAssignment(clusterState.nodes().getAllNodes())
         );
         assertThat(dumpEvent(event), service.shouldReassignPersistentTasks(event), equalTo(changed && unassigned));
     }
@@ -614,14 +613,13 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
             // Now that we have a bunch of tasks that need to be assigned, let's
             // mark half the nodes as shut down and make sure they do not have any
             // tasks assigned
-            Collection<DiscoveryNode> allNodes = clusterState.nodes();
-            Map<String, SingleNodeShutdownMetadata> shutdownMetadataMap = new HashMap<>();
-            allNodes.stream()
+            var allNodes = clusterState.nodes();
+            var shutdownMetadataMap = allNodes.stream()
                 .limit(Math.floorDiv(allNodes.size(), 2))
-                .forEach(
-                    node -> shutdownMetadataMap.put(
-                        node.getId(),
-                        SingleNodeShutdownMetadata.builder()
+                .collect(
+                    toMap(
+                        DiscoveryNode::getId,
+                        node -> SingleNodeShutdownMetadata.builder()
                             .setNodeId(node.getId())
                             .setReason("shutdown for a unit test")
                             .setType(type)

+ 1 - 1
x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityServiceTests.java

@@ -216,7 +216,7 @@ public class AutoscalingCalculateCapacityServiceTests extends AutoscalingTestCas
         );
 
         assertThat(context.nodes().size(), equalTo(1));
-        assertThat(context.nodes(), equalTo(new HashSet<>(state.nodes())));
+        assertThat(context.nodes(), equalTo(new HashSet<>(state.nodes().getAllNodes())));
         if (hasDataRole) {
             assertNull(context.currentCapacity());
         } else {

+ 1 - 1
x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderServiceTests.java

@@ -243,7 +243,7 @@ public class ReactiveStorageDeciderServiceTests extends AutoscalingTestCase {
         );
         ShardRouting primaryShard = subjectRoutings.primaryShard();
         ShardRouting replicaShard = subjectRoutings.replicaShards().get(0);
-        DiscoveryNode[] nodes = initialClusterState.nodes().toArray(DiscoveryNode[]::new);
+        DiscoveryNode[] nodes = initialClusterState.nodes().getAllNodes().toArray(DiscoveryNode[]::new);
         boolean useReplica = randomBoolean();
         if (useReplica || randomBoolean()) {
             startShard(allocation, primaryShard, nodes[0].getId());

+ 1 - 1
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutorAssignmentTests.java

@@ -92,7 +92,7 @@ public class ShardFollowTasksExecutorAssignmentTests extends ESTestCase {
         clusterStateBuilder.nodes(nodesBuilder);
         final Assignment assignment = executor.getAssignment(
             mock(ShardFollowTask.class),
-            clusterStateBuilder.nodes(),
+            clusterStateBuilder.nodes().getAllNodes(),
             clusterStateBuilder.build()
         );
         consumer.accept(theSpecial, assignment);

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java

@@ -220,7 +220,7 @@ public class OpenJobPersistentTasksExecutor extends AbstractJobPersistentTasksEx
         validateJobAndId(jobId, job);
         // If we already know that we can't find an ml node because all ml nodes are running at capacity or
         // simply because there are no ml nodes in the cluster then we fail quickly here:
-        PersistentTasksCustomMetadata.Assignment assignment = getAssignment(params, clusterState.nodes(), clusterState);
+        PersistentTasksCustomMetadata.Assignment assignment = getAssignment(params, clusterState.nodes().getAllNodes(), clusterState);
         if (assignment.equals(AWAITING_UPGRADE)) {
             throw makeCurrentlyBeingUpgradedException(logger, params.getJobId());
         }

+ 5 - 5
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java

@@ -57,7 +57,7 @@ public class TransportStartDataFrameAnalyticsActionTests extends ESTestCase {
             .metadata(Metadata.builder().putCustom(MlMetadata.TYPE, new MlMetadata.Builder().isUpgradeMode(true).build()))
             .build();
 
-        Assignment assignment = executor.getAssignment(params, clusterState.nodes(), clusterState);
+        Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState);
         assertThat(assignment.getExecutorNode(), is(nullValue()));
         assertThat(assignment.getExplanation(), is(equalTo("persistent task cannot be assigned while upgrade mode is enabled.")));
     }
@@ -70,7 +70,7 @@ public class TransportStartDataFrameAnalyticsActionTests extends ESTestCase {
             .metadata(Metadata.builder().putCustom(MlMetadata.TYPE, new MlMetadata.Builder().build()))
             .build();
 
-        Assignment assignment = executor.getAssignment(params, clusterState.nodes(), clusterState);
+        Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState);
         assertThat(assignment.getExecutorNode(), is(nullValue()));
         assertThat(assignment.getExplanation(), is(emptyString()));
     }
@@ -89,7 +89,7 @@ public class TransportStartDataFrameAnalyticsActionTests extends ESTestCase {
             )
             .build();
 
-        Assignment assignment = executor.getAssignment(params, clusterState.nodes(), clusterState);
+        Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState);
         assertThat(assignment.getExecutorNode(), is(nullValue()));
         assertThat(
             assignment.getExplanation(),
@@ -118,7 +118,7 @@ public class TransportStartDataFrameAnalyticsActionTests extends ESTestCase {
             )
             .build();
 
-        Assignment assignment = executor.getAssignment(params, clusterState.nodes(), clusterState);
+        Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState);
         assertThat(assignment.getExecutorNode(), is(nullValue()));
         assertThat(
             assignment.getExplanation(),
@@ -153,7 +153,7 @@ public class TransportStartDataFrameAnalyticsActionTests extends ESTestCase {
             .nodes(DiscoveryNodes.builder().add(createNode(0, true, Version.V_7_10_0)))
             .build();
 
-        Assignment assignment = executor.getAssignment(params, clusterState.nodes(), clusterState);
+        Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState);
         assertThat(assignment.getExecutorNode(), is(equalTo("_node_id0")));
         assertThat(assignment.getExplanation(), is(emptyString()));
     }

+ 23 - 23
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java

@@ -121,7 +121,7 @@ public class JobNodeSelectorTests extends ESTestCase {
 
         JobNodeSelector jobNodeSelector = new JobNodeSelector(
             cs.build(),
-            shuffled(cs.nodes()),
+            shuffled(cs.nodes().getAllNodes()),
             job.getId(),
             MlTasks.JOB_TASK_NAME,
             memoryTracker,
@@ -167,7 +167,7 @@ public class JobNodeSelectorTests extends ESTestCase {
 
         JobNodeSelector jobNodeSelector = new JobNodeSelector(
             cs.build(),
-            shuffled(cs.nodes()),
+            shuffled(cs.nodes().getAllNodes()),
             dataFrameAnalyticsId,
             MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
             memoryTracker,
@@ -219,7 +219,7 @@ public class JobNodeSelectorTests extends ESTestCase {
 
         JobNodeSelector jobNodeSelector = new JobNodeSelector(
             cs.build(),
-            shuffled(cs.nodes()),
+            shuffled(cs.nodes().getAllNodes()),
             job.getId(),
             MlTasks.JOB_TASK_NAME,
             memoryTracker,
@@ -273,7 +273,7 @@ public class JobNodeSelectorTests extends ESTestCase {
 
         JobNodeSelector jobNodeSelector = new JobNodeSelector(
             cs.build(),
-            shuffled(cs.nodes()),
+            shuffled(cs.nodes().getAllNodes()),
             dataFrameAnalyticsId,
             MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
             memoryTracker,
@@ -310,7 +310,7 @@ public class JobNodeSelectorTests extends ESTestCase {
 
         JobNodeSelector jobNodeSelector = new JobNodeSelector(
             cs.build(),
-            shuffled(cs.nodes()),
+            shuffled(cs.nodes().getAllNodes()),
             job.getId(),
             MlTasks.JOB_TASK_NAME,
             memoryTracker,
@@ -367,7 +367,7 @@ public class JobNodeSelectorTests extends ESTestCase {
 
         JobNodeSelector jobNodeSelector = new JobNodeSelector(
             cs.build(),
-            shuffled(cs.nodes()),
+            shuffled(cs.nodes().getAllNodes()),
             dataFrameAnalyticsId,
             MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
             memoryTracker,
@@ -423,7 +423,7 @@ public class JobNodeSelectorTests extends ESTestCase {
 
         JobNodeSelector jobNodeSelector = new JobNodeSelector(
             cs.build(),
-            shuffled(cs.nodes()),
+            shuffled(cs.nodes().getAllNodes()),
             dataFrameAnalyticsId,
             MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
             memoryTracker,
@@ -493,7 +493,7 @@ public class JobNodeSelectorTests extends ESTestCase {
 
         JobNodeSelector jobNodeSelector = new JobNodeSelector(
             cs.build(),
-            shuffled(cs.nodes()),
+            shuffled(cs.nodes().getAllNodes()),
             job.getId(),
             MlTasks.JOB_TASK_NAME,
             memoryTracker,
@@ -564,7 +564,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         ClusterState cs = csBuilder.build();
         JobNodeSelector jobNodeSelector = new JobNodeSelector(
             cs,
-            shuffled(cs.nodes()),
+            shuffled(cs.nodes().getAllNodes()),
             job6.getId(),
             MlTasks.JOB_TASK_NAME,
             memoryTracker,
@@ -585,7 +585,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         Job job7 = BaseMlIntegTestCase.createFareQuoteJob("job_id7", JOB_MEMORY_REQUIREMENT).build(new Date());
         jobNodeSelector = new JobNodeSelector(
             cs,
-            shuffled(cs.nodes()),
+            shuffled(cs.nodes().getAllNodes()),
             job7.getId(),
             MlTasks.JOB_TASK_NAME,
             memoryTracker,
@@ -608,7 +608,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         cs = csBuilder.build();
         jobNodeSelector = new JobNodeSelector(
             cs,
-            shuffled(cs.nodes()),
+            shuffled(cs.nodes().getAllNodes()),
             job7.getId(),
             MlTasks.JOB_TASK_NAME,
             memoryTracker,
@@ -628,7 +628,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         cs = csBuilder.build();
         jobNodeSelector = new JobNodeSelector(
             cs,
-            shuffled(cs.nodes()),
+            shuffled(cs.nodes().getAllNodes()),
             job7.getId(),
             MlTasks.JOB_TASK_NAME,
             memoryTracker,
@@ -706,7 +706,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         // Assignment won't be possible if the stale failed job is treated as opening
         JobNodeSelector jobNodeSelector = new JobNodeSelector(
             cs,
-            shuffled(cs.nodes()),
+            shuffled(cs.nodes().getAllNodes()),
             job7.getId(),
             MlTasks.JOB_TASK_NAME,
             memoryTracker,
@@ -726,7 +726,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         Job job8 = BaseMlIntegTestCase.createFareQuoteJob("job_id8", JOB_MEMORY_REQUIREMENT).build(new Date());
         jobNodeSelector = new JobNodeSelector(
             cs,
-            shuffled(cs.nodes()),
+            shuffled(cs.nodes().getAllNodes()),
             job8.getId(),
             MlTasks.JOB_TASK_NAME,
             memoryTracker,
@@ -786,7 +786,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         cs.metadata(metadata);
         JobNodeSelector jobNodeSelector = new JobNodeSelector(
             cs.build(),
-            shuffled(cs.nodes()),
+            shuffled(cs.nodes().getAllNodes()),
             job.getId(),
             MlTasks.JOB_TASK_NAME,
             memoryTracker,
@@ -846,7 +846,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         cs.metadata(metadata);
         JobNodeSelector jobNodeSelector = new JobNodeSelector(
             cs.build(),
-            shuffled(cs.nodes()),
+            shuffled(cs.nodes().getAllNodes()),
             job.getId(),
             MlTasks.JOB_TASK_NAME,
             memoryTracker,
@@ -916,7 +916,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         cs.metadata(metadata);
         JobNodeSelector jobNodeSelector = new JobNodeSelector(
             cs.build(),
-            shuffled(cs.nodes()),
+            shuffled(cs.nodes().getAllNodes()),
             job.getId(),
             MlTasks.JOB_TASK_NAME,
             memoryTracker,
@@ -971,7 +971,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         Job job = jobWithRules("job_with_rules");
         JobNodeSelector jobNodeSelector = new JobNodeSelector(
             cs.build(),
-            shuffled(cs.nodes()),
+            shuffled(cs.nodes().getAllNodes()),
             job.getId(),
             MlTasks.JOB_TASK_NAME,
             memoryTracker,
@@ -1069,7 +1069,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", JOB_MEMORY_REQUIREMENT).build(new Date());
         JobNodeSelector jobNodeSelector = new JobNodeSelector(
             cs.build(),
-            shuffled(cs.nodes()),
+            shuffled(cs.nodes().getAllNodes()),
             job.getId(),
             MlTasks.JOB_TASK_NAME,
             memoryTracker,
@@ -1114,7 +1114,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", JOB_MEMORY_REQUIREMENT).build(new Date());
         JobNodeSelector jobNodeSelector = new JobNodeSelector(
             cs.build(),
-            shuffled(cs.nodes()),
+            shuffled(cs.nodes().getAllNodes()),
             job.getId(),
             MlTasks.JOB_TASK_NAME,
             memoryTracker,
@@ -1169,7 +1169,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", JOB_MEMORY_REQUIREMENT).build(new Date());
         JobNodeSelector jobNodeSelector = new JobNodeSelector(
             cs.build(),
-            shuffled(cs.nodes()),
+            shuffled(cs.nodes().getAllNodes()),
             job.getId(),
             MlTasks.JOB_TASK_NAME,
             memoryTracker,
@@ -1204,7 +1204,7 @@ public class JobNodeSelectorTests extends ESTestCase {
 
         JobNodeSelector jobNodeSelector = new JobNodeSelector(
             cs.build(),
-            shuffled(cs.nodes()),
+            shuffled(cs.nodes().getAllNodes()),
             job.getId(),
             MlTasks.JOB_TASK_NAME,
             memoryTracker,
@@ -1294,7 +1294,7 @@ public class JobNodeSelectorTests extends ESTestCase {
 
         JobNodeSelector jobNodeSelector = new JobNodeSelector(
             cs.build(),
-            shuffled(cs.nodes()),
+            shuffled(cs.nodes().getAllNodes()),
             job.getId(),
             MlTasks.JOB_TASK_NAME,
             memoryTracker,

+ 11 - 3
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java

@@ -167,7 +167,7 @@ public class OpenJobPersistentTasksExecutorTests extends ESTestCase {
         assertEquals(
             "Not opening [unavailable_index_with_lazy_node], "
                 + "because not all primary shards are active for the following indices [.ml-state]",
-            executor.getAssignment(params, csBuilder.nodes(), csBuilder.build()).getExplanation()
+            executor.getAssignment(params, csBuilder.nodes().getAllNodes(), csBuilder.build()).getExplanation()
         );
     }
 
@@ -186,7 +186,11 @@ public class OpenJobPersistentTasksExecutorTests extends ESTestCase {
         when(job.allowLazyOpen()).thenReturn(true);
         OpenJobAction.JobParams params = new OpenJobAction.JobParams("lazy_job");
         params.setJob(job);
-        PersistentTasksCustomMetadata.Assignment assignment = executor.getAssignment(params, csBuilder.nodes(), csBuilder.build());
+        PersistentTasksCustomMetadata.Assignment assignment = executor.getAssignment(
+            params,
+            csBuilder.nodes().getAllNodes(),
+            csBuilder.build()
+        );
         assertNotNull(assignment);
         assertNull(assignment.getExecutorNode());
         assertEquals(JobNodeSelector.AWAITING_LAZY_ASSIGNMENT.getExplanation(), assignment.getExplanation());
@@ -203,7 +207,11 @@ public class OpenJobPersistentTasksExecutorTests extends ESTestCase {
         Job job = mock(Job.class);
         OpenJobAction.JobParams params = new OpenJobAction.JobParams("job_during_reset");
         params.setJob(job);
-        PersistentTasksCustomMetadata.Assignment assignment = executor.getAssignment(params, csBuilder.nodes(), csBuilder.build());
+        PersistentTasksCustomMetadata.Assignment assignment = executor.getAssignment(
+            params,
+            csBuilder.nodes().getAllNodes(),
+            csBuilder.build()
+        );
         assertNotNull(assignment);
         assertNull(assignment.getExecutorNode());
         assertEquals(MlTasks.RESET_IN_PROGRESS.getExplanation(), assignment.getExplanation());

+ 41 - 11
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java

@@ -63,15 +63,17 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
         TransformPersistentTasksExecutor executor = buildTaskExecutor();
 
         assertThat(
-            executor.getAssignment(new TransformTaskParams("new-task-id", Version.CURRENT, null, true), cs.nodes(), cs).getExecutorNode(),
+            executor.getAssignment(new TransformTaskParams("new-task-id", Version.CURRENT, null, true), cs.nodes().getAllNodes(), cs)
+                .getExecutorNode(),
             equalTo("current-data-node-with-1-tasks")
         );
         assertThat(
-            executor.getAssignment(new TransformTaskParams("new-task-id", Version.CURRENT, null, false), cs.nodes(), cs).getExecutorNode(),
+            executor.getAssignment(new TransformTaskParams("new-task-id", Version.CURRENT, null, false), cs.nodes().getAllNodes(), cs)
+                .getExecutorNode(),
             equalTo("current-data-node-with-0-tasks-transform-remote-disabled")
         );
         assertThat(
-            executor.getAssignment(new TransformTaskParams("new-old-task-id", Version.V_7_7_0, null, true), cs.nodes(), cs)
+            executor.getAssignment(new TransformTaskParams("new-old-task-id", Version.V_7_7_0, null, true), cs.nodes().getAllNodes(), cs)
                 .getExecutorNode(),
             equalTo("past-data-node-1")
         );
@@ -85,7 +87,7 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
 
         Assignment assignment = executor.getAssignment(
             new TransformTaskParams("new-task-id", Version.CURRENT, null, false),
-            cs.nodes(),
+            cs.nodes().getAllNodes(),
             cs
         );
         assertNull(assignment.getExecutorNode());
@@ -99,7 +101,11 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
         cs = buildClusterState(nodes);
         executor = buildTaskExecutor();
 
-        assignment = executor.getAssignment(new TransformTaskParams("new-task-id", Version.CURRENT, null, false), cs.nodes(), cs);
+        assignment = executor.getAssignment(
+            new TransformTaskParams("new-task-id", Version.CURRENT, null, false),
+            cs.nodes().getAllNodes(),
+            cs
+        );
         assertNotNull(assignment.getExecutorNode());
         assertThat(assignment.getExecutorNode(), equalTo("dedicated-transform-node"));
 
@@ -108,7 +114,11 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
         cs = buildClusterState(nodes);
         executor = buildTaskExecutor();
 
-        assignment = executor.getAssignment(new TransformTaskParams("new-task-id", Version.V_8_0_0, null, false), cs.nodes(), cs);
+        assignment = executor.getAssignment(
+            new TransformTaskParams("new-task-id", Version.V_8_0_0, null, false),
+            cs.nodes().getAllNodes(),
+            cs
+        );
         assertNull(assignment.getExecutorNode());
         assertThat(
             assignment.getExplanation(),
@@ -121,7 +131,11 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
             )
         );
 
-        assignment = executor.getAssignment(new TransformTaskParams("new-task-id", Version.V_7_5_0, null, false), cs.nodes(), cs);
+        assignment = executor.getAssignment(
+            new TransformTaskParams("new-task-id", Version.V_7_5_0, null, false),
+            cs.nodes().getAllNodes(),
+            cs
+        );
         assertNotNull(assignment.getExecutorNode());
         assertThat(assignment.getExecutorNode(), equalTo("past-data-node-1"));
 
@@ -130,7 +144,11 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
         cs = buildClusterState(nodes);
         executor = buildTaskExecutor();
 
-        assignment = executor.getAssignment(new TransformTaskParams("new-task-id", Version.V_7_5_0, null, true), cs.nodes(), cs);
+        assignment = executor.getAssignment(
+            new TransformTaskParams("new-task-id", Version.V_7_5_0, null, true),
+            cs.nodes().getAllNodes(),
+            cs
+        );
         assertNull(assignment.getExecutorNode());
         assertThat(
             assignment.getExplanation(),
@@ -142,7 +160,11 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
             )
         );
 
-        assignment = executor.getAssignment(new TransformTaskParams("new-task-id", Version.CURRENT, null, false), cs.nodes(), cs);
+        assignment = executor.getAssignment(
+            new TransformTaskParams("new-task-id", Version.CURRENT, null, false),
+            cs.nodes().getAllNodes(),
+            cs
+        );
         assertNotNull(assignment.getExecutorNode());
         assertThat(assignment.getExecutorNode(), equalTo("current-data-node-with-0-tasks-transform-remote-disabled"));
 
@@ -151,7 +173,11 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
         cs = buildClusterState(nodes);
         executor = buildTaskExecutor();
 
-        assignment = executor.getAssignment(new TransformTaskParams("new-task-id", Version.V_7_5_0, null, true), cs.nodes(), cs);
+        assignment = executor.getAssignment(
+            new TransformTaskParams("new-task-id", Version.V_7_5_0, null, true),
+            cs.nodes().getAllNodes(),
+            cs
+        );
         assertNull(assignment.getExecutorNode());
         assertThat(
             assignment.getExplanation(),
@@ -169,7 +195,11 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
         cs = buildClusterState(nodes);
         executor = buildTaskExecutor();
 
-        assignment = executor.getAssignment(new TransformTaskParams("new-task-id", Version.V_7_5_0, null, true), cs.nodes(), cs);
+        assignment = executor.getAssignment(
+            new TransformTaskParams("new-task-id", Version.V_7_5_0, null, true),
+            cs.nodes().getAllNodes(),
+            cs
+        );
         assertNotNull(assignment.getExecutorNode());
         assertThat(assignment.getExecutorNode(), equalTo("past-data-node-1"));
     }