|
|
@@ -18,6 +18,7 @@ import org.elasticsearch.common.blobstore.BlobStore;
|
|
|
import org.elasticsearch.common.blobstore.DeleteResult;
|
|
|
import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
|
|
|
import org.elasticsearch.common.bytes.BytesReference;
|
|
|
+import org.elasticsearch.common.io.stream.StreamInput;
|
|
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
|
|
import org.elasticsearch.common.util.BigArrays;
|
|
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|
|
@@ -49,6 +50,7 @@ import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
import java.util.function.Consumer;
|
|
|
import java.util.stream.Collectors;
|
|
|
@@ -97,6 +99,7 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
|
|
|
public void testFailsOnReadError() {
|
|
|
final RepositoryAnalyzeAction.Request request = new RepositoryAnalyzeAction.Request("test-repo");
|
|
|
request.maxBlobSize(new ByteSizeValue(10L));
|
|
|
+ request.abortWritePermitted(false);
|
|
|
|
|
|
final CountDown countDown = new CountDown(between(1, request.getBlobCount()));
|
|
|
blobStore.setDisruption(new Disruption() {
|
|
|
@@ -118,6 +121,7 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
|
|
|
public void testFailsOnNotFoundAfterWrite() {
|
|
|
final RepositoryAnalyzeAction.Request request = new RepositoryAnalyzeAction.Request("test-repo");
|
|
|
request.maxBlobSize(new ByteSizeValue(10L));
|
|
|
+ request.abortWritePermitted(false);
|
|
|
request.rareActionProbability(0.0); // not found on an early read or an overwrite is ok
|
|
|
|
|
|
final CountDown countDown = new CountDown(between(1, request.getBlobCount()));
|
|
|
@@ -138,6 +142,7 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
|
|
|
public void testFailsOnChecksumMismatch() {
|
|
|
final RepositoryAnalyzeAction.Request request = new RepositoryAnalyzeAction.Request("test-repo");
|
|
|
request.maxBlobSize(new ByteSizeValue(10L));
|
|
|
+ request.abortWritePermitted(false);
|
|
|
|
|
|
final CountDown countDown = new CountDown(between(1, request.getBlobCount()));
|
|
|
|
|
|
@@ -159,6 +164,7 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
|
|
|
public void testFailsOnWriteException() {
|
|
|
final RepositoryAnalyzeAction.Request request = new RepositoryAnalyzeAction.Request("test-repo");
|
|
|
request.maxBlobSize(new ByteSizeValue(10L));
|
|
|
+ request.abortWritePermitted(false);
|
|
|
|
|
|
final CountDown countDown = new CountDown(between(1, request.getBlobCount()));
|
|
|
|
|
|
@@ -182,6 +188,7 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
|
|
|
public void testFailsOnIncompleteListing() {
|
|
|
final RepositoryAnalyzeAction.Request request = new RepositoryAnalyzeAction.Request("test-repo");
|
|
|
request.maxBlobSize(new ByteSizeValue(10L));
|
|
|
+ request.abortWritePermitted(false);
|
|
|
|
|
|
blobStore.setDisruption(new Disruption() {
|
|
|
|
|
|
@@ -200,6 +207,7 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
|
|
|
public void testFailsOnListingException() {
|
|
|
final RepositoryAnalyzeAction.Request request = new RepositoryAnalyzeAction.Request("test-repo");
|
|
|
request.maxBlobSize(new ByteSizeValue(10L));
|
|
|
+ request.abortWritePermitted(false);
|
|
|
|
|
|
final CountDown countDown = new CountDown(1);
|
|
|
blobStore.setDisruption(new Disruption() {
|
|
|
@@ -219,6 +227,7 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
|
|
|
public void testFailsOnDeleteException() {
|
|
|
final RepositoryAnalyzeAction.Request request = new RepositoryAnalyzeAction.Request("test-repo");
|
|
|
request.maxBlobSize(new ByteSizeValue(10L));
|
|
|
+ request.abortWritePermitted(false);
|
|
|
|
|
|
blobStore.setDisruption(new Disruption() {
|
|
|
@Override
|
|
|
@@ -233,6 +242,7 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
|
|
|
public void testFailsOnIncompleteDelete() {
|
|
|
final RepositoryAnalyzeAction.Request request = new RepositoryAnalyzeAction.Request("test-repo");
|
|
|
request.maxBlobSize(new ByteSizeValue(10L));
|
|
|
+ request.abortWritePermitted(false);
|
|
|
|
|
|
blobStore.setDisruption(new Disruption() {
|
|
|
|
|
|
@@ -257,6 +267,28 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
|
|
|
expectThrows(RepositoryVerificationException.class, () -> analyseRepository(request));
|
|
|
}
|
|
|
|
|
|
+ public void testFailsIfBlobCreatedOnAbort() {
|
|
|
+ final RepositoryAnalyzeAction.Request request = new RepositoryAnalyzeAction.Request("test-repo");
|
|
|
+ request.maxBlobSize(new ByteSizeValue(10L));
|
|
|
+ request.rareActionProbability(0.7); // abort writes quite often
|
|
|
+
|
|
|
+ final AtomicBoolean writeWasAborted = new AtomicBoolean();
|
|
|
+ blobStore.setDisruption(new Disruption() {
|
|
|
+ @Override
|
|
|
+ public boolean createBlobOnAbort() {
|
|
|
+ writeWasAborted.set(true);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ try {
|
|
|
+ analyseRepository(request);
|
|
|
+ assertFalse(writeWasAborted.get());
|
|
|
+ } catch (RepositoryVerificationException e) {
|
|
|
+ assertTrue(writeWasAborted.get());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private RepositoryAnalyzeAction.Response analyseRepository(RepositoryAnalyzeAction.Request request) {
|
|
|
return client().execute(RepositoryAnalyzeAction.INSTANCE, request).actionGet(30L, TimeUnit.SECONDS);
|
|
|
}
|
|
|
@@ -360,6 +392,10 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
|
|
|
}
|
|
|
|
|
|
default void onDelete() throws IOException {}
|
|
|
+
|
|
|
+ default boolean createBlobOnAbort() {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
static class DisruptableBlobContainer implements BlobContainer {
|
|
|
@@ -418,7 +454,16 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
|
|
|
|
|
|
@Override
|
|
|
public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
|
|
|
- writeBlobAtomic(blobName, bytes.streamInput(), failIfAlreadyExists);
|
|
|
+ final StreamInput inputStream;
|
|
|
+ try {
|
|
|
+ inputStream = bytes.streamInput();
|
|
|
+ } catch (BlobWriteAbortedException e) {
|
|
|
+ if (disruption.createBlobOnAbort()) {
|
|
|
+ blobs.put(blobName, new byte[0]);
|
|
|
+ }
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ writeBlobAtomic(blobName, inputStream, failIfAlreadyExists);
|
|
|
}
|
|
|
|
|
|
private void writeBlobAtomic(String blobName, InputStream inputStream, boolean failIfAlreadyExists) throws IOException {
|
|
|
@@ -426,7 +471,15 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
|
|
|
throw new FileAlreadyExistsException(blobName);
|
|
|
}
|
|
|
|
|
|
- final byte[] contents = inputStream.readAllBytes();
|
|
|
+ final byte[] contents;
|
|
|
+ try {
|
|
|
+ contents = inputStream.readAllBytes();
|
|
|
+ } catch (BlobWriteAbortedException e) {
|
|
|
+ if (disruption.createBlobOnAbort()) {
|
|
|
+ blobs.put(blobName, new byte[0]);
|
|
|
+ }
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
disruption.onWrite();
|
|
|
blobs.put(blobName, contents);
|
|
|
}
|