Retry indefinitely for s3 indices blob read errors (#103300)

This PR makes the S3 readBlob retry indefinitely on either opening or
reading errors when the operation purpose is INDICES. It performs retries
with no delay within the configured number of max retries. Beyond that,
it retries with an increasing delay each time, capped at a maximum of 10
seconds.

Relates: ES-6453
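
For reference, a minimal standalone sketch of the backoff schedule introduced in S3RetryingInputStream (the BackoffSketch class and its main method below are illustrative only, not part of the commit): the delay starts at 10 ms, doubles on each attempt, and is capped at 10 * 1024 ms, i.e. roughly 10 seconds.

    // Illustrative sketch mirroring getRetryDelayInMillis() from the diff below.
    public class BackoffSketch {
        // attempt is 1-based, as in S3RetryingInputStream
        static long retryDelayInMillis(int attempt) {
            return 10L << Math.min(attempt - 1, 10);
        }

        public static void main(String[] args) {
            // prints 10, 20, 40, ..., 5120, 10240, 10240 ms
            for (int attempt = 1; attempt <= 12; attempt++) {
                System.out.println("attempt " + attempt + " -> " + retryDelayInMillis(attempt) + " ms");
            }
        }
    }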
Yang Wang · 1 year ago · commit a0cf690be8

docs/changelog/103300.yaml (+5, -0)

@@ -0,0 +1,5 @@
+pr: 103300
+summary: Retry indefinitely for s3 indices blob read errors
+area: Snapshot/Restore
+type: enhancement
+issues: []

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java (+114, -50)

@@ -14,9 +14,9 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
 
+import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.util.Supplier;
 import org.elasticsearch.Version;
 import org.elasticsearch.common.blobstore.OperationPurpose;
 import org.elasticsearch.core.IOUtils;
@@ -48,7 +48,7 @@ class S3RetryingInputStream extends InputStream {
     private final String blobKey;
     private final long start;
     private final long end;
-    private final List<IOException> failures;
+    private final List<Exception> failures;
 
     private S3ObjectInputStream currentStream;
     private long currentStreamFirstOffset;
@@ -77,29 +77,34 @@ class S3RetryingInputStream extends InputStream {
         this.failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS);
         this.start = start;
         this.end = end;
-        openStream();
+        openStreamWithRetry();
     }
 
-    private void openStream() throws IOException {
-        try (AmazonS3Reference clientReference = blobStore.clientReference()) {
-            final GetObjectRequest getObjectRequest = new GetObjectRequest(blobStore.bucket(), blobKey);
-            getObjectRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.GET_OBJECT, purpose));
-            if (currentOffset > 0 || start > 0 || end < Long.MAX_VALUE - 1) {
-                assert start + currentOffset <= end
-                    : "requesting beyond end, start = " + start + " offset=" + currentOffset + " end=" + end;
-                getObjectRequest.setRange(Math.addExact(start, currentOffset), end);
-            }
-            final S3Object s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest));
-            this.currentStreamFirstOffset = Math.addExact(start, currentOffset);
-            this.currentStreamLastOffset = Math.addExact(currentStreamFirstOffset, getStreamLength(s3Object));
-            this.currentStream = s3Object.getObjectContent();
-        } catch (final AmazonClientException e) {
-            if (e instanceof AmazonS3Exception amazonS3Exception) {
-                if (404 == amazonS3Exception.getStatusCode()) {
-                    throw addSuppressedExceptions(new NoSuchFileException("Blob object [" + blobKey + "] not found: " + e.getMessage()));
+    private void openStreamWithRetry() throws IOException {
+        while (true) {
+            try (AmazonS3Reference clientReference = blobStore.clientReference()) {
+                final GetObjectRequest getObjectRequest = new GetObjectRequest(blobStore.bucket(), blobKey);
+                getObjectRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.GET_OBJECT, purpose));
+                if (currentOffset > 0 || start > 0 || end < Long.MAX_VALUE - 1) {
+                    assert start + currentOffset <= end
+                        : "requesting beyond end, start = " + start + " offset=" + currentOffset + " end=" + end;
+                    getObjectRequest.setRange(Math.addExact(start, currentOffset), end);
+                }
+                final S3Object s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest));
+                this.currentStreamFirstOffset = Math.addExact(start, currentOffset);
+                this.currentStreamLastOffset = Math.addExact(currentStreamFirstOffset, getStreamLength(s3Object));
+                this.currentStream = s3Object.getObjectContent();
+                return;
+            } catch (AmazonClientException e) {
+                if (e instanceof AmazonS3Exception amazonS3Exception && 404 == amazonS3Exception.getStatusCode()) {
+                    throw addSuppressedExceptions(
+                        new NoSuchFileException("Blob object [" + blobKey + "] not found: " + amazonS3Exception.getMessage())
+                    );
                 }
+
+                final long delayInMillis = maybeLogAndComputeRetryDelay("opening", e);
+                delayBeforeRetry(delayInMillis);
             }
-            throw addSuppressedExceptions(e);
         }
     }
 
@@ -166,45 +171,104 @@ class S3RetryingInputStream extends InputStream {
     }
 
     private void reopenStreamOrFail(IOException e) throws IOException {
-        if (purpose == OperationPurpose.REPOSITORY_ANALYSIS) {
-            logger.warn(() -> format("""
-                failed reading [%s/%s] at offset [%s]""", blobStore.bucket(), blobKey, start + currentOffset), e);
-            throw e;
+        final long delayInMillis = maybeLogAndComputeRetryDelay("reading", e);
+        maybeAbort(currentStream);
+        IOUtils.closeWhileHandlingException(currentStream);
+
+        delayBeforeRetry(delayInMillis);
+        openStreamWithRetry();
+    }
+
+    // The method throws if the operation should *not* be retried. Otherwise, it keeps a record for the attempt and associated failure
+    // and compute the delay before retry.
+    private <T extends Exception> long maybeLogAndComputeRetryDelay(String action, T e) throws T {
+        if (shouldRetry(attempt) == false) {
+            final var finalException = addSuppressedExceptions(e);
+            logForFailure(action, finalException);
+            throw finalException;
         }
 
-        final int maxAttempts = blobStore.getMaxRetries() + 1;
+        // Log at info level for the 1st retry and every ~5 minutes afterward
+        logForRetry((attempt == 1 || attempt % 30 == 0) ? Level.INFO : Level.DEBUG, action, e);
+        if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) {
+            failures.add(e);
+        }
+        final long delayInMillis = getRetryDelayInMillis();
+        attempt += 1; // increment after computing delay because attempt affects the result
+        return delayInMillis;
+    }
+
+    private void logForFailure(String action, Exception e) {
+        logger.warn(
+            () -> format(
+                "failed %s [%s/%s] at offset [%s] with purpose [%s]",
+                action,
+                blobStore.bucket(),
+                blobKey,
+                start + currentOffset,
+                purpose.getKey()
+            ),
+            e
+        );
+    }
 
+    private void logForRetry(Level level, String action, Exception e) {
         final long meaningfulProgressSize = Math.max(1L, blobStore.bufferSizeInBytes() / 100L);
         final long currentStreamProgress = Math.subtractExact(Math.addExact(start, currentOffset), currentStreamFirstOffset);
         if (currentStreamProgress >= meaningfulProgressSize) {
             failuresAfterMeaningfulProgress += 1;
         }
-        final Supplier<String> messageSupplier = () -> format(
-            """
-                failed reading [%s/%s] at offset [%s]; this was attempt [%s] to read this blob which yielded [%s] bytes; in total \
-                [%s] of the attempts to read this blob have made meaningful progress and do not count towards the maximum number of \
-                retries; the maximum number of read attempts which do not make meaningful progress is [%s]""",
-            blobStore.bucket(),
-            blobKey,
-            start + currentOffset,
-            attempt,
-            currentStreamProgress,
-            failuresAfterMeaningfulProgress,
-            maxAttempts
+        logger.log(
+            level,
+            () -> format(
+                """
+                    failed %s [%s/%s] at offset [%s] with purpose [%s]; \
+                    this was attempt [%s] to read this blob which yielded [%s] bytes; in total \
+                    [%s] of the attempts to read this blob have made meaningful progress and do not count towards the maximum number of \
+                    retries; the maximum number of read attempts which do not make meaningful progress is [%s]""",
+                action,
+                blobStore.bucket(),
+                blobKey,
+                start + currentOffset,
+                purpose.getKey(),
+                attempt,
+                currentStreamProgress,
+                failuresAfterMeaningfulProgress,
+                maxRetriesForNoMeaningfulProgress()
+            ),
+            e
         );
-        if (attempt >= maxAttempts + failuresAfterMeaningfulProgress) {
-            final var finalException = addSuppressedExceptions(e);
-            logger.warn(messageSupplier, finalException);
-            throw finalException;
+    }
+
+    private boolean shouldRetry(int attempt) {
+        if (purpose == OperationPurpose.REPOSITORY_ANALYSIS) {
+            return false;
         }
-        logger.debug(messageSupplier, e);
-        attempt += 1;
-        if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) {
-            failures.add(e);
+        if (purpose == OperationPurpose.INDICES) {
+            return true;
         }
-        maybeAbort(currentStream);
-        IOUtils.closeWhileHandlingException(currentStream);
-        openStream();
+        final int maxAttempts = blobStore.getMaxRetries() + 1;
+        return attempt < maxAttempts + failuresAfterMeaningfulProgress;
+    }
+
+    private int maxRetriesForNoMeaningfulProgress() {
+        return purpose == OperationPurpose.INDICES ? Integer.MAX_VALUE : (blobStore.getMaxRetries() + 1);
+    }
+
+    private void delayBeforeRetry(long delayInMillis) {
+        try {
+            assert shouldRetry(attempt - 1) : "should not have retried";
+            Thread.sleep(delayInMillis);
+        } catch (InterruptedException e) {
+            logger.info("s3 input stream delay interrupted", e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    // protected access for testing
+    protected long getRetryDelayInMillis() {
+        // Initial delay is 10 ms and cap max delay at 10 * 1024 millis, i.e. it retries every ~10 seconds at a minimum
+        return 10L << (Math.min(attempt - 1, 10));
     }
 
     @Override
@@ -247,7 +311,7 @@ class S3RetryingInputStream extends InputStream {
     }
 
     private <T extends Exception> T addSuppressedExceptions(T e) {
-        for (IOException failure : failures) {
+        for (Exception failure : failures) {
             e.addSuppressed(failure);
         }
         return e;

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java (+6, -1)

@@ -178,6 +178,11 @@ class S3Service implements Closeable {
 
     // proxy for testing
     AmazonS3 buildClient(final S3ClientSettings clientSettings) {
+        final AmazonS3ClientBuilder builder = buildClientBuilder(clientSettings);
+        return SocketAccess.doPrivileged(builder::build);
+    }
+
+    protected AmazonS3ClientBuilder buildClientBuilder(S3ClientSettings clientSettings) {
         final AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard();
         builder.withCredentials(buildCredentials(LOGGER, clientSettings, webIdentityTokenCredentialsProvider));
         builder.withClientConfiguration(buildConfiguration(clientSettings));
@@ -206,7 +211,7 @@ class S3Service implements Closeable {
         if (clientSettings.disableChunkedEncoding) {
             builder.disableChunkedEncoding();
         }
-        return SocketAccess.doPrivileged(builder::build);
+        return builder;
     }
 
     // pkg private for tests

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java (+184, -18)

@@ -7,7 +7,9 @@
  */
 package org.elasticsearch.repositories.s3;
 
+import com.amazonaws.DnsResolver;
 import com.amazonaws.SdkClientException;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.internal.MD5DigestCalculatingInputStream;
 import com.amazonaws.util.Base16;
 import com.sun.net.httpserver.HttpExchange;
@@ -50,10 +52,14 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.NoSuchFileException;
 import java.util.Arrays;
 import java.util.Locale;
 import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -67,6 +73,7 @@ import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
@@ -80,10 +87,25 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
 public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTestCase {
 
     private S3Service service;
+    private AtomicBoolean shouldErrorOnDns;
 
     @Before
     public void setUp() throws Exception {
-        service = new S3Service(Mockito.mock(Environment.class), Settings.EMPTY);
+        shouldErrorOnDns = new AtomicBoolean(false);
+        service = new S3Service(Mockito.mock(Environment.class), Settings.EMPTY) {
+            @Override
+            protected AmazonS3ClientBuilder buildClientBuilder(S3ClientSettings clientSettings) {
+                final AmazonS3ClientBuilder builder = super.buildClientBuilder(clientSettings);
+                final DnsResolver defaultDnsResolver = builder.getClientConfiguration().getDnsResolver();
+                builder.getClientConfiguration().setDnsResolver(host -> {
+                    if (shouldErrorOnDns.get() && randomBoolean() && randomBoolean()) {
+                        throw new UnknownHostException(host);
+                    }
+                    return defaultDnsResolver.resolve(host);
+                });
+                return builder;
+            }
+        };
         super.setUp();
     }
 
@@ -150,29 +172,51 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
             Settings.builder().put(S3Repository.CLIENT_NAME.getKey(), clientName).build()
         );
 
-        return new S3BlobContainer(
-            randomBoolean() ? BlobPath.EMPTY : BlobPath.EMPTY.add("foo"),
-            new S3BlobStore(
-                service,
-                "bucket",
-                S3Repository.SERVER_SIDE_ENCRYPTION_SETTING.getDefault(Settings.EMPTY),
-                bufferSize == null ? S3Repository.BUFFER_SIZE_SETTING.getDefault(Settings.EMPTY) : bufferSize,
-                S3Repository.CANNED_ACL_SETTING.getDefault(Settings.EMPTY),
-                S3Repository.STORAGE_CLASS_SETTING.getDefault(Settings.EMPTY),
-                repositoryMetadata,
-                BigArrays.NON_RECYCLING_INSTANCE,
-                new DeterministicTaskQueue().getThreadPool(),
-                RepositoriesMetrics.NOOP
-            )
-        ) {
+        final S3BlobStore s3BlobStore = new S3BlobStore(
+            service,
+            "bucket",
+            S3Repository.SERVER_SIDE_ENCRYPTION_SETTING.getDefault(Settings.EMPTY),
+            bufferSize == null ? S3Repository.BUFFER_SIZE_SETTING.getDefault(Settings.EMPTY) : bufferSize,
+            S3Repository.CANNED_ACL_SETTING.getDefault(Settings.EMPTY),
+            S3Repository.STORAGE_CLASS_SETTING.getDefault(Settings.EMPTY),
+            repositoryMetadata,
+            BigArrays.NON_RECYCLING_INSTANCE,
+            new DeterministicTaskQueue().getThreadPool(),
+            RepositoriesMetrics.NOOP
+        );
+        return new S3BlobContainer(randomBoolean() ? BlobPath.EMPTY : BlobPath.EMPTY.add("foo"), s3BlobStore) {
             @Override
             public InputStream readBlob(OperationPurpose purpose, String blobName) throws IOException {
-                return new AssertingInputStream(super.readBlob(purpose, blobName), blobName);
+                return new AssertingInputStream(new S3RetryingInputStream(purpose, s3BlobStore, buildKey(blobName)) {
+                    @Override
+                    protected long getRetryDelayInMillis() {
+                        assert super.getRetryDelayInMillis() > 0;
+                        return 0;
+                    }
+                }, blobName);
             }
 
             @Override
             public InputStream readBlob(OperationPurpose purpose, String blobName, long position, long length) throws IOException {
-                return new AssertingInputStream(super.readBlob(purpose, blobName, position, length), blobName, position, length);
+                final InputStream inputStream;
+                if (length == 0) {
+                    inputStream = new ByteArrayInputStream(new byte[0]);
+                } else {
+                    inputStream = new S3RetryingInputStream(
+                        purpose,
+                        s3BlobStore,
+                        buildKey(blobName),
+                        position,
+                        Math.addExact(position, length - 1)
+                    ) {
+                        @Override
+                        protected long getRetryDelayInMillis() {
+                            assert super.getRetryDelayInMillis() > 0;
+                            return 0;
+                        }
+                    };
+                }
+                return new AssertingInputStream(inputStream, blobName, position, length);
             }
         };
     }
@@ -574,6 +618,118 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
         });
     }
 
+    public void testReadWithIndicesPurposeRetriesForever() throws IOException {
+        final int maxRetries = between(0, 5);
+        final int totalFailures = Math.max(30, maxRetries * between(30, 80));
+        final int bufferSizeBytes = scaledRandomIntBetween(
+            0,
+            randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))
+        );
+        final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes));
+        final int meaningfulProgressBytes = Math.max(1, bufferSizeBytes / 100);
+
+        final byte[] bytes = randomBlobContent(512);
+
+        shouldErrorOnDns.set(true);
+        final AtomicInteger failures = new AtomicInteger();
+        @SuppressForbidden(reason = "use a http server")
+        class FlakyReadHandler implements HttpHandler {
+
+            @Override
+            public void handle(HttpExchange exchange) throws IOException {
+                Streams.readFully(exchange.getRequestBody());
+                if (failures.get() > totalFailures && randomBoolean()) {
+                    final int rangeStart = getRangeStart(exchange);
+                    assertThat(rangeStart, lessThan(bytes.length));
+                    exchange.getResponseHeaders().add("Content-Type", bytesContentType());
+                    final OptionalInt rangeEnd = getRangeEnd(exchange);
+                    final int length;
+                    if (rangeEnd.isPresent() == false) {
+                        final var remainderLength = bytes.length - rangeStart;
+                        exchange.sendResponseHeaders(HttpStatus.SC_OK, remainderLength);
+                        length = remainderLength < meaningfulProgressBytes
+                            ? remainderLength
+                            : between(meaningfulProgressBytes, remainderLength);
+                    } else {
+                        final int effectiveRangeEnd = Math.min(bytes.length - 1, rangeEnd.getAsInt());
+                        length = (effectiveRangeEnd - rangeStart) + 1;
+                        exchange.sendResponseHeaders(HttpStatus.SC_OK, length);
+                    }
+                    exchange.getResponseBody().write(bytes, rangeStart, length);
+                } else {
+                    failures.incrementAndGet();
+                    if (randomBoolean()) {
+                        exchange.sendResponseHeaders(
+                            randomFrom(
+                                HttpStatus.SC_INTERNAL_SERVER_ERROR,
+                                HttpStatus.SC_BAD_GATEWAY,
+                                HttpStatus.SC_SERVICE_UNAVAILABLE,
+                                HttpStatus.SC_GATEWAY_TIMEOUT
+                            ),
+                            -1
+                        );
+                    } else {
+                        if (randomBoolean()) {
+                            final var bytesSent = sendIncompleteContent(exchange, bytes);
+                            if (bytesSent >= meaningfulProgressBytes) {
+                                exchange.getResponseBody().flush();
+                            }
+                        }
+                    }
+                }
+                exchange.close();
+            }
+        }
+
+        httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_retries_forever"), new FlakyReadHandler());
+
+        // Ranged read
+        final int position = between(0, bytes.length - 1);
+        final int length = between(0, randomBoolean() ? bytes.length : Integer.MAX_VALUE);
+        logger.info("--> position={}, length={}", position, length);
+        try (InputStream inputStream = blobContainer.readBlob(OperationPurpose.INDICES, "read_blob_retries_forever", position, length)) {
+            final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(inputStream));
+            assertArrayEquals(Arrays.copyOfRange(bytes, position, Math.min(bytes.length, position + length)), bytesRead);
+        }
+        assertThat(failures.get(), greaterThan(totalFailures));
+
+        // Read the whole blob
+        failures.set(0);
+        try (InputStream inputStream = blobContainer.readBlob(OperationPurpose.INDICES, "read_blob_retries_forever")) {
+            final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(inputStream));
+            assertArrayEquals(bytes, bytesRead);
+        }
+        assertThat(failures.get(), greaterThan(totalFailures));
+    }
+
+    public void testDoesNotRetryOnNotFound() {
+        final int maxRetries = between(3, 5);
+        final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null);
+
+        final AtomicInteger numberOfReads = new AtomicInteger(0);
+        @SuppressForbidden(reason = "use a http server")
+        class NotFoundReadHandler implements HttpHandler {
+            @Override
+            public void handle(HttpExchange exchange) throws IOException {
+                numberOfReads.incrementAndGet();
+                exchange.sendResponseHeaders(HttpStatus.SC_NOT_FOUND, -1);
+                exchange.close();
+            }
+        }
+
+        httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_not_found"), new NotFoundReadHandler());
+        expectThrows(NoSuchFileException.class, () -> {
+            try (
+                InputStream inputStream = randomBoolean()
+                    ? blobContainer.readBlob(randomRetryingPurpose(), "read_blob_not_found")
+                    : blobContainer.readBlob(randomRetryingPurpose(), "read_blob_not_found", between(0, 100), between(1, 100))
+            ) {
+                Streams.readFully(inputStream);
+            }
+        });
+        assertThat(numberOfReads.get(), equalTo(1));
+    }
+
     @Override
     protected Matcher<Integer> getMaxRetriesMatcher(int maxRetries) {
         // some attempts make meaningful progress and do not count towards the max retry limit
@@ -585,6 +741,14 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
         return randomValueOtherThan(OperationPurpose.REPOSITORY_ANALYSIS, BlobStoreTestUtil::randomPurpose);
     }
 
+    @Override
+    protected OperationPurpose randomFiniteRetryingPurpose() {
+        return randomValueOtherThanMany(
+            purpose -> purpose == OperationPurpose.REPOSITORY_ANALYSIS || purpose == OperationPurpose.INDICES,
+            BlobStoreTestUtil::randomPurpose
+        );
+    }
+
     /**
      * Asserts that an InputStream is fully consumed, or aborted, when it is closed
      */
@@ -605,6 +769,8 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
 
         AssertingInputStream(InputStream in, String blobName, long position, long length) {
             super(in);
+            assert position >= 0L;
+            assert length >= 0;
             this.blobName = blobName;
             this.position = position;
             this.length = length;

test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java (+11, -7)

@@ -253,7 +253,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
 
         Exception exception = expectThrows(
             unresponsiveExceptionType(),
-            () -> Streams.readFully(blobContainer.readBlob(randomPurpose(), "read_blob_unresponsive"))
+            () -> Streams.readFully(blobContainer.readBlob(randomFiniteRetryingPurpose(), "read_blob_unresponsive"))
         );
         assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("read timed out"));
         assertThat(exception.getCause(), instanceOf(SocketTimeoutException.class));
@@ -270,8 +270,8 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
         exception = expectThrows(Exception.class, () -> {
             try (
                 InputStream stream = randomBoolean()
-                    ? blobContainer.readBlob(randomRetryingPurpose(), "read_blob_incomplete")
-                    : blobContainer.readBlob(randomRetryingPurpose(), "read_blob_incomplete", position, length)
+                    ? blobContainer.readBlob(randomFiniteRetryingPurpose(), "read_blob_incomplete")
+                    : blobContainer.readBlob(randomFiniteRetryingPurpose(), "read_blob_incomplete", position, length)
             ) {
                 Streams.readFully(stream);
             }
@@ -294,6 +294,10 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
         return randomPurpose();
     }
 
+    protected OperationPurpose randomFiniteRetryingPurpose() {
+        return randomPurpose();
+    }
+
     public void testReadBlobWithNoHttpResponse() {
         final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200));
         final BlobContainer blobContainer = createBlobContainer(randomInt(5), readTimeout, null, null);
@@ -303,9 +307,9 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
 
         Exception exception = expectThrows(unresponsiveExceptionType(), () -> {
             if (randomBoolean()) {
-                Streams.readFully(blobContainer.readBlob(randomPurpose(), "read_blob_no_response"));
+                Streams.readFully(blobContainer.readBlob(randomFiniteRetryingPurpose(), "read_blob_no_response"));
             } else {
-                Streams.readFully(blobContainer.readBlob(randomPurpose(), "read_blob_no_response", 0, 1));
+                Streams.readFully(blobContainer.readBlob(randomFiniteRetryingPurpose(), "read_blob_no_response", 0, 1));
             }
         });
         assertThat(
@@ -328,8 +332,8 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
         final Exception exception = expectThrows(Exception.class, () -> {
             try (
                 InputStream stream = randomBoolean()
-                    ? blobContainer.readBlob(randomRetryingPurpose(), "read_blob_incomplete", 0, 1)
-                    : blobContainer.readBlob(randomRetryingPurpose(), "read_blob_incomplete")
+                    ? blobContainer.readBlob(randomFiniteRetryingPurpose(), "read_blob_incomplete", 0, 1)
+                    : blobContainer.readBlob(randomFiniteRetryingPurpose(), "read_blob_incomplete")
             ) {
                 Streams.readFully(stream);
             }

test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java (+1, -1)

@@ -254,7 +254,7 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
     /**
      * Consumes and closes the given {@link InputStream}
      */
-    protected static void drainInputStream(final InputStream inputStream) throws IOException {
+    public static void drainInputStream(final InputStream inputStream) throws IOException {
         while (inputStream.read(BUFFER) >= 0)
             ;
     }