
[ML] Skip renormalization after node shutdown API called (#89347)

A node can be informed that it is about to be shut down
using the node shutdown API. When this happens we
gracefully stop the jobs on the node and they persist
their state. That persisted state includes the latest
quantiles, and receipt of new quantiles usually triggers
a renormalization. In the case of an impending node
shutdown, however, we do not want to kick off new
processes that could delay the shutdown.

This PR changes the anomaly detection job results
processor so that it does not trigger a renormalization
based on quantiles that arrive after a node shutdown
message has been received.
David Roberts, 3 years ago
Parent commit: e4ff839e4c
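
To make the mechanism concrete before the diffs below, here is a minimal,
self-contained Java sketch of the pattern this commit introduces. The class
and helper methods are hypothetical simplifications, not the real
Elasticsearch classes; only setVacating, setProcessKilled, and the two-flag
check mirror the actual AutodetectResultProcessor change.

    // Minimal sketch of the gating pattern added by this commit.
    public class ResultsProcessorSketch {

        // Written by the thread closing the job, read by the results
        // thread; volatile gives visibility without locking.
        private volatile boolean vacating;
        private volatile boolean processKilled;

        public void setVacating(boolean vacating) {
            this.vacating = vacating;
        }

        public void setProcessKilled() {
            processKilled = true;
            vacating = false; // a kill supersedes a vacate
        }

        void onQuantiles(String jobId) {
            persistQuantiles(jobId); // state is persisted either way

            // The new check: while the node is vacating, skip kicking off
            // renormalization so the impending shutdown is not delayed.
            if (vacating == false && processKilled == false) {
                renormalize(jobId);
            }
        }

        private void persistQuantiles(String jobId) {
            System.out.println("[" + jobId + "] quantiles persisted");
        }

        private void renormalize(String jobId) {
            System.out.println("[" + jobId + "] renormalization triggered");
        }

        public static void main(String[] args) {
            ResultsProcessorSketch processor = new ResultsProcessorSketch();
            processor.onQuantiles("job-1"); // persists and renormalizes
            processor.setVacating(true);    // node shutdown API called
            processor.onQuantiles("job-1"); // persists, renormalization skipped
        }
    }

Note that quantiles persistence itself is untouched; only the follow-on
renormalization trigger is gated.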

+ 5 - 0
docs/changelog/89347.yaml

@@ -0,0 +1,5 @@
+pr: 89347
+summary: Skip renormalization after node shutdown API called
+area: Machine Learning
+type: enhancement
+issues: []

+ 4 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java

@@ -414,4 +414,8 @@ public class AutodetectCommunicator implements Closeable {
         }
         categorizationAnalyzer = new CategorizationAnalyzer(analysisRegistry, categorizationAnalyzerConfig);
     }
+
+    public void setVacating(boolean vacating) {
+        autodetectResultProcessor.setVacating(vacating);
+    }
 }

+ 1 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

@@ -901,6 +901,7 @@ public class AutodetectProcessManager implements ClusterStateListener {
                 if (jobKilled) {
                     communicator.killProcess(true, false, false);
                 } else {
+                    communicator.setVacating(jobTask.isVacating());
                     // communicator.close() may take a long time to run, if the job persists a large model state as a
                     // result of calling it. We want to leave open the option to kill the job during this time, which
                     // is why the allocation ID must remain in the map until after the close is complete.

+ 8 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java

@@ -95,6 +95,7 @@ public class AutodetectResultProcessor {
     final Semaphore updateModelSnapshotSemaphore = new Semaphore(1);
     private final FlushListener flushListener;
     private volatile boolean processKilled;
+    private volatile boolean vacating;
     private volatile boolean failed;
     private final Map<String, ForecastRequestStats> runningForecasts;
     private final long priorRunsBucketCount;
@@ -233,6 +234,7 @@ public class AutodetectResultProcessor {
 
     public void setProcessKilled() {
         processKilled = true;
+        vacating = false;
         try {
             renormalizer.shutdown();
         } catch (InterruptedException e) {
@@ -241,6 +243,10 @@ public class AutodetectResultProcessor {
         }
     }
 
+    public void setVacating(boolean vacating) {
+        this.vacating = vacating;
+    }
+
     void handleOpenForecasts() {
         try {
             if (runningForecasts.isEmpty() == false) {
@@ -360,7 +366,8 @@ public class AutodetectResultProcessor {
             persister.persistQuantiles(quantiles, this::isAlive);
             bulkResultsPersister.executeRequest();
 
-            if (processKilled == false && renormalizer.isEnabled()) {
+            // If a node is trying to shut down then don't trigger any further normalizations on the node
+            if (vacating == false && processKilled == false && renormalizer.isEnabled()) {
                 // We need to make all results written up to these quantiles available for renormalization
                 persister.commitResultWrites(jobId);
                 LOGGER.debug("[{}] Quantiles queued for renormalization", jobId);
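
A note on the concurrency in the hunk above (my reading of the diff, not
stated in the commit): vacating can be a plain volatile because there is a
single writer (the thread closing or vacating the job) and a single reader
(the results-processing thread), and the check-then-act race is benign — at
worst a renormalization that was already in flight completes. The toy
program below (hypothetical names, not Elasticsearch code) shows the flag
flip becoming visible to the reader thread.

    import java.util.concurrent.TimeUnit;

    public class VolatileFlagDemo {
        private static volatile boolean vacating;

        public static void main(String[] args) throws InterruptedException {
            Thread resultsThread = new Thread(() -> {
                for (int i = 0; i < 5; i++) {
                    if (vacating == false) {
                        System.out.println("quantiles " + i + ": renormalization triggered");
                    } else {
                        System.out.println("quantiles " + i + ": skipped, node is vacating");
                    }
                    try {
                        TimeUnit.MILLISECONDS.sleep(50);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
            resultsThread.start();
            TimeUnit.MILLISECONDS.sleep(120); // let a few quantiles flow first
            vacating = true;                  // the close/vacate path flips the flag
            resultsThread.join();
        }
    }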

+ 32 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java

@@ -420,6 +420,38 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         assertEquals(1, manager.numberOfOpenJobs());
         manager.closeJob(jobTask, null);
         assertEquals(0, manager.numberOfOpenJobs());
+        verify(autodetectCommunicator).setVacating(false);
+    }
+
+    public void testVacate() {
+        ExecutorService executorService = mock(ExecutorService.class);
+        doAnswer(invocationOnMock -> {
+            ((Runnable) invocationOnMock.getArguments()[0]).run();
+            return null;
+        }).when(executorService).execute(any(Runnable.class));
+        when(threadPool.executor(anyString())).thenReturn(executorService);
+        AutodetectProcessManager manager = createSpyManager();
+        assertEquals(0, manager.numberOfOpenJobs());
+
+        JobTask jobTask = mock(JobTask.class);
+        when(jobTask.getJobId()).thenReturn("foo");
+        when(jobTask.triggerVacate()).thenReturn(true);
+        manager.openJob(jobTask, clusterState, DEFAULT_MASTER_NODE_TIMEOUT, (e, b) -> {});
+        manager.processData(
+            jobTask,
+            analysisRegistry,
+            createInputStream(""),
+            randomFrom(XContentType.values()),
+            mock(DataLoadParams.class),
+            (dataCounts1, e) -> {}
+        );
+
+        // job is created
+        assertEquals(1, manager.numberOfOpenJobs());
+        when(jobTask.isVacating()).thenReturn(true);
+        manager.vacateOpenJobsOnThisNode();
+        assertEquals(0, manager.numberOfOpenJobs());
+        verify(autodetectCommunicator).setVacating(true);
     }
 
     public void testCanCloseClosingJob() throws Exception {