Browse Source

[Rollup] improve handling of failures on first search (#35269)

Improve error handling in the Indexer if an exception occurs during the very 1st retrieval (query execution)
Hendrik Muhs 7 years ago
parent
commit
433469d4d0

+ 7 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java

@@ -151,8 +151,13 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
 
             if (state.compareAndSet(IndexerState.STARTED, IndexerState.INDEXING)) {
                 // fire off the search. Note this is async, the method will return from here
-                executor.execute(() -> doNextSearch(buildSearchRequest(),
-                        ActionListener.wrap(this::onSearchResponse, exc -> finishWithFailure(exc))));
+                executor.execute(() -> {
+                    try {
+                        doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, exc -> finishWithFailure(exc)));
+                    } catch (Exception e) {
+                        finishWithFailure(e);
+                    }
+                });
                 logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]");
                 return true;
             } else {

+ 91 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java

@@ -110,6 +110,78 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
 
     }
 
+    private class MockIndexerThrowsFirstSearch extends AsyncTwoPhaseIndexer<Integer, MockJobStats> {
+
+        // test the execution order
+        private int step;
+
+        protected MockIndexerThrowsFirstSearch(Executor executor, AtomicReference<IndexerState> initialState, Integer initialPosition) {
+            super(executor, initialState, initialPosition, new MockJobStats());
+        }
+
+        @Override
+        protected String getJobId() {
+            return "mock";
+        }
+
+        @Override
+        protected IterationResult<Integer> doProcess(SearchResponse searchResponse) {
+            fail("should not be called");
+            return null;
+        }
+
+        @Override
+        protected SearchRequest buildSearchRequest() {
+            assertThat(step, equalTo(1));
+            ++step;
+            return null;
+        }
+
+        @Override
+        protected void onStartJob(long now) {
+            assertThat(step, equalTo(0));
+            ++step;
+        }
+
+        @Override
+        protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
+            throw new RuntimeException("Failed to build search request");
+        }
+
+        @Override
+        protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> nextPhase) {
+            fail("should not be called");
+        }
+
+        @Override
+        protected void doSaveState(IndexerState state, Integer position, Runnable next) {
+            assertThat(step, equalTo(2));
+            ++step;
+            next.run();
+        }
+
+        @Override
+        protected void onFailure(Exception exc) {
+            assertThat(step, equalTo(3));
+            ++step;
+            isFinished.set(true);
+        }
+
+        @Override
+        protected void onFinish() {
+            fail("should not be called");
+        }
+
+        @Override
+        protected void onAbort() {
+            fail("should not be called");
+        }
+
+        public int getStep() {
+            return step;
+        }
+    }
+
     private static class MockJobStats extends IndexerJobStats {
 
         @Override
@@ -121,7 +193,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
     public void testStateMachine() throws InterruptedException {
         AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
         final ExecutorService executor = Executors.newFixedThreadPool(1);
-
+        isFinished.set(false);
         try {
 
             MockIndexer indexer = new MockIndexer(executor, state, 2);
@@ -140,4 +212,22 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
             executor.shutdownNow();
         }
     }
+
+    public void testStateMachineBrokenSearch() throws InterruptedException {
+        AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
+        final ExecutorService executor = Executors.newFixedThreadPool(1);
+        isFinished.set(false);
+        try {
+
+            MockIndexerThrowsFirstSearch indexer = new MockIndexerThrowsFirstSearch(executor, state, 2);
+            indexer.start();
+            assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
+            assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
+            assertTrue(ESTestCase.awaitBusy(() -> isFinished.get()));
+            assertThat(indexer.getStep(), equalTo(4));
+
+        } finally {
+            executor.shutdownNow();
+        }
+    }
 }