Browse Source

Repo analysis: verify empty register (#102048)

We encountered a bad third-party S3 repository implementation which
incorrectly rejects empty multipart uploads. This anomaly is detected by
some repository analysis runs, but not by all of them. This commit adds
a specific check for this incompatibility so that it can be reported
reliably.
David Turner 1 year ago
parent
commit
e6c62a8f95

+ 5 - 0
docs/changelog/102048.yaml

@@ -0,0 +1,5 @@
+pr: 102048
+summary: "Repo analysis: verify empty register"
+area: Snapshot/Restore
+type: enhancement
+issues: []

+ 36 - 1
x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java

@@ -65,6 +65,7 @@ import static org.elasticsearch.repositories.blobstore.testkit.ContendedRegister
 import static org.elasticsearch.repositories.blobstore.testkit.ContendedRegisterAnalyzeAction.longFromBytes;
 import static org.elasticsearch.repositories.blobstore.testkit.ContendedRegisterAnalyzeAction.longFromBytes;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.anEmptyMap;
 import static org.hamcrest.Matchers.anEmptyMap;
+import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.nullValue;
@@ -387,6 +388,30 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
         );
         );
     }
     }
 
 
+    public void testFailsIfEmptyRegisterRejected() {
+        final RepositoryAnalyzeAction.Request request = new RepositoryAnalyzeAction.Request("test-repo");
+        blobStore.setDisruption(new Disruption() {
+            @Override
+            public boolean acceptsEmptyRegister() {
+                return false;
+            }
+        });
+        final var exception = expectThrows(RepositoryVerificationException.class, () -> analyseRepository(request));
+        assertThat(exception.getMessage(), containsString("analysis failed"));
+        final var cause = ExceptionsHelper.unwrapCause(exception.getCause());
+        if (cause instanceof IOException ioException) {
+            assertThat(ioException.getMessage(), containsString("empty register update rejected"));
+        } else {
+            assertThat(
+                asInstanceOf(RepositoryVerificationException.class, ExceptionsHelper.unwrapCause(exception.getCause())).getMessage(),
+                anyOf(
+                    allOf(containsString("uncontended register operation failed"), containsString("did not observe any value")),
+                    containsString("but instead had value [OptionalBytesReference[MISSING]]")
+                )
+            );
+        }
+    }
+
     private void analyseRepository(RepositoryAnalyzeAction.Request request) {
     private void analyseRepository(RepositoryAnalyzeAction.Request request) {
         client().execute(RepositoryAnalyzeAction.INSTANCE, request).actionGet(30L, TimeUnit.SECONDS);
         client().execute(RepositoryAnalyzeAction.INSTANCE, request).actionGet(30L, TimeUnit.SECONDS);
     }
     }
@@ -508,6 +533,10 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
             return true;
             return true;
         }
         }
 
 
+        default boolean acceptsEmptyRegister() {
+            return true;
+        }
+
         default BytesReference onContendedCompareAndExchange(BytesRegister register, BytesReference expected, BytesReference updated) {
         default BytesReference onContendedCompareAndExchange(BytesRegister register, BytesReference expected, BytesReference updated) {
             return register.compareAndExchange(expected, updated);
             return register.compareAndExchange(expected, updated);
         }
         }
@@ -682,7 +711,13 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
         ) {
         ) {
             assertPurpose(purpose);
             assertPurpose(purpose);
             final boolean isContendedRegister = isContendedRegisterKey(key); // validate key
             final boolean isContendedRegister = isContendedRegisterKey(key); // validate key
-            if (disruption.compareAndExchangeReturnsWitness(key)) {
+            if (disruption.acceptsEmptyRegister() == false && updated.length() == 0) {
+                if (randomBoolean()) {
+                    listener.onResponse(OptionalBytesReference.MISSING);
+                } else {
+                    listener.onFailure(new IOException("empty register update rejected"));
+                }
+            } else if (disruption.compareAndExchangeReturnsWitness(key)) {
                 final var register = registers.computeIfAbsent(key, ignored -> new BytesRegister());
                 final var register = registers.computeIfAbsent(key, ignored -> new BytesRegister());
                 if (isContendedRegister) {
                 if (isContendedRegister) {
                     listener.onResponse(OptionalBytesReference.of(disruption.onContendedCompareAndExchange(register, expected, updated)));
                     listener.onResponse(OptionalBytesReference.of(disruption.onContendedCompareAndExchange(register, expected, updated)));

+ 38 - 16
x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalyzeAction.java

@@ -730,23 +730,45 @@ public class RepositoryAnalyzeAction extends HandledTransportAction<RepositoryAn
                     return;
                     return;
                 }
                 }
 
 
-                // complete at least request.getConcurrency() steps, but we may as well keep running for longer too
-                if (currentValue > request.getConcurrency() && otherAnalysisComplete.get()) {
-                    return;
+                if (currentValue <= request.getConcurrency() || otherAnalysisComplete.get() == false) {
+                    // complete at least request.getConcurrency() steps, but we may as well keep running for longer too
+                    transportService.sendChildRequest(
+                        nodes.get(currentValue < nodes.size() ? currentValue : random.nextInt(nodes.size())),
+                        UncontendedRegisterAnalyzeAction.NAME,
+                        new UncontendedRegisterAnalyzeAction.Request(request.getRepositoryName(), blobPath, registerName, currentValue),
+                        task,
+                        TransportRequestOptions.EMPTY,
+                        new ActionListenerResponseHandler<>(
+                            ActionListener.releaseAfter(stepListener, requestRefs.acquire()),
+                            in -> ActionResponse.Empty.INSTANCE,
+                            TransportResponseHandler.TRANSPORT_WORKER
+                        )
+                    );
+                } else {
+                    transportService.getThreadPool()
+                        .executor(ThreadPool.Names.SNAPSHOT)
+                        .execute(
+                            ActionRunnable.<Void>wrap(
+                                ActionListener.releaseAfter(
+                                    ActionListener.wrap(
+                                        r -> logger.trace("uncontended register analysis succeeded"),
+                                        AsyncAction.this::fail
+                                    ),
+                                    requestRefs.acquire()
+                                ),
+                                l -> UncontendedRegisterAnalyzeAction.verifyFinalValue(
+                                    new UncontendedRegisterAnalyzeAction.Request(
+                                        request.getRepositoryName(),
+                                        blobPath,
+                                        registerName,
+                                        currentValue
+                                    ),
+                                    repository,
+                                    l
+                                )
+                            )
+                        );
                 }
                 }
-
-                transportService.sendChildRequest(
-                    nodes.get(currentValue < nodes.size() ? currentValue : random.nextInt(nodes.size())),
-                    UncontendedRegisterAnalyzeAction.NAME,
-                    new UncontendedRegisterAnalyzeAction.Request(request.getRepositoryName(), blobPath, registerName, currentValue),
-                    task,
-                    TransportRequestOptions.EMPTY,
-                    new ActionListenerResponseHandler<>(
-                        ActionListener.releaseAfter(stepListener, requestRefs.acquire()),
-                        in -> ActionResponse.Empty.INSTANCE,
-                        TransportResponseHandler.TRANSPORT_WORKER
-                    )
-                );
             }
             }
         }
         }
 
 

+ 21 - 7
x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/UncontendedRegisterAnalyzeAction.java

@@ -21,6 +21,8 @@ import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.OperationPurpose;
 import org.elasticsearch.common.blobstore.OperationPurpose;
 import org.elasticsearch.common.blobstore.OptionalBytesReference;
 import org.elasticsearch.common.blobstore.OptionalBytesReference;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.RepositoriesService;
@@ -57,9 +59,24 @@ class UncontendedRegisterAnalyzeAction extends HandledTransportAction<Uncontende
     }
     }
 
 
     @Override
     @Override
-    protected void doExecute(Task task, Request request, ActionListener<ActionResponse.Empty> outerListener) {
-        final ActionListener<Void> listener = ActionListener.assertOnce(outerListener.map(ignored -> ActionResponse.Empty.INSTANCE));
-        final Repository repository = repositoriesService.repository(request.getRepositoryName());
+    protected void doExecute(Task task, Request request, ActionListener<ActionResponse.Empty> listener) {
+        logger.trace("handling [{}]", request);
+        updateRegister(
+            request,
+            bytesFromLong(request.getExpectedValue() + 1),
+            repositoriesService.repository(request.getRepositoryName()),
+            ActionListener.assertOnce(listener.map(ignored -> ActionResponse.Empty.INSTANCE))
+        );
+    }
+
+    static void verifyFinalValue(Request request, Repository repository, ActionListener<Void> listener) {
+        // ensure that the repo accepts an empty register
+        logger.trace("handling final value [{}]", request);
+        updateRegister(request, BytesArray.EMPTY, repository, ActionListener.assertOnce(listener));
+    }
+
+    private static void updateRegister(Request request, BytesReference newValue, Repository repository, ActionListener<Void> listener) {
+        assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
         if (repository instanceof BlobStoreRepository == false) {
         if (repository instanceof BlobStoreRepository == false) {
             throw new IllegalArgumentException("repository [" + request.getRepositoryName() + "] is not a blob-store repository");
             throw new IllegalArgumentException("repository [" + request.getRepositoryName() + "] is not a blob-store repository");
         }
         }
@@ -70,14 +87,11 @@ class UncontendedRegisterAnalyzeAction extends HandledTransportAction<Uncontende
         final BlobPath path = blobStoreRepository.basePath().add(request.getContainerPath());
         final BlobPath path = blobStoreRepository.basePath().add(request.getContainerPath());
         final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(path);
         final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(path);
 
 
-        logger.trace("handling [{}]", request);
-
-        assert task instanceof CancellableTask;
         blobContainer.compareAndExchangeRegister(
         blobContainer.compareAndExchangeRegister(
             OperationPurpose.REPOSITORY_ANALYSIS,
             OperationPurpose.REPOSITORY_ANALYSIS,
             request.getRegisterName(),
             request.getRegisterName(),
             bytesFromLong(request.getExpectedValue()),
             bytesFromLong(request.getExpectedValue()),
-            bytesFromLong(request.getExpectedValue() + 1),
+            newValue,
             new ActionListener<>() {
             new ActionListener<>() {
                 @Override
                 @Override
                 public void onResponse(OptionalBytesReference optionalBytesReference) {
                 public void onResponse(OptionalBytesReference optionalBytesReference) {