浏览代码

Unwrap transport exceptions in exchange service (#105431)

We should unwrap TransportException errors; otherwise, we can return 
them to the caller instead of the actual underlying cause. This becomes
important when the underlying cause is a 4xx error, while 
TransportException is a 5xx error. I found this when running the
heap-attack tests
Nhat Nguyen 1 年之前
父节点
当前提交
b8d708bb96

+ 6 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.compute.operator.exchange;
 
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.SubscribableListener;
@@ -14,6 +15,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.tasks.TaskCancelledException;
+import org.elasticsearch.transport.TransportException;
 
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -181,7 +183,10 @@ public final class ExchangeSourceHandler {
             loopControl.exited();
         }
 
-        void onSinkFailed(Exception e) {
+        void onSinkFailed(Exception originEx) {
+            final Exception e = originEx instanceof TransportException
+                ? (originEx.getCause() instanceof Exception cause ? cause : new ElasticsearchException(originEx.getCause()))
+                : originEx;
             failure.getAndUpdate(first -> {
                 if (first == null) {
                     return e;

+ 18 - 3
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.xpack.esql.action;
 
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.Strings;
@@ -20,6 +21,7 @@ import org.elasticsearch.compute.operator.exchange.ExchangeService;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
 import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 
 import java.util.ArrayList;
@@ -76,18 +78,31 @@ public class EsqlActionBreakerIT extends EsqlActionIT {
             .build();
     }
 
-    @Override
-    protected EsqlQueryResponse run(EsqlQueryRequest request) {
+    private EsqlQueryResponse runWithBreaking(EsqlQueryRequest request) throws CircuitBreakingException {
         setRequestCircuitBreakerLimit(ByteSizeValue.ofBytes(between(256, 2048)));
         try {
             return client().execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES);
         } catch (Exception e) {
             logger.info("request failed", e);
             ensureBlocksReleased();
+            throw e;
         } finally {
             setRequestCircuitBreakerLimit(null);
         }
-        return super.run(request);
+    }
+
+    @Override
+    protected EsqlQueryResponse run(EsqlQueryRequest request) {
+        try {
+            return runWithBreaking(request);
+        } catch (Exception e) {
+            try (EsqlQueryResponse resp = super.run(request)) {
+                assertThat(e, instanceOf(CircuitBreakingException.class));
+                assertThat(ExceptionsHelper.status(e), equalTo(RestStatus.TOO_MANY_REQUESTS));
+                resp.incRef();
+                return resp;
+            }
+        }
     }
 
     /**