searcher.go

package searcher

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/blevesearch/bleve/v2"
	"github.com/blevesearch/bleve/v2/search"
	"github.com/blevesearch/bleve/v2/search/query"
	"github.com/uozi-tech/cosy/logger"
)
// Searcher implements high-performance distributed search across multiple shards
type Searcher struct {
	config       *Config
	shards       []bleve.Index
	indexAlias   bleve.IndexAlias // Index alias for global scoring
	queryBuilder *QueryBuilder
	cache        *Cache
	stats        *searcherStats

	// Concurrency control
	semaphore chan struct{}

	// State
	running int32

	// Cleanup control
	closeOnce sync.Once
}
// searcherStats tracks search performance metrics
type searcherStats struct {
	totalSearches      int64
	successfulSearches int64
	failedSearches     int64
	totalLatency       int64 // nanoseconds
	minLatency         int64
	maxLatency         int64
	activeSearches     int32
	shardStats         map[int]*ShardSearchStats
	mutex              sync.RWMutex
}
// NewSearcher creates a new distributed searcher
func NewSearcher(config *Config, shards []bleve.Index) *Searcher {
	if config == nil {
		config = DefaultSearcherConfig()
	}

	// Create index alias for global scoring across shards
	indexAlias := bleve.NewIndexAlias(shards...)

	// Set the index mapping from the first shard (all shards should have the same mapping)
	if len(shards) > 0 && shards[0] != nil {
		mapping := shards[0].Mapping()
		if err := indexAlias.SetIndexMapping(mapping); err != nil {
			// Log and continue - the mapping is not critical for basic functionality
			logger.Errorf("failed to set index mapping on alias: %v", err)
		}
	}
	searcher := &Searcher{
		config:       config,
		shards:       shards,
		indexAlias:   indexAlias,
		queryBuilder: NewQueryBuilder(),
		semaphore:    make(chan struct{}, config.MaxConcurrency),
		stats: &searcherStats{
			shardStats: make(map[int]*ShardSearchStats),
			minLatency: int64(time.Hour), // High sentinel so the first search sets the real minimum
		},
		running: 1,
	}

	// Initialize cache if enabled
	if config.EnableCache {
		searcher.cache = NewCache(int64(config.CacheSize))
	}

	// Initialize shard stats
	for i := range shards {
		searcher.stats.shardStats[i] = &ShardSearchStats{
			ShardID:   i,
			IsHealthy: true,
		}
	}

	return searcher
}
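
// A minimal construction sketch (illustrative; assumes the caller has already
// opened its bleve shards, and relies on the nil-config fallback to
// DefaultSearcherConfig shown above):
//
//	s := NewSearcher(nil, shards)
//	defer s.Stop()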
// Search performs a distributed search across all shards
func (s *Searcher) Search(ctx context.Context, req *SearchRequest) (*SearchResult, error) {
	if atomic.LoadInt32(&s.running) == 0 {
		return nil, fmt.Errorf("searcher is not running")
	}

	startTime := time.Now()
	success := false
	defer func() {
		// Record exactly one metric per search, with the final outcome
		s.recordSearchMetrics(time.Since(startTime), success)
	}()

	// Validate request
	if err := s.queryBuilder.ValidateSearchRequest(req); err != nil {
		return nil, fmt.Errorf("invalid search request: %w", err)
	}

	// Set defaults
	s.setRequestDefaults(req)

	// Check cache if enabled
	if s.config.EnableCache && req.UseCache {
		if cached := s.getFromCache(req); cached != nil {
			cached.FromCache = true
			cached.CacheHit = true
			success = true
			return cached, nil
		}
	}

	// Apply timeout
	searchCtx := ctx
	if req.Timeout > 0 {
		var cancel context.CancelFunc
		searchCtx, cancel = context.WithTimeout(ctx, req.Timeout)
		defer cancel()
	} else if s.config.TimeoutDuration > 0 {
		var cancel context.CancelFunc
		searchCtx, cancel = context.WithTimeout(ctx, s.config.TimeoutDuration)
		defer cancel()
	}

	// Acquire semaphore for concurrency control
	select {
	case s.semaphore <- struct{}{}:
		defer func() { <-s.semaphore }()
	case <-searchCtx.Done():
		return nil, fmt.Errorf("search aborted while waiting for a slot: %w", searchCtx.Err())
	}

	atomic.AddInt32(&s.stats.activeSearches, 1)
	defer atomic.AddInt32(&s.stats.activeSearches, -1)

	// Build query (named q to avoid shadowing the imported query package)
	q, err := s.queryBuilder.BuildQuery(req)
	if err != nil {
		return nil, fmt.Errorf("failed to build query: %w", err)
	}

	// Execute search across shards
	result, err := s.executeDistributedSearch(searchCtx, q, req)
	if err != nil {
		return nil, err
	}
	result.Duration = time.Since(startTime)

	// Cache result if enabled
	if s.config.EnableCache && req.UseCache {
		s.cacheResult(req, result)
	}

	success = true
	return result, nil
}
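
// Caller-side sketch (illustrative; it only uses SearchRequest fields that
// appear elsewhere in this file: Limit, Offset, SortBy, SortOrder. Any other
// field name would be an assumption):
//
//	res, err := s.Search(ctx, &SearchRequest{
//		Limit:     20,
//		Offset:    0,
//		SortBy:    "timestamp",
//		SortOrder: SortOrderDesc,
//	})
//	if err != nil {
//		// handle error
//	}
//	for _, hit := range res.Hits {
//		_ = hit.ID // consume results
//	}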
// SearchAsync performs an asynchronous search
func (s *Searcher) SearchAsync(ctx context.Context, req *SearchRequest) (<-chan *SearchResult, <-chan error) {
	resultChan := make(chan *SearchResult, 1)
	errorChan := make(chan error, 1)

	go func() {
		defer close(resultChan)
		defer close(errorChan)
		result, err := s.Search(ctx, req)
		if err != nil {
			errorChan <- err
		} else {
			resultChan <- result
		}
	}()

	return resultChan, errorChan
}
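
// Because both channels are buffered with capacity 1 and exactly one of them
// receives a value, the goroutine cannot leak even if the caller reads only
// one channel. Typical consumption is a select:
//
//	resCh, errCh := s.SearchAsync(ctx, req)
//	select {
//	case res := <-resCh:
//		_ = res // use the result
//	case err := <-errCh:
//		_ = err // handle the error
//	}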
// executeDistributedSearch executes a search across all healthy shards
func (s *Searcher) executeDistributedSearch(ctx context.Context, q query.Query, req *SearchRequest) (*SearchResult, error) {
	healthyShards := s.getHealthyShards()
	if len(healthyShards) == 0 {
		return nil, fmt.Errorf("no healthy shards available")
	}

	// Even when specific log groups are requested via main_log_path, we can search
	// all shards, because documents are filtered by main_log_path at the query level.
	// To avoid touching shards unnecessarily, a future improvement could maintain a
	// group->shards mapping and build a narrowed alias; for now we rely on Bleve to
	// skip shards quickly when the filter eliminates them.

	// Use Bleve's native distributed search with global scoring for consistent pagination
	return s.executeGlobalScoringSearch(ctx, q, req)
}
// executeGlobalScoringSearch uses Bleve's native distributed search with global scoring.
// This ensures consistent pagination by letting Bleve handle cross-shard ranking.
func (s *Searcher) executeGlobalScoringSearch(ctx context.Context, q query.Query, req *SearchRequest) (*SearchResult, error) {
	// Create search request with proper pagination
	searchReq := bleve.NewSearchRequest(q)

	// Set pagination parameters directly - Bleve will handle distributed pagination correctly
	searchReq.Size = req.Limit
	if searchReq.Size <= 0 {
		searchReq.Size = 50 // Default page size
	}
	searchReq.From = req.Offset

	// Configure the search request with proper sorting and other settings
	s.configureSearchRequest(searchReq, req)

	// Enable global scoring for distributed search consistency.
	// This is the key fix from the Bleve documentation for distributed search.
	globalCtx := context.WithValue(ctx, search.SearchTypeKey, search.GlobalScoring)

	// Debug: log the constructed query for comparison
	if queryBytes, err := json.Marshal(searchReq.Query); err == nil {
		logger.Debugf("Main search query: %s", string(queryBytes))
		logger.Debugf("Main search Size=%d, From=%d, Fields=%v", searchReq.Size, searchReq.From, searchReq.Fields)
	}

	// Execute search using Bleve's IndexAlias with global scoring
	result, err := s.indexAlias.SearchInContext(globalCtx, searchReq)
	if err != nil {
		return nil, fmt.Errorf("global scoring search failed: %w", err)
	}

	// Convert Bleve result to our SearchResult format
	return s.convertBleveResult(result), nil
}
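
// Why global scoring matters here: without the SearchTypeKey/GlobalScoring
// context value, each shard scores hits using only its local term statistics,
// so the same document can rank differently depending on which shard holds it,
// and page boundaries drift between requests. With the value set, Bleve's
// IndexAlias first gathers corpus-wide statistics (at the cost of an extra
// pass) so that From/Size pagination stays stable across shards.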
// convertBleveResult converts a Bleve SearchResult to our SearchResult format
func (s *Searcher) convertBleveResult(bleveResult *bleve.SearchResult) *SearchResult {
	result := &SearchResult{
		Hits:      make([]*SearchHit, 0, len(bleveResult.Hits)),
		TotalHits: bleveResult.Total,
		MaxScore:  bleveResult.MaxScore,
		Facets:    make(map[string]*Facet),
	}

	// Convert hits
	for _, hit := range bleveResult.Hits {
		searchHit := &SearchHit{
			ID:           hit.ID,
			Score:        hit.Score,
			Fields:       hit.Fields,
			Highlighting: hit.Fragments,
			Index:        hit.Index,
		}
		result.Hits = append(result.Hits, searchHit)
	}

	// Convert facets if present
	for name, facet := range bleveResult.Facets {
		convertedFacet := &Facet{
			Field:   name,
			Total:   facet.Total,
			Missing: facet.Missing,
			Other:   facet.Other,
			Terms:   make([]*FacetTerm, 0),
		}
		if facet.Terms != nil {
			facetTerms := facet.Terms.Terms()
			convertedFacet.Terms = make([]*FacetTerm, 0, len(facetTerms))
			for _, term := range facetTerms {
				convertedFacet.Terms = append(convertedFacet.Terms, &FacetTerm{
					Term:  term.Term,
					Count: term.Count,
				})
			}
			// Fix Total to be the actual count of unique terms, not the sum.
			// This addresses the issue where Bleve may incorrectly aggregate
			// Total values across multiple shards in IndexAlias.
			convertedFacet.Total = len(facetTerms)
		} else {
			// If there are no terms, Total should be 0
			convertedFacet.Total = 0
		}
		result.Facets[name] = convertedFacet
	}

	return result
}
// configureSearchRequest sets up common search request configuration
func (s *Searcher) configureSearchRequest(searchReq *bleve.SearchRequest, req *SearchRequest) {
	// Set up sorting with proper Bleve syntax
	sortField := req.SortBy
	if sortField == "" {
		sortField = "timestamp" // Default sort field
	}
	sortOrder := req.SortOrder
	if sortOrder == "" {
		sortOrder = SortOrderDesc // Default sort order
	}

	// Apply Bleve sorting - use a "-" prefix for descending order
	if sortOrder == SortOrderDesc {
		searchReq.SortBy([]string{"-" + sortField})
	} else {
		searchReq.SortBy([]string{sortField})
	}

	// Configure highlighting
	if req.IncludeHighlighting && s.config.EnableHighlighting {
		searchReq.Highlight = bleve.NewHighlight()
		if len(req.Fields) > 0 {
			for _, field := range req.Fields {
				searchReq.Highlight.AddField(field)
			}
		} else {
			searchReq.Highlight.AddField("*")
		}
	}

	// Configure facets
	if req.IncludeFacets && s.config.EnableFaceting {
		facetFields := req.FacetFields
		if len(facetFields) == 0 {
			// Default facet fields
			facetFields = []string{"status", "method", "browser", "os", "device_type", "region_code"}
		}
		for _, field := range facetFields {
			size := DefaultFacetSize
			if req.FacetSize > 0 {
				size = req.FacetSize
			}
			facet := bleve.NewFacetRequest(field, size)
			searchReq.AddFacet(field, facet)
		}
	}

	// Configure fields to return
	if len(req.Fields) > 0 {
		searchReq.Fields = req.Fields
	} else {
		searchReq.Fields = []string{"*"}
	}
}
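
// For example, with the defaults above the request is sorted by "-timestamp",
// which Bleve's sort-string syntax reads as "timestamp, descending"; an
// ascending sort passes the bare field name with no prefix.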
// Utility methods

func (s *Searcher) setRequestDefaults(req *SearchRequest) {
	if req.Timeout == 0 {
		req.Timeout = s.config.TimeoutDuration
	}
	// Caching is applied exactly when the cache is enabled: a per-request
	// UseCache value always collapses to the searcher configuration.
	req.UseCache = s.config.EnableCache
}
func (s *Searcher) getHealthyShards() []int {
	// With IndexAlias, Bleve handles shard health internally.
	// Return all shard IDs since the alias will route correctly.
	healthy := make([]int, len(s.shards))
	for i := range s.shards {
		healthy[i] = i
	}
	return healthy
}
func (s *Searcher) recordSearchMetrics(duration time.Duration, success bool) {
	atomic.AddInt64(&s.stats.totalSearches, 1)
	atomic.AddInt64(&s.stats.totalLatency, int64(duration))
	if success {
		atomic.AddInt64(&s.stats.successfulSearches, 1)
	} else {
		atomic.AddInt64(&s.stats.failedSearches, 1)
	}

	// Update min/max latency with lock-free CAS loops: stop as soon as the
	// stored value is already at least as good as ours, and retry only when
	// another goroutine changed it between our load and our swap.
	durationNs := int64(duration)
	for {
		current := atomic.LoadInt64(&s.stats.minLatency)
		if durationNs >= current || atomic.CompareAndSwapInt64(&s.stats.minLatency, current, durationNs) {
			break
		}
	}
	for {
		current := atomic.LoadInt64(&s.stats.maxLatency)
		if durationNs <= current || atomic.CompareAndSwapInt64(&s.stats.maxLatency, current, durationNs) {
			break
		}
	}
}
// Health and statistics

func (s *Searcher) IsHealthy() bool {
	healthy := s.getHealthyShards()
	return len(healthy) > 0
}

// IsRunning returns true if the searcher is currently running
func (s *Searcher) IsRunning() bool {
	return atomic.LoadInt32(&s.running) == 1
}
func (s *Searcher) GetStats() *Stats {
	s.stats.mutex.RLock()
	defer s.stats.mutex.RUnlock()

	stats := &Stats{
		TotalSearches:      atomic.LoadInt64(&s.stats.totalSearches),
		SuccessfulSearches: atomic.LoadInt64(&s.stats.successfulSearches),
		FailedSearches:     atomic.LoadInt64(&s.stats.failedSearches),
		ActiveSearches:     atomic.LoadInt32(&s.stats.activeSearches),
		QueuedSearches:     len(s.semaphore),
	}

	// Calculate average latency
	totalLatency := atomic.LoadInt64(&s.stats.totalLatency)
	if stats.TotalSearches > 0 {
		stats.AverageLatency = time.Duration(totalLatency / stats.TotalSearches)
	}
	stats.MinLatency = time.Duration(atomic.LoadInt64(&s.stats.minLatency))
	stats.MaxLatency = time.Duration(atomic.LoadInt64(&s.stats.maxLatency))
	if stats.TotalSearches == 0 {
		// minLatency still holds its one-hour initialization sentinel; report zero instead
		stats.MinLatency = 0
	}

	// Copy shard stats
	stats.ShardStats = make([]*ShardSearchStats, 0, len(s.stats.shardStats))
	for _, stat := range s.stats.shardStats {
		statCopy := *stat
		stats.ShardStats = append(stats.ShardStats, &statCopy)
	}

	// Add cache stats if cache is enabled
	if s.cache != nil {
		stats.CacheStats = s.cache.GetStats()
	}

	return stats
}
func (s *Searcher) GetConfig() *Config {
	return s.config
}

// GetShards returns the underlying shards for cardinality counting
func (s *Searcher) GetShards() []bleve.Index {
	return s.shards
}
// SwapShards atomically replaces the current shards with new ones using IndexAlias.Swap().
// This follows Bleve best practices for zero-downtime index updates.
func (s *Searcher) SwapShards(newShards []bleve.Index) error {
	if atomic.LoadInt32(&s.running) == 0 {
		return fmt.Errorf("searcher is not running")
	}
	if s.indexAlias == nil {
		return fmt.Errorf("indexAlias is nil")
	}

	// Store old shards for logging
	oldShards := s.shards

	// Perform the atomic swap using IndexAlias - this is the key Bleve operation
	// that provides zero-downtime index updates
	logger.Debugf("SwapShards: Starting atomic swap - old=%d, new=%d", len(oldShards), len(newShards))
	swapStartTime := time.Now()
	s.indexAlias.Swap(newShards, oldShards)
	swapDuration := time.Since(swapStartTime)
	logger.Infof("IndexAlias.Swap completed in %v (old=%d shards, new=%d shards)",
		swapDuration, len(oldShards), len(newShards))

	// Update the internal shards reference to match the IndexAlias
	s.shards = newShards

	// Clear the cache after the shard swap to prevent stale results.
	// Use a goroutine to avoid a potential deadlock during the swap.
	if s.cache != nil {
		// Capture the cache reference so a concurrent Stop cannot race with us
		cache := s.cache
		go func() {
			// Small delay to ensure the shard swap is fully completed
			time.Sleep(100 * time.Millisecond)
			cache.Clear()
			logger.Infof("Cache cleared after shard swap to prevent stale results")
		}()
	}

	// Reset shard stats for the new shards
	s.stats.mutex.Lock()
	s.stats.shardStats = make(map[int]*ShardSearchStats)
	for i := range newShards {
		s.stats.shardStats[i] = &ShardSearchStats{
			ShardID:   i,
			IsHealthy: true,
		}
	}
	s.stats.mutex.Unlock()

	// Verify the searcher still works with a simple probe query
	testCtx := context.Background()
	testReq := &SearchRequest{
		Limit:  1,
		Offset: 0,
	}
	if _, err := s.Search(testCtx, testReq); err != nil {
		logger.Errorf("Post-swap searcher test query failed: %v", err)
		return fmt.Errorf("searcher test failed after shard swap: %w", err)
	}
	logger.Debug("Post-swap searcher test query succeeded")

	return nil
}
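
// Rebuild-and-swap sketch (illustrative; rebuildShards is a hypothetical
// helper standing in for whatever indexer owns and rebuilds the shards):
//
//	newShards := rebuildShards()
//	if err := s.SwapShards(newShards); err != nil {
//		// the alias has already been swapped at this point; an error here
//		// means the post-swap probe query failed
//	}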
// Stop gracefully stops the searcher and releases the resources it owns.
// It does not close the underlying bleve indexes, which belong to the indexer.
func (s *Searcher) Stop() error {
	var err error
	s.closeOnce.Do(func() {
		// Mark the searcher as stopped so new searches are rejected
		atomic.StoreInt32(&s.running, 0)

		// Close the index alias first (this does not close the underlying indexes)
		if s.indexAlias != nil {
			if closeErr := s.indexAlias.Close(); closeErr != nil {
				logger.Errorf("Failed to close index alias: %v", closeErr)
				err = closeErr
			}
			s.indexAlias = nil
		}

		// DON'T close the underlying shards - they are managed by the indexer/shard
		// manager. The searcher is just a consumer of these shards, not the owner.
		// Clear the shards slice reference without closing the indexes.
		s.shards = nil

		// Close cache if it exists
		if s.cache != nil {
			s.cache.Close()
			s.cache = nil
		}
	})
	return err
}
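
// exampleLifecycle is an illustrative sketch, not part of the original API.
// It wires the pieces above together under the assumption that the caller
// owns the bleve shards (opens them beforehand and closes them afterwards),
// as the Stop comment requires.
func exampleLifecycle(ctx context.Context, shards []bleve.Index) error {
	// nil config falls back to DefaultSearcherConfig in NewSearcher
	s := NewSearcher(nil, shards)
	defer func() { _ = s.Stop() }()

	res, err := s.Search(ctx, &SearchRequest{Limit: 10, Offset: 0})
	if err != nil {
		return err
	}
	logger.Infof("searcher healthy=%v, hits=%d", s.IsHealthy(), res.TotalHits)
	return nil
}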