optimized_search_query.go 19 KB


  1. package nginx_log
  2. import (
  3. "context"
  4. "fmt"
  5. "sort"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/blevesearch/bleve/v2"
  10. "github.com/blevesearch/bleve/v2/search"
  11. "github.com/blevesearch/bleve/v2/search/query"
  12. "github.com/dgraph-io/ristretto/v2"
  13. "github.com/spf13/cast"
  14. "github.com/uozi-tech/cosy/logger"
  15. )
  16. // OptimizedSearchQuery provides high-performance search capabilities
  17. type OptimizedSearchQuery struct {
  18. index bleve.Index
  19. cache *ristretto.Cache[string, *CachedSearchResult]
  20. queryPool *sync.Pool
  21. resultPool *sync.Pool
  22. // Query optimization settings
  23. maxCacheSize int64
  24. cacheTTL time.Duration
  25. maxResultSize int
  26. // Performance tracking
  27. totalQueries int64
  28. cacheHits int64
  29. cacheMisses int64
  30. avgQueryTime time.Duration
  31. mu sync.RWMutex
  32. }
  33. // OptimizedQueryConfig holds configuration for optimized search queries
  34. type OptimizedQueryConfig struct {
  35. Index bleve.Index
  36. Cache *ristretto.Cache[string, *CachedSearchResult]
  37. MaxCacheSize int64
  38. CacheTTL time.Duration
  39. MaxResultSize int
  40. }
  41. // NewOptimizedSearchQuery creates a new optimized search query processor
  42. func NewOptimizedSearchQuery(config *OptimizedQueryConfig) *OptimizedSearchQuery {
  43. // Set defaults
  44. if config.MaxCacheSize == 0 {
  45. config.MaxCacheSize = 256 * 1024 * 1024 // 256MB
  46. }
  47. if config.CacheTTL == 0 {
  48. config.CacheTTL = 15 * time.Minute
  49. }
  50. if config.MaxResultSize == 0 {
  51. config.MaxResultSize = 50000 // 50K max results
  52. }
  53. osq := &OptimizedSearchQuery{
  54. index: config.Index,
  55. cache: config.Cache,
  56. maxCacheSize: config.MaxCacheSize,
  57. cacheTTL: config.CacheTTL,
  58. maxResultSize: config.MaxResultSize,
  59. // Initialize object pools
  60. queryPool: &sync.Pool{
  61. New: func() interface{} {
  62. return &QueryRequest{}
  63. },
  64. },
  65. resultPool: &sync.Pool{
  66. New: func() interface{} {
  67. return &QueryResult{
  68. Entries: make([]*AccessLogEntry, 0, 100),
  69. }
  70. },
  71. },
  72. }
  73. return osq
  74. }
  75. // SearchLogsOptimized performs optimized search with advanced caching and parallelization
  76. func (osq *OptimizedSearchQuery) SearchLogsOptimized(ctx context.Context, req *QueryRequest) (*QueryResult, error) {
  77. start := time.Now()
  78. // Update query statistics
  79. osq.mu.Lock()
  80. osq.totalQueries++
  81. osq.mu.Unlock()
  82. // Validate and optimize request
  83. optimizedReq := osq.optimizeRequest(req)
  84. // Create cache key
  85. cacheKey := osq.createOptimizedCacheKey(optimizedReq)
  86. // Check cache first
  87. if cached, found := osq.cache.Get(cacheKey); found {
  88. osq.mu.Lock()
  89. osq.cacheHits++
  90. osq.mu.Unlock()
  91. // Clone cached result to avoid mutation
  92. result := osq.cloneCachedResult(cached)
  93. result.Took = time.Since(start)
  94. result.FromCache = true
  95. return result, nil
  96. }
  97. osq.mu.Lock()
  98. osq.cacheMisses++
  99. osq.mu.Unlock()
  100. // Build optimized query
  101. bleveQuery := osq.buildOptimizedQuery(optimizedReq)
  102. // Execute search with optimizations
  103. result, err := osq.executeOptimizedSearch(ctx, bleveQuery, optimizedReq)
  104. if err != nil {
  105. return nil, err
  106. }
  107. result.Took = time.Since(start)
  108. // Update average query time
  109. osq.updateQueryTime(result.Took)
  110. // Cache the result
  111. osq.cacheResult(cacheKey, result)
  112. return result, nil
  113. }
  114. // optimizeRequest optimizes the query request for better performance
  115. func (osq *OptimizedSearchQuery) optimizeRequest(req *QueryRequest) *QueryRequest {
  116. optimized := *req
  117. // Limit result size to prevent memory issues
  118. if optimized.Limit == 0 || optimized.Limit > osq.maxResultSize {
  119. optimized.Limit = osq.maxResultSize
  120. }
  121. // Optimize time range queries
  122. if !optimized.StartTime.IsZero() && !optimized.EndTime.IsZero() {
  123. duration := optimized.EndTime.Sub(optimized.StartTime)
  124. // If time range is too wide, use index optimization
  125. if duration > 365*24*time.Hour {
  126. // For very wide ranges, don't use time filter to avoid poor performance
  127. logger.Debugf("Time range too wide (%v), removing time filter for optimization", duration)
  128. optimized.StartTime = time.Time{}
  129. optimized.EndTime = time.Time{}
  130. }
  131. }
  132. // Optimize text queries
  133. if optimized.Query != "" {
  134. optimized.Query = osq.optimizeTextQuery(optimized.Query)
  135. }
  136. return &optimized
  137. }
  138. // optimizeTextQuery optimizes text search queries
  139. func (osq *OptimizedSearchQuery) optimizeTextQuery(textQuery string) string {
  140. // Trim whitespace
  141. textQuery = strings.TrimSpace(textQuery)
  142. // Handle wildcard queries efficiently
  143. if strings.Contains(textQuery, "*") && len(textQuery) < 3 {
  144. // Short wildcard queries are expensive, remove them
  145. textQuery = strings.ReplaceAll(textQuery, "*", "")
  146. }
  147. // Escape special characters that might cause parsing issues
  148. if strings.ContainsAny(textQuery, "+-=&&||><!(){}[]^\"~?:\\") {
  149. // For complex queries, use exact matching
  150. textQuery = fmt.Sprintf("\"%s\"", textQuery)
  151. }
  152. return textQuery
  153. }
  154. // buildOptimizedQuery builds an optimized Bleve query
  155. func (osq *OptimizedSearchQuery) buildOptimizedQuery(req *QueryRequest) query.Query {
  156. var queries []query.Query
  157. // Build queries in order of selectivity (most selective first)
  158. // 1. Exact field matches (most selective)
  159. if req.IP != "" {
  160. ipQuery := bleve.NewTermQuery(req.IP)
  161. ipQuery.SetField("ip")
  162. queries = append(queries, ipQuery)
  163. }
  164. if req.Method != "" {
  165. methodQuery := bleve.NewTermQuery(req.Method)
  166. methodQuery.SetField("method")
  167. queries = append(queries, methodQuery)
  168. }
  169. // 2. Numeric range queries
  170. if len(req.Status) > 0 {
  171. if len(req.Status) == 1 {
  172. // Single status - use exact match
  173. statusFloat := float64(req.Status[0])
  174. statusQuery := bleve.NewNumericRangeQuery(&statusFloat, &statusFloat)
  175. statusQuery.SetField("status")
  176. queries = append(queries, statusQuery)
  177. } else {
  178. // Multiple statuses - use optimized disjunction
  179. statusQueries := make([]query.Query, 0, len(req.Status))
  180. for _, status := range req.Status {
  181. statusFloat := float64(status)
  182. statusQuery := bleve.NewNumericRangeQuery(&statusFloat, &statusFloat)
  183. statusQuery.SetField("status")
  184. statusQueries = append(statusQueries, statusQuery)
  185. }
  186. orQuery := bleve.NewDisjunctionQuery(statusQueries...)
  187. orQuery.SetMin(1) // At least one must match
  188. queries = append(queries, orQuery)
  189. }
  190. }
  191. // 3. Time range queries (if not too wide)
  192. if !req.StartTime.IsZero() && !req.EndTime.IsZero() {
  193. // Add small buffer to end time for inclusive search
  194. inclusiveEndTime := req.EndTime.Add(1 * time.Millisecond)
  195. timeQuery := bleve.NewDateRangeQuery(req.StartTime, inclusiveEndTime)
  196. timeQuery.SetField("timestamp")
  197. queries = append(queries, timeQuery)
  198. }
  199. // 4. Path queries with optimization
  200. if req.Path != "" {
  201. if strings.Contains(req.Path, "*") || strings.Contains(req.Path, "?") {
  202. // Wildcard path - use prefix query if possible
  203. if strings.HasSuffix(req.Path, "*") {
  204. prefix := strings.TrimSuffix(req.Path, "*")
  205. pathQuery := bleve.NewPrefixQuery(prefix)
  206. pathQuery.SetField("path")
  207. queries = append(queries, pathQuery)
  208. } else {
  209. // Complex wildcard - use regexp
  210. pathQuery := bleve.NewRegexpQuery(req.Path)
  211. pathQuery.SetField("path")
  212. queries = append(queries, pathQuery)
  213. }
  214. } else {
  215. // Exact path match
  216. pathQuery := bleve.NewTermQuery(req.Path)
  217. pathQuery.SetField("path")
  218. queries = append(queries, pathQuery)
  219. }
  220. }
  221. // 5. Multi-value field queries with optimization
  222. if req.Browser != "" {
  223. browsers := strings.Split(req.Browser, ",")
  224. if len(browsers) == 1 {
  225. browserQuery := bleve.NewTermQuery(strings.TrimSpace(browsers[0]))
  226. browserQuery.SetField("browser")
  227. queries = append(queries, browserQuery)
  228. } else {
  229. browserQueries := make([]query.Query, 0, len(browsers))
  230. for _, browser := range browsers {
  231. browser = strings.TrimSpace(browser)
  232. if browser != "" {
  233. browserQuery := bleve.NewTermQuery(browser)
  234. browserQuery.SetField("browser")
  235. browserQueries = append(browserQueries, browserQuery)
  236. }
  237. }
  238. if len(browserQueries) > 0 {
  239. orQuery := bleve.NewDisjunctionQuery(browserQueries...)
  240. queries = append(queries, orQuery)
  241. }
  242. }
  243. }
  244. // Similar optimization for OS and Device
  245. if req.OS != "" {
  246. osQuery := osq.buildMultiValueQuery(req.OS, "os")
  247. if osQuery != nil {
  248. queries = append(queries, osQuery)
  249. }
  250. }
  251. if req.Device != "" {
  252. deviceQuery := osq.buildMultiValueQuery(req.Device, "device_type")
  253. if deviceQuery != nil {
  254. queries = append(queries, deviceQuery)
  255. }
  256. }
  257. // 6. Text search queries (least selective, put last)
  258. if req.Query != "" {
  259. if strings.HasPrefix(req.Query, "\"") && strings.HasSuffix(req.Query, "\"") {
  260. // Exact phrase search
  261. phrase := strings.Trim(req.Query, "\"")
  262. textQuery := bleve.NewMatchPhraseQuery(phrase)
  263. textQuery.SetField("raw")
  264. queries = append(queries, textQuery)
  265. } else {
  266. // Regular text search
  267. textQuery := bleve.NewMatchQuery(req.Query)
  268. textQuery.SetField("raw")
  269. textQuery.SetFuzziness(0) // Disable fuzzy matching for performance
  270. queries = append(queries, textQuery)
  271. }
  272. }
  273. if req.UserAgent != "" {
  274. uaQuery := bleve.NewMatchQuery(req.UserAgent)
  275. uaQuery.SetField("user_agent")
  276. uaQuery.SetFuzziness(0)
  277. queries = append(queries, uaQuery)
  278. }
  279. if req.Referer != "" {
  280. refererQuery := bleve.NewTermQuery(req.Referer)
  281. refererQuery.SetField("referer")
  282. queries = append(queries, refererQuery)
  283. }
  284. // 7. File path filter
  285. if req.LogPath != "" {
  286. filePathQuery := bleve.NewTermQuery(req.LogPath)
  287. filePathQuery.SetField("file_path")
  288. queries = append(queries, filePathQuery)
  289. }
  290. // Combine queries optimally
  291. if len(queries) == 0 {
  292. return bleve.NewMatchAllQuery()
  293. } else if len(queries) == 1 {
  294. return queries[0]
  295. } else {
  296. // Use conjunction for AND logic
  297. conjunctionQuery := bleve.NewConjunctionQuery(queries...)
  298. return conjunctionQuery
  299. }
  300. }
  301. // buildMultiValueQuery builds optimized queries for comma-separated values
  302. func (osq *OptimizedSearchQuery) buildMultiValueQuery(values, field string) query.Query {
  303. parts := strings.Split(values, ",")
  304. if len(parts) == 1 {
  305. value := strings.TrimSpace(parts[0])
  306. if value != "" {
  307. termQuery := bleve.NewTermQuery(value)
  308. termQuery.SetField(field)
  309. return termQuery
  310. }
  311. return nil
  312. }
  313. var subQueries []query.Query
  314. for _, part := range parts {
  315. part = strings.TrimSpace(part)
  316. if part != "" {
  317. termQuery := bleve.NewTermQuery(part)
  318. termQuery.SetField(field)
  319. subQueries = append(subQueries, termQuery)
  320. }
  321. }
  322. if len(subQueries) == 0 {
  323. return nil
  324. }
  325. return bleve.NewDisjunctionQuery(subQueries...)
  326. }
  327. // executeOptimizedSearch executes the search with performance optimizations
  328. func (osq *OptimizedSearchQuery) executeOptimizedSearch(ctx context.Context, bleveQuery query.Query, req *QueryRequest) (*QueryResult, error) {
  329. // Create optimized search request
  330. searchReq := bleve.NewSearchRequest(bleveQuery)
  331. // Set size and offset with bounds checking
  332. searchReq.Size = req.Limit
  333. if searchReq.Size > osq.maxResultSize {
  334. searchReq.Size = osq.maxResultSize
  335. }
  336. searchReq.From = req.Offset
  337. // Optimize field loading - only load fields we need
  338. searchReq.Fields = []string{
  339. "timestamp", "ip", "method", "path", "protocol", "status",
  340. "bytes_sent", "request_time", "referer", "user_agent",
  341. "browser", "browser_version", "os", "os_version", "device_type",
  342. "region_code", "province", "city",
  343. }
  344. // Set optimized sorting
  345. if req.SortBy != "" {
  346. sortField := osq.mapSortField(req.SortBy)
  347. descending := req.SortOrder == "desc"
  348. searchReq.SortByCustom(search.SortOrder{
  349. &search.SortField{
  350. Field: sortField,
  351. Desc: descending,
  352. },
  353. })
  354. } else {
  355. // Default sort by timestamp descending for performance
  356. searchReq.SortByCustom(search.SortOrder{
  357. &search.SortField{
  358. Field: "timestamp",
  359. Desc: true,
  360. },
  361. })
  362. }
  363. // Execute search with context
  364. searchResult, err := osq.index.SearchInContext(ctx, searchReq)
  365. if err != nil {
  366. return nil, fmt.Errorf("search execution failed: %w", err)
  367. }
  368. // Convert results efficiently
  369. entries := osq.convertSearchResults(searchResult.Hits)
  370. // Calculate summary statistics if needed (async for performance)
  371. var summaryStats *SummaryStats
  372. if req.IncludeSummary {
  373. // For performance, calculate summary in background for large result sets
  374. if searchResult.Total > 10000 {
  375. summaryStats = &SummaryStats{} // Return empty stats for large sets
  376. } else {
  377. summaryStats = osq.calculateOptimizedSummary(ctx, bleveQuery)
  378. }
  379. }
  380. result := &QueryResult{
  381. Entries: entries,
  382. Total: int(searchResult.Total),
  383. Summary: summaryStats,
  384. }
  385. return result, nil
  386. }
  387. // convertSearchResults efficiently converts search hits to AccessLogEntry
  388. func (osq *OptimizedSearchQuery) convertSearchResults(hits []*search.DocumentMatch) []*AccessLogEntry {
  389. if len(hits) == 0 {
  390. return nil
  391. }
  392. entries := make([]*AccessLogEntry, 0, len(hits))
  393. for _, hit := range hits {
  394. if hit.Fields == nil {
  395. continue
  396. }
  397. entry := &AccessLogEntry{}
  398. // Extract fields efficiently
  399. if ip := osq.getStringField(hit.Fields, "ip"); ip != "" {
  400. entry.IP = ip
  401. }
  402. if method := osq.getStringField(hit.Fields, "method"); method != "" {
  403. entry.Method = method
  404. }
  405. if path := osq.getStringField(hit.Fields, "path"); path != "" {
  406. entry.Path = path
  407. }
  408. if protocol := osq.getStringField(hit.Fields, "protocol"); protocol != "" {
  409. entry.Protocol = protocol
  410. }
  411. if statusFloat := osq.getFloatField(hit.Fields, "status"); statusFloat > 0 {
  412. entry.Status = int(statusFloat)
  413. }
  414. if bytesSent := osq.getFloatField(hit.Fields, "bytes_sent"); bytesSent >= 0 {
  415. entry.BytesSent = int64(bytesSent)
  416. }
  417. entry.RequestTime = osq.getFloatField(hit.Fields, "request_time")
  418. if referer := osq.getStringField(hit.Fields, "referer"); referer != "" {
  419. entry.Referer = referer
  420. }
  421. if userAgent := osq.getStringField(hit.Fields, "user_agent"); userAgent != "" {
  422. entry.UserAgent = userAgent
  423. }
  424. if browser := osq.getStringField(hit.Fields, "browser"); browser != "" {
  425. entry.Browser = browser
  426. }
  427. if browserVer := osq.getStringField(hit.Fields, "browser_version"); browserVer != "" {
  428. entry.BrowserVer = browserVer
  429. }
  430. if os := osq.getStringField(hit.Fields, "os"); os != "" {
  431. entry.OS = os
  432. }
  433. if osVersion := osq.getStringField(hit.Fields, "os_version"); osVersion != "" {
  434. entry.OSVersion = osVersion
  435. }
  436. if deviceType := osq.getStringField(hit.Fields, "device_type"); deviceType != "" {
  437. entry.DeviceType = deviceType
  438. }
  439. // Geographical fields
  440. if regionCode := osq.getStringField(hit.Fields, "region_code"); regionCode != "" {
  441. entry.RegionCode = regionCode
  442. }
  443. if province := osq.getStringField(hit.Fields, "province"); province != "" {
  444. entry.Province = province
  445. }
  446. if city := osq.getStringField(hit.Fields, "city"); city != "" {
  447. entry.City = city
  448. }
  449. // Parse timestamp
  450. if timestampStr := osq.getStringField(hit.Fields, "timestamp"); timestampStr != "" {
  451. if ts, err := time.Parse(time.RFC3339, timestampStr); err == nil {
  452. entry.Timestamp = ts
  453. }
  454. }
  455. entries = append(entries, entry)
  456. }
  457. return entries
  458. }
  459. // Helper methods
  460. func (osq *OptimizedSearchQuery) getStringField(fields map[string]interface{}, fieldName string) string {
  461. if value, ok := fields[fieldName]; ok {
  462. return cast.ToString(value)
  463. }
  464. return ""
  465. }
  466. func (osq *OptimizedSearchQuery) getFloatField(fields map[string]interface{}, fieldName string) float64 {
  467. if value, ok := fields[fieldName]; ok {
  468. return cast.ToFloat64(value)
  469. }
  470. return 0
  471. }
  472. func (osq *OptimizedSearchQuery) mapSortField(sortBy string) string {
  473. switch sortBy {
  474. case "timestamp":
  475. return "timestamp"
  476. case "ip":
  477. return "ip"
  478. case "method":
  479. return "method"
  480. case "path":
  481. return "path"
  482. case "status":
  483. return "status"
  484. case "bytes_sent":
  485. return "bytes_sent"
  486. case "browser":
  487. return "browser"
  488. case "os":
  489. return "os"
  490. case "device_type":
  491. return "device_type"
  492. default:
  493. return "timestamp"
  494. }
  495. }
  496. // calculateOptimizedSummary calculates summary statistics efficiently
  497. func (osq *OptimizedSearchQuery) calculateOptimizedSummary(ctx context.Context, bleveQuery query.Query) *SummaryStats {
  498. // For now, return basic stats - could be enhanced with aggregation queries
  499. return &SummaryStats{
  500. UV: 0, // Would need to be calculated
  501. PV: 0,
  502. }
  503. }
  504. // Cache management methods
  505. func (osq *OptimizedSearchQuery) createOptimizedCacheKey(req *QueryRequest) string {
  506. // Create a more efficient cache key
  507. var keyParts []string
  508. if !req.StartTime.IsZero() {
  509. keyParts = append(keyParts, req.StartTime.Format("20060102150405"))
  510. }
  511. if !req.EndTime.IsZero() {
  512. keyParts = append(keyParts, req.EndTime.Format("20060102150405"))
  513. }
  514. if req.Query != "" {
  515. keyParts = append(keyParts, req.Query)
  516. }
  517. if req.IP != "" {
  518. keyParts = append(keyParts, req.IP)
  519. }
  520. if req.Method != "" {
  521. keyParts = append(keyParts, req.Method)
  522. }
  523. if req.Path != "" {
  524. keyParts = append(keyParts, req.Path)
  525. }
  526. if len(req.Status) > 0 {
  527. statusStrs := make([]string, len(req.Status))
  528. for i, s := range req.Status {
  529. statusStrs[i] = fmt.Sprintf("%d", s)
  530. }
  531. sort.Strings(statusStrs) // Sort for consistent cache keys
  532. keyParts = append(keyParts, strings.Join(statusStrs, ","))
  533. }
  534. keyParts = append(keyParts,
  535. fmt.Sprintf("%d_%d_%s_%s", req.Limit, req.Offset, req.SortBy, req.SortOrder))
  536. return strings.Join(keyParts, "|")
  537. }
  538. func (osq *OptimizedSearchQuery) cloneCachedResult(cached *CachedSearchResult) *QueryResult {
  539. // Clone the cached result to avoid mutation
  540. result := &QueryResult{
  541. Entries: make([]*AccessLogEntry, len(cached.Entries)),
  542. Total: cached.Total,
  543. }
  544. // Deep copy entries
  545. for i, entry := range cached.Entries {
  546. entryCopy := *entry
  547. result.Entries[i] = &entryCopy
  548. }
  549. return result
  550. }
  551. func (osq *OptimizedSearchQuery) cacheResult(cacheKey string, result *QueryResult) {
  552. // Create cached result
  553. cachedResult := &CachedSearchResult{
  554. Entries: result.Entries,
  555. Total: result.Total,
  556. }
  557. // Estimate size for cache cost
  558. estimatedSize := int64(len(result.Entries) * 500) // ~500 bytes per entry
  559. if estimatedSize > osq.maxCacheSize/100 { // Don't cache if > 1% of max cache size
  560. return
  561. }
  562. osq.cache.Set(cacheKey, cachedResult, estimatedSize)
  563. }
  564. func (osq *OptimizedSearchQuery) updateQueryTime(duration time.Duration) {
  565. osq.mu.Lock()
  566. defer osq.mu.Unlock()
  567. // Simple moving average
  568. if osq.avgQueryTime == 0 {
  569. osq.avgQueryTime = duration
  570. } else {
  571. osq.avgQueryTime = (osq.avgQueryTime + duration) / 2
  572. }
  573. }
  574. // GetStatistics returns search performance statistics
  575. func (osq *OptimizedSearchQuery) GetStatistics() map[string]interface{} {
  576. osq.mu.RLock()
  577. defer osq.mu.RUnlock()
  578. cacheHitRate := float64(0)
  579. if osq.totalQueries > 0 {
  580. cacheHitRate = float64(osq.cacheHits) / float64(osq.totalQueries) * 100
  581. }
  582. return map[string]interface{}{
  583. "total_queries": osq.totalQueries,
  584. "cache_hits": osq.cacheHits,
  585. "cache_misses": osq.cacheMisses,
  586. "cache_hit_rate": fmt.Sprintf("%.2f%%", cacheHitRate),
  587. "avg_query_time_ms": osq.avgQueryTime.Milliseconds(),
  588. "max_result_size": osq.maxResultSize,
  589. "max_cache_size": osq.maxCacheSize,
  590. }
  591. }