1
0
Эх сурвалжийг харах

[ML] Return 429 status when RequestExecutorService queue full (#134178)

Rather than always throwing an ElasticsearchStatusException with a 500
status in ActionUtils.wrapFailuresInElasticsearchException(), determine
the appropriate status from the unwrapped exception.

- Remove createInternalServerError() method from ActionUtils
- Refactor AlibabaCloudSearch*Action classes to be consistent with
  SenderExecutableAction
- Update tests to account for new behaviour
Donal Evans 1 сар өмнө
parent
commit
92b15a3f96

+ 5 - 0
docs/changelog/134178.yaml

@@ -0,0 +1,5 @@
+pr: 134178
+summary: Return 429 status when `RequestExecutorService` queue full
+area: Machine Learning
+type: bug
+issues: []

+ 5 - 8
x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/action/ActionUtils.java

@@ -13,7 +13,6 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.inference.InferenceServiceResults;
-import org.elasticsearch.rest.RestStatus;
 
 public class ActionUtils {
 
@@ -28,19 +27,17 @@ public class ActionUtils {
                 l.onFailure(esException);
             } else {
                 l.onFailure(
-                    createInternalServerError(
-                        unwrappedException,
-                        Strings.format("%s. Cause: %s", errorMessage, unwrappedException.getMessage())
+                    // Determine the appropriate RestStatus from the unwrapped exception, then wrap in an ElasticsearchStatusException
+                    new ElasticsearchStatusException(
+                        Strings.format("%s. Cause: %s", errorMessage, unwrappedException.getMessage()),
+                        ExceptionsHelper.status(unwrappedException),
+                        unwrappedException
                     )
                 );
             }
         });
     }
 
-    public static ElasticsearchStatusException createInternalServerError(Throwable e, String message) {
-        return new ElasticsearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR, e);
-    }
-
     public static String constructFailedToSendRequestMessage(String message) {
         return Strings.format("Failed to send %s request", message);
     }

+ 5 - 9
x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchCompletionAction.java

@@ -9,7 +9,6 @@ package org.elasticsearch.xpack.inference.services.alibabacloudsearch.action;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.core.TimeValue;
@@ -27,7 +26,6 @@ import org.elasticsearch.xpack.inference.services.alibabacloudsearch.completion.
 import java.util.Objects;
 
 import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage;
-import static org.elasticsearch.xpack.inference.external.action.ActionUtils.createInternalServerError;
 import static org.elasticsearch.xpack.inference.external.action.ActionUtils.wrapFailuresInElasticsearchException;
 
 public class AlibabaCloudSearchCompletionAction implements ExecutableAction {
@@ -61,16 +59,14 @@ public class AlibabaCloudSearchCompletionAction implements ExecutableAction {
             return;
         }
 
+        ActionListener<InferenceServiceResults> wrappedListener = wrapFailuresInElasticsearchException(
+            failedToSendRequestErrorMessage,
+            listener
+        );
         try {
-            ActionListener<InferenceServiceResults> wrappedListener = wrapFailuresInElasticsearchException(
-                failedToSendRequestErrorMessage,
-                listener
-            );
             sender.send(requestCreator, inferenceInputs, timeout, wrappedListener);
-        } catch (ElasticsearchException e) {
-            listener.onFailure(e);
         } catch (Exception e) {
-            listener.onFailure(createInternalServerError(e, failedToSendRequestErrorMessage));
+            wrappedListener.onFailure(e);
         }
     }
 }

+ 5 - 9
x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchEmbeddingsAction.java

@@ -7,7 +7,6 @@
 
 package org.elasticsearch.xpack.inference.services.alibabacloudsearch.action;
 
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.inference.InferenceServiceResults;
@@ -22,7 +21,6 @@ import org.elasticsearch.xpack.inference.services.alibabacloudsearch.embeddings.
 import java.util.Objects;
 
 import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage;
-import static org.elasticsearch.xpack.inference.external.action.ActionUtils.createInternalServerError;
 import static org.elasticsearch.xpack.inference.external.action.ActionUtils.wrapFailuresInElasticsearchException;
 
 public class AlibabaCloudSearchEmbeddingsAction implements ExecutableAction {
@@ -42,16 +40,14 @@ public class AlibabaCloudSearchEmbeddingsAction implements ExecutableAction {
 
     @Override
     public void execute(InferenceInputs inferenceInputs, TimeValue timeout, ActionListener<InferenceServiceResults> listener) {
+        ActionListener<InferenceServiceResults> wrappedListener = wrapFailuresInElasticsearchException(
+            failedToSendRequestErrorMessage,
+            listener
+        );
         try {
-            ActionListener<InferenceServiceResults> wrappedListener = wrapFailuresInElasticsearchException(
-                failedToSendRequestErrorMessage,
-                listener
-            );
             sender.send(requestCreator, inferenceInputs, timeout, wrappedListener);
-        } catch (ElasticsearchException e) {
-            listener.onFailure(e);
         } catch (Exception e) {
-            listener.onFailure(createInternalServerError(e, failedToSendRequestErrorMessage));
+            wrappedListener.onFailure(e);
         }
     }
 }

+ 5 - 9
x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchRerankAction.java

@@ -9,7 +9,6 @@ package org.elasticsearch.xpack.inference.services.alibabacloudsearch.action;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.inference.InferenceServiceResults;
@@ -24,7 +23,6 @@ import org.elasticsearch.xpack.inference.services.alibabacloudsearch.rerank.Alib
 import java.util.Objects;
 
 import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage;
-import static org.elasticsearch.xpack.inference.external.action.ActionUtils.createInternalServerError;
 import static org.elasticsearch.xpack.inference.external.action.ActionUtils.wrapFailuresInElasticsearchException;
 
 public class AlibabaCloudSearchRerankAction implements ExecutableAction {
@@ -46,16 +44,14 @@ public class AlibabaCloudSearchRerankAction implements ExecutableAction {
 
     @Override
     public void execute(InferenceInputs inferenceInputs, TimeValue timeout, ActionListener<InferenceServiceResults> listener) {
+        ActionListener<InferenceServiceResults> wrappedListener = wrapFailuresInElasticsearchException(
+            failedToSendRequestErrorMessage,
+            listener
+        );
         try {
-            ActionListener<InferenceServiceResults> wrappedListener = wrapFailuresInElasticsearchException(
-                failedToSendRequestErrorMessage,
-                listener
-            );
             sender.send(requestCreator, inferenceInputs, timeout, wrappedListener);
-        } catch (ElasticsearchException e) {
-            listener.onFailure(e);
         } catch (Exception e) {
-            listener.onFailure(createInternalServerError(e, failedToSendRequestErrorMessage));
+            wrappedListener.onFailure(e);
         }
     }
 }

+ 5 - 9
x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchSparseAction.java

@@ -9,7 +9,6 @@ package org.elasticsearch.xpack.inference.services.alibabacloudsearch.action;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.inference.InferenceServiceResults;
@@ -24,7 +23,6 @@ import org.elasticsearch.xpack.inference.services.alibabacloudsearch.sparse.Alib
 import java.util.Objects;
 
 import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage;
-import static org.elasticsearch.xpack.inference.external.action.ActionUtils.createInternalServerError;
 import static org.elasticsearch.xpack.inference.external.action.ActionUtils.wrapFailuresInElasticsearchException;
 
 public class AlibabaCloudSearchSparseAction implements ExecutableAction {
@@ -46,16 +44,14 @@ public class AlibabaCloudSearchSparseAction implements ExecutableAction {
 
     @Override
     public void execute(InferenceInputs inferenceInputs, TimeValue timeout, ActionListener<InferenceServiceResults> listener) {
+        ActionListener<InferenceServiceResults> wrappedListener = wrapFailuresInElasticsearchException(
+            failedToSendRequestErrorMessage,
+            listener
+        );
         try {
-            ActionListener<InferenceServiceResults> wrappedListener = wrapFailuresInElasticsearchException(
-                failedToSendRequestErrorMessage,
-                listener
-            );
             sender.send(requestCreator, inferenceInputs, timeout, wrappedListener);
-        } catch (ElasticsearchException e) {
-            listener.onFailure(e);
         } catch (Exception e) {
-            listener.onFailure(createInternalServerError(e, failedToSendRequestErrorMessage));
+            wrappedListener.onFailure(e);
         }
     }
 }

+ 50 - 12
x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/action/SenderExecutableActionTests.java

@@ -10,8 +10,10 @@ package org.elasticsearch.xpack.inference.external.action;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.inference.InferenceServiceResults;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.inference.external.http.sender.InferenceInputs;
 import org.elasticsearch.xpack.inference.external.http.sender.RequestManager;
@@ -89,31 +91,67 @@ public class SenderExecutableActionTests extends ESTestCase {
     @SuppressWarnings("unchecked")
     public void testSendThrowingExceptionIsWrapped() {
         var expectedException = new IllegalStateException("test");
-        var actualException = new AtomicReference<Exception>();
+        var actualExceptionReference = new AtomicReference<Exception>();
 
         doThrow(expectedException).when(sender)
             .send(eq(requestManager), any(InferenceInputs.class), any(TimeValue.class), any(ActionListener.class));
 
-        execute(actualException);
+        execute(actualExceptionReference);
 
-        assertThat(actualException.get(), notNullValue());
-        assertThat(actualException.get().getMessage(), is(failureExceptionMessage));
-        assertThat(actualException.get(), instanceOf(ElasticsearchStatusException.class));
-        assertThat(actualException.get().getCause(), sameInstance(expectedException));
+        Exception actualException = actualExceptionReference.get();
+        assertThat(actualException, notNullValue());
+        assertThat(actualException.getMessage(), is(failureExceptionMessage));
+        assertThat(actualException, instanceOf(ElasticsearchStatusException.class));
+        assertThat(actualException.getCause(), sameInstance(expectedException));
+        assertThat(((ElasticsearchStatusException) actualException).status(), is(RestStatus.INTERNAL_SERVER_ERROR));
     }
 
     public void testSenderReturnedExceptionIsWrapped() {
         var expectedException = new IllegalStateException("test");
-        var actualException = new AtomicReference<Exception>();
+        var actualExceptionReference = new AtomicReference<Exception>();
 
         mockSender(listener -> listener.onFailure(expectedException));
 
-        execute(actualException);
+        execute(actualExceptionReference);
 
-        assertThat(actualException.get(), notNullValue());
-        assertThat(actualException.get().getMessage(), is(failureExceptionMessage));
-        assertThat(actualException.get(), instanceOf(ElasticsearchStatusException.class));
-        assertThat(actualException.get().getCause(), sameInstance(expectedException));
+        Exception actualException = actualExceptionReference.get();
+        assertThat(actualException, notNullValue());
+        assertThat(actualException.getMessage(), is(failureExceptionMessage));
+        assertThat(actualException, instanceOf(ElasticsearchStatusException.class));
+        assertThat(actualException.getCause(), sameInstance(expectedException));
+        assertThat(((ElasticsearchStatusException) actualException).status(), is(RestStatus.INTERNAL_SERVER_ERROR));
+    }
+
+    public void testSenderReturnedExceptionHasCorrectStatus_whenExceptionIsIllegalArgumentException() {
+        var expectedException = new IllegalArgumentException("test");
+        var actualExceptionReference = new AtomicReference<Exception>();
+
+        mockSender(listener -> listener.onFailure(expectedException));
+
+        execute(actualExceptionReference);
+
+        Exception actualException = actualExceptionReference.get();
+        assertThat(actualException, notNullValue());
+        assertThat(actualException.getMessage(), is(failureExceptionMessage));
+        assertThat(actualException, instanceOf(ElasticsearchStatusException.class));
+        assertThat(actualException.getCause(), sameInstance(expectedException));
+        assertThat(((ElasticsearchStatusException) actualException).status(), is(RestStatus.BAD_REQUEST));
+    }
+
+    public void testSenderReturnedExceptionHasCorrectStatus_whenExceptionIsEsRejectedExecutionException() {
+        var expectedException = new EsRejectedExecutionException("test");
+        var actualExceptionReference = new AtomicReference<Exception>();
+
+        mockSender(listener -> listener.onFailure(expectedException));
+
+        execute(actualExceptionReference);
+
+        Exception actualException = actualExceptionReference.get();
+        assertThat(actualException, notNullValue());
+        assertThat(actualException.getMessage(), is(failureExceptionMessage));
+        assertThat(actualException, instanceOf(ElasticsearchStatusException.class));
+        assertThat(actualException.getCause(), sameInstance(expectedException));
+        assertThat(((ElasticsearchStatusException) actualException).status(), is(RestStatus.TOO_MANY_REQUESTS));
     }
 
     @SuppressWarnings("unchecked")

+ 2 - 3
x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchCompletionActionTests.java

@@ -42,7 +42,6 @@ import java.util.concurrent.TimeUnit;
 import static org.elasticsearch.xpack.core.inference.results.ChatCompletionResultsTests.buildExpectationCompletion;
 import static org.elasticsearch.xpack.inference.Utils.inferenceUtilityExecutors;
 import static org.elasticsearch.xpack.inference.Utils.mockClusterServiceEmpty;
-import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage;
 import static org.elasticsearch.xpack.inference.services.settings.DefaultSecretSettingsTests.getSecretSettingsMap;
 import static org.hamcrest.Matchers.is;
 import static org.mockito.ArgumentMatchers.any;
@@ -110,8 +109,8 @@ public class AlibabaCloudSearchCompletionActionTests extends ESTestCase {
         PlainActionFuture<InferenceServiceResults> listener = new PlainActionFuture<>();
         action.execute(new ChatCompletionInput(List.of(randomAlphaOfLength(10))), InferenceAction.Request.DEFAULT_TIMEOUT, listener);
 
-        var thrownException = expectThrows(ElasticsearchException.class, () -> listener.actionGet(TIMEOUT));
-        assertThat(thrownException.getMessage(), is(constructFailedToSendRequestMessage("AlibabaCloud Search completion")));
+        var thrownException = expectThrows(ElasticsearchStatusException.class, () -> listener.actionGet(TIMEOUT));
+        assertThat(thrownException.getMessage(), is("Failed to send AlibabaCloud Search completion request. Cause: error"));
     }
 
     public void testExecute_ThrowsIllegalArgumentException_WhenInputIsNotChatCompletionInput() {