1
0
Эх сурвалжийг харах

Cleanup S3 BlobContainer Listing Logic (#43088)

* Cleanup duplication in creating and looping over IO Requests
Armin Braun 6 жил өмнө
parent
commit
71186fcb9f

+ 47 - 70
plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

@@ -32,7 +32,6 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.PartETag;
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
 import org.apache.lucene.util.SetOnce;
@@ -46,7 +45,6 @@ import org.elasticsearch.common.blobstore.BlobStoreException;
 import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
 import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
 import org.elasticsearch.common.collect.Tuple;
-import org.elasticsearch.common.util.Maps;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -56,9 +54,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import static java.util.Map.entry;
 import static org.elasticsearch.repositories.s3.S3Repository.MAX_FILE_SIZE;
 import static org.elasticsearch.repositories.s3.S3Repository.MAX_FILE_SIZE_USING_MULTIPART;
 import static org.elasticsearch.repositories.s3.S3Repository.MIN_PART_SIZE_USING_MULTIPART;
@@ -238,36 +236,12 @@ class S3BlobContainer extends AbstractBlobContainer {
 
     @Override
     public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable String blobNamePrefix) throws IOException {
-        final var entries = new ArrayList<Map.Entry<String, BlobMetaData>>();
         try (AmazonS3Reference clientReference = blobStore.clientReference()) {
-            ObjectListing prevListing = null;
-            while (true) {
-                ObjectListing list;
-                if (prevListing != null) {
-                    final ObjectListing finalPrevListing = prevListing;
-                    list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(finalPrevListing));
-                } else {
-                    final ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
-                    listObjectsRequest.setBucketName(blobStore.bucket());
-                    listObjectsRequest.setDelimiter("/");
-                    if (blobNamePrefix != null) {
-                        listObjectsRequest.setPrefix(buildKey(blobNamePrefix));
-                    } else {
-                        listObjectsRequest.setPrefix(keyPath);
-                    }
-                    list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest));
-                }
-                for (final S3ObjectSummary summary : list.getObjectSummaries()) {
-                    final String name = summary.getKey().substring(keyPath.length());
-                    entries.add(entry(name, new PlainBlobMetaData(name, summary.getSize())));
-                }
-                if (list.isTruncated()) {
-                    prevListing = list;
-                } else {
-                    break;
-                }
-            }
-            return Maps.ofEntries(entries);
+            return executeListing(clientReference, listObjectsRequest(blobNamePrefix == null ? keyPath : buildKey(blobNamePrefix)))
+                .stream()
+                .flatMap(listing -> listing.getObjectSummaries().stream())
+                .map(summary -> new PlainBlobMetaData(summary.getKey().substring(keyPath.length()), summary.getSize()))
+                .collect(Collectors.toMap(PlainBlobMetaData::name, Function.identity()));
         } catch (final AmazonClientException e) {
             throw new IOException("Exception when listing blobs by prefix [" + blobNamePrefix + "]", e);
         }
@@ -281,47 +255,50 @@ class S3BlobContainer extends AbstractBlobContainer {
     @Override
     public Map<String, BlobContainer> children() throws IOException {
         try (AmazonS3Reference clientReference = blobStore.clientReference()) {
-            ObjectListing prevListing = null;
-            final var entries = new ArrayList<Map.Entry<String, BlobContainer>>();
-            while (true) {
-                ObjectListing list;
-                if (prevListing != null) {
-                    final ObjectListing finalPrevListing = prevListing;
-                    list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(finalPrevListing));
-                } else {
-                    final ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
-                    listObjectsRequest.setBucketName(blobStore.bucket());
-                    listObjectsRequest.setPrefix(keyPath);
-                    listObjectsRequest.setDelimiter("/");
-                    list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest));
-                }
-                for (final String summary : list.getCommonPrefixes()) {
-                    final String name = summary.substring(keyPath.length());
-                    if (name.isEmpty() == false) {
-                        // Stripping the trailing slash off of the common prefix
-                        final String last = name.substring(0, name.length() - 1);
-                        final BlobPath path = path().add(last);
-                        entries.add(entry(last, blobStore.blobContainer(path)));
-                    }
-                }
-                assert list.getObjectSummaries().stream().noneMatch(s -> {
-                    for (String commonPrefix : list.getCommonPrefixes()) {
-                        if (s.getKey().substring(keyPath.length()).startsWith(commonPrefix)) {
-                            return true;
+            return executeListing(clientReference, listObjectsRequest(keyPath)).stream().flatMap(listing -> {
+                    assert listing.getObjectSummaries().stream().noneMatch(s -> {
+                        for (String commonPrefix : listing.getCommonPrefixes()) {
+                            if (s.getKey().substring(keyPath.length()).startsWith(commonPrefix)) {
+                                return true;
+                            }
                         }
-                    }
-                    return false;
-                }) : "Response contained children for listed common prefixes.";
-                if (list.isTruncated()) {
-                    prevListing = list;
-                } else {
-                    break;
-                }
-            }
-            return Maps.ofEntries(entries);
+                        return false;
+                    }) : "Response contained children for listed common prefixes.";
+                    return listing.getCommonPrefixes().stream();
+                })
+                .map(prefix -> prefix.substring(keyPath.length()))
+                .filter(name -> name.isEmpty() == false)
+                // Stripping the trailing slash off of the common prefix
+                .map(name -> name.substring(0, name.length() - 1))
+                .collect(Collectors.toMap(Function.identity(), name -> blobStore.blobContainer(path().add(name))));
         } catch (final AmazonClientException e) {
-            throw new IOException("Exception when listing children of [" +  path().buildAsString() + ']', e);
+            throw new IOException("Exception when listing children of [" + path().buildAsString() + ']', e);
+        }
+    }
+
+    private static List<ObjectListing> executeListing(AmazonS3Reference clientReference, ListObjectsRequest listObjectsRequest) {
+        final List<ObjectListing> results = new ArrayList<>();
+        ObjectListing prevListing = null;
+        while (true) {
+            ObjectListing list;
+            if (prevListing != null) {
+                final ObjectListing finalPrevListing = prevListing;
+                list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(finalPrevListing));
+            } else {
+                list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest));
+            }
+            results.add(list);
+            if (list.isTruncated()) {
+                prevListing = list;
+            } else {
+                break;
+            }
         }
+        return results;
+    }
+
+    private ListObjectsRequest listObjectsRequest(String keyPath) {
+        return new ListObjectsRequest().withBucketName(blobStore.bucket()).withPrefix(keyPath).withDelimiter("/");
     }
 
     private String buildKey(String blobName) {