Browse Source

[ML] Close results stream before data frame analytics job stops (#67854)

Investigating the failures in #67581 it looked like after restarting
the regression job the process was started but no data was loaded.
So the process was getting stuck waiting for data.

Looking into the code it looks like this can be explained by the
fact that AnalyticsResultProcessor counts down its completion
latch before it closes the results stream. This means the job
may go to stopped state while the out stream is still alive,
which on windows results to the directory with the named pipes
staying around. Then when the job is started again, which the
test does immediately, the old pipes are used and thus the
data is not sent to the the new process.

This commit fixes this by ensuring the process output stream
is consumed and closed when the anylytics process is closed.

Fixes #67581
Dimitris Athanasiou 4 years ago
parent
commit
63c85ff3e8

+ 0 - 1
x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java

@@ -481,7 +481,6 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
             "classification_training_percent_is_50_boolean", BOOLEAN_FIELD, BOOLEAN_FIELD_VALUES, "boolean");
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/67581")
     public void testStopAndRestart() throws Exception {
         initialize("classification_stop_and_restart");
         String predictedClassField = KEYWORD_FIELD + "_prediction";

+ 0 - 13
x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java

@@ -69,23 +69,10 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
     private String sourceIndex;
     private String destIndex;
 
-    @Before
-    public void setupLogging() {
-        client().admin().cluster()
-            .prepareUpdateSettings()
-            .setTransientSettings(Settings.builder()
-                .put("logger.org.elasticsearch.xpack.ml.dataframe", "DEBUG"))
-            .get();
-    }
 
     @After
     public void cleanup() {
         cleanUp();
-        client().admin().cluster()
-            .prepareUpdateSettings()
-            .setTransientSettings(Settings.builder()
-                .putNull("logger.org.elasticsearch.xpack.ml.dataframe"))
-            .get();
     }
 
     @Override

+ 15 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java

@@ -57,4 +57,19 @@ abstract class AbstractNativeAnalyticsProcess<Result> extends AbstractNativeProc
     public Iterator<Result> readAnalyticsResults() {
         return resultsParser.parseResults(processOutStream());
     }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            super.close();
+        } finally {
+            // Unlike autodetect where closing the process input stream initiates
+            // termination and additional output from the process which forces us
+            // to close the output stream after we've finished processing its results,
+            // in analytics we wait until we've read all results and then we close the
+            // process. Thus, we can take care of consuming and closing the output
+            // stream within close itself.
+            consumeAndCloseOutputStream();
+        }
+    }
 }

+ 0 - 8
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcess.java

@@ -25,14 +25,6 @@ public interface AnalyticsProcess<ProcessResult> extends NativeProcess {
      */
     Iterator<ProcessResult> readAnalyticsResults();
 
-    /**
-     * Read anything left in the stream before
-     * closing the stream otherwise if the process
-     * tries to write more after the close it gets
-     * a SIGPIPE
-     */
-    void consumeAndCloseOutputStream();
-
     /**
      *
      * @return the process config

+ 0 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java

@@ -117,7 +117,6 @@ public class AnalyticsResultProcessor {
                 completeResultsProgress();
             }
             completionLatch.countDown();
-            process.consumeAndCloseOutputStream();
         }
     }
 

+ 0 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManager.java

@@ -98,7 +98,6 @@ public class MemoryUsageEstimationProcessManager {
                     jobId, e.getMessage(), process.readError()).getFormattedMessage();
             throw ExceptionsHelper.serverError(errorMsg, e);
         } finally {
-            process.consumeAndCloseOutputStream();
             try {
                 LOGGER.debug("[{}] Closing process", jobId);
                 process.close();

+ 2 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java

@@ -313,12 +313,11 @@ public abstract class AbstractNativeProcess implements NativeProcess {
     }
 
     public void consumeAndCloseOutputStream() {
-        try {
+        try (InputStream outStream = processOutStream()) {
             byte[] buff = new byte[512];
-            while (processOutStream().read(buff) >= 0) {
+            while (outStream.read(buff) >= 0) {
                 // Do nothing
             }
-            processOutStream().close();
         } catch (IOException e) {
             // Given we are closing down the process there is no point propagating IO exceptions here
         }

+ 0 - 6
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManagerTests.java

@@ -108,7 +108,6 @@ public class MemoryUsageEstimationProcessManagerTests extends ESTestCase {
         InOrder inOrder = inOrder(process);
         inOrder.verify(process).readAnalyticsResults();
         inOrder.verify(process).readError();
-        inOrder.verify(process).consumeAndCloseOutputStream();
         inOrder.verify(process).close();
         verifyNoMoreInteractions(process, listener);
     }
@@ -127,7 +126,6 @@ public class MemoryUsageEstimationProcessManagerTests extends ESTestCase {
         InOrder inOrder = inOrder(process);
         inOrder.verify(process).readAnalyticsResults();
         inOrder.verify(process).readError();
-        inOrder.verify(process).consumeAndCloseOutputStream();
         inOrder.verify(process).close();
         verifyNoMoreInteractions(process, listener);
     }
@@ -146,7 +144,6 @@ public class MemoryUsageEstimationProcessManagerTests extends ESTestCase {
         InOrder inOrder = inOrder(process);
         inOrder.verify(process).readAnalyticsResults();
         inOrder.verify(process).readError();
-        inOrder.verify(process).consumeAndCloseOutputStream();
         inOrder.verify(process).close();
         verifyNoMoreInteractions(process, listener);
     }
@@ -164,7 +161,6 @@ public class MemoryUsageEstimationProcessManagerTests extends ESTestCase {
 
         InOrder inOrder = inOrder(process);
         inOrder.verify(process).readAnalyticsResults();
-        inOrder.verify(process).consumeAndCloseOutputStream();
         inOrder.verify(process).close();
         inOrder.verify(process).readError();
         verifyNoMoreInteractions(process, listener);
@@ -186,7 +182,6 @@ public class MemoryUsageEstimationProcessManagerTests extends ESTestCase {
 
         InOrder inOrder = inOrder(process);
         inOrder.verify(process).readAnalyticsResults();
-        inOrder.verify(process).consumeAndCloseOutputStream();
         inOrder.verify(process).close();
         inOrder.verify(process).readError();
         verifyNoMoreInteractions(process, listener);
@@ -201,7 +196,6 @@ public class MemoryUsageEstimationProcessManagerTests extends ESTestCase {
 
         InOrder inOrder = inOrder(process);
         inOrder.verify(process).readAnalyticsResults();
-        inOrder.verify(process).consumeAndCloseOutputStream();
         inOrder.verify(process).close();
         verifyNoMoreInteractions(process, listener);
     }