Explorar o código

Add Ability to List Child Containers to BlobContainer (#42653)

* Add Ability to List Child Containers to BlobContainer
* This is a prerequisite of #42189
Armin Braun %!s(int64=6) %!d(string=hai) anos
pai
achega
2f637d42f1
Modificáronse 18 ficheiros con 400 adicións e 26 borrados
  1. 6 0
      modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java
  2. 11 0
      plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java
  3. 8 0
      plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
  4. 34 4
      plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java
  5. 6 0
      plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java
  6. 17 0
      plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java
  7. 20 4
      plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java
  8. 61 0
      plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsRepositoryTests.java
  9. 1 3
      plugins/repository-s3/build.gradle
  10. 54 3
      plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java
  11. 0 6
      plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java
  12. 19 1
      plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java
  13. 10 0
      server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java
  14. 15 1
      server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java
  15. 7 2
      server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
  16. 5 0
      server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java
  17. 10 0
      server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
  18. 116 2
      test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java

+ 6 - 0
modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.common.blobstore.url;
 
 import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetaData;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
@@ -74,6 +75,11 @@ public class URLBlobContainer extends AbstractBlobContainer {
         throw new UnsupportedOperationException("URL repository doesn't support this operation");
     }
 
+    @Override
+    public Map<String, BlobContainer> children() throws IOException {
+        throw new UnsupportedOperationException("URL repository doesn't support this operation");
+    }
+
     /**
      * This operation is not supported by URLBlobContainer
      */

+ 11 - 0
plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java

@@ -27,6 +27,7 @@ import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.support.GroupedActionListener;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetaData;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
@@ -169,6 +170,16 @@ public class AzureBlobContainer extends AbstractBlobContainer {
         return listBlobsByPrefix(null);
     }
 
+    @Override
+    public Map<String, BlobContainer> children() throws IOException {
+        final BlobPath path = path();
+        try {
+            return blobStore.children(path);
+        } catch (URISyntaxException | StorageException e) {
+            throw new IOException("Failed to list children in path [" + path.buildAsString() + "].", e);
+        }
+    }
+
     protected String buildKey(String blobName) {
         return keyPath + (blobName == null ? "" : blobName);
     }

+ 8 - 0
plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

@@ -34,7 +34,10 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URISyntaxException;
 import java.nio.file.FileAlreadyExistsException;
+import java.util.Collections;
 import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyMap;
 
@@ -101,6 +104,11 @@ public class AzureBlobStore implements BlobStore {
         return service.listBlobsByPrefix(clientName, container, keyPath, prefix);
     }
 
+    public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException {
+        return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect(
+            Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this))));
+    }
+
     public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
         throws URISyntaxException, StorageException, FileAlreadyExistsException {
         service.writeBlob(this.clientName, container, blobName, inputStream, blobSize, failIfAlreadyExists);

+ 34 - 4
plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java

@@ -29,8 +29,10 @@ import com.microsoft.azure.storage.StorageException;
 import com.microsoft.azure.storage.blob.BlobInputStream;
 import com.microsoft.azure.storage.blob.BlobListingDetails;
 import com.microsoft.azure.storage.blob.BlobProperties;
+import com.microsoft.azure.storage.blob.CloudBlob;
 import com.microsoft.azure.storage.blob.CloudBlobClient;
 import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
 import com.microsoft.azure.storage.blob.CloudBlockBlob;
 import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
 import com.microsoft.azure.storage.blob.ListBlobItem;
@@ -38,6 +40,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.common.blobstore.BlobMetaData;
+import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
@@ -54,7 +57,9 @@ import java.nio.file.FileAlreadyExistsException;
 import java.security.InvalidKeyException;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Supplier;
 
 import static java.util.Collections.emptyMap;
@@ -214,15 +219,40 @@ public class AzureStorageService {
                 // uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
                 // this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
                 final String blobPath = uri.getPath().substring(1 + container.length() + 1);
-                final BlobProperties properties = ((CloudBlockBlob) blobItem).getProperties();
-                final String name = blobPath.substring(keyPath.length());
-                logger.trace(() -> new ParameterizedMessage("blob url [{}], name [{}], size [{}]", uri, name, properties.getLength()));
-                blobsBuilder.put(name, new PlainBlobMetaData(name, properties.getLength()));
+                if (blobItem instanceof CloudBlob) {
+                    final BlobProperties properties = ((CloudBlob) blobItem).getProperties();
+                    final String name = blobPath.substring(keyPath.length());
+                    logger.trace(() -> new ParameterizedMessage("blob url [{}], name [{}], size [{}]", uri, name, properties.getLength()));
+                    blobsBuilder.put(name, new PlainBlobMetaData(name, properties.getLength()));
+                }
             }
         });
         return Map.copyOf(blobsBuilder);
     }
 
+    public Set<String> children(String account, String container, BlobPath path) throws URISyntaxException, StorageException {
+        final var blobsBuilder = new HashSet<String>();
+        final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
+        final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
+        final String keyPath = path.buildAsString();
+        final EnumSet<BlobListingDetails> enumBlobListingDetails = EnumSet.of(BlobListingDetails.METADATA);
+
+        SocketAccess.doPrivilegedVoidException(() -> {
+            for (ListBlobItem blobItem : blobContainer.listBlobs(keyPath, false, enumBlobListingDetails, null, client.v2().get())) {
+                if (blobItem instanceof CloudBlobDirectory) {
+                    final URI uri = blobItem.getUri();
+                    logger.trace(() -> new ParameterizedMessage("blob url [{}]", uri));
+                    // uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
+                    // this requires 1 + container.length() + 1, with each 1 corresponding to one of the /.
+                    // Lastly, we add the length of keyPath to the offset to strip this container's path.
+                    final String uriPath = uri.getPath();
+                    blobsBuilder.add(uriPath.substring(1 + container.length() + 1 + keyPath.length(), uriPath.length() - 1));
+                }
+            }
+        });
+        return Set.copyOf(blobsBuilder);
+    }
+
     public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize,
                           boolean failIfAlreadyExists)
         throws URISyntaxException, StorageException, FileAlreadyExistsException {

+ 6 - 0
plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.repositories.gcs;
 
+import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetaData;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStoreException;
@@ -55,6 +56,11 @@ class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {
         return blobStore.listBlobs(path);
     }
 
+    @Override
+    public Map<String, BlobContainer> children() throws IOException {
+        return blobStore.listChildren(path());
+    }
+
     @Override
     public Map<String, BlobMetaData> listBlobsByPrefix(String prefix) throws IOException {
         return blobStore.listBlobsByPrefix(path, prefix);

+ 17 - 0
plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java

@@ -142,6 +142,23 @@ class GoogleCloudStorageBlobStore implements BlobStore {
         return mapBuilder.immutableMap();
     }
 
+    Map<String, BlobContainer> listChildren(BlobPath path) throws IOException {
+        final String pathStr = path.buildAsString();
+        final MapBuilder<String, BlobContainer> mapBuilder = MapBuilder.newMapBuilder();
+        SocketAccess.doPrivilegedVoidIOException
+            (() -> client().get(bucketName).list(BlobListOption.currentDirectory(), BlobListOption.prefix(pathStr)).iterateAll().forEach(
+                blob -> {
+                    if (blob.isDirectory()) {
+                        assert blob.getName().startsWith(pathStr);
+                        final String suffixName = blob.getName().substring(pathStr.length());
+                        if (suffixName.isEmpty() == false) {
+                            mapBuilder.put(suffixName, new GoogleCloudStorageBlobContainer(path.add(suffixName), this));
+                        }
+                    }
+                }));
+        return mapBuilder.immutableMap();
+    }
+
     /**
      * Returns true if the blob exists in the specific bucket
      *

+ 20 - 4
plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Options.CreateOpts;
 import org.apache.hadoop.fs.Path;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetaData;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
@@ -137,11 +138,13 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
 
     @Override
     public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable final String prefix) throws IOException {
-        FileStatus[] files = store.execute(fileContext -> (fileContext.util().listStatus(path,
-            path -> prefix == null || path.getName().startsWith(prefix))));
-        Map<String, BlobMetaData> map = new LinkedHashMap<String, BlobMetaData>();
+        FileStatus[] files = store.execute(fileContext -> fileContext.util().listStatus(path,
+            path -> prefix == null || path.getName().startsWith(prefix)));
+        Map<String, BlobMetaData> map = new LinkedHashMap<>();
         for (FileStatus file : files) {
-            map.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen()));
+            if (file.isFile()) {
+                map.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen()));
+            }
         }
         return Collections.unmodifiableMap(map);
     }
@@ -151,6 +154,19 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
         return listBlobsByPrefix(null);
     }
 
+    @Override
+    public Map<String, BlobContainer> children() throws IOException {
+        FileStatus[] files = store.execute(fileContext -> fileContext.util().listStatus(path));
+        Map<String, BlobContainer> map = new LinkedHashMap<>();
+        for (FileStatus file : files) {
+            if (file.isDirectory()) {
+                final String name = file.getPath().getName();
+                map.put(name, new HdfsBlobContainer(path().add(name), store, new Path(path, name), bufferSize, securityContext));
+            }
+        }
+        return Collections.unmodifiableMap(map);
+    }
+
     /**
      * Exists to wrap underlying InputStream methods that might make socket connections in
      * doPrivileged blocks. This is due to the way that hdfs client libraries might open

+ 61 - 0
plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsRepositoryTests.java

@@ -0,0 +1,61 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.repositories.hdfs;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.bootstrap.JavaVersion;
+import org.elasticsearch.common.settings.MockSecureSettings;
+import org.elasticsearch.common.settings.SecureSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase;
+
+import java.util.Collection;
+
+import static org.hamcrest.Matchers.equalTo;
+
+@ThreadLeakFilters(filters = HdfsClientThreadLeakFilter.class)
+public class HdfsRepositoryTests extends AbstractThirdPartyRepositoryTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> getPlugins() {
+        return pluginList(HdfsPlugin.class);
+    }
+
+    @Override
+    protected SecureSettings credentials() {
+        return new MockSecureSettings();
+    }
+
+    @Override
+    protected void createRepository(String repoName) {
+        assumeFalse("https://github.com/elastic/elasticsearch/issues/31498", JavaVersion.current().equals(JavaVersion.parse("11")));
+        AcknowledgedResponse putRepositoryResponse = client().admin().cluster().preparePutRepository(repoName)
+            .setType("hdfs")
+            .setSettings(Settings.builder()
+                .put("uri", "hdfs:///")
+                .put("conf.fs.AbstractFileSystem.hdfs.impl", TestingFs.class.getName())
+                .put("path", "foo")
+                .put("chunk_size", randomIntBetween(100, 1000) + "k")
+                .put("compress", randomBoolean())
+            ).get();
+        assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
+    }
+}

+ 1 - 3
plugins/repository-s3/build.gradle

@@ -164,7 +164,7 @@ if (useFixture) {
   def minioAddress = {
     int minioPort = postProcessFixture.ext."test.fixtures.minio-fixture.tcp.9000"
     assert minioPort > 0
-    return 'http://127.0.0.1:' + minioPort
+    'http://127.0.0.1:' + minioPort
   }
 
   File minioAddressFile = new File(project.buildDir, 'generated-resources/s3Fixture.address')
@@ -173,7 +173,6 @@ if (useFixture) {
   // and pass its name instead.
   task writeMinioAddress {
     dependsOn tasks.bundlePlugin, tasks.postProcessFixture
-    outputs.file(minioAddressFile)
     doLast {
       file(minioAddressFile).text = "${ -> minioAddress.call() }"
     }
@@ -181,7 +180,6 @@ if (useFixture) {
 
   thirdPartyTest {
     dependsOn writeMinioAddress
-    inputs.file(minioAddressFile)
     systemProperty 'test.s3.endpoint', minioAddressFile.name
   }
 

+ 54 - 3
plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

@@ -25,6 +25,7 @@ import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
 import com.amazonaws.services.s3.model.MultiObjectDeleteException;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.ObjectMetadata;
@@ -38,6 +39,7 @@ import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetaData;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStoreException;
@@ -203,12 +205,15 @@ class S3BlobContainer extends AbstractBlobContainer {
                     final ObjectListing finalPrevListing = prevListing;
                     list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(finalPrevListing));
                 } else {
+                    final ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
+                    listObjectsRequest.setBucketName(blobStore.bucket());
+                    listObjectsRequest.setDelimiter("/");
                     if (blobNamePrefix != null) {
-                        list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(blobStore.bucket(),
-                                buildKey(blobNamePrefix)));
+                        listObjectsRequest.setPrefix(buildKey(blobNamePrefix));
                     } else {
-                        list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(blobStore.bucket(), keyPath));
+                        listObjectsRequest.setPrefix(keyPath);
                     }
+                    list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest));
                 }
                 for (final S3ObjectSummary summary : list.getObjectSummaries()) {
                     final String name = summary.getKey().substring(keyPath.length());
@@ -231,6 +236,52 @@ class S3BlobContainer extends AbstractBlobContainer {
         return listBlobsByPrefix(null);
     }
 
+    @Override
+    public Map<String, BlobContainer> children() throws IOException {
+        try (AmazonS3Reference clientReference = blobStore.clientReference()) {
+            ObjectListing prevListing = null;
+            final var entries = new ArrayList<Map.Entry<String, BlobContainer>>();
+            while (true) {
+                ObjectListing list;
+                if (prevListing != null) {
+                    final ObjectListing finalPrevListing = prevListing;
+                    list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(finalPrevListing));
+                } else {
+                    final ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
+                    listObjectsRequest.setBucketName(blobStore.bucket());
+                    listObjectsRequest.setPrefix(keyPath);
+                    listObjectsRequest.setDelimiter("/");
+                    list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest));
+                }
+                for (final String summary : list.getCommonPrefixes()) {
+                    final String name = summary.substring(keyPath.length());
+                    if (name.isEmpty() == false) {
+                        // Stripping the trailing slash off of the common prefix
+                        final String last = name.substring(0, name.length() - 1);
+                        final BlobPath path = path().add(last);
+                        entries.add(entry(last, blobStore.blobContainer(path)));
+                    }
+                }
+                assert list.getObjectSummaries().stream().noneMatch(s -> {
+                    for (String commonPrefix : list.getCommonPrefixes()) {
+                        if (s.getKey().substring(keyPath.length()).startsWith(commonPrefix)) {
+                            return true;
+                        }
+                    }
+                    return false;
+                }) : "Response contained children for listed common prefixes.";
+                if (list.isTruncated()) {
+                    prevListing = list;
+                } else {
+                    break;
+                }
+            }
+            return Maps.ofEntries(entries);
+        } catch (final AmazonClientException e) {
+            throw new IOException("Exception when listing children of [" +  path().buildAsString() + ']', e);
+        }
+    }
+
     private String buildKey(String blobName) {
         return keyPath + blobName;
     }

+ 0 - 6
plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java

@@ -216,12 +216,6 @@ class S3Repository extends BlobStoreRepository {
         return new S3BlobStore(service, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass, metadata);
     }
 
-    // only use for testing
-    @Override
-    protected BlobStore blobStore() {
-        return super.blobStore();
-    }
-
     // only use for testing
     @Override
     protected BlobStore getBlobStore() {

+ 19 - 1
plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java

@@ -19,6 +19,8 @@
 package org.elasticsearch.repositories.s3;
 
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.common.blobstore.BlobMetaData;
+import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.settings.MockSecureSettings;
 import org.elasticsearch.common.settings.SecureSettings;
 import org.elasticsearch.common.settings.Settings;
@@ -28,6 +30,8 @@ import org.elasticsearch.test.StreamsUtils;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.Matchers.blankOrNullString;
 import static org.hamcrest.Matchers.equalTo;
@@ -56,7 +60,7 @@ public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTes
     protected void createRepository(String repoName) {
         Settings.Builder settings = Settings.builder()
             .put("bucket", System.getProperty("test.s3.bucket"))
-            .put("base_path", System.getProperty("test.s3.base", "/"));
+            .put("base_path", System.getProperty("test.s3.base", "testpath"));
         final String endpointPath = System.getProperty("test.s3.endpoint");
         if (endpointPath != null) {
             try {
@@ -70,4 +74,18 @@ public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTes
             .setSettings(settings).get();
         assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
     }
+
+    @Override
+    protected void assertBlobsByPrefix(BlobPath path, String prefix, Map<String, BlobMetaData> blobs) throws Exception {
+        // AWS S3 is eventually consistent so we retry for 10 minutes assuming a list operation will never take longer than that
+        // to become consistent.
+        assertBusy(() -> super.assertBlobsByPrefix(path, prefix, blobs), 10L, TimeUnit.MINUTES);
+    }
+
+    @Override
+    protected void assertChildren(BlobPath path, Collection<String> children) throws Exception {
+        // AWS S3 is eventually consistent so we retry for 10 minutes assuming a list operation will never take longer than that
+        // to become consistent.
+        assertBusy(() -> super.assertChildren(path, children), 10L, TimeUnit.MINUTES);
+    }
 }

+ 10 - 0
server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java

@@ -158,6 +158,16 @@ public interface BlobContainer {
      */
     Map<String, BlobMetaData> listBlobs() throws IOException;
 
+    /**
+     * Lists all child containers under this container. A child container is defined as a container whose {@link #path()} method returns
+     * a path that has this containers {@link #path()} return as its prefix and has one more path element than the current
+     * container's path.
+     *
+     * @return Map of name of the child container to child container
+     * @throws IOException on failure to list child containers
+     */
+    Map<String, BlobContainer> children() throws IOException;
+
     /**
      * Lists all blobs in the container that match the specified prefix.
      *

+ 15 - 1
server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.common.blobstore.fs;
 
 import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetaData;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
@@ -73,9 +74,22 @@ public class FsBlobContainer extends AbstractBlobContainer {
         return listBlobsByPrefix(null);
     }
 
+    @Override
+    public Map<String, BlobContainer> children() throws IOException {
+        Map<String, BlobContainer> builder = new HashMap<>();
+        try (DirectoryStream<Path> stream = Files.newDirectoryStream(path)) {
+            for (Path file : stream) {
+                if (Files.isDirectory(file)) {
+                    final String name = file.getFileName().toString();
+                    builder.put(name, new FsBlobContainer(blobStore, path().add(name), file));
+                }
+            }
+        }
+        return unmodifiableMap(builder);
+    }
+
     @Override
     public Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException {
-        // If we get duplicate files we should just take the last entry
         Map<String, BlobMetaData> builder = new HashMap<>();
 
         blobNamePrefix = blobNamePrefix == null ? "" : blobNamePrefix;

+ 7 - 2
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -255,6 +255,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         }
     }
 
+    public ThreadPool threadPool() {
+        return threadPool;
+    }
+
     // package private, only use for testing
     BlobContainer getBlobContainer() {
         return blobContainer.get();
@@ -286,9 +290,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     }
 
     /**
-     * maintains single lazy instance of {@link BlobStore}
+     * Maintains single lazy instance of {@link BlobStore}.
+     * Public for testing.
      */
-    protected BlobStore blobStore() {
+    public BlobStore blobStore() {
         assertSnapshotOrGenericThread();
 
         BlobStore store = blobStore.get();

+ 5 - 0
server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java

@@ -74,6 +74,11 @@ public class BlobContainerWrapper implements BlobContainer {
         return delegate.listBlobs();
     }
 
+    @Override
+    public Map<String, BlobContainer> children() throws IOException {
+        return delegate.children();
+    }
+
     @Override
     public Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException {
         return delegate.listBlobsByPrefix(blobNamePrefix);

+ 10 - 0
server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java

@@ -52,6 +52,7 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -338,6 +339,15 @@ public class MockRepository extends FsRepository {
                 return super.listBlobs();
             }
 
+            @Override
+            public Map<String, BlobContainer> children() throws IOException {
+                final Map<String, BlobContainer> res = new HashMap<>();
+                for (Map.Entry<String, BlobContainer> entry : super.children().entrySet()) {
+                    res.put(entry.getKey(), new MockBlobContainer(entry.getValue()));
+                }
+                return res;
+            }
+
             @Override
             public Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException {
                 maybeIOExceptionOrBlock(blobNamePrefix);

+ 116 - 2
test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java

@@ -18,12 +18,33 @@
  */
 package org.elasticsearch.repositories;
 
+import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.blobstore.BlobContainer;
+import org.elasticsearch.common.blobstore.BlobMetaData;
+import org.elasticsearch.common.blobstore.BlobPath;
+import org.elasticsearch.common.blobstore.BlobStore;
+import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
 import org.elasticsearch.common.settings.SecureSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.snapshots.SnapshotState;
 import org.elasticsearch.test.ESSingleNodeTestCase;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 
@@ -41,10 +62,34 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
 
     protected abstract void createRepository(String repoName);
 
-
-    public void testCreateSnapshot() {
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
         createRepository("test-repo");
+        final BlobStoreRepository repo = getRepository();
+        final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
+        repo.threadPool().generic().execute(new ActionRunnable<>(future) {
+            @Override
+            protected void doRun() throws Exception {
+                deleteContents(repo.blobStore().blobContainer(repo.basePath()));
+                future.onResponse(null);
+            }
+        });
+        future.actionGet();
+        assertChildren(repo.basePath(), Collections.emptyList());
+    }
 
+    private static void deleteContents(BlobContainer container) throws IOException {
+        final List<String> toDelete = new ArrayList<>();
+        for (Map.Entry<String, BlobContainer> child : container.children().entrySet()) {
+            deleteContents(child.getValue());
+            toDelete.add(child.getKey());
+        }
+        toDelete.addAll(container.listBlobs().keySet());
+        container.deleteBlobsIgnoringIfNotExists(toDelete);
+    }
+
+    public void testCreateSnapshot() {
         createIndex("test-idx-1");
         createIndex("test-idx-2");
         createIndex("test-idx-3");
@@ -86,6 +131,75 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
                 .prepareDeleteSnapshot("test-repo", snapshotName)
                 .get()
                 .isAcknowledged());
+    }
+
+    public void testListChildren() throws Exception {
+        final BlobStoreRepository repo = getRepository();
+        final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
+        final Executor genericExec = repo.threadPool().generic();
+        final int testBlobLen = randomIntBetween(1, 100);
+        genericExec.execute(new ActionRunnable<>(future) {
+            @Override
+            protected void doRun() throws Exception {
+                final BlobStore blobStore = repo.blobStore();
+                blobStore.blobContainer(repo.basePath().add("foo"))
+                    .writeBlob("nested-blob", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false);
+                blobStore.blobContainer(repo.basePath().add("foo").add("nested"))
+                    .writeBlob("bar", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false);
+                blobStore.blobContainer(repo.basePath().add("foo").add("nested2"))
+                    .writeBlob("blub", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false);
+                future.onResponse(null);
+            }
+        });
+        future.actionGet();
+        assertChildren(repo.basePath(), Collections.singleton("foo"));
+        assertBlobsByPrefix(repo.basePath(), "fo", Collections.emptyMap());
+        assertChildren(repo.basePath().add("foo"), List.of("nested", "nested2"));
+        assertBlobsByPrefix(repo.basePath().add("foo"), "nest",
+            Collections.singletonMap("nested-blob", new PlainBlobMetaData("nested-blob", testBlobLen)));
+        assertChildren(repo.basePath().add("foo").add("nested"), Collections.emptyList());
+    }
+
+    protected void assertBlobsByPrefix(BlobPath path, String prefix, Map<String, BlobMetaData> blobs) throws Exception {
+        final PlainActionFuture<Map<String, BlobMetaData>> future = PlainActionFuture.newFuture();
+        final BlobStoreRepository repository = getRepository();
+        repository.threadPool().generic().execute(new ActionRunnable<>(future) {
+            @Override
+            protected void doRun() throws Exception {
+                final BlobStore blobStore = repository.blobStore();
+                future.onResponse(blobStore.blobContainer(path).listBlobsByPrefix(prefix));
+            }
+        });
+        Map<String, BlobMetaData> foundBlobs = future.actionGet();
+        if (blobs.isEmpty()) {
+            assertThat(foundBlobs.keySet(), empty());
+        } else {
+            assertThat(foundBlobs.keySet(), containsInAnyOrder(blobs.keySet().toArray(Strings.EMPTY_ARRAY)));
+            for (Map.Entry<String, BlobMetaData> entry : foundBlobs.entrySet()) {
+                assertEquals(entry.getValue().length(), blobs.get(entry.getKey()).length());
+            }
+        }
+    }
+
+    protected void assertChildren(BlobPath path, Collection<String> children) throws Exception {
+        final PlainActionFuture<Set<String>> future = PlainActionFuture.newFuture();
+        final BlobStoreRepository repository = getRepository();
+        repository.threadPool().generic().execute(new ActionRunnable<>(future) {
+            @Override
+            protected void doRun() throws Exception {
+                final BlobStore blobStore = repository.blobStore();
+                future.onResponse(blobStore.blobContainer(path).children().keySet());
+            }
+        });
+        Set<String> foundChildren = future.actionGet();
+        if (children.isEmpty()) {
+            assertThat(foundChildren, empty());
+        } else {
+            assertThat(foundChildren, containsInAnyOrder(children.toArray(Strings.EMPTY_ARRAY)));
+        }
+    }
 
+    private BlobStoreRepository getRepository() {
+        return (BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository("test-repo");
     }
 }