parser.go

package parser

import (
	"bufio"
	"bytes"
	"context"
	"crypto/md5"
	"fmt"
	"io"
	"runtime"
	"strconv"
	"sync"
	"time"
	"unsafe"
)

// Parser provides high-performance log parsing with zero-copy optimizations
type Parser struct {
	config     *Config
	uaParser   UserAgentParser
	geoService GeoIPService
	pool       *sync.Pool
	detector   *FormatDetector
	stats      *ParseStats
	mu         sync.RWMutex
}

// ParseStats tracks parsing performance metrics
type ParseStats struct {
	TotalLines     int64
	SuccessLines   int64
	ErrorLines     int64
	TotalBytes     int64
	ParseDuration  time.Duration
	LinesPerSecond float64
	BytesPerSecond float64
	LastUpdated    time.Time
}

// parseBuffer holds reusable parsing buffers
type parseBuffer struct {
	fields    [][]byte
	entry     *AccessLogEntry
	lineBytes []byte
}

// NewParser creates a new high-performance parser
func NewParser(config *Config, uaParser UserAgentParser, geoService GeoIPService) *Parser {
	if config == nil {
		config = DefaultParserConfig()
	}
	return &Parser{
		config:     config,
		uaParser:   uaParser,
		geoService: geoService,
		detector:   NewFormatDetector(),
		stats:      &ParseStats{},
		pool: &sync.Pool{
			New: func() interface{} {
				return &parseBuffer{
					fields:    make([][]byte, 0, 16),
					entry:     &AccessLogEntry{},
					lineBytes: make([]byte, 0, config.MaxLineLength),
				}
			},
		},
	}
}
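
// Illustrative usage sketch (not part of the package API). It assumes the
// Config defaults plus UserAgentParser and GeoIPService implementations
// defined elsewhere in this project; uaParser and geoSvc are placeholders.
//
//	p := NewParser(DefaultParserConfig(), uaParser, geoSvc)
//	entry, err := p.ParseLine(`203.0.113.7 - - [10/Oct/2023:13:55:36 +0000] "GET / HTTP/1.1" 200 512 "-" "curl/8.0"`)
//	if err != nil {
//		// handle an unparseable line (only returned in strict mode or for empty/oversized input)
//	}
//	_ = entry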

// ParseLine parses a single log line with zero-copy optimizations
func (p *Parser) ParseLine(line string) (*AccessLogEntry, error) {
	if len(line) == 0 {
		return nil, ErrEmptyLogLine
	}
	if len(line) > p.config.MaxLineLength {
		return nil, ErrLineTooLong
	}
	buf := p.pool.Get().(*parseBuffer)
	defer p.pool.Put(buf)
	// Reset buffer state
	buf.fields = buf.fields[:0]
	*buf.entry = AccessLogEntry{}
	// Zero-copy conversion to bytes
	buf.lineBytes = stringToBytes(line)
	if err := p.parseLineOptimized(buf.lineBytes, buf); err != nil {
		if p.config.StrictMode {
			return nil, err
		}
		// In non-strict mode, create a minimal entry with the raw line
		buf.entry.Raw = line
		buf.entry.Timestamp = time.Now().Unix()
		buf.entry.ID = p.generateEntryID(line)
		// Create a copy to avoid sharing the pooled object
		entryCopy := *buf.entry
		return &entryCopy, nil
	}
	// Generate a unique ID for the entry
	buf.entry.ID = p.generateEntryID(line)
	buf.entry.Raw = line
	// Create a copy of the entry to avoid sharing the pooled object
	entryCopy := *buf.entry
	return &entryCopy, nil
}

// ParseLines parses multiple log lines with parallel processing
func (p *Parser) ParseLines(lines []string) *ParseResult {
	return p.ParseLinesWithContext(context.Background(), lines)
}

// ParseLinesWithContext parses lines with context support for cancellation
func (p *Parser) ParseLinesWithContext(ctx context.Context, lines []string) *ParseResult {
	startTime := time.Now()
	result := &ParseResult{
		Entries:   make([]*AccessLogEntry, 0, len(lines)),
		Processed: len(lines),
	}
	if len(lines) == 0 {
		result.Duration = time.Since(startTime)
		return result
	}
	// For small datasets, use single-threaded parsing
	if len(lines) < p.config.BatchSize {
		return p.parseLinesSingleThreaded(ctx, lines, startTime)
	}
	// Use parallel processing for larger datasets
	return p.parseLinesParallel(ctx, lines, startTime)
}

// ParseStream parses log entries from an io.Reader with streaming support
func (p *Parser) ParseStream(ctx context.Context, reader io.Reader) (*ParseResult, error) {
	startTime := time.Now()
	result := &ParseResult{
		Entries: make([]*AccessLogEntry, 0),
	}
	scanner := bufio.NewScanner(reader)
	scanner.Buffer(make([]byte, 0, p.config.BufferSize), p.config.MaxLineLength)
	batch := make([]string, 0, p.config.BatchSize)
	for scanner.Scan() {
		select {
		case <-ctx.Done():
			return result, ctx.Err()
		default:
		}
		line := scanner.Text()
		if len(line) == 0 {
			continue
		}
		batch = append(batch, line)
		result.Processed++
		if len(batch) >= p.config.BatchSize {
			batchResult := p.ParseLinesWithContext(ctx, batch)
			result.Entries = append(result.Entries, batchResult.Entries...)
			result.Succeeded += batchResult.Succeeded
			result.Failed += batchResult.Failed
			batch = batch[:0]
		}
	}
	// Process remaining lines in the final batch
	if len(batch) > 0 {
		batchResult := p.ParseLinesWithContext(ctx, batch)
		result.Entries = append(result.Entries, batchResult.Entries...)
		result.Succeeded += batchResult.Succeeded
		result.Failed += batchResult.Failed
	}
	if err := scanner.Err(); err != nil {
		return result, err
	}
	result.Duration = time.Since(startTime)
	if result.Processed > 0 {
		result.ErrorRate = float64(result.Failed) / float64(result.Processed)
	}
	p.updateStats(result)
	return result, nil
}
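
// Illustrative streaming usage (a sketch, not part of the package API); the
// file path and error handling below are placeholders.
//
//	f, err := os.Open("/var/log/nginx/access.log")
//	if err != nil {
//		// handle open error
//	}
//	defer f.Close()
//	res, err := p.ParseStream(context.Background(), f)
//	if err != nil {
//		// handle scanner/context error; res still holds what was parsed so far
//	}
//	fmt.Printf("parsed %d/%d lines\n", res.Succeeded, res.Processed)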

// parseLinesSingleThreaded handles small datasets with single-threaded parsing
func (p *Parser) parseLinesSingleThreaded(ctx context.Context, lines []string, startTime time.Time) *ParseResult {
	result := &ParseResult{
		Entries:   make([]*AccessLogEntry, 0, len(lines)),
		Processed: len(lines),
	}
	for _, line := range lines {
		select {
		case <-ctx.Done():
			result.Duration = time.Since(startTime)
			return result
		default:
		}
		if entry, err := p.ParseLine(line); err == nil {
			result.Entries = append(result.Entries, entry)
			result.Succeeded++
		} else {
			result.Failed++
		}
	}
	result.Duration = time.Since(startTime)
	if result.Processed > 0 {
		result.ErrorRate = float64(result.Failed) / float64(result.Processed)
	}
	return result
}

// parseLinesParallel handles large datasets with parallel processing
func (p *Parser) parseLinesParallel(ctx context.Context, lines []string, startTime time.Time) *ParseResult {
	numWorkers := p.config.WorkerCount
	if numWorkers <= 0 {
		numWorkers = runtime.NumCPU()
	}
	if numWorkers > len(lines)/10 {
		numWorkers = len(lines)/10 + 1
	}
	result := &ParseResult{
		Processed: len(lines),
	}
	// Create channels for work distribution
	lineChan := make(chan string, numWorkers*2)
	resultChan := make(chan *AccessLogEntry, len(lines))
	errorChan := make(chan error, len(lines))
	var wg sync.WaitGroup
	// Start worker goroutines
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for line := range lineChan {
				if entry, err := p.ParseLine(line); err == nil {
					resultChan <- entry
				} else {
					errorChan <- err
				}
			}
		}()
	}
	// Send lines to workers
	go func() {
		defer close(lineChan)
		for _, line := range lines {
			select {
			case <-ctx.Done():
				return
			case lineChan <- line:
			}
		}
	}()
	// Wait for workers and close result channels
	go func() {
		wg.Wait()
		close(resultChan)
		close(errorChan)
	}()
	// Collect results
	entries := make([]*AccessLogEntry, 0, len(lines))
	var errorCount int
	for {
		select {
		case <-ctx.Done():
			// On cancellation, return whatever has been collected so far
			result.Entries = entries
			result.Succeeded = len(entries)
			result.Failed = errorCount
			result.Duration = time.Since(startTime)
			return result
		case entry, ok := <-resultChan:
			if !ok {
				resultChan = nil
			} else {
				entries = append(entries, entry)
			}
		case _, ok := <-errorChan:
			if !ok {
				errorChan = nil
			} else {
				errorCount++
			}
		}
		if resultChan == nil && errorChan == nil {
			break
		}
	}
	result.Entries = entries
	result.Succeeded = len(entries)
	result.Failed = errorCount
	result.Duration = time.Since(startTime)
	if result.Processed > 0 {
		result.ErrorRate = float64(result.Failed) / float64(result.Processed)
	}
	return result
}
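
// Design note: parseLinesParallel uses a bounded fan-out/fan-in pattern. The
// line channel is kept small (numWorkers*2) to bound memory while feeding the
// workers, whereas the result and error channels are sized to len(lines) so
// workers never block on send and can be drained fully by the collector.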

// parseLineOptimized performs optimized parsing of a single line
func (p *Parser) parseLineOptimized(line []byte, buf *parseBuffer) error {
	pos := 0
	length := len(line)
	if length < 20 {
		return ErrUnsupportedLogFormat
	}
	// Parse IP address
	pos = p.parseIP(line, pos, buf.entry)
	if pos >= length {
		return ErrUnsupportedLogFormat
	}
	// Skip the remote logname and remote user fields ("- -" in the default nginx format)
	pos = p.skipSpaces(line, pos)
	pos = p.skipField(line, pos) // remote logname
	pos = p.skipSpaces(line, pos)
	pos = p.skipField(line, pos) // remote user
	// Parse timestamp
	pos = p.skipSpaces(line, pos)
	pos = p.parseTimestamp(line, pos, buf.entry)
	if pos >= length {
		return ErrUnsupportedLogFormat
	}
	// Parse request
	pos = p.skipSpaces(line, pos)
	pos = p.parseRequest(line, pos, buf.entry)
	if pos >= length {
		return ErrUnsupportedLogFormat
	}
	// Parse status code
	pos = p.skipSpaces(line, pos)
	pos = p.parseStatus(line, pos, buf.entry)
	if pos >= length {
		return ErrUnsupportedLogFormat
	}
	// Parse response size
	pos = p.skipSpaces(line, pos)
	pos = p.parseSize(line, pos, buf.entry)
	// Parse optional fields if they exist
	if pos < length {
		pos = p.skipSpaces(line, pos)
		if pos < length && line[pos] == '"' {
			pos = p.parseReferer(line, pos, buf.entry)
		}
	}
	if pos < length {
		pos = p.skipSpaces(line, pos)
		if pos < length && line[pos] == '"' {
			pos = p.parseUserAgent(line, pos, buf.entry)
		}
	}
	if pos < length {
		pos = p.skipSpaces(line, pos)
		if pos < length {
			pos = p.parseRequestTime(line, pos, buf.entry)
		}
	}
	if pos < length {
		pos = p.skipSpaces(line, pos)
		if pos < length {
			_ = p.parseUpstreamTime(line, pos, buf.entry)
		}
	}
	return nil
}
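
// For reference, the field order above corresponds to an nginx "combined"-style
// line with optional timing fields appended; an illustrative example (made up,
// not taken from real logs):
//
//	203.0.113.7 - alice [10/Oct/2023:13:55:36 +0000] "GET /index.html HTTP/1.1" 200 2326 "https://example.com/" "Mozilla/5.0" 0.005 0.003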

// Fast field parsing methods with zero-copy optimizations
func (p *Parser) parseIP(line []byte, pos int, entry *AccessLogEntry) int {
	start := pos
	for pos < len(line) && line[pos] != ' ' {
		pos++
	}
	if pos > start {
		entry.IP = bytesToString(line[start:pos])
		// Populate geographic fields if enabled
		if p.config.EnableGeoIP && p.geoService != nil && entry.IP != "-" {
			if location, err := p.geoService.Search(entry.IP); err == nil && location != nil {
				entry.Province = location.Province
				entry.City = location.City
				// Use the specific RegionCode (e.g., province code 'CA') if available,
				// otherwise fall back to the CountryCode (e.g., 'US').
				if location.RegionCode != "" {
					entry.RegionCode = location.RegionCode
				} else {
					entry.RegionCode = location.CountryCode
				}
			}
		}
	}
	return pos
}

func (p *Parser) parseTimestamp(line []byte, pos int, entry *AccessLogEntry) int {
	if pos >= len(line) || line[pos] != '[' {
		return pos
	}
	pos++
	start := pos
	for pos < len(line) && line[pos] != ']' {
		pos++
	}
	if pos > start {
		timeStr := bytesToString(line[start:pos])
		if t, err := time.Parse(p.config.TimeLayout, timeStr); err == nil {
			entry.Timestamp = t.Unix()
		} else {
			// Try alternative common nginx timestamp formats if the default fails
			formats := []string{
				"02/Jan/2006:15:04:05 -0700", // Standard nginx format
				"2006-01-02T15:04:05-07:00",  // ISO 8601 format
				"2006-01-02 15:04:05",        // Simple datetime format
				"02/Jan/2006:15:04:05",       // Without timezone
			}
			for _, format := range formats {
				if t, err := time.Parse(format, timeStr); err == nil {
					entry.Timestamp = t.Unix()
					break
				}
			}
			// If all parsing attempts fail, the timestamp is left at 0
		}
	}
	if pos < len(line) && line[pos] == ']' {
		pos++
	}
	return pos
}

func (p *Parser) parseRequest(line []byte, pos int, entry *AccessLogEntry) int {
	if pos >= len(line) || line[pos] != '"' {
		return pos
	}
	pos++
	start := pos
	for pos < len(line) && line[pos] != '"' {
		pos++
	}
	if pos > start {
		requestLine := line[start:pos]
		parts := bytes.Fields(requestLine)
		if len(parts) >= 1 {
			method := bytesToString(parts[0])
			if ValidHTTPMethods[method] {
				entry.Method = method
			}
		}
		if len(parts) >= 2 {
			entry.Path = bytesToString(parts[1])
		}
		if len(parts) >= 3 {
			entry.Protocol = bytesToString(parts[2])
		}
	}
	if pos < len(line) && line[pos] == '"' {
		pos++
	}
	return pos
}

func (p *Parser) parseStatus(line []byte, pos int, entry *AccessLogEntry) int {
	start := pos
	for pos < len(line) && line[pos] >= '0' && line[pos] <= '9' {
		pos++
	}
	if pos > start {
		if status, err := fastParseInt(line[start:pos]); err == nil && status >= 100 && status < 600 {
			entry.Status = status
		}
	}
	return pos
}

func (p *Parser) parseSize(line []byte, pos int, entry *AccessLogEntry) int {
	start := pos
	for pos < len(line) && ((line[pos] >= '0' && line[pos] <= '9') || line[pos] == '-') {
		pos++
	}
	if pos > start {
		sizeBytes := line[start:pos]
		if len(sizeBytes) == 1 && sizeBytes[0] == '-' {
			entry.BytesSent = 0
		} else {
			if size, err := fastParseInt(sizeBytes); err == nil && size >= 0 {
				entry.BytesSent = int64(size)
			}
		}
	}
	return pos
}

func (p *Parser) parseReferer(line []byte, pos int, entry *AccessLogEntry) int {
	if pos >= len(line) || line[pos] != '"' {
		return pos
	}
	pos++
	start := pos
	for pos < len(line) && line[pos] != '"' {
		pos++
	}
	if pos > start {
		entry.Referer = bytesToString(line[start:pos])
	}
	if pos < len(line) && line[pos] == '"' {
		pos++
	}
	return pos
}

func (p *Parser) parseUserAgent(line []byte, pos int, entry *AccessLogEntry) int {
	if pos >= len(line) || line[pos] != '"' {
		return pos
	}
	pos++
	start := pos
	for pos < len(line) && line[pos] != '"' {
		pos++
	}
	if pos > start {
		userAgent := bytesToString(line[start:pos])
		entry.UserAgent = userAgent
		if p.config.EnableUA && p.uaParser != nil && userAgent != "-" {
			parsed := p.uaParser.Parse(userAgent)
			if parsed.Browser != "Unknown" && parsed.Browser != "" {
				entry.Browser = parsed.Browser
				entry.BrowserVer = parsed.BrowserVer
			}
			if parsed.OS != "Unknown" && parsed.OS != "" {
				entry.OS = parsed.OS
				entry.OSVersion = parsed.OSVersion
			}
			if parsed.DeviceType != "" {
				entry.DeviceType = parsed.DeviceType
			}
		}
	}
	if pos < len(line) && line[pos] == '"' {
		pos++
	}
	return pos
}

func (p *Parser) parseRequestTime(line []byte, pos int, entry *AccessLogEntry) int {
	start := pos
	for pos < len(line) && ((line[pos] >= '0' && line[pos] <= '9') || line[pos] == '.' || line[pos] == '-') {
		pos++
	}
	if pos > start {
		timeStr := bytesToString(line[start:pos])
		if timeStr != "-" {
			if val, err := strconv.ParseFloat(timeStr, 64); err == nil && val >= 0 {
				entry.RequestTime = val
			}
		}
	}
	return pos
}

func (p *Parser) parseUpstreamTime(line []byte, pos int, entry *AccessLogEntry) int {
	start := pos
	for pos < len(line) && ((line[pos] >= '0' && line[pos] <= '9') || line[pos] == '.' || line[pos] == '-') {
		pos++
	}
	if pos > start {
		timeStr := bytesToString(line[start:pos])
		if timeStr != "-" {
			if val, err := strconv.ParseFloat(timeStr, 64); err == nil && val >= 0 {
				entry.UpstreamTime = &val
			}
		}
	}
	return pos
}

// Utility methods
func (p *Parser) skipSpaces(line []byte, pos int) int {
	for pos < len(line) && line[pos] == ' ' {
		pos++
	}
	return pos
}

func (p *Parser) skipField(line []byte, pos int) int {
	for pos < len(line) && line[pos] != ' ' {
		pos++
	}
	return pos
}

func (p *Parser) generateEntryID(line string) string {
	hash := md5.Sum([]byte(line))
	return fmt.Sprintf("%x", hash)[:16]
}

func (p *Parser) updateStats(result *ParseResult) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.stats.TotalLines += int64(result.Processed)
	p.stats.SuccessLines += int64(result.Succeeded)
	p.stats.ErrorLines += int64(result.Failed)
	p.stats.ParseDuration += result.Duration
	p.stats.LastUpdated = time.Now()
	if result.Duration > 0 {
		p.stats.LinesPerSecond = float64(result.Processed) / result.Duration.Seconds()
	}
}

// GetStats returns current parsing statistics
func (p *Parser) GetStats() *ParseStats {
	p.mu.RLock()
	defer p.mu.RUnlock()
	statsCopy := *p.stats
	return &statsCopy
}

// ResetStats resets parsing statistics
func (p *Parser) ResetStats() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.stats = &ParseStats{}
}

// Zero-copy string/byte conversion utilities
func stringToBytes(s string) []byte {
	return *(*[]byte)(unsafe.Pointer(&struct {
		string
		Cap int
	}{s, len(s)}))
}

func bytesToString(b []byte) string {
	return *(*string)(unsafe.Pointer(&b))
}
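
// Note on the conversions above: they rely on the runtime's string and slice
// header layouts and on callers never mutating the returned []byte, since it
// aliases the string's backing array. On Go 1.20+ an equivalent, more clearly
// specified form would be unsafe.Slice(unsafe.StringData(s), len(s)) and
// unsafe.String(unsafe.SliceData(b), len(b)).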

// Fast integer parsing without allocations
func fastParseInt(b []byte) (int, error) {
	if len(b) == 0 {
		return 0, strconv.ErrSyntax
	}
	neg := false
	if b[0] == '-' {
		neg = true
		b = b[1:]
		if len(b) == 0 {
			return 0, strconv.ErrSyntax
		}
	}
	n := 0
	for _, c := range b {
		if c < '0' || c > '9' {
			return 0, strconv.ErrSyntax
		}
		n = n*10 + int(c-'0')
	}
	if neg {
		n = -n
	}
	return n, nil
}
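
// Caveat: unlike strconv.Atoi, fastParseInt does not detect overflow; that
// seems acceptable here because it is only applied to status codes and byte
// counts, which in practice fit in an int.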