// cpu_optimization_test.go (7.0 KB)

  1. package indexer
  2. import (
  3. "context"
  4. "fmt"
  5. "runtime"
  6. "sync"
  7. "testing"
  8. "time"
  9. )
  10. // BenchmarkCPUUtilization tests different worker configurations for CPU utilization
  11. func BenchmarkCPUUtilization(b *testing.B) {
  12. configs := []struct {
  13. name string
  14. workerCount int
  15. batchSize int
  16. queueSize int
  17. }{
  18. {"Current_8W_1000B", 8, 1000, 10000},
  19. {"CPU_Match", runtime.GOMAXPROCS(0), 1000, 10000},
  20. {"CPU_Double", runtime.GOMAXPROCS(0) * 2, 1000, 10000},
  21. {"CPU_Triple", runtime.GOMAXPROCS(0) * 3, 1000, 10000},
  22. {"HighBatch_8W_2000B", 8, 2000, 10000},
  23. {"HighBatch_12W_2000B", 12, 2000, 20000},
  24. {"LowLatency_16W_500B", 16, 500, 20000},
  25. }
  26. for _, config := range configs {
  27. b.Run(config.name, func(b *testing.B) {
  28. benchmarkWorkerConfiguration(b, config.workerCount, config.batchSize, config.queueSize)
  29. })
  30. }
  31. }
  32. func benchmarkWorkerConfiguration(b *testing.B, workerCount, batchSize, queueSize int) {
  33. b.Helper()
  34. // Create test configuration
  35. cfg := &Config{
  36. WorkerCount: workerCount,
  37. BatchSize: batchSize,
  38. MaxQueueSize: queueSize,
  39. }
  40. // Track CPU utilization during benchmark
  41. var totalCPUTime time.Duration
  42. var measurements int
  43. b.ResetTimer()
  44. b.SetBytes(int64(batchSize * 100)) // Approximate bytes per operation
  45. for i := 0; i < b.N; i++ {
  46. start := time.Now()
  47. // Simulate worker pipeline processing
  48. simulateWorkerPipeline(cfg)
  49. elapsed := time.Since(start)
  50. totalCPUTime += elapsed
  51. measurements++
  52. }
  53. // Report CPU utilization metrics
  54. avgProcessingTime := totalCPUTime / time.Duration(measurements)
  55. b.ReportMetric(float64(avgProcessingTime.Nanoseconds()), "ns/pipeline")
  56. b.ReportMetric(float64(workerCount), "workers")
  57. b.ReportMetric(float64(batchSize), "batch_size")
  58. }
  59. func simulateWorkerPipeline(cfg *Config) {
  60. // Create job and result channels
  61. jobQueue := make(chan *IndexJob, cfg.MaxQueueSize)
  62. resultQueue := make(chan *IndexResult, cfg.WorkerCount)
  63. // Create worker pool
  64. var wg sync.WaitGroup
  65. ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
  66. defer cancel()
  67. // Start workers
  68. for i := 0; i < cfg.WorkerCount; i++ {
  69. wg.Add(1)
  70. go func(workerID int) {
  71. defer wg.Done()
  72. simulateWorker(ctx, workerID, jobQueue, resultQueue)
  73. }(i)
  74. }
  75. // Generate jobs
  76. go func() {
  77. defer close(jobQueue)
  78. for i := 0; i < cfg.BatchSize; i++ {
  79. select {
  80. case jobQueue <- &IndexJob{
  81. Documents: []*Document{{
  82. ID: fmt.Sprintf("job-%d", i),
  83. Fields: &LogDocument{
  84. Timestamp: time.Now().Unix(),
  85. IP: "127.0.0.1",
  86. },
  87. }},
  88. Priority: 1,
  89. }:
  90. case <-ctx.Done():
  91. return
  92. }
  93. }
  94. }()
  95. // Process results
  96. resultCount := 0
  97. done := make(chan bool)
  98. go func() {
  99. defer close(done)
  100. for {
  101. select {
  102. case <-resultQueue:
  103. resultCount++
  104. if resultCount >= cfg.BatchSize {
  105. return
  106. }
  107. case <-ctx.Done():
  108. return
  109. }
  110. }
  111. }()
  112. // Wait for completion
  113. select {
  114. case <-done:
  115. case <-ctx.Done():
  116. }
  117. wg.Wait()
  118. }
  119. func simulateWorker(ctx context.Context, workerID int, jobQueue <-chan *IndexJob, resultQueue chan<- *IndexResult) {
  120. for {
  121. select {
  122. case _, ok := <-jobQueue:
  123. if !ok {
  124. return
  125. }
  126. // Simulate CPU-intensive work
  127. simulateCPUWork()
  128. // Send result
  129. select {
  130. case resultQueue <- &IndexResult{
  131. Processed: 1,
  132. Succeeded: 1,
  133. Failed: 0,
  134. Throughput: 1.0,
  135. }:
  136. case <-ctx.Done():
  137. return
  138. }
  139. case <-ctx.Done():
  140. return
  141. }
  142. }
  143. }
  144. func simulateCPUWork() {
  145. // Simulate CPU-bound operations similar to log parsing and indexing
  146. sum := 0
  147. for i := 0; i < 10000; i++ {
  148. sum += i * i
  149. }
  150. // Prevent compiler optimization
  151. if sum < 0 {
  152. panic("unexpected")
  153. }
  154. }
  155. // BenchmarkPipelineBottleneck identifies bottlenecks in the processing pipeline
  156. func BenchmarkPipelineBottleneck(b *testing.B) {
  157. tests := []struct {
  158. name string
  159. jobQueueSize int
  160. resultQueueSize int
  161. workerCount int
  162. }{
  163. {"SmallQueues", 100, 10, 8},
  164. {"MediumQueues", 1000, 100, 8},
  165. {"LargeQueues", 10000, 1000, 8},
  166. {"BufferedPipeline", 50000, 5000, 8},
  167. }
  168. for _, test := range tests {
  169. b.Run(test.name, func(b *testing.B) {
  170. benchmarkPipelineConfiguration(b, test.jobQueueSize, test.resultQueueSize, test.workerCount)
  171. })
  172. }
  173. }
  174. func benchmarkPipelineConfiguration(b *testing.B, jobQueueSize, resultQueueSize, workerCount int) {
  175. b.ResetTimer()
  176. for i := 0; i < b.N; i++ {
  177. // Simulate pipeline with different buffer sizes
  178. jobQueue := make(chan *IndexJob, jobQueueSize)
  179. resultQueue := make(chan *IndexResult, resultQueueSize)
  180. ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
  181. var wg sync.WaitGroup
  182. // Start workers
  183. for w := 0; w < workerCount; w++ {
  184. wg.Add(1)
  185. go func() {
  186. defer wg.Done()
  187. for {
  188. select {
  189. case _, ok := <-jobQueue:
  190. if !ok {
  191. return
  192. }
  193. // Simulate processing
  194. for j := 0; j < 1000; j++ {
  195. _ = j * j
  196. }
  197. select {
  198. case resultQueue <- &IndexResult{
  199. Processed: 1,
  200. Succeeded: 1,
  201. Failed: 0,
  202. Throughput: 1.0,
  203. }:
  204. case <-ctx.Done():
  205. return
  206. }
  207. case <-ctx.Done():
  208. return
  209. }
  210. }
  211. }()
  212. }
  213. // Feed jobs
  214. go func() {
  215. defer close(jobQueue)
  216. for j := 0; j < 1000; j++ {
  217. select {
  218. case jobQueue <- &IndexJob{
  219. Documents: []*Document{{
  220. ID: fmt.Sprintf("job-%d", j),
  221. Fields: &LogDocument{
  222. Timestamp: time.Now().Unix(),
  223. IP: "127.0.0.1",
  224. },
  225. }},
  226. Priority: 1,
  227. }:
  228. case <-ctx.Done():
  229. return
  230. }
  231. }
  232. }()
  233. // Consume results
  234. resultCount := 0
  235. for resultCount < 1000 {
  236. select {
  237. case <-resultQueue:
  238. resultCount++
  239. case <-ctx.Done():
  240. break
  241. }
  242. }
  243. cancel()
  244. wg.Wait()
  245. }
  246. b.ReportMetric(float64(jobQueueSize), "job_queue_size")
  247. b.ReportMetric(float64(resultQueueSize), "result_queue_size")
  248. }
  249. // BenchmarkMemoryPressure tests performance under different memory conditions
  250. func BenchmarkMemoryPressure(b *testing.B) {
  251. tests := []struct {
  252. name string
  253. allocSize int
  254. allocCount int
  255. }{
  256. {"LowMemory", 1024, 100},
  257. {"MediumMemory", 4096, 500},
  258. {"HighMemory", 16384, 1000},
  259. }
  260. for _, test := range tests {
  261. b.Run(test.name, func(b *testing.B) {
  262. // Allocate memory to simulate different memory pressure
  263. allocations := make([][]byte, test.allocCount)
  264. for i := 0; i < test.allocCount; i++ {
  265. allocations[i] = make([]byte, test.allocSize)
  266. }
  267. b.ResetTimer()
  268. for i := 0; i < b.N; i++ {
  269. // Simulate indexing work under memory pressure
  270. simulateMemoryIntensiveWork()
  271. }
  272. // Keep allocations alive until end
  273. runtime.KeepAlive(allocations)
  274. })
  275. }
  276. }
  277. func simulateMemoryIntensiveWork() {
  278. // Simulate memory allocation patterns similar to log parsing
  279. buffers := make([][]byte, 10)
  280. for i := range buffers {
  281. buffers[i] = make([]byte, 1024)
  282. // Fill with some data
  283. for j := range buffers[i] {
  284. buffers[i][j] = byte(i + j)
  285. }
  286. }
  287. // Simulate some processing
  288. sum := 0
  289. for _, buf := range buffers {
  290. for _, b := range buf {
  291. sum += int(b)
  292. }
  293. }
  294. // Prevent optimization
  295. if sum < 0 {
  296. panic("unexpected")
  297. }
  298. }