bleve_stats_service_time.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package nginx_log
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "github.com/blevesearch/bleve/v2"
  7. "github.com/blevesearch/bleve/v2/search/query"
  8. "github.com/uozi-tech/cosy/logger"
  9. )
  10. // calculateHourlyStatsFromBleve calculates 24-hour UV/PV statistics using Bleve aggregations
  11. // Shows stats for the End Date (target day) only
  12. func (s *BleveStatsService) calculateHourlyStatsFromBleve(ctx context.Context, baseQuery query.Query, startTime, endTime time.Time) ([]HourlyAccessStats, error) {
  13. logger.Info("BleveStatsService: Starting hourly stats calculation")
  14. hourStats := make(map[int]map[string]bool) // hour -> unique IPs
  15. hourPV := make(map[int]int) // hour -> page views
  16. // Initialize all 24 hours
  17. for i := 0; i < 24; i++ {
  18. hourStats[i] = make(map[string]bool)
  19. hourPV[i] = 0
  20. }
  21. // Query all entries for the time range
  22. searchReq := bleve.NewSearchRequest(baseQuery)
  23. searchReq.Size = 10000 // Process in batches
  24. searchReq.Fields = []string{"timestamp", "ip"}
  25. searchReq.SortBy([]string{"timestamp"})
  26. from := 0
  27. totalProcessed := 0
  28. for {
  29. searchReq.From = from
  30. logger.Debugf("BleveStatsService: Executing hourly stats search, from=%d", from)
  31. searchResult, err := s.indexer.index.Search(searchReq)
  32. if err != nil {
  33. logger.Errorf("BleveStatsService: Failed to search logs: %v", err)
  34. return nil, fmt.Errorf("failed to search logs: %w", err)
  35. }
  36. logger.Debugf("BleveStatsService: Search returned %d hits, total=%d", len(searchResult.Hits), searchResult.Total)
  37. if len(searchResult.Hits) == 0 {
  38. break
  39. }
  40. // Process hits
  41. for _, hit := range searchResult.Hits {
  42. timestamp, ip := s.extractTimestampAndIP(hit)
  43. if timestamp != nil && ip != "" {
  44. // For hourly stats, only process entries from the target date (endTime)
  45. if !endTime.IsZero() {
  46. targetDate := endTime.Truncate(24 * time.Hour)
  47. entryDate := timestamp.Truncate(24 * time.Hour)
  48. if !entryDate.Equal(targetDate) {
  49. continue // Skip entries not from the target date
  50. }
  51. }
  52. hour := timestamp.Hour()
  53. hourStats[hour][ip] = true
  54. hourPV[hour]++
  55. totalProcessed++
  56. } else {
  57. logger.Debugf("BleveStatsService: Skipped hit with missing timestamp or IP - timestamp=%v, ip='%s'", timestamp, ip)
  58. }
  59. }
  60. from += len(searchResult.Hits)
  61. if uint64(from) >= searchResult.Total {
  62. break
  63. }
  64. }
  65. logger.Infof("BleveStatsService: Processed %d entries for hourly stats", totalProcessed)
  66. // Convert to result format
  67. result := make([]HourlyAccessStats, 0, 24)
  68. // Use endTime (target date) for hour timestamps, or current date if not specified
  69. var targetDate time.Time
  70. if !endTime.IsZero() {
  71. targetDate = endTime.Truncate(24 * time.Hour)
  72. } else {
  73. now := time.Now()
  74. targetDate = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
  75. }
  76. for hour := 0; hour < 24; hour++ {
  77. hourTime := targetDate.Add(time.Duration(hour) * time.Hour)
  78. result = append(result, HourlyAccessStats{
  79. Hour: hour,
  80. UV: len(hourStats[hour]),
  81. PV: hourPV[hour],
  82. Timestamp: hourTime.Unix(),
  83. })
  84. }
  85. return result, nil
  86. }
  87. // calculateDailyStatsFromBleve calculates daily UV/PV statistics using Bleve
  88. func (s *BleveStatsService) calculateDailyStatsFromBleve(ctx context.Context, baseQuery query.Query, startTime, endTime time.Time) ([]DailyAccessStats, error) {
  89. dailyStats := make(map[string]map[string]bool) // date -> unique IPs
  90. dailyPV := make(map[string]int) // date -> page views
  91. // Query all entries for the time range
  92. searchReq := bleve.NewSearchRequest(baseQuery)
  93. searchReq.Size = 10000 // Process in batches
  94. searchReq.Fields = []string{"timestamp", "ip"}
  95. searchReq.SortBy([]string{"timestamp"})
  96. from := 0
  97. for {
  98. searchReq.From = from
  99. searchResult, err := s.indexer.index.Search(searchReq)
  100. if err != nil {
  101. return nil, fmt.Errorf("failed to search logs: %w", err)
  102. }
  103. if len(searchResult.Hits) == 0 {
  104. break
  105. }
  106. // Process hits
  107. for _, hit := range searchResult.Hits {
  108. timestamp, ip := s.extractTimestampAndIP(hit)
  109. if timestamp != nil && ip != "" {
  110. date := timestamp.Format("2006-01-02")
  111. if dailyStats[date] == nil {
  112. dailyStats[date] = make(map[string]bool)
  113. }
  114. dailyStats[date][ip] = true
  115. dailyPV[date]++
  116. }
  117. }
  118. from += len(searchResult.Hits)
  119. if uint64(from) >= searchResult.Total {
  120. break
  121. }
  122. }
  123. // Generate complete date range with padding
  124. result := make([]DailyAccessStats, 0)
  125. // Use default time range if not provided
  126. if startTime.IsZero() || endTime.IsZero() {
  127. endTime = time.Now()
  128. startTime = endTime.AddDate(0, 0, -30) // 30 days ago
  129. }
  130. currentDate := startTime.Truncate(24 * time.Hour)
  131. for currentDate.Before(endTime) || currentDate.Equal(endTime.Truncate(24*time.Hour)) {
  132. dateKey := currentDate.Format("2006-01-02")
  133. if ips, exists := dailyStats[dateKey]; exists {
  134. result = append(result, DailyAccessStats{
  135. Date: dateKey,
  136. UV: len(ips),
  137. PV: dailyPV[dateKey],
  138. Timestamp: currentDate.Unix(),
  139. })
  140. } else {
  141. // Pad with zeros for dates without data
  142. result = append(result, DailyAccessStats{
  143. Date: dateKey,
  144. UV: 0,
  145. PV: 0,
  146. Timestamp: currentDate.Unix(),
  147. })
  148. }
  149. currentDate = currentDate.AddDate(0, 0, 1)
  150. }
  151. return result, nil
  152. }