optimized_parser.go 15 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709
  1. package parser
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "crypto/md5"
  7. "fmt"
  8. "io"
  9. "runtime"
  10. "strconv"
  11. "sync"
  12. "time"
  13. "unsafe"
  14. )
  15. // OptimizedParser provides high-performance log parsing with zero-copy optimizations
  16. type OptimizedParser struct {
  17. config *Config
  18. uaParser UserAgentParser
  19. geoService GeoIPService
  20. pool *sync.Pool
  21. detector *FormatDetector
  22. stats *ParseStats
  23. mu sync.RWMutex
  24. }
  // ParseStats is the set of counters and rates the parser keeps about the
  // work it has done. GetStats returns a copy of it.
  type ParseStats struct {
  	// TotalLines is the running sum of ParseResult.Processed.
  	TotalLines int64
  	// SuccessLines is the running sum of ParseResult.Succeeded.
  	SuccessLines int64
  	// ErrorLines is the running sum of ParseResult.Failed.
  	ErrorLines int64
  	// TotalBytes is not written by updateStats.
  	// NOTE(review): confirm whether anything else maintains it.
  	TotalBytes int64
  	// ParseDuration is the running sum of ParseResult.Duration.
  	ParseDuration time.Duration
  	// LinesPerSecond is the rate of the most recent result passed to
  	// updateStats (Processed / Duration), not a cumulative average.
  	LinesPerSecond float64
  	// BytesPerSecond is not written by updateStats.
  	// NOTE(review): confirm whether anything else maintains it.
  	BytesPerSecond float64
  	// LastUpdated is the wall-clock time of the last updateStats call.
  	LastUpdated time.Time
  }
  36. // parseBuffer holds reusable parsing buffers
  37. type parseBuffer struct {
  38. fields [][]byte
  39. entry *AccessLogEntry
  40. lineBytes []byte
  41. }
  42. // NewOptimizedParser creates a new high-performance parser
  43. func NewOptimizedParser(config *Config, uaParser UserAgentParser, geoService GeoIPService) *OptimizedParser {
  44. if config == nil {
  45. config = DefaultParserConfig()
  46. }
  47. return &OptimizedParser{
  48. config: config,
  49. uaParser: uaParser,
  50. geoService: geoService,
  51. detector: NewFormatDetector(),
  52. stats: &ParseStats{},
  53. pool: &sync.Pool{
  54. New: func() interface{} {
  55. return &parseBuffer{
  56. fields: make([][]byte, 0, 16),
  57. entry: &AccessLogEntry{},
  58. lineBytes: make([]byte, 0, config.MaxLineLength),
  59. }
  60. },
  61. },
  62. }
  63. }
  64. // ParseLine parses a single log line with zero-copy optimizations
  65. func (p *OptimizedParser) ParseLine(line string) (*AccessLogEntry, error) {
  66. if len(line) == 0 {
  67. return nil, fmt.Errorf(ErrEmptyLogLine)
  68. }
  69. if len(line) > p.config.MaxLineLength {
  70. return nil, fmt.Errorf(ErrLineTooLong)
  71. }
  72. buf := p.pool.Get().(*parseBuffer)
  73. defer p.pool.Put(buf)
  74. // Reset buffer state
  75. buf.fields = buf.fields[:0]
  76. *buf.entry = AccessLogEntry{}
  77. // Zero-copy conversion to bytes
  78. buf.lineBytes = stringToBytes(line)
  79. if err := p.parseLineOptimized(buf.lineBytes, buf); err != nil {
  80. if p.config.StrictMode {
  81. return nil, err
  82. }
  83. // In non-strict mode, create a minimal entry with raw line
  84. buf.entry.Raw = line
  85. buf.entry.Timestamp = time.Now().Unix()
  86. buf.entry.ID = p.generateEntryID(line)
  87. return buf.entry, nil
  88. }
  89. // Generate unique ID for the entry
  90. buf.entry.ID = p.generateEntryID(line)
  91. buf.entry.Raw = line
  92. return buf.entry, nil
  93. }
  94. // ParseLines parses multiple log lines with parallel processing
  95. func (p *OptimizedParser) ParseLines(lines []string) *ParseResult {
  96. return p.ParseLinesWithContext(context.Background(), lines)
  97. }
  98. // ParseLinesWithContext parses lines with context support for cancellation
  99. func (p *OptimizedParser) ParseLinesWithContext(ctx context.Context, lines []string) *ParseResult {
  100. startTime := time.Now()
  101. result := &ParseResult{
  102. Entries: make([]*AccessLogEntry, 0, len(lines)),
  103. Processed: len(lines),
  104. }
  105. if len(lines) == 0 {
  106. result.Duration = time.Since(startTime)
  107. return result
  108. }
  109. // For small datasets, use single-threaded parsing
  110. if len(lines) < p.config.BatchSize {
  111. return p.parseLinesSingleThreaded(ctx, lines, startTime)
  112. }
  113. // Use parallel processing for larger datasets
  114. return p.parseLinesParallel(ctx, lines, startTime)
  115. }
  116. // ParseStream parses log entries from an io.Reader with streaming support
  117. func (p *OptimizedParser) ParseStream(ctx context.Context, reader io.Reader) (*ParseResult, error) {
  118. startTime := time.Now()
  119. result := &ParseResult{
  120. Entries: make([]*AccessLogEntry, 0),
  121. }
  122. scanner := bufio.NewScanner(reader)
  123. scanner.Buffer(make([]byte, 0, p.config.BufferSize), p.config.MaxLineLength)
  124. batch := make([]string, 0, p.config.BatchSize)
  125. for scanner.Scan() {
  126. select {
  127. case <-ctx.Done():
  128. return result, ctx.Err()
  129. default:
  130. }
  131. line := scanner.Text()
  132. if len(line) == 0 {
  133. continue
  134. }
  135. batch = append(batch, line)
  136. result.Processed++
  137. if len(batch) >= p.config.BatchSize {
  138. batchResult := p.ParseLinesWithContext(ctx, batch)
  139. result.Entries = append(result.Entries, batchResult.Entries...)
  140. result.Succeeded += batchResult.Succeeded
  141. result.Failed += batchResult.Failed
  142. batch = batch[:0]
  143. }
  144. }
  145. // Process remaining lines in batch
  146. if len(batch) > 0 {
  147. batchResult := p.ParseLinesWithContext(ctx, batch)
  148. result.Entries = append(result.Entries, batchResult.Entries...)
  149. result.Succeeded += batchResult.Succeeded
  150. result.Failed += batchResult.Failed
  151. }
  152. if err := scanner.Err(); err != nil {
  153. return result, err
  154. }
  155. result.Duration = time.Since(startTime)
  156. if result.Processed > 0 {
  157. result.ErrorRate = float64(result.Failed) / float64(result.Processed)
  158. }
  159. p.updateStats(result)
  160. return result, nil
  161. }
  162. // parseLinesSingleThreaded handles small datasets with single-threaded parsing
  163. func (p *OptimizedParser) parseLinesSingleThreaded(ctx context.Context, lines []string, startTime time.Time) *ParseResult {
  164. result := &ParseResult{
  165. Entries: make([]*AccessLogEntry, 0, len(lines)),
  166. Processed: len(lines),
  167. }
  168. for _, line := range lines {
  169. select {
  170. case <-ctx.Done():
  171. result.Duration = time.Since(startTime)
  172. return result
  173. default:
  174. }
  175. if entry, err := p.ParseLine(line); err == nil {
  176. result.Entries = append(result.Entries, entry)
  177. result.Succeeded++
  178. } else {
  179. result.Failed++
  180. }
  181. }
  182. result.Duration = time.Since(startTime)
  183. if result.Processed > 0 {
  184. result.ErrorRate = float64(result.Failed) / float64(result.Processed)
  185. }
  186. return result
  187. }
  188. // parseLinesParallel handles large datasets with parallel processing
  189. func (p *OptimizedParser) parseLinesParallel(ctx context.Context, lines []string, startTime time.Time) *ParseResult {
  190. numWorkers := p.config.WorkerCount
  191. if numWorkers <= 0 {
  192. numWorkers = runtime.NumCPU()
  193. }
  194. if numWorkers > len(lines)/10 {
  195. numWorkers = len(lines)/10 + 1
  196. }
  197. result := &ParseResult{
  198. Processed: len(lines),
  199. }
  200. // Create channels for work distribution
  201. lineChan := make(chan string, numWorkers*2)
  202. resultChan := make(chan *AccessLogEntry, len(lines))
  203. errorChan := make(chan error, len(lines))
  204. var wg sync.WaitGroup
  205. // Start worker goroutines
  206. for i := 0; i < numWorkers; i++ {
  207. wg.Add(1)
  208. go func() {
  209. defer wg.Done()
  210. for line := range lineChan {
  211. if entry, err := p.ParseLine(line); err == nil {
  212. resultChan <- entry
  213. } else {
  214. errorChan <- err
  215. }
  216. }
  217. }()
  218. }
  219. // Send lines to workers
  220. go func() {
  221. defer close(lineChan)
  222. for _, line := range lines {
  223. select {
  224. case <-ctx.Done():
  225. return
  226. case lineChan <- line:
  227. }
  228. }
  229. }()
  230. // Wait for workers and close result channels
  231. go func() {
  232. wg.Wait()
  233. close(resultChan)
  234. close(errorChan)
  235. }()
  236. // Collect results
  237. entries := make([]*AccessLogEntry, 0, len(lines))
  238. var errorCount int
  239. for {
  240. select {
  241. case <-ctx.Done():
  242. result.Duration = time.Since(startTime)
  243. return result
  244. case entry, ok := <-resultChan:
  245. if !ok {
  246. resultChan = nil
  247. } else {
  248. entries = append(entries, entry)
  249. }
  250. case _, ok := <-errorChan:
  251. if !ok {
  252. errorChan = nil
  253. } else {
  254. errorCount++
  255. }
  256. }
  257. if resultChan == nil && errorChan == nil {
  258. break
  259. }
  260. }
  261. result.Entries = entries
  262. result.Succeeded = len(entries)
  263. result.Failed = errorCount
  264. result.Duration = time.Since(startTime)
  265. if result.Processed > 0 {
  266. result.ErrorRate = float64(result.Failed) / float64(result.Processed)
  267. }
  268. return result
  269. }
  270. // parseLineOptimized performs optimized parsing of a single line
  271. func (p *OptimizedParser) parseLineOptimized(line []byte, buf *parseBuffer) error {
  272. pos := 0
  273. length := len(line)
  274. if length < 20 {
  275. return fmt.Errorf(ErrUnsupportedLogFormat)
  276. }
  277. // Parse IP address
  278. pos = p.parseIP(line, pos, buf.entry)
  279. if pos >= length {
  280. return fmt.Errorf(ErrUnsupportedLogFormat)
  281. }
  282. // Skip remote user fields (- -)
  283. pos = p.skipSpaces(line, pos)
  284. pos = p.skipField(line, pos) // remote user
  285. pos = p.skipSpaces(line, pos)
  286. pos = p.skipField(line, pos) // remote logname
  287. // Parse timestamp
  288. pos = p.skipSpaces(line, pos)
  289. pos = p.parseTimestamp(line, pos, buf.entry)
  290. if pos >= length {
  291. return fmt.Errorf(ErrUnsupportedLogFormat)
  292. }
  293. // Parse request
  294. pos = p.skipSpaces(line, pos)
  295. pos = p.parseRequest(line, pos, buf.entry)
  296. if pos >= length {
  297. return fmt.Errorf(ErrUnsupportedLogFormat)
  298. }
  299. // Parse status code
  300. pos = p.skipSpaces(line, pos)
  301. pos = p.parseStatus(line, pos, buf.entry)
  302. if pos >= length {
  303. return fmt.Errorf(ErrUnsupportedLogFormat)
  304. }
  305. // Parse response size
  306. pos = p.skipSpaces(line, pos)
  307. pos = p.parseSize(line, pos, buf.entry)
  308. // Parse optional fields if they exist
  309. if pos < length {
  310. pos = p.skipSpaces(line, pos)
  311. if pos < length && line[pos] == '"' {
  312. pos = p.parseReferer(line, pos, buf.entry)
  313. }
  314. }
  315. if pos < length {
  316. pos = p.skipSpaces(line, pos)
  317. if pos < length && line[pos] == '"' {
  318. pos = p.parseUserAgent(line, pos, buf.entry)
  319. }
  320. }
  321. if pos < length {
  322. pos = p.skipSpaces(line, pos)
  323. if pos < length {
  324. pos = p.parseRequestTime(line, pos, buf.entry)
  325. }
  326. }
  327. if pos < length {
  328. pos = p.skipSpaces(line, pos)
  329. if pos < length {
  330. pos = p.parseUpstreamTime(line, pos, buf.entry)
  331. }
  332. }
  333. return nil
  334. }
  335. // Fast field parsing methods with zero-copy optimizations
  336. func (p *OptimizedParser) parseIP(line []byte, pos int, entry *AccessLogEntry) int {
  337. start := pos
  338. for pos < len(line) && line[pos] != ' ' {
  339. pos++
  340. }
  341. if pos > start {
  342. entry.IP = bytesToString(line[start:pos])
  343. // Populate geographic fields if enabled
  344. if p.config.EnableGeoIP && p.geoService != nil && entry.IP != "-" {
  345. if location, err := p.geoService.Search(entry.IP); err == nil && location != nil {
  346. entry.Province = location.Province
  347. entry.City = location.City
  348. // Use the specific RegionCode (e.g., province code 'CA') if available,
  349. // otherwise, fall back to the CountryCode (e.g., 'US').
  350. if location.RegionCode != "" {
  351. entry.RegionCode = location.RegionCode
  352. } else {
  353. entry.RegionCode = location.CountryCode
  354. }
  355. }
  356. }
  357. }
  358. return pos
  359. }
  360. func (p *OptimizedParser) parseTimestamp(line []byte, pos int, entry *AccessLogEntry) int {
  361. if pos >= len(line) || line[pos] != '[' {
  362. return pos
  363. }
  364. pos++
  365. start := pos
  366. for pos < len(line) && line[pos] != ']' {
  367. pos++
  368. }
  369. if pos > start {
  370. timeStr := bytesToString(line[start:pos])
  371. if t, err := time.Parse(p.config.TimeLayout, timeStr); err == nil {
  372. entry.Timestamp = t.Unix()
  373. }
  374. }
  375. if pos < len(line) && line[pos] == ']' {
  376. pos++
  377. }
  378. return pos
  379. }
  380. func (p *OptimizedParser) parseRequest(line []byte, pos int, entry *AccessLogEntry) int {
  381. if pos >= len(line) || line[pos] != '"' {
  382. return pos
  383. }
  384. pos++
  385. start := pos
  386. for pos < len(line) && line[pos] != '"' {
  387. pos++
  388. }
  389. if pos > start {
  390. requestLine := line[start:pos]
  391. parts := bytes.Fields(requestLine)
  392. if len(parts) >= 1 {
  393. method := bytesToString(parts[0])
  394. if ValidHTTPMethods[method] {
  395. entry.Method = method
  396. }
  397. }
  398. if len(parts) >= 2 {
  399. entry.Path = bytesToString(parts[1])
  400. }
  401. if len(parts) >= 3 {
  402. entry.Protocol = bytesToString(parts[2])
  403. }
  404. }
  405. if pos < len(line) && line[pos] == '"' {
  406. pos++
  407. }
  408. return pos
  409. }
  410. func (p *OptimizedParser) parseStatus(line []byte, pos int, entry *AccessLogEntry) int {
  411. start := pos
  412. for pos < len(line) && line[pos] >= '0' && line[pos] <= '9' {
  413. pos++
  414. }
  415. if pos > start {
  416. if status, err := fastParseInt(line[start:pos]); err == nil && status >= 100 && status < 600 {
  417. entry.Status = status
  418. }
  419. }
  420. return pos
  421. }
  422. func (p *OptimizedParser) parseSize(line []byte, pos int, entry *AccessLogEntry) int {
  423. start := pos
  424. for pos < len(line) && ((line[pos] >= '0' && line[pos] <= '9') || line[pos] == '-') {
  425. pos++
  426. }
  427. if pos > start {
  428. sizeBytes := line[start:pos]
  429. if len(sizeBytes) == 1 && sizeBytes[0] == '-' {
  430. entry.BytesSent = 0
  431. } else {
  432. if size, err := fastParseInt(sizeBytes); err == nil && size >= 0 {
  433. entry.BytesSent = int64(size)
  434. }
  435. }
  436. }
  437. return pos
  438. }
  439. func (p *OptimizedParser) parseReferer(line []byte, pos int, entry *AccessLogEntry) int {
  440. if pos >= len(line) || line[pos] != '"' {
  441. return pos
  442. }
  443. pos++
  444. start := pos
  445. for pos < len(line) && line[pos] != '"' {
  446. pos++
  447. }
  448. if pos > start {
  449. entry.Referer = bytesToString(line[start:pos])
  450. }
  451. if pos < len(line) && line[pos] == '"' {
  452. pos++
  453. }
  454. return pos
  455. }
  456. func (p *OptimizedParser) parseUserAgent(line []byte, pos int, entry *AccessLogEntry) int {
  457. if pos >= len(line) || line[pos] != '"' {
  458. return pos
  459. }
  460. pos++
  461. start := pos
  462. for pos < len(line) && line[pos] != '"' {
  463. pos++
  464. }
  465. if pos > start {
  466. userAgent := bytesToString(line[start:pos])
  467. entry.UserAgent = userAgent
  468. if p.config.EnableUA && p.uaParser != nil && userAgent != "-" {
  469. parsed := p.uaParser.Parse(userAgent)
  470. if parsed.Browser != "Unknown" && parsed.Browser != "" {
  471. entry.Browser = parsed.Browser
  472. entry.BrowserVer = parsed.BrowserVer
  473. }
  474. if parsed.OS != "Unknown" && parsed.OS != "" {
  475. entry.OS = parsed.OS
  476. entry.OSVersion = parsed.OSVersion
  477. }
  478. if parsed.DeviceType != "" {
  479. entry.DeviceType = parsed.DeviceType
  480. }
  481. }
  482. }
  483. if pos < len(line) && line[pos] == '"' {
  484. pos++
  485. }
  486. return pos
  487. }
  488. func (p *OptimizedParser) parseRequestTime(line []byte, pos int, entry *AccessLogEntry) int {
  489. start := pos
  490. for pos < len(line) && ((line[pos] >= '0' && line[pos] <= '9') || line[pos] == '.' || line[pos] == '-') {
  491. pos++
  492. }
  493. if pos > start {
  494. timeStr := bytesToString(line[start:pos])
  495. if timeStr != "-" {
  496. if val, err := strconv.ParseFloat(timeStr, 64); err == nil && val >= 0 {
  497. entry.RequestTime = val
  498. }
  499. }
  500. }
  501. return pos
  502. }
  503. func (p *OptimizedParser) parseUpstreamTime(line []byte, pos int, entry *AccessLogEntry) int {
  504. start := pos
  505. for pos < len(line) && ((line[pos] >= '0' && line[pos] <= '9') || line[pos] == '.' || line[pos] == '-') {
  506. pos++
  507. }
  508. if pos > start {
  509. timeStr := bytesToString(line[start:pos])
  510. if timeStr != "-" {
  511. if val, err := strconv.ParseFloat(timeStr, 64); err == nil && val >= 0 {
  512. entry.UpstreamTime = &val
  513. }
  514. }
  515. }
  516. return pos
  517. }
  518. // Utility methods
  519. func (p *OptimizedParser) skipSpaces(line []byte, pos int) int {
  520. for pos < len(line) && line[pos] == ' ' {
  521. pos++
  522. }
  523. return pos
  524. }
  525. func (p *OptimizedParser) skipField(line []byte, pos int) int {
  526. for pos < len(line) && line[pos] != ' ' {
  527. pos++
  528. }
  529. return pos
  530. }
  531. func (p *OptimizedParser) generateEntryID(line string) string {
  532. hash := md5.Sum([]byte(line))
  533. return fmt.Sprintf("%x", hash)[:16]
  534. }
  535. func (p *OptimizedParser) updateStats(result *ParseResult) {
  536. p.mu.Lock()
  537. defer p.mu.Unlock()
  538. p.stats.TotalLines += int64(result.Processed)
  539. p.stats.SuccessLines += int64(result.Succeeded)
  540. p.stats.ErrorLines += int64(result.Failed)
  541. p.stats.ParseDuration += result.Duration
  542. p.stats.LastUpdated = time.Now()
  543. if result.Duration > 0 {
  544. p.stats.LinesPerSecond = float64(result.Processed) / result.Duration.Seconds()
  545. }
  546. }
  547. // GetStats returns current parsing statistics
  548. func (p *OptimizedParser) GetStats() *ParseStats {
  549. p.mu.RLock()
  550. defer p.mu.RUnlock()
  551. statsCopy := *p.stats
  552. return &statsCopy
  553. }
  554. // ResetStats resets parsing statistics
  555. func (p *OptimizedParser) ResetStats() {
  556. p.mu.Lock()
  557. defer p.mu.Unlock()
  558. p.stats = &ParseStats{}
  559. }
  // Zero-copy conversions between string and []byte.

  // stringToBytes returns a []byte that shares s's memory, with length and
  // capacity both len(s). It builds a value laid out like a slice header (the
  // string's pointer and length followed by a capacity word) and reinterprets
  // it. The result aliases an immutable string and must never be written to.
  func stringToBytes(s string) []byte {
  	type sliceShaped struct {
  		string
  		Cap int
  	}
  	header := sliceShaped{s, len(s)}
  	return *(*[]byte)(unsafe.Pointer(&header))
  }
  // bytesToString returns a string that shares b's memory by reinterpreting
  // the slice header as a string header (pointer and length). The caller must
  // not modify b while the returned string is in use.
  func bytesToString(b []byte) string {
  	header := unsafe.Pointer(&b)
  	return *(*string)(header)
  }
  // fastParseInt parses b as a base-10 integer with an optional leading '-',
  // without allocating.
  //
  // It returns strconv.ErrSyntax when b is empty, is only "-", or contains a
  // byte that is not an ASCII digit (a leading '+' is not accepted). It returns
  // strconv.ErrRange when the value does not fit in an int, instead of
  // silently wrapping around.
  func fastParseInt(b []byte) (int, error) {
  	if len(b) == 0 {
  		return 0, strconv.ErrSyntax
  	}

  	neg := false
  	if b[0] == '-' {
  		neg = true
  		b = b[1:]
  		if len(b) == 0 {
  			return 0, strconv.ErrSyntax
  		}
  	}

  	// limit is the largest magnitude that is still representable: MaxInt for
  	// positive values and MaxInt+1 (the magnitude of MinInt) for negative
  	// ones. Accumulating as uint leaves room for that extra one.
  	const maxInt = int(^uint(0) >> 1)
  	limit := uint(maxInt)
  	if neg {
  		limit++
  	}

  	var n uint
  	for _, c := range b {
  		if c < '0' || c > '9' {
  			return 0, strconv.ErrSyntax
  		}
  		d := uint(c - '0')
  		// Equivalent to n*10+d > limit, written so it cannot overflow.
  		if n > (limit-d)/10 {
  			return 0, strconv.ErrRange
  		}
  		n = n*10 + d
  	}

  	if neg {
  		// For the magnitude MaxInt+1, int(n) is already MinInt and negating
  		// it yields MinInt again, which is the intended value.
  		return -int(n), nil
  	}
  	return int(n), nil
  }