Browse Source

Clarify double-response exception (#124809)

It is confusing to readers to report `Channel is already closed` in
reaction to a double-response bug, and this may be interpreted as a
networking issue. We're not really closing anything here, and it's a
definite logic bug to call `sendResponse()` twice, so this commit
clarifies the actual problem in the exception message.

Relates ES-10996

Backport of #124706 to `8.x`
David Turner 7 months ago
parent
commit
49f686318e

+ 12 - 10
server/src/main/java/org/elasticsearch/rest/RestController.java

@@ -909,12 +909,15 @@ public class RestController implements HttpServerTransport.Dispatcher {
         }
     }
 
+    // exposed for tests
+    static boolean PERMIT_DOUBLE_RESPONSE = false;
+
     private static final class ResourceHandlingHttpChannel extends DelegatingRestChannel {
         private final CircuitBreakerService circuitBreakerService;
         private final int contentLength;
         private final HttpRouteStatsTracker statsTracker;
         private final long startTime;
-        private final AtomicBoolean closed = new AtomicBoolean();
+        private final AtomicBoolean responseSent = new AtomicBoolean();
 
         ResourceHandlingHttpChannel(
             RestChannel delegate,
@@ -933,7 +936,14 @@ public class RestController implements HttpServerTransport.Dispatcher {
         public void sendResponse(RestResponse response) {
             boolean success = false;
             try {
-                close();
+                // protect against double-response bugs
+                if (responseSent.compareAndSet(false, true) == false) {
+                    final var message = "have already sent a response to this request, cannot send another";
+                    assert PERMIT_DOUBLE_RESPONSE : message;
+                    throw new IllegalStateException(message);
+                }
+                inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(-contentLength);
+
                 statsTracker.addRequestStats(contentLength);
                 statsTracker.addResponseTime(rawRelativeTimeInMillis() - startTime);
                 if (response.isChunked() == false) {
@@ -964,14 +974,6 @@ public class RestController implements HttpServerTransport.Dispatcher {
         private static long rawRelativeTimeInMillis() {
             return TimeValue.nsecToMSec(System.nanoTime());
         }
-
-        private void close() {
-            // attempt to close once atomically
-            if (closed.compareAndSet(false, true) == false) {
-                throw new IllegalStateException("Channel is already closed");
-            }
-            inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(-contentLength);
-        }
     }
 
     private static class ResponseLengthRecorder extends AtomicReference<HttpRouteStatsTracker> implements Releasable {

+ 39 - 29
server/src/test/java/org/elasticsearch/rest/RestControllerTests.java

@@ -468,40 +468,50 @@ public class RestControllerTests extends ESTestCase {
     }
 
     public void testDispatchRequestAddsAndFreesBytesOnlyOnceOnError() {
-        int contentLength = BREAKER_LIMIT.bytesAsInt();
-        String content = randomAlphaOfLength((int) Math.round(contentLength / inFlightRequestsBreaker.getOverhead()));
-        // we will produce an error in the rest handler and one more when sending the error response
-        RestRequest request = testRestRequest("/error", content, XContentType.JSON);
-        ExceptionThrowingChannel channel = new ExceptionThrowingChannel(request, true);
-
-        restController.dispatchRequest(request, channel, client.threadPool().getThreadContext());
-
-        assertEquals(0, inFlightRequestsBreaker.getTrippedCount());
-        assertEquals(0, inFlightRequestsBreaker.getUsed());
+        try {
+            RestController.PERMIT_DOUBLE_RESPONSE = true;
+            int contentLength = BREAKER_LIMIT.bytesAsInt();
+            String content = randomAlphaOfLength((int) Math.round(contentLength / inFlightRequestsBreaker.getOverhead()));
+            // we will produce an error in the rest handler and one more when sending the error response
+            RestRequest request = testRestRequest("/error", content, XContentType.JSON);
+            ExceptionThrowingChannel channel = new ExceptionThrowingChannel(request, true);
+
+            restController.dispatchRequest(request, channel, client.threadPool().getThreadContext());
+
+            assertEquals(0, inFlightRequestsBreaker.getTrippedCount());
+            assertEquals(0, inFlightRequestsBreaker.getUsed());
+        } finally {
+            RestController.PERMIT_DOUBLE_RESPONSE = false;
+        }
     }
 
     public void testDispatchRequestAddsAndFreesBytesOnlyOnceOnErrorDuringSend() {
-        int contentLength = Math.toIntExact(BREAKER_LIMIT.getBytes());
-        String content = randomAlphaOfLength((int) Math.round(contentLength / inFlightRequestsBreaker.getOverhead()));
-        // use a real recycler that tracks leaks and create some content bytes in the test handler to check for leaks
-        final BytesRefRecycler recycler = new BytesRefRecycler(new MockPageCacheRecycler(Settings.EMPTY));
-        restController.registerHandler(
-            new Route(GET, "/foo"),
-            (request, c, client) -> new RestToXContentListener<>(c).onResponse((b, p) -> b.startObject().endObject())
-        );
-        // we will produce an error in the rest handler and one more when sending the error response
-        RestRequest request = testRestRequest("/foo", content, XContentType.JSON);
-        ExceptionThrowingChannel channel = new ExceptionThrowingChannel(request, true) {
-            @Override
-            protected BytesStream newBytesOutput() {
-                return new RecyclerBytesStreamOutput(recycler);
-            }
-        };
+        try {
+            RestController.PERMIT_DOUBLE_RESPONSE = true;
+            int contentLength = Math.toIntExact(BREAKER_LIMIT.getBytes());
+            String content = randomAlphaOfLength((int) Math.round(contentLength / inFlightRequestsBreaker.getOverhead()));
+            // use a real recycler that tracks leaks and create some content bytes in the test handler to check for leaks
+            final BytesRefRecycler recycler = new BytesRefRecycler(new MockPageCacheRecycler(Settings.EMPTY));
+            restController.registerHandler(
+                new Route(GET, "/foo"),
+                (request, c, client) -> new RestToXContentListener<>(c).onResponse((b, p) -> b.startObject().endObject())
+            );
+            // we will produce an error in the rest handler and one more when sending the error response
+            RestRequest request = testRestRequest("/foo", content, XContentType.JSON);
+            ExceptionThrowingChannel channel = new ExceptionThrowingChannel(request, true) {
+                @Override
+                protected BytesStream newBytesOutput() {
+                    return new RecyclerBytesStreamOutput(recycler);
+                }
+            };
 
-        restController.dispatchRequest(request, channel, client.threadPool().getThreadContext());
+            restController.dispatchRequest(request, channel, client.threadPool().getThreadContext());
 
-        assertEquals(0, inFlightRequestsBreaker.getTrippedCount());
-        assertEquals(0, inFlightRequestsBreaker.getUsed());
+            assertEquals(0, inFlightRequestsBreaker.getTrippedCount());
+            assertEquals(0, inFlightRequestsBreaker.getUsed());
+        } finally {
+            RestController.PERMIT_DOUBLE_RESPONSE = false;
+        }
     }
 
     public void testDispatchRequestLimitsBytes() {