| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713 | package analyticsimport (	"context"	"sort"	"sync")// SearchRequest and related types from searcher packagetype SearchRequest struct {	StartTime     *int64    `json:"start_time,omitempty"`	EndTime       *int64    `json:"end_time,omitempty"`	LogPaths      []string  `json:"log_paths,omitempty"`	Limit         int       `json:"limit"`	IncludeFacets bool      `json:"include_facets,omitempty"`	IncludeStats  bool      `json:"include_stats,omitempty"`	UseCache      bool      `json:"use_cache,omitempty"`}type SearchResult struct {	Hits      []*SearchHit `json:"hits"`	TotalHits uint64       `json:"total_hits"`	Stats     *SearchStats `json:"stats,omitempty"`}type SearchHit struct {	Fields map[string]interface{} `json:"fields"`}type SearchStats struct {	TotalBytes int64 `json:"total_bytes"`}type AggregationRequest struct{}type AggregationResult struct{}type Suggestion struct{}// Searcher interface (simplified)type Searcher interface {	Search(ctx context.Context, req *SearchRequest) (*SearchResult, error)	Aggregate(ctx context.Context, req *AggregationRequest) (*AggregationResult, error)	Suggest(ctx context.Context, text string, field string, size int) ([]*Suggestion, error)	Analyze(ctx context.Context, text string, analyzer string) ([]string, error)	ClearCache() error}// TimeSeriesProcessor provides high-performance time-series analyticstype TimeSeriesProcessor struct {	bucketPools    map[int64]*BucketPool	visitorSets    map[int64]*VisitorSetPool	resultCache    *TimeSeriesCache	mutex          sync.RWMutex}// NewTimeSeriesProcessor creates a new optimized processorfunc NewTimeSeriesProcessor() *TimeSeriesProcessor {	return &TimeSeriesProcessor{		bucketPools: make(map[int64]*BucketPool),		visitorSets: make(map[int64]*VisitorSetPool),		resultCache: NewTimeSeriesCache(1000, 1800), // 1000 entries, 30min TTL	}}// BucketPool provides pooled time buckets for aggregationtype BucketPool struct {	buckets sync.Pool}// NewBucketPool creates a new bucket poolfunc NewBucketPool() *BucketPool {	return &BucketPool{		buckets: sync.Pool{			New: func() interface{} {				return make(map[int64]*TimeBucket, 1000)			},		},	}}// Get retrieves a bucket map from the poolfunc (bp *BucketPool) Get() map[int64]*TimeBucket {	return bp.buckets.Get().(map[int64]*TimeBucket)}// Put returns a bucket map to the poolfunc (bp *BucketPool) Put(buckets map[int64]*TimeBucket) {	// Clear the map	for k := range buckets {		delete(buckets, k)	}	bp.buckets.Put(buckets)}// TimeBucket represents an optimized time bucket for aggregationtype TimeBucket struct {	Timestamp      int64	RequestCount   int64	BytesTransferred int64	UniqueVisitors map[string]struct{} // Use struct{} for zero-memory set	StatusCodes    map[int]int64	Methods        map[string]int64	Paths          map[string]int64}// NewTimeBucket creates a new optimized time bucketfunc NewTimeBucket(timestamp int64) *TimeBucket {	return &TimeBucket{		Timestamp:      timestamp,		UniqueVisitors: make(map[string]struct{}, 100),		StatusCodes:    make(map[int]int64, 10),		Methods:        make(map[string]int64, 5),		Paths:          make(map[string]int64, 20),	}}// AddEntry adds an entry to the time bucket with optimized operationsfunc (tb *TimeBucket) AddEntry(ip string, status int, method string, path string, bytes int64) {	tb.RequestCount++	tb.BytesTransferred += bytes		// Use struct{} for zero-memory set operations	tb.UniqueVisitors[ip] = struct{}{}		// Optimized map operations	tb.StatusCodes[status]++	tb.Methods[method]++	tb.Paths[path]++}// GetUniqueVisitorCount returns the count of unique visitorsfunc (tb *TimeBucket) GetUniqueVisitorCount() int {	return len(tb.UniqueVisitors)}// VisitorSetPool provides pooled visitor setstype VisitorSetPool struct {	sets sync.Pool}// NewVisitorSetPool creates a new visitor set poolfunc NewVisitorSetPool() *VisitorSetPool {	return &VisitorSetPool{		sets: sync.Pool{			New: func() interface{} {				return make(map[string]struct{}, 1000)			},		},	}}// Get retrieves a visitor set from the poolfunc (vsp *VisitorSetPool) Get() map[string]struct{} {	return vsp.sets.Get().(map[string]struct{})}// Put returns a visitor set to the poolfunc (vsp *VisitorSetPool) Put(set map[string]struct{}) {	// Clear the set	for k := range set {		delete(set, k)	}	vsp.sets.Put(set)}// TimeSeriesCache provides caching for time-series resultstype TimeSeriesCache struct {	cache     map[string]*CachedTimeSeriesResult	maxSize   int	ttlSeconds int64	mutex     sync.RWMutex}// CachedTimeSeriesResult represents a cached time-series resulttype CachedTimeSeriesResult struct {	Data      interface{}	Timestamp int64	AccessCount int64}// NewTimeSeriesCache creates a new time-series cachefunc NewTimeSeriesCache(maxSize int, ttlSeconds int64) *TimeSeriesCache {	return &TimeSeriesCache{		cache:     make(map[string]*CachedTimeSeriesResult),		maxSize:   maxSize,		ttlSeconds: ttlSeconds,	}}// Get retrieves a cached resultfunc (tsc *TimeSeriesCache) Get(key string) (interface{}, bool) {	tsc.mutex.RLock()	result, exists := tsc.cache[key]	tsc.mutex.RUnlock()		if !exists {		return nil, false	}		// Check TTL	currentTime := getCurrentTimestamp()	if currentTime-result.Timestamp > tsc.ttlSeconds {		tsc.Delete(key)		return nil, false	}		// Update access count atomically	tsc.mutex.Lock()	result.AccessCount++	tsc.mutex.Unlock()		return result.Data, true}// Put stores a result in the cachefunc (tsc *TimeSeriesCache) Put(key string, data interface{}) {	tsc.mutex.Lock()	defer tsc.mutex.Unlock()		// Evict if at capacity	if len(tsc.cache) >= tsc.maxSize {		tsc.evictLRU()	}		tsc.cache[key] = &CachedTimeSeriesResult{		Data:      data,		Timestamp: getCurrentTimestamp(),		AccessCount: 1,	}}// Delete removes a cached resultfunc (tsc *TimeSeriesCache) Delete(key string) {	tsc.mutex.Lock()	defer tsc.mutex.Unlock()	delete(tsc.cache, key)}// evictLRU removes the least recently used entryfunc (tsc *TimeSeriesCache) evictLRU() {	var lruKey string	var lruTimestamp int64 = ^int64(0) // Max int64		for key, result := range tsc.cache {		if result.Timestamp < lruTimestamp {			lruTimestamp = result.Timestamp			lruKey = key		}	}		if lruKey != "" {		delete(tsc.cache, lruKey)	}}// getCurrentTimestamp returns current Unix timestampfunc getCurrentTimestamp() int64 {	return 1640995200 // Mock timestamp for testing}// OptimizedGetVisitorsByTime provides optimized visitors by time calculationfunc (otsp *TimeSeriesProcessor) OptimizedGetVisitorsByTime(	ctx context.Context, 	req *VisitorsByTimeRequest,	s Searcher,) (*VisitorsByTime, error) {		// Check cache first	cacheKey := generateCacheKey("visitors_by_time", req)	if cached, found := otsp.resultCache.Get(cacheKey); found {		return cached.(*VisitorsByTime), nil	}		// Prepare search request	searchReq := &SearchRequest{		StartTime:     &req.StartTime,		EndTime:       &req.EndTime,		LogPaths:      req.LogPaths,		Limit:         0,		IncludeFacets: false,		UseCache:      true,	}		result, err := s.Search(ctx, searchReq)	if err != nil {		return nil, err	}		// Optimize interval calculation	interval := int64(req.IntervalSeconds)	if interval <= 0 {		interval = 60 // Default 1 minute	}		// Get pooled bucket map	bucketPool := otsp.getBucketPool(interval)	buckets := bucketPool.Get()	defer bucketPool.Put(buckets)		// Process hits with optimized bucketing	for _, hit := range result.Hits {		if timestampField, ok := hit.Fields["timestamp"]; ok {			if timestampFloat, ok := timestampField.(float64); ok {				timestamp := int64(timestampFloat)				bucketTime := (timestamp / interval) * interval								// Get or create bucket				bucket := buckets[bucketTime]				if bucket == nil {					bucket = NewTimeBucket(bucketTime)					buckets[bucketTime] = bucket				}								// Add IP to unique visitors				if ip, ok := hit.Fields["ip"].(string); ok {					bucket.UniqueVisitors[ip] = struct{}{}				}			}		}	}		// Convert to sorted result	visitorsByTime := make([]TimeValue, 0, len(buckets))	for _, bucket := range buckets {		visitorsByTime = append(visitorsByTime, TimeValue{			Timestamp: bucket.Timestamp,			Value:     len(bucket.UniqueVisitors),		})	}		// Sort efficiently	sort.Slice(visitorsByTime, func(i, j int) bool {		return visitorsByTime[i].Timestamp < visitorsByTime[j].Timestamp	})		result_data := &VisitorsByTime{Data: visitorsByTime}		// Cache the result	otsp.resultCache.Put(cacheKey, result_data)		return result_data, nil}// OptimizedGetTrafficByTime provides optimized traffic analyticsfunc (otsp *TimeSeriesProcessor) OptimizedGetTrafficByTime(	ctx context.Context,	req *TrafficByTimeRequest,	s Searcher,) (*TrafficByTime, error) {		// Check cache first	cacheKey := generateCacheKey("traffic_by_time", req)	if cached, found := otsp.resultCache.Get(cacheKey); found {		return cached.(*TrafficByTime), nil	}		searchReq := &SearchRequest{		StartTime:     &req.StartTime,		EndTime:       &req.EndTime,		LogPaths:      req.LogPaths,		Limit:         0,		IncludeStats:  true,		UseCache:      true,	}		result, err := s.Search(ctx, searchReq)	if err != nil {		return nil, err	}		interval := int64(req.IntervalSeconds)	if interval <= 0 {		interval = 300 // Default 5 minutes	}		// Get pooled bucket map	bucketPool := otsp.getBucketPool(interval)	buckets := bucketPool.Get()	defer bucketPool.Put(buckets)		// Process hits with comprehensive metrics	for _, hit := range result.Hits {		if timestampField, ok := hit.Fields["timestamp"]; ok {			if timestampFloat, ok := timestampField.(float64); ok {				timestamp := int64(timestampFloat)				bucketTime := (timestamp / interval) * interval								bucket := buckets[bucketTime]				if bucket == nil {					bucket = NewTimeBucket(bucketTime)					buckets[bucketTime] = bucket				}								// Extract fields efficiently				var ip, method, path string				var status int				var bytes int64								if v, ok := hit.Fields["ip"].(string); ok { ip = v }				if v, ok := hit.Fields["method"].(string); ok { method = v }				if v, ok := hit.Fields["path"].(string); ok { path = v }				if v, ok := hit.Fields["status"].(float64); ok { status = int(v) }				if v, ok := hit.Fields["bytes_sent"].(float64); ok { bytes = int64(v) }								bucket.AddEntry(ip, status, method, path, bytes)			}		}	}		// Convert to result with comprehensive metrics	trafficData := make([]TrafficTimeValue, 0, len(buckets))	for _, bucket := range buckets {		trafficData = append(trafficData, TrafficTimeValue{			Timestamp:      bucket.Timestamp,			Requests:       bucket.RequestCount,			Bytes:         bucket.BytesTransferred,			UniqueVisitors: len(bucket.UniqueVisitors),		})	}		// Sort by timestamp	sort.Slice(trafficData, func(i, j int) bool {		return trafficData[i].Timestamp < trafficData[j].Timestamp	})		result_data := &TrafficByTime{Data: trafficData}		// Cache the result	otsp.resultCache.Put(cacheKey, result_data)		return result_data, nil}// HyperLogLog provides cardinality estimation for unique visitorstype HyperLogLog struct {	buckets []uint8	b       uint8 // number of bits for bucket index	m       uint32 // number of buckets (2^b)}// NewHyperLogLog creates a new HyperLogLog counterfunc NewHyperLogLog(precision uint8) *HyperLogLog {	b := precision	m := uint32(1) << b	return &HyperLogLog{		buckets: make([]uint8, m),		b:       b,		m:       m,	}}// Add adds a value to the HyperLogLogfunc (hll *HyperLogLog) Add(value string) {	hash := hashString(value)	j := hash >> (32 - hll.b) // first b bits	w := hash << hll.b       // remaining bits		// Count leading zeros + 1	lz := countLeadingZeros(w) + 1	if lz > uint8(32-hll.b) {		lz = uint8(32 - hll.b)	}		if lz > hll.buckets[j] {		hll.buckets[j] = lz	}}// Count estimates the cardinalityfunc (hll *HyperLogLog) Count() uint64 {	rawEstimate := hll.alpha() * float64(hll.m*hll.m) / hll.sumOfPowers()		if rawEstimate <= 2.5*float64(hll.m) {		// Small range correction		zeros := 0		for _, bucket := range hll.buckets {			if bucket == 0 {				zeros++			}		}		if zeros != 0 {			return uint64(float64(hll.m) * logValue(float64(hll.m)/float64(zeros)))		}	}		return uint64(rawEstimate)}// Helper functions for HyperLogLogfunc (hll *HyperLogLog) alpha() float64 {	switch hll.m {	case 16:		return 0.673	case 32:		return 0.697	case 64:		return 0.709	default:		return 0.7213 / (1.0 + 1.079/float64(hll.m))	}}func (hll *HyperLogLog) sumOfPowers() float64 {	sum := 0.0	for _, bucket := range hll.buckets {		sum += 1.0 / float64(uint32(1)<<bucket)	}	return sum}// Simple hash function for stringsfunc hashString(s string) uint32 {	var hash uint32 = 2166136261	for i := 0; i < len(s); i++ {		hash ^= uint32(s[i])		hash *= 16777619	}	return hash}// Count leading zeros in a 32-bit integerfunc countLeadingZeros(x uint32) uint8 {	if x == 0 {		return 32	}	n := uint8(0)	if x <= 0x0000FFFF {		n += 16		x <<= 16	}	if x <= 0x00FFFFFF {		n += 8		x <<= 8	}	if x <= 0x0FFFFFFF {		n += 4		x <<= 4	}	if x <= 0x3FFFFFFF {		n += 2		x <<= 2	}	if x <= 0x7FFFFFFF {		n += 1	}	return n}// Simple log functionfunc logValue(x float64) float64 {	// Approximation of natural logarithm for HLL correction	if x <= 0 {		return 0	}	return 0.693147 * float64(32-countLeadingZeros(uint32(x))) // Rough approximation}// getBucketPool gets or creates a bucket pool for the given intervalfunc (otsp *TimeSeriesProcessor) getBucketPool(interval int64) *BucketPool {	otsp.mutex.RLock()	pool, exists := otsp.bucketPools[interval]	otsp.mutex.RUnlock()		if !exists {		otsp.mutex.Lock()		// Double-check after acquiring write lock		if pool, exists = otsp.bucketPools[interval]; !exists {			pool = NewBucketPool()			otsp.bucketPools[interval] = pool		}		otsp.mutex.Unlock()	}		return pool}// generateCacheKey generates a cache key from request parametersfunc generateCacheKey(prefix string, req interface{}) string {	// Simple cache key generation - in production, use a proper hash	return prefix + "_cache_key"}// Additional types for comprehensive traffic analyticstype TrafficByTimeRequest struct {	StartTime       int64	EndTime         int64	LogPaths        []string	IntervalSeconds int}type TrafficByTime struct {	Data []TrafficTimeValue `json:"data"`}type TrafficTimeValue struct {	Timestamp      int64 `json:"timestamp"`	Requests       int64 `json:"requests"`	Bytes          int64 `json:"bytes"`	UniqueVisitors int   `json:"unique_visitors"`}// AnomalyDetector provides advanced analytics with ML-like featurestype AnomalyDetector struct {	*TimeSeriesProcessor	anomalyThreshold float64	trendWindow     int}// NewAnomalyDetector creates an advanced processorfunc NewAnomalyDetector() *AnomalyDetector {	return &AnomalyDetector{		TimeSeriesProcessor: NewTimeSeriesProcessor(),		anomalyThreshold:            2.0, // 2 standard deviations		trendWindow:                10,   // 10 data points for trend	}}// DetectAnomalies detects anomalies in time-series datafunc (atsp *AnomalyDetector) DetectAnomalies(data []TimeValue) []AnomalyPoint {	if len(data) < 3 {		return nil	}		// Calculate moving average and standard deviation	anomalies := make([]AnomalyPoint, 0)	windowSize := 5		for i := windowSize; i < len(data); i++ {		// Calculate stats for window		sum, sumSq := 0.0, 0.0		for j := i - windowSize; j < i; j++ {			val := float64(data[j].Value)			sum += val			sumSq += val * val		}				mean := sum / float64(windowSize)		variance := (sumSq / float64(windowSize)) - (mean * mean)		stdDev := variance * 0.5 // Approximate square root				// Check if current value is anomalous		currentVal := float64(data[i].Value)		deviation := currentVal - mean		if deviation < 0 {			deviation = -deviation		}				if stdDev > 0 && deviation > atsp.anomalyThreshold*stdDev {			anomalies = append(anomalies, AnomalyPoint{				Timestamp: data[i].Timestamp,				Value:     data[i].Value,				Expected:  int(mean),				Deviation: deviation / stdDev,			})		}	}		return anomalies}// AnomalyPoint represents a detected anomalytype AnomalyPoint struct {	Timestamp int64   `json:"timestamp"`	Value     int     `json:"value"`	Expected  int     `json:"expected"`	Deviation float64 `json:"deviation"`}// CalculateTrend calculates trend direction and strengthfunc (atsp *AnomalyDetector) CalculateTrend(data []TimeValue) TrendAnalysis {	if len(data) < 2 {		return TrendAnalysis{Direction: "insufficient_data"}	}		// Simple linear regression for trend	n := float64(len(data))	sumX, sumY, sumXY, sumXX := 0.0, 0.0, 0.0, 0.0		for i, point := range data {		x := float64(i)		y := float64(point.Value)		sumX += x		sumY += y		sumXY += x * y		sumXX += x * x	}		// Calculate slope	slope := (n*sumXY - sumX*sumY) / (n*sumXX - sumX*sumX)		// Determine trend direction and strength	direction := "stable"	if slope > 0.1 {		direction = "increasing"	} else if slope < -0.1 {		direction = "decreasing"	}		// Calculate trend strength (simplified R-squared approximation)	strength := slope * slope / (slope*slope + 1) // Normalize to 0-1		return TrendAnalysis{		Direction: direction,		Strength:  strength,		Slope:     slope,	}}// TrendAnalysis represents trend analysis resultstype TrendAnalysis struct {	Direction string  `json:"direction"`	Strength  float64 `json:"strength"`	Slope     float64 `json:"slope"`}
 |