progress_tracker.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766
  1. package indexer
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "context"
  6. "errors"
  7. "io"
  8. "os"
  9. "path/filepath"
  10. "strings"
  11. "sync"
  12. "time"
  13. )
  14. // ProgressTracker manages progress tracking for indexing operations
  15. type ProgressTracker struct {
  16. mu sync.RWMutex
  17. logGroupPath string
  18. startTime time.Time
  19. files map[string]*FileProgress
  20. totalEstimate int64 // Total estimated lines across all files
  21. totalActual int64 // Total actual lines processed
  22. isCompleted bool
  23. completionNotified bool // Flag to prevent duplicate completion notifications
  24. lastNotify time.Time
  25. // Callback functions for notifications
  26. onProgress func(ProgressNotification)
  27. onCompletion func(CompletionNotification)
  28. }
  29. // FileProgress tracks progress for individual files
  30. type FileProgress struct {
  31. FilePath string `json:"file_path"`
  32. State FileState `json:"state"`
  33. EstimatedLines int64 `json:"estimated_lines"` // Estimated total lines in this file
  34. ProcessedLines int64 `json:"processed_lines"` // Actually processed lines
  35. FileSize int64 `json:"file_size"` // Total file size in bytes
  36. CurrentPos int64 `json:"current_pos"` // Current reading position in bytes
  37. AvgLineSize int64 `json:"avg_line_size"` // Dynamic average line size in bytes
  38. SampleCount int64 `json:"sample_count"` // Number of lines sampled for average calculation
  39. IsCompressed bool `json:"is_compressed"`
  40. StartTime time.Time `json:"start_time"`
  41. CompletedTime time.Time `json:"completed_time"`
  42. ErrorMsg string `json:"error_msg,omitempty"` // Error message if processing failed
  43. }
  44. // FileState represents the current state of file processing
  45. type FileState int
  46. const (
  47. FileStatePending FileState = iota
  48. FileStateProcessing
  49. FileStateCompleted
  50. FileStateFailed
  51. )
  52. func (fs FileState) String() string {
  53. switch fs {
  54. case FileStatePending:
  55. return "pending"
  56. case FileStateProcessing:
  57. return "processing"
  58. case FileStateCompleted:
  59. return "completed"
  60. case FileStateFailed:
  61. return "failed"
  62. default:
  63. return "unknown"
  64. }
  65. }
  66. // ProgressNotification contains progress update information
  67. type ProgressNotification struct {
  68. LogGroupPath string `json:"log_group_path"`
  69. Percentage float64 `json:"percentage"`
  70. TotalFiles int `json:"total_files"`
  71. CompletedFiles int `json:"completed_files"`
  72. ProcessingFiles int `json:"processing_files"`
  73. FailedFiles int `json:"failed_files"`
  74. ProcessedLines int64 `json:"processed_lines"`
  75. EstimatedLines int64 `json:"estimated_lines"`
  76. ElapsedTime time.Duration `json:"elapsed_time"`
  77. EstimatedRemain time.Duration `json:"estimated_remain"`
  78. IsCompleted bool `json:"is_completed"`
  79. }
  80. // CompletionNotification contains completion information
  81. type CompletionNotification struct {
  82. LogGroupPath string `json:"log_group_path"`
  83. Success bool `json:"success"`
  84. Duration time.Duration `json:"duration"`
  85. TotalLines int64 `json:"total_lines"`
  86. TotalFiles int `json:"total_files"`
  87. FailedFiles int `json:"failed_files"`
  88. IndexedSize int64 `json:"indexed_size"`
  89. Error string `json:"error,omitempty"`
  90. }
  91. // ProgressConfig contains configuration for progress tracking
  92. type ProgressConfig struct {
  93. NotifyInterval time.Duration // Minimum time between progress notifications
  94. OnProgress func(ProgressNotification)
  95. OnCompletion func(CompletionNotification)
  96. }
  97. // NewProgressTracker creates a new progress tracker for indexing operations
  98. func NewProgressTracker(logGroupPath string, config *ProgressConfig) *ProgressTracker {
  99. pt := &ProgressTracker{
  100. logGroupPath: logGroupPath,
  101. startTime: time.Now(),
  102. files: make(map[string]*FileProgress),
  103. completionNotified: false,
  104. }
  105. if config != nil {
  106. if config.NotifyInterval == 0 {
  107. config.NotifyInterval = 2 * time.Second // Default notify interval
  108. }
  109. pt.onProgress = config.OnProgress
  110. pt.onCompletion = config.OnCompletion
  111. }
  112. return pt
  113. }
  114. // AddFile adds a file to the progress tracker
  115. func (pt *ProgressTracker) AddFile(filePath string, isCompressed bool) {
  116. pt.mu.Lock()
  117. defer pt.mu.Unlock()
  118. pt.files[filePath] = &FileProgress{
  119. FilePath: filePath,
  120. State: FileStatePending,
  121. IsCompressed: isCompressed,
  122. AvgLineSize: 120, // Initial estimate: 120 bytes per line
  123. SampleCount: 0,
  124. }
  125. }
  126. // SetFileEstimate sets the estimated line count for a file
  127. func (pt *ProgressTracker) SetFileEstimate(filePath string, estimatedLines int64) {
  128. pt.mu.Lock()
  129. defer pt.mu.Unlock()
  130. if progress, exists := pt.files[filePath]; exists {
  131. oldEstimate := progress.EstimatedLines
  132. progress.EstimatedLines = estimatedLines
  133. // Update total estimate
  134. pt.totalEstimate = pt.totalEstimate - oldEstimate + estimatedLines
  135. }
  136. }
  137. // SetFileSize sets the file size for a file
  138. func (pt *ProgressTracker) SetFileSize(filePath string, fileSize int64) {
  139. pt.mu.Lock()
  140. defer pt.mu.Unlock()
  141. if progress, exists := pt.files[filePath]; exists {
  142. progress.FileSize = fileSize
  143. }
  144. }
  145. // UpdateFileProgress updates the processed line count and position for a file
  146. func (pt *ProgressTracker) UpdateFileProgress(filePath string, processedLines int64, currentPos ...int64) {
  147. pt.mu.Lock()
  148. defer pt.mu.Unlock()
  149. if progress, exists := pt.files[filePath]; exists {
  150. // Update total actual processed, ensuring not to double-count on completion
  151. if progress.State != FileStateCompleted {
  152. pt.totalActual = pt.totalActual - progress.ProcessedLines + processedLines
  153. }
  154. progress.ProcessedLines = processedLines
  155. // Update position if provided
  156. if len(currentPos) > 0 && !progress.IsCompressed {
  157. progress.CurrentPos = currentPos[0]
  158. }
  159. // Update average line size for compressed files
  160. if progress.IsCompressed && processedLines > 0 && currentPos != nil && len(currentPos) > 0 {
  161. progress.SampleCount++
  162. if progress.SampleCount > 0 {
  163. progress.AvgLineSize = currentPos[0] / processedLines
  164. }
  165. }
  166. // Notify progress if enough time has passed
  167. pt.notifyProgressLocked()
  168. }
  169. }
  170. // StartFile marks a file as started processing
  171. func (pt *ProgressTracker) StartFile(filePath string) {
  172. pt.mu.Lock()
  173. defer pt.mu.Unlock()
  174. if progress, exists := pt.files[filePath]; exists {
  175. progress.State = FileStateProcessing
  176. progress.StartTime = time.Now()
  177. pt.notifyProgressLocked()
  178. }
  179. }
  180. // CompleteFile marks a file as completed successfully
  181. func (pt *ProgressTracker) CompleteFile(filePath string, finalProcessedLines int64) {
  182. pt.mu.Lock()
  183. defer pt.mu.Unlock()
  184. if progress, exists := pt.files[filePath]; exists {
  185. // Prevent marking as completed multiple times
  186. if progress.State == FileStateCompleted || progress.State == FileStateFailed {
  187. return
  188. }
  189. // Ensure final processed lines are correctly accounted for in total
  190. pt.totalActual = pt.totalActual - progress.ProcessedLines + finalProcessedLines
  191. progress.ProcessedLines = finalProcessedLines
  192. progress.State = FileStateCompleted
  193. progress.CompletedTime = time.Now()
  194. pt.checkCompletionLocked()
  195. }
  196. }
  197. // FailFile marks a file as failed processing
  198. func (pt *ProgressTracker) FailFile(filePath string, errorMsg string) {
  199. pt.mu.Lock()
  200. defer pt.mu.Unlock()
  201. if progress, exists := pt.files[filePath]; exists {
  202. // Prevent marking as failed multiple times
  203. if progress.State == FileStateCompleted || progress.State == FileStateFailed {
  204. return
  205. }
  206. progress.State = FileStateFailed
  207. progress.ErrorMsg = errorMsg
  208. progress.CompletedTime = time.Now()
  209. pt.checkCompletionLocked()
  210. }
  211. }
  212. // GetProgress returns the current progress percentage and stats
  213. func (pt *ProgressTracker) GetProgress() ProgressNotification {
  214. pt.mu.RLock()
  215. defer pt.mu.RUnlock()
  216. return pt.getProgressLocked()
  217. }
  218. // GetFileProgress returns progress for a specific file
  219. func (pt *ProgressTracker) GetFileProgress(filePath string) (*FileProgress, bool) {
  220. pt.mu.RLock()
  221. defer pt.mu.RUnlock()
  222. progress, exists := pt.files[filePath]
  223. if !exists {
  224. return nil, false
  225. }
  226. // Return a copy to avoid race conditions
  227. copy := *progress
  228. return &copy, true
  229. }
  230. // GetAllFiles returns all file progress information
  231. func (pt *ProgressTracker) GetAllFiles() map[string]*FileProgress {
  232. pt.mu.RLock()
  233. defer pt.mu.RUnlock()
  234. result := make(map[string]*FileProgress)
  235. for path, progress := range pt.files {
  236. copy := *progress
  237. result[path] = &copy
  238. }
  239. return result
  240. }
  241. // IsCompleted returns whether all files have been processed
  242. func (pt *ProgressTracker) IsCompleted() bool {
  243. pt.mu.RLock()
  244. defer pt.mu.RUnlock()
  245. return pt.isCompleted
  246. }
  247. // Cancel marks the tracker as cancelled and stops processing
  248. func (pt *ProgressTracker) Cancel(reason string) {
  249. pt.mu.Lock()
  250. defer pt.mu.Unlock()
  251. for _, progress := range pt.files {
  252. if progress.State == FileStateProcessing || progress.State == FileStatePending {
  253. progress.State = FileStateFailed
  254. progress.ErrorMsg = "cancelled: " + reason
  255. progress.CompletedTime = time.Now()
  256. }
  257. }
  258. pt.isCompleted = true
  259. pt.notifyCompletionLocked()
  260. }
  261. // checkCompletionLocked checks if all files are completed and notifies if so
  262. func (pt *ProgressTracker) checkCompletionLocked() {
  263. if pt.completionNotified {
  264. return
  265. }
  266. allCompleted := true
  267. for _, fp := range pt.files {
  268. if fp.State != FileStateCompleted && fp.State != FileStateFailed {
  269. allCompleted = false
  270. break
  271. }
  272. }
  273. if allCompleted {
  274. pt.isCompleted = true
  275. pt.completionNotified = true
  276. pt.notifyCompletionLocked()
  277. } else {
  278. pt.notifyProgressLocked()
  279. }
  280. }
  281. // notifyProgressLocked sends progress notification (must be called with lock held)
  282. func (pt *ProgressTracker) notifyProgressLocked() {
  283. // Throttle notifications to avoid spam
  284. now := time.Now()
  285. if now.Sub(pt.lastNotify) < 2*time.Second {
  286. return
  287. }
  288. pt.lastNotify = now
  289. if pt.onProgress != nil {
  290. notification := pt.getProgressLocked()
  291. go pt.onProgress(notification) // Non-blocking notification
  292. }
  293. }
  294. // notifyCompletionLocked sends completion notification (must be called with lock held)
  295. func (pt *ProgressTracker) notifyCompletionLocked() {
  296. if pt.onCompletion == nil {
  297. return
  298. }
  299. elapsed := time.Since(pt.startTime)
  300. // Calculate total size processed
  301. var totalSize int64
  302. var failedFiles int
  303. for _, fp := range pt.files {
  304. if fp.State == FileStateFailed {
  305. failedFiles++
  306. continue
  307. }
  308. if fp.IsCompressed {
  309. // For compressed files, use dynamic average line size
  310. totalSize += fp.ProcessedLines * fp.AvgLineSize
  311. } else {
  312. // For uncompressed files, use actual bytes processed if available
  313. if fp.CurrentPos > 0 {
  314. totalSize += fp.CurrentPos
  315. } else {
  316. // Fallback to line-based estimation
  317. totalSize += fp.ProcessedLines * 150
  318. }
  319. }
  320. }
  321. notification := CompletionNotification{
  322. LogGroupPath: pt.logGroupPath,
  323. Success: failedFiles == 0,
  324. Duration: elapsed,
  325. TotalLines: pt.totalActual,
  326. TotalFiles: len(pt.files),
  327. FailedFiles: failedFiles,
  328. IndexedSize: totalSize,
  329. }
  330. if failedFiles > 0 {
  331. notification.Error = "some files failed to process"
  332. }
  333. go pt.onCompletion(notification) // Non-blocking notification
  334. }
  335. // getProgressLocked returns progress without notification (must be called with lock held)
  336. func (pt *ProgressTracker) getProgressLocked() ProgressNotification {
  337. var completedFiles, processingFiles, failedFiles int
  338. // Count files by state
  339. for _, fp := range pt.files {
  340. switch fp.State {
  341. case FileStateCompleted:
  342. completedFiles++
  343. case FileStateProcessing:
  344. processingFiles++
  345. case FileStateFailed:
  346. failedFiles++
  347. }
  348. }
  349. // Calculate progress percentage using hybrid approach
  350. var percentage float64
  351. // Calculate weighted progress combining file count, file sizes, and line estimates
  352. var totalFileWeight, processedFileWeight float64
  353. var totalSizeWeight, processedSizeWeight float64
  354. var totalLineWeight, processedLineWeight float64
  355. // Collect metrics for hybrid calculation
  356. for _, fp := range pt.files {
  357. fileWeight := 1.0 // Each file contributes equally to file-based progress
  358. sizeWeight := float64(fp.FileSize)
  359. lineWeight := float64(fp.EstimatedLines)
  360. totalFileWeight += fileWeight
  361. totalSizeWeight += sizeWeight
  362. if lineWeight > 0 {
  363. totalLineWeight += lineWeight
  364. }
  365. if fp.State == FileStateCompleted {
  366. processedFileWeight += fileWeight
  367. processedSizeWeight += sizeWeight
  368. if lineWeight > 0 {
  369. processedLineWeight += float64(fp.ProcessedLines)
  370. }
  371. } else if fp.State == FileStateProcessing && fp.ProcessedLines > 0 {
  372. // For processing files, add partial progress
  373. if lineWeight > 0 {
  374. processedLineWeight += float64(fp.ProcessedLines)
  375. // For size-based progress, estimate based on lines processed
  376. if fp.EstimatedLines > 0 {
  377. sizeProgress := float64(fp.ProcessedLines) / float64(fp.EstimatedLines)
  378. processedSizeWeight += sizeWeight * sizeProgress
  379. }
  380. } else if fp.FileSize > 0 && fp.CurrentPos > 0 {
  381. // Use position-based progress for files without line estimates
  382. sizeProgress := float64(fp.CurrentPos) / float64(fp.FileSize)
  383. processedSizeWeight += sizeWeight * sizeProgress
  384. processedFileWeight += fileWeight * sizeProgress
  385. }
  386. }
  387. }
  388. // Calculate progress using the most reliable metric available
  389. if totalLineWeight > 0 && processedLineWeight > 0 {
  390. // Dynamic line estimation: adjust line estimates based on completed files
  391. dynamicLineWeight := totalLineWeight
  392. if completedFiles >= 2 {
  393. // Calculate average lines per completed file
  394. var totalLinesFromCompleted int64
  395. for _, fp := range pt.files {
  396. if fp.State == FileStateCompleted {
  397. totalLinesFromCompleted += fp.ProcessedLines
  398. }
  399. }
  400. if completedFiles > 0 {
  401. avgLinesPerFile := float64(totalLinesFromCompleted) / float64(completedFiles)
  402. // Adjust remaining file estimates based on observed average
  403. remainingFiles := len(pt.files) - completedFiles
  404. if remainingFiles > 0 {
  405. // Calculate current estimate for remaining files
  406. var remainingEstimate int64
  407. for _, fp := range pt.files {
  408. if fp.State != FileStateCompleted {
  409. remainingEstimate += fp.EstimatedLines
  410. }
  411. }
  412. // Replace remaining estimates with dynamic estimate
  413. dynamicRemainingEstimate := int64(avgLinesPerFile * float64(remainingFiles))
  414. dynamicLineWeight = float64(int64(totalLinesFromCompleted) + dynamicRemainingEstimate)
  415. }
  416. }
  417. }
  418. // Use line-based progress with dynamic estimation
  419. linePercentage := (processedLineWeight / float64(dynamicLineWeight)) * 100
  420. // Use file-based progress as primary metric for reliability
  421. filePercentage := (processedFileWeight / totalFileWeight) * 100
  422. // Weight them: 40% lines, 60% files for better reliability
  423. // Files are more predictable than line estimates for compressed files
  424. percentage = (linePercentage * 0.4) + (filePercentage * 0.6)
  425. // Additional safety: never exceed file-based progress by more than 10%
  426. // This prevents premature 100% when file-based progress is more reliable
  427. maxAllowedPercentage := filePercentage + 10.0
  428. if percentage > maxAllowedPercentage {
  429. percentage = maxAllowedPercentage
  430. }
  431. } else if totalSizeWeight > 0 {
  432. // Fallback to size-based progress
  433. percentage = (processedSizeWeight / totalSizeWeight) * 100
  434. } else if totalFileWeight > 0 {
  435. // Fallback to file-based progress only
  436. percentage = (processedFileWeight / totalFileWeight) * 100
  437. }
  438. // Cap at 100%
  439. if percentage > 100 {
  440. percentage = 100
  441. }
  442. elapsed := time.Since(pt.startTime)
  443. var estimatedRemain time.Duration
  444. if percentage > 0 && percentage < 100 {
  445. avgTimePerPercent := float64(elapsed.Nanoseconds()) / percentage
  446. remainingPercent := 100.0 - percentage
  447. estimatedRemain = time.Duration(int64(avgTimePerPercent * remainingPercent))
  448. }
  449. // Use dynamic line estimation for reporting
  450. adjustedEstimate := pt.totalEstimate
  451. if completedFiles >= 2 {
  452. // Calculate average lines per completed file
  453. var totalLinesFromCompleted int64
  454. for _, fp := range pt.files {
  455. if fp.State == FileStateCompleted {
  456. totalLinesFromCompleted += fp.ProcessedLines
  457. }
  458. }
  459. if completedFiles > 0 {
  460. avgLinesPerFile := float64(totalLinesFromCompleted) / float64(completedFiles)
  461. // Project total lines based on completed files
  462. projectedTotal := int64(avgLinesPerFile * float64(len(pt.files)))
  463. // Use dynamic estimate with some constraints to prevent extreme changes
  464. if completedFiles >= len(pt.files)/5 { // At least 20% of files processed
  465. // More confidence, allow larger adjustments
  466. adjustedEstimate = projectedTotal
  467. } else if projectedTotal > adjustedEstimate {
  468. // Less confidence, be more conservative but still adjust upward
  469. maxIncrease := adjustedEstimate + (projectedTotal-adjustedEstimate)/2
  470. adjustedEstimate = maxIncrease
  471. }
  472. }
  473. }
  474. return ProgressNotification{
  475. LogGroupPath: pt.logGroupPath,
  476. Percentage: percentage,
  477. TotalFiles: len(pt.files),
  478. CompletedFiles: completedFiles,
  479. ProcessingFiles: processingFiles,
  480. FailedFiles: failedFiles,
  481. ProcessedLines: pt.totalActual,
  482. EstimatedLines: adjustedEstimate,
  483. ElapsedTime: elapsed,
  484. EstimatedRemain: estimatedRemain,
  485. IsCompleted: pt.isCompleted,
  486. }
  487. }
  488. // EstimateFileLines estimates the number of lines in a file based on sampling
  489. func EstimateFileLines(ctx context.Context, filePath string, fileSize int64, isCompressed bool) (int64, error) {
  490. if fileSize == 0 {
  491. return 0, nil
  492. }
  493. file, err := os.Open(filePath)
  494. if err != nil {
  495. // Return fallback estimate instead of error
  496. return fileSize / 150, nil // Fallback: ~150 bytes per line
  497. }
  498. defer file.Close()
  499. var reader io.Reader = file
  500. // Handle compressed files
  501. if isCompressed {
  502. gzReader, err := gzip.NewReader(file)
  503. if err != nil {
  504. return (fileSize * 3) / 150, nil // Fallback for compressed: 3:1 ratio
  505. }
  506. defer gzReader.Close()
  507. reader = gzReader
  508. }
  509. // Sample the first 1MB of the file content (decompressed if necessary)
  510. sampleSize := int64(1024 * 1024)
  511. buf := make([]byte, sampleSize)
  512. // Check for context cancellation
  513. select {
  514. case <-ctx.Done():
  515. return 0, ctx.Err()
  516. default:
  517. }
  518. bytesRead, err := io.ReadFull(reader, buf)
  519. if err != nil && err != io.EOF && !errors.Is(err, io.ErrUnexpectedEOF) {
  520. return fileSize / 150, nil // Fallback on read error
  521. }
  522. if bytesRead == 0 {
  523. return 0, nil // Empty file
  524. }
  525. // Count lines in the sample
  526. lineCount := bytes.Count(buf[:bytesRead], []byte{'\n'})
  527. if lineCount == 0 {
  528. // Avoid division by zero, fallback to rough estimate
  529. return fileSize / 150, nil
  530. }
  531. // Calculate average line size from the sample
  532. avgLineSize := float64(bytesRead) / float64(lineCount)
  533. if avgLineSize == 0 {
  534. return fileSize / 150, nil // Fallback
  535. }
  536. // Estimate total lines
  537. var estimatedLines int64
  538. if isCompressed {
  539. // For compressed files, use a compression ratio estimate
  540. estimatedUncompressedSize := fileSize * 5 // Conservative 5:1 compression ratio
  541. estimatedLines = int64(float64(estimatedUncompressedSize) / avgLineSize)
  542. } else {
  543. estimatedLines = int64(float64(fileSize) / avgLineSize)
  544. }
  545. return estimatedLines, nil
  546. }
  547. // IsCompressedFile determines if a file is compressed based on its extension
  548. func IsCompressedFile(filePath string) bool {
  549. ext := strings.ToLower(filepath.Ext(filePath))
  550. return ext == ".gz" || ext == ".bz2" || ext == ".xz" || ext == ".lz4"
  551. }
  552. // IsRotationLogFile determines if a file is a rotation log file
  553. func IsRotationLogFile(filePath string) bool {
  554. base := filepath.Base(filePath)
  555. // Common nginx rotation patterns:
  556. // access.log, access.log.1, access.log.2.gz
  557. // access.1.log, access.2.log.gz
  558. // error.log, error.log.1, error.log.2.gz
  559. // Remove compression extensions first
  560. if IsCompressedFile(base) {
  561. base = strings.TrimSuffix(base, filepath.Ext(base))
  562. }
  563. // Check for basic .log files
  564. if strings.HasSuffix(base, ".log") {
  565. return true
  566. }
  567. // Check for numbered rotation patterns: access.log.1, error.log.10, etc.
  568. parts := strings.Split(base, ".")
  569. if len(parts) >= 3 {
  570. // Pattern: name.log.number (e.g., access.log.1)
  571. if parts[len(parts)-2] == "log" && isNumeric(parts[len(parts)-1]) {
  572. return true
  573. }
  574. // Pattern: name.number.log (e.g., access.1.log)
  575. if parts[len(parts)-1] == "log" {
  576. for i := 1; i < len(parts)-1; i++ {
  577. if isNumeric(parts[i]) {
  578. return true
  579. }
  580. }
  581. }
  582. }
  583. return false
  584. }
  585. // isNumeric checks if a string represents a number
  586. func isNumeric(s string) bool {
  587. if len(s) == 0 {
  588. return false
  589. }
  590. for _, r := range s {
  591. if r < '0' || r > '9' {
  592. return false
  593. }
  594. }
  595. return true
  596. }
  597. // AddRotationFiles automatically detects and adds rotation log files with appropriate compression detection
  598. func (pt *ProgressTracker) AddRotationFiles(filePaths ...string) {
  599. for _, filePath := range filePaths {
  600. isCompressed := IsCompressedFile(filePath)
  601. pt.AddFile(filePath, isCompressed)
  602. }
  603. }
  604. // ProgressManager manages multiple progress trackers
  605. type ProgressManager struct {
  606. mu sync.RWMutex
  607. trackers map[string]*ProgressTracker
  608. }
  609. // NewProgressManager creates a new progress manager
  610. func NewProgressManager() *ProgressManager {
  611. return &ProgressManager{
  612. trackers: make(map[string]*ProgressTracker),
  613. }
  614. }
  615. // GetTracker gets or creates a progress tracker for a log group
  616. func (pm *ProgressManager) GetTracker(logGroupPath string, config *ProgressConfig) *ProgressTracker {
  617. pm.mu.Lock()
  618. defer pm.mu.Unlock()
  619. if tracker, exists := pm.trackers[logGroupPath]; exists {
  620. return tracker
  621. }
  622. tracker := NewProgressTracker(logGroupPath, config)
  623. pm.trackers[logGroupPath] = tracker
  624. return tracker
  625. }
  626. // RemoveTracker removes a progress tracker
  627. func (pm *ProgressManager) RemoveTracker(logGroupPath string) {
  628. pm.mu.Lock()
  629. defer pm.mu.Unlock()
  630. delete(pm.trackers, logGroupPath)
  631. }
  632. // GetAllTrackers returns all current trackers
  633. func (pm *ProgressManager) GetAllTrackers() map[string]*ProgressTracker {
  634. pm.mu.RLock()
  635. defer pm.mu.RUnlock()
  636. result := make(map[string]*ProgressTracker)
  637. for path, tracker := range pm.trackers {
  638. result[path] = tracker
  639. }
  640. return result
  641. }
  642. // Cleanup removes completed or failed trackers
  643. func (pm *ProgressManager) Cleanup() {
  644. pm.mu.Lock()
  645. defer pm.mu.Unlock()
  646. for path, tracker := range pm.trackers {
  647. if tracker.IsCompleted() {
  648. delete(pm.trackers, path)
  649. }
  650. }
  651. }