bufpool.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package main
  2. import (
  3. "bytes"
  4. "sort"
  5. "sync"
  6. )
  // intSlice is a slice of ints that can be handed to sort.Sort; its ordering
  // is ascending.
  type intSlice []int

  // Len reports how many elements the slice holds.
  func (s intSlice) Len() int {
  	return len(s)
  }

  // Less reports whether the element at index i is strictly smaller than the
  // element at index j.
  func (s intSlice) Less(i, j int) bool {
  	return s[i] < s[j]
  }

  // Swap exchanges the elements at indexes i and j in place.
  func (s intSlice) Swap(i, j int) {
  	s[i], s[j] = s[j], s[i]
  }
  11. type bufPool struct {
  12. mutex sync.Mutex
  13. name string
  14. size int
  15. buffers []*bytes.Buffer
  16. calls intSlice
  17. callInd int
  18. throughput int
  19. }
  20. func newBufPool(name string, n int, size int) *bufPool {
  21. pool := bufPool{
  22. name: name,
  23. size: size,
  24. buffers: make([]*bytes.Buffer, n),
  25. calls: make(intSlice, 1024),
  26. }
  27. for i := range pool.buffers {
  28. pool.buffers[i] = pool.new()
  29. }
  30. if pool.size > 0 {
  31. // Get the real cap
  32. pool.size = pool.buffers[0].Cap()
  33. }
  34. return &pool
  35. }
  36. func (p *bufPool) new() *bytes.Buffer {
  37. var buf *bytes.Buffer
  38. buf = new(bytes.Buffer)
  39. if p.size > 0 {
  40. buf.Grow(p.size)
  41. }
  42. return buf
  43. }
  44. func (p *bufPool) calibrateAndClean() {
  45. sort.Sort(p.calls)
  46. pos := int(float64(len(p.calls)) * 0.95)
  47. score := p.calls[pos]
  48. p.callInd = 0
  49. p.throughput = 64
  50. for {
  51. if p.throughput > score {
  52. break
  53. }
  54. p.throughput <<= 1
  55. }
  56. for i, buf := range p.buffers {
  57. if buf != nil && buf.Cap() > p.throughput {
  58. p.buffers[i] = nil
  59. }
  60. }
  61. }
  62. func (p *bufPool) Get(size int) *bytes.Buffer {
  63. p.mutex.Lock()
  64. defer p.mutex.Unlock()
  65. minSize, maxSize, minInd, maxInd := -1, -1, -1, -1
  66. for i := 0; i < len(p.buffers); i++ {
  67. if p.buffers[i] != nil {
  68. cap := p.buffers[i].Cap()
  69. if size > 0 && cap >= size && (minSize > cap || minSize == -1) {
  70. minSize = cap
  71. minInd = i
  72. }
  73. if cap > maxSize {
  74. maxSize = cap
  75. maxInd = i
  76. }
  77. }
  78. }
  79. var buf *bytes.Buffer
  80. if minInd >= 0 {
  81. // We found buffer with the desired size
  82. buf = p.buffers[minInd]
  83. p.buffers[minInd] = nil
  84. } else if maxInd >= 0 {
  85. // We didn't find buffer with the desired size
  86. buf = p.buffers[maxInd]
  87. p.buffers[maxInd] = nil
  88. } else {
  89. // We didn't find buffers at all
  90. return p.new()
  91. }
  92. buf.Reset()
  93. return buf
  94. }
  95. func (p *bufPool) Put(buf *bytes.Buffer) {
  96. p.mutex.Lock()
  97. defer p.mutex.Unlock()
  98. if buf.Len() > 0 {
  99. p.calls[p.callInd] = buf.Len()
  100. p.callInd++
  101. if p.callInd == len(p.calls) {
  102. p.calibrateAndClean()
  103. }
  104. }
  105. if p.throughput > 0 && buf.Cap() > p.throughput {
  106. return
  107. }
  108. for i, b := range p.buffers {
  109. if b == nil {
  110. p.buffers[i] = buf
  111. if prometheusEnabled && buf.Cap() > 0 {
  112. observeBufferSize(p.name, buf.Cap())
  113. }
  114. return
  115. }
  116. }
  117. }