123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 |
- package stream
- import (
- "context"
- "io"
- "net/http"
- "sync"
- log "github.com/sirupsen/logrus"
- "github.com/imgproxy/imgproxy/v3/cookies"
- "github.com/imgproxy/imgproxy/v3/fetcher"
- "github.com/imgproxy/imgproxy/v3/httpheaders"
- "github.com/imgproxy/imgproxy/v3/ierrors"
- "github.com/imgproxy/imgproxy/v3/monitoring"
- "github.com/imgproxy/imgproxy/v3/monitoring/stats"
- "github.com/imgproxy/imgproxy/v3/options"
- "github.com/imgproxy/imgproxy/v3/server"
- )
const (
	// streamBufferSize is the length, in bytes, of each buffer used for
	// streaming.
	streamBufferSize = 4096

	// categoryStreaming is the error category used for streaming errors.
	categoryStreaming = "streaming"
)
- var (
- // streamBufPool is a sync.Pool for reusing byte slices used for streaming
- streamBufPool = sync.Pool{
- New: func() any {
- buf := make([]byte, streamBufferSize)
- return &buf
- },
- }
- )
- // Handler handles image passthrough requests, allowing images to be streamed directly
- type Handler struct {
- config *Config // Configuration for the streamer
- fetcher *fetcher.Fetcher // Fetcher instance to handle image fetching
- }
- // request holds the parameters and state for a single streaming request
- type request struct {
- handler *Handler
- imageRequest *http.Request
- imageURL string
- reqID string
- po *options.ProcessingOptions
- rw server.ResponseWriter
- }
- // New creates new handler object
- func New(config *Config, fetcher *fetcher.Fetcher) (*Handler, error) {
- if err := config.Validate(); err != nil {
- return nil, err
- }
- return &Handler{
- fetcher: fetcher,
- config: config,
- }, nil
- }
- // Stream handles the image passthrough request, streaming the image directly to the response writer
- func (s *Handler) Execute(
- ctx context.Context,
- userRequest *http.Request,
- imageURL string,
- reqID string,
- po *options.ProcessingOptions,
- rw server.ResponseWriter,
- ) error {
- stream := &request{
- handler: s,
- imageRequest: userRequest,
- imageURL: imageURL,
- reqID: reqID,
- po: po,
- rw: rw,
- }
- return stream.execute(ctx)
- }
- // execute handles the actual streaming logic
- func (s *request) execute(ctx context.Context) error {
- stats.IncImagesInProgress()
- defer stats.DecImagesInProgress()
- defer monitoring.StartStreamingSegment(ctx)()
- // Passthrough request headers from the original request
- requestHeaders := s.getImageRequestHeaders()
- cookieJar, err := s.getCookieJar()
- if err != nil {
- return ierrors.Wrap(err, 0, ierrors.WithCategory(categoryStreaming))
- }
- // Build the request to fetch the image
- r, err := s.handler.fetcher.BuildRequest(ctx, s.imageURL, requestHeaders, cookieJar)
- if r != nil {
- defer r.Cancel()
- }
- if err != nil {
- return ierrors.Wrap(err, 0, ierrors.WithCategory(categoryStreaming))
- }
- // Send the request to fetch the image
- res, err := r.Send()
- if res != nil {
- defer res.Body.Close()
- }
- if err != nil {
- return ierrors.Wrap(err, 0, ierrors.WithCategory(categoryStreaming))
- }
- // Output streaming response headers
- s.rw.SetOriginHeaders(res.Header)
- s.rw.Passthrough(s.handler.config.PassthroughResponseHeaders...) // NOTE: priority? This is lowest as it was
- s.rw.SetContentLength(int(res.ContentLength))
- s.rw.SetCanonical(s.imageURL)
- s.rw.SetExpires(s.po.Expires)
- // Set the Content-Disposition header
- s.setContentDisposition(r.URL().Path, res)
- // Copy the status code from the original response
- s.rw.WriteHeader(res.StatusCode)
- // Write the actual data
- s.streamData(res)
- return nil
- }
- // getCookieJar returns non-empty cookie jar if cookie passthrough is enabled
- func (s *request) getCookieJar() (http.CookieJar, error) {
- if !s.handler.config.CookiePassthrough {
- return nil, nil
- }
- return cookies.JarFromRequest(s.imageRequest)
- }
- // getImageRequestHeaders returns a new http.Header containing only
- // the headers that should be passed through from the user request
- func (s *request) getImageRequestHeaders() http.Header {
- h := make(http.Header)
- httpheaders.CopyFromRequest(s.imageRequest, h, s.handler.config.PassthroughRequestHeaders)
- return h
- }
- // setContentDisposition writes the headers to the response writer
- func (s *request) setContentDisposition(imagePath string, serverResponse *http.Response) {
- // Try to set correct Content-Disposition file name and extension
- if serverResponse.StatusCode < 200 || serverResponse.StatusCode >= 300 {
- return
- }
- ct := serverResponse.Header.Get(httpheaders.ContentType)
- s.rw.SetContentDisposition(
- imagePath,
- s.po.Filename,
- "",
- ct,
- s.po.ReturnAttachment,
- )
- }
- // streamData copies the image data from the response body to the response writer
- func (s *request) streamData(res *http.Response) {
- buf := streamBufPool.Get().(*[]byte)
- defer streamBufPool.Put(buf)
- _, copyerr := io.CopyBuffer(s.rw, res.Body, *buf)
- server.LogResponse(
- s.reqID, s.imageRequest, res.StatusCode, nil,
- log.Fields{
- "image_url": s.imageURL,
- "processing_options": s.po,
- },
- )
- // We've got to skip logging here
- if copyerr != nil {
- panic(http.ErrAbortHandler)
- }
- }
|