Browse Source

[ML] Fix thread leak when waiting for job flush (#32196) (#32541)

Benjamin Trent 7 years ago
parent
commit
9fb790dcc3

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java

@@ -270,7 +270,7 @@ public class AutodetectCommunicator implements Closeable {
     }
 
     @Nullable
-    FlushAcknowledgement waitFlushToCompletion(String flushId) {
+    FlushAcknowledgement waitFlushToCompletion(String flushId) throws InterruptedException {
         LOGGER.debug("[{}] waiting for flush", job.getId());
 
         FlushAcknowledgement flushAcknowledgement;

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java

@@ -485,7 +485,7 @@ public class AutoDetectResultProcessor {
      * @return The {@link FlushAcknowledgement} if the flush has completed or the parsing finished; {@code null} if the timeout expired
      */
     @Nullable
-    public FlushAcknowledgement waitForFlushAcknowledgement(String flushId, Duration timeout) {
+    public FlushAcknowledgement waitForFlushAcknowledgement(String flushId, Duration timeout) throws InterruptedException {
         return failed ? null : flushListener.waitForFlush(flushId, timeout);
     }
 

+ 3 - 7
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java

@@ -22,18 +22,14 @@ class FlushListener {
     final AtomicBoolean cleared = new AtomicBoolean(false);
 
     @Nullable
-    FlushAcknowledgement waitForFlush(String flushId, Duration timeout) {
+    FlushAcknowledgement waitForFlush(String flushId, Duration timeout) throws InterruptedException {
         if (cleared.get()) {
             return null;
         }
 
         FlushAcknowledgementHolder holder = awaitingFlushed.computeIfAbsent(flushId, (key) -> new FlushAcknowledgementHolder(flushId));
-        try {
-            if (holder.latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
-                return holder.flushAcknowledgement;
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
+        if (holder.latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
+            return holder.flushAcknowledgement;
         }
         return null;
     }

+ 3 - 3
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java

@@ -108,7 +108,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
         verifyNoMoreInteractions(process);
     }
 
-    public void testFlushJob() throws IOException {
+    public void testFlushJob() throws IOException, InterruptedException {
         AutodetectProcess process = mockAutodetectProcessWithOutputStream();
         when(process.isProcessAlive()).thenReturn(true);
         AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class);
@@ -123,7 +123,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
         }
     }
 
-    public void testWaitForFlushReturnsIfParserFails() throws IOException {
+    public void testWaitForFlushReturnsIfParserFails() throws IOException, InterruptedException {
         AutodetectProcess process = mockAutodetectProcessWithOutputStream();
         when(process.isProcessAlive()).thenReturn(true);
         AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class);
@@ -144,7 +144,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
         assertEquals("[foo] Unexpected death of autodetect: Mock process is dead", holder[0].getMessage());
     }
 
-    public void testFlushJob_givenFlushWaitReturnsTrueOnSecondCall() throws IOException {
+    public void testFlushJob_givenFlushWaitReturnsTrueOnSecondCall() throws IOException, InterruptedException {
         AutodetectProcess process = mockAutodetectProcessWithOutputStream();
         when(process.isProcessAlive()).thenReturn(true);
         AutoDetectResultProcessor autoDetectResultProcessor = Mockito.mock(AutoDetectResultProcessor.class);

+ 1 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java

@@ -514,7 +514,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         verify(persister, times(2)).persistModelSnapshot(any(), eq(WriteRequest.RefreshPolicy.IMMEDIATE));
     }
 
-    public void testParsingErrorSetsFailed() {
+    public void testParsingErrorSetsFailed() throws InterruptedException {
         @SuppressWarnings("unchecked")
         Iterator<AutodetectResult> iterator = mock(Iterator.class);
         when(iterator.hasNext()).thenThrow(new ElasticsearchParseException("this test throws"));

+ 12 - 4
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java

@@ -22,8 +22,12 @@ public class FlushListenerTests extends ESTestCase {
         FlushListener listener = new FlushListener();
         AtomicReference<FlushAcknowledgement> flushAcknowledgementHolder = new AtomicReference<>();
         new Thread(() -> {
-            FlushAcknowledgement flushAcknowledgement = listener.waitForFlush("_id", Duration.ofMillis(10000));
-            flushAcknowledgementHolder.set(flushAcknowledgement);
+            try {
+                FlushAcknowledgement flushAcknowledgement = listener.waitForFlush("_id", Duration.ofMillis(10000));
+                flushAcknowledgementHolder.set(flushAcknowledgement);
+            } catch (InterruptedException _ex) {
+                Thread.currentThread().interrupt();
+            }
         }).start();
         assertBusy(() -> assertTrue(listener.awaitingFlushed.containsKey("_id")));
         assertNull(flushAcknowledgementHolder.get());
@@ -46,8 +50,12 @@ public class FlushListenerTests extends ESTestCase {
             AtomicReference<FlushAcknowledgement> flushAcknowledgementHolder = new AtomicReference<>();
             flushAcknowledgementHolders.add(flushAcknowledgementHolder);
             new Thread(() -> {
-                FlushAcknowledgement flushAcknowledgement = listener.waitForFlush(String.valueOf(id), Duration.ofMillis(10000));
-                flushAcknowledgementHolder.set(flushAcknowledgement);
+                try {
+                    FlushAcknowledgement flushAcknowledgement = listener.waitForFlush(String.valueOf(id), Duration.ofMillis(10000));
+                    flushAcknowledgementHolder.set(flushAcknowledgement);
+                } catch (InterruptedException _ex) {
+                    Thread.currentThread().interrupt();
+                }
             }).start();
         }
         assertBusy(() -> assertEquals(numWaits, listener.awaitingFlushed.size()));