Browse Source

Fix Watcher HTTP connection config for longevity (#72736)

Watcher uses a connection pool for outgoing HTTP traffic, which means
that some HTTP connections may live for a long time, possibly in an idle
state. Such connections may be silently torn down by a remote device, so
that when we re-use them we encounter a `Connection reset` or similar
error.

This commit introduces a setting allowing users to set a finite expiry
time on these connections, and also enables TCP keepalives on them by
default so that a remote teardown will be actively detected sooner.

Closes #52997
David Turner 4 years ago
parent
commit
3e0959f308

+ 10 - 0
docs/reference/settings/notification-settings.asciidoc

@@ -65,6 +65,16 @@ connection is being initiated.
 The maximum period of inactivity between two data packets, before the
 request is aborted.
 
+`xpack.http.tcp.keep_alive`
+(<<static-cluster-setting,Static>>)
+Whether to enable TCP keepalives on HTTP connections. Defaults to `true`.
+
+`xpack.http.connection_pool_ttl`
+(<<static-cluster-setting,Static>>)
+The time-to-live of connections in the connection pool. If a connection is not
+re-used within this timeout, it is closed. By default, the time-to-live is
+infinite meaning that connections never expire.
+
 `xpack.http.max_response_size`::
 (<<static-cluster-setting,Static>>)
 Specifies the maximum size an HTTP response is allowed to have, defaults to

+ 11 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/test/http/MockRequest.java

@@ -8,6 +8,7 @@ package org.elasticsearch.test.http;
 
 import org.elasticsearch.common.SuppressForbidden;
 
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.Locale;
 
@@ -19,13 +20,15 @@ public class MockRequest {
     private final String method;
     private final URI uri;
     private final Headers headers;
+    private final InetSocketAddress remoteAddress;
     private String body = null;
 
     @SuppressForbidden(reason = "use http server header class")
-    MockRequest(String method, URI uri, com.sun.net.httpserver.Headers headers) {
+    MockRequest(String method, URI uri, com.sun.net.httpserver.Headers headers, InetSocketAddress remoteAddress) {
         this.method = method;
         this.uri = uri;
         this.headers = new Headers(headers);
+        this.remoteAddress = remoteAddress;
     }
 
     /**
@@ -63,6 +66,13 @@ public class MockRequest {
         return body;
     }
 
+    /**
+     * @return The address of the client
+     */
+    public InetSocketAddress getRemoteAddress() {
+        return remoteAddress;
+    }
+
     @Override
     public String toString() {
         return String.format(Locale.ROOT, "%s %s", method, uri);

+ 5 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/test/http/MockWebServer.java

@@ -193,7 +193,11 @@ public class MockWebServer implements Closeable {
      * Creates a MockRequest from an incoming HTTP request, that can later be checked in your test assertions
      */
     private MockRequest createRequest(HttpExchange exchange) throws IOException {
-        MockRequest request = new MockRequest(exchange.getRequestMethod(), exchange.getRequestURI(), exchange.getRequestHeaders());
+        MockRequest request = new MockRequest(
+                exchange.getRequestMethod(),
+                exchange.getRequestURI(),
+                exchange.getRequestHeaders(),
+                exchange.getRemoteAddress());
         if (exchange.getRequestBody() != null) {
             String body = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8));
             if (Strings.isEmpty(body) == false) {

+ 16 - 0
x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/common/http/HttpClient.java

@@ -26,6 +26,7 @@ import org.apache.http.client.methods.HttpRequestWrapper;
 import org.apache.http.client.protocol.HttpClientContext;
 import org.apache.http.client.utils.URIBuilder;
 import org.apache.http.client.utils.URIUtils;
+import org.apache.http.config.SocketConfig;
 import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
 import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.entity.ContentType;
@@ -74,6 +75,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class HttpClient implements Closeable {
@@ -90,6 +92,8 @@ public class HttpClient implements Closeable {
     private final HttpProxy settingsProxy;
     private final TimeValue defaultConnectionTimeout;
     private final TimeValue defaultReadTimeout;
+    private final boolean tcpKeepaliveEnabled;
+    private final TimeValue connectionPoolTtl;
     private final ByteSizeValue maxResponseSize;
     private final CryptoService cryptoService;
     private final SSLService sslService;
@@ -97,6 +101,8 @@ public class HttpClient implements Closeable {
     public HttpClient(Settings settings, SSLService sslService, CryptoService cryptoService, ClusterService clusterService) {
         this.defaultConnectionTimeout = HttpSettings.CONNECTION_TIMEOUT.get(settings);
         this.defaultReadTimeout = HttpSettings.READ_TIMEOUT.get(settings);
+        this.tcpKeepaliveEnabled = HttpSettings.TCP_KEEPALIVE.get(settings);
+        this.connectionPoolTtl = HttpSettings.CONNECTION_POOL_TTL.get(settings);
         this.maxResponseSize = HttpSettings.MAX_HTTP_RESPONSE_SIZE.get(settings);
         this.settingsProxy = getProxyFromSettings(settings);
         this.cryptoService = cryptoService;
@@ -116,6 +122,16 @@ public class HttpClient implements Closeable {
         SSLConnectionSocketFactory factory = new SSLConnectionSocketFactory(sslService.sslSocketFactory(sslConfiguration), verifier);
         clientBuilder.setSSLSocketFactory(factory);
 
+        final SocketConfig.Builder socketConfigBuilder = SocketConfig.custom();
+        if (tcpKeepaliveEnabled) {
+            socketConfigBuilder.setSoKeepAlive(true);
+        }
+        clientBuilder.setDefaultSocketConfig(socketConfigBuilder.build());
+
+        if (connectionPoolTtl.millis() > 0) {
+            clientBuilder.setConnectionTimeToLive(connectionPoolTtl.millis(), TimeUnit.MILLISECONDS);
+        }
+
         clientBuilder.evictExpiredConnections();
         clientBuilder.setMaxConnPerRoute(MAX_CONNECTIONS);
         clientBuilder.setMaxConnTotal(MAX_CONNECTIONS);

+ 6 - 0
x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/common/http/HttpSettings.java

@@ -30,6 +30,10 @@ public class HttpSettings {
             DEFAULT_READ_TIMEOUT, Property.NodeScope);
     static final Setting<TimeValue> CONNECTION_TIMEOUT = Setting.timeSetting("xpack.http.default_connection_timeout",
             DEFAULT_CONNECTION_TIMEOUT, Property.NodeScope);
+    static final Setting<Boolean> TCP_KEEPALIVE = Setting.boolSetting("xpack.http.tcp.keep_alive",
+            true, Property.NodeScope);
+    static final Setting<TimeValue> CONNECTION_POOL_TTL = Setting.timeSetting("xpack.http.connection_pool_ttl",
+            TimeValue.MINUS_ONE, Property.NodeScope);
 
     private static final String PROXY_HOST_KEY = "xpack.http.proxy.host";
     private static final String PROXY_PORT_KEY = "xpack.http.proxy.port";
@@ -55,6 +59,8 @@ public class HttpSettings {
         settings.addAll(SSL.getAllSettings());
         settings.add(READ_TIMEOUT);
         settings.add(CONNECTION_TIMEOUT);
+        settings.add(TCP_KEEPALIVE);
+        settings.add(CONNECTION_POOL_TTL);
         settings.add(PROXY_HOST);
         settings.add(PROXY_PORT);
         settings.add(PROXY_SCHEME);

+ 44 - 0
x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/common/http/HttpClientTests.java

@@ -770,6 +770,50 @@ public class HttpClientTests extends ESTestCase {
         assertCreateUri("https://example.org", "");
     }
 
+    public void testConnectionReuse() throws Exception {
+        final HttpRequest request = HttpRequest.builder("localhost", webServer.getPort())
+                .method(HttpMethod.POST)
+                .path("/" + randomAlphaOfLength(5))
+                .build();
+
+        webServer.enqueue(new MockResponse().setResponseCode(200).setBody("whatever"));
+        webServer.enqueue(new MockResponse().setResponseCode(200).setBody("whatever"));
+
+        httpClient.execute(request);
+        httpClient.execute(request);
+
+        assertThat(webServer.requests(), hasSize(2));
+        // by default we re-use connections forever
+        assertThat(webServer.requests().get(0).getRemoteAddress(), equalTo(webServer.requests().get(1).getRemoteAddress()));
+        webServer.clearRequests();
+
+        try (HttpClient unpooledHttpClient = new HttpClient(
+                Settings.builder().put(HttpSettings.CONNECTION_POOL_TTL.getKey(), "99ms").build(),
+                new SSLService(environment),
+                null,
+                mockClusterService())) {
+
+            webServer.enqueue(new MockResponse().setResponseCode(200).setBody("whatever"));
+            webServer.enqueue(new MockResponse().setResponseCode(200).setBody("whatever"));
+
+            unpooledHttpClient.execute(request);
+
+            // Connection pool expiry is based on System.currentTimeMillis so wait for this clock to advance far enough for the connection
+            // we just used to expire
+            final long waitStartTime = System.currentTimeMillis();
+            while (System.currentTimeMillis() <= waitStartTime + 100) {
+                //noinspection BusyWait
+                Thread.sleep(100);
+            }
+
+            unpooledHttpClient.execute(request);
+
+            assertThat(webServer.requests(), hasSize(2));
+            // the connection expired before re-use so we made a new one
+            assertThat(webServer.requests().get(0).getRemoteAddress(), not(equalTo(webServer.requests().get(1).getRemoteAddress())));
+        }
+    }
+
     private void assertCreateUri(String uri, String expectedPath) {
         final HttpRequest request = HttpRequest.builder().fromUrl(uri).build();
         final Tuple<HttpHost, URI> tuple = HttpClient.createURI(request);