bleve_stats_service_core.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package nginx_log
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/blevesearch/bleve/v2"
  6. "github.com/blevesearch/bleve/v2/search/query"
  7. "github.com/uozi-tech/cosy/logger"
  8. )
  9. // BleveStatsService provides log statistics using Bleve aggregations only
  10. type BleveStatsService struct {
  11. indexer *LogIndexer
  12. }
  13. // NewBleveStatsService creates a new Bleve-based statistics service
  14. func NewBleveStatsService() *BleveStatsService {
  15. return &BleveStatsService{}
  16. }
  17. // SetIndexer sets the log indexer for the service
  18. func (s *BleveStatsService) SetIndexer(indexer *LogIndexer) {
  19. s.indexer = indexer
  20. }
  21. // GetDashboardAnalytics generates comprehensive dashboard analytics using Bleve aggregations
  22. func (s *BleveStatsService) GetDashboardAnalytics(ctx context.Context, req *DashboardQueryRequest) (*DashboardAnalytics, error) {
  23. if s.indexer == nil {
  24. logger.Error("BleveStatsService: log indexer not available")
  25. return nil, fmt.Errorf("log indexer not available")
  26. }
  27. if s.indexer.index == nil {
  28. logger.Error("BleveStatsService: Bleve index is nil")
  29. return nil, fmt.Errorf("Bleve index not available")
  30. }
  31. logger.Infof("BleveStatsService: Starting dashboard analytics for log_path='%s', start=%v, end=%v",
  32. req.LogPath, req.StartTime, req.EndTime)
  33. // Build time range query
  34. timeQuery := s.buildTimeRangeQuery(req.StartTime, req.EndTime)
  35. // Add log path filter if specified
  36. var searchQuery query.Query = timeQuery
  37. if req.LogPath != "" {
  38. // Use proper field-specific MatchQuery with keyword analyzer (Bleve-layer filtering)
  39. boolQuery := bleve.NewBooleanQuery()
  40. // Add time range query (if it's not just MatchAllQuery)
  41. if timeQuery != nil {
  42. boolQuery.AddMust(timeQuery)
  43. }
  44. // Use MatchQuery with field specification for exact file_path matching
  45. filePathMatchQuery := bleve.NewMatchQuery(req.LogPath)
  46. filePathMatchQuery.SetField("file_path") // Now this should work with TextFieldMapping + keyword analyzer
  47. boolQuery.AddMust(filePathMatchQuery)
  48. searchQuery = boolQuery
  49. }
  50. // Initialize result with empty arrays to ensure JSON structure
  51. analytics := &DashboardAnalytics{
  52. HourlyStats: make([]HourlyAccessStats, 0),
  53. DailyStats: make([]DailyAccessStats, 0),
  54. TopURLs: make([]URLAccessStats, 0),
  55. Browsers: make([]BrowserAccessStats, 0),
  56. OperatingSystems: make([]OSAccessStats, 0),
  57. Devices: make([]DeviceAccessStats, 0),
  58. }
  59. // Execute various aggregation queries in parallel
  60. hourlyStats, err := s.calculateHourlyStatsFromBleve(ctx, searchQuery, req.StartTime, req.EndTime)
  61. if err != nil {
  62. logger.Warnf("Failed to calculate hourly stats: %v", err)
  63. } else {
  64. analytics.HourlyStats = hourlyStats
  65. }
  66. dailyStats, err := s.calculateDailyStatsFromBleve(ctx, searchQuery, req.StartTime, req.EndTime)
  67. if err != nil {
  68. logger.Warnf("Failed to calculate daily stats: %v", err)
  69. } else {
  70. analytics.DailyStats = dailyStats
  71. }
  72. topURLs, err := s.calculateTopURLsFromBleve(ctx, searchQuery)
  73. if err != nil {
  74. logger.Warnf("Failed to calculate top URLs: %v", err)
  75. } else {
  76. analytics.TopURLs = topURLs
  77. }
  78. browsers, err := s.calculateBrowserStatsFromBleve(ctx, searchQuery)
  79. if err != nil {
  80. logger.Warnf("Failed to calculate browser stats: %v", err)
  81. } else {
  82. analytics.Browsers = browsers
  83. }
  84. osStats, err := s.calculateOSStatsFromBleve(ctx, searchQuery)
  85. if err != nil {
  86. logger.Warnf("Failed to calculate OS stats: %v", err)
  87. } else {
  88. analytics.OperatingSystems = osStats
  89. }
  90. deviceStats, err := s.calculateDeviceStatsFromBleve(ctx, searchQuery)
  91. if err != nil {
  92. logger.Warnf("Failed to calculate device stats: %v", err)
  93. } else {
  94. analytics.Devices = deviceStats
  95. }
  96. // Calculate summary statistics using the same algorithm as search interface
  97. summaryStats, err := s.indexer.calculateSummaryStatsFromQuery(ctx, searchQuery)
  98. if err != nil {
  99. logger.Warnf("Failed to calculate summary stats: %v", err)
  100. // Create empty summary on error
  101. analytics.Summary = DashboardSummary{}
  102. } else {
  103. // Convert SummaryStats to DashboardSummary format
  104. analytics.Summary = DashboardSummary{
  105. TotalUV: summaryStats.UV,
  106. TotalPV: summaryStats.PV,
  107. AvgDailyUV: s.calculateAvgDailyUVFromStats(analytics),
  108. AvgDailyPV: s.calculateAvgDailyPVFromStats(analytics),
  109. PeakHour: s.findPeakHourFromStats(analytics),
  110. PeakHourTraffic: s.findPeakHourTrafficFromStats(analytics),
  111. }
  112. }
  113. return analytics, nil
  114. }
  115. // buildTimeRangeQuery builds a time range query for Bleve
  116. func (s *BleveStatsService) buildTimeRangeQuery(startTime, endTime int64) query.Query {
  117. // If both times are zero or the range is too wide, return match all query
  118. if startTime == 0 && endTime == 0 {
  119. return bleve.NewMatchAllQuery()
  120. }
  121. // Check if the time range is reasonable (same as search interface)
  122. if startTime != 0 && endTime != 0 {
  123. if endTime-startTime >= 400*24*3600 { // More than ~400 days in seconds
  124. return bleve.NewMatchAllQuery()
  125. }
  126. }
  127. // Build proper time range query
  128. var timeQuery query.Query
  129. if startTime != 0 && endTime != 0 {
  130. // Add 1 second to endTime to ensure boundary values are included
  131. inclusiveEndTime := endTime + 1
  132. startFloat := float64(startTime)
  133. endFloat := float64(inclusiveEndTime)
  134. timeQuery = bleve.NewNumericRangeQuery(&startFloat, &endFloat)
  135. timeQuery.(*query.NumericRangeQuery).SetField("timestamp")
  136. } else if startTime != 0 {
  137. startFloat := float64(startTime)
  138. timeQuery = bleve.NewNumericRangeQuery(&startFloat, nil)
  139. timeQuery.(*query.NumericRangeQuery).SetField("timestamp")
  140. } else if endTime != 0 {
  141. // Add 1 second to endTime to ensure boundary values are included
  142. inclusiveEndTime := endTime + 1
  143. endFloat := float64(inclusiveEndTime)
  144. timeQuery = bleve.NewNumericRangeQuery(nil, &endFloat)
  145. timeQuery.(*query.NumericRangeQuery).SetField("timestamp")
  146. } else {
  147. return bleve.NewMatchAllQuery()
  148. }
  149. return timeQuery
  150. }