|
@@ -32,6 +32,7 @@ import org.elasticsearch.xpack.core.ml.action.PostDataAction;
|
|
|
import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
|
|
|
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
|
|
|
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
|
|
|
+import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
|
|
|
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
|
|
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
|
|
|
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
|
|
@@ -424,6 +425,60 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
|
|
|
assertBusy(() -> assertJobTask(jobId, JobState.OPENED, true));
|
|
|
}
|
|
|
|
|
|
+ public void testCloseUnassignedLazyJobAndDatafeed() throws Exception {
|
|
|
+ internalCluster().ensureAtLeastNumDataNodes(3);
|
|
|
+ ensureStableCluster(3);
|
|
|
+
|
|
|
+ String jobId = "test-lazy-stop";
|
|
|
+ String datafeedId = jobId + "-datafeed";
|
|
|
+ // Assume the test machine won't have space to assign a 2TB job
|
|
|
+ Job.Builder job = createJob(jobId, new ByteSizeValue(2, ByteSizeUnit.TB), true);
|
|
|
+ PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
|
|
|
+ client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
|
|
|
+
|
|
|
+ client().admin().indices().prepareCreate("data").setMapping("time", "type=date").get();
|
|
|
+
|
|
|
+ DatafeedConfig config = createDatafeed(datafeedId, jobId, Collections.singletonList("data"));
|
|
|
+ PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config);
|
|
|
+ client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).actionGet();
|
|
|
+
|
|
|
+ ensureYellow(); // at least the primary shards of the indices a job uses should be started
|
|
|
+ OpenJobAction.Request openJobRequest = new OpenJobAction.Request(jobId);
|
|
|
+ client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet();
|
|
|
+
|
|
|
+ // Job state should be opening because it won't fit anyway, but is allowed to open lazily
|
|
|
+ GetJobsStatsAction.Request jobStatsRequest = new GetJobsStatsAction.Request(jobId);
|
|
|
+ GetJobsStatsAction.Response jobStatsResponse = client().execute(GetJobsStatsAction.INSTANCE, jobStatsRequest).actionGet();
|
|
|
+ assertEquals(JobState.OPENING, jobStatsResponse.getResponse().results().get(0).getState());
|
|
|
+
|
|
|
+ StartDatafeedAction.Request startDataFeedRequest = new StartDatafeedAction.Request(config.getId(), 0L);
|
|
|
+ client().execute(StartDatafeedAction.INSTANCE, startDataFeedRequest).actionGet();
|
|
|
+
|
|
|
+ // Datafeed state should be starting while it waits for job assignment
|
|
|
+ GetDatafeedsStatsAction.Request datafeedStatsRequest = new GetDatafeedsStatsAction.Request(datafeedId);
|
|
|
+ GetDatafeedsStatsAction.Response datafeedStatsResponse =
|
|
|
+ client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet();
|
|
|
+ assertEquals(DatafeedState.STARTING, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState());
|
|
|
+
|
|
|
+ // A starting datafeed can be stopped normally or by force
|
|
|
+ StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedId);
|
|
|
+ stopDatafeedRequest.setForce(randomBoolean());
|
|
|
+ StopDatafeedAction.Response stopDatafeedResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet();
|
|
|
+ assertTrue(stopDatafeedResponse.isStopped());
|
|
|
+
|
|
|
+ datafeedStatsResponse = client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet();
|
|
|
+ assertEquals(DatafeedState.STOPPED, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState());
|
|
|
+
|
|
|
+ // An opening job can also be stopped normally or by force
|
|
|
+ CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(jobId);
|
|
|
+ closeJobRequest.setForce(randomBoolean());
|
|
|
+ CloseJobAction.Response closeJobResponse = client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet();
|
|
|
+ assertTrue(closeJobResponse.isClosed());
|
|
|
+
|
|
|
+ jobStatsResponse = client().execute(GetJobsStatsAction.INSTANCE, jobStatsRequest).actionGet();
|
|
|
+ assertEquals(JobState.CLOSED, jobStatsResponse.getResponse().results().get(0).getState());
|
|
|
+ }
|
|
|
+
|
|
|
private void assertJobTask(String jobId, JobState expectedState, boolean hasExecutorNode) {
|
|
|
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
|
|
|
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|