Browse Source

[ML] Wait for the worker service to shutdown before closing task processor (#117920) (#118165)

David Kyle 10 months ago
parent
commit
e4b0f8a4d6

+ 6 - 0
docs/changelog/117920.yaml

@@ -0,0 +1,6 @@
+pr: 117920
+summary: Wait for the worker service to shutdown before closing task processor
+area: Machine Learning
+type: bug
+issues:
+ - 117563

+ 12 - 21
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/deployment/DeploymentManager.java

@@ -631,12 +631,15 @@ public class DeploymentManager {
             logger.debug(() -> format("[%s] Forcefully stopping process", task.getDeploymentId()));
             prepareInternalStateForShutdown();
 
-            if (priorityProcessWorker.isShutdown()) {
-                // most likely there was a crash or exception that caused the
-                // thread to stop. Notify any waiting requests in the work queue
-                handleAlreadyShuttingDownWorker();
-            } else {
-                priorityProcessWorker.shutdown();
+            priorityProcessWorker.shutdownNow();
+            try {
+                // wait for any currently executing work to finish
+                if (priorityProcessWorker.awaitTermination(10L, TimeUnit.SECONDS)) {
+                    priorityProcessWorker.notifyQueueRunnables();
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                logger.info(Strings.format("[%s] Interrupted waiting for process worker after shutdownNow", PROCESS_NAME));
             }
 
             killProcessIfPresent();
@@ -649,12 +652,6 @@ public class DeploymentManager {
             stateStreamer.cancel();
         }
 
-        private void handleAlreadyShuttingDownWorker() {
-            logger.debug(() -> format("[%s] Process worker was already marked for shutdown", task.getDeploymentId()));
-
-            priorityProcessWorker.notifyQueueRunnables();
-        }
-
         private void killProcessIfPresent() {
             try {
                 if (process.get() == null) {
@@ -675,15 +672,7 @@ public class DeploymentManager {
         private synchronized void stopProcessAfterCompletingPendingWork() {
             logger.debug(() -> format("[%s] Stopping process after completing its pending work", task.getDeploymentId()));
             prepareInternalStateForShutdown();
-
-            if (priorityProcessWorker.isShutdown()) {
-                // most likely there was a crash or exception that caused the
-                // thread to stop. Notify any waiting requests in the work queue
-                handleAlreadyShuttingDownWorker();
-            } else {
-                signalAndWaitForWorkerTermination();
-            }
-
+            signalAndWaitForWorkerTermination();
             stopProcessGracefully();
             closeNlpTaskProcessor();
         }
@@ -707,6 +696,8 @@ public class DeploymentManager {
                     throw new TimeoutException(
                         Strings.format("Timed out waiting for process worker to complete for process %s", PROCESS_NAME)
                     );
+                } else {
+                    priorityProcessWorker.notifyQueueRunnables();
                 }
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();