Procházet zdrojové kódy

Reorder/extract methods in S3 CompareAndExchangeOperation (#101299)

Shorten `run()` and put the methods it calls into a more logical order.
David Turner před 2 roky
rodič
revize
9e135ebfd1

+ 114 - 112
modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

@@ -580,111 +580,20 @@ class S3BlobContainer extends AbstractBlobContainer {
             this.threadPool = threadPool;
         }
 
-        private List<MultipartUpload> listMultipartUploads() {
-            final var listRequest = new ListMultipartUploadsRequest(bucket);
-            listRequest.setPrefix(blobKey);
-            listRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.LIST_OBJECTS, purpose));
-            try {
-                return SocketAccess.doPrivileged(() -> client.listMultipartUploads(listRequest)).getMultipartUploads();
-            } catch (AmazonS3Exception e) {
-                if (e.getStatusCode() == 404) {
-                    return List.of();
-                }
-                throw e;
-            }
-        }
-
-        private int getUploadIndex(String targetUploadId, List<MultipartUpload> multipartUploads) {
-            var uploadIndex = 0;
-            var found = false;
-            for (MultipartUpload multipartUpload : multipartUploads) {
-                final var observedUploadId = multipartUpload.getUploadId();
-                if (observedUploadId.equals(targetUploadId)) {
-                    final var currentTimeMillis = blobStore.getThreadPool().absoluteTimeInMillis();
-                    final var ageMillis = currentTimeMillis - multipartUpload.getInitiated().toInstant().toEpochMilli();
-                    final var expectedAgeRangeMillis = blobStore.getCompareAndExchangeTimeToLive().millis();
-                    if (ageMillis < -expectedAgeRangeMillis || ageMillis > expectedAgeRangeMillis) {
-                        logger.warn(
-                            """
-                                compare-and-exchange of blob [{}:{}] was initiated at [{}={}] \
-                                which deviates from local node epoch time [{}] by more than the warn threshold of [{}ms]""",
-                            bucket,
-                            blobKey,
-                            multipartUpload.getInitiated(),
-                            multipartUpload.getInitiated().toInstant().toEpochMilli(),
-                            currentTimeMillis,
-                            expectedAgeRangeMillis
-                        );
-                    }
-                    found = true;
-                } else if (observedUploadId.compareTo(targetUploadId) < 0) {
-                    uploadIndex += 1;
-                }
-            }
-
-            return found ? uploadIndex : -1;
-        }
-
-        /**
-         * @return {@code true} if there are already ongoing uploads, so we should not proceed with the operation
-         */
-        private boolean hasPreexistingUploads() {
-            final var uploads = listMultipartUploads();
-            if (uploads.isEmpty()) {
-                return false;
-            }
-
-            final var expiryDate = Date.from(
-                Instant.ofEpochMilli(
-                    blobStore.getThreadPool().absoluteTimeInMillis() - blobStore.getCompareAndExchangeTimeToLive().millis()
-                )
-            );
-            if (uploads.stream().anyMatch(upload -> upload.getInitiated().after(expiryDate))) {
-                return true;
-            }
-
-            // there are uploads, but they are all older than the TTL, so clean them up before carrying on (should be rare)
-            for (final var upload : uploads) {
-                logger.warn(
-                    "cleaning up stale compare-and-swap upload [{}] initiated at [{}]",
-                    upload.getUploadId(),
-                    upload.getInitiated()
-                );
-                safeAbortMultipartUpload(upload.getUploadId());
-            }
-
-            return false;
-        }
-
         void run(BytesReference expected, BytesReference updated, ActionListener<OptionalBytesReference> listener) throws Exception {
             BlobContainerUtils.ensureValidRegisterContent(updated);
 
             if (hasPreexistingUploads()) {
-
                 // This is a small optimization to improve the liveness properties of this algorithm.
                 //
                 // We can safely proceed even if there are other uploads in progress, but that would add to the potential for collisions and
                 // delays. Thus in this case we prefer avoid disturbing the ongoing attempts and just fail up front.
-
                 listener.onResponse(OptionalBytesReference.MISSING);
                 return;
             }
 
-            final var initiateRequest = new InitiateMultipartUploadRequest(bucket, blobKey);
-            initiateRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose));
-            final var uploadId = SocketAccess.doPrivileged(() -> client.initiateMultipartUpload(initiateRequest)).getUploadId();
-
-            final var uploadPartRequest = new UploadPartRequest();
-            uploadPartRequest.setBucketName(bucket);
-            uploadPartRequest.setKey(blobKey);
-            uploadPartRequest.setUploadId(uploadId);
-            uploadPartRequest.setPartNumber(1);
-            uploadPartRequest.setLastPart(true);
-            uploadPartRequest.setInputStream(updated.streamInput());
-            uploadPartRequest.setPartSize(updated.length());
-            uploadPartRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose));
-            final var partETag = SocketAccess.doPrivileged(() -> client.uploadPart(uploadPartRequest)).getPartETag();
-
+            final var uploadId = initiateMultipartUpload();
+            final var partETag = uploadPart(updated, uploadId);
             final var currentUploads = listMultipartUploads();
             final var uploadIndex = getUploadIndex(uploadId, currentUploads);
 
@@ -710,16 +619,7 @@ class S3BlobContainer extends AbstractBlobContainer {
                                 rawKey,
                                 delegate1.delegateFailure((delegate2, currentValue) -> ActionListener.completeWith(delegate2, () -> {
                                     if (currentValue.isPresent() && currentValue.bytesReference().equals(expected)) {
-                                        final var completeMultipartUploadRequest = new CompleteMultipartUploadRequest(
-                                            bucket,
-                                            blobKey,
-                                            uploadId,
-                                            List.of(partETag)
-                                        );
-                                        completeMultipartUploadRequest.setRequestMetricCollector(
-                                            blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose)
-                                        );
-                                        SocketAccess.doPrivilegedVoid(() -> client.completeMultipartUpload(completeMultipartUploadRequest));
+                                        completeMultipartUpload(uploadId, partETag);
                                         isComplete.set(true);
                                     }
                                     return currentValue;
@@ -740,15 +640,7 @@ class S3BlobContainer extends AbstractBlobContainer {
                     var delayListener = listeners.acquire();
                     final Runnable cancelConcurrentUpdates = () -> {
                         try {
-                            for (MultipartUpload currentUpload : currentUploads) {
-                                final var currentUploadId = currentUpload.getUploadId();
-                                if (uploadId.equals(currentUploadId) == false) {
-                                    blobStore.getSnapshotExecutor()
-                                        .execute(
-                                            ActionRunnable.run(listeners.acquire(), () -> abortMultipartUploadIfExists(currentUploadId))
-                                        );
-                                }
-                            }
+                            cancelOtherUploads(uploadId, currentUploads, listeners);
                         } finally {
                             delayListener.onResponse(null);
                         }
@@ -769,6 +661,111 @@ class S3BlobContainer extends AbstractBlobContainer {
             }
         }
 
+        /**
+         * @return {@code true} if there are already ongoing uploads, so we should not proceed with the operation
+         */
+        private boolean hasPreexistingUploads() {
+            final var uploads = listMultipartUploads();
+            if (uploads.isEmpty()) {
+                return false;
+            }
+
+            final var expiryDate = Date.from(
+                Instant.ofEpochMilli(
+                    blobStore.getThreadPool().absoluteTimeInMillis() - blobStore.getCompareAndExchangeTimeToLive().millis()
+                )
+            );
+            if (uploads.stream().anyMatch(upload -> upload.getInitiated().after(expiryDate))) {
+                return true;
+            }
+
+            // there are uploads, but they are all older than the TTL, so clean them up before carrying on (should be rare)
+            for (final var upload : uploads) {
+                logger.warn(
+                    "cleaning up stale compare-and-swap upload [{}] initiated at [{}]",
+                    upload.getUploadId(),
+                    upload.getInitiated()
+                );
+                safeAbortMultipartUpload(upload.getUploadId());
+            }
+
+            return false;
+        }
+
+        private List<MultipartUpload> listMultipartUploads() {
+            final var listRequest = new ListMultipartUploadsRequest(bucket);
+            listRequest.setPrefix(blobKey);
+            listRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.LIST_OBJECTS, purpose));
+            try {
+                return SocketAccess.doPrivileged(() -> client.listMultipartUploads(listRequest)).getMultipartUploads();
+            } catch (AmazonS3Exception e) {
+                if (e.getStatusCode() == 404) {
+                    return List.of();
+                }
+                throw e;
+            }
+        }
+
+        private String initiateMultipartUpload() {
+            final var initiateRequest = new InitiateMultipartUploadRequest(bucket, blobKey);
+            initiateRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose));
+            return SocketAccess.doPrivileged(() -> client.initiateMultipartUpload(initiateRequest)).getUploadId();
+        }
+
+        private PartETag uploadPart(BytesReference updated, String uploadId) throws IOException {
+            final var uploadPartRequest = new UploadPartRequest();
+            uploadPartRequest.setBucketName(bucket);
+            uploadPartRequest.setKey(blobKey);
+            uploadPartRequest.setUploadId(uploadId);
+            uploadPartRequest.setPartNumber(1);
+            uploadPartRequest.setLastPart(true);
+            uploadPartRequest.setInputStream(updated.streamInput());
+            uploadPartRequest.setPartSize(updated.length());
+            uploadPartRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose));
+            return SocketAccess.doPrivileged(() -> client.uploadPart(uploadPartRequest)).getPartETag();
+        }
+
+        private int getUploadIndex(String targetUploadId, List<MultipartUpload> multipartUploads) {
+            var uploadIndex = 0;
+            var found = false;
+            for (MultipartUpload multipartUpload : multipartUploads) {
+                final var observedUploadId = multipartUpload.getUploadId();
+                if (observedUploadId.equals(targetUploadId)) {
+                    final var currentTimeMillis = blobStore.getThreadPool().absoluteTimeInMillis();
+                    final var ageMillis = currentTimeMillis - multipartUpload.getInitiated().toInstant().toEpochMilli();
+                    final var expectedAgeRangeMillis = blobStore.getCompareAndExchangeTimeToLive().millis();
+                    if (ageMillis < -expectedAgeRangeMillis || ageMillis > expectedAgeRangeMillis) {
+                        logger.warn(
+                            """
+                                compare-and-exchange of blob [{}:{}] was initiated at [{}={}] \
+                                which deviates from local node epoch time [{}] by more than the warn threshold of [{}ms]""",
+                            bucket,
+                            blobKey,
+                            multipartUpload.getInitiated(),
+                            multipartUpload.getInitiated().toInstant().toEpochMilli(),
+                            currentTimeMillis,
+                            expectedAgeRangeMillis
+                        );
+                    }
+                    found = true;
+                } else if (observedUploadId.compareTo(targetUploadId) < 0) {
+                    uploadIndex += 1;
+                }
+            }
+
+            return found ? uploadIndex : -1;
+        }
+
+        private void cancelOtherUploads(String uploadId, List<MultipartUpload> currentUploads, RefCountingListener listeners) {
+            for (final var currentUpload : currentUploads) {
+                final var currentUploadId = currentUpload.getUploadId();
+                if (uploadId.equals(currentUploadId) == false) {
+                    blobStore.getSnapshotExecutor()
+                        .execute(ActionRunnable.run(listeners.acquire(), () -> abortMultipartUploadIfExists(currentUploadId)));
+                }
+            }
+        }
+
         private void safeAbortMultipartUpload(String uploadId) {
             try {
                 abortMultipartUploadIfExists(uploadId);
@@ -791,6 +788,11 @@ class S3BlobContainer extends AbstractBlobContainer {
             }
         }
 
+        private void completeMultipartUpload(String uploadId, PartETag partETag) {
+            final var completeMultipartUploadRequest = new CompleteMultipartUploadRequest(bucket, blobKey, uploadId, List.of(partETag));
+            completeMultipartUploadRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose));
+            SocketAccess.doPrivilegedVoid(() -> client.completeMultipartUpload(completeMultipartUploadRequest));
+        }
     }
 
     @Override