bleve_stats_service_utils.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. package nginx_log
  2. import (
  3. "time"
  4. "github.com/blevesearch/bleve/v2"
  5. "github.com/blevesearch/bleve/v2/search"
  6. "github.com/blevesearch/bleve/v2/search/query"
  7. "github.com/uozi-tech/cosy/logger"
  8. )
  9. // Helper functions for dashboard summary calculation
  10. func (s *BleveStatsService) calculateAvgDailyUVFromStats(analytics *DashboardAnalytics) float64 {
  11. if len(analytics.DailyStats) == 0 {
  12. return 0.0
  13. }
  14. uvValues := make([]int, len(analytics.DailyStats))
  15. for i, daily := range analytics.DailyStats {
  16. uvValues[i] = daily.UV
  17. }
  18. return calculateAverage(uvValues)
  19. }
  20. func (s *BleveStatsService) calculateAvgDailyPVFromStats(analytics *DashboardAnalytics) float64 {
  21. if len(analytics.DailyStats) == 0 {
  22. return 0.0
  23. }
  24. pvValues := make([]int, len(analytics.DailyStats))
  25. for i, daily := range analytics.DailyStats {
  26. pvValues[i] = daily.PV
  27. }
  28. return calculateAverage(pvValues)
  29. }
  30. func (s *BleveStatsService) findPeakHourFromStats(analytics *DashboardAnalytics) int {
  31. if len(analytics.HourlyStats) == 0 {
  32. return 0
  33. }
  34. pvValues := make([]int, len(analytics.HourlyStats))
  35. for i, hourly := range analytics.HourlyStats {
  36. pvValues[i] = hourly.PV
  37. }
  38. _, maxIndex := findMax(pvValues)
  39. if maxIndex >= 0 && maxIndex < len(analytics.HourlyStats) {
  40. return analytics.HourlyStats[maxIndex].Hour
  41. }
  42. return 0
  43. }
  44. func (s *BleveStatsService) findPeakHourTrafficFromStats(analytics *DashboardAnalytics) int {
  45. if len(analytics.HourlyStats) == 0 {
  46. return 0
  47. }
  48. pvValues := make([]int, len(analytics.HourlyStats))
  49. for i, hourly := range analytics.HourlyStats {
  50. pvValues[i] = hourly.PV
  51. }
  52. maxTraffic, _ := findMax(pvValues)
  53. return maxTraffic
  54. }
  55. // extractTimestampAndIP extracts timestamp and IP from search hit
  56. func (s *BleveStatsService) extractTimestampAndIP(hit *search.DocumentMatch) (*time.Time, string) {
  57. timestamp, ip, _ := s.extractTimestampIPAndPath(hit)
  58. return timestamp, ip
  59. }
  60. // extractTimestampIPAndPath extracts timestamp, IP, and file_path from search hit
  61. func (s *BleveStatsService) extractTimestampIPAndPath(hit *search.DocumentMatch) (*time.Time, string, string) {
  62. var timestamp *time.Time
  63. var ip string
  64. var filePath string
  65. if timestampField, ok := hit.Fields["timestamp"]; ok {
  66. if timestampStr, ok := timestampField.(string); ok {
  67. if t, err := time.Parse(time.RFC3339, timestampStr); err == nil {
  68. timestamp = &t
  69. }
  70. }
  71. }
  72. if ipField, ok := hit.Fields["ip"]; ok {
  73. if ipStr, ok := ipField.(string); ok {
  74. ip = ipStr
  75. }
  76. }
  77. if filePathField, ok := hit.Fields["file_path"]; ok {
  78. if filePathStr, ok := filePathField.(string); ok {
  79. filePath = filePathStr
  80. }
  81. }
  82. return timestamp, ip, filePath
  83. }
  84. // GetTimeRangeFromBleve returns the available time range from Bleve index
  85. func (s *BleveStatsService) GetTimeRangeFromBleve(logPath string) (start, end time.Time) {
  86. if s.indexer == nil {
  87. logger.Warn("BleveStatsService.GetTimeRangeFromBleve: indexer is nil")
  88. return time.Time{}, time.Time{}
  89. }
  90. if s.indexer.index == nil {
  91. logger.Warn("BleveStatsService.GetTimeRangeFromBleve: index is nil")
  92. return time.Time{}, time.Time{}
  93. }
  94. logger.Infof("BleveStatsService.GetTimeRangeFromBleve: Getting time range for log_path='%s'", logPath)
  95. var searchQuery query.Query = bleve.NewMatchAllQuery()
  96. // Add log path filter if specified
  97. if logPath != "" {
  98. // Use proper field-specific MatchQuery with keyword analyzer
  99. boolQuery := bleve.NewBooleanQuery()
  100. // Add base query (MatchAllQuery in this case)
  101. if searchQuery != nil {
  102. boolQuery.AddMust(searchQuery)
  103. }
  104. // Use MatchQuery with field specification for exact file_path matching
  105. filePathMatchQuery := bleve.NewMatchQuery(logPath)
  106. filePathMatchQuery.SetField("file_path") // Now this should work with TextFieldMapping + keyword analyzer
  107. boolQuery.AddMust(filePathMatchQuery)
  108. searchQuery = boolQuery
  109. logger.Infof("BleveStatsService.GetTimeRangeFromBleve: Using BooleanQuery with field-specific file_path MatchQuery for '%s'", logPath)
  110. }
  111. // Get earliest entry
  112. searchReq := bleve.NewSearchRequest(searchQuery)
  113. searchReq.Size = 1
  114. searchReq.Fields = []string{"timestamp"}
  115. searchReq.SortBy([]string{"timestamp"})
  116. logger.Debug("BleveStatsService.GetTimeRangeFromBleve: Searching for earliest entry")
  117. searchResult, err := s.indexer.index.Search(searchReq)
  118. if err != nil {
  119. logger.Errorf("BleveStatsService.GetTimeRangeFromBleve: Failed to search for earliest entry: %v", err)
  120. return time.Time{}, time.Time{}
  121. }
  122. if len(searchResult.Hits) == 0 {
  123. logger.Warn("BleveStatsService.GetTimeRangeFromBleve: No entries found for earliest search")
  124. return time.Time{}, time.Time{}
  125. }
  126. logger.Debugf("BleveStatsService.GetTimeRangeFromBleve: Found %d entries (total=%d)", len(searchResult.Hits), searchResult.Total)
  127. if timestampField, ok := searchResult.Hits[0].Fields["timestamp"]; ok {
  128. if timestampStr, ok := timestampField.(string); ok {
  129. if t, err := time.Parse(time.RFC3339, timestampStr); err == nil {
  130. start = t
  131. }
  132. }
  133. }
  134. // Get latest entry
  135. searchReq.SortBy([]string{"-timestamp"})
  136. searchResult, err = s.indexer.index.Search(searchReq)
  137. if err != nil || len(searchResult.Hits) == 0 {
  138. return start, start
  139. }
  140. if timestampField, ok := searchResult.Hits[0].Fields["timestamp"]; ok {
  141. if timestampStr, ok := timestampField.(string); ok {
  142. if t, err := time.Parse(time.RFC3339, timestampStr); err == nil {
  143. end = t
  144. }
  145. }
  146. }
  147. return start, end
  148. }
  149. // Global Bleve stats service instance
  150. var bleveStatsService *BleveStatsService
  151. // InitBleveStatsService initializes the global Bleve stats service
  152. func InitBleveStatsService() {
  153. bleveStatsService = NewBleveStatsService()
  154. logger.Info("Bleve stats service initialized")
  155. }
  156. // GetBleveStatsService returns the global Bleve stats service instance
  157. func GetBleveStatsService() *BleveStatsService {
  158. return bleveStatsService
  159. }
  160. // SetBleveStatsServiceIndexer sets the indexer for the global Bleve stats service
  161. func SetBleveStatsServiceIndexer(indexer *LogIndexer) {
  162. if bleveStatsService != nil {
  163. bleveStatsService.SetIndexer(indexer)
  164. }
  165. }