Browse Source

Revert "Use Azure Bulk Deletes in Azure Repository (#53919)" (#54089)

This reverts commit 23cccf088810b8416ed278571352393cc2de9523.
Unfortunately SAS token auth still doesn't work with bulk deletes so we can't use them yet.

Closes #54080
Armin Braun 5 years ago
parent
commit
c94be58b5a

+ 2 - 0
plugins/repository-azure/qa/microsoft-azure-storage/build.gradle

@@ -88,5 +88,7 @@ testClusters.integTest {
     // in a hacky way to change the protocol and endpoint. We must fix that.
     // in a hacky way to change the protocol and endpoint. We must fix that.
     setting 'azure.client.integration_test.endpoint_suffix',
     setting 'azure.client.integration_test.endpoint_suffix',
       { "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=${-> azureAddress()}" }, IGNORE_VALUE
       { "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=${-> azureAddress()}" }, IGNORE_VALUE
+    String firstPartOfSeed = BuildParams.testSeed.tokenize(':').get(0)
+    setting 'thread_pool.repository_azure.max', (Math.abs(Long.parseUnsignedLong(firstPartOfSeed, 16) % 10) + 1).toString(), System.getProperty('ignore.tests.seed') == null ? DEFAULT : IGNORE_VALUE
   }
   }
 }
 }

+ 36 - 5
plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java

@@ -23,12 +23,17 @@ import com.microsoft.azure.storage.LocationMode;
 import com.microsoft.azure.storage.StorageException;
 import com.microsoft.azure.storage.StorageException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRunnable;
+import org.elasticsearch.action.support.GroupedActionListener;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetaData;
 import org.elasticsearch.common.blobstore.BlobMetaData;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
 import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
+import org.elasticsearch.threadpool.ThreadPool;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStream;
@@ -37,18 +42,20 @@ import java.net.URISyntaxException;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.NoSuchFileException;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.concurrent.ExecutorService;
 
 
 public class AzureBlobContainer extends AbstractBlobContainer {
 public class AzureBlobContainer extends AbstractBlobContainer {
 
 
     private final Logger logger = LogManager.getLogger(AzureBlobContainer.class);
     private final Logger logger = LogManager.getLogger(AzureBlobContainer.class);
     private final AzureBlobStore blobStore;
     private final AzureBlobStore blobStore;
+    private final ThreadPool threadPool;
     private final String keyPath;
     private final String keyPath;
 
 
-    AzureBlobContainer(BlobPath path, AzureBlobStore blobStore) {
+    AzureBlobContainer(BlobPath path, AzureBlobStore blobStore, ThreadPool threadPool) {
         super(path);
         super(path);
         this.blobStore = blobStore;
         this.blobStore = blobStore;
         this.keyPath = path.buildAsString();
         this.keyPath = path.buildAsString();
+        this.threadPool = threadPool;
     }
     }
 
 
     private boolean blobExists(String blobName) {
     private boolean blobExists(String blobName) {
@@ -105,7 +112,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
     @Override
     @Override
     public DeleteResult delete() throws IOException {
     public DeleteResult delete() throws IOException {
         try {
         try {
-            return blobStore.deleteBlobDirectory(keyPath);
+            return blobStore.deleteBlobDirectory(keyPath, threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME));
         } catch (URISyntaxException | StorageException e) {
         } catch (URISyntaxException | StorageException e) {
             throw new IOException(e);
             throw new IOException(e);
         }
         }
@@ -113,9 +120,33 @@ public class AzureBlobContainer extends AbstractBlobContainer {
 
 
     @Override
     @Override
     public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
     public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
+        final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
+        if (blobNames.isEmpty()) {
+            result.onResponse(null);
+        } else {
+            final GroupedActionListener<Void> listener =
+                new GroupedActionListener<>(ActionListener.map(result, v -> null), blobNames.size());
+            final ExecutorService executor = threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
+            // Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint
+            // TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way.
+            for (String blobName : blobNames) {
+                executor.execute(ActionRunnable.run(listener, () -> {
+                    logger.trace("deleteBlob({})", blobName);
+                    try {
+                        blobStore.deleteBlob(buildKey(blobName));
+                    } catch (StorageException e) {
+                        if (e.getHttpStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) {
+                            throw new IOException(e);
+                        }
+                    } catch (URISyntaxException e) {
+                        throw new IOException(e);
+                    }
+                }));
+            }
+        }
         try {
         try {
-            blobStore.deleteBlobsIgnoringIfNotExists(blobNames.stream().map(this::buildKey).collect(Collectors.toList()));
-        } catch (URISyntaxException | StorageException e) {
+            result.actionGet();
+        } catch (Exception e) {
             throw new IOException("Exception during bulk delete", e);
             throw new IOException("Exception during bulk delete", e);
         }
         }
     }
     }

+ 12 - 8
plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

@@ -28,13 +28,14 @@ import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.repositories.azure.AzureRepository.Repository;
 import org.elasticsearch.repositories.azure.AzureRepository.Repository;
+import org.elasticsearch.threadpool.ThreadPool;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStream;
 import java.net.URISyntaxException;
 import java.net.URISyntaxException;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Map;
+import java.util.concurrent.Executor;
 import java.util.function.Function;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
 
 
@@ -43,15 +44,17 @@ import static java.util.Collections.emptyMap;
 public class AzureBlobStore implements BlobStore {
 public class AzureBlobStore implements BlobStore {
 
 
     private final AzureStorageService service;
     private final AzureStorageService service;
+    private final ThreadPool threadPool;
 
 
     private final String clientName;
     private final String clientName;
     private final String container;
     private final String container;
     private final LocationMode locationMode;
     private final LocationMode locationMode;
 
 
-    public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service) {
+    public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service, ThreadPool threadPool) {
         this.container = Repository.CONTAINER_SETTING.get(metadata.settings());
         this.container = Repository.CONTAINER_SETTING.get(metadata.settings());
         this.clientName = Repository.CLIENT_NAME.get(metadata.settings());
         this.clientName = Repository.CLIENT_NAME.get(metadata.settings());
         this.service = service;
         this.service = service;
+        this.threadPool = threadPool;
         // locationMode is set per repository, not per client
         // locationMode is set per repository, not per client
         this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
         this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
         final Map<String, AzureStorageSettings> prevSettings = this.service.refreshAndClearCache(emptyMap());
         final Map<String, AzureStorageSettings> prevSettings = this.service.refreshAndClearCache(emptyMap());
@@ -77,7 +80,7 @@ public class AzureBlobStore implements BlobStore {
 
 
     @Override
     @Override
     public BlobContainer blobContainer(BlobPath path) {
     public BlobContainer blobContainer(BlobPath path) {
-        return new AzureBlobContainer(path, this);
+        return new AzureBlobContainer(path, this, threadPool);
     }
     }
 
 
     @Override
     @Override
@@ -88,12 +91,13 @@ public class AzureBlobStore implements BlobStore {
         return service.blobExists(clientName, container, blob);
         return service.blobExists(clientName, container, blob);
     }
     }
 
 
-    public void deleteBlobsIgnoringIfNotExists(Collection<String> blobs) throws URISyntaxException, StorageException {
-        service.deleteBlobsIgnoringIfNotExists(clientName, container, blobs);
+    public void deleteBlob(String blob) throws URISyntaxException, StorageException, IOException {
+        service.deleteBlob(clientName, container, blob);
     }
     }
 
 
-    public DeleteResult deleteBlobDirectory(String path) throws URISyntaxException, StorageException, IOException {
-        return service.deleteBlobDirectory(clientName, container, path);
+    public DeleteResult deleteBlobDirectory(String path, Executor executor)
+            throws URISyntaxException, StorageException, IOException {
+        return service.deleteBlobDirectory(clientName, container, path, executor);
     }
     }
 
 
     public InputStream getInputStream(String blob) throws URISyntaxException, StorageException, IOException {
     public InputStream getInputStream(String blob) throws URISyntaxException, StorageException, IOException {
@@ -107,7 +111,7 @@ public class AzureBlobStore implements BlobStore {
 
 
     public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException, IOException {
     public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException, IOException {
         return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect(
         return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect(
-            Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this))));
+            Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this, threadPool))));
     }
     }
 
 
     public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
     public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)

+ 1 - 1
plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java

@@ -115,7 +115,7 @@ public class AzureRepository extends BlobStoreRepository {
 
 
     @Override
     @Override
     protected AzureBlobStore createBlobStore() {
     protected AzureBlobStore createBlobStore() {
-        final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService);
+        final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService, threadPool);
 
 
         logger.debug(() -> new ParameterizedMessage(
         logger.debug(() -> new ParameterizedMessage(
             "using container [{}], chunk_size [{}], compress [{}], base_path [{}]",
             "using container [{}], chunk_size [{}], compress [{}], base_path [{}]",

+ 15 - 0
plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java

@@ -23,12 +23,16 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsException;
 import org.elasticsearch.common.settings.SettingsException;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.plugins.ReloadablePlugin;
 import org.elasticsearch.plugins.ReloadablePlugin;
 import org.elasticsearch.plugins.RepositoryPlugin;
 import org.elasticsearch.plugins.RepositoryPlugin;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.Repository;
+import org.elasticsearch.threadpool.ExecutorBuilder;
+import org.elasticsearch.threadpool.ScalingExecutorBuilder;
+
 import java.util.Arrays;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Collections;
 import java.util.List;
 import java.util.List;
@@ -39,6 +43,8 @@ import java.util.Map;
  */
  */
 public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, ReloadablePlugin {
 public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, ReloadablePlugin {
 
 
+    public static final String REPOSITORY_THREAD_POOL_NAME = "repository_azure";
+
     // protected for testing
     // protected for testing
     final AzureStorageService azureStoreService;
     final AzureStorageService azureStoreService;
 
 
@@ -74,6 +80,15 @@ public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, R
         );
         );
     }
     }
 
 
+    @Override
+    public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
+        return Collections.singletonList(executorBuilder());
+    }
+
+    public static ExecutorBuilder<?> executorBuilder() {
+        return new ScalingExecutorBuilder(REPOSITORY_THREAD_POOL_NAME, 0, 32, TimeValue.timeValueSeconds(30L));
+    }
+
     @Override
     @Override
     public void reload(Settings settings) {
     public void reload(Settings settings) {
         // secure settings should be readable
         // secure settings should be readable

+ 54 - 44
plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java

@@ -20,16 +20,13 @@
 package org.elasticsearch.repositories.azure;
 package org.elasticsearch.repositories.azure;
 
 
 import com.microsoft.azure.storage.AccessCondition;
 import com.microsoft.azure.storage.AccessCondition;
-import com.microsoft.azure.storage.BatchException;
 import com.microsoft.azure.storage.CloudStorageAccount;
 import com.microsoft.azure.storage.CloudStorageAccount;
-import com.microsoft.azure.storage.Constants;
 import com.microsoft.azure.storage.OperationContext;
 import com.microsoft.azure.storage.OperationContext;
 import com.microsoft.azure.storage.RetryExponentialRetry;
 import com.microsoft.azure.storage.RetryExponentialRetry;
 import com.microsoft.azure.storage.RetryPolicy;
 import com.microsoft.azure.storage.RetryPolicy;
 import com.microsoft.azure.storage.RetryPolicyFactory;
 import com.microsoft.azure.storage.RetryPolicyFactory;
 import com.microsoft.azure.storage.StorageErrorCodeStrings;
 import com.microsoft.azure.storage.StorageErrorCodeStrings;
 import com.microsoft.azure.storage.StorageException;
 import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.BlobDeleteBatchOperation;
 import com.microsoft.azure.storage.blob.BlobInputStream;
 import com.microsoft.azure.storage.blob.BlobInputStream;
 import com.microsoft.azure.storage.blob.BlobListingDetails;
 import com.microsoft.azure.storage.blob.BlobListingDetails;
 import com.microsoft.azure.storage.blob.BlobProperties;
 import com.microsoft.azure.storage.blob.BlobProperties;
@@ -44,6 +41,7 @@ import com.microsoft.azure.storage.blob.ListBlobItem;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.common.blobstore.BlobMetaData;
 import org.elasticsearch.common.blobstore.BlobMetaData;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.DeleteResult;
@@ -53,6 +51,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsException;
 import org.elasticsearch.common.settings.SettingsException;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStream;
@@ -63,13 +62,13 @@ import java.nio.file.FileAlreadyExistsException;
 import java.security.InvalidKeyException;
 import java.security.InvalidKeyException;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.Set;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import java.util.function.Supplier;
 
 
@@ -188,61 +187,72 @@ public class AzureStorageService {
         });
         });
     }
     }
 
 
-    public void deleteBlobsIgnoringIfNotExists(String account, String container, Collection<String> blobs)
-            throws URISyntaxException, StorageException {
-        logger.trace(() -> new ParameterizedMessage("delete blobs for container [{}], blob [{}]", container, blobs));
+    public void deleteBlob(String account, String container, String blob) throws URISyntaxException, StorageException {
         final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
         final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
         // Container name must be lower case.
         // Container name must be lower case.
         final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
         final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
-        final Iterator<String> blobIterator = blobs.iterator();
-        int currentBatchSize = 0;
-        while (blobIterator.hasNext()) {
-            final BlobDeleteBatchOperation batchDeleteOp = new BlobDeleteBatchOperation();
-            do {
-                batchDeleteOp.addSubOperation(blobContainer.getBlockBlobReference(blobIterator.next()),
-                    DeleteSnapshotsOption.NONE, null, null);
-                ++currentBatchSize;
-            } while (blobIterator.hasNext() && currentBatchSize < Constants.BATCH_MAX_REQUESTS);
-            currentBatchSize = 0;
-            try {
-                SocketAccess.doPrivilegedVoidException(() -> blobContainer.getServiceClient().executeBatch(batchDeleteOp));
-            } catch (BatchException e) {
-                for (StorageException ex : e.getExceptions().values()) {
-                    if (ex.getHttpStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) {
-                        logger.error("Batch exceptions [{}]", e.getExceptions());
-                        throw e;
-                    }
-                }
-            }
-        }
+        logger.trace(() -> new ParameterizedMessage("delete blob for container [{}], blob [{}]", container, blob));
+        SocketAccess.doPrivilegedVoidException(() -> {
+            final CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
+            logger.trace(() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blob));
+            azureBlob.delete(DeleteSnapshotsOption.NONE, null, null, client.v2().get());
+        });
     }
     }
 
 
-    DeleteResult deleteBlobDirectory(String account, String container, String path)
+    DeleteResult deleteBlobDirectory(String account, String container, String path, Executor executor)
             throws URISyntaxException, StorageException, IOException {
             throws URISyntaxException, StorageException, IOException {
         final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
         final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
         final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
         final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
+        final Collection<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
+        final AtomicLong outstanding = new AtomicLong(1L);
+        final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
         final AtomicLong blobsDeleted = new AtomicLong();
         final AtomicLong blobsDeleted = new AtomicLong();
         final AtomicLong bytesDeleted = new AtomicLong();
         final AtomicLong bytesDeleted = new AtomicLong();
-        final List<String> blobsToDelete = new ArrayList<>();
         SocketAccess.doPrivilegedVoidException(() -> {
         SocketAccess.doPrivilegedVoidException(() -> {
-            for (ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
+            for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
                 // uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
                 // uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
                 // this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
                 // this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
                 final String blobPath = blobItem.getUri().getPath().substring(1 + container.length() + 1);
                 final String blobPath = blobItem.getUri().getPath().substring(1 + container.length() + 1);
-                final long len;
-                if (blobItem instanceof CloudBlob) {
-                    len = ((CloudBlob) blobItem).getProperties().getLength();
-                } else {
-                    len = -1L;
-                }
-                blobsToDelete.add(blobPath);
-                blobsDeleted.incrementAndGet();
-                if (len >= 0) {
-                    bytesDeleted.addAndGet(len);
-                }
+                outstanding.incrementAndGet();
+                executor.execute(new AbstractRunnable() {
+                    @Override
+                    protected void doRun() throws Exception {
+                        final long len;
+                        if (blobItem instanceof CloudBlob) {
+                            len = ((CloudBlob) blobItem).getProperties().getLength();
+                        } else {
+                            len = -1L;
+                        }
+                        deleteBlob(account, container, blobPath);
+                        blobsDeleted.incrementAndGet();
+                        if (len >= 0) {
+                            bytesDeleted.addAndGet(len);
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        exceptions.add(e);
+                    }
+
+                    @Override
+                    public void onAfter() {
+                        if (outstanding.decrementAndGet() == 0) {
+                            result.onResponse(null);
+                        }
+                    }
+                });
             }
             }
         });
         });
-        deleteBlobsIgnoringIfNotExists(account, container, blobsToDelete);
+        if (outstanding.decrementAndGet() == 0) {
+            result.onResponse(null);
+        }
+        result.actionGet();
+        if (exceptions.isEmpty() == false) {
+            final IOException ex = new IOException("Deleting directory [" + path + "] failed");
+            exceptions.forEach(ex::addSuppressed);
+            throw ex;
+        }
         return new DeleteResult(blobsDeleted.get(), bytesDeleted.get());
         return new DeleteResult(blobsDeleted.get(), bytesDeleted.get());
     }
     }
 
 

+ 7 - 1
plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java

@@ -44,6 +44,8 @@ import org.elasticsearch.mocksocket.MockHttpServer;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.rest.RestUtils;
 import org.elasticsearch.rest.RestUtils;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.After;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Before;
 
 
@@ -62,6 +64,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Matcher;
 import java.util.regex.Matcher;
@@ -88,9 +91,11 @@ import static org.hamcrest.Matchers.lessThan;
 public class AzureBlobContainerRetriesTests extends ESTestCase {
 public class AzureBlobContainerRetriesTests extends ESTestCase {
 
 
     private HttpServer httpServer;
     private HttpServer httpServer;
+    private ThreadPool threadPool;
 
 
     @Before
     @Before
     public void setUp() throws Exception {
     public void setUp() throws Exception {
+        threadPool = new TestThreadPool(getTestClass().getName(), AzureRepositoryPlugin.executorBuilder());
         httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
         httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
         httpServer.start();
         httpServer.start();
         super.setUp();
         super.setUp();
@@ -100,6 +105,7 @@ public class AzureBlobContainerRetriesTests extends ESTestCase {
     public void tearDown() throws Exception {
     public void tearDown() throws Exception {
         httpServer.stop(0);
         httpServer.stop(0);
         super.tearDown();
         super.tearDown();
+        ThreadPool.terminate(threadPool, 10L, TimeUnit.SECONDS);
     }
     }
 
 
     private BlobContainer createBlobContainer(final int maxRetries) {
     private BlobContainer createBlobContainer(final int maxRetries) {
@@ -139,7 +145,7 @@ public class AzureBlobContainerRetriesTests extends ESTestCase {
                 .put(ACCOUNT_SETTING.getKey(), clientName)
                 .put(ACCOUNT_SETTING.getKey(), clientName)
                 .build());
                 .build());
 
 
-        return new AzureBlobContainer(BlobPath.cleanPath(), new AzureBlobStore(repositoryMetaData, service));
+        return new AzureBlobContainer(BlobPath.cleanPath(), new AzureBlobStore(repositoryMetaData, service, threadPool), threadPool);
     }
     }
 
 
     public void testReadNonexistentBlobThrowsNoSuchFileException() {
     public void testReadNonexistentBlobThrowsNoSuchFileException() {

+ 1 - 1
plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java

@@ -64,7 +64,7 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
 
 
     @Override
     @Override
     protected Map<String, HttpHandler> createHttpHandlers() {
     protected Map<String, HttpHandler> createHttpHandlers() {
-        return Collections.singletonMap("/", new AzureBlobStoreHttpHandler("container"));
+        return Collections.singletonMap("/container", new AzureBlobStoreHttpHandler("container"));
     }
     }
 
 
     @Override
     @Override

+ 1 - 1
test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpFixture.java

@@ -30,7 +30,7 @@ public class AzureHttpFixture {
 
 
     private AzureHttpFixture(final String address, final int port, final String container) throws IOException {
     private AzureHttpFixture(final String address, final int port, final String container) throws IOException {
         this.server = HttpServer.create(new InetSocketAddress(InetAddress.getByName(address), port), 0);
         this.server = HttpServer.create(new InetSocketAddress(InetAddress.getByName(address), port), 0);
-        server.createContext("/", new AzureHttpHandler(container));
+        server.createContext("/" + container, new AzureHttpHandler(container));
     }
     }
 
 
     private void start() throws Exception {
     private void start() throws Exception {

+ 4 - 51
test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java

@@ -22,21 +22,16 @@ import com.sun.net.httpserver.Headers;
 import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpHandler;
 import com.sun.net.httpserver.HttpHandler;
 import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.SuppressForbidden;
-import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.io.Streams;
-import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.rest.RestUtils;
 import org.elasticsearch.rest.RestUtils;
 
 
-import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
 import java.nio.charset.StandardCharsets;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashMap;
@@ -72,21 +67,18 @@ public class AzureHttpHandler implements HttpHandler {
             assert read == -1 : "Request body should have been empty but saw [" + read + "]";
             assert read == -1 : "Request body should have been empty but saw [" + read + "]";
         }
         }
         try {
         try {
-            // Request body is closed in the finally block
-            final BytesReference requestBody = Streams.readFully(Streams.noCloseStream(exchange.getRequestBody()));
             if (Regex.simpleMatch("PUT /" + container + "/*blockid=*", request)) {
             if (Regex.simpleMatch("PUT /" + container + "/*blockid=*", request)) {
                 // Put Block (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block)
                 // Put Block (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block)
                 final Map<String, String> params = new HashMap<>();
                 final Map<String, String> params = new HashMap<>();
                 RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
                 RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
 
 
                 final String blockId = params.get("blockid");
                 final String blockId = params.get("blockid");
-                blobs.put(blockId, requestBody);
+                blobs.put(blockId, Streams.readFully(exchange.getRequestBody()));
                 exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
                 exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
 
 
             } else if (Regex.simpleMatch("PUT /" + container + "/*comp=blocklist*", request)) {
             } else if (Regex.simpleMatch("PUT /" + container + "/*comp=blocklist*", request)) {
                 // Put Block List (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list)
                 // Put Block List (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list)
-                final String blockList =
-                    Streams.copyToString(new InputStreamReader(requestBody.streamInput(), StandardCharsets.UTF_8));
+                final String blockList = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8));
                 final List<String> blockIds = Arrays.stream(blockList.split("<Latest>"))
                 final List<String> blockIds = Arrays.stream(blockList.split("<Latest>"))
                     .filter(line -> line.contains("</Latest>"))
                     .filter(line -> line.contains("</Latest>"))
                     .map(line -> line.substring(0, line.indexOf("</Latest>")))
                     .map(line -> line.substring(0, line.indexOf("</Latest>")))
@@ -105,12 +97,12 @@ public class AzureHttpHandler implements HttpHandler {
                 // PUT Blob (see https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob)
                 // PUT Blob (see https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob)
                 final String ifNoneMatch = exchange.getRequestHeaders().getFirst("If-None-Match");
                 final String ifNoneMatch = exchange.getRequestHeaders().getFirst("If-None-Match");
                 if ("*".equals(ifNoneMatch)) {
                 if ("*".equals(ifNoneMatch)) {
-                    if (blobs.putIfAbsent(exchange.getRequestURI().getPath(), requestBody) != null) {
+                    if (blobs.putIfAbsent(exchange.getRequestURI().getPath(), Streams.readFully(exchange.getRequestBody())) != null) {
                         sendError(exchange, RestStatus.CONFLICT);
                         sendError(exchange, RestStatus.CONFLICT);
                         return;
                         return;
                     }
                     }
                 } else {
                 } else {
-                    blobs.put(exchange.getRequestURI().getPath(), requestBody);
+                    blobs.put(exchange.getRequestURI().getPath(), Streams.readFully(exchange.getRequestBody()));
                 }
                 }
                 exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
                 exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
 
 
@@ -198,45 +190,6 @@ public class AzureHttpHandler implements HttpHandler {
                 exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
                 exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
                 exchange.getResponseBody().write(response);
                 exchange.getResponseBody().write(response);
 
 
-            } else if (Regex.simpleMatch("POST /?comp=batch", request)) {
-                // Batch Delete (https://docs.microsoft.com/en-us/rest/api/storageservices/blob-batch)
-                try (BufferedReader reader = new BufferedReader(new InputStreamReader(requestBody.streamInput()))) {
-                    final Set<String> toDelete = reader.lines().filter(l -> l.startsWith("DELETE"))
-                        .map(l -> l.split(" ")[1]).collect(Collectors.toSet());
-                    final BytesStreamOutput baos = new BytesStreamOutput();
-                    final String batchSeparator = "batchresponse_" + UUIDs.randomBase64UUID();
-                    try (Writer writer = new OutputStreamWriter(baos)) {
-                        int contentId = 0;
-                        for (String b : toDelete) {
-                            writer.write("\r\n--" + batchSeparator + "\r\n" +
-                                "Content-Type: application/http \r\n" +
-                                "Content-ID: " + contentId++ + " \r\n");
-                            if (blobs.remove(b) == null) {
-                                writer.write("\r\nHTTP/1.1 404 The specified blob does not exist. \r\n" +
-                                    "x-ms-error-code: BlobNotFound \r\n" +
-                                    "x-ms-request-id: " + UUIDs.randomBase64UUID() + " \r\n" +
-                                    "x-ms-version: 2018-11-09\r\n" +
-                                    "Content-Length: 216 \r\n" +
-                                    "Content-Type: application/xml\r\n\r\n" +
-                                    "<?xml version=\"1.0\" encoding=\"utf-8\"?> \r\n" +
-                                    "<Error><Code>BlobNotFound</Code><Message>The specified blob does not exist.\r\n" +
-                                    "RequestId:" + UUIDs.randomBase64UUID() + "\r\n" +
-                                    "Time:2020-01-01T01:01:01.0000000Z</Message></Error>\r\n");
-                            } else {
-                                writer.write(
-                                    "\r\nHTTP/1.1 202 Accepted \r\n" +
-                                        "x-ms-request-id: " + UUIDs.randomBase64UUID() + " \r\n" +
-                                        "x-ms-version: 2018-11-09\r\n\r\n");
-                            }
-                        }
-                        writer.write("--" + batchSeparator + "--");
-                    }
-                    final Headers headers = exchange.getResponseHeaders();
-                    headers.add("Content-Type",
-                        "multipart/mixed; boundary=" + batchSeparator);
-                    exchange.sendResponseHeaders(RestStatus.ACCEPTED.getStatus(), baos.size());
-                    baos.bytes().writeTo(exchange.getResponseBody());
-                }
             } else {
             } else {
                 sendError(exchange, RestStatus.BAD_REQUEST);
                 sendError(exchange, RestStatus.BAD_REQUEST);
             }
             }