|
@@ -38,6 +38,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
|
|
|
import org.elasticsearch.common.xcontent.XContentType;
|
|
|
import org.elasticsearch.index.translog.BufferedChecksumStreamOutput;
|
|
|
import org.elasticsearch.repositories.blobstore.ChecksumBlobStoreFormat;
|
|
|
+import org.elasticsearch.snapshots.mockstore.BlobContainerWrapper;
|
|
|
|
|
|
import java.io.EOFException;
|
|
|
import java.io.IOException;
|
|
@@ -210,6 +211,67 @@ public class BlobStoreFormatIT extends AbstractSnapshotIntegTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public void testAtomicWriteFailures() throws Exception {
|
|
|
+ final String name = randomAlphaOfLength(10);
|
|
|
+ final BlobObj blobObj = new BlobObj("test");
|
|
|
+ final ChecksumBlobStoreFormat<BlobObj> checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent,
|
|
|
+ xContentRegistry(), randomBoolean(), randomBoolean() ? XContentType.SMILE : XContentType.JSON);
|
|
|
+
|
|
|
+ final BlobStore blobStore = createTestBlobStore();
|
|
|
+ final BlobContainer blobContainer = blobStore.blobContainer(BlobPath.cleanPath());
|
|
|
+
|
|
|
+ {
|
|
|
+ IOException writeBlobException = expectThrows(IOException.class, () -> {
|
|
|
+ BlobContainer wrapper = new BlobContainerWrapper(blobContainer) {
|
|
|
+ @Override
|
|
|
+ public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
|
|
|
+ throw new IOException("Exception thrown in writeBlob() for " + blobName);
|
|
|
+ }
|
|
|
+ };
|
|
|
+ checksumFormat.writeAtomic(blobObj, wrapper, name);
|
|
|
+ });
|
|
|
+
|
|
|
+ assertEquals("Exception thrown in writeBlob() for pending-" + name, writeBlobException.getMessage());
|
|
|
+ assertEquals(0, writeBlobException.getSuppressed().length);
|
|
|
+ }
|
|
|
+ {
|
|
|
+ IOException moveException = expectThrows(IOException.class, () -> {
|
|
|
+ BlobContainer wrapper = new BlobContainerWrapper(blobContainer) {
|
|
|
+ @Override
|
|
|
+ public void move(String sourceBlobName, String targetBlobName) throws IOException {
|
|
|
+ throw new IOException("Exception thrown in move() for " + sourceBlobName);
|
|
|
+ }
|
|
|
+ };
|
|
|
+ checksumFormat.writeAtomic(blobObj, wrapper, name);
|
|
|
+ });
|
|
|
+ assertEquals("Exception thrown in move() for pending-" + name, moveException.getMessage());
|
|
|
+ assertEquals(0, moveException.getSuppressed().length);
|
|
|
+ }
|
|
|
+ {
|
|
|
+ IOException moveThenDeleteException = expectThrows(IOException.class, () -> {
|
|
|
+ BlobContainer wrapper = new BlobContainerWrapper(blobContainer) {
|
|
|
+ @Override
|
|
|
+ public void move(String sourceBlobName, String targetBlobName) throws IOException {
|
|
|
+ throw new IOException("Exception thrown in move() for " + sourceBlobName);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void deleteBlob(String blobName) throws IOException {
|
|
|
+ throw new IOException("Exception thrown in deleteBlob() for " + blobName);
|
|
|
+ }
|
|
|
+ };
|
|
|
+ checksumFormat.writeAtomic(blobObj, wrapper, name);
|
|
|
+ });
|
|
|
+
|
|
|
+ assertEquals("Exception thrown in move() for pending-" + name, moveThenDeleteException.getMessage());
|
|
|
+ assertEquals(1, moveThenDeleteException.getSuppressed().length);
|
|
|
+
|
|
|
+ final Throwable suppressedThrowable = moveThenDeleteException.getSuppressed()[0];
|
|
|
+ assertTrue(suppressedThrowable instanceof IOException);
|
|
|
+ assertEquals("Exception thrown in deleteBlob() for pending-" + name, suppressedThrowable.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
protected BlobStore createTestBlobStore() throws IOException {
|
|
|
Settings settings = Settings.builder().build();
|
|
|
return new FsBlobStore(settings, randomRepoPath());
|