Browse Source

[ML] Add timeouts to named pipe connections (#62993)

This PR adds timeouts to the named pipe connections of the
autodetect, normalize and data_frame_analyzer processes.
This argument requires the changes of elastic/ml-cpp#1514 in
order to work, so that PR will be merged before this one.
(The controller process already had a different mechanism,
tied to the ES JVM lifetime.)

Closes elastic/ml-cpp#1504
David Roberts 5 years ago
parent
commit
f6921fc65a
15 changed files with 63 additions and 86 deletions
  1. 2 3
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java
  2. 2 3
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcess.java
  3. 2 2
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java
  4. 2 4
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcess.java
  5. 2 3
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java
  6. 2 3
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java
  7. 4 4
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java
  8. 2 3
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java
  9. 2 3
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java
  10. 4 7
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java
  11. 3 3
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java
  12. 18 11
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessPipes.java
  13. 6 26
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java
  14. 1 2
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java
  15. 11 9
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessPipesTests.java

+ 2 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java

@@ -14,7 +14,6 @@ import org.elasticsearch.xpack.ml.process.ProcessResultsParser;
 
 import java.io.IOException;
 import java.nio.file.Path;
-import java.time.Duration;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
@@ -28,8 +27,8 @@ abstract class AbstractNativeAnalyticsProcess<Result> extends AbstractNativeProc
     protected AbstractNativeAnalyticsProcess(String name, ConstructingObjectParser<Result, Void> resultParser, String jobId,
                                              NativeController nativeController, ProcessPipes processPipes,
                                              int numberOfFields, List<Path> filesToDelete, Consumer<String> onProcessCrash,
-                                             Duration processConnectTimeout, NamedXContentRegistry namedXContentRegistry) {
-        super(jobId, nativeController, processPipes, numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout);
+                                             NamedXContentRegistry namedXContentRegistry) {
+        super(jobId, nativeController, processPipes, numberOfFields, filesToDelete, onProcessCrash);
         this.name = Objects.requireNonNull(name);
         this.resultsParser = new ProcessResultsParser<>(Objects.requireNonNull(resultParser), namedXContentRegistry);
     }

+ 2 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcess.java

@@ -22,7 +22,6 @@ import org.elasticsearch.xpack.ml.process.StateToProcessWriterHelper;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.file.Path;
-import java.time.Duration;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.Consumer;
@@ -37,10 +36,10 @@ public class NativeAnalyticsProcess extends AbstractNativeAnalyticsProcess<Analy
 
     protected NativeAnalyticsProcess(String jobId, NativeController nativeController, ProcessPipes processPipes,
                                      int numberOfFields, List<Path> filesToDelete, Consumer<String> onProcessCrash,
-                                     Duration processConnectTimeout, AnalyticsProcessConfig config,
+                                     AnalyticsProcessConfig config,
                                      NamedXContentRegistry namedXContentRegistry) {
         super(NAME, AnalyticsResult.PARSER, jobId, nativeController, processPipes, numberOfFields, filesToDelete, onProcessCrash,
-            processConnectTimeout, namedXContentRegistry);
+            namedXContentRegistry);
         this.config = Objects.requireNonNull(config);
     }
 

+ 2 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java

@@ -72,7 +72,7 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory<An
                                                          Consumer<String> onProcessCrash) {
         String jobId = config.getId();
         List<Path> filesToDelete = new ArrayList<>();
-        ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, jobId,
+        ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, AnalyticsBuilder.ANALYTICS, jobId,
                 false, true, true, hasState, config.getAnalysis().persistsState());
 
         // The extra 2 are for the checksum and the control field
@@ -83,7 +83,7 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory<An
         NativeAnalyticsProcess analyticsProcess =
             new NativeAnalyticsProcess(
                 jobId, nativeController, processPipes, numberOfFields, filesToDelete,
-                onProcessCrash, processConnectTimeout, analyticsProcessConfig, namedXContentRegistry);
+                onProcessCrash, analyticsProcessConfig, namedXContentRegistry);
 
         try {
             startProcess(config, executorService, analyticsProcess);

+ 2 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcess.java

@@ -12,7 +12,6 @@ import org.elasticsearch.xpack.ml.process.NativeController;
 import org.elasticsearch.xpack.ml.process.ProcessPipes;
 
 import java.nio.file.Path;
-import java.time.Duration;
 import java.util.List;
 import java.util.function.Consumer;
 
@@ -21,10 +20,9 @@ public class NativeMemoryUsageEstimationProcess extends AbstractNativeAnalyticsP
     private static final String NAME = "memory_usage_estimation";
 
     protected NativeMemoryUsageEstimationProcess(String jobId, NativeController nativeController, ProcessPipes processPipes,
-                                                 int numberOfFields, List<Path> filesToDelete, Consumer<String> onProcessCrash,
-                                                 Duration processConnectTimeout) {
+                                                 int numberOfFields, List<Path> filesToDelete, Consumer<String> onProcessCrash) {
         super(NAME, MemoryUsageEstimationResult.PARSER, jobId, nativeController, processPipes, numberOfFields, filesToDelete,
-            onProcessCrash, processConnectTimeout, NamedXContentRegistry.EMPTY);
+            onProcessCrash, NamedXContentRegistry.EMPTY);
     }
 
     @Override

+ 2 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java

@@ -67,7 +67,7 @@ public class NativeMemoryUsageEstimationProcessFactory implements AnalyticsProce
         // memory estimation process pipe names are unique.  Therefore an increasing counter value is appended to the config ID
         // to ensure uniqueness between calls.
         ProcessPipes processPipes = new ProcessPipes(
-            env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, config.getId() + "_" + counter.incrementAndGet(),
+            env, NAMED_PIPE_HELPER, processConnectTimeout, AnalyticsBuilder.ANALYTICS, config.getId() + "_" + counter.incrementAndGet(),
             false, false, true, false, false);
 
         createNativeProcess(config.getId(), analyticsProcessConfig, filesToDelete, processPipes);
@@ -78,8 +78,7 @@ public class NativeMemoryUsageEstimationProcessFactory implements AnalyticsProce
             processPipes,
             0,
             filesToDelete,
-            onProcessCrash,
-            processConnectTimeout);
+            onProcessCrash);
 
         try {
             process.start(executorService);

+ 2 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java

@@ -28,7 +28,6 @@ import org.elasticsearch.xpack.ml.process.NativeController;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.file.Path;
-import java.time.Duration;
 import java.util.Iterator;
 import java.util.List;
 import java.util.function.Consumer;
@@ -46,8 +45,8 @@ class NativeAutodetectProcess extends AbstractNativeProcess implements Autodetec
 
     NativeAutodetectProcess(String jobId, NativeController nativeController, ProcessPipes processPipes,
                             int numberOfFields, List<Path> filesToDelete, ProcessResultsParser<AutodetectResult> resultsParser,
-                            Consumer<String> onProcessCrash, Duration processConnectTimeout) {
-        super(jobId, nativeController, processPipes, numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout);
+                            Consumer<String> onProcessCrash) {
+        super(jobId, nativeController, processPipes, numberOfFields, filesToDelete, onProcessCrash);
         this.resultsParser = resultsParser;
     }
 

+ 4 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java

@@ -76,9 +76,9 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
                                                      ExecutorService executorService,
                                                      Consumer<String> onProcessCrash) {
         List<Path> filesToDelete = new ArrayList<>();
-        ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, AutodetectBuilder.AUTODETECT, job.getId(),
-                false, true, true, params.modelSnapshot() != null,
-                !AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING.get(settings));
+        ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, AutodetectBuilder.AUTODETECT,
+            job.getId(), false, true, true, params.modelSnapshot() != null,
+            AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING.get(settings) == false);
         createNativeProcess(job, params, processPipes, filesToDelete);
         boolean includeTokensField = MachineLearning.CATEGORIZATION_TOKENIZATION_IN_JAVA
                 && job.getAnalysisConfig().getCategorizationFieldName() != null;
@@ -90,7 +90,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
             NamedXContentRegistry.EMPTY);
         NativeAutodetectProcess autodetect = new NativeAutodetectProcess(
                 job.getId(), nativeController, processPipes, numberOfFields,
-                filesToDelete, resultsParser, onProcessCrash, processConnectTimeout);
+                filesToDelete, resultsParser, onProcessCrash);
         try {
             autodetect.start(executorService, stateProcessor);
             return autodetect;

+ 2 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java

@@ -10,7 +10,6 @@ import org.elasticsearch.xpack.ml.process.AbstractNativeProcess;
 import org.elasticsearch.xpack.ml.process.NativeController;
 import org.elasticsearch.xpack.ml.process.ProcessPipes;
 
-import java.time.Duration;
 import java.util.Collections;
 
 /**
@@ -20,8 +19,8 @@ class NativeNormalizerProcess extends AbstractNativeProcess implements Normalize
 
     private static final String NAME = "normalizer";
 
-    NativeNormalizerProcess(String jobId, NativeController nativeController, ProcessPipes processPipes, Duration processConnectTimeout) {
-        super(jobId, nativeController, processPipes, 0, Collections.emptyList(), (ignore) -> {}, processConnectTimeout);
+    NativeNormalizerProcess(String jobId, NativeController nativeController, ProcessPipes processPipes) {
+        super(jobId, nativeController, processPipes, 0, Collections.emptyList(), (ignore) -> {});
     }
 
     @Override

+ 2 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java

@@ -54,12 +54,11 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory
         // The job ID passed to the process pipes is only used to make the file names unique.  Since normalize can get run many times
         // in quick succession for the same job the job ID alone is not sufficient to guarantee that the normalizer process pipe names
         // are unique.  Therefore an increasing counter value is appended to the job ID to ensure uniqueness between calls.
-        ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, NormalizerBuilder.NORMALIZE,
+        ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, NormalizerBuilder.NORMALIZE,
             jobId + "_" + counter.incrementAndGet(), false, true, true, false, false);
         createNativeProcess(jobId, quantilesState, processPipes, bucketSpan);
 
-        NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, nativeController, processPipes,
-            processConnectTimeout);
+        NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, nativeController, processPipes);
 
         try {
             normalizerProcess.start(executorService);

+ 4 - 7
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java

@@ -54,7 +54,6 @@ public abstract class AbstractNativeProcess implements NativeProcess {
     private final int numberOfFields;
     private final List<Path> filesToDelete;
     private final Consumer<String> onProcessCrash;
-    private final Duration processConnectTimeout;
     private volatile Future<?> logTailFuture;
     private volatile Future<?> stateProcessorFuture;
     private volatile boolean processCloseInitiated;
@@ -62,8 +61,7 @@ public abstract class AbstractNativeProcess implements NativeProcess {
     private volatile boolean isReady;
 
     protected AbstractNativeProcess(String jobId, NativeController nativeController, ProcessPipes processPipes,
-                                    int numberOfFields, List<Path> filesToDelete, Consumer<String> onProcessCrash,
-                                    Duration processConnectTimeout) {
+                                    int numberOfFields, List<Path> filesToDelete, Consumer<String> onProcessCrash) {
         this.jobId = jobId;
         this.nativeController = nativeController;
         this.processPipes = processPipes;
@@ -71,7 +69,6 @@ public abstract class AbstractNativeProcess implements NativeProcess {
         this.numberOfFields = numberOfFields;
         this.filesToDelete = filesToDelete;
         this.onProcessCrash = Objects.requireNonNull(onProcessCrash);
-        this.processConnectTimeout = Objects.requireNonNull(processConnectTimeout);
     }
 
     public abstract String getName();
@@ -84,7 +81,7 @@ public abstract class AbstractNativeProcess implements NativeProcess {
      */
     public void start(ExecutorService executorService) throws IOException {
 
-        processPipes.connectLogStream(processConnectTimeout);
+        processPipes.connectLogStream();
         cppLogHandler.set(processPipes.getLogStreamHandler());
 
         logTailFuture = executorService.submit(() -> {
@@ -99,7 +96,7 @@ public abstract class AbstractNativeProcess implements NativeProcess {
             }
         });
 
-        processPipes.connectOtherStreams(processConnectTimeout);
+        processPipes.connectOtherStreams();
         if (processPipes.getProcessInStream().isPresent()) {
             processInStream.set(new BufferedOutputStream(processPipes.getProcessInStream().get()));
             this.recordWriter.set(new LengthEncodedWriter(processInStream.get()));
@@ -215,7 +212,7 @@ public abstract class AbstractNativeProcess implements NativeProcess {
         try {
             // The PID comes via the processes log stream. We do wait here to give the process the time to start up and report its PID.
             // Without the PID we cannot kill the process.
-            nativeController.killProcess(cppLogHandler().getPid(processConnectTimeout));
+            nativeController.killProcess(cppLogHandler().getPid(processPipes.getTimeout()));
 
             // Wait for the process to die before closing processInStream as if the process
             // is still alive when processInStream is closed it may start persisting state

+ 3 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java

@@ -50,11 +50,11 @@ public class NativeController implements MlController {
     }
 
     NativeController(String localNodeName, Environment env, NamedPipeHelper namedPipeHelper) throws IOException {
-        ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, CONTROLLER, null,
+        ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, CONTROLLER_CONNECT_TIMEOUT, CONTROLLER, null,
                 true, false, false, false, false);
-        processPipes.connectLogStream(CONTROLLER_CONNECT_TIMEOUT);
+        processPipes.connectLogStream();
         tailLogsInThread(processPipes.getLogStreamHandler());
-        processPipes.connectOtherStreams(CONTROLLER_CONNECT_TIMEOUT);
+        processPipes.connectOtherStreams();
         this.localNodeName = localNodeName;
         this.cppLogHandler = processPipes.getLogStreamHandler();
         this.commandStream = new BufferedOutputStream(processPipes.getCommandStream().get());

+ 18 - 11
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessPipes.java

@@ -36,6 +36,7 @@ public class ProcessPipes {
     public static final String RESTORE_IS_PIPE_ARG = "--restoreIsPipe";
     public static final String PERSIST_ARG = "--persist=";
     public static final String PERSIST_IS_PIPE_ARG = "--persistIsPipe";
+    public static final String TIMEOUT_ARG = "--namedPipeConnectTimeout=";
 
     private final NamedPipeHelper namedPipeHelper;
     private final String jobId;
@@ -50,6 +51,14 @@ public class ProcessPipes {
     private final String restorePipeName;
     private final String persistPipeName;
 
+    /**
+     * Needs to be long enough for the C++ process perform all startup tasks that precede creation of named
+     * pipes.  There should not be very many of these, so a short timeout should be fine.  However, at least
+     * five seconds is recommended due to the vagaries of process scheduling and the way VMs can completely
+     * stall for some hypervisor actions.
+     */
+    private final Duration timeout;
+
     private CppLogMessageHandler logStreamHandler;
     private OutputStream commandStream;
     private OutputStream processInStream;
@@ -66,11 +75,12 @@ public class ProcessPipes {
      * @param jobId The job ID of the process to which pipes are to be opened, if the process is associated with a specific job.
      *              May be null or empty for processes not associated with a specific job.
      */
-    public ProcessPipes(Environment env, NamedPipeHelper namedPipeHelper, String processName, String jobId,
+    public ProcessPipes(Environment env, NamedPipeHelper namedPipeHelper, Duration timeout, String processName, String jobId,
                         boolean wantCommandPipe, boolean wantProcessInPipe, boolean wantProcessOutPipe,
                         boolean wantRestorePipe, boolean wantPersistPipe) {
         this.namedPipeHelper = namedPipeHelper;
         this.jobId = jobId;
+        this.timeout = timeout;
 
         // The way the pipe names are formed MUST match what is done in the controller main()
         // function, as it does not get any command line arguments when started as a daemon.  If
@@ -116,6 +126,7 @@ public class ProcessPipes {
             command.add(PERSIST_ARG + persistPipeName);
             command.add(PERSIST_IS_PIPE_ARG);
         }
+        command.add(TIMEOUT_ARG + timeout.getSeconds());
     }
 
     /**
@@ -123,24 +134,16 @@ public class ProcessPipes {
      * started to read from it</em>so that there is no risk of messages logged in between creation of the other pipes on the C++
      * side from blocking due to filling up the named pipe's buffer, and hence deadlocking communications between that process
      * and this JVM.
-     * @param timeout Needs to be long enough for the C++ process perform all startup tasks that precede creation of named pipes.
-     *                There should not be very many of these, so a short timeout should be fine.  However, at least a couple of
-     *                seconds is recommended due to the vagaries of process scheduling and the way VMs can completely stall for
-     *                some hypervisor actions.
      */
-    public void connectLogStream(Duration timeout) throws IOException {
+    public void connectLogStream() throws IOException {
         logStreamHandler = new CppLogMessageHandler(jobId, namedPipeHelper.openNamedPipeInputStream(logPipeName, timeout));
     }
 
     /**
      * Connect the other pipes created by the C++ process after the logging pipe has been connected.  This must be called after
      * the corresponding C++ process has been started, and after {@link #connectLogStream}.
-     * @param timeout Needs to be long enough for the C++ process perform all startup tasks that precede creation of named pipes.
-     *                There should not be very many of these, so a short timeout should be fine.  However, at least a couple of
-     *                seconds is recommended due to the vagaries of process scheduling and the way VMs can completely stall for
-     *                some hypervisor actions.
      */
-    public void connectOtherStreams(Duration timeout) throws IOException {
+    public void connectOtherStreams() throws IOException {
         assert logStreamHandler != null : "Must connect log stream before other streams";
         if (logStreamHandler == null) {
             throw new NullPointerException("Must connect log stream before other streams");
@@ -255,4 +258,8 @@ public class ProcessPipes {
         }
         return Optional.of(persistStream);
     }
+
+    public Duration getTimeout() {
+        return timeout;
+    }
 }

+ 6 - 26
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java

@@ -41,10 +41,7 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.is;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class NativeAutodetectProcessTests extends ESTestCase {
@@ -76,13 +73,14 @@ public class NativeAutodetectProcessTests extends ESTestCase {
         when(processPipes.getProcessOutStream()).thenReturn(Optional.of(outputStream));
         when(processPipes.getRestoreStream()).thenReturn(Optional.of(restoreStream));
         when(processPipes.getPersistStream()).thenReturn(Optional.of(persistStream));
+        when(processPipes.getTimeout()).thenReturn(Duration.ofSeconds(randomIntBetween(5, 100)));
     }
 
     @SuppressWarnings("unchecked")
     public void testProcessStartTime() throws Exception {
         try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class),
                 processPipes, NUMBER_FIELDS, null,
-                new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
+                new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
             process.start(executorService, mock(IndexingStateProcessor.class));
 
             ZonedDateTime startTime = process.getProcessStartTime();
@@ -100,7 +98,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
         String[] record = {"r1", "r2", "r3", "r4", "r5"};
         try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class),
                 processPipes, NUMBER_FIELDS, Collections.emptyList(),
-                new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
+                new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
             process.start(executorService, mock(IndexingStateProcessor.class));
 
             process.writeRecord(record);
@@ -130,7 +128,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
     public void testFlush() throws IOException {
         try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class),
                 processPipes, NUMBER_FIELDS, Collections.emptyList(),
-                new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
+                new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
             process.start(executorService, mock(IndexingStateProcessor.class));
 
             FlushJobParams params = FlushJobParams.builder().build();
@@ -159,8 +157,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
 
         try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class),
             processPipes, NUMBER_FIELDS, Collections.emptyList(),
-            new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class),
-            Duration.ZERO)) {
+            new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
 
             process.start(executorService);
             process.consumeAndCloseOutputStream();
@@ -168,28 +165,11 @@ public class NativeAutodetectProcessTests extends ESTestCase {
         }
     }
 
-    @SuppressWarnings("unchecked")
-    public void testPipeConnectTimeout() throws IOException {
-
-        int timeoutSeconds = randomIntBetween(5, 100);
-
-        try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class),
-            processPipes, NUMBER_FIELDS, Collections.emptyList(),
-            new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class),
-            Duration.ofSeconds(timeoutSeconds))) {
-
-            process.start(executorService);
-        }
-
-        verify(processPipes, times(1)).connectLogStream(eq(Duration.ofSeconds(timeoutSeconds)));
-        verify(processPipes, times(1)).connectOtherStreams(eq(Duration.ofSeconds(timeoutSeconds)));
-    }
-
     @SuppressWarnings("unchecked")
     private void testWriteMessage(CheckedConsumer<NativeAutodetectProcess> writeFunction, String expectedMessageCode) throws IOException {
         try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class),
                 processPipes, NUMBER_FIELDS, Collections.emptyList(),
-                new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
+                new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
             process.start(executorService, mock(IndexingStateProcessor.class));
 
             writeFunction.accept(process);

+ 1 - 2
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java

@@ -17,7 +17,6 @@ import org.junit.Before;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.time.Duration;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -188,7 +187,7 @@ public class AbstractNativeProcessTests extends ESTestCase {
     private class TestNativeProcess extends AbstractNativeProcess {
 
         TestNativeProcess() {
-            super("foo", nativeController, processPipes, 0, null, onProcessCrash, Duration.ZERO);
+            super("foo", nativeController, processPipes, 0, null, onProcessCrash);
         }
 
         @Override

+ 11 - 9
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessPipesTests.java

@@ -60,12 +60,13 @@ public class ProcessPipesTests extends ESTestCase {
         when(namedPipeHelper.openNamedPipeInputStream(contains("persist"), any(Duration.class)))
                 .thenReturn(new ByteArrayInputStream(PERSIST_BYTES));
 
-        ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, AutodetectBuilder.AUTODETECT, "my_job",
-                false, true, true, true, true);
+        int timeoutSeconds = randomIntBetween(5, 100);
+        ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(timeoutSeconds), AutodetectBuilder.AUTODETECT,
+            "my_job", false, true, true, true, true);
 
         List<String> command = new ArrayList<>();
         processPipes.addArgs(command);
-        assertEquals(9, command.size());
+        assertEquals(10, command.size());
         assertEquals(ProcessPipes.LOG_PIPE_ARG, command.get(0).substring(0, ProcessPipes.LOG_PIPE_ARG.length()));
         assertEquals(ProcessPipes.INPUT_ARG, command.get(1).substring(0, ProcessPipes.INPUT_ARG.length()));
         assertEquals(ProcessPipes.INPUT_IS_PIPE_ARG, command.get(2));
@@ -75,15 +76,16 @@ public class ProcessPipesTests extends ESTestCase {
         assertEquals(ProcessPipes.RESTORE_IS_PIPE_ARG, command.get(6));
         assertEquals(ProcessPipes.PERSIST_ARG, command.get(7).substring(0, ProcessPipes.PERSIST_ARG.length()));
         assertEquals(ProcessPipes.PERSIST_IS_PIPE_ARG, command.get(8));
+        assertEquals(ProcessPipes.TIMEOUT_ARG + timeoutSeconds, command.get(9));
 
-        processPipes.connectLogStream(Duration.ofSeconds(2));
+        processPipes.connectLogStream();
 
         CppLogMessageHandler logMessageHandler = processPipes.getLogStreamHandler();
         assertNotNull(logMessageHandler);
         logMessageHandler.tailStream();
         assertEquals(42, logMessageHandler.getPid(Duration.ZERO));
 
-        processPipes.connectOtherStreams(Duration.ofSeconds(2));
+        processPipes.connectOtherStreams();
 
         assertFalse(processPipes.getCommandStream().isPresent());
         assertTrue(processPipes.getProcessInStream().isPresent());
@@ -108,7 +110,7 @@ public class ProcessPipesTests extends ESTestCase {
         Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
         Environment env = TestEnvironment.newEnvironment(settings);
 
-        new ProcessPipes(env, namedPipeHelper, AutodetectBuilder.AUTODETECT, "my_job",
+        new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(2), AutodetectBuilder.AUTODETECT, "my_job",
                 true, true, true, true, true);
     }
 
@@ -135,11 +137,11 @@ public class ProcessPipesTests extends ESTestCase {
 
         Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
         Environment env = TestEnvironment.newEnvironment(settings);
-        ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, AutodetectBuilder.AUTODETECT, "my_job",
+        ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(2), AutodetectBuilder.AUTODETECT, "my_job",
                 true, true, true, true, true);
 
-        processPipes.connectLogStream(Duration.ofSeconds(2));
-        expectThrows(IOException.class, () -> processPipes.connectOtherStreams(Duration.ofSeconds(2)));
+        processPipes.connectLogStream();
+        expectThrows(IOException.class, processPipes::connectOtherStreams);
 
         // check the pipes successfully opened were then closed
         verify(logStream, times(1)).close();