123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 |
- package nginx_log
- import (
- "context"
- "fmt"
- "time"
- "github.com/blevesearch/bleve/v2"
- "github.com/blevesearch/bleve/v2/search/query"
- "github.com/uozi-tech/cosy/logger"
- )
- // calculateHourlyStatsFromBleve calculates 24-hour UV/PV statistics using Bleve aggregations
- // Shows stats for the End Date (target day) only
- func (s *BleveStatsService) calculateHourlyStatsFromBleve(ctx context.Context, baseQuery query.Query, startTime, endTime time.Time) ([]HourlyAccessStats, error) {
- logger.Info("BleveStatsService: Starting hourly stats calculation")
- hourStats := make(map[int]map[string]bool) // hour -> unique IPs
- hourPV := make(map[int]int) // hour -> page views
- // Initialize all 24 hours
- for i := 0; i < 24; i++ {
- hourStats[i] = make(map[string]bool)
- hourPV[i] = 0
- }
- // Query all entries for the time range
- searchReq := bleve.NewSearchRequest(baseQuery)
- searchReq.Size = 10000 // Process in batches
- searchReq.Fields = []string{"timestamp", "ip"}
- searchReq.SortBy([]string{"timestamp"})
- from := 0
- totalProcessed := 0
- for {
- searchReq.From = from
- logger.Debugf("BleveStatsService: Executing hourly stats search, from=%d", from)
- searchResult, err := s.indexer.index.Search(searchReq)
- if err != nil {
- logger.Errorf("BleveStatsService: Failed to search logs: %v", err)
- return nil, fmt.Errorf("failed to search logs: %w", err)
- }
- logger.Debugf("BleveStatsService: Search returned %d hits, total=%d", len(searchResult.Hits), searchResult.Total)
- if len(searchResult.Hits) == 0 {
- break
- }
- // Process hits
- for _, hit := range searchResult.Hits {
- timestamp, ip := s.extractTimestampAndIP(hit)
- if timestamp != nil && ip != "" {
- // For hourly stats, only process entries from the target date (endTime)
- if !endTime.IsZero() {
- targetDate := endTime.Truncate(24 * time.Hour)
- entryDate := timestamp.Truncate(24 * time.Hour)
- if !entryDate.Equal(targetDate) {
- continue // Skip entries not from the target date
- }
- }
- hour := timestamp.Hour()
- hourStats[hour][ip] = true
- hourPV[hour]++
- totalProcessed++
- } else {
- logger.Debugf("BleveStatsService: Skipped hit with missing timestamp or IP - timestamp=%v, ip='%s'", timestamp, ip)
- }
- }
- from += len(searchResult.Hits)
- if uint64(from) >= searchResult.Total {
- break
- }
- }
- logger.Infof("BleveStatsService: Processed %d entries for hourly stats", totalProcessed)
- // Convert to result format
- result := make([]HourlyAccessStats, 0, 24)
- // Use endTime (target date) for hour timestamps, or current date if not specified
- var targetDate time.Time
- if !endTime.IsZero() {
- targetDate = endTime.Truncate(24 * time.Hour)
- } else {
- now := time.Now()
- targetDate = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
- }
- for hour := 0; hour < 24; hour++ {
- hourTime := targetDate.Add(time.Duration(hour) * time.Hour)
- result = append(result, HourlyAccessStats{
- Hour: hour,
- UV: len(hourStats[hour]),
- PV: hourPV[hour],
- Timestamp: hourTime.Unix(),
- })
- }
- return result, nil
- }
- // calculateDailyStatsFromBleve calculates daily UV/PV statistics using Bleve
- func (s *BleveStatsService) calculateDailyStatsFromBleve(ctx context.Context, baseQuery query.Query, startTime, endTime time.Time) ([]DailyAccessStats, error) {
- dailyStats := make(map[string]map[string]bool) // date -> unique IPs
- dailyPV := make(map[string]int) // date -> page views
- // Query all entries for the time range
- searchReq := bleve.NewSearchRequest(baseQuery)
- searchReq.Size = 10000 // Process in batches
- searchReq.Fields = []string{"timestamp", "ip"}
- searchReq.SortBy([]string{"timestamp"})
- from := 0
- for {
- searchReq.From = from
- searchResult, err := s.indexer.index.Search(searchReq)
- if err != nil {
- return nil, fmt.Errorf("failed to search logs: %w", err)
- }
- if len(searchResult.Hits) == 0 {
- break
- }
- // Process hits
- for _, hit := range searchResult.Hits {
- timestamp, ip := s.extractTimestampAndIP(hit)
- if timestamp != nil && ip != "" {
- date := timestamp.Format("2006-01-02")
- if dailyStats[date] == nil {
- dailyStats[date] = make(map[string]bool)
- }
- dailyStats[date][ip] = true
- dailyPV[date]++
- }
- }
- from += len(searchResult.Hits)
- if uint64(from) >= searchResult.Total {
- break
- }
- }
- // Generate complete date range with padding
- result := make([]DailyAccessStats, 0)
- // Use default time range if not provided
- if startTime.IsZero() || endTime.IsZero() {
- endTime = time.Now()
- startTime = endTime.AddDate(0, 0, -30) // 30 days ago
- }
- currentDate := startTime.Truncate(24 * time.Hour)
- for currentDate.Before(endTime) || currentDate.Equal(endTime.Truncate(24*time.Hour)) {
- dateKey := currentDate.Format("2006-01-02")
- if ips, exists := dailyStats[dateKey]; exists {
- result = append(result, DailyAccessStats{
- Date: dateKey,
- UV: len(ips),
- PV: dailyPV[dateKey],
- Timestamp: currentDate.Unix(),
- })
- } else {
- // Pad with zeros for dates without data
- result = append(result, DailyAccessStats{
- Date: dateKey,
- UV: 0,
- PV: 0,
- Timestamp: currentDate.Unix(),
- })
- }
- currentDate = currentDate.AddDate(0, 0, 1)
- }
- return result, nil
- }
|