indexer_file_streaming.go

package nginx_log

import (
	"bufio"
	"compress/gzip"
	"fmt"
	"io"
	"os"
	"strings"
	"time"

	"github.com/0xJacky/Nginx-UI/model"
	"github.com/uozi-tech/cosy/logger"
)
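
// This file implements streaming (line-by-line) indexing of nginx log files
// so that large logs never have to be held fully in memory. Plain files can
// be resumed from a saved byte offset; gzip-compressed files must always be
// re-read from the beginning because gzip streams are not seekable.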

// indexFileFromPosition indexes a file starting from a specific byte position.
func (li *LogIndexer) indexFileFromPosition(filePath string, startPosition int64, logIndex *model.NginxLogIndex) error {
	indexStartTime := time.Now()

	// Record the start time of the indexing operation.
	logIndex.SetIndexStartTime(indexStartTime)

	file, err := li.safeOpenFile(filePath)
	if err != nil {
		return fmt.Errorf("failed to safely open file %s: %w", filePath, err)
	}
	defer file.Close()

	// Get file info.
	fileInfo, err := file.Stat()
	if err != nil {
		return fmt.Errorf("failed to get file info: %w", err)
	}

	// Seek to the start position for incremental indexing.
	if startPosition > 0 {
		if _, err = file.Seek(startPosition, io.SeekStart); err != nil {
			return fmt.Errorf("failed to seek to position %d: %w", startPosition, err)
		}
	}

	var reader io.Reader = file

	// Handle compressed files. Gzip streams are not seekable, so
	// incremental (position-based) indexing is not supported for them.
	if strings.HasSuffix(filePath, ".gz") {
		if startPosition > 0 {
			return fmt.Errorf("incremental indexing not supported for compressed files")
		}
		gzReader, err := gzip.NewReader(file)
		if err != nil {
			return fmt.Errorf("failed to create gzip reader: %w", err)
		}
		defer gzReader.Close()
		reader = gzReader
	}

	scanner := bufio.NewScanner(reader)
	// Raise the scanner's token limit from the 64 KB default to 1 MB so
	// unusually long log lines do not abort the scan.
	const maxCapacity = 1024 * 1024 // 1 MB
	buf := make([]byte, maxCapacity)
	scanner.Buffer(buf, maxCapacity)

	// Stream the file line by line to avoid loading it all into memory.
	return li.indexFileFromPositionStreaming(filePath, startPosition, logIndex, fileInfo, scanner, indexStartTime)
}
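
// A minimal call-site sketch (hypothetical wiring; the persistence lookup
// and the LastPosition field below are assumptions, not part of this file):
//
//	logIndex, _ := li.persistence.GetLogIndex(path) // hypothetical lookup
//	// Resume from the last indexed byte offset, or 0 for a full pass.
//	if err := li.indexFileFromPosition(path, logIndex.LastPosition, logIndex); err != nil {
//		logger.Errorf("indexing %s failed: %v", path, err)
//	}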

// indexFileFromPositionWithMainLogPath indexes a file starting from a specific
// byte position, attributing the entries to the given main log path.
func (li *LogIndexer) indexFileFromPositionWithMainLogPath(filePath, mainLogPath string, startPosition int64, logIndex *model.NginxLogIndex, progressTracker *ProgressTracker) error {
	indexStartTime := time.Now()

	// Record the start time of the indexing operation.
	logIndex.SetIndexStartTime(indexStartTime)

	file, err := li.safeOpenFile(filePath)
	if err != nil {
		return fmt.Errorf("failed to safely open file %s: %w", filePath, err)
	}
	defer file.Close()

	// Get file info.
	fileInfo, err := file.Stat()
	if err != nil {
		return fmt.Errorf("failed to get file info: %w", err)
	}

	// Seek to the start position for incremental indexing.
	if startPosition > 0 {
		if _, err = file.Seek(startPosition, io.SeekStart); err != nil {
			return fmt.Errorf("failed to seek to position %d: %w", startPosition, err)
		}
	}

	var reader io.Reader = file

	// Handle compressed files. Gzip streams are not seekable, so
	// incremental (position-based) indexing is not supported for them.
	if strings.HasSuffix(filePath, ".gz") {
		if startPosition > 0 {
			return fmt.Errorf("incremental indexing not supported for compressed files")
		}
		gzReader, err := gzip.NewReader(file)
		if err != nil {
			return fmt.Errorf("failed to create gzip reader: %w", err)
		}
		defer gzReader.Close()
		reader = gzReader
	}

	scanner := bufio.NewScanner(reader)
	// Raise the scanner's token limit from the 64 KB default to 1 MB so
	// unusually long log lines do not abort the scan.
	const maxCapacity = 1024 * 1024 // 1 MB
	buf := make([]byte, maxCapacity)
	scanner.Buffer(buf, maxCapacity)

	// Stream the file line by line with the specified main log path.
	return li.indexFileFromPositionStreamingWithMainLogPath(filePath, mainLogPath, startPosition, logIndex, fileInfo, scanner, indexStartTime, progressTracker)
}
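
// The variant above differs from indexFileFromPosition only in that the
// caller supplies mainLogPath (the logical log a rotated file belongs to,
// e.g. access.log for access.log.1) and an optional ProgressTracker,
// instead of deriving the main path via getMainLogPath.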

// indexFileFromPositionStreamingWithMainLogPath processes file content in a
// streaming fashion, attributing entries to the specified main log path.
func (li *LogIndexer) indexFileFromPositionStreamingWithMainLogPath(filePath, mainLogPath string, startPosition int64, logIndex *model.NginxLogIndex, fileInfo os.FileInfo, scanner *bufio.Scanner, startTime time.Time, progressTracker *ProgressTracker) error {
	// Record the index start time.
	logIndex.SetIndexStartTime(startTime)

	var currentPosition int64 = startPosition
	lineCount := 0
	entryCount := 0
	batch := li.index.NewBatch()
	var newTimeStart, newTimeEnd int64

	logger.Infof("Starting index for file %s -> %s (size: %d bytes)", filePath, mainLogPath, fileInfo.Size())

	// Set the file size in the progress tracker.
	if progressTracker != nil {
		progressTracker.SetFileSize(filePath, fileInfo.Size())
	}

	// Determine compression status.
	isCompressed := strings.HasSuffix(filePath, ".gz") || strings.HasSuffix(filePath, ".bz2")

	// Line buffer for batch processing.
	const batchLines = 10000 // Flush every 10,000 lines for better throughput.
	var lineBuffer []string

	// When resuming from the middle of the file, discard the first line in
	// case the saved offset does not fall exactly on a line boundary.
	if startPosition > 0 {
		if scanner.Scan() {
			line := scanner.Text()
			currentPosition += int64(len(line)) + 1 // +1 for the newline
		}
	}
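
	// Batching strategy: lines are buffered in memory and handed to
	// processBatchStreaming in chunks of batchLines. That routine parses
	// the lines and appends documents to the shared Bleve batch, so the
	// index is written in large batched operations rather than one
	// document at a time.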
	// Process lines in batches.
	for scanner.Scan() {
		rawLine := scanner.Text()
		// Advance the byte position for every physical line, including
		// blank ones, so the saved offset stays aligned with the file.
		currentPosition += int64(len(rawLine)) + 1 // +1 for the newline

		line := strings.TrimSpace(rawLine)
		if line == "" {
			continue
		}

		lineBuffer = append(lineBuffer, line)
		lineCount++

		// Process the batch when the buffer is full.
		if len(lineBuffer) >= batchLines {
			if err := li.processBatchStreaming(lineBuffer, filePath, mainLogPath, startPosition, &batch, &entryCount, &newTimeStart, &newTimeEnd); err != nil {
				return err
			}
			// Clear the buffer, keeping its capacity.
			lineBuffer = lineBuffer[:0]
		}

		// Update the progress tracker periodically (it may be nil).
		if progressTracker != nil && lineCount%5000 == 0 {
			progressTracker.UpdateFilePosition(filePath, currentPosition, int64(lineCount))
		}
	}
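
	// Note on position accounting: scanner.Text() returns the line without
	// its terminator, so the "+1" assumes LF line endings. Files with CRLF
	// endings would drift by one byte per line; nginx writes LF-terminated
	// logs, so this is treated as acceptable here.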

	// Process any lines remaining in the buffer.
	if len(lineBuffer) > 0 {
		if err := li.processBatchStreaming(lineBuffer, filePath, mainLogPath, startPosition, &batch, &entryCount, &newTimeStart, &newTimeEnd); err != nil {
			return err
		}
	}

	if err := scanner.Err(); err != nil {
		return fmt.Errorf("failed to scan file %s: %w", filePath, err)
	}

	// Execute the final batch.
	if batch.Size() > 0 {
		if err := li.index.Batch(batch); err != nil {
			return fmt.Errorf("failed to execute final batch: %w", err)
		}
		logger.Infof("Final batch executed: %d entries indexed for %s", batch.Size(), filePath)
	}

	// Compressed files cannot be resumed, so mark them as fully indexed.
	if isCompressed {
		currentPosition = fileInfo.Size()
	}

	// Update persistence with the final status.
	var newTimeStartPtr, newTimeEndPtr *time.Time
	if newTimeStart != 0 {
		t := time.Unix(newTimeStart, 0)
		newTimeStartPtr = &t
	}
	if newTimeEnd != 0 {
		t := time.Unix(newTimeEnd, 0)
		newTimeEndPtr = &t
	}

	logIndex.UpdateProgress(fileInfo.ModTime(), fileInfo.Size(), currentPosition, uint64(entryCount), newTimeStartPtr, newTimeEndPtr)
	logIndex.SetIndexDuration(startTime)

	// Save the updated log index.
	if err := li.persistence.SaveLogIndex(logIndex); err != nil {
		logger.Warnf("Failed to save log index for %s: %v", filePath, err)
	}

	// Report the final position and mark the file complete in the tracker.
	if progressTracker != nil {
		progressTracker.UpdateFilePosition(filePath, currentPosition, int64(lineCount))
		progressTracker.CompleteFile(filePath, int64(lineCount))
	}

	duration := time.Since(startTime)
	logger.Infof("Completed indexing of %s: %d lines processed, %d entries indexed in %v", filePath, lineCount, entryCount, duration)

	return nil
}
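
// processBatchStreaming is defined elsewhere in this package; from its call
// sites here, its assumed contract is: parse each buffered line, add the
// resulting documents to the shared Bleve batch (flushing it internally if
// it grows large), bump *entryCount, and widen *newTimeStart/*newTimeEnd to
// cover the timestamps observed in the chunk.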

// indexFileFromPositionStreaming processes file content in a streaming
// fashion, deriving the main log path from the file path itself.
func (li *LogIndexer) indexFileFromPositionStreaming(filePath string, startPosition int64, logIndex *model.NginxLogIndex, fileInfo os.FileInfo, scanner *bufio.Scanner, startTime time.Time) error {
	// Record the index start time.
	logIndex.SetIndexStartTime(startTime)

	var currentPosition int64 = startPosition
	lineCount := 0
	entryCount := 0
	batch := li.index.NewBatch()
	var newTimeStart, newTimeEnd int64

	// Resolve the main log path first (used for statistics grouping).
	mainLogPath := li.getMainLogPath(filePath)

	// Note: per-file stats calculation was removed; Bleve aggregations are
	// used instead.

	// For compressed files, byte-position-based progress is inaccurate, so
	// fall back to an estimated uncompressed size.
	isCompressed := strings.HasSuffix(filePath, ".gz") || strings.HasSuffix(filePath, ".bz2")
	var totalFileSize int64
	if isCompressed {
		// Estimate the uncompressed size with a 3:1 compression ratio, a
		// reasonable assumption for most text log files.
		estimatedUncompressedSize := fileInfo.Size() * 3
		if estimatedUncompressedSize < 1 {
			estimatedUncompressedSize = 1
		}
		totalFileSize = estimatedUncompressedSize
		logger.Infof("Starting index for compressed file %s: compressed size %d, estimated uncompressed size %d", filePath, fileInfo.Size(), estimatedUncompressedSize)
	} else {
		// For uncompressed files, use the actual file size.
		totalFileSize = fileInfo.Size()
		if totalFileSize < 1 {
			totalFileSize = 1
		}
		logger.Infof("Starting index from position %d of %d bytes for %s", startPosition, totalFileSize, filePath)
	}
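
	// For example, a 5 MB access.log.2.gz is treated as roughly 15 MB of
	// log text for progress purposes. The real ratio varies with content,
	// which is why the computed percentage is clamped to 100 below.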

	logger.Debugf("Starting indexing: filePath=%s, mainLogPath=%s", filePath, mainLogPath)

	// Line buffer for batch processing.
	const batchLines = 10000 // Flush every 10,000 lines for better throughput.
	var lineBuffer []string

	// When resuming from the middle of the file, discard the first line in
	// case the saved offset does not fall exactly on a line boundary.
	if startPosition > 0 {
		if scanner.Scan() {
			line := scanner.Text()
			currentPosition += int64(len(line)) + 1 // +1 for the newline
		}
	}

	// Process lines in batches.
	for scanner.Scan() {
		rawLine := scanner.Text()
		// Advance the byte position for every physical line, including
		// blank ones, so the saved offset stays aligned with the file.
		currentPosition += int64(len(rawLine)) + 1 // +1 for the newline

		line := strings.TrimSpace(rawLine)
		if line == "" {
			continue
		}

		lineBuffer = append(lineBuffer, line)
		lineCount++

		// Process the batch when the buffer is full.
		if len(lineBuffer) >= batchLines {
			if err := li.processBatchStreaming(lineBuffer, filePath, mainLogPath, startPosition, &batch, &entryCount, &newTimeStart, &newTimeEnd); err != nil {
				return err
			}
			// Clear the buffer, keeping its capacity.
			lineBuffer = lineBuffer[:0]
		}

		// Progress reporting for large files. For uncompressed files the
		// byte position is exact; for compressed files it is measured
		// against the estimated uncompressed size.
		if lineCount%10000 == 0 {
			currentFileProgress := float64(currentPosition) / float64(totalFileSize) * 100
			// The estimate can overshoot, so cap the progress at 100%.
			if currentFileProgress > 100 {
				currentFileProgress = 100
			}
			logger.Debugf("Processed %d lines, indexed %d entries from %s (position: %d/%d bytes, ~%.1f%%)",
				lineCount, entryCount, filePath, currentPosition, totalFileSize, currentFileProgress)
		}
	}

	// Process any lines remaining in the buffer.
	if len(lineBuffer) > 0 {
		if err := li.processBatchStreaming(lineBuffer, filePath, mainLogPath, startPosition, &batch, &entryCount, &newTimeStart, &newTimeEnd); err != nil {
			return err
		}
	}

	if err := scanner.Err(); err != nil {
		return fmt.Errorf("failed to scan file %s: %w", filePath, err)
	}

	// Execute the final batch.
	if batch.Size() > 0 {
		if err := li.index.Batch(batch); err != nil {
			return fmt.Errorf("failed to execute final batch: %w", err)
		}
	}

	logger.Debugf("Processed %d lines, indexed %d entries from %s (position: %d->%d)", lineCount, entryCount, filePath, startPosition, currentPosition)

	// Note: the log group aggregator was removed; Bleve aggregations are
	// used instead.

	if entryCount == 0 {
		logger.Debugf("No new entries to index in %s", filePath)
		// Still update the position and modification time, along with the
		// total index size, so the file is not rescanned unnecessarily.
		totalSize := li.calculateRelatedLogFilesSize(filePath)
		// Record the index completion time and duration.
		logIndex.SetIndexDuration(startTime)
		logIndex.UpdateProgress(fileInfo.ModTime(), totalSize, fileInfo.Size(), logIndex.DocumentCount, logIndex.TimeRangeStart, logIndex.TimeRangeEnd)
		return li.persistence.SaveLogIndex(logIndex)
	}
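
	// calculateRelatedLogFilesSize is assumed (from its name and its use
	// here) to sum the on-disk sizes of every file in the same log group,
	// e.g. access.log plus its rotated access.log.1, access.log.2.gz, and
	// so on, so the persisted size reflects the whole group, not one file.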

	// Seed the time range from the existing index record, falling back to
	// the timestamps observed in this pass.
	var timeRangeStart, timeRangeEnd *time.Time
	if logIndex.TimeRangeStart != nil {
		timeRangeStart = logIndex.TimeRangeStart
	} else if newTimeStart != 0 {
		t := time.Unix(newTimeStart, 0)
		timeRangeStart = &t
	}
	if logIndex.TimeRangeEnd != nil {
		timeRangeEnd = logIndex.TimeRangeEnd
	} else if newTimeEnd != 0 {
		t := time.Unix(newTimeEnd, 0)
		timeRangeEnd = &t
	}

	// Widen the range if this pass saw earlier or later timestamps.
	if newTimeStart != 0 && (timeRangeStart == nil || time.Unix(newTimeStart, 0).Before(*timeRangeStart)) {
		t := time.Unix(newTimeStart, 0)
		timeRangeStart = &t
	}
	if newTimeEnd != 0 && (timeRangeEnd == nil || time.Unix(newTimeEnd, 0).After(*timeRangeEnd)) {
		t := time.Unix(newTimeEnd, 0)
		timeRangeEnd = &t
	}
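
	// For example, if the stored range is [10:00, 11:00] and this pass saw
	// entries from 09:55 through 11:05, the persisted range widens to
	// [09:55, 11:05].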

	// Calculate the total index size across related log files in this group.
	totalSize := li.calculateRelatedLogFilesSize(filePath)

	// Record the index completion time and duration.
	logIndex.SetIndexDuration(startTime)

	// Update the persistence record with the total index size.
	logIndex.UpdateProgress(fileInfo.ModTime(), totalSize, currentPosition, logIndex.DocumentCount+uint64(entryCount), timeRangeStart, timeRangeEnd)
	if err := li.persistence.SaveLogIndex(logIndex); err != nil {
		logger.Warnf("Failed to save log index: %v", err)
	}

	// Update the in-memory file info for compatibility. (The loop variable
	// is named info to avoid shadowing the fileInfo parameter.)
	li.mu.Lock()
	if info, exists := li.logPaths[filePath]; exists {
		info.LastModified = logIndex.LastModified.Unix()
		info.LastSize = logIndex.LastSize
		info.LastIndexed = logIndex.LastIndexed.Unix()
		if timeRangeStart != nil && timeRangeEnd != nil {
			info.TimeRange = &TimeRange{Start: timeRangeStart.Unix(), End: timeRangeEnd.Unix()}
		}
	}
	li.mu.Unlock()

	// Invalidate the statistics cache since the underlying data changed.
	li.invalidateStatsCache()

	// Clear the indexing status for this file.
	SetIndexingStatus(filePath, false)
	statusManager := GetIndexingStatusManager()
	statusManager.UpdateIndexingStatus()

	indexDuration := time.Since(startTime)
	logger.Infof("Indexed %d new entries from %s in %v (position: %d->%d, index_size: %d bytes)",
		entryCount, filePath, indexDuration, startPosition, currentPosition, totalSize)
	if isCompressed {
		logger.Infof("Indexing completed for compressed file %s: processed %d lines (estimated uncompressed size: %d bytes)", filePath, lineCount, totalFileSize)
	} else {
		logger.Infof("Indexing completed for %s: processed %d lines, position %d/%d bytes", filePath, lineCount, currentPosition, totalFileSize)
	}

	// Note: the index-complete and log-group-ready notifications are sent
	// centrally after all files in the group finish.
	return nil
}
  365. }