Browse Source

Throw RequestedRangeNotSatisfiedException when BlobContainer.readBlob starts reading after blob length (#107408)

This change introduces a specialized exception, RequestedRangeNotSatisfiedException,
that is thrown when FsBlobContainer or S3BlobContainer tries to read a range of bytes
starting at a position that is at or beyond the end of the blob.

This exception can then be caught to detect such a situation and act accordingly.
Tanguy Leroux 1 year ago
parent
commit
7d4920b878

+ 6 - 5
modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java

@@ -33,6 +33,7 @@ import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.plugins.PluginsService;
 import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase;
 import org.elasticsearch.repositories.RepositoriesService;
+import org.elasticsearch.repositories.blobstore.RequestedRangeNotSatisfiedException;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.test.fixtures.minio.MinioTestContainer;
@@ -243,15 +244,15 @@ public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTes
 
         var exception = expectThrows(UncategorizedExecutionException.class, () -> readBlob(repository, blobName, position, length));
         assertThat(exception.getCause(), instanceOf(ExecutionException.class));
-        assertThat(exception.getCause().getCause(), instanceOf(IOException.class));
+        assertThat(exception.getCause().getCause(), instanceOf(RequestedRangeNotSatisfiedException.class));
         assertThat(
             exception.getCause().getCause().getMessage(),
             containsString(
-                "Requested range [start="
+                "Requested range [position="
                     + position
-                    + ", end="
-                    + (position + length - 1L)
-                    + ", currentOffset=0] cannot be satisfied for blob object ["
+                    + ", length="
+                    + length
+                    + "] cannot be satisfied for ["
                     + repository.basePath().buildAsString()
                     + blobName
                     + ']'

+ 5 - 10
modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java

@@ -20,6 +20,7 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.Version;
 import org.elasticsearch.common.blobstore.OperationPurpose;
 import org.elasticsearch.core.IOUtils;
+import org.elasticsearch.repositories.blobstore.RequestedRangeNotSatisfiedException;
 import org.elasticsearch.repositories.s3.S3BlobStore.Operation;
 import org.elasticsearch.rest.RestStatus;
 
@@ -109,16 +110,10 @@ class S3RetryingInputStream extends InputStream {
                     }
                     if (amazonS3Exception.getStatusCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()) {
                         throw addSuppressedExceptions(
-                            new IOException(
-                                "Requested range [start="
-                                    + start
-                                    + ", end="
-                                    + end
-                                    + ", currentOffset="
-                                    + currentOffset
-                                    + "] cannot be satisfied for blob object ["
-                                    + blobKey
-                                    + ']',
+                            new RequestedRangeNotSatisfiedException(
+                                blobKey,
+                                currentStreamFirstOffset,
+                                (end < Long.MAX_VALUE - 1) ? end - currentStreamFirstOffset + 1 : end,
                                 amazonS3Exception
                             )
                         );

+ 48 - 6
modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RetryingInputStreamTests.java

@@ -9,6 +9,7 @@
 package org.elasticsearch.repositories.s3;
 
 import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
@@ -16,6 +17,8 @@ import com.amazonaws.services.s3.model.S3ObjectInputStream;
 import org.apache.http.client.methods.HttpGet;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.repositories.blobstore.RequestedRangeNotSatisfiedException;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.ESTestCase;
 
 import java.io.ByteArrayInputStream;
@@ -23,7 +26,9 @@ import java.io.IOException;
 import java.util.Arrays;
 
 import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.startsWith;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -81,24 +86,61 @@ public class S3RetryingInputStreamTests extends ESTestCase {
         assertThat(stream.isAborted(), is(true));
     }
 
+    public void testReadAfterBlobLengthThrowsRequestedRangeNotSatisfiedException() throws IOException {
+        final byte[] bytes = randomByteArrayOfLength(randomIntBetween(1, 512));
+        {
+            final int position = bytes.length + randomIntBetween(0, 100);
+            final int length = randomIntBetween(1, 100);
+            var exception = expectThrows(RequestedRangeNotSatisfiedException.class, () -> {
+                try (var ignored = createInputStream(bytes, position, length)) {
+                    fail();
+                }
+            });
+            assertThat(exception.getResource(), equalTo("_blob"));
+            assertThat(exception.getPosition(), equalTo((long) position));
+            assertThat(exception.getLength(), equalTo((long) length));
+            assertThat(
+                exception.getMessage(),
+                startsWith("Requested range [position=" + position + ", length=" + length + "] cannot be satisfied for [_blob]")
+            );
+        }
+        {
+            int position = randomIntBetween(0, Math.max(0, bytes.length - 1));
+            int maxLength = bytes.length - position;
+            int length = randomIntBetween(maxLength + 1, Integer.MAX_VALUE - 1);
+            try (var stream = createInputStream(bytes, position, length)) {
+                assertThat(Streams.consumeFully(stream), equalTo((long) maxLength));
+            }
+        }
+    }
+
     private S3RetryingInputStream createInputStream(final byte[] data, @Nullable final Integer position, @Nullable final Integer length)
         throws IOException {
-        final S3Object s3Object = new S3Object();
         final AmazonS3 client = mock(AmazonS3.class);
-        when(client.getObject(any(GetObjectRequest.class))).thenReturn(s3Object);
         final AmazonS3Reference clientReference = mock(AmazonS3Reference.class);
         when(clientReference.client()).thenReturn(client);
         final S3BlobStore blobStore = mock(S3BlobStore.class);
         when(blobStore.clientReference()).thenReturn(clientReference);
 
         if (position != null && length != null) {
+            if (data.length <= position) {
+                var amazonS3Exception = new AmazonS3Exception("test");
+                amazonS3Exception.setStatusCode(RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus());
+                when(client.getObject(any(GetObjectRequest.class))).thenThrow(amazonS3Exception);
+                return new S3RetryingInputStream(randomPurpose(), blobStore, "_blob", position, Math.addExact(position, length - 1));
+            }
+
+            final S3Object s3Object = new S3Object();
             s3Object.getObjectMetadata().setContentLength(length);
             s3Object.setObjectContent(new S3ObjectInputStream(new ByteArrayInputStream(data, position, length), new HttpGet()));
+            when(client.getObject(any(GetObjectRequest.class))).thenReturn(s3Object);
             return new S3RetryingInputStream(randomPurpose(), blobStore, "_blob", position, Math.addExact(position, length - 1));
-        } else {
-            s3Object.getObjectMetadata().setContentLength(data.length);
-            s3Object.setObjectContent(new S3ObjectInputStream(new ByteArrayInputStream(data), new HttpGet()));
-            return new S3RetryingInputStream(randomPurpose(), blobStore, "_blob");
         }
+
+        final S3Object s3Object = new S3Object();
+        s3Object.getObjectMetadata().setContentLength(data.length);
+        s3Object.setObjectContent(new S3ObjectInputStream(new ByteArrayInputStream(data), new HttpGet()));
+        when(client.getObject(any(GetObjectRequest.class))).thenReturn(s3Object);
+        return new S3RetryingInputStream(randomPurpose(), blobStore, "_blob");
     }
 }

+ 6 - 1
server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java

@@ -30,6 +30,7 @@ import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.Strings;
 import org.elasticsearch.core.SuppressForbidden;
+import org.elasticsearch.repositories.blobstore.RequestedRangeNotSatisfiedException;
 
 import java.io.Closeable;
 import java.io.FileNotFoundException;
@@ -197,7 +198,11 @@ public class FsBlobContainer extends AbstractBlobContainer {
         assert BlobContainer.assertPurposeConsistency(purpose, blobName);
         final SeekableByteChannel channel = Files.newByteChannel(path.resolve(blobName));
         if (position > 0L) {
-            assert position < channel.size() : "reading from " + position + " exceeds file length " + channel.size();
+            if (channel.size() <= position) {
+                try (channel) {
+                    throw new RequestedRangeNotSatisfiedException(blobName, position, length);
+                }
+            }
             channel.position(position);
         }
         assert channel.position() == position;

+ 50 - 0
server/src/main/java/org/elasticsearch/repositories/blobstore/RequestedRangeNotSatisfiedException.java

@@ -0,0 +1,50 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.repositories.blobstore;
+
+import org.elasticsearch.common.Strings;
+
+import java.io.IOException;
+
+public class RequestedRangeNotSatisfiedException extends IOException {
+
+    private final String resource;
+    private final long position;
+    private final long length;
+
+    public RequestedRangeNotSatisfiedException(String resource, long position, long length) {
+        super(message(resource, position, length));
+        this.resource = resource;
+        this.position = position;
+        this.length = length;
+    }
+
+    public RequestedRangeNotSatisfiedException(String resource, long position, long length, Throwable cause) {
+        super(message(resource, position, length), cause);
+        this.resource = resource;
+        this.position = position;
+        this.length = length;
+    }
+
+    public String getResource() {
+        return resource;
+    }
+
+    public long getPosition() {
+        return position;
+    }
+
+    public long getLength() {
+        return length;
+    }
+
+    private static String message(String resource, long position, long length) {
+        return Strings.format("Requested range [position=%d, length=%d] cannot be satisfied for [%s]", position, length, resource);
+    }
+}

+ 39 - 0
server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java

@@ -20,6 +20,7 @@ import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.PathUtils;
 import org.elasticsearch.core.PathUtilsForTesting;
+import org.elasticsearch.repositories.blobstore.RequestedRangeNotSatisfiedException;
 import org.elasticsearch.test.ESTestCase;
 import org.junit.After;
 import org.junit.Before;
@@ -95,6 +96,44 @@ public class FsBlobContainerTests extends ESTestCase {
         }
     }
 
+    public void testReadAfterBlobLengthThrowsRequestedRangeNotSatisfiedException() throws IOException {
+        final var blobName = "blob";
+        final byte[] blobData = randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb
+
+        final Path path = PathUtils.get(createTempDir().toString());
+        Files.write(path.resolve(blobName), blobData);
+
+        final FsBlobContainer container = new FsBlobContainer(
+            new FsBlobStore(randomIntBetween(1, 8) * 1024, path, true),
+            BlobPath.EMPTY,
+            path
+        );
+
+        {
+            long position = randomLongBetween(blobData.length, Long.MAX_VALUE - 1L);
+            long length = randomLongBetween(1L, Long.MAX_VALUE - position);
+            var exception = expectThrows(
+                RequestedRangeNotSatisfiedException.class,
+                () -> container.readBlob(randomPurpose(), blobName, position, length)
+            );
+            assertThat(
+                exception.getMessage(),
+                equalTo("Requested range [position=" + position + ", length=" + length + "] cannot be satisfied for [" + blobName + ']')
+            );
+        }
+
+        {
+            long position = randomLongBetween(0L, Math.max(0L, blobData.length - 1));
+            long maxLength = blobData.length - position;
+            long length = randomLongBetween(maxLength + 1L, Long.MAX_VALUE - 1L);
+            try (var stream = container.readBlob(randomPurpose(), blobName, position, length)) {
+                assertThat(totalBytesRead.get(), equalTo(0L));
+                assertThat(Streams.consumeFully(stream), equalTo(maxLength));
+                assertThat(totalBytesRead.get(), equalTo(maxLength));
+            }
+        }
+    }
+
     public void testTempBlobName() {
         final String blobName = randomAlphaOfLengthBetween(1, 20);
         final String tempBlobName = FsBlobContainer.tempBlobName(blobName);