counter.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438
  1. package searcher
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "sync"
  7. "github.com/blevesearch/bleve/v2"
  8. "github.com/blevesearch/bleve/v2/search"
  9. "github.com/blevesearch/bleve/v2/search/query"
  10. "github.com/uozi-tech/cosy/logger"
  11. )
// Counter counts the unique values of a field across a set of bleve index
// shards. Searches issued by Count go through a single IndexAlias built over
// all shards, while Estimate iterates over the individual shards directly.
type Counter struct {
	indexAlias bleve.IndexAlias // Use IndexAlias instead of individual shards; nil when no shards were supplied or after Stop
	shards     []bleve.Index    // Keep shards for fallback if needed; cleared by Stop
	mu         sync.RWMutex     // Read-locked by Count and Estimate, write-locked by Stop
	stopOnce   sync.Once        // Guarantees the shutdown logic in Stop runs at most once
}
  19. // NewCounter creates a new cardinality counter
  20. func NewCounter(shards []bleve.Index) *Counter {
  21. var indexAlias bleve.IndexAlias
  22. if len(shards) > 0 {
  23. // Create IndexAlias for distributed search like Searcher does
  24. indexAlias = bleve.NewIndexAlias(shards...)
  25. // Note: IndexAlias doesn't have SetIndexMapping method
  26. // The mapping will be inherited from the constituent indices
  27. logger.Debugf("Created IndexAlias for counter with %d shards", len(shards))
  28. }
  29. return &Counter{
  30. indexAlias: indexAlias,
  31. shards: shards,
  32. }
  33. }
  34. // Stop gracefully closes the counter's resources, like the IndexAlias.
  35. func (c *Counter) Stop() error {
  36. var err error
  37. c.stopOnce.Do(func() {
  38. c.mu.Lock()
  39. defer c.mu.Unlock()
  40. if c.indexAlias != nil {
  41. logger.Debugf("Closing IndexAlias in Counter")
  42. err = c.indexAlias.Close()
  43. c.indexAlias = nil
  44. }
  45. c.shards = nil
  46. })
  47. return err
  48. }
// CardinalityRequest represents a request for unique value counting.
type CardinalityRequest struct {
	// Field is the name of the field whose unique values are counted. Required.
	Field string `json:"field"`
	// Query is an optional query to filter documents.
	Query query.Query `json:"query,omitempty"` // Optional query to filter documents
	// StartTime and EndTime bound a numeric range filter on the "timestamp"
	// field. The filter is only applied when both are set.
	StartTime *int64 `json:"start_time,omitempty"`
	EndTime   *int64 `json:"end_time,omitempty"`
	// LogPaths restricts counting to documents whose path field exactly matches
	// at least one of the listed values.
	LogPaths []string `json:"log_paths,omitempty"`
	// UseMainLogPath selects which field LogPaths is matched against.
	UseMainLogPath bool `json:"use_main_log_path,omitempty"` // Use main_log_path field instead of file_path
}
// CardinalityResult represents the result of cardinality counting.
type CardinalityResult struct {
	// Field is the field name the count was requested for.
	Field string `json:"field"`
	// Cardinality is the number of unique terms collected for Field.
	Cardinality uint64 `json:"cardinality"`
	// TotalDocs is the document total reported by the search that produced the count.
	TotalDocs uint64 `json:"total_docs"`
	// Error holds a description of the failure when counting did not succeed.
	Error string `json:"error,omitempty"`
}
  65. // Count efficiently counts unique values using IndexAlias with global scoring
  66. // This leverages Bleve's distributed search optimizations and avoids FacetSize limits
  67. func (c *Counter) Count(ctx context.Context, req *CardinalityRequest) (*CardinalityResult, error) {
  68. c.mu.RLock()
  69. defer c.mu.RUnlock()
  70. if req.Field == "" {
  71. return nil, fmt.Errorf("field name is required")
  72. }
  73. if c.indexAlias == nil {
  74. return &CardinalityResult{
  75. Field: req.Field,
  76. Error: "IndexAlias not available",
  77. }, fmt.Errorf("IndexAlias not available")
  78. }
  79. // Use IndexAlias with global scoring for consistent distributed search
  80. uniqueTerms, totalDocs, err := c.collectTermsUsingIndexAlias(ctx, req)
  81. if err != nil {
  82. return &CardinalityResult{
  83. Field: req.Field,
  84. Error: fmt.Sprintf("failed to collect terms: %v", err),
  85. }, err
  86. }
  87. logger.Infof("Cardinality count completed: field='%s', unique_terms=%d, total_docs=%d",
  88. req.Field, len(uniqueTerms), totalDocs)
  89. return &CardinalityResult{
  90. Field: req.Field,
  91. Cardinality: uint64(len(uniqueTerms)),
  92. TotalDocs: totalDocs,
  93. }, nil
  94. }
  95. // collectTermsUsingIndexAlias collects unique terms using IndexAlias with global scoring
  96. func (c *Counter) collectTermsUsingIndexAlias(ctx context.Context, req *CardinalityRequest) (map[string]struct{}, uint64, error) {
  97. uniqueTerms := make(map[string]struct{})
  98. // Enable global scoring context like Searcher does
  99. globalCtx := context.WithValue(ctx, search.SearchTypeKey, search.GlobalScoring)
  100. // Strategy 1: Try large facet first (more efficient for most cases)
  101. terms1, totalDocs, err1 := c.collectTermsUsingLargeFacet(globalCtx, req)
  102. if err1 != nil {
  103. logger.Warnf("Large facet collection failed: %v", err1)
  104. } else {
  105. for term := range terms1 {
  106. uniqueTerms[term] = struct{}{}
  107. }
  108. logger.Infof("Large facet collected %d unique terms", len(terms1))
  109. }
  110. // Strategy 2: Use pagination if facet was likely truncated or failed
  111. needsPagination := len(terms1) >= 50000 || err1 != nil
  112. if needsPagination {
  113. logger.Infof("Using pagination to collect remaining terms...")
  114. terms2, _, err2 := c.collectTermsUsingPagination(globalCtx, req)
  115. if err2 != nil {
  116. logger.Warnf("Pagination collection failed: %v", err2)
  117. } else {
  118. for term := range terms2 {
  119. uniqueTerms[term] = struct{}{}
  120. }
  121. logger.Infof("Pagination collected additional terms, total unique: %d", len(uniqueTerms))
  122. }
  123. }
  124. return uniqueTerms, totalDocs, nil
  125. }
  126. // collectTermsUsingLargeFacet uses IndexAlias with a large facet to efficiently collect terms
  127. func (c *Counter) collectTermsUsingLargeFacet(ctx context.Context, req *CardinalityRequest) (map[string]struct{}, uint64, error) {
  128. terms := make(map[string]struct{})
  129. // Build search request using IndexAlias with proper filtering
  130. boolQuery := bleve.NewBooleanQuery()
  131. boolQuery.AddMust(bleve.NewMatchAllQuery())
  132. // Add time range filter if specified
  133. if req.StartTime != nil && req.EndTime != nil {
  134. startTime := float64(*req.StartTime)
  135. endTime := float64(*req.EndTime)
  136. timeQuery := bleve.NewNumericRangeQuery(&startTime, &endTime)
  137. timeQuery.SetField("timestamp")
  138. boolQuery.AddMust(timeQuery)
  139. }
  140. // Add log path filters - use main_log_path or file_path based on request
  141. if len(req.LogPaths) > 0 {
  142. logPathQuery := bleve.NewBooleanQuery()
  143. fieldName := "file_path" // default
  144. if req.UseMainLogPath {
  145. fieldName = "main_log_path"
  146. }
  147. for _, logPath := range req.LogPaths {
  148. termQuery := bleve.NewTermQuery(logPath)
  149. termQuery.SetField(fieldName)
  150. logPathQuery.AddShould(termQuery)
  151. }
  152. logPathQuery.SetMinShould(1)
  153. boolQuery.AddMust(logPathQuery)
  154. }
  155. searchReq := bleve.NewSearchRequest(boolQuery)
  156. searchReq.Size = 0 // We don't need documents, just facets
  157. // Use very large facet size - we're back to this approach but using IndexAlias
  158. // which should handle it more efficiently than individual shards
  159. facetSize := 100000 // Large size for maximum accuracy
  160. facet := bleve.NewFacetRequest(req.Field, facetSize)
  161. searchReq.AddFacet(req.Field, facet)
  162. // Debug: Log the constructed query
  163. if queryBytes, err := json.Marshal(searchReq.Query); err == nil {
  164. logger.Debugf("Counter query: %s", string(queryBytes))
  165. }
  166. // Execute search using IndexAlias with global scoring context
  167. result, err := c.indexAlias.SearchInContext(ctx, searchReq)
  168. if err != nil {
  169. return terms, 0, fmt.Errorf("IndexAlias facet search failed: %w", err)
  170. }
  171. logger.Debugf("Counter facet search result: Total=%d, Facets=%v", result.Total, result.Facets != nil)
  172. // Extract terms from facet result
  173. if facetResult, ok := result.Facets[req.Field]; ok && facetResult.Terms != nil {
  174. facetTerms := facetResult.Terms.Terms()
  175. for _, term := range facetTerms {
  176. terms[term.Term] = struct{}{}
  177. }
  178. logger.Infof("IndexAlias large facet: collected %d terms, facet.Total=%d, result.Total=%d",
  179. len(terms), facetResult.Total, result.Total)
  180. // Check if facet was truncated
  181. if len(facetTerms) >= facetSize {
  182. logger.Warnf("Facet likely truncated at %d terms, total unique may be higher", facetSize)
  183. }
  184. }
  185. return terms, result.Total, nil
  186. }
  187. // collectTermsUsingPagination uses IndexAlias with pagination to collect all terms
  188. func (c *Counter) collectTermsUsingPagination(ctx context.Context, req *CardinalityRequest) (map[string]struct{}, uint64, error) {
  189. terms := make(map[string]struct{})
  190. pageSize := 10000 // Large page size for efficiency
  191. maxPages := 1000 // Support very large datasets
  192. processedDocs := 0
  193. logger.Infof("Starting IndexAlias pagination for field '%s' (pageSize=%d)", req.Field, pageSize)
  194. for page := 0; page < maxPages; page++ {
  195. // Build proper query with all filters
  196. boolQuery := bleve.NewBooleanQuery()
  197. boolQuery.AddMust(bleve.NewMatchAllQuery())
  198. // Add time range filter if specified
  199. if req.StartTime != nil && req.EndTime != nil {
  200. startTime := float64(*req.StartTime)
  201. endTime := float64(*req.EndTime)
  202. timeQuery := bleve.NewNumericRangeQuery(&startTime, &endTime)
  203. timeQuery.SetField("timestamp")
  204. boolQuery.AddMust(timeQuery)
  205. }
  206. // Add log path filters, respecting UseMainLogPath
  207. if len(req.LogPaths) > 0 {
  208. logPathQuery := bleve.NewBooleanQuery()
  209. fieldName := "file_path"
  210. if req.UseMainLogPath {
  211. fieldName = "main_log_path"
  212. }
  213. for _, logPath := range req.LogPaths {
  214. termQuery := bleve.NewTermQuery(logPath)
  215. termQuery.SetField(fieldName)
  216. logPathQuery.AddShould(termQuery)
  217. }
  218. logPathQuery.SetMinShould(1)
  219. boolQuery.AddMust(logPathQuery)
  220. }
  221. searchReq := bleve.NewSearchRequest(boolQuery)
  222. searchReq.Size = pageSize
  223. searchReq.From = page * pageSize
  224. searchReq.Fields = []string{req.Field}
  225. // Execute with IndexAlias and global scoring
  226. result, err := c.indexAlias.SearchInContext(ctx, searchReq)
  227. if err != nil {
  228. return terms, 0, fmt.Errorf("IndexAlias pagination search failed at page %d: %w", page, err)
  229. }
  230. // If no more hits, we're done
  231. if len(result.Hits) == 0 {
  232. logger.Infof("Pagination complete: processed %d documents, found %d unique terms",
  233. processedDocs, len(terms))
  234. break
  235. }
  236. // Extract unique terms from documents
  237. for _, hit := range result.Hits {
  238. if fieldValue, ok := hit.Fields[req.Field]; ok {
  239. if strValue, ok := fieldValue.(string); ok && strValue != "" {
  240. terms[strValue] = struct{}{}
  241. }
  242. }
  243. }
  244. processedDocs += len(result.Hits)
  245. // Progress logging
  246. if processedDocs%50000 == 0 && processedDocs > 0 {
  247. logger.Infof("Pagination progress: processed %d documents, found %d unique terms",
  248. processedDocs, len(terms))
  249. }
  250. // If we got fewer results than pageSize, we've reached the end
  251. if len(result.Hits) < pageSize {
  252. logger.Infof("Pagination complete: processed %d documents, found %d unique terms",
  253. processedDocs, len(terms))
  254. break
  255. }
  256. // Generous safety limit
  257. if len(terms) > 500000 {
  258. logger.Warnf("Very large cardinality detected (%d terms), stopping for memory safety", len(terms))
  259. break
  260. }
  261. }
  262. return terms, uint64(processedDocs), nil
  263. }
  264. // Estimate provides a fast cardinality estimate using sampling approach
  265. // This is useful for very large datasets where exact counting might be expensive
  266. func (c *Counter) Estimate(ctx context.Context, req *CardinalityRequest) (*CardinalityResult, error) {
  267. c.mu.RLock()
  268. defer c.mu.RUnlock()
  269. if req.Field == "" {
  270. return nil, fmt.Errorf("field name is required")
  271. }
  272. // Use statistical sampling for very large datasets
  273. // Take a sample and extrapolate to estimate total cardinality
  274. sampleSize := 10000 // Sample 10K documents
  275. uniqueInSample := make(map[string]struct{})
  276. totalSampleDocs := uint64(0)
  277. // Process each shard with sampling
  278. for i, shard := range c.shards {
  279. if shard == nil {
  280. continue
  281. }
  282. shardSample, shardTotal, err := c.sampleShardTerms(ctx, shard, req, sampleSize/len(c.shards))
  283. if err != nil {
  284. logger.Errorf("Failed to sample shard %d: %v", i, err)
  285. continue
  286. }
  287. totalSampleDocs += shardTotal
  288. // Merge unique terms from sample
  289. for term := range shardSample {
  290. uniqueInSample[term] = struct{}{}
  291. }
  292. }
  293. // For now, still use exact counting for accuracy
  294. // In the future, we could use the sample to extrapolate:
  295. // sampledUnique := uint64(len(uniqueInSample))
  296. // estimatedCardinality := sampledUnique * (totalDocs / totalSampleDocs)
  297. if totalSampleDocs == 0 {
  298. return &CardinalityResult{
  299. Field: req.Field,
  300. Cardinality: 0,
  301. TotalDocs: 0,
  302. }, nil
  303. }
  304. // For accurate results with large datasets, we use exact counting
  305. // The sampling code above is kept for future statistical estimation
  306. return c.Count(ctx, req)
  307. }
  308. // sampleShardTerms takes a statistical sample from a shard for cardinality estimation
  309. func (c *Counter) sampleShardTerms(ctx context.Context, shard bleve.Index, req *CardinalityRequest, sampleSize int) (map[string]struct{}, uint64, error) {
  310. terms := make(map[string]struct{})
  311. searchReq := bleve.NewSearchRequest(bleve.NewMatchAllQuery())
  312. searchReq.Size = sampleSize
  313. searchReq.Fields = []string{req.Field}
  314. // Add time range filter if specified
  315. if req.StartTime != nil && req.EndTime != nil {
  316. startTime := float64(*req.StartTime)
  317. endTime := float64(*req.EndTime)
  318. timeQuery := bleve.NewNumericRangeQuery(&startTime, &endTime)
  319. timeQuery.SetField("timestamp")
  320. boolQuery := bleve.NewBooleanQuery()
  321. boolQuery.AddMust(searchReq.Query)
  322. boolQuery.AddMust(timeQuery)
  323. searchReq.Query = boolQuery
  324. }
  325. result, err := shard.Search(searchReq)
  326. if err != nil {
  327. return terms, 0, err
  328. }
  329. // Extract terms from sample
  330. for _, hit := range result.Hits {
  331. if fieldValue, ok := hit.Fields[req.Field]; ok {
  332. if strValue, ok := fieldValue.(string); ok && strValue != "" {
  333. terms[strValue] = struct{}{}
  334. }
  335. }
  336. }
  337. return terms, result.Total, nil
  338. }
  339. // BatchCount counts cardinality for multiple fields efficiently
  340. func (c *Counter) BatchCount(ctx context.Context, fields []string, baseReq *CardinalityRequest) (map[string]*CardinalityResult, error) {
  341. results := make(map[string]*CardinalityResult)
  342. // Process fields in parallel
  343. var wg sync.WaitGroup
  344. var mu sync.Mutex
  345. for _, field := range fields {
  346. wg.Add(1)
  347. go func(f string) {
  348. defer wg.Done()
  349. req := *baseReq // Copy base request
  350. req.Field = f
  351. result, err := c.Count(ctx, &req)
  352. if err != nil {
  353. result = &CardinalityResult{
  354. Field: f,
  355. Error: err.Error(),
  356. }
  357. }
  358. mu.Lock()
  359. results[f] = result
  360. mu.Unlock()
  361. }(field)
  362. }
  363. wg.Wait()
  364. return results, nil
  365. }