123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258 |
- package nginx_log
import (
	"bufio"
	"context"
	"fmt"
	"os"
	"path/filepath"
	"runtime"
	"testing"
	"time"

	"github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
	"github.com/0xJacky/Nginx-UI/internal/nginx_log/parser"
)
- // BenchmarkWorkerScaling tests the actual production configuration performance
- func BenchmarkWorkerScaling(b *testing.B) {
- // Different worker configurations to test
- testConfigs := []struct {
- name string
- configModifier func(*indexer.Config)
- }{
- {
- name: "Default_Config",
- configModifier: func(c *indexer.Config) {
- // Use actual default configuration - no modifications
- },
- },
- {
- name: "Old_Conservative_2x",
- configModifier: func(c *indexer.Config) {
- c.WorkerCount = runtime.GOMAXPROCS(0) * 2 // Old default
- },
- },
- {
- name: "New_Default_3x",
- configModifier: func(c *indexer.Config) {
- c.WorkerCount = runtime.GOMAXPROCS(0) * 3 // New default
- },
- },
- {
- name: "High_Throughput_4x",
- configModifier: func(c *indexer.Config) {
- c.WorkerCount = runtime.GOMAXPROCS(0) * 4 // High throughput mode
- },
- },
- {
- name: "Aggressive_6x",
- configModifier: func(c *indexer.Config) {
- c.WorkerCount = runtime.GOMAXPROCS(0) * 6 // Maximum adaptive scaling
- },
- },
- }
- recordCounts := []int{10000, 50000, 100000}
- for _, recordCount := range recordCounts {
- for _, tc := range testConfigs {
- benchName := fmt.Sprintf("Records_%d/%s", recordCount, tc.name)
- b.Run(benchName, func(b *testing.B) {
- benchmarkWorkerConfig(b, recordCount, tc.configModifier)
- })
- }
- }
- }
- func benchmarkWorkerConfig(b *testing.B, recordCount int, configModifier func(*indexer.Config)) {
- // Create temp directory
- tempDir := b.TempDir()
- // Generate test data once
- testLogFile := filepath.Join(tempDir, "access.log")
- if err := generateBenchmarkLogData(testLogFile, recordCount); err != nil {
- b.Fatalf("Failed to generate test data: %v", err)
- }
- // Get file size for metrics
- fileInfo, err := os.Stat(testLogFile)
- if err != nil {
- b.Fatalf("Failed to stat test file: %v", err)
- }
- fileSizeMB := float64(fileInfo.Size()) / (1024 * 1024)
- b.ResetTimer()
- b.ReportAllocs()
- for i := 0; i < b.N; i++ {
- // Create index directory for this iteration
- indexDir := filepath.Join(tempDir, fmt.Sprintf("index_%d", i))
- if err := os.MkdirAll(indexDir, 0755); err != nil {
- b.Fatalf("Failed to create index dir: %v", err)
- }
- // Use DEFAULT configuration and apply modifier
- config := indexer.DefaultIndexerConfig()
- config.IndexPath = indexDir
- config.EnableMetrics = false // Disable for cleaner benchmarking
- // Apply configuration modifier
- if configModifier != nil {
- configModifier(config)
- }
- // Run the actual benchmark
- result := runWorkerBenchmark(b, config, testLogFile, recordCount)
- // Report custom metrics
- throughput := float64(recordCount) / result.Duration.Seconds()
- mbPerSec := fileSizeMB / result.Duration.Seconds()
- b.ReportMetric(throughput, "records/sec")
- b.ReportMetric(mbPerSec, "MB/sec")
- b.ReportMetric(float64(config.WorkerCount), "workers")
- b.ReportMetric(float64(result.Parsed), "parsed")
- b.ReportMetric(float64(result.Indexed), "indexed")
- // Log configuration for verification
- if i == 0 {
- b.Logf("Config: Workers=%d, BatchSize=%d, Shards=%d",
- config.WorkerCount, config.BatchSize, config.ShardCount)
- }
- }
- }
// WorkerBenchResult carries the outcome of a single parse-and-index pass.
type WorkerBenchResult struct {
	// Duration is the wall-clock time the pass took.
	Duration time.Duration

	// Parsed and Indexed are the counts recorded for the pass.
	Parsed, Indexed int
}
- func runWorkerBenchmark(b *testing.B, config *indexer.Config, logFile string, expectedRecords int) *WorkerBenchResult {
- start := time.Now()
- // Create production components
- shardManager := indexer.NewGroupedShardManager(config)
- parallelIndexer := indexer.NewParallelIndexer(config, shardManager)
- ctx := context.Background()
- if err := parallelIndexer.Start(ctx); err != nil {
- b.Fatalf("Failed to start indexer: %v", err)
- }
- defer parallelIndexer.Stop()
- // Create parser with production configuration
- parserConfig := &parser.Config{
- MaxLineLength: 8 * 1024,
- WorkerCount: config.WorkerCount / 2, // Parser uses half of indexer workers
- BatchSize: 1000,
- }
- optimizedParser := parser.NewOptimizedParser(
- parserConfig,
- parser.NewSimpleUserAgentParser(),
- &MockGeoService{},
- )
- // Parse the log file
- file, err := os.Open(logFile)
- if err != nil {
- b.Fatalf("Failed to open log file: %v", err)
- }
- defer file.Close()
- parseResult, err := optimizedParser.OptimizedParseStream(ctx, file)
- if err != nil {
- b.Fatalf("Parsing failed: %v", err)
- }
- // Index documents (limit to avoid timeout)
- maxToIndex := minInt(len(parseResult.Entries), 5000)
- indexed := 0
- for i, entry := range parseResult.Entries[:maxToIndex] {
- doc := &indexer.Document{
- ID: fmt.Sprintf("doc_%d", i),
- Fields: &indexer.LogDocument{
- Timestamp: entry.Timestamp,
- IP: entry.IP,
- Method: entry.Method,
- Path: entry.Path,
- PathExact: entry.Path,
- Status: entry.Status,
- BytesSent: entry.BytesSent,
- UserAgent: entry.UserAgent,
- FilePath: logFile,
- MainLogPath: logFile,
- Raw: entry.Raw,
- },
- }
- if err := parallelIndexer.IndexDocument(ctx, doc); err == nil {
- indexed++
- }
- }
- // Flush
- parallelIndexer.FlushAll()
- duration := time.Since(start)
- return &WorkerBenchResult{
- Duration: duration,
- Parsed: parseResult.Processed,
- Indexed: indexed,
- }
- }
// generateBenchmarkLogData writes recordCount synthetic access-log lines to
// filename, creating or truncating the file.
//
// Line i (zero-based) uses:
//   - client IP   192.168.<(i/256)%256>.<i%256>
//   - timestamp   (now - 24h) + i%86400 seconds, formatted "02/Jan/2006:15:04:05 -0700"
//   - path/status cycling through "/", "/api/users", "/api/posts" (200) and "/health" (404)
//   - body size   1000 + i%5000
//   - a trailing "0.<i%1000, zero-padded to 3 digits>" field
//
// Output goes through a bufio.Writer so that large record counts do not cost
// one write syscall per line. The first error from creating, writing, flushing
// or closing the file is returned.
func generateBenchmarkLogData(filename string, recordCount int) (err error) {
	// The raw-string part is the log line layout; the newline is appended
	// separately because raw strings cannot contain escapes.
	const lineFormat = `%s - - [%s] "GET %s HTTP/1.1" %d %d "-" "Mozilla/5.0" 0.%03d` + "\n"

	// Lookup tables are loop-invariant, so build them once.
	paths := [...]string{"/", "/api/users", "/api/posts", "/health"}
	statuses := [...]int{200, 200, 200, 404}

	file, err := os.Create(filename)
	if err != nil {
		return err
	}
	defer func() {
		// A failed Close on a written file can mean lost data; surface it
		// unless an earlier error is already being returned.
		if closeErr := file.Close(); err == nil {
			err = closeErr
		}
	}()

	w := bufio.NewWriter(file)
	baseTime := time.Now().Unix() - 86400

	for i := 0; i < recordCount; i++ {
		timestamp := baseTime + int64(i%86400)
		ip := fmt.Sprintf("192.168.%d.%d", (i/256)%256, i%256)

		if _, err := fmt.Fprintf(w, lineFormat,
			ip,
			time.Unix(timestamp, 0).Format("02/Jan/2006:15:04:05 -0700"),
			paths[i%len(paths)],
			statuses[i%len(statuses)],
			1000+i%5000,
			i%1000,
		); err != nil {
			return err
		}
	}

	return w.Flush()
}
- type MockGeoService struct{}
- func (m *MockGeoService) Search(ip string) (*parser.GeoLocation, error) {
- return &parser.GeoLocation{
- CountryCode: "US",
- RegionCode: "CA",
- Province: "California",
- City: "San Francisco",
- }, nil
- }
// minInt returns the smaller of a and b.
func minInt(a, b int) int {
	smaller := b
	if a < smaller {
		smaller = a
	}
	return smaller
}
|