batch_writer.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package indexer
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. )
  7. // BatchWriter provides efficient batch operations for indexing
  8. type BatchWriter struct {
  9. indexer Indexer
  10. documents []*Document
  11. maxSize int
  12. mutex sync.Mutex
  13. }
  14. // NewBatchWriter creates a new batch writer
  15. func NewBatchWriter(indexer Indexer, maxSize int) *BatchWriter {
  16. if maxSize <= 0 {
  17. maxSize = 1000
  18. }
  19. return &BatchWriter{
  20. indexer: indexer,
  21. documents: make([]*Document, 0, maxSize),
  22. maxSize: maxSize,
  23. }
  24. }
  25. // Add adds a document to the batch
  26. func (bw *BatchWriter) Add(doc *Document) error {
  27. if doc == nil {
  28. return nil
  29. }
  30. bw.mutex.Lock()
  31. defer bw.mutex.Unlock()
  32. bw.documents = append(bw.documents, doc)
  33. // Auto-flush if batch is full
  34. if len(bw.documents) >= bw.maxSize {
  35. return bw.flushLocked()
  36. }
  37. return nil
  38. }
  39. // Flush processes all documents in the batch
  40. func (bw *BatchWriter) Flush() (*IndexResult, error) {
  41. bw.mutex.Lock()
  42. defer bw.mutex.Unlock()
  43. if len(bw.documents) == 0 {
  44. return &IndexResult{}, nil
  45. }
  46. startTime := time.Now()
  47. // Make a copy of documents to avoid race conditions
  48. docs := make([]*Document, len(bw.documents))
  49. copy(docs, bw.documents)
  50. // Clear the batch
  51. bw.documents = bw.documents[:0]
  52. // Process the batch synchronously for rebuilds to ensure completion.
  53. err := bw.indexer.IndexDocuments(context.Background(), docs)
  54. result := &IndexResult{
  55. Processed: len(docs),
  56. Duration: time.Since(startTime),
  57. }
  58. if err != nil {
  59. result.Failed = len(docs)
  60. result.ErrorRate = 1.0
  61. return result, err
  62. }
  63. result.Succeeded = len(docs)
  64. result.Throughput = float64(len(docs)) / result.Duration.Seconds()
  65. return result, nil
  66. }
  67. // flushLocked performs the flush operation while holding the mutex
  68. func (bw *BatchWriter) flushLocked() error {
  69. if len(bw.documents) == 0 {
  70. return nil
  71. }
  72. // Make a copy of documents to avoid race conditions
  73. docs := make([]*Document, len(bw.documents))
  74. copy(docs, bw.documents)
  75. // Clear the batch
  76. bw.documents = bw.documents[:0]
  77. // Process the batch synchronously.
  78. return bw.indexer.IndexDocuments(context.Background(), docs)
  79. }
  80. // Size returns the current batch size
  81. func (bw *BatchWriter) Size() int {
  82. bw.mutex.Lock()
  83. defer bw.mutex.Unlock()
  84. return len(bw.documents)
  85. }
  86. // Reset clears the batch without processing
  87. func (bw *BatchWriter) Reset() {
  88. bw.mutex.Lock()
  89. defer bw.mutex.Unlock()
  90. bw.documents = bw.documents[:0]
  91. }
  92. // IsFull returns true if the batch is at maximum capacity
  93. func (bw *BatchWriter) IsFull() bool {
  94. bw.mutex.Lock()
  95. defer bw.mutex.Unlock()
  96. return len(bw.documents) >= bw.maxSize
  97. }
  98. // SetMaxSize updates the maximum batch size
  99. func (bw *BatchWriter) SetMaxSize(size int) {
  100. if size <= 0 {
  101. return
  102. }
  103. bw.mutex.Lock()
  104. defer bw.mutex.Unlock()
  105. bw.maxSize = size
  106. // If current batch exceeds new limit, resize the slice
  107. if len(bw.documents) > size {
  108. // Keep the first 'size' documents
  109. bw.documents = bw.documents[:size]
  110. }
  111. }
  112. // GetMaxSize returns the maximum batch size
  113. func (bw *BatchWriter) GetMaxSize() int {
  114. return bw.maxSize
  115. }