stream_optimizations_test.go 12 KB


  1. package parser
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "testing"
  7. )
  8. // BenchmarkParseStreamComparison compares different ParseStream implementations
  9. func BenchmarkParseStreamComparison(b *testing.B) {
  10. // Generate test data - 1000 lines of realistic log data
  11. logData := generateBenchmarkLogData(1000)
  12. config := DefaultParserConfig()
  13. config.MaxLineLength = 16 * 1024
  14. parser := NewOptimizedParser(
  15. config,
  16. NewCachedUserAgentParser(NewSimpleUserAgentParser(), 100),
  17. &mockGeoIPService{},
  18. )
  19. benchmarks := []struct {
  20. name string
  21. fn func(context.Context, *strings.Reader) (*ParseResult, error)
  22. }{
  23. {
  24. name: "Original_ParseStream",
  25. fn: func(ctx context.Context, reader *strings.Reader) (*ParseResult, error) {
  26. return parser.ParseStream(ctx, reader)
  27. },
  28. },
  29. {
  30. name: "Optimized_ParseStream",
  31. fn: func(ctx context.Context, reader *strings.Reader) (*ParseResult, error) {
  32. return parser.OptimizedParseStream(ctx, reader)
  33. },
  34. },
  35. {
  36. name: "Chunked_ParseStream",
  37. fn: func(ctx context.Context, reader *strings.Reader) (*ParseResult, error) {
  38. return parser.ChunkedParseStream(ctx, reader, 32*1024)
  39. },
  40. },
  41. {
  42. name: "MemoryEfficient_ParseStream",
  43. fn: func(ctx context.Context, reader *strings.Reader) (*ParseResult, error) {
  44. return parser.MemoryEfficientParseStream(ctx, reader)
  45. },
  46. },
  47. }
  48. for _, bench := range benchmarks {
  49. b.Run(bench.name, func(b *testing.B) {
  50. b.ResetTimer()
  51. b.ReportAllocs()
  52. for i := 0; i < b.N; i++ {
  53. reader := strings.NewReader(logData)
  54. ctx := context.Background()
  55. result, err := bench.fn(ctx, reader)
  56. if err != nil {
  57. b.Fatalf("Parse error: %v", err)
  58. }
  59. if result.Processed == 0 {
  60. b.Fatal("No lines processed")
  61. }
  62. // Report custom metrics
  63. b.ReportMetric(float64(result.Processed), "lines_processed")
  64. b.ReportMetric(float64(result.Succeeded), "lines_succeeded")
  65. b.ReportMetric(result.ErrorRate*100, "error_rate_%")
  66. if result.Duration > 0 {
  67. throughput := float64(result.Processed) / result.Duration.Seconds()
  68. b.ReportMetric(throughput, "lines_per_sec")
  69. }
  70. }
  71. })
  72. }
  73. }
  74. // BenchmarkStreamParsing_ScaleTest tests parsing performance at different scales
  75. func BenchmarkStreamParsing_ScaleTest(b *testing.B) {
  76. scales := []struct {
  77. name string
  78. lines int
  79. }{
  80. {"Small_100", 100},
  81. {"Medium_1K", 1000},
  82. {"Large_10K", 10000},
  83. {"XLarge_50K", 50000},
  84. }
  85. config := DefaultParserConfig()
  86. config.MaxLineLength = 16 * 1024
  87. parser := NewOptimizedParser(
  88. config,
  89. NewCachedUserAgentParser(NewSimpleUserAgentParser(), 1000),
  90. &mockGeoIPService{},
  91. )
  92. for _, scale := range scales {
  93. logData := generateBenchmarkLogData(scale.lines)
  94. b.Run("Original_"+scale.name, func(b *testing.B) {
  95. b.ResetTimer()
  96. b.ReportAllocs()
  97. for i := 0; i < b.N; i++ {
  98. reader := strings.NewReader(logData)
  99. ctx := context.Background()
  100. result, err := parser.ParseStream(ctx, reader)
  101. if err != nil {
  102. b.Fatal(err)
  103. }
  104. b.ReportMetric(float64(result.Processed), "lines")
  105. }
  106. })
  107. b.Run("Optimized_"+scale.name, func(b *testing.B) {
  108. b.ResetTimer()
  109. b.ReportAllocs()
  110. for i := 0; i < b.N; i++ {
  111. reader := strings.NewReader(logData)
  112. ctx := context.Background()
  113. result, err := parser.OptimizedParseStream(ctx, reader)
  114. if err != nil {
  115. b.Fatal(err)
  116. }
  117. b.ReportMetric(float64(result.Processed), "lines")
  118. }
  119. })
  120. }
  121. }
  122. // BenchmarkStreamOptimizations_Individual tests individual optimization components
  123. func BenchmarkStreamOptimizations_Individual(b *testing.B) {
  124. _ = generateBenchmarkLogData(5000) // Avoid unused warning
  125. config := DefaultParserConfig()
  126. config.MaxLineLength = 16 * 1024
  127. _ = NewOptimizedParser(
  128. config,
  129. NewSimpleUserAgentParser(),
  130. &mockGeoIPService{},
  131. ) // Avoid unused warning
  132. b.Run("UnsafeStringConversion", func(b *testing.B) {
  133. testBytes := []byte("127.0.0.1 - - [25/Dec/2023:10:00:00 +0000] \"GET /index.html HTTP/1.1\" 200 1234")
  134. b.ResetTimer()
  135. b.ReportAllocs()
  136. for i := 0; i < b.N; i++ {
  137. _ = unsafeBytesToString(testBytes)
  138. }
  139. })
  140. b.Run("StandardStringConversion", func(b *testing.B) {
  141. testBytes := []byte("127.0.0.1 - - [25/Dec/2023:10:00:00 +0000] \"GET /index.html HTTP/1.1\" 200 1234")
  142. b.ResetTimer()
  143. b.ReportAllocs()
  144. for i := 0; i < b.N; i++ {
  145. _ = string(testBytes)
  146. }
  147. })
  148. b.Run("LineBuffer_Operations", func(b *testing.B) {
  149. buffer := NewLineBuffer(1024)
  150. testData := []byte("test log line data")
  151. b.ResetTimer()
  152. b.ReportAllocs()
  153. for i := 0; i < b.N; i++ {
  154. buffer.Reset()
  155. buffer.Append(testData)
  156. _ = buffer.UnsafeString()
  157. }
  158. })
  159. b.Run("MemoryReallocation_Test", func(b *testing.B) {
  160. b.ResetTimer()
  161. b.ReportAllocs()
  162. for i := 0; i < b.N; i++ {
  163. entries := make([]*AccessLogEntry, 0, 1000)
  164. // Simulate growing the slice
  165. for j := 0; j < 5000; j++ {
  166. entry := &AccessLogEntry{IP: fmt.Sprintf("192.168.1.%d", j%255+1)}
  167. entries = append(entries, entry)
  168. }
  169. // Ensure entries is consumed so append result is used
  170. if len(entries) > 0 {
  171. _ = entries[len(entries)-1]
  172. }
  173. }
  174. })
  175. }
  176. // BenchmarkContextCheckFrequency tests the impact of context checking frequency
  177. func BenchmarkContextCheckFrequency(b *testing.B) {
  178. logData := generateBenchmarkLogData(10000)
  179. frequencies := []struct {
  180. name string
  181. freq int
  182. }{
  183. {"Every_Line", 1},
  184. {"Every_10_Lines", 10},
  185. {"Every_50_Lines", 50},
  186. {"Every_100_Lines", 100},
  187. {"Every_500_Lines", 500},
  188. }
  189. for _, freq := range frequencies {
  190. b.Run(freq.name, func(b *testing.B) {
  191. b.ResetTimer()
  192. b.ReportAllocs()
  193. for i := 0; i < b.N; i++ {
  194. reader := strings.NewReader(logData)
  195. ctx := context.Background()
  196. // Simulate context checking at different frequencies
  197. lineCount := 0
  198. for {
  199. line, err := reader.ReadByte()
  200. if err != nil {
  201. break
  202. }
  203. lineCount++
  204. if lineCount%freq.freq == 0 {
  205. select {
  206. case <-ctx.Done():
  207. b.Fatal("Context cancelled")
  208. default:
  209. }
  210. }
  211. // Simulate some work
  212. _ = line
  213. }
  214. }
  215. })
  216. }
  217. }
  // generateBenchmarkLogData returns `lines` newline-separated log entries in
// nginx combined-style format (no trailing newline) for use as benchmark input.
//
// The output is deterministic:
//   - method, path, status and user agent cycle through fixed lists by line index;
//   - the IP encodes the index;
//   - timestamps advance one second per line from 10:00:00, wrapping every hour.
//
// A non-positive lines value yields "".
func generateBenchmarkLogData(lines int) string {
	// strings.Builder.Grow panics on a negative size, so bail out early.
	if lines <= 0 {
		return ""
	}

	methods := []string{"GET", "POST", "PUT", "DELETE", "HEAD"}
	statuses := []string{"200", "404", "500", "301", "302"}
	paths := []string{"/", "/index.html", "/api/users", "/static/css/style.css", "/api/data"}
	userAgents := []string{
		"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
		"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
		"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
		"curl/7.68.0",
		"PostmanRuntime/7.29.0",
	}

	var builder strings.Builder
	builder.Grow(lines * 200) // Pre-allocate roughly 200 bytes per line.

	for i := 0; i < lines; i++ {
		if i > 0 {
			builder.WriteByte('\n')
		}
		// The minute field wraps at 60 (not 24, the hour modulus) so minutes
		// cover the full 00-59 range.
		fmt.Fprintf(&builder,
			`192.168.%d.%d - - [25/Dec/2023:10:%02d:%02d +0000] "%s %s HTTP/1.1" %s %d "https://example.com" "%s"`,
			i%256, (i/256)%256,
			(i/60)%60, i%60,
			methods[i%len(methods)], paths[i%len(paths)],
			statuses[i%len(statuses)], 1000+(i%10000),
			userAgents[i%len(userAgents)],
		)
	}
	return builder.String()
}
  248. // TestOptimizedParseStreamCorrectness verifies that optimized implementations produce correct results
  249. func TestOptimizedParseStreamCorrectness(t *testing.T) {
  250. logData := `127.0.0.1 - - [25/Dec/2023:10:00:00 +0000] "GET /index.html HTTP/1.1" 200 1234 "https://example.com" "Mozilla/5.0"
  251. 192.168.1.1 - - [25/Dec/2023:10:00:01 +0000] "POST /api/data HTTP/1.1" 201 567 "https://example.com" "curl/7.68.0"
  252. 10.0.0.1 - - [25/Dec/2023:10:00:02 +0000] "GET /style.css HTTP/1.1" 200 890 "https://example.com" "Mozilla/5.0"`
  253. config := DefaultParserConfig()
  254. config.MaxLineLength = 16 * 1024
  255. parser := NewOptimizedParser(
  256. config,
  257. NewSimpleUserAgentParser(),
  258. &mockGeoIPService{},
  259. )
  260. ctx := context.Background()
  261. // Test original implementation
  262. originalResult, err := parser.ParseStream(ctx, strings.NewReader(logData))
  263. if err != nil {
  264. t.Fatalf("Original ParseStream failed: %v", err)
  265. }
  266. // Test optimized implementations
  267. implementations := []struct {
  268. name string
  269. fn func(context.Context, *strings.Reader) (*ParseResult, error)
  270. }{
  271. {
  272. "OptimizedParseStream",
  273. func(ctx context.Context, reader *strings.Reader) (*ParseResult, error) {
  274. return parser.OptimizedParseStream(ctx, reader)
  275. },
  276. },
  277. {
  278. "ChunkedParseStream",
  279. func(ctx context.Context, reader *strings.Reader) (*ParseResult, error) {
  280. return parser.ChunkedParseStream(ctx, reader, 1024)
  281. },
  282. },
  283. {
  284. "MemoryEfficientParseStream",
  285. func(ctx context.Context, reader *strings.Reader) (*ParseResult, error) {
  286. return parser.MemoryEfficientParseStream(ctx, reader)
  287. },
  288. },
  289. }
  290. for _, impl := range implementations {
  291. t.Run(impl.name, func(t *testing.T) {
  292. result, err := impl.fn(ctx, strings.NewReader(logData))
  293. if err != nil {
  294. t.Fatalf("%s failed: %v", impl.name, err)
  295. }
  296. // Compare basic metrics
  297. if result.Processed != originalResult.Processed {
  298. t.Errorf("%s processed count mismatch: got %d, want %d",
  299. impl.name, result.Processed, originalResult.Processed)
  300. }
  301. if result.Succeeded != originalResult.Succeeded {
  302. t.Errorf("%s succeeded count mismatch: got %d, want %d",
  303. impl.name, result.Succeeded, originalResult.Succeeded)
  304. }
  305. if len(result.Entries) != len(originalResult.Entries) {
  306. t.Errorf("%s entries count mismatch: got %d, want %d",
  307. impl.name, len(result.Entries), len(originalResult.Entries))
  308. }
  309. // Compare first entry details
  310. if len(result.Entries) > 0 && len(originalResult.Entries) > 0 {
  311. got := result.Entries[0]
  312. want := originalResult.Entries[0]
  313. if got.IP != want.IP {
  314. t.Errorf("%s first entry IP mismatch: got %s, want %s", impl.name, got.IP, want.IP)
  315. }
  316. if got.Status != want.Status {
  317. t.Errorf("%s first entry Status mismatch: got %d, want %d", impl.name, got.Status, want.Status)
  318. }
  319. }
  320. })
  321. }
  322. }
  323. // TestUnsafeStringConversion tests the safety of unsafe string conversion
  324. func TestUnsafeStringConversion(t *testing.T) {
  325. testCases := [][]byte{
  326. []byte("hello world"),
  327. []byte(""),
  328. []byte("127.0.0.1 - - [25/Dec/2023:10:00:00 +0000] \"GET /index.html HTTP/1.1\" 200 1234"),
  329. []byte("special chars: !@#$%^&*()"),
  330. }
  331. for i, testCase := range testCases {
  332. t.Run(fmt.Sprintf("Case_%d", i), func(t *testing.T) {
  333. unsafeStr := unsafeBytesToString(testCase)
  334. safeStr := string(testCase)
  335. if unsafeStr != safeStr {
  336. t.Errorf("Unsafe conversion mismatch: unsafe=%s, safe=%s", unsafeStr, safeStr)
  337. }
  338. })
  339. }
  340. }
  341. // TestLineBufferOperations tests LineBuffer functionality
  342. func TestLineBufferOperations(t *testing.T) {
  343. buffer := NewLineBuffer(1024)
  344. // Test basic operations
  345. testData := []byte("test data")
  346. buffer.Append(testData)
  347. if string(buffer.Bytes()) != "test data" {
  348. t.Errorf("Buffer content mismatch: got %s, want %s", buffer.Bytes(), "test data")
  349. }
  350. // Test reset
  351. buffer.Reset()
  352. if len(buffer.Bytes()) != 0 {
  353. t.Errorf("Buffer should be empty after reset, got length %d", len(buffer.Bytes()))
  354. }
  355. // Test growth
  356. largeData := make([]byte, 2048)
  357. for i := range largeData {
  358. largeData[i] = byte('a' + (i % 26))
  359. }
  360. buffer.Append(largeData)
  361. if len(buffer.Bytes()) != 2048 {
  362. t.Errorf("Buffer size mismatch: got %d, want 2048", len(buffer.Bytes()))
  363. }
  364. }
  365. // BenchmarkStreamBufferSizes tests different buffer sizes for streaming
  366. func BenchmarkStreamBufferSizes(b *testing.B) {
  367. logData := generateBenchmarkLogData(10000)
  368. config := DefaultParserConfig()
  369. config.MaxLineLength = 16 * 1024
  370. parser := NewOptimizedParser(
  371. config,
  372. NewSimpleUserAgentParser(),
  373. &mockGeoIPService{},
  374. )
  375. bufferSizes := []int{
  376. 1024, // 1KB
  377. 4096, // 4KB
  378. 16384, // 16KB
  379. 32768, // 32KB
  380. 65536, // 64KB
  381. }
  382. for _, size := range bufferSizes {
  383. b.Run(fmt.Sprintf("BufferSize_%dKB", size/1024), func(b *testing.B) {
  384. b.ResetTimer()
  385. b.ReportAllocs()
  386. for i := 0; i < b.N; i++ {
  387. reader := strings.NewReader(logData)
  388. ctx := context.Background()
  389. result, err := parser.ChunkedParseStream(ctx, reader, size)
  390. if err != nil {
  391. b.Fatal(err)
  392. }
  393. b.ReportMetric(float64(result.Processed), "lines")
  394. b.ReportMetric(float64(size), "buffer_bytes")
  395. }
  396. })
  397. }
  398. }