| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 | package indexerimport (	"context"	"sync"	"time")// BatchWriter provides efficient batch operations for indexingtype BatchWriter struct {	indexer   Indexer	documents []*Document	maxSize   int	mutex     sync.Mutex}// NewBatchWriter creates a new batch writerfunc NewBatchWriter(indexer Indexer, maxSize int) *BatchWriter {	if maxSize <= 0 {		maxSize = 1000	}	return &BatchWriter{		indexer:   indexer,		documents: make([]*Document, 0, maxSize),		maxSize:   maxSize,	}}// Add adds a document to the batchfunc (bw *BatchWriter) Add(doc *Document) error {	if doc == nil {		return nil	}	bw.mutex.Lock()	defer bw.mutex.Unlock()	bw.documents = append(bw.documents, doc)	// Auto-flush if batch is full	if len(bw.documents) >= bw.maxSize {		return bw.flushLocked()	}	return nil}// Flush processes all documents in the batchfunc (bw *BatchWriter) Flush() (*IndexResult, error) {	bw.mutex.Lock()	defer bw.mutex.Unlock()	if len(bw.documents) == 0 {		return &IndexResult{}, nil	}	startTime := time.Now()	// Make a copy of documents to avoid race conditions	docs := make([]*Document, len(bw.documents))	copy(docs, bw.documents)	// Clear the batch	bw.documents = bw.documents[:0]	// Process the batch synchronously for rebuilds to ensure completion.	err := bw.indexer.IndexDocuments(context.Background(), docs)	result := &IndexResult{		Processed: len(docs),		Duration:  time.Since(startTime),	}	if err != nil {		result.Failed = len(docs)		result.ErrorRate = 1.0		return result, err	}	result.Succeeded = len(docs)	result.Throughput = float64(len(docs)) / result.Duration.Seconds()	return result, nil}// flushLocked performs the flush operation while holding the mutexfunc (bw *BatchWriter) flushLocked() error {	if len(bw.documents) == 0 {		return nil	}	// Make a copy of documents to avoid race conditions	docs := make([]*Document, len(bw.documents))	copy(docs, bw.documents)	// Clear the batch	bw.documents = bw.documents[:0]	// Process the batch synchronously.	return bw.indexer.IndexDocuments(context.Background(), docs)}// Size returns the current batch sizefunc (bw *BatchWriter) Size() int {	bw.mutex.Lock()	defer bw.mutex.Unlock()	return len(bw.documents)}// Reset clears the batch without processingfunc (bw *BatchWriter) Reset() {	bw.mutex.Lock()	defer bw.mutex.Unlock()	bw.documents = bw.documents[:0]}// IsFull returns true if the batch is at maximum capacityfunc (bw *BatchWriter) IsFull() bool {	bw.mutex.Lock()	defer bw.mutex.Unlock()	return len(bw.documents) >= bw.maxSize}// SetMaxSize updates the maximum batch sizefunc (bw *BatchWriter) SetMaxSize(size int) {	if size <= 0 {		return	}	bw.mutex.Lock()	defer bw.mutex.Unlock()	bw.maxSize = size	// If current batch exceeds new limit, resize the slice	if len(bw.documents) > size {		// Keep the first 'size' documents		bw.documents = bw.documents[:size]	}}// GetMaxSize returns the maximum batch sizefunc (bw *BatchWriter) GetMaxSize() int {	return bw.maxSize}
 |