Browse Source

Fix AzureBlobStore#convertStreamToByteBuffer chunking in Windows (#78772)

Today we are using Math.ceil in order to calculate the number of
chunks in the request. Since we cast the values to a double...
there be dragons. This could cause issues depending on the platform.

This commit uses good old integers to compute the number of parts.
Francisco Fernández Castaño 4 years ago
parent
commit
c766b5fec8

+ 5 - 1
plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

@@ -547,7 +547,11 @@ public class AzureBlobStore implements BlobStore {
             // reclaim them (see MonoSendMany). Additionally, that very same operator requests
             // 128 elements (that's hardcoded) once it's subscribed (later on, it requests
             // by 64 elements), that's why we provide 64kb buffers.
-            return Flux.range(0, (int) Math.ceil((double) length / (double) chunkSize))
+
+            // length is at most 100MB so it's safe to cast back to an integer in this case
+            final int parts = (int) length / chunkSize;
+            final long remaining = length % chunkSize;
+            return Flux.range(0, remaining == 0 ? parts : parts + 1)
                 .map(i -> i * chunkSize)
                 .concatMap(pos -> Mono.fromCallable(() -> {
                     long count = pos + chunkSize > length ? length - pos : chunkSize;

+ 21 - 2
plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java

@@ -329,8 +329,11 @@ public class AzureBlobContainerRetriesTests extends ESTestCase {
     public void testWriteLargeBlob() throws Exception {
         final int maxRetries = randomIntBetween(2, 5);
 
-        final byte[] data = randomBytes((int) ByteSizeUnit.MB.toBytes(10));
-        int nbBlocks = (int) Math.ceil((double) data.length / (double) ByteSizeUnit.MB.toBytes(1));
+        final byte[] data = randomBytes(ByteSizeUnit.MB.toIntBytes(10) + randomIntBetween(0, ByteSizeUnit.MB.toIntBytes(1)));
+        int nbBlocks = data.length / ByteSizeUnit.MB.toIntBytes(1);
+        if (data.length % ByteSizeUnit.MB.toIntBytes(1) != 0) {
+            nbBlocks += 1;
+        }
 
         final int nbErrors = 2; // we want all requests to fail at least once
         final AtomicInteger countDownUploads = new AtomicInteger(nbErrors * nbBlocks);
@@ -378,6 +381,9 @@ public class AzureBlobContainerRetriesTests extends ESTestCase {
             if (randomBoolean()) {
                 Streams.readFully(exchange.getRequestBody());
                 AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE));
+            } else {
+                long contentLength = Long.parseLong(exchange.getRequestHeaders().getFirst("Content-Length"));
+                readFromInputStream(exchange.getRequestBody(), randomLongBetween(0, contentLength));
             }
             exchange.close();
         });
@@ -621,4 +627,17 @@ public class AzureBlobContainerRetriesTests extends ESTestCase {
         InetSocketAddress address = server.getAddress();
         return "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort() + "/" + accountName;
     }
+
+    private void readFromInputStream(InputStream inputStream, long bytesToRead) {
+        try {
+            long totalBytesRead = 0;
+            int bytesRead;
+            while ((bytesRead = inputStream.read()) != -1 && totalBytesRead < bytesToRead) {
+                totalBytesRead += bytesRead;
+            }
+            assertThat(totalBytesRead, equalTo(bytesToRead));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
 }