12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740 |
- package parser
- import (
- "bufio"
- "bytes"
- "context"
- "crypto/md5"
- "fmt"
- "io"
- "runtime"
- "strconv"
- "sync"
- "time"
- "unsafe"
- )
// Parser provides high-performance log parsing with zero-copy optimizations.
//
// A Parser is built with NewParser. Per-line scratch state is drawn from pool,
// and the cumulative statistics in stats are read and written under mu.
type Parser struct {
	config     *Config         // parsing options: size limits, time layout, feature toggles
	uaParser   UserAgentParser // optional; consulted only when config.EnableUA is set and it is non-nil
	geoService GeoIPService    // optional; consulted only when config.EnableGeoIP is set and it is non-nil
	pool       *sync.Pool      // recycles *parseBuffer values across ParseLine calls
	detector   *FormatDetector // NOTE(review): not referenced in this file — confirm where it is used
	stats      *ParseStats     // cumulative totals maintained by updateStats
	mu         sync.RWMutex    // guards stats
}
// ParseStats is a snapshot of a Parser's parsing metrics.
//
// NOTE(review): TotalBytes and BytesPerSecond are not written anywhere in
// this file — confirm whether another component fills them in.
type ParseStats struct {
	// Line and byte counters.
	TotalLines, SuccessLines, ErrorLines, TotalBytes int64

	// ParseDuration is the summed time of every result folded into the stats.
	ParseDuration time.Duration

	// Throughput figures.
	LinesPerSecond, BytesPerSecond float64

	// LastUpdated is when the counters last changed.
	LastUpdated time.Time
}
// parseBuffer holds reusable per-line scratch state handed out by Parser.pool.
type parseBuffer struct {
	fields    [][]byte        // truncated to length 0 on each ParseLine; not otherwise read in this file
	entry     *AccessLogEntry // zeroed at the start of each ParseLine and filled in by the field parsers
	lineBytes []byte          // zero-copy byte view of the line currently being parsed
}
- // NewParser creates a new high-performance parser
- func NewParser(config *Config, uaParser UserAgentParser, geoService GeoIPService) *Parser {
- if config == nil {
- config = DefaultParserConfig()
- }
- return &Parser{
- config: config,
- uaParser: uaParser,
- geoService: geoService,
- detector: NewFormatDetector(),
- stats: &ParseStats{},
- pool: &sync.Pool{
- New: func() interface{} {
- return &parseBuffer{
- fields: make([][]byte, 0, 16),
- entry: &AccessLogEntry{},
- lineBytes: make([]byte, 0, config.MaxLineLength),
- }
- },
- },
- }
- }
- // ParseLine parses a single log line with zero-copy optimizations
- func (p *Parser) ParseLine(line string) (*AccessLogEntry, error) {
- if len(line) == 0 {
- return nil, ErrEmptyLogLine
- }
- if len(line) > p.config.MaxLineLength {
- return nil, ErrLineTooLong
- }
- buf := p.pool.Get().(*parseBuffer)
- defer p.pool.Put(buf)
- // Reset buffer state
- buf.fields = buf.fields[:0]
- *buf.entry = AccessLogEntry{}
- // Zero-copy conversion to bytes
- buf.lineBytes = stringToBytes(line)
- if err := p.parseLineOptimized(buf.lineBytes, buf); err != nil {
- if p.config.StrictMode {
- return nil, err
- }
- // In non-strict mode, create a minimal entry with raw line
- buf.entry.Raw = line
- buf.entry.Timestamp = time.Now().Unix()
- buf.entry.ID = p.generateEntryID(line)
- // Create a copy to avoid sharing the pooled object
- entryCopy := *buf.entry
- return &entryCopy, nil
- }
- // Generate unique ID for the entry
- buf.entry.ID = p.generateEntryID(line)
- buf.entry.Raw = line
- // Create a copy of the entry to avoid sharing the pooled object
- entryCopy := *buf.entry
- return &entryCopy, nil
- }
- // ParseLines parses multiple log lines with parallel processing
- func (p *Parser) ParseLines(lines []string) *ParseResult {
- return p.ParseLinesWithContext(context.Background(), lines)
- }
- // ParseLinesWithContext parses lines with context support for cancellation
- func (p *Parser) ParseLinesWithContext(ctx context.Context, lines []string) *ParseResult {
- startTime := time.Now()
- result := &ParseResult{
- Entries: make([]*AccessLogEntry, 0, len(lines)),
- Processed: len(lines),
- }
- if len(lines) == 0 {
- result.Duration = time.Since(startTime)
- return result
- }
- // For small datasets, use single-threaded parsing
- if len(lines) < p.config.BatchSize {
- return p.parseLinesSingleThreaded(ctx, lines, startTime)
- }
- // Use parallel processing for larger datasets
- return p.parseLinesParallel(ctx, lines, startTime)
- }
- // ParseStream parses log entries from an io.Reader with streaming support
- func (p *Parser) ParseStream(ctx context.Context, reader io.Reader) (*ParseResult, error) {
- startTime := time.Now()
- result := &ParseResult{
- Entries: make([]*AccessLogEntry, 0),
- }
- scanner := bufio.NewScanner(reader)
- scanner.Buffer(make([]byte, 0, p.config.BufferSize), p.config.MaxLineLength)
- batch := make([]string, 0, p.config.BatchSize)
- for scanner.Scan() {
- select {
- case <-ctx.Done():
- return result, ctx.Err()
- default:
- }
- line := scanner.Text()
- if len(line) == 0 {
- continue
- }
- batch = append(batch, line)
- result.Processed++
- if len(batch) >= p.config.BatchSize {
- batchResult := p.ParseLinesWithContext(ctx, batch)
- result.Entries = append(result.Entries, batchResult.Entries...)
- result.Succeeded += batchResult.Succeeded
- result.Failed += batchResult.Failed
- batch = batch[:0]
- }
- }
- // Process remaining lines in batch
- if len(batch) > 0 {
- batchResult := p.ParseLinesWithContext(ctx, batch)
- result.Entries = append(result.Entries, batchResult.Entries...)
- result.Succeeded += batchResult.Succeeded
- result.Failed += batchResult.Failed
- }
- if err := scanner.Err(); err != nil {
- return result, err
- }
- result.Duration = time.Since(startTime)
- if result.Processed > 0 {
- result.ErrorRate = float64(result.Failed) / float64(result.Processed)
- }
- p.updateStats(result)
- return result, nil
- }
- // parseLinesSingleThreaded handles small datasets with single-threaded parsing
- func (p *Parser) parseLinesSingleThreaded(ctx context.Context, lines []string, startTime time.Time) *ParseResult {
- result := &ParseResult{
- Entries: make([]*AccessLogEntry, 0, len(lines)),
- Processed: len(lines),
- }
- for _, line := range lines {
- select {
- case <-ctx.Done():
- result.Duration = time.Since(startTime)
- return result
- default:
- }
- if entry, err := p.ParseLine(line); err == nil {
- result.Entries = append(result.Entries, entry)
- result.Succeeded++
- } else {
- result.Failed++
- }
- }
- result.Duration = time.Since(startTime)
- if result.Processed > 0 {
- result.ErrorRate = float64(result.Failed) / float64(result.Processed)
- }
- return result
- }
- // parseLinesParallel handles large datasets with parallel processing
- func (p *Parser) parseLinesParallel(ctx context.Context, lines []string, startTime time.Time) *ParseResult {
- numWorkers := p.config.WorkerCount
- if numWorkers <= 0 {
- numWorkers = runtime.NumCPU()
- }
- if numWorkers > len(lines)/10 {
- numWorkers = len(lines)/10 + 1
- }
- result := &ParseResult{
- Processed: len(lines),
- }
- // Create channels for work distribution
- lineChan := make(chan string, numWorkers*2)
- resultChan := make(chan *AccessLogEntry, len(lines))
- errorChan := make(chan error, len(lines))
- var wg sync.WaitGroup
- // Start worker goroutines
- for i := 0; i < numWorkers; i++ {
- wg.Add(1)
- go func() {
- defer wg.Done()
- for line := range lineChan {
- if entry, err := p.ParseLine(line); err == nil {
- resultChan <- entry
- } else {
- errorChan <- err
- }
- }
- }()
- }
- // Send lines to workers
- go func() {
- defer close(lineChan)
- for _, line := range lines {
- select {
- case <-ctx.Done():
- return
- case lineChan <- line:
- }
- }
- }()
- // Wait for workers and close result channels
- go func() {
- wg.Wait()
- close(resultChan)
- close(errorChan)
- }()
- // Collect results
- entries := make([]*AccessLogEntry, 0, len(lines))
- var errorCount int
- for {
- select {
- case <-ctx.Done():
- result.Duration = time.Since(startTime)
- return result
- case entry, ok := <-resultChan:
- if !ok {
- resultChan = nil
- } else {
- entries = append(entries, entry)
- }
- case _, ok := <-errorChan:
- if !ok {
- errorChan = nil
- } else {
- errorCount++
- }
- }
- if resultChan == nil && errorChan == nil {
- break
- }
- }
- result.Entries = entries
- result.Succeeded = len(entries)
- result.Failed = errorCount
- result.Duration = time.Since(startTime)
- if result.Processed > 0 {
- result.ErrorRate = float64(result.Failed) / float64(result.Processed)
- }
- return result
- }
- // parseLineOptimized performs optimized parsing of a single line
- func (p *Parser) parseLineOptimized(line []byte, buf *parseBuffer) error {
- pos := 0
- length := len(line)
- if length < 20 {
- return ErrUnsupportedLogFormat
- }
- // Parse IP address
- pos = p.parseIP(line, pos, buf.entry)
- if pos >= length {
- return ErrUnsupportedLogFormat
- }
- // Skip remote user fields (- -)
- pos = p.skipSpaces(line, pos)
- pos = p.skipField(line, pos) // remote user
- pos = p.skipSpaces(line, pos)
- pos = p.skipField(line, pos) // remote logname
- // Parse timestamp
- pos = p.skipSpaces(line, pos)
- pos = p.parseTimestamp(line, pos, buf.entry)
- if pos >= length {
- return ErrUnsupportedLogFormat
- }
- // Parse request
- pos = p.skipSpaces(line, pos)
- pos = p.parseRequest(line, pos, buf.entry)
- if pos >= length {
- return ErrUnsupportedLogFormat
- }
- // Parse status code
- pos = p.skipSpaces(line, pos)
- pos = p.parseStatus(line, pos, buf.entry)
- if pos >= length {
- return ErrUnsupportedLogFormat
- }
- // Parse response size
- pos = p.skipSpaces(line, pos)
- pos = p.parseSize(line, pos, buf.entry)
- // Parse optional fields if they exist
- if pos < length {
- pos = p.skipSpaces(line, pos)
- if pos < length && line[pos] == '"' {
- pos = p.parseReferer(line, pos, buf.entry)
- }
- }
- if pos < length {
- pos = p.skipSpaces(line, pos)
- if pos < length && line[pos] == '"' {
- pos = p.parseUserAgent(line, pos, buf.entry)
- }
- }
- if pos < length {
- pos = p.skipSpaces(line, pos)
- if pos < length {
- pos = p.parseRequestTime(line, pos, buf.entry)
- }
- }
- if pos < length {
- pos = p.skipSpaces(line, pos)
- if pos < length {
- _ = p.parseUpstreamTime(line, pos, buf.entry)
- }
- }
- return nil
- }
- // Fast field parsing methods with zero-copy optimizations
- func (p *Parser) parseIP(line []byte, pos int, entry *AccessLogEntry) int {
- start := pos
- for pos < len(line) && line[pos] != ' ' {
- pos++
- }
- if pos > start {
- entry.IP = bytesToString(line[start:pos])
- // Populate geographic fields if enabled
- if p.config.EnableGeoIP && p.geoService != nil && entry.IP != "-" {
- if location, err := p.geoService.Search(entry.IP); err == nil && location != nil {
- entry.Province = location.Province
- entry.City = location.City
- // Use the specific RegionCode (e.g., province code 'CA') if available,
- // otherwise, fall back to the CountryCode (e.g., 'US').
- if location.RegionCode != "" {
- entry.RegionCode = location.RegionCode
- } else {
- entry.RegionCode = location.CountryCode
- }
- }
- }
- }
- return pos
- }
- func (p *Parser) parseTimestamp(line []byte, pos int, entry *AccessLogEntry) int {
- if pos >= len(line) || line[pos] != '[' {
- return pos
- }
- pos++
- start := pos
- for pos < len(line) && line[pos] != ']' {
- pos++
- }
- if pos > start {
- timeStr := bytesToString(line[start:pos])
- // Debug: log the timestamp string we're trying to parse
- // fmt.Printf("DEBUG: Parsing timestamp string: '%s'\n", timeStr)
- if t, err := time.Parse(p.config.TimeLayout, timeStr); err == nil {
- entry.Timestamp = t.Unix()
- } else {
- // Try alternative common nginx timestamp formats if the default fails
- formats := []string{
- "02/Jan/2006:15:04:05 -0700", // Standard nginx format
- "2006-01-02T15:04:05-07:00", // ISO 8601 format
- "2006-01-02 15:04:05", // Simple datetime format
- "02/Jan/2006:15:04:05", // Without timezone
- }
- parsed := false
- for _, format := range formats {
- if t, err := time.Parse(format, timeStr); err == nil {
- entry.Timestamp = t.Unix()
- parsed = true
- break
- }
- }
- // If all parsing attempts fail, keep timestamp as 0
- if !parsed {
- // Debug: log parsing failure
- // fmt.Printf("DEBUG: Failed to parse timestamp: '%s'\n", timeStr)
- }
- }
- }
- if pos < len(line) && line[pos] == ']' {
- pos++
- }
- return pos
- }
- func (p *Parser) parseRequest(line []byte, pos int, entry *AccessLogEntry) int {
- if pos >= len(line) || line[pos] != '"' {
- return pos
- }
- pos++
- start := pos
- for pos < len(line) && line[pos] != '"' {
- pos++
- }
- if pos > start {
- requestLine := line[start:pos]
- parts := bytes.Fields(requestLine)
- if len(parts) >= 1 {
- method := bytesToString(parts[0])
- if ValidHTTPMethods[method] {
- entry.Method = method
- }
- }
- if len(parts) >= 2 {
- entry.Path = bytesToString(parts[1])
- }
- if len(parts) >= 3 {
- entry.Protocol = bytesToString(parts[2])
- }
- }
- if pos < len(line) && line[pos] == '"' {
- pos++
- }
- return pos
- }
- func (p *Parser) parseStatus(line []byte, pos int, entry *AccessLogEntry) int {
- start := pos
- for pos < len(line) && line[pos] >= '0' && line[pos] <= '9' {
- pos++
- }
- if pos > start {
- if status, err := fastParseInt(line[start:pos]); err == nil && status >= 100 && status < 600 {
- entry.Status = status
- }
- }
- return pos
- }
- func (p *Parser) parseSize(line []byte, pos int, entry *AccessLogEntry) int {
- start := pos
- for pos < len(line) && ((line[pos] >= '0' && line[pos] <= '9') || line[pos] == '-') {
- pos++
- }
- if pos > start {
- sizeBytes := line[start:pos]
- if len(sizeBytes) == 1 && sizeBytes[0] == '-' {
- entry.BytesSent = 0
- } else {
- if size, err := fastParseInt(sizeBytes); err == nil && size >= 0 {
- entry.BytesSent = int64(size)
- }
- }
- }
- return pos
- }
- func (p *Parser) parseReferer(line []byte, pos int, entry *AccessLogEntry) int {
- if pos >= len(line) || line[pos] != '"' {
- return pos
- }
- pos++
- start := pos
- for pos < len(line) && line[pos] != '"' {
- pos++
- }
- if pos > start {
- entry.Referer = bytesToString(line[start:pos])
- }
- if pos < len(line) && line[pos] == '"' {
- pos++
- }
- return pos
- }
- func (p *Parser) parseUserAgent(line []byte, pos int, entry *AccessLogEntry) int {
- if pos >= len(line) || line[pos] != '"' {
- return pos
- }
- pos++
- start := pos
- for pos < len(line) && line[pos] != '"' {
- pos++
- }
- if pos > start {
- userAgent := bytesToString(line[start:pos])
- entry.UserAgent = userAgent
- if p.config.EnableUA && p.uaParser != nil && userAgent != "-" {
- parsed := p.uaParser.Parse(userAgent)
- if parsed.Browser != "Unknown" && parsed.Browser != "" {
- entry.Browser = parsed.Browser
- entry.BrowserVer = parsed.BrowserVer
- }
- if parsed.OS != "Unknown" && parsed.OS != "" {
- entry.OS = parsed.OS
- entry.OSVersion = parsed.OSVersion
- }
- if parsed.DeviceType != "" {
- entry.DeviceType = parsed.DeviceType
- }
- }
- }
- if pos < len(line) && line[pos] == '"' {
- pos++
- }
- return pos
- }
- func (p *Parser) parseRequestTime(line []byte, pos int, entry *AccessLogEntry) int {
- start := pos
- for pos < len(line) && ((line[pos] >= '0' && line[pos] <= '9') || line[pos] == '.' || line[pos] == '-') {
- pos++
- }
- if pos > start {
- timeStr := bytesToString(line[start:pos])
- if timeStr != "-" {
- if val, err := strconv.ParseFloat(timeStr, 64); err == nil && val >= 0 {
- entry.RequestTime = val
- }
- }
- }
- return pos
- }
- func (p *Parser) parseUpstreamTime(line []byte, pos int, entry *AccessLogEntry) int {
- start := pos
- for pos < len(line) && ((line[pos] >= '0' && line[pos] <= '9') || line[pos] == '.' || line[pos] == '-') {
- pos++
- }
- if pos > start {
- timeStr := bytesToString(line[start:pos])
- if timeStr != "-" {
- if val, err := strconv.ParseFloat(timeStr, 64); err == nil && val >= 0 {
- entry.UpstreamTime = &val
- }
- }
- }
- return pos
- }
- // Utility methods
- func (p *Parser) skipSpaces(line []byte, pos int) int {
- for pos < len(line) && line[pos] == ' ' {
- pos++
- }
- return pos
- }
- func (p *Parser) skipField(line []byte, pos int) int {
- for pos < len(line) && line[pos] != ' ' {
- pos++
- }
- return pos
- }
- func (p *Parser) generateEntryID(line string) string {
- hash := md5.Sum([]byte(line))
- return fmt.Sprintf("%x", hash)[:16]
- }
- func (p *Parser) updateStats(result *ParseResult) {
- p.mu.Lock()
- defer p.mu.Unlock()
- p.stats.TotalLines += int64(result.Processed)
- p.stats.SuccessLines += int64(result.Succeeded)
- p.stats.ErrorLines += int64(result.Failed)
- p.stats.ParseDuration += result.Duration
- p.stats.LastUpdated = time.Now()
- if result.Duration > 0 {
- p.stats.LinesPerSecond = float64(result.Processed) / result.Duration.Seconds()
- }
- }
- // GetStats returns current parsing statistics
- func (p *Parser) GetStats() *ParseStats {
- p.mu.RLock()
- defer p.mu.RUnlock()
- statsCopy := *p.stats
- return &statsCopy
- }
- // ResetStats resets parsing statistics
- func (p *Parser) ResetStats() {
- p.mu.Lock()
- defer p.mu.Unlock()
- p.stats = &ParseStats{}
- }
// Zero-copy string/byte conversion utilities.

// stringToBytes returns a []byte that shares s's memory without copying. The
// result is read-only: writing to it would mutate an immutable string.
//
// It works by placing the string header (data pointer, length) in front of a
// capacity word, mirroring the layout of a slice header.
func stringToBytes(s string) []byte {
	type sliceHeaderView struct {
		string
		Cap int
	}
	view := sliceHeaderView{s, len(s)}
	return *(*[]byte)(unsafe.Pointer(&view))
}
// bytesToString returns a string that shares b's memory without copying. The
// caller must not modify b afterwards, because the string assumes its
// contents never change.
func bytesToString(b []byte) string {
	header := unsafe.Pointer(&b)
	return *(*string)(header)
}
// Fast integer parsing without allocations.

// fastParseInt parses an optionally '-'-prefixed run of ASCII decimal digits
// into an int without allocating.
//
// It returns strconv.ErrSyntax for empty input, a lone "-", or any non-digit
// byte (including a leading '+' or space). It returns strconv.ErrRange when
// the value does not fit in an int, instead of silently wrapping around.
func fastParseInt(b []byte) (int, error) {
	if len(b) == 0 {
		return 0, strconv.ErrSyntax
	}
	neg := b[0] == '-'
	if neg {
		b = b[1:]
		if len(b) == 0 {
			return 0, strconv.ErrSyntax
		}
	}

	// limit is the largest magnitude allowed: MaxInt, or MaxInt+1 for
	// negative numbers so that MinInt itself is accepted.
	limit := uint64(^uint(0) >> 1)
	if neg {
		limit++
	}

	var n uint64
	for _, c := range b {
		if c < '0' || c > '9' {
			return 0, strconv.ErrSyntax
		}
		d := uint64(c - '0')
		// n*10 + d <= limit  <=>  n <= (limit-d)/10, evaluated without overflow.
		if n > (limit-d)/10 {
			return 0, strconv.ErrRange
		}
		n = n*10 + d
	}

	if neg {
		// For n == MaxInt+1 the conversion wraps to MinInt and negating MinInt
		// yields MinInt again, which is exactly the intended value.
		return -int(n), nil
	}
	return int(n), nil
}
|