123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255 |
- package indexer
- import (
- "sync"
- "time"
- )
// ObjectPool bundles the sync.Pool instances used to recycle indexer objects
// so that hot paths allocate less once the pools are warm. An empty pool
// still allocates through its New function, so this is allocation
// reduction, not a strict zero-allocation guarantee.
//
// The zero value has no New functions, so the Get* accessors (which
// type-assert the pooled value) would panic on a nil result; construct
// instances with NewObjectPool.
type ObjectPool struct {
	// Element types: jobPool holds *IndexJob, resultPool holds *IndexResult,
	// docPool holds *Document and bufferPool holds *[]byte scratch buffers.
	jobPool, resultPool, docPool, bufferPool sync.Pool
}
- // NewObjectPool creates a new object pool with pre-allocated pools
- func NewObjectPool() *ObjectPool {
- return &ObjectPool{
- jobPool: sync.Pool{
- New: func() interface{} {
- return &IndexJob{
- Documents: make([]*Document, 0, 1000), // Pre-allocate capacity
- Priority: 0,
- }
- },
- },
- resultPool: sync.Pool{
- New: func() interface{} {
- return &IndexResult{
- Processed: 0,
- Succeeded: 0,
- Failed: 0,
- Duration: 0,
- ErrorRate: 0.0,
- Throughput: 0.0,
- }
- },
- },
- docPool: sync.Pool{
- New: func() interface{} {
- return &Document{
- ID: "",
- Fields: nil,
- }
- },
- },
- bufferPool: sync.Pool{
- New: func() interface{} {
- // Pre-allocate 4KB buffer for common operations
- b := make([]byte, 0, 4096)
- return &b
- },
- },
- }
- }
- // GetIndexJob returns a pooled IndexJob, reset and ready for use
- func (p *ObjectPool) GetIndexJob() *IndexJob {
- job := p.jobPool.Get().(*IndexJob)
- // Reset job state
- job.Documents = job.Documents[:0] // Keep capacity, reset length
- job.Priority = 0
- job.Callback = nil
- return job
- }
- // PutIndexJob returns an IndexJob to the pool
- func (p *ObjectPool) PutIndexJob(job *IndexJob) {
- if job != nil {
- // Clear any references to prevent memory leaks
- for i := range job.Documents {
- if job.Documents[i] != nil {
- p.PutDocument(job.Documents[i])
- }
- }
- job.Documents = job.Documents[:0]
- job.Callback = nil
- p.jobPool.Put(job)
- }
- }
- // GetIndexResult returns a pooled IndexResult, reset and ready for use
- func (p *ObjectPool) GetIndexResult() *IndexResult {
- result := p.resultPool.Get().(*IndexResult)
- // Reset result state
- result.Processed = 0
- result.Succeeded = 0
- result.Failed = 0
- result.Duration = 0
- result.ErrorRate = 0.0
- result.Throughput = 0.0
- return result
- }
- // PutIndexResult returns an IndexResult to the pool
- func (p *ObjectPool) PutIndexResult(result *IndexResult) {
- if result != nil {
- p.resultPool.Put(result)
- }
- }
- // GetDocument returns a pooled Document, reset and ready for use
- func (p *ObjectPool) GetDocument() *Document {
- doc := p.docPool.Get().(*Document)
- // Reset document state
- doc.ID = ""
- doc.Fields = nil
- return doc
- }
- // PutDocument returns a Document to the pool
- func (p *ObjectPool) PutDocument(doc *Document) {
- if doc != nil {
- doc.ID = ""
- doc.Fields = nil
- p.docPool.Put(doc)
- }
- }
- // GetBuffer returns a pooled byte buffer, reset and ready for use
- func (p *ObjectPool) GetBuffer() *[]byte {
- bufPtr := p.bufferPool.Get().(*[]byte)
- b := *bufPtr
- b = b[:0]
- *bufPtr = b
- return bufPtr
- }
- // PutBuffer returns a byte buffer to the pool
- func (p *ObjectPool) PutBuffer(bufPtr *[]byte) {
- if bufPtr == nil {
- return
- }
- b := *bufPtr
- if cap(b) > 0 && cap(b) <= 64*1024 { // Only keep reasonable buffers
- b = b[:0]
- *bufPtr = b
- p.bufferPool.Put(bufPtr)
- }
- }
- // ZeroAllocBatchProcessor provides zero-allocation batch processing
- type ZeroAllocBatchProcessor struct {
- pool *ObjectPool
- config *Config
- // Metrics
- allocationsAvoided int64
- poolHitRate float64
- poolStats sync.RWMutex
- }
- // NewZeroAllocBatchProcessor creates a new zero-allocation batch processor
- func NewZeroAllocBatchProcessor(config *Config) *ZeroAllocBatchProcessor {
- return &ZeroAllocBatchProcessor{
- pool: NewObjectPool(),
- config: config,
- }
- }
- // CreateJobBatch creates a batch of jobs using object pooling
- func (z *ZeroAllocBatchProcessor) CreateJobBatch(documents []*Document, batchSize int) []*IndexJob {
- jobCount := (len(documents) + batchSize - 1) / batchSize
- jobs := make([]*IndexJob, 0, jobCount)
- for i := 0; i < len(documents); i += batchSize {
- end := i + batchSize
- if end > len(documents) {
- end = len(documents)
- }
- // Use pooled job
- job := z.pool.GetIndexJob()
- // Add documents to job (reusing pre-allocated slice capacity)
- for j := i; j < end; j++ {
- job.Documents = append(job.Documents, documents[j])
- }
- job.Priority = 1
- jobs = append(jobs, job)
- }
- return jobs
- }
- // ProcessJobResults processes job results with zero allocation
- func (z *ZeroAllocBatchProcessor) ProcessJobResults(results []*IndexResult) *IndexResult {
- // Use pooled result for aggregation
- aggregatedResult := z.pool.GetIndexResult()
- totalProcessed := 0
- totalSucceeded := 0
- totalFailed := 0
- var totalDuration time.Duration
- for _, result := range results {
- totalProcessed += result.Processed
- totalSucceeded += result.Succeeded
- totalFailed += result.Failed
- totalDuration += result.Duration
- // Return individual result to pool
- z.pool.PutIndexResult(result)
- }
- // Set aggregated values
- aggregatedResult.Processed = totalProcessed
- aggregatedResult.Succeeded = totalSucceeded
- aggregatedResult.Failed = totalFailed
- aggregatedResult.Duration = totalDuration
- if totalProcessed > 0 {
- aggregatedResult.ErrorRate = float64(totalFailed) / float64(totalProcessed)
- aggregatedResult.Throughput = float64(totalSucceeded) / totalDuration.Seconds()
- }
- return aggregatedResult
- }
- // ReleaseBatch releases a batch of jobs back to the pool
- func (z *ZeroAllocBatchProcessor) ReleaseBatch(jobs []*IndexJob) {
- for _, job := range jobs {
- z.pool.PutIndexJob(job)
- }
- }
- // GetPoolStats returns current pool utilization statistics
- func (z *ZeroAllocBatchProcessor) GetPoolStats() PoolStats {
- z.poolStats.RLock()
- defer z.poolStats.RUnlock()
- return PoolStats{
- AllocationsAvoided: z.allocationsAvoided,
- PoolHitRate: z.poolHitRate,
- ActiveObjects: z.getActiveObjectCount(),
- }
- }
- func (z *ZeroAllocBatchProcessor) getActiveObjectCount() int {
- // This is an approximation - actual implementation would need more sophisticated tracking
- return 0
- }
// PoolStats is the statistics snapshot returned by GetPoolStats. Its JSON
// tags define the serialized field names.
type PoolStats struct {
	// AllocationsAvoided is intended to count allocations the pools saved.
	AllocationsAvoided int64 `json:"allocations_avoided"`
	// PoolHitRate is the recorded pool hit rate (presumably a fraction in
	// [0, 1] — confirm against whatever writes it).
	PoolHitRate float64 `json:"pool_hit_rate"`
	// ActiveObjects is the checked-out object count from getActiveObjectCount.
	ActiveObjects int `json:"active_objects"`
}
- // GetPool returns the underlying object pool for direct access if needed
- func (z *ZeroAllocBatchProcessor) GetPool() *ObjectPool {
- return z.pool
- }
|