// File: indexer_file_streaming.go
  1. package nginx_log
  2. import (
  3. "bufio"
  4. "compress/gzip"
  5. "fmt"
  6. "io"
  7. "os"
  8. "strings"
  9. "time"
  10. "github.com/0xJacky/Nginx-UI/model"
  11. "github.com/uozi-tech/cosy/logger"
  12. )
  13. // indexFileFromPosition indexes a file starting from a specific byte position
  14. func (li *LogIndexer) indexFileFromPosition(filePath string, startPosition int64, logIndex *model.NginxLogIndex) error {
  15. indexStartTime := time.Now()
  16. // Record the start time of indexing operation
  17. logIndex.SetIndexStartTime(indexStartTime)
  18. file, err := li.safeOpenFile(filePath)
  19. if err != nil {
  20. return fmt.Errorf("failed to safely open file %s: %w", filePath, err)
  21. }
  22. defer file.Close()
  23. // Get file info
  24. fileInfo, err := file.Stat()
  25. if err != nil {
  26. return fmt.Errorf("failed to get file info: %w", err)
  27. }
  28. // Seek to start position
  29. if startPosition > 0 {
  30. _, err = file.Seek(startPosition, io.SeekStart)
  31. if err != nil {
  32. return fmt.Errorf("failed to seek to position %d: %w", startPosition, err)
  33. }
  34. }
  35. var reader io.Reader = file
  36. // Handle compressed files (note: incremental indexing may not work well with compressed files)
  37. if strings.HasSuffix(filePath, ".gz") {
  38. if startPosition > 0 {
  39. return fmt.Errorf("incremental indexing not supported for compressed files")
  40. }
  41. gzReader, err := gzip.NewReader(file)
  42. if err != nil {
  43. return fmt.Errorf("failed to create gzip reader: %w", err)
  44. }
  45. defer gzReader.Close()
  46. reader = gzReader
  47. }
  48. scanner := bufio.NewScanner(reader)
  49. const maxCapacity = 1024 * 1024 // 1MB
  50. buf := make([]byte, maxCapacity)
  51. scanner.Buffer(buf, maxCapacity)
  52. // Use streaming processing to avoid loading all lines into memory
  53. return li.indexFileFromPositionStreaming(filePath, startPosition, logIndex, fileInfo, scanner, indexStartTime)
  54. }
  55. // indexFileFromPositionWithMainLogPath indexes a file starting from a specific byte position with specified main log path
  56. func (li *LogIndexer) indexFileFromPositionWithMainLogPath(filePath, mainLogPath string, startPosition int64, logIndex *model.NginxLogIndex, progressTracker *ProgressTracker) error {
  57. indexStartTime := time.Now()
  58. // Record the start time of indexing operation
  59. logIndex.SetIndexStartTime(indexStartTime)
  60. file, err := li.safeOpenFile(filePath)
  61. if err != nil {
  62. return fmt.Errorf("failed to safely open file %s: %w", filePath, err)
  63. }
  64. defer file.Close()
  65. // Get file info
  66. fileInfo, err := file.Stat()
  67. if err != nil {
  68. return fmt.Errorf("failed to get file info: %w", err)
  69. }
  70. // Seek to start position
  71. if startPosition > 0 {
  72. _, err = file.Seek(startPosition, io.SeekStart)
  73. if err != nil {
  74. return fmt.Errorf("failed to seek to position %d: %w", startPosition, err)
  75. }
  76. }
  77. var reader io.Reader = file
  78. // Handle compressed files (note: incremental indexing may not work well with compressed files)
  79. if strings.HasSuffix(filePath, ".gz") {
  80. if startPosition > 0 {
  81. return fmt.Errorf("incremental indexing not supported for compressed files")
  82. }
  83. gzReader, err := gzip.NewReader(file)
  84. if err != nil {
  85. return fmt.Errorf("failed to create gzip reader: %w", err)
  86. }
  87. defer gzReader.Close()
  88. reader = gzReader
  89. }
  90. scanner := bufio.NewScanner(reader)
  91. const maxCapacity = 1024 * 1024 // 1MB
  92. buf := make([]byte, maxCapacity)
  93. scanner.Buffer(buf, maxCapacity)
  94. // Use streaming processing with specified main log path
  95. return li.indexFileFromPositionStreamingWithMainLogPath(filePath, mainLogPath, startPosition, logIndex, fileInfo, scanner, indexStartTime, progressTracker)
  96. }
  97. // indexFileFromPositionStreamingWithMainLogPath processes file content using streaming approach with specified main log path
  98. func (li *LogIndexer) indexFileFromPositionStreamingWithMainLogPath(filePath, mainLogPath string, startPosition int64, logIndex *model.NginxLogIndex, fileInfo os.FileInfo, scanner *bufio.Scanner, startTime time.Time, progressTracker *ProgressTracker) error {
  99. // Record index start time
  100. logIndex.SetIndexStartTime(startTime)
  101. var currentPosition int64 = startPosition
  102. lineCount := 0
  103. entryCount := 0
  104. batch := li.index.NewBatch()
  105. var newTimeStart, newTimeEnd *time.Time
  106. logger.Infof("Starting index for file %s -> %s (size: %d bytes)", filePath, mainLogPath, fileInfo.Size())
  107. // Set file size in progress tracker
  108. if progressTracker != nil {
  109. progressTracker.SetFileSize(filePath, fileInfo.Size())
  110. }
  111. // Determine compression status
  112. isCompressed := strings.HasSuffix(filePath, ".gz") || strings.HasSuffix(filePath, ".bz2")
  113. // Line buffer for batch processing
  114. const batchLines = 10000 // Process 10000 lines at a time for better performance
  115. var lineBuffer []string
  116. // If starting from middle of file, skip partial line
  117. if startPosition > 0 {
  118. if scanner.Scan() {
  119. // Skip the first (potentially partial) line
  120. line := scanner.Text()
  121. currentPosition += int64(len(line)) + 1 // +1 for newline
  122. }
  123. }
  124. // Process lines in batches
  125. for scanner.Scan() {
  126. line := strings.TrimSpace(scanner.Text())
  127. if line != "" {
  128. lineBuffer = append(lineBuffer, line)
  129. lineCount++
  130. currentPosition += int64(len(scanner.Text())) + 1 // +1 for newline
  131. // Process batch when buffer is full
  132. if len(lineBuffer) >= batchLines {
  133. if err := li.processBatchStreaming(lineBuffer, filePath, mainLogPath, startPosition, &batch, &entryCount, &newTimeStart, &newTimeEnd); err != nil {
  134. return err
  135. }
  136. // Clear buffer
  137. lineBuffer = lineBuffer[:0]
  138. }
  139. // Update progress tracker periodically
  140. if lineCount%5000 == 0 {
  141. progressTracker.UpdateFilePosition(filePath, currentPosition, int64(lineCount))
  142. }
  143. }
  144. }
  145. // Process remaining lines in buffer
  146. if len(lineBuffer) > 0 {
  147. if err := li.processBatchStreaming(lineBuffer, filePath, mainLogPath, startPosition, &batch, &entryCount, &newTimeStart, &newTimeEnd); err != nil {
  148. return err
  149. }
  150. }
  151. if err := scanner.Err(); err != nil {
  152. return fmt.Errorf("failed to scan file %s: %w", filePath, err)
  153. }
  154. // Execute final batch
  155. if batch.Size() > 0 {
  156. if err := li.index.Batch(batch); err != nil {
  157. return fmt.Errorf("failed to execute final batch: %w", err)
  158. }
  159. logger.Infof("Final batch executed: %d entries indexed for %s", batch.Size(), filePath)
  160. }
  161. // For compressed files, mark as fully indexed
  162. if isCompressed {
  163. currentPosition = fileInfo.Size()
  164. }
  165. // Update persistence with final status
  166. logIndex.UpdateProgress(fileInfo.ModTime(), fileInfo.Size(), currentPosition, uint64(entryCount), newTimeStart, newTimeEnd)
  167. logIndex.SetIndexDuration(startTime)
  168. // Save the updated log index
  169. if err := li.persistence.SaveLogIndex(logIndex); err != nil {
  170. logger.Warnf("Failed to save log index for %s: %v", filePath, err)
  171. }
  172. // Final position update for progress tracker
  173. if progressTracker != nil {
  174. progressTracker.UpdateFilePosition(filePath, currentPosition, int64(lineCount))
  175. }
  176. // Complete file in progress tracker
  177. if progressTracker != nil {
  178. progressTracker.CompleteFile(filePath, int64(lineCount))
  179. }
  180. duration := time.Since(startTime)
  181. logger.Infof("Completed indexing of %s: %d lines processed, %d entries indexed in %v", filePath, lineCount, entryCount, duration)
  182. return nil
  183. }
  184. // indexFileFromPositionStreaming processes file content using streaming approach
  185. func (li *LogIndexer) indexFileFromPositionStreaming(filePath string, startPosition int64, logIndex *model.NginxLogIndex, fileInfo os.FileInfo, scanner *bufio.Scanner, startTime time.Time) error {
  186. // Record index start time
  187. logIndex.SetIndexStartTime(startTime)
  188. var currentPosition int64 = startPosition
  189. lineCount := 0
  190. entryCount := 0
  191. batch := li.index.NewBatch()
  192. var newTimeStart, newTimeEnd *time.Time
  193. // Get main log path first (for statistics grouping)
  194. mainLogPath := li.getMainLogPath(filePath)
  195. // Note: Stats calculation removed - using Bleve aggregations instead
  196. // For compressed files, we can't use position-based progress accurately
  197. // Fall back to line-based estimation for compressed files
  198. isCompressed := strings.HasSuffix(filePath, ".gz") || strings.HasSuffix(filePath, ".bz2")
  199. var totalFileSize int64
  200. if isCompressed {
  201. // For compressed files, estimate uncompressed size using compression ratio
  202. // Use 3:1 compression ratio as a reasonable estimate for most log files
  203. estimatedUncompressedSize := fileInfo.Size() * 3
  204. if estimatedUncompressedSize < 1 {
  205. estimatedUncompressedSize = 1
  206. }
  207. totalFileSize = estimatedUncompressedSize
  208. logger.Infof("Starting index for compressed file %s: compressed size %d, estimated uncompressed size %d", filePath, fileInfo.Size(), estimatedUncompressedSize)
  209. } else {
  210. // For uncompressed files, use actual file size
  211. totalFileSize = fileInfo.Size()
  212. if totalFileSize < 1 {
  213. totalFileSize = 1
  214. }
  215. logger.Infof("Starting index from position %d of %d bytes for %s", startPosition, totalFileSize, filePath)
  216. }
  217. logger.Debugf("Starting indexing: filePath=%s, mainLogPath=%s", filePath, mainLogPath)
  218. // Line buffer for batch processing
  219. const batchLines = 10000 // Process 10000 lines at a time for better performance
  220. var lineBuffer []string
  221. // If starting from middle of file, skip partial line
  222. if startPosition > 0 {
  223. if scanner.Scan() {
  224. // Skip the first (potentially partial) line
  225. line := scanner.Text()
  226. currentPosition += int64(len(line)) + 1 // +1 for newline
  227. }
  228. }
  229. // Process lines in batches
  230. for scanner.Scan() {
  231. line := strings.TrimSpace(scanner.Text())
  232. if line != "" {
  233. lineBuffer = append(lineBuffer, line)
  234. lineCount++
  235. currentPosition += int64(len(scanner.Text())) + 1 // +1 for newline
  236. // Process batch when buffer is full
  237. if len(lineBuffer) >= batchLines {
  238. if err := li.processBatchStreaming(lineBuffer, filePath, mainLogPath, startPosition, &batch, &entryCount, &newTimeStart, &newTimeEnd); err != nil {
  239. return err
  240. }
  241. // Clear buffer
  242. lineBuffer = lineBuffer[:0]
  243. }
  244. // Progress reporting for large files
  245. if lineCount%10000 == 0 {
  246. // Calculate current file progress percentage
  247. var currentFileProgress float64
  248. if isCompressed {
  249. logger.Debugf("Processed %d lines, indexed %d entries from compressed file %s...",
  250. lineCount, entryCount, filePath)
  251. // For compressed files, estimate progress based on current position vs total estimated size
  252. currentFileProgress = float64(currentPosition) / float64(totalFileSize) * 100
  253. } else {
  254. logger.Debugf("Processed %d lines, indexed %d entries from %s... (position: %d/%d bytes)",
  255. lineCount, entryCount, filePath, currentPosition, totalFileSize)
  256. // For uncompressed files, use byte position
  257. currentFileProgress = float64(currentPosition) / float64(totalFileSize) * 100
  258. }
  259. // Ensure file progress doesn't exceed 100%
  260. if currentFileProgress > 100 {
  261. currentFileProgress = 100
  262. }
  263. // Log progress (simplified for incremental indexing)
  264. logger.Debugf("Processed %d lines for incremental indexing of %s", lineCount, filePath)
  265. }
  266. }
  267. }
  268. // Process remaining lines in buffer
  269. if len(lineBuffer) > 0 {
  270. if err := li.processBatchStreaming(lineBuffer, filePath, mainLogPath, startPosition, &batch, &entryCount, &newTimeStart, &newTimeEnd); err != nil {
  271. return err
  272. }
  273. }
  274. if err := scanner.Err(); err != nil {
  275. return fmt.Errorf("failed to scan file %s: %w", filePath, err)
  276. }
  277. // Execute final batch
  278. if batch.Size() > 0 {
  279. if err := li.index.Batch(batch); err != nil {
  280. return fmt.Errorf("failed to execute final batch: %w", err)
  281. }
  282. }
  283. logger.Debugf("Processed %d lines, indexed %d entries from %s (position: %d->%d)", lineCount, entryCount, filePath, startPosition, currentPosition)
  284. // Note: Log group aggregator removed - using Bleve aggregations instead
  285. if entryCount == 0 {
  286. logger.Debugf("No new entries to index in %s", filePath)
  287. // Still update the position and modification time with total index size
  288. totalSize := li.calculateRelatedLogFilesSize(filePath)
  289. // Record index completion time and duration
  290. logIndex.SetIndexDuration(startTime)
  291. logIndex.UpdateProgress(fileInfo.ModTime(), totalSize, fileInfo.Size(), logIndex.DocumentCount, logIndex.TimeRangeStart, logIndex.TimeRangeEnd)
  292. return li.persistence.SaveLogIndex(logIndex)
  293. }
  294. // Update time range in log index
  295. var timeRangeStart, timeRangeEnd *time.Time
  296. if logIndex.TimeRangeStart != nil {
  297. timeRangeStart = logIndex.TimeRangeStart
  298. } else {
  299. timeRangeStart = newTimeStart
  300. }
  301. if logIndex.TimeRangeEnd != nil {
  302. timeRangeEnd = logIndex.TimeRangeEnd
  303. } else {
  304. timeRangeEnd = newTimeEnd
  305. }
  306. // Expand time range if needed
  307. if newTimeStart != nil && (timeRangeStart == nil || newTimeStart.Before(*timeRangeStart)) {
  308. timeRangeStart = newTimeStart
  309. }
  310. if newTimeEnd != nil && (timeRangeEnd == nil || newTimeEnd.After(*timeRangeEnd)) {
  311. timeRangeEnd = newTimeEnd
  312. }
  313. // Calculate total index size of related log files for this log group
  314. totalSize := li.calculateRelatedLogFilesSize(filePath)
  315. // Record index completion time and duration
  316. logIndex.SetIndexDuration(startTime)
  317. // Update persistence record with total index size
  318. logIndex.UpdateProgress(fileInfo.ModTime(), totalSize, currentPosition, logIndex.DocumentCount+uint64(entryCount), timeRangeStart, timeRangeEnd)
  319. if err := li.persistence.SaveLogIndex(logIndex); err != nil {
  320. logger.Warnf("Failed to save log index: %v", err)
  321. }
  322. // Update in-memory file info for compatibility
  323. li.mu.Lock()
  324. if fileInfo, exists := li.logPaths[filePath]; exists {
  325. fileInfo.LastModified = logIndex.LastModified
  326. fileInfo.LastSize = logIndex.LastSize
  327. fileInfo.LastIndexed = logIndex.LastIndexed
  328. if timeRangeStart != nil && timeRangeEnd != nil {
  329. fileInfo.TimeRange = &TimeRange{Start: *timeRangeStart, End: *timeRangeEnd}
  330. }
  331. }
  332. li.mu.Unlock()
  333. // Invalidate statistics cache since data has changed
  334. li.invalidateStatsCache()
  335. // Clear indexing status for this file
  336. SetIndexingStatus(filePath, false)
  337. statusManager := GetIndexingStatusManager()
  338. statusManager.UpdateIndexingStatus()
  339. indexDuration := time.Since(startTime)
  340. logger.Infof("Indexed %d new entries from %s in %v (position: %d->%d, index_size: %d bytes)",
  341. entryCount, filePath, indexDuration, startPosition, currentPosition, totalSize)
  342. // Send completion event
  343. // duration := time.Since(startTime).Milliseconds()
  344. if isCompressed {
  345. logger.Infof("Indexing completed for compressed file %s: processed %d lines (estimated %d total)", filePath, lineCount, totalFileSize)
  346. } else {
  347. logger.Infof("Indexing completed for %s: processed %d lines, position %d/%d bytes", filePath, lineCount, currentPosition, totalFileSize)
  348. }
  349. // Note: Index complete notification will be sent with the log group ready notification
  350. // Note: Log group ready notification is now handled centrally after all files complete
  351. return nil
  352. }