|
@@ -4,8 +4,10 @@ import (
|
|
|
"bufio"
|
|
|
"compress/gzip"
|
|
|
"context"
|
|
|
+ "errors"
|
|
|
"io"
|
|
|
"strings"
|
|
|
+ "sync"
|
|
|
|
|
|
"github.com/0xJacky/Nginx-UI/internal/geolite"
|
|
|
"github.com/0xJacky/Nginx-UI/internal/nginx_log/parser"
|
|
@@ -15,35 +17,44 @@ import (
|
|
|
|
|
|
// Global parser instances
|
|
|
var (
|
|
|
- logParser *parser.OptimizedParser // Use the concrete type for both regular and single-line parsing
|
|
|
+ logParser *parser.OptimizedParser // Use the concrete type for both regular and single-line parsing
|
|
|
+ parserInitOnce sync.Once
|
|
|
)
|
|
|
|
|
|
-func init() {
|
|
|
- // Initialize the parser with production-ready configuration
|
|
|
- config := parser.DefaultParserConfig()
|
|
|
- config.MaxLineLength = 16 * 1024 // 16KB for large log lines
|
|
|
- config.BatchSize = 15000 // Maximum batch size for highest frontend throughput
|
|
|
- config.WorkerCount = 24 // Match CPU core count for high-throughput
|
|
|
- // Note: Caching is handled by the CachedUserAgentParser
|
|
|
-
|
|
|
- // Initialize user agent parser with caching (10,000 cache size for production)
|
|
|
- uaParser := parser.NewCachedUserAgentParser(
|
|
|
- parser.NewSimpleUserAgentParser(),
|
|
|
- 10000, // Large cache for production workloads
|
|
|
- )
|
|
|
-
|
|
|
- var geoIPService parser.GeoIPService
|
|
|
- geoService, err := geolite.GetService()
|
|
|
- if err != nil {
|
|
|
- logger.Warnf("Failed to initialize GeoIP service, geo-enrichment will be disabled: %v", err)
|
|
|
- } else {
|
|
|
- geoIPService = parser.NewGeoLiteAdapter(geoService)
|
|
|
- }
|
|
|
+// InitLogParser initializes the global optimized parser once (singleton).
|
|
|
+func InitLogParser() {
|
|
|
+ parserInitOnce.Do(func() {
|
|
|
+ // Initialize the parser with production-ready configuration
|
|
|
+ config := parser.DefaultParserConfig()
|
|
|
+ config.MaxLineLength = 16 * 1024 // 16KB for large log lines
|
|
|
+ config.BatchSize = 15000 // Maximum batch size for highest frontend throughput
|
|
|
+ config.WorkerCount = 24 // Match CPU core count for high-throughput
|
|
|
+ // Note: Caching is handled by the CachedUserAgentParser
|
|
|
+
|
|
|
+ // Initialize user agent parser with caching (10,000 cache size for production)
|
|
|
+ uaParser := parser.NewCachedUserAgentParser(
|
|
|
+ parser.NewSimpleUserAgentParser(),
|
|
|
+ 10000, // Large cache for production workloads
|
|
|
+ )
|
|
|
+
|
|
|
+ var geoIPService parser.GeoIPService
|
|
|
+ geoService, err := geolite.GetService()
|
|
|
+ if err != nil {
|
|
|
+ logger.Warnf("Failed to initialize GeoIP service, geo-enrichment will be disabled: %v", err)
|
|
|
+ } else {
|
|
|
+ geoIPService = parser.NewGeoLiteAdapter(geoService)
|
|
|
+ }
|
|
|
|
|
|
- // Create the optimized parser with production configuration
|
|
|
- logParser = parser.NewOptimizedParser(config, uaParser, geoIPService)
|
|
|
+ // Create the optimized parser with production configuration
|
|
|
+ logParser = parser.NewOptimizedParser(config, uaParser, geoIPService)
|
|
|
|
|
|
- logger.Info("Nginx log processing optimization system initialized with production configuration")
|
|
|
+ logger.Info("Nginx log processing optimization system initialized with production configuration")
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+// IsLogParserInitialized returns true if the global parser singleton has been created.
|
|
|
+func IsLogParserInitialized() bool {
|
|
|
+ return logParser != nil
|
|
|
}
|
|
|
|
|
|
// ParseLogLine parses a raw log line into a structured LogDocument using optimized parsing
|
|
@@ -52,6 +63,10 @@ func ParseLogLine(line string) (*LogDocument, error) {
|
|
|
return nil, nil
|
|
|
}
|
|
|
|
|
|
+ if logParser == nil {
|
|
|
+ return nil, errors.New("log parser is not initialized; call indexer.InitLogParser() before use")
|
|
|
+ }
|
|
|
+
|
|
|
// Use optimized parser for single line processing
|
|
|
entry, err := logParser.ParseLine(line)
|
|
|
if err != nil {
|
|
@@ -63,6 +78,9 @@ func ParseLogLine(line string) (*LogDocument, error) {
|
|
|
|
|
|
// ParseLogStream parses a stream of log data using OptimizedParseStream (7-8x faster)
|
|
|
func ParseLogStream(ctx context.Context, reader io.Reader, filePath string) ([]*LogDocument, error) {
|
|
|
+ if logParser == nil {
|
|
|
+ return nil, errors.New("log parser is not initialized; call indexer.InitLogParser() before use")
|
|
|
+ }
|
|
|
// Auto-detect and handle gzip files
|
|
|
actualReader, cleanup, err := createReaderForFile(reader, filePath)
|
|
|
if err != nil {
|
|
@@ -94,6 +112,9 @@ func ParseLogStream(ctx context.Context, reader io.Reader, filePath string) ([]*
|
|
|
|
|
|
// ParseLogStreamChunked processes large files using chunked processing for memory efficiency
|
|
|
func ParseLogStreamChunked(ctx context.Context, reader io.Reader, filePath string, chunkSize int) ([]*LogDocument, error) {
|
|
|
+ if logParser == nil {
|
|
|
+ return nil, errors.New("log parser is not initialized; call indexer.InitLogParser() before use")
|
|
|
+ }
|
|
|
// Auto-detect and handle gzip files
|
|
|
actualReader, cleanup, err := createReaderForFile(reader, filePath)
|
|
|
if err != nil {
|
|
@@ -121,6 +142,9 @@ func ParseLogStreamChunked(ctx context.Context, reader io.Reader, filePath strin
|
|
|
|
|
|
// ParseLogStreamMemoryEfficient uses memory-efficient parsing for low memory environments
|
|
|
func ParseLogStreamMemoryEfficient(ctx context.Context, reader io.Reader, filePath string) ([]*LogDocument, error) {
|
|
|
+ if logParser == nil {
|
|
|
+ return nil, errors.New("log parser is not initialized; call indexer.InitLogParser() before use")
|
|
|
+ }
|
|
|
// Auto-detect and handle gzip files
|
|
|
actualReader, cleanup, err := createReaderForFile(reader, filePath)
|
|
|
if err != nil {
|