handler.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. package stream
  2. import (
  3. "context"
  4. "io"
  5. "net/http"
  6. "sync"
  7. "github.com/imgproxy/imgproxy/v3/cookies"
  8. "github.com/imgproxy/imgproxy/v3/headerwriter"
  9. "github.com/imgproxy/imgproxy/v3/httpheaders"
  10. "github.com/imgproxy/imgproxy/v3/ierrors"
  11. "github.com/imgproxy/imgproxy/v3/imagefetcher"
  12. "github.com/imgproxy/imgproxy/v3/monitoring"
  13. "github.com/imgproxy/imgproxy/v3/monitoring/stats"
  14. "github.com/imgproxy/imgproxy/v3/options"
  15. "github.com/imgproxy/imgproxy/v3/server"
  16. log "github.com/sirupsen/logrus"
  17. )
  // streamBufferSize is the length, in bytes, of every buffer created by streamBufPool.
  const streamBufferSize = 4096

  // categoryStreaming is the error category attached to errors wrapped while streaming.
  const categoryStreaming = "streaming"

  // streamBufPool recycles the scratch buffers used to copy image data.
  // It stores *[]byte values (presumably so that returning a buffer to the pool
  // does not allocate when the value is converted to any).
  var streamBufPool = sync.Pool{
  	New: func() any {
  		scratch := make([]byte, streamBufferSize)
  		return &scratch
  	},
  }
  31. // Handler handles image passthrough requests, allowing images to be streamed directly
  32. type Handler struct {
  33. fetcher *imagefetcher.Fetcher // Fetcher instance to handle image fetching
  34. config *Config // Configuration for the streamer
  35. hwConfig *headerwriter.Config // Configuration for header writing
  36. }
  37. // request holds the parameters and state for a single streaming request
  38. type request struct {
  39. handler *Handler
  40. imageRequest *http.Request
  41. imageURL string
  42. reqID string
  43. po *options.ProcessingOptions
  44. rw http.ResponseWriter
  45. }
  46. // New creates new handler object
  47. func New(config *Config, hwConfig *headerwriter.Config, fetcher *imagefetcher.Fetcher) *Handler {
  48. return &Handler{
  49. fetcher: fetcher,
  50. config: config,
  51. hwConfig: hwConfig,
  52. }
  53. }
  54. // Stream handles the image passthrough request, streaming the image directly to the response writer
  55. func (s *Handler) Execute(
  56. ctx context.Context,
  57. userRequest *http.Request,
  58. imageURL string,
  59. reqID string,
  60. po *options.ProcessingOptions,
  61. rw http.ResponseWriter,
  62. ) error {
  63. stream := &request{
  64. handler: s,
  65. imageRequest: userRequest,
  66. imageURL: imageURL,
  67. reqID: reqID,
  68. po: po,
  69. rw: rw,
  70. }
  71. return stream.execute(ctx)
  72. }
  73. // execute handles the actual streaming logic
  74. func (s *request) execute(ctx context.Context) error {
  75. stats.IncImagesInProgress()
  76. defer stats.DecImagesInProgress()
  77. defer monitoring.StartStreamingSegment(ctx)()
  78. // Passthrough request headers from the original request
  79. requestHeaders := s.getImageRequestHeaders()
  80. cookieJar, err := s.getCookieJar()
  81. if err != nil {
  82. return ierrors.Wrap(err, 0, ierrors.WithCategory(categoryStreaming))
  83. }
  84. // Build the request to fetch the image
  85. r, err := s.handler.fetcher.BuildRequest(ctx, s.imageURL, requestHeaders, cookieJar)
  86. defer r.Cancel()
  87. if err != nil {
  88. return ierrors.Wrap(err, 0, ierrors.WithCategory(categoryStreaming))
  89. }
  90. // Send the request to fetch the image
  91. res, err := r.Send()
  92. if res != nil {
  93. defer res.Body.Close()
  94. }
  95. if err != nil {
  96. return ierrors.Wrap(err, 0, ierrors.WithCategory(categoryStreaming))
  97. }
  98. // Output streaming response headers
  99. hw := headerwriter.New(s.handler.hwConfig, res.Header, s.imageURL)
  100. hw.Passthrough(s.handler.config.PassthroughResponseHeaders) // NOTE: priority? This is lowest as it was
  101. hw.SetContentLength(int(res.ContentLength))
  102. hw.SetCanonical()
  103. hw.SetExpires(s.po.Expires)
  104. hw.Write(s.rw)
  105. // Write Content-Disposition header
  106. s.writeContentDisposition(r.URL().Path, res)
  107. // Copy the status code from the original response
  108. s.rw.WriteHeader(res.StatusCode)
  109. // Write the actual data
  110. s.streamData(res)
  111. return nil
  112. }
  113. // getCookieJar returns non-empty cookie jar if cookie passthrough is enabled
  114. func (s *request) getCookieJar() (http.CookieJar, error) {
  115. if !s.handler.config.CookiePassthrough {
  116. return nil, nil
  117. }
  118. return cookies.JarFromRequest(s.imageRequest)
  119. }
  120. // getImageRequestHeaders returns a new http.Header containing only
  121. // the headers that should be passed through from the user request
  122. func (s *request) getImageRequestHeaders() http.Header {
  123. h := make(http.Header)
  124. for _, key := range s.handler.config.PassthroughRequestHeaders {
  125. values := s.imageRequest.Header.Values(key)
  126. for _, value := range values {
  127. h.Add(key, value)
  128. }
  129. }
  130. return h
  131. }
  132. // writeContentDisposition writes the headers to the response writer
  133. func (s *request) writeContentDisposition(imagePath string, serverResponse *http.Response) {
  134. // Try to set correct Content-Disposition file name and extension
  135. if serverResponse.StatusCode < 200 || serverResponse.StatusCode >= 300 {
  136. return
  137. }
  138. ct := serverResponse.Header.Get(httpheaders.ContentType)
  139. // Try to best guess the file name and extension
  140. cd := httpheaders.ContentDispositionValue(
  141. imagePath,
  142. s.po.Filename,
  143. "",
  144. ct,
  145. s.po.ReturnAttachment,
  146. )
  147. // Write the Content-Disposition header
  148. s.rw.Header().Set(httpheaders.ContentDisposition, cd)
  149. }
  150. // streamData copies the image data from the response body to the response writer
  151. func (s *request) streamData(res *http.Response) {
  152. buf := streamBufPool.Get().(*[]byte)
  153. defer streamBufPool.Put(buf)
  154. _, copyerr := io.CopyBuffer(s.rw, res.Body, *buf)
  155. server.LogResponse(
  156. s.reqID, s.imageRequest, res.StatusCode, nil,
  157. log.Fields{
  158. "image_url": s.imageURL,
  159. "processing_options": s.po,
  160. },
  161. )
  162. // We've got to skip logging here
  163. if copyerr != nil {
  164. panic(http.ErrAbortHandler)
  165. }
  166. }