rotation_scanner.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462
  1. package indexer
  2. import (
  3. "context"
  4. "fmt"
  5. "os"
  6. "path/filepath"
  7. "sort"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/uozi-tech/cosy/logger"
  12. )
  13. // RotationScanner efficiently scans and prioritizes rotation logs for indexing
  14. type RotationScanner struct {
  15. config *RotationScanConfig
  16. scanResults map[string]*LogGroupScanResult
  17. resultsMutex sync.RWMutex
  18. priorityQueue []*RotationLogFileInfo
  19. queueMutex sync.Mutex
  20. scanInProgress bool
  21. progressMutex sync.Mutex
  22. }
  // RotationScanConfig holds the settings that control how a RotationScanner
// discovers, filters, and prioritizes rotated log files.
//
// The time.Duration fields are serialized by encoding/json as integer
// nanosecond counts.
type RotationScanConfig struct {
	// Scanning parameters.

	// MaxConcurrentScans is how many log groups may be scanned at the same
	// time when EnableParallelScan is true.
	MaxConcurrentScans int `json:"max_concurrent_scans"`
	// ScanTimeout is presumably a time limit for a scan.
	// NOTE(review): not read by the code shown here — confirm where it is enforced.
	ScanTimeout time.Duration `json:"scan_timeout"`
	// MinFileSize is the size in bytes below which files are skipped.
	MinFileSize int64 `json:"min_file_size"`
	// MaxFileAge skips files whose modification time lies further in the past.
	MaxFileAge time.Duration `json:"max_file_age"`

	// Throughput optimization.

	// PrioritizeBySize boosts the priority of larger files.
	PrioritizeBySize bool `json:"prioritize_by_size"`
	// PrioritizeByAge boosts the priority of recently modified files.
	PrioritizeByAge bool `json:"prioritize_by_age"`
	// PreferredExtensions lists favoured file extensions.
	// NOTE(review): not read by the code shown here.
	PreferredExtensions []string `json:"preferred_extensions"`
	// ExcludePatterns are filepath.Match patterns tested against each file's
	// base name; matching files are skipped.
	ExcludePatterns []string `json:"exclude_patterns"`

	// Performance settings.

	// EnableParallelScan scans log groups concurrently instead of one by one.
	EnableParallelScan bool `json:"enable_parallel_scan"`
	// ScanBatchSize is a batch-size hint.
	// NOTE(review): not read by the code shown here; GetNextBatch takes its
	// size as an argument.
	ScanBatchSize int `json:"scan_batch_size"`
}
  39. // LogGroupScanResult contains the scan results for a log group
  40. type LogGroupScanResult struct {
  41. BasePath string `json:"base_path"`
  42. Files []*RotationLogFileInfo `json:"files"`
  43. TotalSize int64 `json:"total_size"`
  44. TotalFiles int `json:"total_files"`
  45. ScanTime time.Time `json:"scan_time"`
  46. ScanDuration time.Duration `json:"scan_duration"`
  47. EstimatedLines int64 `json:"estimated_lines"`
  48. }
  // RotationLogFileInfo describes one file (the live log or a rotated copy)
// belonging to a log group, plus the metadata used to order it for indexing.
type RotationLogFileInfo struct {
	// Path is the file's path as matched by the scanner's glob patterns.
	Path string `json:"path"`
	// Size is the on-disk size in bytes.
	Size int64 `json:"size"`
	// ModTime is the file's last modification time.
	ModTime time.Time `json:"mod_time"`
	// IsCompressed is set by the scanner for files with a ".gz" suffix.
	IsCompressed bool `json:"is_compressed"`
	// RotationIndex is 0 for the main log, the numeric suffix for names such
	// as access.log.1 or access.log.2.gz, and 999 when no index is parsed.
	RotationIndex int `json:"rotation_index"`
	// EstimatedLines is a rough line count derived from Size.
	EstimatedLines int64 `json:"estimated_lines"`
	// Priority is the ordering score; higher values are indexed first.
	Priority int `json:"priority"`
	// MainLogPath is the base path of the log group the file belongs to.
	MainLogPath string `json:"main_log_path"`
}
  60. // NewRotationScanner creates a new rotation log scanner
  61. func NewRotationScanner(config *RotationScanConfig) *RotationScanner {
  62. if config == nil {
  63. config = DefaultRotationScanConfig()
  64. }
  65. return &RotationScanner{
  66. config: config,
  67. scanResults: make(map[string]*LogGroupScanResult),
  68. priorityQueue: make([]*RotationLogFileInfo, 0),
  69. }
  70. }
  71. // DefaultRotationScanConfig returns default configuration for rotation scanning
  72. func DefaultRotationScanConfig() *RotationScanConfig {
  73. return &RotationScanConfig{
  74. MaxConcurrentScans: 8,
  75. ScanTimeout: 30 * time.Second,
  76. MinFileSize: 1024, // 1KB minimum
  77. MaxFileAge: 30 * 24 * time.Hour, // 30 days
  78. PrioritizeBySize: true,
  79. PrioritizeByAge: true,
  80. PreferredExtensions: []string{".log", ".gz"},
  81. ExcludePatterns: []string{"*.tmp", "*.lock", "*.swap"},
  82. EnableParallelScan: true,
  83. ScanBatchSize: 50,
  84. }
  85. }
  86. // ScanLogGroups scans multiple log groups and builds a prioritized queue
  87. func (rs *RotationScanner) ScanLogGroups(ctx context.Context, basePaths []string) error {
  88. rs.progressMutex.Lock()
  89. if rs.scanInProgress {
  90. rs.progressMutex.Unlock()
  91. return fmt.Errorf("scan already in progress")
  92. }
  93. rs.scanInProgress = true
  94. rs.progressMutex.Unlock()
  95. defer func() {
  96. rs.progressMutex.Lock()
  97. rs.scanInProgress = false
  98. rs.progressMutex.Unlock()
  99. }()
  100. logger.Infof("🔍 Starting rotation log scan for %d log groups", len(basePaths))
  101. if rs.config.EnableParallelScan {
  102. return rs.scanLogGroupsParallel(ctx, basePaths)
  103. } else {
  104. return rs.scanLogGroupsSequential(ctx, basePaths)
  105. }
  106. }
  107. // scanLogGroupsParallel scans log groups in parallel for maximum throughput
  108. func (rs *RotationScanner) scanLogGroupsParallel(ctx context.Context, basePaths []string) error {
  109. semaphore := make(chan struct{}, rs.config.MaxConcurrentScans)
  110. var wg sync.WaitGroup
  111. errors := make(chan error, len(basePaths))
  112. for _, basePath := range basePaths {
  113. wg.Add(1)
  114. go func(path string) {
  115. defer wg.Done()
  116. select {
  117. case semaphore <- struct{}{}:
  118. defer func() { <-semaphore }()
  119. if err := rs.scanSingleLogGroup(ctx, path); err != nil {
  120. errors <- fmt.Errorf("failed to scan %s: %w", path, err)
  121. }
  122. case <-ctx.Done():
  123. errors <- ctx.Err()
  124. return
  125. }
  126. }(basePath)
  127. }
  128. wg.Wait()
  129. close(errors)
  130. // Collect any errors
  131. var scanErrors []error
  132. for err := range errors {
  133. if err != nil {
  134. scanErrors = append(scanErrors, err)
  135. }
  136. }
  137. if len(scanErrors) > 0 {
  138. logger.Warnf("Encountered %d errors during parallel scan: %v", len(scanErrors), scanErrors)
  139. }
  140. // Build priority queue from all scan results
  141. rs.buildPriorityQueue()
  142. logger.Infof("✅ Rotation log scan completed: %d log groups, %d total files",
  143. len(rs.scanResults), len(rs.priorityQueue))
  144. return nil
  145. }
  146. // scanLogGroupsSequential scans log groups sequentially
  147. func (rs *RotationScanner) scanLogGroupsSequential(ctx context.Context, basePaths []string) error {
  148. for _, basePath := range basePaths {
  149. select {
  150. case <-ctx.Done():
  151. return ctx.Err()
  152. default:
  153. if err := rs.scanSingleLogGroup(ctx, basePath); err != nil {
  154. logger.Warnf("Failed to scan log group %s: %v", basePath, err)
  155. // Continue with other groups
  156. }
  157. }
  158. }
  159. rs.buildPriorityQueue()
  160. return nil
  161. }
  162. // scanSingleLogGroup scans a single log group for rotation logs
  163. func (rs *RotationScanner) scanSingleLogGroup(ctx context.Context, basePath string) error {
  164. start := time.Now()
  165. // Use optimized glob pattern for rotation logs
  166. patterns := []string{
  167. basePath, // Main log file
  168. basePath + ".*", // access.log.1, access.log.2.gz, etc.
  169. basePath + "-*", // access.log-20240101, etc.
  170. strings.TrimSuffix(basePath, ".log") + ".*.log*", // For patterns like access.20240101.log.gz
  171. }
  172. var allFiles []*RotationLogFileInfo
  173. seen := make(map[string]struct{})
  174. for _, pattern := range patterns {
  175. matches, err := filepath.Glob(pattern)
  176. if err != nil {
  177. continue // Skip problematic patterns
  178. }
  179. for _, match := range matches {
  180. if _, exists := seen[match]; exists {
  181. continue
  182. }
  183. seen[match] = struct{}{}
  184. info, err := os.Stat(match)
  185. if err != nil || !info.Mode().IsRegular() {
  186. continue
  187. }
  188. // Apply file filters
  189. if info.Size() < rs.config.MinFileSize {
  190. continue
  191. }
  192. if time.Since(info.ModTime()) > rs.config.MaxFileAge {
  193. continue
  194. }
  195. if rs.shouldExcludeFile(match) {
  196. continue
  197. }
  198. logFile := &RotationLogFileInfo{
  199. Path: match,
  200. Size: info.Size(),
  201. ModTime: info.ModTime(),
  202. IsCompressed: strings.HasSuffix(match, ".gz"),
  203. RotationIndex: rs.extractRotationIndex(match, basePath),
  204. EstimatedLines: rs.estimateLineCount(info.Size(), strings.HasSuffix(match, ".gz")),
  205. MainLogPath: basePath,
  206. }
  207. logFile.Priority = rs.calculatePriority(logFile)
  208. allFiles = append(allFiles, logFile)
  209. }
  210. }
  211. // Sort files by priority and rotation index
  212. sort.Slice(allFiles, func(i, j int) bool {
  213. if allFiles[i].Priority != allFiles[j].Priority {
  214. return allFiles[i].Priority > allFiles[j].Priority // Higher priority first
  215. }
  216. return allFiles[i].RotationIndex < allFiles[j].RotationIndex // Newer files first
  217. })
  218. // Calculate totals
  219. var totalSize int64
  220. var estimatedLines int64
  221. for _, file := range allFiles {
  222. totalSize += file.Size
  223. estimatedLines += file.EstimatedLines
  224. }
  225. result := &LogGroupScanResult{
  226. BasePath: basePath,
  227. Files: allFiles,
  228. TotalSize: totalSize,
  229. TotalFiles: len(allFiles),
  230. ScanTime: start,
  231. ScanDuration: time.Since(start),
  232. EstimatedLines: estimatedLines,
  233. }
  234. rs.resultsMutex.Lock()
  235. rs.scanResults[basePath] = result
  236. rs.resultsMutex.Unlock()
  237. logger.Debugf("📊 Scanned log group %s: %d files, %s total, %d estimated lines",
  238. basePath, len(allFiles), formatSize(totalSize), estimatedLines)
  239. return nil
  240. }
  241. // buildPriorityQueue builds a global priority queue from all scan results
  242. func (rs *RotationScanner) buildPriorityQueue() {
  243. rs.queueMutex.Lock()
  244. defer rs.queueMutex.Unlock()
  245. rs.priorityQueue = rs.priorityQueue[:0] // Clear existing queue
  246. rs.resultsMutex.RLock()
  247. for _, result := range rs.scanResults {
  248. rs.priorityQueue = append(rs.priorityQueue, result.Files...)
  249. }
  250. rs.resultsMutex.RUnlock()
  251. // Sort by priority
  252. sort.Slice(rs.priorityQueue, func(i, j int) bool {
  253. if rs.priorityQueue[i].Priority != rs.priorityQueue[j].Priority {
  254. return rs.priorityQueue[i].Priority > rs.priorityQueue[j].Priority
  255. }
  256. // Secondary sort by size for throughput
  257. return rs.priorityQueue[i].Size > rs.priorityQueue[j].Size
  258. })
  259. logger.Infof("🚀 Built priority queue with %d files for optimized indexing", len(rs.priorityQueue))
  260. }
  261. // GetNextBatch returns the next batch of files to index, prioritized for throughput
  262. func (rs *RotationScanner) GetNextBatch(batchSize int) []*RotationLogFileInfo {
  263. rs.queueMutex.Lock()
  264. defer rs.queueMutex.Unlock()
  265. if len(rs.priorityQueue) == 0 {
  266. return []*RotationLogFileInfo{} // Return empty slice instead of nil
  267. }
  268. end := minInt(batchSize, len(rs.priorityQueue))
  269. batch := make([]*RotationLogFileInfo, end)
  270. copy(batch, rs.priorityQueue[:end])
  271. // Remove processed files from queue
  272. rs.priorityQueue = rs.priorityQueue[end:]
  273. return batch
  274. }
  275. // extractRotationIndex extracts rotation index from file path
  276. func (rs *RotationScanner) extractRotationIndex(filePath, basePath string) int {
  277. if filePath == basePath {
  278. return 0 // Main log file
  279. }
  280. // Extract numeric suffix (e.g., access.log.1 -> 1)
  281. suffix := strings.TrimPrefix(filePath, basePath)
  282. // Handle various rotation formats
  283. if strings.HasPrefix(suffix, ".") {
  284. suffix = strings.TrimPrefix(suffix, ".")
  285. // Always trim optional ".gz" suffix
  286. suffix = strings.TrimSuffix(suffix, ".gz")
  287. // Try to parse as integer
  288. var index int
  289. if n, err := fmt.Sscanf(suffix, "%d", &index); n == 1 && err == nil {
  290. return index
  291. }
  292. }
  293. // Default to high index for unknown formats
  294. return 999
  295. }
  296. // calculatePriority calculates file priority based on configuration
  297. func (rs *RotationScanner) calculatePriority(file *RotationLogFileInfo) int {
  298. priority := 100 // Base priority
  299. if rs.config.PrioritizeBySize {
  300. // Larger files get higher priority (better throughput)
  301. sizeMB := file.Size / (1024 * 1024)
  302. priority += int(sizeMB/10) * 10 // +10 points per 10MB
  303. }
  304. if rs.config.PrioritizeByAge {
  305. // Newer files get higher priority
  306. hoursOld := int(time.Since(file.ModTime).Hours())
  307. if hoursOld < 24 {
  308. priority += 50 // Recent files
  309. } else if hoursOld < 168 { // 1 week
  310. priority += 20
  311. }
  312. }
  313. // Main log file gets highest priority
  314. if file.RotationIndex == 0 {
  315. priority += 100
  316. }
  317. // Compressed files get slightly lower priority (decompression overhead)
  318. if file.IsCompressed {
  319. priority -= 10
  320. }
  321. return priority
  322. }
  323. // estimateLineCount estimates the number of log lines in a file
  324. func (rs *RotationScanner) estimateLineCount(size int64, isCompressed bool) int64 {
  325. avgLineSize := int64(200) // Average nginx log line size in bytes
  326. if isCompressed {
  327. // Assume 3x compression ratio for text logs
  328. size = size * 3
  329. }
  330. return size / avgLineSize
  331. }
  332. // shouldExcludeFile checks if file should be excluded based on patterns
  333. func (rs *RotationScanner) shouldExcludeFile(path string) bool {
  334. filename := filepath.Base(path)
  335. for _, pattern := range rs.config.ExcludePatterns {
  336. if matched, _ := filepath.Match(pattern, filename); matched {
  337. return true
  338. }
  339. }
  340. return false
  341. }
  342. // GetScanResults returns the current scan results
  343. func (rs *RotationScanner) GetScanResults() map[string]*LogGroupScanResult {
  344. rs.resultsMutex.RLock()
  345. defer rs.resultsMutex.RUnlock()
  346. results := make(map[string]*LogGroupScanResult)
  347. for k, v := range rs.scanResults {
  348. results[k] = v
  349. }
  350. return results
  351. }
  352. // GetQueueSize returns the current number of files in the priority queue
  353. func (rs *RotationScanner) GetQueueSize() int {
  354. rs.queueMutex.Lock()
  355. defer rs.queueMutex.Unlock()
  356. return len(rs.priorityQueue)
  357. }
  358. // IsScanning returns true if a scan is currently in progress
  359. func (rs *RotationScanner) IsScanning() bool {
  360. rs.progressMutex.Lock()
  361. defer rs.progressMutex.Unlock()
  362. return rs.scanInProgress
  363. }
  // formatSize renders a byte count for humans using binary (1024-based) units.
// Values below 1024 are printed as whole bytes ("512 B"); larger values get
// one decimal and a K/M/G/T/P/E prefix ("1.5 KB").
func formatSize(bytes int64) string {
	const (
		unit     = 1024
		prefixes = "KMGTPE"
	)
	if bytes < unit {
		return fmt.Sprintf("%d B", bytes)
	}

	scaled, divisor, idx := bytes/unit, int64(unit), 0
	for scaled >= unit {
		scaled /= unit
		divisor *= unit
		idx++
	}
	value := float64(bytes) / float64(divisor)
	return fmt.Sprintf("%.1f %cB", value, prefixes[idx])
}
  // minInt returns the smaller of a and b.
func minInt(a, b int) int {
	if b < a {
		return b
	}
	return a
}