
production_scale_test.go

package parser

import (
    "context"
    "fmt"
    "runtime"
    "strings"
    "testing"
    "time"
)
// TestProductionScaleValidation tests optimized parsers with 1M+ records.
func TestProductionScaleValidation(t *testing.T) {
    if testing.Short() {
        t.Skip("Skipping production scale test in short mode")
    }

    scales := []struct {
        name    string
        records int
    }{
        {"Medium_100K", 100000},
        {"Large_500K", 500000},
        {"XLarge_1M", 1000000},
        {"Enterprise_2M", 2000000},
    }
    config := DefaultParserConfig()
    config.MaxLineLength = 16 * 1024
    config.WorkerCount = runtime.NumCPU() // utilize all available CPU cores
    config.BatchSize = 2000               // larger batches for high volume
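    // NewOptimizedParser, NewCachedUserAgentParser, NewSimpleUserAgentParser,
    // and mockGeoIPService are assumed to be defined elsewhere in this package.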
    parser := NewOptimizedParser(
        config,
        NewCachedUserAgentParser(NewSimpleUserAgentParser(), 10000),
        &mockGeoIPService{},
    )
    for _, scale := range scales {
        t.Run(scale.name, func(t *testing.T) {
            t.Logf("🚀 Starting production scale test: %s (%d records)", scale.name, scale.records)

            // Generate realistic production data.
            logData := generateProductionLogData(scale.records)
            t.Logf("📊 Generated %d bytes of test data", len(logData))

            // Test the original ParseStream (for comparison on smaller datasets only).
            if scale.records <= 100000 {
                t.Run("Original_ParseStream", func(t *testing.T) {
                    startTime := time.Now()
                    reader := strings.NewReader(logData)
                    ctx := context.Background()

                    result, err := parser.ParseStream(ctx, reader)
                    duration := time.Since(startTime)
                    if err != nil {
                        t.Fatalf("Original ParseStream failed: %v", err)
                    }

                    t.Logf("✅ Original ParseStream: %d records in %v (%.0f records/sec)",
                        result.Processed, duration, float64(result.Processed)/duration.Seconds())
                    validateResults(t, result, scale.records)
                })
            }
            // Test the optimized ParseStream.
            t.Run("Optimized_ParseStream", func(t *testing.T) {
                startTime := time.Now()
                reader := strings.NewReader(logData)
                ctx := context.Background()

                result, err := parser.OptimizedParseStream(ctx, reader)
                duration := time.Since(startTime)
                if err != nil {
                    t.Fatalf("Optimized ParseStream failed: %v", err)
                }

                t.Logf("🚀 Optimized ParseStream: %d records in %v (%.0f records/sec)",
                    result.Processed, duration, float64(result.Processed)/duration.Seconds())
                validateResults(t, result, scale.records)

                // Performance floor: expect at least 1K records/sec.
                recordsPerSec := float64(result.Processed) / duration.Seconds()
                if recordsPerSec < 1000 {
                    t.Errorf("Performance below expectation: %.0f records/sec < 1000", recordsPerSec)
                }
            })
            // Test the memory-efficient ParseStream.
            t.Run("MemoryEfficient_ParseStream", func(t *testing.T) {
                startTime := time.Now()
                reader := strings.NewReader(logData)
                ctx := context.Background()

                result, err := parser.MemoryEfficientParseStream(ctx, reader)
                duration := time.Since(startTime)
                if err != nil {
                    t.Fatalf("Memory-Efficient ParseStream failed: %v", err)
                }

                t.Logf("💡 Memory-Efficient ParseStream: %d records in %v (%.0f records/sec)",
                    result.Processed, duration, float64(result.Processed)/duration.Seconds())
                validateResults(t, result, scale.records)
            })
            // Test the chunked ParseStream with different chunk sizes.
            chunkSizes := []int{32 * 1024, 64 * 1024, 128 * 1024}
            for _, chunkSize := range chunkSizes {
                t.Run(fmt.Sprintf("Chunked_ParseStream_%dKB", chunkSize/1024), func(t *testing.T) {
                    startTime := time.Now()
                    reader := strings.NewReader(logData)
                    ctx := context.Background()

                    result, err := parser.ChunkedParseStream(ctx, reader, chunkSize)
                    duration := time.Since(startTime)
                    if err != nil {
                        t.Fatalf("Chunked ParseStream failed: %v", err)
                    }

                    t.Logf("📦 Chunked ParseStream (%dKB): %d records in %v (%.0f records/sec)",
                        chunkSize/1024, result.Processed, duration, float64(result.Processed)/duration.Seconds())
                    validateResults(t, result, scale.records)
                })
            }
        })
    }
}
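// The scale tests above are skipped under -short. An invocation along these
// lines runs them in full (the generous timeout is a guess for the 2M-record
// case and may need tuning for the host):
//
//  go test -run TestProductionScaleValidation -v -timeout 30m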
// BenchmarkProductionScale benchmarks the parsers at production scale.
func BenchmarkProductionScale(b *testing.B) {
    if testing.Short() {
        b.Skip("Skipping production scale benchmark in short mode")
    }

    // 100K records is representative of a high-volume production workload.
    logData := generateProductionLogData(100000)

    config := DefaultParserConfig()
    config.MaxLineLength = 16 * 1024
    config.WorkerCount = runtime.NumCPU()
    config.BatchSize = 2000

    parser := NewOptimizedParser(
        config,
        NewCachedUserAgentParser(NewSimpleUserAgentParser(), 10000),
        &mockGeoIPService{},
    )
    benchmarks := []struct {
        name string
        fn   func(context.Context, *strings.Reader) (*ParseResult, error)
    }{
        {
            "Optimized_ParseStream_100K",
            func(ctx context.Context, reader *strings.Reader) (*ParseResult, error) {
                return parser.OptimizedParseStream(ctx, reader)
            },
        },
        {
            "MemoryEfficient_ParseStream_100K",
            func(ctx context.Context, reader *strings.Reader) (*ParseResult, error) {
                return parser.MemoryEfficientParseStream(ctx, reader)
            },
        },
        {
            "Chunked_ParseStream_100K_64KB",
            func(ctx context.Context, reader *strings.Reader) (*ParseResult, error) {
                return parser.ChunkedParseStream(ctx, reader, 64*1024)
            },
        },
    }
    for _, bench := range benchmarks {
        b.Run(bench.name, func(b *testing.B) {
            b.ResetTimer()
            b.ReportAllocs()
            for i := 0; i < b.N; i++ {
                reader := strings.NewReader(logData)
                ctx := context.Background()

                result, err := bench.fn(ctx, reader)
                if err != nil {
                    b.Fatalf("Benchmark failed: %v", err)
                }

                // Report detailed metrics.
                b.ReportMetric(float64(result.Processed), "records_processed")
                b.ReportMetric(float64(result.Succeeded), "records_succeeded")
                b.ReportMetric(result.ErrorRate*100, "error_rate_%")
                if result.Duration > 0 {
                    throughput := float64(result.Processed) / result.Duration.Seconds()
                    b.ReportMetric(throughput, "records_per_sec")
                    // Average input bytes per parsed record.
                    b.ReportMetric(float64(len(logData))/float64(result.Processed), "bytes_per_record")
                }
            }
        })
    }
}
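// A typical invocation (flag values are suggestions, not requirements;
// -run '^$' skips the tests so only the benchmarks execute):
//
//  go test -bench BenchmarkProductionScale -benchmem -run '^$'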
// validateResults validates parsing results for correctness.
func validateResults(t *testing.T, result *ParseResult, expectedRecords int) {
    t.Helper()

    // Basic validation.
    if result.Processed != expectedRecords {
        t.Errorf("Processed count mismatch: got %d, want %d", result.Processed, expectedRecords)
    }

    // The error rate should be reasonable (< 1%).
    if result.ErrorRate > 0.01 {
        t.Errorf("Error rate too high: %.2f%% > 1%%", result.ErrorRate*100)
    }

    // Most records should have parsed successfully (allow for a 1% error rate).
    expectedSucceeded := int(float64(expectedRecords) * 0.99)
    if result.Succeeded < expectedSucceeded {
        t.Errorf("Success rate too low: got %d, expected at least %d", result.Succeeded, expectedSucceeded)
    }

    // The entries count should match the succeeded count.
    if len(result.Entries) != result.Succeeded {
        t.Errorf("Entries count mismatch: got %d entries, expected %d", len(result.Entries), result.Succeeded)
    }

    // Spot-check sample entries.
    if len(result.Entries) > 0 {
        firstEntry := result.Entries[0]
        if firstEntry.IP == "" {
            t.Error("First entry missing IP address")
        }
        if firstEntry.Status == 0 {
            t.Error("First entry missing status code")
        }

        // Validate a middle entry.
        if len(result.Entries) > 100 {
            middleEntry := result.Entries[len(result.Entries)/2]
            if middleEntry.Method == "" {
                t.Error("Middle entry missing HTTP method")
            }
        }

        // Validate the last entry.
        lastEntry := result.Entries[len(result.Entries)-1]
        if lastEntry.IP == "" {
            t.Error("Last entry missing IP address")
        }
    }

    t.Logf("✅ Validation passed: %d processed, %d succeeded (%.2f%% success rate)",
        result.Processed, result.Succeeded, (1-result.ErrorRate)*100)
}
// generateProductionLogData generates realistic production-scale nginx log data.
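// A representative generated line (combined log format, with an optional
// request-time suffix on every third entry; user agent abbreviated here):
//
//  192.168.1.1 - - [06/Sep/2025:10:00:00 +0000] "GET /?page=1&size=10 HTTP/1.1" 200 1000 "https://www.google.com/" "Mozilla/5.0 (...)" 0.001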
func generateProductionLogData(records int) string {
    var builder strings.Builder
    // Pre-allocate for better performance (estimated 250 bytes per log line).
    builder.Grow(records * 250)

    // Realistic production patterns. Status codes are generated inline by the
    // weighted switch in the loop below.
    methods := []string{"GET", "POST", "PUT", "DELETE", "HEAD", "OPTIONS", "PATCH"}

    // Common paths in production.
    paths := []string{
        "/", "/index.html", "/favicon.ico", "/robots.txt",
        "/api/v1/users", "/api/v1/auth", "/api/v1/data", "/api/v1/health",
        "/static/css/main.css", "/static/js/app.js", "/static/images/logo.png",
        "/admin", "/admin/dashboard", "/admin/users", "/admin/settings",
        "/docs", "/docs/api", "/docs/guide",
        "/search", "/search/results",
        "/upload", "/download", "/export",
    }
    // Realistic user agents.
    userAgents := []string{
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) Gecko/20100101 Firefox/120.0",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:120.0) Gecko/20100101 Firefox/120.0",
        "Mozilla/5.0 (iPhone; CPU iPhone OS 17_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Mobile/15E148 Safari/604.1",
        "Mozilla/5.0 (iPad; CPU OS 17_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Mobile/15E148 Safari/604.1",
        "curl/8.4.0", "PostmanRuntime/7.35.0", "Go-http-client/1.1",
        "Googlebot/2.1 (+http://www.google.com/bot.html)",
        "Bingbot/2.0 (+http://www.bing.com/bingbot.htm)",
    }

    // Generate realistic IP addresses (simulate different networks).
    ipRanges := []string{
        "192.168.1", "192.168.0", "10.0.0", "10.0.1", "172.16.0", "172.16.1",
        "203.0.113", "198.51.100", "185.199.108", "140.82.114", "151.101.1",
    }

    referers := []string{
        "https://www.google.com/", "https://github.com/", "https://stackoverflow.com/",
        "https://www.linkedin.com/", "https://twitter.com/", "https://www.youtube.com/",
        "", "-", // empty and dash referers are common
    }
    // Generate log entries.
    baseTime := time.Date(2025, 9, 6, 10, 0, 0, 0, time.UTC)
    for i := 0; i < records; i++ {
        // Generate a realistic IP.
        ipRange := ipRanges[i%len(ipRanges)]
        ip := fmt.Sprintf("%s.%d", ipRange, (i%254)+1)
        // Advance the timestamp by ~10ms per record (~100 requests/sec);
        // 2M records span roughly 5.5 hours.
        timestamp := baseTime.Add(time.Duration(i) * time.Second / 100)
        timeStr := timestamp.Format("02/Jan/2006:15:04:05 -0700")
        // Select method and path (weighted towards GET).
        var method string
        if i%10 < 7 { // 70% GET requests
            method = "GET"
        } else {
            method = methods[i%len(methods)]
        }
        path := paths[i%len(paths)]

        // Add query parameters to some GET requests.
        if i%5 == 0 && method == "GET" {
            path += fmt.Sprintf("?page=%d&size=10", (i%100)+1)
        }

        // Status code (weighted towards successful responses).
        var status string
        switch {
        case i%100 < 85: // 85% success
            if method == "POST" || method == "PUT" {
                status = "201"
            } else {
                status = "200"
            }
        case i%100 < 90: // 5% redirects
            status = "302"
        case i%100 < 95: // 5% client errors
            status = "404"
        default: // 5% server errors
            status = "500"
        }

        // Response size (realistic distribution).
        var size int
        switch method {
        case "GET":
            size = 1000 + (i % 50000) // ~1KB to ~50KB
        case "POST", "PUT":
            size = 100 + (i % 1000) // smaller responses for write operations
        case "HEAD":
            size = 0
        default:
            size = 500 + (i % 5000)
        }

        // Select a user agent and referer.
        userAgent := userAgents[i%len(userAgents)]
        referer := referers[i%len(referers)]

        // Append a request time to every third entry.
        requestTime := ""
        if i%3 == 0 {
            reqTime := float64((i%1000)+1) / 1000.0 // 1ms to 1s
            requestTime = fmt.Sprintf(" %.3f", reqTime)
        }

        // Build the log entry.
        logLine := fmt.Sprintf(`%s - - [%s] "%s %s HTTP/1.1" %s %d "%s" "%s"%s`,
            ip, timeStr, method, path, status, size, referer, userAgent, requestTime)
        builder.WriteString(logLine)
        if i < records-1 {
            builder.WriteString("\n")
        }
    }
    return builder.String()
}
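// TestGenerateProductionLogData_Format is an illustrative sanity check of the
// generator's output shape: one line per record, no trailing newline, and each
// line carries the quoted request section.
func TestGenerateProductionLogData_Format(t *testing.T) {
    data := generateProductionLogData(10)
    lines := strings.Split(data, "\n")
    if len(lines) != 10 {
        t.Fatalf("expected 10 lines, got %d", len(lines))
    }
    for i, line := range lines {
        if !strings.Contains(line, "HTTP/1.1") {
            t.Errorf("line %d missing request section: %s", i, line)
        }
    }
}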
// TestMemoryUsageValidation tests memory usage patterns of the optimized parsers.
func TestMemoryUsageValidation(t *testing.T) {
    if testing.Short() {
        t.Skip("Skipping memory usage test in short mode")
    }

    // Use a moderate dataset to observe memory patterns.
    logData := generateProductionLogData(50000)

    config := DefaultParserConfig()
    config.MaxLineLength = 16 * 1024
    config.WorkerCount = 4
    config.BatchSize = 1000

    parser := NewOptimizedParser(
        config,
        NewCachedUserAgentParser(NewSimpleUserAgentParser(), 1000),
        &mockGeoIPService{},
    )
    implementations := []struct {
        name string
        fn   func(context.Context, *strings.Reader) (*ParseResult, error)
    }{
        {
            "Optimized_ParseStream",
            func(ctx context.Context, reader *strings.Reader) (*ParseResult, error) {
                return parser.OptimizedParseStream(ctx, reader)
            },
        },
        {
            "MemoryEfficient_ParseStream",
            func(ctx context.Context, reader *strings.Reader) (*ParseResult, error) {
                return parser.MemoryEfficientParseStream(ctx, reader)
            },
        },
    }
    for _, impl := range implementations {
        t.Run(impl.name+"_MemoryUsage", func(t *testing.T) {
            // Force GC before measurement.
            runtime.GC()
            var memBefore runtime.MemStats
            runtime.ReadMemStats(&memBefore)

            // Run the parser.
            reader := strings.NewReader(logData)
            ctx := context.Background()
            result, err := impl.fn(ctx, reader)
            if err != nil {
                t.Fatalf("%s failed: %v", impl.name, err)
            }

            // Force GC after parsing.
            runtime.GC()
            var memAfter runtime.MemStats
            runtime.ReadMemStats(&memAfter)

            // TotalAlloc is cumulative, so the delta measures all bytes
            // allocated during parsing, not the live heap.
            memUsed := memAfter.TotalAlloc - memBefore.TotalAlloc
            memPerRecord := float64(memUsed) / float64(result.Processed)

            t.Logf("📊 %s Memory Usage:", impl.name)
            t.Logf("  Total Allocated: %d bytes", memUsed)
            t.Logf("  Allocated per Record: %.2f bytes", memPerRecord)
            t.Logf("  Records Processed: %d", result.Processed)
            t.Logf("  OS-Reserved Memory (Sys): %d bytes", memAfter.Sys)

            // Allocation per record should be reasonable (< 1KB).
            if memPerRecord > 1024 {
                t.Errorf("Memory usage too high: %.2f bytes per record > 1024", memPerRecord)
            }

            validateResults(t, result, 50000)
        })
    }
}