Browse Source

Smart bufPool

DarthSim 6 years ago
parent
commit
b8fc6dc9f0
3 changed files with 117 additions and 39 deletions
  1. 102 24
      bufpool.go
  2. 14 14
      download.go
  3. 1 1
      server.go

+ 102 - 24
bufpool.go

@@ -2,32 +2,50 @@ package main
 
 import (
 	"bytes"
+	"math"
+	"sort"
 	"sync"
 )
 
+type intSlice []int
+
+func (p intSlice) Len() int           { return len(p) }
+func (p intSlice) Less(i, j int) bool { return p[i] < p[j] }
+func (p intSlice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
+
 type bufPool struct {
-	mutex sync.Mutex
-	name  string
-	size  int
-	top   *bufPoolEntry
-}
+	mutex   sync.Mutex
+	name    string
+	size    int
+	buffers []*bytes.Buffer
+
+	calls   intSlice
+	callInd int
 
-type bufPoolEntry struct {
-	buf  *bytes.Buffer
-	next *bufPoolEntry
+	throughput int
 }
 
 func newBufPool(name string, n int, size int) *bufPool {
-	pool := bufPool{name: name, size: size}
+	pool := bufPool{
+		name:    name,
+		size:    size,
+		buffers: make([]*bytes.Buffer, n),
+		calls:   make(intSlice, 1024),
+	}
+
+	for i := range pool.buffers {
+		pool.buffers[i] = pool.new()
+	}
 
-	for i := 0; i < n; i++ {
-		pool.grow()
+	if pool.size > 0 {
+		// Get the real cap
+		pool.size = pool.buffers[0].Cap()
 	}
 
 	return &pool
 }
 
-func (p *bufPool) grow() {
+func (p *bufPool) new() *bytes.Buffer {
 	var buf *bytes.Buffer
 
 	buf = new(bytes.Buffer)
@@ -36,25 +54,68 @@ func (p *bufPool) grow() {
 		buf.Grow(p.size)
 	}
 
-	p.top = &bufPoolEntry{buf: buf, next: p.top}
+	return buf
+}
+
+func (p *bufPool) calibrateAndClean() {
+	var score float64
+
+	sort.Sort(p.calls)
 
-	if prometheusEnabled {
-		incrementBuffersTotal(p.name)
+	pos := 0.95 * float64(p.callInd+1)
+
+	if pos < 1.0 {
+		score = float64(p.calls[0])
+	} else if pos >= float64(p.callInd) {
+		score = float64(p.calls[p.callInd-1])
+	} else {
+		lower := float64(p.calls[int(pos)-1])
+		upper := float64(p.calls[int(pos)])
+		score = lower + (pos-math.Floor(pos))*(upper-lower)
 	}
+
+	p.throughput = int(score)
+	p.callInd = 0
 }
 
-func (p *bufPool) get() *bytes.Buffer {
+func (p *bufPool) get(size int) *bytes.Buffer {
 	p.mutex.Lock()
 	defer p.mutex.Unlock()
 
-	if p.top == nil {
-		p.grow()
+	minSize, maxSize, minInd, maxInd := -1, -1, -1, -1
+
+	for i := 0; i < len(p.buffers); i++ {
+		if p.buffers[i] != nil {
+			cap := p.buffers[i].Cap()
+
+			if size > 0 && cap >= size && (minSize > cap || minSize == -1) {
+				minSize = cap
+				minInd = i
+			}
+
+			if cap > maxSize {
+				maxSize = cap
+				maxInd = i
+			}
+		}
 	}
 
-	buf := p.top.buf
-	buf.Reset()
+	var buf *bytes.Buffer
 
-	p.top = p.top.next
+	if minInd >= 0 {
+		// We found buffer with the desired size
+		buf = p.buffers[minInd]
+		p.buffers[minInd] = nil
+	} else if maxInd >= 0 {
+		// We didn't find buffer with the desired size
+		buf = p.buffers[maxInd]
+		p.buffers[maxInd] = nil
+	} else {
+		// We didn't find buffers at all
+		return p.new()
+	}
+
+	buf.Reset()
 
 	return buf
 }
@@ -63,9 +124,26 @@ func (p *bufPool) put(buf *bytes.Buffer) {
 	p.mutex.Lock()
 	defer p.mutex.Unlock()
 
-	p.top = &bufPoolEntry{buf: buf, next: p.top}
+	p.calls[p.callInd] = buf.Cap()
+	p.callInd++
+
+	if p.callInd == len(p.calls) {
+		p.calibrateAndClean()
+	}
+
+	if p.throughput > 0 && buf.Cap() > p.throughput {
+		return
+	}
+
+	for i, b := range p.buffers {
+		if b == nil {
+			p.buffers[i] = buf
+
+			if prometheusEnabled {
+				observeBufferSize(p.name, buf.Cap())
+			}
 
-	if prometheusEnabled {
-		observeBufferSize(p.name, buf.Cap())
+			return
+		}
 	}
 }

+ 14 - 14
download.go

@@ -122,11 +122,24 @@ func checkTypeAndDimensions(r io.Reader) (imageType, error) {
 }
 
 func readAndCheckImage(ctx context.Context, res *http.Response) (context.Context, context.CancelFunc, error) {
-	buf := downloadBufPool.get()
+	var contentLength int
+
+	if res.ContentLength > 0 {
+		contentLength = int(res.ContentLength)
+	} else {
+		// ContentLength wasn't set properly, trying to parse the header
+		contentLength, _ = strconv.Atoi(res.Header.Get("Content-Length"))
+	}
+
+	buf := downloadBufPool.get(contentLength)
 	cancel := func() {
 		downloadBufPool.put(buf)
 	}
 
+	if contentLength > buf.Cap() {
+		buf.Grow(contentLength - buf.Len())
+	}
+
 	body := res.Body
 
 	if conf.MaxSrcFileSize > 0 {
@@ -138,19 +151,6 @@ func readAndCheckImage(ctx context.Context, res *http.Response) (context.Context
 		return ctx, cancel, err
 	}
 
-	var contentLength int
-
-	if res.ContentLength > 0 {
-		contentLength = int(res.ContentLength)
-	} else {
-		// ContentLength wasn't set properly, trying to parse the header
-		contentLength, _ = strconv.Atoi(res.Header.Get("Content-Length"))
-	}
-
-	if contentLength > buf.Cap() {
-		buf.Grow(contentLength - buf.Len())
-	}
-
 	if _, err = buf.ReadFrom(body); err != nil {
 		return ctx, cancel, newError(404, err.Error(), msgSourceImageIsUnreachable)
 	}

+ 1 - 1
server.go

@@ -124,7 +124,7 @@ func respondWithImage(ctx context.Context, reqID string, r *http.Request, rw htt
 	addVaryHeader(rw)
 
 	if conf.GZipCompression > 0 && strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
-		buf := responseGzipBufPool.get()
+		buf := responseGzipBufPool.get(0)
 		defer responseGzipBufPool.put(buf)
 
 		gz := responseGzipPool.get(buf)