// worker_scaling_benchmark_test.go

package nginx_log

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"runtime"
	"testing"
	"time"

	"github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
	"github.com/0xJacky/Nginx-UI/internal/nginx_log/parser"
)

// BenchmarkWorkerScaling tests the actual production configuration performance
func BenchmarkWorkerScaling(b *testing.B) {
	// Different worker configurations to test
	testConfigs := []struct {
		name           string
		configModifier func(*indexer.Config)
	}{
		{
			name: "Default_Config",
			configModifier: func(c *indexer.Config) {
				// Use actual default configuration - no modifications
			},
		},
		{
			name: "Old_Conservative_2x",
			configModifier: func(c *indexer.Config) {
				c.WorkerCount = runtime.GOMAXPROCS(0) * 2 // Old default
			},
		},
		{
			name: "New_Default_3x",
			configModifier: func(c *indexer.Config) {
				c.WorkerCount = runtime.GOMAXPROCS(0) * 3 // New default
			},
		},
		{
			name: "High_Throughput_4x",
			configModifier: func(c *indexer.Config) {
				c.WorkerCount = runtime.GOMAXPROCS(0) * 4 // High throughput mode
			},
		},
		{
			name: "Aggressive_6x",
			configModifier: func(c *indexer.Config) {
				c.WorkerCount = runtime.GOMAXPROCS(0) * 6 // Maximum adaptive scaling
			},
		},
	}

	recordCounts := []int{10000, 50000, 100000}

	for _, recordCount := range recordCounts {
		for _, tc := range testConfigs {
			benchName := fmt.Sprintf("Records_%d/%s", recordCount, tc.name)
			b.Run(benchName, func(b *testing.B) {
				benchmarkWorkerConfig(b, recordCount, tc.configModifier)
			})
		}
	}
}
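
// A typical invocation of this benchmark (the package path is assumed from the
// import paths above and may differ in your checkout):
//
//	go test -run '^$' -bench 'BenchmarkWorkerScaling' -benchmem ./internal/nginx_log/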

func benchmarkWorkerConfig(b *testing.B, recordCount int, configModifier func(*indexer.Config)) {
	// Create temp directory
	tempDir := b.TempDir()

	// Generate test data once
	testLogFile := filepath.Join(tempDir, "access.log")
	if err := generateBenchmarkLogData(testLogFile, recordCount); err != nil {
		b.Fatalf("Failed to generate test data: %v", err)
	}

	// Get file size for metrics
	fileInfo, err := os.Stat(testLogFile)
	if err != nil {
		b.Fatalf("Failed to stat test file: %v", err)
	}
	fileSizeMB := float64(fileInfo.Size()) / (1024 * 1024)

	b.ResetTimer()
	b.ReportAllocs()

	for i := 0; i < b.N; i++ {
		// Create index directory for this iteration
		indexDir := filepath.Join(tempDir, fmt.Sprintf("index_%d", i))
		if err := os.MkdirAll(indexDir, 0755); err != nil {
			b.Fatalf("Failed to create index dir: %v", err)
		}

		// Use DEFAULT configuration and apply modifier
		config := indexer.DefaultIndexerConfig()
		config.IndexPath = indexDir
		config.EnableMetrics = false // Disable for cleaner benchmarking

		// Apply configuration modifier
		if configModifier != nil {
			configModifier(config)
		}

		// Run the actual benchmark
		result := runWorkerBenchmark(b, config, testLogFile, recordCount)

		// Report custom metrics
		throughput := float64(recordCount) / result.Duration.Seconds()
		mbPerSec := fileSizeMB / result.Duration.Seconds()

		b.ReportMetric(throughput, "records/sec")
		b.ReportMetric(mbPerSec, "MB/sec")
		b.ReportMetric(float64(config.WorkerCount), "workers")
		b.ReportMetric(float64(result.Parsed), "parsed")
		b.ReportMetric(float64(result.Indexed), "indexed")

		// Log configuration for verification
		if i == 0 {
			b.Logf("Config: Workers=%d, BatchSize=%d, Shards=%d",
				config.WorkerCount, config.BatchSize, config.ShardCount)
		}
	}
}
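
// Each b.ReportMetric call above appends an extra "<value> <unit>" column to the
// sub-benchmark's result line in `go test -bench` output, alongside the standard
// ns/op (and B/op / allocs/op from b.ReportAllocs), so the different worker
// multipliers can be compared column by column across runs.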

// WorkerBenchResult captures the timing and volume of a single benchmark run.
type WorkerBenchResult struct {
	Duration time.Duration
	Parsed   int
	Indexed  int
}

// runWorkerBenchmark parses the log file and indexes a capped number of the
// resulting entries using the production indexer and parser components.
func runWorkerBenchmark(b *testing.B, config *indexer.Config, logFile string, expectedRecords int) *WorkerBenchResult {
	start := time.Now()

	// Create production components
	shardManager := indexer.NewGroupedShardManager(config)
	parallelIndexer := indexer.NewParallelIndexer(config, shardManager)

	ctx := context.Background()
	if err := parallelIndexer.Start(ctx); err != nil {
		b.Fatalf("Failed to start indexer: %v", err)
	}
	defer parallelIndexer.Stop()

	// Create parser with production configuration
	parserConfig := &parser.Config{
		MaxLineLength: 8 * 1024,
		WorkerCount:   config.WorkerCount / 2, // Parser uses half of indexer workers
		BatchSize:     1000,
	}
	optimizedParser := parser.NewOptimizedParser(
		parserConfig,
		parser.NewSimpleUserAgentParser(),
		&MockGeoService{},
	)

	// Parse the log file
	file, err := os.Open(logFile)
	if err != nil {
		b.Fatalf("Failed to open log file: %v", err)
	}
	defer file.Close()

	parseResult, err := optimizedParser.OptimizedParseStream(ctx, file)
	if err != nil {
		b.Fatalf("Parsing failed: %v", err)
	}

	// Index documents (limit to avoid timeout)
	maxToIndex := minInt(len(parseResult.Entries), 5000)
	indexed := 0
	for i, entry := range parseResult.Entries[:maxToIndex] {
		doc := &indexer.Document{
			ID: fmt.Sprintf("doc_%d", i),
			Fields: &indexer.LogDocument{
				Timestamp:   entry.Timestamp,
				IP:          entry.IP,
				Method:      entry.Method,
				Path:        entry.Path,
				PathExact:   entry.Path,
				Status:      entry.Status,
				BytesSent:   entry.BytesSent,
				UserAgent:   entry.UserAgent,
				FilePath:    logFile,
				MainLogPath: logFile,
				Raw:         entry.Raw,
			},
		}
		if err := parallelIndexer.IndexDocument(ctx, doc); err == nil {
			indexed++
		}
	}

	// Flush pending documents before measuring total duration
	parallelIndexer.FlushAll()

	duration := time.Since(start)
	return &WorkerBenchResult{
		Duration: duration,
		Parsed:   parseResult.Processed,
		Indexed:  indexed,
	}
}
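
// Note: indexing is capped at 5,000 documents per iteration while Parsed counts
// every record in the file, so the "indexed" metric plateaus at 5000 for the
// larger record counts. Duration still covers parsing, the capped indexing pass,
// and the final flush.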

// generateBenchmarkLogData writes recordCount synthetic access-log lines in a
// combined-style format with a trailing request-time field, with timestamps
// spread over the preceding 24 hours.
func generateBenchmarkLogData(filename string, recordCount int) error {
	file, err := os.Create(filename)
	if err != nil {
		return err
	}
	defer file.Close()

	baseTime := time.Now().Unix() - 86400
	for i := 0; i < recordCount; i++ {
		timestamp := baseTime + int64(i%86400)
		ip := fmt.Sprintf("192.168.%d.%d", (i/256)%256, i%256)
		path := []string{"/", "/api/users", "/api/posts", "/health"}[i%4]
		status := []int{200, 200, 200, 404}[i%4]
		size := 1000 + i%5000

		logLine := fmt.Sprintf(
			`%s - - [%s] "GET %s HTTP/1.1" %d %d "-" "Mozilla/5.0" 0.%03d`,
			ip,
			time.Unix(timestamp, 0).Format("02/Jan/2006:15:04:05 -0700"),
			path,
			status,
			size,
			i%1000,
		)
		if _, err := fmt.Fprintln(file, logLine); err != nil {
			return err
		}
	}
	return nil
}
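
// For i == 0 the generated line looks like the following (timestamp shown as a
// placeholder):
//
//	192.168.0.0 - - [<time_local>] "GET / HTTP/1.1" 200 1000 "-" "Mozilla/5.0" 0.000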

// MockGeoService is a stub geo-lookup used by the benchmark; it returns a fixed
// US/California location for every IP.
type MockGeoService struct{}

func (m *MockGeoService) Search(ip string) (*parser.GeoLocation, error) {
	return &parser.GeoLocation{
		CountryCode: "US",
		RegionCode:  "CA",
		Province:    "California",
		City:        "San Francisco",
	}, nil
}

// minInt returns the smaller of two ints (the built-in min from Go 1.21+ could
// be used instead).
func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}