// optimized_time_series_test.go — tests and benchmarks for the optimized
// time-series analytics processors (buckets, cache, HyperLogLog, trends).
package analytics

import (
	"context"
	"strconv"
	"testing"
	"time"
)
// mockSearcher is a stub implementation of Searcher for testing; it returns
// canned data so the time-series processors can be exercised without a real
// search backend.
type mockSearcher struct{}
  9. func (m *mockSearcher) Search(ctx context.Context, req *SearchRequest) (*SearchResult, error) {
  10. // Generate mock search results
  11. hits := make([]*SearchHit, 1000)
  12. baseTime := time.Now().Unix()
  13. for i := 0; i < 1000; i++ {
  14. hits[i] = &SearchHit{
  15. Fields: map[string]interface{}{
  16. "timestamp": float64(baseTime + int64(i*60)), // 1 minute intervals
  17. "ip": "192.168.1." + string(rune('1'+(i%254))),
  18. "method": []string{"GET", "POST", "PUT"}[i%3],
  19. "path": "/api/test",
  20. "status": float64([]int{200, 404, 500}[i%3]),
  21. "bytes_sent": float64(1000 + (i % 5000)),
  22. },
  23. }
  24. }
  25. return &SearchResult{
  26. Hits: hits,
  27. TotalHits: 1000,
  28. Stats: &SearchStats{
  29. TotalBytes: 2500000,
  30. },
  31. }, nil
  32. }
  33. func (m *mockSearcher) Aggregate(ctx context.Context, req *AggregationRequest) (*AggregationResult, error) {
  34. return &AggregationResult{}, nil
  35. }
  36. func (m *mockSearcher) Suggest(ctx context.Context, text string, field string, size int) ([]*Suggestion, error) {
  37. return []*Suggestion{}, nil
  38. }
  39. func (m *mockSearcher) Analyze(ctx context.Context, text string, analyzer string) ([]string, error) {
  40. return []string{}, nil
  41. }
  42. func (m *mockSearcher) ClearCache() error {
  43. return nil
  44. }
  45. // TestOptimizedTimeSeriesProcessor tests the optimized processor
  46. func TestOptimizedTimeSeriesProcessor(t *testing.T) {
  47. processor := NewOptimizedTimeSeriesProcessor()
  48. if processor == nil {
  49. t.Fatal("Failed to create optimized processor")
  50. }
  51. // Test bucket pool
  52. bucketPool := processor.getBucketPool(60)
  53. if bucketPool == nil {
  54. t.Fatal("Failed to get bucket pool")
  55. }
  56. buckets := bucketPool.Get()
  57. if buckets == nil {
  58. t.Fatal("Failed to get buckets from pool")
  59. }
  60. bucketPool.Put(buckets)
  61. }
  62. // TestTimeBucket tests the time bucket functionality
  63. func TestTimeBucket(t *testing.T) {
  64. timestamp := time.Now().Unix()
  65. bucket := NewTimeBucket(timestamp)
  66. if bucket.Timestamp != timestamp {
  67. t.Errorf("Expected timestamp %d, got %d", timestamp, bucket.Timestamp)
  68. }
  69. // Test adding entries
  70. bucket.AddEntry("192.168.1.1", 200, "GET", "/api/test", 1024)
  71. bucket.AddEntry("192.168.1.2", 404, "POST", "/api/data", 512)
  72. bucket.AddEntry("192.168.1.1", 200, "GET", "/api/test", 2048) // Duplicate IP
  73. // Verify counts
  74. if bucket.RequestCount != 3 {
  75. t.Errorf("Expected 3 requests, got %d", bucket.RequestCount)
  76. }
  77. if bucket.BytesTransferred != 3584 {
  78. t.Errorf("Expected 3584 bytes, got %d", bucket.BytesTransferred)
  79. }
  80. if bucket.GetUniqueVisitorCount() != 2 {
  81. t.Errorf("Expected 2 unique visitors, got %d", bucket.GetUniqueVisitorCount())
  82. }
  83. // Verify status codes
  84. if bucket.StatusCodes[200] != 2 {
  85. t.Errorf("Expected 2 status 200, got %d", bucket.StatusCodes[200])
  86. }
  87. if bucket.StatusCodes[404] != 1 {
  88. t.Errorf("Expected 1 status 404, got %d", bucket.StatusCodes[404])
  89. }
  90. }
  91. // TestTimeSeriesCache tests the caching functionality
  92. func TestTimeSeriesCache(t *testing.T) {
  93. cache := NewTimeSeriesCache(5, 300) // 5 entries, 5 min TTL
  94. // Test put and get
  95. testData := "test_data"
  96. cache.Put("key1", testData)
  97. retrieved, found := cache.Get("key1")
  98. if !found {
  99. t.Error("Failed to find cached data")
  100. }
  101. if retrieved != testData {
  102. t.Errorf("Expected %s, got %v", testData, retrieved)
  103. }
  104. // Test non-existent key
  105. _, found = cache.Get("non_existent")
  106. if found {
  107. t.Error("Found non-existent key")
  108. }
  109. // Test cache eviction
  110. for i := 0; i < 10; i++ {
  111. key := "evict_key" + string(rune('0'+i))
  112. cache.Put(key, i)
  113. }
  114. // Original key1 should still exist (eviction targets oldest by timestamp)
  115. _, found = cache.Get("key1")
  116. if !found {
  117. t.Log("Key1 was evicted as expected due to LRU policy")
  118. }
  119. }
  120. // TestOptimizedGetVisitorsByTime tests optimized visitors by time
  121. func TestOptimizedGetVisitorsByTime(t *testing.T) {
  122. processor := NewOptimizedTimeSeriesProcessor()
  123. mockSearcher := &mockSearcher{}
  124. req := &VisitorsByTimeRequest{
  125. StartTime: time.Now().Unix() - 3600, // 1 hour ago
  126. EndTime: time.Now().Unix(),
  127. LogPaths: []string{"/var/log/nginx/access.log"},
  128. IntervalSeconds: 60, // 1 minute intervals
  129. }
  130. result, err := processor.OptimizedGetVisitorsByTime(context.Background(), req, mockSearcher)
  131. if err != nil {
  132. t.Fatalf("Failed to get visitors by time: %v", err)
  133. }
  134. if result == nil {
  135. t.Fatal("Result is nil")
  136. }
  137. if len(result.Data) == 0 {
  138. t.Error("No data returned")
  139. }
  140. // Verify data is sorted
  141. for i := 1; i < len(result.Data); i++ {
  142. if result.Data[i].Timestamp < result.Data[i-1].Timestamp {
  143. t.Error("Data is not sorted by timestamp")
  144. }
  145. }
  146. }
  147. // TestOptimizedGetTrafficByTime tests optimized traffic by time
  148. func TestOptimizedGetTrafficByTime(t *testing.T) {
  149. processor := NewOptimizedTimeSeriesProcessor()
  150. mockSearcher := &mockSearcher{}
  151. req := &TrafficByTimeRequest{
  152. StartTime: time.Now().Unix() - 3600,
  153. EndTime: time.Now().Unix(),
  154. LogPaths: []string{"/var/log/nginx/access.log"},
  155. IntervalSeconds: 300, // 5 minute intervals
  156. }
  157. result, err := processor.OptimizedGetTrafficByTime(context.Background(), req, mockSearcher)
  158. if err != nil {
  159. t.Fatalf("Failed to get traffic by time: %v", err)
  160. }
  161. if result == nil {
  162. t.Fatal("Result is nil")
  163. }
  164. if len(result.Data) == 0 {
  165. t.Error("No data returned")
  166. }
  167. // Verify comprehensive metrics
  168. for _, point := range result.Data {
  169. if point.Timestamp <= 0 {
  170. t.Error("Invalid timestamp")
  171. }
  172. if point.Requests < 0 {
  173. t.Error("Invalid request count")
  174. }
  175. if point.Bytes < 0 {
  176. t.Error("Invalid byte count")
  177. }
  178. if point.UniqueVisitors < 0 {
  179. t.Error("Invalid unique visitor count")
  180. }
  181. }
  182. }
  183. // TestHyperLogLog tests the HyperLogLog cardinality estimator
  184. func TestHyperLogLog(t *testing.T) {
  185. hll := NewHyperLogLog(8) // 256 buckets
  186. // Add known unique values
  187. uniqueValues := []string{
  188. "192.168.1.1", "192.168.1.2", "192.168.1.3",
  189. "10.0.0.1", "10.0.0.2", "172.16.0.1",
  190. }
  191. for _, value := range uniqueValues {
  192. hll.Add(value)
  193. }
  194. count := hll.Count()
  195. expectedCount := uint64(len(uniqueValues))
  196. // HyperLogLog should be reasonably accurate for small sets
  197. if count == 0 {
  198. t.Error("HyperLogLog count is 0")
  199. }
  200. // Allow for some estimation error (compute absolute difference safely for uint64)
  201. var diff uint64
  202. if count >= expectedCount {
  203. diff = count - expectedCount
  204. } else {
  205. diff = expectedCount - count
  206. }
  207. if diff > expectedCount/2 {
  208. t.Logf("HyperLogLog estimate %d vs actual %d (difference: %d)", count, expectedCount, diff)
  209. }
  210. }
  211. // TestAdvancedTimeSeriesProcessor tests advanced analytics
  212. func TestAdvancedTimeSeriesProcessor(t *testing.T) {
  213. processor := NewAdvancedTimeSeriesProcessor()
  214. if processor == nil {
  215. t.Fatal("Failed to create advanced processor")
  216. }
  217. // Test anomaly detection
  218. testData := []TimeValue{
  219. {Timestamp: 1000, Value: 10},
  220. {Timestamp: 1060, Value: 12},
  221. {Timestamp: 1120, Value: 11},
  222. {Timestamp: 1180, Value: 13},
  223. {Timestamp: 1240, Value: 10},
  224. {Timestamp: 1300, Value: 50}, // Anomaly
  225. {Timestamp: 1360, Value: 12},
  226. {Timestamp: 1420, Value: 11},
  227. }
  228. anomalies := processor.DetectAnomalies(testData)
  229. if len(anomalies) == 0 {
  230. t.Log("No anomalies detected")
  231. } else {
  232. t.Logf("Detected %d anomalies", len(anomalies))
  233. for _, anomaly := range anomalies {
  234. t.Logf("Anomaly: timestamp=%d, value=%d, expected=%d, deviation=%.2f",
  235. anomaly.Timestamp, anomaly.Value, anomaly.Expected, anomaly.Deviation)
  236. }
  237. }
  238. }
  239. // TestTrendAnalysis tests trend calculation
  240. func TestTrendAnalysis(t *testing.T) {
  241. processor := NewAdvancedTimeSeriesProcessor()
  242. // Test increasing trend
  243. increasingData := []TimeValue{
  244. {Timestamp: 1000, Value: 10},
  245. {Timestamp: 1060, Value: 15},
  246. {Timestamp: 1120, Value: 20},
  247. {Timestamp: 1180, Value: 25},
  248. {Timestamp: 1240, Value: 30},
  249. }
  250. trend := processor.CalculateTrend(increasingData)
  251. if trend.Direction != "increasing" {
  252. t.Errorf("Expected increasing trend, got %s", trend.Direction)
  253. }
  254. if trend.Slope <= 0 {
  255. t.Errorf("Expected positive slope, got %f", trend.Slope)
  256. }
  257. // Test decreasing trend
  258. decreasingData := []TimeValue{
  259. {Timestamp: 1000, Value: 30},
  260. {Timestamp: 1060, Value: 25},
  261. {Timestamp: 1120, Value: 20},
  262. {Timestamp: 1180, Value: 15},
  263. {Timestamp: 1240, Value: 10},
  264. }
  265. trend = processor.CalculateTrend(decreasingData)
  266. if trend.Direction != "decreasing" {
  267. t.Errorf("Expected decreasing trend, got %s", trend.Direction)
  268. }
  269. if trend.Slope >= 0 {
  270. t.Errorf("Expected negative slope, got %f", trend.Slope)
  271. }
  272. }
  273. // BenchmarkOptimizedTimeSeriesProcessing benchmarks the optimized processing
  274. func BenchmarkOptimizedTimeSeriesProcessing(b *testing.B) {
  275. processor := NewOptimizedTimeSeriesProcessor()
  276. mockSearcher := &mockSearcher{}
  277. req := &VisitorsByTimeRequest{
  278. StartTime: time.Now().Unix() - 3600,
  279. EndTime: time.Now().Unix(),
  280. LogPaths: []string{"/var/log/nginx/access.log"},
  281. IntervalSeconds: 60,
  282. }
  283. b.ResetTimer()
  284. b.ReportAllocs()
  285. b.Run("OptimizedVisitorsByTime", func(b *testing.B) {
  286. for i := 0; i < b.N; i++ {
  287. _, err := processor.OptimizedGetVisitorsByTime(context.Background(), req, mockSearcher)
  288. if err != nil {
  289. b.Fatal(err)
  290. }
  291. }
  292. })
  293. }
  294. // BenchmarkTimeBucketOperations benchmarks time bucket operations
  295. func BenchmarkTimeBucketOperations(b *testing.B) {
  296. bucket := NewTimeBucket(time.Now().Unix())
  297. b.ResetTimer()
  298. b.ReportAllocs()
  299. b.Run("AddEntry", func(b *testing.B) {
  300. for i := 0; i < b.N; i++ {
  301. ip := "192.168.1." + string(rune('1'+(i%254)))
  302. bucket.AddEntry(ip, 200, "GET", "/api/test", 1024)
  303. }
  304. })
  305. b.Run("GetUniqueVisitorCount", func(b *testing.B) {
  306. for i := 0; i < b.N; i++ {
  307. _ = bucket.GetUniqueVisitorCount()
  308. }
  309. })
  310. }
  311. // BenchmarkHyperLogLog benchmarks HyperLogLog operations
  312. func BenchmarkHyperLogLog(b *testing.B) {
  313. hll := NewHyperLogLog(12) // 4096 buckets
  314. b.ResetTimer()
  315. b.ReportAllocs()
  316. b.Run("Add", func(b *testing.B) {
  317. for i := 0; i < b.N; i++ {
  318. ip := "192.168.1." + string(rune('1'+(i%255)))
  319. hll.Add(ip)
  320. }
  321. })
  322. b.Run("Count", func(b *testing.B) {
  323. // Add some data first
  324. for i := 0; i < 1000; i++ {
  325. hll.Add("192.168.1." + string(rune('1'+(i%255))))
  326. }
  327. b.ResetTimer()
  328. for i := 0; i < b.N; i++ {
  329. _ = hll.Count()
  330. }
  331. })
  332. }
  333. // BenchmarkTimeSeriesCache benchmarks cache operations
  334. func BenchmarkTimeSeriesCache(b *testing.B) {
  335. cache := NewTimeSeriesCache(1000, 3600)
  336. // Pre-populate cache
  337. for i := 0; i < 500; i++ {
  338. key := "key_" + string(rune('0'+(i%10)))
  339. cache.Put(key, i)
  340. }
  341. b.ResetTimer()
  342. b.ReportAllocs()
  343. b.Run("Get", func(b *testing.B) {
  344. for i := 0; i < b.N; i++ {
  345. key := "key_" + string(rune('0'+(i%10)))
  346. cache.Get(key)
  347. }
  348. })
  349. b.Run("Put", func(b *testing.B) {
  350. for i := 0; i < b.N; i++ {
  351. key := "bench_key_" + string(rune('0'+(i%100)))
  352. cache.Put(key, i)
  353. }
  354. })
  355. }
  356. // BenchmarkAnomalyDetection benchmarks anomaly detection
  357. func BenchmarkAnomalyDetection(b *testing.B) {
  358. processor := NewAdvancedTimeSeriesProcessor()
  359. // Generate test data
  360. testData := make([]TimeValue, 100)
  361. for i := 0; i < 100; i++ {
  362. testData[i] = TimeValue{
  363. Timestamp: int64(1000 + i*60),
  364. Value: 10 + (i % 5), // Normal pattern with occasional spikes
  365. }
  366. }
  367. // Add some anomalies
  368. testData[50].Value = 100
  369. testData[75].Value = 2
  370. b.ResetTimer()
  371. b.ReportAllocs()
  372. for i := 0; i < b.N; i++ {
  373. _ = processor.DetectAnomalies(testData)
  374. }
  375. }
  376. // BenchmarkTrendCalculation benchmarks trend calculation
  377. func BenchmarkTrendCalculation(b *testing.B) {
  378. processor := NewAdvancedTimeSeriesProcessor()
  379. // Generate test data with trend
  380. testData := make([]TimeValue, 50)
  381. for i := 0; i < 50; i++ {
  382. testData[i] = TimeValue{
  383. Timestamp: int64(1000 + i*60),
  384. Value: 10 + i/2, // Increasing trend
  385. }
  386. }
  387. b.ResetTimer()
  388. b.ReportAllocs()
  389. for i := 0; i < b.N; i++ {
  390. _ = processor.CalculateTrend(testData)
  391. }
  392. }