Browse Source

Streamline S3 Repository- and Client-Settings (#37393)

* Make repository settings override static settings
* Cache clients according to settings
   * Introduce custom implementations for the AWS credentials here to be able to use them as part of a hash key
Armin Braun 6 years ago
parent
commit
57823c484f

+ 26 - 0
docs/plugins/repository-s3.asciidoc

@@ -221,6 +221,32 @@ The following settings are supported:
     currently supported by the plugin. For more information about the
     different classes, see http://docs.aws.amazon.com/AmazonS3/latest/dev/storage-class-intro.html[AWS Storage Classes Guide]
 
+NOTE: The option of defining client settings in the repository settings as documented below is considered deprecated:
+
+In addition to the above settings, you may also specify all non-secure client settings in the repository settings.
+In this case, the client settings found in the repository settings will be merged with those of the named client used by the repository.
+Conflicts between client and repository settings are resolved by the repository settings taking precedence over client settings.
+
+For example:
+
+[source,js]
+----
+PUT _snapshot/my_s3_repository
+{
+  "type": "s3",
+  "settings": {
+    "client": "my_client_name",
+    "bucket": "my_bucket_name",
+    "endpoint": "my.s3.endpoint"
+  }
+}
+----
+// CONSOLE
+// TEST[skip:we don't have s3 set up while testing this]
+
+This sets up a repository that uses all client settings from the client `my_client_named` except for the `endpoint` that is overridden
+to `my.s3.endpoint` by the repository settings.
+
 [[repository-s3-permissions]]
 ===== Recommended S3 Permissions
 

+ 62 - 0
plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BasicCredentials.java

@@ -0,0 +1,62 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.repositories.s3;
+
+import com.amazonaws.auth.AWSCredentials;
+
+import java.util.Objects;
+
+class S3BasicCredentials implements AWSCredentials {
+
+    private final String accessKey;
+
+    private final String secretKey;
+
+    S3BasicCredentials(String accessKey, String secretKey) {
+        this.accessKey = accessKey;
+        this.secretKey = secretKey;
+    }
+
+    @Override
+    public final String getAWSAccessKeyId() {
+        return accessKey;
+    }
+
+    @Override
+    public final String getAWSSecretKey() {
+        return secretKey;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final S3BasicCredentials that = (S3BasicCredentials) o;
+        return accessKey.equals(that.accessKey) && secretKey.equals(that.secretKey);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(accessKey, secretKey);
+    }
+}

+ 57 - 0
plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BasicSessionCredentials.java

@@ -0,0 +1,57 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.repositories.s3;
+
+import com.amazonaws.auth.AWSSessionCredentials;
+
+import java.util.Objects;
+
+final class S3BasicSessionCredentials extends S3BasicCredentials implements AWSSessionCredentials {
+
+    private final String sessionToken;
+
+    S3BasicSessionCredentials(String accessKey, String secretKey, String sessionToken) {
+        super(accessKey, secretKey);
+        this.sessionToken = sessionToken;
+    }
+
+    @Override
+    public String getSessionToken() {
+        return sessionToken;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final S3BasicSessionCredentials that = (S3BasicSessionCredentials) o;
+        return sessionToken.equals(that.sessionToken) &&
+            getAWSAccessKeyId().equals(that.getAWSAccessKeyId()) &&
+            getAWSSecretKey().equals(that.getAWSSecretKey());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(sessionToken, getAWSAccessKeyId(), getAWSSecretKey());
+    }
+}

+ 8 - 6
plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java

@@ -25,6 +25,7 @@ import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.amazonaws.services.s3.model.StorageClass;
+import org.elasticsearch.cluster.metadata.RepositoryMetaData;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
@@ -39,8 +40,6 @@ class S3BlobStore implements BlobStore {
 
     private final S3Service service;
 
-    private final String clientName;
-
     private final String bucket;
 
     private final ByteSizeValue bufferSize;
@@ -51,15 +50,18 @@ class S3BlobStore implements BlobStore {
 
     private final StorageClass storageClass;
 
-    S3BlobStore(S3Service service, String clientName, String bucket, boolean serverSideEncryption,
-                ByteSizeValue bufferSize, String cannedACL, String storageClass) {
+    private final RepositoryMetaData repositoryMetaData;
+
+    S3BlobStore(S3Service service, String bucket, boolean serverSideEncryption,
+                ByteSizeValue bufferSize, String cannedACL, String storageClass,
+                RepositoryMetaData repositoryMetaData) {
         this.service = service;
-        this.clientName = clientName;
         this.bucket = bucket;
         this.serverSideEncryption = serverSideEncryption;
         this.bufferSize = bufferSize;
         this.cannedACL = initCannedACL(cannedACL);
         this.storageClass = initStorageClass(storageClass);
+        this.repositoryMetaData = repositoryMetaData;
     }
 
     @Override
@@ -68,7 +70,7 @@ class S3BlobStore implements BlobStore {
     }
 
     public AmazonS3Reference clientReference() {
-        return service.client(clientName);
+        return service.client(repositoryMetaData);
     }
 
     public String bucket() {

+ 96 - 30
plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3ClientSettings.java

@@ -21,9 +21,6 @@ package org.elasticsearch.repositories.s3;
 
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.Protocol;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.auth.BasicSessionCredentials;
 import org.elasticsearch.cluster.metadata.RepositoryMetaData;
 import org.elasticsearch.common.settings.SecureSetting;
 import org.elasticsearch.common.settings.SecureString;
@@ -36,6 +33,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
 /**
@@ -46,6 +44,9 @@ final class S3ClientSettings {
     // prefix for s3 client settings
     private static final String PREFIX = "s3.client.";
 
+    /** Placeholder client name for normalizing client settings in the repository settings. */
+    private static final String PLACEHOLDER_CLIENT = "placeholder";
+
     /** The access key (ie login id) for connecting to s3. */
     static final Setting.AffixSetting<SecureString> ACCESS_KEY_SETTING = Setting.affixKeySetting(PREFIX, "access_key",
         key -> SecureSetting.secureString(key, null));
@@ -95,7 +96,7 @@ final class S3ClientSettings {
         key -> Setting.boolSetting(key, ClientConfiguration.DEFAULT_THROTTLE_RETRIES, Property.NodeScope));
 
     /** Credentials to authenticate with s3. */
-    final AWSCredentials credentials;
+    final S3BasicCredentials credentials;
 
     /** The s3 endpoint the client should talk to, or empty string to use the default. */
     final String endpoint;
@@ -126,7 +127,7 @@ final class S3ClientSettings {
     /** Whether the s3 client should use an exponential backoff retry policy. */
     final boolean throttleRetries;
 
-    protected S3ClientSettings(AWSCredentials credentials, String endpoint, Protocol protocol,
+    private S3ClientSettings(S3BasicCredentials credentials, String endpoint, Protocol protocol,
                              String proxyHost, int proxyPort, String proxyUsername, String proxyPassword,
                              int readTimeoutMillis, int maxRetries, boolean throttleRetries) {
         this.credentials = credentials;
@@ -141,6 +142,51 @@ final class S3ClientSettings {
         this.throttleRetries = throttleRetries;
     }
 
+    /**
+     * Overrides the settings in this instance with settings found in repository metadata.
+     *
+     * @param metadata RepositoryMetaData
+     * @return S3ClientSettings
+     */
+    S3ClientSettings refine(RepositoryMetaData metadata) {
+        final Settings repoSettings = metadata.settings();
+        // Normalize settings to placeholder client settings prefix so that we can use the affix settings directly
+        final Settings normalizedSettings =
+            Settings.builder().put(repoSettings).normalizePrefix(PREFIX + PLACEHOLDER_CLIENT + '.').build();
+        final String newEndpoint = getRepoSettingOrDefault(ENDPOINT_SETTING, normalizedSettings, endpoint);
+
+        final Protocol newProtocol = getRepoSettingOrDefault(PROTOCOL_SETTING, normalizedSettings, protocol);
+        final String newProxyHost = getRepoSettingOrDefault(PROXY_HOST_SETTING, normalizedSettings, proxyHost);
+        final int newProxyPort = getRepoSettingOrDefault(PROXY_PORT_SETTING, normalizedSettings, proxyPort);
+        final int newReadTimeoutMillis = Math.toIntExact(
+            getRepoSettingOrDefault(READ_TIMEOUT_SETTING, normalizedSettings, TimeValue.timeValueMillis(readTimeoutMillis)).millis());
+        final int newMaxRetries = getRepoSettingOrDefault(MAX_RETRIES_SETTING, normalizedSettings, maxRetries);
+        final boolean newThrottleRetries = getRepoSettingOrDefault(USE_THROTTLE_RETRIES_SETTING, normalizedSettings, throttleRetries);
+        final S3BasicCredentials newCredentials;
+        if (checkDeprecatedCredentials(repoSettings)) {
+            newCredentials = loadDeprecatedCredentials(repoSettings);
+        } else {
+            newCredentials = credentials;
+        }
+        if (Objects.equals(endpoint, newEndpoint) && protocol == newProtocol && Objects.equals(proxyHost, newProxyHost)
+            && proxyPort == newProxyPort && newReadTimeoutMillis == readTimeoutMillis && maxRetries == newMaxRetries
+            && newThrottleRetries == throttleRetries && Objects.equals(credentials, newCredentials)) {
+            return this;
+        }
+        return new S3ClientSettings(
+            newCredentials,
+            newEndpoint,
+            newProtocol,
+            newProxyHost,
+            newProxyPort,
+            proxyUsername,
+            proxyPassword,
+            newReadTimeoutMillis,
+            newMaxRetries,
+            newThrottleRetries
+        );
+    }
+
     /**
      * Load all client settings from the given settings.
      *
@@ -175,24 +221,24 @@ final class S3ClientSettings {
     }
 
     // backcompat for reading keys out of repository settings (clusterState)
-    static BasicAWSCredentials loadDeprecatedCredentials(Settings repositorySettings) {
+    private static S3BasicCredentials loadDeprecatedCredentials(Settings repositorySettings) {
         assert checkDeprecatedCredentials(repositorySettings);
         try (SecureString key = S3Repository.ACCESS_KEY_SETTING.get(repositorySettings);
                 SecureString secret = S3Repository.SECRET_KEY_SETTING.get(repositorySettings)) {
-            return new BasicAWSCredentials(key.toString(), secret.toString());
+            return new S3BasicCredentials(key.toString(), secret.toString());
         }
     }
 
-    static AWSCredentials loadCredentials(Settings settings, String clientName) {
+    private static S3BasicCredentials loadCredentials(Settings settings, String clientName) {
         try (SecureString accessKey = getConfigValue(settings, clientName, ACCESS_KEY_SETTING);
              SecureString secretKey = getConfigValue(settings, clientName, SECRET_KEY_SETTING);
              SecureString sessionToken = getConfigValue(settings, clientName, SESSION_TOKEN_SETTING)) {
             if (accessKey.length() != 0) {
                 if (secretKey.length() != 0) {
                     if (sessionToken.length() != 0) {
-                        return new BasicSessionCredentials(accessKey.toString(), secretKey.toString(), sessionToken.toString());
+                        return new S3BasicSessionCredentials(accessKey.toString(), secretKey.toString(), sessionToken.toString());
                     } else {
-                        return new BasicAWSCredentials(accessKey.toString(), secretKey.toString());
+                        return new S3BasicCredentials(accessKey.toString(), secretKey.toString());
                     }
                 } else {
                     throw new IllegalArgumentException("Missing secret key for s3 client [" + clientName + "]");
@@ -212,34 +258,48 @@ final class S3ClientSettings {
     // pkg private for tests
     /** Parse settings for a single client. */
     static S3ClientSettings getClientSettings(final Settings settings, final String clientName) {
-        final AWSCredentials credentials = S3ClientSettings.loadCredentials(settings, clientName);
-        return getClientSettings(settings, clientName, credentials);
-    }
-
-    static S3ClientSettings getClientSettings(final Settings settings, final String clientName, final AWSCredentials credentials) {
         try (SecureString proxyUsername = getConfigValue(settings, clientName, PROXY_USERNAME_SETTING);
              SecureString proxyPassword = getConfigValue(settings, clientName, PROXY_PASSWORD_SETTING)) {
             return new S3ClientSettings(
-                    credentials,
-                    getConfigValue(settings, clientName, ENDPOINT_SETTING),
-                    getConfigValue(settings, clientName, PROTOCOL_SETTING),
-                    getConfigValue(settings, clientName, PROXY_HOST_SETTING),
-                    getConfigValue(settings, clientName, PROXY_PORT_SETTING),
-                    proxyUsername.toString(),
-                    proxyPassword.toString(),
-                    Math.toIntExact(getConfigValue(settings, clientName, READ_TIMEOUT_SETTING).millis()),
-                    getConfigValue(settings, clientName, MAX_RETRIES_SETTING),
-                    getConfigValue(settings, clientName, USE_THROTTLE_RETRIES_SETTING)
+                S3ClientSettings.loadCredentials(settings, clientName),
+                getConfigValue(settings, clientName, ENDPOINT_SETTING),
+                getConfigValue(settings, clientName, PROTOCOL_SETTING),
+                getConfigValue(settings, clientName, PROXY_HOST_SETTING),
+                getConfigValue(settings, clientName, PROXY_PORT_SETTING),
+                proxyUsername.toString(),
+                proxyPassword.toString(),
+                Math.toIntExact(getConfigValue(settings, clientName, READ_TIMEOUT_SETTING).millis()),
+                getConfigValue(settings, clientName, MAX_RETRIES_SETTING),
+                getConfigValue(settings, clientName, USE_THROTTLE_RETRIES_SETTING)
             );
         }
     }
 
-    static S3ClientSettings getClientSettings(final RepositoryMetaData metadata, final AWSCredentials credentials) {
-        final Settings.Builder builder = Settings.builder();
-        for (final String key : metadata.settings().keySet()) {
-            builder.put(PREFIX + "provided" + "." + key, metadata.settings().get(key));
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
         }
-        return getClientSettings(builder.build(), "provided", credentials);
+        final S3ClientSettings that = (S3ClientSettings) o;
+        return proxyPort == that.proxyPort &&
+            readTimeoutMillis == that.readTimeoutMillis &&
+            maxRetries == that.maxRetries &&
+            throttleRetries == that.throttleRetries &&
+            Objects.equals(credentials, that.credentials) &&
+            Objects.equals(endpoint, that.endpoint) &&
+            protocol == that.protocol &&
+            Objects.equals(proxyHost, that.proxyHost) &&
+            Objects.equals(proxyUsername, that.proxyUsername) &&
+            Objects.equals(proxyPassword, that.proxyPassword);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(credentials, endpoint, protocol, proxyHost, proxyPort, proxyUsername, proxyPassword,
+            readTimeoutMillis, maxRetries, throttleRetries);
     }
 
     private static <T> T getConfigValue(Settings settings, String clientName,
@@ -248,4 +308,10 @@ final class S3ClientSettings {
         return concreteSetting.get(settings);
     }
 
+    private static <T> T getRepoSettingOrDefault(Setting.AffixSetting<T> setting, Settings normalizedSettings, T defaultValue) {
+        if (setting.getConcreteSettingForNamespace(PLACEHOLDER_CLIENT).exists(normalizedSettings)) {
+            return getConfigValue(normalizedSettings, PLACEHOLDER_CLIENT, setting);
+        }
+        return defaultValue;
+    }
 }

+ 4 - 46
plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java

@@ -19,7 +19,6 @@
 
 package org.elasticsearch.repositories.s3;
 
-import com.amazonaws.auth.BasicAWSCredentials;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.LogManager;
 import org.elasticsearch.cluster.metadata.RepositoryMetaData;
@@ -148,8 +147,6 @@ class S3Repository extends BlobStoreRepository {
      */
     static final Setting<String> BASE_PATH_SETTING = Setting.simpleString("base_path");
 
-    private final Settings settings;
-
     private final S3Service service;
 
     private final String bucket;
@@ -168,9 +165,7 @@ class S3Repository extends BlobStoreRepository {
 
     private final String cannedACL;
 
-    private final String clientName;
-
-    private final AmazonS3Reference reference;
+    private final RepositoryMetaData repositoryMetaData;
 
     /**
      * Constructs an s3 backed repository
@@ -180,9 +175,10 @@ class S3Repository extends BlobStoreRepository {
                  final NamedXContentRegistry namedXContentRegistry,
                  final S3Service service) {
         super(metadata, settings, namedXContentRegistry);
-        this.settings = settings;
         this.service = service;
 
+        this.repositoryMetaData = metadata;
+
         // Parse and validate the user's S3 Storage Class setting
         this.bucket = BUCKET_SETTING.get(metadata.settings());
         if (bucket == null) {
@@ -211,24 +207,10 @@ class S3Repository extends BlobStoreRepository {
         this.storageClass = STORAGE_CLASS_SETTING.get(metadata.settings());
         this.cannedACL = CANNED_ACL_SETTING.get(metadata.settings());
 
-        this.clientName = CLIENT_NAME.get(metadata.settings());
-
-        if (CLIENT_NAME.exists(metadata.settings()) && S3ClientSettings.checkDeprecatedCredentials(metadata.settings())) {
-            logger.warn(
-                    "ignoring use of named client [{}] for repository [{}] as insecure credentials were specified",
-                    clientName,
-                    metadata.name());
-        }
-
         if (S3ClientSettings.checkDeprecatedCredentials(metadata.settings())) {
             // provided repository settings
             deprecationLogger.deprecated("Using s3 access/secret key from repository settings. Instead "
                     + "store these in named clients and the elasticsearch keystore for secure settings.");
-            final BasicAWSCredentials insecureCredentials = S3ClientSettings.loadDeprecatedCredentials(metadata.settings());
-            final S3ClientSettings s3ClientSettings = S3ClientSettings.getClientSettings(metadata, insecureCredentials);
-            this.reference = new AmazonS3Reference(service.buildClient(s3ClientSettings));
-        } else {
-            reference = null;
         }
 
         logger.debug(
@@ -243,21 +225,7 @@ class S3Repository extends BlobStoreRepository {
 
     @Override
     protected S3BlobStore createBlobStore() {
-        if (reference != null) {
-            assert S3ClientSettings.checkDeprecatedCredentials(metadata.settings()) : metadata.name();
-            return new S3BlobStore(service, clientName, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass) {
-                @Override
-                public AmazonS3Reference clientReference() {
-                    if (reference.tryIncRef()) {
-                        return reference;
-                    } else {
-                        throw new IllegalStateException("S3 client is closed");
-                    }
-                }
-            };
-        } else {
-            return new S3BlobStore(service, clientName, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass);
-        }
+        return new S3BlobStore(service, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass, repositoryMetaData);
     }
 
     // only use for testing
@@ -286,14 +254,4 @@ class S3Repository extends BlobStoreRepository {
     protected ByteSizeValue chunkSize() {
         return chunkSize;
     }
-
-    @Override
-    protected void doClose() {
-        if (reference != null) {
-            assert S3ClientSettings.checkDeprecatedCredentials(metadata.settings()) : metadata.name();
-            reference.decRef();
-        }
-        super.doClose();
-    }
-
 }

+ 73 - 31
plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java

@@ -22,18 +22,20 @@ package org.elasticsearch.repositories.s3;
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper;
 import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.http.IdleConnectionReaper;
-import com.amazonaws.internal.StaticCredentialsProvider;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.internal.Constants;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.elasticsearch.cluster.metadata.RepositoryMetaData;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.MapBuilder;
+import org.elasticsearch.common.settings.Settings;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -43,56 +45,96 @@ import static java.util.Collections.emptyMap;
 
 
 class S3Service implements Closeable {
-    
     private static final Logger logger = LogManager.getLogger(S3Service.class);
 
-    private volatile Map<String, AmazonS3Reference> clientsCache = emptyMap();
-    private volatile Map<String, S3ClientSettings> clientsSettings = emptyMap();
+    private volatile Map<S3ClientSettings, AmazonS3Reference> clientsCache = emptyMap();
+
+    /**
+     * Client settings calculated from static configuration and settings in the keystore.
+     */
+    private volatile Map<String, S3ClientSettings> staticClientSettings = MapBuilder.<String, S3ClientSettings>newMapBuilder()
+        .put("default", S3ClientSettings.getClientSettings(Settings.EMPTY, "default")).immutableMap();
+
+    /**
+     * Client settings derived from those in {@link #staticClientSettings} by combining them with settings
+     * in the {@link RepositoryMetaData}.
+     */
+    private volatile Map<S3ClientSettings, Map<RepositoryMetaData, S3ClientSettings>> derivedClientSettings = emptyMap();
 
     /**
      * Refreshes the settings for the AmazonS3 clients and clears the cache of
      * existing clients. New clients will be build using these new settings. Old
      * clients are usable until released. On release they will be destroyed instead
-     * to being returned to the cache.
+     * of being returned to the cache.
      */
-    public synchronized Map<String, S3ClientSettings> refreshAndClearCache(Map<String, S3ClientSettings> clientsSettings) {
+    public synchronized void refreshAndClearCache(Map<String, S3ClientSettings> clientsSettings) {
         // shutdown all unused clients
         // others will shutdown on their respective release
         releaseCachedClients();
-        final Map<String, S3ClientSettings> prevSettings = this.clientsSettings;
-        this.clientsSettings = MapBuilder.newMapBuilder(clientsSettings).immutableMap();
-        assert this.clientsSettings.containsKey("default") : "always at least have 'default'";
-        // clients are built lazily by {@link client(String)}
-        return prevSettings;
+        this.staticClientSettings = MapBuilder.newMapBuilder(clientsSettings).immutableMap();
+        derivedClientSettings = emptyMap();
+        assert this.staticClientSettings.containsKey("default") : "always at least have 'default'";
+        // clients are built lazily by {@link client}
     }
 
     /**
-     * Attempts to retrieve a client by name from the cache. If the client does not
-     * exist it will be created.
+     * Attempts to retrieve a client by its repository metadata and settings from the cache.
+     * If the client does not exist it will be created.
      */
-    public AmazonS3Reference client(String clientName) {
-        AmazonS3Reference clientReference = clientsCache.get(clientName);
-        if ((clientReference != null) && clientReference.tryIncRef()) {
-            return clientReference;
-        }
-        synchronized (this) {
-            clientReference = clientsCache.get(clientName);
-            if ((clientReference != null) && clientReference.tryIncRef()) {
+    public AmazonS3Reference client(RepositoryMetaData repositoryMetaData) {
+        final S3ClientSettings clientSettings = settings(repositoryMetaData);
+        {
+            final AmazonS3Reference clientReference = clientsCache.get(clientSettings);
+            if (clientReference != null && clientReference.tryIncRef()) {
                 return clientReference;
             }
-            final S3ClientSettings clientSettings = clientsSettings.get(clientName);
-            if (clientSettings == null) {
-                throw new IllegalArgumentException("Unknown s3 client name [" + clientName + "]. Existing client configs: "
-                        + Strings.collectionToDelimitedString(clientsSettings.keySet(), ","));
+        }
+        synchronized (this) {
+            final AmazonS3Reference existing = clientsCache.get(clientSettings);
+            if (existing != null && existing.tryIncRef()) {
+                return existing;
             }
-            logger.debug("creating S3 client with client_name [{}], endpoint [{}]", clientName, clientSettings.endpoint);
-            clientReference = new AmazonS3Reference(buildClient(clientSettings));
+            final AmazonS3Reference clientReference = new AmazonS3Reference(buildClient(clientSettings));
             clientReference.incRef();
-            clientsCache = MapBuilder.newMapBuilder(clientsCache).put(clientName, clientReference).immutableMap();
+            clientsCache = MapBuilder.newMapBuilder(clientsCache).put(clientSettings, clientReference).immutableMap();
             return clientReference;
         }
     }
 
+    /**
+     * Either fetches {@link S3ClientSettings} for a given {@link RepositoryMetaData} from cached settings or creates them
+     * by overriding static client settings from {@link #staticClientSettings} with settings found in the repository metadata.
+     * @param repositoryMetaData Repository Metadata
+     * @return S3ClientSettings
+     */
+    private S3ClientSettings settings(RepositoryMetaData repositoryMetaData) {
+        final String clientName = S3Repository.CLIENT_NAME.get(repositoryMetaData.settings());
+        final S3ClientSettings staticSettings = staticClientSettings.get(clientName);
+        if (staticSettings != null) {
+            {
+                final S3ClientSettings existing = derivedClientSettings.getOrDefault(staticSettings, emptyMap()).get(repositoryMetaData);
+                if (existing != null) {
+                    return existing;
+                }
+            }
+            synchronized (this) {
+                final Map<RepositoryMetaData, S3ClientSettings> derivedSettings =
+                    derivedClientSettings.getOrDefault(staticSettings, emptyMap());
+                final S3ClientSettings existing = derivedSettings.get(repositoryMetaData);
+                if (existing != null) {
+                    return existing;
+                }
+                final S3ClientSettings newSettings = staticSettings.refine(repositoryMetaData);
+                derivedClientSettings = MapBuilder.newMapBuilder(derivedClientSettings).put(
+                    staticSettings, MapBuilder.newMapBuilder(derivedSettings).put(repositoryMetaData, newSettings).immutableMap()
+                ).immutableMap();
+                return newSettings;
+            }
+        }
+        throw new IllegalArgumentException("Unknown s3 client name [" + clientName + "]. Existing client configs: "
+            + Strings.collectionToDelimitedString(staticClientSettings.keySet(), ","));
+    }
+
     // proxy for testing
     AmazonS3 buildClient(final S3ClientSettings clientSettings) {
         final AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard();
@@ -141,17 +183,17 @@ class S3Service implements Closeable {
 
     // pkg private for tests
     static AWSCredentialsProvider buildCredentials(Logger logger, S3ClientSettings clientSettings) {
-        final AWSCredentials credentials = clientSettings.credentials;
+        final S3BasicCredentials credentials = clientSettings.credentials;
         if (credentials == null) {
             logger.debug("Using instance profile credentials");
             return new PrivilegedInstanceProfileCredentialsProvider();
         } else {
             logger.debug("Using basic key/secret credentials");
-            return new StaticCredentialsProvider(credentials);
+            return new AWSStaticCredentialsProvider(credentials);
         }
     }
 
-    protected synchronized void releaseCachedClients() {
+    private synchronized void releaseCachedClients() {
         // the clients will shutdown when they will not be used anymore
         for (final AmazonS3Reference clientReference : clientsCache.values()) {
             clientReference.decRef();

+ 3 - 3
plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/AwsS3ServiceImplTests.java

@@ -22,7 +22,7 @@ package org.elasticsearch.repositories.s3;
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.Protocol;
 import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
 
 import org.elasticsearch.common.settings.MockSecureSettings;
 import org.elasticsearch.common.settings.Settings;
@@ -61,7 +61,7 @@ public class AwsS3ServiceImplTests extends ESTestCase {
             final String clientName = clientNamePrefix + i;
             final S3ClientSettings someClientSettings = allClientsSettings.get(clientName);
             final AWSCredentialsProvider credentialsProvider = S3Service.buildCredentials(logger, someClientSettings);
-            assertThat(credentialsProvider, instanceOf(StaticCredentialsProvider.class));
+            assertThat(credentialsProvider, instanceOf(AWSStaticCredentialsProvider.class));
             assertThat(credentialsProvider.getCredentials().getAWSAccessKeyId(), is(clientName + "_aws_access_key"));
             assertThat(credentialsProvider.getCredentials().getAWSSecretKey(), is(clientName + "_aws_secret_key"));
         }
@@ -83,7 +83,7 @@ public class AwsS3ServiceImplTests extends ESTestCase {
         // test default exists and is an Instance provider
         final S3ClientSettings defaultClientSettings = allClientsSettings.get("default");
         final AWSCredentialsProvider defaultCredentialsProvider = S3Service.buildCredentials(logger, defaultClientSettings);
-        assertThat(defaultCredentialsProvider, instanceOf(StaticCredentialsProvider.class));
+        assertThat(defaultCredentialsProvider, instanceOf(AWSStaticCredentialsProvider.class));
         assertThat(defaultCredentialsProvider.getCredentials().getAWSAccessKeyId(), is(awsAccessKey));
         assertThat(defaultCredentialsProvider.getCredentials().getAWSSecretKey(), is(awsSecretKey));
     }

+ 0 - 2
plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java

@@ -59,7 +59,6 @@ public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCa
 
     private static final ConcurrentMap<String, byte[]> blobs = new ConcurrentHashMap<>();
     private static String bucket;
-    private static String client;
     private static ByteSizeValue bufferSize;
     private static boolean serverSideEncryption;
     private static String cannedACL;
@@ -68,7 +67,6 @@ public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCa
     @BeforeClass
     public static void setUpRepositorySettings() {
         bucket = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
-        client = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
         bufferSize = new ByteSizeValue(randomIntBetween(5, 50), ByteSizeUnit.MB);
         serverSideEncryption = randomBoolean();
         if (randomBoolean()) {

+ 5 - 4
plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreTests.java

@@ -22,8 +22,10 @@ package org.elasticsearch.repositories.s3;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
 import com.amazonaws.services.s3.model.StorageClass;
+import org.elasticsearch.cluster.metadata.RepositoryMetaData;
 import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.blobstore.BlobStoreException;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.repositories.ESBlobStoreTestCase;
@@ -114,15 +116,14 @@ public class S3BlobStoreTests extends ESBlobStoreTestCase {
             storageClass = randomValueOtherThan(StorageClass.Glacier, () -> randomFrom(StorageClass.values())).toString();
         }
 
-        final String theClientName = randomAlphaOfLength(4);
         final AmazonS3 client = new MockAmazonS3(new ConcurrentHashMap<>(), bucket, serverSideEncryption, cannedACL, storageClass);
         final S3Service service = new S3Service() {
             @Override
-            public synchronized AmazonS3Reference client(String clientName) {
-                assert theClientName.equals(clientName);
+            public synchronized AmazonS3Reference client(RepositoryMetaData repositoryMetaData) {
                 return new AmazonS3Reference(client);
             }
         };
-        return new S3BlobStore(service, theClientName, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass);
+        return new S3BlobStore(service, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass,
+            new RepositoryMetaData(bucket, "s3", Settings.EMPTY));
     }
 }

+ 28 - 4
plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3ClientSettingsTests.java

@@ -21,8 +21,7 @@ package org.elasticsearch.repositories.s3;
 
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.Protocol;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.auth.BasicSessionCredentials;
+import org.elasticsearch.cluster.metadata.RepositoryMetaData;
 import org.elasticsearch.common.settings.MockSecureSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.test.ESTestCase;
@@ -103,7 +102,7 @@ public class S3ClientSettingsTests extends ESTestCase {
         secureSettings.setString("s3.client.default.secret_key", "secret_key");
         final Map<String, S3ClientSettings> settings = S3ClientSettings.load(Settings.builder().setSecureSettings(secureSettings).build());
         final S3ClientSettings defaultSettings = settings.get("default");
-        BasicAWSCredentials credentials = (BasicAWSCredentials) defaultSettings.credentials;
+        S3BasicCredentials credentials = defaultSettings.credentials;
         assertThat(credentials.getAWSAccessKeyId(), is("access_key"));
         assertThat(credentials.getAWSSecretKey(), is("secret_key"));
     }
@@ -115,9 +114,34 @@ public class S3ClientSettingsTests extends ESTestCase {
         secureSettings.setString("s3.client.default.session_token", "session_token");
         final Map<String, S3ClientSettings> settings = S3ClientSettings.load(Settings.builder().setSecureSettings(secureSettings).build());
         final S3ClientSettings defaultSettings = settings.get("default");
-        BasicSessionCredentials credentials = (BasicSessionCredentials) defaultSettings.credentials;
+        S3BasicSessionCredentials credentials = (S3BasicSessionCredentials) defaultSettings.credentials;
         assertThat(credentials.getAWSAccessKeyId(), is("access_key"));
         assertThat(credentials.getAWSSecretKey(), is("secret_key"));
         assertThat(credentials.getSessionToken(), is("session_token"));
     }
+
+    public void testRefineWithRepoSettings() {
+        final MockSecureSettings secureSettings = new MockSecureSettings();
+        secureSettings.setString("s3.client.default.access_key", "access_key");
+        secureSettings.setString("s3.client.default.secret_key", "secret_key");
+        secureSettings.setString("s3.client.default.session_token", "session_token");
+        final S3ClientSettings baseSettings = S3ClientSettings.load(
+            Settings.builder().setSecureSettings(secureSettings).build()).get("default");
+
+        {
+            final S3ClientSettings refinedSettings = baseSettings.refine(new RepositoryMetaData("name", "type", Settings.EMPTY));
+            assertTrue(refinedSettings == baseSettings);
+        }
+
+        {
+            final String endpoint = "some.host";
+            final S3ClientSettings refinedSettings = baseSettings.refine(new RepositoryMetaData("name", "type",
+                Settings.builder().put("endpoint", endpoint).build()));
+            assertThat(refinedSettings.endpoint, is(endpoint));
+            S3BasicSessionCredentials credentials = (S3BasicSessionCredentials) refinedSettings.credentials;
+            assertThat(credentials.getAWSAccessKeyId(), is("access_key"));
+            assertThat(credentials.getAWSSecretKey(), is("secret_key"));
+            assertThat(credentials.getSessionToken(), is("session_token"));
+        }
+    }
 }

+ 2 - 4
plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java

@@ -29,7 +29,6 @@ import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.test.ESTestCase;
 import org.hamcrest.Matchers;
 
-import java.util.Collections;
 import java.util.Map;
 
 import static org.hamcrest.Matchers.containsString;
@@ -49,13 +48,12 @@ public class S3RepositoryTests extends ESTestCase {
 
     private static class DummyS3Service extends S3Service {
         @Override
-        public AmazonS3Reference client(String clientName) {
+        public AmazonS3Reference client(RepositoryMetaData repositoryMetaData) {
             return new AmazonS3Reference(new DummyS3Client());
         }
 
         @Override
-        public Map<String, S3ClientSettings> refreshAndClearCache(Map<String, S3ClientSettings> clientsSettings) {
-            return Collections.emptyMap();
+        public void refreshAndClearCache(Map<String, S3ClientSettings> clientsSettings) {
         }
 
         @Override

+ 80 - 0
plugins/repository-s3/src/test/resources/rest-api-spec/test/repository_s3/20_repository_permanent_credentials.yml

@@ -29,6 +29,86 @@ setup:
         snapshot: snapshot-two
         ignore: 404
 
+---
+"Try to create repository with broken endpoint override and named client":
+
+  # Register repository with broken endpoint setting
+  - do:
+      catch: /repository_verification_exception/
+      snapshot.create_repository:
+        repository: repository_broken
+        body:
+          type: s3
+          settings:
+            bucket: ${permanent_bucket}
+            client: integration_test_permanent
+            base_path: "${permanent_base_path}"
+            endpoint: 127.0.0.1:5
+            canned_acl: private
+            storage_class: standard
+
+  # Turn of verification to be able to create the repo with broken endpoint setting
+  - do:
+      snapshot.create_repository:
+        verify: false
+        repository: repository_broken
+        body:
+          type: s3
+          settings:
+            bucket: ${permanent_bucket}
+            client: integration_test_permanent
+            base_path: "${permanent_base_path}"
+            endpoint: 127.0.0.1:5
+            canned_acl: private
+            storage_class: standard
+
+  # Index documents
+  - do:
+      bulk:
+        refresh: true
+        body:
+          - index:
+              _index: docs
+              _type:  doc
+              _id:    1
+          - snapshot: one
+          - index:
+              _index: docs
+              _type:  doc
+              _id:    2
+          - snapshot: one
+          - index:
+              _index: docs
+              _type:  doc
+              _id:    3
+          - snapshot: one
+
+  - do:
+      count:
+        index: docs
+
+  - match: {count: 3}
+
+  # Creating snapshot with broken repo should fail
+  - do:
+      catch: /repository_exception/
+      snapshot.create:
+        repository: repository_broken
+        snapshot: snapshot-one
+        wait_for_completion: true
+
+  # Creating snapshot with existing working repository should work
+  - do:
+      snapshot.create:
+        repository: repository_permanent
+        snapshot: snapshot-one
+        wait_for_completion: true
+
+  - match: { snapshot.snapshot: snapshot-one }
+  - match: { snapshot.state: SUCCESS }
+  - match: { snapshot.include_global_state: true }
+  - match: { snapshot.shards.failed: 0 }
+
 ---
 "Snapshot and Restore with repository-s3 using permanent credentials":