indexer_stats.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. package nginx_log
import (
	"context"
	"crypto/sha256"
	"encoding/json"
	"fmt"
	"time"

	"github.com/blevesearch/bleve/v2"
	"github.com/blevesearch/bleve/v2/search/query"
	"github.com/uozi-tech/cosy/logger"
)
  11. // createStatsQueryHash creates a hash for the given query to use as cache key
  12. func (li *LogIndexer) createStatsQueryHash(query query.Query) string {
  13. queryStr := fmt.Sprintf("%+v", query)
  14. hash := sha256.Sum256([]byte(queryStr))
  15. return fmt.Sprintf("stats_%x", hash[:16]) // Use first 16 bytes for shorter key
  16. }
  17. // getLatestFilesModTime returns the latest modification time of all registered log files
  18. func (li *LogIndexer) getLatestFilesModTime() time.Time {
  19. li.mu.RLock()
  20. defer li.mu.RUnlock()
  21. var latest time.Time
  22. for _, fileInfo := range li.logPaths {
  23. if fileInfo.LastModified.After(latest) {
  24. latest = fileInfo.LastModified
  25. }
  26. }
  27. return latest
  28. }
  29. // isCacheValid checks if the cached statistics are still valid
  30. func (li *LogIndexer) isCacheValid(cached *CachedStatsResult) bool {
  31. // Get current document count
  32. docCount, err := li.index.DocCount()
  33. if err != nil {
  34. logger.Warnf("Failed to get document count for cache validation: %v", err)
  35. return false
  36. }
  37. // Get latest file modification time
  38. latestModTime := li.getLatestFilesModTime()
  39. // Cache is valid if:
  40. // 1. Document count hasn't changed
  41. // 2. No files have been modified since cache was created
  42. // 3. Cache is not older than 5 minutes (safety fallback)
  43. isValid := cached.DocCount == docCount &&
  44. !latestModTime.After(cached.FilesModTime) &&
  45. time.Since(cached.LastCalculated) < 5*time.Minute
  46. return isValid
  47. }
  48. // calculateSummaryStatsFromQuery calculates summary statistics using optimized single query approach with caching
  49. func (li *LogIndexer) calculateSummaryStatsFromQuery(ctx context.Context, query query.Query) (*SummaryStats, error) {
  50. // Create cache key
  51. cacheKey := li.createStatsQueryHash(query)
  52. // Check cache first
  53. if cached, found := li.statsCache.Get(cacheKey); found {
  54. if li.isCacheValid(cached) {
  55. return cached.Stats, nil
  56. } else {
  57. // Remove invalid cache entry
  58. li.statsCache.Del(cacheKey)
  59. }
  60. }
  61. // Get total page views (PV) - just the count
  62. countReq := bleve.NewSearchRequest(query)
  63. countReq.Size = 0 // Don't fetch any documents, just get the count
  64. countResult, err := li.index.SearchInContext(ctx, countReq)
  65. if err != nil {
  66. return nil, fmt.Errorf("count search failed: %w", err)
  67. }
  68. pv := int(countResult.Total)
  69. if pv == 0 {
  70. return &SummaryStats{}, nil
  71. }
  72. // Calculate all statistics by processing all records (same as dashboard algorithm)
  73. // This ensures consistency between search and dashboard interfaces
  74. uniqueIPs := make(map[string]bool)
  75. uniquePages := make(map[string]bool)
  76. var totalTraffic int64
  77. // Process all records in batches to avoid memory issues
  78. batchSize := 10000
  79. from := 0
  80. for {
  81. statsReq := bleve.NewSearchRequest(query)
  82. statsReq.Size = batchSize
  83. statsReq.From = from
  84. statsReq.Fields = []string{"ip", "path", "bytes_sent"} // Get all needed fields in one query
  85. statsResult, err := li.index.SearchInContext(ctx, statsReq)
  86. if err != nil {
  87. return nil, fmt.Errorf("stats aggregation search failed: %w", err)
  88. }
  89. if len(statsResult.Hits) == 0 {
  90. break
  91. }
  92. // Calculate all statistics in this batch
  93. for _, hit := range statsResult.Hits {
  94. if fields := hit.Fields; fields != nil {
  95. // IP for UV calculation
  96. if ip := li.getStringField(fields, "ip"); ip != "" {
  97. uniqueIPs[ip] = true
  98. }
  99. // Path for unique pages calculation
  100. if path := li.getStringField(fields, "path"); path != "" {
  101. uniquePages[path] = true
  102. }
  103. // Bytes sent for traffic calculation
  104. if bytesSent := li.getFloatField(fields, "bytes_sent"); bytesSent > 0 {
  105. totalTraffic += int64(bytesSent)
  106. }
  107. }
  108. }
  109. from += len(statsResult.Hits)
  110. if uint64(from) >= statsResult.Total {
  111. break
  112. }
  113. }
  114. // Calculate average traffic per page view
  115. var avgTrafficPerPV float64
  116. if pv > 0 {
  117. avgTrafficPerPV = float64(totalTraffic) / float64(pv)
  118. }
  119. uv := len(uniqueIPs)
  120. uniquePagesCount := len(uniquePages)
  121. stats := &SummaryStats{
  122. UV: uv,
  123. PV: pv,
  124. TotalTraffic: totalTraffic,
  125. UniquePages: uniquePagesCount,
  126. AvgTrafficPerPV: avgTrafficPerPV,
  127. }
  128. // Cache the results
  129. docCount, err := li.index.DocCount()
  130. if err != nil {
  131. logger.Warnf("Failed to get document count for caching: %v", err)
  132. docCount = 0 // Continue without caching on error
  133. }
  134. cachedResult := &CachedStatsResult{
  135. Stats: stats,
  136. QueryHash: cacheKey,
  137. LastCalculated: time.Now(),
  138. FilesModTime: li.getLatestFilesModTime(),
  139. DocCount: docCount,
  140. }
  141. // Store in cache with estimated size (small structures, so use fixed size)
  142. li.statsCache.Set(cacheKey, cachedResult, 1024) // 1KB estimated size
  143. return stats, nil
  144. }
  145. // invalidateStatsCache clears the statistics cache when data changes
  146. func (li *LogIndexer) invalidateStatsCache() {
  147. // Clear all stats cache entries since we don't know which queries might be affected
  148. li.statsCache.Clear()
  149. }
  150. // GetStatsCacheStatus returns statistics about the stats cache for monitoring
  151. func (li *LogIndexer) GetStatsCacheStatus() map[string]interface{} {
  152. metrics := li.statsCache.Metrics
  153. return map[string]interface{}{
  154. "hits": metrics.Hits(),
  155. "misses": metrics.Misses(),
  156. "cost_added": metrics.CostAdded(),
  157. "cost_evicted": metrics.CostEvicted(),
  158. "sets_dropped": metrics.SetsDropped(),
  159. "sets_rejected": metrics.SetsRejected(),
  160. "gets_kept": metrics.GetsKept(),
  161. "gets_dropped": metrics.GetsDropped(),
  162. }
  163. }