distributed_searcher.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570
  1. package searcher
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "github.com/blevesearch/bleve/v2"
  9. "github.com/blevesearch/bleve/v2/search"
  10. "github.com/blevesearch/bleve/v2/search/query"
  11. "github.com/uozi-tech/cosy/logger"
  12. )
  13. // DistributedSearcher implements high-performance distributed search across multiple shards
  14. type DistributedSearcher struct {
  15. config *Config
  16. shards []bleve.Index
  17. indexAlias bleve.IndexAlias // Index alias for global scoring
  18. queryBuilder *QueryBuilderService
  19. cache *OptimizedSearchCache
  20. stats *searcherStats
  21. // Concurrency control
  22. semaphore chan struct{}
  23. // State
  24. running int32
  25. // Cleanup control
  26. closeOnce sync.Once
  27. }
  28. // searcherStats tracks search performance metrics
  29. type searcherStats struct {
  30. totalSearches int64
  31. successfulSearches int64
  32. failedSearches int64
  33. totalLatency int64 // nanoseconds
  34. minLatency int64
  35. maxLatency int64
  36. activeSearches int32
  37. shardStats map[int]*ShardSearchStats
  38. mutex sync.RWMutex
  39. }
  40. // NewDistributedSearcher creates a new distributed searcher
  41. func NewDistributedSearcher(config *Config, shards []bleve.Index) *DistributedSearcher {
  42. if config == nil {
  43. config = DefaultSearcherConfig()
  44. }
  45. // Create index alias for global scoring across shards
  46. indexAlias := bleve.NewIndexAlias(shards...)
  47. // Set the index mapping from the first shard (all shards should have the same mapping)
  48. if len(shards) > 0 && shards[0] != nil {
  49. mapping := shards[0].Mapping()
  50. if err := indexAlias.SetIndexMapping(mapping); err != nil {
  51. // Log error but continue - this is not critical for basic functionality
  52. }
  53. }
  54. searcher := &DistributedSearcher{
  55. config: config,
  56. shards: shards,
  57. indexAlias: indexAlias,
  58. queryBuilder: NewQueryBuilderService(),
  59. semaphore: make(chan struct{}, config.MaxConcurrency),
  60. stats: &searcherStats{
  61. shardStats: make(map[int]*ShardSearchStats),
  62. minLatency: int64(time.Hour), // Start with high value
  63. },
  64. running: 1,
  65. }
  66. // Initialize cache if enabled
  67. if config.EnableCache {
  68. searcher.cache = NewOptimizedSearchCache(int64(config.CacheSize))
  69. }
  70. // Initialize shard stats
  71. for i := range shards {
  72. searcher.stats.shardStats[i] = &ShardSearchStats{
  73. ShardID: i,
  74. IsHealthy: true,
  75. }
  76. }
  77. return searcher
  78. }
  79. // Search performs a distributed search across all shards
  80. func (ds *DistributedSearcher) Search(ctx context.Context, req *SearchRequest) (*SearchResult, error) {
  81. if atomic.LoadInt32(&ds.running) == 0 {
  82. return nil, fmt.Errorf("searcher is not running")
  83. }
  84. startTime := time.Now()
  85. defer func() {
  86. ds.recordSearchMetrics(time.Since(startTime), true)
  87. }()
  88. // Validate request
  89. if err := ds.queryBuilder.ValidateSearchRequest(req); err != nil {
  90. return nil, fmt.Errorf("invalid search request: %w", err)
  91. }
  92. // Set defaults
  93. ds.setRequestDefaults(req)
  94. // Check cache if enabled
  95. if ds.config.EnableCache && req.UseCache {
  96. if cached := ds.getFromCache(req); cached != nil {
  97. cached.FromCache = true
  98. cached.CacheHit = true
  99. return cached, nil
  100. }
  101. }
  102. // Apply timeout
  103. searchCtx := ctx
  104. if req.Timeout > 0 {
  105. var cancel context.CancelFunc
  106. searchCtx, cancel = context.WithTimeout(ctx, req.Timeout)
  107. defer cancel()
  108. } else if ds.config.TimeoutDuration > 0 {
  109. var cancel context.CancelFunc
  110. searchCtx, cancel = context.WithTimeout(ctx, ds.config.TimeoutDuration)
  111. defer cancel()
  112. }
  113. // Acquire semaphore for concurrency control
  114. select {
  115. case ds.semaphore <- struct{}{}:
  116. defer func() { <-ds.semaphore }()
  117. case <-searchCtx.Done():
  118. return nil, fmt.Errorf("search timeout")
  119. }
  120. atomic.AddInt32(&ds.stats.activeSearches, 1)
  121. defer atomic.AddInt32(&ds.stats.activeSearches, -1)
  122. // Build query
  123. query, err := ds.queryBuilder.BuildQuery(req)
  124. if err != nil {
  125. return nil, fmt.Errorf("failed to build query: %w", err)
  126. }
  127. // Execute search across shards
  128. result, err := ds.executeDistributedSearch(searchCtx, query, req)
  129. if err != nil {
  130. ds.recordSearchMetrics(time.Since(startTime), false)
  131. return nil, err
  132. }
  133. result.Duration = time.Since(startTime)
  134. // Cache result if enabled
  135. if ds.config.EnableCache && req.UseCache {
  136. ds.cacheResult(req, result)
  137. }
  138. return result, nil
  139. }
  140. // SearchAsync performs asynchronous search
  141. func (ds *DistributedSearcher) SearchAsync(ctx context.Context, req *SearchRequest) (<-chan *SearchResult, <-chan error) {
  142. resultChan := make(chan *SearchResult, 1)
  143. errorChan := make(chan error, 1)
  144. go func() {
  145. defer close(resultChan)
  146. defer close(errorChan)
  147. result, err := ds.Search(ctx, req)
  148. if err != nil {
  149. errorChan <- err
  150. } else {
  151. resultChan <- result
  152. }
  153. }()
  154. return resultChan, errorChan
  155. }
  156. // executeDistributedSearch executes search across all healthy shards
  157. func (ds *DistributedSearcher) executeDistributedSearch(ctx context.Context, query query.Query, req *SearchRequest) (*SearchResult, error) {
  158. healthyShards := ds.getHealthyShards()
  159. if len(healthyShards) == 0 {
  160. return nil, fmt.Errorf("no healthy shards available")
  161. }
  162. // Use Bleve's native distributed search with global scoring for consistent pagination
  163. return ds.executeGlobalScoringSearch(ctx, query, req)
  164. }
  165. // executeGlobalScoringSearch uses Bleve's native distributed search with global scoring
  166. // This ensures consistent pagination by letting Bleve handle cross-shard ranking
  167. func (ds *DistributedSearcher) executeGlobalScoringSearch(ctx context.Context, query query.Query, req *SearchRequest) (*SearchResult, error) {
  168. // Create search request with proper pagination
  169. searchReq := bleve.NewSearchRequest(query)
  170. // Set pagination parameters directly - Bleve will handle distributed pagination correctly
  171. searchReq.Size = req.Limit
  172. if searchReq.Size <= 0 {
  173. searchReq.Size = 50 // Default page size
  174. }
  175. searchReq.From = req.Offset
  176. // Configure the search request with proper sorting and other settings
  177. ds.configureSearchRequest(searchReq, req)
  178. // Enable global scoring for distributed search consistency
  179. // This is the key fix from Bleve documentation for distributed search
  180. globalCtx := context.WithValue(ctx, search.SearchTypeKey, search.GlobalScoring)
  181. // Execute search using Bleve's IndexAlias with global scoring
  182. result, err := ds.indexAlias.SearchInContext(globalCtx, searchReq)
  183. if err != nil {
  184. return nil, fmt.Errorf("global scoring search failed: %w", err)
  185. }
  186. // Convert Bleve result to our SearchResult format
  187. return ds.convertBleveResult(result), nil
  188. }
  189. // convertBleveResult converts a Bleve SearchResult to our SearchResult format
  190. func (ds *DistributedSearcher) convertBleveResult(bleveResult *bleve.SearchResult) *SearchResult {
  191. result := &SearchResult{
  192. Hits: make([]*SearchHit, 0, len(bleveResult.Hits)),
  193. TotalHits: bleveResult.Total,
  194. MaxScore: bleveResult.MaxScore,
  195. Facets: make(map[string]*Facet),
  196. }
  197. // Convert hits
  198. for _, hit := range bleveResult.Hits {
  199. searchHit := &SearchHit{
  200. ID: hit.ID,
  201. Score: hit.Score,
  202. Fields: hit.Fields,
  203. Highlighting: hit.Fragments,
  204. Index: hit.Index,
  205. }
  206. result.Hits = append(result.Hits, searchHit)
  207. }
  208. // Convert facets if present
  209. for name, facet := range bleveResult.Facets {
  210. convertedFacet := &Facet{
  211. Field: name,
  212. Total: facet.Total,
  213. Missing: facet.Missing,
  214. Other: facet.Other,
  215. Terms: make([]*FacetTerm, 0),
  216. }
  217. if facet.Terms != nil {
  218. facetTerms := facet.Terms.Terms()
  219. convertedFacet.Terms = make([]*FacetTerm, 0, len(facetTerms))
  220. for _, term := range facetTerms {
  221. convertedFacet.Terms = append(convertedFacet.Terms, &FacetTerm{
  222. Term: term.Term,
  223. Count: term.Count,
  224. })
  225. }
  226. // Fix Total to be the actual count of unique terms, not the sum
  227. // This addresses the issue where Bleve may incorrectly aggregate Total values
  228. // across multiple shards in IndexAlias
  229. convertedFacet.Total = len(facetTerms)
  230. } else {
  231. // If there are no terms, Total should be 0
  232. convertedFacet.Total = 0
  233. }
  234. result.Facets[name] = convertedFacet
  235. }
  236. return result
  237. }
  238. // configureSearchRequest sets up common search request configuration
  239. func (ds *DistributedSearcher) configureSearchRequest(searchReq *bleve.SearchRequest, req *SearchRequest) {
  240. // Set up sorting with proper Bleve syntax
  241. sortField := req.SortBy
  242. if sortField == "" {
  243. sortField = "timestamp" // Default sort field
  244. }
  245. sortOrder := req.SortOrder
  246. if sortOrder == "" {
  247. sortOrder = SortOrderDesc // Default sort order
  248. }
  249. // Apply Bleve sorting - use "-" prefix for descending order
  250. if sortOrder == SortOrderDesc {
  251. searchReq.SortBy([]string{"-" + sortField})
  252. } else {
  253. searchReq.SortBy([]string{sortField})
  254. }
  255. // Configure highlighting
  256. if req.IncludeHighlighting && ds.config.EnableHighlighting {
  257. searchReq.Highlight = bleve.NewHighlight()
  258. if len(req.Fields) > 0 {
  259. for _, field := range req.Fields {
  260. searchReq.Highlight.AddField(field)
  261. }
  262. } else {
  263. searchReq.Highlight.AddField("*")
  264. }
  265. }
  266. // Configure facets
  267. if req.IncludeFacets && ds.config.EnableFaceting {
  268. facetFields := req.FacetFields
  269. if len(facetFields) == 0 {
  270. // Default facet fields
  271. facetFields = []string{"status", "method", "browser", "os", "device_type", "region_code"}
  272. }
  273. for _, field := range facetFields {
  274. size := DefaultFacetSize
  275. if req.FacetSize > 0 {
  276. size = req.FacetSize
  277. }
  278. facet := bleve.NewFacetRequest(field, size)
  279. searchReq.AddFacet(field, facet)
  280. }
  281. }
  282. // Configure fields to return
  283. if len(req.Fields) > 0 {
  284. searchReq.Fields = req.Fields
  285. } else {
  286. searchReq.Fields = []string{"*"}
  287. }
  288. }
  289. // Utility methods
  290. func (ds *DistributedSearcher) setRequestDefaults(req *SearchRequest) {
  291. if req.Timeout == 0 {
  292. req.Timeout = ds.config.TimeoutDuration
  293. }
  294. if req.UseCache && !ds.config.EnableCache {
  295. req.UseCache = false
  296. }
  297. if !req.UseCache && ds.config.EnableCache {
  298. req.UseCache = true
  299. }
  300. }
  301. func (ds *DistributedSearcher) getHealthyShards() []int {
  302. // With IndexAlias, Bleve handles shard health internally
  303. // Return all shard IDs since the alias will route correctly
  304. healthy := make([]int, len(ds.shards))
  305. for i := range ds.shards {
  306. healthy[i] = i
  307. }
  308. return healthy
  309. }
  310. func (ds *DistributedSearcher) updateShardStats(shardID int, duration time.Duration, success bool) {
  311. ds.stats.mutex.Lock()
  312. if stat, exists := ds.stats.shardStats[shardID]; exists {
  313. stat.SearchCount++
  314. stat.LastSearchTime = time.Now()
  315. // Update average latency
  316. if stat.AverageLatency == 0 {
  317. stat.AverageLatency = duration
  318. } else {
  319. stat.AverageLatency = (stat.AverageLatency + duration) / 2
  320. }
  321. if !success {
  322. stat.ErrorCount++
  323. }
  324. }
  325. ds.stats.mutex.Unlock()
  326. }
  327. func (ds *DistributedSearcher) recordSearchMetrics(duration time.Duration, success bool) {
  328. atomic.AddInt64(&ds.stats.totalSearches, 1)
  329. atomic.AddInt64(&ds.stats.totalLatency, int64(duration))
  330. if success {
  331. atomic.AddInt64(&ds.stats.successfulSearches, 1)
  332. } else {
  333. atomic.AddInt64(&ds.stats.failedSearches, 1)
  334. }
  335. // Update min/max latency
  336. durationNs := int64(duration)
  337. for {
  338. current := atomic.LoadInt64(&ds.stats.minLatency)
  339. if durationNs >= current || atomic.CompareAndSwapInt64(&ds.stats.minLatency, current, durationNs) {
  340. break
  341. }
  342. }
  343. for {
  344. current := atomic.LoadInt64(&ds.stats.maxLatency)
  345. if durationNs <= current || atomic.CompareAndSwapInt64(&ds.stats.maxLatency, current, durationNs) {
  346. break
  347. }
  348. }
  349. }
  350. // Health and statistics
  351. func (ds *DistributedSearcher) IsHealthy() bool {
  352. healthy := ds.getHealthyShards()
  353. return len(healthy) > 0
  354. }
  355. // IsRunning returns true if the searcher is currently running
  356. func (ds *DistributedSearcher) IsRunning() bool {
  357. return atomic.LoadInt32(&ds.running) == 1
  358. }
  359. func (ds *DistributedSearcher) GetStats() *Stats {
  360. ds.stats.mutex.RLock()
  361. defer ds.stats.mutex.RUnlock()
  362. stats := &Stats{
  363. TotalSearches: atomic.LoadInt64(&ds.stats.totalSearches),
  364. SuccessfulSearches: atomic.LoadInt64(&ds.stats.successfulSearches),
  365. FailedSearches: atomic.LoadInt64(&ds.stats.failedSearches),
  366. ActiveSearches: atomic.LoadInt32(&ds.stats.activeSearches),
  367. QueuedSearches: len(ds.semaphore),
  368. }
  369. // Calculate average latency
  370. totalLatency := atomic.LoadInt64(&ds.stats.totalLatency)
  371. if stats.TotalSearches > 0 {
  372. stats.AverageLatency = time.Duration(totalLatency / stats.TotalSearches)
  373. }
  374. stats.MinLatency = time.Duration(atomic.LoadInt64(&ds.stats.minLatency))
  375. stats.MaxLatency = time.Duration(atomic.LoadInt64(&ds.stats.maxLatency))
  376. // Copy shard stats
  377. stats.ShardStats = make([]*ShardSearchStats, 0, len(ds.stats.shardStats))
  378. for _, stat := range ds.stats.shardStats {
  379. statCopy := *stat
  380. stats.ShardStats = append(stats.ShardStats, &statCopy)
  381. }
  382. // Add cache stats if cache is enabled
  383. if ds.cache != nil {
  384. stats.CacheStats = ds.cache.GetStats()
  385. }
  386. return stats
  387. }
  388. func (ds *DistributedSearcher) GetConfig() *Config {
  389. return ds.config
  390. }
  391. // GetShards returns the underlying shards for cardinality counting
  392. func (ds *DistributedSearcher) GetShards() []bleve.Index {
  393. return ds.shards
  394. }
  395. // SwapShards atomically replaces the current shards with new ones using IndexAlias.Swap()
  396. // This follows Bleve best practices for zero-downtime index updates
  397. func (ds *DistributedSearcher) SwapShards(newShards []bleve.Index) error {
  398. if atomic.LoadInt32(&ds.running) == 0 {
  399. return fmt.Errorf("searcher is not running")
  400. }
  401. if ds.indexAlias == nil {
  402. return fmt.Errorf("indexAlias is nil")
  403. }
  404. // Store old shards for logging
  405. oldShards := ds.shards
  406. // Perform atomic swap using IndexAlias - this is the key Bleve operation
  407. // that provides zero-downtime index updates
  408. ds.indexAlias.Swap(newShards, oldShards)
  409. // Update internal shards reference to match the IndexAlias
  410. ds.shards = newShards
  411. // Update shard stats for the new shards
  412. ds.stats.mutex.Lock()
  413. // Clear old shard stats
  414. ds.stats.shardStats = make(map[int]*ShardSearchStats)
  415. // Initialize stats for new shards
  416. for i := range newShards {
  417. ds.stats.shardStats[i] = &ShardSearchStats{
  418. ShardID: i,
  419. IsHealthy: true,
  420. }
  421. }
  422. ds.stats.mutex.Unlock()
  423. logger.Infof("IndexAlias.Swap() completed: %d old shards -> %d new shards",
  424. len(oldShards), len(newShards))
  425. // Verify each new shard's document count for debugging
  426. for i, shard := range newShards {
  427. if shard != nil {
  428. if docCount, err := shard.DocCount(); err != nil {
  429. logger.Warnf("New shard %d: error getting doc count: %v", i, err)
  430. } else {
  431. logger.Infof("New shard %d: contains %d documents", i, docCount)
  432. }
  433. } else {
  434. logger.Warnf("New shard %d: is nil", i)
  435. }
  436. }
  437. // Test the searcher with a simple query to verify functionality
  438. testCtx := context.Background()
  439. testReq := &SearchRequest{
  440. Limit: 1,
  441. Offset: 0,
  442. }
  443. if _, err := ds.Search(testCtx, testReq); err != nil {
  444. logger.Errorf("Post-swap searcher test query failed: %v", err)
  445. return fmt.Errorf("searcher test failed after shard swap: %w", err)
  446. } else {
  447. logger.Info("Post-swap searcher test query succeeded")
  448. }
  449. return nil
  450. }
  451. // Stop gracefully stops the searcher and closes all bleve indexes
  452. func (ds *DistributedSearcher) Stop() error {
  453. var err error
  454. ds.closeOnce.Do(func() {
  455. // Set running to 0
  456. atomic.StoreInt32(&ds.running, 0)
  457. // Close the index alias first (this doesn't close underlying indexes)
  458. if ds.indexAlias != nil {
  459. if closeErr := ds.indexAlias.Close(); closeErr != nil {
  460. logger.Errorf("Failed to close index alias: %v", closeErr)
  461. err = closeErr
  462. }
  463. ds.indexAlias = nil
  464. }
  465. // DON'T close the underlying shards - they are managed by the indexer/shard manager
  466. // The searcher is just a consumer of these shards, not the owner
  467. // Clear the shards slice reference without closing the indexes
  468. ds.shards = nil
  469. // Close cache if it exists
  470. if ds.cache != nil {
  471. ds.cache.Close()
  472. ds.cache = nil
  473. }
  474. })
  475. return err
  476. }