|
|
@@ -18,9 +18,14 @@
|
|
|
*/
|
|
|
package org.elasticsearch.repositories.azure;
|
|
|
|
|
|
+import com.microsoft.azure.storage.Constants;
|
|
|
+import com.microsoft.azure.storage.RetryExponentialRetry;
|
|
|
+import com.microsoft.azure.storage.RetryPolicyFactory;
|
|
|
import com.sun.net.httpserver.HttpExchange;
|
|
|
import com.sun.net.httpserver.HttpHandler;
|
|
|
import com.sun.net.httpserver.HttpServer;
|
|
|
+import org.apache.http.HttpStatus;
|
|
|
+import org.elasticsearch.common.Strings;
|
|
|
import org.elasticsearch.common.SuppressForbidden;
|
|
|
import org.elasticsearch.common.bytes.BytesReference;
|
|
|
import org.elasticsearch.common.io.Streams;
|
|
|
@@ -48,6 +53,7 @@ import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint")
|
|
|
public class AzureBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCase {
|
|
|
@@ -62,7 +68,11 @@ public class AzureBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTes
|
|
|
|
|
|
@Before
|
|
|
public void setUpHttpServer() {
|
|
|
- httpServer.createContext("/container", new InternalHttpHandler());
|
|
|
+ HttpHandler handler = new InternalHttpHandler();
|
|
|
+ if (randomBoolean()) {
|
|
|
+ handler = new ErroneousHttpHandler(handler, randomIntBetween(2, 3));
|
|
|
+ }
|
|
|
+ httpServer.createContext("/container", handler);
|
|
|
}
|
|
|
|
|
|
@AfterClass
|
|
|
@@ -91,7 +101,7 @@ public class AzureBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTes
|
|
|
|
|
|
@Override
|
|
|
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
|
|
- return Collections.singletonList(AzureRepositoryPlugin.class);
|
|
|
+ return Collections.singletonList(TestAzureRepositoryPlugin.class);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
@@ -112,6 +122,26 @@ public class AzureBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTes
|
|
|
.build();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * AzureRepositoryPlugin that allows to set very low values for the Azure's client retry policy
|
|
|
+ */
|
|
|
+ public static class TestAzureRepositoryPlugin extends AzureRepositoryPlugin {
|
|
|
+
|
|
|
+ public TestAzureRepositoryPlugin(Settings settings) {
|
|
|
+ super(settings);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ AzureStorageService createAzureStoreService(final Settings settings) {
|
|
|
+ return new AzureStorageService(settings) {
|
|
|
+ @Override
|
|
|
+ RetryPolicyFactory createRetryPolicy(final AzureStorageSettings azureStorageSettings) {
|
|
|
+ return new RetryExponentialRetry(1, 100, 500, azureStorageSettings.getMaxRetries());
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Minimal HTTP handler that acts as an Azure compliant server
|
|
|
*/
|
|
|
@@ -187,4 +217,49 @@ public class AzureBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTes
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * HTTP handler that injects random Azure service errors
|
|
|
+ *
|
|
|
+ * Note: it is not a good idea to allow this handler to simulate too many errors as it would
|
|
|
+ * slow down the test suite.
|
|
|
+ */
|
|
|
+ @SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint")
|
|
|
+ private static class ErroneousHttpHandler implements HttpHandler {
|
|
|
+
|
|
|
+ // first key is the remote address, second key is the HTTP request unique id provided by the SDK client,
|
|
|
+ // value is the number of times the request has been seen
|
|
|
+ private final Map<String, AtomicInteger> requests;
|
|
|
+ private final HttpHandler delegate;
|
|
|
+ private final int maxErrorsPerRequest;
|
|
|
+
|
|
|
+ private ErroneousHttpHandler(final HttpHandler delegate, final int maxErrorsPerRequest) {
|
|
|
+ this.requests = new ConcurrentHashMap<>();
|
|
|
+ this.delegate = delegate;
|
|
|
+ this.maxErrorsPerRequest = maxErrorsPerRequest;
|
|
|
+ assert maxErrorsPerRequest > 1;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void handle(final HttpExchange exchange) throws IOException {
|
|
|
+ final String requestId = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.CLIENT_REQUEST_ID_HEADER);
|
|
|
+ assert Strings.hasText(requestId);
|
|
|
+
|
|
|
+ final int count = requests.computeIfAbsent(requestId, req -> new AtomicInteger(0)).incrementAndGet();
|
|
|
+ if (count >= maxErrorsPerRequest || randomBoolean()) {
|
|
|
+ requests.remove(requestId);
|
|
|
+ delegate.handle(exchange);
|
|
|
+ } else {
|
|
|
+ handleAsError(exchange, requestId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void handleAsError(final HttpExchange exchange, final String requestId) throws IOException {
|
|
|
+ Streams.readFully(exchange.getRequestBody());
|
|
|
+ exchange.getResponseHeaders().add(Constants.HeaderConstants.CLIENT_REQUEST_ID_HEADER, requestId);
|
|
|
+ exchange.sendResponseHeaders(HttpStatus.SC_INTERNAL_SERVER_ERROR, -1);
|
|
|
+ exchange.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|