|
@@ -75,6 +75,7 @@ import static org.hamcrest.Matchers.is;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
import static org.mockito.Mockito.when;
|
|
|
|
|
|
+// TODO: in 8.0.0 remove all instances of MAX_OPEN_JOBS_NODE_ATTR from this file
|
|
|
public class TransportOpenJobActionTests extends ESTestCase {
|
|
|
|
|
|
private MlMemoryTracker memoryTracker;
|
|
@@ -142,12 +143,11 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|
|
jobBuilder.setJobVersion(Version.CURRENT);
|
|
|
|
|
|
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id4", jobBuilder.build(),
|
|
|
- cs.build(), 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger);
|
|
|
+ cs.build(), 10, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger);
|
|
|
assertEquals("", result.getExplanation());
|
|
|
assertEquals("_node_id3", result.getExecutorNode());
|
|
|
}
|
|
|
|
|
|
-
|
|
|
public void testSelectLeastLoadedMlNode_maxCapacity() {
|
|
|
int numNodes = randomIntBetween(1, 10);
|
|
|
int maxRunningJobsPerNode = randomIntBetween(1, 100);
|
|
@@ -178,11 +178,11 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|
|
|
|
|
Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id0", new ByteSizeValue(150, ByteSizeUnit.MB)).build(new Date());
|
|
|
|
|
|
- Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id0", job, cs.build(), 2,
|
|
|
+ Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id0", job, cs.build(), maxRunningJobsPerNode, 2,
|
|
|
30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger);
|
|
|
assertNull(result.getExecutorNode());
|
|
|
- assertTrue(result.getExplanation().contains("because this node is full. Number of opened jobs [" + maxRunningJobsPerNode
|
|
|
- + "], xpack.ml.max_open_jobs [" + maxRunningJobsPerNode + "]"));
|
|
|
+ assertTrue(result.getExplanation(), result.getExplanation().contains("because this node is full. Number of opened jobs ["
|
|
|
+ + maxRunningJobsPerNode + "], xpack.ml.max_open_jobs [" + maxRunningJobsPerNode + "]"));
|
|
|
}
|
|
|
|
|
|
public void testSelectLeastLoadedMlNode_noMlNodes() {
|
|
@@ -205,7 +205,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|
|
|
|
|
Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id2", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date());
|
|
|
|
|
|
- Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id2", job, cs.build(), 2, 30, memoryTracker,
|
|
|
+ Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id2", job, cs.build(), 20, 2, 30, memoryTracker,
|
|
|
isMemoryTrackerRecentlyRefreshed, logger);
|
|
|
assertTrue(result.getExplanation().contains("because this node isn't a ml node"));
|
|
|
assertNull(result.getExecutorNode());
|
|
@@ -241,7 +241,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|
|
Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id6", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date());
|
|
|
|
|
|
ClusterState cs = csBuilder.build();
|
|
|
- Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id6", job, cs, 2, 30, memoryTracker,
|
|
|
+ Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id6", job, cs, 10, 2, 30, memoryTracker,
|
|
|
isMemoryTrackerRecentlyRefreshed, logger);
|
|
|
assertEquals("_node_id3", result.getExecutorNode());
|
|
|
|
|
@@ -252,8 +252,8 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|
|
csBuilder = ClusterState.builder(cs);
|
|
|
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
|
|
|
cs = csBuilder.build();
|
|
|
- result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed,
|
|
|
- logger);
|
|
|
+ result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 10, 2, 30, memoryTracker,
|
|
|
+ isMemoryTrackerRecentlyRefreshed, logger);
|
|
|
assertNull("no node selected, because OPENING state", result.getExecutorNode());
|
|
|
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
|
|
|
|
|
@@ -264,8 +264,8 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|
|
csBuilder = ClusterState.builder(cs);
|
|
|
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
|
|
|
cs = csBuilder.build();
|
|
|
- result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed,
|
|
|
- logger);
|
|
|
+ result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 10, 2, 30, memoryTracker,
|
|
|
+ isMemoryTrackerRecentlyRefreshed, logger);
|
|
|
assertNull("no node selected, because stale task", result.getExecutorNode());
|
|
|
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
|
|
|
|
|
@@ -276,8 +276,8 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|
|
csBuilder = ClusterState.builder(cs);
|
|
|
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
|
|
|
cs = csBuilder.build();
|
|
|
- result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed,
|
|
|
- logger);
|
|
|
+ result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 10, 2, 30, memoryTracker,
|
|
|
+ isMemoryTrackerRecentlyRefreshed, logger);
|
|
|
assertNull("no node selected, because null state", result.getExecutorNode());
|
|
|
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
|
|
|
}
|
|
@@ -316,7 +316,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|
|
Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id7", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date());
|
|
|
|
|
|
// Allocation won't be possible if the stale failed job is treated as opening
|
|
|
- Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker,
|
|
|
+ Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 10, 2, 30, memoryTracker,
|
|
|
isMemoryTrackerRecentlyRefreshed, logger);
|
|
|
assertEquals("_node_id1", result.getExecutorNode());
|
|
|
|
|
@@ -327,8 +327,8 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|
|
csBuilder = ClusterState.builder(cs);
|
|
|
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
|
|
|
cs = csBuilder.build();
|
|
|
- result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", job, cs, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed,
|
|
|
- logger);
|
|
|
+ result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", job, cs, 10, 2, 30, memoryTracker,
|
|
|
+ isMemoryTrackerRecentlyRefreshed, logger);
|
|
|
assertNull("no node selected, because OPENING state", result.getExecutorNode());
|
|
|
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
|
|
|
}
|
|
@@ -360,7 +360,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|
|
cs.nodes(nodes);
|
|
|
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
|
|
|
cs.metaData(metaData);
|
|
|
- Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", job, cs.build(), 2, 30,
|
|
|
+ Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", job, cs.build(), 10, 2, 30,
|
|
|
memoryTracker, isMemoryTrackerRecentlyRefreshed, logger);
|
|
|
assertThat(result.getExplanation(), containsString("because this node does not support jobs of type [incompatible_type]"));
|
|
|
assertNull(result.getExecutorNode());
|
|
@@ -391,7 +391,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|
|
cs.nodes(nodes);
|
|
|
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
|
|
|
cs.metaData(metaData);
|
|
|
- Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_incompatible_model_snapshot", job, cs.build(),
|
|
|
+ Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_incompatible_model_snapshot", job, cs.build(), 10,
|
|
|
2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger);
|
|
|
assertThat(result.getExplanation(), containsString(
|
|
|
"because the job's model snapshot requires a node of version [6.3.0] or higher"));
|
|
@@ -420,7 +420,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|
|
cs.metaData(metaData);
|
|
|
|
|
|
Job job = jobWithRules("job_with_rules");
|
|
|
- Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 30, memoryTracker,
|
|
|
+ Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 10, 2, 30, memoryTracker,
|
|
|
isMemoryTrackerRecentlyRefreshed, logger);
|
|
|
assertThat(result.getExplanation(), containsString(
|
|
|
"because jobs using custom_rules require a node of version [6.4.0] or higher"));
|
|
@@ -449,7 +449,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|
|
cs.metaData(metaData);
|
|
|
|
|
|
Job job = jobWithRules("job_with_rules");
|
|
|
- Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 30, memoryTracker,
|
|
|
+ Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 10, 2, 30, memoryTracker,
|
|
|
isMemoryTrackerRecentlyRefreshed, logger);
|
|
|
assertNotNull(result.getExecutorNode());
|
|
|
}
|
|
@@ -539,7 +539,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|
|
ClusterService clusterService = mock(ClusterService.class);
|
|
|
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY,
|
|
|
Sets.newHashSet(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT,
|
|
|
- MachineLearning.MAX_LAZY_ML_NODES)
|
|
|
+ MachineLearning.MAX_LAZY_ML_NODES, MachineLearning.MAX_OPEN_JOBS_PER_NODE)
|
|
|
);
|
|
|
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
|
|
|
|
|
@@ -556,7 +556,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|
|
ClusterService clusterService = mock(ClusterService.class);
|
|
|
ClusterSettings clusterSettings = new ClusterSettings(settings,
|
|
|
Sets.newHashSet(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT,
|
|
|
- MachineLearning.MAX_LAZY_ML_NODES)
|
|
|
+ MachineLearning.MAX_LAZY_ML_NODES, MachineLearning.MAX_OPEN_JOBS_PER_NODE)
|
|
|
);
|
|
|
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
|
|
|
|