log_indexer_status.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. package nginx_log
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/blevesearch/bleve/v2"
  6. "github.com/uozi-tech/cosy/logger"
  7. )
  8. // GetTimeRange returns the time range of indexed logs by querying Bleve directly
  9. func (li *LogIndexer) GetTimeRange() (start, end time.Time) {
  10. logger.Infof("GetTimeRange called (querying Bleve for min/max timestamp)")
  11. // Find the minimum timestamp
  12. query := bleve.NewMatchAllQuery()
  13. searchRequestMin := bleve.NewSearchRequest(query)
  14. searchRequestMin.Size = 1
  15. searchRequestMin.SortBy([]string{"timestamp"}) // ascending is default
  16. searchRequestMin.Fields = []string{"timestamp"}
  17. searchResultMin, err := li.index.Search(searchRequestMin)
  18. if err != nil {
  19. logger.Warnf("Failed to query min time from Bleve: %v", err)
  20. return time.Time{}, time.Time{}
  21. }
  22. if searchResultMin.Total > 0 && len(searchResultMin.Hits) > 0 {
  23. if tsVal, ok := searchResultMin.Hits[0].Fields["timestamp"].(string); ok {
  24. start, _ = time.Parse(time.RFC3339, tsVal)
  25. }
  26. }
  27. // Find the maximum timestamp
  28. searchRequestMax := bleve.NewSearchRequest(query)
  29. searchRequestMax.Size = 1
  30. searchRequestMax.SortBy([]string{"-timestamp"}) // descending
  31. searchRequestMax.Fields = []string{"timestamp"}
  32. searchResultMax, err := li.index.Search(searchRequestMax)
  33. if err != nil {
  34. logger.Warnf("Failed to query max time from Bleve: %v", err)
  35. // Return start time even if max fails
  36. return start, time.Time{}
  37. }
  38. if searchResultMax.Total > 0 && len(searchResultMax.Hits) > 0 {
  39. if tsVal, ok := searchResultMax.Hits[0].Fields["timestamp"].(string); ok {
  40. end, _ = time.Parse(time.RFC3339, tsVal)
  41. }
  42. }
  43. logger.Infof("GetTimeRange result: start=%s, end=%s", start.Format(time.RFC3339), end.Format(time.RFC3339))
  44. return start, end
  45. }
  46. // GetTimeRangeForPath returns the time range for a specific log path using Bleve
  47. func (li *LogIndexer) GetTimeRangeForPath(logPath string) (start, end time.Time) {
  48. logger.Infof("GetTimeRangeForPath called for %s (querying Bleve)", logPath)
  49. if logPath == "" {
  50. return li.GetTimeRange() // Fallback to general time range
  51. }
  52. // Create query for specific log path
  53. pathQuery := bleve.NewTermQuery(logPath)
  54. pathQuery.SetField("file_path")
  55. // Find minimum timestamp for this path
  56. searchRequestMin := bleve.NewSearchRequest(pathQuery)
  57. searchRequestMin.Size = 1
  58. searchRequestMin.SortBy([]string{"timestamp"})
  59. searchRequestMin.Fields = []string{"timestamp"}
  60. searchResultMin, err := li.index.Search(searchRequestMin)
  61. if err != nil {
  62. logger.Warnf("Failed to query min time for path %s: %v", logPath, err)
  63. return time.Time{}, time.Time{}
  64. }
  65. if searchResultMin.Total > 0 && len(searchResultMin.Hits) > 0 {
  66. if tsVal, ok := searchResultMin.Hits[0].Fields["timestamp"].(string); ok {
  67. start, _ = time.Parse(time.RFC3339, tsVal)
  68. }
  69. }
  70. // Find maximum timestamp for this path
  71. searchRequestMax := bleve.NewSearchRequest(pathQuery)
  72. searchRequestMax.Size = 1
  73. searchRequestMax.SortBy([]string{"-timestamp"})
  74. searchRequestMax.Fields = []string{"timestamp"}
  75. searchResultMax, err := li.index.Search(searchRequestMax)
  76. if err != nil {
  77. logger.Warnf("Failed to query max time for path %s: %v", logPath, err)
  78. return start, time.Time{}
  79. }
  80. if searchResultMax.Total > 0 && len(searchResultMax.Hits) > 0 {
  81. if tsVal, ok := searchResultMax.Hits[0].Fields["timestamp"].(string); ok {
  82. end, _ = time.Parse(time.RFC3339, tsVal)
  83. }
  84. }
  85. logger.Debugf("GetTimeRangeForPath result for %s: start=%s, end=%s", logPath, start.Format(time.RFC3339), end.Format(time.RFC3339))
  86. return start, end
  87. }
  88. // GetTimeRangeFromSummaryStatsForPath returns time range from Bleve stats service
  89. func (li *LogIndexer) GetTimeRangeFromSummaryStatsForPath(logPath string) (start, end time.Time) {
  90. // Delegate to Bleve stats service
  91. bleveStatsService := GetBleveStatsService()
  92. if bleveStatsService == nil {
  93. return time.Time{}, time.Time{}
  94. }
  95. return bleveStatsService.GetTimeRangeFromBleve(logPath)
  96. }
  97. // GetIndexStatus returns comprehensive status information about the indexer
  98. func (li *LogIndexer) GetIndexStatus() (*IndexStatus, error) {
  99. if li.index == nil {
  100. return nil, fmt.Errorf("index not available")
  101. }
  102. // Get document count
  103. docCount, err := li.index.DocCount()
  104. if err != nil {
  105. return nil, fmt.Errorf("failed to get document count: %w", err)
  106. }
  107. // Get tracked log paths
  108. li.mu.RLock()
  109. logPaths := make([]string, 0, len(li.logPaths))
  110. files := make([]FileStatus, 0, len(li.logPaths))
  111. for path, info := range li.logPaths {
  112. logPaths = append(logPaths, path)
  113. fileStatus := FileStatus{
  114. Path: path,
  115. LastModified: info.LastModified,
  116. LastSize: info.LastSize,
  117. LastIndexed: info.LastIndexed,
  118. IsCompressed: info.IsCompressed,
  119. }
  120. // Add time range information if available
  121. if info.TimeRange != nil {
  122. fileStatus.HasTimeRange = true
  123. fileStatus.TimeRangeStart = info.TimeRange.Start
  124. fileStatus.TimeRangeEnd = info.TimeRange.End
  125. }
  126. files = append(files, fileStatus)
  127. }
  128. li.mu.RUnlock()
  129. return &IndexStatus{
  130. DocumentCount: docCount,
  131. LogPaths: logPaths,
  132. LogPathsCount: len(logPaths),
  133. TotalFiles: len(files),
  134. Files: files,
  135. }, nil
  136. }
  137. // IsIndexAvailable checks if the Bleve index is actually accessible for a given log path
  138. func (li *LogIndexer) IsIndexAvailable(logPath string) bool {
  139. if li.index == nil {
  140. return false
  141. }
  142. // First check: try to get document count for the index
  143. docCount, err := li.index.DocCount()
  144. if err != nil {
  145. logger.Debugf("Index not accessible (DocCount failed): %v", err)
  146. return false
  147. }
  148. // If no documents at all, index exists but is empty
  149. if docCount == 0 {
  150. return false
  151. }
  152. // Second check: try a simple search for this specific log path
  153. pathQuery := bleve.NewTermQuery(logPath)
  154. pathQuery.SetField("file_path")
  155. searchRequest := bleve.NewSearchRequest(pathQuery)
  156. searchRequest.Size = 1
  157. result, err := li.index.Search(searchRequest)
  158. if err != nil {
  159. logger.Debugf("Index search failed for %s: %v", logPath, err)
  160. return false
  161. }
  162. // Return true if we found documents for this path
  163. return result.Total > 0
  164. }