ソースを参照

[ML] Close any opened pipes if there is an error connecting to the process (#44869)

David Kyle 6 年 前
コミット
a3983023f6

+ 42 - 12
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessPipes.java

@@ -127,23 +127,53 @@ public class ProcessPipes {
     public void connectStreams(Duration timeout) throws IOException {
         // The order here is important.  It must match the order that the C++ process tries to connect to the pipes, otherwise
         // a timeout is guaranteed.  Also change api::CIoManager in the C++ code if changing the order here.
-        if (logPipeName != null) {
-            logStream = namedPipeHelper.openNamedPipeInputStream(logPipeName, timeout);
+        try {
+            if (logPipeName != null) {
+                logStream = namedPipeHelper.openNamedPipeInputStream(logPipeName, timeout);
+            }
+            if (commandPipeName != null) {
+                commandStream = namedPipeHelper.openNamedPipeOutputStream(commandPipeName, timeout);
+            }
+            if (processInPipeName != null) {
+                processInStream = namedPipeHelper.openNamedPipeOutputStream(processInPipeName, timeout);
+            }
+            if (processOutPipeName != null) {
+                processOutStream = namedPipeHelper.openNamedPipeInputStream(processOutPipeName, timeout);
+            }
+            if (restorePipeName != null) {
+                restoreStream = namedPipeHelper.openNamedPipeOutputStream(restorePipeName, timeout);
+            }
+            if (persistPipeName != null) {
+                persistStream = namedPipeHelper.openNamedPipeInputStream(persistPipeName, timeout);
+            }
+        } catch (IOException ioe) {
+            try {
+                closeUnusedStreams();
+            } catch (IOException suppressed) {
+                ioe.addSuppressed(new IOException("Error closing process pipes", suppressed));
+            }
+            throw ioe;
         }
-        if (commandPipeName != null) {
-            commandStream = namedPipeHelper.openNamedPipeOutputStream(commandPipeName, timeout);
+    }
+
+    private void closeUnusedStreams() throws IOException {
+        if (logStream != null) {
+            logStream.close();
         }
-        if (processInPipeName != null) {
-            processInStream = namedPipeHelper.openNamedPipeOutputStream(processInPipeName, timeout);
+        if (commandStream != null) {
+            commandStream.close();
         }
-        if (processOutPipeName != null) {
-            processOutStream = namedPipeHelper.openNamedPipeInputStream(processOutPipeName, timeout);
+        if (processInStream != null) {
+            processInStream.close();
         }
-        if (restorePipeName != null) {
-            restoreStream = namedPipeHelper.openNamedPipeOutputStream(restorePipeName, timeout);
+        if (processOutStream != null) {
+            processOutStream.close();
         }
-        if (persistPipeName != null) {
-            persistStream = namedPipeHelper.openNamedPipeInputStream(persistPipeName, timeout);
+        if (restoreStream != null) {
+            restoreStream.close();
+        }
+        if (persistStream != null) {
+            persistStream.close();
         }
     }
 

+ 50 - 2
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessPipesTests.java

@@ -11,17 +11,21 @@ import org.elasticsearch.env.TestEnvironment;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectBuilder;
 import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
-import org.mockito.Mockito;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.contains;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ProcessPipesTests extends ESTestCase {
@@ -34,7 +38,7 @@ public class ProcessPipesTests extends ESTestCase {
         Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
         Environment env = TestEnvironment.newEnvironment(settings);
 
-        NamedPipeHelper namedPipeHelper = Mockito.mock(NamedPipeHelper.class);
+        NamedPipeHelper namedPipeHelper = mock(NamedPipeHelper.class);
         when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class)))
                 .thenReturn(new ByteArrayInputStream(LOG_BYTES));
         ByteArrayOutputStream commandStream = new ByteArrayOutputStream();
@@ -89,4 +93,48 @@ public class ProcessPipesTests extends ESTestCase {
         assertEquals(6, processPipes.getPersistStream().get().read());
     }
 
+    public void testCloseUnusedPipes_notConnected() throws IOException {
+        NamedPipeHelper namedPipeHelper = mock(NamedPipeHelper.class);
+        Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
+        Environment env = TestEnvironment.newEnvironment(settings);
+
+        ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, AutodetectBuilder.AUTODETECT, "my_job",
+                true, true, true, true, true, true);
+    }
+
+    public void testCloseOpenedPipesOnError() throws IOException {
+
+        NamedPipeHelper namedPipeHelper = mock(NamedPipeHelper.class);
+        InputStream logStream = mock(InputStream.class);
+        when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class)))
+                .thenReturn(logStream);
+        OutputStream commandStream = mock(OutputStream.class);
+        when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class)))
+                .thenReturn(commandStream);
+        OutputStream processInStream = mock(OutputStream.class);
+        when(namedPipeHelper.openNamedPipeOutputStream(contains("input"), any(Duration.class)))
+                .thenReturn(processInStream);
+        InputStream processOutStream = mock(InputStream.class);
+        when(namedPipeHelper.openNamedPipeInputStream(contains("output"), any(Duration.class)))
+                .thenReturn(processOutStream);
+        OutputStream restoreStream = mock(OutputStream.class);
+        when(namedPipeHelper.openNamedPipeOutputStream(contains("restore"), any(Duration.class)))
+                .thenReturn(restoreStream);
+        // opening this pipe will throw
+        when(namedPipeHelper.openNamedPipeInputStream(contains("persist"), any(Duration.class))).thenThrow(new IOException());
+
+        Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
+        Environment env = TestEnvironment.newEnvironment(settings);
+        ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, AutodetectBuilder.AUTODETECT, "my_job",
+                true, true, true, true, true, true);
+
+        expectThrows(IOException.class, () -> processPipes.connectStreams(Duration.ofSeconds(2)));
+
+        // check the pipes successfully opened were then closed
+        verify(logStream, times(1)).close();
+        verify(commandStream, times(1)).close();
+        verify(processInStream, times(1)).close();
+        verify(processOutStream, times(1)).close();
+        verify(restoreStream, times(1)).close();
+    }
 }