search.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651
  1. package cache
  2. import (
  3. "context"
  4. "fmt"
  5. "os"
  6. "path/filepath"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/blevesearch/bleve/v2"
  11. "github.com/blevesearch/bleve/v2/analysis/lang/en"
  12. "github.com/blevesearch/bleve/v2/mapping"
  13. "github.com/blevesearch/bleve/v2/search/query"
  14. "github.com/gabriel-vasile/mimetype"
  15. "github.com/uozi-tech/cosy/logger"
  16. )
  // SearchDocument is the unit stored in, and reconstructed from, the search
  // index. Each field is serialized under the key named in its json tag.
  type SearchDocument struct {
  	// ID is the identifier the document is indexed under.
  	ID string `json:"id"`
  	// Type is one of "site", "stream", or "config".
  	Type string `json:"type"`
  	// Name is extracted from the filename.
  	Name string `json:"name"`
  	// Path is the file path.
  	Path string `json:"path"`
  	// Content is the file content.
  	Content string `json:"content"`
  	// UpdatedAt is the timestamp recorded for the document.
  	UpdatedAt time.Time `json:"updated_at"`
  }
  26. // SearchResult represents a search result
  27. type SearchResult struct {
  28. Document SearchDocument `json:"document"`
  29. Score float64 `json:"score"`
  30. }
  31. // SearchIndexer manages the Bleve search index
  32. type SearchIndexer struct {
  33. index bleve.Index
  34. indexPath string
  35. indexMutex sync.RWMutex
  36. ctx context.Context
  37. cancel context.CancelFunc
  38. cleanupOnce sync.Once
  39. // Memory management
  40. totalContentSize int64
  41. documentCount int64
  42. maxMemoryUsage int64
  43. memoryMutex sync.RWMutex
  44. }
  45. var (
  46. searchIndexer *SearchIndexer
  47. searchIndexerOnce sync.Once
  48. )
  49. // GetSearchIndexer returns the singleton search indexer instance
  50. func GetSearchIndexer() *SearchIndexer {
  51. searchIndexerOnce.Do(func() {
  52. // Create a temporary directory for the index
  53. tempDir, err := os.MkdirTemp("", "nginx-ui-search-index-*")
  54. if err != nil {
  55. logger.Fatalf("Failed to create temp directory for search index: %v", err)
  56. }
  57. searchIndexer = &SearchIndexer{
  58. indexPath: tempDir,
  59. maxMemoryUsage: 100 * 1024 * 1024, // 100MB memory limit for indexed content
  60. }
  61. })
  62. return searchIndexer
  63. }
  64. // InitSearchIndex initializes the search index
  65. func InitSearchIndex(ctx context.Context) error {
  66. indexer := GetSearchIndexer()
  67. return indexer.Initialize(ctx)
  68. }
  69. // Initialize sets up the Bleve search index
  70. func (si *SearchIndexer) Initialize(ctx context.Context) error {
  71. si.indexMutex.Lock()
  72. defer si.indexMutex.Unlock()
  73. // Create a derived context for cleanup
  74. si.ctx, si.cancel = context.WithCancel(ctx)
  75. // Check if context is cancelled
  76. select {
  77. case <-ctx.Done():
  78. return ctx.Err()
  79. default:
  80. }
  81. // Try to open existing index, create new if it fails
  82. var err error
  83. si.index, err = bleve.Open(si.indexPath)
  84. if err != nil {
  85. // Check context again before creating new index
  86. select {
  87. case <-ctx.Done():
  88. return ctx.Err()
  89. default:
  90. }
  91. logger.Info("Creating new search index at:", si.indexPath)
  92. si.index, err = bleve.New(si.indexPath, si.createIndexMapping())
  93. if err != nil {
  94. return fmt.Errorf("failed to create search index: %w", err)
  95. }
  96. }
  97. // Register callback for config scanning
  98. RegisterCallback("search.handleConfigScan", si.handleConfigScan)
  99. // Start cleanup goroutine
  100. go si.watchContext()
  101. logger.Info("Search index initialized successfully")
  102. return nil
  103. }
  104. // watchContext monitors the context and cleans up when it's cancelled
  105. func (si *SearchIndexer) watchContext() {
  106. <-si.ctx.Done()
  107. si.cleanup()
  108. }
  109. // cleanup closes the index and removes the temporary directory
  110. func (si *SearchIndexer) cleanup() {
  111. si.cleanupOnce.Do(func() {
  112. logger.Info("Cleaning up search index...")
  113. si.indexMutex.Lock()
  114. defer si.indexMutex.Unlock()
  115. if si.index != nil {
  116. si.index.Close()
  117. si.index = nil
  118. }
  119. // Reset memory tracking
  120. si.memoryMutex.Lock()
  121. si.totalContentSize = 0
  122. si.documentCount = 0
  123. si.memoryMutex.Unlock()
  124. // Remove the temporary directory
  125. if err := os.RemoveAll(si.indexPath); err != nil {
  126. logger.Error("Failed to remove search index directory:", err)
  127. } else {
  128. logger.Info("Search index directory removed successfully")
  129. }
  130. })
  131. }
  132. // createIndexMapping creates the mapping for the search index
  133. func (si *SearchIndexer) createIndexMapping() mapping.IndexMapping {
  134. docMapping := bleve.NewDocumentMapping()
  135. // Text fields with standard analyzer
  136. textField := bleve.NewTextFieldMapping()
  137. textField.Analyzer = en.AnalyzerName
  138. textField.Store = true
  139. textField.Index = true
  140. // Keyword fields for exact match
  141. keywordField := bleve.NewKeywordFieldMapping()
  142. keywordField.Store = true
  143. keywordField.Index = true
  144. // Date field
  145. dateField := bleve.NewDateTimeFieldMapping()
  146. dateField.Store = true
  147. dateField.Index = true
  148. // Map fields to types
  149. fieldMappings := map[string]*mapping.FieldMapping{
  150. "id": keywordField,
  151. "type": keywordField,
  152. "path": keywordField,
  153. "name": textField,
  154. "content": textField,
  155. "updated_at": dateField,
  156. }
  157. for field, fieldMapping := range fieldMappings {
  158. docMapping.AddFieldMappingsAt(field, fieldMapping)
  159. }
  160. indexMapping := bleve.NewIndexMapping()
  161. indexMapping.DefaultMapping = docMapping
  162. indexMapping.DefaultAnalyzer = en.AnalyzerName
  163. return indexMapping
  164. }
  165. // handleConfigScan processes scanned config files and indexes them
  166. func (si *SearchIndexer) handleConfigScan(configPath string, content []byte) (err error) {
  167. // Add panic recovery to prevent the entire application from crashing
  168. defer func() {
  169. if r := recover(); r != nil {
  170. err = fmt.Errorf("panic during config scan: %v", r)
  171. logger.Error("Panic occurred while scanning config", "config_path", configPath, "content_size", len(content), "error", err)
  172. }
  173. }()
  174. // File size limit: 1MB to prevent memory overflow and improve performance
  175. const maxFileSize = 1024 * 1024 // 1MB
  176. if len(content) > maxFileSize {
  177. return nil
  178. }
  179. // Skip empty files
  180. if len(content) == 0 {
  181. return nil
  182. }
  183. // Basic content validation: check if it's a configuration file
  184. if !isConfigFile(content) {
  185. return nil
  186. }
  187. docType := si.determineConfigType(configPath)
  188. if docType == "" {
  189. return nil // Skip unsupported file types
  190. }
  191. doc := SearchDocument{
  192. ID: configPath,
  193. Type: docType,
  194. Name: filepath.Base(configPath),
  195. Path: configPath,
  196. Content: string(content),
  197. UpdatedAt: time.Now(),
  198. }
  199. return si.IndexDocument(doc)
  200. }
  201. // determineConfigType determines the type of config file based on path
  202. func (si *SearchIndexer) determineConfigType(configPath string) string {
  203. normalizedPath := filepath.ToSlash(configPath)
  204. switch {
  205. case strings.Contains(normalizedPath, "sites-available") || strings.Contains(normalizedPath, "sites-enabled"):
  206. return "site"
  207. case strings.Contains(normalizedPath, "streams-available") || strings.Contains(normalizedPath, "streams-enabled"):
  208. return "stream"
  209. default:
  210. return "config"
  211. }
  212. }
  213. // IndexDocument indexes a single document
  214. func (si *SearchIndexer) IndexDocument(doc SearchDocument) (err error) {
  215. // Add panic recovery to prevent the entire application from crashing
  216. defer func() {
  217. if r := recover(); r != nil {
  218. err = fmt.Errorf("panic during indexing: %v", r)
  219. logger.Error("Panic occurred while indexing document", "document_id", doc.ID, "error", err)
  220. }
  221. }()
  222. // Additional size check as a safety measure
  223. if len(doc.Content) > 2*1024*1024 { // 2MB absolute limit
  224. return fmt.Errorf("document content too large: %d bytes", len(doc.Content))
  225. }
  226. si.indexMutex.RLock()
  227. defer si.indexMutex.RUnlock()
  228. if si.index == nil {
  229. return fmt.Errorf("search index not initialized")
  230. }
  231. // Check if document already exists in the index
  232. contentSize := int64(len(doc.Content))
  233. existingDoc, err := si.index.Document(doc.ID)
  234. isNewDocument := err != nil || existingDoc == nil
  235. // For new documents, check memory limits
  236. if isNewDocument {
  237. if !si.checkMemoryLimitBeforeIndexing(contentSize) {
  238. logger.Warn("Skipping document due to memory limit", "document_id", doc.ID, "content_size", contentSize)
  239. return nil
  240. }
  241. }
  242. // Index the document (this will update existing or create new)
  243. err = si.index.Index(doc.ID, doc)
  244. if err != nil {
  245. return err
  246. }
  247. // Update memory usage tracking only for new documents
  248. if isNewDocument {
  249. si.updateMemoryUsage(doc.ID, contentSize, true)
  250. }
  251. return nil
  252. }
  253. // Search performs a search query
  254. func (si *SearchIndexer) Search(ctx context.Context, queryStr string, limit int) ([]SearchResult, error) {
  255. return si.searchWithType(ctx, queryStr, "", limit)
  256. }
  257. // SearchByType performs a search filtered by document type
  258. func (si *SearchIndexer) SearchByType(ctx context.Context, queryStr string, docType string, limit int) ([]SearchResult, error) {
  259. return si.searchWithType(ctx, queryStr, docType, limit)
  260. }
  261. // searchWithType performs the actual search with optional type filtering
  262. func (si *SearchIndexer) searchWithType(ctx context.Context, queryStr string, docType string, limit int) ([]SearchResult, error) {
  263. si.indexMutex.RLock()
  264. defer si.indexMutex.RUnlock()
  265. // Check if context is cancelled
  266. select {
  267. case <-ctx.Done():
  268. return nil, ctx.Err()
  269. default:
  270. }
  271. if si.index == nil {
  272. return nil, fmt.Errorf("search index not initialized")
  273. }
  274. if limit <= 0 {
  275. limit = 500 // Increase default limit to handle more results
  276. }
  277. query := si.buildQuery(queryStr, docType)
  278. searchRequest := bleve.NewSearchRequest(query)
  279. searchRequest.Size = limit
  280. searchRequest.Fields = []string{"*"}
  281. // Use a channel to handle search with context cancellation
  282. type searchResult struct {
  283. result *bleve.SearchResult
  284. err error
  285. }
  286. resultChan := make(chan searchResult, 1)
  287. go func() {
  288. result, err := si.index.Search(searchRequest)
  289. resultChan <- searchResult{result: result, err: err}
  290. }()
  291. // Wait for search result or context cancellation
  292. select {
  293. case <-ctx.Done():
  294. return nil, ctx.Err()
  295. case res := <-resultChan:
  296. if res.err != nil {
  297. return nil, fmt.Errorf("search execution failed: %w", res.err)
  298. }
  299. results := si.convertResults(res.result)
  300. // log the search execution
  301. logger.Debugf("Search index query '%s' (type: %s, limit: %d) returned %d results",
  302. queryStr, docType, limit, len(results))
  303. return results, nil
  304. }
  305. }
  306. // buildQuery builds a search query with optional type filtering
  307. func (si *SearchIndexer) buildQuery(queryStr string, docType string) query.Query {
  308. mainQuery := bleve.NewBooleanQuery()
  309. // Add type filter if specified
  310. if docType != "" {
  311. typeQuery := bleve.NewTermQuery(docType)
  312. typeQuery.SetField("type")
  313. mainQuery.AddMust(typeQuery)
  314. }
  315. // Add text search across name and content fields only
  316. textQuery := bleve.NewBooleanQuery()
  317. searchFields := []string{"name", "content"}
  318. for _, field := range searchFields {
  319. // Create a boolean query for this field to combine multiple query types
  320. fieldQuery := bleve.NewBooleanQuery()
  321. // 1. Exact match query (highest priority)
  322. matchQuery := bleve.NewMatchQuery(queryStr)
  323. matchQuery.SetField(field)
  324. matchQuery.SetBoost(3.0) // Higher boost for exact matches
  325. fieldQuery.AddShould(matchQuery)
  326. // 2. Prefix query for partial matches (e.g., "access" matches "access_log")
  327. prefixQuery := bleve.NewPrefixQuery(queryStr)
  328. prefixQuery.SetField(field)
  329. prefixQuery.SetBoost(2.0) // Medium boost for prefix matches
  330. fieldQuery.AddShould(prefixQuery)
  331. // 3. Wildcard query for more flexible matching
  332. wildcardQuery := bleve.NewWildcardQuery("*" + queryStr + "*")
  333. wildcardQuery.SetField(field)
  334. wildcardQuery.SetBoost(1.5) // Lower boost for wildcard matches
  335. fieldQuery.AddShould(wildcardQuery)
  336. // 4. Fuzzy match query (allows 1 character difference)
  337. fuzzyQuery := bleve.NewFuzzyQuery(queryStr)
  338. fuzzyQuery.SetField(field)
  339. fuzzyQuery.SetFuzziness(1)
  340. fuzzyQuery.SetBoost(1.0) // Lowest boost for fuzzy matches
  341. fieldQuery.AddShould(fuzzyQuery)
  342. textQuery.AddShould(fieldQuery)
  343. }
  344. if docType != "" {
  345. mainQuery.AddMust(textQuery)
  346. } else {
  347. return textQuery
  348. }
  349. return mainQuery
  350. }
  351. // convertResults converts Bleve search results to our SearchResult format
  352. func (si *SearchIndexer) convertResults(searchResult *bleve.SearchResult) []SearchResult {
  353. results := make([]SearchResult, 0, len(searchResult.Hits))
  354. for _, hit := range searchResult.Hits {
  355. doc := SearchDocument{
  356. ID: si.getStringField(hit.Fields, "id"),
  357. Type: si.getStringField(hit.Fields, "type"),
  358. Name: si.getStringField(hit.Fields, "name"),
  359. Path: si.getStringField(hit.Fields, "path"),
  360. Content: si.getStringField(hit.Fields, "content"),
  361. }
  362. // Parse updated_at if present
  363. if updatedAtStr := si.getStringField(hit.Fields, "updated_at"); updatedAtStr != "" {
  364. if updatedAt, err := time.Parse(time.RFC3339, updatedAtStr); err == nil {
  365. doc.UpdatedAt = updatedAt
  366. }
  367. }
  368. results = append(results, SearchResult{
  369. Document: doc,
  370. Score: hit.Score,
  371. })
  372. }
  373. return results
  374. }
  375. // getStringField safely gets a string field from search results
  376. func (si *SearchIndexer) getStringField(fields map[string]interface{}, fieldName string) string {
  377. if value, ok := fields[fieldName]; ok {
  378. if str, ok := value.(string); ok {
  379. return str
  380. }
  381. }
  382. return ""
  383. }
  384. // DeleteDocument removes a document from the index
  385. func (si *SearchIndexer) DeleteDocument(docID string) error {
  386. si.indexMutex.RLock()
  387. defer si.indexMutex.RUnlock()
  388. if si.index == nil {
  389. return fmt.Errorf("search index not initialized")
  390. }
  391. // Note: We don't track the exact size of deleted documents here
  392. // as it would require storing document sizes separately.
  393. // The memory tracking will reset during periodic cleanups or restarts.
  394. return si.index.Delete(docID)
  395. }
  396. // RebuildIndex rebuilds the entire search index
  397. func (si *SearchIndexer) RebuildIndex(ctx context.Context) error {
  398. si.indexMutex.Lock()
  399. defer si.indexMutex.Unlock()
  400. // Check if context is cancelled
  401. select {
  402. case <-ctx.Done():
  403. return ctx.Err()
  404. default:
  405. }
  406. if si.index != nil {
  407. si.index.Close()
  408. }
  409. // Check context before removing old index
  410. select {
  411. case <-ctx.Done():
  412. return ctx.Err()
  413. default:
  414. }
  415. // Remove old index
  416. if err := os.RemoveAll(si.indexPath); err != nil {
  417. logger.Error("Failed to remove old index:", err)
  418. }
  419. // Check context before creating new index
  420. select {
  421. case <-ctx.Done():
  422. return ctx.Err()
  423. default:
  424. }
  425. // Create new index
  426. var err error
  427. si.index, err = bleve.New(si.indexPath, si.createIndexMapping())
  428. if err != nil {
  429. return fmt.Errorf("failed to create new index: %w", err)
  430. }
  431. logger.Info("Search index rebuilt successfully")
  432. return nil
  433. }
  434. // GetIndexStats returns statistics about the search index
  435. func (si *SearchIndexer) GetIndexStats() (map[string]interface{}, error) {
  436. si.indexMutex.RLock()
  437. defer si.indexMutex.RUnlock()
  438. if si.index == nil {
  439. return nil, fmt.Errorf("search index not initialized")
  440. }
  441. docCount, err := si.index.DocCount()
  442. if err != nil {
  443. return nil, err
  444. }
  445. // Get memory usage statistics
  446. totalContentSize, trackedDocCount, maxMemoryUsage := si.getMemoryUsage()
  447. return map[string]interface{}{
  448. "document_count": docCount,
  449. "tracked_document_count": trackedDocCount,
  450. "total_content_size": totalContentSize,
  451. "max_memory_usage": maxMemoryUsage,
  452. "memory_usage_percent": float64(totalContentSize) / float64(maxMemoryUsage) * 100,
  453. "index_path": si.indexPath,
  454. }, nil
  455. }
  456. // Close closes the search index and triggers cleanup
  457. func (si *SearchIndexer) Close() error {
  458. if si.cancel != nil {
  459. si.cancel()
  460. }
  461. si.cleanup()
  462. return nil
  463. }
  464. // Convenience functions for different search types
  465. // SearchSites searches only site configurations
  466. func SearchSites(ctx context.Context, query string, limit int) ([]SearchResult, error) {
  467. return GetSearchIndexer().SearchByType(ctx, query, "site", limit)
  468. }
  469. // SearchStreams searches only stream configurations
  470. func SearchStreams(ctx context.Context, query string, limit int) ([]SearchResult, error) {
  471. return GetSearchIndexer().SearchByType(ctx, query, "stream", limit)
  472. }
  473. // SearchConfigs searches only general configurations
  474. func SearchConfigs(ctx context.Context, query string, limit int) ([]SearchResult, error) {
  475. return GetSearchIndexer().SearchByType(ctx, query, "config", limit)
  476. }
  477. // SearchAll searches across all configuration types
  478. func SearchAll(ctx context.Context, query string, limit int) ([]SearchResult, error) {
  479. return GetSearchIndexer().Search(ctx, query, limit)
  480. }
  481. // checkMemoryLimitBeforeIndexing checks if adding new content would exceed memory limits
  482. func (si *SearchIndexer) checkMemoryLimitBeforeIndexing(contentSize int64) bool {
  483. si.memoryMutex.RLock()
  484. defer si.memoryMutex.RUnlock()
  485. // Check if adding this content would exceed the memory limit
  486. newTotalSize := si.totalContentSize + contentSize
  487. if newTotalSize > si.maxMemoryUsage {
  488. logger.Debugf("Memory limit would be exceeded: current=%d, new=%d, limit=%d",
  489. si.totalContentSize, newTotalSize, si.maxMemoryUsage)
  490. return false
  491. }
  492. // Also check document count limit (max 1000 documents)
  493. if si.documentCount >= 1000 {
  494. logger.Debugf("Document count limit reached: %d", si.documentCount)
  495. return false
  496. }
  497. return true
  498. }
  499. // updateMemoryUsage updates the memory usage tracking
  500. func (si *SearchIndexer) updateMemoryUsage(documentID string, contentSize int64, isAddition bool) {
  501. si.memoryMutex.Lock()
  502. defer si.memoryMutex.Unlock()
  503. if isAddition {
  504. si.totalContentSize += contentSize
  505. si.documentCount++
  506. // logger.Debugf("Added document %s: size=%d, total_size=%d, count=%d",
  507. // documentID, contentSize, si.totalContentSize, si.documentCount)
  508. } else {
  509. si.totalContentSize -= contentSize
  510. si.documentCount--
  511. if si.totalContentSize < 0 {
  512. si.totalContentSize = 0
  513. }
  514. if si.documentCount < 0 {
  515. si.documentCount = 0
  516. }
  517. // logger.Debugf("Removed document %s: size=%d, total_size=%d, count=%d",
  518. // documentID, contentSize, si.totalContentSize, si.documentCount)
  519. }
  520. }
  521. // getMemoryUsage returns current memory usage statistics
  522. func (si *SearchIndexer) getMemoryUsage() (int64, int64, int64) {
  523. si.memoryMutex.RLock()
  524. defer si.memoryMutex.RUnlock()
  525. return si.totalContentSize, si.documentCount, si.maxMemoryUsage
  526. }
  527. // isConfigFile checks if the content is a text/plain file (most nginx configs)
  528. func isConfigFile(content []byte) bool {
  529. if len(content) == 0 {
  530. return false // Empty files are not useful for configuration
  531. }
  532. // Detect MIME type and only accept text/plain
  533. mtype := mimetype.Detect(content)
  534. if mtype.Is("text/plain") {
  535. return true
  536. }
  537. return false
  538. }