1
0
Эх сурвалжийг харах

Async blob-store compare-and-exchange API (#94092)

Further work towards the S3 compare-and-exchange implementation showed
that we would like this API to permit async operations. This commit
moves to an async API.

Also, this change made it fairly awkward to use an exception to deliver
to the caller the indication that the current value could not be read,
so this commit adjusts things to use `OptionalLong` throughout as
suggested in the discussion on #93955.
David Turner 2 жил өмнө
parent
commit
95daf492fc
15 өөрчлөгдсөн 245 нэмэгдсэн , 177 устгасан
  1. 8 6
      modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java
  2. 8 6
      modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java
  3. 8 6
      modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java
  4. 4 2
      modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java
  5. 4 2
      plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java
  6. 19 18
      server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java
  7. 0 15
      server/src/main/java/org/elasticsearch/common/blobstore/ConcurrentRegisterOperationException.java
  8. 32 31
      server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java
  9. 8 8
      server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java
  10. 37 9
      server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java
  11. 3 2
      x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java
  12. 5 2
      x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java
  13. 20 11
      x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisSuccessIT.java
  14. 60 46
      x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/RegisterAnalyzeAction.java
  15. 29 13
      x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalyzeAction.java

+ 8 - 6
modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java

@@ -13,6 +13,7 @@ import com.azure.storage.blob.models.BlobStorageException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.core.util.Throwables;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.DeleteResult;
@@ -28,6 +29,7 @@ import java.io.OutputStream;
 import java.nio.file.NoSuchFileException;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.OptionalLong;
 
 public class AzureBlobContainer extends AbstractBlobContainer {
 
@@ -156,17 +158,17 @@ public class AzureBlobContainer extends AbstractBlobContainer {
     }
 
     @Override
-    public long compareAndExchangeRegister(String key, long expected, long updated) {
-        throw new UnsupportedOperationException(); // TODO
+    public void compareAndExchangeRegister(String key, long expected, long updated, ActionListener<OptionalLong> listener) {
+        listener.onFailure(new UnsupportedOperationException()); // TODO
     }
 
     @Override
-    public boolean compareAndSetRegister(String key, long expected, long updated) {
-        throw new UnsupportedOperationException(); // TODO
+    public void compareAndSetRegister(String key, long expected, long updated, ActionListener<Boolean> listener) {
+        listener.onFailure(new UnsupportedOperationException()); // TODO
     }
 
     @Override
-    public long getRegister(String key) throws IOException {
-        throw new UnsupportedOperationException(); // TODO
+    public void getRegister(String key, ActionListener<OptionalLong> listener) {
+        listener.onFailure(new UnsupportedOperationException()); // TODO
     }
 }

+ 8 - 6
modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.repositories.gcs;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStoreException;
@@ -22,6 +23,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.OptionalLong;
 
 class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {
 
@@ -119,17 +121,17 @@ class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {
     }
 
     @Override
-    public long compareAndExchangeRegister(String key, long expected, long updated) {
-        throw new UnsupportedOperationException(); // TODO
+    public void compareAndExchangeRegister(String key, long expected, long updated, ActionListener<OptionalLong> listener) {
+        listener.onFailure(new UnsupportedOperationException()); // TODO
     }
 
     @Override
-    public boolean compareAndSetRegister(String key, long expected, long updated) {
-        throw new UnsupportedOperationException(); // TODO
+    public void compareAndSetRegister(String key, long expected, long updated, ActionListener<Boolean> listener) {
+        listener.onFailure(new UnsupportedOperationException()); // TODO
     }
 
     @Override
-    public long getRegister(String key) throws IOException {
-        throw new UnsupportedOperationException(); // TODO
+    public void getRegister(String key, ActionListener<OptionalLong> listener) {
+        listener.onFailure(new UnsupportedOperationException()); // TODO
     }
 }

+ 8 - 6
modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

@@ -27,6 +27,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
@@ -52,6 +53,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
@@ -609,17 +611,17 @@ class S3BlobContainer extends AbstractBlobContainer {
     }
 
     @Override
-    public long compareAndExchangeRegister(String key, long expected, long updated) {
-        throw new UnsupportedOperationException(); // TODO
+    public void compareAndExchangeRegister(String key, long expected, long updated, ActionListener<OptionalLong> listener) {
+        listener.onFailure(new UnsupportedOperationException()); // TODO
     }
 
     @Override
-    public boolean compareAndSetRegister(String key, long expected, long updated) {
-        throw new UnsupportedOperationException(); // TODO
+    public void compareAndSetRegister(String key, long expected, long updated, ActionListener<Boolean> listener) {
+        listener.onFailure(new UnsupportedOperationException()); // TODO
     }
 
     @Override
-    public long getRegister(String key) throws IOException {
-        throw new UnsupportedOperationException(); // TODO
+    public void getRegister(String key, ActionListener<OptionalLong> listener) {
+        listener.onFailure(new UnsupportedOperationException()); // TODO
     }
 }

+ 4 - 2
modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.common.blobstore.url;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.DeleteResult;
@@ -29,6 +30,7 @@ import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.OptionalLong;
 
 /**
  * URL blob implementation of {@link org.elasticsearch.common.blobstore.BlobContainer}
@@ -148,8 +150,8 @@ public class URLBlobContainer extends AbstractBlobContainer {
     }
 
     @Override
-    public long compareAndExchangeRegister(String key, long expected, long updated) {
-        throw new UnsupportedOperationException("URL repository doesn't support this operation");
+    public void compareAndExchangeRegister(String key, long expected, long updated, ActionListener<OptionalLong> listener) {
+        listener.onFailure(new UnsupportedOperationException("URL repositories do not support this operation"));
     }
 
 }

+ 4 - 2
plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java

@@ -15,6 +15,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Options.CreateOpts;
 import org.apache.hadoop.fs.Path;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.DeleteResult;
@@ -39,6 +40,7 @@ import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.OptionalLong;
 
 final class HdfsBlobContainer extends AbstractBlobContainer {
     private final HdfsBlobStore store;
@@ -318,7 +320,7 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
     }
 
     @Override
-    public long compareAndExchangeRegister(String key, long expected, long updated) {
-        throw new UnsupportedOperationException("HDFS repositories do not support this operation");
+    public void compareAndExchangeRegister(String key, long expected, long updated, ActionListener<OptionalLong> listener) {
+        listener.onFailure(new UnsupportedOperationException("HDFS repositories do not support this operation"));
     }
 }

+ 19 - 18
server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.common.blobstore;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.blobstore.support.BlobMetadata;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.core.CheckedConsumer;
@@ -19,6 +20,7 @@ import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.NoSuchFileException;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.OptionalLong;
 
 /**
  * An interface for managing a repository of blob entries, where each blob entry is just a named group of bytes.
@@ -199,39 +201,38 @@ public interface BlobContainer {
      * Atomically sets the value stored at the given key to {@code updated} if the {@code current value == expected}.
      * Keys not yet used start at initial value 0. Returns the current value (before it was updated).
      *
-     * @param key key of the value to update
+     * @param key      key of the value to update
      * @param expected the expected value
-     * @param updated the new value
-     * @return the value read from the register (before it was updated)
-     * @throws ConcurrentRegisterOperationException if concurrent activity prevents us from even reading the value of the register
+     * @param updated  the new value
+     * @param listener a listener, completed with the value read from the register (before it was updated) or {@code OptionalLong#empty}
+     *                 if the value could not be read due to concurrent activity.
      */
-    long compareAndExchangeRegister(String key, long expected, long updated) throws IOException, ConcurrentRegisterOperationException;
+    void compareAndExchangeRegister(String key, long expected, long updated, ActionListener<OptionalLong> listener);
 
     /**
      * Atomically sets the value stored at the given key to {@code updated} if the {@code current value == expected}.
      * Keys not yet used start at initial value 0.
      *
-     * @param key key of the value to update
+     * @param key      key of the value to update
      * @param expected the expected value
-     * @param updated the new value
-     * @return true if successful, false if the expected value did not match the updated value
-     * @throws ConcurrentRegisterOperationException if concurrent activity prevents us from even reading the value of the register
+     * @param updated  the new value
+     * @param listener a listener which is completed with {@link Boolean#TRUE} if successful, {@link Boolean#FALSE} if the expected value
+     *                 did not match the updated value
      */
-    default boolean compareAndSetRegister(String key, long expected, long updated) throws IOException,
-        ConcurrentRegisterOperationException {
-        return compareAndExchangeRegister(key, expected, updated) == expected;
+    default void compareAndSetRegister(String key, long expected, long updated, ActionListener<Boolean> listener) {
+        compareAndExchangeRegister(key, expected, updated, listener.map(witness -> witness.isPresent() && witness.getAsLong() == expected));
     }
 
     /**
-     * Gets the value set by {@link #compareAndSetRegister(String, long, long)} for a given key.
+     * Gets the value set by {@link #compareAndSetRegister(String, long, long, ActionListener)} for a given key.
      * If a key has not yet been used, the initial value is 0.
      *
-     * @param key key of the value to get
-     * @return value found
-     * @throws ConcurrentRegisterOperationException if concurrent activity prevents us from even reading the value of the register
+     * @param key      key of the value to get
+     * @param listener a listener, completed with the value read from the register or {@code null} if the value could not be read due to
+     *                 concurrent activity.
      */
-    default long getRegister(String key) throws IOException, ConcurrentRegisterOperationException {
-        return compareAndExchangeRegister(key, 0, 0);
+    default void getRegister(String key, ActionListener<OptionalLong> listener) {
+        compareAndExchangeRegister(key, 0, 0, listener);
     }
 
 }

+ 0 - 15
server/src/main/java/org/elasticsearch/common/blobstore/ConcurrentRegisterOperationException.java

@@ -1,15 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0 and the Server Side Public License, v 1; you may not use this file except
- * in compliance with, at your election, the Elastic License 2.0 or the Server
- * Side Public License, v 1.
- */
-
-package org.elasticsearch.common.blobstore;
-
-public class ConcurrentRegisterOperationException extends Exception {
-    public ConcurrentRegisterOperationException(Exception cause) {
-        super(cause);
-    }
-}

+ 32 - 31
server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java

@@ -11,10 +11,10 @@ package org.elasticsearch.common.blobstore.fs;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.util.Constants;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
-import org.elasticsearch.common.blobstore.ConcurrentRegisterOperationException;
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
 import org.elasticsearch.common.blobstore.support.BlobMetadata;
@@ -52,6 +52,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static java.util.Collections.unmodifiableMap;
@@ -380,40 +381,40 @@ public class FsBlobContainer extends AbstractBlobContainer {
 
     @Override
     @SuppressForbidden(reason = "write to channel that we have open for locking purposes already directly")
-    public long compareAndExchangeRegister(String key, long expected, long updated) throws IOException,
-        ConcurrentRegisterOperationException {
-
-        try (LockedFileChannel lockedFileChannel = LockedFileChannel.open(path.resolve(key))) {
-            final FileChannel fileChannel = lockedFileChannel.fileChannel();
-            final ByteBuffer buf = ByteBuffer.allocate(Long.BYTES);
-            final long found;
-            while (buf.remaining() > 0) {
-                if (fileChannel.read(buf) == -1) {
-                    break;
+    public void compareAndExchangeRegister(String key, long expected, long updated, ActionListener<OptionalLong> listener) {
+        ActionListener.completeWith(listener, () -> {
+            try (LockedFileChannel lockedFileChannel = LockedFileChannel.open(path.resolve(key))) {
+                final FileChannel fileChannel = lockedFileChannel.fileChannel();
+                final ByteBuffer buf = ByteBuffer.allocate(Long.BYTES);
+                final long found;
+                while (buf.remaining() > 0) {
+                    if (fileChannel.read(buf) == -1) {
+                        break;
+                    }
                 }
-            }
-            if (buf.position() == 0) {
-                found = 0L;
-            } else if (buf.position() == Long.BYTES) {
-                found = buf.getLong(0);
-                buf.clear();
-                if (fileChannel.read(buf) != -1) {
-                    throw new IllegalStateException("Read file of length greater than [" + Long.BYTES + "] for [" + key + "]");
+                if (buf.position() == 0) {
+                    found = 0L;
+                } else if (buf.position() == Long.BYTES) {
+                    found = buf.getLong(0);
+                    buf.clear();
+                    if (fileChannel.read(buf) != -1) {
+                        throw new IllegalStateException("Read file of length greater than [" + Long.BYTES + "] for [" + key + "]");
+                    }
+                } else {
+                    throw new IllegalStateException("Read file of length [" + buf.position() + "] for [" + key + "]");
                 }
-            } else {
-                throw new IllegalStateException("Read file of length [" + buf.position() + "] for [" + key + "]");
-            }
-            if (found == expected) {
-                buf.clear().putLong(updated).flip();
-                while (buf.remaining() > 0) {
-                    fileChannel.write(buf, buf.position());
+                if (found == expected) {
+                    buf.clear().putLong(updated).flip();
+                    while (buf.remaining() > 0) {
+                        fileChannel.write(buf, buf.position());
+                    }
+                    fileChannel.force(true);
                 }
-                fileChannel.force(true);
+                return OptionalLong.of(found);
+            } catch (OverlappingFileLockException e) {
+                return OptionalLong.empty();
             }
-            return found;
-        } catch (OverlappingFileLockException e) {
-            throw new ConcurrentRegisterOperationException(e);
-        }
+        });
     }
 
     private record LockedFileChannel(FileChannel fileChannel, Closeable fileLock) implements Closeable {

+ 8 - 8
server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java

@@ -8,9 +8,9 @@
 
 package org.elasticsearch.common.blobstore.support;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
-import org.elasticsearch.common.blobstore.ConcurrentRegisterOperationException;
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.core.CheckedConsumer;
@@ -21,6 +21,7 @@ import java.io.OutputStream;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
+import java.util.OptionalLong;
 import java.util.stream.Collectors;
 
 public abstract class FilterBlobContainer implements BlobContainer {
@@ -104,18 +105,17 @@ public abstract class FilterBlobContainer implements BlobContainer {
     }
 
     @Override
-    public long compareAndExchangeRegister(String key, long expected, long updated) throws IOException,
-        ConcurrentRegisterOperationException {
-        return delegate.compareAndExchangeRegister(key, expected, updated);
+    public void compareAndExchangeRegister(String key, long expected, long updated, ActionListener<OptionalLong> listener) {
+        delegate.compareAndExchangeRegister(key, expected, updated, listener);
     }
 
     @Override
-    public boolean compareAndSetRegister(String key, long expected, long updated) throws IOException, ConcurrentRegisterOperationException {
-        return delegate.compareAndSetRegister(key, expected, updated);
+    public void compareAndSetRegister(String key, long expected, long updated, ActionListener<Boolean> listener) {
+        delegate.compareAndSetRegister(key, expected, updated, listener);
     }
 
     @Override
-    public long getRegister(String key) throws IOException, ConcurrentRegisterOperationException {
-        return delegate.getRegister(key);
+    public void getRegister(String key, ActionListener<OptionalLong> listener) {
+        delegate.getRegister(key, listener);
     }
 }

+ 37 - 9
server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java

@@ -10,6 +10,8 @@ package org.elasticsearch.common.blobstore.fs;
 import org.apache.lucene.tests.mockfile.FilterFileSystemProvider;
 import org.apache.lucene.tests.mockfile.FilterSeekableByteChannel;
 import org.apache.lucene.tests.util.LuceneTestCase;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.io.Streams;
@@ -32,7 +34,9 @@ import java.nio.file.Path;
 import java.nio.file.attribute.FileAttribute;
 import java.nio.file.spi.FileSystemProvider;
 import java.util.Locale;
+import java.util.OptionalLong;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 
@@ -95,6 +99,14 @@ public class FsBlobContainerTests extends ESTestCase {
         assertThat(FsBlobContainer.isTempBlobName(tempBlobName), is(true));
     }
 
+    private static long getLongAsync(Consumer<ActionListener<OptionalLong>> consumer) {
+        return getAsync(consumer).orElseThrow(AssertionError::new);
+    }
+
+    private static <T> T getAsync(Consumer<ActionListener<T>> consumer) {
+        return PlainActionFuture.<T, RuntimeException>get(consumer::accept, 0, TimeUnit.SECONDS);
+    }
+
     public void testCompareAndExchange() throws Exception {
         final Path path = PathUtils.get(createTempDir().toString());
         final FsBlobContainer container = new FsBlobContainer(
@@ -108,16 +120,26 @@ public class FsBlobContainerTests extends ESTestCase {
 
         for (int i = 0; i < 5; i++) {
             switch (between(1, 4)) {
-                case 1 -> assertEquals(expectedValue.get(), container.getRegister(key));
+                case 1 -> assertEquals(expectedValue.get(), getLongAsync(l -> container.getRegister(key, l)));
                 case 2 -> assertFalse(
-                    container.compareAndSetRegister(key, randomValueOtherThan(expectedValue.get(), ESTestCase::randomLong), randomLong())
+                    getAsync(
+                        l -> container.compareAndSetRegister(
+                            key,
+                            randomValueOtherThan(expectedValue.get(), ESTestCase::randomLong),
+                            randomLong(),
+                            l
+                        )
+                    )
                 );
                 case 3 -> assertEquals(
                     expectedValue.get(),
-                    container.compareAndExchangeRegister(
-                        key,
-                        randomValueOtherThan(expectedValue.get(), ESTestCase::randomLong),
-                        randomLong()
+                    getLongAsync(
+                        l -> container.compareAndExchangeRegister(
+                            key,
+                            randomValueOtherThan(expectedValue.get(), ESTestCase::randomLong),
+                            randomLong(),
+                            l
+                        )
                     )
                 );
                 case 4 -> {/* no-op */}
@@ -125,16 +147,22 @@ public class FsBlobContainerTests extends ESTestCase {
 
             final var newValue = randomLong();
             if (randomBoolean()) {
-                assertTrue(container.compareAndSetRegister(key, expectedValue.get(), newValue));
+                assertTrue(getAsync(l -> container.compareAndSetRegister(key, expectedValue.get(), newValue, l)));
             } else {
-                assertEquals(expectedValue.get(), container.compareAndExchangeRegister(key, expectedValue.get(), newValue));
+                assertEquals(
+                    expectedValue.get(),
+                    getLongAsync(l -> container.compareAndExchangeRegister(key, expectedValue.get(), newValue, l))
+                );
             }
             expectedValue.set(newValue);
         }
 
         final byte[] corruptContents = new byte[9];
         container.writeBlob(key, new BytesArray(corruptContents, 0, randomFrom(1, 7, 9)), false);
-        expectThrows(IllegalStateException.class, () -> container.compareAndExchangeRegister(key, expectedValue.get(), 0));
+        expectThrows(
+            IllegalStateException.class,
+            () -> getLongAsync(l -> container.compareAndExchangeRegister(key, expectedValue.get(), 0, l))
+        );
     }
 
     static class MockFileSystemProvider extends FilterFileSystemProvider {

+ 3 - 2
x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java

@@ -41,6 +41,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
@@ -229,8 +230,8 @@ public final class TestUtils {
         }
 
         @Override
-        public long compareAndExchangeRegister(String key, long expected, long updated) throws IOException {
-            throw unsupportedException();
+        public void compareAndExchangeRegister(String key, long expected, long updated, ActionListener<OptionalLong> listener) {
+            listener.onFailure(unsupportedException());
         }
 
         private UnsupportedOperationException unsupportedException() {

+ 5 - 2
x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.repositories.blobstore.testkit;
 
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.blobstore.BlobContainer;
@@ -52,6 +53,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -584,8 +586,9 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
         }
 
         @Override
-        public long compareAndExchangeRegister(String key, long expected, long updated) {
-            return disruption.onCompareAndExchange(registers.computeIfAbsent(key, ignored -> new AtomicLong()), expected, updated);
+        public void compareAndExchangeRegister(String key, long expected, long updated, ActionListener<OptionalLong> listener) {
+            final var register = registers.computeIfAbsent(key, ignored -> new AtomicLong());
+            listener.onResponse(OptionalLong.of(disruption.onCompareAndExchange(register, expected, updated)));
         }
     }
 

+ 20 - 11
x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisSuccessIT.java

@@ -7,13 +7,12 @@
 
 package org.elasticsearch.repositories.blobstore.testkit;
 
-import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
-import org.elasticsearch.common.blobstore.ConcurrentRegisterOperationException;
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.support.BlobMetadata;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -48,8 +47,10 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
@@ -271,6 +272,7 @@ public class RepositoryAnalysisSuccessIT extends AbstractSnapshotIntegTestCase {
         private final Map<String, byte[]> blobs = ConcurrentCollections.newConcurrentMap();
         private final AtomicLong totalBytesWritten = new AtomicLong();
         private final Map<String, AtomicLong> registers = ConcurrentCollections.newConcurrentMap();
+        private final AtomicBoolean firstRegisterRead = new AtomicBoolean(true);
 
         AssertingBlobContainer(
             BlobPath path,
@@ -409,20 +411,27 @@ public class RepositoryAnalysisSuccessIT extends AbstractSnapshotIntegTestCase {
         }
 
         @Override
-        public long getRegister(String key) throws ConcurrentRegisterOperationException {
-            if (randomBoolean() && randomBoolean()) {
-                throw new ConcurrentRegisterOperationException(new ElasticsearchException("simulated"));
+        public void getRegister(String key, ActionListener<OptionalLong> listener) {
+            if (firstRegisterRead.compareAndSet(true, false) && randomBoolean() && randomBoolean()) {
+                // only fail the first read, we must not fail the final check
+                listener.onResponse(OptionalLong.empty());
+            } else if (randomBoolean()) {
+                listener.onResponse(OptionalLong.of(registers.computeIfAbsent(key, ignored -> new AtomicLong()).get()));
+            } else {
+                compareAndExchangeRegister(key, -1, -1, listener);
             }
-            return registers.computeIfAbsent(key, ignored -> new AtomicLong()).get();
         }
 
         @Override
-        public long compareAndExchangeRegister(String key, long expected, long updated) throws ConcurrentRegisterOperationException {
-            if (expected != updated && randomBoolean() && randomBoolean()) {
-                // don't fail the final check because we know there can be no concurrent operations at that point
-                throw new ConcurrentRegisterOperationException(new ElasticsearchException("simulated"));
+        public void compareAndExchangeRegister(String key, long expected, long updated, ActionListener<OptionalLong> listener) {
+            if (updated != -1 && randomBoolean() && randomBoolean()) {
+                // updated != -1 so we don't fail the final check because we know there can be no concurrent operations at that point
+                listener.onResponse(OptionalLong.empty());
+            } else {
+                listener.onResponse(
+                    OptionalLong.of(registers.computeIfAbsent(key, ignored -> new AtomicLong()).compareAndExchange(expected, updated))
+                );
             }
-            return registers.computeIfAbsent(key, ignored -> new AtomicLong()).compareAndExchange(expected, updated);
         }
     }
 

+ 60 - 46
x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/RegisterAnalyzeAction.java

@@ -21,7 +21,6 @@ import org.elasticsearch.action.support.HandledTransportAction;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
-import org.elasticsearch.common.blobstore.ConcurrentRegisterOperationException;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -36,6 +35,7 @@ import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.concurrent.ExecutorService;
 
 /**
@@ -67,7 +67,8 @@ public class RegisterAnalyzeAction extends ActionType<ActionResponse.Empty> {
         }
 
         @Override
-        protected void doExecute(Task task, Request request, ActionListener<ActionResponse.Empty> listener) {
+        protected void doExecute(Task task, Request request, ActionListener<ActionResponse.Empty> outerListenerOld) {
+            final var outerListener = ActionListener.assertOnce(outerListenerOld);
             final Repository repository = repositoriesService.repository(request.getRepositoryName());
             if (repository instanceof BlobStoreRepository == false) {
                 throw new IllegalArgumentException("repository [" + request.getRepositoryName() + "] is not a blob-store repository");
@@ -83,59 +84,72 @@ public class RegisterAnalyzeAction extends ActionType<ActionResponse.Empty> {
 
             assert task instanceof CancellableTask;
 
-            ActionListener.run(listener.<Void>map(ignored -> ActionResponse.Empty.INSTANCE), l -> {
-                final String registerName = request.getRegisterName();
-                long initialValue;
-                try {
-                    initialValue = blobContainer.getRegister(registerName);
-                } catch (ConcurrentRegisterOperationException e) {
-                    // Concurrent activity prevents us from even reading the initial value, but this is just a best-effort thing so we can
-                    // proceed anyway.
-                    initialValue = 0;
-                } catch (UnsupportedOperationException e) {
-                    // Registers are not supported on all repository types, and that's ok. If it's not supported here then the final check
-                    // will also be unsupported, so it doesn't matter that we didn't do anything before this successful response.
-                    l.onResponse(null);
-                    return;
-                }
+            final String registerName = request.getRegisterName();
+            blobContainer.getRegister(registerName, new ActionListener<>() {
+                @Override
+                public void onResponse(OptionalLong initialValueOrNull) {
+                    final long initialValue = initialValueOrNull.orElse(0L);
 
-                if (initialValue < 0 || initialValue >= request.getRequestCount()) {
-                    throw new IllegalStateException("register holds unexpected value [" + initialValue + "]");
-                }
+                    ActionListener.run(outerListener.<Void>map(ignored -> ActionResponse.Empty.INSTANCE), l -> {
+                        if (initialValue < 0 || initialValue >= request.getRequestCount()) {
+                            throw new IllegalStateException("register holds unexpected value [" + initialValue + "]");
+                        }
 
-                class Execution extends ActionRunnable<Void> {
-                    private long currentValue;
+                        // noinspection OptionalUsedAsFieldOrParameterType
+                        class Execution extends ActionRunnable<Void> {
+                            private long currentValue;
 
-                    Execution(long currentValue) {
-                        super(l);
-                        this.currentValue = currentValue;
-                    }
+                            private final ActionListener<OptionalLong> witnessListener;
 
-                    @Override
-                    protected void doRun() throws Exception {
-                        if (((CancellableTask) task).notifyIfCancelled(listener)) {
-                            return;
-                        }
+                            Execution(long currentValue) {
+                                super(l);
+                                this.currentValue = currentValue;
+                                this.witnessListener = listener.delegateFailure(this::handleWitness);
+                            }
+
+                            @Override
+                            protected void doRun() {
+                                if (((CancellableTask) task).notifyIfCancelled(listener) == false) {
+                                    blobContainer.compareAndExchangeRegister(registerName, currentValue, currentValue + 1, witnessListener);
+                                }
+                            }
 
-                        try {
-                            final var witness = blobContainer.compareAndExchangeRegister(registerName, currentValue, currentValue + 1);
-                            if (witness == currentValue) {
-                                listener.onResponse(null);
-                            } else if (witness < currentValue || witness >= request.getRequestCount()) {
-                                throw new IllegalStateException("register holds unexpected value [" + witness + "]");
-                            } else {
-                                currentValue = witness;
-                                executor.execute(Execution.this);
+                            private void handleWitness(ActionListener<Void> delegate, OptionalLong witnessOrEmpty) {
+                                if (witnessOrEmpty.isEmpty()) {
+                                    // Concurrent activity prevented us from updating the value, or even reading the concurrently-updated
+                                    // result, so we must just try again.
+                                    executor.execute(Execution.this);
+                                    return;
+                                }
+
+                                final long witness = witnessOrEmpty.getAsLong();
+                                if (witness == currentValue) {
+                                    delegate.onResponse(null);
+                                } else if (witness < currentValue || witness >= request.getRequestCount()) {
+                                    delegate.onFailure(new IllegalStateException("register holds unexpected value [" + witness + "]"));
+                                } else {
+                                    currentValue = witness;
+                                    executor.execute(Execution.this);
+                                }
                             }
-                        } catch (ConcurrentRegisterOperationException e) {
-                            // Concurrent activity prevented us from updating the value, or even reading the concurrently-updated result,
-                            // so we must just try again.
-                            executor.execute(Execution.this);
+
                         }
-                    }
+
+                        new Execution(initialValue).run();
+
+                    });
                 }
 
-                new Execution(initialValue).run();
+                @Override
+                public void onFailure(Exception e) {
+                    if (e instanceof UnsupportedOperationException) {
+                        // Registers are not supported on all repository types, and that's ok. If it's not supported here then the final
+                        // check will also be unsupported, so it doesn't matter that we didn't do anything before this successful response.
+                        outerListener.onResponse(ActionResponse.Empty.INSTANCE);
+                    } else {
+                        outerListener.onFailure(e);
+                    }
+                }
             });
         }
     }

+ 29 - 13
x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalyzeAction.java

@@ -64,6 +64,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
@@ -450,7 +451,7 @@ public class RepositoryAnalyzeAction extends ActionType<RepositoryAnalyzeAction.
             final List<DiscoveryNode> nodes = getSnapshotNodes(discoveryNodes);
 
             final String registerName = "test-register-" + UUIDs.randomBase64UUID(random);
-            try (var registerRefs = new RefCountingRunnable(finalRegisterValueVerifier(registerName, requestRefs.acquire()))) {
+            try (var registerRefs = new RefCountingRunnable(finalRegisterValueVerifier(registerName, random, requestRefs.acquire()))) {
                 final long registerOperations = Math.max(nodes.size(), request.getConcurrency());
                 for (int i = 0; i < registerOperations; i++) {
                     final RegisterAnalyzeAction.Request registerAnalyzeRequest = new RegisterAnalyzeAction.Request(
@@ -589,21 +590,22 @@ public class RepositoryAnalyzeAction extends ActionType<RepositoryAnalyzeAction.
             }
         }
 
-        private Runnable finalRegisterValueVerifier(String registerName, Releasable ref) {
+        private Runnable finalRegisterValueVerifier(String registerName, Random random, Releasable ref) {
             return () -> {
                 if (isRunning()) {
                     final var expectedFinalRegisterValue = expectedRegisterValue.get();
                     transportService.getThreadPool()
                         .executor(ThreadPool.Names.SNAPSHOT)
-                        .execute(ActionRunnable.supply(ActionListener.releaseAfter(new ActionListener<>() {
+                        .execute(ActionRunnable.wrap(ActionListener.releaseAfter(new ActionListener<OptionalLong>() {
                             @Override
-                            public void onResponse(Long actualFinalRegisterValue) {
-                                if (actualFinalRegisterValue != expectedFinalRegisterValue) {
+                            public void onResponse(OptionalLong actualFinalRegisterValue) {
+                                if (actualFinalRegisterValue.isEmpty()
+                                    || actualFinalRegisterValue.getAsLong() != expectedFinalRegisterValue) {
                                     fail(
                                         new RepositoryVerificationException(
                                             request.getRepositoryName(),
                                             Strings.format(
-                                                "register [%s] should have value [%d] but instead had value [%d]",
+                                                "register [%s] should have value [%d] but instead had value [%s]",
                                                 registerName,
                                                 expectedFinalRegisterValue,
                                                 actualFinalRegisterValue
@@ -620,13 +622,27 @@ public class RepositoryAnalyzeAction extends ActionType<RepositoryAnalyzeAction.
                                     fail(exp);
                                 }
                             }
-                        }, ref),
-                            () -> getBlobContainer().compareAndExchangeRegister(
-                                registerName,
-                                expectedFinalRegisterValue,
-                                expectedFinalRegisterValue
-                            )
-                        ));
+                        }, ref), listener -> {
+                            switch (random.nextInt(3)) {
+                                case 0 -> getBlobContainer().getRegister(registerName, listener);
+                                case 1 -> getBlobContainer().compareAndExchangeRegister(
+                                    registerName,
+                                    expectedFinalRegisterValue,
+                                    -1,
+                                    listener
+                                );
+                                case 2 -> getBlobContainer().compareAndSetRegister(
+                                    registerName,
+                                    expectedFinalRegisterValue,
+                                    -1,
+                                    listener.map(b -> b ? OptionalLong.of(expectedFinalRegisterValue) : OptionalLong.empty())
+                                );
+                                default -> {
+                                    assert false;
+                                    throw new IllegalStateException();
+                                }
+                            }
+                        }));
                 } else {
                     ref.close();
                 }