|
@@ -23,7 +23,9 @@ import com.sun.net.httpserver.HttpExchange;
|
|
|
import com.sun.net.httpserver.HttpHandler;
|
|
|
import fixture.s3.S3HttpHandler;
|
|
|
import org.elasticsearch.action.ActionRunnable;
|
|
|
+import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
|
|
|
import org.elasticsearch.action.support.PlainActionFuture;
|
|
|
+import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|
|
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.common.SuppressForbidden;
|
|
@@ -31,6 +33,7 @@ import org.elasticsearch.common.blobstore.BlobContainer;
|
|
|
import org.elasticsearch.common.blobstore.BlobPath;
|
|
|
import org.elasticsearch.common.blobstore.BlobStore;
|
|
|
import org.elasticsearch.common.bytes.BytesReference;
|
|
|
+import org.elasticsearch.common.regex.Regex;
|
|
|
import org.elasticsearch.common.settings.MockSecureSettings;
|
|
|
import org.elasticsearch.common.settings.Setting;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
@@ -41,11 +44,14 @@ import org.elasticsearch.common.xcontent.XContentFactory;
|
|
|
import org.elasticsearch.plugins.Plugin;
|
|
|
import org.elasticsearch.repositories.RepositoriesService;
|
|
|
import org.elasticsearch.repositories.RepositoryData;
|
|
|
+import org.elasticsearch.repositories.RepositoryMissingException;
|
|
|
+import org.elasticsearch.repositories.RepositoryStats;
|
|
|
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
|
|
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
|
|
|
import org.elasticsearch.snapshots.SnapshotId;
|
|
|
import org.elasticsearch.snapshots.SnapshotsService;
|
|
|
import org.elasticsearch.snapshots.mockstore.BlobStoreWrapper;
|
|
|
+import org.elasticsearch.test.BackgroundIndexer;
|
|
|
import org.elasticsearch.test.ESIntegTestCase;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
|
|
@@ -56,8 +62,13 @@ import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
|
+import java.util.stream.StreamSupport;
|
|
|
|
|
|
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
|
|
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
|
|
import static org.hamcrest.Matchers.containsString;
|
|
|
+import static org.hamcrest.Matchers.equalTo;
|
|
|
import static org.hamcrest.Matchers.greaterThan;
|
|
|
import static org.hamcrest.Matchers.lessThan;
|
|
|
import static org.hamcrest.Matchers.startsWith;
|
|
@@ -108,12 +119,12 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
|
|
|
|
|
|
@Override
|
|
|
protected Map<String, HttpHandler> createHttpHandlers() {
|
|
|
- return Collections.singletonMap("/bucket", new S3BlobStoreHttpHandler("bucket"));
|
|
|
+ return Collections.singletonMap("/bucket", new S3StatsHttpHandler(new S3BlobStoreHttpHandler("bucket")));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
protected HttpHandler createErroneousHttpHandler(final HttpHandler delegate) {
|
|
|
- return new S3ErroneousHttpHandler(delegate, randomIntBetween(2, 3));
|
|
|
+ return new S3StatsHttpHandler(new S3ErroneousHttpHandler(delegate, randomIntBetween(2, 3)));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -176,6 +187,81 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
|
|
|
assertThat(repository.threadPool().relativeTimeInNanos() - beforeFastDelete, lessThan(TEST_COOLDOWN_PERIOD.getNanos()));
|
|
|
}
|
|
|
|
|
|
+ public void testRequestStats() throws Exception {
|
|
|
+ final String repository = createRepository(randomName());
|
|
|
+ final String index = "index-no-merges";
|
|
|
+ createIndex(index, Settings.builder()
|
|
|
+ .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
|
|
|
+ .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
|
|
|
+ .build());
|
|
|
+
|
|
|
+ final long nbDocs = randomLongBetween(100, 1000);
|
|
|
+ try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), (int) nbDocs)) {
|
|
|
+ waitForDocs(nbDocs, indexer);
|
|
|
+ }
|
|
|
+
|
|
|
+ flushAndRefresh(index);
|
|
|
+ ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
|
|
|
+ assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
|
|
|
+ assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
|
|
|
+
|
|
|
+ final String snapshot = "snapshot";
|
|
|
+ assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, snapshot)
|
|
|
+ .setWaitForCompletion(true).setIndices(index));
|
|
|
+
|
|
|
+ assertAcked(client().admin().indices().prepareDelete(index));
|
|
|
+
|
|
|
+ assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, snapshot).setWaitForCompletion(true));
|
|
|
+ ensureGreen(index);
|
|
|
+ assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
|
|
|
+
|
|
|
+ assertAcked(client().admin().cluster().prepareDeleteSnapshot(repository, snapshot).get());
|
|
|
+
|
|
|
+ final RepositoryStats repositoryStats = StreamSupport.stream(
|
|
|
+ internalCluster().getInstances(RepositoriesService.class).spliterator(), false)
|
|
|
+ .map(repositoriesService -> {
|
|
|
+ try {
|
|
|
+ return repositoriesService.repository(repository);
|
|
|
+ } catch (RepositoryMissingException e) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ })
|
|
|
+ .filter(r -> r != null)
|
|
|
+ .map(r -> r.stats())
|
|
|
+ .reduce((s1, s2) -> s1.merge(s2))
|
|
|
+ .get();
|
|
|
+ final long sdkGetCalls = repositoryStats.requestCounts.get("GET");
|
|
|
+ final long sdkListCalls = repositoryStats.requestCounts.get("LIST");
|
|
|
+
|
|
|
+ final long getCalls = handlers.values().stream()
|
|
|
+ .mapToLong(h -> {
|
|
|
+ while (h instanceof DelegatingHttpHandler) {
|
|
|
+ if (h instanceof S3StatsHttpHandler) {
|
|
|
+ return ((S3StatsHttpHandler) h).getCalls.get();
|
|
|
+ }
|
|
|
+ h = ((DelegatingHttpHandler) h).getDelegate();
|
|
|
+ }
|
|
|
+ assert false;
|
|
|
+ return 0L;
|
|
|
+ })
|
|
|
+ .sum();
|
|
|
+ final long listCalls = handlers.values().stream()
|
|
|
+ .mapToLong(h -> {
|
|
|
+ while (h instanceof DelegatingHttpHandler) {
|
|
|
+ if (h instanceof S3StatsHttpHandler) {
|
|
|
+ return ((S3StatsHttpHandler) h).listCalls.get();
|
|
|
+ }
|
|
|
+ h = ((DelegatingHttpHandler) h).getDelegate();
|
|
|
+ }
|
|
|
+ assert false;
|
|
|
+ return 0L;
|
|
|
+ })
|
|
|
+ .sum();
|
|
|
+
|
|
|
+ assertEquals("SDK sent " + sdkGetCalls + " GET calls and handler measured " + getCalls + " GET calls", getCalls, sdkGetCalls);
|
|
|
+ assertEquals("SDK sent " + sdkListCalls + " LIST calls and handler measured " + listCalls + " LIST calls", listCalls, sdkListCalls);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload.
|
|
|
*/
|
|
@@ -266,4 +352,33 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
|
|
|
return exchange.getRequestHeaders().getFirst(AmazonHttpClient.HEADER_SDK_TRANSACTION_ID);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ @SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
|
|
|
+ private static class S3StatsHttpHandler implements DelegatingHttpHandler {
|
|
|
+
|
|
|
+ private final HttpHandler delegate;
|
|
|
+
|
|
|
+ public final AtomicLong getCalls = new AtomicLong();
|
|
|
+ public final AtomicLong listCalls = new AtomicLong();
|
|
|
+
|
|
|
+ S3StatsHttpHandler(final HttpHandler delegate) {
|
|
|
+ this.delegate = delegate;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public HttpHandler getDelegate() {
|
|
|
+ return delegate;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void handle(HttpExchange exchange) throws IOException {
|
|
|
+ final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
|
|
|
+ if (Regex.simpleMatch("GET /*/?prefix=*", request)) {
|
|
|
+ listCalls.incrementAndGet();
|
|
|
+ } else if (Regex.simpleMatch("GET /*/*", request)) {
|
|
|
+ getCalls.incrementAndGet();
|
|
|
+ }
|
|
|
+ delegate.handle(exchange);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|