This implementation lazily (on the first forecast request) checks for available
disk space and creates a subfolder for storing data outside of the Lucene
indexes, but as part of the Elasticsearch data paths.

Details:
- tmp storage is managed and does not allow allocation if disk space is
  below a threshold (5 GB at the moment)
- tmp storage is supposed to be managed by the native component, but in
  case this fails, cleanup is provided:
  - on job close
  - on process crash
  - after a node crash, on restart
- available space is re-checked for every forecast call (the native
  component has to check again before writing)

Note: The first path that has enough space is chosen once per opened job, on
the first forecast request (job close/reopen triggers a new search)
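
For orientation, here is a minimal usage sketch of the lifecycle described above,
written against the classes added in this commit; the enclosing method name, the
`environment`/`jobId` parameters, and the sizes are illustrative only, not part of
the change:

    // Sketch only: the tmp storage lifecycle for a forecast, assuming
    // `environment` and `jobId` are in scope and sizes are example values.
    void forecastTmpStorageLifecycle(Environment environment, String jobId) throws IOException {
        NativeStorageProvider provider =
                new NativeStorageProvider(environment, new ByteSizeValue(5, ByteSizeUnit.GB));

        // first forecast request: pick the first data path with enough free space
        // (requested size plus the 5 GB threshold) and create ml-local-data/tmp/<jobId> under it
        Path tmpStorage = provider.tryGetLocalTmpStorage(jobId, new ByteSizeValue(500, ByteSizeUnit.MB));

        if (tmpStorage != null) {
            // subsequent forecast calls re-check that the chosen path still has enough space
            boolean stillUsable =
                    provider.localTmpStorageHasEnoughSpace(tmpStorage, new ByteSizeValue(500, ByteSizeUnit.MB));

            // job close, process crash handling, and restart after a node crash remove the leftovers
            provider.cleanupLocalTmpStorage(tmpStorage);
        }
    }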

Hendrik Muhs · 7 years ago · commit 6c313a9871

+ 1 - 4
x-pack/docs/en/ml/forecasting.asciidoc

@@ -59,10 +59,7 @@ For more information about any of these functions, see <<ml-functions>>.
 * Forecasts run concurrently with real-time {ml} analysis. That is to say, {ml}
 analysis does not stop while forecasts are generated. Forecasts can have an
 impact on {ml} jobs, however, especially in terms of memory usage. For this
-reason, forecasts run only if the model memory status is acceptable and the
-snapshot models for the forecast do not require more than 20 MB. If these memory
-limits are reached, consider splitting the job into multiple smaller jobs and
-creating forecasts for these.
+reason, forecasts run only if the model memory status is acceptable.
 * The job must be open when you create a forecast. Otherwise, an error occurs.
 * If there is insufficient data to generate any meaningful predictions, an
 error occurs. In general, forecasts that are created early in the learning phase

+ 5 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -286,7 +286,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
                         DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING,
                         DataCountsReporter.ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING,
                         AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE,
-                        AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE));
+                        AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE,
+                        AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP));
     }
 
     public Settings additionalSettings() {
@@ -403,6 +404,9 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
         // This object's constructor attaches to the license state, so there's no need to retain another reference to it
         new InvalidLicenseEnforcer(settings, getLicenseState(), threadPool, datafeedManager, autodetectProcessManager);
 
+        // run node startup tasks
+        autodetectProcessManager.onNodeStartup();
+
         return Arrays.asList(
                 mlLifeCycleService,
                 jobProvider,

+ 12 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportForecastJobAction.java

@@ -15,6 +15,8 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -28,6 +30,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManage
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
 
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.List;
 import java.util.function.Consumer;
 
@@ -36,6 +39,8 @@ import static org.elasticsearch.xpack.core.ml.action.ForecastJobAction.Request.D
 public class TransportForecastJobAction extends TransportJobTaskAction<ForecastJobAction.Request,
         ForecastJobAction.Response> {
 
+    private static final ByteSizeValue FORECAST_LOCAL_STORAGE_LIMIT = new ByteSizeValue(500, ByteSizeUnit.MB);
+
     private final JobProvider jobProvider;
     @Inject
     public TransportForecastJobAction(Settings settings, TransportService transportService, ThreadPool threadPool,
@@ -73,6 +78,13 @@ public class TransportForecastJobAction extends TransportJobTaskAction<ForecastJ
             paramsBuilder.expiresIn(request.getExpiresIn());
         }
 
+        // tmp storage might be null; we do not log this here because tmp storage
+        // might not be required
+        Path tmpStorage = processManager.tryGetTmpStorage(task, FORECAST_LOCAL_STORAGE_LIMIT);
+        if (tmpStorage != null) {
+            paramsBuilder.tmpStorage(tmpStorage.toString());
+        }
+
         ForecastParams params = paramsBuilder.build();
         processManager.forecastJob(task, params, e -> {
             if (e == null) {

+ 123 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeStorageProvider.java

@@ -0,0 +1,123 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.ml.job.process;
+
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.env.Environment;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+/**
+ * Provide storage for native components.
+ */
+public class NativeStorageProvider {
+
+    private static final Logger LOGGER = Loggers.getLogger(NativeStorageProvider.class);
+
+
+    private static final String LOCAL_STORAGE_SUBFOLDER = "ml-local-data";
+    private static final String LOCAL_STORAGE_TMP_FOLDER = "tmp";
+
+    private final Environment environment;
+
+    // do not allow any usage below this threshold
+    private final ByteSizeValue minLocalStorageAvailable;
+
+    public NativeStorageProvider(Environment environment, ByteSizeValue minDiskSpaceOffHeap) {
+        this.environment = environment;
+        this.minLocalStorageAvailable = minDiskSpaceOffHeap;
+    }
+
+    /**
+     * Removes any temporary storage leftovers.
+     *
+     * Removes all temp files and folders which might be there as a result of an
+     * unclean node shutdown or broken clients.
+     *
+     * Do not call while there are running jobs.
+     *
+     * @throws IOException if cleanup fails
+     */
+    public void cleanupLocalTmpStorageInCaseOfUncleanShutdown() throws IOException {
+        for (Path p : environment.dataFiles()) {
+            IOUtils.rm(p.resolve(LOCAL_STORAGE_SUBFOLDER).resolve(LOCAL_STORAGE_TMP_FOLDER));
+        }
+    }
+
+    /**
+     * Tries to find local storage for storing temporary data.
+     *
+     * @param uniqueIdentifier An identifier to be used as the subfolder name
+     * @param requestedSize The maximum size required
+     * @return Path for temporary storage if available, null otherwise
+     */
+    public Path tryGetLocalTmpStorage(String uniqueIdentifier, ByteSizeValue requestedSize) {
+        for (Path path : environment.dataFiles()) {
+            try {
+                if (getUsableSpace(path) >= requestedSize.getBytes() + minLocalStorageAvailable.getBytes()) {
+                    Path tmpDirectory = path.resolve(LOCAL_STORAGE_SUBFOLDER).resolve(LOCAL_STORAGE_TMP_FOLDER).resolve(uniqueIdentifier);
+                    Files.createDirectories(tmpDirectory);
+                    return tmpDirectory;
+                }
+            } catch (IOException e) {
+                LOGGER.debug("Failed to obtain information about path [{}]: {}", path, e);
+            }
+
+        }
+        LOGGER.debug("Failed to find native storage for [{}], returning null", uniqueIdentifier);
+        return null;
+    }
+
+    public boolean localTmpStorageHasEnoughSpace(Path path, ByteSizeValue requestedSize) {
+        Path realPath = path.toAbsolutePath();
+        for (Path p : environment.dataFiles()) {
+            try {
+                if (realPath.startsWith(p.resolve(LOCAL_STORAGE_SUBFOLDER).resolve(LOCAL_STORAGE_TMP_FOLDER))) {
+                    return getUsableSpace(p) >= requestedSize.getBytes() + minLocalStorageAvailable.getBytes();
+                }
+            } catch (IOException e) {
+                LOGGER.debug("Failed to optain information about path [{}]: {}", path, e);
+            }
+        }
+
+        LOGGER.debug("Not enough space left for path [{}]", path);
+        return false;
+    }
+
+    /**
+     * Delete temporary storage, previously allocated
+     *
+     * @param path
+     *            Path to temporary storage
+     * @throws IOException
+     *             if path can not be cleaned up
+     */
+    public void cleanupLocalTmpStorage(Path path) throws IOException {
+        // do not allow breaking out of the provided tmp storage
+        Path realPath = path.toAbsolutePath();
+        for (Path p : environment.dataFiles()) {
+            if (realPath.startsWith(p.resolve(LOCAL_STORAGE_SUBFOLDER).resolve(LOCAL_STORAGE_TMP_FOLDER))) {
+                IOUtils.rm(path);
+            }
+        }
+    }
+
+    long getUsableSpace(Path path) throws IOException {
+        long freeSpaceInBytes = Environment.getFileStore(path).getUsableSpace();
+
+        /* See: https://bugs.openjdk.java.net/browse/JDK-8162520 */
+        if (freeSpaceInBytes < 0) {
+            freeSpaceInBytes = Long.MAX_VALUE;
+        }
+        return freeSpaceInBytes;
+    }
+}

+ 66 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

@@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect;
 
 import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
 import org.elasticsearch.core.internal.io.IOUtils;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.Client;
@@ -15,11 +16,12 @@ import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
-import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.index.analysis.AnalysisRegistry;
@@ -47,6 +49,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersiste
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
 import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
 import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
+import org.elasticsearch.xpack.ml.job.process.NativeStorageProvider;
 import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
@@ -59,6 +62,7 @@ import org.elasticsearch.xpack.ml.notifications.Auditor;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.file.Path;
 import java.time.Duration;
 import java.time.ZonedDateTime;
 import java.util.Date;
@@ -96,6 +100,10 @@ public class AutodetectProcessManager extends AbstractComponent {
     public static final Setting<Integer> MAX_OPEN_JOBS_PER_NODE =
             Setting.intSetting("xpack.ml.max_open_jobs", MAX_RUNNING_JOBS_PER_NODE, 1, Property.NodeScope);
 
+    // Undocumented setting for integration test purposes
+    public static final Setting<ByteSizeValue> MIN_DISK_SPACE_OFF_HEAP =
+            Setting.byteSizeSetting("xpack.ml.min_disk_space_off_heap", new ByteSizeValue(5, ByteSizeUnit.GB), Property.NodeScope);
+
     private final Client client;
     private final Environment environment;
     private final ThreadPool threadPool;
@@ -107,8 +115,12 @@ public class AutodetectProcessManager extends AbstractComponent {
     private final JobResultsPersister jobResultsPersister;
     private final JobDataCountsPersister jobDataCountsPersister;
 
+    private NativeStorageProvider nativeStorageProvider;
     private final ConcurrentMap<Long, ProcessContext> processByAllocation = new ConcurrentHashMap<>();
 
+    // a map that manages the allocation of temporary space to jobs
+    private final ConcurrentMap<String, Path> nativeTmpStorage = new ConcurrentHashMap<>();
+
     private final int maxAllowedRunningJobs;
 
     private final NamedXContentRegistry xContentRegistry;
@@ -133,6 +145,15 @@ public class AutodetectProcessManager extends AbstractComponent {
         this.jobResultsPersister = jobResultsPersister;
         this.jobDataCountsPersister = jobDataCountsPersister;
         this.auditor = auditor;
+        this.nativeStorageProvider = new NativeStorageProvider(environment, MIN_DISK_SPACE_OFF_HEAP.get(settings));
+    }
+
+    public void onNodeStartup() {
+        try {
+            nativeStorageProvider.cleanupLocalTmpStorageInCaseOfUncleanShutdown();
+        } catch (Exception e) {
+            logger.warn("Failed to cleanup native storage from previous invocation", e);
+        }
     }
 
     public synchronized void closeAllJobsOnThisNode(String reason) throws IOException {
@@ -251,6 +272,28 @@ public class AutodetectProcessManager extends AbstractComponent {
         });
     }
 
+    /**
+     * Request temporary storage to be used for the job
+     *
+     * @param jobTask The job task
+     * @param requestedSize requested size
+     * @return a Path to local storage or null if storage is not available
+     */
+    public Path tryGetTmpStorage(JobTask jobTask, ByteSizeValue requestedSize) {
+        String jobId = jobTask.getJobId();
+        Path path = nativeTmpStorage.get(jobId);
+        if (path == null) {
+            path = nativeStorageProvider.tryGetLocalTmpStorage(jobId, requestedSize);
+            if (path != null) {
+                nativeTmpStorage.put(jobId, path);
+            }
+        } else if (!nativeStorageProvider.localTmpStorageHasEnoughSpace(path, requestedSize)) {
+            // the previous tmp location ran out of disk space, do not allow further usage
+            return null;
+        }
+        return path;
+    }
+
     /**
      * Do a forecast for the running job.
      *
@@ -258,10 +301,11 @@ public class AutodetectProcessManager extends AbstractComponent {
      * @param params    Forecast parameters
      */
     public void forecastJob(JobTask jobTask, ForecastParams params, Consumer<Exception> handler) {
-        logger.debug("Forecasting job {}", jobTask.getJobId());
+        String jobId = jobTask.getJobId();
+        logger.debug("Forecasting job {}", jobId);
         AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
         if (communicator == null) {
-            String message = String.format(Locale.ROOT, "Cannot forecast because job [%s] is not open", jobTask.getJobId());
+            String message = String.format(Locale.ROOT, "Cannot forecast because job [%s] is not open", jobId);
             logger.debug(message);
             handler.accept(ExceptionsHelper.conflictStatusException(message));
             return;
@@ -271,7 +315,7 @@ public class AutodetectProcessManager extends AbstractComponent {
             if (e == null) {
                 handler.accept(null);
             } else {
-                String msg = String.format(Locale.ROOT, "[%s] exception while forecasting job", jobTask.getJobId());
+                String msg = String.format(Locale.ROOT, "[%s] exception while forecasting job", jobId);
                 logger.error(msg, e);
                 handler.accept(ExceptionsHelper.serverError(msg, e));
             }
@@ -477,6 +521,11 @@ public class AutodetectProcessManager extends AbstractComponent {
                 }
             }
             setJobState(jobTask, JobState.FAILED);
+            try {
+                removeTmpStorage(jobTask.getJobId());
+            } catch (IOException e) {
+                logger.error(new ParameterizedMessage("[{}] Failed to delete temporary files", jobTask.getJobId()), e);
+            }
         };
     }
 
@@ -535,6 +584,12 @@ public class AutodetectProcessManager extends AbstractComponent {
             // thread that gets into this method blocks until the first thread has finished closing the job
             processContext.unlock();
         }
+        // delete any tmp storage
+        try {
+            removeTmpStorage(jobId);
+        } catch (IOException e) {
+            logger.error(new ParameterizedMessage("[{}]Failed to delete temporary files", jobId), e);
+        }
     }
 
     int numberOfOpenJobs() {
@@ -613,6 +668,13 @@ public class AutodetectProcessManager extends AbstractComponent {
         return Optional.of(new Tuple<>(communicator.getDataCounts(), communicator.getModelSizeStats()));
     }
 
+    private void removeTmpStorage(String jobId) throws IOException {
+        Path path = nativeTmpStorage.get(jobId);
+        if (path != null) {
+            nativeStorageProvider.cleanupLocalTmpStorage(path);
+        }
+    }
+
     ExecutorService createAutodetectExecutorService(ExecutorService executorService) {
         AutodetectWorkerExecutorService autoDetectWorkerExecutor = new AutodetectWorkerExecutorService(threadPool.getThreadContext());
         executorService.submit(autoDetectWorkerExecutor::start);

+ 22 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParams.java

@@ -16,12 +16,14 @@ public class ForecastParams {
     private final long createTime;
     private final long duration;
     private final long expiresIn;
+    private final String tmpStorage;
 
-    private ForecastParams(String forecastId, long createTime, long duration, long expiresIn) {
+    private ForecastParams(String forecastId, long createTime, long duration, long expiresIn, String tmpStorage) {
         this.forecastId = forecastId;
         this.createTime = createTime;
         this.duration = duration;
         this.expiresIn = expiresIn;
+        this.tmpStorage = tmpStorage;
     }
 
     public String getForecastId() {
@@ -52,9 +54,18 @@ public class ForecastParams {
         return expiresIn;
     }
 
+    /**
+     * Temporary storage the forecast is allowed to use for persisting models.
+     *
+     * @return path to tmp storage
+     */
+    public String getTmpStorage() {
+        return tmpStorage;
+    }
+
     @Override
     public int hashCode() {
-        return Objects.hash(forecastId, createTime, duration, expiresIn);
+        return Objects.hash(forecastId, createTime, duration, expiresIn, tmpStorage);
     }
 
     @Override
@@ -69,7 +80,8 @@ public class ForecastParams {
         return Objects.equals(forecastId, other.forecastId)
                 && Objects.equals(createTime, other.createTime)
                 && Objects.equals(duration, other.duration)
-                && Objects.equals(expiresIn, other.expiresIn);
+                && Objects.equals(expiresIn, other.expiresIn)
+                && Objects.equals(tmpStorage, other.tmpStorage);
     }
 
     public static Builder builder() {
@@ -81,6 +93,7 @@ public class ForecastParams {
         private final long createTimeEpochSecs;
         private long durationSecs;
         private long expiresInSecs;
+        private String tmpStorage;
 
         private Builder() {
             forecastId = UUIDs.base64UUID();
@@ -101,8 +114,13 @@ public class ForecastParams {
             return this;
         }
 
+        public Builder tmpStorage(String tmpStorage) {
+            this.tmpStorage = tmpStorage;
+            return this;
+        }
+
         public ForecastParams build() {
-            return new ForecastParams(forecastId, createTimeEpochSecs, durationSecs, expiresInSecs);
+            return new ForecastParams(forecastId, createTimeEpochSecs, durationSecs, expiresInSecs, tmpStorage);
         }
     }
 }

+ 3 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java

@@ -164,6 +164,9 @@ public class ControlMsgToProcessWriter {
         if (params.getExpiresIn() != -1) {
             builder.field("expires_in", params.getExpiresIn());
         }
+        if (params.getTmpStorage() != null) {
+            builder.field("tmp_storage", params.getTmpStorage());
+        }
         builder.endObject();
         
         writeMessage(FORECAST_MESSAGE_CODE + Strings.toString(builder));
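
For reference, when a tmp path was granted the forecast control message written
above gains one extra field; roughly (other fields elided, framing and path value
illustrative, the subfolder layout follows the constants in NativeStorageProvider):

    { ..., "tmp_storage": "/path/to/data/ml-local-data/tmp/<job id>" }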

+ 139 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeStorageProviderTests.java

@@ -0,0 +1,139 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.ml.job.process;
+
+import org.elasticsearch.common.io.PathUtils;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.test.ESTestCase;
+import org.junit.Assert;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.any;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.doAnswer;
+
+public class NativeStorageProviderTests extends ESTestCase {
+
+    public void testTmpStorage() throws IOException {
+        Map<Path, Long> storage = new HashMap<>();
+        Path tmpDir = createTempDir();
+
+        storage.put(tmpDir, new ByteSizeValue(6, ByteSizeUnit.GB).getBytes());
+        NativeStorageProvider storageProvider = createNativeStorageProvider(storage);
+
+        Assert.assertNotNull(
+                storageProvider.tryGetLocalTmpStorage(randomAlphaOfLengthBetween(4, 10), new ByteSizeValue(100, ByteSizeUnit.BYTES)));
+        Assert.assertNull(storageProvider.tryGetLocalTmpStorage(randomAlphaOfLengthBetween(4, 10),
+                new ByteSizeValue(1024 * 1024 * 1024 + 1, ByteSizeUnit.BYTES)));
+
+        String id = randomAlphaOfLengthBetween(4, 10);
+        Path path = storageProvider.tryGetLocalTmpStorage(id, new ByteSizeValue(1, ByteSizeUnit.GB));
+        Assert.assertNotNull(path);
+
+        Assert.assertEquals(tmpDir.resolve("ml-local-data").resolve("tmp").resolve(id).toString(), path.toString());
+    }
+
+    public void testTmpStorageChooseDisk() throws IOException {
+        Map<Path, Long> storage = new HashMap<>();
+        Path tmpDir = createTempDir();
+
+        // low disk space
+        Path disk1 = tmpDir.resolve(randomAlphaOfLengthBetween(4, 10));
+        storage.put(disk1, new ByteSizeValue(1, ByteSizeUnit.GB).getBytes());
+
+        // sufficient disk space
+        Path disk2 = tmpDir.resolve(randomAlphaOfLengthBetween(4, 10));
+        storage.put(disk2, new ByteSizeValue(20, ByteSizeUnit.GB).getBytes());
+
+        NativeStorageProvider storageProvider = createNativeStorageProvider(storage);
+
+        String id = randomAlphaOfLengthBetween(4, 10);
+        Path path = storageProvider.tryGetLocalTmpStorage(id, new ByteSizeValue(1, ByteSizeUnit.GB));
+        Assert.assertNotNull(path);
+
+        // should resolve to disk2 as disk1 is low on space
+        Assert.assertEquals(disk2.resolve("ml-local-data").resolve("tmp").resolve(id).toString(), path.toString());
+    }
+
+    public void testTmpStorageCleanup() throws IOException {
+        Map<Path, Long> storage = new HashMap<>();
+        Path tmpDir = createTempDir();
+        storage.put(tmpDir, new ByteSizeValue(6, ByteSizeUnit.GB).getBytes());
+        NativeStorageProvider storageProvider = createNativeStorageProvider(storage);
+        String id = randomAlphaOfLengthBetween(4, 10);
+
+        Path path = storageProvider.tryGetLocalTmpStorage(id, new ByteSizeValue(1, ByteSizeUnit.KB));
+
+        Assert.assertTrue(Files.exists(path));
+        Path testFile = PathUtils.get(path.toString(), "testFile");
+        BufferedWriter writer = Files.newBufferedWriter(testFile, StandardCharsets.UTF_8);
+        writer.write("created by NativeStorageProviderTests::testTmpStorageDelete");
+
+        writer.close();
+        Assert.assertTrue(Files.exists(testFile));
+        Assert.assertTrue(Files.isRegularFile(testFile));
+
+        // the native component should cleanup itself, but assume it has crashed
+        storageProvider.cleanupLocalTmpStorage(path);
+        Assert.assertFalse(Files.exists(testFile));
+        Assert.assertFalse(Files.exists(path));
+    }
+
+    public void testTmpStorageCleanupOnStart() throws IOException {
+        Map<Path, Long> storage = new HashMap<>();
+        Path tmpDir = createTempDir();
+        storage.put(tmpDir, new ByteSizeValue(6, ByteSizeUnit.GB).getBytes());
+        NativeStorageProvider storageProvider = createNativeStorageProvider(storage);
+        String id = randomAlphaOfLengthBetween(4, 10);
+
+        Path path = storageProvider.tryGetLocalTmpStorage(id, new ByteSizeValue(1, ByteSizeUnit.KB));
+
+        Assert.assertTrue(Files.exists(path));
+        Path testFile = PathUtils.get(path.toString(), "testFile");
+
+        BufferedWriter writer = Files.newBufferedWriter(testFile, StandardCharsets.UTF_8);
+        writer.write("created by NativeStorageProviderTests::testTmpStorageWipe");
+
+        writer.close();
+        Assert.assertTrue(Files.exists(testFile));
+        Assert.assertTrue(Files.isRegularFile(testFile));
+
+        // create a new storage provider to test the case of a crashed node
+        storageProvider = createNativeStorageProvider(storage);
+        storageProvider.cleanupLocalTmpStorageInCaseOfUncleanShutdown();
+        Assert.assertFalse(Files.exists(testFile));
+        Assert.assertFalse(Files.exists(path));
+    }
+
+    private NativeStorageProvider createNativeStorageProvider(Map<Path, Long> paths) throws IOException {
+        Environment environment = mock(Environment.class);
+
+        when(environment.dataFiles()).thenReturn(paths.keySet().toArray(new Path[paths.size()]));
+        NativeStorageProvider storageProvider = spy(new NativeStorageProvider(environment, new ByteSizeValue(5, ByteSizeUnit.GB)));
+
+        doAnswer(invocation -> {
+            return paths.getOrDefault(invocation.getArguments()[0], Long.valueOf(0)).longValue();
+        }
+
+        ).when(storageProvider).getUsableSpace(any(Path.class));
+
+        return storageProvider;
+    }
+
+}

+ 1 - 0
x-pack/qa/ml-native-tests/build.gradle

@@ -61,6 +61,7 @@ integTestCluster {
   setting 'xpack.security.transport.ssl.verification_mode', 'certificate'
   setting 'xpack.security.audit.enabled', 'true'
   setting 'xpack.license.self_generated.type', 'trial'
+  setting 'xpack.ml.min_disk_space_off_heap', '200mb'
 
   keystoreSetting 'bootstrap.password', 'x-pack-test-password'
   keystoreSetting 'xpack.security.transport.ssl.keystore.secure_password', 'keypass'

+ 34 - 13
x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java

@@ -6,6 +6,7 @@
 package org.elasticsearch.xpack.ml.integration;
 
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
@@ -206,8 +207,7 @@ public class ForecastIT extends MlNativeAutodetectIntegTestCase {
         assertThat(e.getMessage(), equalTo("Cannot run forecast: Forecast cannot be executed as model memory status is not OK"));
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/30399")
-    public void testMemoryLimit() throws Exception {
+    public void testOverflowToDisk() throws Exception {
         Detector.Builder detector = new Detector.Builder("mean", "value");
         detector.setByFieldName("clientIP");
 
@@ -216,7 +216,9 @@ public class ForecastIT extends MlNativeAutodetectIntegTestCase {
         analysisConfig.setBucketSpan(bucketSpan);
         DataDescription.Builder dataDescription = new DataDescription.Builder();
         dataDescription.setTimeFormat("epoch");
-        Job.Builder job = new Job.Builder("forecast-it-test-memory-limit");
+        Job.Builder job = new Job.Builder("forecast-it-test-overflow-to-disk");
+        AnalysisLimits limits = new AnalysisLimits(2048L, null);
+        job.setAnalysisLimits(limits);
         job.setAnalysisConfig(analysisConfig);
         job.setDataDescription(dataDescription);
 
@@ -224,28 +226,47 @@ public class ForecastIT extends MlNativeAutodetectIntegTestCase {
         putJob(job);
         openJob(job.getId());
         createDataWithLotsOfClientIps(bucketSpan, job);
-        ElasticsearchException e = expectThrows(ElasticsearchException.class,
-                () -> forecast(job.getId(), TimeValue.timeValueMinutes(120), null));
-        assertThat(e.getMessage(),
-                equalTo("Cannot run forecast: Forecast cannot be executed as forecast memory usage is predicted to exceed 20MB"));
+
+        try {
+            String forecastId = forecast(job.getId(), TimeValue.timeValueHours(1), null);
+
+            waitForecastToFinish(job.getId(), forecastId);
+        } catch (ElasticsearchStatusException e) {
+            if (e.getMessage().contains("disk space")) {
+                throw new ElasticsearchStatusException(
+                        "Test likely fails due to insufficient disk space on test machine, please free up space.", e.status(), e);
+            }
+            throw e;
+        }
+
+        closeJob(job.getId());
+
+        List<ForecastRequestStats> forecastStats = getForecastStats();
+        assertThat(forecastStats.size(), equalTo(1));
+        ForecastRequestStats forecastRequestStats = forecastStats.get(0);
+        List<Forecast> forecasts = getForecasts(job.getId(), forecastRequestStats);
+
+        assertThat(forecastRequestStats.getRecordCount(), equalTo(8000L));
+        assertThat(forecasts.size(), equalTo(8000));
     }
 
     private void createDataWithLotsOfClientIps(TimeValue bucketSpan, Job.Builder job) throws IOException {
         long now = Instant.now().getEpochSecond();
-        long timestamp = now - 50 * bucketSpan.seconds();
-        while (timestamp < now) {
-            for (int i = 1; i < 256; i++) {
+        long timestamp = now - 15 * bucketSpan.seconds();
+
+        for (int h = 0; h < 15; h++) {
+            for (int i = 1; i < 101; i++) {
                 List<String> data = new ArrayList<>();
-                for (int j = 1; j < 100; j++) {
+                for (int j = 1; j < 81; j++) {
                     Map<String, Object> record = new HashMap<>();
                     record.put("time", timestamp);
-                    record.put("value", 10.0);
+                    record.put("value", 10.0 + h);
                     record.put("clientIP", String.format(Locale.ROOT, "192.168.%d.%d", i, j));
                     data.add(createJsonRecord(record));
                 }
                 postData(job.getId(), data.stream().collect(Collectors.joining()));
-                timestamp += bucketSpan.seconds();
             }
+            timestamp += bucketSpan.seconds();
         }
         flushJob(job.getId(), false);
     }