|
@@ -27,12 +27,15 @@ import org.elasticsearch.action.get.GetResponse;
|
|
|
import org.elasticsearch.action.index.IndexRequest;
|
|
|
import org.elasticsearch.action.support.WriteRequest;
|
|
|
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
|
|
+import org.elasticsearch.action.update.UpdateRequest;
|
|
|
import org.elasticsearch.client.ml.CloseJobRequest;
|
|
|
import org.elasticsearch.client.ml.CloseJobResponse;
|
|
|
import org.elasticsearch.client.ml.DeleteCalendarEventRequest;
|
|
|
import org.elasticsearch.client.ml.DeleteCalendarJobRequest;
|
|
|
import org.elasticsearch.client.ml.DeleteCalendarRequest;
|
|
|
import org.elasticsearch.client.ml.DeleteDatafeedRequest;
|
|
|
+import org.elasticsearch.client.ml.DeleteExpiredDataRequest;
|
|
|
+import org.elasticsearch.client.ml.DeleteExpiredDataResponse;
|
|
|
import org.elasticsearch.client.ml.DeleteFilterRequest;
|
|
|
import org.elasticsearch.client.ml.DeleteForecastRequest;
|
|
|
import org.elasticsearch.client.ml.DeleteJobRequest;
|
|
@@ -110,6 +113,7 @@ import org.elasticsearch.client.ml.job.util.PageParams;
|
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
|
import org.elasticsearch.common.xcontent.XContentType;
|
|
|
import org.elasticsearch.rest.RestStatus;
|
|
|
+import org.elasticsearch.search.SearchHit;
|
|
|
import org.junit.After;
|
|
|
|
|
|
import java.io.IOException;
|
|
@@ -130,6 +134,7 @@ import static org.hamcrest.CoreMatchers.hasItems;
|
|
|
import static org.hamcrest.CoreMatchers.not;
|
|
|
import static org.hamcrest.Matchers.contains;
|
|
|
import static org.hamcrest.Matchers.containsInAnyOrder;
|
|
|
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
|
|
import static org.hamcrest.Matchers.hasSize;
|
|
|
import static org.hamcrest.Matchers.is;
|
|
|
|
|
@@ -772,6 +777,142 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase {
|
|
|
assertThat(totalTotals, containsInAnyOrder(totals));
|
|
|
}
|
|
|
|
|
|
+ public void testDeleteExpiredDataGivenNothingToDelete() throws Exception {
|
|
|
+ // Tests that nothing goes wrong when there's nothing to delete
|
|
|
+ MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
|
|
|
+
|
|
|
+ DeleteExpiredDataResponse response = execute(new DeleteExpiredDataRequest(),
|
|
|
+ machineLearningClient::deleteExpiredData,
|
|
|
+ machineLearningClient::deleteExpiredDataAsync);
|
|
|
+
|
|
|
+ assertTrue(response.getDeleted());
|
|
|
+ }
|
|
|
+
|
|
|
+ private String createExpiredData(String jobId) throws Exception {
|
|
|
+ String indexId = jobId + "-data";
|
|
|
+ // Set up the index and docs
|
|
|
+ CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexId);
|
|
|
+ createIndexRequest.mapping("doc", "timestamp", "type=date,format=epoch_millis", "total", "type=long");
|
|
|
+ highLevelClient().indices().create(createIndexRequest, RequestOptions.DEFAULT);
|
|
|
+ BulkRequest bulk = new BulkRequest();
|
|
|
+ bulk.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
|
|
|
+
|
|
|
+ long nowMillis = System.currentTimeMillis();
|
|
|
+ int totalBuckets = 2 * 24;
|
|
|
+ int normalRate = 10;
|
|
|
+ int anomalousRate = 100;
|
|
|
+ int anomalousBucket = 30;
|
|
|
+ for (int bucket = 0; bucket < totalBuckets; bucket++) {
|
|
|
+ long timestamp = nowMillis - TimeValue.timeValueHours(totalBuckets - bucket).getMillis();
|
|
|
+ int bucketRate = bucket == anomalousBucket ? anomalousRate : normalRate;
|
|
|
+ for (int point = 0; point < bucketRate; point++) {
|
|
|
+ IndexRequest indexRequest = new IndexRequest(indexId, "doc");
|
|
|
+ indexRequest.source(XContentType.JSON, "timestamp", timestamp, "total", randomInt(1000));
|
|
|
+ bulk.add(indexRequest);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ highLevelClient().bulk(bulk, RequestOptions.DEFAULT);
|
|
|
+
|
|
|
+ {
|
|
|
+ // Index a randomly named unused state document
|
|
|
+ String docId = "non_existing_job_" + randomFrom("model_state_1234567#1", "quantiles", "categorizer_state#1");
|
|
|
+ IndexRequest indexRequest = new IndexRequest(".ml-state", "doc", docId);
|
|
|
+ indexRequest.source(Collections.emptyMap(), XContentType.JSON);
|
|
|
+ indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
|
|
|
+ highLevelClient().index(indexRequest, RequestOptions.DEFAULT);
|
|
|
+ }
|
|
|
+
|
|
|
+ Job job = buildJobForExpiredDataTests(jobId);
|
|
|
+ putJob(job);
|
|
|
+ openJob(job);
|
|
|
+ String datafeedId = createAndPutDatafeed(jobId, indexId);
|
|
|
+
|
|
|
+ startDatafeed(datafeedId, String.valueOf(0), String.valueOf(nowMillis - TimeValue.timeValueHours(24).getMillis()));
|
|
|
+
|
|
|
+ waitForJobToClose(jobId);
|
|
|
+
|
|
|
+ // Update snapshot timestamp to force it out of snapshot retention window
|
|
|
+ long oneDayAgo = nowMillis - TimeValue.timeValueHours(24).getMillis() - 1;
|
|
|
+ updateModelSnapshotTimestamp(jobId, String.valueOf(oneDayAgo));
|
|
|
+
|
|
|
+ openJob(job);
|
|
|
+
|
|
|
+ MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
|
|
|
+ ForecastJobRequest forecastJobRequest = new ForecastJobRequest(jobId);
|
|
|
+ forecastJobRequest.setDuration(TimeValue.timeValueHours(3));
|
|
|
+ forecastJobRequest.setExpiresIn(TimeValue.timeValueSeconds(1));
|
|
|
+ ForecastJobResponse forecastJobResponse = machineLearningClient.forecastJob(forecastJobRequest, RequestOptions.DEFAULT);
|
|
|
+
|
|
|
+ waitForForecastToComplete(jobId, forecastJobResponse.getForecastId());
|
|
|
+
|
|
|
+ // Wait for the forecast to expire
|
|
|
+ awaitBusy(() -> false, 1, TimeUnit.SECONDS);
|
|
|
+
|
|
|
+ // Run up to now
|
|
|
+ startDatafeed(datafeedId, String.valueOf(0), String.valueOf(nowMillis));
|
|
|
+
|
|
|
+ waitForJobToClose(jobId);
|
|
|
+
|
|
|
+ return forecastJobResponse.getForecastId();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testDeleteExpiredData() throws Exception {
|
|
|
+
|
|
|
+ String jobId = "test-delete-expired-data";
|
|
|
+
|
|
|
+ String forecastId = createExpiredData(jobId);
|
|
|
+
|
|
|
+ MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
|
|
|
+
|
|
|
+ GetModelSnapshotsRequest getModelSnapshotsRequest = new GetModelSnapshotsRequest(jobId);
|
|
|
+ GetModelSnapshotsResponse getModelSnapshotsResponse = execute(getModelSnapshotsRequest, machineLearningClient::getModelSnapshots,
|
|
|
+ machineLearningClient::getModelSnapshotsAsync);
|
|
|
+
|
|
|
+ assertEquals(2L, getModelSnapshotsResponse.count());
|
|
|
+
|
|
|
+ assertTrue(forecastExists(jobId, forecastId));
|
|
|
+
|
|
|
+ {
|
|
|
+ // Verify .ml-state contains the expected unused state document
|
|
|
+ Iterable<SearchHit> hits = searchAll(".ml-state");
|
|
|
+ List<SearchHit> target = new ArrayList<>();
|
|
|
+ hits.forEach(target::add);
|
|
|
+ long numMatches = target.stream()
|
|
|
+ .filter(c -> c.getId().startsWith("non_existing_job"))
|
|
|
+ .count();
|
|
|
+
|
|
|
+ assertThat(numMatches, equalTo(1L));
|
|
|
+ }
|
|
|
+
|
|
|
+ DeleteExpiredDataRequest request = new DeleteExpiredDataRequest();
|
|
|
+ DeleteExpiredDataResponse response = execute(request, machineLearningClient::deleteExpiredData,
|
|
|
+ machineLearningClient::deleteExpiredDataAsync);
|
|
|
+
|
|
|
+ assertTrue(response.getDeleted());
|
|
|
+
|
|
|
+ awaitBusy(() -> false, 1, TimeUnit.SECONDS);
|
|
|
+
|
|
|
+ GetModelSnapshotsRequest getModelSnapshotsRequest1 = new GetModelSnapshotsRequest(jobId);
|
|
|
+ GetModelSnapshotsResponse getModelSnapshotsResponse1 = execute(getModelSnapshotsRequest1, machineLearningClient::getModelSnapshots,
|
|
|
+ machineLearningClient::getModelSnapshotsAsync);
|
|
|
+
|
|
|
+ assertEquals(1L, getModelSnapshotsResponse1.count());
|
|
|
+
|
|
|
+ assertFalse(forecastExists(jobId, forecastId));
|
|
|
+
|
|
|
+ {
|
|
|
+ // Verify .ml-state doesn't contain unused state documents
|
|
|
+ Iterable<SearchHit> hits = searchAll(".ml-state");
|
|
|
+ List<SearchHit> hitList = new ArrayList<>();
|
|
|
+ hits.forEach(hitList::add);
|
|
|
+ long numMatches = hitList.stream()
|
|
|
+ .filter(c -> c.getId().startsWith("non_existing_job"))
|
|
|
+ .count();
|
|
|
+
|
|
|
+ assertThat(numMatches, equalTo(0L));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public void testDeleteForecast() throws Exception {
|
|
|
String jobId = "test-delete-forecast";
|
|
|
|
|
@@ -1146,6 +1287,27 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase {
|
|
|
return generator.ofCodePointsLength(random(), 10, 10);
|
|
|
}
|
|
|
|
|
|
+ private static Job buildJobForExpiredDataTests(String jobId) {
|
|
|
+ Job.Builder builder = new Job.Builder(jobId);
|
|
|
+ builder.setDescription(randomAlphaOfLength(10));
|
|
|
+
|
|
|
+ Detector detector = new Detector.Builder()
|
|
|
+ .setFunction("count")
|
|
|
+ .setDetectorDescription(randomAlphaOfLength(10))
|
|
|
+ .build();
|
|
|
+ AnalysisConfig.Builder configBuilder = new AnalysisConfig.Builder(Arrays.asList(detector));
|
|
|
+ //should not be random, see:https://github.com/elastic/ml-cpp/issues/208
|
|
|
+ configBuilder.setBucketSpan(new TimeValue(1, TimeUnit.HOURS));
|
|
|
+ builder.setAnalysisConfig(configBuilder);
|
|
|
+
|
|
|
+ DataDescription.Builder dataDescription = new DataDescription.Builder();
|
|
|
+ dataDescription.setTimeFormat(DataDescription.EPOCH_MS);
|
|
|
+ dataDescription.setTimeField("timestamp");
|
|
|
+ builder.setDataDescription(dataDescription);
|
|
|
+
|
|
|
+ return builder.build();
|
|
|
+ }
|
|
|
+
|
|
|
public static Job buildJob(String jobId) {
|
|
|
Job.Builder builder = new Job.Builder(jobId);
|
|
|
builder.setDescription(randomAlphaOfLength(10));
|
|
@@ -1176,6 +1338,53 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase {
|
|
|
highLevelClient().machineLearning().openJob(new OpenJobRequest(job.getId()), RequestOptions.DEFAULT);
|
|
|
}
|
|
|
|
|
|
+ private void waitForJobToClose(String jobId) throws Exception {
|
|
|
+ MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
|
|
|
+
|
|
|
+ assertBusy(() -> {
|
|
|
+ JobStats stats = machineLearningClient.getJobStats(new GetJobStatsRequest(jobId), RequestOptions.DEFAULT).jobStats().get(0);
|
|
|
+ assertEquals(JobState.CLOSED, stats.getState());
|
|
|
+ }, 30, TimeUnit.SECONDS);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void startDatafeed(String datafeedId, String start, String end) throws Exception {
|
|
|
+ MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
|
|
|
+
|
|
|
+ StartDatafeedRequest startDatafeedRequest = new StartDatafeedRequest(datafeedId);
|
|
|
+ startDatafeedRequest.setStart(start);
|
|
|
+ startDatafeedRequest.setEnd(end);
|
|
|
+ StartDatafeedResponse response = execute(startDatafeedRequest,
|
|
|
+ machineLearningClient::startDatafeed,
|
|
|
+ machineLearningClient::startDatafeedAsync);
|
|
|
+
|
|
|
+ assertTrue(response.isStarted());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void updateModelSnapshotTimestamp(String jobId, String timestamp) throws Exception {
|
|
|
+ MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
|
|
|
+
|
|
|
+ GetModelSnapshotsRequest getModelSnapshotsRequest = new GetModelSnapshotsRequest(jobId);
|
|
|
+ GetModelSnapshotsResponse getModelSnapshotsResponse = execute(getModelSnapshotsRequest, machineLearningClient::getModelSnapshots,
|
|
|
+ machineLearningClient::getModelSnapshotsAsync);
|
|
|
+
|
|
|
+ assertThat(getModelSnapshotsResponse.count(), greaterThanOrEqualTo(1L));
|
|
|
+
|
|
|
+ ModelSnapshot modelSnapshot = getModelSnapshotsResponse.snapshots().get(0);
|
|
|
+
|
|
|
+ String snapshotId = modelSnapshot.getSnapshotId();
|
|
|
+ String documentId = jobId + "_model_snapshot_" + snapshotId;
|
|
|
+
|
|
|
+ String snapshotUpdate = "{ \"timestamp\": " + timestamp + "}";
|
|
|
+ UpdateRequest updateSnapshotRequest = new UpdateRequest(".ml-anomalies-" + jobId, "doc", documentId);
|
|
|
+ updateSnapshotRequest.doc(snapshotUpdate.getBytes(StandardCharsets.UTF_8), XContentType.JSON);
|
|
|
+ highLevelClient().update(updateSnapshotRequest, RequestOptions.DEFAULT);
|
|
|
+
|
|
|
+ // Wait a second to ensure subsequent model snapshots will have a different ID (it depends on epoch seconds)
|
|
|
+ awaitBusy(() -> false, 1, TimeUnit.SECONDS);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
private String createAndPutDatafeed(String jobId, String indexName) throws IOException {
|
|
|
String datafeedId = jobId + "-feed";
|
|
|
DatafeedConfig datafeed = DatafeedConfig.builder(datafeedId, jobId)
|