bufpool.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package bufpool
  2. import (
  3. "bytes"
  4. "runtime"
  5. "sort"
  6. "sync"
  7. "github.com/imgproxy/imgproxy/v3/config"
  8. "github.com/imgproxy/imgproxy/v3/imath"
  9. "github.com/imgproxy/imgproxy/v3/metrics"
  10. )
  11. type intSlice []int
  12. func (p intSlice) Len() int { return len(p) }
  13. func (p intSlice) Less(i, j int) bool { return p[i] < p[j] }
  14. func (p intSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
  15. type Pool struct {
  16. name string
  17. defaultSize int
  18. maxSize int
  19. buffers []*bytes.Buffer
  20. calls intSlice
  21. callInd int
  22. mutex sync.Mutex
  23. }
  24. func New(name string, n int, defaultSize int) *Pool {
  25. pool := Pool{
  26. name: name,
  27. defaultSize: defaultSize,
  28. buffers: make([]*bytes.Buffer, n),
  29. calls: make(intSlice, config.BufferPoolCalibrationThreshold),
  30. }
  31. for i := range pool.buffers {
  32. pool.buffers[i] = new(bytes.Buffer)
  33. }
  34. return &pool
  35. }
  36. func (p *Pool) calibrateAndClean() {
  37. sort.Sort(p.calls)
  38. pos := int(float64(len(p.calls)) * 0.95)
  39. score := p.calls[pos]
  40. p.callInd = 0
  41. p.maxSize = p.normalizeSize(score)
  42. p.defaultSize = imath.Max(p.defaultSize, p.calls[0])
  43. p.maxSize = imath.Max(p.defaultSize, p.maxSize)
  44. cleaned := false
  45. for i, buf := range p.buffers {
  46. if buf != nil && buf.Cap() > p.maxSize {
  47. p.buffers[i] = nil
  48. cleaned = true
  49. }
  50. }
  51. if cleaned {
  52. runtime.GC()
  53. }
  54. metrics.SetBufferDefaultSize(p.name, p.defaultSize)
  55. metrics.SetBufferMaxSize(p.name, p.maxSize)
  56. }
  57. func (p *Pool) Get(size int) *bytes.Buffer {
  58. p.mutex.Lock()
  59. defer p.mutex.Unlock()
  60. size = p.normalizeSize(size)
  61. minSize, maxSize, minInd, maxInd := -1, -1, -1, -1
  62. for i := 0; i < len(p.buffers); i++ {
  63. if p.buffers[i] != nil {
  64. cap := p.buffers[i].Cap()
  65. if size > 0 && cap >= size && (minSize > cap || minSize == -1) {
  66. minSize = cap
  67. minInd = i
  68. }
  69. if cap > maxSize {
  70. maxSize = cap
  71. maxInd = i
  72. }
  73. }
  74. }
  75. var buf *bytes.Buffer
  76. switch {
  77. case minInd >= 0:
  78. // We found buffer with the desired size
  79. buf = p.buffers[minInd]
  80. p.buffers[minInd] = nil
  81. case maxInd >= 0:
  82. // We didn't find buffer with the desired size
  83. buf = p.buffers[maxInd]
  84. p.buffers[maxInd] = nil
  85. default:
  86. // We didn't find buffers at all
  87. buf = new(bytes.Buffer)
  88. }
  89. buf.Reset()
  90. growSize := imath.Max(size, p.defaultSize)
  91. if growSize > buf.Cap() {
  92. buf.Grow(growSize)
  93. }
  94. return buf
  95. }
  96. func (p *Pool) Put(buf *bytes.Buffer) {
  97. p.mutex.Lock()
  98. defer p.mutex.Unlock()
  99. if buf.Len() > 0 {
  100. p.calls[p.callInd] = buf.Len()
  101. p.callInd++
  102. if p.callInd == len(p.calls) {
  103. p.calibrateAndClean()
  104. }
  105. }
  106. if p.maxSize > 0 && buf.Cap() > p.maxSize {
  107. return
  108. }
  109. for i, b := range p.buffers {
  110. if b == nil {
  111. p.buffers[i] = buf
  112. if buf.Cap() > 0 {
  113. metrics.ObserveBufferSize(p.name, buf.Cap())
  114. }
  115. return
  116. }
  117. }
  118. }
  119. func (p *Pool) normalizeSize(n int) int {
  120. return (n/bytes.MinRead + 2) * bytes.MinRead
  121. }