瀏覽代碼

Add disable_chunked_encoding Setting to S3 Repo (#44052)

* Add disable_chunked_encoding setting to S3 repo plugin to support S3 implementations that don't support chunked encoding
Jinhu Wu 6 年之前
父節點
當前提交
6d70276af1

+ 11 - 0
docs/plugins/repository-s3.asciidoc

@@ -159,6 +159,17 @@ https://aws.amazon.com/blogs/aws/amazon-s3-path-deprecation-plan-the-rest-of-the
 path style access pattern. If your deployment requires the path style access
 path style access pattern. If your deployment requires the path style access
 pattern then you should set this setting to `true` when upgrading.
 pattern then you should set this setting to `true` when upgrading.
 
 
+`disable_chunked_encoding`::
+
+    Whether chunked encoding should be disabled or not. If `false`, chunked
+    encoding is enabled and will be used where appropriate. If `true`, chunked
+    encoding is disabled and will not be used, which may mean that snapshot
+    operations consume more resources and take longer to complete. It should
+    only be set to `true` if you are using a storage service that does not
+    support chunked encoding. See the
+    https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/s3/AmazonS3Builder.html#disableChunkedEncoding--[AWS
+    Java SDK documentation] for details. Defaults to `false`.
+
 [float]
 [float]
 [[repository-s3-compatible-services]]
 [[repository-s3-compatible-services]]
 ===== S3-compatible services
 ===== S3-compatible services

+ 6 - 2
plugins/repository-s3/build.gradle

@@ -101,6 +101,8 @@ String s3EC2BasePath = System.getenv("amazon_s3_base_path_ec2")
 String s3ECSBucket = System.getenv("amazon_s3_bucket_ecs")
 String s3ECSBucket = System.getenv("amazon_s3_bucket_ecs")
 String s3ECSBasePath = System.getenv("amazon_s3_base_path_ecs")
 String s3ECSBasePath = System.getenv("amazon_s3_base_path_ecs")
 
 
+boolean s3DisableChunkedEncoding = (new Random(Long.parseUnsignedLong(project.rootProject.testSeed.tokenize(':').get(0), 16))).nextBoolean()
+
 // If all these variables are missing then we are testing against the internal fixture instead, which has the following
 // If all these variables are missing then we are testing against the internal fixture instead, which has the following
 // credentials hard-coded in.
 // credentials hard-coded in.
 
 
@@ -229,7 +231,8 @@ task s3FixtureProperties {
       "s3Fixture.temporary_key"          : s3TemporaryAccessKey,
       "s3Fixture.temporary_key"          : s3TemporaryAccessKey,
       "s3Fixture.temporary_session_token": s3TemporarySessionToken,
       "s3Fixture.temporary_session_token": s3TemporarySessionToken,
       "s3Fixture.ec2_bucket_name"        : s3EC2Bucket,
       "s3Fixture.ec2_bucket_name"        : s3EC2Bucket,
-      "s3Fixture.ecs_bucket_name"        : s3ECSBucket
+      "s3Fixture.ecs_bucket_name"        : s3ECSBucket,
+      "s3Fixture.disableChunkedEncoding" : s3DisableChunkedEncoding
   ]
   ]
 
 
   doLast {
   doLast {
@@ -257,7 +260,8 @@ processTestResources {
           'ec2_bucket': s3EC2Bucket,
           'ec2_bucket': s3EC2Bucket,
           'ec2_base_path': s3EC2BasePath,
           'ec2_base_path': s3EC2BasePath,
           'ecs_bucket': s3ECSBucket,
           'ecs_bucket': s3ECSBucket,
-          'ecs_base_path': s3ECSBasePath
+          'ecs_base_path': s3ECSBasePath,
+          'disable_chunked_encoding': s3DisableChunkedEncoding,
   ]
   ]
   inputs.properties(expansions)
   inputs.properties(expansions)
   MavenFilteringHack.filter(it, expansions)
   MavenFilteringHack.filter(it, expansions)

+ 20 - 6
plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3ClientSettings.java

@@ -99,6 +99,10 @@ final class S3ClientSettings {
     static final Setting.AffixSetting<Boolean> USE_PATH_STYLE_ACCESS = Setting.affixKeySetting(PREFIX, "path_style_access",
     static final Setting.AffixSetting<Boolean> USE_PATH_STYLE_ACCESS = Setting.affixKeySetting(PREFIX, "path_style_access",
         key -> Setting.boolSetting(key, false, Property.NodeScope));
         key -> Setting.boolSetting(key, false, Property.NodeScope));
 
 
+    /** Whether chunked encoding should be disabled or not (Default is false). */
+    static final Setting.AffixSetting<Boolean> DISABLE_CHUNKED_ENCODING = Setting.affixKeySetting(PREFIX, "disable_chunked_encoding",
+        key -> Setting.boolSetting(key, false, Property.NodeScope));
+
     /** Credentials to authenticate with s3. */
     /** Credentials to authenticate with s3. */
     final S3BasicCredentials credentials;
     final S3BasicCredentials credentials;
 
 
@@ -134,10 +138,13 @@ final class S3ClientSettings {
     /** Whether the s3 client should use path style access. */
     /** Whether the s3 client should use path style access. */
     final boolean pathStyleAccess;
     final boolean pathStyleAccess;
 
 
+    /** Whether chunked encoding should be disabled or not. */
+    final boolean disableChunkedEncoding;
+
     private S3ClientSettings(S3BasicCredentials credentials, String endpoint, Protocol protocol,
     private S3ClientSettings(S3BasicCredentials credentials, String endpoint, Protocol protocol,
                              String proxyHost, int proxyPort, String proxyUsername, String proxyPassword,
                              String proxyHost, int proxyPort, String proxyUsername, String proxyPassword,
                              int readTimeoutMillis, int maxRetries, boolean throttleRetries,
                              int readTimeoutMillis, int maxRetries, boolean throttleRetries,
-                             boolean pathStyleAccess) {
+                             boolean pathStyleAccess, boolean disableChunkedEncoding) {
         this.credentials = credentials;
         this.credentials = credentials;
         this.endpoint = endpoint;
         this.endpoint = endpoint;
         this.protocol = protocol;
         this.protocol = protocol;
@@ -149,6 +156,7 @@ final class S3ClientSettings {
         this.maxRetries = maxRetries;
         this.maxRetries = maxRetries;
         this.throttleRetries = throttleRetries;
         this.throttleRetries = throttleRetries;
         this.pathStyleAccess = pathStyleAccess;
         this.pathStyleAccess = pathStyleAccess;
+        this.disableChunkedEncoding = disableChunkedEncoding;
     }
     }
 
 
     /**
     /**
@@ -172,6 +180,8 @@ final class S3ClientSettings {
         final int newMaxRetries = getRepoSettingOrDefault(MAX_RETRIES_SETTING, normalizedSettings, maxRetries);
         final int newMaxRetries = getRepoSettingOrDefault(MAX_RETRIES_SETTING, normalizedSettings, maxRetries);
         final boolean newThrottleRetries = getRepoSettingOrDefault(USE_THROTTLE_RETRIES_SETTING, normalizedSettings, throttleRetries);
         final boolean newThrottleRetries = getRepoSettingOrDefault(USE_THROTTLE_RETRIES_SETTING, normalizedSettings, throttleRetries);
         final boolean usePathStyleAccess = getRepoSettingOrDefault(USE_PATH_STYLE_ACCESS, normalizedSettings, pathStyleAccess);
         final boolean usePathStyleAccess = getRepoSettingOrDefault(USE_PATH_STYLE_ACCESS, normalizedSettings, pathStyleAccess);
+        final boolean newDisableChunkedEncoding = getRepoSettingOrDefault(
+            DISABLE_CHUNKED_ENCODING, normalizedSettings, disableChunkedEncoding);
         final S3BasicCredentials newCredentials;
         final S3BasicCredentials newCredentials;
         if (checkDeprecatedCredentials(repoSettings)) {
         if (checkDeprecatedCredentials(repoSettings)) {
             newCredentials = loadDeprecatedCredentials(repoSettings);
             newCredentials = loadDeprecatedCredentials(repoSettings);
@@ -180,7 +190,8 @@ final class S3ClientSettings {
         }
         }
         if (Objects.equals(endpoint, newEndpoint) && protocol == newProtocol && Objects.equals(proxyHost, newProxyHost)
         if (Objects.equals(endpoint, newEndpoint) && protocol == newProtocol && Objects.equals(proxyHost, newProxyHost)
             && proxyPort == newProxyPort && newReadTimeoutMillis == readTimeoutMillis && maxRetries == newMaxRetries
             && proxyPort == newProxyPort && newReadTimeoutMillis == readTimeoutMillis && maxRetries == newMaxRetries
-            && newThrottleRetries == throttleRetries && Objects.equals(credentials, newCredentials)) {
+            && newThrottleRetries == throttleRetries && Objects.equals(credentials, newCredentials)
+            && newDisableChunkedEncoding == disableChunkedEncoding) {
             return this;
             return this;
         }
         }
         return new S3ClientSettings(
         return new S3ClientSettings(
@@ -194,7 +205,8 @@ final class S3ClientSettings {
             newReadTimeoutMillis,
             newReadTimeoutMillis,
             newMaxRetries,
             newMaxRetries,
             newThrottleRetries,
             newThrottleRetries,
-            usePathStyleAccess
+            usePathStyleAccess,
+            newDisableChunkedEncoding
         );
         );
     }
     }
 
 
@@ -282,7 +294,8 @@ final class S3ClientSettings {
                 Math.toIntExact(getConfigValue(settings, clientName, READ_TIMEOUT_SETTING).millis()),
                 Math.toIntExact(getConfigValue(settings, clientName, READ_TIMEOUT_SETTING).millis()),
                 getConfigValue(settings, clientName, MAX_RETRIES_SETTING),
                 getConfigValue(settings, clientName, MAX_RETRIES_SETTING),
                 getConfigValue(settings, clientName, USE_THROTTLE_RETRIES_SETTING),
                 getConfigValue(settings, clientName, USE_THROTTLE_RETRIES_SETTING),
-                getConfigValue(settings, clientName, USE_PATH_STYLE_ACCESS)
+                getConfigValue(settings, clientName, USE_PATH_STYLE_ACCESS),
+                getConfigValue(settings, clientName, DISABLE_CHUNKED_ENCODING)
             );
             );
         }
         }
     }
     }
@@ -305,13 +318,14 @@ final class S3ClientSettings {
             protocol == that.protocol &&
             protocol == that.protocol &&
             Objects.equals(proxyHost, that.proxyHost) &&
             Objects.equals(proxyHost, that.proxyHost) &&
             Objects.equals(proxyUsername, that.proxyUsername) &&
             Objects.equals(proxyUsername, that.proxyUsername) &&
-            Objects.equals(proxyPassword, that.proxyPassword);
+            Objects.equals(proxyPassword, that.proxyPassword) &&
+            Objects.equals(disableChunkedEncoding, that.disableChunkedEncoding);
     }
     }
 
 
     @Override
     @Override
     public int hashCode() {
     public int hashCode() {
         return Objects.hash(credentials, endpoint, protocol, proxyHost, proxyPort, proxyUsername, proxyPassword,
         return Objects.hash(credentials, endpoint, protocol, proxyHost, proxyPort, proxyUsername, proxyPassword,
-            readTimeoutMillis, maxRetries, throttleRetries);
+            readTimeoutMillis, maxRetries, throttleRetries, disableChunkedEncoding);
     }
     }
 
 
     private static <T> T getConfigValue(Settings settings, String clientName,
     private static <T> T getConfigValue(Settings settings, String clientName,

+ 3 - 0
plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java

@@ -157,6 +157,9 @@ class S3Service implements Closeable {
         if (clientSettings.pathStyleAccess) {
         if (clientSettings.pathStyleAccess) {
             builder.enablePathStyleAccess();
             builder.enablePathStyleAccess();
         }
         }
+        if (clientSettings.disableChunkedEncoding) {
+            builder.disableChunkedEncoding();
+        }
         return builder.build();
         return builder.build();
     }
     }
 
 

+ 23 - 7
plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/AmazonS3Fixture.java

@@ -27,6 +27,7 @@ import org.elasticsearch.common.TriFunction;
 import org.elasticsearch.common.io.PathUtils;
 import org.elasticsearch.common.io.PathUtils;
 import org.elasticsearch.test.fixture.AbstractHttpFixture;
 import org.elasticsearch.test.fixture.AbstractHttpFixture;
 import com.amazonaws.util.DateUtils;
 import com.amazonaws.util.DateUtils;
+import com.amazonaws.util.IOUtils;
 
 
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.io.Streams;
@@ -75,6 +76,7 @@ public class AmazonS3Fixture extends AbstractHttpFixture {
     /** Request handlers for the requests made by the S3 client **/
     /** Request handlers for the requests made by the S3 client **/
     private final PathTrie<RequestHandler> handlers;
     private final PathTrie<RequestHandler> handlers;
 
 
+    private final boolean disableChunkedEncoding;
     /**
     /**
      * Creates a {@link AmazonS3Fixture}
      * Creates a {@link AmazonS3Fixture}
      */
      */
@@ -92,6 +94,8 @@ public class AmazonS3Fixture extends AbstractHttpFixture {
             randomAsciiAlphanumOfLength(random, 10), randomAsciiAlphanumOfLength(random, 10));
             randomAsciiAlphanumOfLength(random, 10), randomAsciiAlphanumOfLength(random, 10));
 
 
         this.handlers = defaultHandlers(buckets, ec2Bucket, ecsBucket);
         this.handlers = defaultHandlers(buckets, ec2Bucket, ecsBucket);
+
+        this.disableChunkedEncoding = Boolean.parseBoolean(prop(properties, "s3Fixture.disableChunkedEncoding"));
     }
     }
 
 
     private static String nonAuthPath(Request request) {
     private static String nonAuthPath(Request request) {
@@ -216,13 +220,16 @@ public class AmazonS3Fixture extends AbstractHttpFixture {
 
 
                 final String destObjectName = objectName(request.getParameters());
                 final String destObjectName = objectName(request.getParameters());
 
 
-                // This is a chunked upload request. We should have the header "Content-Encoding : aws-chunked,gzip"
-                // to detect it but it seems that the AWS SDK does not follow the S3 guidelines here.
-                //
-                // See https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html
-                //
                 String headerDecodedContentLength = request.getHeader("X-amz-decoded-content-length");
                 String headerDecodedContentLength = request.getHeader("X-amz-decoded-content-length");
                 if (headerDecodedContentLength != null) {
                 if (headerDecodedContentLength != null) {
+                    if (disableChunkedEncoding) {
+                        return newInternalError(request.getId(), "Something is wrong with this PUT request");
+                    }
+                    // This is a chunked upload request. We should have the header "Content-Encoding : aws-chunked,gzip"
+                    // to detect it but it seems that the AWS SDK does not follow the S3 guidelines here.
+                    //
+                    // See https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html
+                    //
                     int contentLength = Integer.valueOf(headerDecodedContentLength);
                     int contentLength = Integer.valueOf(headerDecodedContentLength);
 
 
                     // Chunked requests have a payload like this:
                     // Chunked requests have a payload like this:
@@ -246,9 +253,18 @@ public class AmazonS3Fixture extends AbstractHttpFixture {
                         destBucket.objects.put(destObjectName, bytes);
                         destBucket.objects.put(destObjectName, bytes);
                         return new Response(RestStatus.OK.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE);
                         return new Response(RestStatus.OK.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE);
                     }
                     }
-                }
+                } else {
+                    if (disableChunkedEncoding == false) {
+                        return newInternalError(request.getId(), "Something is wrong with this PUT request");
+                    }
+                    // Read from body directly
+                    try (BufferedInputStream inputStream = new BufferedInputStream(new ByteArrayInputStream(request.getBody()))) {
+                        byte[] bytes = IOUtils.toByteArray(inputStream);
 
 
-                return newInternalError(request.getId(), "Something is wrong with this PUT request");
+                        destBucket.objects.put(destObjectName, bytes);
+                        return new Response(RestStatus.OK.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE);
+                    }
+                }
             })
             })
         );
         );
 
 

+ 7 - 0
plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3ClientSettingsTests.java

@@ -151,4 +151,11 @@ public class S3ClientSettingsTests extends ESTestCase {
         assertThat(settings.get("default").pathStyleAccess, is(false));
         assertThat(settings.get("default").pathStyleAccess, is(false));
         assertThat(settings.get("other").pathStyleAccess, is(true));
         assertThat(settings.get("other").pathStyleAccess, is(true));
     }
     }
+
+    public void testUseChunkedEncodingCanBeSet() {
+        final Map<String, S3ClientSettings> settings = S3ClientSettings.load(
+            Settings.builder().put("s3.client.other.disable_chunked_encoding", true).build());
+        assertThat(settings.get("default").disableChunkedEncoding, is(false));
+        assertThat(settings.get("other").disableChunkedEncoding, is(true));
+    }
 }
 }

+ 1 - 0
plugins/repository-s3/src/test/resources/rest-api-spec/test/repository_s3/20_repository_permanent_credentials.yml

@@ -15,6 +15,7 @@ setup:
             base_path: "${permanent_base_path}"
             base_path: "${permanent_base_path}"
             canned_acl: private
             canned_acl: private
             storage_class: standard
             storage_class: standard
+            disable_chunked_encoding: ${disable_chunked_encoding}
 
 
   # Remove the snapshots, if a previous test failed to delete them. This is
   # Remove the snapshots, if a previous test failed to delete them. This is
   # useful for third party tests that runs the test against a real external service.
   # useful for third party tests that runs the test against a real external service.

+ 1 - 0
plugins/repository-s3/src/test/resources/rest-api-spec/test/repository_s3/30_repository_temporary_credentials.yml

@@ -15,6 +15,7 @@ setup:
             base_path: "${temporary_base_path}"
             base_path: "${temporary_base_path}"
             canned_acl: private
             canned_acl: private
             storage_class: standard
             storage_class: standard
+            disable_chunked_encoding: ${disable_chunked_encoding}
 
 
 ---
 ---
 "Snapshot and Restore with repository-s3 using temporary credentials":
 "Snapshot and Restore with repository-s3 using temporary credentials":

+ 1 - 0
plugins/repository-s3/src/test/resources/rest-api-spec/test/repository_s3/40_repository_ec2_credentials.yml

@@ -15,6 +15,7 @@ setup:
             base_path: "${ec2_base_path}"
             base_path: "${ec2_base_path}"
             canned_acl: private
             canned_acl: private
             storage_class: standard
             storage_class: standard
+            disable_chunked_encoding: ${disable_chunked_encoding}
 
 
 ---
 ---
 "Snapshot and Restore with repository-s3 using ec2 credentials":
 "Snapshot and Restore with repository-s3 using ec2 credentials":

+ 1 - 0
plugins/repository-s3/src/test/resources/rest-api-spec/test/repository_s3/50_repository_ecs_credentials.yml

@@ -15,6 +15,7 @@ setup:
             base_path: "${ecs_base_path}"
             base_path: "${ecs_base_path}"
             canned_acl: private
             canned_acl: private
             storage_class: standard
             storage_class: standard
+            disable_chunked_encoding: ${disable_chunked_encoding}
 
 
 ---
 ---
 "Snapshot and Restore with repository-s3 using ecs credentials":
 "Snapshot and Restore with repository-s3 using ecs credentials":