|
@@ -24,7 +24,6 @@ import com.google.cloud.http.HttpTransportOptions;
|
|
|
import com.google.cloud.storage.StorageOptions;
|
|
|
import com.sun.net.httpserver.HttpExchange;
|
|
|
import com.sun.net.httpserver.HttpHandler;
|
|
|
-import com.sun.net.httpserver.HttpServer;
|
|
|
import org.apache.http.HttpStatus;
|
|
|
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
|
|
import org.elasticsearch.common.Strings;
|
|
@@ -32,7 +31,6 @@ import org.elasticsearch.common.SuppressForbidden;
|
|
|
import org.elasticsearch.common.bytes.BytesArray;
|
|
|
import org.elasticsearch.common.bytes.BytesReference;
|
|
|
import org.elasticsearch.common.io.Streams;
|
|
|
-import org.elasticsearch.common.network.InetAddresses;
|
|
|
import org.elasticsearch.common.regex.Regex;
|
|
|
import org.elasticsearch.common.settings.MockSecureSettings;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
@@ -40,22 +38,15 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
|
|
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
|
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
|
|
import org.elasticsearch.common.xcontent.XContentType;
|
|
|
-import org.elasticsearch.mocksocket.MockHttpServer;
|
|
|
import org.elasticsearch.plugins.Plugin;
|
|
|
-import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
|
|
|
+import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
|
|
|
import org.elasticsearch.rest.RestStatus;
|
|
|
import org.elasticsearch.rest.RestUtils;
|
|
|
-import org.junit.After;
|
|
|
-import org.junit.AfterClass;
|
|
|
-import org.junit.Before;
|
|
|
-import org.junit.BeforeClass;
|
|
|
import org.threeten.bp.Duration;
|
|
|
|
|
|
import java.io.BufferedInputStream;
|
|
|
import java.io.ByteArrayOutputStream;
|
|
|
import java.io.IOException;
|
|
|
-import java.net.InetAddress;
|
|
|
-import java.net.InetSocketAddress;
|
|
|
import java.net.URLDecoder;
|
|
|
import java.security.KeyPairGenerator;
|
|
|
import java.util.Arrays;
|
|
@@ -70,7 +61,6 @@ import java.util.Map;
|
|
|
import java.util.UUID;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
-import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.regex.Matcher;
|
|
|
import java.util.regex.Pattern;
|
|
|
import java.util.stream.Collectors;
|
|
@@ -84,42 +74,10 @@ import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BU
|
|
|
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.CLIENT_NAME;
|
|
|
|
|
|
@SuppressForbidden(reason = "this test uses a HttpServer to emulate a Google Cloud Storage endpoint")
|
|
|
-public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCase {
|
|
|
+public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {
|
|
|
|
|
|
- private static HttpServer httpServer;
|
|
|
- private static boolean randomServerErrors;
|
|
|
private static byte[] serviceAccount;
|
|
|
|
|
|
- @BeforeClass
|
|
|
- public static void startHttpServer() throws Exception {
|
|
|
- httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
|
|
|
- httpServer.start();
|
|
|
- randomServerErrors = randomBoolean();
|
|
|
- serviceAccount = createServiceAccount();
|
|
|
- }
|
|
|
-
|
|
|
- @Before
|
|
|
- public void setUpHttpServer() {
|
|
|
- HttpHandler handler = new InternalHttpHandler();
|
|
|
- if (randomServerErrors) {
|
|
|
- handler = new ErroneousHttpHandler(handler, randomIntBetween(2, 3));
|
|
|
- }
|
|
|
- httpServer.createContext("/", handler);
|
|
|
- httpServer.createContext("/token", new FakeOAuth2HttpHandler());
|
|
|
- }
|
|
|
-
|
|
|
- @AfterClass
|
|
|
- public static void stopHttpServer() {
|
|
|
- httpServer.stop(0);
|
|
|
- httpServer = null;
|
|
|
- }
|
|
|
-
|
|
|
- @After
|
|
|
- public void tearDownHttpServer() {
|
|
|
- httpServer.removeContext("/");
|
|
|
- httpServer.removeContext("/token");
|
|
|
- }
|
|
|
-
|
|
|
@Override
|
|
|
protected String repositoryType() {
|
|
|
return GoogleCloudStorageRepository.TYPE;
|
|
@@ -139,15 +97,29 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepos
|
|
|
return Collections.singletonList(TestGoogleCloudStoragePlugin.class);
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ protected Map<String, HttpHandler> createHttpHandlers() {
|
|
|
+ return Map.of(
|
|
|
+ "/", new InternalHttpHandler(),
|
|
|
+ "/token", new FakeOAuth2HttpHandler()
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected HttpHandler createErroneousHttpHandler(final HttpHandler delegate) {
|
|
|
+ return new GoogleErroneousHttpHandler(delegate, randomIntBetween(2, 3));
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
protected Settings nodeSettings(int nodeOrdinal) {
|
|
|
+ if (serviceAccount == null) {
|
|
|
+ serviceAccount = createServiceAccount();
|
|
|
+ }
|
|
|
+
|
|
|
final Settings.Builder settings = Settings.builder();
|
|
|
settings.put(super.nodeSettings(nodeOrdinal));
|
|
|
-
|
|
|
- final InetSocketAddress address = httpServer.getAddress();
|
|
|
- final String endpoint = "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort();
|
|
|
- settings.put(ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), endpoint);
|
|
|
- settings.put(TOKEN_URI_SETTING.getConcreteSettingForNamespace("test").getKey(), endpoint + "/token");
|
|
|
+ settings.put(ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl());
|
|
|
+ settings.put(TOKEN_URI_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl() + "/token");
|
|
|
|
|
|
final MockSecureSettings secureSettings = new MockSecureSettings();
|
|
|
secureSettings.setFile(CREDENTIALS_FILE_SETTING.getConcreteSettingForNamespace("test").getKey(), serviceAccount);
|
|
@@ -205,47 +177,48 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepos
|
|
|
|
|
|
@Override
|
|
|
protected GoogleCloudStorageService createStorageService() {
|
|
|
- if (randomServerErrors) {
|
|
|
- return new GoogleCloudStorageService() {
|
|
|
- @Override
|
|
|
- StorageOptions createStorageOptions(final GoogleCloudStorageClientSettings clientSettings,
|
|
|
- final HttpTransportOptions httpTransportOptions) {
|
|
|
- return super.createStorageOptions(clientSettings, httpTransportOptions)
|
|
|
- .toBuilder()
|
|
|
- .setRetrySettings(RetrySettings.newBuilder()
|
|
|
- .setMaxAttempts(10)
|
|
|
- .setInitialRetryDelay(Duration.ofMillis(10L))
|
|
|
- .setRetryDelayMultiplier(2.0d)
|
|
|
- .setMaxRetryDelay(Duration.ofSeconds(1L))
|
|
|
- .setTotalTimeout(Duration.ofSeconds(30L))
|
|
|
- .build())
|
|
|
- .build();
|
|
|
- }
|
|
|
- };
|
|
|
- }
|
|
|
- return super.createStorageService();
|
|
|
+ return new GoogleCloudStorageService() {
|
|
|
+ @Override
|
|
|
+ StorageOptions createStorageOptions(final GoogleCloudStorageClientSettings clientSettings,
|
|
|
+ final HttpTransportOptions httpTransportOptions) {
|
|
|
+ return super.createStorageOptions(clientSettings, httpTransportOptions)
|
|
|
+ .toBuilder()
|
|
|
+ .setRetrySettings(RetrySettings.newBuilder()
|
|
|
+ .setMaxAttempts(10)
|
|
|
+ .setInitialRetryDelay(Duration.ofMillis(10L))
|
|
|
+ .setRetryDelayMultiplier(2.0d)
|
|
|
+ .setMaxRetryDelay(Duration.ofSeconds(1L))
|
|
|
+ .setTotalTimeout(Duration.ofSeconds(30L))
|
|
|
+ .build())
|
|
|
+ .build();
|
|
|
+ }
|
|
|
+ };
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static byte[] createServiceAccount() throws Exception {
|
|
|
- final KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA");
|
|
|
- keyPairGenerator.initialize(1024);
|
|
|
- final String privateKey = Base64.getEncoder().encodeToString(keyPairGenerator.generateKeyPair().getPrivate().getEncoded());
|
|
|
-
|
|
|
- final ByteArrayOutputStream out = new ByteArrayOutputStream();
|
|
|
- try (XContentBuilder builder = new XContentBuilder(XContentType.JSON.xContent(), out)) {
|
|
|
- builder.startObject();
|
|
|
- {
|
|
|
- builder.field("type", "service_account");
|
|
|
- builder.field("project_id", getTestClass().getName().toLowerCase(Locale.ROOT));
|
|
|
- builder.field("private_key_id", UUID.randomUUID().toString());
|
|
|
- builder.field("private_key", "-----BEGIN PRIVATE KEY-----\n" + privateKey + "\n-----END PRIVATE KEY-----\n");
|
|
|
- builder.field("client_email", "elastic@appspot.gserviceaccount.com");
|
|
|
- builder.field("client_id", String.valueOf(randomNonNegativeLong()));
|
|
|
+ private static byte[] createServiceAccount() {
|
|
|
+ try {
|
|
|
+ final KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA");
|
|
|
+ keyPairGenerator.initialize(1024);
|
|
|
+ final String privateKey = Base64.getEncoder().encodeToString(keyPairGenerator.generateKeyPair().getPrivate().getEncoded());
|
|
|
+
|
|
|
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
|
|
|
+ try (XContentBuilder builder = new XContentBuilder(XContentType.JSON.xContent(), out)) {
|
|
|
+ builder.startObject();
|
|
|
+ {
|
|
|
+ builder.field("type", "service_account");
|
|
|
+ builder.field("project_id", getTestClass().getName().toLowerCase(Locale.ROOT));
|
|
|
+ builder.field("private_key_id", UUID.randomUUID().toString());
|
|
|
+ builder.field("private_key", "-----BEGIN PRIVATE KEY-----\n" + privateKey + "\n-----END PRIVATE KEY-----\n");
|
|
|
+ builder.field("client_email", "elastic@appspot.gserviceaccount.com");
|
|
|
+ builder.field("client_id", String.valueOf(randomNonNegativeLong()));
|
|
|
+ }
|
|
|
+ builder.endObject();
|
|
|
}
|
|
|
- builder.endObject();
|
|
|
+ return out.toByteArray();
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new AssertionError("Unable to create service account file", e);
|
|
|
}
|
|
|
- return out.toByteArray();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -428,45 +401,24 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepos
|
|
|
* slow down the test suite.
|
|
|
*/
|
|
|
@SuppressForbidden(reason = "this test uses a HttpServer to emulate a Google Cloud Storage endpoint")
|
|
|
- private static class ErroneousHttpHandler implements HttpHandler {
|
|
|
-
|
|
|
- // first key is the remote address, second key is the HTTP request unique id provided by the AWS SDK client,
|
|
|
- // value is the number of times the request has been seen
|
|
|
- private final Map<String, AtomicInteger> requests;
|
|
|
- private final HttpHandler delegate;
|
|
|
- private final int maxErrorsPerRequest;
|
|
|
-
|
|
|
- private ErroneousHttpHandler(final HttpHandler delegate, final int maxErrorsPerRequest) {
|
|
|
- this.requests = new ConcurrentHashMap<>();
|
|
|
- this.delegate = delegate;
|
|
|
- this.maxErrorsPerRequest = maxErrorsPerRequest;
|
|
|
- assert maxErrorsPerRequest > 1;
|
|
|
+ private static class GoogleErroneousHttpHandler extends ErroneousHttpHandler {
|
|
|
+
|
|
|
+ GoogleErroneousHttpHandler(final HttpHandler delegate, final int maxErrorsPerRequest) {
|
|
|
+ super(delegate, maxErrorsPerRequest);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void handle(final HttpExchange exchange) throws IOException {
|
|
|
- final String requestId = exchange.getRemoteAddress().toString()
|
|
|
+ protected String requestUniqueId(HttpExchange exchange) {
|
|
|
+ return exchange.getRemoteAddress().toString()
|
|
|
+ " " + exchange.getRequestMethod()
|
|
|
+ " " + exchange.getRequestURI();
|
|
|
- assert Strings.hasText(requestId);
|
|
|
+ }
|
|
|
|
|
|
+ @Override
|
|
|
+ protected boolean canFailRequest(final HttpExchange exchange) {
|
|
|
// Batch requests are not retried so we don't want to fail them
|
|
|
// The batched request are supposed to be retried (not tested here)
|
|
|
- final boolean noError = exchange.getRequestURI().toString().startsWith("/batch/") || randomBoolean();
|
|
|
-
|
|
|
- final int count = requests.computeIfAbsent(requestId, req -> new AtomicInteger(0)).incrementAndGet();
|
|
|
- if (count >= maxErrorsPerRequest || noError) {
|
|
|
- requests.remove(requestId);
|
|
|
- delegate.handle(exchange);
|
|
|
- } else {
|
|
|
- handleAsError(exchange);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void handleAsError(final HttpExchange exchange) throws IOException {
|
|
|
- Streams.readFully(exchange.getRequestBody());
|
|
|
- exchange.sendResponseHeaders(HttpStatus.SC_INTERNAL_SERVER_ERROR, -1);
|
|
|
- exchange.close();
|
|
|
+ return exchange.getRequestURI().toString().startsWith("/batch/") == false;
|
|
|
}
|
|
|
}
|
|
|
}
|