Browse Source

[ML] Prevent node from potentially running out of memory due to loading quantiles (#70376)

Large jobs with lots of partitions can get very big; retrieving snapshots
for such a job can cause a node to run out of memory.

With this change, quantiles are no longer fetched when querying for
(multiple) model snapshots, to avoid memory overhead. Quantiles aren't
needed by the APIs using JobResultsProvider.modelSnapshots(...).

fixes #70372
Hendrik Muhs 4 years ago
parent
commit
74feca22b6

+ 16 - 0
x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java

@@ -649,19 +649,23 @@ public class JobResultsProviderIT extends MlSingleNodeTestCase {
         indexModelSnapshot(new ModelSnapshot.Builder(jobId).setSnapshotId("snap_2")
             .setTimestamp(Date.from(Instant.ofEpochMilli(10)))
             .setMinVersion(Version.V_7_4_0)
+            .setQuantiles(new Quantiles(jobId, Date.from(Instant.ofEpochMilli(10)), randomAlphaOfLength(20)))
             .build());
         indexModelSnapshot(new ModelSnapshot.Builder(jobId).setSnapshotId("snap_1")
             .setTimestamp(Date.from(Instant.ofEpochMilli(11)))
             .setMinVersion(Version.V_7_2_0)
+            .setQuantiles(new Quantiles(jobId, Date.from(Instant.ofEpochMilli(11)), randomAlphaOfLength(20)))
             .build());
         indexModelSnapshot(new ModelSnapshot.Builder(jobId).setSnapshotId("other_snap")
             .setTimestamp(Date.from(Instant.ofEpochMilli(12)))
             .setMinVersion(Version.V_7_3_0)
+            .setQuantiles(new Quantiles(jobId, Date.from(Instant.ofEpochMilli(12)), randomAlphaOfLength(20)))
             .build());
         createJob("other_job");
         indexModelSnapshot(new ModelSnapshot.Builder("other_job").setSnapshotId("other_snap")
             .setTimestamp(Date.from(Instant.ofEpochMilli(10)))
             .setMinVersion(Version.CURRENT)
+            .setQuantiles(new Quantiles("other_job", Date.from(Instant.ofEpochMilli(10)), randomAlphaOfLength(20)))
             .build());
         // Add a snapshot WITHOUT a min version.
         client().prepareIndex(AnomalyDetectorsIndex.jobResultsAliasedName("other_job"))
@@ -677,13 +681,17 @@ public class JobResultsProviderIT extends MlSingleNodeTestCase {
         jobProvider.modelSnapshots(jobId, 0, 4, "9", "15", "", false, "snap_2,snap_1", future::onResponse, future::onFailure);
         List<ModelSnapshot> snapshots = future.actionGet().results();
         assertThat(snapshots.get(0).getSnapshotId(), equalTo("snap_2"));
+        assertNull(snapshots.get(0).getQuantiles());
         assertThat(snapshots.get(1).getSnapshotId(), equalTo("snap_1"));
+        assertNull(snapshots.get(1).getQuantiles());
 
         future = new PlainActionFuture<>();
         jobProvider.modelSnapshots(jobId, 0, 4, "9", "15", "", false, "snap_*", future::onResponse, future::onFailure);
         snapshots = future.actionGet().results();
         assertThat(snapshots.get(0).getSnapshotId(), equalTo("snap_2"));
         assertThat(snapshots.get(1).getSnapshotId(), equalTo("snap_1"));
+        assertNull(snapshots.get(0).getQuantiles());
+        assertNull(snapshots.get(1).getQuantiles());
 
         future = new PlainActionFuture<>();
         jobProvider.modelSnapshots(jobId, 0, 4, "9", "15", "", false, "snap_*,other_snap", future::onResponse, future::onFailure);
@@ -716,6 +724,14 @@ public class JobResultsProviderIT extends MlSingleNodeTestCase {
         assertThat(snapshots.get(2).getSnapshotId(), equalTo("other_snap"));
         assertThat(snapshots.get(3).getSnapshotId(), equalTo("snap_2"));
         assertThat(snapshots.get(4).getSnapshotId(), equalTo("other_snap"));
+
+        // assert that quantiles are not loaded
+        assertNull(snapshots.get(0).getQuantiles());
+        assertNull(snapshots.get(1).getQuantiles());
+        assertNull(snapshots.get(2).getQuantiles());
+        assertNull(snapshots.get(3).getQuantiles());
+        assertNull(snapshots.get(4).getQuantiles());
+
     }
 
     public void testGetAutodetectParams() throws Exception {

+ 1 - 14
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetModelSnapshotsAction.java

@@ -17,13 +17,9 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
-import org.elasticsearch.xpack.core.action.util.QueryPage;
-import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.ml.job.JobManager;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 
-import java.util.stream.Collectors;
-
 public class TransportGetModelSnapshotsAction extends HandledTransportAction<GetModelSnapshotsAction.Request,
         GetModelSnapshotsAction.Response> {
 
@@ -74,16 +70,7 @@ public class TransportGetModelSnapshotsAction extends HandledTransportAction<Get
             request.getSort(),
             request.getDescOrder(),
             request.getSnapshotId(),
-            page -> listener.onResponse(new GetModelSnapshotsAction.Response(clearQuantiles(page))),
+            page -> listener.onResponse(new GetModelSnapshotsAction.Response(page)),
             listener::onFailure);
     }
-
-    public static QueryPage<ModelSnapshot> clearQuantiles(QueryPage<ModelSnapshot> page) {
-        if (page.results() == null) {
-            return page;
-        }
-        return new QueryPage<>(page.results().stream().map(snapshot ->
-                new ModelSnapshot.Builder(snapshot).setQuantiles(null).build())
-                .collect(Collectors.toList()), page.count(), page.getResultsField());
-    }
 }

+ 13 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java

@@ -80,6 +80,7 @@ import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
 import org.elasticsearch.search.aggregations.metrics.ExtendedStats;
 import org.elasticsearch.search.aggregations.metrics.Stats;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
 import org.elasticsearch.search.sort.FieldSortBuilder;
 import org.elasticsearch.search.sort.SortBuilders;
 import org.elasticsearch.search.sort.SortOrder;
@@ -154,6 +155,13 @@ public class JobResultsProvider {
     public static final int BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE = 20;
     private static final double ESTABLISHED_MEMORY_CV_THRESHOLD = 0.1;
 
+    // source filter that excludes the quantiles field when fetching model snapshots, to avoid memory overhead
+    private static final FetchSourceContext REMOVE_QUANTILES_FROM_SOURCE = new FetchSourceContext(
+        true,
+        null,
+        new String[] { ModelSnapshot.QUANTILES.getPreferredName() }
+    );
+
     private final Client client;
     private final Settings settings;
     private final IndexNameExpressionResolver resolver;
@@ -1006,6 +1014,8 @@ public class JobResultsProvider {
     /**
      * Get model snapshots for the job ordered by descending timestamp (newest first).
      *
+     * Note: quantiles are removed from the results.
+     *
      * @param jobId the job id
      * @param from  number of snapshots to skip
      * @param size  number of snapshots to retrieve
@@ -1018,6 +1028,8 @@ public class JobResultsProvider {
     /**
      * Get model snapshots for the job ordered by descending restore priority.
      *
+     * Note: quantiles are removed from the results.
+     *
      * @param jobId          the job id
      * @param from           number of snapshots to skip
      * @param size           number of snapshots to retrieve
@@ -1082,6 +1094,7 @@ public class JobResultsProvider {
         sourceBuilder.from(from);
         sourceBuilder.size(size);
         sourceBuilder.trackTotalHits(true);
+        sourceBuilder.fetchSource(REMOVE_QUANTILES_FROM_SOURCE);
         searchRequest.source(sourceBuilder);
         executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
                 ActionListener.<SearchResponse>wrap(searchResponse -> {

+ 1 - 22
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/modelsnapshots/GetModelSnapshotsTests.java

@@ -6,17 +6,9 @@
  */
 package org.elasticsearch.xpack.ml.modelsnapshots;
 
-import org.elasticsearch.common.ParseField;
 import org.elasticsearch.test.ESTestCase;
-import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
-import org.elasticsearch.xpack.ml.action.TransportGetModelSnapshotsAction;
 import org.elasticsearch.xpack.core.action.util.PageParams;
-import org.elasticsearch.xpack.core.action.util.QueryPage;
-import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
-import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
-
-import java.util.Arrays;
-import java.util.Date;
+import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
 
 public class GetModelSnapshotsTests extends ESTestCase {
 
@@ -31,17 +23,4 @@ public class GetModelSnapshotsTests extends ESTestCase {
                 () -> new GetModelSnapshotsAction.Request("foo", null).setPageParams(new PageParams(10, -5)));
         assertEquals("Parameter [size] cannot be < 0", e.getMessage());
     }
-
-    public void testModelSnapshots_clearQuantiles() {
-        ModelSnapshot m1 = new ModelSnapshot.Builder("jobId").setQuantiles(
-                new Quantiles("jobId", new Date(), "quantileState")).build();
-        ModelSnapshot m2 = new ModelSnapshot.Builder("jobId").build();
-
-        QueryPage<ModelSnapshot> page = new QueryPage<>(Arrays.asList(m1, m2), 2, new ParseField("field"));
-        page = TransportGetModelSnapshotsAction.clearQuantiles(page);
-        assertEquals(2, page.results().size());
-        for (ModelSnapshot modelSnapshot : page.results()) {
-            assertNull(modelSnapshot.getQuantiles());
-        }
-    }
 }