1
0
Эх сурвалжийг харах

[ML] Ensure that anomaly detection job state update retries if master node is temoporarily unavailable (#129391)

During cluster upgrade, the anomaly detection jobs must be reassigned from one ML node to another. During this reassignment, the jobs transition through several states, including "opening" and "opened". If, during this transition, the master node becomes temporarily unavailable, e.g., due to reassignment, the new job state is not successfully committed to the cluster state. Therefore, once the new master became available, the cluster state was inconsistent: some anomaly detection jobs were opened, but their state got stuck as "opening".

This PR introduces a retryable action for updating the job state to ensure that the job state is successfully updated and the cluster state remains consistent during the upgrade.

Fixes #126148
Valeriy Khakhutskyy 4 сар өмнө
parent
commit
d487eb5b7c

+ 7 - 0
docs/changelog/129391.yaml

@@ -0,0 +1,7 @@
+pr: 129391
+summary: Ensure that anomaly detection job state update retries if master node is
+  temoporarily unavailable
+area: Machine Learning
+type: bug
+issues:
+ - 126148

+ 57 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

@@ -11,6 +11,7 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.RetryableAction;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
@@ -29,6 +30,7 @@ import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.indices.InvalidAliasNameException;
+import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
@@ -1002,13 +1004,17 @@ public class AutodetectProcessManager implements ClusterStateListener {
 
     void setJobState(JobTask jobTask, JobState state, String reason) {
         JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId(), reason, Instant.now());
-        jobTask.updatePersistentTaskState(
+        // retry state update to ensure that cluster state stays consistent
+        new UpdateStateRetryableAction(
+            logger,
+            threadPool,
+            jobTask,
             jobTaskState,
             ActionListener.wrap(
                 persistentTask -> logger.info("Successfully set job state to [{}] for job [{}]", state, jobTask.getJobId()),
                 e -> logSetJobStateFailure(state, jobTask.getJobId(), e)
             )
-        );
+        ).run();
     }
 
     private static void logSetJobStateFailure(JobState state, String jobId, Exception e) {
@@ -1021,7 +1027,8 @@ public class AutodetectProcessManager implements ClusterStateListener {
 
     void setJobState(JobTask jobTask, JobState state, String reason, CheckedConsumer<Exception, IOException> handler) {
         JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId(), reason, Instant.now());
-        jobTask.updatePersistentTaskState(jobTaskState, ActionListener.wrap(persistentTask -> {
+        // retry state update to ensure that cluster state stays consistent
+        new UpdateStateRetryableAction(logger, threadPool, jobTask, jobTaskState, ActionListener.wrap(persistentTask -> {
             try {
                 handler.accept(null);
             } catch (IOException e1) {
@@ -1033,7 +1040,7 @@ public class AutodetectProcessManager implements ClusterStateListener {
             } catch (IOException e1) {
                 logger.warn("Error while delegating exception [" + e.getMessage() + "]", e1);
             }
-        }));
+        })).run();
     }
 
     public Optional<Tuple<DataCounts, Tuple<ModelSizeStats, TimingStats>>> getStatistics(JobTask jobTask) {
@@ -1082,4 +1089,50 @@ public class AutodetectProcessManager implements ClusterStateListener {
         }
         return ByteSizeValue.ofBytes(memoryUsedBytes);
     }
+
+    private static class UpdateStateRetryableAction extends RetryableAction<PersistentTasksCustomMetadata.PersistentTask<?>> {
+
+        private static final int MIN_RETRY_SLEEP_MILLIS = 500;
+        private static final int RETRY_TIMEOUT_SECONDS = 30;
+        private final JobTask jobTask;
+        private final JobTaskState jobTaskState;
+
+        /**
+         * @param logger        The logger (use AutodetectProcessManager.logger)
+         * @param threadPool    The ThreadPool to schedule retries on
+         * @param jobTask       The JobTask whose state we’re updating
+         * @param jobTaskState  The new state to persist
+         */
+        UpdateStateRetryableAction(
+            Logger logger,
+            ThreadPool threadPool,
+            JobTask jobTask,
+            JobTaskState jobTaskState,
+            ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> delegateListener
+        ) {
+            super(
+                logger,
+                threadPool,
+                TimeValue.timeValueMillis(UpdateStateRetryableAction.MIN_RETRY_SLEEP_MILLIS),
+                TimeValue.timeValueSeconds(UpdateStateRetryableAction.RETRY_TIMEOUT_SECONDS),
+                delegateListener,
+                // executor for retries
+                threadPool.generic()
+            );
+            this.jobTask = Objects.requireNonNull(jobTask);
+            this.jobTaskState = Objects.requireNonNull(jobTaskState);
+        }
+
+        @Override
+        public void tryAction(ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener) {
+            // this will call back either onResponse(...) or onFailure(...)
+            jobTask.updatePersistentTaskState(jobTaskState, listener);
+        }
+
+        @Override
+        public boolean shouldRetry(Exception e) {
+            // retry everything *except* when the task truly no longer exists
+            return (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) == false;
+        }
+    }
 }

+ 141 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java

@@ -7,6 +7,7 @@
 package org.elasticsearch.xpack.ml.job.process.autodetect;
 
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
@@ -33,6 +34,7 @@ import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.indices.TestIndexNameExpressionResolver;
 import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.persistent.PersistentTasksService;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.tasks.TaskManager;
@@ -91,6 +93,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -262,6 +265,9 @@ public class AutodetectProcessManagerTests extends ESTestCase {
             handler.accept(buildAutodetectParams());
             return null;
         }).when(jobResultsProvider).getAutodetectParams(any(), any(), any());
+
+        // when running retry logic use the real executor service
+        when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
     }
 
     public void testOpenJob() {
@@ -854,6 +860,141 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         assertThat(manager.getOpenProcessMemoryUsage(), equalTo(ByteSizeValue.ofBytes(expectedSizeBytes)));
     }
 
+    public void testSetJobState_withoutHandler_invokesPersistentTaskUpdate() {
+        AutodetectProcessManager manager = createSpyManager();
+        JobTask jobTask = mock(JobTask.class);
+        when(jobTask.getAllocationId()).thenReturn(123L);
+        when(jobTask.getJobId()).thenReturn("job-123");
+
+        // call the no-handler overload
+        manager.setJobState(jobTask, JobState.CLOSING, "closing-reason");
+
+        // verify we called updatePersistentTaskState with the expected state
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<JobTaskState> stateCaptor = ArgumentCaptor.forClass(JobTaskState.class);
+        verify(jobTask).updatePersistentTaskState(stateCaptor.capture(), any());
+        JobTaskState captured = stateCaptor.getValue();
+        assertEquals(JobState.CLOSING, captured.getState());
+        assertEquals(123L, captured.getAllocationId());
+        assertEquals("closing-reason", captured.getReason());
+    }
+
+    public void testSetJobState_withHandler_onResponse_triggersHandlerNull() throws IOException {
+        // This test verifies the “happy‐path” of the retryable overload—i.e. what happens when the very first call
+        // to updatePersistentTaskState succeeds. On a successful state update it must invoke handler.accept(null)
+        // (because there was no error).
+        AutodetectProcessManager manager = createSpyManager();
+        JobTask jobTask = mock(JobTask.class);
+
+        // stub updatePersistentTaskState to call onResponse
+        doAnswer(invocation -> {
+            @SuppressWarnings("unchecked")
+            ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = (ActionListener<
+                PersistentTasksCustomMetadata.PersistentTask<?>>) invocation.getArguments()[1];
+            listener.onResponse(null);
+            return null;
+        }).when(jobTask).updatePersistentTaskState(any(), any());
+
+        AtomicReference<Exception> holder = new AtomicReference<>();
+        CheckedConsumer<Exception, IOException> handler = holder::set;
+
+        manager.setJobState(jobTask, JobState.FAILED, "fail-reason", handler);
+
+        // onResponse should have driven handler.accept(null)
+        assertNull(holder.get());
+        verify(jobTask).updatePersistentTaskState(any(JobTaskState.class), any());
+    }
+
+    public void testSetJobState_withHandler_onFailure_triggersHandlerException() throws IOException {
+        // Verifies that when updatePersistentTaskState reports a failure, the handler receives that exception
+        when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), any(Executor.class)))
+            .thenAnswer(invocation -> {
+                Runnable r = invocation.getArgument(0);
+                r.run();
+                return mock(ThreadPool.Cancellable.class);
+            });
+        AutodetectProcessManager manager = createSpyManager();
+        JobTask jobTask = mock(JobTask.class);
+        ResourceNotFoundException boom = new ResourceNotFoundException("boom");
+        doAnswer(invocation -> {
+            @SuppressWarnings("unchecked")
+            ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener =
+                (ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>>) invocation.getArguments()[1];
+            listener.onFailure(boom);
+            return null;
+        }).when(jobTask).updatePersistentTaskState(any(), any());
+
+        AtomicReference<Exception> holder = new AtomicReference<>();
+        CheckedConsumer<Exception, IOException> handler = holder::set;
+
+        manager.setJobState(jobTask, JobState.FAILED, "fail-reason", handler);
+
+        // onFailure should have driven handler.accept(boom)
+        assertSame(boom, holder.get());
+        verify(jobTask).updatePersistentTaskState(any(JobTaskState.class), any());
+    }
+
+    public void testSetJobState_withHandler_retriesUntilSuccess() throws IOException {
+        // Verifies that transient failures are retried until eventual success, and the handler receives null on success
+
+        // ensure that all retries are executed on the same thread for determinism
+        when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), any(Executor.class))).thenAnswer(invocation -> {
+            Runnable r = invocation.getArgument(0);
+            r.run();
+            return mock(ThreadPool.Cancellable.class);
+        });
+        AutodetectProcessManager manager = createSpyManager();
+        JobTask jobTask = mock(JobTask.class);
+        AtomicInteger attempts = new AtomicInteger();
+        doAnswer(invocation -> {
+            // Simulate transient failures for the first two attempts, then succeed on the third
+            @SuppressWarnings("unchecked")
+            ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = (ActionListener<
+                PersistentTasksCustomMetadata.PersistentTask<?>>) invocation.getArguments()[1];
+            if (attempts.incrementAndGet() < 3) {
+                listener.onFailure(new RuntimeException("transient failure"));
+            } else {
+                listener.onResponse(null);
+            }
+            return null;
+        }).when(jobTask).updatePersistentTaskState(any(), any());
+
+        AtomicReference<Exception> holder = new AtomicReference<>();
+        CheckedConsumer<Exception, IOException> handler = holder::set;
+
+        manager.setJobState(jobTask, JobState.OPENED, "retry-test", handler);
+
+        // confirms that the method was called exactly three times (two failures then one success).
+        verify(jobTask, times(3)).updatePersistentTaskState(any(JobTaskState.class), any());
+        assertNull(holder.get());
+    }
+
+    public void testSetJobState_withHandler_noRetryOnResourceNotFound() throws IOException {
+        // Ensures that if the persistent‐state update fails with a ResourceNotFoundException, the retry loop does not retry
+        // again but immediately invokes the user’s handler with that exception.
+        AutodetectProcessManager manager = createSpyManager();
+        JobTask jobTask = mock(JobTask.class);
+        ResourceNotFoundException rnfe = new ResourceNotFoundException("not found");
+        doAnswer(invocation -> {
+            // Simulate a ResourceNotFoundException that should not be retried
+            @SuppressWarnings("unchecked")
+            ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = (ActionListener<
+                PersistentTasksCustomMetadata.PersistentTask<?>>) invocation.getArguments()[1];
+            listener.onFailure(rnfe);
+            return null;
+        }).when(jobTask).updatePersistentTaskState(any(), any());
+
+        AtomicReference<Exception> holder = new AtomicReference<>();
+        CheckedConsumer<Exception, IOException> handler = holder::set;
+
+        manager.setJobState(jobTask, JobState.OPENED, "rnfe-test", handler);
+
+        // updatePersistentTaskState(...) was invoked exactly once (no retries).
+        verify(jobTask, times(1)).updatePersistentTaskState(any(JobTaskState.class), any());
+        // The handler should have been invoked with the ResourceNotFoundException
+        assertSame(rnfe, holder.get());
+    }
+
     private AutodetectProcessManager createNonSpyManager(String jobId) {
         ExecutorService executorService = mock(ExecutorService.class);
         when(threadPool.executor(anyString())).thenReturn(executorService);