|
@@ -6,6 +6,7 @@
|
|
|
package org.elasticsearch.xpack.ml.integration;
|
|
|
|
|
|
import org.elasticsearch.action.ActionFuture;
|
|
|
+import org.elasticsearch.action.search.SearchRequest;
|
|
|
import org.elasticsearch.action.search.SearchResponse;
|
|
|
import org.elasticsearch.action.support.IndicesOptions;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
@@ -24,6 +25,7 @@ import org.elasticsearch.common.xcontent.XContentHelper;
|
|
|
import org.elasticsearch.common.xcontent.XContentParser;
|
|
|
import org.elasticsearch.common.xcontent.XContentType;
|
|
|
import org.elasticsearch.index.query.QueryBuilders;
|
|
|
+import org.elasticsearch.index.query.TermsQueryBuilder;
|
|
|
import org.elasticsearch.persistent.PersistentTaskResponse;
|
|
|
import org.elasticsearch.persistent.PersistentTasksClusterService;
|
|
|
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
|
|
@@ -47,6 +49,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
|
|
|
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
|
|
import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
|
|
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
|
|
|
+import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex;
|
|
|
import org.elasticsearch.xpack.ml.MachineLearning;
|
|
|
import org.elasticsearch.xpack.ml.job.process.autodetect.BlackHoleAutodetectProcess;
|
|
|
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
|
|
@@ -58,6 +61,9 @@ import java.util.concurrent.TimeUnit;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
import static org.elasticsearch.persistent.PersistentTasksClusterService.needsReassignment;
|
|
|
+import static org.hamcrest.Matchers.arrayWithSize;
|
|
|
+import static org.hamcrest.Matchers.is;
|
|
|
+import static org.hamcrest.Matchers.notNullValue;
|
|
|
|
|
|
public class MlDistributedFailureIT extends BaseMlIntegTestCase {
|
|
|
|
|
@@ -176,9 +182,22 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase {
|
|
|
|
|
|
// Since 7.5 we can also stop an unassigned job either normally or by force
|
|
|
CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(jobId);
|
|
|
- closeJobRequest.setForce(randomBoolean());
|
|
|
+ boolean closeWithForce = randomBoolean();
|
|
|
+ closeJobRequest.setForce(closeWithForce);
|
|
|
CloseJobAction.Response closeJobResponse = client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet();
|
|
|
assertTrue(closeJobResponse.isClosed());
|
|
|
+
|
|
|
+ // We should have an audit message indicating that the job was closed
|
|
|
+ String expectedAuditMessage = closeWithForce ? "Job is closing (forced)" : "Job is closing";
|
|
|
+ SearchRequest searchRequest = new SearchRequest(NotificationsIndex.NOTIFICATIONS_INDEX);
|
|
|
+ searchRequest.source().query(new TermsQueryBuilder("message.raw", expectedAuditMessage));
|
|
|
+ assertBusy(() -> {
|
|
|
+ assertTrue(indexExists(NotificationsIndex.NOTIFICATIONS_INDEX));
|
|
|
+ SearchResponse searchResponse = client().search(searchRequest).actionGet();
|
|
|
+ assertThat(searchResponse.getHits(), notNullValue());
|
|
|
+ assertThat(searchResponse.getHits().getHits(), arrayWithSize(1));
|
|
|
+ assertThat(searchResponse.getHits().getHits()[0].getSourceAsMap().get("job_id"), is(jobId));
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
public void testCloseUnassignedFailedJobAndStopUnassignedStoppingDatafeed() throws Exception {
|