Browse Source

Pass processConnectTimeout to the method that fetches C++ process' PID (#50276)

Przemysław Witek 5 years ago
parent
commit
114d8ef441

+ 3 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java

@@ -15,6 +15,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
@@ -28,10 +29,10 @@ abstract class AbstractNativeAnalyticsProcess<Result> extends AbstractNativeProc
     protected AbstractNativeAnalyticsProcess(String name, ConstructingObjectParser<Result, Void> resultParser, String jobId,
                                              NativeController nativeController, InputStream logStream, OutputStream processInStream,
                                              InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields,
-                                             List<Path> filesToDelete, Consumer<String> onProcessCrash,
+                                             List<Path> filesToDelete, Consumer<String> onProcessCrash, Duration processConnectTimeout,
                                              NamedXContentRegistry namedXContentRegistry) {
         super(jobId, nativeController, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete,
-            onProcessCrash);
+            onProcessCrash, processConnectTimeout);
         this.name = Objects.requireNonNull(name);
         this.resultsParser = new ProcessResultsParser<>(Objects.requireNonNull(resultParser), namedXContentRegistry);
     }

+ 4 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcess.java

@@ -15,6 +15,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.Consumer;
@@ -27,10 +28,10 @@ public class NativeAnalyticsProcess extends AbstractNativeAnalyticsProcess<Analy
 
     protected NativeAnalyticsProcess(String jobId, NativeController nativeController, InputStream logStream, OutputStream processInStream,
                                      InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields,
-                                     List<Path> filesToDelete, Consumer<String> onProcessCrash, AnalyticsProcessConfig config,
-                                     NamedXContentRegistry namedXContentRegistry) {
+                                     List<Path> filesToDelete, Consumer<String> onProcessCrash, Duration processConnectTimeout,
+                                     AnalyticsProcessConfig config, NamedXContentRegistry namedXContentRegistry) {
         super(NAME, AnalyticsResult.PARSER, jobId, nativeController, logStream, processInStream, processOutStream, processRestoreStream,
-            numberOfFields, filesToDelete, onProcessCrash, namedXContentRegistry);
+            numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout, namedXContentRegistry);
         this.config = Objects.requireNonNull(config);
     }
 

+ 5 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java

@@ -82,10 +82,11 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory<An
 
         createNativeProcess(jobId, analyticsProcessConfig, filesToDelete, processPipes);
 
-        NativeAnalyticsProcess analyticsProcess = new NativeAnalyticsProcess(jobId, nativeController, processPipes.getLogStream().get(),
-                processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(),
-                processPipes.getRestoreStream().orElse(null), numberOfFields, filesToDelete, onProcessCrash, analyticsProcessConfig,
-                namedXContentRegistry);
+        NativeAnalyticsProcess analyticsProcess =
+            new NativeAnalyticsProcess(
+                jobId, nativeController, processPipes.getLogStream().get(), processPipes.getProcessInStream().get(),
+                processPipes.getProcessOutStream().get(), processPipes.getRestoreStream().orElse(null), numberOfFields, filesToDelete,
+                onProcessCrash, processConnectTimeout, analyticsProcessConfig, namedXContentRegistry);
 
         try {
             startProcess(config, executorService, processPipes, analyticsProcess);

+ 3 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcess.java

@@ -13,6 +13,7 @@ import org.elasticsearch.xpack.ml.process.NativeController;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.List;
 import java.util.function.Consumer;
 
@@ -23,9 +24,9 @@ public class NativeMemoryUsageEstimationProcess extends AbstractNativeAnalyticsP
     protected NativeMemoryUsageEstimationProcess(String jobId, NativeController nativeController, InputStream logStream,
                                                  OutputStream processInStream, InputStream processOutStream,
                                                  OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
-                                                 Consumer<String> onProcessCrash) {
+                                                 Consumer<String> onProcessCrash, Duration processConnectTimeout) {
         super(NAME, MemoryUsageEstimationResult.PARSER, jobId, nativeController, logStream, processInStream, processOutStream,
-            processRestoreStream, numberOfFields, filesToDelete, onProcessCrash, NamedXContentRegistry.EMPTY);
+            processRestoreStream, numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout, NamedXContentRegistry.EMPTY);
     }
 
     @Override

+ 2 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java

@@ -76,7 +76,8 @@ public class NativeMemoryUsageEstimationProcessFactory implements AnalyticsProce
             null,
             0,
             filesToDelete,
-            onProcessCrash);
+            onProcessCrash,
+            processConnectTimeout);
 
         try {
             process.start(executorService);

+ 4 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java

@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.Iterator;
 import java.util.List;
 import java.util.function.Consumer;
@@ -44,9 +45,10 @@ class NativeAutodetectProcess extends AbstractNativeProcess implements Autodetec
 
     NativeAutodetectProcess(String jobId, NativeController nativeController, InputStream logStream, OutputStream processInStream,
                             InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
-                            ProcessResultsParser<AutodetectResult> resultsParser, Consumer<String> onProcessCrash) {
+                            ProcessResultsParser<AutodetectResult> resultsParser, Consumer<String> onProcessCrash,
+                            Duration processConnectTimeout) {
         super(jobId, nativeController, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete,
-            onProcessCrash);
+            onProcessCrash, processConnectTimeout);
         this.resultsParser = resultsParser;
     }
 

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java

@@ -91,7 +91,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
         NativeAutodetectProcess autodetect = new NativeAutodetectProcess(
                 job.getId(), nativeController, processPipes.getLogStream().get(), processPipes.getProcessInStream().get(),
                 processPipes.getProcessOutStream().get(), processPipes.getRestoreStream().orElse(null), numberOfFields,
-                filesToDelete, resultsParser, onProcessCrash);
+                filesToDelete, resultsParser, onProcessCrash, processConnectTimeout);
         try {
             autodetect.start(executorService, stateProcessor, processPipes.getPersistStream().get());
             return autodetect;

+ 4 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java

@@ -11,6 +11,7 @@ import org.elasticsearch.xpack.ml.process.NativeController;
 
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.time.Duration;
 import java.util.Collections;
 
 /**
@@ -21,8 +22,9 @@ class NativeNormalizerProcess extends AbstractNativeProcess implements Normalize
     private static final String NAME = "normalizer";
 
     NativeNormalizerProcess(String jobId, NativeController nativeController, InputStream logStream, OutputStream processInStream,
-                            InputStream processOutStream) {
-        super(jobId, nativeController, logStream, processInStream, processOutStream, null, 0, Collections.emptyList(), (ignore) -> {});
+                            InputStream processOutStream, Duration processConnectTimeout) {
+        super(jobId, nativeController, logStream, processInStream, processOutStream, null, 0, Collections.emptyList(), (ignore) -> {},
+            processConnectTimeout);
     }
 
     @Override

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java

@@ -53,7 +53,7 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory
         createNativeProcess(jobId, quantilesState, processPipes, bucketSpan);
 
         NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, nativeController, processPipes.getLogStream().get(),
-                processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get());
+                processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), processConnectTimeout);
 
         try {
             normalizerProcess.start(executorService);

+ 8 - 7
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java

@@ -52,6 +52,7 @@ public abstract class AbstractNativeProcess implements NativeProcess {
     private final int numberOfFields;
     private final List<Path> filesToDelete;
     private final Consumer<String> onProcessCrash;
+    private final Duration processConnectTimeout;
     private volatile Future<?> logTailFuture;
     private volatile Future<?> stateProcessorFuture;
     private volatile boolean processCloseInitiated;
@@ -60,18 +61,19 @@ public abstract class AbstractNativeProcess implements NativeProcess {
 
     protected AbstractNativeProcess(String jobId, NativeController nativeController, InputStream logStream, OutputStream processInStream,
                                     InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields,
-                                    List<Path> filesToDelete, Consumer<String> onProcessCrash) {
+                                    List<Path> filesToDelete, Consumer<String> onProcessCrash, Duration processConnectTimeout) {
         this.jobId = jobId;
         this.nativeController = nativeController;
-        cppLogHandler = new CppLogMessageHandler(jobId, logStream);
+        this.cppLogHandler = new CppLogMessageHandler(jobId, logStream);
         this.processInStream = processInStream != null ? new BufferedOutputStream(processInStream) : null;
         this.processOutStream = processOutStream;
         this.processRestoreStream = processRestoreStream;
         this.recordWriter = new LengthEncodedWriter(this.processInStream);
-        startTime = ZonedDateTime.now();
+        this.startTime = ZonedDateTime.now();
         this.numberOfFields = numberOfFields;
         this.filesToDelete = filesToDelete;
         this.onProcessCrash = Objects.requireNonNull(onProcessCrash);
+        this.processConnectTimeout = Objects.requireNonNull(processConnectTimeout);
     }
 
     public abstract String getName();
@@ -197,10 +199,9 @@ public abstract class AbstractNativeProcess implements NativeProcess {
         LOGGER.debug("[{}] Killing {} process", jobId, getName());
         processKilled = true;
         try {
-            // The PID comes via the processes log stream.  We don't wait for it to arrive here,
-            // but if the wait times out it implies the process has only just started, in which
-            // case it should die very quickly when we close its input stream.
-            nativeController.killProcess(cppLogHandler.getPid(Duration.ZERO));
+            // The PID comes via the processes log stream. We do wait here to give the process the time to start up and report its PID.
+            // Without the PID we cannot kill the process.
+            nativeController.killProcess(cppLogHandler.getPid(processConnectTimeout));
 
             // Wait for the process to die before closing processInStream as if the process
             // is still alive when processInStream is closed it may start persisting state

+ 7 - 5
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java

@@ -26,6 +26,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.time.ZonedDateTime;
 import java.time.temporal.ChronoUnit;
 import java.util.Collections;
@@ -63,7 +64,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
         try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream,
                 mock(OutputStream.class), outputStream, mock(OutputStream.class),
                 NUMBER_FIELDS, null,
-                new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
+                new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
             process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
 
             ZonedDateTime startTime = process.getProcessStartTime();
@@ -86,7 +87,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
         ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
         try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream,
                 bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
-                new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
+                new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
             process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
 
             process.writeRecord(record);
@@ -121,7 +122,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
         ByteArrayOutputStream bos = new ByteArrayOutputStream(AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH + 1024);
         try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream,
                 bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
-                new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
+                new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
             process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
 
             FlushJobParams params = FlushJobParams.builder().build();
@@ -155,7 +156,8 @@ public class NativeAutodetectProcessTests extends ESTestCase {
 
         try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream,
             processInStream, processOutStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
-            new ProcessResultsParser<AutodetectResult>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
+            new ProcessResultsParser<AutodetectResult>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class),
+            Duration.ZERO)) {
 
             process.consumeAndCloseOutputStream();
             assertThat(processOutStream.available(), equalTo(0));
@@ -171,7 +173,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
         ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
         try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream,
                 bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
-                new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
+                new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
             process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
 
             writeFunction.accept(process);

+ 2 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java

@@ -16,6 +16,7 @@ import org.junit.Before;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.time.Duration;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -142,7 +143,7 @@ public class AbstractNativeProcessTests extends ESTestCase {
     private class TestNativeProcess extends AbstractNativeProcess {
 
         TestNativeProcess(OutputStream inputStream) {
-            super("foo", nativeController, logStream, inputStream, outputStream, restoreStream, 0, null, onProcessCrash);
+            super("foo", nativeController, logStream, inputStream, outputStream, restoreStream, 0, null, onProcessCrash, Duration.ZERO);
         }
 
         @Override