// Source file: log_indexer_status.go (6.6 KB)

  1. package nginx_log
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/blevesearch/bleve/v2"
  6. "github.com/uozi-tech/cosy/logger"
  7. )
  8. // GetTimeRange returns the time range of indexed logs by querying Bleve directly
  9. func (li *LogIndexer) GetTimeRange() (start, end time.Time) {
  10. logger.Infof("GetTimeRange called (querying Bleve for min/max timestamp)")
  11. // Find the minimum timestamp
  12. query := bleve.NewMatchAllQuery()
  13. searchRequestMin := bleve.NewSearchRequest(query)
  14. searchRequestMin.Size = 1
  15. searchRequestMin.SortBy([]string{"timestamp"}) // ascending is default
  16. searchRequestMin.Fields = []string{"timestamp"}
  17. searchResultMin, err := li.index.Search(searchRequestMin)
  18. if err != nil {
  19. logger.Warnf("Failed to query min time from Bleve: %v", err)
  20. return time.Time{}, time.Time{}
  21. }
  22. if searchResultMin.Total > 0 && len(searchResultMin.Hits) > 0 {
  23. if tsFloat, ok := searchResultMin.Hits[0].Fields["timestamp"].(float64); ok {
  24. start = time.Unix(int64(tsFloat), 0)
  25. } else if tsVal, ok := searchResultMin.Hits[0].Fields["timestamp"].(string); ok {
  26. // Fallback for old RFC3339 format
  27. start, _ = time.Parse(time.RFC3339, tsVal)
  28. }
  29. }
  30. // Find the maximum timestamp
  31. searchRequestMax := bleve.NewSearchRequest(query)
  32. searchRequestMax.Size = 1
  33. searchRequestMax.SortBy([]string{"-timestamp"}) // descending
  34. searchRequestMax.Fields = []string{"timestamp"}
  35. searchResultMax, err := li.index.Search(searchRequestMax)
  36. if err != nil {
  37. logger.Warnf("Failed to query max time from Bleve: %v", err)
  38. // Return start time even if max fails
  39. return start, time.Time{}
  40. }
  41. if searchResultMax.Total > 0 && len(searchResultMax.Hits) > 0 {
  42. if tsFloat, ok := searchResultMax.Hits[0].Fields["timestamp"].(float64); ok {
  43. end = time.Unix(int64(tsFloat), 0)
  44. } else if tsVal, ok := searchResultMax.Hits[0].Fields["timestamp"].(string); ok {
  45. // Fallback for old RFC3339 format
  46. end, _ = time.Parse(time.RFC3339, tsVal)
  47. }
  48. }
  49. logger.Infof("GetTimeRange result: start=%s, end=%s", start.Format(time.RFC3339), end.Format(time.RFC3339))
  50. return start, end
  51. }
  52. // GetTimeRangeForPath returns the time range for a specific log path using Bleve
  53. func (li *LogIndexer) GetTimeRangeForPath(logPath string) (start, end time.Time) {
  54. logger.Infof("GetTimeRangeForPath called for %s (querying Bleve)", logPath)
  55. if logPath == "" {
  56. return li.GetTimeRange() // Fallback to general time range
  57. }
  58. // Create query for specific log path
  59. pathQuery := bleve.NewTermQuery(logPath)
  60. pathQuery.SetField("file_path")
  61. // Find minimum timestamp for this path
  62. searchRequestMin := bleve.NewSearchRequest(pathQuery)
  63. searchRequestMin.Size = 1
  64. searchRequestMin.SortBy([]string{"timestamp"})
  65. searchRequestMin.Fields = []string{"timestamp"}
  66. searchResultMin, err := li.index.Search(searchRequestMin)
  67. if err != nil {
  68. logger.Warnf("Failed to query min time for path %s: %v", logPath, err)
  69. return time.Time{}, time.Time{}
  70. }
  71. if searchResultMin.Total > 0 && len(searchResultMin.Hits) > 0 {
  72. if tsFloat, ok := searchResultMin.Hits[0].Fields["timestamp"].(float64); ok {
  73. start = time.Unix(int64(tsFloat), 0)
  74. } else if tsVal, ok := searchResultMin.Hits[0].Fields["timestamp"].(string); ok {
  75. // Fallback for old RFC3339 format
  76. start, _ = time.Parse(time.RFC3339, tsVal)
  77. }
  78. }
  79. // Find maximum timestamp for this path
  80. searchRequestMax := bleve.NewSearchRequest(pathQuery)
  81. searchRequestMax.Size = 1
  82. searchRequestMax.SortBy([]string{"-timestamp"})
  83. searchRequestMax.Fields = []string{"timestamp"}
  84. searchResultMax, err := li.index.Search(searchRequestMax)
  85. if err != nil {
  86. logger.Warnf("Failed to query max time for path %s: %v", logPath, err)
  87. return start, time.Time{}
  88. }
  89. if searchResultMax.Total > 0 && len(searchResultMax.Hits) > 0 {
  90. if tsFloat, ok := searchResultMax.Hits[0].Fields["timestamp"].(float64); ok {
  91. end = time.Unix(int64(tsFloat), 0)
  92. } else if tsVal, ok := searchResultMax.Hits[0].Fields["timestamp"].(string); ok {
  93. // Fallback for old RFC3339 format
  94. end, _ = time.Parse(time.RFC3339, tsVal)
  95. }
  96. }
  97. logger.Debugf("GetTimeRangeForPath result for %s: start=%s, end=%s", logPath, start.Format(time.RFC3339), end.Format(time.RFC3339))
  98. return start, end
  99. }
  100. // GetTimeRangeFromSummaryStatsForPath returns time range from Bleve stats service
  101. func (li *LogIndexer) GetTimeRangeFromSummaryStatsForPath(logPath string) (start, end time.Time) {
  102. // Delegate to Bleve stats service
  103. bleveStatsService := GetBleveStatsService()
  104. if bleveStatsService == nil {
  105. return time.Time{}, time.Time{}
  106. }
  107. return bleveStatsService.GetTimeRangeFromBleve(logPath)
  108. }
  109. // GetIndexStatus returns comprehensive status information about the indexer
  110. func (li *LogIndexer) GetIndexStatus() (*IndexStatus, error) {
  111. if li.index == nil {
  112. return nil, fmt.Errorf("index not available")
  113. }
  114. // Get document count
  115. docCount, err := li.index.DocCount()
  116. if err != nil {
  117. return nil, fmt.Errorf("failed to get document count: %w", err)
  118. }
  119. // Get tracked log paths
  120. li.mu.RLock()
  121. logPaths := make([]string, 0, len(li.logPaths))
  122. files := make([]FileStatus, 0, len(li.logPaths))
  123. for path, info := range li.logPaths {
  124. logPaths = append(logPaths, path)
  125. fileStatus := FileStatus{
  126. Path: path,
  127. LastModified: info.LastModified,
  128. LastSize: info.LastSize,
  129. LastIndexed: info.LastIndexed,
  130. IsCompressed: info.IsCompressed,
  131. }
  132. // Add time range information if available
  133. if info.TimeRange != nil {
  134. fileStatus.HasTimeRange = true
  135. fileStatus.TimeRangeStart = info.TimeRange.Start
  136. fileStatus.TimeRangeEnd = info.TimeRange.End
  137. }
  138. files = append(files, fileStatus)
  139. }
  140. li.mu.RUnlock()
  141. return &IndexStatus{
  142. DocumentCount: docCount,
  143. LogPaths: logPaths,
  144. LogPathsCount: len(logPaths),
  145. TotalFiles: len(files),
  146. Files: files,
  147. }, nil
  148. }
  149. // IsIndexAvailable checks if the Bleve index is actually accessible for a given log path
  150. func (li *LogIndexer) IsIndexAvailable(logPath string) bool {
  151. if li.index == nil {
  152. return false
  153. }
  154. // First check: try to get document count for the index
  155. docCount, err := li.index.DocCount()
  156. if err != nil {
  157. logger.Debugf("Index not accessible (DocCount failed): %v", err)
  158. return false
  159. }
  160. // If no documents at all, index exists but is empty
  161. if docCount == 0 {
  162. return false
  163. }
  164. // Second check: try a simple search for this specific log path
  165. pathQuery := bleve.NewTermQuery(logPath)
  166. pathQuery.SetField("file_path")
  167. searchRequest := bleve.NewSearchRequest(pathQuery)
  168. searchRequest.Size = 1
  169. result, err := li.index.Search(searchRequest)
  170. if err != nil {
  171. logger.Debugf("Index search failed for %s: %v", logPath, err)
  172. return false
  173. }
  174. // Return true if we found documents for this path
  175. return result.Total > 0
  176. }