| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740 | package parserimport (	"bufio"	"bytes"	"context"	"crypto/md5"	"fmt"	"io"	"runtime"	"strconv"	"sync"	"time"	"unsafe")// Parser provides high-performance log parsing with zero-copy optimizationstype Parser struct {	config     *Config	uaParser   UserAgentParser	geoService GeoIPService	pool       *sync.Pool	detector   *FormatDetector	stats      *ParseStats	mu         sync.RWMutex}// ParseStats tracks parsing performance metricstype ParseStats struct {	TotalLines     int64	SuccessLines   int64	ErrorLines     int64	TotalBytes     int64	ParseDuration  time.Duration	LinesPerSecond float64	BytesPerSecond float64	LastUpdated    time.Time}// parseBuffer holds reusable parsing bufferstype parseBuffer struct {	fields    [][]byte	entry     *AccessLogEntry	lineBytes []byte}// NewParser creates a new high-performance parserfunc NewParser(config *Config, uaParser UserAgentParser, geoService GeoIPService) *Parser {	if config == nil {		config = DefaultParserConfig()	}	return &Parser{		config:     config,		uaParser:   uaParser,		geoService: geoService,		detector:   NewFormatDetector(),		stats:      &ParseStats{},		pool: &sync.Pool{			New: func() interface{} {				return &parseBuffer{					fields:    make([][]byte, 0, 16),					entry:     &AccessLogEntry{},					lineBytes: make([]byte, 0, config.MaxLineLength),				}			},		},	}}// ParseLine parses a single log line with zero-copy optimizationsfunc (p *Parser) ParseLine(line string) (*AccessLogEntry, error) {	if len(line) == 0 {		return nil, ErrEmptyLogLine	}	if len(line) > p.config.MaxLineLength {		return nil, ErrLineTooLong	}	buf := p.pool.Get().(*parseBuffer)	defer p.pool.Put(buf)	// Reset buffer state	buf.fields = buf.fields[:0]	*buf.entry = AccessLogEntry{}	// Zero-copy conversion to bytes	buf.lineBytes = stringToBytes(line)	if err := p.parseLineOptimized(buf.lineBytes, buf); err != nil {		if p.config.StrictMode {			return nil, err		}		// In non-strict mode, create a minimal entry with raw line		buf.entry.Raw = line		buf.entry.Timestamp = time.Now().Unix()		buf.entry.ID = p.generateEntryID(line)		// Create a copy to avoid sharing the pooled object		entryCopy := *buf.entry		return &entryCopy, nil	}	// Generate unique ID for the entry	buf.entry.ID = p.generateEntryID(line)	buf.entry.Raw = line	// Create a copy of the entry to avoid sharing the pooled object	entryCopy := *buf.entry	return &entryCopy, nil}// ParseLines parses multiple log lines with parallel processingfunc (p *Parser) ParseLines(lines []string) *ParseResult {	return p.ParseLinesWithContext(context.Background(), lines)}// ParseLinesWithContext parses lines with context support for cancellationfunc (p *Parser) ParseLinesWithContext(ctx context.Context, lines []string) *ParseResult {	startTime := time.Now()	result := &ParseResult{		Entries:   make([]*AccessLogEntry, 0, len(lines)),		Processed: len(lines),	}	if len(lines) == 0 {		result.Duration = time.Since(startTime)		return result	}	// For small datasets, use single-threaded parsing	if len(lines) < p.config.BatchSize {		return p.parseLinesSingleThreaded(ctx, lines, startTime)	}	// Use parallel processing for larger datasets	return p.parseLinesParallel(ctx, lines, startTime)}// ParseStream parses log entries from an io.Reader with streaming supportfunc (p *Parser) ParseStream(ctx context.Context, reader io.Reader) (*ParseResult, error) {	startTime := time.Now()	result := &ParseResult{		Entries: make([]*AccessLogEntry, 0),	}	scanner := bufio.NewScanner(reader)	scanner.Buffer(make([]byte, 0, p.config.BufferSize), p.config.MaxLineLength)	batch := make([]string, 0, p.config.BatchSize)	for scanner.Scan() {		select {		case <-ctx.Done():			return result, ctx.Err()		default:		}		line := scanner.Text()		if len(line) == 0 {			continue		}		batch = append(batch, line)		result.Processed++		if len(batch) >= p.config.BatchSize {			batchResult := p.ParseLinesWithContext(ctx, batch)			result.Entries = append(result.Entries, batchResult.Entries...)			result.Succeeded += batchResult.Succeeded			result.Failed += batchResult.Failed			batch = batch[:0]		}	}	// Process remaining lines in batch	if len(batch) > 0 {		batchResult := p.ParseLinesWithContext(ctx, batch)		result.Entries = append(result.Entries, batchResult.Entries...)		result.Succeeded += batchResult.Succeeded		result.Failed += batchResult.Failed	}	if err := scanner.Err(); err != nil {		return result, err	}	result.Duration = time.Since(startTime)	if result.Processed > 0 {		result.ErrorRate = float64(result.Failed) / float64(result.Processed)	}	p.updateStats(result)	return result, nil}// parseLinesSingleThreaded handles small datasets with single-threaded parsingfunc (p *Parser) parseLinesSingleThreaded(ctx context.Context, lines []string, startTime time.Time) *ParseResult {	result := &ParseResult{		Entries:   make([]*AccessLogEntry, 0, len(lines)),		Processed: len(lines),	}	for _, line := range lines {		select {		case <-ctx.Done():			result.Duration = time.Since(startTime)			return result		default:		}		if entry, err := p.ParseLine(line); err == nil {			result.Entries = append(result.Entries, entry)			result.Succeeded++		} else {			result.Failed++		}	}	result.Duration = time.Since(startTime)	if result.Processed > 0 {		result.ErrorRate = float64(result.Failed) / float64(result.Processed)	}	return result}// parseLinesParallel handles large datasets with parallel processingfunc (p *Parser) parseLinesParallel(ctx context.Context, lines []string, startTime time.Time) *ParseResult {	numWorkers := p.config.WorkerCount	if numWorkers <= 0 {		numWorkers = runtime.NumCPU()	}	if numWorkers > len(lines)/10 {		numWorkers = len(lines)/10 + 1	}	result := &ParseResult{		Processed: len(lines),	}	// Create channels for work distribution	lineChan := make(chan string, numWorkers*2)	resultChan := make(chan *AccessLogEntry, len(lines))	errorChan := make(chan error, len(lines))	var wg sync.WaitGroup	// Start worker goroutines	for i := 0; i < numWorkers; i++ {		wg.Add(1)		go func() {			defer wg.Done()			for line := range lineChan {				if entry, err := p.ParseLine(line); err == nil {					resultChan <- entry				} else {					errorChan <- err				}			}		}()	}	// Send lines to workers	go func() {		defer close(lineChan)		for _, line := range lines {			select {			case <-ctx.Done():				return			case lineChan <- line:			}		}	}()	// Wait for workers and close result channels	go func() {		wg.Wait()		close(resultChan)		close(errorChan)	}()	// Collect results	entries := make([]*AccessLogEntry, 0, len(lines))	var errorCount int	for {		select {		case <-ctx.Done():			result.Duration = time.Since(startTime)			return result		case entry, ok := <-resultChan:			if !ok {				resultChan = nil			} else {				entries = append(entries, entry)			}		case _, ok := <-errorChan:			if !ok {				errorChan = nil			} else {				errorCount++			}		}		if resultChan == nil && errorChan == nil {			break		}	}	result.Entries = entries	result.Succeeded = len(entries)	result.Failed = errorCount	result.Duration = time.Since(startTime)	if result.Processed > 0 {		result.ErrorRate = float64(result.Failed) / float64(result.Processed)	}	return result}// parseLineOptimized performs optimized parsing of a single linefunc (p *Parser) parseLineOptimized(line []byte, buf *parseBuffer) error {	pos := 0	length := len(line)	if length < 20 {		return ErrUnsupportedLogFormat	}	// Parse IP address	pos = p.parseIP(line, pos, buf.entry)	if pos >= length {		return ErrUnsupportedLogFormat	}	// Skip remote user fields (- -)	pos = p.skipSpaces(line, pos)	pos = p.skipField(line, pos) // remote user	pos = p.skipSpaces(line, pos)	pos = p.skipField(line, pos) // remote logname	// Parse timestamp	pos = p.skipSpaces(line, pos)	pos = p.parseTimestamp(line, pos, buf.entry)	if pos >= length {		return ErrUnsupportedLogFormat	}	// Parse request	pos = p.skipSpaces(line, pos)	pos = p.parseRequest(line, pos, buf.entry)	if pos >= length {		return ErrUnsupportedLogFormat	}	// Parse status code	pos = p.skipSpaces(line, pos)	pos = p.parseStatus(line, pos, buf.entry)	if pos >= length {		return ErrUnsupportedLogFormat	}	// Parse response size	pos = p.skipSpaces(line, pos)	pos = p.parseSize(line, pos, buf.entry)	// Parse optional fields if they exist	if pos < length {		pos = p.skipSpaces(line, pos)		if pos < length && line[pos] == '"' {			pos = p.parseReferer(line, pos, buf.entry)		}	}	if pos < length {		pos = p.skipSpaces(line, pos)		if pos < length && line[pos] == '"' {			pos = p.parseUserAgent(line, pos, buf.entry)		}	}	if pos < length {		pos = p.skipSpaces(line, pos)		if pos < length {			pos = p.parseRequestTime(line, pos, buf.entry)		}	}	if pos < length {		pos = p.skipSpaces(line, pos)		if pos < length {			_ = p.parseUpstreamTime(line, pos, buf.entry)		}	}	return nil}// Fast field parsing methods with zero-copy optimizationsfunc (p *Parser) parseIP(line []byte, pos int, entry *AccessLogEntry) int {	start := pos	for pos < len(line) && line[pos] != ' ' {		pos++	}	if pos > start {		entry.IP = bytesToString(line[start:pos])		// Populate geographic fields if enabled		if p.config.EnableGeoIP && p.geoService != nil && entry.IP != "-" {			if location, err := p.geoService.Search(entry.IP); err == nil && location != nil {				entry.Province = location.Province				entry.City = location.City				// Use the specific RegionCode (e.g., province code 'CA') if available,				// otherwise, fall back to the CountryCode (e.g., 'US').				if location.RegionCode != "" {					entry.RegionCode = location.RegionCode				} else {					entry.RegionCode = location.CountryCode				}			}		}	}	return pos}func (p *Parser) parseTimestamp(line []byte, pos int, entry *AccessLogEntry) int {	if pos >= len(line) || line[pos] != '[' {		return pos	}	pos++	start := pos	for pos < len(line) && line[pos] != ']' {		pos++	}	if pos > start {		timeStr := bytesToString(line[start:pos])		// Debug: log the timestamp string we're trying to parse		// fmt.Printf("DEBUG: Parsing timestamp string: '%s'\n", timeStr)		if t, err := time.Parse(p.config.TimeLayout, timeStr); err == nil {			entry.Timestamp = t.Unix()		} else {			// Try alternative common nginx timestamp formats if the default fails			formats := []string{				"02/Jan/2006:15:04:05 -0700", // Standard nginx format				"2006-01-02T15:04:05-07:00",  // ISO 8601 format				"2006-01-02 15:04:05",        // Simple datetime format				"02/Jan/2006:15:04:05",       // Without timezone			}			parsed := false			for _, format := range formats {				if t, err := time.Parse(format, timeStr); err == nil {					entry.Timestamp = t.Unix()					parsed = true					break				}			}			// If all parsing attempts fail, keep timestamp as 0			if !parsed {				// Debug: log parsing failure				// fmt.Printf("DEBUG: Failed to parse timestamp: '%s'\n", timeStr)			}		}	}	if pos < len(line) && line[pos] == ']' {		pos++	}	return pos}func (p *Parser) parseRequest(line []byte, pos int, entry *AccessLogEntry) int {	if pos >= len(line) || line[pos] != '"' {		return pos	}	pos++	start := pos	for pos < len(line) && line[pos] != '"' {		pos++	}	if pos > start {		requestLine := line[start:pos]		parts := bytes.Fields(requestLine)		if len(parts) >= 1 {			method := bytesToString(parts[0])			if ValidHTTPMethods[method] {				entry.Method = method			}		}		if len(parts) >= 2 {			entry.Path = bytesToString(parts[1])		}		if len(parts) >= 3 {			entry.Protocol = bytesToString(parts[2])		}	}	if pos < len(line) && line[pos] == '"' {		pos++	}	return pos}func (p *Parser) parseStatus(line []byte, pos int, entry *AccessLogEntry) int {	start := pos	for pos < len(line) && line[pos] >= '0' && line[pos] <= '9' {		pos++	}	if pos > start {		if status, err := fastParseInt(line[start:pos]); err == nil && status >= 100 && status < 600 {			entry.Status = status		}	}	return pos}func (p *Parser) parseSize(line []byte, pos int, entry *AccessLogEntry) int {	start := pos	for pos < len(line) && ((line[pos] >= '0' && line[pos] <= '9') || line[pos] == '-') {		pos++	}	if pos > start {		sizeBytes := line[start:pos]		if len(sizeBytes) == 1 && sizeBytes[0] == '-' {			entry.BytesSent = 0		} else {			if size, err := fastParseInt(sizeBytes); err == nil && size >= 0 {				entry.BytesSent = int64(size)			}		}	}	return pos}func (p *Parser) parseReferer(line []byte, pos int, entry *AccessLogEntry) int {	if pos >= len(line) || line[pos] != '"' {		return pos	}	pos++	start := pos	for pos < len(line) && line[pos] != '"' {		pos++	}	if pos > start {		entry.Referer = bytesToString(line[start:pos])	}	if pos < len(line) && line[pos] == '"' {		pos++	}	return pos}func (p *Parser) parseUserAgent(line []byte, pos int, entry *AccessLogEntry) int {	if pos >= len(line) || line[pos] != '"' {		return pos	}	pos++	start := pos	for pos < len(line) && line[pos] != '"' {		pos++	}	if pos > start {		userAgent := bytesToString(line[start:pos])		entry.UserAgent = userAgent		if p.config.EnableUA && p.uaParser != nil && userAgent != "-" {			parsed := p.uaParser.Parse(userAgent)			if parsed.Browser != "Unknown" && parsed.Browser != "" {				entry.Browser = parsed.Browser				entry.BrowserVer = parsed.BrowserVer			}			if parsed.OS != "Unknown" && parsed.OS != "" {				entry.OS = parsed.OS				entry.OSVersion = parsed.OSVersion			}			if parsed.DeviceType != "" {				entry.DeviceType = parsed.DeviceType			}		}	}	if pos < len(line) && line[pos] == '"' {		pos++	}	return pos}func (p *Parser) parseRequestTime(line []byte, pos int, entry *AccessLogEntry) int {	start := pos	for pos < len(line) && ((line[pos] >= '0' && line[pos] <= '9') || line[pos] == '.' || line[pos] == '-') {		pos++	}	if pos > start {		timeStr := bytesToString(line[start:pos])		if timeStr != "-" {			if val, err := strconv.ParseFloat(timeStr, 64); err == nil && val >= 0 {				entry.RequestTime = val			}		}	}	return pos}func (p *Parser) parseUpstreamTime(line []byte, pos int, entry *AccessLogEntry) int {	start := pos	for pos < len(line) && ((line[pos] >= '0' && line[pos] <= '9') || line[pos] == '.' || line[pos] == '-') {		pos++	}	if pos > start {		timeStr := bytesToString(line[start:pos])		if timeStr != "-" {			if val, err := strconv.ParseFloat(timeStr, 64); err == nil && val >= 0 {				entry.UpstreamTime = &val			}		}	}	return pos}// Utility methodsfunc (p *Parser) skipSpaces(line []byte, pos int) int {	for pos < len(line) && line[pos] == ' ' {		pos++	}	return pos}func (p *Parser) skipField(line []byte, pos int) int {	for pos < len(line) && line[pos] != ' ' {		pos++	}	return pos}func (p *Parser) generateEntryID(line string) string {	hash := md5.Sum([]byte(line))	return fmt.Sprintf("%x", hash)[:16]}func (p *Parser) updateStats(result *ParseResult) {	p.mu.Lock()	defer p.mu.Unlock()	p.stats.TotalLines += int64(result.Processed)	p.stats.SuccessLines += int64(result.Succeeded)	p.stats.ErrorLines += int64(result.Failed)	p.stats.ParseDuration += result.Duration	p.stats.LastUpdated = time.Now()	if result.Duration > 0 {		p.stats.LinesPerSecond = float64(result.Processed) / result.Duration.Seconds()	}}// GetStats returns current parsing statisticsfunc (p *Parser) GetStats() *ParseStats {	p.mu.RLock()	defer p.mu.RUnlock()	statsCopy := *p.stats	return &statsCopy}// ResetStats resets parsing statisticsfunc (p *Parser) ResetStats() {	p.mu.Lock()	defer p.mu.Unlock()	p.stats = &ParseStats{}}// Zero-copy string/byte conversion utilitiesfunc stringToBytes(s string) []byte {	return *(*[]byte)(unsafe.Pointer(&struct {		string		Cap int	}{s, len(s)}))}func bytesToString(b []byte) string {	return *(*string)(unsafe.Pointer(&b))}// Fast integer parsing without allocationsfunc fastParseInt(b []byte) (int, error) {	if len(b) == 0 {		return 0, strconv.ErrSyntax	}	neg := false	if b[0] == '-' {		neg = true		b = b[1:]		if len(b) == 0 {			return 0, strconv.ErrSyntax		}	}	n := 0	for _, c := range b {		if c < '0' || c > '9' {			return 0, strconv.ErrSyntax		}		n = n*10 + int(c-'0')	}	if neg {		n = -n	}	return n, nil}
 |