Ver Fonte

Avoid wrapping rejection exception in exchange (#112178)

We should avoid wrapping EsRejectedExecutionException in an 
ElasticsearchException as it would change the status code from 429 to
500. Ideally, we should avoid wrapping exceptions altogether, but that
would require bigger changes.

Closes #112106
Nhat Nguyen há 1 ano atrás
pai
commit
046d6ee754

+ 6 - 0
docs/changelog/112178.yaml

@@ -0,0 +1,6 @@
+pr: 112178
+summary: Avoid wrapping rejection exception in exchange
+area: ES|QL
+type: bug
+issues:
+ - 112106

+ 1 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java

@@ -146,7 +146,7 @@ public abstract class AsyncOperator implements Operator {
         Exception e = failureCollector.getFailure();
         if (e != null) {
             discardPages();
-            throw ExceptionsHelper.convertToElastic(e);
+            throw ExceptionsHelper.convertToRuntime(e);
         }
     }
 

+ 1 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java

@@ -54,7 +54,7 @@ public final class ExchangeSourceHandler {
         private void checkFailure() {
             Exception e = failure.getFailure();
             if (e != null) {
-                throw ExceptionsHelper.convertToElastic(e);
+                throw ExceptionsHelper.convertToRuntime(e);
             }
         }
 

+ 25 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.esql.action;
 
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.TransportAction;
@@ -16,6 +17,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.operator.DriverProfile;
 import org.elasticsearch.compute.operator.DriverStatus;
@@ -30,6 +32,9 @@ import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.protocol.xpack.XPackInfoRequest;
 import org.elasticsearch.protocol.xpack.XPackInfoResponse;
 import org.elasticsearch.reindex.ReindexPlugin;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.transport.RemoteTransportException;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
 import org.elasticsearch.xpack.core.XPackSettings;
@@ -43,6 +48,7 @@ import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
 import org.elasticsearch.xpack.enrich.EnrichPlugin;
 import org.elasticsearch.xpack.esql.EsqlTestUtils;
 import org.elasticsearch.xpack.esql.core.type.DataType;
+import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
 import org.elasticsearch.xpack.esql.plan.logical.Enrich;
 import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
 import org.junit.After;
@@ -82,6 +88,7 @@ public class EnrichIT extends AbstractEsqlIntegTestCase {
         plugins.add(IngestCommonPlugin.class);
         plugins.add(ReindexPlugin.class);
         plugins.add(InternalTransportSettingPlugin.class);
+        plugins.add(MockTransportService.TestPlugin.class);
         return plugins;
     }
 
@@ -420,6 +427,24 @@ public class EnrichIT extends AbstractEsqlIntegTestCase {
         }
     }
 
+    public void testRejection() {
+        for (var ts : internalCluster().getInstances(TransportService.class)) {
+            ((MockTransportService) ts).addRequestHandlingBehavior(EnrichLookupService.LOOKUP_ACTION_NAME, (h, r, channel, t) -> {
+                EsRejectedExecutionException ex = new EsRejectedExecutionException("test", false);
+                channel.sendResponse(new RemoteTransportException("test", ex));
+            });
+        }
+        try {
+            String query = "FROM listen* | " + enrichSongCommand();
+            Exception error = expectThrows(Exception.class, () -> run(query).close());
+            assertThat(ExceptionsHelper.status(error), equalTo(RestStatus.TOO_MANY_REQUESTS));
+        } finally {
+            for (var ts : internalCluster().getInstances(TransportService.class)) {
+                ((MockTransportService) ts).clearAllRules();
+            }
+        }
+    }
+
     public static class LocalStateEnrich extends LocalStateCompositeXPackPlugin {
 
         public LocalStateEnrich(final Settings settings, final Path configPath) throws Exception {

+ 60 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ManyShardsIT.java

@@ -8,14 +8,24 @@
 package org.elasticsearch.xpack.esql.action;
 
 import org.apache.lucene.tests.util.LuceneTestCase;
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.elasticsearch.compute.operator.exchange.ExchangeService;
 import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.MockSearchService;
 import org.elasticsearch.search.SearchService;
+import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.transport.RemoteTransportException;
+import org.elasticsearch.transport.TransportChannel;
+import org.elasticsearch.transport.TransportResponse;
+import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 import org.hamcrest.Matchers;
 import org.junit.Before;
@@ -27,6 +37,10 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 
 /**
  * Make sures that we can run many concurrent requests with large number of shards with any data_partitioning.
@@ -38,6 +52,7 @@ public class ManyShardsIT extends AbstractEsqlIntegTestCase {
     protected Collection<Class<? extends Plugin>> getMockPlugins() {
         var plugins = new ArrayList<>(super.getMockPlugins());
         plugins.add(MockSearchService.TestPlugin.class);
+        plugins.add(MockTransportService.TestPlugin.class);
         return plugins;
     }
 
@@ -97,6 +112,51 @@ public class ManyShardsIT extends AbstractEsqlIntegTestCase {
         }
     }
 
+    public void testRejection() throws Exception {
+        String[] nodes = internalCluster().getNodeNames();
+        for (String node : nodes) {
+            MockTransportService ts = (MockTransportService) internalCluster().getInstance(TransportService.class, node);
+            ts.addRequestHandlingBehavior(ExchangeService.EXCHANGE_ACTION_NAME, (handler, request, channel, task) -> {
+                handler.messageReceived(request, new TransportChannel() {
+                    @Override
+                    public String getProfileName() {
+                        return channel.getProfileName();
+                    }
+
+                    @Override
+                    public void sendResponse(TransportResponse response) {
+                        channel.sendResponse(new RemoteTransportException("simulated", new EsRejectedExecutionException("test queue")));
+                    }
+
+                    @Override
+                    public void sendResponse(Exception exception) {
+                        channel.sendResponse(exception);
+                    }
+                }, task);
+            });
+        }
+        try {
+            AtomicReference<Exception> failure = new AtomicReference<>();
+            EsqlQueryRequest request = new EsqlQueryRequest();
+            request.query("from test-* | stats count(user) by tags");
+            request.acceptedPragmaRisks(true);
+            request.pragmas(randomPragmas());
+            CountDownLatch queryLatch = new CountDownLatch(1);
+            client().execute(EsqlQueryAction.INSTANCE, request, ActionListener.runAfter(ActionListener.wrap(r -> {
+                r.close();
+                throw new AssertionError("expected failure");
+            }, failure::set), queryLatch::countDown));
+            assertTrue(queryLatch.await(10, TimeUnit.SECONDS));
+            assertThat(failure.get(), instanceOf(EsRejectedExecutionException.class));
+            assertThat(ExceptionsHelper.status(failure.get()), equalTo(RestStatus.TOO_MANY_REQUESTS));
+            assertThat(failure.get().getMessage(), equalTo("test queue"));
+        } finally {
+            for (String node : nodes) {
+                ((MockTransportService) internalCluster().getInstance(TransportService.class, node)).clearAllRules();
+            }
+        }
+    }
+
     static class SearchContextCounter {
         private final int maxAllowed;
         private final AtomicInteger current = new AtomicInteger();