Forráskód Böngészése

Merge pull request #13671 from xuzha/gce-retry

close #13460
Xu Zhang 10 éve
szülő
commit
3910fad1d6

+ 13 - 0
docs/plugins/discovery-gce.asciidoc

@@ -46,6 +46,19 @@ discovery:
       type: gce
 --------------------------------------------------
 
+The following gce settings (prefixed with `cloud.gce`) are supported:
+
+ `retry`::
+
+     If set to `true`, client will use
+     https://developers.google.com/api-client-library/java/google-http-java-client/backoff[ExponentialBackOff]
+     policy to retry the failed http request. Defaults to `true`.
+
+ `max_wait`::
+
+     The maximum elapsed time in milliseconds after the client instantiating retry. If the time elapsed goes past the
+     `max_wait`, client stops to retry. Defaults to 15 minutes (900000 milliseconds).
+
 
 [IMPORTANT]
 .Binding the network host

+ 3 - 0
plugins/discovery-gce/src/main/java/org/elasticsearch/cloud/gce/GceComputeService.java

@@ -32,6 +32,9 @@ public interface GceComputeService extends LifecycleComponent<GceComputeService>
         public static final String REFRESH = "cloud.gce.refresh_interval";
         public static final String TAGS = "discovery.gce.tags";
         public static final String VERSION = "Elasticsearch/GceCloud/1.0";
+
+        public static final String RETRY = "cloud.gce.retry";
+        public static final String MAXWAIT = "cloud.gce.max_wait";
     }
 
     /**

+ 23 - 7
plugins/discovery-gce/src/main/java/org/elasticsearch/cloud/gce/GceComputeServiceImpl.java

@@ -39,6 +39,7 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.network.NetworkService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.discovery.gce.RetryHttpInitializerWrapper;
 
 import java.io.IOException;
 import java.net.URL;
@@ -171,7 +172,7 @@ public class GceComputeServiceImpl extends AbstractLifecycleComponent<GceCompute
 
             logger.info("starting GCE discovery service");
             ComputeCredential credential = new ComputeCredential.Builder(getGceHttpTransport(), gceJsonFactory)
-                        .setTokenServerEncodedUrl(TOKEN_SERVER_ENCODED_URL)
+                    .setTokenServerEncodedUrl(TOKEN_SERVER_ENCODED_URL)
                     .build();
 
             // hack around code messiness in GCE code
@@ -190,14 +191,29 @@ public class GceComputeServiceImpl extends AbstractLifecycleComponent<GceCompute
 
             logger.debug("token [{}] will expire in [{}] s", credential.getAccessToken(), credential.getExpiresInSeconds());
             if (credential.getExpiresInSeconds() != null) {
-                refreshInterval = TimeValue.timeValueSeconds(credential.getExpiresInSeconds()-1);
+                refreshInterval = TimeValue.timeValueSeconds(credential.getExpiresInSeconds() - 1);
             }
 
-            // Once done, let's use this token
-            this.client = new Compute.Builder(getGceHttpTransport(), gceJsonFactory, null)
-                    .setApplicationName(Fields.VERSION)
-                    .setHttpRequestInitializer(credential)
-                    .build();
+            boolean ifRetry = settings.getAsBoolean(Fields.RETRY, true);
+            Compute.Builder builder = new Compute.Builder(getGceHttpTransport(), gceJsonFactory, null)
+                    .setApplicationName(Fields.VERSION);
+
+            if (ifRetry) {
+                int maxWait = settings.getAsInt(Fields.MAXWAIT, -1);
+                RetryHttpInitializerWrapper retryHttpInitializerWrapper;
+
+                if (maxWait > 0) {
+                    retryHttpInitializerWrapper = new RetryHttpInitializerWrapper(credential, maxWait);
+                } else {
+                    retryHttpInitializerWrapper = new RetryHttpInitializerWrapper(credential);
+                }
+                builder.setHttpRequestInitializer(retryHttpInitializerWrapper);
+
+            } else {
+                builder.setHttpRequestInitializer(credential);
+            }
+
+            this.client = builder.build();
         } catch (Exception e) {
             logger.warn("unable to start GCE discovery service", e);
             throw new IllegalArgumentException("unable to start GCE discovery service", e);

+ 107 - 0
plugins/discovery-gce/src/main/java/org/elasticsearch/discovery/gce/RetryHttpInitializerWrapper.java

@@ -0,0 +1,107 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.discovery.gce;
+
+import com.google.api.client.auth.oauth2.Credential;
+import com.google.api.client.http.*;
+import com.google.api.client.util.ExponentialBackOff;
+import com.google.api.client.util.Sleeper;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.logging.ESLoggerFactory;
+
+import java.io.IOException;
+import java.util.Objects;
+
+public class RetryHttpInitializerWrapper implements HttpRequestInitializer {
+
+    private int maxWait;
+
+    private static final ESLogger logger =
+            ESLoggerFactory.getLogger(RetryHttpInitializerWrapper.class.getName());
+
+    // Intercepts the request for filling in the "Authorization"
+    // header field, as well as recovering from certain unsuccessful
+    // error codes wherein the Credential must refresh its token for a
+    // retry.
+    private final Credential wrappedCredential;
+
+    // A sleeper; you can replace it with a mock in your test.
+    private final Sleeper sleeper;
+
+    public RetryHttpInitializerWrapper(Credential wrappedCredential) {
+        this(wrappedCredential, Sleeper.DEFAULT, ExponentialBackOff.DEFAULT_MAX_ELAPSED_TIME_MILLIS);
+    }
+
+    public RetryHttpInitializerWrapper(Credential wrappedCredential, int maxWait) {
+        this(wrappedCredential, Sleeper.DEFAULT, maxWait);
+    }
+
+    // Use only for testing.
+    RetryHttpInitializerWrapper(
+            Credential wrappedCredential, Sleeper sleeper, int maxWait) {
+        this.wrappedCredential = Objects.requireNonNull(wrappedCredential);
+        this.sleeper = sleeper;
+        this.maxWait = maxWait;
+    }
+
+    @Override
+    public void initialize(HttpRequest httpRequest) {
+        final HttpUnsuccessfulResponseHandler backoffHandler =
+                new HttpBackOffUnsuccessfulResponseHandler(
+                        new ExponentialBackOff.Builder()
+                                .setMaxElapsedTimeMillis(maxWait)
+                                .build())
+                        .setSleeper(sleeper);
+
+        httpRequest.setInterceptor(wrappedCredential);
+        httpRequest.setUnsuccessfulResponseHandler(
+                new HttpUnsuccessfulResponseHandler() {
+                    int retry = 0;
+
+                    @Override
+                    public boolean handleResponse(HttpRequest request, HttpResponse response, boolean supportsRetry) throws IOException {
+                        if (wrappedCredential.handleResponse(
+                                request, response, supportsRetry)) {
+                            // If credential decides it can handle it,
+                            // the return code or message indicated
+                            // something specific to authentication,
+                            // and no backoff is desired.
+                            return true;
+                        } else if (backoffHandler.handleResponse(
+                                request, response, supportsRetry)) {
+                            // Otherwise, we defer to the judgement of
+                            // our internal backoff handler.
+                            logger.debug("Retrying [{}] times : [{}]", retry, request.getUrl());
+                            return true;
+                        } else {
+                            return false;
+                        }
+                    }
+                });
+        httpRequest.setIOExceptionHandler(
+                new HttpBackOffIOExceptionHandler(
+                        new ExponentialBackOff.Builder()
+                                .setMaxElapsedTimeMillis(maxWait)
+                                .build())
+                        .setSleeper(sleeper)
+        );
+    }
+}
+

+ 174 - 0
plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/RetryHttpInitializerWrapperTests.java

@@ -0,0 +1,174 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.discovery.gce;
+
+import com.google.api.client.googleapis.testing.auth.oauth2.MockGoogleCredential;
+import com.google.api.client.http.*;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.client.testing.http.MockHttpTransport;
+import com.google.api.client.testing.http.MockLowLevelHttpRequest;
+import com.google.api.client.testing.http.MockLowLevelHttpResponse;
+import com.google.api.client.testing.util.MockSleeper;
+import com.google.api.services.compute.Compute;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+public class RetryHttpInitializerWrapperTests {
+
+    static private class FailThenSuccessBackoffTransport extends MockHttpTransport {
+
+        public int lowLevelExecCalls;
+        int errorStatusCode;
+        int callsBeforeSuccess;
+        boolean throwException;
+
+        protected FailThenSuccessBackoffTransport(int errorStatusCode, int callsBeforeSuccess) {
+            this.errorStatusCode = errorStatusCode;
+            this.callsBeforeSuccess = callsBeforeSuccess;
+            this.throwException = false;
+        }
+
+        protected FailThenSuccessBackoffTransport(int errorStatusCode, int callsBeforeSuccess, boolean throwException) {
+            this.errorStatusCode = errorStatusCode;
+            this.callsBeforeSuccess = callsBeforeSuccess;
+            this.throwException = throwException;
+        }
+
+        public LowLevelHttpRequest retryableGetRequest = new MockLowLevelHttpRequest() {
+
+            @Override
+            public LowLevelHttpResponse execute() throws IOException {
+                lowLevelExecCalls++;
+
+                if (lowLevelExecCalls <= callsBeforeSuccess) {
+                    if (throwException) {
+                        throw new IOException("Test IOException");
+                    }
+
+                    // Return failure on the first call
+                    MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
+                    response.setContent("Request should fail");
+                    response.setStatusCode(errorStatusCode);
+                    return response;
+                }
+                // Return success on the second
+                MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
+                response.setStatusCode(200);
+                return response;
+            }
+        };
+
+        @Override
+        public LowLevelHttpRequest buildRequest(String method, String url) {
+            return retryableGetRequest;
+        }
+    }
+
+    @Test
+    public void testSimpleRetry() throws Exception {
+
+        FailThenSuccessBackoffTransport fakeTransport =
+                new FailThenSuccessBackoffTransport(HttpStatusCodes.STATUS_CODE_SERVER_ERROR, 3);
+
+        MockGoogleCredential credential = new MockGoogleCredential.Builder()
+                .build();
+        MockSleeper mockSleeper = new MockSleeper();
+
+        RetryHttpInitializerWrapper retryHttpInitializerWrapper = new RetryHttpInitializerWrapper(credential, mockSleeper, 5000);
+
+        Compute client = new Compute.Builder(fakeTransport, new JacksonFactory(), null)
+                .setHttpRequestInitializer(retryHttpInitializerWrapper)
+                .setApplicationName("test")
+                .build();
+
+        HttpRequest request = client.getRequestFactory().buildRequest("Get", new GenericUrl("http://elasticsearch.com"), null);
+        HttpResponse response = request.execute();
+
+        assertThat(mockSleeper.getCount(), equalTo(3));
+        assertThat(response.getStatusCode(), equalTo(200));
+    }
+
+    @Test
+    public void testRetryWaitTooLong() throws Exception {
+        int maxWait = 50;
+
+        FailThenSuccessBackoffTransport fakeTransport =
+                new FailThenSuccessBackoffTransport(HttpStatusCodes.STATUS_CODE_SERVER_ERROR, 50);
+        JsonFactory jsonFactory = new JacksonFactory();
+        MockGoogleCredential credential = new MockGoogleCredential.Builder()
+                .build();
+
+        MockSleeper oneTimeSleeper = new MockSleeper() {
+            @Override
+            public void sleep(long millis) throws InterruptedException {
+                Thread.sleep(maxWait);
+                super.sleep(0); // important number, use this to get count
+            }
+        };
+
+        RetryHttpInitializerWrapper retryHttpInitializerWrapper = new RetryHttpInitializerWrapper(credential, oneTimeSleeper, maxWait);
+
+        Compute client = new Compute.Builder(fakeTransport, jsonFactory, null)
+                .setHttpRequestInitializer(retryHttpInitializerWrapper)
+                .setApplicationName("test")
+                .build();
+
+        HttpRequest request1 = client.getRequestFactory().buildRequest("Get", new GenericUrl("http://elasticsearch.com"), null);
+        try {
+            request1.execute();
+            fail("Request should fail if wait too long");
+        } catch (HttpResponseException e) {
+            assertThat(e.getStatusCode(), equalTo(HttpStatusCodes.STATUS_CODE_SERVER_ERROR));
+            assertThat(e.getMessage(), equalTo("500\nRequest should fail"));
+            // should only retry once.
+            assertThat(oneTimeSleeper.getCount(), equalTo(1));
+        }
+    }
+
+    @Test
+    public void testIOExceptionRetry() throws Exception {
+
+        FailThenSuccessBackoffTransport fakeTransport =
+                new FailThenSuccessBackoffTransport(HttpStatusCodes.STATUS_CODE_SERVER_ERROR, 1, true);
+
+        MockGoogleCredential credential = new MockGoogleCredential.Builder()
+                .build();
+        MockSleeper mockSleeper = new MockSleeper();
+        RetryHttpInitializerWrapper retryHttpInitializerWrapper = new RetryHttpInitializerWrapper(credential, mockSleeper, 500);
+
+        Compute client = new Compute.Builder(fakeTransport, new JacksonFactory(), null)
+                .setHttpRequestInitializer(retryHttpInitializerWrapper)
+                .setApplicationName("test")
+                .build();
+
+        HttpRequest request = client.getRequestFactory().buildRequest("Get", new GenericUrl("http://elasticsearch.com"), null);
+        HttpResponse response = request.execute();
+
+        assertThat(mockSleeper.getCount(), equalTo(1));
+        assertThat(response.getStatusCode(), equalTo(200));
+    }
+}
+