bleve_stats_service_utils.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. package nginx_log
  2. import (
  3. "time"
  4. "github.com/blevesearch/bleve/v2"
  5. "github.com/blevesearch/bleve/v2/search"
  6. "github.com/blevesearch/bleve/v2/search/query"
  7. "github.com/uozi-tech/cosy/logger"
  8. )
  9. // Helper functions for dashboard summary calculation
  10. func (s *BleveStatsService) calculateAvgDailyUVFromStats(analytics *DashboardAnalytics) float64 {
  11. if len(analytics.DailyStats) == 0 {
  12. return 0.0
  13. }
  14. uvValues := make([]int, len(analytics.DailyStats))
  15. for i, daily := range analytics.DailyStats {
  16. uvValues[i] = daily.UV
  17. }
  18. return calculateAverage(uvValues)
  19. }
  20. func (s *BleveStatsService) calculateAvgDailyPVFromStats(analytics *DashboardAnalytics) float64 {
  21. if len(analytics.DailyStats) == 0 {
  22. return 0.0
  23. }
  24. pvValues := make([]int, len(analytics.DailyStats))
  25. for i, daily := range analytics.DailyStats {
  26. pvValues[i] = daily.PV
  27. }
  28. return calculateAverage(pvValues)
  29. }
  30. func (s *BleveStatsService) findPeakHourFromStats(analytics *DashboardAnalytics) int {
  31. if len(analytics.HourlyStats) == 0 {
  32. return 0
  33. }
  34. pvValues := make([]int, len(analytics.HourlyStats))
  35. for i, hourly := range analytics.HourlyStats {
  36. pvValues[i] = hourly.PV
  37. }
  38. _, maxIndex := findMax(pvValues)
  39. if maxIndex >= 0 && maxIndex < len(analytics.HourlyStats) {
  40. return analytics.HourlyStats[maxIndex].Hour
  41. }
  42. return 0
  43. }
  44. func (s *BleveStatsService) findPeakHourTrafficFromStats(analytics *DashboardAnalytics) int {
  45. if len(analytics.HourlyStats) == 0 {
  46. return 0
  47. }
  48. pvValues := make([]int, len(analytics.HourlyStats))
  49. for i, hourly := range analytics.HourlyStats {
  50. pvValues[i] = hourly.PV
  51. }
  52. maxTraffic, _ := findMax(pvValues)
  53. return maxTraffic
  54. }
  55. // extractTimestampAndIP extracts timestamp and IP from search hit
  56. func (s *BleveStatsService) extractTimestampAndIP(hit *search.DocumentMatch) (*time.Time, string) {
  57. timestamp, ip, _ := s.extractTimestampIPAndPath(hit)
  58. return timestamp, ip
  59. }
  60. // extractTimestampIPAndPath extracts timestamp, IP, and file_path from search hit
  61. func (s *BleveStatsService) extractTimestampIPAndPath(hit *search.DocumentMatch) (*time.Time, string, string) {
  62. var timestamp *time.Time
  63. var ip string
  64. var filePath string
  65. if timestampField, ok := hit.Fields["timestamp"]; ok {
  66. if timestampFloat, ok := timestampField.(float64); ok {
  67. t := time.Unix(int64(timestampFloat), 0)
  68. timestamp = &t
  69. } else if timestampStr, ok := timestampField.(string); ok {
  70. // Fallback for old RFC3339 format
  71. if t, err := time.Parse(time.RFC3339, timestampStr); err == nil {
  72. timestamp = &t
  73. }
  74. }
  75. }
  76. if ipField, ok := hit.Fields["ip"]; ok {
  77. if ipStr, ok := ipField.(string); ok {
  78. ip = ipStr
  79. }
  80. }
  81. if filePathField, ok := hit.Fields["file_path"]; ok {
  82. if filePathStr, ok := filePathField.(string); ok {
  83. filePath = filePathStr
  84. }
  85. }
  86. return timestamp, ip, filePath
  87. }
  88. // GetTimeRangeFromBleve returns the available time range from Bleve index
  89. func (s *BleveStatsService) GetTimeRangeFromBleve(logPath string) (start, end time.Time) {
  90. if s.indexer == nil {
  91. logger.Error("BleveStatsService.GetTimeRangeFromBleve: indexer is nil")
  92. return time.Time{}, time.Time{}
  93. }
  94. if s.indexer.index == nil {
  95. logger.Error("BleveStatsService.GetTimeRangeFromBleve: index is nil")
  96. return time.Time{}, time.Time{}
  97. }
  98. logger.Infof("BleveStatsService.GetTimeRangeFromBleve: Getting time range for log_path='%s'", logPath)
  99. // First, let's check if the index has any documents at all
  100. docCount, err := s.indexer.index.DocCount()
  101. if err != nil {
  102. logger.Errorf("BleveStatsService.GetTimeRangeFromBleve: Failed to get doc count: %v", err)
  103. } else {
  104. logger.Infof("BleveStatsService.GetTimeRangeFromBleve: Total documents in index: %d", docCount)
  105. }
  106. var searchQuery query.Query = bleve.NewMatchAllQuery()
  107. // Add log path filter if specified
  108. if logPath != "" {
  109. // Use proper field-specific MatchQuery with keyword analyzer
  110. boolQuery := bleve.NewBooleanQuery()
  111. // Add base query (MatchAllQuery in this case)
  112. if searchQuery != nil {
  113. boolQuery.AddMust(searchQuery)
  114. }
  115. // Use MatchQuery with field specification for exact file_path matching
  116. filePathMatchQuery := bleve.NewMatchQuery(logPath)
  117. filePathMatchQuery.SetField("file_path") // Now this should work with TextFieldMapping + keyword analyzer
  118. boolQuery.AddMust(filePathMatchQuery)
  119. searchQuery = boolQuery
  120. logger.Infof("BleveStatsService.GetTimeRangeFromBleve: Using BooleanQuery with field-specific file_path MatchQuery for '%s'", logPath)
  121. }
  122. // Get earliest entry
  123. searchReq := bleve.NewSearchRequest(searchQuery)
  124. searchReq.Size = 1
  125. searchReq.Fields = []string{"timestamp"}
  126. searchReq.SortBy([]string{"timestamp"})
  127. logger.Debug("BleveStatsService.GetTimeRangeFromBleve: Searching for earliest entry")
  128. searchResult, err := s.indexer.index.Search(searchReq)
  129. if err != nil {
  130. logger.Errorf("BleveStatsService.GetTimeRangeFromBleve: Failed to search for earliest entry: %v", err)
  131. return time.Time{}, time.Time{}
  132. }
  133. if len(searchResult.Hits) == 0 {
  134. logger.Warn("BleveStatsService.GetTimeRangeFromBleve: No entries found for earliest search")
  135. return time.Time{}, time.Time{}
  136. }
  137. logger.Infof("BleveStatsService.GetTimeRangeFromBleve: Found %d entries (total=%d)", len(searchResult.Hits), searchResult.Total)
  138. if timestampField, ok := searchResult.Hits[0].Fields["timestamp"]; ok {
  139. logger.Infof("BleveStatsService.GetTimeRangeFromBleve: timestamp field exists, type=%T, value=%v", timestampField, timestampField)
  140. if timestampFloat, ok := timestampField.(float64); ok {
  141. start = time.Unix(int64(timestampFloat), 0)
  142. logger.Infof("BleveStatsService.GetTimeRangeFromBleve: Parsed start time from float64: %v", start)
  143. } else if timestampStr, ok := timestampField.(string); ok {
  144. // Fallback for old RFC3339 format (backward compatibility)
  145. if t, err := time.Parse(time.RFC3339, timestampStr); err == nil {
  146. start = t
  147. logger.Infof("BleveStatsService.GetTimeRangeFromBleve: Parsed start time from string: %v", start)
  148. } else {
  149. logger.Errorf("BleveStatsService.GetTimeRangeFromBleve: Failed to parse RFC3339 string: %v, error: %v", timestampStr, err)
  150. }
  151. } else {
  152. logger.Errorf("BleveStatsService.GetTimeRangeFromBleve: timestamp field has unexpected type: %T", timestampField)
  153. }
  154. } else {
  155. logger.Error("BleveStatsService.GetTimeRangeFromBleve: timestamp field not found in search result")
  156. // Let's see what fields are actually available
  157. for key, value := range searchResult.Hits[0].Fields {
  158. logger.Infof("BleveStatsService.GetTimeRangeFromBleve: Available field: %s = %v (type: %T)", key, value, value)
  159. }
  160. }
  161. // Get latest entry
  162. searchReq.SortBy([]string{"-timestamp"})
  163. searchResult, err = s.indexer.index.Search(searchReq)
  164. if err != nil || len(searchResult.Hits) == 0 {
  165. return start, start
  166. }
  167. if timestampField, ok := searchResult.Hits[0].Fields["timestamp"]; ok {
  168. if timestampFloat, ok := timestampField.(float64); ok {
  169. end = time.Unix(int64(timestampFloat), 0)
  170. } else if timestampStr, ok := timestampField.(string); ok {
  171. // Fallback for old RFC3339 format (backward compatibility)
  172. if t, err := time.Parse(time.RFC3339, timestampStr); err == nil {
  173. end = t
  174. }
  175. }
  176. }
  177. return start, end
  178. }
  179. // bleveStatsService is the package-level Bleve stats service instance.
  // It holds its zero value (nil) until it is assigned; within the visible code
  // only InitBleveStatsService assigns it.
  180. var bleveStatsService *BleveStatsService
  181. // InitBleveStatsService initializes the global Bleve stats service.
  // It assigns the result of NewBleveStatsService to the package-level
  // bleveStatsService variable, overwriting any previous value, and then logs
  // an informational message.
  182. func InitBleveStatsService() {
  183. bleveStatsService = NewBleveStatsService()
  184. logger.Info("Bleve stats service initialized")
  185. }
  186. // GetBleveStatsService returns the global Bleve stats service instance.
  // The result is nil while the package-level variable is still unassigned, so
  // callers should check it before use.
  187. func GetBleveStatsService() *BleveStatsService {
  188. return bleveStatsService
  189. }
  190. // SetBleveStatsServiceIndexer sets the indexer for the global Bleve stats service
  191. func SetBleveStatsServiceIndexer(indexer *LogIndexer) {
  192. if bleveStatsService != nil {
  193. bleveStatsService.SetIndexer(indexer)
  194. }
  195. }