indexer_file_batch.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package nginx_log
  2. import (
  3. "fmt"
  4. "github.com/blevesearch/bleve/v2"
  5. )
  6. // processBatchStreaming processes a batch of lines using parallel parsing
  7. func (li *LogIndexer) processBatchStreaming(lines []string, filePath string, mainLogPath string, startPosition int64, batch **bleve.Batch, entryCount *int, newTimeStart, newTimeEnd *int64) error {
  8. if len(lines) == 0 {
  9. return nil
  10. }
  11. // Parse lines in parallel
  12. entries := li.parser.ParseLines(lines)
  13. if len(entries) == 0 {
  14. return nil // No valid entries in this batch
  15. }
  16. // Index entries
  17. for i, entry := range entries {
  18. // Track time range for new entries
  19. if *newTimeStart == 0 || entry.Timestamp < *newTimeStart {
  20. *newTimeStart = entry.Timestamp
  21. }
  22. if *newTimeEnd == 0 || entry.Timestamp > *newTimeEnd {
  23. *newTimeEnd = entry.Timestamp
  24. }
  25. // Create indexed entry with unique ID
  26. // Use actual file path in ID to avoid conflicts, but mainLogPath for grouping queries
  27. indexedEntry := &IndexedLogEntry{
  28. ID: fmt.Sprintf("%s_%d_%d", filePath, startPosition, *entryCount+i),
  29. FilePath: mainLogPath, // Use main log path for queries
  30. Timestamp: entry.Timestamp,
  31. IP: entry.IP,
  32. RegionCode: entry.RegionCode,
  33. Province: entry.Province,
  34. City: entry.City,
  35. Method: entry.Method,
  36. Path: entry.Path,
  37. Protocol: entry.Protocol,
  38. Status: entry.Status,
  39. BytesSent: entry.BytesSent,
  40. Referer: entry.Referer,
  41. UserAgent: entry.UserAgent,
  42. Browser: entry.Browser,
  43. BrowserVer: entry.BrowserVer,
  44. OS: entry.OS,
  45. OSVersion: entry.OSVersion,
  46. DeviceType: entry.DeviceType,
  47. RequestTime: entry.RequestTime,
  48. UpstreamTime: entry.UpstreamTime,
  49. Raw: entry.Raw,
  50. }
  51. (*batch).Index(indexedEntry.ID, indexedEntry)
  52. // Execute batch when it reaches the limit
  53. if (*batch).Size() >= li.indexBatch {
  54. if err := li.index.Batch(*batch); err != nil {
  55. return fmt.Errorf("failed to execute batch: %w", err)
  56. }
  57. *batch = li.index.NewBatch()
  58. }
  59. }
  60. *entryCount += len(entries)
  61. return nil
  62. }