cardinality_counter.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605
  1. package searcher
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "sync"
  7. "github.com/blevesearch/bleve/v2"
  8. "github.com/blevesearch/bleve/v2/search"
  9. "github.com/blevesearch/bleve/v2/search/query"
  10. "github.com/uozi-tech/cosy/logger"
  11. )
  12. // CardinalityCounter provides efficient unique value counting without large FacetSize
  13. type CardinalityCounter struct {
  14. indexAlias bleve.IndexAlias // Use IndexAlias instead of individual shards
  15. shards []bleve.Index // Keep shards for fallback if needed
  16. mu sync.RWMutex
  17. }
  18. // NewCardinalityCounter creates a new cardinality counter
  19. func NewCardinalityCounter(shards []bleve.Index) *CardinalityCounter {
  20. var indexAlias bleve.IndexAlias
  21. if len(shards) > 0 {
  22. // Create IndexAlias for distributed search like DistributedSearcher does
  23. indexAlias = bleve.NewIndexAlias(shards...)
  24. // Note: IndexAlias doesn't have SetIndexMapping method
  25. // The mapping will be inherited from the constituent indices
  26. logger.Debugf("Created IndexAlias for cardinality counter with %d shards", len(shards))
  27. }
  28. return &CardinalityCounter{
  29. indexAlias: indexAlias,
  30. shards: shards,
  31. }
  32. }
  33. // CardinalityRequest represents a request for unique value counting
  34. type CardinalityRequest struct {
  35. Field string `json:"field"`
  36. Query query.Query `json:"query,omitempty"` // Optional query to filter documents
  37. StartTime *int64 `json:"start_time,omitempty"`
  38. EndTime *int64 `json:"end_time,omitempty"`
  39. LogPaths []string `json:"log_paths,omitempty"`
  40. }
  // CardinalityResult carries the outcome of a distinct-value count for one
// field.
type CardinalityResult struct {
	// Field echoes the field that was counted.
	Field string `json:"field"`
	// Cardinality is the number of distinct values found.
	Cardinality uint64 `json:"cardinality"`
	// TotalDocs is the document count reported alongside the cardinality.
	TotalDocs uint64 `json:"total_docs"`
	// Error describes a failure; it is omitted from JSON when empty.
	Error string `json:"error,omitempty"`
}
  48. // CountCardinality efficiently counts unique values using IndexAlias with global scoring
  49. // This leverages Bleve's distributed search optimizations and avoids FacetSize limits
  50. func (cc *CardinalityCounter) CountCardinality(ctx context.Context, req *CardinalityRequest) (*CardinalityResult, error) {
  51. cc.mu.RLock()
  52. defer cc.mu.RUnlock()
  53. if req.Field == "" {
  54. return nil, fmt.Errorf("field name is required")
  55. }
  56. if cc.indexAlias == nil {
  57. return &CardinalityResult{
  58. Field: req.Field,
  59. Error: "IndexAlias not available",
  60. }, fmt.Errorf("IndexAlias not available")
  61. }
  62. // Use IndexAlias with global scoring for consistent distributed search
  63. uniqueTerms, totalDocs, err := cc.collectTermsUsingIndexAlias(ctx, req)
  64. if err != nil {
  65. return &CardinalityResult{
  66. Field: req.Field,
  67. Error: fmt.Sprintf("failed to collect terms: %v", err),
  68. }, err
  69. }
  70. logger.Infof("Cardinality count completed: field='%s', unique_terms=%d, total_docs=%d",
  71. req.Field, len(uniqueTerms), totalDocs)
  72. return &CardinalityResult{
  73. Field: req.Field,
  74. Cardinality: uint64(len(uniqueTerms)),
  75. TotalDocs: totalDocs,
  76. }, nil
  77. }
  78. // collectTermsUsingIndexAlias collects unique terms using IndexAlias with global scoring
  79. func (cc *CardinalityCounter) collectTermsUsingIndexAlias(ctx context.Context, req *CardinalityRequest) (map[string]struct{}, uint64, error) {
  80. uniqueTerms := make(map[string]struct{})
  81. // Enable global scoring context like DistributedSearcher does
  82. globalCtx := context.WithValue(ctx, search.SearchTypeKey, search.GlobalScoring)
  83. // Strategy 1: Try large facet first (more efficient for most cases)
  84. terms1, totalDocs, err1 := cc.collectTermsUsingLargeFacet(globalCtx, req)
  85. if err1 != nil {
  86. logger.Warnf("Large facet collection failed: %v", err1)
  87. } else {
  88. for term := range terms1 {
  89. uniqueTerms[term] = struct{}{}
  90. }
  91. logger.Infof("Large facet collected %d unique terms", len(terms1))
  92. }
  93. // Strategy 2: Use pagination if facet was likely truncated or failed
  94. needsPagination := len(terms1) >= 50000 || err1 != nil
  95. if needsPagination {
  96. logger.Infof("Using pagination to collect remaining terms...")
  97. terms2, _, err2 := cc.collectTermsUsingPagination(globalCtx, req)
  98. if err2 != nil {
  99. logger.Warnf("Pagination collection failed: %v", err2)
  100. } else {
  101. for term := range terms2 {
  102. uniqueTerms[term] = struct{}{}
  103. }
  104. logger.Infof("Pagination collected additional terms, total unique: %d", len(uniqueTerms))
  105. }
  106. }
  107. return uniqueTerms, totalDocs, nil
  108. }
  109. // collectTermsUsingLargeFacet uses IndexAlias with a large facet to efficiently collect terms
  110. func (cc *CardinalityCounter) collectTermsUsingLargeFacet(ctx context.Context, req *CardinalityRequest) (map[string]struct{}, uint64, error) {
  111. terms := make(map[string]struct{})
  112. // Build search request using IndexAlias with proper filtering
  113. boolQuery := bleve.NewBooleanQuery()
  114. boolQuery.AddMust(bleve.NewMatchAllQuery())
  115. // Add time range filter if specified
  116. if req.StartTime != nil && req.EndTime != nil {
  117. startTime := float64(*req.StartTime)
  118. endTime := float64(*req.EndTime)
  119. timeQuery := bleve.NewNumericRangeQuery(&startTime, &endTime)
  120. timeQuery.SetField("timestamp")
  121. boolQuery.AddMust(timeQuery)
  122. }
  123. // CRITICAL FIX: Add log path filters (this was missing!)
  124. if len(req.LogPaths) > 0 {
  125. logPathQuery := bleve.NewBooleanQuery()
  126. for _, logPath := range req.LogPaths {
  127. termQuery := bleve.NewTermQuery(logPath)
  128. termQuery.SetField("file_path")
  129. logPathQuery.AddShould(termQuery)
  130. }
  131. logPathQuery.SetMinShould(1)
  132. boolQuery.AddMust(logPathQuery)
  133. }
  134. searchReq := bleve.NewSearchRequest(boolQuery)
  135. searchReq.Size = 0 // We don't need documents, just facets
  136. // Use very large facet size - we're back to this approach but using IndexAlias
  137. // which should handle it more efficiently than individual shards
  138. facetSize := 100000 // Large size for maximum accuracy
  139. facet := bleve.NewFacetRequest(req.Field, facetSize)
  140. searchReq.AddFacet(req.Field, facet)
  141. // Debug: Log the constructed query
  142. if queryBytes, err := json.Marshal(searchReq.Query); err == nil {
  143. logger.Debugf("CardinalityCounter query: %s", string(queryBytes))
  144. }
  145. // Execute search using IndexAlias with global scoring context
  146. result, err := cc.indexAlias.SearchInContext(ctx, searchReq)
  147. if err != nil {
  148. return terms, 0, fmt.Errorf("IndexAlias facet search failed: %w", err)
  149. }
  150. logger.Debugf("CardinalityCounter facet search result: Total=%d, Facets=%v", result.Total, result.Facets != nil)
  151. // Extract terms from facet result
  152. if facetResult, ok := result.Facets[req.Field]; ok && facetResult.Terms != nil {
  153. facetTerms := facetResult.Terms.Terms()
  154. for _, term := range facetTerms {
  155. terms[term.Term] = struct{}{}
  156. }
  157. logger.Infof("IndexAlias large facet: collected %d terms, facet.Total=%d, result.Total=%d",
  158. len(terms), facetResult.Total, result.Total)
  159. // Check if facet was truncated
  160. if len(facetTerms) >= facetSize {
  161. logger.Warnf("Facet likely truncated at %d terms, total unique may be higher", facetSize)
  162. }
  163. }
  164. return terms, result.Total, nil
  165. }
  166. // collectTermsUsingPagination uses IndexAlias with pagination to collect all terms
  167. func (cc *CardinalityCounter) collectTermsUsingPagination(ctx context.Context, req *CardinalityRequest) (map[string]struct{}, uint64, error) {
  168. terms := make(map[string]struct{})
  169. pageSize := 10000 // Large page size for efficiency
  170. maxPages := 1000 // Support very large datasets
  171. processedDocs := 0
  172. logger.Infof("Starting IndexAlias pagination for field '%s' (pageSize=%d)", req.Field, pageSize)
  173. for page := 0; page < maxPages; page++ {
  174. // Build proper query with all filters
  175. boolQuery := bleve.NewBooleanQuery()
  176. boolQuery.AddMust(bleve.NewMatchAllQuery())
  177. // Add time range filter if specified
  178. if req.StartTime != nil && req.EndTime != nil {
  179. startTime := float64(*req.StartTime)
  180. endTime := float64(*req.EndTime)
  181. timeQuery := bleve.NewNumericRangeQuery(&startTime, &endTime)
  182. timeQuery.SetField("timestamp")
  183. boolQuery.AddMust(timeQuery)
  184. }
  185. // CRITICAL FIX: Add log path filters (this was missing!)
  186. if len(req.LogPaths) > 0 {
  187. logPathQuery := bleve.NewBooleanQuery()
  188. for _, logPath := range req.LogPaths {
  189. termQuery := bleve.NewTermQuery(logPath)
  190. termQuery.SetField("file_path")
  191. logPathQuery.AddShould(termQuery)
  192. }
  193. logPathQuery.SetMinShould(1)
  194. boolQuery.AddMust(logPathQuery)
  195. }
  196. searchReq := bleve.NewSearchRequest(boolQuery)
  197. searchReq.Size = pageSize
  198. searchReq.From = page * pageSize
  199. searchReq.Fields = []string{req.Field}
  200. // Execute with IndexAlias and global scoring
  201. result, err := cc.indexAlias.SearchInContext(ctx, searchReq)
  202. if err != nil {
  203. return terms, 0, fmt.Errorf("IndexAlias pagination search failed at page %d: %w", page, err)
  204. }
  205. // If no more hits, we're done
  206. if len(result.Hits) == 0 {
  207. logger.Infof("Pagination complete: processed %d documents, found %d unique terms",
  208. processedDocs, len(terms))
  209. break
  210. }
  211. // Extract unique terms from documents
  212. for _, hit := range result.Hits {
  213. if fieldValue, ok := hit.Fields[req.Field]; ok {
  214. if strValue, ok := fieldValue.(string); ok && strValue != "" {
  215. terms[strValue] = struct{}{}
  216. }
  217. }
  218. }
  219. processedDocs += len(result.Hits)
  220. // Progress logging
  221. if processedDocs%50000 == 0 && processedDocs > 0 {
  222. logger.Infof("Pagination progress: processed %d documents, found %d unique terms",
  223. processedDocs, len(terms))
  224. }
  225. // If we got fewer results than pageSize, we've reached the end
  226. if len(result.Hits) < pageSize {
  227. logger.Infof("Pagination complete: processed %d documents, found %d unique terms",
  228. processedDocs, len(terms))
  229. break
  230. }
  231. // Generous safety limit
  232. if len(terms) > 500000 {
  233. logger.Warnf("Very large cardinality detected (%d terms), stopping for memory safety", len(terms))
  234. break
  235. }
  236. }
  237. return terms, uint64(processedDocs), nil
  238. }
  239. // countShardCardinality counts unique values in a single shard (legacy method)
  240. func (cc *CardinalityCounter) countShardCardinality(ctx context.Context, shard bleve.Index, shardID int, req *CardinalityRequest) (uint64, uint64, error) {
  241. // For now, we'll use a small facet to get an estimate
  242. // In the future, this could be optimized with direct index access
  243. searchReq := bleve.NewSearchRequest(bleve.NewMatchAllQuery())
  244. searchReq.Size = 0 // We don't need actual documents
  245. // Add time range filter if specified
  246. if req.StartTime != nil && req.EndTime != nil {
  247. startTime := float64(*req.StartTime)
  248. endTime := float64(*req.EndTime)
  249. timeQuery := bleve.NewNumericRangeQuery(&startTime, &endTime)
  250. timeQuery.SetField("timestamp")
  251. boolQuery := bleve.NewBooleanQuery()
  252. boolQuery.AddMust(searchReq.Query)
  253. boolQuery.AddMust(timeQuery)
  254. searchReq.Query = boolQuery
  255. }
  256. // Add custom query filter if provided
  257. if req.Query != nil {
  258. boolQuery := bleve.NewBooleanQuery()
  259. boolQuery.AddMust(searchReq.Query)
  260. boolQuery.AddMust(req.Query)
  261. searchReq.Query = boolQuery
  262. }
  263. // Use a small facet just to get document count, not for cardinality
  264. facet := bleve.NewFacetRequest(req.Field, 1) // Minimal size
  265. searchReq.AddFacet(req.Field, facet)
  266. result, err := shard.Search(searchReq)
  267. if err != nil {
  268. return 0, 0, err
  269. }
  270. return 0, result.Total, nil
  271. }
  272. // getShardTerms retrieves unique terms from a shard using multiple strategies to avoid FacetSize limits
  273. func (cc *CardinalityCounter) getShardTerms(ctx context.Context, shard bleve.Index, req *CardinalityRequest) (map[string]struct{}, error) {
  274. // Try multiple approaches for maximum accuracy
  275. // Strategy 1: Use large facet first (still more efficient than old 100k)
  276. terms1, err1 := cc.getTermsUsingLargeFacet(ctx, shard, req)
  277. if err1 != nil {
  278. logger.Warnf("Large facet strategy failed: %v", err1)
  279. }
  280. // Strategy 2: Use pagination to get remaining terms
  281. terms2, err2 := cc.getTermsUsingPagination(ctx, shard, req)
  282. if err2 != nil {
  283. logger.Warnf("Pagination strategy failed: %v", err2)
  284. }
  285. // Merge results from both strategies
  286. allTerms := make(map[string]struct{})
  287. for term := range terms1 {
  288. allTerms[term] = struct{}{}
  289. }
  290. for term := range terms2 {
  291. allTerms[term] = struct{}{}
  292. }
  293. logger.Infof("Combined strategies found %d unique terms for field '%s' (facet: %d, pagination: %d)",
  294. len(allTerms), req.Field, len(terms1), len(terms2))
  295. return allTerms, nil
  296. }
  297. // getTermsUsingLargeFacet uses a large facet to collect terms efficiently
  298. func (cc *CardinalityCounter) getTermsUsingLargeFacet(ctx context.Context, shard bleve.Index, req *CardinalityRequest) (map[string]struct{}, error) {
  299. terms := make(map[string]struct{})
  300. searchReq := bleve.NewSearchRequest(bleve.NewMatchAllQuery())
  301. searchReq.Size = 0 // We don't need documents, just facets
  302. // Add time range filter if specified
  303. if req.StartTime != nil && req.EndTime != nil {
  304. startTime := float64(*req.StartTime)
  305. endTime := float64(*req.EndTime)
  306. timeQuery := bleve.NewNumericRangeQuery(&startTime, &endTime)
  307. timeQuery.SetField("timestamp")
  308. boolQuery := bleve.NewBooleanQuery()
  309. boolQuery.AddMust(searchReq.Query)
  310. boolQuery.AddMust(timeQuery)
  311. searchReq.Query = boolQuery
  312. }
  313. // Use a large facet size - larger than before but not excessive
  314. facetSize := 50000 // Compromise: large enough for most cases, but not memory-killing
  315. facet := bleve.NewFacetRequest(req.Field, facetSize)
  316. searchReq.AddFacet(req.Field, facet)
  317. result, err := shard.Search(searchReq)
  318. if err != nil {
  319. return terms, err
  320. }
  321. // Extract terms from facet result
  322. if facetResult, ok := result.Facets[req.Field]; ok && facetResult.Terms != nil {
  323. facetTerms := facetResult.Terms.Terms()
  324. for _, term := range facetTerms {
  325. terms[term.Term] = struct{}{}
  326. }
  327. logger.Debugf("Large facet collected %d terms (facet reports total: %d)", len(terms), facetResult.Total)
  328. // If facet is truncated, we know we need pagination
  329. if len(facetTerms) >= facetSize {
  330. logger.Warnf("Facet truncated at %d terms, pagination needed for complete results", facetSize)
  331. }
  332. }
  333. return terms, nil
  334. }
  335. // getTermsUsingPagination uses document pagination to collect all terms
  336. func (cc *CardinalityCounter) getTermsUsingPagination(ctx context.Context, shard bleve.Index, req *CardinalityRequest) (map[string]struct{}, error) {
  337. terms := make(map[string]struct{})
  338. // Use pagination approach to collect all terms without FacetSize limitation
  339. // This iterates through result pages to get complete term list
  340. pageSize := 5000 // Larger page size for efficiency
  341. maxPages := 1000 // Higher limit to handle large datasets
  342. processedDocs := 0
  343. logger.Infof("Starting cardinality collection for field '%s' with pagination (pageSize=%d)", req.Field, pageSize)
  344. for page := 0; page < maxPages; page++ {
  345. searchReq := bleve.NewSearchRequest(bleve.NewMatchAllQuery())
  346. searchReq.Size = pageSize
  347. searchReq.From = page * pageSize
  348. searchReq.Fields = []string{req.Field} // Only fetch the field we need
  349. // Add time range filter if specified
  350. if req.StartTime != nil && req.EndTime != nil {
  351. startTime := float64(*req.StartTime)
  352. endTime := float64(*req.EndTime)
  353. timeQuery := bleve.NewNumericRangeQuery(&startTime, &endTime)
  354. timeQuery.SetField("timestamp")
  355. boolQuery := bleve.NewBooleanQuery()
  356. boolQuery.AddMust(searchReq.Query)
  357. boolQuery.AddMust(timeQuery)
  358. searchReq.Query = boolQuery
  359. }
  360. result, err := shard.Search(searchReq)
  361. if err != nil {
  362. return terms, err
  363. }
  364. // If no more hits, we're done
  365. if len(result.Hits) == 0 {
  366. break
  367. }
  368. // Extract unique terms from documents
  369. for _, hit := range result.Hits {
  370. if fieldValue, ok := hit.Fields[req.Field]; ok {
  371. if strValue, ok := fieldValue.(string); ok && strValue != "" {
  372. terms[strValue] = struct{}{}
  373. }
  374. }
  375. }
  376. processedDocs += len(result.Hits)
  377. // Progress logging every 50K documents
  378. if processedDocs%50000 == 0 && processedDocs > 0 {
  379. logger.Infof("Progress: processed %d documents, found %d unique terms for field '%s'",
  380. processedDocs, len(terms), req.Field)
  381. }
  382. // If we got fewer results than pageSize, we've reached the end
  383. if len(result.Hits) < pageSize {
  384. logger.Infof("Completed: processed %d documents, found %d unique terms for field '%s'",
  385. processedDocs, len(terms), req.Field)
  386. break
  387. }
  388. // Increased safety limit for large datasets, but with warning
  389. if len(terms) > 200000 {
  390. logger.Warnf("Very large number of unique terms detected (%d), stopping collection for field %s. Consider using EstimateCardinality for better performance", len(terms), req.Field)
  391. break
  392. }
  393. }
  394. return terms, nil
  395. }
  396. // EstimateCardinality provides a fast cardinality estimate using sampling approach
  397. // This is useful for very large datasets where exact counting might be expensive
  398. func (cc *CardinalityCounter) EstimateCardinality(ctx context.Context, req *CardinalityRequest) (*CardinalityResult, error) {
  399. cc.mu.RLock()
  400. defer cc.mu.RUnlock()
  401. if req.Field == "" {
  402. return nil, fmt.Errorf("field name is required")
  403. }
  404. // Use statistical sampling for very large datasets
  405. // Take a sample and extrapolate to estimate total cardinality
  406. sampleSize := 10000 // Sample 10K documents
  407. uniqueInSample := make(map[string]struct{})
  408. totalSampleDocs := uint64(0)
  409. // Process each shard with sampling
  410. for i, shard := range cc.shards {
  411. if shard == nil {
  412. continue
  413. }
  414. shardSample, shardTotal, err := cc.sampleShardTerms(ctx, shard, req, sampleSize/len(cc.shards))
  415. if err != nil {
  416. logger.Errorf("Failed to sample shard %d: %v", i, err)
  417. continue
  418. }
  419. totalSampleDocs += shardTotal
  420. // Merge unique terms from sample
  421. for term := range shardSample {
  422. uniqueInSample[term] = struct{}{}
  423. }
  424. }
  425. // For now, still use exact counting for accuracy
  426. // In the future, we could use the sample to extrapolate:
  427. // sampledUnique := uint64(len(uniqueInSample))
  428. // estimatedCardinality := sampledUnique * (totalDocs / totalSampleDocs)
  429. if totalSampleDocs == 0 {
  430. return &CardinalityResult{
  431. Field: req.Field,
  432. Cardinality: 0,
  433. TotalDocs: 0,
  434. }, nil
  435. }
  436. // For accurate results with large datasets, we use exact counting
  437. // The sampling code above is kept for future statistical estimation
  438. return cc.CountCardinality(ctx, req)
  439. }
  440. // sampleShardTerms takes a statistical sample from a shard for cardinality estimation
  441. func (cc *CardinalityCounter) sampleShardTerms(ctx context.Context, shard bleve.Index, req *CardinalityRequest, sampleSize int) (map[string]struct{}, uint64, error) {
  442. terms := make(map[string]struct{})
  443. searchReq := bleve.NewSearchRequest(bleve.NewMatchAllQuery())
  444. searchReq.Size = sampleSize
  445. searchReq.Fields = []string{req.Field}
  446. // Add time range filter if specified
  447. if req.StartTime != nil && req.EndTime != nil {
  448. startTime := float64(*req.StartTime)
  449. endTime := float64(*req.EndTime)
  450. timeQuery := bleve.NewNumericRangeQuery(&startTime, &endTime)
  451. timeQuery.SetField("timestamp")
  452. boolQuery := bleve.NewBooleanQuery()
  453. boolQuery.AddMust(searchReq.Query)
  454. boolQuery.AddMust(timeQuery)
  455. searchReq.Query = boolQuery
  456. }
  457. result, err := shard.Search(searchReq)
  458. if err != nil {
  459. return terms, 0, err
  460. }
  461. // Extract terms from sample
  462. for _, hit := range result.Hits {
  463. if fieldValue, ok := hit.Fields[req.Field]; ok {
  464. if strValue, ok := fieldValue.(string); ok && strValue != "" {
  465. terms[strValue] = struct{}{}
  466. }
  467. }
  468. }
  469. return terms, result.Total, nil
  470. }
  471. // BatchCountCardinality counts cardinality for multiple fields efficiently
  472. func (cc *CardinalityCounter) BatchCountCardinality(ctx context.Context, fields []string, baseReq *CardinalityRequest) (map[string]*CardinalityResult, error) {
  473. results := make(map[string]*CardinalityResult)
  474. // Process fields in parallel
  475. var wg sync.WaitGroup
  476. var mu sync.Mutex
  477. for _, field := range fields {
  478. wg.Add(1)
  479. go func(f string) {
  480. defer wg.Done()
  481. req := *baseReq // Copy base request
  482. req.Field = f
  483. result, err := cc.CountCardinality(ctx, &req)
  484. if err != nil {
  485. result = &CardinalityResult{
  486. Field: f,
  487. Error: err.Error(),
  488. }
  489. }
  490. mu.Lock()
  491. results[f] = result
  492. mu.Unlock()
  493. }(field)
  494. }
  495. wg.Wait()
  496. return results, nil
  497. }