gcs.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. package gcs
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "net/http"
  7. "strconv"
  8. "strings"
  9. "cloud.google.com/go/storage"
  10. "github.com/pkg/errors"
  11. "google.golang.org/api/option"
  12. raw "google.golang.org/api/storage/v1"
  13. htransport "google.golang.org/api/transport/http"
  14. "github.com/imgproxy/imgproxy/v3/fetcher/transport/common"
  15. "github.com/imgproxy/imgproxy/v3/fetcher/transport/notmodified"
  16. "github.com/imgproxy/imgproxy/v3/httpheaders"
  17. "github.com/imgproxy/imgproxy/v3/httprange"
  18. "github.com/imgproxy/imgproxy/v3/ierrors"
  19. )
  20. // For tests
  21. var noAuth bool = false
  22. type transport struct {
  23. client *storage.Client
  24. qsSeparator string
  25. }
  26. func buildHTTPClient(config *Config, trans *http.Transport, opts ...option.ClientOption) (*http.Client, error) {
  27. if err := config.Validate(); err != nil {
  28. return nil, err
  29. }
  30. htrans, err := htransport.NewTransport(context.Background(), trans, opts...)
  31. if err != nil {
  32. return nil, errors.Wrap(err, "error creating GCS transport")
  33. }
  34. return &http.Client{Transport: htrans}, nil
  35. }
  36. func New(config *Config, trans *http.Transport, sep string) (http.RoundTripper, error) {
  37. var client *storage.Client
  38. opts := []option.ClientOption{
  39. option.WithScopes(raw.DevstorageReadOnlyScope),
  40. }
  41. if len(config.Key) > 0 {
  42. opts = append(opts, option.WithCredentialsJSON([]byte(config.Key)))
  43. }
  44. if len(config.Endpoint) > 0 {
  45. opts = append(opts, option.WithEndpoint(config.Endpoint))
  46. }
  47. if noAuth {
  48. opts = append(opts, option.WithoutAuthentication())
  49. }
  50. httpClient, err := buildHTTPClient(config, trans, opts...)
  51. if err != nil {
  52. return nil, err
  53. }
  54. opts = append(opts, option.WithHTTPClient(httpClient))
  55. client, err = storage.NewClient(context.Background(), opts...)
  56. if err != nil {
  57. return nil, ierrors.Wrap(err, 0, ierrors.WithPrefix("Can't create GCS client"))
  58. }
  59. return transport{client, sep}, nil
  60. }
  61. func (t transport) RoundTrip(req *http.Request) (*http.Response, error) {
  62. bucket, key, query := common.GetBucketAndKey(req.URL, t.qsSeparator)
  63. if len(bucket) == 0 || len(key) == 0 {
  64. body := strings.NewReader("Invalid GCS URL: bucket name or object key is empty")
  65. return &http.Response{
  66. StatusCode: http.StatusNotFound,
  67. Proto: "HTTP/1.0",
  68. ProtoMajor: 1,
  69. ProtoMinor: 0,
  70. Header: http.Header{"Content-Type": {"text/plain"}},
  71. ContentLength: int64(body.Len()),
  72. Body: io.NopCloser(body),
  73. Close: false,
  74. Request: req,
  75. }, nil
  76. }
  77. bkt := t.client.Bucket(bucket)
  78. obj := bkt.Object(key)
  79. if g, err := strconv.ParseInt(query, 10, 64); err == nil && g > 0 {
  80. obj = obj.Generation(g)
  81. }
  82. var (
  83. reader *storage.Reader
  84. statusCode int
  85. size int64
  86. )
  87. header := make(http.Header)
  88. if r := req.Header.Get(httpheaders.Range); len(r) != 0 {
  89. start, end, err := httprange.Parse(r)
  90. if err != nil {
  91. return httprange.InvalidHTTPRangeResponse(req), nil
  92. }
  93. if end != 0 {
  94. length := end - start + 1
  95. if end < 0 {
  96. length = -1
  97. }
  98. reader, err = obj.NewRangeReader(req.Context(), start, length)
  99. if err != nil {
  100. return nil, err
  101. }
  102. if end < 0 || end >= reader.Attrs.Size {
  103. end = reader.Attrs.Size - 1
  104. }
  105. size = end - reader.Attrs.StartOffset + 1
  106. statusCode = http.StatusPartialContent
  107. header.Set(httpheaders.ContentRange, fmt.Sprintf("bytes %d-%d/%d", reader.Attrs.StartOffset, end, reader.Attrs.Size))
  108. }
  109. }
  110. // We haven't initialize reader yet, this means that we need non-ranged reader
  111. if reader == nil {
  112. attrs, aerr := obj.Attrs(req.Context())
  113. if aerr != nil {
  114. return handleError(req, aerr)
  115. }
  116. header.Set(httpheaders.Etag, attrs.Etag)
  117. header.Set(httpheaders.LastModified, attrs.Updated.Format(http.TimeFormat))
  118. if resp := notmodified.Response(req, header); resp != nil {
  119. return resp, nil
  120. }
  121. var err error
  122. reader, err = obj.NewReader(req.Context())
  123. if err != nil {
  124. return handleError(req, err)
  125. }
  126. statusCode = 200
  127. size = reader.Attrs.Size
  128. }
  129. header.Set(httpheaders.AcceptRanges, "bytes")
  130. header.Set(httpheaders.ContentLength, strconv.Itoa(int(size)))
  131. header.Set(httpheaders.ContentType, reader.Attrs.ContentType)
  132. header.Set(httpheaders.CacheControl, reader.Attrs.CacheControl)
  133. return &http.Response{
  134. StatusCode: statusCode,
  135. Proto: "HTTP/1.0",
  136. ProtoMajor: 1,
  137. ProtoMinor: 0,
  138. Header: header,
  139. ContentLength: reader.Attrs.Size,
  140. Body: reader,
  141. Close: true,
  142. Request: req,
  143. }, nil
  144. }
  145. func handleError(req *http.Request, err error) (*http.Response, error) {
  146. if err != storage.ErrBucketNotExist && err != storage.ErrObjectNotExist {
  147. return nil, err
  148. }
  149. return &http.Response{
  150. StatusCode: http.StatusNotFound,
  151. Proto: "HTTP/1.0",
  152. ProtoMajor: 1,
  153. ProtoMinor: 0,
  154. Header: http.Header{httpheaders.ContentType: {"text/plain"}},
  155. ContentLength: int64(len(err.Error())),
  156. Body: io.NopCloser(strings.NewReader(err.Error())),
  157. Close: false,
  158. Request: req,
  159. }, nil
  160. }