|
@@ -21,6 +21,7 @@ package org.elasticsearch.cloud.azure.storage;
|
|
|
|
|
|
import com.microsoft.azure.storage.CloudStorageAccount;
|
|
|
import com.microsoft.azure.storage.LocationMode;
|
|
|
+import com.microsoft.azure.storage.OperationContext;
|
|
|
import com.microsoft.azure.storage.RetryExponentialRetry;
|
|
|
import com.microsoft.azure.storage.RetryPolicy;
|
|
|
import com.microsoft.azure.storage.StorageException;
|
|
@@ -29,6 +30,7 @@ import com.microsoft.azure.storage.blob.BlobProperties;
|
|
|
import com.microsoft.azure.storage.blob.CloudBlobClient;
|
|
|
import com.microsoft.azure.storage.blob.CloudBlobContainer;
|
|
|
import com.microsoft.azure.storage.blob.CloudBlockBlob;
|
|
|
+import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
|
|
|
import com.microsoft.azure.storage.blob.ListBlobItem;
|
|
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
|
|
import org.apache.logging.log4j.util.Supplier;
|
|
@@ -131,12 +133,23 @@ public class AzureStorageServiceImpl extends AbstractComponent implements AzureS
|
|
|
return client;
|
|
|
}
|
|
|
|
|
|
+ private OperationContext generateOperationContext(String clientName) {
|
|
|
+ OperationContext context = new OperationContext();
|
|
|
+ AzureStorageSettings azureStorageSettings = this.storageSettings.get(clientName);
|
|
|
+
|
|
|
+ if (azureStorageSettings.getProxy() != null) {
|
|
|
+ context.setProxy(azureStorageSettings.getProxy());
|
|
|
+ }
|
|
|
+
|
|
|
+ return context;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public boolean doesContainerExist(String account, LocationMode mode, String container) {
|
|
|
try {
|
|
|
CloudBlobClient client = this.getSelectedClient(account, mode);
|
|
|
CloudBlobContainer blobContainer = client.getContainerReference(container);
|
|
|
- return SocketAccess.doPrivilegedException(blobContainer::exists);
|
|
|
+ return SocketAccess.doPrivilegedException(() -> blobContainer.exists(null, null, generateOperationContext(account)));
|
|
|
} catch (Exception e) {
|
|
|
logger.error("can not access container [{}]", container);
|
|
|
}
|
|
@@ -148,7 +161,7 @@ public class AzureStorageServiceImpl extends AbstractComponent implements AzureS
|
|
|
CloudBlobClient client = this.getSelectedClient(account, mode);
|
|
|
CloudBlobContainer blobContainer = client.getContainerReference(container);
|
|
|
logger.trace("removing container [{}]", container);
|
|
|
- SocketAccess.doPrivilegedException(blobContainer::deleteIfExists);
|
|
|
+ SocketAccess.doPrivilegedException(() -> blobContainer.deleteIfExists(null, null, generateOperationContext(account)));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -157,7 +170,7 @@ public class AzureStorageServiceImpl extends AbstractComponent implements AzureS
|
|
|
CloudBlobClient client = this.getSelectedClient(account, mode);
|
|
|
CloudBlobContainer blobContainer = client.getContainerReference(container);
|
|
|
logger.trace("creating container [{}]", container);
|
|
|
- SocketAccess.doPrivilegedException(blobContainer::createIfNotExists);
|
|
|
+ SocketAccess.doPrivilegedException(() -> blobContainer.createIfNotExists(null, null, generateOperationContext(account)));
|
|
|
} catch (IllegalArgumentException e) {
|
|
|
logger.trace((Supplier<?>) () -> new ParameterizedMessage("fails creating container [{}]", container), e);
|
|
|
throw new RepositoryException(container, e.getMessage(), e);
|
|
@@ -174,7 +187,8 @@ public class AzureStorageServiceImpl extends AbstractComponent implements AzureS
|
|
|
SocketAccess.doPrivilegedVoidException(() -> {
|
|
|
if (blobContainer.exists()) {
|
|
|
// We list the blobs using a flat blob listing mode
|
|
|
- for (ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
|
|
|
+ for (ListBlobItem blobItem : blobContainer.listBlobs(path, true, EnumSet.noneOf(BlobListingDetails.class), null,
|
|
|
+ generateOperationContext(account))) {
|
|
|
String blobName = blobNameFromUri(blobItem.getUri());
|
|
|
logger.trace("removing blob [{}] full URI was [{}]", blobName, blobItem.getUri());
|
|
|
deleteBlob(account, mode, container, blobName);
|
|
@@ -208,9 +222,9 @@ public class AzureStorageServiceImpl extends AbstractComponent implements AzureS
|
|
|
// Container name must be lower case.
|
|
|
CloudBlobClient client = this.getSelectedClient(account, mode);
|
|
|
CloudBlobContainer blobContainer = client.getContainerReference(container);
|
|
|
- if (SocketAccess.doPrivilegedException(blobContainer::exists)) {
|
|
|
+ if (SocketAccess.doPrivilegedException(() -> blobContainer.exists(null, null, generateOperationContext(account)))) {
|
|
|
CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
|
|
|
- return SocketAccess.doPrivilegedException(azureBlob::exists);
|
|
|
+ return SocketAccess.doPrivilegedException(() -> azureBlob.exists(null, null, generateOperationContext(account)));
|
|
|
}
|
|
|
|
|
|
return false;
|
|
@@ -223,10 +237,11 @@ public class AzureStorageServiceImpl extends AbstractComponent implements AzureS
|
|
|
// Container name must be lower case.
|
|
|
CloudBlobClient client = this.getSelectedClient(account, mode);
|
|
|
CloudBlobContainer blobContainer = client.getContainerReference(container);
|
|
|
- if (SocketAccess.doPrivilegedException(blobContainer::exists)) {
|
|
|
+ if (SocketAccess.doPrivilegedException(() -> blobContainer.exists(null, null, generateOperationContext(account)))) {
|
|
|
logger.trace("container [{}]: blob [{}] found. removing.", container, blob);
|
|
|
CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
|
|
|
- SocketAccess.doPrivilegedVoidException(azureBlob::delete);
|
|
|
+ SocketAccess.doPrivilegedVoidException(() -> azureBlob.delete(DeleteSnapshotsOption.NONE, null, null,
|
|
|
+ generateOperationContext(account)));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -235,7 +250,7 @@ public class AzureStorageServiceImpl extends AbstractComponent implements AzureS
|
|
|
logger.trace("reading container [{}], blob [{}]", container, blob);
|
|
|
CloudBlobClient client = this.getSelectedClient(account, mode);
|
|
|
CloudBlockBlob blockBlobReference = client.getContainerReference(container).getBlockBlobReference(blob);
|
|
|
- return SocketAccess.doPrivilegedException(blockBlobReference::openInputStream);
|
|
|
+ return SocketAccess.doPrivilegedException(() -> blockBlobReference.openInputStream(null, null, generateOperationContext(account)));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -243,7 +258,7 @@ public class AzureStorageServiceImpl extends AbstractComponent implements AzureS
|
|
|
logger.trace("writing container [{}], blob [{}]", container, blob);
|
|
|
CloudBlobClient client = this.getSelectedClient(account, mode);
|
|
|
CloudBlockBlob blockBlobReference = client.getContainerReference(container).getBlockBlobReference(blob);
|
|
|
- return SocketAccess.doPrivilegedException(blockBlobReference::openOutputStream);
|
|
|
+ return SocketAccess.doPrivilegedException(() -> blockBlobReference.openOutputStream(null, null, generateOperationContext(account)));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -260,7 +275,7 @@ public class AzureStorageServiceImpl extends AbstractComponent implements AzureS
|
|
|
SocketAccess.doPrivilegedVoidException(() -> {
|
|
|
if (blobContainer.exists()) {
|
|
|
for (ListBlobItem blobItem : blobContainer.listBlobs(keyPath + (prefix == null ? "" : prefix), false,
|
|
|
- enumBlobListingDetails, null, null)) {
|
|
|
+ enumBlobListingDetails, null, generateOperationContext(account))) {
|
|
|
URI uri = blobItem.getUri();
|
|
|
logger.trace("blob url [{}]", uri);
|
|
|
|
|
@@ -284,11 +299,11 @@ public class AzureStorageServiceImpl extends AbstractComponent implements AzureS
|
|
|
CloudBlobClient client = this.getSelectedClient(account, mode);
|
|
|
CloudBlobContainer blobContainer = client.getContainerReference(container);
|
|
|
CloudBlockBlob blobSource = blobContainer.getBlockBlobReference(sourceBlob);
|
|
|
- if (SocketAccess.doPrivilegedException(blobSource::exists)) {
|
|
|
+ if (SocketAccess.doPrivilegedException(() -> blobSource.exists(null, null, generateOperationContext(account)))) {
|
|
|
CloudBlockBlob blobTarget = blobContainer.getBlockBlobReference(targetBlob);
|
|
|
SocketAccess.doPrivilegedVoidException(() -> {
|
|
|
- blobTarget.startCopy(blobSource);
|
|
|
- blobSource.delete();
|
|
|
+ blobTarget.startCopy(blobSource, null, null, null, generateOperationContext(account));
|
|
|
+ blobSource.delete(DeleteSnapshotsOption.NONE, null, null, generateOperationContext(account));
|
|
|
});
|
|
|
logger.debug("moveBlob container [{}], sourceBlob [{}], targetBlob [{}] -> done", container, sourceBlob, targetBlob);
|
|
|
}
|