@@ -3,12 +3,12 @@ package searcher
 
 import (
 	"context"
 	"fmt"
-	"sort"
 	"sync"
 	"sync/atomic"
 	"time"
 
 	"github.com/blevesearch/bleve/v2"
+	"github.com/blevesearch/bleve/v2/search"
 	"github.com/blevesearch/bleve/v2/search/query"
 )
@@ -16,6 +16,7 @@ import (
 type DistributedSearcher struct {
 	config       *Config
 	shards       []bleve.Index
+	indexAlias   bleve.IndexAlias // Index alias for global scoring
 	queryBuilder *QueryBuilderService
 	cache        *OptimizedSearchCache
 	stats        *searcherStats
@@ -47,9 +48,22 @@ func NewDistributedSearcher(config *Config, shards []bleve.Index) *DistributedSe
 		config = DefaultSearcherConfig()
 	}
 
+	// Create index alias for global scoring across shards
+	indexAlias := bleve.NewIndexAlias(shards...)
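+	// The alias lets all shards be searched as a single logical index.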
+
+	// Set the index mapping from the first shard (all shards are expected
+	// to share the same mapping). A failure here is deliberately ignored:
+	// it is not fatal, because the alias can still serve searches without
+	// an explicit mapping.
+	if len(shards) > 0 && shards[0] != nil {
+		_ = indexAlias.SetIndexMapping(shards[0].Mapping())
+	}
+
 	searcher := &DistributedSearcher{
 		config:       config,
 		shards:       shards,
+		indexAlias:   indexAlias,
 		queryBuilder: NewQueryBuilderService(),
 		semaphore:    make(chan struct{}, config.MaxConcurrency),
 		stats: &searcherStats{
@@ -176,25 +190,111 @@ func (ds *DistributedSearcher) executeDistributedSearch(ctx context.Context, que
 		return nil, fmt.Errorf("no healthy shards available")
 	}
 
+	// Use Bleve's native distributed search with global scoring for consistent pagination
+	return ds.executeGlobalScoringSearch(ctx, query, req)
+}
+
+// executeGlobalScoringSearch uses Bleve's native distributed search with global scoring.
+// This ensures consistent pagination by letting Bleve handle cross-shard ranking.
+func (ds *DistributedSearcher) executeGlobalScoringSearch(ctx context.Context, query query.Query, req *SearchRequest) (*SearchResult, error) {
+	// Create search request with proper pagination
 	searchReq := bleve.NewSearchRequest(query)
-	// Use a very large size or implement batching for dashboard requests
-	if req.Limit == 0 {
-		searchReq.Size = 10_000_000 // Very large limit for unlimited requests
-	} else {
-		searchReq.Size = req.Limit + req.Offset // Ensure we get enough data for pagination
+
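+	// With an IndexAlias, From/Size apply to the globally merged and ranked
+	// result set; the alias handles any per-shard over-fetching internally,
+	// so the request no longer needs to fetch Limit+Offset hits itself.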
+	// Set pagination parameters directly; Bleve applies them after the global merge
+	searchReq.Size = req.Limit
+	if searchReq.Size <= 0 {
+		searchReq.Size = 50 // Treat a non-positive limit as the default page size
+	}
+	searchReq.From = req.Offset
+
+	// Configure the search request with proper sorting and other settings
+	ds.configureSearchRequest(searchReq, req)
+
+	// Enable global scoring so hits are scored and ranked consistently across shards.
+	// This follows the approach the Bleve documentation recommends for distributed search.
+	globalCtx := context.WithValue(ctx, search.SearchTypeKey, search.GlobalScoring)
+
+	// Execute search using Bleve's IndexAlias with global scoring
+	result, err := ds.indexAlias.SearchInContext(globalCtx, searchReq)
+	if err != nil {
+		return nil, fmt.Errorf("global scoring search failed: %w", err)
 	}
-	searchReq.From = 0
+
+	// Convert Bleve result to our SearchResult format
+	return ds.convertBleveResult(result), nil
+}
 
-	// Set up sorting with proper direction
-	if req.SortBy != "" {
-		sortField := req.SortBy
-		if req.SortOrder == "desc" {
-			sortField = "-" + sortField // Bleve uses "-" prefix for descending sort
+// convertBleveResult converts a Bleve SearchResult to our SearchResult format
+func (ds *DistributedSearcher) convertBleveResult(bleveResult *bleve.SearchResult) *SearchResult {
+	result := &SearchResult{
+		Hits:      make([]*SearchHit, 0, len(bleveResult.Hits)),
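+		// Total and MaxScore come pre-merged across all shards by the alias.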
+		TotalHits: bleveResult.Total,
+		MaxScore:  bleveResult.MaxScore,
+		Facets:    make(map[string]*Facet),
+	}
+
+	// Convert hits
+	for _, hit := range bleveResult.Hits {
+		searchHit := &SearchHit{
+			ID:           hit.ID,
+			Score:        hit.Score,
+			Fields:       hit.Fields,
+			Highlighting: hit.Fragments,
+			Index:        hit.Index,
 		}
-		searchReq.SortBy([]string{sortField})
+		result.Hits = append(result.Hits, searchHit)
+	}
+
+	// Convert facets if present
+	for name, facet := range bleveResult.Facets {
+		convertedFacet := &Facet{
+			Field:   name,
+			Total:   facet.Total,
+			Missing: facet.Missing,
+			Other:   facet.Other,
+			Terms:   make([]*FacetTerm, 0),
+		}
+
+		if facet.Terms != nil {
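+			// facet.Terms is Bleve's term-bucket collection; Terms() returns it as a slice.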
+			facetTerms := facet.Terms.Terms()
+			convertedFacet.Terms = make([]*FacetTerm, 0, len(facetTerms))
+			for _, term := range facetTerms {
+				convertedFacet.Terms = append(convertedFacet.Terms, &FacetTerm{
+					Term:  term.Term,
+					Count: term.Count,
+				})
+			}
+		}
+
+		result.Facets[name] = convertedFacet
+	}
+
+	return result
+}
+
+// configureSearchRequest sets up common search request configuration
+func (ds *DistributedSearcher) configureSearchRequest(searchReq *bleve.SearchRequest, req *SearchRequest) {
+	// Set up sorting with proper Bleve syntax
+	sortField := req.SortBy
+	if sortField == "" {
+		sortField = "timestamp" // Default sort field
+	}
+
+	sortOrder := req.SortOrder
+	if sortOrder == "" {
+		sortOrder = SortOrderDesc // Default sort order
+	}
+
+	// Apply Bleve sorting - use "-" prefix for descending order
+	if sortOrder == SortOrderDesc {
+		searchReq.SortBy([]string{"-" + sortField})
 	} else {
-		// Default to timestamp descending if no sort specified
-		searchReq.SortBy([]string{"-timestamp"})
+		searchReq.SortBy([]string{sortField})
 	}
 
 	// Configure highlighting
@@ -233,213 +333,29 @@ func (ds *DistributedSearcher) executeDistributedSearch(ctx context.Context, que
 	} else {
 		searchReq.Fields = []string{"*"}
 	}
-
-	// Execute searches in parallel
-	shardResults := make(chan *bleve.SearchResult, len(healthyShards))
-	errChan := make(chan error, len(healthyShards))
-	var wg sync.WaitGroup
-
-	for _, shardID := range healthyShards {
-		wg.Add(1)
-		go func(sid int) {
-			defer wg.Done()
-			shard := ds.shards[sid]
-			if shard == nil {
-				errChan <- fmt.Errorf("shard %d is nil", sid)
-				return
-			}
-			result, err := shard.SearchInContext(ctx, searchReq)
-			if err != nil {
-				errChan <- fmt.Errorf("shard %d error: %w", sid, err)
-				ds.markShardUnhealthy(sid, err)
-				return
-			}
-			shardResults <- result
-			ds.markShardHealthy(sid)
-		}(shardID)
-	}
-
-	wg.Wait()
-	close(errChan)
-	close(shardResults)
-
-	// Collect errors
-	var errors []error
-	for err := range errChan {
-		errors = append(errors, err)
-	}
-	if len(errors) > 0 {
-		// For simplicity, just return the first error. A more robust implementation might wrap all errors.
-		return nil, errors[0]
-	}
-
-	// Convert channel to slice for merging
-	resultsSlice := make([]*bleve.SearchResult, 0, len(shardResults))
-	for result := range shardResults {
-		resultsSlice = append(resultsSlice, result)
-	}
-
-	// Merge results from all shards
-	mergedResult := ds.mergeShardResults(resultsSlice)
-
-	// Perform a stable sort in-memory on the combined result set.
-	// This is inefficient for large datasets but necessary for accurate cross-shard sorting.
-	sort.SliceStable(mergedResult.Hits, func(i, j int) bool {
-		// Handle sorting for different field types
-		val1, ok1 := mergedResult.Hits[i].Fields[req.SortBy]
-		val2, ok2 := mergedResult.Hits[j].Fields[req.SortBy]
-		if !ok1 || !ok2 {
-			return false // Cannot compare if fields are missing
-		}
-
-		// Assuming timestamp or other numeric fields for now
-		fVal1, ok1 := val1.(float64)
-		fVal2, ok2 := val2.(float64)
-		if !ok1 || !ok2 {
-			return false // Cannot compare non-numeric fields
-		}
-
-		if req.SortOrder == SortOrderDesc {
-			return fVal1 > fVal2
-		}
-		return fVal1 < fVal2
-	})
-
-	// Manually apply pagination to the globally sorted list
-	if req.Limit > 0 {
-		start := req.Offset
-		end := start + req.Limit
-
-		if start >= len(mergedResult.Hits) {
-			mergedResult.Hits = []*SearchHit{}
-		} else {
-			if end > len(mergedResult.Hits) {
-				end = len(mergedResult.Hits)
-			}
-			mergedResult.Hits = mergedResult.Hits[start:end]
-		}
-	}
-
-	return mergedResult, nil
 }
 
-// mergeShardResults merges results from multiple Bleve search results into a single SearchResult
-func (ds *DistributedSearcher) mergeShardResults(shardResults []*bleve.SearchResult) *SearchResult {
-	merged := &SearchResult{
-		Hits:      make([]*SearchHit, 0),
-		TotalHits: 0,
-		MaxScore:  0,
-		Facets:    make(map[string]*Facet),
-	}
-
-	for _, result := range shardResults {
-		if result == nil {
-			continue
-		}
-		merged.TotalHits += result.Total
-		if result.MaxScore > merged.MaxScore {
-			merged.MaxScore = result.MaxScore
-		}
-
-		// Merge hits
-		for _, hit := range result.Hits {
-			merged.Hits = append(merged.Hits, &SearchHit{
-				ID:           hit.ID,
-				Score:        hit.Score,
-				Fields:       hit.Fields,
-				Highlighting: hit.Fragments,
-				Index:        hit.Index,
-			})
-		}
-
-		// Merge facets
-		for name, facet := range result.Facets {
-			if _, ok := merged.Facets[name]; !ok {
-				merged.Facets[name] = &Facet{
-					Field: name,
-					Total: 0,
-					Terms: make([]*FacetTerm, 0),
-				}
-			}
-			merged.Facets[name].Total += facet.Total
-			merged.Facets[name].Missing += facet.Missing
-			merged.Facets[name].Other += facet.Other
-
-			// A map-based merge to correctly handle term counts across shards.
-			termMap := make(map[string]*FacetTerm)
-			// Prime the map with already merged terms
-			for _, term := range merged.Facets[name].Terms {
-				termMap[term.Term] = term
-			}
-			// Merge new terms from the current shard's facet result
-			if facet.Terms != nil {
-				for _, term := range facet.Terms.Terms() {
-					if existing, ok := termMap[term.Term]; ok {
-						existing.Count += term.Count
-					} else {
-						termMap[term.Term] = &FacetTerm{Term: term.Term, Count: term.Count}
-					}
-				}
-			}
-
-			// Convert map back to slice and sort
-			newTerms := make([]*FacetTerm, 0, len(termMap))
-			for _, term := range termMap {
-				newTerms = append(newTerms, term)
-			}
-			sort.Slice(newTerms, func(i, j int) bool {
-				return newTerms[i].Count > newTerms[j].Count
-			})
-			merged.Facets[name].Terms = newTerms
-		}
-	}
-
-	return merged
-}
 
 // Utility methods
 
 func (ds *DistributedSearcher) setRequestDefaults(req *SearchRequest) {
-	if req.SortBy == "" {
-		req.SortBy = "timestamp"
-	}
-	if req.SortOrder == "" {
-		req.SortOrder = SortOrderDesc
-	}
 	if req.Timeout == 0 {
 		req.Timeout = ds.config.TimeoutDuration
 	}
-	req.UseCache = ds.config.EnableCache
-}
-
-func (ds *DistributedSearcher) getHealthyShards() []int {
-	var healthy []int
-	ds.stats.mutex.RLock()
-	for id, stat := range ds.stats.shardStats {
-		if stat.IsHealthy {
-			healthy = append(healthy, id)
-		}
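+	// Caching is only honored when it is enabled in the configuration.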
+	if req.UseCache && !ds.config.EnableCache {
+		req.UseCache = false
 	}
-	ds.stats.mutex.RUnlock()
-	return healthy
-}
-
-func (ds *DistributedSearcher) markShardHealthy(shardID int) {
-	ds.stats.mutex.Lock()
-	if stat, exists := ds.stats.shardStats[shardID]; exists {
-		stat.IsHealthy = true
-		stat.LastSearchTime = time.Now()
-	}
-	ds.stats.mutex.Unlock()
 }
|
-func (ds *DistributedSearcher) markShardUnhealthy(shardID int, err error) {
-	ds.stats.mutex.Lock()
-	if stat, exists := ds.stats.shardStats[shardID]; exists {
-		stat.IsHealthy = false
-		stat.ErrorCount++
+func (ds *DistributedSearcher) getHealthyShards() []int {
+	// The IndexAlias fans out to every member index and surfaces any
+	// per-shard error itself, so all shard IDs are reported as available.
+	healthy := make([]int, len(ds.shards))
+	for i := range ds.shards {
+		healthy[i] = i
 	}
-	ds.stats.mutex.Unlock()
+	return healthy
 }
 
 func (ds *DistributedSearcher) updateShardStats(shardID int, duration time.Duration, success bool) {