progress_tracker.go 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770
  1. package indexer
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "context"
  6. "errors"
  7. "io"
  8. "os"
  9. "path/filepath"
  10. "strings"
  11. "sync"
  12. "time"
  13. )
  14. // ProgressTracker manages progress tracking for indexing operations
  15. type ProgressTracker struct {
  16. mu sync.RWMutex
  17. logGroupPath string
  18. startTime time.Time
  19. files map[string]*FileProgress
  20. totalEstimate int64 // Total estimated lines across all files
  21. totalActual int64 // Total actual lines processed
  22. isCompleted bool
  23. completionNotified bool // Flag to prevent duplicate completion notifications
  24. lastNotify time.Time
  25. notifyInterval time.Duration // Configurable notification interval
  26. // Callback functions for notifications
  27. onProgress func(ProgressNotification)
  28. onCompletion func(CompletionNotification)
  29. }
  30. // FileProgress tracks progress for individual files
  31. type FileProgress struct {
  32. FilePath string `json:"file_path"`
  33. State FileState `json:"state"`
  34. EstimatedLines int64 `json:"estimated_lines"` // Estimated total lines in this file
  35. ProcessedLines int64 `json:"processed_lines"` // Actually processed lines
  36. FileSize int64 `json:"file_size"` // Total file size in bytes
  37. CurrentPos int64 `json:"current_pos"` // Current reading position in bytes
  38. AvgLineSize int64 `json:"avg_line_size"` // Dynamic average line size in bytes
  39. SampleCount int64 `json:"sample_count"` // Number of lines sampled for average calculation
  40. IsCompressed bool `json:"is_compressed"`
  41. StartTime time.Time `json:"start_time"`
  42. CompletedTime time.Time `json:"completed_time"`
  43. ErrorMsg string `json:"error_msg,omitempty"` // Error message if processing failed
  44. }
  // FileState is the lifecycle stage of a single file inside a ProgressTracker.
type FileState int

// File lifecycle stages; numeric values follow declaration order via iota.
const (
	FileStatePending    FileState = iota // registered by AddFile, not started yet
	FileStateProcessing                  // StartFile has been called
	FileStateCompleted                   // CompleteFile has been called
	FileStateFailed                      // FailFile or Cancel marked it failed
)

// String returns the lower-case name of the state, or "unknown" for any value
// outside the declared constants.
func (fs FileState) String() string {
	names := [...]string{
		FileStatePending:    "pending",
		FileStateProcessing: "processing",
		FileStateCompleted:  "completed",
		FileStateFailed:     "failed",
	}
	if fs < 0 || int(fs) >= len(names) {
		return "unknown"
	}
	return names[fs]
}
  // ProgressNotification is a point-in-time snapshot of a tracker's progress,
// produced by GetProgress and passed to the OnProgress callback.
type ProgressNotification struct {
	LogGroupPath    string        `json:"log_group_path"`   // log group being indexed
	Percentage      float64       `json:"percentage"`       // heuristic progress in [0, 100]
	TotalFiles      int           `json:"total_files"`      // files registered with the tracker
	CompletedFiles  int           `json:"completed_files"`  // files in the completed state
	ProcessingFiles int           `json:"processing_files"` // files in the processing state
	FailedFiles     int           `json:"failed_files"`     // files in the failed state
	ProcessedLines  int64         `json:"processed_lines"`  // lines processed across all files
	EstimatedLines  int64         `json:"estimated_lines"`  // current estimate of total lines
	ElapsedTime     time.Duration `json:"elapsed_time"`     // time since the tracker was created
	EstimatedRemain time.Duration `json:"estimated_remain"` // extrapolated time to finish
	IsCompleted     bool          `json:"is_completed"`     // tracker has finished or been cancelled
}
  // CompletionNotification summarizes a finished (or cancelled) indexing run
// and is passed to the OnCompletion callback.
type CompletionNotification struct {
	LogGroupPath string        `json:"log_group_path"`  // log group that was indexed
	Success      bool          `json:"success"`         // true when no file failed
	Duration     time.Duration `json:"duration"`        // time since the tracker was created
	TotalLines   int64         `json:"total_lines"`     // lines processed across all files
	TotalFiles   int           `json:"total_files"`     // files registered with the tracker
	FailedFiles  int           `json:"failed_files"`    // files that ended in the failed state
	IndexedSize  int64         `json:"indexed_size"`    // approximate bytes indexed
	Error        string        `json:"error,omitempty"` // summary message when files failed
}
  92. // ProgressConfig contains configuration for progress tracking
  93. type ProgressConfig struct {
  94. NotifyInterval time.Duration // Minimum time between progress notifications
  95. OnProgress func(ProgressNotification)
  96. OnCompletion func(CompletionNotification)
  97. }
  98. // NewProgressTracker creates a new progress tracker for indexing operations
  99. func NewProgressTracker(logGroupPath string, config *ProgressConfig) *ProgressTracker {
  100. pt := &ProgressTracker{
  101. logGroupPath: logGroupPath,
  102. startTime: time.Now(),
  103. files: make(map[string]*FileProgress),
  104. completionNotified: false,
  105. }
  106. if config != nil {
  107. if config.NotifyInterval == 0 {
  108. config.NotifyInterval = 1 * time.Second // Default notify interval
  109. }
  110. pt.notifyInterval = config.NotifyInterval
  111. pt.onProgress = config.OnProgress
  112. pt.onCompletion = config.OnCompletion
  113. } else {
  114. pt.notifyInterval = 1 * time.Second // Default when no config provided
  115. }
  116. return pt
  117. }
  118. // AddFile adds a file to the progress tracker
  119. func (pt *ProgressTracker) AddFile(filePath string, isCompressed bool) {
  120. pt.mu.Lock()
  121. defer pt.mu.Unlock()
  122. pt.files[filePath] = &FileProgress{
  123. FilePath: filePath,
  124. State: FileStatePending,
  125. IsCompressed: isCompressed,
  126. AvgLineSize: 120, // Initial estimate: 120 bytes per line
  127. SampleCount: 0,
  128. }
  129. }
  130. // SetFileEstimate sets the estimated line count for a file
  131. func (pt *ProgressTracker) SetFileEstimate(filePath string, estimatedLines int64) {
  132. pt.mu.Lock()
  133. defer pt.mu.Unlock()
  134. if progress, exists := pt.files[filePath]; exists {
  135. oldEstimate := progress.EstimatedLines
  136. progress.EstimatedLines = estimatedLines
  137. // Update total estimate
  138. pt.totalEstimate = pt.totalEstimate - oldEstimate + estimatedLines
  139. }
  140. }
  141. // SetFileSize sets the file size for a file
  142. func (pt *ProgressTracker) SetFileSize(filePath string, fileSize int64) {
  143. pt.mu.Lock()
  144. defer pt.mu.Unlock()
  145. if progress, exists := pt.files[filePath]; exists {
  146. progress.FileSize = fileSize
  147. }
  148. }
  149. // UpdateFileProgress updates the processed line count and position for a file
  150. func (pt *ProgressTracker) UpdateFileProgress(filePath string, processedLines int64, currentPos ...int64) {
  151. pt.mu.Lock()
  152. defer pt.mu.Unlock()
  153. if progress, exists := pt.files[filePath]; exists {
  154. // Update total actual processed, ensuring not to double-count on completion
  155. if progress.State != FileStateCompleted {
  156. pt.totalActual = pt.totalActual - progress.ProcessedLines + processedLines
  157. }
  158. progress.ProcessedLines = processedLines
  159. // Update position if provided
  160. if len(currentPos) > 0 && !progress.IsCompressed {
  161. progress.CurrentPos = currentPos[0]
  162. }
  163. // Update average line size for compressed files
  164. if progress.IsCompressed && processedLines > 0 && currentPos != nil && len(currentPos) > 0 {
  165. progress.SampleCount++
  166. if progress.SampleCount > 0 {
  167. progress.AvgLineSize = currentPos[0] / processedLines
  168. }
  169. }
  170. // Notify progress if enough time has passed
  171. pt.notifyProgressLocked()
  172. }
  173. }
  174. // StartFile marks a file as started processing
  175. func (pt *ProgressTracker) StartFile(filePath string) {
  176. pt.mu.Lock()
  177. defer pt.mu.Unlock()
  178. if progress, exists := pt.files[filePath]; exists {
  179. progress.State = FileStateProcessing
  180. progress.StartTime = time.Now()
  181. pt.notifyProgressLocked()
  182. }
  183. }
  184. // CompleteFile marks a file as completed successfully
  185. func (pt *ProgressTracker) CompleteFile(filePath string, finalProcessedLines int64) {
  186. pt.mu.Lock()
  187. defer pt.mu.Unlock()
  188. if progress, exists := pt.files[filePath]; exists {
  189. // Prevent marking as completed multiple times
  190. if progress.State == FileStateCompleted || progress.State == FileStateFailed {
  191. return
  192. }
  193. // Ensure final processed lines are correctly accounted for in total
  194. pt.totalActual = pt.totalActual - progress.ProcessedLines + finalProcessedLines
  195. progress.ProcessedLines = finalProcessedLines
  196. progress.State = FileStateCompleted
  197. progress.CompletedTime = time.Now()
  198. pt.checkCompletionLocked()
  199. }
  200. }
  201. // FailFile marks a file as failed processing
  202. func (pt *ProgressTracker) FailFile(filePath string, errorMsg string) {
  203. pt.mu.Lock()
  204. defer pt.mu.Unlock()
  205. if progress, exists := pt.files[filePath]; exists {
  206. // Prevent marking as failed multiple times
  207. if progress.State == FileStateCompleted || progress.State == FileStateFailed {
  208. return
  209. }
  210. progress.State = FileStateFailed
  211. progress.ErrorMsg = errorMsg
  212. progress.CompletedTime = time.Now()
  213. pt.checkCompletionLocked()
  214. }
  215. }
  216. // GetProgress returns the current progress percentage and stats
  217. func (pt *ProgressTracker) GetProgress() ProgressNotification {
  218. pt.mu.RLock()
  219. defer pt.mu.RUnlock()
  220. return pt.getProgressLocked()
  221. }
  222. // GetFileProgress returns progress for a specific file
  223. func (pt *ProgressTracker) GetFileProgress(filePath string) (*FileProgress, bool) {
  224. pt.mu.RLock()
  225. defer pt.mu.RUnlock()
  226. progress, exists := pt.files[filePath]
  227. if !exists {
  228. return nil, false
  229. }
  230. // Return a copy to avoid race conditions
  231. copy := *progress
  232. return &copy, true
  233. }
  234. // GetAllFiles returns all file progress information
  235. func (pt *ProgressTracker) GetAllFiles() map[string]*FileProgress {
  236. pt.mu.RLock()
  237. defer pt.mu.RUnlock()
  238. result := make(map[string]*FileProgress)
  239. for path, progress := range pt.files {
  240. copy := *progress
  241. result[path] = &copy
  242. }
  243. return result
  244. }
  245. // IsCompleted returns whether all files have been processed
  246. func (pt *ProgressTracker) IsCompleted() bool {
  247. pt.mu.RLock()
  248. defer pt.mu.RUnlock()
  249. return pt.isCompleted
  250. }
  251. // Cancel marks the tracker as cancelled and stops processing
  252. func (pt *ProgressTracker) Cancel(reason string) {
  253. pt.mu.Lock()
  254. defer pt.mu.Unlock()
  255. for _, progress := range pt.files {
  256. if progress.State == FileStateProcessing || progress.State == FileStatePending {
  257. progress.State = FileStateFailed
  258. progress.ErrorMsg = "cancelled: " + reason
  259. progress.CompletedTime = time.Now()
  260. }
  261. }
  262. pt.isCompleted = true
  263. pt.notifyCompletionLocked()
  264. }
  265. // checkCompletionLocked checks if all files are completed and notifies if so
  266. func (pt *ProgressTracker) checkCompletionLocked() {
  267. if pt.completionNotified {
  268. return
  269. }
  270. allCompleted := true
  271. for _, fp := range pt.files {
  272. if fp.State != FileStateCompleted && fp.State != FileStateFailed {
  273. allCompleted = false
  274. break
  275. }
  276. }
  277. if allCompleted {
  278. pt.isCompleted = true
  279. pt.completionNotified = true
  280. pt.notifyCompletionLocked()
  281. } else {
  282. pt.notifyProgressLocked()
  283. }
  284. }
  285. // notifyProgressLocked sends progress notification (must be called with lock held)
  286. func (pt *ProgressTracker) notifyProgressLocked() {
  287. // Throttle notifications to avoid spam using configurable interval
  288. now := time.Now()
  289. if now.Sub(pt.lastNotify) < pt.notifyInterval {
  290. return
  291. }
  292. pt.lastNotify = now
  293. if pt.onProgress != nil {
  294. notification := pt.getProgressLocked()
  295. go pt.onProgress(notification) // Non-blocking notification
  296. }
  297. }
  298. // notifyCompletionLocked sends completion notification (must be called with lock held)
  299. func (pt *ProgressTracker) notifyCompletionLocked() {
  300. if pt.onCompletion == nil {
  301. return
  302. }
  303. elapsed := time.Since(pt.startTime)
  304. // Calculate total size processed
  305. var totalSize int64
  306. var failedFiles int
  307. for _, fp := range pt.files {
  308. if fp.State == FileStateFailed {
  309. failedFiles++
  310. continue
  311. }
  312. if fp.IsCompressed {
  313. // For compressed files, use dynamic average line size
  314. totalSize += fp.ProcessedLines * fp.AvgLineSize
  315. } else {
  316. // For uncompressed files, use actual bytes processed if available
  317. if fp.CurrentPos > 0 {
  318. totalSize += fp.CurrentPos
  319. } else {
  320. // Fallback to line-based estimation
  321. totalSize += fp.ProcessedLines * 150
  322. }
  323. }
  324. }
  325. notification := CompletionNotification{
  326. LogGroupPath: pt.logGroupPath,
  327. Success: failedFiles == 0,
  328. Duration: elapsed,
  329. TotalLines: pt.totalActual,
  330. TotalFiles: len(pt.files),
  331. FailedFiles: failedFiles,
  332. IndexedSize: totalSize,
  333. }
  334. if failedFiles > 0 {
  335. notification.Error = "some files failed to process"
  336. }
  337. go pt.onCompletion(notification) // Non-blocking notification
  338. }
  339. // getProgressLocked returns progress without notification (must be called with lock held)
  340. func (pt *ProgressTracker) getProgressLocked() ProgressNotification {
  341. var completedFiles, processingFiles, failedFiles int
  342. // Count files by state
  343. for _, fp := range pt.files {
  344. switch fp.State {
  345. case FileStateCompleted:
  346. completedFiles++
  347. case FileStateProcessing:
  348. processingFiles++
  349. case FileStateFailed:
  350. failedFiles++
  351. }
  352. }
  353. // Calculate progress percentage using hybrid approach
  354. var percentage float64
  355. // Calculate weighted progress combining file count, file sizes, and line estimates
  356. var totalFileWeight, processedFileWeight float64
  357. var totalSizeWeight, processedSizeWeight float64
  358. var totalLineWeight, processedLineWeight float64
  359. // Collect metrics for hybrid calculation
  360. for _, fp := range pt.files {
  361. fileWeight := 1.0 // Each file contributes equally to file-based progress
  362. sizeWeight := float64(fp.FileSize)
  363. lineWeight := float64(fp.EstimatedLines)
  364. totalFileWeight += fileWeight
  365. totalSizeWeight += sizeWeight
  366. if lineWeight > 0 {
  367. totalLineWeight += lineWeight
  368. }
  369. if fp.State == FileStateCompleted {
  370. processedFileWeight += fileWeight
  371. processedSizeWeight += sizeWeight
  372. if lineWeight > 0 {
  373. processedLineWeight += float64(fp.ProcessedLines)
  374. }
  375. } else if fp.State == FileStateProcessing && fp.ProcessedLines > 0 {
  376. // For processing files, add partial progress
  377. if lineWeight > 0 {
  378. processedLineWeight += float64(fp.ProcessedLines)
  379. // For size-based progress, estimate based on lines processed
  380. if fp.EstimatedLines > 0 {
  381. sizeProgress := float64(fp.ProcessedLines) / float64(fp.EstimatedLines)
  382. processedSizeWeight += sizeWeight * sizeProgress
  383. }
  384. } else if fp.FileSize > 0 && fp.CurrentPos > 0 {
  385. // Use position-based progress for files without line estimates
  386. sizeProgress := float64(fp.CurrentPos) / float64(fp.FileSize)
  387. processedSizeWeight += sizeWeight * sizeProgress
  388. processedFileWeight += fileWeight * sizeProgress
  389. }
  390. }
  391. }
  392. // Calculate progress using the most reliable metric available
  393. if totalLineWeight > 0 && processedLineWeight > 0 {
  394. // Dynamic line estimation: adjust line estimates based on completed files
  395. dynamicLineWeight := totalLineWeight
  396. if completedFiles >= 2 {
  397. // Calculate average lines per completed file
  398. var totalLinesFromCompleted int64
  399. for _, fp := range pt.files {
  400. if fp.State == FileStateCompleted {
  401. totalLinesFromCompleted += fp.ProcessedLines
  402. }
  403. }
  404. if completedFiles > 0 {
  405. avgLinesPerFile := float64(totalLinesFromCompleted) / float64(completedFiles)
  406. // Adjust remaining file estimates based on observed average
  407. remainingFiles := len(pt.files) - completedFiles
  408. if remainingFiles > 0 {
  409. // Calculate current estimate for remaining files
  410. var remainingEstimate int64
  411. for _, fp := range pt.files {
  412. if fp.State != FileStateCompleted {
  413. remainingEstimate += fp.EstimatedLines
  414. }
  415. }
  416. // Replace remaining estimates with dynamic estimate
  417. dynamicRemainingEstimate := int64(avgLinesPerFile * float64(remainingFiles))
  418. dynamicLineWeight = float64(int64(totalLinesFromCompleted) + dynamicRemainingEstimate)
  419. }
  420. }
  421. }
  422. // Use line-based progress with dynamic estimation
  423. linePercentage := (processedLineWeight / float64(dynamicLineWeight)) * 100
  424. // Use file-based progress as primary metric for reliability
  425. filePercentage := (processedFileWeight / totalFileWeight) * 100
  426. // Weight them: 40% lines, 60% files for better reliability
  427. // Files are more predictable than line estimates for compressed files
  428. percentage = (linePercentage * 0.4) + (filePercentage * 0.6)
  429. // Additional safety: never exceed file-based progress by more than 10%
  430. // This prevents premature 100% when file-based progress is more reliable
  431. maxAllowedPercentage := filePercentage + 10.0
  432. if percentage > maxAllowedPercentage {
  433. percentage = maxAllowedPercentage
  434. }
  435. } else if totalSizeWeight > 0 {
  436. // Fallback to size-based progress
  437. percentage = (processedSizeWeight / totalSizeWeight) * 100
  438. } else if totalFileWeight > 0 {
  439. // Fallback to file-based progress only
  440. percentage = (processedFileWeight / totalFileWeight) * 100
  441. }
  442. // Cap at 100%
  443. if percentage > 100 {
  444. percentage = 100
  445. }
  446. elapsed := time.Since(pt.startTime)
  447. var estimatedRemain time.Duration
  448. if percentage > 0 && percentage < 100 {
  449. avgTimePerPercent := float64(elapsed.Nanoseconds()) / percentage
  450. remainingPercent := 100.0 - percentage
  451. estimatedRemain = time.Duration(int64(avgTimePerPercent * remainingPercent))
  452. }
  453. // Use dynamic line estimation for reporting
  454. adjustedEstimate := pt.totalEstimate
  455. if completedFiles >= 2 {
  456. // Calculate average lines per completed file
  457. var totalLinesFromCompleted int64
  458. for _, fp := range pt.files {
  459. if fp.State == FileStateCompleted {
  460. totalLinesFromCompleted += fp.ProcessedLines
  461. }
  462. }
  463. if completedFiles > 0 {
  464. avgLinesPerFile := float64(totalLinesFromCompleted) / float64(completedFiles)
  465. // Project total lines based on completed files
  466. projectedTotal := int64(avgLinesPerFile * float64(len(pt.files)))
  467. // Use dynamic estimate with some constraints to prevent extreme changes
  468. if completedFiles >= len(pt.files)/5 { // At least 20% of files processed
  469. // More confidence, allow larger adjustments
  470. adjustedEstimate = projectedTotal
  471. } else if projectedTotal > adjustedEstimate {
  472. // Less confidence, be more conservative but still adjust upward
  473. maxIncrease := adjustedEstimate + (projectedTotal-adjustedEstimate)/2
  474. adjustedEstimate = maxIncrease
  475. }
  476. }
  477. }
  478. return ProgressNotification{
  479. LogGroupPath: pt.logGroupPath,
  480. Percentage: percentage,
  481. TotalFiles: len(pt.files),
  482. CompletedFiles: completedFiles,
  483. ProcessingFiles: processingFiles,
  484. FailedFiles: failedFiles,
  485. ProcessedLines: pt.totalActual,
  486. EstimatedLines: adjustedEstimate,
  487. ElapsedTime: elapsed,
  488. EstimatedRemain: estimatedRemain,
  489. IsCompleted: pt.isCompleted,
  490. }
  491. }
  // EstimateFileLines estimates how many lines filePath contains by sampling up
// to the first 1 MiB of its content. When isCompressed is true the content is
// read through a gzip decompressor.
//
// fileSize is the caller-supplied size in bytes (for compressed files, the
// compressed size). There are three outcomes:
//   - If the entire content fits in the sample, the exact line count is
//     returned, counting a final line that lacks a trailing '\n'.
//   - Otherwise the sample's average line length is extrapolated over
//     fileSize, multiplied by an assumed 5:1 ratio for compressed files.
//   - Failures to open, decompress or read the file are not returned as
//     errors. A rough estimate of ~150 bytes per line is returned instead,
//     scaled by an assumed 3:1 ratio when the gzip header is unreadable.
//
// The only error returned is ctx.Err() when ctx is done before sampling.
func EstimateFileLines(ctx context.Context, filePath string, fileSize int64, isCompressed bool) (int64, error) {
	const (
		sampleSize        = 1024 * 1024 // bytes of (decompressed) content inspected
		fallbackLineSize  = 150         // assumed bytes per line when sampling is impossible
		gzipFallbackRatio = 3           // assumed compression ratio when the gzip header is unreadable
		gzipEstimateRatio = 5           // assumed compression ratio when extrapolating a partial sample
	)

	if fileSize == 0 {
		return 0, nil
	}

	file, err := os.Open(filePath)
	if err != nil {
		return fileSize / fallbackLineSize, nil
	}
	defer file.Close()

	var reader io.Reader = file
	if isCompressed {
		gzReader, err := gzip.NewReader(file)
		if err != nil {
			// Also reached for non-gzip formats (.bz2, .xz, .lz4).
			return (fileSize * gzipFallbackRatio) / fallbackLineSize, nil
		}
		defer gzReader.Close()
		reader = gzReader
	}

	select {
	case <-ctx.Done():
		return 0, ctx.Err()
	default:
	}

	buf := make([]byte, sampleSize)
	bytesRead, err := io.ReadFull(reader, buf)
	// io.ReadFull reports EOF / ErrUnexpectedEOF when the content ended before
	// the buffer filled, i.e. the sample covers the whole (decompressed) content.
	reachedEnd := errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF)
	if err != nil && !reachedEnd {
		return fileSize / fallbackLineSize, nil
	}
	if bytesRead == 0 {
		return 0, nil // empty content
	}

	sample := buf[:bytesRead]
	lineCount := int64(bytes.Count(sample, []byte{'\n'}))

	if reachedEnd {
		// Every line was seen, so count exactly instead of extrapolating; for
		// small compressed files the assumed ratio could be off by a wide margin.
		if sample[len(sample)-1] != '\n' {
			lineCount++ // final line without a trailing newline
		}
		return lineCount, nil
	}

	if lineCount == 0 {
		// A full sample without any newline: no usable line length.
		return fileSize / fallbackLineSize, nil
	}

	avgLineSize := float64(bytesRead) / float64(lineCount)
	sizeToCover := fileSize
	if isCompressed {
		sizeToCover = fileSize * gzipEstimateRatio
	}
	return int64(float64(sizeToCover) / avgLineSize), nil
}
  // IsCompressedFile reports whether filePath ends in a known compression
// extension (.gz, .bz2, .xz or .lz4), compared case-insensitively.
func IsCompressedFile(filePath string) bool {
	switch strings.ToLower(filepath.Ext(filePath)) {
	case ".gz", ".bz2", ".xz", ".lz4":
		return true
	default:
		return false
	}
}
  556. // IsRotationLogFile determines if a file is a rotation log file
  557. func IsRotationLogFile(filePath string) bool {
  558. base := filepath.Base(filePath)
  559. // Common nginx rotation patterns:
  560. // access.log, access.log.1, access.log.2.gz
  561. // access.1.log, access.2.log.gz
  562. // error.log, error.log.1, error.log.2.gz
  563. // Remove compression extensions first
  564. if IsCompressedFile(base) {
  565. base = strings.TrimSuffix(base, filepath.Ext(base))
  566. }
  567. // Check for basic .log files
  568. if strings.HasSuffix(base, ".log") {
  569. return true
  570. }
  571. // Check for numbered rotation patterns: access.log.1, error.log.10, etc.
  572. parts := strings.Split(base, ".")
  573. if len(parts) >= 3 {
  574. // Pattern: name.log.number (e.g., access.log.1)
  575. if parts[len(parts)-2] == "log" && isNumeric(parts[len(parts)-1]) {
  576. return true
  577. }
  578. // Pattern: name.number.log (e.g., access.1.log)
  579. if parts[len(parts)-1] == "log" {
  580. for i := 1; i < len(parts)-1; i++ {
  581. if isNumeric(parts[i]) {
  582. return true
  583. }
  584. }
  585. }
  586. }
  587. return false
  588. }
  // isNumeric reports whether s is non-empty and consists solely of the ASCII
// digits '0'-'9'. Signs, decimal points and non-ASCII digits are rejected.
func isNumeric(s string) bool {
	if s == "" {
		return false
	}
	for i := 0; i < len(s); i++ {
		// Bytes of multi-byte UTF-8 sequences are all >= 0x80, so they fail here too.
		if c := s[i]; c < '0' || c > '9' {
			return false
		}
	}
	return true
}
  601. // AddRotationFiles automatically detects and adds rotation log files with appropriate compression detection
  602. func (pt *ProgressTracker) AddRotationFiles(filePaths ...string) {
  603. for _, filePath := range filePaths {
  604. isCompressed := IsCompressedFile(filePath)
  605. pt.AddFile(filePath, isCompressed)
  606. }
  607. }
  608. // ProgressManager manages multiple progress trackers
  609. type ProgressManager struct {
  610. mu sync.RWMutex
  611. trackers map[string]*ProgressTracker
  612. }
  613. // NewProgressManager creates a new progress manager
  614. func NewProgressManager() *ProgressManager {
  615. return &ProgressManager{
  616. trackers: make(map[string]*ProgressTracker),
  617. }
  618. }
  619. // GetTracker gets or creates a progress tracker for a log group
  620. func (pm *ProgressManager) GetTracker(logGroupPath string, config *ProgressConfig) *ProgressTracker {
  621. pm.mu.Lock()
  622. defer pm.mu.Unlock()
  623. if tracker, exists := pm.trackers[logGroupPath]; exists {
  624. return tracker
  625. }
  626. tracker := NewProgressTracker(logGroupPath, config)
  627. pm.trackers[logGroupPath] = tracker
  628. return tracker
  629. }
  630. // RemoveTracker removes a progress tracker
  631. func (pm *ProgressManager) RemoveTracker(logGroupPath string) {
  632. pm.mu.Lock()
  633. defer pm.mu.Unlock()
  634. delete(pm.trackers, logGroupPath)
  635. }
  636. // GetAllTrackers returns all current trackers
  637. func (pm *ProgressManager) GetAllTrackers() map[string]*ProgressTracker {
  638. pm.mu.RLock()
  639. defer pm.mu.RUnlock()
  640. result := make(map[string]*ProgressTracker)
  641. for path, tracker := range pm.trackers {
  642. result[path] = tracker
  643. }
  644. return result
  645. }
  646. // Cleanup removes completed or failed trackers
  647. func (pm *ProgressManager) Cleanup() {
  648. pm.mu.Lock()
  649. defer pm.mu.Unlock()
  650. for path, tracker := range pm.trackers {
  651. if tracker.IsCompleted() {
  652. delete(pm.trackers, path)
  653. }
  654. }
  655. }