bufpool.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. package main
  2. import (
  3. "bytes"
  4. "runtime"
  5. "sort"
  6. "sync"
  7. )
  // intSlice is a slice of ints carrying the Len/Less/Swap method set expected
  // by sort.Sort; elements are ordered ascending.
  type intSlice []int

  // Len reports how many elements the slice holds.
  func (s intSlice) Len() int {
  	return len(s)
  }

  // Less reports whether the element at index i is strictly smaller than the
  // element at index j.
  func (s intSlice) Less(i, j int) bool {
  	return s[j] > s[i]
  }

  // Swap exchanges the elements at indexes i and j in place.
  func (s intSlice) Swap(i, j int) {
  	tmp := s[i]
  	s[i] = s[j]
  	s[j] = tmp
  }
  12. type bufPool struct {
  13. name string
  14. defaultSize int
  15. maxSize int
  16. buffers []*bytes.Buffer
  17. calls intSlice
  18. callInd int
  19. mutex sync.Mutex
  20. }
  21. func newBufPool(name string, n int, defaultSize int) *bufPool {
  22. pool := bufPool{
  23. name: name,
  24. defaultSize: defaultSize,
  25. buffers: make([]*bytes.Buffer, n),
  26. calls: make(intSlice, conf.BufferPoolCalibrationThreshold),
  27. }
  28. for i := range pool.buffers {
  29. pool.buffers[i] = new(bytes.Buffer)
  30. }
  31. return &pool
  32. }
  33. func (p *bufPool) calibrateAndClean() {
  34. sort.Sort(p.calls)
  35. pos := int(float64(len(p.calls)) * 0.95)
  36. score := p.calls[pos]
  37. p.callInd = 0
  38. p.maxSize = p.normalizeSize(score)
  39. p.defaultSize = maxInt(p.defaultSize, p.calls[0])
  40. p.maxSize = maxInt(p.defaultSize, p.maxSize)
  41. cleaned := false
  42. for i, buf := range p.buffers {
  43. if buf != nil && buf.Cap() > p.maxSize {
  44. p.buffers[i] = nil
  45. cleaned = true
  46. }
  47. }
  48. if cleaned {
  49. runtime.GC()
  50. }
  51. if prometheusEnabled {
  52. setPrometheusBufferDefaultSize(p.name, p.defaultSize)
  53. setPrometheusBufferMaxSize(p.name, p.maxSize)
  54. }
  55. }
  56. func (p *bufPool) Get(size int) *bytes.Buffer {
  57. p.mutex.Lock()
  58. defer p.mutex.Unlock()
  59. size = p.normalizeSize(size)
  60. minSize, maxSize, minInd, maxInd := -1, -1, -1, -1
  61. for i := 0; i < len(p.buffers); i++ {
  62. if p.buffers[i] != nil {
  63. cap := p.buffers[i].Cap()
  64. if size > 0 && cap >= size && (minSize > cap || minSize == -1) {
  65. minSize = cap
  66. minInd = i
  67. }
  68. if cap > maxSize {
  69. maxSize = cap
  70. maxInd = i
  71. }
  72. }
  73. }
  74. var buf *bytes.Buffer
  75. if minInd >= 0 {
  76. // We found buffer with the desired size
  77. buf = p.buffers[minInd]
  78. p.buffers[minInd] = nil
  79. } else if maxInd >= 0 {
  80. // We didn't find buffer with the desired size
  81. buf = p.buffers[maxInd]
  82. p.buffers[maxInd] = nil
  83. } else {
  84. // We didn't find buffers at all
  85. buf = new(bytes.Buffer)
  86. }
  87. buf.Reset()
  88. growSize := maxInt(size, p.defaultSize)
  89. if growSize > buf.Cap() {
  90. buf.Grow(growSize)
  91. }
  92. return buf
  93. }
  94. func (p *bufPool) Put(buf *bytes.Buffer) {
  95. p.mutex.Lock()
  96. defer p.mutex.Unlock()
  97. if buf.Len() > 0 {
  98. p.calls[p.callInd] = buf.Len()
  99. p.callInd++
  100. if p.callInd == len(p.calls) {
  101. p.calibrateAndClean()
  102. }
  103. }
  104. if p.maxSize > 0 && buf.Cap() > p.maxSize {
  105. return
  106. }
  107. for i, b := range p.buffers {
  108. if b == nil {
  109. p.buffers[i] = buf
  110. if prometheusEnabled && buf.Cap() > 0 {
  111. observePrometheusBufferSize(p.name, buf.Cap())
  112. }
  113. return
  114. }
  115. }
  116. }
  117. func (p *bufPool) normalizeSize(n int) int {
  118. return (n/bytes.MinRead + 2) * bytes.MinRead
  119. }