facet_aggregator.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. package searcher
  2. import (
  3. "context"
  4. "sort"
  5. "strings"
  6. "github.com/blevesearch/bleve/v2/search"
  7. )
  8. // convertFacets converts Bleve facets to our facet format
  9. func (ds *DistributedSearcher) convertFacets(bleveFacets search.FacetResults) map[string]*Facet {
  10. facets := make(map[string]*Facet)
  11. for name, result := range bleveFacets {
  12. facet := &Facet{
  13. Field: name,
  14. Total: result.Total,
  15. Missing: result.Missing,
  16. Other: result.Other,
  17. Terms: make([]*FacetTerm, 0),
  18. }
  19. // Handle Terms facet
  20. if result.Terms != nil {
  21. for _, term := range result.Terms.Terms() {
  22. facet.Terms = append(facet.Terms, &FacetTerm{
  23. Term: term.Term,
  24. Count: term.Count,
  25. })
  26. }
  27. }
  28. facets[name] = facet
  29. }
  30. return facets
  31. }
  32. // mergeFacets merges facet results from multiple shards
  33. func (ds *DistributedSearcher) mergeFacets(combined, incoming map[string]*Facet) {
  34. for field, incomingFacet := range incoming {
  35. if existingFacet, exists := combined[field]; exists {
  36. ds.mergeSingleFacet(existingFacet, incomingFacet)
  37. } else {
  38. combined[field] = ds.copyFacet(incomingFacet)
  39. }
  40. }
  41. }
  42. // mergeSingleFacet merges two facets for the same field
  43. func (ds *DistributedSearcher) mergeSingleFacet(existing, incoming *Facet) {
  44. // Note: Do NOT sum Total values - it represents unique terms count, not document count
  45. // The Total should be recalculated based on the actual number of unique terms after merging
  46. existing.Missing += incoming.Missing
  47. existing.Other += incoming.Other
  48. // Merge terms
  49. termCounts := make(map[string]int)
  50. // Add existing terms
  51. for _, term := range existing.Terms {
  52. termCounts[term.Term] = term.Count
  53. }
  54. // Add incoming terms
  55. for _, term := range incoming.Terms {
  56. termCounts[term.Term] += term.Count
  57. }
  58. // Convert back to slice and sort by count
  59. terms := make([]*FacetTerm, 0, len(termCounts))
  60. for term, count := range termCounts {
  61. terms = append(terms, &FacetTerm{
  62. Term: term,
  63. Count: count,
  64. })
  65. }
  66. // Sort by count (descending) then by term (ascending)
  67. sort.Slice(terms, func(i, j int) bool {
  68. if terms[i].Count == terms[j].Count {
  69. return terms[i].Term < terms[j].Term
  70. }
  71. return terms[i].Count > terms[j].Count
  72. })
  73. // Limit to top terms
  74. if len(terms) > DefaultFacetSize {
  75. // Calculate "other" count
  76. otherCount := 0
  77. for _, term := range terms[DefaultFacetSize:] {
  78. otherCount += term.Count
  79. }
  80. existing.Other += otherCount
  81. terms = terms[:DefaultFacetSize]
  82. }
  83. existing.Terms = terms
  84. // Set Total to the actual number of unique terms (not sum of totals)
  85. existing.Total = len(termCounts)
  86. }
  87. // copyFacet creates a deep copy of a facet
  88. func (ds *DistributedSearcher) copyFacet(original *Facet) *Facet {
  89. facet := &Facet{
  90. Field: original.Field,
  91. Total: original.Total,
  92. Missing: original.Missing,
  93. Other: original.Other,
  94. Terms: make([]*FacetTerm, len(original.Terms)),
  95. }
  96. for i, term := range original.Terms {
  97. facet.Terms[i] = &FacetTerm{
  98. Term: term.Term,
  99. Count: term.Count,
  100. }
  101. }
  102. return facet
  103. }
  104. // Aggregate performs aggregations on search results
  105. func (ds *DistributedSearcher) Aggregate(ctx context.Context, req *AggregationRequest) (*AggregationResult, error) {
  106. // This is a simplified implementation
  107. // In a full implementation, you would execute the aggregation across all shards
  108. // and merge the results similar to how facets are handled
  109. result := &AggregationResult{
  110. Field: req.Field,
  111. Type: req.Type,
  112. }
  113. // For now, return a placeholder result
  114. // This would need to be implemented based on specific requirements
  115. switch req.Type {
  116. case AggregationTerms:
  117. result.Data = map[string]interface{}{
  118. "buckets": []map[string]interface{}{},
  119. }
  120. case AggregationStats:
  121. result.Data = map[string]interface{}{
  122. "count": 0,
  123. "min": 0,
  124. "max": 0,
  125. "avg": 0,
  126. "sum": 0,
  127. }
  128. case AggregationHistogram:
  129. result.Data = map[string]interface{}{
  130. "buckets": []map[string]interface{}{},
  131. }
  132. case AggregationDateHistogram:
  133. result.Data = map[string]interface{}{
  134. "buckets": []map[string]interface{}{},
  135. }
  136. case AggregationCardinality:
  137. result.Data = map[string]interface{}{
  138. "value": 0,
  139. }
  140. }
  141. return result, nil
  142. }
  143. // Suggest provides search suggestions
  144. func (ds *DistributedSearcher) Suggest(ctx context.Context, text string, field string, size int) ([]*Suggestion, error) {
  145. if size <= 0 || size > 100 {
  146. size = 10
  147. }
  148. // Create search request
  149. req := &SearchRequest{
  150. Query: text,
  151. Fields: []string{field},
  152. Limit: size * 2, // Get more results to have better suggestions
  153. SortBy: "_score",
  154. SortOrder: SortOrderDesc,
  155. }
  156. // Execute search
  157. result, err := ds.Search(ctx, req)
  158. if err != nil {
  159. return nil, err
  160. }
  161. // Convert results to suggestions
  162. suggestions := make([]*Suggestion, 0, size)
  163. seen := make(map[string]bool)
  164. for _, hit := range result.Hits {
  165. if len(suggestions) >= size {
  166. break
  167. }
  168. // Extract text from the specified field
  169. if fieldValue, exists := hit.Fields[field]; exists {
  170. if textValue, ok := fieldValue.(string); ok {
  171. // Simple suggestion extraction - this could be made more sophisticated
  172. terms := ds.extractSuggestionTerms(textValue, text)
  173. for _, term := range terms {
  174. if len(suggestions) >= size {
  175. break
  176. }
  177. if !seen[term] && strings.Contains(strings.ToLower(term), strings.ToLower(text)) {
  178. suggestions = append(suggestions, &Suggestion{
  179. Text: term,
  180. Score: hit.Score,
  181. Freq: 1, // Would need to be calculated from corpus
  182. })
  183. seen[term] = true
  184. }
  185. }
  186. }
  187. }
  188. }
  189. // Sort suggestions by score
  190. sort.Slice(suggestions, func(i, j int) bool {
  191. return suggestions[i].Score > suggestions[j].Score
  192. })
  193. return suggestions, nil
  194. }
  195. // extractSuggestionTerms extracts potential suggestion terms from text
  196. func (ds *DistributedSearcher) extractSuggestionTerms(text string, query string) []string {
  197. // Simple term extraction - this could be enhanced with NLP
  198. terms := strings.Fields(text)
  199. // Filter and clean terms
  200. var suggestions []string
  201. for _, term := range terms {
  202. term = strings.TrimSpace(term)
  203. if len(term) > 2 && !isCommonWord(term) {
  204. suggestions = append(suggestions, term)
  205. }
  206. }
  207. return suggestions
  208. }
  // suggestionStopWords holds lowercase English function words that make poor
// search suggestions. It is built once at package initialisation, not on
// every isCommonWord call, and is only ever read.
var suggestionStopWords = map[string]struct{}{
	"the": {}, "and": {}, "or": {}, "but": {},
	"in": {}, "on": {}, "at": {}, "to": {},
	"for": {}, "of": {}, "with": {}, "by": {},
	"a": {}, "an": {}, "as": {}, "is": {},
	"are": {}, "was": {}, "were": {}, "be": {},
	"been": {}, "have": {}, "has": {}, "had": {},
	"do": {}, "does": {}, "did": {}, "will": {},
	"would": {}, "could": {}, "should": {}, "may": {},
	"might": {}, "must": {}, "can": {}, "shall": {},
}

// isCommonWord reports whether word, compared case-insensitively, is one of
// the stop words in suggestionStopWords and so too common to be a useful
// suggestion. The word is not trimmed, so "the," is not considered common.
func isCommonWord(word string) bool {
	_, common := suggestionStopWords[strings.ToLower(word)]
	return common
}
  224. // Analyze analyzes text using a specified analyzer
  225. func (ds *DistributedSearcher) Analyze(ctx context.Context, text string, analyzer string) ([]string, error) {
  226. // This would typically use Bleve's analysis capabilities
  227. // For now, provide a simple implementation
  228. if analyzer == "" {
  229. analyzer = "standard"
  230. }
  231. // Simple tokenization - this should use proper analyzers
  232. terms := strings.Fields(strings.ToLower(text))
  233. // Remove punctuation and short terms
  234. var analyzed []string
  235. for _, term := range terms {
  236. term = strings.Trim(term, ".,!?;:\"'()[]{}/-_")
  237. if len(term) > 2 {
  238. analyzed = append(analyzed, term)
  239. }
  240. }
  241. return analyzed, nil
  242. }
  243. // Cache operations
  244. func (ds *DistributedSearcher) getFromCache(req *SearchRequest) *SearchResult {
  245. if ds.cache == nil {
  246. return nil
  247. }
  248. return ds.cache.Get(req)
  249. }
  250. func (ds *DistributedSearcher) cacheResult(req *SearchRequest, result *SearchResult) {
  251. if ds.cache == nil {
  252. return
  253. }
  254. ds.cache.Put(req, result, DefaultCacheTTL)
  255. }
  256. // ClearCache clears the search cache
  257. func (ds *DistributedSearcher) ClearCache() error {
  258. if ds.cache != nil {
  259. ds.cache.Clear()
  260. }
  261. return nil
  262. }
  263. // GetCacheStats returns cache statistics
  264. func (ds *DistributedSearcher) GetCacheStats() *CacheStats {
  265. if ds.cache != nil {
  266. return ds.cache.GetStats()
  267. }
  268. return nil
  269. }