[ML] fix two datafeed flush lockup bugs (#46982)

* [ML] fix two flush lockup bugs

* Addressing PR comments

* moving debug logging line so it is only written on success
Benjamin Trent, 6 years ago
parent commit 7935d88484
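
Both lockups have the same shape: a thread waits on a flush that is only signalled on the success path. In the first bug, an exception thrown while bulk persisting and committing results in AutodetectResultProcessor skipped flushListener.acknowledgeFlush(...), so waitForFlushAcknowledgement(...) simply blocked until its timeout expired. In the second, a failed renormalization released the semaphore in ShortCircuitingRenormalizer.forceFinishWork() but never counted down the latches of the queued quantiles, which could leave callers of waitUntilRenormalizerIsIdle() hanging. A minimal sketch of the first failure mode and its try/finally remedy, using hypothetical names rather than the classes changed below:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    // Hypothetical sketch of the lockup: if the result-processing thread throws before
    // signalling the flush waiter, the waiter blocks for its full timeout even though
    // no acknowledgement can ever arrive. The remedy is to signal in a finally block
    // and hand the failure over so the waiter can rethrow it.
    class FlushLockupSketch {
        private final CountDownLatch flushAcked = new CountDownLatch(1);
        private volatile Exception failure;

        // producer side (result processor thread)
        void onFlushResult(Runnable persistAndCommit) {
            try {
                persistAndCommit.run();      // may throw, e.g. a bulk persist failure
            } catch (Exception e) {
                failure = e;
                throw e;
            } finally {
                flushAcked.countDown();      // always unblock the waiter
            }
        }

        // consumer side (flush API thread)
        boolean waitForFlush(long timeoutMillis) throws Exception {
            if (flushAcked.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                if (failure != null) {
                    throw failure;           // fail fast instead of waiting out the timeout
                }
                return true;
            }
            return false;                    // timed out
        }
    }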

+ 2 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java

@@ -284,7 +284,7 @@ public class AutodetectCommunicator implements Closeable {
     }
 
     @Nullable
-    FlushAcknowledgement waitFlushToCompletion(String flushId) throws InterruptedException {
+    FlushAcknowledgement waitFlushToCompletion(String flushId) throws Exception {
         LOGGER.debug("[{}] waiting for flush", job.getId());
 
         FlushAcknowledgement flushAcknowledgement;
@@ -300,6 +300,7 @@ public class AutodetectCommunicator implements Closeable {
         }
 
         if (processKilled == false) {
+            LOGGER.debug("[{}] Initial flush completed, waiting until renormalizer is idle.", job.getId());
             // We also have to wait for the normalizer to become idle so that we block
             // clients from querying results in the middle of normalization.
             autodetectResultProcessor.waitUntilRenormalizerIsIdle();

+ 16 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java

@@ -288,9 +288,21 @@ public class AutodetectResultProcessor {
             // Commit previous writes here, effectively continuing
             // the flush from the C++ autodetect process right
             // through to the data store
-            bulkResultsPersister.executeRequest();
-            persister.commitResultWrites(jobId);
-            flushListener.acknowledgeFlush(flushAcknowledgement);
+            Exception exception = null;
+            try {
+                bulkResultsPersister.executeRequest();
+                persister.commitResultWrites(jobId);
+                LOGGER.debug("[{}] Flush acknowledgement sent to listener for ID {}", jobId, flushAcknowledgement.getId());
+            } catch (Exception e) {
+                LOGGER.error(
+                    "[" + jobId + "] failed to bulk persist results and commit writes during flush acknowledgement for ID " +
+                        flushAcknowledgement.getId(),
+                    e);
+                exception = e;
+                throw e;
+            } finally {
+                flushListener.acknowledgeFlush(flushAcknowledgement, exception);
+            }
             // Interim results may have been produced by the flush,
             // which need to be
             // deleted when the next finalized results come through
@@ -391,7 +403,7 @@ public class AutodetectResultProcessor {
      * @return The {@link FlushAcknowledgement} if the flush has completed or the parsing finished; {@code null} if the timeout expired
      */
     @Nullable
-    public FlushAcknowledgement waitForFlushAcknowledgement(String flushId, Duration timeout) throws InterruptedException {
+    public FlushAcknowledgement waitForFlushAcknowledgement(String flushId, Duration timeout) throws Exception {
         return failed ? null : flushListener.waitForFlush(flushId, timeout);
     }
 

+ 7 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java

@@ -28,25 +28,29 @@ class FlushListener {
     });
 
     @Nullable
-    FlushAcknowledgement waitForFlush(String flushId, Duration timeout) throws InterruptedException {
+    FlushAcknowledgement waitForFlush(String flushId, Duration timeout) throws Exception {
         if (onClear.hasRun()) {
             return null;
         }
 
         FlushAcknowledgementHolder holder = awaitingFlushed.computeIfAbsent(flushId, (key) -> new FlushAcknowledgementHolder(flushId));
         if (holder.latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
+            if (holder.flushException != null) {
+                throw holder.flushException;
+            }
             return holder.flushAcknowledgement;
         }
         return null;
     }
 
-    void acknowledgeFlush(FlushAcknowledgement flushAcknowledgement) {
+    void acknowledgeFlush(FlushAcknowledgement flushAcknowledgement, @Nullable Exception exception) {
         // acknowledgeFlush(...) could be called before waitForFlush(...)
         // a flush api call writes a flush command to the analytical process and then via a different thread the
         // result reader then reads whether the flush has been acked.
         String flushId = flushAcknowledgement.getId();
         FlushAcknowledgementHolder holder = awaitingFlushed.computeIfAbsent(flushId, (key) -> new FlushAcknowledgementHolder(flushId));
         holder.flushAcknowledgement = flushAcknowledgement;
+        holder.flushException = exception;
         holder.latch.countDown();
     }
 
@@ -62,6 +66,7 @@ class FlushListener {
 
         private final CountDownLatch latch;
         private volatile FlushAcknowledgement flushAcknowledgement;
+        private volatile Exception flushException;
 
         private FlushAcknowledgementHolder(String flushId) {
             this.flushAcknowledgement = new FlushAcknowledgement(flushId, null);
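
The FlushAcknowledgementHolder now carries either an acknowledgement or a failure, and waitForFlush(...) rethrows the stored failure instead of making the caller wait out the timeout. A compact sketch of this value-or-exception hand-off, with hypothetical names (this is not the FlushListener source):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    // One producer, one waiter: the producer stores either a result or a failure
    // before counting the latch down, and the waiter rethrows the failure.
    class ResultOrFailureSketch<T> {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile T result;
        private volatile Exception failure;

        void complete(T value, Exception exception) {
            this.result = value;
            this.failure = exception;
            latch.countDown();
        }

        T await(long timeoutMillis) throws Exception {
            if (latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                if (failure != null) {
                    throw failure;
                }
                return result;
            }
            return null;   // timed out
        }
    }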

+ 20 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizer.java

@@ -122,7 +122,26 @@ public class ShortCircuitingRenormalizer implements Renormalizer {
     }
 
     private void forceFinishWork() {
-        semaphore.release();
+        // We cannot allow new quantiles to be added while we are cleaning up after a failed renormalization.
+        synchronized (quantilesDeque) {
+            // We discard all but the earliest quantiles, if they exist
+            QuantilesWithLatch earliestQuantileWithLatch = null;
+            for (QuantilesWithLatch quantilesWithLatch = quantilesDeque.pollFirst(); quantilesWithLatch != null;
+                 quantilesWithLatch = quantilesDeque.pollFirst()) {
+                if (earliestQuantileWithLatch == null) {
+                    earliestQuantileWithLatch = quantilesWithLatch;
+                }
+                // Count down all the latches as they no longer matter since we failed
+                quantilesWithLatch.latch.countDown();
+            }
+            // Keep the earliest quantiles so that the next call to doRenormalizations() will include as much of the failed
+            // normalization window as possible.
+            // Since this latch has already been counted down, there is no reason to put it in the `latchDeque` again.
+            if (earliestQuantileWithLatch != null) {
+                quantilesDeque.addLast(earliestQuantileWithLatch);
+            }
+            semaphore.release();
+        }
     }
 
     private void doRenormalizations() {
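
forceFinishWork() now applies the same idea on the renormalizer side: count down every queued latch so idle waiters are released, keep only the earliest queued quantiles so the next run still covers the failed window, and then release the work permit. A self-contained sketch of that pattern, with hypothetical names (this is not the ShortCircuitingRenormalizer source):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.Semaphore;

    // Drain the queue, releasing every waiter, but keep the earliest item so it can
    // be retried on the next run.
    class ForceFinishSketch {
        static final class WorkWithLatch {
            final String quantiles;                       // stand-in for the real payload
            final CountDownLatch latch = new CountDownLatch(1);
            WorkWithLatch(String quantiles) { this.quantiles = quantiles; }
        }

        private final Deque<WorkWithLatch> queue = new ArrayDeque<>();
        private final Semaphore permit = new Semaphore(1);

        void forceFinishWork() {
            synchronized (queue) {                        // block concurrent enqueues while failing
                WorkWithLatch earliest = null;
                for (WorkWithLatch item = queue.pollFirst(); item != null; item = queue.pollFirst()) {
                    if (earliest == null) {
                        earliest = item;
                    }
                    item.latch.countDown();               // waiters on discarded work are released
                }
                if (earliest != null) {
                    queue.addLast(earliest);              // retried by the next renormalization run
                }
                permit.release();                         // release the single work permit
            }
        }
    }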

+ 3 - 3
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java

@@ -108,7 +108,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
         verifyNoMoreInteractions(process);
     }
 
-    public void testFlushJob() throws IOException, InterruptedException {
+    public void testFlushJob() throws Exception {
         AutodetectProcess process = mockAutodetectProcessWithOutputStream();
         when(process.isProcessAlive()).thenReturn(true);
         AutodetectResultProcessor processor = mock(AutodetectResultProcessor.class);
@@ -123,7 +123,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
         }
     }
 
-    public void testWaitForFlushReturnsIfParserFails() throws IOException, InterruptedException {
+    public void testWaitForFlushReturnsIfParserFails() throws Exception {
         AutodetectProcess process = mockAutodetectProcessWithOutputStream();
         when(process.isProcessAlive()).thenReturn(true);
         AutodetectResultProcessor processor = mock(AutodetectResultProcessor.class);
@@ -144,7 +144,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
         assertEquals("[foo] Unexpected death of autodetect: Mock process is dead", holder[0].getMessage());
     }
 
-    public void testFlushJob_givenFlushWaitReturnsTrueOnSecondCall() throws IOException, InterruptedException {
+    public void testFlushJob_givenFlushWaitReturnsTrueOnSecondCall() throws Exception {
         AutodetectProcess process = mockAutodetectProcessWithOutputStream();
         when(process.isProcessAlive()).thenReturn(true);
         AutodetectResultProcessor autodetectResultProcessor = Mockito.mock(AutodetectResultProcessor.class);

+ 3 - 3
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java

@@ -220,7 +220,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         assertTrue(processorUnderTest.isDeleteInterimRequired());
 
         verify(persister).bulkPersisterBuilder(JOB_ID);
-        verify(flushListener).acknowledgeFlush(flushAcknowledgement);
+        verify(flushListener).acknowledgeFlush(flushAcknowledgement, null);
         verify(persister).commitResultWrites(JOB_ID);
         verify(bulkBuilder).executeRequest();
     }
@@ -242,7 +242,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         inOrder.verify(persister).persistCategoryDefinition(categoryDefinition);
         inOrder.verify(bulkBuilder).executeRequest();
         inOrder.verify(persister).commitResultWrites(JOB_ID);
-        inOrder.verify(flushListener).acknowledgeFlush(flushAcknowledgement);
+        inOrder.verify(flushListener).acknowledgeFlush(flushAcknowledgement, null);
     }
 
     public void testProcessResult_modelPlot() {
@@ -397,7 +397,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         verify(persister, times(2)).persistModelSnapshot(any(), eq(WriteRequest.RefreshPolicy.IMMEDIATE));
     }
 
-    public void testParsingErrorSetsFailed() throws InterruptedException {
+    public void testParsingErrorSetsFailed() throws Exception {
         @SuppressWarnings("unchecked")
         Iterator<AutodetectResult> iterator = mock(Iterator.class);
         when(iterator.hasNext()).thenThrow(new ElasticsearchParseException("this test throws"));

+ 33 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java

@@ -14,6 +14,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 
 public class FlushListenerTests extends ESTestCase {
@@ -27,12 +28,14 @@ public class FlushListenerTests extends ESTestCase {
                 flushAcknowledgementHolder.set(flushAcknowledgement);
             } catch (InterruptedException _ex) {
                 Thread.currentThread().interrupt();
+            } catch (Exception ex) {
+                fail("unexpected exception " + ex.getMessage());
             }
         }).start();
         assertBusy(() -> assertTrue(listener.awaitingFlushed.containsKey("_id")));
         assertNull(flushAcknowledgementHolder.get());
         FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement("_id", new Date(12345678L));
-        listener.acknowledgeFlush(flushAcknowledgement);
+        listener.acknowledgeFlush(flushAcknowledgement, null);
         assertBusy(() -> assertNotNull(flushAcknowledgementHolder.get()));
         assertEquals(1, listener.awaitingFlushed.size());
 
@@ -40,6 +43,33 @@ public class FlushListenerTests extends ESTestCase {
         assertEquals(0, listener.awaitingFlushed.size());
     }
 
+    public void testAcknowledgeFlushFailure() throws Exception {
+        FlushListener listener = new FlushListener();
+        AtomicReference<Exception> flushExceptionHolder = new AtomicReference<>();
+        new Thread(() -> {
+            try {
+                listener.waitForFlush("_id", Duration.ofMillis(10000));
+                fail("Expected exception to throw.");
+            } catch (InterruptedException _ex) {
+                Thread.currentThread().interrupt();
+            } catch (Exception ex) {
+                flushExceptionHolder.set(ex);
+            }
+        }).start();
+        assertBusy(() -> assertTrue(listener.awaitingFlushed.containsKey("_id")));
+        assertNull(flushExceptionHolder.get());
+        FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement("_id", new Date(12345678L));
+        listener.acknowledgeFlush(flushAcknowledgement, new Exception("BOOM"));
+        assertBusy(() -> {
+            assertNotNull(flushExceptionHolder.get());
+            assertThat(flushExceptionHolder.get().getMessage(), equalTo("BOOM"));
+        });
+        assertEquals(1, listener.awaitingFlushed.size());
+
+        listener.clear("_id");
+        assertEquals(0, listener.awaitingFlushed.size());
+    }
+
     public void testClear() throws Exception {
         FlushListener listener = new FlushListener();
 
@@ -55,6 +85,8 @@ public class FlushListenerTests extends ESTestCase {
                     flushAcknowledgementHolder.set(flushAcknowledgement);
                 } catch (InterruptedException _ex) {
                     Thread.currentThread().interrupt();
+                } catch (Exception ex) {
+                    fail("unexpected exception " + ex.getMessage());
                 }
             }).start();
         }