浏览代码

[ML] Inference service should reject tasks during shutdown (#105213)

* Fixing inference shutdown bug

* Update docs/changelog/105213.yaml

---------

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
Jonathan Buttner 1 年之前
父节点
当前提交
bb016bdbe9

+ 5 - 0
docs/changelog/105213.yaml

@@ -0,0 +1,5 @@
+pr: 105213
+summary: Inference service should reject tasks during shutdown
+area: Machine Learning
+type: bug
+issues: []

+ 2 - 0
x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java

@@ -186,8 +186,10 @@ class RequestExecutorService implements RequestExecutor {
                 command.run();
             }
 
+            // TODO add logic to complete pending items in the queue before shutting down
             if (running.get() == false) {
                 logger.debug(() -> format("Http executor service [%s] exiting", serviceName));
+                rejectTaskBecauseOfShutdown(task);
             } else {
                 executeTask(task);
             }

+ 59 - 11
x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java

@@ -22,6 +22,7 @@ import org.elasticsearch.xpack.inference.external.http.HttpResult;
 import org.elasticsearch.xpack.inference.external.request.HttpRequestTests;
 import org.junit.After;
 import org.junit.Before;
+import org.mockito.ArgumentCaptor;
 
 import java.io.IOException;
 import java.util.concurrent.BlockingQueue;
@@ -93,16 +94,18 @@ public class RequestExecutorServiceTests extends ESTestCase {
 
     public void testIsTerminated_AfterStopFromSeparateThread() throws Exception {
         var waitToShutdown = new CountDownLatch(1);
+        var waitToReturnFromSend = new CountDownLatch(1);
 
         var mockHttpClient = mock(HttpClient.class);
         doAnswer(invocation -> {
             waitToShutdown.countDown();
+            waitToReturnFromSend.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS);
             return Void.TYPE;
         }).when(mockHttpClient).send(any(), any(), any());
 
         var service = createRequestExecutorService(mockHttpClient, null);
 
-        Future<?> executorTermination = submitShutdownRequest(waitToShutdown, service);
+        Future<?> executorTermination = submitShutdownRequest(waitToShutdown, waitToReturnFromSend, service);
 
         PlainActionFuture<HttpResult> listener = new PlainActionFuture<>();
         service.execute(HttpRequestTests.createMock("inferenceEntityId"), null, listener);
@@ -277,9 +280,43 @@ public class RequestExecutorServiceTests extends ESTestCase {
         verify(queue, times(1)).take();
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/105155")
+    public void testQueueTake_RejectsTask_WhenServiceShutsDown() throws Exception {
+        var mockTask = mock(AbstractRunnable.class);
+        @SuppressWarnings("unchecked")
+        BlockingQueue<AbstractRunnable> queue = mock(LinkedBlockingQueue.class);
+
+        var service = new RequestExecutorService(
+            "test_service",
+            mock(HttpClient.class),
+            threadPool,
+            mockQueueCreator(queue),
+            null,
+            createRequestExecutorServiceSettingsEmpty()
+        );
+
+        doAnswer(invocation -> {
+            service.shutdown();
+            return mockTask;
+        }).doReturn(new NoopTask()).when(queue).take();
+
+        service.start();
+
+        assertTrue(service.isTerminated());
+        verify(queue, times(1)).take();
+
+        ArgumentCaptor<Exception> argument = ArgumentCaptor.forClass(Exception.class);
+        verify(mockTask, times(1)).onRejection(argument.capture());
+        assertThat(argument.getValue(), instanceOf(EsRejectedExecutionException.class));
+        assertThat(
+            argument.getValue().getMessage(),
+            is("Failed to send request, queue service [test_service] has shutdown prior to executing request")
+        );
+
+        var rejectionException = (EsRejectedExecutionException) argument.getValue();
+        assertTrue(rejectionException.isExecutorShutdown());
+    }
+
     public void testChangingCapacity_SetsCapacityToTwo() throws ExecutionException, InterruptedException, TimeoutException, IOException {
-        var waitToShutdown = new CountDownLatch(1);
         var httpClient = mock(HttpClient.class);
 
         var settings = createRequestExecutorServiceSettings(1);
@@ -299,13 +336,16 @@ public class RequestExecutorServiceTests extends ESTestCase {
 
         settings.setQueueCapacity(2);
 
+        var waitToShutdown = new CountDownLatch(1);
+        var waitToReturnFromSend = new CountDownLatch(1);
         // There is a request already queued, and its execution path will initiate shutting down the service
         doAnswer(invocation -> {
             waitToShutdown.countDown();
+            waitToReturnFromSend.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS);
             return Void.TYPE;
         }).when(httpClient).send(any(), any(), any());
 
-        Future<?> executorTermination = submitShutdownRequest(waitToShutdown, service);
+        Future<?> executorTermination = submitShutdownRequest(waitToShutdown, waitToReturnFromSend, service);
 
         service.start();
 
@@ -314,10 +354,8 @@ public class RequestExecutorServiceTests extends ESTestCase {
         assertThat(service.remainingQueueCapacity(), is(2));
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/105155")
     public void testChangingCapacity_DoesNotRejectsOverflowTasks_BecauseOfQueueFull() throws IOException, ExecutionException,
         InterruptedException, TimeoutException {
-        var waitToShutdown = new CountDownLatch(1);
         var httpClient = mock(HttpClient.class);
 
         var settings = createRequestExecutorServiceSettings(3);
@@ -332,13 +370,16 @@ public class RequestExecutorServiceTests extends ESTestCase {
 
         settings.setQueueCapacity(1);
 
+        var waitToShutdown = new CountDownLatch(1);
+        var waitToReturnFromSend = new CountDownLatch(1);
         // There is a request already queued, and its execution path will initiate shutting down the service
         doAnswer(invocation -> {
             waitToShutdown.countDown();
+            waitToReturnFromSend.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS);
             return Void.TYPE;
         }).when(httpClient).send(any(), any(), any());
 
-        Future<?> executorTermination = submitShutdownRequest(waitToShutdown, service);
+        Future<?> executorTermination = submitShutdownRequest(waitToShutdown, waitToReturnFromSend, service);
 
         service.start();
 
@@ -358,10 +399,8 @@ public class RequestExecutorServiceTests extends ESTestCase {
         assertTrue(thrownException.isExecutorShutdown());
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/105155")
     public void testChangingCapacity_ToZero_SetsQueueCapacityToUnbounded() throws IOException, ExecutionException, InterruptedException,
         TimeoutException {
-        var waitToShutdown = new CountDownLatch(1);
         var httpClient = mock(HttpClient.class);
 
         var settings = createRequestExecutorServiceSettings(1);
@@ -381,13 +420,16 @@ public class RequestExecutorServiceTests extends ESTestCase {
 
         settings.setQueueCapacity(0);
 
+        var waitToShutdown = new CountDownLatch(1);
+        var waitToReturnFromSend = new CountDownLatch(1);
         // There is a request already queued, and its execution path will initiate shutting down the service
         doAnswer(invocation -> {
             waitToShutdown.countDown();
+            waitToReturnFromSend.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS);
             return Void.TYPE;
         }).when(httpClient).send(any(), any(), any());
 
-        Future<?> executorTermination = submitShutdownRequest(waitToShutdown, service);
+        Future<?> executorTermination = submitShutdownRequest(waitToShutdown, waitToReturnFromSend, service);
 
         service.start();
 
@@ -396,12 +438,18 @@ public class RequestExecutorServiceTests extends ESTestCase {
         assertThat(service.remainingQueueCapacity(), is(Integer.MAX_VALUE));
     }
 
-    private Future<?> submitShutdownRequest(CountDownLatch waitToShutdown, RequestExecutorService service) {
+    private Future<?> submitShutdownRequest(
+        CountDownLatch waitToShutdown,
+        CountDownLatch waitToReturnFromSend,
+        RequestExecutorService service
+    ) {
         return threadPool.generic().submit(() -> {
             try {
                 // wait until a task has started executing before beginning shutdown
                 waitToShutdown.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS);
                 service.shutdown();
+                // tells send to return
+                waitToReturnFromSend.countDown();
                 service.awaitTermination(TIMEOUT.getSeconds(), TimeUnit.SECONDS);
             } catch (Exception e) {
                 fail(Strings.format("Failed to shutdown executor: %s", e));