|
@@ -12,7 +12,6 @@ import com.sun.net.httpserver.HttpExchange;
|
|
|
import com.sun.net.httpserver.HttpServer;
|
|
|
import org.apache.http.ConnectionClosedException;
|
|
|
import org.apache.http.HttpStatus;
|
|
|
-import org.elasticsearch.common.blobstore.BlobPath;
|
|
|
import org.elasticsearch.core.Nullable;
|
|
|
import org.elasticsearch.core.SuppressForbidden;
|
|
|
import org.elasticsearch.common.blobstore.BlobContainer;
|
|
@@ -69,17 +68,16 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
|
|
super.tearDown();
|
|
|
}
|
|
|
|
|
|
- protected abstract String downloadStorageEndpoint(String blob);
|
|
|
+ protected abstract String downloadStorageEndpoint(BlobContainer container, String blob);
|
|
|
|
|
|
protected abstract String bytesContentType();
|
|
|
|
|
|
protected abstract Class<? extends Exception> unresponsiveExceptionType();
|
|
|
|
|
|
protected abstract BlobContainer createBlobContainer(@Nullable Integer maxRetries,
|
|
|
- @Nullable TimeValue readTimeout,
|
|
|
- @Nullable Boolean disableChunkedEncoding,
|
|
|
- @Nullable ByteSizeValue bufferSize,
|
|
|
- @Nullable BlobPath path);
|
|
|
+ @Nullable TimeValue readTimeout,
|
|
|
+ @Nullable Boolean disableChunkedEncoding,
|
|
|
+ @Nullable ByteSizeValue bufferSize);
|
|
|
|
|
|
protected org.hamcrest.Matcher<Object> readTimeoutExceptionMatcher() {
|
|
|
return either(instanceOf(SocketTimeoutException.class)).or(instanceOf(ConnectionClosedException.class))
|
|
@@ -87,7 +85,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
public void testReadNonexistentBlobThrowsNoSuchFileException() {
|
|
|
- final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null, null);
|
|
|
+ final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null);
|
|
|
final long position = randomLongBetween(0, MAX_RANGE_VAL);
|
|
|
final int length = randomIntBetween(1, Math.toIntExact(Math.min(Integer.MAX_VALUE, MAX_RANGE_VAL - position)));
|
|
|
final Exception exception = expectThrows(
|
|
@@ -99,10 +97,11 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
|
|
Streams.readFully(blobContainer.readBlob("read_nonexistent_blob", 0, 1));
|
|
|
}
|
|
|
});
|
|
|
- assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("blob object [read_nonexistent_blob] not found"));
|
|
|
+ final String fullBlobPath = blobContainer.path().buildAsString() + "read_nonexistent_blob";
|
|
|
+ assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("blob object [" + fullBlobPath + "] not found"));
|
|
|
assertThat(expectThrows(NoSuchFileException.class,
|
|
|
() -> Streams.readFully(blobContainer.readBlob("read_nonexistent_blob", position, length)))
|
|
|
- .getMessage().toLowerCase(Locale.ROOT), containsString("blob object [read_nonexistent_blob] not found"));
|
|
|
+ .getMessage().toLowerCase(Locale.ROOT), containsString("blob object [" + fullBlobPath + "] not found"));
|
|
|
}
|
|
|
|
|
|
public void testReadBlobWithRetries() throws Exception {
|
|
@@ -110,7 +109,9 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
|
|
final CountDown countDown = new CountDown(maxRetries + 1);
|
|
|
|
|
|
final byte[] bytes = randomBlobContent();
|
|
|
- httpServer.createContext(downloadStorageEndpoint("read_blob_max_retries"), exchange -> {
|
|
|
+ final TimeValue readTimeout = TimeValue.timeValueSeconds(between(1, 3));
|
|
|
+ final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null);
|
|
|
+ httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_max_retries"), exchange -> {
|
|
|
Streams.readFully(exchange.getRequestBody());
|
|
|
if (countDown.countDown()) {
|
|
|
final int rangeStart = getRangeStart(exchange);
|
|
@@ -132,8 +133,6 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
|
|
}
|
|
|
});
|
|
|
|
|
|
- final TimeValue readTimeout = TimeValue.timeValueSeconds(between(1, 3));
|
|
|
- final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null);
|
|
|
try (InputStream inputStream = blobContainer.readBlob("read_blob_max_retries")) {
|
|
|
final int readLimit;
|
|
|
final InputStream wrappedStream;
|
|
@@ -161,8 +160,10 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
|
|
final int maxRetries = rarely() ? randomInt(5) : 1;
|
|
|
final CountDown countDown = new CountDown(maxRetries + 1);
|
|
|
|
|
|
+ final TimeValue readTimeout = TimeValue.timeValueSeconds(between(5, 10));
|
|
|
+ final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null);
|
|
|
final byte[] bytes = randomBlobContent();
|
|
|
- httpServer.createContext(downloadStorageEndpoint("read_range_blob_max_retries"), exchange -> {
|
|
|
+ httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_range_blob_max_retries"), exchange -> {
|
|
|
Streams.readFully(exchange.getRequestBody());
|
|
|
if (countDown.countDown()) {
|
|
|
final int rangeStart = getRangeStart(exchange);
|
|
@@ -190,8 +191,6 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
|
|
}
|
|
|
});
|
|
|
|
|
|
- final TimeValue readTimeout = TimeValue.timeValueSeconds(between(5, 10));
|
|
|
- final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null);
|
|
|
final int position = randomIntBetween(0, bytes.length - 1);
|
|
|
final int length = randomIntBetween(0, randomBoolean() ? bytes.length : Integer.MAX_VALUE);
|
|
|
try (InputStream inputStream = blobContainer.readBlob("read_range_blob_max_retries", position, length)) {
|
|
@@ -220,10 +219,11 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
|
|
public void testReadBlobWithReadTimeouts() {
|
|
|
final int maxRetries = randomInt(5);
|
|
|
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200));
|
|
|
- final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null);
|
|
|
+ final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null);
|
|
|
|
|
|
// HTTP server does not send a response
|
|
|
- httpServer.createContext(downloadStorageEndpoint("read_blob_unresponsive"), exchange -> {});
|
|
|
+ httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_unresponsive"), exchange -> {
|
|
|
+ });
|
|
|
|
|
|
Exception exception = expectThrows(unresponsiveExceptionType(),
|
|
|
() -> Streams.readFully(blobContainer.readBlob("read_blob_unresponsive")));
|
|
@@ -232,7 +232,10 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
|
|
|
|
|
// HTTP server sends a partial response
|
|
|
final byte[] bytes = randomBlobContent();
|
|
|
- httpServer.createContext(downloadStorageEndpoint("read_blob_incomplete"), exchange -> sendIncompleteContent(exchange, bytes));
|
|
|
+ httpServer.createContext(
|
|
|
+ downloadStorageEndpoint(blobContainer, "read_blob_incomplete"),
|
|
|
+ exchange -> sendIncompleteContent(exchange, bytes)
|
|
|
+ );
|
|
|
|
|
|
final int position = randomIntBetween(0, bytes.length - 1);
|
|
|
final int length = randomIntBetween(1, randomBoolean() ? bytes.length : Integer.MAX_VALUE);
|
|
@@ -252,10 +255,10 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
|
|
|
|
|
public void testReadBlobWithNoHttpResponse() {
|
|
|
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200));
|
|
|
- final BlobContainer blobContainer = createBlobContainer(randomInt(5), readTimeout, null, null, null);
|
|
|
+ final BlobContainer blobContainer = createBlobContainer(randomInt(5), readTimeout, null, null);
|
|
|
|
|
|
// HTTP server closes connection immediately
|
|
|
- httpServer.createContext(downloadStorageEndpoint("read_blob_no_response"), HttpExchange::close);
|
|
|
+ httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_no_response"), HttpExchange::close);
|
|
|
|
|
|
Exception exception = expectThrows(unresponsiveExceptionType(),
|
|
|
() -> {
|
|
@@ -271,11 +274,11 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
|
|
|
|
|
public void testReadBlobWithPrematureConnectionClose() {
|
|
|
final int maxRetries = randomInt(20);
|
|
|
- final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null);
|
|
|
+ final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null);
|
|
|
|
|
|
// HTTP server sends a partial response
|
|
|
final byte[] bytes = randomBlobContent(1);
|
|
|
- httpServer.createContext(downloadStorageEndpoint("read_blob_incomplete"), exchange -> {
|
|
|
+ httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_incomplete"), exchange -> {
|
|
|
sendIncompleteContent(exchange, bytes);
|
|
|
exchange.close();
|
|
|
});
|