1
0
Эх сурвалжийг харах

Azure compare-and-exchange implementation (#94276)

David Turner 2 жил өмнө
parent
commit
bc8cdc2f51

+ 22 - 7
modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java

@@ -14,6 +14,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.core.util.Throwables;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.DeleteResult;
@@ -157,18 +158,32 @@ public class AzureBlobContainer extends AbstractBlobContainer {
         return keyPath + (blobName == null ? "" : blobName);
     }
 
-    @Override
-    public void compareAndExchangeRegister(String key, long expected, long updated, ActionListener<OptionalLong> listener) {
-        listener.onFailure(new UnsupportedOperationException()); // TODO
+    private boolean skipRegisterOperation(ActionListener<?> listener) {
+        return skipCas(listener) || skipIfNotPrimaryOnlyLocationMode(listener);
     }
 
-    @Override
-    public void compareAndSetRegister(String key, long expected, long updated, ActionListener<Boolean> listener) {
-        listener.onFailure(new UnsupportedOperationException()); // TODO
+    private boolean skipIfNotPrimaryOnlyLocationMode(ActionListener<?> listener) {
+        if (blobStore.getLocationMode() == LocationMode.PRIMARY_ONLY) {
+            return false;
+        }
+        listener.onFailure(
+            new UnsupportedOperationException(
+                Strings.format("consistent register operations are not supported with location_mode [%s]", blobStore.getLocationMode())
+            )
+        );
+        return true;
     }
 
     @Override
     public void getRegister(String key, ActionListener<OptionalLong> listener) {
-        listener.onFailure(new UnsupportedOperationException()); // TODO
+        if (skipRegisterOperation(listener)) return;
+        ActionListener.completeWith(listener, () -> blobStore.getRegister(buildKey(key), keyPath, key));
     }
+
+    @Override
+    public void compareAndExchangeRegister(String key, long expected, long updated, ActionListener<OptionalLong> listener) {
+        if (skipRegisterOperation(listener)) return;
+        ActionListener.completeWith(listener, () -> blobStore.compareAndExchangeRegister(buildKey(key), keyPath, key, expected, updated));
+    }
+
 }

+ 110 - 0
modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

@@ -16,6 +16,7 @@ import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 import com.azure.core.http.rest.ResponseBase;
+import com.azure.core.util.BinaryData;
 import com.azure.storage.blob.BlobAsyncClient;
 import com.azure.storage.blob.BlobClient;
 import com.azure.storage.blob.BlobContainerAsyncClient;
@@ -31,17 +32,21 @@ import com.azure.storage.blob.models.BlobRequestConditions;
 import com.azure.storage.blob.models.BlobStorageException;
 import com.azure.storage.blob.models.DownloadRetryOptions;
 import com.azure.storage.blob.models.ListBlobsOptions;
+import com.azure.storage.blob.options.BlobParallelUploadOptions;
 import com.azure.storage.blob.options.BlockBlobSimpleUploadOptions;
+import com.azure.storage.blob.specialized.BlobLeaseClientBuilder;
 import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.util.Throwables;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.blobstore.DeleteResult;
+import org.elasticsearch.common.blobstore.support.BlobContainerUtils;
 import org.elasticsearch.common.blobstore.support.BlobMetadata;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -52,6 +57,7 @@ import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.repositories.azure.AzureRepository.Repository;
 import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream;
+import org.elasticsearch.rest.RestStatus;
 
 import java.io.FilterInputStream;
 import java.io.IOException;
@@ -71,6 +77,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.OptionalLong;
 import java.util.Spliterator;
 import java.util.Spliterators;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -795,4 +802,107 @@ public class AzureBlobStore implements BlobStore {
             onHttpRequest.run();
         }
     }
+
+    OptionalLong getRegister(String blobPath, String containerPath, String blobKey) {
+        try {
+            return SocketAccess.doPrivilegedException(
+                () -> OptionalLong.of(
+                    downloadRegisterBlob(
+                        containerPath,
+                        blobKey,
+                        getAzureBlobServiceClientClient().getSyncClient().getBlobContainerClient(container).getBlobClient(blobPath),
+                        null
+                    )
+                )
+            );
+        } catch (Exception e) {
+            if (Throwables.getRootCause(e)instanceof BlobStorageException blobStorageException
+                && blobStorageException.getStatusCode() == RestStatus.NOT_FOUND.getStatus()) {
+                return OptionalLong.empty();
+            }
+            throw e;
+        }
+    }
+
+    OptionalLong compareAndExchangeRegister(String blobPath, String containerPath, String blobKey, long expected, long updated) {
+        try {
+            return SocketAccess.doPrivilegedException(
+                () -> OptionalLong.of(
+                    innerCompareAndExchangeRegister(
+                        containerPath,
+                        blobKey,
+                        getAzureBlobServiceClientClient().getSyncClient().getBlobContainerClient(container).getBlobClient(blobPath),
+                        expected,
+                        updated
+                    )
+                )
+            );
+        } catch (Exception e) {
+            if (Throwables.getRootCause(e)instanceof BlobStorageException blobStorageException) {
+                if (blobStorageException.getStatusCode() == RestStatus.PRECONDITION_FAILED.getStatus()
+                    || blobStorageException.getStatusCode() == RestStatus.CONFLICT.getStatus()) {
+                    return OptionalLong.empty();
+                }
+            }
+            throw e;
+        }
+    }
+
+    private static long innerCompareAndExchangeRegister(
+        String containerPath,
+        String blobKey,
+        BlobClient blobClient,
+        long expected,
+        long updated
+    ) throws IOException {
+        if (blobClient.exists()) {
+            final var leaseClient = new BlobLeaseClientBuilder().blobClient(blobClient).buildClient();
+            final var leaseId = leaseClient.acquireLease(60);
+            try {
+                final long currentValue = downloadRegisterBlob(
+                    containerPath,
+                    blobKey,
+                    blobClient,
+                    new BlobRequestConditions().setLeaseId(leaseId)
+                );
+                if (currentValue == expected) {
+                    uploadRegisterBlob(updated, blobClient, new BlobRequestConditions().setLeaseId(leaseId));
+                }
+                return currentValue;
+            } finally {
+                leaseClient.releaseLease();
+            }
+        } else {
+            if (expected == 0L) {
+                uploadRegisterBlob(updated, blobClient, new BlobRequestConditions().setIfNoneMatch("*"));
+            }
+            return 0L;
+        }
+    }
+
+    private static long downloadRegisterBlob(
+        String containerPath,
+        String blobKey,
+        BlobClient blobClient,
+        BlobRequestConditions blobRequestConditions
+    ) throws IOException {
+        return BlobContainerUtils.getRegisterUsingConsistentRead(
+            blobClient.downloadContentWithResponse(new DownloadRetryOptions().setMaxRetryRequests(0), blobRequestConditions, null, null)
+                .getValue()
+                .toStream(),
+            containerPath,
+            blobKey
+        );
+    }
+
+    private static void uploadRegisterBlob(long value, BlobClient blobClient, BlobRequestConditions requestConditions) throws IOException {
+        final var blobContents = BlobContainerUtils.getRegisterBlobContents(value);
+        blobClient.uploadWithResponse(
+            new BlobParallelUploadOptions(BinaryData.fromStream(blobContents.streamInput(), (long) blobContents.length()))
+                .setRequestConditions(requestConditions),
+            null,
+            null
+        );
+    }
+
 }

+ 12 - 0
server/src/main/java/org/elasticsearch/common/blobstore/support/AbstractBlobContainer.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.common.blobstore.support;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 
@@ -22,6 +23,17 @@ public abstract class AbstractBlobContainer implements BlobContainer {
         this.path = path;
     }
 
+    /**
+     * Temporary check that permits disabling CAS operations at runtime; TODO remove this when no longer needed
+     */
+    protected static boolean skipCas(ActionListener<?> listener) {
+        if ("true".equals(System.getProperty("test.repository_test_kit.skip_cas"))) {
+            listener.onFailure(new UnsupportedOperationException());
+            return true;
+        }
+        return false;
+    }
+
     @Override
     public BlobPath path() {
         return this.path;

+ 1 - 0
x-pack/plugin/snapshot-repo-test-kit/qa/azure/build.gradle

@@ -75,6 +75,7 @@ testClusters.matching { it.name == "javaRestTest" }.configureEach {
     setting 'azure.client.repository_test_kit.endpoint_suffix',
       { "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=http://${-> fixtureAddress('azure-fixture-other')}/azure_integration_test_account" }, IGNORE_VALUE
 
+    systemProperty 'test.repository_test_kit.skip_cas', 'true' // test fixture does not support CAS yet; TODO fix this
   } else {
     println "Using an external service to test " + project.name
   }