// production_throughput_test.go
package nginx_log

import (
	"bufio"
	"context"
	"fmt"
	"math/rand"
	"os"
	"path/filepath"
	"runtime"
	"testing"
	"time"

	"github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
	"github.com/0xJacky/Nginx-UI/internal/nginx_log/parser"
)
  13. // ProductionThroughputTest tests the complete production pipeline
  14. // including data generation, indexing, GeoIP, User-Agent parsing, etc.
  15. func TestProductionThroughputEndToEnd(t *testing.T) {
  16. if testing.Short() {
  17. t.Skip("Skipping production throughput test in short mode")
  18. }
  19. scales := []struct {
  20. name string
  21. records int
  22. }{
  23. {"Small_50K", 50000},
  24. {"Medium_100K", 100000},
  25. {"Large_200K", 200000},
  26. {"XLarge_500K", 500000},
  27. }
  28. for _, scale := range scales {
  29. t.Run(scale.name, func(t *testing.T) {
  30. runCompleteProductionTest(t, scale.records)
  31. })
  32. }
  33. }
  34. func runCompleteProductionTest(t *testing.T, recordCount int) {
  35. t.Logf("🚀 Starting COMPLETE production test with %d records", recordCount)
  36. // Step 1: Create temporary directory
  37. tempDir, err := os.MkdirTemp("", "nginx_ui_production_test_")
  38. if err != nil {
  39. t.Fatalf("Failed to create temp dir: %v", err)
  40. }
  41. defer os.RemoveAll(tempDir)
  42. // Step 2: Generate realistic test data
  43. testLogFile := filepath.Join(tempDir, "access.log")
  44. dataGenStart := time.Now()
  45. if err := generateRealisticLogFile(testLogFile, recordCount); err != nil {
  46. t.Fatalf("Failed to generate test data: %v", err)
  47. }
  48. dataGenTime := time.Since(dataGenStart)
  49. t.Logf("📊 Generated %d records in %v", recordCount, dataGenTime)
  50. // Step 3: Set up complete production environment
  51. setupStart := time.Now()
  52. // Create index directory
  53. indexDir := filepath.Join(tempDir, "index")
  54. if err := os.MkdirAll(indexDir, 0755); err != nil {
  55. t.Fatalf("Failed to create index dir: %v", err)
  56. }
  57. // Initialize production-grade configuration
  58. config := indexer.DefaultIndexerConfig()
  59. config.IndexPath = indexDir
  60. config.WorkerCount = 24 // Use all available cores
  61. config.BatchSize = 2000 // Production batch size
  62. config.EnableMetrics = true
  63. // Create production services
  64. geoIPService := &mockProductionGeoIPService{}
  65. userAgentParser := parser.NewCachedUserAgentParser(
  66. parser.NewSimpleUserAgentParser(),
  67. 10000, // Large cache for production
  68. )
  69. optimizedParser := parser.NewOptimizedParser(
  70. &parser.Config{
  71. MaxLineLength: 16 * 1024,
  72. WorkerCount: 12,
  73. BatchSize: 1500,
  74. },
  75. userAgentParser,
  76. geoIPService,
  77. )
  78. // Create shard manager
  79. shardManager := indexer.NewGroupedShardManager(config)
  80. // Initialize indexer with all production components
  81. parallelIndexer := indexer.NewParallelIndexer(config, shardManager)
  82. ctx := context.Background()
  83. if err := parallelIndexer.Start(ctx); err != nil {
  84. t.Fatalf("Failed to start indexer: %v", err)
  85. }
  86. defer parallelIndexer.Stop()
  87. setupTime := time.Since(setupStart)
  88. t.Logf("⚙️ Production environment setup completed in %v", setupTime)
  89. // Step 4: Execute complete production rebuild (same as real rebuild)
  90. rebuildStart := time.Now()
  91. t.Logf("🔄 Starting COMPLETE production rebuild with full indexing pipeline")
  92. // This simulates the exact same process as production rebuild
  93. result, err := executeProductionRebuild(ctx, parallelIndexer, optimizedParser, testLogFile)
  94. if err != nil {
  95. t.Fatalf("Production rebuild failed: %v", err)
  96. }
  97. rebuildTime := time.Since(rebuildStart)
  98. // Step 5: Calculate and report realistic metrics
  99. recordsPerSecond := float64(recordCount) / rebuildTime.Seconds()
  100. t.Logf("🏆 === PRODUCTION THROUGHPUT RESULTS ===")
  101. t.Logf("📈 Total Records: %d", recordCount)
  102. t.Logf("⏱️ Total Time: %v", rebuildTime)
  103. t.Logf("🚀 Throughput: %.2f records/second", recordsPerSecond)
  104. t.Logf("✅ Success Rate: %.2f%% (%d/%d)", result.SuccessRate*100, result.Succeeded, result.Processed)
  105. t.Logf("📊 Index Size: %d documents", result.IndexedDocuments)
  106. t.Logf("🔧 Configuration: Workers=%d, BatchSize=%d", config.WorkerCount, config.BatchSize)
  107. // Performance validation
  108. if result.SuccessRate < 0.99 {
  109. t.Errorf("Success rate too low: %.2f%% (expected >99%%)", result.SuccessRate*100)
  110. }
  111. if recordsPerSecond < 1000 {
  112. t.Logf("⚠️ Warning: Throughput below 1000 records/sec: %.2f", recordsPerSecond)
  113. }
  114. // Log memory usage
  115. stats := parallelIndexer.GetStats()
  116. if stats != nil {
  117. t.Logf("💾 Memory Usage: %d MB", stats.MemoryUsage/(1024*1024))
  118. t.Logf("🔄 Queue Size: %d", stats.QueueSize)
  119. }
  120. }
// ProductionResult aggregates the outcome of one production-style rebuild run:
// parse counters, the derived success ratio, and indexing totals.
type ProductionResult struct {
	Processed        int           // total log lines the parser consumed
	Succeeded        int           // lines parsed successfully
	Failed           int           // lines that failed parsing
	SuccessRate      float64       // Succeeded / Processed, in [0, 1]
	IndexedDocuments int           // documents actually accepted by the indexer
	Duration         time.Duration // wall-clock time for parse + index + flush
}
  129. func executeProductionRebuild(ctx context.Context, indexerInstance *indexer.ParallelIndexer, parser *parser.OptimizedParser, logFile string) (*ProductionResult, error) {
  130. // Open log file
  131. file, err := os.Open(logFile)
  132. if err != nil {
  133. return nil, fmt.Errorf("failed to open log file: %w", err)
  134. }
  135. defer file.Close()
  136. // Execute the same parsing and indexing as production rebuild
  137. startTime := time.Now()
  138. // Use optimized parse stream (same as production)
  139. parseResult, err := parser.OptimizedParseStream(ctx, file)
  140. if err != nil {
  141. return nil, fmt.Errorf("parsing failed: %w", err)
  142. }
  143. // Index all parsed documents (same as production)
  144. var totalIndexed int
  145. for _, entry := range parseResult.Entries {
  146. doc := &indexer.Document{
  147. ID: fmt.Sprintf("doc_%d", totalIndexed),
  148. Fields: &indexer.LogDocument{
  149. Timestamp: entry.Timestamp,
  150. IP: entry.IP,
  151. Method: entry.Method,
  152. Path: entry.Path,
  153. PathExact: entry.Path,
  154. Status: entry.Status,
  155. BytesSent: entry.BytesSent,
  156. Referer: entry.Referer,
  157. UserAgent: entry.UserAgent,
  158. Browser: entry.Browser,
  159. BrowserVer: entry.BrowserVer,
  160. OS: entry.OS,
  161. OSVersion: entry.OSVersion,
  162. DeviceType: entry.DeviceType,
  163. RequestTime: entry.RequestTime,
  164. UpstreamTime: entry.UpstreamTime,
  165. FilePath: logFile,
  166. MainLogPath: logFile,
  167. Raw: entry.Raw,
  168. },
  169. }
  170. // Index document (same as production indexing)
  171. err := indexerInstance.IndexDocument(ctx, doc)
  172. if err != nil {
  173. continue // Count as failed but continue processing
  174. }
  175. totalIndexed++
  176. }
  177. // Flush all pending operations (same as production)
  178. if err := indexerInstance.FlushAll(); err != nil {
  179. return nil, fmt.Errorf("failed to flush: %w", err)
  180. }
  181. duration := time.Since(startTime)
  182. return &ProductionResult{
  183. Processed: parseResult.Processed,
  184. Succeeded: parseResult.Succeeded,
  185. Failed: parseResult.Failed,
  186. SuccessRate: float64(parseResult.Succeeded) / float64(parseResult.Processed),
  187. IndexedDocuments: totalIndexed,
  188. Duration: duration,
  189. }, nil
  190. }
  191. func generateRealisticLogFile(filename string, recordCount int) error {
  192. file, err := os.Create(filename)
  193. if err != nil {
  194. return err
  195. }
  196. defer file.Close()
  197. // Realistic log patterns
  198. ips := []string{
  199. "192.168.1.1", "10.0.0.1", "172.16.0.1", "203.0.113.1",
  200. "198.51.100.1", "192.0.2.1", "203.0.113.195", "198.51.100.178",
  201. }
  202. methods := []string{"GET", "POST", "PUT", "DELETE", "HEAD", "OPTIONS"}
  203. paths := []string{
  204. "/", "/api/users", "/api/posts", "/api/auth/login", "/api/auth/logout",
  205. "/static/css/style.css", "/static/js/app.js", "/images/logo.png",
  206. "/admin/dashboard", "/user/profile", "/search?q=test", "/api/v1/data",
  207. }
  208. userAgents := []string{
  209. "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
  210. "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15",
  211. "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
  212. "Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Mobile/15E148 Safari/604.1",
  213. }
  214. statuses := []int{200, 200, 200, 200, 301, 302, 404, 500} // Weighted towards 200
  215. // use default global rng; no need to reseed each call in Go 1.20+
  216. baseTime := time.Now().Unix() - 86400 // 24 hours ago
  217. for i := 0; i < recordCount; i++ {
  218. timestamp := baseTime + int64(i)
  219. ip := ips[rand.Intn(len(ips))]
  220. method := methods[rand.Intn(len(methods))]
  221. path := paths[rand.Intn(len(paths))]
  222. status := statuses[rand.Intn(len(statuses))]
  223. size := rand.Intn(10000) + 100
  224. userAgent := userAgents[rand.Intn(len(userAgents))]
  225. referer := "-"
  226. if rand.Float32() < 0.3 {
  227. referer = "https://example.com/referrer"
  228. }
  229. requestTime := rand.Float64() * 2.0 // 0-2 seconds
  230. // Standard nginx log format
  231. logLine := fmt.Sprintf(
  232. `%s - - [%s] "%s %s HTTP/1.1" %d %d "%s" "%s" %.3f`,
  233. ip,
  234. time.Unix(timestamp, 0).Format("02/Jan/2006:15:04:05 -0700"),
  235. method,
  236. path,
  237. status,
  238. size,
  239. referer,
  240. userAgent,
  241. requestTime,
  242. )
  243. if _, err := fmt.Fprintln(file, logLine); err != nil {
  244. return err
  245. }
  246. }
  247. return nil
  248. }
  249. // Mock services for testing
  250. type mockProductionGeoIPService struct{}
  251. func (m *mockProductionGeoIPService) Search(ip string) (*parser.GeoLocation, error) {
  252. // Mock geographic data
  253. regions := []string{"US", "CN", "JP", "DE", "GB"}
  254. provinces := []string{"California", "Beijing", "Tokyo", "Berlin", "London"}
  255. cities := []string{"San Francisco", "Beijing", "Tokyo", "Berlin", "London"}
  256. // use global rng defaults; no explicit rand.Seed needed in Go 1.20+
  257. idx := rand.Intn(len(regions))
  258. return &parser.GeoLocation{
  259. CountryCode: regions[idx],
  260. RegionCode: regions[idx],
  261. Province: provinces[idx],
  262. City: cities[idx],
  263. }, nil
  264. }