1
0

metrics.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. package indexer
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "time"
  6. )
  7. // DefaultMetricsCollector implements basic metrics collection for indexing operations
  8. type DefaultMetricsCollector struct {
  9. // Counters
  10. totalOperations int64
  11. successOperations int64
  12. failedOperations int64
  13. totalDocuments int64
  14. totalBatches int64
  15. // Timing
  16. totalDuration int64 // nanoseconds
  17. batchDuration int64 // nanoseconds
  18. optimizationCount int64
  19. optimizationDuration int64 // nanoseconds
  20. // Rate calculations
  21. lastUpdateTime int64 // unix timestamp
  22. lastDocumentCount int64
  23. currentRate int64 // docs per second (atomic)
  24. // Detailed metrics
  25. operationHistory []OperationMetric
  26. historyMutex sync.RWMutex
  27. maxHistorySize int
  28. // Performance tracking
  29. minLatency int64 // nanoseconds
  30. maxLatency int64 // nanoseconds
  31. avgLatency int64 // nanoseconds
  32. }
  33. // OperationMetric represents a single operation's metrics
  34. type OperationMetric struct {
  35. Timestamp time.Time `json:"timestamp"`
  36. Documents int `json:"documents"`
  37. Duration time.Duration `json:"duration"`
  38. Success bool `json:"success"`
  39. Type string `json:"type"` // "index", "batch", "optimize"
  40. }
  41. // NewDefaultMetricsCollector creates a new metrics collector
  42. func NewDefaultMetricsCollector() *DefaultMetricsCollector {
  43. now := time.Now().Unix()
  44. return &DefaultMetricsCollector{
  45. lastUpdateTime: now,
  46. maxHistorySize: 1000, // Keep last 1000 operations
  47. minLatency: int64(time.Hour), // Start with high value
  48. operationHistory: make([]OperationMetric, 0, 1000),
  49. }
  50. }
  51. // RecordIndexOperation records metrics for an indexing operation
  52. func (m *DefaultMetricsCollector) RecordIndexOperation(docs int, duration time.Duration, success bool) {
  53. atomic.AddInt64(&m.totalOperations, 1)
  54. atomic.AddInt64(&m.totalDocuments, int64(docs))
  55. atomic.AddInt64(&m.totalDuration, int64(duration))
  56. if success {
  57. atomic.AddInt64(&m.successOperations, 1)
  58. } else {
  59. atomic.AddInt64(&m.failedOperations, 1)
  60. }
  61. // Update latency tracking
  62. durationNs := int64(duration)
  63. // Update min latency
  64. for {
  65. current := atomic.LoadInt64(&m.minLatency)
  66. if durationNs >= current || atomic.CompareAndSwapInt64(&m.minLatency, current, durationNs) {
  67. break
  68. }
  69. }
  70. // Update max latency
  71. for {
  72. current := atomic.LoadInt64(&m.maxLatency)
  73. if durationNs <= current || atomic.CompareAndSwapInt64(&m.maxLatency, current, durationNs) {
  74. break
  75. }
  76. }
  77. // Update average latency (simple running average)
  78. totalOps := atomic.LoadInt64(&m.totalOperations)
  79. if totalOps > 0 {
  80. currentAvg := atomic.LoadInt64(&m.avgLatency)
  81. newAvg := (currentAvg*(totalOps-1) + durationNs) / totalOps
  82. atomic.StoreInt64(&m.avgLatency, newAvg)
  83. }
  84. // Update rate calculation
  85. m.updateRate(docs)
  86. // Record in history
  87. m.addToHistory(OperationMetric{
  88. Timestamp: time.Now(),
  89. Documents: docs,
  90. Duration: duration,
  91. Success: success,
  92. Type: "index",
  93. })
  94. }
  95. // RecordBatchOperation records metrics for batch operations
  96. func (m *DefaultMetricsCollector) RecordBatchOperation(batchSize int, duration time.Duration) {
  97. atomic.AddInt64(&m.totalBatches, 1)
  98. atomic.AddInt64(&m.batchDuration, int64(duration))
  99. m.addToHistory(OperationMetric{
  100. Timestamp: time.Now(),
  101. Documents: batchSize,
  102. Duration: duration,
  103. Success: true, // Batch operations are typically successful if they complete
  104. Type: "batch",
  105. })
  106. }
  107. // RecordOptimization records metrics for optimization operations
  108. func (m *DefaultMetricsCollector) RecordOptimization(duration time.Duration, success bool) {
  109. atomic.AddInt64(&m.optimizationCount, 1)
  110. atomic.AddInt64(&m.optimizationDuration, int64(duration))
  111. m.addToHistory(OperationMetric{
  112. Timestamp: time.Now(),
  113. Documents: 0, // Optimization doesn't process new documents
  114. Duration: duration,
  115. Success: success,
  116. Type: "optimize",
  117. })
  118. }
  119. // GetMetrics returns current metrics as a structured type
  120. func (m *DefaultMetricsCollector) GetMetrics() *Metrics {
  121. totalOps := atomic.LoadInt64(&m.totalOperations)
  122. successOps := atomic.LoadInt64(&m.successOperations)
  123. failedOps := atomic.LoadInt64(&m.failedOperations)
  124. totalDocs := atomic.LoadInt64(&m.totalDocuments)
  125. totalDuration := atomic.LoadInt64(&m.totalDuration)
  126. totalBatches := atomic.LoadInt64(&m.totalBatches)
  127. batchDuration := atomic.LoadInt64(&m.batchDuration)
  128. optimizationCount := atomic.LoadInt64(&m.optimizationCount)
  129. optimizationDuration := atomic.LoadInt64(&m.optimizationDuration)
  130. currentRate := atomic.LoadInt64(&m.currentRate)
  131. minLatency := atomic.LoadInt64(&m.minLatency)
  132. maxLatency := atomic.LoadInt64(&m.maxLatency)
  133. avgLatency := atomic.LoadInt64(&m.avgLatency)
  134. metrics := &Metrics{
  135. TotalOperations: totalOps,
  136. SuccessOperations: successOps,
  137. FailedOperations: failedOps,
  138. TotalDocuments: totalDocs,
  139. TotalBatches: totalBatches,
  140. OptimizationCount: optimizationCount,
  141. IndexingRate: float64(currentRate), // docs per second
  142. AverageLatencyMS: float64(avgLatency) / float64(time.Millisecond),
  143. MinLatencyMS: float64(minLatency) / float64(time.Millisecond),
  144. MaxLatencyMS: float64(maxLatency) / float64(time.Millisecond),
  145. }
  146. // Calculate derived metrics
  147. if totalOps > 0 {
  148. metrics.SuccessRate = float64(successOps) / float64(totalOps)
  149. if totalDuration > 0 {
  150. totalDurationS := float64(totalDuration) / float64(time.Second)
  151. metrics.AverageThroughput = float64(totalDocs) / totalDurationS
  152. }
  153. }
  154. if totalBatches > 0 && batchDuration > 0 {
  155. metrics.AverageBatchTimeMS = float64(batchDuration) / float64(totalBatches) / float64(time.Millisecond)
  156. }
  157. if optimizationCount > 0 && optimizationDuration > 0 {
  158. metrics.AverageOptTimeS = float64(optimizationDuration) / float64(optimizationCount) / float64(time.Second)
  159. }
  160. // Reset min latency if it's still at the initial high value
  161. if minLatency == int64(time.Hour) {
  162. metrics.MinLatencyMS = 0.0
  163. }
  164. return metrics
  165. }
  166. // Reset resets all metrics
  167. func (m *DefaultMetricsCollector) Reset() {
  168. atomic.StoreInt64(&m.totalOperations, 0)
  169. atomic.StoreInt64(&m.successOperations, 0)
  170. atomic.StoreInt64(&m.failedOperations, 0)
  171. atomic.StoreInt64(&m.totalDocuments, 0)
  172. atomic.StoreInt64(&m.totalBatches, 0)
  173. atomic.StoreInt64(&m.totalDuration, 0)
  174. atomic.StoreInt64(&m.batchDuration, 0)
  175. atomic.StoreInt64(&m.optimizationCount, 0)
  176. atomic.StoreInt64(&m.optimizationDuration, 0)
  177. atomic.StoreInt64(&m.currentRate, 0)
  178. atomic.StoreInt64(&m.lastUpdateTime, time.Now().Unix())
  179. atomic.StoreInt64(&m.lastDocumentCount, 0)
  180. atomic.StoreInt64(&m.minLatency, int64(time.Hour))
  181. atomic.StoreInt64(&m.maxLatency, 0)
  182. atomic.StoreInt64(&m.avgLatency, 0)
  183. m.historyMutex.Lock()
  184. m.operationHistory = m.operationHistory[:0]
  185. m.historyMutex.Unlock()
  186. }
  187. // updateRate calculates the current indexing rate
  188. func (m *DefaultMetricsCollector) updateRate(newDocs int) {
  189. now := time.Now().Unix()
  190. lastUpdate := atomic.LoadInt64(&m.lastUpdateTime)
  191. // Update rate every second
  192. if now > lastUpdate {
  193. currentDocs := atomic.LoadInt64(&m.totalDocuments)
  194. lastDocs := atomic.LoadInt64(&m.lastDocumentCount)
  195. if now > lastUpdate {
  196. timeDiff := now - lastUpdate
  197. docDiff := currentDocs - lastDocs
  198. if timeDiff > 0 {
  199. rate := docDiff / timeDiff
  200. atomic.StoreInt64(&m.currentRate, rate)
  201. atomic.StoreInt64(&m.lastUpdateTime, now)
  202. atomic.StoreInt64(&m.lastDocumentCount, currentDocs)
  203. }
  204. }
  205. }
  206. }
  207. // addToHistory adds an operation to the history buffer
  208. func (m *DefaultMetricsCollector) addToHistory(metric OperationMetric) {
  209. m.historyMutex.Lock()
  210. defer m.historyMutex.Unlock()
  211. // Add new metric
  212. m.operationHistory = append(m.operationHistory, metric)
  213. // Trim history if it exceeds max size
  214. if len(m.operationHistory) > m.maxHistorySize {
  215. // Keep the most recent metrics
  216. copy(m.operationHistory, m.operationHistory[len(m.operationHistory)-m.maxHistorySize:])
  217. m.operationHistory = m.operationHistory[:m.maxHistorySize]
  218. }
  219. }
  220. // GetOperationHistory returns the operation history
  221. func (m *DefaultMetricsCollector) GetOperationHistory(limit int) []OperationMetric {
  222. m.historyMutex.RLock()
  223. defer m.historyMutex.RUnlock()
  224. if limit <= 0 || limit > len(m.operationHistory) {
  225. limit = len(m.operationHistory)
  226. }
  227. // Return the most recent operations
  228. start := len(m.operationHistory) - limit
  229. if start < 0 {
  230. start = 0
  231. }
  232. result := make([]OperationMetric, limit)
  233. copy(result, m.operationHistory[start:])
  234. return result
  235. }
  236. // GetRateHistory returns indexing rate over time
  237. func (m *DefaultMetricsCollector) GetRateHistory(duration time.Duration) []RatePoint {
  238. m.historyMutex.RLock()
  239. defer m.historyMutex.RUnlock()
  240. cutoff := time.Now().Add(-duration)
  241. var points []RatePoint
  242. // Group operations by time windows (e.g., per minute)
  243. window := time.Minute
  244. var currentWindow time.Time
  245. var currentDocs int
  246. for _, op := range m.operationHistory {
  247. if op.Timestamp.Before(cutoff) {
  248. continue
  249. }
  250. windowStart := op.Timestamp.Truncate(window)
  251. if currentWindow.IsZero() || windowStart.After(currentWindow) {
  252. if !currentWindow.IsZero() {
  253. points = append(points, RatePoint{
  254. Timestamp: currentWindow,
  255. Rate: float64(currentDocs) / window.Seconds(),
  256. Documents: currentDocs,
  257. })
  258. }
  259. currentWindow = windowStart
  260. currentDocs = 0
  261. }
  262. if op.Type == "index" {
  263. currentDocs += op.Documents
  264. }
  265. }
  266. // Add the last window
  267. if !currentWindow.IsZero() {
  268. points = append(points, RatePoint{
  269. Timestamp: currentWindow,
  270. Rate: float64(currentDocs) / window.Seconds(),
  271. Documents: currentDocs,
  272. })
  273. }
  274. return points
  275. }
  276. // RatePoint represents a point in time for rate calculation
  277. type RatePoint struct {
  278. Timestamp time.Time `json:"timestamp"`
  279. Rate float64 `json:"rate"` // Documents per second
  280. Documents int `json:"documents"` // Total documents in this time window
  281. }
  282. // GetCurrentRate returns the current indexing rate
  283. func (m *DefaultMetricsCollector) GetCurrentRate() float64 {
  284. return float64(atomic.LoadInt64(&m.currentRate))
  285. }
  286. // SetMaxHistorySize sets the maximum number of operations to keep in history
  287. func (m *DefaultMetricsCollector) SetMaxHistorySize(size int) {
  288. if size <= 0 {
  289. return
  290. }
  291. m.historyMutex.Lock()
  292. defer m.historyMutex.Unlock()
  293. m.maxHistorySize = size
  294. // Trim existing history if needed
  295. if len(m.operationHistory) > size {
  296. start := len(m.operationHistory) - size
  297. copy(m.operationHistory, m.operationHistory[start:])
  298. m.operationHistory = m.operationHistory[:size]
  299. }
  300. }