1
0
DarthSim 2 жил өмнө
parent
commit
f8ee5826c5
3 өөрчлөгдсөн 242 нэмэгдсэн , 69 устгасан
  1. 2 0
      CHANGELOG.md
  2. 190 69
      bufpool/bufpool.go
  3. 50 0
      bufpool/bufpool_test.go

+ 2 - 0
CHANGELOG.md

@@ -1,6 +1,8 @@
 # Changelog
 
 ## [Unreleased]
+### Change
+- Optimized memory buffers pooling for better performance and memory reusage.
 
 ## [3.16.1] - 2023-04-26
 ### Fix

+ 190 - 69
bufpool/bufpool.go

@@ -1,118 +1,220 @@
 package bufpool
 
+// Based on https://github.com/valyala/bytebufferpool ideas
+
 import (
 	"bytes"
 	"runtime"
-	"sort"
 	"sync"
+	"sync/atomic"
 
 	"github.com/imgproxy/imgproxy/v3/config"
 	"github.com/imgproxy/imgproxy/v3/imath"
 	"github.com/imgproxy/imgproxy/v3/metrics"
 )
 
-type intSlice []int
+const (
+	minBitSize = 6 // 2**6=64 is min bytes.Buffer capacity
+	steps      = 20
+
+	minSize = 1 << minBitSize
+)
 
-func (p intSlice) Len() int           { return len(p) }
-func (p intSlice) Less(i, j int) bool { return p[i] < p[j] }
-func (p intSlice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
+var entriesPool = sync.Pool{
+	New: func() any {
+		return new(entry)
+	},
+}
+
+type entry struct {
+	buf        *bytes.Buffer
+	prev, next *entry
+}
 
 type Pool struct {
 	name        string
 	defaultSize int
-	maxSize     int
-	buffers     []*bytes.Buffer
+	maxSize     uint64
+	root        *entry
+
+	maxLen int
 
-	calls   intSlice
-	callInd int
+	calls    [steps]uint64
+	tmpCalls [steps]uint64
+	callsNum uint64
 
-	mutex sync.Mutex
+	storeMu       sync.Mutex
+	calibratingMu sync.Mutex
 }
 
 func New(name string, n int, defaultSize int) *Pool {
 	pool := Pool{
 		name:        name,
 		defaultSize: defaultSize,
-		buffers:     make([]*bytes.Buffer, n),
-		calls:       make(intSlice, config.BufferPoolCalibrationThreshold),
+		root:        &entry{},
+		maxLen:      n,
 	}
 
-	for i := range pool.buffers {
-		pool.buffers[i] = new(bytes.Buffer)
+	return &pool
+}
+
+func (p *Pool) insert(buf *bytes.Buffer) {
+	e := entriesPool.Get().(*entry)
+	e.buf = buf
+	e.next = p.root.next
+	e.prev = p.root
+
+	p.root.next = e
+}
+
+func (p *Pool) remove(e *entry) {
+	if e.next != nil {
+		e.next.prev = e.prev
 	}
 
-	return &pool
+	e.prev.next = e.next
+
+	saveEntry(e)
 }
 
 func (p *Pool) calibrateAndClean() {
-	sort.Sort(p.calls)
+	if !p.calibratingMu.TryLock() {
+		return
+	}
+	defer p.calibratingMu.Unlock()
 
-	pos := int(float64(len(p.calls)) * 0.95)
-	score := p.calls[pos]
+	var callsSum uint64
+	for i := 0; i < steps; i++ {
+		calls := atomic.SwapUint64(&p.calls[i], 0)
+		callsSum += calls
+		p.tmpCalls[i] = calls
+	}
 
-	p.callInd = 0
-	p.maxSize = p.normalizeSize(score)
+	if callsSum < uint64(config.BufferPoolCalibrationThreshold) {
+		return
+	}
 
-	p.defaultSize = imath.Max(p.defaultSize, p.calls[0])
-	p.maxSize = imath.Max(p.defaultSize, p.maxSize)
+	atomic.StoreUint64(&p.callsNum, 0)
+
+	defSum := uint64(float64(callsSum) * 0.5)
+	maxSum := uint64(float64(callsSum) * 0.95)
+
+	defStep := -1
+	maxStep := -1
+
+	callsSum = 0
+	for i := 0; i < steps; i++ {
+		callsSum += p.tmpCalls[i]
+
+		if callsSum > defSum || defStep < 0 {
+			defStep = i
+		}
+
+		if callsSum > maxSum {
+			maxStep = i
+			break
+		}
+	}
+
+	p.defaultSize = minSize << defStep
+	p.maxSize = minSize << maxStep
+
+	maxSize := int(p.maxSize)
+
+	metrics.SetBufferDefaultSize(p.name, p.defaultSize)
+	metrics.SetBufferMaxSize(p.name, maxSize)
+
+	p.storeMu.Lock()
+	storeUnlocked := false
+	defer func() {
+		if !storeUnlocked {
+			p.storeMu.Unlock()
+		}
+	}()
 
 	cleaned := false
+	last := p.root
+
+	poolLen := 0
+
+	for entry := p.root.next; entry != nil; entry = last.next {
+		if poolLen >= p.maxLen || entry.buf.Cap() > maxSize {
+			last.next = entry.next
+			saveEntry(entry)
 
-	for i, buf := range p.buffers {
-		if buf != nil && buf.Cap() > p.maxSize {
-			p.buffers[i] = nil
 			cleaned = true
+		} else {
+			last.next = entry
+			entry.prev = last
+			last = entry
+
+			poolLen++
 		}
 	}
 
+	// early unlock
+	p.storeMu.Unlock()
+	storeUnlocked = true
+
 	if cleaned {
 		runtime.GC()
 	}
-
-	metrics.SetBufferDefaultSize(p.name, p.defaultSize)
-	metrics.SetBufferMaxSize(p.name, p.maxSize)
 }
 
 func (p *Pool) Get(size int, grow bool) *bytes.Buffer {
-	p.mutex.Lock()
-	defer p.mutex.Unlock()
+	p.storeMu.Lock()
+	storeUnlocked := false
+	defer func() {
+		if !storeUnlocked {
+			p.storeMu.Unlock()
+		}
+	}()
 
-	size = p.normalizeSize(size)
+	best := (*entry)(nil)
+	bestCap := -1
 
-	minSize, maxSize, minInd, maxInd := -1, -1, -1, -1
+	min := (*entry)(nil)
+	minCap := -1
 
-	for i := 0; i < len(p.buffers); i++ {
-		if p.buffers[i] != nil {
-			cap := p.buffers[i].Cap()
+	for entry := p.root.next; entry != nil; entry = entry.next {
+		cap := entry.buf.Cap()
 
-			if size > 0 && cap >= size && (minSize > cap || minSize == -1) {
-				minSize = cap
-				minInd = i
+		if size > 0 {
+			// If we know the required size, pick a buffer with the smallest size
+			// that is larger than the requested size
+			if cap >= size && (bestCap > cap || best == nil) {
+				best = entry
+				bestCap = cap
 			}
 
-			if cap > maxSize {
-				maxSize = cap
-				maxInd = i
+			if cap < minCap || minCap == -1 {
+				min = entry
+				minCap = cap
 			}
+		} else if cap > bestCap {
+			// If we don't know the requested size, pick a largest buffer
+			best = entry
+			bestCap = cap
 		}
 	}
 
 	var buf *bytes.Buffer
 
 	switch {
-	case minInd >= 0:
-		// We found buffer with the desired size
-		buf = p.buffers[minInd]
-		p.buffers[minInd] = nil
-	case maxInd >= 0:
-		// We didn't find buffer with the desired size
-		buf = p.buffers[maxInd]
-		p.buffers[maxInd] = nil
+	case best != nil:
+		buf = best.buf
+		p.remove(best)
+	case min != nil:
+		buf = min.buf
+		p.remove(min)
 	default:
-		// We didn't find buffers at all
 		buf = new(bytes.Buffer)
 	}
 
+	// early unlock
+	p.storeMu.Unlock()
+	storeUnlocked = true
+
 	buf.Reset()
 
 	growSize := p.defaultSize
@@ -120,7 +222,9 @@ func (p *Pool) Get(size int, grow bool) *bytes.Buffer {
 		growSize = imath.Max(size, growSize)
 	}
 
-	if growSize > buf.Cap() {
+	// Grow the buffer only if we know the requested size and it is smaller than
+	// or equal to the grow size. Otherwise we'll grow the buffer twice
+	if size > 0 && size <= growSize && growSize > buf.Cap() {
 		buf.Grow(growSize)
 	}
 
@@ -128,35 +232,52 @@ func (p *Pool) Get(size int, grow bool) *bytes.Buffer {
 }
 
 func (p *Pool) Put(buf *bytes.Buffer) {
-	p.mutex.Lock()
-	defer p.mutex.Unlock()
+	bufLen := buf.Len()
+	bufCap := buf.Cap()
 
-	if buf.Len() > 0 {
-		p.calls[p.callInd] = buf.Len()
-		p.callInd++
+	if bufLen > 0 {
+		ind := index(bufLen)
 
-		if p.callInd == len(p.calls) {
+		atomic.AddUint64(&p.calls[ind], 1)
+
+		if atomic.AddUint64(&p.callsNum, 1) >= uint64(config.BufferPoolCalibrationThreshold) {
 			p.calibrateAndClean()
 		}
 	}
 
-	if p.maxSize > 0 && buf.Cap() > p.maxSize {
+	size := buf.Cap()
+	maxSize := int(atomic.LoadUint64(&p.maxSize))
+	if maxSize > 0 && size > maxSize {
 		return
 	}
 
-	for i, b := range p.buffers {
-		if b == nil {
-			p.buffers[i] = buf
+	if bufLen > 0 {
+		metrics.ObserveBufferSize(p.name, bufCap)
+	}
 
-			if buf.Len() > 0 {
-				metrics.ObserveBufferSize(p.name, buf.Cap())
-			}
+	p.storeMu.Lock()
+	defer p.storeMu.Unlock()
 
-			return
-		}
-	}
+	p.insert(buf)
+}
+
+func saveEntry(e *entry) {
+	e.buf = nil
+	e.next = nil
+	e.prev = nil
+	entriesPool.Put(e)
 }
 
-func (p *Pool) normalizeSize(n int) int {
-	return (n/bytes.MinRead + 2) * bytes.MinRead
+func index(n int) int {
+	n--
+	n >>= minBitSize
+	idx := 0
+	for n > 0 {
+		n >>= 1
+		idx++
+	}
+	if idx >= steps {
+		idx = steps - 1
+	}
+	return idx
 }

+ 50 - 0
bufpool/bufpool_test.go

@@ -0,0 +1,50 @@
+package bufpool
+
+import (
+	"math/rand"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/imgproxy/imgproxy/v3/config"
+)
+
+var (
+	testData     [][]byte
+	testDataOnce sync.Once
+	testMu       sync.Mutex
+)
+
+func initTestData() {
+	testData = make([][]byte, 1000)
+	for i := 6; i < 1000; i++ {
+		testData[i] = make([]byte, i*1271)
+	}
+
+	rand.Seed(time.Now().UnixNano())
+	rand.Shuffle(len(testData), func(i, j int) { testData[i], testData[j] = testData[j], testData[i] })
+}
+
+func BenchmarkBufpool(b *testing.B) {
+	testMu.Lock()
+	defer testMu.Unlock()
+
+	config.Reset()
+
+	testDataOnce.Do(initTestData)
+
+	pool := New("test", 16, 0)
+
+	b.ResetTimer()
+	b.SetParallelism(16)
+
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			for _, bb := range testData {
+				buf := pool.Get(len(bb), false)
+				buf.Write(bb)
+				pool.Put(buf)
+			}
+		}
+	})
+}