Prechádzať zdrojové kódy

Optimized memory usage; Reducing memory fragmentation

DarthSim 6 rokov pred
rodič
commit
f4f746489c
9 zmenil súbory, kde vykonal 206 pridanie a 59 odobranie
  1. 62 0
      bufpool.go
  2. 25 0
      config.go
  3. 5 9
      download.go
  4. 0 26
      gzip.go
  5. 63 0
      gzippool.go
  6. 17 0
      main.go
  7. 17 10
      process.go
  8. 16 13
      server.go
  9. 1 1
      vips.h

+ 62 - 0
bufpool.go

@@ -0,0 +1,62 @@
+package main
+
+import (
+	"bytes"
+	"sync"
+)
+
+type bufPool struct {
+	mutex sync.Mutex
+	size  int
+	top   *bufPoolEntry
+}
+
+type bufPoolEntry struct {
+	buf  *bytes.Buffer
+	next *bufPoolEntry
+}
+
+func newBufPool(n int, size int) *bufPool {
+	pool := bufPool{size: size}
+
+	for i := 0; i < n; i++ {
+		pool.grow()
+	}
+
+	return &pool
+}
+
+func (p *bufPool) grow() {
+	var buf *bytes.Buffer
+
+	if p.size == 0 {
+		buf = new(bytes.Buffer)
+	} else {
+		buf = bytes.NewBuffer(make([]byte, p.size, p.size))
+	}
+
+	p.top = &bufPoolEntry{buf: buf, next: p.top}
+}
+
+func (p *bufPool) get() *bytes.Buffer {
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+
+	if p.top == nil {
+		p.grow()
+	}
+
+	buf := p.top.buf
+	buf.Reset()
+
+	p.top = p.top.next
+
+	return buf
+}
+
+func (p *bufPool) put(buf *bytes.Buffer) {
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+
+	p.top = &bufPoolEntry{buf: buf, next: p.top}
+}

+ 25 - 0
config.go

@@ -193,6 +193,10 @@ type config struct {
 	SentryDSN         string
 	SentryEnvironment string
 	SentryRelease     string
+
+	FreeMemoryInterval int
+	DownloadBufferSize int
+	GZipBufferSize     int
 }
 
 var conf = config{
@@ -217,6 +221,7 @@ var conf = config{
 	HoneybadgerEnv:        "production",
 	SentryEnvironment:     "production",
 	SentryRelease:         fmt.Sprintf("imgproxy/%s", version),
+	FreeMemoryInterval:    10,
 }
 
 func init() {
@@ -308,6 +313,10 @@ func init() {
 	strEnvConfig(&conf.SentryEnvironment, "IMGPROXY_SENTRY_ENVIRONMENT")
 	strEnvConfig(&conf.SentryRelease, "IMGPROXY_SENTRY_RELEASE")
 
+	intEnvConfig(&conf.FreeMemoryInterval, "IMGPROXY_FREE_MEMORY_INTERVAL")
+	intEnvConfig(&conf.DownloadBufferSize, "IMGPROXY_DOWNLOAD_BUFFER_SIZE")
+	intEnvConfig(&conf.GZipBufferSize, "IMGPROXY_GZIP_BUFFER_SIZE")
+
 	if len(conf.Keys) != len(conf.Salts) {
 		logFatal("Number of keys and number of salts should be equal. Keys: %d, salts: %d", len(conf.Keys), len(conf.Salts))
 	}
@@ -410,6 +419,22 @@ func init() {
 		logFatal("Can't use the same binding for the main server and Prometheus")
 	}
 
+	if conf.FreeMemoryInterval <= 0 {
+		logFatal("Free memory interval should be greater than zero")
+	}
+
+	if conf.DownloadBufferSize < 0 {
+		logFatal("Download buffer size should be greater than or quual to 0")
+	} else if conf.DownloadBufferSize > int(^uint32(0)) {
+		logFatal("Download buffer size can't be creater than %d", ^uint32(0))
+	}
+
+	if conf.GZipBufferSize < 0 {
+		logFatal("GZip buffer size should be greater than or quual to 0")
+	} else if conf.GZipBufferSize > int(^uint32(0)) {
+		logFatal("GZip buffer size can't be creater than %d", ^uint32(0))
+	}
+
 	initDownloading()
 	initNewrelic()
 	initPrometheus()

+ 5 - 9
download.go

@@ -9,7 +9,6 @@ import (
 	"io"
 	"io/ioutil"
 	"net/http"
-	"sync"
 	"time"
 
 	_ "image/gif"
@@ -31,11 +30,7 @@ var (
 
 const msgSourceImageIsUnreachable = "Source image is unreachable"
 
-var downloadBufPool = sync.Pool{
-	New: func() interface{} {
-		return new(bytes.Buffer)
-	},
-}
+var downloadBufPool *bufPool
 
 func initDownloading() {
 	transport := &http.Transport{
@@ -62,6 +57,8 @@ func initDownloading() {
 		Timeout:   time.Duration(conf.DownloadTimeout) * time.Second,
 		Transport: transport,
 	}
+
+	downloadBufPool = newBufPool(conf.Concurrency, conf.DownloadBufferSize)
 }
 
 func checkDimensions(width, height int) error {
@@ -95,10 +92,9 @@ func checkTypeAndDimensions(r io.Reader) (imageType, error) {
 }
 
 func readAndCheckImage(ctx context.Context, res *http.Response) (context.Context, context.CancelFunc, error) {
-	buf := downloadBufPool.Get().(*bytes.Buffer)
+	buf := downloadBufPool.get()
 	cancel := func() {
-		buf.Reset()
-		downloadBufPool.Put(buf)
+		downloadBufPool.put(buf)
 	}
 
 	imgtype, err := checkTypeAndDimensions(io.TeeReader(res.Body, buf))

+ 0 - 26
gzip.go

@@ -1,26 +0,0 @@
-package main
-
-import (
-	"compress/gzip"
-	"io"
-	"os"
-	"sync"
-)
-
-var nullwriter, _ = os.Open("/dev/null")
-
-var gzipPool = sync.Pool{
-	New: func() interface{} {
-		gz, _ := gzip.NewWriterLevel(nullwriter, conf.GZipCompression)
-		return gz
-	},
-}
-
-func gzipData(data []byte, w io.Writer) {
-	gz := gzipPool.Get().(*gzip.Writer)
-	defer gzipPool.Put(gz)
-
-	gz.Reset(w)
-	gz.Write(data)
-	gz.Close()
-}

+ 63 - 0
gzippool.go

@@ -0,0 +1,63 @@
+package main
+
+import (
+	"compress/gzip"
+	"io"
+	"io/ioutil"
+	"sync"
+)
+
+type gzipPool struct {
+	mutex sync.Mutex
+	top   *gzipPoolEntry
+}
+
+type gzipPoolEntry struct {
+	gz   *gzip.Writer
+	next *gzipPoolEntry
+}
+
+func newGzipPool(n int) *gzipPool {
+	pool := new(gzipPool)
+
+	for i := 0; i < n; i++ {
+		pool.grow()
+	}
+
+	return pool
+}
+
+func (p *gzipPool) grow() {
+	gz, err := gzip.NewWriterLevel(ioutil.Discard, conf.GZipCompression)
+	if err != nil {
+		logFatal("Can't init GZip compression: %s", err)
+	}
+
+	p.top = &gzipPoolEntry{
+		gz:   gz,
+		next: p.top,
+	}
+}
+
+func (p *gzipPool) get(w io.Writer) *gzip.Writer {
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+
+	if p.top == nil {
+		p.grow()
+	}
+
+	gz := p.top.gz
+	gz.Reset(w)
+
+	p.top = p.top.next
+
+	return gz
+}
+
+func (p *gzipPool) put(gz *gzip.Writer) {
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+
+	p.top = &gzipPoolEntry{gz: gz, next: p.top}
+}

+ 17 - 0
main.go

@@ -3,6 +3,9 @@ package main
 import (
 	"os"
 	"os/signal"
+	"runtime"
+	"runtime/debug"
+	"time"
 
 	"net/http"
 	_ "net/http/pprof"
@@ -13,6 +16,20 @@ const version = "2.1.5"
 type ctxKey string
 
 func main() {
+	go func() {
+		var logMemStats = len(os.Getenv("IMGPROXY_LOG_MEM_STATS")) > 0
+
+		for range time.Tick(time.Duration(conf.FreeMemoryInterval) * time.Second) {
+			debug.FreeOSMemory()
+
+			if logMemStats {
+				var m runtime.MemStats
+				runtime.ReadMemStats(&m)
+				logNotice("[MEMORY USAGE] Sys: %d; HeapIdle: %d; HeapInuse: %d", m.Sys/1024/1024, m.HeapIdle/1024/1024, m.HeapInuse/1024/1024)
+			}
+		}
+	}()
+
 	if len(os.Getenv("IMGPROXY_PPROF_BIND")) > 0 {
 		go func() {
 			http.ListenAndServe(os.Getenv("IMGPROXY_PPROF_BIND"), nil)

+ 17 - 10
process.go

@@ -491,7 +491,7 @@ func transformGif(ctx context.Context, img **C.struct__VipsImage, po *processing
 	return nil
 }
 
-func processImage(ctx context.Context) ([]byte, error) {
+func processImage(ctx context.Context) ([]byte, context.CancelFunc, error) {
 	runtime.LockOSThread()
 	defer runtime.UnlockOSThread()
 
@@ -511,7 +511,7 @@ func processImage(ctx context.Context) ([]byte, error) {
 	imgtype := getImageType(ctx)
 
 	if po.Gravity.Type == gravitySmart && !vipsSupportSmartcrop {
-		return nil, errSmartCropNotSupported
+		return nil, func() {}, errSmartCropNotSupported
 	}
 
 	if po.Format == imageTypeUnknown {
@@ -524,17 +524,17 @@ func processImage(ctx context.Context) ([]byte, error) {
 
 	img, err := vipsLoadImage(data, imgtype, 1, 1.0, po.Format == imageTypeGIF)
 	if err != nil {
-		return nil, err
+		return nil, func() {}, err
 	}
 	defer C.clear_image(&img)
 
 	if imgtype == imageTypeGIF && po.Format == imageTypeGIF && vipsIsAnimatedGif(img) {
 		if err := transformGif(ctx, &img, po); err != nil {
-			return nil, err
+			return nil, func() {}, err
 		}
 	} else {
 		if err := transformImage(ctx, &img, data, po, imgtype); err != nil {
-			return nil, err
+			return nil, func() {}, err
 		}
 	}
 
@@ -542,7 +542,7 @@ func processImage(ctx context.Context) ([]byte, error) {
 
 	if po.Format == imageTypeGIF {
 		if err := vipsCastUchar(&img); err != nil {
-			return nil, err
+			return nil, func() {}, err
 		}
 		checkTimeout(ctx)
 	}
@@ -645,9 +645,8 @@ func vipsLoadImage(data []byte, imgtype imageType, shrink int, svgScale float64,
 	return img, nil
 }
 
-func vipsSaveImage(img *C.struct__VipsImage, imgtype imageType, quality int) ([]byte, error) {
+func vipsSaveImage(img *C.struct__VipsImage, imgtype imageType, quality int) ([]byte, context.CancelFunc, error) {
 	var ptr unsafe.Pointer
-	defer C.g_free_go(&ptr)
 
 	err := C.int(0)
 
@@ -669,10 +668,18 @@ func vipsSaveImage(img *C.struct__VipsImage, imgtype imageType, quality int) ([]
 		err = C.vips_icosave_go(img, &ptr, &imgsize)
 	}
 	if err != 0 {
-		return nil, vipsError()
+		return nil, func() {}, vipsError()
+	}
+
+	const maxBufSize = ^uint32(0)
+
+	b := (*[maxBufSize]byte)(ptr)[:int(imgsize):int(imgsize)]
+
+	cancel := func() {
+		C.g_free_go(&ptr)
 	}
 
-	return C.GoBytes(ptr, C.int(imgsize)), nil
+	return b, cancel, nil
 }
 
 func vipsArrayjoin(in []*C.struct__VipsImage, out **C.struct__VipsImage) error {

+ 16 - 13
server.go

@@ -1,7 +1,6 @@
 package main
 
 import (
-	"bytes"
 	"context"
 	"crypto/subtle"
 	"fmt"
@@ -11,7 +10,6 @@ import (
 	"path/filepath"
 	"strconv"
 	"strings"
-	"sync"
 	"time"
 
 	nanoid "github.com/matoous/go-nanoid"
@@ -46,13 +44,10 @@ var (
 
 	errInvalidMethod = newError(422, "Invalid request method", "Method doesn't allowed")
 	errInvalidSecret = newError(403, "Invalid secret", "Forbidden")
-)
 
-var responseBufPool = sync.Pool{
-	New: func() interface{} {
-		return new(bytes.Buffer)
-	},
-}
+	responseGzipBufPool *bufPool
+	responseGzipPool    *gzipPool
+)
 
 type httpHandler struct {
 	sem chan struct{}
@@ -73,6 +68,11 @@ func startServer() *http.Server {
 		MaxHeaderBytes: 1 << 20,
 	}
 
+	if conf.GZipCompression > 0 {
+		responseGzipBufPool = newBufPool(conf.Concurrency, conf.GZipBufferSize)
+		responseGzipPool = newGzipPool(conf.Concurrency)
+	}
+
 	go func() {
 		logNotice("Starting server at %s", conf.Bind)
 		if err := s.Serve(netutil.LimitListener(l, conf.MaxClients)); err != nil && err != http.ErrServerClosed {
@@ -122,12 +122,14 @@ func respondWithImage(ctx context.Context, reqID string, r *http.Request, rw htt
 	rw.Header().Set("Content-Disposition", contentDisposition(getImageURL(ctx), po.Format))
 
 	if conf.GZipCompression > 0 && strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
-		buf := responseBufPool.Get().(*bytes.Buffer)
-		defer responseBufPool.Put(buf)
+		buf := responseGzipBufPool.get()
+		defer responseGzipBufPool.put(buf)
 
-		buf.Reset()
+		gz := responseGzipPool.get(buf)
+		defer responseGzipPool.put(gz)
 
-		gzipData(data, buf)
+		gz.Write(data)
+		gz.Close()
 
 		rw.Header().Set("Content-Encoding", "gzip")
 		rw.Header().Set("Content-Length", strconv.Itoa(buf.Len()))
@@ -279,7 +281,8 @@ func (h *httpHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
 
 	checkTimeout(ctx)
 
-	imageData, err := processImage(ctx)
+	imageData, processcancel, err := processImage(ctx)
+	defer processcancel()
 	if err != nil {
 		if newRelicEnabled {
 			sendErrorToNewRelic(ctx, err)

+ 1 - 1
vips.h

@@ -325,6 +325,6 @@ vips_icosave_go(VipsImage *in, void **buf, size_t *len) {
 
 void
 vips_cleanup() {
-  vips_thread_shutdown();
   vips_error_clear();
+  vips_thread_shutdown();
 }