indexer_file_batch.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package nginx_log
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/blevesearch/bleve/v2"
  6. )
  7. // processBatchStreaming processes a batch of lines using parallel parsing
  8. func (li *LogIndexer) processBatchStreaming(lines []string, filePath string, mainLogPath string, startPosition int64, batch **bleve.Batch, entryCount *int, newTimeStart, newTimeEnd **time.Time) error {
  9. if len(lines) == 0 {
  10. return nil
  11. }
  12. // Parse lines in parallel
  13. entries := li.parser.ParseLines(lines)
  14. if len(entries) == 0 {
  15. return nil // No valid entries in this batch
  16. }
  17. // Index entries
  18. for i, entry := range entries {
  19. // Track time range for new entries
  20. if *newTimeStart == nil || entry.Timestamp.Before(**newTimeStart) {
  21. *newTimeStart = &entry.Timestamp
  22. }
  23. if *newTimeEnd == nil || entry.Timestamp.After(**newTimeEnd) {
  24. *newTimeEnd = &entry.Timestamp
  25. }
  26. // Create indexed entry with unique ID
  27. // Use actual file path in ID to avoid conflicts, but mainLogPath for grouping queries
  28. indexedEntry := &IndexedLogEntry{
  29. ID: fmt.Sprintf("%s_%d_%d", filePath, startPosition, *entryCount+i),
  30. FilePath: mainLogPath, // Use main log path for queries
  31. Timestamp: entry.Timestamp,
  32. IP: entry.IP,
  33. RegionCode: entry.RegionCode,
  34. Province: entry.Province,
  35. City: entry.City,
  36. Method: entry.Method,
  37. Path: entry.Path,
  38. Protocol: entry.Protocol,
  39. Status: entry.Status,
  40. BytesSent: entry.BytesSent,
  41. Referer: entry.Referer,
  42. UserAgent: entry.UserAgent,
  43. Browser: entry.Browser,
  44. BrowserVer: entry.BrowserVer,
  45. OS: entry.OS,
  46. OSVersion: entry.OSVersion,
  47. DeviceType: entry.DeviceType,
  48. RequestTime: entry.RequestTime,
  49. UpstreamTime: entry.UpstreamTime,
  50. Raw: entry.Raw,
  51. }
  52. (*batch).Index(indexedEntry.ID, indexedEntry)
  53. // Execute batch when it reaches the limit
  54. if (*batch).Size() >= li.indexBatch {
  55. if err := li.index.Batch(*batch); err != nil {
  56. return fmt.Errorf("failed to execute batch: %w", err)
  57. }
  58. *batch = li.index.NewBatch()
  59. }
  60. }
  61. *entryCount += len(entries)
  62. return nil
  63. }