| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438 | package searcherimport (	"context"	"encoding/json"	"fmt"	"sync"	"github.com/blevesearch/bleve/v2"	"github.com/blevesearch/bleve/v2/search"	"github.com/blevesearch/bleve/v2/search/query"	"github.com/uozi-tech/cosy/logger")// Counter provides efficient unique value counting without large FacetSizetype Counter struct {	indexAlias bleve.IndexAlias // Use IndexAlias instead of individual shards	shards     []bleve.Index    // Keep shards for fallback if needed	mu         sync.RWMutex	stopOnce   sync.Once}// NewCounter creates a new cardinality counterfunc NewCounter(shards []bleve.Index) *Counter {	var indexAlias bleve.IndexAlias	if len(shards) > 0 {		// Create IndexAlias for distributed search like Searcher does		indexAlias = bleve.NewIndexAlias(shards...)		// Note: IndexAlias doesn't have SetIndexMapping method		// The mapping will be inherited from the constituent indices		logger.Debugf("Created IndexAlias for counter with %d shards", len(shards))	}	return &Counter{		indexAlias: indexAlias,		shards:     shards,	}}// Stop gracefully closes the counter's resources, like the IndexAlias.func (c *Counter) Stop() error {	var err error	c.stopOnce.Do(func() {		c.mu.Lock()		defer c.mu.Unlock()		if c.indexAlias != nil {			logger.Debugf("Closing IndexAlias in Counter")			err = c.indexAlias.Close()			c.indexAlias = nil		}		c.shards = nil	})	return err}// CardinalityRequest represents a request for unique value countingtype CardinalityRequest struct {	Field          string      `json:"field"`	Query          query.Query `json:"query,omitempty"` // Optional query to filter documents	StartTime      *int64      `json:"start_time,omitempty"`	EndTime        *int64      `json:"end_time,omitempty"`	LogPaths       []string    `json:"log_paths,omitempty"`	UseMainLogPath bool        `json:"use_main_log_path,omitempty"` // Use main_log_path field instead of file_path}// CardinalityResult represents the result of cardinality countingtype CardinalityResult struct {	Field       string `json:"field"`	Cardinality uint64 `json:"cardinality"`	TotalDocs   uint64 `json:"total_docs"`	Error       string `json:"error,omitempty"`}// Count efficiently counts unique values using IndexAlias with global scoring// This leverages Bleve's distributed search optimizations and avoids FacetSize limitsfunc (c *Counter) Count(ctx context.Context, req *CardinalityRequest) (*CardinalityResult, error) {	c.mu.RLock()	defer c.mu.RUnlock()	if req.Field == "" {		return nil, fmt.Errorf("field name is required")	}	if c.indexAlias == nil {		return &CardinalityResult{			Field: req.Field,			Error: "IndexAlias not available",		}, fmt.Errorf("IndexAlias not available")	}	// Use IndexAlias with global scoring for consistent distributed search	uniqueTerms, totalDocs, err := c.collectTermsUsingIndexAlias(ctx, req)	if err != nil {		return &CardinalityResult{			Field: req.Field,			Error: fmt.Sprintf("failed to collect terms: %v", err),		}, err	}	logger.Infof("Cardinality count completed: field='%s', unique_terms=%d, total_docs=%d",		req.Field, len(uniqueTerms), totalDocs)	return &CardinalityResult{		Field:       req.Field,		Cardinality: uint64(len(uniqueTerms)),		TotalDocs:   totalDocs,	}, nil}// collectTermsUsingIndexAlias collects unique terms using IndexAlias with global scoringfunc (c *Counter) collectTermsUsingIndexAlias(ctx context.Context, req *CardinalityRequest) (map[string]struct{}, uint64, error) {	uniqueTerms := make(map[string]struct{})	// Enable global scoring context like Searcher does	globalCtx := context.WithValue(ctx, search.SearchTypeKey, search.GlobalScoring)	// Strategy 1: Try large facet first (more efficient for most cases)	terms1, totalDocs, err1 := c.collectTermsUsingLargeFacet(globalCtx, req)	if err1 != nil {		logger.Warnf("Large facet collection failed: %v", err1)	} else {		for term := range terms1 {			uniqueTerms[term] = struct{}{}		}		logger.Infof("Large facet collected %d unique terms", len(terms1))	}	// Strategy 2: Use pagination if facet was likely truncated or failed	needsPagination := len(terms1) >= 50000 || err1 != nil	if needsPagination {		logger.Infof("Using pagination to collect remaining terms...")		terms2, _, err2 := c.collectTermsUsingPagination(globalCtx, req)		if err2 != nil {			logger.Warnf("Pagination collection failed: %v", err2)		} else {			for term := range terms2 {				uniqueTerms[term] = struct{}{}			}			logger.Infof("Pagination collected additional terms, total unique: %d", len(uniqueTerms))		}	}	return uniqueTerms, totalDocs, nil}// collectTermsUsingLargeFacet uses IndexAlias with a large facet to efficiently collect termsfunc (c *Counter) collectTermsUsingLargeFacet(ctx context.Context, req *CardinalityRequest) (map[string]struct{}, uint64, error) {	terms := make(map[string]struct{})	// Build search request using IndexAlias with proper filtering	boolQuery := bleve.NewBooleanQuery()	boolQuery.AddMust(bleve.NewMatchAllQuery())	// Add time range filter if specified	if req.StartTime != nil && req.EndTime != nil {		startTime := float64(*req.StartTime)		endTime := float64(*req.EndTime)		timeQuery := bleve.NewNumericRangeQuery(&startTime, &endTime)		timeQuery.SetField("timestamp")		boolQuery.AddMust(timeQuery)	}	// Add log path filters - use main_log_path or file_path based on request	if len(req.LogPaths) > 0 {		logPathQuery := bleve.NewBooleanQuery()		fieldName := "file_path" // default		if req.UseMainLogPath {			fieldName = "main_log_path"		}		for _, logPath := range req.LogPaths {			termQuery := bleve.NewTermQuery(logPath)			termQuery.SetField(fieldName)			logPathQuery.AddShould(termQuery)		}		logPathQuery.SetMinShould(1)		boolQuery.AddMust(logPathQuery)	}	searchReq := bleve.NewSearchRequest(boolQuery)	searchReq.Size = 0 // We don't need documents, just facets	// Use very large facet size - we're back to this approach but using IndexAlias	// which should handle it more efficiently than individual shards	facetSize := 100000 // Large size for maximum accuracy	facet := bleve.NewFacetRequest(req.Field, facetSize)	searchReq.AddFacet(req.Field, facet)	// Debug: Log the constructed query	if queryBytes, err := json.Marshal(searchReq.Query); err == nil {		logger.Debugf("Counter query: %s", string(queryBytes))	}	// Execute search using IndexAlias with global scoring context	result, err := c.indexAlias.SearchInContext(ctx, searchReq)	if err != nil {		return terms, 0, fmt.Errorf("IndexAlias facet search failed: %w", err)	}	logger.Debugf("Counter facet search result: Total=%d, Facets=%v", result.Total, result.Facets != nil)	// Extract terms from facet result	if facetResult, ok := result.Facets[req.Field]; ok && facetResult.Terms != nil {		facetTerms := facetResult.Terms.Terms()		for _, term := range facetTerms {			terms[term.Term] = struct{}{}		}		logger.Infof("IndexAlias large facet: collected %d terms, facet.Total=%d, result.Total=%d",			len(terms), facetResult.Total, result.Total)		// Check if facet was truncated		if len(facetTerms) >= facetSize {			logger.Warnf("Facet likely truncated at %d terms, total unique may be higher", facetSize)		}	}	return terms, result.Total, nil}// collectTermsUsingPagination uses IndexAlias with pagination to collect all termsfunc (c *Counter) collectTermsUsingPagination(ctx context.Context, req *CardinalityRequest) (map[string]struct{}, uint64, error) {	terms := make(map[string]struct{})	pageSize := 10000 // Large page size for efficiency	maxPages := 1000  // Support very large datasets	processedDocs := 0	logger.Infof("Starting IndexAlias pagination for field '%s' (pageSize=%d)", req.Field, pageSize)	for page := 0; page < maxPages; page++ {		// Build proper query with all filters		boolQuery := bleve.NewBooleanQuery()		boolQuery.AddMust(bleve.NewMatchAllQuery())		// Add time range filter if specified		if req.StartTime != nil && req.EndTime != nil {			startTime := float64(*req.StartTime)			endTime := float64(*req.EndTime)			timeQuery := bleve.NewNumericRangeQuery(&startTime, &endTime)			timeQuery.SetField("timestamp")			boolQuery.AddMust(timeQuery)		}		// Add log path filters, respecting UseMainLogPath		if len(req.LogPaths) > 0 {			logPathQuery := bleve.NewBooleanQuery()			fieldName := "file_path"			if req.UseMainLogPath {				fieldName = "main_log_path"			}			for _, logPath := range req.LogPaths {				termQuery := bleve.NewTermQuery(logPath)				termQuery.SetField(fieldName)				logPathQuery.AddShould(termQuery)			}			logPathQuery.SetMinShould(1)			boolQuery.AddMust(logPathQuery)		}		searchReq := bleve.NewSearchRequest(boolQuery)		searchReq.Size = pageSize		searchReq.From = page * pageSize		searchReq.Fields = []string{req.Field}		// Execute with IndexAlias and global scoring		result, err := c.indexAlias.SearchInContext(ctx, searchReq)		if err != nil {			return terms, 0, fmt.Errorf("IndexAlias pagination search failed at page %d: %w", page, err)		}		// If no more hits, we're done		if len(result.Hits) == 0 {			logger.Infof("Pagination complete: processed %d documents, found %d unique terms",				processedDocs, len(terms))			break		}		// Extract unique terms from documents		for _, hit := range result.Hits {			if fieldValue, ok := hit.Fields[req.Field]; ok {				if strValue, ok := fieldValue.(string); ok && strValue != "" {					terms[strValue] = struct{}{}				}			}		}		processedDocs += len(result.Hits)		// Progress logging		if processedDocs%50000 == 0 && processedDocs > 0 {			logger.Infof("Pagination progress: processed %d documents, found %d unique terms",				processedDocs, len(terms))		}		// If we got fewer results than pageSize, we've reached the end		if len(result.Hits) < pageSize {			logger.Infof("Pagination complete: processed %d documents, found %d unique terms",				processedDocs, len(terms))			break		}		// Generous safety limit		if len(terms) > 500000 {			logger.Warnf("Very large cardinality detected (%d terms), stopping for memory safety", len(terms))			break		}	}	return terms, uint64(processedDocs), nil}// Estimate provides a fast cardinality estimate using sampling approach// This is useful for very large datasets where exact counting might be expensivefunc (c *Counter) Estimate(ctx context.Context, req *CardinalityRequest) (*CardinalityResult, error) {	c.mu.RLock()	defer c.mu.RUnlock()	if req.Field == "" {		return nil, fmt.Errorf("field name is required")	}	// Use statistical sampling for very large datasets	// Take a sample and extrapolate to estimate total cardinality	sampleSize := 10000 // Sample 10K documents	uniqueInSample := make(map[string]struct{})	totalSampleDocs := uint64(0)	// Process each shard with sampling	for i, shard := range c.shards {		if shard == nil {			continue		}		shardSample, shardTotal, err := c.sampleShardTerms(ctx, shard, req, sampleSize/len(c.shards))		if err != nil {			logger.Errorf("Failed to sample shard %d: %v", i, err)			continue		}		totalSampleDocs += shardTotal		// Merge unique terms from sample		for term := range shardSample {			uniqueInSample[term] = struct{}{}		}	}	// For now, still use exact counting for accuracy	// In the future, we could use the sample to extrapolate:	// sampledUnique := uint64(len(uniqueInSample))	// estimatedCardinality := sampledUnique * (totalDocs / totalSampleDocs)	if totalSampleDocs == 0 {		return &CardinalityResult{			Field:       req.Field,			Cardinality: 0,			TotalDocs:   0,		}, nil	}	// For accurate results with large datasets, we use exact counting	// The sampling code above is kept for future statistical estimation	return c.Count(ctx, req)}// sampleShardTerms takes a statistical sample from a shard for cardinality estimationfunc (c *Counter) sampleShardTerms(ctx context.Context, shard bleve.Index, req *CardinalityRequest, sampleSize int) (map[string]struct{}, uint64, error) {	terms := make(map[string]struct{})	searchReq := bleve.NewSearchRequest(bleve.NewMatchAllQuery())	searchReq.Size = sampleSize	searchReq.Fields = []string{req.Field}	// Add time range filter if specified	if req.StartTime != nil && req.EndTime != nil {		startTime := float64(*req.StartTime)		endTime := float64(*req.EndTime)		timeQuery := bleve.NewNumericRangeQuery(&startTime, &endTime)		timeQuery.SetField("timestamp")		boolQuery := bleve.NewBooleanQuery()		boolQuery.AddMust(searchReq.Query)		boolQuery.AddMust(timeQuery)		searchReq.Query = boolQuery	}	result, err := shard.Search(searchReq)	if err != nil {		return terms, 0, err	}	// Extract terms from sample	for _, hit := range result.Hits {		if fieldValue, ok := hit.Fields[req.Field]; ok {			if strValue, ok := fieldValue.(string); ok && strValue != "" {				terms[strValue] = struct{}{}			}		}	}	return terms, result.Total, nil}// BatchCount counts cardinality for multiple fields efficientlyfunc (c *Counter) BatchCount(ctx context.Context, fields []string, baseReq *CardinalityRequest) (map[string]*CardinalityResult, error) {	results := make(map[string]*CardinalityResult)	// Process fields in parallel	var wg sync.WaitGroup	var mu sync.Mutex	for _, field := range fields {		wg.Add(1)		go func(f string) {			defer wg.Done()			req := *baseReq // Copy base request			req.Field = f			result, err := c.Count(ctx, &req)			if err != nil {				result = &CardinalityResult{					Field: f,					Error: err.Error(),				}			}			mu.Lock()			results[f] = result			mu.Unlock()		}(field)	}	wg.Wait()	return results, nil}
 |