distributed_searcher.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601
  1. package searcher
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "github.com/blevesearch/bleve/v2"
  10. "github.com/blevesearch/bleve/v2/search"
  11. "github.com/blevesearch/bleve/v2/search/query"
  12. "github.com/uozi-tech/cosy/logger"
  13. )
  14. // DistributedSearcher implements high-performance distributed search across multiple shards
  15. type DistributedSearcher struct {
  16. config *Config
  17. shards []bleve.Index
  18. indexAlias bleve.IndexAlias // Index alias for global scoring
  19. queryBuilder *QueryBuilderService
  20. cache *OptimizedSearchCache
  21. stats *searcherStats
  22. // Concurrency control
  23. semaphore chan struct{}
  24. // State
  25. running int32
  26. // Cleanup control
  27. closeOnce sync.Once
  28. }
  29. // searcherStats tracks search performance metrics
  30. type searcherStats struct {
  31. totalSearches int64
  32. successfulSearches int64
  33. failedSearches int64
  34. totalLatency int64 // nanoseconds
  35. minLatency int64
  36. maxLatency int64
  37. activeSearches int32
  38. shardStats map[int]*ShardSearchStats
  39. mutex sync.RWMutex
  40. }
  41. // NewDistributedSearcher creates a new distributed searcher
  42. func NewDistributedSearcher(config *Config, shards []bleve.Index) *DistributedSearcher {
  43. if config == nil {
  44. config = DefaultSearcherConfig()
  45. }
  46. // Create index alias for global scoring across shards
  47. indexAlias := bleve.NewIndexAlias(shards...)
  48. // Set the index mapping from the first shard (all shards should have the same mapping)
  49. if len(shards) > 0 && shards[0] != nil {
  50. mapping := shards[0].Mapping()
  51. if err := indexAlias.SetIndexMapping(mapping); err != nil {
  52. // Log error but continue - this is not critical for basic functionality
  53. }
  54. }
  55. searcher := &DistributedSearcher{
  56. config: config,
  57. shards: shards,
  58. indexAlias: indexAlias,
  59. queryBuilder: NewQueryBuilderService(),
  60. semaphore: make(chan struct{}, config.MaxConcurrency),
  61. stats: &searcherStats{
  62. shardStats: make(map[int]*ShardSearchStats),
  63. minLatency: int64(time.Hour), // Start with high value
  64. },
  65. running: 1,
  66. }
  67. // Initialize cache if enabled
  68. if config.EnableCache {
  69. searcher.cache = NewOptimizedSearchCache(int64(config.CacheSize))
  70. }
  71. // Initialize shard stats
  72. for i := range shards {
  73. searcher.stats.shardStats[i] = &ShardSearchStats{
  74. ShardID: i,
  75. IsHealthy: true,
  76. }
  77. }
  78. return searcher
  79. }
  80. // Search performs a distributed search across all shards
  81. func (ds *DistributedSearcher) Search(ctx context.Context, req *SearchRequest) (*SearchResult, error) {
  82. if atomic.LoadInt32(&ds.running) == 0 {
  83. return nil, fmt.Errorf("searcher is not running")
  84. }
  85. startTime := time.Now()
  86. defer func() {
  87. ds.recordSearchMetrics(time.Since(startTime), true)
  88. }()
  89. // Validate request
  90. if err := ds.queryBuilder.ValidateSearchRequest(req); err != nil {
  91. return nil, fmt.Errorf("invalid search request: %w", err)
  92. }
  93. // Set defaults
  94. ds.setRequestDefaults(req)
  95. // Check cache if enabled
  96. if ds.config.EnableCache && req.UseCache {
  97. if cached := ds.getFromCache(req); cached != nil {
  98. cached.FromCache = true
  99. cached.CacheHit = true
  100. return cached, nil
  101. }
  102. }
  103. // Apply timeout
  104. searchCtx := ctx
  105. if req.Timeout > 0 {
  106. var cancel context.CancelFunc
  107. searchCtx, cancel = context.WithTimeout(ctx, req.Timeout)
  108. defer cancel()
  109. } else if ds.config.TimeoutDuration > 0 {
  110. var cancel context.CancelFunc
  111. searchCtx, cancel = context.WithTimeout(ctx, ds.config.TimeoutDuration)
  112. defer cancel()
  113. }
  114. // Acquire semaphore for concurrency control
  115. select {
  116. case ds.semaphore <- struct{}{}:
  117. defer func() { <-ds.semaphore }()
  118. case <-searchCtx.Done():
  119. return nil, fmt.Errorf("search timeout")
  120. }
  121. atomic.AddInt32(&ds.stats.activeSearches, 1)
  122. defer atomic.AddInt32(&ds.stats.activeSearches, -1)
  123. // Build query
  124. query, err := ds.queryBuilder.BuildQuery(req)
  125. if err != nil {
  126. return nil, fmt.Errorf("failed to build query: %w", err)
  127. }
  128. // Execute search across shards
  129. result, err := ds.executeDistributedSearch(searchCtx, query, req)
  130. if err != nil {
  131. ds.recordSearchMetrics(time.Since(startTime), false)
  132. return nil, err
  133. }
  134. result.Duration = time.Since(startTime)
  135. // Cache result if enabled
  136. if ds.config.EnableCache && req.UseCache {
  137. ds.cacheResult(req, result)
  138. }
  139. return result, nil
  140. }
  141. // SearchAsync performs asynchronous search
  142. func (ds *DistributedSearcher) SearchAsync(ctx context.Context, req *SearchRequest) (<-chan *SearchResult, <-chan error) {
  143. resultChan := make(chan *SearchResult, 1)
  144. errorChan := make(chan error, 1)
  145. go func() {
  146. defer close(resultChan)
  147. defer close(errorChan)
  148. result, err := ds.Search(ctx, req)
  149. if err != nil {
  150. errorChan <- err
  151. } else {
  152. resultChan <- result
  153. }
  154. }()
  155. return resultChan, errorChan
  156. }
  157. // executeDistributedSearch executes search across all healthy shards
  158. func (ds *DistributedSearcher) executeDistributedSearch(ctx context.Context, query query.Query, req *SearchRequest) (*SearchResult, error) {
  159. healthyShards := ds.getHealthyShards()
  160. if len(healthyShards) == 0 {
  161. return nil, fmt.Errorf("no healthy shards available")
  162. }
  163. // Use Bleve's native distributed search with global scoring for consistent pagination
  164. return ds.executeGlobalScoringSearch(ctx, query, req)
  165. }
  166. // executeGlobalScoringSearch uses Bleve's native distributed search with global scoring
  167. // This ensures consistent pagination by letting Bleve handle cross-shard ranking
  168. func (ds *DistributedSearcher) executeGlobalScoringSearch(ctx context.Context, query query.Query, req *SearchRequest) (*SearchResult, error) {
  169. // Create search request with proper pagination
  170. searchReq := bleve.NewSearchRequest(query)
  171. // Set pagination parameters directly - Bleve will handle distributed pagination correctly
  172. searchReq.Size = req.Limit
  173. if searchReq.Size <= 0 {
  174. searchReq.Size = 50 // Default page size
  175. }
  176. searchReq.From = req.Offset
  177. // Configure the search request with proper sorting and other settings
  178. ds.configureSearchRequest(searchReq, req)
  179. // Enable global scoring for distributed search consistency
  180. // This is the key fix from Bleve documentation for distributed search
  181. globalCtx := context.WithValue(ctx, search.SearchTypeKey, search.GlobalScoring)
  182. // Debug: Log the constructed query for comparison
  183. if queryBytes, err := json.Marshal(searchReq.Query); err == nil {
  184. logger.Debugf("Main search query: %s", string(queryBytes))
  185. logger.Debugf("Main search Size=%d, From=%d, Fields=%v", searchReq.Size, searchReq.From, searchReq.Fields)
  186. }
  187. // Execute search using Bleve's IndexAlias with global scoring
  188. result, err := ds.indexAlias.SearchInContext(globalCtx, searchReq)
  189. if err != nil {
  190. return nil, fmt.Errorf("global scoring search failed: %w", err)
  191. }
  192. // Convert Bleve result to our SearchResult format
  193. return ds.convertBleveResult(result), nil
  194. }
  195. // convertBleveResult converts a Bleve SearchResult to our SearchResult format
  196. func (ds *DistributedSearcher) convertBleveResult(bleveResult *bleve.SearchResult) *SearchResult {
  197. result := &SearchResult{
  198. Hits: make([]*SearchHit, 0, len(bleveResult.Hits)),
  199. TotalHits: bleveResult.Total,
  200. MaxScore: bleveResult.MaxScore,
  201. Facets: make(map[string]*Facet),
  202. }
  203. // Convert hits
  204. for _, hit := range bleveResult.Hits {
  205. searchHit := &SearchHit{
  206. ID: hit.ID,
  207. Score: hit.Score,
  208. Fields: hit.Fields,
  209. Highlighting: hit.Fragments,
  210. Index: hit.Index,
  211. }
  212. result.Hits = append(result.Hits, searchHit)
  213. }
  214. // Convert facets if present
  215. for name, facet := range bleveResult.Facets {
  216. convertedFacet := &Facet{
  217. Field: name,
  218. Total: facet.Total,
  219. Missing: facet.Missing,
  220. Other: facet.Other,
  221. Terms: make([]*FacetTerm, 0),
  222. }
  223. if facet.Terms != nil {
  224. facetTerms := facet.Terms.Terms()
  225. convertedFacet.Terms = make([]*FacetTerm, 0, len(facetTerms))
  226. for _, term := range facetTerms {
  227. convertedFacet.Terms = append(convertedFacet.Terms, &FacetTerm{
  228. Term: term.Term,
  229. Count: term.Count,
  230. })
  231. }
  232. // Fix Total to be the actual count of unique terms, not the sum
  233. // This addresses the issue where Bleve may incorrectly aggregate Total values
  234. // across multiple shards in IndexAlias
  235. convertedFacet.Total = len(facetTerms)
  236. } else {
  237. // If there are no terms, Total should be 0
  238. convertedFacet.Total = 0
  239. }
  240. result.Facets[name] = convertedFacet
  241. }
  242. return result
  243. }
  244. // configureSearchRequest sets up common search request configuration
  245. func (ds *DistributedSearcher) configureSearchRequest(searchReq *bleve.SearchRequest, req *SearchRequest) {
  246. // Set up sorting with proper Bleve syntax
  247. sortField := req.SortBy
  248. if sortField == "" {
  249. sortField = "timestamp" // Default sort field
  250. }
  251. sortOrder := req.SortOrder
  252. if sortOrder == "" {
  253. sortOrder = SortOrderDesc // Default sort order
  254. }
  255. // Apply Bleve sorting - use "-" prefix for descending order
  256. if sortOrder == SortOrderDesc {
  257. searchReq.SortBy([]string{"-" + sortField})
  258. } else {
  259. searchReq.SortBy([]string{sortField})
  260. }
  261. // Configure highlighting
  262. if req.IncludeHighlighting && ds.config.EnableHighlighting {
  263. searchReq.Highlight = bleve.NewHighlight()
  264. if len(req.Fields) > 0 {
  265. for _, field := range req.Fields {
  266. searchReq.Highlight.AddField(field)
  267. }
  268. } else {
  269. searchReq.Highlight.AddField("*")
  270. }
  271. }
  272. // Configure facets
  273. if req.IncludeFacets && ds.config.EnableFaceting {
  274. facetFields := req.FacetFields
  275. if len(facetFields) == 0 {
  276. // Default facet fields
  277. facetFields = []string{"status", "method", "browser", "os", "device_type", "region_code"}
  278. }
  279. for _, field := range facetFields {
  280. size := DefaultFacetSize
  281. if req.FacetSize > 0 {
  282. size = req.FacetSize
  283. }
  284. facet := bleve.NewFacetRequest(field, size)
  285. searchReq.AddFacet(field, facet)
  286. }
  287. }
  288. // Configure fields to return
  289. if len(req.Fields) > 0 {
  290. searchReq.Fields = req.Fields
  291. } else {
  292. searchReq.Fields = []string{"*"}
  293. }
  294. }
  295. // Utility methods
  296. func (ds *DistributedSearcher) setRequestDefaults(req *SearchRequest) {
  297. if req.Timeout == 0 {
  298. req.Timeout = ds.config.TimeoutDuration
  299. }
  300. if req.UseCache && !ds.config.EnableCache {
  301. req.UseCache = false
  302. }
  303. if !req.UseCache && ds.config.EnableCache {
  304. req.UseCache = true
  305. }
  306. }
  307. func (ds *DistributedSearcher) getHealthyShards() []int {
  308. // With IndexAlias, Bleve handles shard health internally
  309. // Return all shard IDs since the alias will route correctly
  310. healthy := make([]int, len(ds.shards))
  311. for i := range ds.shards {
  312. healthy[i] = i
  313. }
  314. return healthy
  315. }
  316. func (ds *DistributedSearcher) updateShardStats(shardID int, duration time.Duration, success bool) {
  317. ds.stats.mutex.Lock()
  318. if stat, exists := ds.stats.shardStats[shardID]; exists {
  319. stat.SearchCount++
  320. stat.LastSearchTime = time.Now()
  321. // Update average latency
  322. if stat.AverageLatency == 0 {
  323. stat.AverageLatency = duration
  324. } else {
  325. stat.AverageLatency = (stat.AverageLatency + duration) / 2
  326. }
  327. if !success {
  328. stat.ErrorCount++
  329. }
  330. }
  331. ds.stats.mutex.Unlock()
  332. }
  333. func (ds *DistributedSearcher) recordSearchMetrics(duration time.Duration, success bool) {
  334. atomic.AddInt64(&ds.stats.totalSearches, 1)
  335. atomic.AddInt64(&ds.stats.totalLatency, int64(duration))
  336. if success {
  337. atomic.AddInt64(&ds.stats.successfulSearches, 1)
  338. } else {
  339. atomic.AddInt64(&ds.stats.failedSearches, 1)
  340. }
  341. // Update min/max latency
  342. durationNs := int64(duration)
  343. for {
  344. current := atomic.LoadInt64(&ds.stats.minLatency)
  345. if durationNs >= current || atomic.CompareAndSwapInt64(&ds.stats.minLatency, current, durationNs) {
  346. break
  347. }
  348. }
  349. for {
  350. current := atomic.LoadInt64(&ds.stats.maxLatency)
  351. if durationNs <= current || atomic.CompareAndSwapInt64(&ds.stats.maxLatency, current, durationNs) {
  352. break
  353. }
  354. }
  355. }
  356. // Health and statistics
  357. func (ds *DistributedSearcher) IsHealthy() bool {
  358. healthy := ds.getHealthyShards()
  359. return len(healthy) > 0
  360. }
  361. // IsRunning returns true if the searcher is currently running
  362. func (ds *DistributedSearcher) IsRunning() bool {
  363. return atomic.LoadInt32(&ds.running) == 1
  364. }
  365. func (ds *DistributedSearcher) GetStats() *Stats {
  366. ds.stats.mutex.RLock()
  367. defer ds.stats.mutex.RUnlock()
  368. stats := &Stats{
  369. TotalSearches: atomic.LoadInt64(&ds.stats.totalSearches),
  370. SuccessfulSearches: atomic.LoadInt64(&ds.stats.successfulSearches),
  371. FailedSearches: atomic.LoadInt64(&ds.stats.failedSearches),
  372. ActiveSearches: atomic.LoadInt32(&ds.stats.activeSearches),
  373. QueuedSearches: len(ds.semaphore),
  374. }
  375. // Calculate average latency
  376. totalLatency := atomic.LoadInt64(&ds.stats.totalLatency)
  377. if stats.TotalSearches > 0 {
  378. stats.AverageLatency = time.Duration(totalLatency / stats.TotalSearches)
  379. }
  380. stats.MinLatency = time.Duration(atomic.LoadInt64(&ds.stats.minLatency))
  381. stats.MaxLatency = time.Duration(atomic.LoadInt64(&ds.stats.maxLatency))
  382. // Copy shard stats
  383. stats.ShardStats = make([]*ShardSearchStats, 0, len(ds.stats.shardStats))
  384. for _, stat := range ds.stats.shardStats {
  385. statCopy := *stat
  386. stats.ShardStats = append(stats.ShardStats, &statCopy)
  387. }
  388. // Add cache stats if cache is enabled
  389. if ds.cache != nil {
  390. stats.CacheStats = ds.cache.GetStats()
  391. }
  392. return stats
  393. }
  394. func (ds *DistributedSearcher) GetConfig() *Config {
  395. return ds.config
  396. }
  397. // GetShards returns the underlying shards for cardinality counting
  398. func (ds *DistributedSearcher) GetShards() []bleve.Index {
  399. return ds.shards
  400. }
  401. // SwapShards atomically replaces the current shards with new ones using IndexAlias.Swap()
  402. // This follows Bleve best practices for zero-downtime index updates
  403. func (ds *DistributedSearcher) SwapShards(newShards []bleve.Index) error {
  404. if atomic.LoadInt32(&ds.running) == 0 {
  405. return fmt.Errorf("searcher is not running")
  406. }
  407. if ds.indexAlias == nil {
  408. return fmt.Errorf("indexAlias is nil")
  409. }
  410. // Store old shards for logging
  411. oldShards := ds.shards
  412. // Perform atomic swap using IndexAlias - this is the key Bleve operation
  413. // that provides zero-downtime index updates
  414. logger.Debugf("SwapShards: Starting atomic swap - old=%d, new=%d", len(oldShards), len(newShards))
  415. swapStartTime := time.Now()
  416. ds.indexAlias.Swap(newShards, oldShards)
  417. swapDuration := time.Since(swapStartTime)
  418. logger.Infof("IndexAlias.Swap completed in %v (old=%d shards, new=%d shards)",
  419. swapDuration, len(oldShards), len(newShards))
  420. // Update internal shards reference to match the IndexAlias
  421. ds.shards = newShards
  422. // Clear cache after shard swap to prevent stale results
  423. // Use goroutine to avoid potential deadlock during shard swap
  424. if ds.cache != nil {
  425. // Capture cache reference to avoid race condition
  426. cache := ds.cache
  427. go func() {
  428. // Add a small delay to ensure shard swap is fully completed
  429. time.Sleep(100 * time.Millisecond)
  430. // Double-check cache is still valid before clearing
  431. if cache != nil {
  432. cache.Clear()
  433. logger.Infof("Cache cleared after shard swap to prevent stale results")
  434. }
  435. }()
  436. }
  437. // Update shard stats for the new shards
  438. ds.stats.mutex.Lock()
  439. // Clear old shard stats
  440. ds.stats.shardStats = make(map[int]*ShardSearchStats)
  441. // Initialize stats for new shards
  442. for i := range newShards {
  443. ds.stats.shardStats[i] = &ShardSearchStats{
  444. ShardID: i,
  445. IsHealthy: true,
  446. }
  447. }
  448. ds.stats.mutex.Unlock()
  449. logger.Infof("IndexAlias.Swap() completed: %d old shards -> %d new shards",
  450. len(oldShards), len(newShards))
  451. // Verify each new shard's document count for debugging
  452. for i, shard := range newShards {
  453. if shard != nil {
  454. if docCount, err := shard.DocCount(); err != nil {
  455. logger.Warnf("New shard %d: error getting doc count: %v", i, err)
  456. } else {
  457. logger.Infof("New shard %d: contains %d documents", i, docCount)
  458. }
  459. } else {
  460. logger.Warnf("New shard %d: is nil", i)
  461. }
  462. }
  463. // Test the searcher with a simple query to verify functionality
  464. testCtx := context.Background()
  465. testReq := &SearchRequest{
  466. Limit: 1,
  467. Offset: 0,
  468. }
  469. if _, err := ds.Search(testCtx, testReq); err != nil {
  470. logger.Errorf("Post-swap searcher test query failed: %v", err)
  471. return fmt.Errorf("searcher test failed after shard swap: %w", err)
  472. } else {
  473. logger.Info("Post-swap searcher test query succeeded")
  474. }
  475. return nil
  476. }
  477. // Stop gracefully stops the searcher and closes all bleve indexes
  478. func (ds *DistributedSearcher) Stop() error {
  479. var err error
  480. ds.closeOnce.Do(func() {
  481. // Set running to 0
  482. atomic.StoreInt32(&ds.running, 0)
  483. // Close the index alias first (this doesn't close underlying indexes)
  484. if ds.indexAlias != nil {
  485. if closeErr := ds.indexAlias.Close(); closeErr != nil {
  486. logger.Errorf("Failed to close index alias: %v", closeErr)
  487. err = closeErr
  488. }
  489. ds.indexAlias = nil
  490. }
  491. // DON'T close the underlying shards - they are managed by the indexer/shard manager
  492. // The searcher is just a consumer of these shards, not the owner
  493. // Clear the shards slice reference without closing the indexes
  494. ds.shards = nil
  495. // Close cache if it exists
  496. if ds.cache != nil {
  497. ds.cache.Close()
  498. ds.cache = nil
  499. }
  500. })
  501. return err
  502. }