Browse Source

Fix Bug in Azure Repo Exception Handling (#47968)

We were incorrectly handling `IOExceptions` thrown by
the `InputStream` side of the upload operation, resulting
in a `ClassCastException` as we expected to never get
`IOException` from the Azure SDK code but we do in practice.
This PR also sets an assertion on `markSupported` for the
streams used by the SDK as adding the test for this scenario
revealed that the SDK client would retry uploads for
non-mark-supporting streams on `IOException` in the `InputStream`.
Armin Braun 6 years ago
parent
commit
58c62c56fe

+ 1 - 2
plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java

@@ -62,7 +62,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
         logger.trace("blobExists({})", blobName);
         try {
             return blobStore.blobExists(buildKey(blobName));
-        } catch (URISyntaxException | StorageException e) {
+        } catch (URISyntaxException | StorageException | IOException e) {
             logger.warn("can not access [{}] in container {{}}: {}", blobName, blobStore, e.getMessage());
         }
         return false;
@@ -97,7 +97,6 @@ public class AzureBlobContainer extends AbstractBlobContainer {
     @Override
     public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
         logger.trace("writeBlob({}, stream, {})", buildKey(blobName), blobSize);
-
         try {
             blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
         } catch (URISyntaxException|StorageException e) {

+ 5 - 6
plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

@@ -33,7 +33,6 @@ import org.elasticsearch.threadpool.ThreadPool;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URISyntaxException;
-import java.nio.file.FileAlreadyExistsException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.Executor;
@@ -88,11 +87,11 @@ public class AzureBlobStore implements BlobStore {
     public void close() {
     }
 
-    public boolean blobExists(String blob) throws URISyntaxException, StorageException {
+    public boolean blobExists(String blob) throws URISyntaxException, StorageException, IOException {
         return service.blobExists(clientName, container, blob);
     }
 
-    public void deleteBlob(String blob) throws URISyntaxException, StorageException {
+    public void deleteBlob(String blob) throws URISyntaxException, StorageException, IOException {
         service.deleteBlob(clientName, container, blob);
     }
 
@@ -106,17 +105,17 @@ public class AzureBlobStore implements BlobStore {
     }
 
     public Map<String, BlobMetaData> listBlobsByPrefix(String keyPath, String prefix)
-        throws URISyntaxException, StorageException {
+        throws URISyntaxException, StorageException, IOException {
         return service.listBlobsByPrefix(clientName, container, keyPath, prefix);
     }
 
-    public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException {
+    public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException, IOException {
         return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect(
             Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this, threadPool))));
     }
 
     public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
-        throws URISyntaxException, StorageException, FileAlreadyExistsException {
+        throws URISyntaxException, StorageException, IOException {
         service.writeBlob(this.clientName, container, blobName, inputStream, blobSize, failIfAlreadyExists);
     }
 }

+ 5 - 4
plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java

@@ -267,7 +267,7 @@ public class AzureStorageService {
     }
 
     public Map<String, BlobMetaData> listBlobsByPrefix(String account, String container, String keyPath, String prefix)
-        throws URISyntaxException, StorageException {
+            throws URISyntaxException, StorageException, IOException {
         // NOTE: this should be here: if (prefix == null) prefix = "";
         // however, this is really inefficient since deleteBlobsByPrefix enumerates everything and
         // then does a prefix match on the result; it should just call listBlobsByPrefix with the prefix!
@@ -295,7 +295,7 @@ public class AzureStorageService {
         return Map.copyOf(blobsBuilder);
     }
 
-    public Set<String> children(String account, String container, BlobPath path) throws URISyntaxException, StorageException {
+    public Set<String> children(String account, String container, BlobPath path) throws URISyntaxException, StorageException, IOException {
         final var blobsBuilder = new HashSet<String>();
         final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
         final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
@@ -319,8 +319,9 @@ public class AzureStorageService {
     }
 
     public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize,
-                          boolean failIfAlreadyExists)
-        throws URISyntaxException, StorageException, FileAlreadyExistsException {
+                          boolean failIfAlreadyExists) throws URISyntaxException, StorageException, IOException {
+        assert inputStream.markSupported()
+            : "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken";
         logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize));
         final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
         final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);

+ 8 - 8
plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/SocketAccess.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.repositories.azure;
 
 import com.microsoft.azure.storage.StorageException;
+import org.apache.logging.log4j.core.util.Throwables;
 import org.elasticsearch.SpecialPermission;
 
 import java.io.IOException;
@@ -44,7 +45,9 @@ public final class SocketAccess {
         try {
             return AccessController.doPrivileged(operation);
         } catch (PrivilegedActionException e) {
-            throw (IOException) e.getCause();
+            Throwables.rethrow(e.getCause());
+            assert false : "always throws";
+            return null;
         }
     }
 
@@ -53,7 +56,9 @@ public final class SocketAccess {
         try {
             return AccessController.doPrivileged(operation);
         } catch (PrivilegedActionException e) {
-            throw (StorageException) e.getCause();
+            Throwables.rethrow(e.getCause());
+            assert false : "always throws";
+            return null;
         }
     }
 
@@ -65,12 +70,7 @@ public final class SocketAccess {
                 return null;
             });
         } catch (PrivilegedActionException e) {
-            Throwable cause = e.getCause();
-            if (cause instanceof StorageException) {
-                throw (StorageException) cause;
-            } else {
-                throw (URISyntaxException) cause;
-            }
+            Throwables.rethrow(e.getCause());
         }
     }
 

+ 40 - 0
plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java

@@ -49,6 +49,7 @@ import org.junit.After;
 import org.junit.Before;
 
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.InetAddress;
@@ -63,6 +64,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -294,6 +296,44 @@ public class AzureBlobContainerRetriesTests extends ESTestCase {
         assertThat(blocks.isEmpty(), is(true));
     }
 
+    public void testRetryUntilFail() throws IOException {
+        final AtomicBoolean requestReceived = new AtomicBoolean(false);
+        httpServer.createContext("/container/write_blob_max_retries", exchange -> {
+            try {
+                if (requestReceived.compareAndSet(false, true)) {
+                    throw new AssertionError("Should not receive two requests");
+                } else {
+                    exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
+                }
+            } finally {
+                exchange.close();
+            }
+        });
+
+        final BlobContainer blobContainer = createBlobContainer(randomIntBetween(2, 5));
+        try (InputStream stream = new InputStream() {
+
+            @Override
+            public int read() throws IOException {
+                throw new IOException("foo");
+            }
+
+            @Override
+            public boolean markSupported() {
+                return true;
+            }
+
+            @Override
+            public void reset() {
+                throw new AssertionError("should not be called");
+            }
+        }) {
+            final IOException ioe = expectThrows(IOException.class, () ->
+                blobContainer.writeBlob("write_blob_max_retries", stream, randomIntBetween(1, 128), randomBoolean()));
+            assertThat(ioe.getMessage(), is("foo"));
+        }
+    }
+
     private static byte[] randomBlobContent() {
         return randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb
     }