ソースを参照

HttpHandlers should return correct list of objects (#49283)

This commit fixes the server side logic of "List Objects" operations 
of Azure and S3 fixtures. Until today, the fixtures were returning a "
flat" view of stored objects and were not correctly handling the 
delimiter parameter. This causes some objects listing to be wrongly 
interpreted by the snapshot deletion logic in Elasticsearch which 
relies on the ability to list child containers of BlobContainer (#42653) 
to correctly delete stale indices.

As a consequence, the blobs were not correctly deleted from the
 emulated storage service and stayed in heap until they got garbage 
collected, causing CI failures like #48978.

This commit fixes the server side logic of Azure and S3 fixture when 
listing objects so that it now return correct common blob prefixes as 
expected by the snapshot deletion process. It also adds an after-test 
check to ensure that tests leave the repository empty (besides the 
root index files).

Closes #48978
Tanguy Leroux 6 年 前
コミット
b07d1107c8

+ 9 - 1
plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java

@@ -64,7 +64,7 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
 
     @Override
     protected Map<String, HttpHandler> createHttpHandlers() {
-        return Collections.singletonMap("/container", new AzureHttpHandler("container"));
+        return Collections.singletonMap("/container", new AzureBlobStoreHttpHandler("container"));
     }
 
     @Override
@@ -115,6 +115,14 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
         }
     }
 
+    @SuppressForbidden(reason = "this test uses a HttpHandler to emulate an Azure endpoint")
+    private static class AzureBlobStoreHttpHandler extends AzureHttpHandler implements BlobStoreHttpHandler {
+
+        AzureBlobStoreHttpHandler(final String container) {
+            super(container);
+        }
+    }
+
     /**
      * HTTP handler that injects random Azure service errors
      *

+ 9 - 1
plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java

@@ -77,7 +77,7 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
     @Override
     protected Map<String, HttpHandler> createHttpHandlers() {
         return Map.of(
-            "/", new GoogleCloudStorageHttpHandler("bucket"),
+            "/", new GoogleCloudStorageBlobStoreHttpHandler("bucket"),
             "/token", new FakeOAuth2HttpHandler()
         );
     }
@@ -186,6 +186,14 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
         }
     }
 
+    @SuppressForbidden(reason = "this test uses a HttpHandler to emulate a Google Cloud Storage endpoint")
+    private static class GoogleCloudStorageBlobStoreHttpHandler extends GoogleCloudStorageHttpHandler implements BlobStoreHttpHandler {
+
+        GoogleCloudStorageBlobStoreHttpHandler(final String bucket) {
+            super(bucket);
+        }
+    }
+
     /**
      * HTTP handler that injects random  Google Cloud Storage service errors
      *

+ 9 - 1
plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java

@@ -67,7 +67,7 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
 
     @Override
     protected Map<String, HttpHandler> createHttpHandlers() {
-        return Collections.singletonMap("/bucket", new S3HttpHandler("bucket"));
+        return Collections.singletonMap("/bucket", new S3BlobStoreHttpHandler("bucket"));
     }
 
     @Override
@@ -134,6 +134,14 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
         }
     }
 
+    @SuppressForbidden(reason = "this test uses a HttpHandler to emulate an S3 endpoint")
+    private static class S3BlobStoreHttpHandler extends S3HttpHandler implements BlobStoreHttpHandler {
+
+        S3BlobStoreHttpHandler(final String bucket) {
+            super(bucket);
+        }
+    }
+
     /**
      * HTTP handler that injects random S3 service errors
      *

+ 29 - 4
test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java

@@ -36,9 +36,11 @@ import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -153,13 +155,32 @@ public class AzureHttpHandler implements HttpHandler {
                 list.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
                 list.append("<EnumerationResults>");
                 final String prefix = params.get("prefix");
+                final Set<String> blobPrefixes = new HashSet<>();
+                final String delimiter = params.get("delimiter");
+                if (delimiter != null) {
+                    list.append("<Delimiter>").append(delimiter).append("</Delimiter>");
+                }
                 list.append("<Blobs>");
                 for (Map.Entry<String, BytesReference> blob : blobs.entrySet()) {
-                    if (prefix == null || blob.getKey().startsWith("/" + container + "/" + prefix)) {
-                        list.append("<Blob><Name>").append(blob.getKey().replace("/" + container + "/", "")).append("</Name>");
-                        list.append("<Properties><Content-Length>").append(blob.getValue().length()).append("</Content-Length>");
-                        list.append("<BlobType>BlockBlob</BlobType></Properties></Blob>");
+                    if (prefix != null && blob.getKey().startsWith("/" + container + "/" + prefix) == false) {
+                        continue;
+                    }
+                    String blobPath = blob.getKey().replace("/" + container + "/", "");
+                    if (delimiter != null) {
+                        int fromIndex = (prefix != null ? prefix.length() : 0);
+                        int delimiterPosition = blobPath.indexOf(delimiter, fromIndex);
+                        if (delimiterPosition > 0) {
+                            blobPrefixes.add(blobPath.substring(0, delimiterPosition) + delimiter);
+                            continue;
+                        }
                     }
+                    list.append("<Blob><Name>").append(blobPath).append("</Name>");
+                    list.append("<Properties><Content-Length>").append(blob.getValue().length()).append("</Content-Length>");
+                    list.append("<BlobType>BlockBlob</BlobType></Properties></Blob>");
+                }
+                if (blobPrefixes.isEmpty() == false) {
+                    blobPrefixes.forEach(p -> list.append("<BlobPrefix><Name>").append(p).append("</Name></BlobPrefix>"));
+
                 }
                 list.append("</Blobs>");
                 list.append("</EnumerationResults>");
@@ -177,6 +198,10 @@ public class AzureHttpHandler implements HttpHandler {
         }
     }
 
+    public Map<String, BytesReference> blobs() {
+        return blobs;
+    }
+
     public static void sendError(final HttpExchange exchange, final RestStatus status) throws IOException {
         final Headers headers = exchange.getResponseHeaders();
         headers.add("Content-Type", "application/xml");

+ 13 - 9
test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java

@@ -25,6 +25,7 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.network.InetAddresses;
@@ -64,7 +65,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 @SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint")
 public class GoogleCloudStorageHttpHandler implements HttpHandler {
 
-    private final ConcurrentMap<String, BytesArray> blobs;
+    private final ConcurrentMap<String, BytesReference> blobs;
     private final String bucket;
 
     public GoogleCloudStorageHttpHandler(final String bucket) {
@@ -86,7 +87,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
                 final Set<String> prefixes = new HashSet<>();
                 final List<String> listOfBlobs = new ArrayList<>();
 
-                for (final Map.Entry<String, BytesArray> blob : blobs.entrySet()) {
+                for (final Map.Entry<String, BytesReference> blob : blobs.entrySet()) {
                     final String blobName = blob.getKey();
                     if (prefix.isEmpty() || blobName.startsWith(prefix)) {
                         int delimiterPos = (delimiter != null) ? blobName.substring(prefix.length()).indexOf(delimiter) : -1;
@@ -122,7 +123,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
 
             } else if (Regex.simpleMatch("GET /download/storage/v1/b/" + bucket + "/o/*", request)) {
                 // Download Object https://cloud.google.com/storage/docs/request-body
-                BytesArray blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", ""));
+                BytesReference blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", ""));
                 if (blob != null) {
                     final String range = exchange.getRequestHeaders().getFirst("Range");
                     Matcher matcher = Pattern.compile("bytes=([0-9]*)-([0-9]*)").matcher(range);
@@ -130,7 +131,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
                         throw new AssertionError("Range bytes header does not match expected format: " + range);
                     }
 
-                    byte[] response = Integer.parseInt(matcher.group(1)) == 0 ? blob.array() : new byte[0];
+                    byte[] response = Integer.parseInt(matcher.group(1)) == 0 ? BytesReference.toBytes(blob) : new byte[0];
                     exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
                     exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
                     exchange.getResponseBody().write(response);
@@ -141,8 +142,8 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
             } else if (Regex.simpleMatch("DELETE /storage/v1/b/" + bucket + "/o/*", request)) {
                 // Delete Object https://cloud.google.com/storage/docs/json_api/v1/objects/delete
                 int deletions = 0;
-                for (Iterator<Map.Entry<String, BytesArray>> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) {
-                    Map.Entry<String, BytesArray> blob = iterator.next();
+                for (Iterator<Map.Entry<String, BytesReference>> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) {
+                    Map.Entry<String, BytesReference> blob = iterator.next();
                     if (blob.getKey().equals(exchange.getRequestURI().toString())) {
                         iterator.remove();
                         deletions++;
@@ -208,12 +209,11 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
                 RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
 
                 final String blobName = params.get("test_blob_name");
-                byte[] blob = blobs.get(blobName).array();
-                if (blob == null) {
+                if (blobs.containsKey(blobName) == false) {
                     exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
                     return;
                 }
-
+                byte[] blob = BytesReference.toBytes(blobs.get(blobName));
                 final String range = exchange.getRequestHeaders().getFirst("Content-Range");
                 final Integer limit = getContentRangeLimit(range);
                 final int start = getContentRangeStart(range);
@@ -249,6 +249,10 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
         }
     }
 
+    public Map<String, BytesReference> blobs() {
+        return blobs;
+    }
+
     private String httpServerUrl(final HttpExchange exchange) {
         final InetSocketAddress address = exchange.getLocalAddress();
         return "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort();

+ 32 - 5
test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java

@@ -41,10 +41,12 @@ import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 import java.security.MessageDigest;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.regex.Matcher;
@@ -158,13 +160,34 @@ public class S3HttpHandler implements HttpHandler {
                 if (prefix != null) {
                     list.append("<Prefix>").append(prefix).append("</Prefix>");
                 }
+                final Set<String> commonPrefixes = new HashSet<>();
+                final String delimiter = params.get("delimiter");
+                if (delimiter != null) {
+                    list.append("<Delimiter>").append(delimiter).append("</Delimiter>");
+                }
                 for (Map.Entry<String, BytesReference> blob : blobs.entrySet()) {
-                    if (prefix == null || blob.getKey().startsWith("/" + bucket + "/" + prefix)) {
-                        list.append("<Contents>");
-                        list.append("<Key>").append(blob.getKey().replace("/" + bucket + "/", "")).append("</Key>");
-                        list.append("<Size>").append(blob.getValue().length()).append("</Size>");
-                        list.append("</Contents>");
+                    if (prefix != null && blob.getKey().startsWith("/" + bucket + "/" + prefix) == false) {
+                        continue;
+                    }
+                    String blobPath = blob.getKey().replace("/" + bucket + "/", "");
+                    if (delimiter != null) {
+                        int fromIndex = (prefix != null ? prefix.length() : 0);
+                        int delimiterPosition = blobPath.indexOf(delimiter, fromIndex);
+                        if (delimiterPosition > 0) {
+                            commonPrefixes.add(blobPath.substring(0, delimiterPosition) + delimiter);
+                            continue;
+                        }
                     }
+                    list.append("<Contents>");
+                    list.append("<Key>").append(blobPath).append("</Key>");
+                    list.append("<Size>").append(blob.getValue().length()).append("</Size>");
+                    list.append("</Contents>");
+                }
+                if (commonPrefixes.isEmpty() == false) {
+                    list.append("<CommonPrefixes>");
+                    commonPrefixes.forEach(commonPrefix -> list.append("<Prefix>").append(commonPrefix).append("</Prefix>"));
+                    list.append("</CommonPrefixes>");
+
                 }
                 list.append("</ListBucketResult>");
 
@@ -241,6 +264,10 @@ public class S3HttpHandler implements HttpHandler {
         }
     }
 
+    public Map<String, BytesReference> blobs() {
+        return blobs;
+    }
+
     private static String multipartKey(final String uploadId, int partNumber) {
         return uploadId + "\n" + partNumber;
     }

+ 3 - 2
test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java

@@ -43,7 +43,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@@ -272,9 +271,11 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
                 assertFalse(BlobStoreTestUtil.blobExists(indicesBlobContainer.get(), indexId.getId())); // deleted index
             }
         }
+
+        assertAcked(client().admin().cluster().prepareDeleteSnapshot(repoName, "test-snap2").get());
     }
 
-    protected void addRandomDocuments(String name, int numDocs) throws ExecutionException, InterruptedException {
+    protected void addRandomDocuments(String name, int numDocs) throws InterruptedException {
         IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs];
         for (int i = 0; i < numDocs; i++) {
             indexRequestBuilders[i] = client().prepareIndex(name).setId(Integer.toString(i))

+ 25 - 3
test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java

@@ -28,6 +28,7 @@ import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.network.InetAddresses;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.mocksocket.MockHttpServer;
@@ -41,13 +42,16 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 
 /**
  * Integration tests for {@link BlobStoreRepository} implementations rely on mock APIs that emulate cloud-based services.
@@ -55,6 +59,14 @@ import static org.hamcrest.Matchers.equalTo;
 @SuppressForbidden(reason = "this test uses a HttpServer to emulate a cloud-based storage service")
 public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreRepositoryIntegTestCase {
 
+    /**
+     * A {@link HttpHandler} that allows to list stored blobs
+     */
+    @SuppressForbidden(reason = "Uses a HttpServer to emulate a cloud-based storage service")
+    protected interface BlobStoreHttpHandler extends HttpHandler {
+        Map<String, BytesReference> blobs();
+    }
+
     private static final byte[] BUFFER = new byte[1024];
 
     private static HttpServer httpServer;
@@ -81,7 +93,14 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
     @After
     public void tearDownHttpServer() {
         if (handlers != null) {
-            handlers.keySet().forEach(context -> httpServer.removeContext(context));
+            for(Map.Entry<String, HttpHandler> handler : handlers.entrySet()) {
+                httpServer.removeContext(handler.getKey());
+                if (handler.getValue() instanceof BlobStoreHttpHandler) {
+                    List<String> blobs = ((BlobStoreHttpHandler) handler.getValue()).blobs().keySet().stream()
+                        .filter(blob -> blob.contains("index") == false).collect(Collectors.toList());
+                    assertThat("Only index blobs should remain in repository but found " + blobs, blobs, hasSize(0));
+                }
+            }
         }
     }
 
@@ -110,14 +129,17 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
         assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
         assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
 
-        assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, "snapshot")
+        final String snapshot = "snapshot";
+        assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, snapshot)
             .setWaitForCompletion(true).setIndices(index));
 
         assertAcked(client().admin().indices().prepareDelete(index));
 
-        assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, "snapshot").setWaitForCompletion(true));
+        assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, snapshot).setWaitForCompletion(true));
         ensureGreen(index);
         assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
+
+        assertAcked(client().admin().cluster().prepareDeleteSnapshot(repository, snapshot).get());
     }
 
     protected static String httpServerUrl() {