
Faster Azure Blob InputStream (#61812)

Building our own input stream that should perform better than the one in the SDK.
As a result, this also saves a HEAD call for each ranged read on Azure.
Armin Braun · 5 years ago · commit 953536b457
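
To make the saved round-trip concrete, here is a hedged usage sketch (not part of this commit; blobContainer is assumed to be an org.elasticsearch.common.blobstore.BlobContainer backed by this store, and the blob name is illustrative). A ranged read used to cost a HEAD request, issued to resolve the blob length, plus the GETs for the range; with this change it issues only ranged GETs, and since the new stream pre-fills its buffer when an explicit length is given, a missing blob now fails at readBlob(...) time rather than on the first read:

    import java.io.InputStream;

    import org.elasticsearch.common.blobstore.BlobContainer;

    // Hedged sketch: caller-visible behavior of a ranged read after this change.
    class RangedReadSketch {
        static byte[] readFirstKilobyte(BlobContainer blobContainer) throws Exception {
            // Before: HEAD (blob length) + ranged GET(s). After: ranged GET(s) only,
            // fetched in chunks of at most 4 MB, with a 404 surfacing from readBlob itself.
            try (InputStream in = blobContainer.readBlob("segments_1", 0L, 1024L)) {
                return in.readAllBytes(); // at most 1024 bytes, starting at offset 0
            }
        }
    }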

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java (+111 -23)

@@ -26,7 +26,6 @@ import com.microsoft.azure.storage.RequestCompletedEvent;
 import com.microsoft.azure.storage.StorageErrorCodeStrings;
 import com.microsoft.azure.storage.StorageEvent;
 import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.BlobInputStream;
 import com.microsoft.azure.storage.blob.BlobListingDetails;
 import com.microsoft.azure.storage.blob.BlobProperties;
 import com.microsoft.azure.storage.blob.CloudBlob;
@@ -49,10 +48,12 @@ import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
 import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.repositories.azure.AzureRepository.Repository;
 import org.elasticsearch.threadpool.ThreadPool;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
@@ -246,9 +247,22 @@ public class AzureBlobStore implements BlobStore {
         final OperationContext context = hookMetricCollector(client.v2().get(), getMetricsCollector);
         final CloudBlockBlob blockBlobReference = client.v1().getContainerReference(container).getBlockBlobReference(blob);
         logger.trace(() -> new ParameterizedMessage("reading container [{}], blob [{}]", container, blob));
-        final BlobInputStream is = SocketAccess.doPrivilegedException(() ->
-            blockBlobReference.openInputStream(position, length, null, null, context));
-        return giveSocketPermissionsToStream(is);
+        final long limit;
+        if (length == null) {
+            // Loading the blob attributes so we can get its length
+            SocketAccess.doPrivilegedVoidException(() -> blockBlobReference.downloadAttributes(null, null, context));
+            limit = blockBlobReference.getProperties().getLength() - position;
+        } else {
+            limit = length;
+        }
+        final BlobInputStream blobInputStream = new BlobInputStream(limit, blockBlobReference, position, context);
+        if (length != null) {
+            // pre-fill the buffer for ranged reads so that this method throws a 404 storage exception right away if the blob
+            // does not exist
+            blobInputStream.fill();
+        }
+        return blobInputStream;
     }
 
     public Map<String, BlobMetadata> listBlobsByPrefix(String keyPath, String prefix)
@@ -348,25 +362,6 @@ public class AzureBlobStore implements BlobStore {
         return context;
     }
 
-    static InputStream giveSocketPermissionsToStream(final InputStream stream) {
-        return new InputStream() {
-            @Override
-            public int read() throws IOException {
-                return SocketAccess.doPrivilegedIOException(stream::read);
-            }
-
-            @Override
-            public int read(byte[] b) throws IOException {
-                return SocketAccess.doPrivilegedIOException(() -> stream.read(b));
-            }
-
-            @Override
-            public int read(byte[] b, int off, int len) throws IOException {
-                return SocketAccess.doPrivilegedIOException(() -> stream.read(b, off, len));
-            }
-        };
-    }
-
     @Override
     public Map<String, Long> stats() {
         return stats.toMap();
@@ -395,4 +390,97 @@ public class AzureBlobStore implements BlobStore {
                 "PutBlockList", putBlockListOperations.get());
         }
     }
+
+    /**
+     * Building our own input stream instead of using the SDK's {@link com.microsoft.azure.storage.blob.BlobInputStream}
+     * because that stream is highly inefficient in both memory and CPU use.
+     */
+    private static class BlobInputStream extends InputStream {
+
+        /**
+         * Maximum number of bytes to fetch per read request and thus to buffer on heap at a time.
+         * Set to 4M because that's what {@link com.microsoft.azure.storage.blob.BlobInputStream} uses.
+         */
+        private static final int MAX_READ_CHUNK_SIZE = ByteSizeUnit.MB.toIntBytes(4);
+
+        /**
+         * Using a {@link ByteArrayOutputStream} as a buffer instead of a byte array since the byte array APIs on the SDK are less
+         * efficient.
+         */
+        private final ByteArrayOutputStream buffer;
+
+        private final long limit;
+
+        private final CloudBlockBlob blockBlobReference;
+
+        private final long start;
+
+        private final OperationContext context;
+
+        // current read position on the byte array backing #buffer
+        private int pos;
+
+        // current position up to which the contents of the blob were buffered
+        private long offset;
+
+        BlobInputStream(long limit, CloudBlockBlob blockBlobReference, long start, OperationContext context) {
+            this.limit = limit;
+            this.blockBlobReference = blockBlobReference;
+            this.start = start;
+            this.context = context;
+            buffer = new ByteArrayOutputStream(Math.min(MAX_READ_CHUNK_SIZE, Math.toIntExact(Math.min(limit, Integer.MAX_VALUE)))) {
+                // return the internal array directly rather than a copy, avoiding an allocation on every read
+                @Override
+                public byte[] toByteArray() {
+                    return buf;
+                }
+            };
+            pos = 0;
+            offset = 0;
+        }
+
+        @Override
+        public int read() throws IOException {
+            try {
+                fill();
+            } catch (StorageException | URISyntaxException ex) {
+                throw new IOException(ex);
+            }
+            if (pos == buffer.size()) {
+                return -1;
+            }
+            // mask with 0xFF so the byte is returned as an unsigned value, per the InputStream#read contract
+            return buffer.toByteArray()[pos++] & 0xFF;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            try {
+                fill();
+            } catch (StorageException | URISyntaxException ex) {
+                throw new IOException(ex);
+            }
+            final int buffered = buffer.size();
+            int remaining = buffered - pos;
+            if (len > 0 && remaining == 0) {
+                return -1;
+            }
+            final int toRead = Math.min(remaining, len);
+            System.arraycopy(buffer.toByteArray(), pos, b, off, toRead);
+            pos += toRead;
+            return toRead;
+        }
+
+        void fill() throws StorageException, URISyntaxException {
+            if (pos == buffer.size()) {
+                final long toFill = Math.min(limit - this.offset, MAX_READ_CHUNK_SIZE);
+                if (toFill <= 0L) {
+                    return;
+                }
+                buffer.reset();
+                SocketAccess.doPrivilegedVoidException(() -> blockBlobReference.downloadRange(
+                        start + this.offset, toFill, buffer, null, null, context));
+                this.offset += buffer.size();
+                pos = 0;
+            }
+        }
+    }
 }
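
For context, the buffer-and-refill pattern the new stream implements can be shown as a self-contained sketch (everything below is illustrative, not part of the commit; RangeDownloader stands in for CloudBlockBlob#downloadRange):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;

    // Hedged sketch of the chunked-refill idea: serve reads from an on-heap buffer
    // and refill it with at most CHUNK bytes of the remaining range once exhausted.
    class ChunkedRangeInputStream extends InputStream {

        interface RangeDownloader {
            void download(long offset, long length, ByteArrayOutputStream out) throws IOException;
        }

        private static final int CHUNK = 4 * 1024 * 1024; // mirrors MAX_READ_CHUNK_SIZE

        private final RangeDownloader downloader;
        private final long start; // offset of the range within the blob
        private final long limit; // total number of bytes to read
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        private int pos;     // read position within the buffer
        private long offset; // bytes of the range buffered so far

        ChunkedRangeInputStream(RangeDownloader downloader, long start, long limit) {
            this.downloader = downloader;
            this.start = start;
            this.limit = limit;
        }

        @Override
        public int read() throws IOException {
            if (pos == buffer.size()) {
                final long toFill = Math.min(limit - offset, CHUNK);
                if (toFill <= 0) {
                    return -1; // the whole range has been consumed
                }
                buffer.reset();
                downloader.download(start + offset, toFill, buffer);
                offset += buffer.size();
                pos = 0;
            }
            // Note: toByteArray() copies here; the commit avoids that copy by
            // overriding it to return the internal array directly.
            return buffer.toByteArray()[pos++] & 0xFF; // unsigned, per the InputStream contract
        }
    }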

plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java (+1 -9)

@@ -217,20 +217,13 @@ public class AzureBlobContainerRetriesTests extends ESTestCase {
 
     public void testReadRangeBlobWithRetries() throws Exception {
         final int maxRetries = randomIntBetween(1, 5);
-        final CountDown countDownHead = new CountDown(maxRetries);
         final CountDown countDownGet = new CountDown(maxRetries);
         final byte[] bytes = randomBlobContent();
         httpServer.createContext("/container/read_range_blob_max_retries", exchange -> {
             try {
                 Streams.readFully(exchange.getRequestBody());
                 if ("HEAD".equals(exchange.getRequestMethod())) {
-                    if (countDownHead.countDown()) {
-                        exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
-                        exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(bytes.length));
-                        exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
-                        exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
-                        return;
-                    }
+                    throw new AssertionError("Should not HEAD blob for ranged reads");
                 } else if ("GET".equals(exchange.getRequestMethod())) {
                     if (countDownGet.countDown()) {
                         final int rangeStart = getRangeStart(exchange);
@@ -262,7 +255,6 @@ public class AzureBlobContainerRetriesTests extends ESTestCase {
         try (InputStream inputStream = blobContainer.readBlob("read_range_blob_max_retries", position, length)) {
             final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(inputStream));
             assertArrayEquals(Arrays.copyOfRange(bytes, position, Math.min(bytes.length, position + length)), bytesRead);
-            assertThat(countDownHead.isCountedDown(), is(true));
             assertThat(countDownGet.isCountedDown(), is(true));
         }
     }
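
For anyone reproducing this test setup against a stub HTTP server: a hedged sketch of a range-parsing helper in the spirit of the getRangeStart(...) call above (illustrative, not the helper from the Elasticsearch test infrastructure; it assumes Azure's documented "x-ms-range: bytes=<start>-<end>" request header):

    import com.sun.net.httpserver.HttpExchange;

    final class RangeHeaderSketch {

        // Azure clients send ranged reads as "x-ms-range: bytes=<start>-<end>".
        static long rangeStart(HttpExchange exchange) {
            final String range = exchange.getRequestHeaders().getFirst("x-ms-range");
            if (range == null || range.startsWith("bytes=") == false) {
                return 0L; // no range header: the request reads from the start of the blob
            }
            final String bounds = range.substring("bytes=".length()); // "<start>-<end>"
            return Long.parseLong(bounds.substring(0, bounds.indexOf('-')));
        }
    }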