123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- package nginx_log
import (
	"context"
	"crypto/sha256"
	"encoding/json"
	"fmt"
	"time"

	"github.com/blevesearch/bleve/v2"
	"github.com/blevesearch/bleve/v2/search/query"
	"github.com/uozi-tech/cosy/logger"
)
- // createStatsQueryHash creates a hash for the given query to use as cache key
- func (li *LogIndexer) createStatsQueryHash(query query.Query) string {
- queryStr := fmt.Sprintf("%+v", query)
- hash := sha256.Sum256([]byte(queryStr))
- return fmt.Sprintf("stats_%x", hash[:16]) // Use first 16 bytes for shorter key
- }
- // getLatestFilesModTime returns the latest modification time of all registered log files
- func (li *LogIndexer) getLatestFilesModTime() time.Time {
- li.mu.RLock()
- defer li.mu.RUnlock()
- var latest time.Time
- for _, fileInfo := range li.logPaths {
- if fileInfo.LastModified.After(latest) {
- latest = fileInfo.LastModified
- }
- }
- return latest
- }
- // isCacheValid checks if the cached statistics are still valid
- func (li *LogIndexer) isCacheValid(cached *CachedStatsResult) bool {
- // Get current document count
- docCount, err := li.index.DocCount()
- if err != nil {
- logger.Warnf("Failed to get document count for cache validation: %v", err)
- return false
- }
- // Get latest file modification time
- latestModTime := li.getLatestFilesModTime()
- // Cache is valid if:
- // 1. Document count hasn't changed
- // 2. No files have been modified since cache was created
- // 3. Cache is not older than 5 minutes (safety fallback)
- isValid := cached.DocCount == docCount &&
- !latestModTime.After(cached.FilesModTime) &&
- time.Since(cached.LastCalculated) < 5*time.Minute
- return isValid
- }
- // calculateSummaryStatsFromQuery calculates summary statistics using optimized single query approach with caching
- func (li *LogIndexer) calculateSummaryStatsFromQuery(ctx context.Context, query query.Query) (*SummaryStats, error) {
- // Create cache key
- cacheKey := li.createStatsQueryHash(query)
- // Check cache first
- if cached, found := li.statsCache.Get(cacheKey); found {
- if li.isCacheValid(cached) {
- return cached.Stats, nil
- } else {
- // Remove invalid cache entry
- li.statsCache.Del(cacheKey)
- }
- }
- // Get total page views (PV) - just the count
- countReq := bleve.NewSearchRequest(query)
- countReq.Size = 0 // Don't fetch any documents, just get the count
- countResult, err := li.index.SearchInContext(ctx, countReq)
- if err != nil {
- return nil, fmt.Errorf("count search failed: %w", err)
- }
- pv := int(countResult.Total)
- if pv == 0 {
- return &SummaryStats{}, nil
- }
- // Calculate all statistics by processing all records (same as dashboard algorithm)
- // This ensures consistency between search and dashboard interfaces
- uniqueIPs := make(map[string]bool)
- uniquePages := make(map[string]bool)
- var totalTraffic int64
- // Process all records in batches to avoid memory issues
- batchSize := 10000
- from := 0
-
- for {
- statsReq := bleve.NewSearchRequest(query)
- statsReq.Size = batchSize
- statsReq.From = from
- statsReq.Fields = []string{"ip", "path", "bytes_sent"} // Get all needed fields in one query
- statsResult, err := li.index.SearchInContext(ctx, statsReq)
- if err != nil {
- return nil, fmt.Errorf("stats aggregation search failed: %w", err)
- }
- if len(statsResult.Hits) == 0 {
- break
- }
- // Calculate all statistics in this batch
- for _, hit := range statsResult.Hits {
- if fields := hit.Fields; fields != nil {
- // IP for UV calculation
- if ip := li.getStringField(fields, "ip"); ip != "" {
- uniqueIPs[ip] = true
- }
- // Path for unique pages calculation
- if path := li.getStringField(fields, "path"); path != "" {
- uniquePages[path] = true
- }
- // Bytes sent for traffic calculation
- if bytesSent := li.getFloatField(fields, "bytes_sent"); bytesSent > 0 {
- totalTraffic += int64(bytesSent)
- }
- }
- }
- from += len(statsResult.Hits)
- if uint64(from) >= statsResult.Total {
- break
- }
- }
- // Calculate average traffic per page view
- var avgTrafficPerPV float64
- if pv > 0 {
- avgTrafficPerPV = float64(totalTraffic) / float64(pv)
- }
- uv := len(uniqueIPs)
- uniquePagesCount := len(uniquePages)
- stats := &SummaryStats{
- UV: uv,
- PV: pv,
- TotalTraffic: totalTraffic,
- UniquePages: uniquePagesCount,
- AvgTrafficPerPV: avgTrafficPerPV,
- }
- // Cache the results
- docCount, err := li.index.DocCount()
- if err != nil {
- logger.Warnf("Failed to get document count for caching: %v", err)
- docCount = 0 // Continue without caching on error
- }
- cachedResult := &CachedStatsResult{
- Stats: stats,
- QueryHash: cacheKey,
- LastCalculated: time.Now(),
- FilesModTime: li.getLatestFilesModTime(),
- DocCount: docCount,
- }
- // Store in cache with estimated size (small structures, so use fixed size)
- li.statsCache.Set(cacheKey, cachedResult, 1024) // 1KB estimated size
- return stats, nil
- }
- // invalidateStatsCache clears the statistics cache when data changes
- func (li *LogIndexer) invalidateStatsCache() {
- // Clear all stats cache entries since we don't know which queries might be affected
- li.statsCache.Clear()
- }
- // GetStatsCacheStatus returns statistics about the stats cache for monitoring
- func (li *LogIndexer) GetStatsCacheStatus() map[string]interface{} {
- metrics := li.statsCache.Metrics
- return map[string]interface{}{
- "hits": metrics.Hits(),
- "misses": metrics.Misses(),
- "cost_added": metrics.CostAdded(),
- "cost_evicted": metrics.CostEvicted(),
- "sets_dropped": metrics.SetsDropped(),
- "sets_rejected": metrics.SetsRejected(),
- "gets_kept": metrics.GetsKept(),
- "gets_dropped": metrics.GetsDropped(),
- }
- }
|