progress_tracker.go 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778
  1. package indexer
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "context"
  6. "errors"
  7. "io"
  8. "os"
  9. "path/filepath"
  10. "strings"
  11. "sync"
  12. "time"
  13. "github.com/0xJacky/Nginx-UI/internal/nginx_log/utils"
  14. )
  15. // ProgressTracker manages progress tracking for indexing operations
  16. type ProgressTracker struct {
  17. mu sync.RWMutex
  18. logGroupPath string
  19. startTime time.Time
  20. files map[string]*FileProgress
  21. totalEstimate int64 // Total estimated lines across all files
  22. totalActual int64 // Total actual lines processed
  23. isCompleted bool
  24. completionNotified bool // Flag to prevent duplicate completion notifications
  25. lastNotify time.Time
  26. notifyInterval time.Duration // Configurable notification interval
  27. // Callback functions for notifications
  28. onProgress func(ProgressNotification)
  29. onCompletion func(CompletionNotification)
  30. }
  31. // FileProgress tracks progress for individual files
  32. type FileProgress struct {
  33. FilePath string `json:"file_path"`
  34. State FileState `json:"state"`
  35. EstimatedLines int64 `json:"estimated_lines"` // Estimated total lines in this file
  36. ProcessedLines int64 `json:"processed_lines"` // Actually processed lines
  37. FileSize int64 `json:"file_size"` // Total file size in bytes
  38. CurrentPos int64 `json:"current_pos"` // Current reading position in bytes
  39. AvgLineSize int64 `json:"avg_line_size"` // Dynamic average line size in bytes
  40. SampleCount int64 `json:"sample_count"` // Number of lines sampled for average calculation
  41. IsCompressed bool `json:"is_compressed"`
  42. StartTime time.Time `json:"start_time"`
  43. CompletedTime time.Time `json:"completed_time"`
  44. ErrorMsg string `json:"error_msg,omitempty"` // Error message if processing failed
  45. }
  // FileState is the lifecycle stage of one file inside a ProgressTracker.
type FileState int

const (
	FileStatePending    FileState = iota // registered but not started
	FileStateProcessing                  // currently being indexed
	FileStateCompleted                   // finished successfully
	FileStateFailed                      // finished with an error, or cancelled
)

// String returns the lower-case name of fs, or "unknown" for values outside
// the declared constants.
func (fs FileState) String() string {
	names := [...]string{
		FileStatePending:    "pending",
		FileStateProcessing: "processing",
		FileStateCompleted:  "completed",
		FileStateFailed:     "failed",
	}
	if fs < 0 || int(fs) >= len(names) {
		return "unknown"
	}
	return names[fs]
}
  // ProgressNotification is a point-in-time snapshot of a log group's
// indexing progress, as produced by ProgressTracker.GetProgress.
type ProgressNotification struct {
	LogGroupPath    string        `json:"log_group_path"`
	Percentage      float64       `json:"percentage"` // 0..100
	TotalFiles      int           `json:"total_files"`
	CompletedFiles  int           `json:"completed_files"`
	ProcessingFiles int           `json:"processing_files"`
	FailedFiles     int           `json:"failed_files"`
	ProcessedLines  int64         `json:"processed_lines"`
	EstimatedLines  int64         `json:"estimated_lines"`
	ElapsedTime     time.Duration `json:"elapsed_time"`
	EstimatedRemain time.Duration `json:"estimated_remain"`
	IsCompleted     bool          `json:"is_completed"`
}

// CompletionNotification summarizes a finished (or cancelled) indexing run.
type CompletionNotification struct {
	LogGroupPath string        `json:"log_group_path"`
	Success      bool          `json:"success"` // true when no file failed
	Duration     time.Duration `json:"duration"`
	TotalLines   int64         `json:"total_lines"`
	TotalFiles   int           `json:"total_files"`
	FailedFiles  int           `json:"failed_files"`
	IndexedSize  int64         `json:"indexed_size"` // approximate bytes indexed
	Error        string        `json:"error,omitempty"`
}

// ProgressConfig configures a ProgressTracker.
// A zero NotifyInterval selects the tracker's one-second default.
// Either callback may be nil.
type ProgressConfig struct {
	NotifyInterval time.Duration // minimum time between progress notifications
	OnProgress     func(ProgressNotification)
	OnCompletion   func(CompletionNotification)
}
  99. // NewProgressTracker creates a new progress tracker for indexing operations
  100. func NewProgressTracker(logGroupPath string, config *ProgressConfig) *ProgressTracker {
  101. pt := &ProgressTracker{
  102. logGroupPath: logGroupPath,
  103. startTime: time.Now(),
  104. files: make(map[string]*FileProgress),
  105. completionNotified: false,
  106. }
  107. if config != nil {
  108. if config.NotifyInterval == 0 {
  109. config.NotifyInterval = 1 * time.Second // Default notify interval
  110. }
  111. pt.notifyInterval = config.NotifyInterval
  112. pt.onProgress = config.OnProgress
  113. pt.onCompletion = config.OnCompletion
  114. } else {
  115. pt.notifyInterval = 1 * time.Second // Default when no config provided
  116. }
  117. return pt
  118. }
  119. // AddFile adds a file to the progress tracker
  120. func (pt *ProgressTracker) AddFile(filePath string, isCompressed bool) {
  121. pt.mu.Lock()
  122. defer pt.mu.Unlock()
  123. pt.files[filePath] = &FileProgress{
  124. FilePath: filePath,
  125. State: FileStatePending,
  126. IsCompressed: isCompressed,
  127. AvgLineSize: 120, // Initial estimate: 120 bytes per line
  128. SampleCount: 0,
  129. }
  130. }
  131. // SetFileEstimate sets the estimated line count for a file
  132. func (pt *ProgressTracker) SetFileEstimate(filePath string, estimatedLines int64) {
  133. pt.mu.Lock()
  134. defer pt.mu.Unlock()
  135. if progress, exists := pt.files[filePath]; exists {
  136. oldEstimate := progress.EstimatedLines
  137. progress.EstimatedLines = estimatedLines
  138. // Update total estimate
  139. pt.totalEstimate = pt.totalEstimate - oldEstimate + estimatedLines
  140. }
  141. }
  142. // SetFileSize sets the file size for a file
  143. func (pt *ProgressTracker) SetFileSize(filePath string, fileSize int64) {
  144. pt.mu.Lock()
  145. defer pt.mu.Unlock()
  146. if progress, exists := pt.files[filePath]; exists {
  147. progress.FileSize = fileSize
  148. }
  149. }
  150. // UpdateFileProgress updates the processed line count and position for a file
  151. func (pt *ProgressTracker) UpdateFileProgress(filePath string, processedLines int64, currentPos ...int64) {
  152. pt.mu.Lock()
  153. defer pt.mu.Unlock()
  154. if progress, exists := pt.files[filePath]; exists {
  155. // Update total actual processed, ensuring not to double-count on completion
  156. if progress.State != FileStateCompleted {
  157. pt.totalActual = pt.totalActual - progress.ProcessedLines + processedLines
  158. }
  159. progress.ProcessedLines = processedLines
  160. // Update position if provided
  161. if len(currentPos) > 0 && !progress.IsCompressed {
  162. progress.CurrentPos = currentPos[0]
  163. }
  164. // Update average line size for compressed files
  165. if progress.IsCompressed && processedLines > 0 && currentPos != nil && len(currentPos) > 0 {
  166. progress.SampleCount++
  167. if progress.SampleCount > 0 {
  168. progress.AvgLineSize = currentPos[0] / processedLines
  169. }
  170. }
  171. // Notify progress if enough time has passed
  172. pt.notifyProgressLocked()
  173. }
  174. }
  175. // StartFile marks a file as started processing
  176. func (pt *ProgressTracker) StartFile(filePath string) {
  177. pt.mu.Lock()
  178. defer pt.mu.Unlock()
  179. if progress, exists := pt.files[filePath]; exists {
  180. progress.State = FileStateProcessing
  181. progress.StartTime = time.Now()
  182. pt.notifyProgressLocked()
  183. }
  184. }
  185. // CompleteFile marks a file as completed successfully
  186. func (pt *ProgressTracker) CompleteFile(filePath string, finalProcessedLines int64) {
  187. pt.mu.Lock()
  188. defer pt.mu.Unlock()
  189. if progress, exists := pt.files[filePath]; exists {
  190. // Prevent marking as completed multiple times
  191. if progress.State == FileStateCompleted || progress.State == FileStateFailed {
  192. return
  193. }
  194. // Ensure final processed lines are correctly accounted for in total
  195. pt.totalActual = pt.totalActual - progress.ProcessedLines + finalProcessedLines
  196. progress.ProcessedLines = finalProcessedLines
  197. progress.State = FileStateCompleted
  198. progress.CompletedTime = time.Now()
  199. pt.checkCompletionLocked()
  200. }
  201. }
  202. // FailFile marks a file as failed processing
  203. func (pt *ProgressTracker) FailFile(filePath string, errorMsg string) {
  204. pt.mu.Lock()
  205. defer pt.mu.Unlock()
  206. if progress, exists := pt.files[filePath]; exists {
  207. // Prevent marking as failed multiple times
  208. if progress.State == FileStateCompleted || progress.State == FileStateFailed {
  209. return
  210. }
  211. progress.State = FileStateFailed
  212. progress.ErrorMsg = errorMsg
  213. progress.CompletedTime = time.Now()
  214. pt.checkCompletionLocked()
  215. }
  216. }
  217. // GetProgress returns the current progress percentage and stats
  218. func (pt *ProgressTracker) GetProgress() ProgressNotification {
  219. pt.mu.RLock()
  220. defer pt.mu.RUnlock()
  221. return pt.getProgressLocked()
  222. }
  223. // GetFileProgress returns progress for a specific file
  224. func (pt *ProgressTracker) GetFileProgress(filePath string) (*FileProgress, bool) {
  225. pt.mu.RLock()
  226. defer pt.mu.RUnlock()
  227. progress, exists := pt.files[filePath]
  228. if !exists {
  229. return nil, false
  230. }
  231. // Return a copy to avoid race conditions
  232. copy := *progress
  233. return &copy, true
  234. }
  235. // GetAllFiles returns all file progress information
  236. func (pt *ProgressTracker) GetAllFiles() map[string]*FileProgress {
  237. pt.mu.RLock()
  238. defer pt.mu.RUnlock()
  239. result := make(map[string]*FileProgress)
  240. for path, progress := range pt.files {
  241. copy := *progress
  242. result[path] = &copy
  243. }
  244. return result
  245. }
  246. // IsCompleted returns whether all files have been processed
  247. func (pt *ProgressTracker) IsCompleted() bool {
  248. pt.mu.RLock()
  249. defer pt.mu.RUnlock()
  250. return pt.isCompleted
  251. }
  252. // Cancel marks the tracker as cancelled and stops processing
  253. func (pt *ProgressTracker) Cancel(reason string) {
  254. pt.mu.Lock()
  255. defer pt.mu.Unlock()
  256. for _, progress := range pt.files {
  257. if progress.State == FileStateProcessing || progress.State == FileStatePending {
  258. progress.State = FileStateFailed
  259. progress.ErrorMsg = "cancelled: " + reason
  260. progress.CompletedTime = time.Now()
  261. }
  262. }
  263. pt.isCompleted = true
  264. pt.notifyCompletionLocked()
  265. }
  266. // checkCompletionLocked checks if all files are completed and notifies if so
  267. func (pt *ProgressTracker) checkCompletionLocked() {
  268. if pt.completionNotified {
  269. return
  270. }
  271. allCompleted := true
  272. for _, fp := range pt.files {
  273. if fp.State != FileStateCompleted && fp.State != FileStateFailed {
  274. allCompleted = false
  275. break
  276. }
  277. }
  278. if allCompleted {
  279. pt.isCompleted = true
  280. pt.completionNotified = true
  281. pt.notifyCompletionLocked()
  282. } else {
  283. pt.notifyProgressLocked()
  284. }
  285. }
  286. // notifyProgressLocked sends progress notification (must be called with lock held)
  287. func (pt *ProgressTracker) notifyProgressLocked() {
  288. // Throttle notifications to avoid spam using configurable interval
  289. now := time.Now()
  290. if now.Sub(pt.lastNotify) < pt.notifyInterval {
  291. return
  292. }
  293. pt.lastNotify = now
  294. if pt.onProgress != nil {
  295. notification := pt.getProgressLocked()
  296. go pt.onProgress(notification) // Non-blocking notification
  297. }
  298. }
  299. // notifyCompletionLocked sends completion notification (must be called with lock held)
  300. func (pt *ProgressTracker) notifyCompletionLocked() {
  301. if pt.onCompletion == nil {
  302. return
  303. }
  304. elapsed := time.Since(pt.startTime)
  305. // Calculate total size processed
  306. var totalSize int64
  307. var failedFiles int
  308. for _, fp := range pt.files {
  309. if fp.State == FileStateFailed {
  310. failedFiles++
  311. continue
  312. }
  313. if fp.IsCompressed {
  314. // For compressed files, use dynamic average line size
  315. totalSize += fp.ProcessedLines * fp.AvgLineSize
  316. } else {
  317. // For uncompressed files, use actual bytes processed if available
  318. if fp.CurrentPos > 0 {
  319. totalSize += fp.CurrentPos
  320. } else {
  321. // Fallback to line-based estimation
  322. totalSize += fp.ProcessedLines * 150
  323. }
  324. }
  325. }
  326. notification := CompletionNotification{
  327. LogGroupPath: pt.logGroupPath,
  328. Success: failedFiles == 0,
  329. Duration: elapsed,
  330. TotalLines: pt.totalActual,
  331. TotalFiles: len(pt.files),
  332. FailedFiles: failedFiles,
  333. IndexedSize: totalSize,
  334. }
  335. if failedFiles > 0 {
  336. notification.Error = "some files failed to process"
  337. }
  338. go pt.onCompletion(notification) // Non-blocking notification
  339. }
  340. // getProgressLocked returns progress without notification (must be called with lock held)
  341. func (pt *ProgressTracker) getProgressLocked() ProgressNotification {
  342. var completedFiles, processingFiles, failedFiles int
  343. // Count files by state
  344. for _, fp := range pt.files {
  345. switch fp.State {
  346. case FileStateCompleted:
  347. completedFiles++
  348. case FileStateProcessing:
  349. processingFiles++
  350. case FileStateFailed:
  351. failedFiles++
  352. }
  353. }
  354. // Calculate progress percentage using hybrid approach
  355. var percentage float64
  356. // Calculate weighted progress combining file count, file sizes, and line estimates
  357. var totalFileWeight, processedFileWeight float64
  358. var totalSizeWeight, processedSizeWeight float64
  359. var totalLineWeight, processedLineWeight float64
  360. // Collect metrics for hybrid calculation
  361. for _, fp := range pt.files {
  362. fileWeight := 1.0 // Each file contributes equally to file-based progress
  363. sizeWeight := float64(fp.FileSize)
  364. lineWeight := float64(fp.EstimatedLines)
  365. totalFileWeight += fileWeight
  366. totalSizeWeight += sizeWeight
  367. if lineWeight > 0 {
  368. totalLineWeight += lineWeight
  369. }
  370. if fp.State == FileStateCompleted {
  371. processedFileWeight += fileWeight
  372. processedSizeWeight += sizeWeight
  373. if lineWeight > 0 {
  374. processedLineWeight += float64(fp.ProcessedLines)
  375. }
  376. } else if fp.State == FileStateProcessing && fp.ProcessedLines > 0 {
  377. // For processing files, add partial progress
  378. if lineWeight > 0 {
  379. processedLineWeight += float64(fp.ProcessedLines)
  380. // For size-based progress, estimate based on lines processed
  381. if fp.EstimatedLines > 0 {
  382. sizeProgress := float64(fp.ProcessedLines) / float64(fp.EstimatedLines)
  383. processedSizeWeight += sizeWeight * sizeProgress
  384. }
  385. } else if fp.FileSize > 0 && fp.CurrentPos > 0 {
  386. // Use position-based progress for files without line estimates
  387. sizeProgress := float64(fp.CurrentPos) / float64(fp.FileSize)
  388. processedSizeWeight += sizeWeight * sizeProgress
  389. processedFileWeight += fileWeight * sizeProgress
  390. }
  391. }
  392. }
  393. // Calculate progress using the most reliable metric available
  394. if totalLineWeight > 0 && processedLineWeight > 0 {
  395. // Dynamic line estimation: adjust line estimates based on completed files
  396. dynamicLineWeight := totalLineWeight
  397. if completedFiles >= 2 {
  398. // Calculate average lines per completed file
  399. var totalLinesFromCompleted int64
  400. for _, fp := range pt.files {
  401. if fp.State == FileStateCompleted {
  402. totalLinesFromCompleted += fp.ProcessedLines
  403. }
  404. }
  405. if completedFiles > 0 {
  406. avgLinesPerFile := float64(totalLinesFromCompleted) / float64(completedFiles)
  407. // Adjust remaining file estimates based on observed average
  408. remainingFiles := len(pt.files) - completedFiles
  409. if remainingFiles > 0 {
  410. // Calculate current estimate for remaining files
  411. var remainingEstimate int64
  412. for _, fp := range pt.files {
  413. if fp.State != FileStateCompleted {
  414. remainingEstimate += fp.EstimatedLines
  415. }
  416. }
  417. // Replace remaining estimates with dynamic estimate
  418. dynamicRemainingEstimate := int64(avgLinesPerFile * float64(remainingFiles))
  419. dynamicLineWeight = float64(int64(totalLinesFromCompleted) + dynamicRemainingEstimate)
  420. }
  421. }
  422. }
  423. // Use line-based progress with dynamic estimation
  424. linePercentage := (processedLineWeight / float64(dynamicLineWeight)) * 100
  425. // Use file-based progress as primary metric for reliability
  426. filePercentage := (processedFileWeight / totalFileWeight) * 100
  427. // Weight them: 40% lines, 60% files for better reliability
  428. // Files are more predictable than line estimates for compressed files
  429. percentage = (linePercentage * 0.4) + (filePercentage * 0.6)
  430. // Additional safety: never exceed file-based progress by more than 10%
  431. // This prevents premature 100% when file-based progress is more reliable
  432. maxAllowedPercentage := filePercentage + 10.0
  433. if percentage > maxAllowedPercentage {
  434. percentage = maxAllowedPercentage
  435. }
  436. } else if totalSizeWeight > 0 {
  437. // Fallback to size-based progress
  438. percentage = (processedSizeWeight / totalSizeWeight) * 100
  439. } else if totalFileWeight > 0 {
  440. // Fallback to file-based progress only
  441. percentage = (processedFileWeight / totalFileWeight) * 100
  442. }
  443. // Cap at 100%
  444. if percentage > 100 {
  445. percentage = 100
  446. }
  447. elapsed := time.Since(pt.startTime)
  448. var estimatedRemain time.Duration
  449. if percentage > 0 && percentage < 100 {
  450. avgTimePerPercent := float64(elapsed.Nanoseconds()) / percentage
  451. remainingPercent := 100.0 - percentage
  452. estimatedRemain = time.Duration(int64(avgTimePerPercent * remainingPercent))
  453. }
  454. // Use dynamic line estimation for reporting
  455. adjustedEstimate := pt.totalEstimate
  456. if completedFiles >= 2 {
  457. // Calculate average lines per completed file
  458. var totalLinesFromCompleted int64
  459. for _, fp := range pt.files {
  460. if fp.State == FileStateCompleted {
  461. totalLinesFromCompleted += fp.ProcessedLines
  462. }
  463. }
  464. if completedFiles > 0 {
  465. avgLinesPerFile := float64(totalLinesFromCompleted) / float64(completedFiles)
  466. // Project total lines based on completed files
  467. projectedTotal := int64(avgLinesPerFile * float64(len(pt.files)))
  468. // Use dynamic estimate with some constraints to prevent extreme changes
  469. if completedFiles >= len(pt.files)/5 { // At least 20% of files processed
  470. // More confidence, allow larger adjustments
  471. adjustedEstimate = projectedTotal
  472. } else if projectedTotal > adjustedEstimate {
  473. // Less confidence, be more conservative but still adjust upward
  474. maxIncrease := adjustedEstimate + (projectedTotal-adjustedEstimate)/2
  475. adjustedEstimate = maxIncrease
  476. }
  477. }
  478. }
  479. return ProgressNotification{
  480. LogGroupPath: pt.logGroupPath,
  481. Percentage: percentage,
  482. TotalFiles: len(pt.files),
  483. CompletedFiles: completedFiles,
  484. ProcessingFiles: processingFiles,
  485. FailedFiles: failedFiles,
  486. ProcessedLines: pt.totalActual,
  487. EstimatedLines: adjustedEstimate,
  488. ElapsedTime: elapsed,
  489. EstimatedRemain: estimatedRemain,
  490. IsCompleted: pt.isCompleted,
  491. }
  492. }
  493. // EstimateFileLines estimates the number of lines in a file based on sampling
  494. func EstimateFileLines(ctx context.Context, filePath string, fileSize int64, isCompressed bool) (int64, error) {
  495. if fileSize == 0 {
  496. return 0, nil
  497. }
  498. // Validate log path before accessing it
  499. if !utils.IsValidLogPath(filePath) {
  500. // Return fallback estimate for invalid paths
  501. return fileSize / 150, nil // Fallback: ~150 bytes per line
  502. }
  503. file, err := os.Open(filePath)
  504. if err != nil {
  505. // Return fallback estimate instead of error
  506. return fileSize / 150, nil // Fallback: ~150 bytes per line
  507. }
  508. defer file.Close()
  509. var reader io.Reader = file
  510. // Handle compressed files
  511. if isCompressed {
  512. gzReader, err := gzip.NewReader(file)
  513. if err != nil {
  514. return (fileSize * 3) / 150, nil // Fallback for compressed: 3:1 ratio
  515. }
  516. defer gzReader.Close()
  517. reader = gzReader
  518. }
  519. // Sample the first 1MB of the file content (decompressed if necessary)
  520. sampleSize := int64(1024 * 1024)
  521. buf := make([]byte, sampleSize)
  522. // Check for context cancellation
  523. select {
  524. case <-ctx.Done():
  525. return 0, ctx.Err()
  526. default:
  527. }
  528. bytesRead, err := io.ReadFull(reader, buf)
  529. if err != nil && err != io.EOF && !errors.Is(err, io.ErrUnexpectedEOF) {
  530. return fileSize / 150, nil // Fallback on read error
  531. }
  532. if bytesRead == 0 {
  533. return 0, nil // Empty file
  534. }
  535. // Count lines in the sample
  536. lineCount := bytes.Count(buf[:bytesRead], []byte{'\n'})
  537. if lineCount == 0 {
  538. // Avoid division by zero, fallback to rough estimate
  539. return fileSize / 150, nil
  540. }
  541. // Calculate average line size from the sample
  542. avgLineSize := float64(bytesRead) / float64(lineCount)
  543. if avgLineSize == 0 {
  544. return fileSize / 150, nil // Fallback
  545. }
  546. // Estimate total lines
  547. var estimatedLines int64
  548. if isCompressed {
  549. // For compressed files, use a compression ratio estimate
  550. estimatedUncompressedSize := fileSize * 5 // Conservative 5:1 compression ratio
  551. estimatedLines = int64(float64(estimatedUncompressedSize) / avgLineSize)
  552. } else {
  553. estimatedLines = int64(float64(fileSize) / avgLineSize)
  554. }
  555. return estimatedLines, nil
  556. }
  // IsCompressedFile reports whether filePath ends in a recognized compression
// extension: .gz, .bz2, .xz or .lz4, matched case-insensitively.
func IsCompressedFile(filePath string) bool {
	switch strings.ToLower(filepath.Ext(filePath)) {
	case ".gz", ".bz2", ".xz", ".lz4":
		return true
	default:
		return false
	}
}
  562. // IsRotationLogFile determines if a file is a rotation log file
  563. func IsRotationLogFile(filePath string) bool {
  564. base := filepath.Base(filePath)
  565. // Common nginx rotation patterns:
  566. // access.log, access.log.1, access.log.2.gz
  567. // access.1.log, access.2.log.gz
  568. // error.log, error.log.1, error.log.2.gz
  569. // Remove compression extensions first
  570. if IsCompressedFile(base) {
  571. base = strings.TrimSuffix(base, filepath.Ext(base))
  572. }
  573. // Check for basic .log files
  574. if strings.HasSuffix(base, ".log") {
  575. return true
  576. }
  577. // Check for numbered rotation patterns: access.log.1, error.log.10, etc.
  578. parts := strings.Split(base, ".")
  579. if len(parts) >= 3 {
  580. // Pattern: name.log.number (e.g., access.log.1)
  581. if parts[len(parts)-2] == "log" && isNumeric(parts[len(parts)-1]) {
  582. return true
  583. }
  584. // Pattern: name.number.log (e.g., access.1.log)
  585. if parts[len(parts)-1] == "log" {
  586. for i := 1; i < len(parts)-1; i++ {
  587. if isNumeric(parts[i]) {
  588. return true
  589. }
  590. }
  591. }
  592. }
  593. return false
  594. }
  // isNumeric reports whether s is non-empty and consists only of the ASCII
// digits 0-9. Signs, decimal points and non-ASCII digits are rejected.
func isNumeric(s string) bool {
	for i := 0; i < len(s); i++ {
		if c := s[i]; c < '0' || c > '9' {
			return false
		}
	}
	return len(s) > 0
}
  607. // AddRotationFiles automatically detects and adds rotation log files with appropriate compression detection
  608. func (pt *ProgressTracker) AddRotationFiles(filePaths ...string) {
  609. for _, filePath := range filePaths {
  610. isCompressed := IsCompressedFile(filePath)
  611. pt.AddFile(filePath, isCompressed)
  612. }
  613. }
  614. // ProgressManager manages multiple progress trackers
  615. type ProgressManager struct {
  616. mu sync.RWMutex
  617. trackers map[string]*ProgressTracker
  618. }
  619. // NewProgressManager creates a new progress manager
  620. func NewProgressManager() *ProgressManager {
  621. return &ProgressManager{
  622. trackers: make(map[string]*ProgressTracker),
  623. }
  624. }
  625. // GetTracker gets or creates a progress tracker for a log group
  626. func (pm *ProgressManager) GetTracker(logGroupPath string, config *ProgressConfig) *ProgressTracker {
  627. pm.mu.Lock()
  628. defer pm.mu.Unlock()
  629. if tracker, exists := pm.trackers[logGroupPath]; exists {
  630. return tracker
  631. }
  632. tracker := NewProgressTracker(logGroupPath, config)
  633. pm.trackers[logGroupPath] = tracker
  634. return tracker
  635. }
  636. // RemoveTracker removes a progress tracker
  637. func (pm *ProgressManager) RemoveTracker(logGroupPath string) {
  638. pm.mu.Lock()
  639. defer pm.mu.Unlock()
  640. delete(pm.trackers, logGroupPath)
  641. }
  642. // GetAllTrackers returns all current trackers
  643. func (pm *ProgressManager) GetAllTrackers() map[string]*ProgressTracker {
  644. pm.mu.RLock()
  645. defer pm.mu.RUnlock()
  646. result := make(map[string]*ProgressTracker)
  647. for path, tracker := range pm.trackers {
  648. result[path] = tracker
  649. }
  650. return result
  651. }
  652. // Cleanup removes completed or failed trackers
  653. func (pm *ProgressManager) Cleanup() {
  654. pm.mu.Lock()
  655. defer pm.mu.Unlock()
  656. for path, tracker := range pm.trackers {
  657. if tracker.IsCompleted() {
  658. delete(pm.trackers, path)
  659. }
  660. }
  661. }