s3.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. package s3
  2. import (
  3. "context"
  4. "errors"
  5. "io"
  6. "net/http"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. s3Crypto "github.com/aws/amazon-s3-encryption-client-go/v3/client"
  12. s3CryptoMaterials "github.com/aws/amazon-s3-encryption-client-go/v3/materials"
  13. "github.com/aws/aws-sdk-go-v2/aws"
  14. awsHttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
  15. awsConfig "github.com/aws/aws-sdk-go-v2/config"
  16. "github.com/aws/aws-sdk-go-v2/credentials/stscreds"
  17. "github.com/aws/aws-sdk-go-v2/service/kms"
  18. "github.com/aws/aws-sdk-go-v2/service/s3"
  19. "github.com/aws/aws-sdk-go-v2/service/sts"
  20. "github.com/imgproxy/imgproxy/v3/fetcher/transport/common"
  21. "github.com/imgproxy/imgproxy/v3/httpheaders"
  22. "github.com/imgproxy/imgproxy/v3/ierrors"
  23. )
  24. type s3Client interface {
  25. GetObject(ctx context.Context, input *s3.GetObjectInput, opts ...func(*s3.Options)) (*s3.GetObjectOutput, error)
  26. }
  27. // transport implements RoundTripper for the 's3' protocol.
  28. type transport struct {
  29. clientOptions []func(*s3.Options)
  30. defaultClient s3Client
  31. defaultConfig aws.Config
  32. clientsByRegion map[string]s3Client
  33. clientsByBucket map[string]s3Client
  34. mu sync.RWMutex
  35. config *Config
  36. }
  37. func New(config *Config, trans *http.Transport) (http.RoundTripper, error) {
  38. if err := config.Validate(); err != nil {
  39. return nil, err
  40. }
  41. conf, err := awsConfig.LoadDefaultConfig(context.Background())
  42. if err != nil {
  43. return nil, ierrors.Wrap(err, 0, ierrors.WithPrefix("can't load AWS S3 config"))
  44. }
  45. conf.HTTPClient = &http.Client{Transport: trans}
  46. if len(config.Region) != 0 {
  47. conf.Region = config.Region
  48. }
  49. if len(conf.Region) == 0 {
  50. conf.Region = "us-west-1"
  51. }
  52. if len(config.AssumeRoleArn) != 0 {
  53. creds := stscreds.NewAssumeRoleProvider(sts.NewFromConfig(conf), config.AssumeRoleArn, func(o *stscreds.AssumeRoleOptions) {
  54. if len(config.AssumeRoleExternalID) != 0 {
  55. o.ExternalID = aws.String(config.AssumeRoleExternalID)
  56. }
  57. })
  58. conf.Credentials = creds
  59. }
  60. clientOptions := []func(*s3.Options){
  61. func(o *s3.Options) {
  62. o.DisableLogOutputChecksumValidationSkipped = true
  63. },
  64. }
  65. if len(config.Endpoint) != 0 {
  66. endpoint := config.Endpoint
  67. if !strings.HasPrefix(endpoint, "http://") && !strings.HasPrefix(endpoint, "https://") {
  68. endpoint = "http://" + endpoint
  69. }
  70. clientOptions = append(clientOptions, func(o *s3.Options) {
  71. o.BaseEndpoint = aws.String(endpoint)
  72. o.UsePathStyle = config.EndpointUsePathStyle
  73. })
  74. }
  75. client, err := createClient(conf, clientOptions, config)
  76. if err != nil {
  77. return nil, ierrors.Wrap(err, 0, ierrors.WithPrefix("can't create S3 client"))
  78. }
  79. return &transport{
  80. clientOptions: clientOptions,
  81. defaultClient: client,
  82. defaultConfig: conf,
  83. clientsByRegion: map[string]s3Client{conf.Region: client},
  84. clientsByBucket: make(map[string]s3Client),
  85. config: config,
  86. }, nil
  87. }
  88. func (t *transport) RoundTrip(req *http.Request) (*http.Response, error) {
  89. bucket, key, query := common.GetBucketAndKey(req.URL)
  90. if len(bucket) == 0 || len(key) == 0 {
  91. body := strings.NewReader("Invalid S3 URL: bucket name or object key is empty")
  92. return &http.Response{
  93. StatusCode: http.StatusNotFound,
  94. Proto: "HTTP/1.0",
  95. ProtoMajor: 1,
  96. ProtoMinor: 0,
  97. Header: http.Header{httpheaders.ContentType: {"text/plain"}},
  98. ContentLength: int64(body.Len()),
  99. Body: io.NopCloser(body),
  100. Close: false,
  101. Request: req,
  102. }, nil
  103. }
  104. input := &s3.GetObjectInput{
  105. Bucket: aws.String(bucket),
  106. Key: aws.String(key),
  107. }
  108. if len(query) > 0 {
  109. input.VersionId = aws.String(query)
  110. }
  111. statusCode := http.StatusOK
  112. if r := req.Header.Get("Range"); len(r) != 0 {
  113. input.Range = aws.String(r)
  114. } else {
  115. if ifNoneMatch := req.Header.Get("If-None-Match"); len(ifNoneMatch) > 0 {
  116. input.IfNoneMatch = aws.String(ifNoneMatch)
  117. }
  118. if ifModifiedSince := req.Header.Get("If-Modified-Since"); len(ifModifiedSince) > 0 {
  119. parsedIfModifiedSince, err := time.Parse(http.TimeFormat, ifModifiedSince)
  120. if err == nil {
  121. input.IfModifiedSince = &parsedIfModifiedSince
  122. }
  123. }
  124. }
  125. client := t.getBucketClient(bucket)
  126. output, err := client.GetObject(req.Context(), input)
  127. defer func() {
  128. if err != nil && output != nil && output.Body != nil {
  129. output.Body.Close()
  130. }
  131. }()
  132. if err != nil {
  133. // Check if the error is the region mismatch error.
  134. // If so, create a new client with the correct region and retry the request.
  135. if region := regionFromError(err); len(region) != 0 {
  136. client, err = t.createBucketClient(bucket, region)
  137. if err != nil {
  138. return handleError(req, err)
  139. }
  140. output, err = client.GetObject(req.Context(), input)
  141. }
  142. }
  143. if err != nil {
  144. return handleError(req, err)
  145. }
  146. contentLength := int64(-1)
  147. if output.ContentLength != nil {
  148. contentLength = *output.ContentLength
  149. }
  150. if t.config.DecryptionClientEnabled {
  151. if unencryptedContentLength := output.Metadata["X-Amz-Meta-X-Amz-Unencrypted-Content-Length"]; len(unencryptedContentLength) != 0 {
  152. cl, err := strconv.ParseInt(unencryptedContentLength, 10, 64)
  153. if err != nil {
  154. handleError(req, err)
  155. }
  156. contentLength = cl
  157. }
  158. }
  159. header := make(http.Header)
  160. if contentLength > 0 {
  161. header.Set(httpheaders.ContentLength, strconv.FormatInt(contentLength, 10))
  162. }
  163. if output.ContentType != nil {
  164. header.Set(httpheaders.ContentType, *output.ContentType)
  165. }
  166. if output.ContentEncoding != nil {
  167. header.Set(httpheaders.ContentEncoding, *output.ContentEncoding)
  168. }
  169. if output.CacheControl != nil {
  170. header.Set(httpheaders.CacheControl, *output.CacheControl)
  171. }
  172. if output.ExpiresString != nil {
  173. header.Set(httpheaders.Expires, *output.ExpiresString)
  174. }
  175. if output.ETag != nil {
  176. header.Set(httpheaders.Etag, *output.ETag)
  177. }
  178. if output.LastModified != nil {
  179. header.Set(httpheaders.LastModified, output.LastModified.Format(http.TimeFormat))
  180. }
  181. if output.AcceptRanges != nil {
  182. header.Set(httpheaders.AcceptRanges, *output.AcceptRanges)
  183. }
  184. if output.ContentRange != nil {
  185. header.Set(httpheaders.ContentRange, *output.ContentRange)
  186. statusCode = http.StatusPartialContent
  187. }
  188. return &http.Response{
  189. StatusCode: statusCode,
  190. Proto: "HTTP/1.0",
  191. ProtoMajor: 1,
  192. ProtoMinor: 0,
  193. Header: header,
  194. ContentLength: contentLength,
  195. Body: output.Body,
  196. Close: true,
  197. Request: req,
  198. }, nil
  199. }
  200. func (t *transport) getBucketClient(bucket string) s3Client {
  201. var client s3Client
  202. func() {
  203. t.mu.RLock()
  204. defer t.mu.RUnlock()
  205. client = t.clientsByBucket[bucket]
  206. }()
  207. if client != nil {
  208. return client
  209. }
  210. return t.defaultClient
  211. }
  212. func (t *transport) createBucketClient(bucket, region string) (s3Client, error) {
  213. t.mu.Lock()
  214. defer t.mu.Unlock()
  215. // Check again if someone did this before us
  216. if client := t.clientsByBucket[bucket]; client != nil {
  217. return client, nil
  218. }
  219. if client := t.clientsByRegion[region]; client != nil {
  220. t.clientsByBucket[bucket] = client
  221. return client, nil
  222. }
  223. conf := t.defaultConfig.Copy()
  224. conf.Region = region
  225. client, err := createClient(conf, t.clientOptions, t.config)
  226. if err != nil {
  227. return nil, ierrors.Wrap(err, 0, ierrors.WithPrefix("can't create regional S3 client"))
  228. }
  229. t.clientsByRegion[region] = client
  230. t.clientsByBucket[bucket] = client
  231. return client, nil
  232. }
  233. func createClient(conf aws.Config, opts []func(*s3.Options), config *Config) (s3Client, error) {
  234. client := s3.NewFromConfig(conf, opts...)
  235. if config.DecryptionClientEnabled {
  236. kmsClient := kms.NewFromConfig(conf)
  237. keyring := s3CryptoMaterials.NewKmsDecryptOnlyAnyKeyKeyring(kmsClient)
  238. cmm, err := s3CryptoMaterials.NewCryptographicMaterialsManager(keyring)
  239. if err != nil {
  240. return nil, err
  241. }
  242. return s3Crypto.New(client, cmm)
  243. } else {
  244. return client, nil
  245. }
  246. }
  247. func regionFromError(err error) string {
  248. var rerr *awsHttp.ResponseError
  249. if !errors.As(err, &rerr) {
  250. return ""
  251. }
  252. if rerr.Response == nil || rerr.Response.StatusCode != 301 {
  253. return ""
  254. }
  255. return rerr.Response.Header.Get("X-Amz-Bucket-Region")
  256. }
  257. func handleError(req *http.Request, err error) (*http.Response, error) {
  258. var rerr *awsHttp.ResponseError
  259. if !errors.As(err, &rerr) {
  260. return nil, ierrors.Wrap(err, 0)
  261. }
  262. if rerr.Response == nil || rerr.Response.StatusCode < 100 || rerr.Response.StatusCode == 301 {
  263. return nil, ierrors.Wrap(err, 0)
  264. }
  265. return &http.Response{
  266. StatusCode: rerr.Response.StatusCode,
  267. Proto: "HTTP/1.0",
  268. ProtoMajor: 1,
  269. ProtoMinor: 0,
  270. Header: http.Header{"Content-Type": {"text/plain"}},
  271. ContentLength: int64(len(err.Error())),
  272. Body: io.NopCloser(strings.NewReader(err.Error())),
  273. Close: false,
  274. Request: req,
  275. }, nil
  276. }