1
0

log_file_manager.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518
  1. package indexer
  2. import (
  3. "fmt"
  4. "path/filepath"
  5. "regexp"
  6. "sort"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/0xJacky/Nginx-UI/model"
  11. "github.com/uozi-tech/cosy/logger"
  12. )
// IndexStatus constants describe the index state reported for a log file:
// "indexed" when persisted metadata shows documents for the file,
// "indexing" while an indexing operation is in flight for the path, and
// "not_indexed" otherwise.
const (
	IndexStatusIndexed    = "indexed"     // persisted DocumentCount > 0
	IndexStatusIndexing   = "indexing"    // path present in indexingStatus
	IndexStatusNotIndexed = "not_indexed" // default state
)
// NginxLogCache represents a cached log entry from nginx configuration.
// Entries are registered via AddLogPath as config files are parsed and are
// keyed by Path in the LogFileManager cache.
type NginxLogCache struct {
	Path       string `json:"path"`        // Path to the log file
	Type       string `json:"type"`        // Type of log: "access" or "error"
	Name       string `json:"name"`        // Name of the log file
	ConfigFile string `json:"config_file"` // Path to the configuration file that contains this log directive
}
// NginxLogWithIndex represents a log file with its index status information.
// It merges config-cache data (Path/Type/Name/ConfigFile) with persisted
// index metadata; the omitempty fields are zero when the file has never been
// indexed.
type NginxLogWithIndex struct {
	Path           string `json:"path"`                       // Path to the log file
	Type           string `json:"type"`                       // Type of log: "access" or "error"
	Name           string `json:"name"`                       // Name of the log file
	ConfigFile     string `json:"config_file"`                // Path to the configuration file
	IndexStatus    string `json:"index_status"`               // Index status: indexed, indexing, not_indexed
	LastModified   int64  `json:"last_modified,omitempty"`    // Unix timestamp of last modification time
	LastSize       int64  `json:"last_size,omitempty"`        // Last known size of the file
	LastIndexed    int64  `json:"last_indexed,omitempty"`     // Unix timestamp when the file was last indexed
	IndexStartTime int64  `json:"index_start_time,omitempty"` // Unix timestamp when the last indexing operation started
	IndexDuration  int64  `json:"index_duration,omitempty"`   // Duration of last indexing operation in milliseconds
	IsCompressed   bool   `json:"is_compressed"`              // Whether the file is compressed (.gz / .bz2 suffix)
	HasTimeRange   bool   `json:"has_timerange"`              // Whether time range is available
	TimeRangeStart int64  `json:"timerange_start,omitempty"`  // Unix timestamp of start of time range in the log
	TimeRangeEnd   int64  `json:"timerange_end,omitempty"`    // Unix timestamp of end of time range in the log
	DocumentCount  uint64 `json:"document_count,omitempty"`   // Number of indexed documents from this file
}
// LogFileManager manages nginx log file discovery and index status.
// Two independent locks are used: cacheMutex guards logCache and
// indexingMutex guards indexingStatus, so status updates never block
// config-cache readers.
type LogFileManager struct {
	logCache       map[string]*NginxLogCache // config-discovered entries, keyed by log file path
	cacheMutex     sync.RWMutex              // guards logCache
	persistence    *PersistenceManager       // persisted per-file index metadata store
	indexingStatus map[string]bool           // set of paths currently being indexed
	indexingMutex  sync.RWMutex              // guards indexingStatus
}
  52. // NewLogFileManager creates a new log file manager
  53. func NewLogFileManager() *LogFileManager {
  54. return &LogFileManager{
  55. logCache: make(map[string]*NginxLogCache),
  56. persistence: NewPersistenceManager(DefaultIncrementalConfig()),
  57. indexingStatus: make(map[string]bool),
  58. }
  59. }
  60. // AddLogPath adds a log path to the log cache with the source config file
  61. func (lm *LogFileManager) AddLogPath(path, logType, name, configFile string) {
  62. lm.cacheMutex.Lock()
  63. defer lm.cacheMutex.Unlock()
  64. lm.logCache[path] = &NginxLogCache{
  65. Path: path,
  66. Type: logType,
  67. Name: name,
  68. ConfigFile: configFile,
  69. }
  70. }
  71. // RemoveLogPathsFromConfig removes all log paths associated with a specific config file
  72. func (lm *LogFileManager) RemoveLogPathsFromConfig(configFile string) {
  73. lm.cacheMutex.Lock()
  74. defer lm.cacheMutex.Unlock()
  75. for path, logEntry := range lm.logCache {
  76. if logEntry.ConfigFile == configFile {
  77. delete(lm.logCache, path)
  78. }
  79. }
  80. }
  81. // GetAllLogPaths returns all cached log paths, optionally filtered
  82. func (lm *LogFileManager) GetAllLogPaths(filters ...func(*NginxLogCache) bool) []*NginxLogCache {
  83. lm.cacheMutex.RLock()
  84. defer lm.cacheMutex.RUnlock()
  85. var logs []*NginxLogCache
  86. for _, logEntry := range lm.logCache {
  87. // Apply all filters
  88. include := true
  89. for _, filter := range filters {
  90. if !filter(logEntry) {
  91. include = false
  92. break
  93. }
  94. }
  95. if include {
  96. // Create a copy to avoid race conditions
  97. logCopy := *logEntry
  98. logs = append(logs, &logCopy)
  99. }
  100. }
  101. return logs
  102. }
  103. // SetIndexingStatus sets the indexing status for a specific file path
  104. func (lm *LogFileManager) SetIndexingStatus(path string, isIndexing bool) {
  105. lm.indexingMutex.Lock()
  106. defer lm.indexingMutex.Unlock()
  107. if isIndexing {
  108. lm.indexingStatus[path] = true
  109. } else {
  110. delete(lm.indexingStatus, path)
  111. }
  112. }
  113. // GetIndexingFiles returns a list of files currently being indexed
  114. func (lm *LogFileManager) GetIndexingFiles() []string {
  115. lm.indexingMutex.RLock()
  116. defer lm.indexingMutex.RUnlock()
  117. var files []string
  118. for path := range lm.indexingStatus {
  119. files = append(files, path)
  120. }
  121. return files
  122. }
  123. // getBaseLogName determines the base log file name for grouping rotated files
  124. func getBaseLogName(filePath string) string {
  125. dir := filepath.Dir(filePath)
  126. filename := filepath.Base(filePath)
  127. // Remove compression extensions first
  128. filename = strings.TrimSuffix(filename, ".gz")
  129. filename = strings.TrimSuffix(filename, ".bz2")
  130. // Handle numbered rotation (access.log.1, access.log.2, etc.)
  131. if match := regexp.MustCompile(`^(.+)\.(\d+)$`).FindStringSubmatch(filename); len(match) > 1 {
  132. baseFilename := match[1]
  133. return filepath.Join(dir, baseFilename)
  134. }
  135. // Handle date rotation suffixes
  136. parts := strings.Split(filename, ".")
  137. if len(parts) >= 2 {
  138. lastPart := parts[len(parts)-1]
  139. if isDatePattern(lastPart) {
  140. baseFilename := strings.Join(parts[:len(parts)-1], ".")
  141. // If the base doesn't end with .log, add it
  142. if !strings.HasSuffix(baseFilename, ".log") {
  143. baseFilename += ".log"
  144. }
  145. return filepath.Join(dir, baseFilename)
  146. }
  147. }
  148. // If it already looks like a base log file, return as-is
  149. return filePath
  150. }
// GetAllLogsWithIndexGrouped returns logs grouped by their base name (e.g., access.log includes access.log.1, access.log.2.gz etc.)
//
// Pipeline: (1) seed a map from the config cache, (2) overlay/add persisted
// index records (which include rotated files not present in the config),
// (3) apply the caller's filters, (4) merge entries sharing a base log name
// into one aggregate, (5) return the aggregates in sorted-path order so the
// output is deterministic.
func (lm *LogFileManager) GetAllLogsWithIndexGrouped(filters ...func(*NginxLogWithIndex) bool) []*NginxLogWithIndex {
	lm.cacheMutex.RLock()
	defer lm.cacheMutex.RUnlock()
	// Get all logs from both cache (config files) and persistence (indexed files)
	allLogsMap := make(map[string]*NginxLogWithIndex)
	// First, get logs from the cache (these are from nginx config)
	for _, cache := range lm.logCache {
		logWithIndex := &NginxLogWithIndex{
			Path:         cache.Path,
			Type:         cache.Type,
			Name:         cache.Name,
			ConfigFile:   cache.ConfigFile,
			IndexStatus:  IndexStatusNotIndexed,
			IsCompressed: false,
			HasTimeRange: false,
		}
		allLogsMap[cache.Path] = logWithIndex
	}
	// Get persistence indexes and update status. On failure we degrade to an
	// empty list (config-cache entries are still returned) rather than erroring.
	persistenceIndexes, err := lm.persistence.GetAllLogIndexes()
	if err != nil {
		logger.Warnf("Failed to get persistence indexes: %v", err)
		persistenceIndexes = []*model.NginxLogIndex{}
	}
	// --- START DIAGNOSTIC LOGGING ---
	logger.Debugf("===== DB STATE BEFORE GROUPING =====")
	for _, pIdx := range persistenceIndexes {
		logger.Debugf("DB Record: Path=%s, MainLogPath=%s, DocCount=%d, LastIndexed=%s", pIdx.Path, pIdx.MainLogPath, pIdx.DocumentCount, pIdx.LastIndexed)
	}
	logger.Debugf("===================================")
	// --- END DIAGNOSTIC LOGGING ---
	// Add all indexed files from persistence (including rotated files)
	for _, idx := range persistenceIndexes {
		if _, exists := allLogsMap[idx.Path]; !exists {
			// This is a rotated file not in config cache, create entry for it.
			// Type is inferred from the path; "error" anywhere in the path
			// marks it as an error log, otherwise it is treated as access.
			logType := "access"
			if strings.Contains(idx.Path, "error") {
				logType = "error"
			}
			logWithIndex := &NginxLogWithIndex{
				Path:        idx.Path,
				Type:        logType,
				Name:        filepath.Base(idx.Path),
				ConfigFile:  "",
				IndexStatus: IndexStatusNotIndexed,
			}
			allLogsMap[idx.Path] = logWithIndex
		}
		// Update index status from persistence data
		logWithIndex := allLogsMap[idx.Path]
		logWithIndex.LastModified = idx.LastModified.Unix()
		logWithIndex.LastSize = idx.LastSize
		logWithIndex.LastIndexed = idx.LastIndexed.Unix()
		logWithIndex.IndexStartTime = idx.IndexStartTime.Unix()
		if idx.IndexDuration != nil {
			logWithIndex.IndexDuration = *idx.IndexDuration
		}
		logWithIndex.DocumentCount = idx.DocumentCount
		// Determine index status: in-flight indexing takes precedence over
		// previously indexed documents.
		lm.indexingMutex.RLock()
		isIndexing := lm.indexingStatus[idx.Path]
		lm.indexingMutex.RUnlock()
		if isIndexing {
			logWithIndex.IndexStatus = IndexStatusIndexing
		} else if idx.DocumentCount > 0 {
			logWithIndex.IndexStatus = IndexStatusIndexed
		}
		// Set time range if available (both endpoints must be non-nil and non-zero)
		if idx.TimeRangeStart != nil && idx.TimeRangeEnd != nil && !idx.TimeRangeStart.IsZero() && !idx.TimeRangeEnd.IsZero() {
			logWithIndex.HasTimeRange = true
			logWithIndex.TimeRangeStart = idx.TimeRangeStart.Unix()
			logWithIndex.TimeRangeEnd = idx.TimeRangeEnd.Unix()
		}
		logWithIndex.IsCompressed = strings.HasSuffix(idx.Path, ".gz") || strings.HasSuffix(idx.Path, ".bz2")
	}
	// Convert to slice and apply filters
	var logs []*NginxLogWithIndex
	for _, log := range allLogsMap {
		// Apply all filters
		include := true
		for _, filter := range filters {
			if !filter(log) {
				include = false
				break
			}
		}
		if include {
			logs = append(logs, log)
		}
	}
	// Group by base log name with stable aggregation
	groupedMap := make(map[string]*NginxLogWithIndex)
	// Sort logs by path first to ensure consistent processing order
	sort.Slice(logs, func(i, j int) bool {
		return logs[i].Path < logs[j].Path
	})
	for _, log := range logs {
		baseLogName := getBaseLogName(log.Path)
		if existing, exists := groupedMap[baseLogName]; exists {
			// Merge with existing entry using consistent rules
			// Always use the most recent data for single-value fields
			if log.LastIndexed > existing.LastIndexed {
				existing.LastModified = log.LastModified
				existing.LastIndexed = log.LastIndexed
				existing.IndexStartTime = log.IndexStartTime
				existing.IndexDuration = log.IndexDuration
			}
			// Accumulate countable metrics
			existing.DocumentCount += log.DocumentCount
			existing.LastSize += log.LastSize
			// Update status to most significant (indexing > indexed > not_indexed)
			if log.IndexStatus == IndexStatusIndexing {
				existing.IndexStatus = IndexStatusIndexing
			} else if log.IndexStatus == IndexStatusIndexed && existing.IndexStatus != IndexStatusIndexing {
				existing.IndexStatus = IndexStatusIndexed
			}
			// Expand time range to encompass both (deterministic expansion)
			if log.HasTimeRange {
				if !existing.HasTimeRange {
					existing.HasTimeRange = true
					existing.TimeRangeStart = log.TimeRangeStart
					existing.TimeRangeEnd = log.TimeRangeEnd
				} else {
					// Always expand range consistently
					if log.TimeRangeStart > 0 && (existing.TimeRangeStart == 0 || log.TimeRangeStart < existing.TimeRangeStart) {
						existing.TimeRangeStart = log.TimeRangeStart
					}
					if log.TimeRangeEnd > existing.TimeRangeEnd {
						existing.TimeRangeEnd = log.TimeRangeEnd
					}
				}
			}
		} else {
			// Create new entry with base log name as path for grouping
			groupedLog := *log
			groupedLog.Path = baseLogName
			groupedLog.Name = filepath.Base(baseLogName)
			groupedMap[baseLogName] = &groupedLog
		}
	}
	// Convert map to slice with consistent ordering
	var result []*NginxLogWithIndex
	// Create a sorted list of keys to ensure consistent order
	var keys []string
	for key := range groupedMap {
		keys = append(keys, key)
	}
	sort.Strings(keys)
	// Build result in consistent order
	for _, key := range keys {
		result = append(result, groupedMap[key])
	}
	// --- START DIAGNOSTIC LOGGING ---
	logger.Debugf("===== FINAL GROUPED LIST =====")
	for _, fLog := range result {
		logger.Debugf("Final Group: Path=%s, DocCount=%d, Status=%s", fLog.Path, fLog.DocumentCount, fLog.IndexStatus)
	}
	logger.Debugf("===============================")
	// --- END DIAGNOSTIC LOGGING ---
	return result
}
  313. // SaveIndexMetadata saves the metadata for a log group after an indexing operation.
  314. // It creates a new record for the base log path.
  315. func (lm *LogFileManager) SaveIndexMetadata(basePath string, documentCount uint64, startTime time.Time, duration time.Duration, minTime *time.Time, maxTime *time.Time) error {
  316. // We want to save the metadata against the base path (the "log group").
  317. // We get or create a record for this specific path.
  318. logIndex, err := lm.persistence.GetLogIndex(basePath)
  319. if err != nil {
  320. // If the error is anything other than "not found", it's a real problem.
  321. // GetLogIndex is designed to return a new object if not found, so this should be rare.
  322. return fmt.Errorf("could not get or create log index for '%s': %w", basePath, err)
  323. }
  324. // Update the record with the new metadata
  325. logIndex.DocumentCount = documentCount
  326. logIndex.LastIndexed = time.Now()
  327. logIndex.IndexStartTime = &startTime
  328. durationMs := duration.Milliseconds()
  329. logIndex.IndexDuration = &durationMs
  330. // Set the time range from the parsed logs
  331. logIndex.TimeRangeStart = minTime
  332. logIndex.TimeRangeEnd = maxTime
  333. // Save the updated record to the database
  334. return lm.persistence.SaveLogIndex(logIndex)
  335. }
  336. // DeleteIndexMetadataByGroup deletes all database records for a given log group.
  337. func (lm *LogFileManager) DeleteIndexMetadataByGroup(basePath string) error {
  338. // The basePath is the main log path for the group.
  339. return lm.persistence.DeleteLogIndexesByGroup(basePath)
  340. }
  341. // DeleteAllIndexMetadata deletes all index metadata from the database.
  342. func (lm *LogFileManager) DeleteAllIndexMetadata() error {
  343. return lm.persistence.DeleteAllLogIndexes()
  344. }
  345. // GetLogByPath returns the full NginxLogWithIndex struct for a given base path.
  346. func (lm *LogFileManager) GetLogByPath(basePath string) (*NginxLogWithIndex, error) {
  347. // This is not the most efficient way, but it's reliable.
  348. // It ensures we get the same grouped and aggregated data the UI sees.
  349. allLogs := lm.GetAllLogsWithIndexGrouped()
  350. for _, log := range allLogs {
  351. if log.Path == basePath {
  352. return log, nil
  353. }
  354. }
  355. return nil, fmt.Errorf("log group with base path not found: %s", basePath)
  356. }
  357. // GetFilePathsForGroup returns all physical file paths for a given log group base path.
  358. func (lm *LogFileManager) GetFilePathsForGroup(basePath string) ([]string, error) {
  359. // Query the database for all log indexes with matching main_log_path
  360. logIndexes, err := lm.persistence.GetLogIndexesByGroup(basePath)
  361. if err != nil {
  362. return nil, fmt.Errorf("failed to get log indexes for group %s: %w", basePath, err)
  363. }
  364. // Extract file paths from the database records
  365. filePaths := make([]string, 0, len(logIndexes))
  366. for _, logIndex := range logIndexes {
  367. filePaths = append(filePaths, logIndex.Path)
  368. }
  369. return filePaths, nil
  370. }
  371. // maxInt64 returns the maximum of two int64 values
  372. func maxInt64(a, b int64) int64 {
  373. if a > b {
  374. return a
  375. }
  376. return b
  377. }
  378. // GetAllLogsWithIndex returns all cached log paths with their index status (non-grouped)
  379. func (lm *LogFileManager) GetAllLogsWithIndex(filters ...func(*NginxLogWithIndex) bool) []*NginxLogWithIndex {
  380. lm.cacheMutex.RLock()
  381. defer lm.cacheMutex.RUnlock()
  382. result := make([]*NginxLogWithIndex, 0, len(lm.logCache))
  383. // Get persistence indexes
  384. persistenceIndexes, err := lm.persistence.GetAllLogIndexes()
  385. if err != nil {
  386. logger.Warnf("Failed to get persistence indexes: %v", err)
  387. persistenceIndexes = []*model.NginxLogIndex{}
  388. }
  389. // Create a map of persistence indexes for quick lookup
  390. persistenceMap := make(map[string]*model.NginxLogIndex)
  391. for _, idx := range persistenceIndexes {
  392. persistenceMap[idx.Path] = idx
  393. }
  394. // Process cached logs (from nginx config)
  395. for _, cache := range lm.logCache {
  396. logWithIndex := &NginxLogWithIndex{
  397. Path: cache.Path,
  398. Type: cache.Type,
  399. Name: cache.Name,
  400. ConfigFile: cache.ConfigFile,
  401. IndexStatus: IndexStatusNotIndexed,
  402. IsCompressed: strings.HasSuffix(cache.Path, ".gz") || strings.HasSuffix(cache.Path, ".bz2"),
  403. }
  404. // Update with persistence data if available
  405. if idx, exists := persistenceMap[cache.Path]; exists {
  406. logWithIndex.LastModified = idx.LastModified.Unix()
  407. logWithIndex.LastSize = idx.LastSize
  408. logWithIndex.LastIndexed = idx.LastIndexed.Unix()
  409. logWithIndex.IndexStartTime = idx.IndexStartTime.Unix()
  410. if idx.IndexDuration != nil {
  411. logWithIndex.IndexDuration = *idx.IndexDuration
  412. }
  413. logWithIndex.DocumentCount = idx.DocumentCount
  414. // Determine status
  415. lm.indexingMutex.RLock()
  416. isIndexing := lm.indexingStatus[cache.Path]
  417. lm.indexingMutex.RUnlock()
  418. if isIndexing {
  419. logWithIndex.IndexStatus = IndexStatusIndexing
  420. } else if idx.DocumentCount > 0 {
  421. logWithIndex.IndexStatus = IndexStatusIndexed
  422. }
  423. // Set time range if available
  424. if idx.TimeRangeStart != nil && idx.TimeRangeEnd != nil && !idx.TimeRangeStart.IsZero() && !idx.TimeRangeEnd.IsZero() {
  425. logWithIndex.HasTimeRange = true
  426. logWithIndex.TimeRangeStart = idx.TimeRangeStart.Unix()
  427. logWithIndex.TimeRangeEnd = idx.TimeRangeEnd.Unix()
  428. }
  429. }
  430. // Apply filters
  431. include := true
  432. for _, filter := range filters {
  433. if !filter(logWithIndex) {
  434. include = false
  435. break
  436. }
  437. }
  438. if include {
  439. result = append(result, logWithIndex)
  440. }
  441. }
  442. return result
  443. }