
[Transform] rework synchronization (#79858)

Transform uses the synchronized keyword to protect concurrent access in certain places. Unfortunately there were two locks, a task lock and an indexer lock. In the past this caused problems, culminating in a classic lock-ordering deadlock (the famous dining philosophers problem) when both locks were acquired, but in different orders. This PR reworks synchronization to use one lock instead of two. In some places locking wasn't required, or its scope could be reduced.
Hendrik Muhs, commit f90c0f2127
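
The failure mode the message describes is a classic lock-ordering deadlock: one thread takes the task lock and then waits for the indexer lock, while another thread holds the indexer lock and waits for the task lock. The following is a minimal, self-contained sketch of the hazard and of the single-lock fix this commit applies in spirit; all names below are invented and this is not the actual Transform code.

// Hypothetical illustration of a lock-ordering deadlock between two locks.
class LockOrderingDeadlock {
    private final Object taskLock = new Object();
    private final Object indexerLock = new Object();

    void taskOperation() {
        synchronized (taskLock) {        // thread A holds taskLock...
            synchronized (indexerLock) { // ...and waits for indexerLock
                // mutate shared state
            }
        }
    }

    void indexerOperation() {
        synchronized (indexerLock) {     // thread B holds indexerLock...
            synchronized (taskLock) {    // ...and waits for taskLock: deadlock
                // mutate shared state
            }
        }
    }
}

// The fix, in spirit: both code paths synchronize on one shared monitor.
class SingleLock {
    private final Object lock = new Object(); // one monitor for everything

    void taskOperation() {
        synchronized (lock) { /* mutate shared state */ }
    }

    void indexerOperation() {
        synchronized (lock) { /* mutate shared state */ }
    }
}

With a single monitor there is no second lock to wait on, so a circular wait cannot form.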

+ 70 - 52
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java

@@ -55,6 +55,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
     private final AtomicReference<IndexerState> state;
     private final AtomicReference<JobPosition> position;
     private final ThreadPool threadPool;
+    private final Object lock;
 
     // throttling implementation
     private volatile float currentMaxDocsPerSecond;
@@ -97,11 +98,22 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
         AtomicReference<IndexerState> initialState,
         JobPosition initialPosition,
         JobStats jobStats
+    ) {
+        this(threadPool, initialState, initialPosition, jobStats, new Object());
+    }
+
+    protected AsyncTwoPhaseIndexer(
+        ThreadPool threadPool,
+        AtomicReference<IndexerState> initialState,
+        JobPosition initialPosition,
+        JobStats jobStats,
+        Object lock
     ) {
         this.threadPool = threadPool;
         this.state = initialState;
         this.position = new AtomicReference<>(initialPosition);
         this.stats = jobStats;
+        this.lock = lock;
     }
 
     /**
@@ -133,7 +145,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
      * @return The new state for the indexer (STARTED, INDEXING or ABORTING if the
      *         job was already aborted).
      */
-    public synchronized IndexerState start() {
+    public IndexerState start() {
         state.compareAndSet(IndexerState.STOPPED, IndexerState.STARTED);
         return state.get();
     }
@@ -147,7 +159,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
      *
      * @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
      */
-    public synchronized IndexerState stop() {
+    public IndexerState stop() {
         IndexerState indexerState = state.updateAndGet(previousState -> {
             if (previousState == IndexerState.INDEXING) {
                 return IndexerState.STOPPING;
@@ -174,7 +186,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
      * @return true if the indexer is aborted, false if a background job is running
      *         and abort is delayed.
      */
-    public synchronized boolean abort() {
+    public boolean abort() {
         IndexerState prevState = state.getAndUpdate((prev) -> IndexerState.ABORTING);
         return prevState == IndexerState.STOPPED || prevState == IndexerState.STARTED;
     }
@@ -189,57 +201,63 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
      *            complete buckets)
      * @return true if a job has been triggered, false otherwise
      */
-    public synchronized boolean maybeTriggerAsyncJob(long now) {
-        final IndexerState currentState = state.get();
-        switch (currentState) {
-            case INDEXING:
-            case STOPPING:
-            case ABORTING:
-                logger.warn(
-                    "Schedule was triggered for job ["
-                        + getJobId()
-                        + "], but prior indexer is still running "
-                        + "(with state ["
-                        + currentState
-                        + "]"
-                );
-                return false;
-
-            case STOPPED:
-                logger.debug("Schedule was triggered for job [" + getJobId() + "] but job is stopped.  Ignoring trigger.");
-                return false;
+    public boolean maybeTriggerAsyncJob(long now) {
+        synchronized (lock) {
+            final IndexerState currentState = state.get();
+            switch (currentState) {
+                case INDEXING:
+                case STOPPING:
+                case ABORTING:
+                    logger.warn(
+                        "Schedule was triggered for job ["
+                            + getJobId()
+                            + "], but prior indexer is still running "
+                            + "(with state ["
+                            + currentState
+                            + "]"
+                    );
+                    return false;
 
-            case STARTED:
-                logger.debug("Schedule was triggered for job [" + getJobId() + "], state: [" + currentState + "]");
-                stats.incrementNumInvocations(1);
-
-                if (state.compareAndSet(IndexerState.STARTED, IndexerState.INDEXING)) {
-                    // fire off the search. Note this is async, the method will return from here
-                    threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
-                        onStart(now, ActionListener.wrap(r -> {
-                            assert r != null;
-                            if (r) {
-                                nextSearch();
-                            } else {
-                                onFinish(
-                                    ActionListener.wrap(
-                                        onFinishResponse -> doSaveState(finishAndSetState(), position.get(), this::afterFinishOrFailure),
-                                        onFinishFailure -> doSaveState(finishAndSetState(), position.get(), this::afterFinishOrFailure)
-                                    )
-                                );
-                            }
-                        }, this::finishWithFailure));
-                    });
-                    logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]");
-                    return true;
-                } else {
-                    logger.debug("Could not move from STARTED to INDEXING state because current state is [" + state.get() + "]");
+                case STOPPED:
+                    logger.debug("Schedule was triggered for job [" + getJobId() + "] but job is stopped.  Ignoring trigger.");
                     return false;
-                }
 
-            default:
-                logger.warn("Encountered unexpected state [" + currentState + "] while indexing");
-                throw new IllegalStateException("Job encountered an illegal state [" + currentState + "]");
+                case STARTED:
+                    logger.debug("Schedule was triggered for job [" + getJobId() + "], state: [" + currentState + "]");
+                    stats.incrementNumInvocations(1);
+
+                    if (state.compareAndSet(IndexerState.STARTED, IndexerState.INDEXING)) {
+                        // fire off the search. Note this is async, the method will return from here
+                        threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
+                            onStart(now, ActionListener.wrap(r -> {
+                                assert r != null;
+                                if (r) {
+                                    nextSearch();
+                                } else {
+                                    onFinish(
+                                        ActionListener.wrap(
+                                            onFinishResponse -> doSaveState(
+                                                finishAndSetState(),
+                                                position.get(),
+                                                this::afterFinishOrFailure
+                                            ),
+                                            onFinishFailure -> doSaveState(finishAndSetState(), position.get(), this::afterFinishOrFailure)
+                                        )
+                                    );
+                                }
+                            }, this::finishWithFailure));
+                        });
+                        logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]");
+                        return true;
+                    } else {
+                        logger.debug("Could not move from STARTED to INDEXING state because current state is [" + state.get() + "]");
+                        return false;
+                    }
+
+                default:
+                    logger.warn("Encountered unexpected state [" + currentState + "] while indexing");
+                    throw new IllegalStateException("Job encountered an illegal state [" + currentState + "]");
+            }
         }
     }
 
@@ -637,7 +655,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
         }
     }
 
-    private synchronized void reQueueThrottledSearch() {
+    private void reQueueThrottledSearch() {
         currentMaxDocsPerSecond = getMaxDocsPerSecond();
 
         ScheduledRunnable runnable = scheduledNextSearch;
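
The constructor change above adds an overload that lets the creator inject the monitor the indexer synchronizes on; the original overload keeps the old behavior by allocating a private one. A hypothetical subclass wiring this up might look as follows. This is a simplified sketch with invented names; the real classes are AsyncTwoPhaseIndexer and, in the next file, TransformIndexer.

// Sketch of the injectable-lock pattern; names are invented and the
// signatures are much simpler than the real AsyncTwoPhaseIndexer.
class SketchIndexer {
    protected final Object lock;

    // standalone use: a private monitor, matching the old behavior
    protected SketchIndexer() {
        this(new Object());
    }

    // coordinated use: the owner supplies the monitor
    protected SketchIndexer(Object lock) {
        this.lock = lock;
    }

    public boolean maybeTrigger() {
        synchronized (lock) {
            return true; // state transition guarded by the shared monitor
        }
    }
}

class SketchTransformIndexer extends SketchIndexer {
    private final Object context;

    SketchTransformIndexer(Object context) {
        super(context); // the context object doubles as the shared lock
        this.context = context;
    }
}
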

+ 128 - 114
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

@@ -140,7 +140,8 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
         TransformCheckpoint nextCheckpoint,
         TransformContext context
     ) {
-        super(threadPool, initialState, initialPosition, jobStats);
+        // important: note that we pass the context object as lock object
+        super(threadPool, initialState, initialPosition, jobStats, context);
         ExceptionsHelper.requireNonNull(transformServices, "transformServices");
         this.transformsConfigManager = transformServices.getConfigManager();
         this.checkpointProvider = ExceptionsHelper.requireNonNull(checkpointProvider, "checkpointProvider");
@@ -569,36 +570,42 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
     }
 
     @Override
-    public synchronized boolean maybeTriggerAsyncJob(long now) {
+    public boolean maybeTriggerAsyncJob(long now) {
+        boolean triggered;
+
         if (context.getTaskState() == TransformTaskState.FAILED) {
             logger.debug("[{}] schedule was triggered for transform but task is failed. Ignoring trigger.", getJobId());
             return false;
         }
 
-        // ignore trigger if indexer is running, prevents log spam in A2P indexer
-        IndexerState indexerState = getState();
-        if (IndexerState.INDEXING.equals(indexerState) || IndexerState.STOPPING.equals(indexerState)) {
-            logger.debug("[{}] indexer for transform has state [{}]. Ignoring trigger.", getJobId(), indexerState);
-            return false;
-        }
+        synchronized (context) {
+            // ignore trigger if indexer is running, prevents log spam in A2P indexer
+            IndexerState indexerState = getState();
+            if (IndexerState.INDEXING.equals(indexerState) || IndexerState.STOPPING.equals(indexerState)) {
+                logger.debug("[{}] indexer for transform has state [{}]. Ignoring trigger.", getJobId(), indexerState);
+                return false;
+            }
 
-        /*
-         * ignore if indexer thread is shutting down (after finishing a checkpoint)
-         * shutting down means:
-         *  - indexer has finished a checkpoint and called onFinish
-         *  - indexer state has changed from indexing to started
-         *  - state persistence has been called but has _not_ returned yet
-         *
-         *  If we trigger the indexer in this situation the 2nd indexer thread might
-         *  try to save state at the same time, causing a version conflict
-         *  see gh#67121
-         */
-        if (indexerThreadShuttingDown) {
-            logger.debug("[{}] indexer thread is shutting down. Ignoring trigger.", getJobId());
-            return false;
+            /*
+             * ignore if indexer thread is shutting down (after finishing a checkpoint)
+             * shutting down means:
+             *  - indexer has finished a checkpoint and called onFinish
+             *  - indexer state has changed from indexing to started
+             *  - state persistence has been called but has _not_ returned yet
+             *
+             *  If we trigger the indexer in this situation the 2nd indexer thread might
+             *  try to save state at the same time, causing a version conflict
+             *  see gh#67121
+             */
+            if (indexerThreadShuttingDown) {
+                logger.debug("[{}] indexer thread is shutting down. Ignoring trigger.", getJobId());
+                return false;
+            }
+
+            triggered = super.maybeTriggerAsyncJob(now);
         }
 
-        return super.maybeTriggerAsyncJob(now);
+        return triggered;
     }
 
     /**
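
One detail makes the override above safe: TransformIndexer enters synchronized (context) and then calls super.maybeTriggerAsyncJob(now), which synchronizes on the same object, since the context was passed to the base class as its lock. Java intrinsic monitors are reentrant, so the second acquisition by the same thread succeeds immediately instead of deadlocking. A minimal demonstration with invented names:

// Reentrancy sketch: the same thread acquires the same monitor twice,
// once in the subclass and once via the base class. Names are invented.
class LockedBase {
    protected final Object lock;

    LockedBase(Object lock) {
        this.lock = lock;
    }

    boolean trigger() {
        synchronized (lock) {       // second acquisition: reentrant, succeeds
            return true;
        }
    }
}

class LockedDerived extends LockedBase {
    LockedDerived(Object lock) {
        super(lock);
    }

    @Override
    boolean trigger() {
        synchronized (lock) {       // first acquisition
            return super.trigger(); // no self-deadlock
        }
    }
}
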
@@ -790,111 +797,114 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
         }
     }
 
-    private synchronized boolean addSetStopAtCheckpointListener(
-        boolean shouldStopAtCheckpoint,
-        ActionListener<Void> shouldStopAtCheckpointListener
-    ) throws InterruptedException {
-        // in case the indexer is already shutting down
-        if (indexerThreadShuttingDown) {
-            context.setShouldStopAtCheckpoint(shouldStopAtCheckpoint);
-            saveStateRequestedDuringIndexerThreadShutdown = true;
-            return false;
-        }
-
-        IndexerState state = getState();
+    private boolean addSetStopAtCheckpointListener(boolean shouldStopAtCheckpoint, ActionListener<Void> shouldStopAtCheckpointListener)
+        throws InterruptedException {
 
-        // in case the indexer isn't running, respond immediately
-        if (state == IndexerState.STARTED && context.shouldStopAtCheckpoint() != shouldStopAtCheckpoint) {
-            IndexerState newIndexerState = IndexerState.STARTED;
-            TransformTaskState newtaskState = context.getTaskState();
-
-            // check if the transform is at a checkpoint, if so, we will shortcut and stop it below
-            // otherwise we set shouldStopAtCheckpoint, for this case the transform needs to get
-            // triggered, complete the checkpoint and stop
-            if (shouldStopAtCheckpoint && initialRun()) {
-                newIndexerState = IndexerState.STOPPED;
-                newtaskState = TransformTaskState.STOPPED;
-                logger.debug("[{}] transform is at a checkpoint, initiating stop.", transformConfig.getId());
-            } else {
+        synchronized (context) {
+            // in case the indexer is already shutting down
+            if (indexerThreadShuttingDown) {
                 context.setShouldStopAtCheckpoint(shouldStopAtCheckpoint);
+                saveStateRequestedDuringIndexerThreadShutdown = true;
+                return false;
             }
 
-            final TransformState newTransformState = new TransformState(
-                newtaskState,
-                newIndexerState,
-                getPosition(),
-                context.getCheckpoint(),
-                context.getStateReason(),
-                getProgress(),
-                null,
-                newIndexerState == IndexerState.STARTED
-            );
+            IndexerState state = getState();
 
-            // because save state is async we need to block the call until state is persisted, so that the job can not
-            // be triggered (ensured by synchronized)
-            CountDownLatch latch = new CountDownLatch(1);
-            logger.debug("[{}] persisting stop at checkpoint", getJobId());
+            // in case the indexer isn't running, respond immediately
+            if (state == IndexerState.STARTED && context.shouldStopAtCheckpoint() != shouldStopAtCheckpoint) {
+                IndexerState newIndexerState = IndexerState.STARTED;
+                TransformTaskState newtaskState = context.getTaskState();
 
-            persistState(newTransformState, ActionListener.wrap(() -> latch.countDown()));
+                // check if the transform is at a checkpoint, if so, we will shortcut and stop it below
+                // otherwise we set shouldStopAtCheckpoint, for this case the transform needs to get
+                // triggered, complete the checkpoint and stop
+                if (shouldStopAtCheckpoint && initialRun()) {
+                    newIndexerState = IndexerState.STOPPED;
+                    newtaskState = TransformTaskState.STOPPED;
+                    logger.debug("[{}] transform is at a checkpoint, initiating stop.", transformConfig.getId());
+                } else {
+                    context.setShouldStopAtCheckpoint(shouldStopAtCheckpoint);
+                }
 
-            if (latch.await(PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC, TimeUnit.SECONDS) == false) {
-                logger.error(
-                    new ParameterizedMessage(
-                        "[{}] Timed out ({}s) waiting for transform state to be stored.",
-                        getJobId(),
-                        PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC
-                    )
+                final TransformState newTransformState = new TransformState(
+                    newtaskState,
+                    newIndexerState,
+                    getPosition(),
+                    context.getCheckpoint(),
+                    context.getStateReason(),
+                    getProgress(),
+                    null,
+                    newIndexerState == IndexerState.STARTED
                 );
-            }
 
-            // stop the transform if the decision was to stop it above
-            if (newtaskState.equals(TransformTaskState.STOPPED)) {
-                context.shutdown();
-            }
+                // because save state is async we need to block the call until state is persisted, so that the job can not
+                // be triggered (ensured by synchronized)
+                CountDownLatch latch = new CountDownLatch(1);
+                logger.debug("[{}] persisting stop at checkpoint", getJobId());
 
-            return false;
-        }
+                persistState(newTransformState, ActionListener.wrap(() -> latch.countDown()));
 
-        if (state != IndexerState.INDEXING) {
-            return false;
-        }
+                if (latch.await(PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC, TimeUnit.SECONDS) == false) {
+                    logger.error(
+                        new ParameterizedMessage(
+                            "[{}] Timed out ({}s) waiting for transform state to be stored.",
+                            getJobId(),
+                            PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC
+                        )
+                    );
+                }
 
-        if (saveStateListeners.updateAndGet(currentListeners -> {
-            // check the state again (optimistic locking), while we checked the last time, the indexing thread could have
-            // saved the state and is finishing. As it first set the state and _than_ gets saveStateListeners, it's safe
-            // to just check the indexer state again
-            if (getState() != IndexerState.INDEXING) {
-                return null;
+                // stop the transform if the decision was to stop it above
+                if (newtaskState.equals(TransformTaskState.STOPPED)) {
+                    context.shutdown();
+                }
+
+                return false;
             }
 
-            if (currentListeners == null) {
-                // in case shouldStopAtCheckpoint has already the desired value _and_ we know its _persisted_, respond immediately
-                if (context.shouldStopAtCheckpoint() == shouldStopAtCheckpoint) {
+            if (state != IndexerState.INDEXING) {
+                return false;
+            }
+
+            if (saveStateListeners.updateAndGet(currentListeners -> {
+                // check the state again (optimistic locking), while we checked the last time, the indexing thread could have
+                // saved the state and is finishing. As it first set the state and _than_ gets saveStateListeners, it's safe
+                // to just check the indexer state again
+                if (getState() != IndexerState.INDEXING) {
                     return null;
                 }
 
-                return Collections.singletonList(shouldStopAtCheckpointListener);
+                if (currentListeners == null) {
+                    // in case shouldStopAtCheckpoint has already the desired value _and_ we know its _persisted_, respond immediately
+                    if (context.shouldStopAtCheckpoint() == shouldStopAtCheckpoint) {
+                        return null;
+                    }
+
+                    return Collections.singletonList(shouldStopAtCheckpointListener);
+                }
+                return CollectionUtils.appendToCopy(currentListeners, shouldStopAtCheckpointListener);
+            }) == null) {
+                return false;
             }
-            return CollectionUtils.appendToCopy(currentListeners, shouldStopAtCheckpointListener);
-        }) == null) {
-            return false;
-        }
 
-        context.setShouldStopAtCheckpoint(shouldStopAtCheckpoint);
+            context.setShouldStopAtCheckpoint(shouldStopAtCheckpoint);
+        }
         // in case of throttling the indexer might wait for the next search, fast forward, so stop listeners do not wait to long
         runSearchImmediately();
         return true;
     }
 
-    synchronized void stopAndMaybeSaveState() {
-        onStop();
-        IndexerState state = stop();
+    void stopAndMaybeSaveState() {
+        synchronized (context) {
+            onStop();
+            IndexerState state = stop();
 
-        if (indexerThreadShuttingDown) {
-            saveStateRequestedDuringIndexerThreadShutdown = true;
-            // if stop() returned STOPPED we need to persist state, otherwise the indexer does it for us
-        } else if (state == IndexerState.STOPPED) {
-            doSaveState(IndexerState.STOPPED, getPosition(), () -> {});
+            if (indexerThreadShuttingDown) {
+                saveStateRequestedDuringIndexerThreadShutdown = true;
+                // if stop() returned STOPPED we need to persist state, otherwise the indexer does it for us
+            } else if (state == IndexerState.STOPPED) {
+                doSaveState(IndexerState.STOPPED, getPosition(), () -> {});
+            }
         }
     }
 
@@ -1284,20 +1294,24 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
         }
     }
 
-    private synchronized void startIndexerThreadShutdown() {
-        indexerThreadShuttingDown = true;
-        saveStateRequestedDuringIndexerThreadShutdown = false;
+    private void startIndexerThreadShutdown() {
+        synchronized (context) {
+            indexerThreadShuttingDown = true;
+            saveStateRequestedDuringIndexerThreadShutdown = false;
+        }
     }
 
-    private synchronized void finishIndexerThreadShutdown() {
-        indexerThreadShuttingDown = false;
-        if (saveStateRequestedDuringIndexerThreadShutdown) {
-            // if stop has been called and set shouldStopAtCheckpoint to true,
-            // we should stop if we just finished a checkpoint
-            if (context.shouldStopAtCheckpoint() && nextCheckpoint == null) {
-                stop();
+    private void finishIndexerThreadShutdown() {
+        synchronized (context) {
+            indexerThreadShuttingDown = false;
+            if (saveStateRequestedDuringIndexerThreadShutdown) {
+                // if stop has been called and set shouldStopAtCheckpoint to true,
+                // we should stop if we just finished a checkpoint
+                if (context.shouldStopAtCheckpoint() && nextCheckpoint == null) {
+                    stop();
+                }
+                doSaveState(getState(), getPosition(), () -> {});
             }
-            doSaveState(getState(), getPosition(), () -> {});
         }
     }
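
Two things stand out in the TransformIndexer changes. First, every guarded region now uses the context object as its monitor, the same object handed to the base class, so indexer and task can no longer interleave destructively. Second, the critical sections stay as small as possible: in addSetStopAtCheckpointListener the monitor is released before runSearchImmediately() runs, keeping slow follow-up work outside the lock. A generic sketch of that reduced-scope pattern, with invented names:

// Sketch of reduced-scope locking: hold the monitor only for the state
// checks and bookkeeping, run slow work after releasing it. Invented names.
class ScopedLocking {
    private final Object lock = new Object();
    private boolean shuttingDown;

    boolean requestStop(Runnable slowFollowUp) {
        synchronized (lock) {     // monitor held only for the state change
            if (shuttingDown) {
                return false;     // fast path, nothing else to do
            }
            shuttingDown = true;  // bookkeeping under the lock
        }
        slowFollowUp.run();       // slow work outside the critical section
        return true;
    }
}
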
 

+ 201 - 189
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java

@@ -210,7 +210,7 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
      * @param startingCheckpoint The starting checkpoint, could null. Null indicates that there is no starting checkpoint
      * @param listener The listener to alert once started
      */
-    synchronized void start(Long startingCheckpoint, ActionListener<StartTransformAction.Response> listener) {
+    void start(Long startingCheckpoint, ActionListener<StartTransformAction.Response> listener) {
         logger.debug("[{}] start called with state [{}].", getTransformId(), getState());
         if (context.getTaskState() == TransformTaskState.FAILED) {
             listener.onFailure(
@@ -221,73 +221,76 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
             );
             return;
         }
-        if (getIndexer() == null) {
-            // If our state is failed AND the indexer is null, the user needs to _stop?force=true so that the indexer gets
-            // fully initialized.
-            // If we are NOT failed, then we can assume that `start` was just called early in the process.
-            String msg = context.getTaskState() == TransformTaskState.FAILED
-                ? "It failed during the initialization process; force stop to allow reinitialization."
-                : "Try again later.";
-            listener.onFailure(
-                new ElasticsearchStatusException(
-                    "Task for transform [{}] not fully initialized. {}",
-                    RestStatus.CONFLICT,
-                    getTransformId(),
-                    msg
-                )
-            );
-            return;
-        }
-        final IndexerState newState = getIndexer().start();
-        if (Arrays.stream(RUNNING_STATES).noneMatch(newState::equals)) {
-            listener.onFailure(
-                new ElasticsearchException("Cannot start task for transform [{}], because state was [{}]", transform.getId(), newState)
-            );
-            return;
-        }
-        context.resetTaskState();
 
-        if (startingCheckpoint != null) {
-            context.setCheckpoint(startingCheckpoint);
-        }
+        synchronized (context) {
+            if (getIndexer() == null) {
+                // If our state is failed AND the indexer is null, the user needs to _stop?force=true so that the indexer gets
+                // fully initialized.
+                // If we are NOT failed, then we can assume that `start` was just called early in the process.
+                String msg = context.getTaskState() == TransformTaskState.FAILED
+                    ? "It failed during the initialization process; force stop to allow reinitialization."
+                    : "Try again later.";
+                listener.onFailure(
+                    new ElasticsearchStatusException(
+                        "Task for transform [{}] not fully initialized. {}",
+                        RestStatus.CONFLICT,
+                        getTransformId(),
+                        msg
+                    )
+                );
+                return;
+            }
+            final IndexerState newState = getIndexer().start();
+            if (Arrays.stream(RUNNING_STATES).noneMatch(newState::equals)) {
+                listener.onFailure(
+                    new ElasticsearchException("Cannot start task for transform [{}], because state was [{}]", transform.getId(), newState)
+                );
+                return;
+            }
+            context.resetTaskState();
 
-        final TransformState state = new TransformState(
-            TransformTaskState.STARTED,
-            IndexerState.STOPPED,
-            getIndexer().getPosition(),
-            context.getCheckpoint(),
-            null,
-            getIndexer().getProgress(),
-            null,
-            context.shouldStopAtCheckpoint()
-        );
+            if (startingCheckpoint != null) {
+                context.setCheckpoint(startingCheckpoint);
+            }
 
-        logger.info("[{}] updating state for transform to [{}].", transform.getId(), state.toString());
-        // Even though the indexer information is persisted to an index, we still need TransformTaskState in the clusterstate
-        // This keeps track of STARTED, FAILED, STOPPED
-        // This is because a FAILED state can occur because we cannot read the config from the internal index, which would imply that
-        // we could not read the previous state information from said index.
-        persistStateToClusterState(state, ActionListener.wrap(task -> {
-            auditor.info(transform.getId(), "Updated transform state to [" + state.getTaskState() + "].");
-            long now = System.currentTimeMillis();
-            // kick off the indexer
-            triggered(new Event(schedulerJobName(), now, now));
-            registerWithSchedulerJob();
-            listener.onResponse(new StartTransformAction.Response(true));
-        }, exc -> {
-            auditor.warning(
-                transform.getId(),
-                "Failed to persist to cluster state while marking task as started. Failure: " + exc.getMessage()
-            );
-            logger.error(new ParameterizedMessage("[{}] failed updating state to [{}].", getTransformId(), state), exc);
-            getIndexer().stop();
-            listener.onFailure(
-                new ElasticsearchException(
-                    "Error while updating state for transform [" + transform.getId() + "] to [" + state.getIndexerState() + "].",
-                    exc
-                )
+            final TransformState state = new TransformState(
+                TransformTaskState.STARTED,
+                IndexerState.STOPPED,
+                getIndexer().getPosition(),
+                context.getCheckpoint(),
+                null,
+                getIndexer().getProgress(),
+                null,
+                context.shouldStopAtCheckpoint()
             );
-        }));
+
+            logger.info("[{}] updating state for transform to [{}].", transform.getId(), state.toString());
+            // Even though the indexer information is persisted to an index, we still need TransformTaskState in the clusterstate
+            // This keeps track of STARTED, FAILED, STOPPED
+            // This is because a FAILED state can occur because we cannot read the config from the internal index, which would imply that
+            // we could not read the previous state information from said index.
+            persistStateToClusterState(state, ActionListener.wrap(task -> {
+                auditor.info(transform.getId(), "Updated transform state to [" + state.getTaskState() + "].");
+                long now = System.currentTimeMillis();
+                // kick off the indexer
+                triggered(new Event(schedulerJobName(), now, now));
+                registerWithSchedulerJob();
+                listener.onResponse(new StartTransformAction.Response(true));
+            }, exc -> {
+                auditor.warning(
+                    transform.getId(),
+                    "Failed to persist to cluster state while marking task as started. Failure: " + exc.getMessage()
+                );
+                logger.error(new ParameterizedMessage("[{}] failed updating state to [{}].", getTransformId(), state), exc);
+                getIndexer().stop();
+                listener.onFailure(
+                    new ElasticsearchException(
+                        "Error while updating state for transform [" + transform.getId() + "] to [" + state.getIndexerState() + "].",
+                        exc
+                    )
+                );
+            }));
+        }
     }
 
     /**
@@ -296,10 +299,7 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
      * @param shouldStopAtCheckpoint whether or not we should stop at the next checkpoint or not
      * @param shouldStopAtCheckpointListener the listener to return to when we have persisted the updated value to the state index.
      */
-    public synchronized void setShouldStopAtCheckpoint(
-        boolean shouldStopAtCheckpoint,
-        ActionListener<Void> shouldStopAtCheckpointListener
-    ) {
+    public void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint, ActionListener<Void> shouldStopAtCheckpointListener) {
         // this should be called from the generic threadpool
         assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC);
         logger.debug(
@@ -308,20 +308,22 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
             shouldStopAtCheckpoint,
             getState()
         );
-        if (context.getTaskState() != TransformTaskState.STARTED || getIndexer() == null) {
-            shouldStopAtCheckpointListener.onResponse(null);
-            return;
-        }
+        synchronized (context) {
+            if (context.getTaskState() != TransformTaskState.STARTED || getIndexer() == null) {
+                shouldStopAtCheckpointListener.onResponse(null);
+                return;
+            }
 
-        if (context.shouldStopAtCheckpoint() == shouldStopAtCheckpoint) {
-            shouldStopAtCheckpointListener.onResponse(null);
-            return;
-        }
+            if (context.shouldStopAtCheckpoint() == shouldStopAtCheckpoint) {
+                shouldStopAtCheckpointListener.onResponse(null);
+                return;
+            }
 
-        getIndexer().setStopAtCheckpoint(shouldStopAtCheckpoint, shouldStopAtCheckpointListener);
+            getIndexer().setStopAtCheckpoint(shouldStopAtCheckpoint, shouldStopAtCheckpointListener);
+        }
     }
 
-    public synchronized void stop(boolean force, boolean shouldStopAtCheckpoint) {
+    public void stop(boolean force, boolean shouldStopAtCheckpoint) {
         logger.debug(
             "[{}] stop called with force [{}], shouldStopAtCheckpoint [{}], state [{}], indexerstate[{}]",
             getTransformId(),
@@ -331,49 +333,53 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
             getIndexer() != null ? getIndexer().getState() : null
         );
 
-        if (context.getTaskState() == TransformTaskState.FAILED && force == false) {
-            throw new ElasticsearchStatusException(
-                TransformMessages.getMessage(CANNOT_STOP_FAILED_TRANSFORM, getTransformId(), context.getStateReason()),
-                RestStatus.CONFLICT
-            );
-        }
+        synchronized (context) {
+            if (context.getTaskState() == TransformTaskState.FAILED && force == false) {
+                throw new ElasticsearchStatusException(
+                    TransformMessages.getMessage(CANNOT_STOP_FAILED_TRANSFORM, getTransformId(), context.getStateReason()),
+                    RestStatus.CONFLICT
+                );
+            }
 
-        // cleanup potentially failed state.
-        boolean wasFailed = context.setTaskState(TransformTaskState.FAILED, TransformTaskState.STARTED);
-        context.resetReasonAndFailureCounter();
+            // cleanup potentially failed state.
+            boolean wasFailed = context.setTaskState(TransformTaskState.FAILED, TransformTaskState.STARTED);
+            context.resetReasonAndFailureCounter();
 
-        if (getIndexer() == null) {
-            // If there is no indexer the task has not been triggered
-            // but it still needs to be stopped and removed
-            shutdown();
-            return;
-        }
+            if (getIndexer() == null) {
+                // If there is no indexer the task has not been triggered
+                // but it still needs to be stopped and removed
+                shutdown();
+                return;
+            }
 
-        // If state was in a failed state, we should stop immediately
-        if (wasFailed) {
-            getIndexer().stopAndMaybeSaveState();
-            return;
-        }
+            // If state was in a failed state, we should stop immediately
+            if (wasFailed) {
+                getIndexer().stopAndMaybeSaveState();
+                return;
+            }
 
-        IndexerState indexerState = getIndexer().getState();
+            IndexerState indexerState = getIndexer().getState();
 
-        if (indexerState == IndexerState.STOPPED || indexerState == IndexerState.STOPPING) {
-            return;
-        }
+            if (indexerState == IndexerState.STOPPED || indexerState == IndexerState.STOPPING) {
+                return;
+            }
 
-        // shouldStopAtCheckpoint only comes into play when onFinish is called (or doSaveState right after).
-        // if it is false, stop immediately
-        if (shouldStopAtCheckpoint == false ||
-        // If the indexerState is STARTED and it is on an initialRun, that means that the indexer has previously finished a checkpoint,
-        // or has yet to even start one.
-        // Either way, this means that we won't get to have onFinish called down stream (or at least won't for some time).
-            (indexerState == IndexerState.STARTED && getIndexer().initialRun())) {
-            getIndexer().stopAndMaybeSaveState();
+            // shouldStopAtCheckpoint only comes into play when onFinish is called (or doSaveState right after).
+            // if it is false, stop immediately
+            if (shouldStopAtCheckpoint == false ||
+            // If the indexerState is STARTED and it is on an initialRun, that means that the indexer has previously finished a checkpoint,
+            // or has yet to even start one.
+            // Either way, this means that we won't get to have onFinish called down stream (or at least won't for some time).
+                (indexerState == IndexerState.STARTED && getIndexer().initialRun())) {
+                getIndexer().stopAndMaybeSaveState();
+            }
         }
     }
 
-    public synchronized void applyNewSettings(SettingsConfig newSettings) {
-        getIndexer().applyNewSettings(newSettings);
+    public void applyNewSettings(SettingsConfig newSettings) {
+        synchronized (context) {
+            getIndexer().applyNewSettings(newSettings);
+        }
     }
 
     @Override
@@ -387,43 +393,45 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
     }
 
     @Override
-    public synchronized void triggered(Event event) {
+    public void triggered(Event event) {
         // Ignore if event is not for this job
         if (event.getJobName().equals(schedulerJobName()) == false) {
             return;
         }
 
-        if (getIndexer() == null) {
-            logger.warn("[{}] transform task triggered with an unintialized indexer.", getTransformId());
-            return;
-        }
+        synchronized (context) {
+            if (getIndexer() == null) {
+                logger.warn("[{}] transform task triggered with an unintialized indexer.", getTransformId());
+                return;
+            }
 
-        if (context.getTaskState() == TransformTaskState.FAILED || context.getTaskState() == TransformTaskState.STOPPED) {
-            logger.debug(
-                "[{}] schedule was triggered for transform but task is [{}]. Ignoring trigger.",
-                getTransformId(),
-                context.getTaskState()
-            );
-            return;
-        }
+            if (context.getTaskState() == TransformTaskState.FAILED || context.getTaskState() == TransformTaskState.STOPPED) {
+                logger.debug(
+                    "[{}] schedule was triggered for transform but task is [{}]. Ignoring trigger.",
+                    getTransformId(),
+                    context.getTaskState()
+                );
+                return;
+            }
 
-        // ignore trigger if indexer is running or completely stopped
-        IndexerState indexerState = getIndexer().getState();
-        if (IndexerState.INDEXING.equals(indexerState)
-            || IndexerState.STOPPING.equals(indexerState)
-            || IndexerState.STOPPED.equals(indexerState)) {
-            logger.debug("[{}] indexer for transform has state [{}]. Ignoring trigger.", getTransformId(), indexerState);
-            return;
-        }
+            // ignore trigger if indexer is running or completely stopped
+            IndexerState indexerState = getIndexer().getState();
+            if (IndexerState.INDEXING.equals(indexerState)
+                || IndexerState.STOPPING.equals(indexerState)
+                || IndexerState.STOPPED.equals(indexerState)) {
+                logger.debug("[{}] indexer for transform has state [{}]. Ignoring trigger.", getTransformId(), indexerState);
+                return;
+            }
 
-        logger.debug("[{}] transform indexer schedule has triggered, state: [{}].", event.getJobName(), indexerState);
+            logger.debug("[{}] transform indexer schedule has triggered, state: [{}].", event.getJobName(), indexerState);
 
-        // if it runs for the 1st time we just do it, if not we check for changes
-        if (context.getCheckpoint() == 0) {
-            logger.debug("[{}] trigger initial run.", getTransformId());
-            getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis());
-        } else if (getIndexer().isContinuous()) {
-            getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis());
+            // if it runs for the 1st time we just do it, if not we check for changes
+            if (context.getCheckpoint() == 0) {
+                logger.debug("[{}] trigger initial run.", getTransformId());
+                getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis());
+            } else if (getIndexer().isContinuous()) {
+                getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis());
+            }
         }
     }
 
@@ -438,7 +446,7 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
      * This tries to remove the job from the scheduler and completes the persistent task
      */
     @Override
-    public synchronized void shutdown() {
+    public void shutdown() {
         logger.debug("[{}] shutdown of transform requested", transform.getId());
         deregisterSchedulerJob();
         markAsCompleted();
@@ -455,52 +463,55 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
     }
 
     @Override
-    public synchronized void fail(String reason, ActionListener<Void> listener) {
-        // If we are already flagged as failed, this probably means that a second trigger started firing while we were attempting to
-        // flag the previously triggered indexer as failed. Exit early as we are already flagged as failed.
-        if (context.getTaskState() == TransformTaskState.FAILED) {
-            logger.warn("[{}] is already failed but encountered new failure; reason [{}].", getTransformId(), reason);
-            listener.onResponse(null);
-            return;
-        }
-        // If the indexer is `STOPPING` this means that `TransformTask#stop` was called previously, but something caused
-        // the indexer to fail. Since `ClientTransformIndexer#doSaveState` will persist the state to the index once the indexer stops,
-        // it is probably best to NOT change the internal state of the task and allow the normal stopping logic to continue.
-        if (getIndexer() != null && getIndexer().getState() == IndexerState.STOPPING) {
-            logger.info("[{}] attempt to fail transform with reason [{}] while it was stopping.", getTransformId(), reason);
-            listener.onResponse(null);
-            return;
-        }
-        // If we are stopped, this means that between the failure occurring and being handled, somebody called stop
-        // We should just allow that stop to continue
-        if (getIndexer() != null && getIndexer().getState() == IndexerState.STOPPED) {
-            logger.info("[{}] encountered a failure but indexer is STOPPED; reason [{}].", getTransformId(), reason);
-            listener.onResponse(null);
-            return;
-        }
+    public void fail(String reason, ActionListener<Void> listener) {
+        synchronized (context) {
+            // If we are already flagged as failed, this probably means that a second trigger started firing while we were attempting to
+            // flag the previously triggered indexer as failed. Exit early as we are already flagged as failed.
+            if (context.getTaskState() == TransformTaskState.FAILED) {
+                logger.warn("[{}] is already failed but encountered new failure; reason [{}].", getTransformId(), reason);
+                listener.onResponse(null);
+                return;
+            }
+            // If the indexer is `STOPPING` this means that `TransformTask#stop` was called previously, but something caused
+            // the indexer to fail. Since `ClientTransformIndexer#doSaveState` will persist the state to the index once the indexer stops,
+            // it is probably best to NOT change the internal state of the task and allow the normal stopping logic to continue.
+            if (getIndexer() != null && getIndexer().getState() == IndexerState.STOPPING) {
+                logger.info("[{}] attempt to fail transform with reason [{}] while it was stopping.", getTransformId(), reason);
+                listener.onResponse(null);
+                return;
+            }
+            // If we are stopped, this means that between the failure occurring and being handled, somebody called stop
+            // We should just allow that stop to continue
+            if (getIndexer() != null && getIndexer().getState() == IndexerState.STOPPED) {
+                logger.info("[{}] encountered a failure but indexer is STOPPED; reason [{}].", getTransformId(), reason);
+                listener.onResponse(null);
+                return;
+            }
 
-        logger.error("[{}] transform has failed; experienced: [{}].", transform.getId(), reason);
-        auditor.error(transform.getId(), reason);
-        // We should not keep retrying. Either the task will be stopped, or started
-        // If it is started again, it is registered again.
-        deregisterSchedulerJob();
-        // The idea of stopping at the next checkpoint is no longer valid. Since a failed task could potentially START again,
-        // we should set this flag to false.
-        context.setShouldStopAtCheckpoint(false);
-
-        // The end user should see that the task is in a failed state, and attempt to stop it again but with force=true
-        context.setTaskStateToFailed(reason);
-        TransformState newState = getState();
-        // Even though the indexer information is persisted to an index, we still need TransformTaskState in the clusterstate
-        // This keeps track of STARTED, FAILED, STOPPED
-        // This is because a FAILED state could occur because we failed to read the config from the internal index, which would imply that
-        // we could not read the previous state information from said index.
-        persistStateToClusterState(newState, ActionListener.wrap(r -> listener.onResponse(null), e -> {
-            String msg = "Failed to persist to cluster state while marking task as failed with reason [" + reason + "].";
-            auditor.warning(transform.getId(), msg + " Failure: " + e.getMessage());
-            logger.error(new ParameterizedMessage("[{}] {}", getTransformId(), msg), e);
-            listener.onFailure(e);
-        }));
+            logger.error("[{}] transform has failed; experienced: [{}].", transform.getId(), reason);
+            auditor.error(transform.getId(), reason);
+            // We should not keep retrying. Either the task will be stopped, or started
+            // If it is started again, it is registered again.
+            deregisterSchedulerJob();
+            // The idea of stopping at the next checkpoint is no longer valid. Since a failed task could potentially START again,
+            // we should set this flag to false.
+            context.setShouldStopAtCheckpoint(false);
+
+            // The end user should see that the task is in a failed state, and attempt to stop it again but with force=true
+            context.setTaskStateToFailed(reason);
+            TransformState newState = getState();
+            // Even though the indexer information is persisted to an index, we still need TransformTaskState in the clusterstate
+            // This keeps track of STARTED, FAILED, STOPPED
+            // This is because a FAILED state could occur because we failed to read the config from the internal index, which would imply
+            // that
+            // we could not read the previous state information from said index.
+            persistStateToClusterState(newState, ActionListener.wrap(r -> listener.onResponse(null), e -> {
+                String msg = "Failed to persist to cluster state while marking task as failed with reason [" + reason + "].";
+                auditor.warning(transform.getId(), msg + " Failure: " + e.getMessage());
+                logger.error(new ParameterizedMessage("[{}] {}", getTransformId(), msg), e);
+                listener.onFailure(e);
+            }));
+        }
     }
 
     /**
@@ -509,9 +520,10 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
      * shut down from the inside.
      */
     @Override
-    public synchronized void onCancelled() {
+    public void onCancelled() {
         logger.info("[{}] received cancellation request for transform, state: [{}].", getTransformId(), context.getTaskState());
-        if (getIndexer() != null && getIndexer().abort()) {
+        ClientTransformIndexer theIndexer = getIndexer();
+        if (theIndexer != null && theIndexer.abort()) {
             // there is no background transform running, we can shutdown safely
             shutdown();
         }
@@ -544,7 +556,7 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
         };
     }
 
-    synchronized void initializeIndexer(ClientTransformIndexerBuilder indexerBuilder) {
+    void initializeIndexer(ClientTransformIndexerBuilder indexerBuilder) {
         indexer.set(indexerBuilder.build(getThreadPool(), context));
     }
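
Finally, the small change in onCancelled above is a read-once pattern: getIndexer() is read into a local (theIndexer) so the null check and the abort() call are guaranteed to operate on the same object, even if the underlying reference is swapped between the two reads. A generic sketch of the same idea, with invented names:

// Read-once sketch: reading the volatile reference a single time ensures
// the null check and the method call see the same object. Names invented.
class SnapshotRead {
    private volatile Runnable handler;

    void cancel() {
        Runnable h = handler; // one read: check and call use the same object
        if (h != null) {
            h.run();
        }
    }
}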