package analytics

import (
	"context"
	"strconv"
	"testing"
	"time"
)

// mockSearcher implements Searcher for testing
type mockSearcher struct{}
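
// Compile-time check that mockSearcher satisfies the Searcher interface
// (assuming Searcher is the interface consumed by the processors under test,
// as the comment above states).
var _ Searcher = (*mockSearcher)(nil)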

func (m *mockSearcher) Search(ctx context.Context, req *SearchRequest) (*SearchResult, error) {
	// Generate mock search results
	hits := make([]*SearchHit, 1000)
	baseTime := time.Now().Unix()
	for i := 0; i < 1000; i++ {
		hits[i] = &SearchHit{
			Fields: map[string]interface{}{
				"timestamp":  float64(baseTime + int64(i*60)), // 1 minute intervals
				"ip":         "192.168.1." + strconv.Itoa(1+i%254),
				"method":     []string{"GET", "POST", "PUT"}[i%3],
				"path":       "/api/test",
				"status":     float64([]int{200, 404, 500}[i%3]),
				"bytes_sent": float64(1000 + (i % 5000)),
			},
		}
	}

	return &SearchResult{
		Hits:      hits,
		TotalHits: 1000,
		Stats: &SearchStats{
			TotalBytes: 2500000,
		},
	}, nil
}

func (m *mockSearcher) Aggregate(ctx context.Context, req *AggregationRequest) (*AggregationResult, error) {
	return &AggregationResult{}, nil
}

func (m *mockSearcher) Suggest(ctx context.Context, text string, field string, size int) ([]*Suggestion, error) {
	return []*Suggestion{}, nil
}

func (m *mockSearcher) Analyze(ctx context.Context, text string, analyzer string) ([]string, error) {
	return []string{}, nil
}

func (m *mockSearcher) ClearCache() error {
	return nil
}

// TestOptimizedTimeSeriesProcessor tests the optimized processor
func TestOptimizedTimeSeriesProcessor(t *testing.T) {
	processor := NewOptimizedTimeSeriesProcessor()
	if processor == nil {
		t.Fatal("Failed to create optimized processor")
	}

	// Test bucket pool
	bucketPool := processor.getBucketPool(60)
	if bucketPool == nil {
		t.Fatal("Failed to get bucket pool")
	}
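
	// The pool is only exercised for a non-nil round trip here; if it is backed
	// by sync.Pool, a Put followed by a later Get is not guaranteed to return
	// the same object, so no identity assertion is made.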
	buckets := bucketPool.Get()
	if buckets == nil {
		t.Fatal("Failed to get buckets from pool")
	}
	bucketPool.Put(buckets)
}

// TestTimeBucket tests the time bucket functionality
func TestTimeBucket(t *testing.T) {
	timestamp := time.Now().Unix()
	bucket := NewTimeBucket(timestamp)
	if bucket.Timestamp != timestamp {
		t.Errorf("Expected timestamp %d, got %d", timestamp, bucket.Timestamp)
	}

	// Test adding entries
	bucket.AddEntry("192.168.1.1", 200, "GET", "/api/test", 1024)
	bucket.AddEntry("192.168.1.2", 404, "POST", "/api/data", 512)
	bucket.AddEntry("192.168.1.1", 200, "GET", "/api/test", 2048) // Duplicate IP

	// Verify counts
	if bucket.RequestCount != 3 {
		t.Errorf("Expected 3 requests, got %d", bucket.RequestCount)
	}
	if bucket.BytesTransferred != 3584 {
		t.Errorf("Expected 3584 bytes, got %d", bucket.BytesTransferred)
	}
	if bucket.GetUniqueVisitorCount() != 2 {
		t.Errorf("Expected 2 unique visitors, got %d", bucket.GetUniqueVisitorCount())
	}

	// Verify status codes
	if bucket.StatusCodes[200] != 2 {
		t.Errorf("Expected 2 status 200, got %d", bucket.StatusCodes[200])
	}
	if bucket.StatusCodes[404] != 1 {
		t.Errorf("Expected 1 status 404, got %d", bucket.StatusCodes[404])
	}
}

// TestTimeSeriesCache tests the caching functionality
func TestTimeSeriesCache(t *testing.T) {
	cache := NewTimeSeriesCache(5, 300) // 5 entries, 5 min TTL

	// Test put and get
	testData := "test_data"
	cache.Put("key1", testData)

	retrieved, found := cache.Get("key1")
	if !found {
		t.Error("Failed to find cached data")
	}
	if retrieved != testData {
		t.Errorf("Expected %s, got %v", testData, retrieved)
	}

	// Test non-existent key
	_, found = cache.Get("non_existent")
	if found {
		t.Error("Found non-existent key")
	}

	// Test cache eviction: the cache holds only 5 entries, so inserting 10 more
	// keys should push out the oldest entries, including key1.
	for i := 0; i < 10; i++ {
		key := "evict_key" + strconv.Itoa(i)
		cache.Put(key, i)
	}

	_, found = cache.Get("key1")
	if !found {
		t.Log("key1 was evicted as expected under the LRU policy")
	}
}

// TestOptimizedGetVisitorsByTime tests optimized visitors by time
func TestOptimizedGetVisitorsByTime(t *testing.T) {
	processor := NewOptimizedTimeSeriesProcessor()
	mockSearcher := &mockSearcher{}
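
	// Note: mockSearcher ignores the request and always returns the same 1000
	// synthetic hits, so this test checks the shape and ordering of the result
	// rather than actual time-range filtering.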
	req := &VisitorsByTimeRequest{
		StartTime:       time.Now().Unix() - 3600, // 1 hour ago
		EndTime:         time.Now().Unix(),
		LogPaths:        []string{"/var/log/nginx/access.log"},
		IntervalSeconds: 60, // 1 minute intervals
	}

	result, err := processor.OptimizedGetVisitorsByTime(context.Background(), req, mockSearcher)
	if err != nil {
		t.Fatalf("Failed to get visitors by time: %v", err)
	}
	if result == nil {
		t.Fatal("Result is nil")
	}
	if len(result.Data) == 0 {
		t.Error("No data returned")
	}

	// Verify data is sorted
	for i := 1; i < len(result.Data); i++ {
		if result.Data[i].Timestamp < result.Data[i-1].Timestamp {
			t.Error("Data is not sorted by timestamp")
		}
	}
}

// TestOptimizedGetTrafficByTime tests optimized traffic by time
func TestOptimizedGetTrafficByTime(t *testing.T) {
	processor := NewOptimizedTimeSeriesProcessor()
	mockSearcher := &mockSearcher{}

	req := &TrafficByTimeRequest{
		StartTime:       time.Now().Unix() - 3600,
		EndTime:         time.Now().Unix(),
		LogPaths:        []string{"/var/log/nginx/access.log"},
		IntervalSeconds: 300, // 5 minute intervals
	}

	result, err := processor.OptimizedGetTrafficByTime(context.Background(), req, mockSearcher)
	if err != nil {
		t.Fatalf("Failed to get traffic by time: %v", err)
	}
	if result == nil {
		t.Fatal("Result is nil")
	}
	if len(result.Data) == 0 {
		t.Error("No data returned")
	}

	// Verify comprehensive metrics
	for _, point := range result.Data {
		if point.Timestamp <= 0 {
			t.Error("Invalid timestamp")
		}
		if point.Requests < 0 {
			t.Error("Invalid request count")
		}
		if point.Bytes < 0 {
			t.Error("Invalid byte count")
		}
		if point.UniqueVisitors < 0 {
			t.Error("Invalid unique visitor count")
		}
	}
}

// TestHyperLogLog tests the HyperLogLog cardinality estimator
func TestHyperLogLog(t *testing.T) {
	hll := NewHyperLogLog(8) // 256 buckets
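
	// With precision 8 the sketch has 2^8 = 256 registers; the textbook
	// HyperLogLog relative standard error is about 1.04/sqrt(m), i.e. ~6.5%
	// here, and implementations typically fall back to linear counting for
	// very small cardinalities, so six distinct values should be counted
	// almost exactly.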

	// Add known unique values
	uniqueValues := []string{
		"192.168.1.1", "192.168.1.2", "192.168.1.3",
		"10.0.0.1", "10.0.0.2", "172.16.0.1",
	}
	for _, value := range uniqueValues {
		hll.Add(value)
	}

	count := hll.Count()
	expectedCount := uint64(len(uniqueValues))

	// HyperLogLog should be reasonably accurate for small sets
	if count == 0 {
		t.Error("HyperLogLog count is 0")
	}

	// Allow for some estimation error (compute the absolute difference safely for uint64)
	var diff uint64
	if count >= expectedCount {
		diff = count - expectedCount
	} else {
		diff = expectedCount - count
	}
	if diff > expectedCount/2 {
		t.Logf("HyperLogLog estimate %d vs actual %d (difference: %d)", count, expectedCount, diff)
	}
}

// TestAdvancedTimeSeriesProcessor tests advanced analytics
func TestAdvancedTimeSeriesProcessor(t *testing.T) {
	processor := NewAdvancedTimeSeriesProcessor()
	if processor == nil {
		t.Fatal("Failed to create advanced processor")
	}

	// Test anomaly detection
	testData := []TimeValue{
		{Timestamp: 1000, Value: 10},
		{Timestamp: 1060, Value: 12},
		{Timestamp: 1120, Value: 11},
		{Timestamp: 1180, Value: 13},
		{Timestamp: 1240, Value: 10},
		{Timestamp: 1300, Value: 50}, // Anomaly
		{Timestamp: 1360, Value: 12},
		{Timestamp: 1420, Value: 11},
	}
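
	// The value 50 at t=1300 sits far above the 10-13 baseline, so a
	// deviation-based detector should flag it; because detection thresholds
	// vary between implementations, the test logs the outcome instead of
	// asserting on it.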
	anomalies := processor.DetectAnomalies(testData)
	if len(anomalies) == 0 {
		t.Log("No anomalies detected")
	} else {
		t.Logf("Detected %d anomalies", len(anomalies))
		for _, anomaly := range anomalies {
			t.Logf("Anomaly: timestamp=%d, value=%d, expected=%d, deviation=%.2f",
				anomaly.Timestamp, anomaly.Value, anomaly.Expected, anomaly.Deviation)
		}
	}
}

// TestTrendAnalysis tests trend calculation
func TestTrendAnalysis(t *testing.T) {
	processor := NewAdvancedTimeSeriesProcessor()

	// Test increasing trend
	increasingData := []TimeValue{
		{Timestamp: 1000, Value: 10},
		{Timestamp: 1060, Value: 15},
		{Timestamp: 1120, Value: 20},
		{Timestamp: 1180, Value: 25},
		{Timestamp: 1240, Value: 30},
	}
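
	// The series climbs by a constant 5 units per 60-second step, so any
	// least-squares or difference-based slope estimate must be positive; only
	// the sign is asserted because the slope's units (per second vs. per
	// sample) depend on the implementation.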
	trend := processor.CalculateTrend(increasingData)
	if trend.Direction != "increasing" {
		t.Errorf("Expected increasing trend, got %s", trend.Direction)
	}
	if trend.Slope <= 0 {
		t.Errorf("Expected positive slope, got %f", trend.Slope)
	}

	// Test decreasing trend
	decreasingData := []TimeValue{
		{Timestamp: 1000, Value: 30},
		{Timestamp: 1060, Value: 25},
		{Timestamp: 1120, Value: 20},
		{Timestamp: 1180, Value: 15},
		{Timestamp: 1240, Value: 10},
	}

	trend = processor.CalculateTrend(decreasingData)
	if trend.Direction != "decreasing" {
		t.Errorf("Expected decreasing trend, got %s", trend.Direction)
	}
	if trend.Slope >= 0 {
		t.Errorf("Expected negative slope, got %f", trend.Slope)
	}
}

// BenchmarkOptimizedTimeSeriesProcessing benchmarks the optimized processing
func BenchmarkOptimizedTimeSeriesProcessing(b *testing.B) {
	processor := NewOptimizedTimeSeriesProcessor()
	mockSearcher := &mockSearcher{}

	req := &VisitorsByTimeRequest{
		StartTime:       time.Now().Unix() - 3600,
		EndTime:         time.Now().Unix(),
		LogPaths:        []string{"/var/log/nginx/access.log"},
		IntervalSeconds: 60,
	}

	b.Run("OptimizedVisitorsByTime", func(b *testing.B) {
		// ReportAllocs must be set on the sub-benchmark; it does not carry over
		// from the parent b to benchmarks started with b.Run.
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			_, err := processor.OptimizedGetVisitorsByTime(context.Background(), req, mockSearcher)
			if err != nil {
				b.Fatal(err)
			}
		}
	})
}

// BenchmarkTimeBucketOperations benchmarks time bucket operations
func BenchmarkTimeBucketOperations(b *testing.B) {
	bucket := NewTimeBucket(time.Now().Unix())

	b.Run("AddEntry", func(b *testing.B) {
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			ip := "192.168.1." + strconv.Itoa(1+i%254)
			bucket.AddEntry(ip, 200, "GET", "/api/test", 1024)
		}
	})

	b.Run("GetUniqueVisitorCount", func(b *testing.B) {
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			_ = bucket.GetUniqueVisitorCount()
		}
	})
}

// BenchmarkHyperLogLog benchmarks HyperLogLog operations
func BenchmarkHyperLogLog(b *testing.B) {
	hll := NewHyperLogLog(12) // 4096 buckets

	b.Run("Add", func(b *testing.B) {
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			ip := "192.168.1." + strconv.Itoa(1+i%255)
			hll.Add(ip)
		}
	})

	b.Run("Count", func(b *testing.B) {
		b.ReportAllocs()
		// Add some data first
		for i := 0; i < 1000; i++ {
			hll.Add("192.168.1." + strconv.Itoa(1+i%255))
		}
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			_ = hll.Count()
		}
	})
}

// BenchmarkTimeSeriesCache benchmarks cache operations
func BenchmarkTimeSeriesCache(b *testing.B) {
	cache := NewTimeSeriesCache(1000, 3600)

	// Pre-populate the cache: 500 puts spread over 10 distinct keys
	// ("key_0" .. "key_9"), so the Get sub-benchmark below always hits.
	for i := 0; i < 500; i++ {
		key := "key_" + strconv.Itoa(i%10)
		cache.Put(key, i)
	}

	b.Run("Get", func(b *testing.B) {
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			key := "key_" + strconv.Itoa(i%10)
			cache.Get(key)
		}
	})

	b.Run("Put", func(b *testing.B) {
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			key := "bench_key_" + strconv.Itoa(i%100)
			cache.Put(key, i)
		}
	})
}

// BenchmarkAnomalyDetection benchmarks anomaly detection
func BenchmarkAnomalyDetection(b *testing.B) {
	processor := NewAdvancedTimeSeriesProcessor()

	// Generate test data: a regular 10-14 cycle at 1-minute intervals,
	// with two anomalies injected below.
	testData := make([]TimeValue, 100)
	for i := 0; i < 100; i++ {
		testData[i] = TimeValue{
			Timestamp: int64(1000 + i*60),
			Value:     10 + (i % 5),
		}
	}
	// Add some anomalies
	testData[50].Value = 100
	testData[75].Value = 2

	b.ResetTimer()
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		_ = processor.DetectAnomalies(testData)
	}
}

// BenchmarkTrendCalculation benchmarks trend calculation
func BenchmarkTrendCalculation(b *testing.B) {
	processor := NewAdvancedTimeSeriesProcessor()

	// Generate test data with an increasing trend
	testData := make([]TimeValue, 50)
	for i := 0; i < 50; i++ {
		testData[i] = TimeValue{
			Timestamp: int64(1000 + i*60),
			Value:     10 + i/2, // Increasing trend
		}
	}

	b.ResetTimer()
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		_ = processor.CalculateTrend(testData)
	}
}