|
|
@@ -5,6 +5,7 @@
|
|
|
*/
|
|
|
package org.elasticsearch.xpack.ml.integration;
|
|
|
|
|
|
+import org.elasticsearch.action.ActionFuture;
|
|
|
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
|
|
import org.elasticsearch.action.bulk.BulkResponse;
|
|
|
import org.elasticsearch.action.index.IndexRequest;
|
|
|
@@ -77,9 +78,6 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
|
|
|
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
|
|
|
.get();
|
|
|
assertThat(bulkResponse.hasFailures(), is(false));
|
|
|
-
|
|
|
- // Ensure all data is searchable
|
|
|
- client().admin().indices().prepareRefresh(DATA_INDEX).get();
|
|
|
}
|
|
|
|
|
|
@After
|
|
|
@@ -94,6 +92,17 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
|
|
|
}
|
|
|
|
|
|
public void testDeleteExpiredData() throws Exception {
|
|
|
+ // Index some unused state documents (more than 10K to test scrolling works)
|
|
|
+ BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
|
|
|
+ bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
|
|
|
+ for (int i = 0; i < 10010; i++) {
|
|
|
+ String docId = "non_existing_job_" + randomFrom("model_state_1234567#" + i, "quantiles", "categorizer_state#" + i);
|
|
|
+ IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), "doc", docId);
|
|
|
+ indexRequest.source(Collections.emptyMap());
|
|
|
+ bulkRequestBuilder.add(indexRequest);
|
|
|
+ }
|
|
|
+ ActionFuture<BulkResponse> indexUnusedStateDocsResponse = bulkRequestBuilder.execute();
|
|
|
+
|
|
|
registerJob(newJobBuilder("no-retention").setResultsRetentionDays(null).setModelSnapshotRetentionDays(1000L));
|
|
|
registerJob(newJobBuilder("results-retention").setResultsRetentionDays(1L).setModelSnapshotRetentionDays(1000L));
|
|
|
registerJob(newJobBuilder("snapshots-retention").setResultsRetentionDays(null).setModelSnapshotRetentionDays(2L));
|
|
|
@@ -104,6 +113,8 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
|
|
|
|
|
|
long now = System.currentTimeMillis();
|
|
|
long oneDayAgo = now - TimeValue.timeValueHours(48).getMillis() - 1;
|
|
|
+
|
|
|
+ // Start all jobs
|
|
|
for (Job.Builder job : getJobs()) {
|
|
|
putJob(job);
|
|
|
|
|
|
@@ -117,7 +128,14 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
|
|
|
// Run up to a day ago
|
|
|
openJob(job.getId());
|
|
|
startDatafeed(datafeedId, 0, now - TimeValue.timeValueHours(24).getMillis());
|
|
|
+ }
|
|
|
+
|
|
|
+ // Now let's wait for all jobs to be closed
|
|
|
+ for (Job.Builder job : getJobs()) {
|
|
|
waitUntilJobIsClosed(job.getId());
|
|
|
+ }
|
|
|
+
|
|
|
+ for (Job.Builder job : getJobs()) {
|
|
|
assertThat(getBuckets(job.getId()).size(), is(greaterThanOrEqualTo(47)));
|
|
|
assertThat(getRecords(job.getId()).size(), equalTo(1));
|
|
|
List<ModelSnapshot> modelSnapshots = getModelSnapshots(job.getId());
|
|
|
@@ -143,6 +161,7 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
|
|
|
waitForecastToFinish(job.getId(), forecastDefaultExpiryId);
|
|
|
waitForecastToFinish(job.getId(), forecastNoExpiryId);
|
|
|
}
|
|
|
+
|
|
|
// Refresh to ensure the snapshot timestamp updates are visible
|
|
|
client().admin().indices().prepareRefresh("*").get();
|
|
|
|
|
|
@@ -175,16 +194,8 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
|
|
|
assertThat(countForecastDocs(forecastStat.getJobId(), forecastStat.getForecastId()), equalTo(forecastStat.getRecordCount()));
|
|
|
}
|
|
|
|
|
|
- // Index some unused state documents (more than 10K to test scrolling works)
|
|
|
- BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
|
|
|
- bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
|
|
|
- for (int i = 0; i < 10010; i++) {
|
|
|
- String docId = "non_existing_job_" + randomFrom("model_state_1234567#" + i, "quantiles", "categorizer_state#" + i);
|
|
|
- IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), "doc", docId);
|
|
|
- indexRequest.source(Collections.emptyMap());
|
|
|
- bulkRequestBuilder.add(indexRequest);
|
|
|
- }
|
|
|
- assertThat(bulkRequestBuilder.get().status(), equalTo(RestStatus.OK));
|
|
|
+ // Before we call the delete-expired-data action we need to make sure the unused state docs were indexed
|
|
|
+ assertThat(indexUnusedStateDocsResponse.get().status(), equalTo(RestStatus.OK));
|
|
|
|
|
|
// Now call the action under test
|
|
|
client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();
|