Browse Source

[ML] Extract common native process base class (#34856)

We currently have two different native processes:
autodetect & normalizer. There are plans for introducing
a new process. All these share many things in common.
This commit refactors the processes to extend an
`AbstractNativeProcess` class that encapsulates those
commonalities with the purpose of reusing the code
for new processes in the future.
Dimitris Athanasiou 7 năm trước cách đây
mục cha
commit
a39a67cd38
46 tập tin đã thay đổi với 560 bổ sung492 xóa
  1. 1 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Detector.java
  2. 1 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/process/writer/RecordWriter.java
  3. 1 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfigTests.java
  4. 1 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/DetectorTests.java
  5. 4 5
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
  6. 2 2
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java
  7. 2 2
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java
  8. 2 2
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java
  9. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java
  10. 3 72
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java
  11. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java
  12. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java
  13. 28 212
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java
  14. 4 4
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java
  15. 17 11
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectStateProcessor.java
  16. 1 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java
  17. 2 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java
  18. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvRecordWriter.java
  19. 37 8
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/MultiplyingNormalizerProcess.java
  20. 14 77
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java
  21. 21 10
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java
  22. 2 26
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerProcess.java
  23. 2 5
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/output/NormalizerResultHandler.java
  24. 265 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java
  25. 2 2
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java
  26. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeControllerHolder.java
  27. 85 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeProcess.java
  28. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeStorageProvider.java
  29. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessPipes.java
  30. 14 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/StateProcessor.java
  31. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessage.java
  32. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessageHandler.java
  33. 2 2
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java
  34. 2 2
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilderTests.java
  35. 6 6
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java
  36. 13 12
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectStateProcessorTests.java
  37. 1 0
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriterTests.java
  38. 1 0
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java
  39. 3 4
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerTests.java
  40. 1 2
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/output/NormalizerResultHandlerTests.java
  41. 1 1
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeControllerTests.java
  42. 4 5
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeStorageProviderTests.java
  43. 1 1
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessPipesTests.java
  44. 2 2
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessageHandlerTests.java
  45. 2 2
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessageTests.java
  46. 1 1
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriterTests.java

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Detector.java

@@ -16,7 +16,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
-import org.elasticsearch.xpack.core.ml.job.process.autodetect.writer.RecordWriter;
+import org.elasticsearch.xpack.core.ml.process.writer.RecordWriter;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
 

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/writer/RecordWriter.java → x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/process/writer/RecordWriter.java

@@ -3,7 +3,7 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
-package org.elasticsearch.xpack.core.ml.job.process.autodetect.writer;
+package org.elasticsearch.xpack.core.ml.process.writer;
 
 import java.io.IOException;
 import java.util.List;

+ 1 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfigTests.java

@@ -12,7 +12,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.test.AbstractSerializingTestCase;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
-import org.elasticsearch.xpack.core.ml.job.process.autodetect.writer.RecordWriter;
+import org.elasticsearch.xpack.core.ml.process.writer.RecordWriter;
 
 import java.util.ArrayList;
 import java.util.Arrays;

+ 1 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/DetectorTests.java

@@ -12,7 +12,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.test.AbstractSerializingTestCase;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
-import org.elasticsearch.xpack.core.ml.job.process.autodetect.writer.RecordWriter;
+import org.elasticsearch.xpack.core.ml.process.writer.RecordWriter;
 
 import java.util.ArrayList;
 import java.util.Arrays;

+ 4 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -168,8 +168,6 @@ import org.elasticsearch.xpack.ml.job.categorization.MlClassicTokenizerFactory;
 import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
-import org.elasticsearch.xpack.ml.job.process.NativeController;
-import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder;
 import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectBuilder;
 import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessFactory;
 import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
@@ -180,6 +178,8 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.NativeNormalizerProcess
 import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
 import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory;
 import org.elasticsearch.xpack.ml.notifications.Auditor;
+import org.elasticsearch.xpack.ml.process.NativeController;
+import org.elasticsearch.xpack.ml.process.NativeControllerHolder;
 import org.elasticsearch.xpack.ml.rest.RestDeleteExpiredDataAction;
 import org.elasticsearch.xpack.ml.rest.RestFindFileStructureAction;
 import org.elasticsearch.xpack.ml.rest.RestMlInfoAction;
@@ -386,7 +386,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
                     nativeController,
                     client,
                     clusterService);
-                normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, settings, nativeController);
+                normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController);
             } catch (IOException e) {
                 // This also should not happen in production, as the MachineLearningFeatureSet should have
                 // hit the same error first and brought down the node with a friendlier error message
@@ -396,8 +396,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
             autodetectProcessFactory = (job, autodetectParams, executorService, onProcessCrash) ->
                     new BlackHoleAutodetectProcess(job.getId());
             // factor of 1.0 makes renormalization a no-op
-            normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) ->
-                    new MultiplyingNormalizerProcess(settings, 1.0);
+            normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) -> new MultiplyingNormalizerProcess(1.0);
         }
         NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory,
                 threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME));

+ 2 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java

@@ -31,8 +31,8 @@ import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
-import org.elasticsearch.xpack.ml.job.process.NativeController;
-import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder;
+import org.elasticsearch.xpack.ml.process.NativeController;
+import org.elasticsearch.xpack.ml.process.NativeControllerHolder;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
 import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
 import org.elasticsearch.xpack.core.ml.stats.StatsAccumulator;

+ 2 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java

@@ -10,8 +10,8 @@ import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.component.LifecycleListener;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
-import org.elasticsearch.xpack.ml.job.process.NativeController;
-import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder;
+import org.elasticsearch.xpack.ml.process.NativeController;
+import org.elasticsearch.xpack.ml.process.NativeControllerHolder;
 import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
 
 import java.io.IOException;

+ 2 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java

@@ -19,9 +19,9 @@ import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
 import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
-import org.elasticsearch.xpack.ml.job.process.NativeController;
+import org.elasticsearch.xpack.ml.process.NativeController;
 import org.elasticsearch.xpack.ml.job.process.ProcessBuilderUtils;
-import org.elasticsearch.xpack.ml.job.process.ProcessPipes;
+import org.elasticsearch.xpack.ml.process.ProcessPipes;
 import org.elasticsearch.xpack.ml.job.process.autodetect.writer.AnalysisLimitsWriter;
 import org.elasticsearch.xpack.ml.job.process.autodetect.writer.FieldConfigWriter;
 import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ModelPlotConfigWriter;

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java

@@ -264,7 +264,7 @@ public class AutodetectCommunicator implements Closeable {
 
     public void persistJob(BiConsumer<Void, Exception> handler) {
         submitOperation(() -> {
-            autodetectProcess.persistJob();
+            autodetectProcess.persistState();
             return null;
         }, handler);
     }

+ 3 - 72
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java

@@ -10,23 +10,22 @@ import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
 import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
 import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
 import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
-import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
+import org.elasticsearch.xpack.ml.process.NativeProcess;
 
-import java.io.Closeable;
 import java.io.IOException;
-import java.time.ZonedDateTime;
 import java.util.Iterator;
 import java.util.List;
 
 /**
  * Interface representing the native C++ autodetect process
  */
-public interface AutodetectProcess extends Closeable {
+public interface AutodetectProcess extends NativeProcess {
 
     /**
      * Restore state from the given {@link ModelSnapshot}
@@ -35,22 +34,6 @@ public interface AutodetectProcess extends Closeable {
      */
     void restoreState(StateStreamer stateStreamer, ModelSnapshot modelSnapshot);
 
-    /**
-     * Is the process ready to receive data?
-     * @return {@code true} if the process is ready to receive data
-     */
-    boolean isReady();
-
-    /**
-     * Write the record to autodetect. The record parameter should not be encoded
-     * (i.e. length encoded) the implementation will appy the corrrect encoding.
-     *
-     * @param record Plain array of strings, implementors of this class should
-     *               encode the record appropriately
-     * @throws IOException If the write failed
-     */
-    void writeRecord(String[] record) throws IOException;
-
     /**
      * Write the reset buckets control message
      *
@@ -115,60 +98,8 @@ public interface AutodetectProcess extends Closeable {
      */
     void forecastJob(ForecastParams params) throws IOException;
 
-    /**
-     * Ask the job to start persisting model state in the background
-     * @throws IOException If writing the request fails
-     */
-    void persistJob() throws IOException;
-
-    /**
-     * Flush the output data stream
-     */
-    void flushStream() throws IOException;
-
-    /**
-     * Kill the process.  Do not wait for it to stop gracefully.
-     */
-    void kill() throws IOException;
-
     /**
      * @return stream of autodetect results.
      */
     Iterator<AutodetectResult> readAutodetectResults();
-
-    /**
-     * The time the process was started
-     * @return Process start time
-     */
-    ZonedDateTime getProcessStartTime();
-
-    /**
-     * Returns true if the process still running.
-     * Methods such as {@link #flushJob(FlushJobParams)} are essentially
-     * asynchronous the command will be continue to execute in the process after
-     * the call has returned. This method tests whether something catastrophic
-     * occurred in the process during its execution.
-     * @return True if the process is still running
-     */
-    boolean isProcessAlive();
-
-    /**
-     * Check whether autodetect terminated given maximum 45ms for termination
-     *
-     * Processing errors are highly likely caused by autodetect being unexpectedly
-     * terminated.
-     *
-     * Workaround: As we can not easily check if autodetect is alive, we rely on
-     * the logPipe being ended. As the loghandler runs in another thread which
-     * might fall behind this one, we give it a grace period of 45ms.
-     *
-     * @return false if process has ended for sure, true if it probably still runs
-     */
-    boolean isProcessAliveAfterWaiting();
-
-    /**
-     * Read any content in the error output buffer.
-     * @return An error message or empty String if no error.
-     */
-    String readError();
 }

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

@@ -50,7 +50,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
 import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
 import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
 import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
-import org.elasticsearch.xpack.ml.job.process.NativeStorageProvider;
+import org.elasticsearch.xpack.ml.process.NativeStorageProvider;
 import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java

@@ -96,7 +96,7 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
     }
 
     @Override
-    public void persistJob() {
+    public void persistState() {
     }
 
     @Override

+ 28 - 212
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java

@@ -5,300 +5,116 @@
  */
 package org.elasticsearch.xpack.ml.job.process.autodetect;
 
+import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.xpack.core.ml.MachineLearningField;
 import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
 import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
 import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
 import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
-import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder;
 import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser;
-import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
-import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter;
-import org.elasticsearch.xpack.ml.job.process.autodetect.writer.LengthEncodedWriter;
-import org.elasticsearch.xpack.ml.job.process.logging.CppLogMessageHandler;
 import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
-import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+import org.elasticsearch.xpack.ml.process.AbstractNativeProcess;
 
-import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.nio.file.Files;
 import java.nio.file.Path;
-import java.time.Duration;
-import java.time.ZonedDateTime;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 /**
  * Autodetect process using native code.
  */
-class NativeAutodetectProcess implements AutodetectProcess {
-    private static final Logger LOGGER = Loggers.getLogger(NativeAutodetectProcess.class);
+class NativeAutodetectProcess extends AbstractNativeProcess implements AutodetectProcess {
 
-    private static final Duration WAIT_FOR_KILL_TIMEOUT = Duration.ofMillis(1000);
+    private static final Logger LOGGER = LogManager.getLogger(NativeAutodetectProcess.class);
+
+    private static final String NAME = "autodetect";
 
-    private final String jobId;
-    private final CppLogMessageHandler cppLogHandler;
-    private final OutputStream processInStream;
-    private final InputStream processOutStream;
-    private final OutputStream processRestoreStream;
-    private final LengthEncodedWriter recordWriter;
-    private final ZonedDateTime startTime;
-    private final int numberOfFields;
-    private final List<Path> filesToDelete;
-    private final Runnable onProcessCrash;
-    private volatile Future<?> logTailFuture;
-    private volatile Future<?> stateProcessorFuture;
-    private volatile boolean processCloseInitiated;
-    private volatile boolean processKilled;
-    private volatile boolean isReady;
     private final AutodetectResultsParser resultsParser;
 
     NativeAutodetectProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream,
                             OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
                             AutodetectResultsParser resultsParser, Runnable onProcessCrash) {
-        this.jobId = jobId;
-        cppLogHandler = new CppLogMessageHandler(jobId, logStream);
-        this.processInStream = new BufferedOutputStream(processInStream);
-        this.processOutStream = processOutStream;
-        this.processRestoreStream = processRestoreStream;
-        this.recordWriter = new LengthEncodedWriter(this.processInStream);
-        startTime = ZonedDateTime.now();
-        this.numberOfFields = numberOfFields;
-        this.filesToDelete = filesToDelete;
+        super(jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, onProcessCrash);
         this.resultsParser = resultsParser;
-        this.onProcessCrash = Objects.requireNonNull(onProcessCrash);
     }
 
-    public void start(ExecutorService executorService, StateProcessor stateProcessor, InputStream persistStream) {
-        logTailFuture = executorService.submit(() -> {
-            try (CppLogMessageHandler h = cppLogHandler) {
-                h.tailStream();
-            } catch (IOException e) {
-                if (processKilled == false) {
-                    LOGGER.error(new ParameterizedMessage("[{}] Error tailing autodetect process logs", jobId), e);
-                }
-            } finally {
-                if (processCloseInitiated == false && processKilled == false) {
-                    // The log message doesn't say "crashed", as the process could have been killed
-                    // by a user or other process (e.g. the Linux OOM killer)
-
-                    String errors = cppLogHandler.getErrors();
-                    LOGGER.error("[{}] autodetect process stopped unexpectedly: {}", jobId, errors);
-                    onProcessCrash.run();
-                }
-            }
-        });
-        stateProcessorFuture = executorService.submit(() -> {
-            try (InputStream in = persistStream) {
-                stateProcessor.process(jobId, in);
-                if (processKilled == false) {
-                    LOGGER.info("[{}] State output finished", jobId);
-                }
-            } catch (IOException e) {
-                if (processKilled == false) {
-                    LOGGER.error(new ParameterizedMessage("[{}] Error reading autodetect state output", jobId), e);
-                }
-            }
-        });
+    @Override
+    public String getName() {
+        return NAME;
     }
 
     @Override
     public void restoreState(StateStreamer stateStreamer, ModelSnapshot modelSnapshot) {
         if (modelSnapshot != null) {
-            try (OutputStream r = processRestoreStream) {
-                stateStreamer.restoreStateToStream(jobId, modelSnapshot, r);
+            try (OutputStream r = processRestoreStream()) {
+                stateStreamer.restoreStateToStream(jobId(), modelSnapshot, r);
             } catch (Exception e) {
                 // TODO: should we fail to start?
-                if (processKilled == false) {
-                    LOGGER.error("Error restoring model state for job " + jobId, e);
+                if (isProcessKilled() == false) {
+                    LOGGER.error("Error restoring model state for job " + jobId(), e);
                 }
             }
         }
-        isReady = true;
-    }
-
-    @Override
-    public boolean isReady() {
-        return isReady;
-    }
-
-    @Override
-    public void writeRecord(String[] record) throws IOException {
-        recordWriter.writeRecord(record);
+        setReady();
     }
 
     @Override
     public void writeResetBucketsControlMessage(DataLoadParams params) throws IOException {
-        ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfFields);
-        writer.writeResetBucketsMessage(params);
+        newMessageWriter().writeResetBucketsMessage(params);
     }
 
     @Override
     public void writeUpdateModelPlotMessage(ModelPlotConfig modelPlotConfig) throws IOException {
-        ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfFields);
-        writer.writeUpdateModelPlotMessage(modelPlotConfig);
+        newMessageWriter().writeUpdateModelPlotMessage(modelPlotConfig);
     }
 
     @Override
     public void writeUpdateDetectorRulesMessage(int detectorIndex, List<DetectionRule> rules) throws IOException {
-        ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfFields);
-        writer.writeUpdateDetectorRulesMessage(detectorIndex, rules);
+        newMessageWriter().writeUpdateDetectorRulesMessage(detectorIndex, rules);
     }
 
     @Override
     public void writeUpdateFiltersMessage(List<MlFilter> filters) throws IOException {
-        ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfFields);
-        writer.writeUpdateFiltersMessage(filters);
+        newMessageWriter().writeUpdateFiltersMessage(filters);
     }
 
     @Override
     public void writeUpdateScheduledEventsMessage(List<ScheduledEvent> events, TimeValue bucketSpan) throws IOException {
-        ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfFields);
-        writer.writeUpdateScheduledEventsMessage(events, bucketSpan);
+        newMessageWriter().writeUpdateScheduledEventsMessage(events, bucketSpan);
     }
 
     @Override
     public String flushJob(FlushJobParams params) throws IOException {
-        ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfFields);
+        ControlMsgToProcessWriter writer = newMessageWriter();
         writer.writeFlushControlMessage(params);
         return writer.writeFlushMessage();
     }
 
     @Override
     public void forecastJob(ForecastParams params) throws IOException {
-        ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfFields);
-        writer.writeForecastMessage(params);
-    }
-
-    @Override
-    public void persistJob() throws IOException {
-        ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfFields);
-        writer.writeStartBackgroundPersistMessage();
-    }
-
-    @Override
-    public void flushStream() throws IOException {
-        recordWriter.flush();
-    }
-
-    @Override
-    public void close() throws IOException {
-        try {
-            processCloseInitiated = true;
-            // closing its input causes the process to exit
-            processInStream.close();
-            // wait for the process to exit by waiting for end-of-file on the named pipe connected
-            // to the state processor - it may take a long time for all the model state to be
-            // indexed
-            if (stateProcessorFuture != null) {
-                stateProcessorFuture.get(MachineLearningField.STATE_PERSIST_RESTORE_TIMEOUT.getMinutes(), TimeUnit.MINUTES);
-            }
-            // the log processor should have stopped by now too - assume processing the logs will
-            // take no more than 5 seconds longer than processing the state (usually it should
-            // finish first)
-            if (logTailFuture != null) {
-                logTailFuture.get(5, TimeUnit.SECONDS);
-            }
-
-            if (cppLogHandler.seenFatalError()) {
-                throw ExceptionsHelper.serverError(cppLogHandler.getErrors());
-            }
-            LOGGER.debug("[{}] Autodetect process exited", jobId);
-        } catch (ExecutionException | TimeoutException e) {
-            LOGGER.warn(new ParameterizedMessage("[{}] Exception closing the running autodetect process", jobId), e);
-        } catch (InterruptedException e) {
-            LOGGER.warn(new ParameterizedMessage("[{}] Exception closing the running autodetect process", jobId), e);
-            Thread.currentThread().interrupt();
-        } finally {
-            deleteAssociatedFiles();
-        }
+        newMessageWriter().writeForecastMessage(params);
     }
 
     @Override
-    public void kill() throws IOException {
-        processKilled = true;
-        try {
-            // The PID comes via the processes log stream.  We don't wait for it to arrive here,
-            // but if the wait times out it implies the process has only just started, in which
-            // case it should die very quickly when we close its input stream.
-            NativeControllerHolder.getNativeController().killProcess(cppLogHandler.getPid(Duration.ZERO));
-
-            // Wait for the process to die before closing processInStream as if the process
-            // is still alive when processInStream is closed autodetect will start persisting state
-            cppLogHandler.waitForLogStreamClose(WAIT_FOR_KILL_TIMEOUT);
-        } catch (TimeoutException e) {
-            LOGGER.warn("[{}] Failed to get PID of autodetect process to kill", jobId);
-        } finally {
-            try {
-                processInStream.close();
-            } catch (IOException e) {
-                // Ignore it - we're shutting down and the method itself has logged a warning
-            }
-            try {
-                deleteAssociatedFiles();
-            } catch (IOException e) {
-                // Ignore it - we're shutting down and the method itself has logged a warning
-            }
-        }
-    }
-
-    private synchronized void deleteAssociatedFiles() throws IOException {
-        if (filesToDelete == null) {
-            return;
-        }
-
-        for (Path fileToDelete : filesToDelete) {
-            if (Files.deleteIfExists(fileToDelete)) {
-                LOGGER.debug("[{}] Deleted file {}", jobId, fileToDelete.toString());
-            } else {
-                LOGGER.warn("[{}] Failed to delete file {}", jobId, fileToDelete.toString());
-            }
-        }
-
-        filesToDelete.clear();
+    public void persistState() throws IOException {
+        newMessageWriter().writeStartBackgroundPersistMessage();
     }
 
     @Override
     public Iterator<AutodetectResult> readAutodetectResults() {
-        return resultsParser.parseResults(processOutStream);
+        return resultsParser.parseResults(processOutStream());
     }
 
-    @Override
-    public ZonedDateTime getProcessStartTime() {
-        return startTime;
-    }
-
-    @Override
-    public boolean isProcessAlive() {
-        // Sanity check: make sure the process hasn't terminated already
-        return !cppLogHandler.hasLogStreamEnded();
-    }
-
-    @Override
-    public boolean isProcessAliveAfterWaiting() {
-        cppLogHandler.waitForLogStreamClose(Duration.ofMillis(45));
-        return isProcessAlive();
-    }
-
-    @Override
-    public String readError() {
-        return cppLogHandler.getErrors();
+    private ControlMsgToProcessWriter newMessageWriter() {
+        return new ControlMsgToProcessWriter(recordWriter(), numberOfFields());
     }
 }

+ 4 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java

@@ -16,10 +16,10 @@ import org.elasticsearch.env.Environment;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.MachineLearning;
-import org.elasticsearch.xpack.ml.job.process.NativeController;
-import org.elasticsearch.xpack.ml.job.process.ProcessPipes;
+import org.elasticsearch.xpack.ml.process.NativeController;
+import org.elasticsearch.xpack.ml.process.ProcessPipes;
 import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser;
-import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor;
+import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectStateProcessor;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
 import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
 
@@ -67,7 +67,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
         // The extra 1 is the control field
         int numberOfFields = job.allInputFields().size() + (includeTokensField ? 1 : 0) + 1;
 
-        StateProcessor stateProcessor = new StateProcessor(settings, client);
+        AutodetectStateProcessor stateProcessor = new AutodetectStateProcessor(client, job.getId());
         AutodetectResultsParser resultsParser = new AutodetectResultsParser(settings);
         NativeAutodetectProcess autodetect = new NativeAutodetectProcess(
                 job.getId(), processPipes.getLogStream().get(), processPipes.getProcessInStream().get(),

+ 17 - 11
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java → x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectStateProcessor.java

@@ -5,17 +5,18 @@
  */
 package org.elasticsearch.xpack.ml.job.process.autodetect.output;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.CompositeBytesReference;
-import org.elasticsearch.common.component.AbstractComponent;
-import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
+import org.elasticsearch.xpack.ml.process.StateProcessor;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -28,17 +29,22 @@ import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
 /**
  * Reads the autodetect state and persists via a bulk request
  */
-public class StateProcessor extends AbstractComponent {
+public class AutodetectStateProcessor implements StateProcessor {
+
+    private static final Logger LOGGER = LogManager.getLogger(AutodetectStateProcessor.class);
 
     private static final int READ_BUF_SIZE = 8192;
+
     private final Client client;
+    private final String jobId;
 
-    public StateProcessor(Settings settings, Client client) {
-        super(settings);
+    public AutodetectStateProcessor(Client client, String jobId) {
         this.client = client;
+        this.jobId = jobId;
     }
 
-    public void process(String jobId, InputStream in) throws IOException {
+    @Override
+    public void process(InputStream in) throws IOException {
         BytesReference bytesToDate = null;
         List<BytesReference> newBlocks = new ArrayList<>();
         byte[] readBuf = new byte[READ_BUF_SIZE];
@@ -56,7 +62,7 @@ public class StateProcessor extends AbstractComponent {
             } else {
                 BytesReference newBytes = new CompositeBytesReference(newBlocks.toArray(new BytesReference[0]));
                 bytesToDate = (bytesToDate == null) ? newBytes : new CompositeBytesReference(bytesToDate, newBytes);
-                bytesToDate = splitAndPersist(jobId, bytesToDate, searchFrom);
+                bytesToDate = splitAndPersist(bytesToDate, searchFrom);
                 searchFrom = (bytesToDate == null) ? 0 : bytesToDate.length();
                 newBlocks.clear();
             }
@@ -69,7 +75,7 @@ public class StateProcessor extends AbstractComponent {
      * data is expected to be a series of Elasticsearch bulk requests in UTF-8 JSON
      * (as would be uploaded to the public REST API) separated by zero bytes ('\0').
      */
-    private BytesReference splitAndPersist(String jobId, BytesReference bytesRef, int searchFrom) throws IOException {
+    private BytesReference splitAndPersist(BytesReference bytesRef, int searchFrom) throws IOException {
         int splitFrom = 0;
         while (true) {
             int nextZeroByte = findNextZeroByte(bytesRef, searchFrom, splitFrom);
@@ -80,7 +86,7 @@ public class StateProcessor extends AbstractComponent {
             // Ignore completely empty chunks
             if (nextZeroByte > splitFrom) {
                 // No validation - assume the native process has formatted the state correctly
-                persist(jobId, bytesRef.slice(splitFrom, nextZeroByte - splitFrom));
+                persist(bytesRef.slice(splitFrom, nextZeroByte - splitFrom));
             }
             splitFrom = nextZeroByte + 1;
         }
@@ -90,11 +96,11 @@ public class StateProcessor extends AbstractComponent {
         return bytesRef.slice(splitFrom, bytesRef.length() - splitFrom);
     }
 
-    void persist(String jobId, BytesReference bytes) throws IOException {
+    void persist(BytesReference bytes) throws IOException {
         BulkRequest bulkRequest = new BulkRequest();
         bulkRequest.add(bytes, AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, XContentType.JSON);
         if (bulkRequest.numberOfActions() > 0) {
-            logger.trace("[{}] Persisting job state document", jobId);
+            LOGGER.trace("[{}] Persisting job state document", jobId);
             try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) {
                 client.bulk(bulkRequest).actionGet();
             }

+ 1 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java

@@ -12,6 +12,7 @@ import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
 import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
+import org.elasticsearch.xpack.ml.process.writer.LengthEncodedWriter;
 import org.supercsv.encoder.CsvEncoder;
 import org.supercsv.encoder.DefaultCsvEncoder;
 import org.supercsv.prefs.CsvPreference;

+ 2 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java

@@ -18,6 +18,7 @@ import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
+import org.elasticsearch.xpack.ml.process.writer.LengthEncodedWriter;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -168,7 +169,7 @@ public class ControlMsgToProcessWriter {
             builder.field("tmp_storage", params.getTmpStorage());
         }
         builder.endObject();
-        
+
         writeMessage(FORECAST_MESSAGE_CODE + Strings.toString(builder));
         fillCommandBuffer();
         lengthEncodedWriter.flush();

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvRecordWriter.java

@@ -5,7 +5,7 @@
  */
 package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
 
-import org.elasticsearch.xpack.core.ml.job.process.autodetect.writer.RecordWriter;
+import org.elasticsearch.xpack.core.ml.process.writer.RecordWriter;
 import org.supercsv.io.CsvListWriter;
 import org.supercsv.prefs.CsvPreference;
 

+ 37 - 8
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/MultiplyingNormalizerProcess.java

@@ -5,9 +5,8 @@
  */
 package org.elasticsearch.xpack.ml.job.process.normalizer;
 
+import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.elasticsearch.common.logging.Loggers;
-import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
@@ -17,6 +16,7 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.output.NormalizerResult
 import java.io.IOException;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
+import java.time.ZonedDateTime;
 
 /**
  * Normalizer process that doesn't use native code.
@@ -27,16 +27,15 @@ import java.io.PipedOutputStream;
  * - It can be used to produce results in testing that do not vary based on changes to the real normalization algorithms
  */
 public class MultiplyingNormalizerProcess implements NormalizerProcess {
-    private static final Logger LOGGER = Loggers.getLogger(MultiplyingNormalizerProcess.class);
 
-    private final Settings settings;
+    private static final Logger LOGGER = LogManager.getLogger(MultiplyingNormalizerProcess.class);
+
     private final double factor;
     private final PipedInputStream processOutStream;
     private XContentBuilder builder;
     private boolean shouldIgnoreHeader;
 
-    public MultiplyingNormalizerProcess(Settings settings, double factor) {
-        this.settings = settings;
+    public MultiplyingNormalizerProcess(double factor) {
         this.factor = factor;
         processOutStream = new PipedInputStream();
         try {
@@ -49,6 +48,11 @@ public class MultiplyingNormalizerProcess implements NormalizerProcess {
         shouldIgnoreHeader = true;
     }
 
+    @Override
+    public boolean isReady() {
+        return true;
+    }
+
     @Override
     public void writeRecord(String[] record) throws IOException {
         if (shouldIgnoreHeader) {
@@ -77,13 +81,33 @@ public class MultiplyingNormalizerProcess implements NormalizerProcess {
     }
 
     @Override
-    public void close() throws IOException {
+    public void persistState() {
+        // Nothing to do
+    }
+
+    @Override
+    public void flushStream() {
+        // Nothing to do
+    }
+
+    @Override
+    public void kill() {
+        // Nothing to do
+    }
+
+    @Override
+    public ZonedDateTime getProcessStartTime() {
+        return null;
+    }
+
+    @Override
+    public void close() {
         builder.close();
     }
 
     @Override
     public NormalizerResultHandler createNormalizedResultsHandler() {
-        return new NormalizerResultHandler(settings, processOutStream);
+        return new NormalizerResultHandler(processOutStream);
     }
 
     @Override
@@ -92,6 +116,11 @@ public class MultiplyingNormalizerProcess implements NormalizerProcess {
         return true;
     }
 
+    @Override
+    public boolean isProcessAliveAfterWaiting() {
+        return true;
+    }
+
     @Override
     public String readError() {
         return "";

+ 14 - 77
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java

@@ -5,104 +5,41 @@
  */
 package org.elasticsearch.xpack.ml.job.process.normalizer;
 
-import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.elasticsearch.common.logging.Loggers;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
-import org.elasticsearch.xpack.ml.job.process.autodetect.writer.LengthEncodedWriter;
-import org.elasticsearch.xpack.ml.job.process.logging.CppLogMessageHandler;
 import org.elasticsearch.xpack.ml.job.process.normalizer.output.NormalizerResultHandler;
-import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+import org.elasticsearch.xpack.ml.process.AbstractNativeProcess;
 
-import java.io.BufferedOutputStream;
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.Collections;
 
 /**
  * Normalizer process using native code.
  */
-class NativeNormalizerProcess implements NormalizerProcess {
-    private static final Logger LOGGER = Loggers.getLogger(NativeNormalizerProcess.class);
+class NativeNormalizerProcess extends AbstractNativeProcess implements NormalizerProcess {
 
-    private final String jobId;
-    private final Settings settings;
-    private final CppLogMessageHandler cppLogHandler;
-    private final OutputStream processInStream;
-    private final InputStream processOutStream;
-    private final LengthEncodedWriter recordWriter;
-    private volatile boolean processCloseInitiated;
-    private Future<?> logTailThread;
+    private static final String NAME = "normalizer";
 
-    NativeNormalizerProcess(String jobId, Settings settings, InputStream logStream, OutputStream processInStream,
-                            InputStream processOutStream, ExecutorService executorService) throws EsRejectedExecutionException {
-        this.jobId = jobId;
-        this.settings = settings;
-        cppLogHandler = new CppLogMessageHandler(jobId, logStream);
-        this.processInStream = new BufferedOutputStream(processInStream);
-        this.processOutStream = processOutStream;
-        this.recordWriter = new LengthEncodedWriter(this.processInStream);
-        logTailThread = executorService.submit(() -> {
-            try (CppLogMessageHandler h = cppLogHandler) {
-                h.tailStream();
-            } catch (IOException e) {
-                LOGGER.error(new ParameterizedMessage("[{}] Error tailing normalizer process logs",
-                        new Object[] { jobId }), e);
-            } finally {
-                if (processCloseInitiated == false) {
-                    // The log message doesn't say "crashed", as the process could have been killed
-                    // by a user or other process (e.g. the Linux OOM killer)
-                    LOGGER.error("[{}] normalizer process stopped unexpectedly", jobId);
-                }
-            }
-        });
+    NativeNormalizerProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream) {
+        super(jobId, logStream, processInStream, processOutStream, null, 0, Collections.emptyList(), () -> {});
     }
 
     @Override
-    public void writeRecord(String[] record) throws IOException {
-        recordWriter.writeRecord(record);
+    public String getName() {
+        return NAME;
     }
 
     @Override
-    public void close() throws IOException {
-        try {
-            processCloseInitiated = true;
-            // closing its input causes the process to exit
-            processInStream.close();
-            // wait for the process to exit by waiting for end-of-file on the named pipe connected to its logger
-            // this may take a long time as it persists the model state
-            logTailThread.get(5, TimeUnit.MINUTES);
-            if (cppLogHandler.seenFatalError()) {
-                throw ExceptionsHelper.serverError(cppLogHandler.getErrors());
-            }
-            LOGGER.debug("[{}] Normalizer process exited", jobId);
-        } catch (ExecutionException | TimeoutException e) {
-            LOGGER.warn(new ParameterizedMessage("[{}] Exception closing the running normalizer process", new Object[] { jobId }), e);
-        } catch (InterruptedException e) {
-            LOGGER.warn("[{}] Exception closing the running normalizer process", jobId);
-            Thread.currentThread().interrupt();
-        }
+    public boolean isReady() {
+        return true;
     }
 
     @Override
-    public NormalizerResultHandler createNormalizedResultsHandler() {
-        return new NormalizerResultHandler(settings, processOutStream);
+    public void persistState() {
+        // nothing to persist
     }
 
     @Override
-    public boolean isProcessAlive() {
-        // Sanity check: make sure the process hasn't terminated already
-        return !cppLogHandler.hasLogStreamEnded();
-    }
-
-    @Override
-    public String readError() {
-        return cppLogHandler.getErrors();
+    public NormalizerResultHandler createNormalizedResultsHandler() {
+        return new NormalizerResultHandler(processOutStream());
     }
 }

+ 21 - 10
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java

@@ -5,13 +5,14 @@
  */
 package org.elasticsearch.xpack.ml.job.process.normalizer;
 
+import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.elasticsearch.common.logging.Loggers;
-import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
-import org.elasticsearch.xpack.ml.job.process.NativeController;
-import org.elasticsearch.xpack.ml.job.process.ProcessPipes;
+import org.elasticsearch.xpack.ml.process.NativeController;
+import org.elasticsearch.xpack.ml.process.ProcessPipes;
 import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
 
 import java.io.IOException;
@@ -22,17 +23,15 @@ import java.util.concurrent.ExecutorService;
 
 public class NativeNormalizerProcessFactory implements NormalizerProcessFactory {
 
-    private static final Logger LOGGER = Loggers.getLogger(NativeNormalizerProcessFactory.class);
+    private static final Logger LOGGER = LogManager.getLogger(NativeNormalizerProcessFactory.class);
     private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();
     private static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10);
 
     private final Environment env;
-    private final Settings settings;
     private final NativeController nativeController;
 
-    public NativeNormalizerProcessFactory(Environment env, Settings settings, NativeController nativeController) {
+    public NativeNormalizerProcessFactory(Environment env, NativeController nativeController) {
         this.env = Objects.requireNonNull(env);
-        this.settings = Objects.requireNonNull(settings);
         this.nativeController = Objects.requireNonNull(nativeController);
     }
 
@@ -43,8 +42,20 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory
                 true, false, true, true, false, false);
         createNativeProcess(jobId, quantilesState, processPipes, bucketSpan);
 
-        return new NativeNormalizerProcess(jobId, settings, processPipes.getLogStream().get(),
-                processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), executorService);
+        NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, processPipes.getLogStream().get(),
+                processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get());
+
+        try {
+            normalizerProcess.start(executorService);
+            return normalizerProcess;
+        } catch (EsRejectedExecutionException e) {
+            try {
+                IOUtils.close(normalizerProcess);
+            } catch (IOException ioe) {
+                LOGGER.error("Can't close normalizer", ioe);
+            }
+            throw e;
+        }
     }
 
     private void createNativeProcess(String jobId, String quantilesState, ProcessPipes processPipes, Integer bucketSpan) {

+ 2 - 26
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerProcess.java

@@ -6,40 +6,16 @@
 package org.elasticsearch.xpack.ml.job.process.normalizer;
 
 import org.elasticsearch.xpack.ml.job.process.normalizer.output.NormalizerResultHandler;
-
-import java.io.Closeable;
-import java.io.IOException;
+import org.elasticsearch.xpack.ml.process.NativeProcess;
 
 /**
  * Interface representing the native C++ normalizer process
  */
-public interface NormalizerProcess extends Closeable {
-
-    /**
-     * Write the record to normalizer. The record parameter should not be encoded
-     * (i.e. length encoded) the implementation will appy the corrrect encoding.
-     *
-     * @param record Plain array of strings, implementors of this class should
-     *               encode the record appropriately
-     * @throws IOException If the write failed
-     */
-    void writeRecord(String[] record) throws IOException;
+public interface NormalizerProcess extends NativeProcess {
 
     /**
      * Create a result handler for this process's results.
      * @return results handler
      */
     NormalizerResultHandler createNormalizedResultsHandler();
-
-    /**
-     * Returns true if the process still running.
-     * @return True if the process is still running
-     */
-    boolean isProcessAlive();
-
-    /**
-     * Read any content in the error output buffer.
-     * @return An error message or empty String if no error.
-     */
-    String readError();
 }

+ 2 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/output/NormalizerResultHandler.java

@@ -8,8 +8,6 @@ package org.elasticsearch.xpack.ml.job.process.normalizer.output;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.CompositeBytesReference;
-import org.elasticsearch.common.component.AbstractComponent;
-import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContent;
@@ -26,15 +24,14 @@ import java.util.List;
 /**
  * Reads normalizer output.
  */
-public class NormalizerResultHandler extends AbstractComponent {
+public class NormalizerResultHandler {
 
     private static final int READ_BUF_SIZE = 1024;
 
     private final InputStream inputStream;
     private final List<NormalizerResult> normalizedResults;
 
-    public NormalizerResultHandler(Settings settings, InputStream inputStream) {
-        super(settings);
+    public NormalizerResultHandler(InputStream inputStream) {
         this.inputStream = inputStream;
         normalizedResults = new ArrayList<>();
     }

+ 265 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java

@@ -0,0 +1,265 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.process;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.xpack.core.ml.MachineLearningField;
+import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+import org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler;
+import org.elasticsearch.xpack.ml.process.writer.LengthEncodedWriter;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.time.ZonedDateTime;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Abstract class for implementing a native process.
+ */
+public abstract class AbstractNativeProcess implements NativeProcess {
+
+    private static final Logger LOGGER = LogManager.getLogger(AbstractNativeProcess.class);
+
+    private static final Duration WAIT_FOR_KILL_TIMEOUT = Duration.ofMillis(1000);
+
+    private final String jobId;
+    private final CppLogMessageHandler cppLogHandler;
+    private final OutputStream processInStream;
+    private final InputStream processOutStream;
+    private final OutputStream processRestoreStream;
+    private final LengthEncodedWriter recordWriter;
+    private final ZonedDateTime startTime;
+    private final int numberOfFields;
+    private final List<Path> filesToDelete;
+    private final Runnable onProcessCrash;
+    private volatile Future<?> logTailFuture;
+    private volatile Future<?> stateProcessorFuture;
+    private volatile boolean processCloseInitiated;
+    private volatile boolean processKilled;
+    private volatile boolean isReady;
+
+    protected AbstractNativeProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream,
+                                    OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
+                                    Runnable onProcessCrash) {
+        this.jobId = jobId;
+        cppLogHandler = new CppLogMessageHandler(jobId, logStream);
+        this.processInStream = new BufferedOutputStream(processInStream);
+        this.processOutStream = processOutStream;
+        this.processRestoreStream = processRestoreStream;
+        this.recordWriter = new LengthEncodedWriter(this.processInStream);
+        startTime = ZonedDateTime.now();
+        this.numberOfFields = numberOfFields;
+        this.filesToDelete = filesToDelete;
+        this.onProcessCrash = Objects.requireNonNull(onProcessCrash);
+    }
+
+    public abstract String getName();
+
+    /**
+     * Starts a process that does not persist any state
+     * @param executorService the executor service to run on
+     */
+    public void start(ExecutorService executorService) {
+        logTailFuture = executorService.submit(() -> {
+            try (CppLogMessageHandler h = cppLogHandler) {
+                h.tailStream();
+            } catch (IOException e) {
+                if (processKilled == false) {
+                    LOGGER.error(new ParameterizedMessage("[{}] Error tailing {} process logs", jobId, getName()), e);
+                }
+            } finally {
+                if (processCloseInitiated == false && processKilled == false) {
+                    // The log message doesn't say "crashed", as the process could have been killed
+                    // by a user or other process (e.g. the Linux OOM killer)
+
+                    String errors = cppLogHandler.getErrors();
+                    LOGGER.error("[{}] {} process stopped unexpectedly: {}", jobId, getName(), errors);
+                    onProcessCrash.run();
+                }
+            }
+        });
+    }
+
+    /**
+     * Starts a process that may persist its state
+     * @param executorService the executor service to run on
+     * @param stateProcessor the state processor
+     * @param persistStream the stream where the state is persisted
+     */
+    public void start(ExecutorService executorService, StateProcessor stateProcessor, InputStream persistStream) {
+        start(executorService);
+
+        stateProcessorFuture = executorService.submit(() -> {
+            try (InputStream in = persistStream) {
+                stateProcessor.process(in);
+                if (processKilled == false) {
+                    LOGGER.info("[{}] State output finished", jobId);
+                }
+            } catch (IOException e) {
+                if (processKilled == false) {
+                    LOGGER.error(new ParameterizedMessage("[{}] Error reading {} state output", jobId, getName()), e);
+                }
+            }
+        });
+    }
+
+    @Override
+    public boolean isReady() {
+        return isReady;
+    }
+
+    protected void setReady() {
+        isReady = true;
+    }
+
+    @Override
+    public void writeRecord(String[] record) throws IOException {
+        recordWriter.writeRecord(record);
+    }
+
+    @Override
+    public void flushStream() throws IOException {
+        recordWriter.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            processCloseInitiated = true;
+            // closing its input causes the process to exit
+            processInStream.close();
+            // wait for the process to exit by waiting for end-of-file on the named pipe connected
+            // to the state processor - it may take a long time for all the model state to be
+            // indexed
+            if (stateProcessorFuture != null) {
+                stateProcessorFuture.get(MachineLearningField.STATE_PERSIST_RESTORE_TIMEOUT.getMinutes(), TimeUnit.MINUTES);
+            }
+            // the log processor should have stopped by now too - assume processing the logs will
+            // take no more than 5 seconds longer than processing the state (usually it should
+            // finish first)
+            if (logTailFuture != null) {
+                logTailFuture.get(5, TimeUnit.SECONDS);
+            }
+
+            if (cppLogHandler.seenFatalError()) {
+                throw ExceptionsHelper.serverError(cppLogHandler.getErrors());
+            }
+            LOGGER.debug("[{}] {} process exited", jobId, getName());
+        } catch (ExecutionException | TimeoutException e) {
+            LOGGER.warn(new ParameterizedMessage("[{}] Exception closing the running {} process", jobId, getName()), e);
+        } catch (InterruptedException e) {
+            LOGGER.warn(new ParameterizedMessage("[{}] Exception closing the running {} process", jobId, getName()), e);
+            Thread.currentThread().interrupt();
+        } finally {
+            deleteAssociatedFiles();
+        }
+    }
+
+    @Override
+    public void kill() throws IOException {
+        processKilled = true;
+        try {
+            // The PID comes via the processes log stream.  We don't wait for it to arrive here,
+            // but if the wait times out it implies the process has only just started, in which
+            // case it should die very quickly when we close its input stream.
+            NativeControllerHolder.getNativeController().killProcess(cppLogHandler.getPid(Duration.ZERO));
+
+            // Wait for the process to die before closing processInStream as if the process
+            // is still alive when processInStream is closed it may start persisting state
+            cppLogHandler.waitForLogStreamClose(WAIT_FOR_KILL_TIMEOUT);
+        } catch (TimeoutException e) {
+            LOGGER.warn("[{}] Failed to get PID of {} process to kill", jobId, getName());
+        } finally {
+            try {
+                processInStream.close();
+            } catch (IOException e) {
+                // Ignore it - we're shutting down and the method itself has logged a warning
+            }
+            try {
+                deleteAssociatedFiles();
+            } catch (IOException e) {
+                // Ignore it - we're shutting down and the method itself has logged a warning
+            }
+        }
+    }
+
+    private synchronized void deleteAssociatedFiles() throws IOException {
+        if (filesToDelete == null) {
+            return;
+        }
+
+        for (Path fileToDelete : filesToDelete) {
+            if (Files.deleteIfExists(fileToDelete)) {
+                LOGGER.debug("[{}] Deleted file {}", jobId, fileToDelete.toString());
+            } else {
+                LOGGER.warn("[{}] Failed to delete file {}", jobId, fileToDelete.toString());
+            }
+        }
+
+        filesToDelete.clear();
+    }
+
+    @Override
+    public ZonedDateTime getProcessStartTime() {
+        return startTime;
+    }
+
+    @Override
+    public boolean isProcessAlive() {
+        // Sanity check: make sure the process hasn't terminated already
+        return !cppLogHandler.hasLogStreamEnded();
+    }
+
+    @Override
+    public boolean isProcessAliveAfterWaiting() {
+        cppLogHandler.waitForLogStreamClose(Duration.ofMillis(45));
+        return isProcessAlive();
+    }
+
+    @Override
+    public String readError() {
+        return cppLogHandler.getErrors();
+    }
+
+    protected String jobId() {
+        return jobId;
+    }
+
+    protected InputStream processOutStream() {
+        return processOutStream;
+    }
+
+    @Nullable
+    protected OutputStream processRestoreStream() {
+        return processRestoreStream;
+    }
+
+    protected int numberOfFields() {
+        return numberOfFields;
+    }
+
+    protected LengthEncodedWriter recordWriter() {
+        return recordWriter;
+    }
+
+    protected boolean isProcessKilled() {
+        return processKilled;
+    }
+}

+ 2 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeController.java → x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java

@@ -3,13 +3,13 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
-package org.elasticsearch.xpack.ml.job.process;
+package org.elasticsearch.xpack.ml.process;
 
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.env.Environment;
-import org.elasticsearch.xpack.ml.job.process.logging.CppLogMessageHandler;
+import org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler;
 import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
 
 import java.io.BufferedOutputStream;

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeControllerHolder.java → x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeControllerHolder.java

@@ -3,7 +3,7 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
-package org.elasticsearch.xpack.ml.job.process;
+package org.elasticsearch.xpack.ml.process;
 
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.xpack.core.ml.MachineLearningField;

+ 85 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeProcess.java

@@ -0,0 +1,85 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.process;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.time.ZonedDateTime;
+
+/**
+ * Interface representing a native C++ process
+ */
+public interface NativeProcess extends Closeable {
+
+    /**
+     * Is the process ready to receive data?
+     * @return {@code true} if the process is ready to receive data
+     */
+    boolean isReady();
+
+    /**
+     * Write the record to the process. The record parameter should not be encoded
+     * (i.e. length encoded) the implementation will apply the correct encoding.
+     *
+     * @param record Plain array of strings, implementors of this class should
+     *               encode the record appropriately
+     * @throws IOException If the write failed
+     */
+    void writeRecord(String[] record) throws IOException;
+
+    /**
+     * Ask the process to persist its state in the background
+     * @throws IOException If writing the request fails
+     */
+    void persistState() throws IOException;
+
+    /**
+     * Flush the output data stream
+     */
+    void flushStream() throws IOException;
+
+    /**
+     * Kill the process.  Do not wait for it to stop gracefully.
+     */
+    void kill() throws IOException;
+
+    /**
+     * The time the process was started
+     * @return Process start time
+     */
+    ZonedDateTime getProcessStartTime();
+
+    /**
+     * Returns true if the process still running.
+     * Methods instructing the process are essentially
+     * asynchronous; the command will be continue to execute in the process after
+     * the call has returned.
+     * This method tests whether something catastrophic
+     * occurred in the process during its execution.
+     * @return True if the process is still running
+     */
+    boolean isProcessAlive();
+
+    /**
+     * Check whether the process terminated given a grace period.
+     *
+     * Processing errors are highly likely caused by the process being unexpectedly
+     * terminated.
+     *
+     * Workaround: As we can not easily check if the process is alive, we rely on
+     * the logPipe being ended. As the loghandler runs in another thread which
+     * might fall behind this one, we give it a grace period.
+     *
+     * @return false if process has ended for sure, true if it probably still runs
+     */
+    boolean isProcessAliveAfterWaiting();
+
+    /**
+     * Read any content in the error output buffer.
+     * @return An error message or empty String if no error.
+     */
+    String readError();
+}

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeStorageProvider.java → x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeStorageProvider.java

@@ -4,7 +4,7 @@
  * you may not use this file except in compliance with the Elastic License.
  */
 
-package org.elasticsearch.xpack.ml.job.process;
+package org.elasticsearch.xpack.ml.process;
 
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.common.logging.Loggers;

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/ProcessPipes.java → x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessPipes.java

@@ -3,7 +3,7 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
-package org.elasticsearch.xpack.ml.job.process;
+package org.elasticsearch.xpack.ml.process;
 
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.env.Environment;

+ 14 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/StateProcessor.java

@@ -0,0 +1,14 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.process;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public interface StateProcessor {
+
+    void process(InputStream in) throws IOException;
+}

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessage.java → x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessage.java

@@ -3,7 +3,7 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
-package org.elasticsearch.xpack.ml.job.process.logging;
+package org.elasticsearch.xpack.ml.process.logging;
 
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.io.stream.StreamInput;

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java → x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessageHandler.java

@@ -3,7 +3,7 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
-package org.elasticsearch.xpack.ml.job.process.logging;
+package org.elasticsearch.xpack.ml.process.logging;
 
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.Logger;

+ 2 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/LengthEncodedWriter.java → x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java

@@ -3,9 +3,9 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
-package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
+package org.elasticsearch.xpack.ml.process.writer;
 
-import org.elasticsearch.xpack.core.ml.job.process.autodetect.writer.RecordWriter;
+import org.elasticsearch.xpack.core.ml.process.writer.RecordWriter;
 
 import java.io.IOException;
 import java.io.OutputStream;

+ 2 - 2
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilderTests.java

@@ -15,8 +15,8 @@ import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.core.ml.job.config.Detector;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
-import org.elasticsearch.xpack.ml.job.process.NativeController;
-import org.elasticsearch.xpack.ml.job.process.ProcessPipes;
+import org.elasticsearch.xpack.ml.process.NativeController;
+import org.elasticsearch.xpack.ml.process.ProcessPipes;
 import org.junit.Before;
 
 import java.nio.file.Path;

+ 6 - 6
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java

@@ -9,7 +9,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
 import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser;
-import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor;
+import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectStateProcessor;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
@@ -56,7 +56,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
                 mock(OutputStream.class), mock(InputStream.class), mock(OutputStream.class),
                 NUMBER_FIELDS, null,
                 new AutodetectResultsParser(Settings.EMPTY), mock(Runnable.class))) {
-            process.start(executorService, mock(StateProcessor.class), mock(InputStream.class));
+            process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class));
 
             ZonedDateTime startTime = process.getProcessStartTime();
             Thread.sleep(500);
@@ -76,7 +76,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
         try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
                 bos, mock(InputStream.class), mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
                 new AutodetectResultsParser(Settings.EMPTY), mock(Runnable.class))) {
-            process.start(executorService, mock(StateProcessor.class), mock(InputStream.class));
+            process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class));
 
             process.writeRecord(record);
             process.flushStream();
@@ -108,7 +108,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
         try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
                 bos, mock(InputStream.class), mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
                 new AutodetectResultsParser(Settings.EMPTY), mock(Runnable.class))) {
-            process.start(executorService, mock(StateProcessor.class), mock(InputStream.class));
+            process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class));
 
             FlushJobParams params = FlushJobParams.builder().build();
             process.flushJob(params);
@@ -128,7 +128,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
     }
 
     public void testPersistJob() throws IOException {
-        testWriteMessage(p -> p.persistJob(), ControlMsgToProcessWriter.BACKGROUND_PERSIST_MESSAGE_CODE);
+        testWriteMessage(p -> p.persistState(), ControlMsgToProcessWriter.BACKGROUND_PERSIST_MESSAGE_CODE);
     }
 
     public void testWriteMessage(CheckedConsumer<NativeAutodetectProcess> writeFunction, String expectedMessageCode) throws IOException {
@@ -138,7 +138,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
         try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
                 bos, mock(InputStream.class), mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
                 new AutodetectResultsParser(Settings.EMPTY), mock(Runnable.class))) {
-            process.start(executorService, mock(StateProcessor.class), mock(InputStream.class));
+            process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class));
 
             writeFunction.accept(process);
             process.writeUpdateModelPlotMessage(new ModelPlotConfig());

+ 13 - 12
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessorTests.java → x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectStateProcessorTests.java

@@ -26,7 +26,6 @@ import java.nio.charset.StandardCharsets;
 import java.util.List;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -37,7 +36,7 @@ import static org.mockito.Mockito.when;
 /**
  * Tests for reading state from the native process.
  */
-public class StateProcessorTests extends ESTestCase {
+public class AutodetectStateProcessorTests extends ESTestCase {
 
     private static final String STATE_SAMPLE = ""
             + "{\"index\": {\"_index\": \"test\", \"_type\": \"type1\", \"_id\": \"1\"}}\n"
@@ -50,18 +49,20 @@ public class StateProcessorTests extends ESTestCase {
             + "{ \"field\" : \"value3\" }\n"
             + "\0";
 
+    private static final String JOB_ID = "state-processor-test-job";
+
     private static final int NUM_LARGE_DOCS = 2;
     private static final int LARGE_DOC_SIZE = 1000000;
 
     private Client client;
-    private StateProcessor stateProcessor;
+    private AutodetectStateProcessor stateProcessor;
 
     @Before
     public void initialize() throws IOException {
         client = mock(Client.class);
         @SuppressWarnings("unchecked")
         ActionFuture<BulkResponse> bulkResponseFuture = mock(ActionFuture.class);
-        stateProcessor = spy(new StateProcessor(Settings.EMPTY, client));
+        stateProcessor = spy(new AutodetectStateProcessor(client, JOB_ID));
         when(client.bulk(any(BulkRequest.class))).thenReturn(bulkResponseFuture);
         ThreadPool threadPool = mock(ThreadPool.class);
         when(client.threadPool()).thenReturn(threadPool);
@@ -75,9 +76,9 @@ public class StateProcessorTests extends ESTestCase {
 
     public void testStateRead() throws IOException {
         ByteArrayInputStream stream = new ByteArrayInputStream(STATE_SAMPLE.getBytes(StandardCharsets.UTF_8));
-        stateProcessor.process("_id", stream);
+        stateProcessor.process(stream);
         ArgumentCaptor<BytesReference> bytesRefCaptor = ArgumentCaptor.forClass(BytesReference.class);
-        verify(stateProcessor, times(3)).persist(eq("_id"), bytesRefCaptor.capture());
+        verify(stateProcessor, times(3)).persist(bytesRefCaptor.capture());
 
         String[] threeStates = STATE_SAMPLE.split("\0");
         List<BytesReference> capturedBytes = bytesRefCaptor.getAllValues();
@@ -92,9 +93,9 @@ public class StateProcessorTests extends ESTestCase {
         String zeroBytes = "\0\0\0\0\0\0";
         ByteArrayInputStream stream = new ByteArrayInputStream(zeroBytes.getBytes(StandardCharsets.UTF_8));
 
-        stateProcessor.process("_id", stream);
+        stateProcessor.process(stream);
 
-        verify(stateProcessor, never()).persist(eq("_id"), any());
+        verify(stateProcessor, never()).persist(any());
         Mockito.verifyNoMoreInteractions(client);
     }
 
@@ -102,9 +103,9 @@ public class StateProcessorTests extends ESTestCase {
         String zeroBytes = "        \n\0";
         ByteArrayInputStream stream = new ByteArrayInputStream(zeroBytes.getBytes(StandardCharsets.UTF_8));
 
-        stateProcessor.process("_id", stream);
+        stateProcessor.process(stream);
 
-        verify(stateProcessor, times(1)).persist(eq("_id"), any());
+        verify(stateProcessor, times(1)).persist(any());
         Mockito.verifyNoMoreInteractions(client);
     }
 
@@ -125,8 +126,8 @@ public class StateProcessorTests extends ESTestCase {
         }
 
         ByteArrayInputStream stream = new ByteArrayInputStream(builder.toString().getBytes(StandardCharsets.UTF_8));
-        stateProcessor.process("_id", stream);
-        verify(stateProcessor, times(NUM_LARGE_DOCS)).persist(eq("_id"), any());
+        stateProcessor.process(stream);
+        verify(stateProcessor, times(NUM_LARGE_DOCS)).persist(any());
         verify(client, times(NUM_LARGE_DOCS)).bulk(any(BulkRequest.class));
         verify(client, times(NUM_LARGE_DOCS)).threadPool();
     }

+ 1 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriterTests.java

@@ -19,6 +19,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Detector;
 import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
 import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
 import org.elasticsearch.xpack.ml.job.process.autodetect.writer.AbstractDataToProcessWriter.InputOutputMap;
+import org.elasticsearch.xpack.ml.process.writer.LengthEncodedWriter;
 import org.junit.Before;
 import org.mockito.Mockito;
 

+ 1 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java

@@ -16,6 +16,7 @@ import org.elasticsearch.xpack.core.ml.job.config.RuleCondition;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
+import org.elasticsearch.xpack.ml.process.writer.LengthEncodedWriter;
 import org.junit.Before;
 import org.mockito.ArgumentCaptor;
 import org.mockito.InOrder;

+ 3 - 4
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerTests.java

@@ -5,7 +5,6 @@
  */
 package org.elasticsearch.xpack.ml.job.process.normalizer;
 
-import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.ml.job.results.Bucket;
 import org.elasticsearch.xpack.core.ml.job.results.BucketInfluencer;
@@ -32,7 +31,7 @@ public class NormalizerTests extends ESTestCase {
     private static final double INITIAL_SCORE = 3.0;
     private static final double FACTOR = 2.0;
 
-    private Bucket generateBucket(Date timestamp) throws IOException {
+    private Bucket generateBucket(Date timestamp) {
         return new Bucket(JOB_ID, timestamp, BUCKET_SPAN);
     }
 
@@ -49,8 +48,8 @@ public class NormalizerTests extends ESTestCase {
         ExecutorService threadpool = Executors.newScheduledThreadPool(1);
         try {
             NormalizerProcessFactory processFactory = mock(NormalizerProcessFactory.class);
-            when(processFactory.createNormalizerProcess(eq(JOB_ID), eq(QUANTILES_STATE), eq(BUCKET_SPAN),
-                    any())).thenReturn(new MultiplyingNormalizerProcess(Settings.EMPTY, FACTOR));
+            when(processFactory.createNormalizerProcess(eq(JOB_ID), eq(QUANTILES_STATE), eq(BUCKET_SPAN), any()))
+                    .thenReturn(new MultiplyingNormalizerProcess(FACTOR));
             Normalizer normalizer = new Normalizer(JOB_ID, processFactory, threadpool);
 
             Bucket bucket = generateBucket(new Date(0));

+ 1 - 2
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/output/NormalizerResultHandlerTests.java

@@ -5,7 +5,6 @@
  */
 package org.elasticsearch.xpack.ml.job.process.normalizer.output;
 
-import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerResult;
 
@@ -32,7 +31,7 @@ public class NormalizerResultHandlerTests extends ESTestCase {
                 + "\"value_field_name\":\"x\",\"probability\":0.03,\"normalized_score\":22.22}\n";
 
         InputStream is = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8));
-        NormalizerResultHandler handler = new NormalizerResultHandler(Settings.EMPTY, is);
+        NormalizerResultHandler handler = new NormalizerResultHandler(is);
         handler.process();
         List<NormalizerResult> results = handler.getNormalizedResults();
         assertEquals(3, results.size());

+ 1 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeControllerTests.java → x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeControllerTests.java

@@ -3,7 +3,7 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
-package org.elasticsearch.xpack.ml.job.process;
+package org.elasticsearch.xpack.ml.process;
 
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.common.settings.Settings;

+ 4 - 5
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeStorageProviderTests.java → x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeStorageProviderTests.java

@@ -4,7 +4,7 @@
  * you may not use this file except in compliance with the Elastic License.
  */
 
-package org.elasticsearch.xpack.ml.job.process;
+package org.elasticsearch.xpack.ml.process;
 
 import org.elasticsearch.common.io.PathUtils;
 import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -21,12 +21,11 @@ import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.any;
-
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 public class NativeStorageProviderTests extends ESTestCase {
 

+ 1 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/ProcessPipesTests.java → x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessPipesTests.java

@@ -3,7 +3,7 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
-package org.elasticsearch.xpack.ml.job.process;
+package org.elasticsearch.xpack.ml.process;
 
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.env.Environment;

+ 2 - 2
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandlerTests.java → x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessageHandlerTests.java

@@ -3,7 +3,7 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
-package org.elasticsearch.xpack.ml.job.process.logging;
+package org.elasticsearch.xpack.ml.process.logging;
 
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.Logger;
@@ -203,7 +203,7 @@ public class CppLogMessageHandlerTests extends ESTestCase {
         }
     }
 
-    private static void executeLoggingTest(InputStream is, MockLogAppender mockAppender, Level level, String jobId) 
+    private static void executeLoggingTest(InputStream is, MockLogAppender mockAppender, Level level, String jobId)
             throws IOException {
         Logger cppMessageLogger = Loggers.getLogger(CppLogMessageHandler.class);
         Loggers.addAppender(cppMessageLogger, mockAppender);

+ 2 - 2
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageTests.java → x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessageTests.java

@@ -3,7 +3,7 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
-package org.elasticsearch.xpack.ml.job.process.logging;
+package org.elasticsearch.xpack.ml.process.logging;
 
 import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.common.xcontent.DeprecationHandler;
@@ -72,4 +72,4 @@ public class CppLogMessageTests extends AbstractSerializingTestCase<CppLogMessag
     protected CppLogMessage doParseInstance(XContentParser parser) {
         return CppLogMessage.PARSER.apply(parser, null);
     }
-}
+}

+ 1 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/LengthEncodedWriterTests.java → x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriterTests.java

@@ -3,7 +3,7 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
-package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
+package org.elasticsearch.xpack.ml.process.writer;
 
 import org.elasticsearch.test.ESTestCase;
 import org.junit.Assert;