cardinality_counter.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574
  1. package searcher
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "github.com/blevesearch/bleve/v2"
  7. "github.com/blevesearch/bleve/v2/search"
  8. "github.com/blevesearch/bleve/v2/search/query"
  9. "github.com/uozi-tech/cosy/logger"
  10. )
  11. // CardinalityCounter provides efficient unique value counting without large FacetSize
  12. type CardinalityCounter struct {
  13. indexAlias bleve.IndexAlias // Use IndexAlias instead of individual shards
  14. shards []bleve.Index // Keep shards for fallback if needed
  15. mu sync.RWMutex
  16. }
  17. // NewCardinalityCounter creates a new cardinality counter
  18. func NewCardinalityCounter(shards []bleve.Index) *CardinalityCounter {
  19. var indexAlias bleve.IndexAlias
  20. if len(shards) > 0 {
  21. // Create IndexAlias for distributed search like DistributedSearcher does
  22. indexAlias = bleve.NewIndexAlias(shards...)
  23. // Note: IndexAlias doesn't have SetIndexMapping method
  24. // The mapping will be inherited from the constituent indices
  25. logger.Debugf("Created IndexAlias for cardinality counter with %d shards", len(shards))
  26. }
  27. return &CardinalityCounter{
  28. indexAlias: indexAlias,
  29. shards: shards,
  30. }
  31. }
  32. // CardinalityRequest represents a request for unique value counting
  33. type CardinalityRequest struct {
  34. Field string `json:"field"`
  35. Query query.Query `json:"query,omitempty"` // Optional query to filter documents
  36. StartTime *int64 `json:"start_time,omitempty"`
  37. EndTime *int64 `json:"end_time,omitempty"`
  38. LogPaths []string `json:"log_paths,omitempty"`
  39. }
  // CardinalityResult is the outcome of counting the unique values of one field.
  type CardinalityResult struct {
  	// Field echoes the field name the count was requested for.
  	Field string `json:"field"`

  	// Cardinality is the number of distinct values found.
  	Cardinality uint64 `json:"cardinality"`

  	// TotalDocs is the document count reported alongside the term collection.
  	TotalDocs uint64 `json:"total_docs"`

  	// Error carries a failure description; it is empty on success.
  	Error string `json:"error,omitempty"`
  }
  47. // CountCardinality efficiently counts unique values using IndexAlias with global scoring
  48. // This leverages Bleve's distributed search optimizations and avoids FacetSize limits
  49. func (cc *CardinalityCounter) CountCardinality(ctx context.Context, req *CardinalityRequest) (*CardinalityResult, error) {
  50. cc.mu.RLock()
  51. defer cc.mu.RUnlock()
  52. if req.Field == "" {
  53. return nil, fmt.Errorf("field name is required")
  54. }
  55. if cc.indexAlias == nil {
  56. return &CardinalityResult{
  57. Field: req.Field,
  58. Error: "IndexAlias not available",
  59. }, fmt.Errorf("IndexAlias not available")
  60. }
  61. // Use IndexAlias with global scoring for consistent distributed search
  62. uniqueTerms, totalDocs, err := cc.collectTermsUsingIndexAlias(ctx, req)
  63. if err != nil {
  64. return &CardinalityResult{
  65. Field: req.Field,
  66. Error: fmt.Sprintf("failed to collect terms: %v", err),
  67. }, err
  68. }
  69. logger.Infof("Cardinality count completed: field='%s', unique_terms=%d, total_docs=%d",
  70. req.Field, len(uniqueTerms), totalDocs)
  71. return &CardinalityResult{
  72. Field: req.Field,
  73. Cardinality: uint64(len(uniqueTerms)),
  74. TotalDocs: totalDocs,
  75. }, nil
  76. }
  77. // collectTermsUsingIndexAlias collects unique terms using IndexAlias with global scoring
  78. func (cc *CardinalityCounter) collectTermsUsingIndexAlias(ctx context.Context, req *CardinalityRequest) (map[string]struct{}, uint64, error) {
  79. uniqueTerms := make(map[string]struct{})
  80. // Enable global scoring context like DistributedSearcher does
  81. globalCtx := context.WithValue(ctx, search.SearchTypeKey, search.GlobalScoring)
  82. // Strategy 1: Try large facet first (more efficient for most cases)
  83. terms1, totalDocs, err1 := cc.collectTermsUsingLargeFacet(globalCtx, req)
  84. if err1 != nil {
  85. logger.Warnf("Large facet collection failed: %v", err1)
  86. } else {
  87. for term := range terms1 {
  88. uniqueTerms[term] = struct{}{}
  89. }
  90. logger.Infof("Large facet collected %d unique terms", len(terms1))
  91. }
  92. // Strategy 2: Use pagination if facet was likely truncated or failed
  93. needsPagination := len(terms1) >= 50000 || err1 != nil
  94. if needsPagination {
  95. logger.Infof("Using pagination to collect remaining terms...")
  96. terms2, _, err2 := cc.collectTermsUsingPagination(globalCtx, req)
  97. if err2 != nil {
  98. logger.Warnf("Pagination collection failed: %v", err2)
  99. } else {
  100. for term := range terms2 {
  101. uniqueTerms[term] = struct{}{}
  102. }
  103. logger.Infof("Pagination collected additional terms, total unique: %d", len(uniqueTerms))
  104. }
  105. }
  106. return uniqueTerms, totalDocs, nil
  107. }
  108. // collectTermsUsingLargeFacet uses IndexAlias with a large facet to efficiently collect terms
  109. func (cc *CardinalityCounter) collectTermsUsingLargeFacet(ctx context.Context, req *CardinalityRequest) (map[string]struct{}, uint64, error) {
  110. terms := make(map[string]struct{})
  111. // Build search request using IndexAlias
  112. searchReq := bleve.NewSearchRequest(bleve.NewMatchAllQuery())
  113. searchReq.Size = 0 // We don't need documents, just facets
  114. // Add time range filter if specified
  115. if req.StartTime != nil && req.EndTime != nil {
  116. startTime := float64(*req.StartTime)
  117. endTime := float64(*req.EndTime)
  118. timeQuery := bleve.NewNumericRangeQuery(&startTime, &endTime)
  119. timeQuery.SetField("timestamp")
  120. boolQuery := bleve.NewBooleanQuery()
  121. boolQuery.AddMust(searchReq.Query)
  122. boolQuery.AddMust(timeQuery)
  123. searchReq.Query = boolQuery
  124. }
  125. // Use very large facet size - we're back to this approach but using IndexAlias
  126. // which should handle it more efficiently than individual shards
  127. facetSize := 100000 // Large size for maximum accuracy
  128. facet := bleve.NewFacetRequest(req.Field, facetSize)
  129. searchReq.AddFacet(req.Field, facet)
  130. // Execute search using IndexAlias with global scoring context
  131. result, err := cc.indexAlias.SearchInContext(ctx, searchReq)
  132. if err != nil {
  133. return terms, 0, fmt.Errorf("IndexAlias facet search failed: %w", err)
  134. }
  135. // Extract terms from facet result
  136. if facetResult, ok := result.Facets[req.Field]; ok && facetResult.Terms != nil {
  137. facetTerms := facetResult.Terms.Terms()
  138. for _, term := range facetTerms {
  139. terms[term.Term] = struct{}{}
  140. }
  141. logger.Infof("IndexAlias large facet: collected %d terms, facet.Total=%d, result.Total=%d",
  142. len(terms), facetResult.Total, result.Total)
  143. // Check if facet was truncated
  144. if len(facetTerms) >= facetSize {
  145. logger.Warnf("Facet likely truncated at %d terms, total unique may be higher", facetSize)
  146. }
  147. }
  148. return terms, result.Total, nil
  149. }
  150. // collectTermsUsingPagination uses IndexAlias with pagination to collect all terms
  151. func (cc *CardinalityCounter) collectTermsUsingPagination(ctx context.Context, req *CardinalityRequest) (map[string]struct{}, uint64, error) {
  152. terms := make(map[string]struct{})
  153. pageSize := 10000 // Large page size for efficiency
  154. maxPages := 1000 // Support very large datasets
  155. processedDocs := 0
  156. logger.Infof("Starting IndexAlias pagination for field '%s' (pageSize=%d)", req.Field, pageSize)
  157. for page := 0; page < maxPages; page++ {
  158. searchReq := bleve.NewSearchRequest(bleve.NewMatchAllQuery())
  159. searchReq.Size = pageSize
  160. searchReq.From = page * pageSize
  161. searchReq.Fields = []string{req.Field}
  162. // Add time range filter if specified
  163. if req.StartTime != nil && req.EndTime != nil {
  164. startTime := float64(*req.StartTime)
  165. endTime := float64(*req.EndTime)
  166. timeQuery := bleve.NewNumericRangeQuery(&startTime, &endTime)
  167. timeQuery.SetField("timestamp")
  168. boolQuery := bleve.NewBooleanQuery()
  169. boolQuery.AddMust(searchReq.Query)
  170. boolQuery.AddMust(timeQuery)
  171. searchReq.Query = boolQuery
  172. }
  173. // Execute with IndexAlias and global scoring
  174. result, err := cc.indexAlias.SearchInContext(ctx, searchReq)
  175. if err != nil {
  176. return terms, 0, fmt.Errorf("IndexAlias pagination search failed at page %d: %w", page, err)
  177. }
  178. // If no more hits, we're done
  179. if len(result.Hits) == 0 {
  180. logger.Infof("Pagination complete: processed %d documents, found %d unique terms",
  181. processedDocs, len(terms))
  182. break
  183. }
  184. // Extract unique terms from documents
  185. for _, hit := range result.Hits {
  186. if fieldValue, ok := hit.Fields[req.Field]; ok {
  187. if strValue, ok := fieldValue.(string); ok && strValue != "" {
  188. terms[strValue] = struct{}{}
  189. }
  190. }
  191. }
  192. processedDocs += len(result.Hits)
  193. // Progress logging
  194. if processedDocs%50000 == 0 && processedDocs > 0 {
  195. logger.Infof("Pagination progress: processed %d documents, found %d unique terms",
  196. processedDocs, len(terms))
  197. }
  198. // If we got fewer results than pageSize, we've reached the end
  199. if len(result.Hits) < pageSize {
  200. logger.Infof("Pagination complete: processed %d documents, found %d unique terms",
  201. processedDocs, len(terms))
  202. break
  203. }
  204. // Generous safety limit
  205. if len(terms) > 500000 {
  206. logger.Warnf("Very large cardinality detected (%d terms), stopping for memory safety", len(terms))
  207. break
  208. }
  209. }
  210. return terms, uint64(processedDocs), nil
  211. }
  212. // countShardCardinality counts unique values in a single shard (legacy method)
  213. func (cc *CardinalityCounter) countShardCardinality(ctx context.Context, shard bleve.Index, shardID int, req *CardinalityRequest) (uint64, uint64, error) {
  214. // For now, we'll use a small facet to get an estimate
  215. // In the future, this could be optimized with direct index access
  216. searchReq := bleve.NewSearchRequest(bleve.NewMatchAllQuery())
  217. searchReq.Size = 0 // We don't need actual documents
  218. // Add time range filter if specified
  219. if req.StartTime != nil && req.EndTime != nil {
  220. startTime := float64(*req.StartTime)
  221. endTime := float64(*req.EndTime)
  222. timeQuery := bleve.NewNumericRangeQuery(&startTime, &endTime)
  223. timeQuery.SetField("timestamp")
  224. boolQuery := bleve.NewBooleanQuery()
  225. boolQuery.AddMust(searchReq.Query)
  226. boolQuery.AddMust(timeQuery)
  227. searchReq.Query = boolQuery
  228. }
  229. // Add custom query filter if provided
  230. if req.Query != nil {
  231. boolQuery := bleve.NewBooleanQuery()
  232. boolQuery.AddMust(searchReq.Query)
  233. boolQuery.AddMust(req.Query)
  234. searchReq.Query = boolQuery
  235. }
  236. // Use a small facet just to get document count, not for cardinality
  237. facet := bleve.NewFacetRequest(req.Field, 1) // Minimal size
  238. searchReq.AddFacet(req.Field, facet)
  239. result, err := shard.Search(searchReq)
  240. if err != nil {
  241. return 0, 0, err
  242. }
  243. return 0, result.Total, nil
  244. }
  245. // getShardTerms retrieves unique terms from a shard using multiple strategies to avoid FacetSize limits
  246. func (cc *CardinalityCounter) getShardTerms(ctx context.Context, shard bleve.Index, req *CardinalityRequest) (map[string]struct{}, error) {
  247. // Try multiple approaches for maximum accuracy
  248. // Strategy 1: Use large facet first (still more efficient than old 100k)
  249. terms1, err1 := cc.getTermsUsingLargeFacet(ctx, shard, req)
  250. if err1 != nil {
  251. logger.Warnf("Large facet strategy failed: %v", err1)
  252. }
  253. // Strategy 2: Use pagination to get remaining terms
  254. terms2, err2 := cc.getTermsUsingPagination(ctx, shard, req)
  255. if err2 != nil {
  256. logger.Warnf("Pagination strategy failed: %v", err2)
  257. }
  258. // Merge results from both strategies
  259. allTerms := make(map[string]struct{})
  260. for term := range terms1 {
  261. allTerms[term] = struct{}{}
  262. }
  263. for term := range terms2 {
  264. allTerms[term] = struct{}{}
  265. }
  266. logger.Infof("Combined strategies found %d unique terms for field '%s' (facet: %d, pagination: %d)",
  267. len(allTerms), req.Field, len(terms1), len(terms2))
  268. return allTerms, nil
  269. }
  270. // getTermsUsingLargeFacet uses a large facet to collect terms efficiently
  271. func (cc *CardinalityCounter) getTermsUsingLargeFacet(ctx context.Context, shard bleve.Index, req *CardinalityRequest) (map[string]struct{}, error) {
  272. terms := make(map[string]struct{})
  273. searchReq := bleve.NewSearchRequest(bleve.NewMatchAllQuery())
  274. searchReq.Size = 0 // We don't need documents, just facets
  275. // Add time range filter if specified
  276. if req.StartTime != nil && req.EndTime != nil {
  277. startTime := float64(*req.StartTime)
  278. endTime := float64(*req.EndTime)
  279. timeQuery := bleve.NewNumericRangeQuery(&startTime, &endTime)
  280. timeQuery.SetField("timestamp")
  281. boolQuery := bleve.NewBooleanQuery()
  282. boolQuery.AddMust(searchReq.Query)
  283. boolQuery.AddMust(timeQuery)
  284. searchReq.Query = boolQuery
  285. }
  286. // Use a large facet size - larger than before but not excessive
  287. facetSize := 50000 // Compromise: large enough for most cases, but not memory-killing
  288. facet := bleve.NewFacetRequest(req.Field, facetSize)
  289. searchReq.AddFacet(req.Field, facet)
  290. result, err := shard.Search(searchReq)
  291. if err != nil {
  292. return terms, err
  293. }
  294. // Extract terms from facet result
  295. if facetResult, ok := result.Facets[req.Field]; ok && facetResult.Terms != nil {
  296. facetTerms := facetResult.Terms.Terms()
  297. for _, term := range facetTerms {
  298. terms[term.Term] = struct{}{}
  299. }
  300. logger.Debugf("Large facet collected %d terms (facet reports total: %d)", len(terms), facetResult.Total)
  301. // If facet is truncated, we know we need pagination
  302. if len(facetTerms) >= facetSize {
  303. logger.Warnf("Facet truncated at %d terms, pagination needed for complete results", facetSize)
  304. }
  305. }
  306. return terms, nil
  307. }
  308. // getTermsUsingPagination uses document pagination to collect all terms
  309. func (cc *CardinalityCounter) getTermsUsingPagination(ctx context.Context, shard bleve.Index, req *CardinalityRequest) (map[string]struct{}, error) {
  310. terms := make(map[string]struct{})
  311. // Use pagination approach to collect all terms without FacetSize limitation
  312. // This iterates through result pages to get complete term list
  313. pageSize := 5000 // Larger page size for efficiency
  314. maxPages := 1000 // Higher limit to handle large datasets
  315. processedDocs := 0
  316. logger.Infof("Starting cardinality collection for field '%s' with pagination (pageSize=%d)", req.Field, pageSize)
  317. for page := 0; page < maxPages; page++ {
  318. searchReq := bleve.NewSearchRequest(bleve.NewMatchAllQuery())
  319. searchReq.Size = pageSize
  320. searchReq.From = page * pageSize
  321. searchReq.Fields = []string{req.Field} // Only fetch the field we need
  322. // Add time range filter if specified
  323. if req.StartTime != nil && req.EndTime != nil {
  324. startTime := float64(*req.StartTime)
  325. endTime := float64(*req.EndTime)
  326. timeQuery := bleve.NewNumericRangeQuery(&startTime, &endTime)
  327. timeQuery.SetField("timestamp")
  328. boolQuery := bleve.NewBooleanQuery()
  329. boolQuery.AddMust(searchReq.Query)
  330. boolQuery.AddMust(timeQuery)
  331. searchReq.Query = boolQuery
  332. }
  333. result, err := shard.Search(searchReq)
  334. if err != nil {
  335. return terms, err
  336. }
  337. // If no more hits, we're done
  338. if len(result.Hits) == 0 {
  339. break
  340. }
  341. // Extract unique terms from documents
  342. for _, hit := range result.Hits {
  343. if fieldValue, ok := hit.Fields[req.Field]; ok {
  344. if strValue, ok := fieldValue.(string); ok && strValue != "" {
  345. terms[strValue] = struct{}{}
  346. }
  347. }
  348. }
  349. processedDocs += len(result.Hits)
  350. // Progress logging every 50K documents
  351. if processedDocs%50000 == 0 && processedDocs > 0 {
  352. logger.Infof("Progress: processed %d documents, found %d unique terms for field '%s'",
  353. processedDocs, len(terms), req.Field)
  354. }
  355. // If we got fewer results than pageSize, we've reached the end
  356. if len(result.Hits) < pageSize {
  357. logger.Infof("Completed: processed %d documents, found %d unique terms for field '%s'",
  358. processedDocs, len(terms), req.Field)
  359. break
  360. }
  361. // Increased safety limit for large datasets, but with warning
  362. if len(terms) > 200000 {
  363. logger.Warnf("Very large number of unique terms detected (%d), stopping collection for field %s. Consider using EstimateCardinality for better performance", len(terms), req.Field)
  364. break
  365. }
  366. }
  367. return terms, nil
  368. }
  369. // EstimateCardinality provides a fast cardinality estimate using sampling approach
  370. // This is useful for very large datasets where exact counting might be expensive
  371. func (cc *CardinalityCounter) EstimateCardinality(ctx context.Context, req *CardinalityRequest) (*CardinalityResult, error) {
  372. cc.mu.RLock()
  373. defer cc.mu.RUnlock()
  374. if req.Field == "" {
  375. return nil, fmt.Errorf("field name is required")
  376. }
  377. // Use statistical sampling for very large datasets
  378. // Take a sample and extrapolate to estimate total cardinality
  379. sampleSize := 10000 // Sample 10K documents
  380. uniqueInSample := make(map[string]struct{})
  381. totalSampleDocs := uint64(0)
  382. // Process each shard with sampling
  383. for i, shard := range cc.shards {
  384. if shard == nil {
  385. continue
  386. }
  387. shardSample, shardTotal, err := cc.sampleShardTerms(ctx, shard, req, sampleSize/len(cc.shards))
  388. if err != nil {
  389. logger.Errorf("Failed to sample shard %d: %v", i, err)
  390. continue
  391. }
  392. totalSampleDocs += shardTotal
  393. // Merge unique terms from sample
  394. for term := range shardSample {
  395. uniqueInSample[term] = struct{}{}
  396. }
  397. }
  398. // For now, still use exact counting for accuracy
  399. // In the future, we could use the sample to extrapolate:
  400. // sampledUnique := uint64(len(uniqueInSample))
  401. // estimatedCardinality := sampledUnique * (totalDocs / totalSampleDocs)
  402. if totalSampleDocs == 0 {
  403. return &CardinalityResult{
  404. Field: req.Field,
  405. Cardinality: 0,
  406. TotalDocs: 0,
  407. }, nil
  408. }
  409. // For accurate results with large datasets, we use exact counting
  410. // The sampling code above is kept for future statistical estimation
  411. return cc.CountCardinality(ctx, req)
  412. }
  413. // sampleShardTerms takes a statistical sample from a shard for cardinality estimation
  414. func (cc *CardinalityCounter) sampleShardTerms(ctx context.Context, shard bleve.Index, req *CardinalityRequest, sampleSize int) (map[string]struct{}, uint64, error) {
  415. terms := make(map[string]struct{})
  416. searchReq := bleve.NewSearchRequest(bleve.NewMatchAllQuery())
  417. searchReq.Size = sampleSize
  418. searchReq.Fields = []string{req.Field}
  419. // Add time range filter if specified
  420. if req.StartTime != nil && req.EndTime != nil {
  421. startTime := float64(*req.StartTime)
  422. endTime := float64(*req.EndTime)
  423. timeQuery := bleve.NewNumericRangeQuery(&startTime, &endTime)
  424. timeQuery.SetField("timestamp")
  425. boolQuery := bleve.NewBooleanQuery()
  426. boolQuery.AddMust(searchReq.Query)
  427. boolQuery.AddMust(timeQuery)
  428. searchReq.Query = boolQuery
  429. }
  430. result, err := shard.Search(searchReq)
  431. if err != nil {
  432. return terms, 0, err
  433. }
  434. // Extract terms from sample
  435. for _, hit := range result.Hits {
  436. if fieldValue, ok := hit.Fields[req.Field]; ok {
  437. if strValue, ok := fieldValue.(string); ok && strValue != "" {
  438. terms[strValue] = struct{}{}
  439. }
  440. }
  441. }
  442. return terms, result.Total, nil
  443. }
  444. // BatchCountCardinality counts cardinality for multiple fields efficiently
  445. func (cc *CardinalityCounter) BatchCountCardinality(ctx context.Context, fields []string, baseReq *CardinalityRequest) (map[string]*CardinalityResult, error) {
  446. results := make(map[string]*CardinalityResult)
  447. // Process fields in parallel
  448. var wg sync.WaitGroup
  449. var mu sync.Mutex
  450. for _, field := range fields {
  451. wg.Add(1)
  452. go func(f string) {
  453. defer wg.Done()
  454. req := *baseReq // Copy base request
  455. req.Field = f
  456. result, err := cc.CountCardinality(ctx, &req)
  457. if err != nil {
  458. result = &CardinalityResult{
  459. Field: f,
  460. Error: err.Error(),
  461. }
  462. }
  463. mu.Lock()
  464. results[f] = result
  465. mu.Unlock()
  466. }(field)
  467. }
  468. wg.Wait()
  469. return results, nil
  470. }