Browse Source

[Transform] Halt Indexer on Stop/Abort API (#107792)

When `_stop?wait_for_checkpoint=false` and
`_stop?force=true&wait_for_checkpoint=false` are called, there is a
small chance that the Transform Indexer thread will run if it is
scheduled before the stop API is called but before the threadpool runs
the executable. The `onStart` method now checks the state of the
indexer before executing. This will mitigate errors caused by reading
from Transform internal indices while the Task is stopped or deleted.

This does not impact when `wait_for_checkpoint=true`, because the
indexer state will remain `INDEXING` until the checkpoint is finished.

Relate #107266
Pat Whelan 1 year ago
parent
commit
3e6df2630e

+ 5 - 0
docs/changelog/107792.yaml

@@ -0,0 +1,5 @@
+pr: 107792
+summary: Halt Indexer on Stop/Abort API
+area: Transform
+type: bug
+issues: []

+ 12 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

@@ -268,11 +268,19 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
     @Override
     protected void onStart(long now, ActionListener<Boolean> listener) {
         if (context.getTaskState() == TransformTaskState.FAILED) {
-            logger.debug("[{}] attempted to start while failed.", getJobId());
+            logger.debug("[{}] attempted to start while in state [{}].", getJobId(), TransformTaskState.FAILED.value());
             listener.onFailure(new ElasticsearchException("Attempted to start a failed transform [{}].", getJobId()));
             return;
         }
 
+        switch (getState()) {
+            case ABORTING, STOPPING, STOPPED -> {
+                logger.debug("[{}] attempted to start while in state [{}].", getJobId(), getState().value());
+                listener.onResponse(false);
+                return;
+            }
+        }
+
         if (context.getAuthState() != null && HealthStatus.RED.equals(context.getAuthState().getStatus())) {
             // AuthorizationState status is RED which means there was permission check error during PUT or _update.
             listener.onFailure(
@@ -543,7 +551,9 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
     private void finalizeCheckpoint(ActionListener<Void> listener) {
         try {
             // reset the page size, so we do not memorize a low page size forever
-            context.setPageSize(function.getInitialPageSize());
+            if (function != null) {
+                context.setPageSize(function.getInitialPageSize());
+            }
             // reset the changed bucket to free memory
             if (changeCollector != null) {
                 changeCollector.clear();

+ 168 - 35
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java

@@ -109,7 +109,9 @@ public class TransformIndexerStateTests extends ESTestCase {
 
         private TransformState persistedState;
         private AtomicInteger saveStateListenerCallCount = new AtomicInteger(0);
+        private SearchResponse searchResponse = ONE_HIT_SEARCH_RESPONSE;
         // used for synchronizing with the test
+        private CountDownLatch startLatch;
         private CountDownLatch searchLatch;
         private CountDownLatch doProcessLatch;
 
@@ -163,6 +165,10 @@ public class TransformIndexerStateTests extends ESTestCase {
             return doProcessLatch = new CountDownLatch(count);
         }
 
+        public CountDownLatch createAwaitForStartLatch(int count) {
+            return startLatch = new CountDownLatch(count);
+        }
+
         @Override
         void doGetInitialProgress(SearchRequest request, ActionListener<SearchResponse> responseListener) {
             responseListener.onResponse(ONE_HIT_SEARCH_RESPONSE);
@@ -188,14 +194,24 @@ public class TransformIndexerStateTests extends ESTestCase {
 
         @Override
         protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
-            if (searchLatch != null) {
+            maybeWaitOnLatch(searchLatch);
+            threadPool.generic().execute(() -> nextPhase.onResponse(searchResponse));
+        }
+
+        private static void maybeWaitOnLatch(CountDownLatch countDownLatch) {
+            if (countDownLatch != null) {
                 try {
-                    searchLatch.await();
+                    countDownLatch.await();
                 } catch (InterruptedException e) {
                     throw new IllegalStateException(e);
                 }
             }
-            threadPool.generic().execute(() -> nextPhase.onResponse(ONE_HIT_SEARCH_RESPONSE));
+        }
+
+        @Override
+        protected void onStart(long now, ActionListener<Boolean> listener) {
+            maybeWaitOnLatch(startLatch);
+            super.onStart(now, listener);
         }
 
         @Override
@@ -259,6 +275,10 @@ public class TransformIndexerStateTests extends ESTestCase {
         void validate(ActionListener<ValidateTransformAction.Response> listener) {
             listener.onResponse(null);
         }
+
+        void finishCheckpoint() {
+            searchResponse = null;
+        }
     }
 
     class MockedTransformIndexerForStatePersistenceTesting extends TransformIndexer {
@@ -371,22 +391,7 @@ public class TransformIndexerStateTests extends ESTestCase {
     }
 
     public void testTriggerStatePersistence() {
-        TransformConfig config = new TransformConfig(
-            randomAlphaOfLength(10),
-            randomSourceConfig(),
-            randomDestConfig(),
-            null,
-            new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)),
-            null,
-            randomPivotConfig(),
-            null,
-            randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
-            null,
-            null,
-            null,
-            null,
-            null
-        );
+        TransformConfig config = createTransformConfig();
         AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.INDEXING);
 
         TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
@@ -452,22 +457,7 @@ public class TransformIndexerStateTests extends ESTestCase {
     }
 
     public void testStopAtCheckpoint() throws Exception {
-        TransformConfig config = new TransformConfig(
-            randomAlphaOfLength(10),
-            randomSourceConfig(),
-            randomDestConfig(),
-            null,
-            new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)),
-            null,
-            randomPivotConfig(),
-            null,
-            randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
-            null,
-            null,
-            null,
-            null,
-            null
-        );
+        TransformConfig config = createTransformConfig();
 
         for (IndexerState state : IndexerState.values()) {
             // skip indexing case, tested below
@@ -684,6 +674,130 @@ public class TransformIndexerStateTests extends ESTestCase {
         }
     }
 
+    /**
+     * Given a started transform
+     * And the indexer thread has not started yet
+     * When a user calls _stop?force=false
+     * Then the indexer thread should exit early
+     */
+    public void testStopBeforeIndexingThreadStarts() throws Exception {
+        var indexer = createMockIndexer(
+            createTransformConfig(),
+            new AtomicReference<>(IndexerState.STARTED),
+            null,
+            threadPool,
+            auditor,
+            null,
+            new TransformIndexerStats(),
+            new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class))
+        );
+
+        // stop the indexer thread once it kicks off
+        var startLatch = indexer.createAwaitForStartLatch(1);
+        assertEquals(IndexerState.STARTED, indexer.start());
+        assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
+        assertEquals(IndexerState.INDEXING, indexer.getState());
+
+        // stop the indexer, equivalent to _stop?force=false
+        assertEquals(IndexerState.STOPPING, indexer.stop());
+        assertEquals(IndexerState.STOPPING, indexer.getState());
+
+        // now let the indexer thread run
+        startLatch.countDown();
+
+        assertBusy(() -> {
+            assertThat(indexer.getState(), equalTo(IndexerState.STOPPED));
+            assertThat(indexer.getLastCheckpoint().getCheckpoint(), equalTo(-1L));
+        });
+    }
+
+    /**
+     * Given a started transform
+     * And the indexer thread has not started yet
+     * When a user calls _stop?force=true
+     * Then the indexer thread should exit early
+     */
+    public void testForceStopBeforeIndexingThreadStarts() throws Exception {
+        var indexer = createMockIndexer(
+            createTransformConfig(),
+            new AtomicReference<>(IndexerState.STARTED),
+            null,
+            threadPool,
+            auditor,
+            null,
+            new TransformIndexerStats(),
+            new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class))
+        );
+
+        // stop the indexer thread once it kicks off
+        var startLatch = indexer.createAwaitForStartLatch(1);
+        assertEquals(IndexerState.STARTED, indexer.start());
+        assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
+        assertEquals(IndexerState.INDEXING, indexer.getState());
+
+        // stop the indexer, equivalent to _stop?force=true
+        assertFalse("Transform Indexer thread should still be running", indexer.abort());
+        assertEquals(IndexerState.ABORTING, indexer.getState());
+
+        // now let the indexer thread run
+        startLatch.countDown();
+
+        assertBusy(() -> {
+            assertThat(indexer.getState(), equalTo(IndexerState.ABORTING));
+            assertThat(indexer.getLastCheckpoint().getCheckpoint(), equalTo(-1L));
+        });
+    }
+
+    /**
+     * Given a started transform
+     * And the indexer thread has not started yet
+     * When a user calls _stop?wait_for_checkpoint=true
+     * Then the indexer thread should not exit early
+     */
+    public void testStopWaitForCheckpointBeforeIndexingThreadStarts() throws Exception {
+        var context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
+        var indexer = createMockIndexer(
+            createTransformConfig(),
+            new AtomicReference<>(IndexerState.STARTED),
+            null,
+            threadPool,
+            auditor,
+            null,
+            new TransformIndexerStats(),
+            context
+        );
+
+        // stop the indexer thread once it kicks off
+        var startLatch = indexer.createAwaitForStartLatch(1);
+        assertEquals(IndexerState.STARTED, indexer.start());
+        assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
+        assertEquals(IndexerState.INDEXING, indexer.getState());
+
+        // stop the indexer, equivalent to _stop?wait_for_checkpoint=true
+        context.setShouldStopAtCheckpoint(true);
+        CountDownLatch stopLatch = new CountDownLatch(1);
+        countResponse(listener -> setStopAtCheckpoint(indexer, true, listener), stopLatch);
+
+        // now let the indexer thread run
+        indexer.finishCheckpoint();
+        startLatch.countDown();
+
+        // wait for all listeners
+        assertTrue("timed out after 5s", stopLatch.await(5, TimeUnit.SECONDS));
+
+        // there should be no listeners waiting
+        assertEquals(0, indexer.getSaveStateListenerCount());
+
+        // listener must have been called by the indexing thread between timesStopAtCheckpointChanged and 6 times
+        // this is not exact, because we do not know _when_ the other thread persisted the flag
+        assertThat(indexer.getSaveStateListenerCallCount(), lessThanOrEqualTo(1));
+
+        assertBusy(() -> {
+            assertThat(indexer.getState(), equalTo(IndexerState.STOPPED));
+            assertThat(indexer.getLastCheckpoint().getCheckpoint(), equalTo(1L));
+        });
+    }
+
     @TestIssueLogging(
         value = "org.elasticsearch.xpack.transform.transforms:DEBUG",
         issueUrl = "https://github.com/elastic/elasticsearch/issues/92069"
@@ -868,4 +982,23 @@ public class TransformIndexerStateTests extends ESTestCase {
         indexer.initialize();
         return indexer;
     }
+
+    private static TransformConfig createTransformConfig() {
+        return new TransformConfig(
+            randomAlphaOfLength(10),
+            randomSourceConfig(),
+            randomDestConfig(),
+            null,
+            new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)),
+            null,
+            randomPivotConfig(),
+            null,
+            randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
+            null,
+            null,
+            null,
+            null,
+            null
+        );
+    }
 }