Przeglądaj źródła

Wait indefinitely for http connections on shutdown by default (#106511)

This commit changes the meaning of the http shutdown timeout when set to
0. Previously the shutdown would proceed immediately, killing active
http connections. With this change 0 means no timeout is used, but
active connections are still waited on, indefinitely. This new behavior
is what was originally intended for the default value of the shutdown
timeout setting.
Ryan Ernst 1 rok temu
rodzic
commit
2b67444a46

+ 5 - 0
docs/changelog/106511.yaml

@@ -0,0 +1,5 @@
+pr: 106511
+summary: Wait indefinitely for http connections on shutdown by default
+area: Infra/Node Lifecycle
+type: bug
+issues: []

+ 1 - 0
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -313,6 +313,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
         HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE,
         HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH,
         HttpTransportSettings.SETTING_HTTP_SERVER_SHUTDOWN_GRACE_PERIOD,
+        HttpTransportSettings.SETTING_HTTP_SERVER_SHUTDOWN_POLL_PERIOD,
         HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT,
         HttpTransportSettings.SETTING_HTTP_RESET_COOKIES,
         HttpTransportSettings.SETTING_HTTP_TCP_NO_DELAY,

+ 28 - 6
server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java

@@ -70,6 +70,7 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_PORT;
 import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_PUBLISH_HOST;
 import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_PUBLISH_PORT;
 import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_SERVER_SHUTDOWN_GRACE_PERIOD;
+import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_SERVER_SHUTDOWN_POLL_PERIOD;
 
 public abstract class AbstractHttpServerTransport extends AbstractLifecycleComponent implements HttpServerTransport {
     private static final Logger logger = LogManager.getLogger(AbstractHttpServerTransport.class);
@@ -95,6 +96,7 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
     private final RefCounted refCounted = AbstractRefCounted.of(() -> allClientsClosedListener.onResponse(null));
     private final Set<HttpServerChannel> httpServerChannels = ConcurrentCollections.newConcurrentSet();
     private final long shutdownGracePeriodMillis;
+    private final long shutdownPollPeriodMillis;
     private final HttpClientStatsTracker httpClientStatsTracker;
 
     private final HttpTracer httpLogger;
@@ -146,6 +148,7 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
         slowLogThresholdMs = TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING.get(settings).getMillis();
         httpClientStatsTracker = new HttpClientStatsTracker(settings, clusterSettings, threadPool);
         shutdownGracePeriodMillis = SETTING_HTTP_SERVER_SHUTDOWN_GRACE_PERIOD.get(settings).getMillis();
+        shutdownPollPeriodMillis = SETTING_HTTP_SERVER_SHUTDOWN_POLL_PERIOD.get(settings).getMillis();
     }
 
     public Recycler<BytesRef> recycler() {
@@ -272,17 +275,36 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
 
         boolean closed = false;
 
+        long pollTimeMillis = shutdownPollPeriodMillis;
         if (shutdownGracePeriodMillis > 0) {
+            if (shutdownGracePeriodMillis < pollTimeMillis) {
+                pollTimeMillis = shutdownGracePeriodMillis;
+            }
+            logger.debug(format("waiting [%d]ms for clients to close connections", shutdownGracePeriodMillis));
+        } else {
+            logger.debug("waiting indefinitely for clients to close connections");
+        }
+
+        long startPollTimeMillis = System.currentTimeMillis();
+        do {
             try {
-                logger.debug(format("waiting [%d]ms for clients to close connections", shutdownGracePeriodMillis));
-                FutureUtils.get(allClientsClosedListener, shutdownGracePeriodMillis, TimeUnit.MILLISECONDS);
+                FutureUtils.get(allClientsClosedListener, pollTimeMillis, TimeUnit.MILLISECONDS);
                 closed = true;
             } catch (ElasticsearchTimeoutException t) {
-                logger.warn(format("timed out while waiting [%d]ms for clients to close connections", shutdownGracePeriodMillis));
+                logger.info(format("still waiting on %d client connections to close", httpChannels.size()));
+                if (shutdownGracePeriodMillis > 0) {
+                    long endPollTimeMillis = System.currentTimeMillis();
+                    long remainingGracePeriodMillis = shutdownGracePeriodMillis - (endPollTimeMillis - startPollTimeMillis);
+                    if (remainingGracePeriodMillis <= 0) {
+                        logger.warn(format("timed out while waiting [%d]ms for clients to close connections", shutdownGracePeriodMillis));
+                        break;
+                    } else if (remainingGracePeriodMillis < pollTimeMillis) {
+                        pollTimeMillis = remainingGracePeriodMillis;
+                    }
+                }
             }
-        } else {
-            logger.debug("closing all client connections immediately");
-        }
+        } while (closed == false);
+
         if (closed == false) {
             try {
                 CloseableChannel.closeChannels(new ArrayList<>(httpChannels.values()), true);

+ 6 - 0
server/src/main/java/org/elasticsearch/http/HttpTransportSettings.java

@@ -132,6 +132,12 @@ public final class HttpTransportSettings {
         Setting.Property.NodeScope
     );
 
+    public static final Setting<TimeValue> SETTING_HTTP_SERVER_SHUTDOWN_POLL_PERIOD = Setting.positiveTimeSetting(
+        "http.shutdown_poll_period",
+        TimeValue.timeValueMinutes(5),
+        Setting.Property.NodeScope
+    );
+
     // don't reset cookies by default, since I don't think we really need to
     // note, parsing cookies was fixed in netty 3.5.1 regarding stack allocation, but still, currently, we don't need cookies
     public static final Setting<Boolean> SETTING_HTTP_RESET_COOKIES = Setting.boolSetting("http.reset_cookies", false, Property.NodeScope);

+ 51 - 12
server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java

@@ -918,8 +918,8 @@ public class AbstractHttpServerTransportTests extends ESTestCase {
         }
     }
 
-    public void testStopDoesntWaitIfGraceIsZero() {
-        try (var noWait = LogExpectation.unexpectWait(); var transport = new TestHttpServerTransport(Settings.EMPTY)) {
+    public void testStopWaitsIndefinitelyIfGraceIsZero() {
+        try (var wait = LogExpectation.expectWait(); var transport = new TestHttpServerTransport(Settings.EMPTY)) {
             TestHttpChannel httpChannel = new TestHttpChannel();
             transport.serverAcceptedChannel(httpChannel);
             transport.incomingRequest(testHttpRequest(), httpChannel);
@@ -927,7 +927,33 @@ public class AbstractHttpServerTransportTests extends ESTestCase {
             transport.doStop();
             assertFalse(transport.testHttpServerChannel.isOpen());
             assertFalse(httpChannel.isOpen());
-            noWait.assertExpectationsMatched();
+            wait.assertExpectationsMatched();
+        }
+    }
+
+    public void testStopLogsProgress() throws Exception {
+        TestHttpChannel httpChannel = new TestHttpChannel();
+        var doneWithRequest = new CountDownLatch(1);
+        try (var wait = LogExpectation.expectUpdate(1); var transport = new TestHttpServerTransport(gracePeriod(SHORT_GRACE_PERIOD_MS))) {
+
+            httpChannel.blockSendResponse();
+            var inResponse = httpChannel.notifyInSendResponse();
+
+            transport.serverAcceptedChannel(httpChannel);
+            new Thread(() -> {
+                transport.incomingRequest(testHttpRequest(), httpChannel);
+                doneWithRequest.countDown();
+            }, "testStopLogsProgress -> incomingRequest").start();
+
+            inResponse.await();
+
+            transport.doStop();
+            assertFalse(transport.testHttpServerChannel.isOpen());
+            assertFalse(httpChannel.isOpen());
+            wait.assertExpectationsMatched();
+        } finally {
+            httpChannel.allowSendResponse();
+            doneWithRequest.await();
         }
     }
 
@@ -1345,20 +1371,24 @@ public class AbstractHttpServerTransportTests extends ESTestCase {
         }
 
         public static LogExpectation expectTimeout(int grace) {
-            return new LogExpectation(grace).timedOut(true).wait(true);
+            return new LogExpectation(grace).timedOut(true).wait(false);
         }
 
         public static LogExpectation unexpectedTimeout(int grace) {
-            return new LogExpectation(grace).timedOut(false).wait(true);
+            return new LogExpectation(grace).timedOut(false).wait(false);
+        }
+
+        public static LogExpectation expectWait() {
+            return new LogExpectation(0).wait(true);
         }
 
-        public static LogExpectation unexpectWait() {
-            return new LogExpectation(0).wait(false);
+        public static LogExpectation expectUpdate(int connections) {
+            return new LogExpectation(0).update(connections);
         }
 
         private LogExpectation timedOut(boolean expected) {
             var message = "timed out while waiting [" + grace + "]ms for clients to close connections";
-            var name = "message";
+            var name = "timed out message";
             var logger = AbstractHttpServerTransport.class.getName();
             var level = Level.WARN;
             if (expected) {
@@ -1370,18 +1400,27 @@ public class AbstractHttpServerTransportTests extends ESTestCase {
         }
 
         private LogExpectation wait(boolean expected) {
-            var message = "closing all client connections immediately";
-            var name = "message";
+            var message = "waiting indefinitely for clients to close connections";
+            var name = "wait message";
             var logger = AbstractHttpServerTransport.class.getName();
             var level = Level.DEBUG;
             if (expected) {
-                appender.addExpectation(new MockLogAppender.UnseenEventExpectation(name, logger, level, message));
-            } else {
                 appender.addExpectation(new MockLogAppender.SeenEventExpectation(name, logger, level, message));
+            } else {
+                appender.addExpectation(new MockLogAppender.UnseenEventExpectation(name, logger, level, message));
             }
             return this;
         }
 
+        private LogExpectation update(int connections) {
+            var message = "still waiting on " + connections + " client connections to close";
+            var name = "update message";
+            var logger = AbstractHttpServerTransport.class.getName();
+            var level = Level.INFO;
+            appender.addExpectation(new MockLogAppender.SeenEventExpectation(name, logger, level, message));
+            return this;
+        }
+
         public void assertExpectationsMatched() {
             appender.assertAllExpectationsMatched();
             checked = true;