bufpool.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. package bufpool
  2. // Based on https://github.com/valyala/bytebufferpool ideas
  3. import (
  4. "bytes"
  5. "runtime"
  6. "sync"
  7. "sync/atomic"
  8. "github.com/imgproxy/imgproxy/v3/config"
  9. "github.com/imgproxy/imgproxy/v3/metrics"
  10. )
  11. const (
  12. minBitSize = 6 // 2**6=64 is min bytes.Buffer capacity
  13. steps = 20
  14. minSize = 1 << minBitSize
  15. )
  16. var entriesPool = sync.Pool{
  17. New: func() any {
  18. return new(entry)
  19. },
  20. }
  21. type entry struct {
  22. buf *bytes.Buffer
  23. prev, next *entry
  24. }
  25. type Pool struct {
  26. name string
  27. defaultSize int
  28. maxSize uint64
  29. root *entry
  30. maxLen int
  31. calls [steps]uint64
  32. tmpCalls [steps]uint64
  33. callsNum uint64
  34. storeMu sync.Mutex
  35. calibratingMu sync.Mutex
  36. }
  37. func New(name string, n int, defaultSize int) *Pool {
  38. pool := Pool{
  39. name: name,
  40. defaultSize: defaultSize,
  41. root: &entry{},
  42. maxLen: n,
  43. }
  44. return &pool
  45. }
  46. func (p *Pool) insert(buf *bytes.Buffer) {
  47. e := entriesPool.Get().(*entry)
  48. e.buf = buf
  49. e.next = p.root.next
  50. e.prev = p.root
  51. p.root.next = e
  52. }
  53. func (p *Pool) remove(e *entry) {
  54. if e.next != nil {
  55. e.next.prev = e.prev
  56. }
  57. e.prev.next = e.next
  58. saveEntry(e)
  59. }
  60. func (p *Pool) calibrateAndClean() {
  61. if !p.calibratingMu.TryLock() {
  62. return
  63. }
  64. defer p.calibratingMu.Unlock()
  65. var callsSum uint64
  66. for i := 0; i < steps; i++ {
  67. calls := atomic.SwapUint64(&p.calls[i], 0)
  68. callsSum += calls
  69. p.tmpCalls[i] = calls
  70. }
  71. if callsSum < uint64(config.BufferPoolCalibrationThreshold) {
  72. return
  73. }
  74. atomic.StoreUint64(&p.callsNum, 0)
  75. defSum := uint64(float64(callsSum) * 0.5)
  76. maxSum := uint64(float64(callsSum) * 0.95)
  77. defStep := -1
  78. maxStep := -1
  79. callsSum = 0
  80. for i := 0; i < steps; i++ {
  81. callsSum += p.tmpCalls[i]
  82. if defStep < 0 && callsSum > defSum {
  83. defStep = i
  84. }
  85. if callsSum > maxSum {
  86. maxStep = i
  87. break
  88. }
  89. }
  90. p.defaultSize = minSize << defStep
  91. p.maxSize = minSize << maxStep
  92. maxSize := int(p.maxSize)
  93. metrics.SetBufferDefaultSize(p.name, p.defaultSize)
  94. metrics.SetBufferMaxSize(p.name, maxSize)
  95. p.storeMu.Lock()
  96. storeUnlocked := false
  97. defer func() {
  98. if !storeUnlocked {
  99. p.storeMu.Unlock()
  100. }
  101. }()
  102. cleaned := false
  103. last := p.root
  104. poolLen := 0
  105. for entry := p.root.next; entry != nil; entry = last.next {
  106. if poolLen >= p.maxLen || entry.buf.Cap() > maxSize {
  107. last.next = entry.next
  108. saveEntry(entry)
  109. cleaned = true
  110. } else {
  111. last.next = entry
  112. entry.prev = last
  113. last = entry
  114. poolLen++
  115. }
  116. }
  117. // early unlock
  118. p.storeMu.Unlock()
  119. storeUnlocked = true
  120. if cleaned {
  121. runtime.GC()
  122. }
  123. }
  124. func (p *Pool) Get(size int, grow bool) *bytes.Buffer {
  125. p.storeMu.Lock()
  126. storeUnlocked := false
  127. defer func() {
  128. if !storeUnlocked {
  129. p.storeMu.Unlock()
  130. }
  131. }()
  132. best := (*entry)(nil)
  133. bestCap := -1
  134. min := (*entry)(nil)
  135. minCap := -1
  136. for entry := p.root.next; entry != nil; entry = entry.next {
  137. cap := entry.buf.Cap()
  138. if size > 0 {
  139. // If we know the required size, pick a buffer with the smallest size
  140. // that is larger than the requested size
  141. if cap >= size && (bestCap > cap || best == nil) {
  142. best = entry
  143. bestCap = cap
  144. }
  145. if cap < minCap || minCap == -1 {
  146. min = entry
  147. minCap = cap
  148. }
  149. } else if cap > bestCap {
  150. // If we don't know the requested size, pick a largest buffer
  151. best = entry
  152. bestCap = cap
  153. }
  154. }
  155. var buf *bytes.Buffer
  156. switch {
  157. case best != nil:
  158. buf = best.buf
  159. p.remove(best)
  160. case min != nil:
  161. buf = min.buf
  162. p.remove(min)
  163. default:
  164. buf = new(bytes.Buffer)
  165. }
  166. // early unlock
  167. p.storeMu.Unlock()
  168. storeUnlocked = true
  169. buf.Reset()
  170. growSize := p.defaultSize
  171. if grow {
  172. growSize = max(p.normalizeCap(size), growSize)
  173. }
  174. // Grow the buffer only if we know the requested size and it is smaller than
  175. // or equal to the grow size. Otherwise we'll grow the buffer twice
  176. if size > 0 && size <= growSize && growSize > buf.Cap() {
  177. buf.Grow(growSize)
  178. }
  179. return buf
  180. }
  181. func (p *Pool) Put(buf *bytes.Buffer) {
  182. bufLen := buf.Len()
  183. bufCap := buf.Cap()
  184. if bufLen > 0 {
  185. ind := index(bufLen)
  186. atomic.AddUint64(&p.calls[ind], 1)
  187. if atomic.AddUint64(&p.callsNum, 1) >= uint64(config.BufferPoolCalibrationThreshold) {
  188. p.calibrateAndClean()
  189. }
  190. }
  191. size := buf.Cap()
  192. maxSize := int(atomic.LoadUint64(&p.maxSize))
  193. if maxSize > 0 && size > maxSize {
  194. return
  195. }
  196. if bufLen > 0 {
  197. metrics.ObserveBufferSize(p.name, bufCap)
  198. }
  199. p.storeMu.Lock()
  200. defer p.storeMu.Unlock()
  201. p.insert(buf)
  202. }
  203. // GrowBuffer growth capacity of the buffer to the normalized provided value
  204. func (p *Pool) GrowBuffer(buf *bytes.Buffer, cap int) {
  205. cap = p.normalizeCap(cap)
  206. if buf.Cap() < cap {
  207. buf.Grow(cap - buf.Len())
  208. }
  209. }
  210. func (p *Pool) normalizeCap(cap int) int {
  211. // Don't normalize cap if it's larger than maxSize
  212. // since we'll throw this buf out anyway
  213. maxSize := int(atomic.LoadUint64(&p.maxSize))
  214. if maxSize > 0 && cap > maxSize {
  215. return cap
  216. }
  217. ind := index(cap)
  218. return max(cap, minSize<<ind)
  219. }
  220. func saveEntry(e *entry) {
  221. e.buf = nil
  222. e.next = nil
  223. e.prev = nil
  224. entriesPool.Put(e)
  225. }
  226. func index(n int) int {
  227. n--
  228. n >>= minBitSize
  229. idx := 0
  230. for n > 0 {
  231. n >>= 1
  232. idx++
  233. }
  234. if idx >= steps {
  235. idx = steps - 1
  236. }
  237. return idx
  238. }