// zero_allocation_pool.go
  1. package indexer
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. // ObjectPool provides zero-allocation object pooling for indexer components
  7. type ObjectPool struct {
  8. jobPool sync.Pool
  9. resultPool sync.Pool
  10. docPool sync.Pool
  11. bufferPool sync.Pool
  12. }
  13. // NewObjectPool creates a new object pool with pre-allocated pools
  14. func NewObjectPool() *ObjectPool {
  15. return &ObjectPool{
  16. jobPool: sync.Pool{
  17. New: func() interface{} {
  18. return &IndexJob{
  19. Documents: make([]*Document, 0, 1000), // Pre-allocate capacity
  20. Priority: 0,
  21. }
  22. },
  23. },
  24. resultPool: sync.Pool{
  25. New: func() interface{} {
  26. return &IndexResult{
  27. Processed: 0,
  28. Succeeded: 0,
  29. Failed: 0,
  30. Duration: 0,
  31. ErrorRate: 0.0,
  32. Throughput: 0.0,
  33. }
  34. },
  35. },
  36. docPool: sync.Pool{
  37. New: func() interface{} {
  38. return &Document{
  39. ID: "",
  40. Fields: nil,
  41. }
  42. },
  43. },
  44. bufferPool: sync.Pool{
  45. New: func() interface{} {
  46. // Pre-allocate 4KB buffer for common operations
  47. b := make([]byte, 0, 4096)
  48. return &b
  49. },
  50. },
  51. }
  52. }
  53. // GetIndexJob returns a pooled IndexJob, reset and ready for use
  54. func (p *ObjectPool) GetIndexJob() *IndexJob {
  55. job := p.jobPool.Get().(*IndexJob)
  56. // Reset job state
  57. job.Documents = job.Documents[:0] // Keep capacity, reset length
  58. job.Priority = 0
  59. job.Callback = nil
  60. return job
  61. }
  62. // PutIndexJob returns an IndexJob to the pool
  63. func (p *ObjectPool) PutIndexJob(job *IndexJob) {
  64. if job != nil {
  65. // Clear any references to prevent memory leaks
  66. for i := range job.Documents {
  67. if job.Documents[i] != nil {
  68. p.PutDocument(job.Documents[i])
  69. }
  70. }
  71. job.Documents = job.Documents[:0]
  72. job.Callback = nil
  73. p.jobPool.Put(job)
  74. }
  75. }
  76. // GetIndexResult returns a pooled IndexResult, reset and ready for use
  77. func (p *ObjectPool) GetIndexResult() *IndexResult {
  78. result := p.resultPool.Get().(*IndexResult)
  79. // Reset result state
  80. result.Processed = 0
  81. result.Succeeded = 0
  82. result.Failed = 0
  83. result.Duration = 0
  84. result.ErrorRate = 0.0
  85. result.Throughput = 0.0
  86. return result
  87. }
  88. // PutIndexResult returns an IndexResult to the pool
  89. func (p *ObjectPool) PutIndexResult(result *IndexResult) {
  90. if result != nil {
  91. p.resultPool.Put(result)
  92. }
  93. }
  94. // GetDocument returns a pooled Document, reset and ready for use
  95. func (p *ObjectPool) GetDocument() *Document {
  96. doc := p.docPool.Get().(*Document)
  97. // Reset document state
  98. doc.ID = ""
  99. doc.Fields = nil
  100. return doc
  101. }
  102. // PutDocument returns a Document to the pool
  103. func (p *ObjectPool) PutDocument(doc *Document) {
  104. if doc != nil {
  105. doc.ID = ""
  106. doc.Fields = nil
  107. p.docPool.Put(doc)
  108. }
  109. }
  110. // GetBuffer returns a pooled byte buffer, reset and ready for use
  111. func (p *ObjectPool) GetBuffer() *[]byte {
  112. bufPtr := p.bufferPool.Get().(*[]byte)
  113. b := *bufPtr
  114. b = b[:0]
  115. *bufPtr = b
  116. return bufPtr
  117. }
  118. // PutBuffer returns a byte buffer to the pool
  119. func (p *ObjectPool) PutBuffer(bufPtr *[]byte) {
  120. if bufPtr == nil {
  121. return
  122. }
  123. b := *bufPtr
  124. if cap(b) > 0 && cap(b) <= 64*1024 { // Only keep reasonable buffers
  125. b = b[:0]
  126. *bufPtr = b
  127. p.bufferPool.Put(bufPtr)
  128. }
  129. }
  130. // ZeroAllocBatchProcessor provides zero-allocation batch processing
  131. type ZeroAllocBatchProcessor struct {
  132. pool *ObjectPool
  133. config *Config
  134. // Metrics
  135. allocationsAvoided int64
  136. poolHitRate float64
  137. poolStats sync.RWMutex
  138. }
  139. // NewZeroAllocBatchProcessor creates a new zero-allocation batch processor
  140. func NewZeroAllocBatchProcessor(config *Config) *ZeroAllocBatchProcessor {
  141. return &ZeroAllocBatchProcessor{
  142. pool: NewObjectPool(),
  143. config: config,
  144. }
  145. }
  146. // CreateJobBatch creates a batch of jobs using object pooling
  147. func (z *ZeroAllocBatchProcessor) CreateJobBatch(documents []*Document, batchSize int) []*IndexJob {
  148. jobCount := (len(documents) + batchSize - 1) / batchSize
  149. jobs := make([]*IndexJob, 0, jobCount)
  150. for i := 0; i < len(documents); i += batchSize {
  151. end := i + batchSize
  152. if end > len(documents) {
  153. end = len(documents)
  154. }
  155. // Use pooled job
  156. job := z.pool.GetIndexJob()
  157. // Add documents to job (reusing pre-allocated slice capacity)
  158. for j := i; j < end; j++ {
  159. job.Documents = append(job.Documents, documents[j])
  160. }
  161. job.Priority = 1
  162. jobs = append(jobs, job)
  163. }
  164. return jobs
  165. }
  166. // ProcessJobResults processes job results with zero allocation
  167. func (z *ZeroAllocBatchProcessor) ProcessJobResults(results []*IndexResult) *IndexResult {
  168. // Use pooled result for aggregation
  169. aggregatedResult := z.pool.GetIndexResult()
  170. totalProcessed := 0
  171. totalSucceeded := 0
  172. totalFailed := 0
  173. var totalDuration time.Duration
  174. for _, result := range results {
  175. totalProcessed += result.Processed
  176. totalSucceeded += result.Succeeded
  177. totalFailed += result.Failed
  178. totalDuration += result.Duration
  179. // Return individual result to pool
  180. z.pool.PutIndexResult(result)
  181. }
  182. // Set aggregated values
  183. aggregatedResult.Processed = totalProcessed
  184. aggregatedResult.Succeeded = totalSucceeded
  185. aggregatedResult.Failed = totalFailed
  186. aggregatedResult.Duration = totalDuration
  187. if totalProcessed > 0 {
  188. aggregatedResult.ErrorRate = float64(totalFailed) / float64(totalProcessed)
  189. aggregatedResult.Throughput = float64(totalSucceeded) / totalDuration.Seconds()
  190. }
  191. return aggregatedResult
  192. }
  193. // ReleaseBatch releases a batch of jobs back to the pool
  194. func (z *ZeroAllocBatchProcessor) ReleaseBatch(jobs []*IndexJob) {
  195. for _, job := range jobs {
  196. z.pool.PutIndexJob(job)
  197. }
  198. }
  199. // GetPoolStats returns current pool utilization statistics
  200. func (z *ZeroAllocBatchProcessor) GetPoolStats() PoolStats {
  201. z.poolStats.RLock()
  202. defer z.poolStats.RUnlock()
  203. return PoolStats{
  204. AllocationsAvoided: z.allocationsAvoided,
  205. PoolHitRate: z.poolHitRate,
  206. ActiveObjects: z.getActiveObjectCount(),
  207. }
  208. }
  209. func (z *ZeroAllocBatchProcessor) getActiveObjectCount() int {
  210. // This is an approximation - actual implementation would need more sophisticated tracking
  211. return 0
  212. }
  213. // PoolStats represents object pool statistics
  214. type PoolStats struct {
  215. AllocationsAvoided int64 `json:"allocations_avoided"`
  216. PoolHitRate float64 `json:"pool_hit_rate"`
  217. ActiveObjects int `json:"active_objects"`
  218. }
  219. // GetPool returns the underlying object pool for direct access if needed
  220. func (z *ZeroAllocBatchProcessor) GetPool() *ObjectPool {
  221. return z.pool
  222. }