optimized_parser.go 16 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741
  1. package parser
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "crypto/md5"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "runtime"
  11. "strconv"
  12. "sync"
  13. "time"
  14. "unsafe"
  15. )
  16. // OptimizedParser provides high-performance log parsing with zero-copy optimizations
  17. type OptimizedParser struct {
  18. config *Config
  19. uaParser UserAgentParser
  20. geoService GeoIPService
  21. pool *sync.Pool
  22. detector *FormatDetector
  23. stats *ParseStats
  24. mu sync.RWMutex
  25. }
  // ParseStats holds parser metrics accumulated by updateStats; GetStats
// hands out copies of it.
type ParseStats struct {
	// Cumulative line counters.
	TotalLines, SuccessLines, ErrorLines int64
	// TotalBytes is not maintained by updateStats.
	TotalBytes int64

	// ParseDuration is the sum of the Duration of every recorded result.
	ParseDuration time.Duration
	// LinesPerSecond is the throughput of the most recently recorded result
	// only; BytesPerSecond is not maintained by updateStats.
	LinesPerSecond, BytesPerSecond float64

	// LastUpdated is when updateStats last ran.
	LastUpdated time.Time
}
  37. // parseBuffer holds reusable parsing buffers
  38. type parseBuffer struct {
  39. fields [][]byte
  40. entry *AccessLogEntry
  41. lineBytes []byte
  42. }
  43. // NewOptimizedParser creates a new high-performance parser
  44. func NewOptimizedParser(config *Config, uaParser UserAgentParser, geoService GeoIPService) *OptimizedParser {
  45. if config == nil {
  46. config = DefaultParserConfig()
  47. }
  48. return &OptimizedParser{
  49. config: config,
  50. uaParser: uaParser,
  51. geoService: geoService,
  52. detector: NewFormatDetector(),
  53. stats: &ParseStats{},
  54. pool: &sync.Pool{
  55. New: func() interface{} {
  56. return &parseBuffer{
  57. fields: make([][]byte, 0, 16),
  58. entry: &AccessLogEntry{},
  59. lineBytes: make([]byte, 0, config.MaxLineLength),
  60. }
  61. },
  62. },
  63. }
  64. }
  65. // ParseLine parses a single log line with zero-copy optimizations
  66. func (p *OptimizedParser) ParseLine(line string) (*AccessLogEntry, error) {
  67. if len(line) == 0 {
  68. return nil, errors.New(ErrEmptyLogLine)
  69. }
  70. if len(line) > p.config.MaxLineLength {
  71. return nil, errors.New(ErrLineTooLong)
  72. }
  73. buf := p.pool.Get().(*parseBuffer)
  74. defer p.pool.Put(buf)
  75. // Reset buffer state
  76. buf.fields = buf.fields[:0]
  77. *buf.entry = AccessLogEntry{}
  78. // Zero-copy conversion to bytes
  79. buf.lineBytes = stringToBytes(line)
  80. if err := p.parseLineOptimized(buf.lineBytes, buf); err != nil {
  81. if p.config.StrictMode {
  82. return nil, err
  83. }
  84. // In non-strict mode, create a minimal entry with raw line
  85. buf.entry.Raw = line
  86. buf.entry.Timestamp = time.Now().Unix()
  87. buf.entry.ID = p.generateEntryID(line)
  88. // Create a copy to avoid sharing the pooled object
  89. entryCopy := *buf.entry
  90. return &entryCopy, nil
  91. }
  92. // Generate unique ID for the entry
  93. buf.entry.ID = p.generateEntryID(line)
  94. buf.entry.Raw = line
  95. // Create a copy of the entry to avoid sharing the pooled object
  96. entryCopy := *buf.entry
  97. return &entryCopy, nil
  98. }
  99. // ParseLines parses multiple log lines with parallel processing
  100. func (p *OptimizedParser) ParseLines(lines []string) *ParseResult {
  101. return p.ParseLinesWithContext(context.Background(), lines)
  102. }
  103. // ParseLinesWithContext parses lines with context support for cancellation
  104. func (p *OptimizedParser) ParseLinesWithContext(ctx context.Context, lines []string) *ParseResult {
  105. startTime := time.Now()
  106. result := &ParseResult{
  107. Entries: make([]*AccessLogEntry, 0, len(lines)),
  108. Processed: len(lines),
  109. }
  110. if len(lines) == 0 {
  111. result.Duration = time.Since(startTime)
  112. return result
  113. }
  114. // For small datasets, use single-threaded parsing
  115. if len(lines) < p.config.BatchSize {
  116. return p.parseLinesSingleThreaded(ctx, lines, startTime)
  117. }
  118. // Use parallel processing for larger datasets
  119. return p.parseLinesParallel(ctx, lines, startTime)
  120. }
  121. // ParseStream parses log entries from an io.Reader with streaming support
  122. func (p *OptimizedParser) ParseStream(ctx context.Context, reader io.Reader) (*ParseResult, error) {
  123. startTime := time.Now()
  124. result := &ParseResult{
  125. Entries: make([]*AccessLogEntry, 0),
  126. }
  127. scanner := bufio.NewScanner(reader)
  128. scanner.Buffer(make([]byte, 0, p.config.BufferSize), p.config.MaxLineLength)
  129. batch := make([]string, 0, p.config.BatchSize)
  130. for scanner.Scan() {
  131. select {
  132. case <-ctx.Done():
  133. return result, ctx.Err()
  134. default:
  135. }
  136. line := scanner.Text()
  137. if len(line) == 0 {
  138. continue
  139. }
  140. batch = append(batch, line)
  141. result.Processed++
  142. if len(batch) >= p.config.BatchSize {
  143. batchResult := p.ParseLinesWithContext(ctx, batch)
  144. result.Entries = append(result.Entries, batchResult.Entries...)
  145. result.Succeeded += batchResult.Succeeded
  146. result.Failed += batchResult.Failed
  147. batch = batch[:0]
  148. }
  149. }
  150. // Process remaining lines in batch
  151. if len(batch) > 0 {
  152. batchResult := p.ParseLinesWithContext(ctx, batch)
  153. result.Entries = append(result.Entries, batchResult.Entries...)
  154. result.Succeeded += batchResult.Succeeded
  155. result.Failed += batchResult.Failed
  156. }
  157. if err := scanner.Err(); err != nil {
  158. return result, err
  159. }
  160. result.Duration = time.Since(startTime)
  161. if result.Processed > 0 {
  162. result.ErrorRate = float64(result.Failed) / float64(result.Processed)
  163. }
  164. p.updateStats(result)
  165. return result, nil
  166. }
  167. // parseLinesSingleThreaded handles small datasets with single-threaded parsing
  168. func (p *OptimizedParser) parseLinesSingleThreaded(ctx context.Context, lines []string, startTime time.Time) *ParseResult {
  169. result := &ParseResult{
  170. Entries: make([]*AccessLogEntry, 0, len(lines)),
  171. Processed: len(lines),
  172. }
  173. for _, line := range lines {
  174. select {
  175. case <-ctx.Done():
  176. result.Duration = time.Since(startTime)
  177. return result
  178. default:
  179. }
  180. if entry, err := p.ParseLine(line); err == nil {
  181. result.Entries = append(result.Entries, entry)
  182. result.Succeeded++
  183. } else {
  184. result.Failed++
  185. }
  186. }
  187. result.Duration = time.Since(startTime)
  188. if result.Processed > 0 {
  189. result.ErrorRate = float64(result.Failed) / float64(result.Processed)
  190. }
  191. return result
  192. }
  193. // parseLinesParallel handles large datasets with parallel processing
  194. func (p *OptimizedParser) parseLinesParallel(ctx context.Context, lines []string, startTime time.Time) *ParseResult {
  195. numWorkers := p.config.WorkerCount
  196. if numWorkers <= 0 {
  197. numWorkers = runtime.NumCPU()
  198. }
  199. if numWorkers > len(lines)/10 {
  200. numWorkers = len(lines)/10 + 1
  201. }
  202. result := &ParseResult{
  203. Processed: len(lines),
  204. }
  205. // Create channels for work distribution
  206. lineChan := make(chan string, numWorkers*2)
  207. resultChan := make(chan *AccessLogEntry, len(lines))
  208. errorChan := make(chan error, len(lines))
  209. var wg sync.WaitGroup
  210. // Start worker goroutines
  211. for i := 0; i < numWorkers; i++ {
  212. wg.Add(1)
  213. go func() {
  214. defer wg.Done()
  215. for line := range lineChan {
  216. if entry, err := p.ParseLine(line); err == nil {
  217. resultChan <- entry
  218. } else {
  219. errorChan <- err
  220. }
  221. }
  222. }()
  223. }
  224. // Send lines to workers
  225. go func() {
  226. defer close(lineChan)
  227. for _, line := range lines {
  228. select {
  229. case <-ctx.Done():
  230. return
  231. case lineChan <- line:
  232. }
  233. }
  234. }()
  235. // Wait for workers and close result channels
  236. go func() {
  237. wg.Wait()
  238. close(resultChan)
  239. close(errorChan)
  240. }()
  241. // Collect results
  242. entries := make([]*AccessLogEntry, 0, len(lines))
  243. var errorCount int
  244. for {
  245. select {
  246. case <-ctx.Done():
  247. result.Duration = time.Since(startTime)
  248. return result
  249. case entry, ok := <-resultChan:
  250. if !ok {
  251. resultChan = nil
  252. } else {
  253. entries = append(entries, entry)
  254. }
  255. case _, ok := <-errorChan:
  256. if !ok {
  257. errorChan = nil
  258. } else {
  259. errorCount++
  260. }
  261. }
  262. if resultChan == nil && errorChan == nil {
  263. break
  264. }
  265. }
  266. result.Entries = entries
  267. result.Succeeded = len(entries)
  268. result.Failed = errorCount
  269. result.Duration = time.Since(startTime)
  270. if result.Processed > 0 {
  271. result.ErrorRate = float64(result.Failed) / float64(result.Processed)
  272. }
  273. return result
  274. }
  275. // parseLineOptimized performs optimized parsing of a single line
  276. func (p *OptimizedParser) parseLineOptimized(line []byte, buf *parseBuffer) error {
  277. pos := 0
  278. length := len(line)
  279. if length < 20 {
  280. return errors.New(ErrUnsupportedLogFormat)
  281. }
  282. // Parse IP address
  283. pos = p.parseIP(line, pos, buf.entry)
  284. if pos >= length {
  285. return errors.New(ErrUnsupportedLogFormat)
  286. }
  287. // Skip remote user fields (- -)
  288. pos = p.skipSpaces(line, pos)
  289. pos = p.skipField(line, pos) // remote user
  290. pos = p.skipSpaces(line, pos)
  291. pos = p.skipField(line, pos) // remote logname
  292. // Parse timestamp
  293. pos = p.skipSpaces(line, pos)
  294. pos = p.parseTimestamp(line, pos, buf.entry)
  295. if pos >= length {
  296. return errors.New(ErrUnsupportedLogFormat)
  297. }
  298. // Parse request
  299. pos = p.skipSpaces(line, pos)
  300. pos = p.parseRequest(line, pos, buf.entry)
  301. if pos >= length {
  302. return errors.New(ErrUnsupportedLogFormat)
  303. }
  304. // Parse status code
  305. pos = p.skipSpaces(line, pos)
  306. pos = p.parseStatus(line, pos, buf.entry)
  307. if pos >= length {
  308. return errors.New(ErrUnsupportedLogFormat)
  309. }
  310. // Parse response size
  311. pos = p.skipSpaces(line, pos)
  312. pos = p.parseSize(line, pos, buf.entry)
  313. // Parse optional fields if they exist
  314. if pos < length {
  315. pos = p.skipSpaces(line, pos)
  316. if pos < length && line[pos] == '"' {
  317. pos = p.parseReferer(line, pos, buf.entry)
  318. }
  319. }
  320. if pos < length {
  321. pos = p.skipSpaces(line, pos)
  322. if pos < length && line[pos] == '"' {
  323. pos = p.parseUserAgent(line, pos, buf.entry)
  324. }
  325. }
  326. if pos < length {
  327. pos = p.skipSpaces(line, pos)
  328. if pos < length {
  329. pos = p.parseRequestTime(line, pos, buf.entry)
  330. }
  331. }
  332. if pos < length {
  333. pos = p.skipSpaces(line, pos)
  334. if pos < length {
  335. _ = p.parseUpstreamTime(line, pos, buf.entry)
  336. }
  337. }
  338. return nil
  339. }
  340. // Fast field parsing methods with zero-copy optimizations
  341. func (p *OptimizedParser) parseIP(line []byte, pos int, entry *AccessLogEntry) int {
  342. start := pos
  343. for pos < len(line) && line[pos] != ' ' {
  344. pos++
  345. }
  346. if pos > start {
  347. entry.IP = bytesToString(line[start:pos])
  348. // Populate geographic fields if enabled
  349. if p.config.EnableGeoIP && p.geoService != nil && entry.IP != "-" {
  350. if location, err := p.geoService.Search(entry.IP); err == nil && location != nil {
  351. entry.Province = location.Province
  352. entry.City = location.City
  353. // Use the specific RegionCode (e.g., province code 'CA') if available,
  354. // otherwise, fall back to the CountryCode (e.g., 'US').
  355. if location.RegionCode != "" {
  356. entry.RegionCode = location.RegionCode
  357. } else {
  358. entry.RegionCode = location.CountryCode
  359. }
  360. }
  361. }
  362. }
  363. return pos
  364. }
  365. func (p *OptimizedParser) parseTimestamp(line []byte, pos int, entry *AccessLogEntry) int {
  366. if pos >= len(line) || line[pos] != '[' {
  367. return pos
  368. }
  369. pos++
  370. start := pos
  371. for pos < len(line) && line[pos] != ']' {
  372. pos++
  373. }
  374. if pos > start {
  375. timeStr := bytesToString(line[start:pos])
  376. // Debug: log the timestamp string we're trying to parse
  377. // fmt.Printf("DEBUG: Parsing timestamp string: '%s'\n", timeStr)
  378. if t, err := time.Parse(p.config.TimeLayout, timeStr); err == nil {
  379. entry.Timestamp = t.Unix()
  380. } else {
  381. // Try alternative common nginx timestamp formats if the default fails
  382. formats := []string{
  383. "02/Jan/2006:15:04:05 -0700", // Standard nginx format
  384. "2006-01-02T15:04:05-07:00", // ISO 8601 format
  385. "2006-01-02 15:04:05", // Simple datetime format
  386. "02/Jan/2006:15:04:05", // Without timezone
  387. }
  388. parsed := false
  389. for _, format := range formats {
  390. if t, err := time.Parse(format, timeStr); err == nil {
  391. entry.Timestamp = t.Unix()
  392. parsed = true
  393. break
  394. }
  395. }
  396. // If all parsing attempts fail, keep timestamp as 0
  397. if !parsed {
  398. // Debug: log parsing failure
  399. // fmt.Printf("DEBUG: Failed to parse timestamp: '%s'\n", timeStr)
  400. }
  401. }
  402. }
  403. if pos < len(line) && line[pos] == ']' {
  404. pos++
  405. }
  406. return pos
  407. }
  408. func (p *OptimizedParser) parseRequest(line []byte, pos int, entry *AccessLogEntry) int {
  409. if pos >= len(line) || line[pos] != '"' {
  410. return pos
  411. }
  412. pos++
  413. start := pos
  414. for pos < len(line) && line[pos] != '"' {
  415. pos++
  416. }
  417. if pos > start {
  418. requestLine := line[start:pos]
  419. parts := bytes.Fields(requestLine)
  420. if len(parts) >= 1 {
  421. method := bytesToString(parts[0])
  422. if ValidHTTPMethods[method] {
  423. entry.Method = method
  424. }
  425. }
  426. if len(parts) >= 2 {
  427. entry.Path = bytesToString(parts[1])
  428. }
  429. if len(parts) >= 3 {
  430. entry.Protocol = bytesToString(parts[2])
  431. }
  432. }
  433. if pos < len(line) && line[pos] == '"' {
  434. pos++
  435. }
  436. return pos
  437. }
  438. func (p *OptimizedParser) parseStatus(line []byte, pos int, entry *AccessLogEntry) int {
  439. start := pos
  440. for pos < len(line) && line[pos] >= '0' && line[pos] <= '9' {
  441. pos++
  442. }
  443. if pos > start {
  444. if status, err := fastParseInt(line[start:pos]); err == nil && status >= 100 && status < 600 {
  445. entry.Status = status
  446. }
  447. }
  448. return pos
  449. }
  450. func (p *OptimizedParser) parseSize(line []byte, pos int, entry *AccessLogEntry) int {
  451. start := pos
  452. for pos < len(line) && ((line[pos] >= '0' && line[pos] <= '9') || line[pos] == '-') {
  453. pos++
  454. }
  455. if pos > start {
  456. sizeBytes := line[start:pos]
  457. if len(sizeBytes) == 1 && sizeBytes[0] == '-' {
  458. entry.BytesSent = 0
  459. } else {
  460. if size, err := fastParseInt(sizeBytes); err == nil && size >= 0 {
  461. entry.BytesSent = int64(size)
  462. }
  463. }
  464. }
  465. return pos
  466. }
  467. func (p *OptimizedParser) parseReferer(line []byte, pos int, entry *AccessLogEntry) int {
  468. if pos >= len(line) || line[pos] != '"' {
  469. return pos
  470. }
  471. pos++
  472. start := pos
  473. for pos < len(line) && line[pos] != '"' {
  474. pos++
  475. }
  476. if pos > start {
  477. entry.Referer = bytesToString(line[start:pos])
  478. }
  479. if pos < len(line) && line[pos] == '"' {
  480. pos++
  481. }
  482. return pos
  483. }
  484. func (p *OptimizedParser) parseUserAgent(line []byte, pos int, entry *AccessLogEntry) int {
  485. if pos >= len(line) || line[pos] != '"' {
  486. return pos
  487. }
  488. pos++
  489. start := pos
  490. for pos < len(line) && line[pos] != '"' {
  491. pos++
  492. }
  493. if pos > start {
  494. userAgent := bytesToString(line[start:pos])
  495. entry.UserAgent = userAgent
  496. if p.config.EnableUA && p.uaParser != nil && userAgent != "-" {
  497. parsed := p.uaParser.Parse(userAgent)
  498. if parsed.Browser != "Unknown" && parsed.Browser != "" {
  499. entry.Browser = parsed.Browser
  500. entry.BrowserVer = parsed.BrowserVer
  501. }
  502. if parsed.OS != "Unknown" && parsed.OS != "" {
  503. entry.OS = parsed.OS
  504. entry.OSVersion = parsed.OSVersion
  505. }
  506. if parsed.DeviceType != "" {
  507. entry.DeviceType = parsed.DeviceType
  508. }
  509. }
  510. }
  511. if pos < len(line) && line[pos] == '"' {
  512. pos++
  513. }
  514. return pos
  515. }
  516. func (p *OptimizedParser) parseRequestTime(line []byte, pos int, entry *AccessLogEntry) int {
  517. start := pos
  518. for pos < len(line) && ((line[pos] >= '0' && line[pos] <= '9') || line[pos] == '.' || line[pos] == '-') {
  519. pos++
  520. }
  521. if pos > start {
  522. timeStr := bytesToString(line[start:pos])
  523. if timeStr != "-" {
  524. if val, err := strconv.ParseFloat(timeStr, 64); err == nil && val >= 0 {
  525. entry.RequestTime = val
  526. }
  527. }
  528. }
  529. return pos
  530. }
  531. func (p *OptimizedParser) parseUpstreamTime(line []byte, pos int, entry *AccessLogEntry) int {
  532. start := pos
  533. for pos < len(line) && ((line[pos] >= '0' && line[pos] <= '9') || line[pos] == '.' || line[pos] == '-') {
  534. pos++
  535. }
  536. if pos > start {
  537. timeStr := bytesToString(line[start:pos])
  538. if timeStr != "-" {
  539. if val, err := strconv.ParseFloat(timeStr, 64); err == nil && val >= 0 {
  540. entry.UpstreamTime = &val
  541. }
  542. }
  543. }
  544. return pos
  545. }
  546. // Utility methods
  547. func (p *OptimizedParser) skipSpaces(line []byte, pos int) int {
  548. for pos < len(line) && line[pos] == ' ' {
  549. pos++
  550. }
  551. return pos
  552. }
  553. func (p *OptimizedParser) skipField(line []byte, pos int) int {
  554. for pos < len(line) && line[pos] != ' ' {
  555. pos++
  556. }
  557. return pos
  558. }
  559. func (p *OptimizedParser) generateEntryID(line string) string {
  560. hash := md5.Sum([]byte(line))
  561. return fmt.Sprintf("%x", hash)[:16]
  562. }
  563. func (p *OptimizedParser) updateStats(result *ParseResult) {
  564. p.mu.Lock()
  565. defer p.mu.Unlock()
  566. p.stats.TotalLines += int64(result.Processed)
  567. p.stats.SuccessLines += int64(result.Succeeded)
  568. p.stats.ErrorLines += int64(result.Failed)
  569. p.stats.ParseDuration += result.Duration
  570. p.stats.LastUpdated = time.Now()
  571. if result.Duration > 0 {
  572. p.stats.LinesPerSecond = float64(result.Processed) / result.Duration.Seconds()
  573. }
  574. }
  575. // GetStats returns current parsing statistics
  576. func (p *OptimizedParser) GetStats() *ParseStats {
  577. p.mu.RLock()
  578. defer p.mu.RUnlock()
  579. statsCopy := *p.stats
  580. return &statsCopy
  581. }
  582. // ResetStats resets parsing statistics
  583. func (p *OptimizedParser) ResetStats() {
  584. p.mu.Lock()
  585. defer p.mu.Unlock()
  586. p.stats = &ParseStats{}
  587. }
  // Zero-copy conversions between string and []byte.
//
// Both reinterpret the header of their argument instead of copying data, so
// the result shares memory with the input:
//   - bytes returned by stringToBytes must never be written to (string data
//     is immutable and may live in read-only memory);
//   - a string returned by bytesToString changes if the underlying bytes are
//     later modified.
//
// NOTE(review): these depend on the runtime's string/slice header layout.
// Go 1.20+ offers unsafe.Slice(unsafe.StringData(s), len(s)) and
// unsafe.String(unsafe.SliceData(b), len(b)) for the same purpose; consider
// switching once the module's Go version allows it.

// stringToBytes returns a []byte view of s with len == cap == len(s).
func stringToBytes(s string) []byte {
	// A value laid out like a slice header: the string supplies (data, len)
	// and capacity supplies cap.
	type sliceHeader struct {
		str      string
		capacity int
	}
	h := sliceHeader{str: s, capacity: len(s)}
	return *(*[]byte)(unsafe.Pointer(&h))
}

// bytesToString returns a string view of b's current contents.
func bytesToString(b []byte) string {
	header := unsafe.Pointer(&b)
	return *(*string)(header)
}
  // fastParseInt parses an optionally '-'-prefixed run of ASCII decimal digits
// into an int without allocating.
//
// It returns strconv.ErrSyntax for empty input, a lone "-", or any non-digit
// byte (a leading '+' is not accepted). It returns strconv.ErrRange when the
// value does not fit in an int, instead of silently wrapping around. Bytes are
// checked left to right, so an overflowing prefix reports ErrRange before a
// later bad byte is seen.
func fastParseInt(b []byte) (int, error) {
	if len(b) == 0 {
		return 0, strconv.ErrSyntax
	}
	neg := b[0] == '-'
	if neg {
		b = b[1:]
		if len(b) == 0 {
			return 0, strconv.ErrSyntax
		}
	}

	// Accumulate the magnitude as uint: the most negative int has a magnitude
	// one larger than the largest int, and must still be representable.
	const maxInt = ^uint(0) >> 1
	limit := maxInt
	if neg {
		limit++
	}

	var n uint
	for _, c := range b {
		if c < '0' || c > '9' {
			return 0, strconv.ErrSyntax
		}
		d := uint(c - '0')
		if n > (limit-d)/10 { // n*10+d would exceed limit
			return 0, strconv.ErrRange
		}
		n = n*10 + d
	}

	if neg {
		// For n == maxInt+1, int(n) wraps to the minimum int and negating it
		// yields the minimum int again, which is the correct result.
		return -int(n), nil
	}
	return int(n), nil
}