log_indexer_core.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. package nginx_log
  2. import (
  3. "context"
  4. "fmt"
  5. "os"
  6. "path/filepath"
  7. "sync"
  8. "time"
  9. "github.com/blevesearch/bleve/v2"
  10. "github.com/blevesearch/bleve/v2/mapping"
  11. "github.com/dgraph-io/ristretto/v2"
  12. "github.com/fsnotify/fsnotify"
  13. "github.com/uozi-tech/cosy/logger"
  14. cosysettings "github.com/uozi-tech/cosy/settings"
  15. )
  // MinIndexInterval is the shortest time allowed between two consecutive
  // index operations on the same file.
  const MinIndexInterval = 30 * time.Second
  20. // LogIndexer provides high-performance log indexing and querying capabilities
  21. type LogIndexer struct {
  22. indexPath string
  23. index bleve.Index
  24. cache *ristretto.Cache[string, *CachedSearchResult]
  25. statsCache *ristretto.Cache[string, *CachedStatsResult]
  26. parser *OptimizedLogParser
  27. watcher *fsnotify.Watcher
  28. logPaths map[string]*LogFileInfo
  29. mu sync.RWMutex
  30. // Background processing
  31. ctx context.Context
  32. cancel context.CancelFunc
  33. indexQueue chan *IndexTask
  34. indexingLock sync.Map // map[string]*sync.Mutex for per-file locking
  35. // File debouncing
  36. debounceTimers sync.Map // map[string]*time.Timer for per-file debouncing
  37. lastIndexTime sync.Map // map[string]time.Time for tracking last index time
  38. // Progress event deduplication
  39. lastProgressNotify sync.Map // map[string]time.Time for tracking last progress notification per log group
  40. // Log group completion tracking to prevent duplicate notifications
  41. logGroupCompletionSent sync.Map // map[string]bool for tracking completion notifications per log group
  42. // Persistence
  43. persistence *PersistenceManager
  44. // Configuration
  45. maxCacheSize int64
  46. indexBatch int
  47. }
  48. // NewLogIndexer creates a new log indexer instance
  49. func NewLogIndexer(ctx context.Context) (*LogIndexer, error) {
  50. // Use nginx-ui config directory for index storage
  51. configDir := filepath.Dir(cosysettings.ConfPath)
  52. if configDir == "" {
  53. return nil, fmt.Errorf("nginx-ui config directory not found")
  54. }
  55. indexPath := filepath.Join(configDir, "log-index")
  56. // Create index directory if it doesn't exist
  57. if err := os.MkdirAll(indexPath, 0755); err != nil {
  58. return nil, fmt.Errorf("failed to create index directory: %w", err)
  59. }
  60. // Create or open Bleve index
  61. index, err := createOrOpenIndex(indexPath)
  62. if err != nil {
  63. return nil, fmt.Errorf("failed to create/open index: %w", err)
  64. }
  65. // Initialize cache with 100MB capacity
  66. cache, err := ristretto.NewCache(&ristretto.Config[string, *CachedSearchResult]{
  67. NumCounters: 1e7, // number of keys to track frequency of (10M)
  68. MaxCost: 1 << 27, // maximum cost of cache (128MB)
  69. BufferItems: 64, // number of keys per Get buffer
  70. })
  71. if err != nil {
  72. return nil, fmt.Errorf("failed to create cache: %w", err)
  73. }
  74. // Initialize statistics cache with 50MB capacity
  75. statsCache, err := ristretto.NewCache(&ristretto.Config[string, *CachedStatsResult]{
  76. NumCounters: 1e5, // number of keys to track frequency of (100K)
  77. MaxCost: 1 << 26, // maximum cost of cache (64MB)
  78. BufferItems: 64, // number of keys per Get buffer
  79. })
  80. if err != nil {
  81. return nil, fmt.Errorf("failed to create stats cache: %w", err)
  82. }
  83. // Create user agent parser
  84. userAgent := NewSimpleUserAgentParser()
  85. parser := NewOptimizedLogParser(userAgent)
  86. // Initialize file system watcher
  87. watcher, err := fsnotify.NewWatcher()
  88. if err != nil {
  89. logger.Warnf("Failed to create file watcher: %v", err)
  90. // Continue without watcher - manual indexing will still work
  91. }
  92. // Create context for background processing
  93. ctx, cancel := context.WithCancel(ctx)
  94. // Create persistence manager
  95. persistence := NewPersistenceManager()
  96. indexer := &LogIndexer{
  97. indexPath: indexPath,
  98. index: index,
  99. cache: cache,
  100. statsCache: statsCache,
  101. parser: parser,
  102. watcher: watcher,
  103. logPaths: make(map[string]*LogFileInfo),
  104. ctx: ctx,
  105. cancel: cancel,
  106. indexQueue: make(chan *IndexTask, 1000),
  107. persistence: persistence,
  108. maxCacheSize: 128 * 1024 * 1024, // 128MB
  109. indexBatch: 1000, // Process 1000 entries per batch
  110. }
  111. // Start background processing
  112. go indexer.processIndexQueue()
  113. if watcher != nil {
  114. go indexer.watchFiles()
  115. }
  116. logger.Info("Log indexer initialized successfully")
  117. return indexer, nil
  118. }
  119. // createOrOpenIndex creates or opens a Bleve index
  120. func createOrOpenIndex(indexPath string) (bleve.Index, error) {
  121. // Try to open existing index first
  122. index, err := bleve.Open(indexPath)
  123. if err == nil {
  124. logger.Infof("Opened existing Bleve index at %s", indexPath)
  125. return index, nil
  126. }
  127. // If opening failed, create a new index
  128. logger.Infof("Creating new Bleve index at %s", indexPath)
  129. indexMapping := createIndexMapping()
  130. index, err = bleve.New(indexPath, indexMapping)
  131. if err != nil {
  132. return nil, fmt.Errorf("failed to create new index: %w", err)
  133. }
  134. logger.Infof("Created new Bleve index at %s", indexPath)
  135. return index, nil
  136. }
  137. // createIndexMapping creates the mapping for log entries
  138. func createIndexMapping() mapping.IndexMapping {
  139. // Create a mapping with keyword analyzer for file_path to enable exact matching
  140. indexMapping := bleve.NewIndexMapping()
  141. // Create a document mapping for log entries
  142. logMapping := bleve.NewDocumentMapping()
  143. // Create field mappings
  144. textFieldMapping := bleve.NewTextFieldMapping()
  145. textFieldMapping.Store = true
  146. textFieldMapping.Index = true
  147. textFieldMapping.IncludeTermVectors = false
  148. textFieldMapping.IncludeInAll = false
  149. // For file_path, use keyword analyzer to enable exact matching
  150. filePathFieldMapping := bleve.NewTextFieldMapping()
  151. filePathFieldMapping.Store = true
  152. filePathFieldMapping.Index = true
  153. filePathFieldMapping.Analyzer = "keyword" // Use keyword analyzer for exact matching
  154. filePathFieldMapping.IncludeTermVectors = false
  155. filePathFieldMapping.IncludeInAll = false
  156. dateFieldMapping := bleve.NewDateTimeFieldMapping()
  157. dateFieldMapping.Store = true
  158. dateFieldMapping.Index = true
  159. dateFieldMapping.IncludeInAll = false
  160. numericFieldMapping := bleve.NewNumericFieldMapping()
  161. numericFieldMapping.Store = true
  162. numericFieldMapping.Index = true
  163. numericFieldMapping.IncludeInAll = false
  164. // Map fields to their types
  165. logMapping.AddFieldMappingsAt("file_path", filePathFieldMapping) // Use keyword analyzer for exact matching
  166. logMapping.AddFieldMappingsAt("timestamp", numericFieldMapping) // Use numeric mapping for Unix timestamps
  167. logMapping.AddFieldMappingsAt("ip", textFieldMapping)
  168. logMapping.AddFieldMappingsAt("location", textFieldMapping)
  169. logMapping.AddFieldMappingsAt("region_code", textFieldMapping)
  170. logMapping.AddFieldMappingsAt("province", textFieldMapping)
  171. logMapping.AddFieldMappingsAt("city", textFieldMapping)
  172. logMapping.AddFieldMappingsAt("isp", textFieldMapping)
  173. logMapping.AddFieldMappingsAt("method", textFieldMapping)
  174. logMapping.AddFieldMappingsAt("path", textFieldMapping)
  175. logMapping.AddFieldMappingsAt("protocol", textFieldMapping)
  176. logMapping.AddFieldMappingsAt("status", numericFieldMapping)
  177. logMapping.AddFieldMappingsAt("bytes_sent", numericFieldMapping)
  178. logMapping.AddFieldMappingsAt("referer", textFieldMapping)
  179. logMapping.AddFieldMappingsAt("user_agent", textFieldMapping)
  180. logMapping.AddFieldMappingsAt("browser", textFieldMapping)
  181. logMapping.AddFieldMappingsAt("browser_ver", textFieldMapping)
  182. logMapping.AddFieldMappingsAt("os", textFieldMapping)
  183. logMapping.AddFieldMappingsAt("os_version", textFieldMapping)
  184. logMapping.AddFieldMappingsAt("device_type", textFieldMapping)
  185. logMapping.AddFieldMappingsAt("request_time", numericFieldMapping)
  186. logMapping.AddFieldMappingsAt("upstream_time", numericFieldMapping)
  187. logMapping.AddFieldMappingsAt("raw", textFieldMapping)
  188. // Set the default mapping
  189. indexMapping.DefaultMapping = logMapping
  190. // Enable the _all field for general text search
  191. indexMapping.DefaultAnalyzer = "standard"
  192. return indexMapping
  193. }
  194. // Close closes the indexer and cleans up resources
  195. func (li *LogIndexer) Close() error {
  196. logger.Info("Closing log indexer...")
  197. // Cancel context to stop background processing
  198. if li.cancel != nil {
  199. li.cancel()
  200. }
  201. // Close the index queue
  202. if li.indexQueue != nil {
  203. close(li.indexQueue)
  204. }
  205. // Close file watcher
  206. if li.watcher != nil {
  207. if err := li.watcher.Close(); err != nil {
  208. logger.Warnf("Failed to close file watcher: %v", err)
  209. }
  210. }
  211. // Close persistence manager
  212. if li.persistence != nil {
  213. if err := li.persistence.Close(); err != nil {
  214. logger.Warnf("Failed to close persistence manager: %v", err)
  215. }
  216. }
  217. // Close cache
  218. if li.cache != nil {
  219. li.cache.Close()
  220. }
  221. if li.statsCache != nil {
  222. li.statsCache.Close()
  223. }
  224. // Close Bleve index
  225. if li.index != nil {
  226. if err := li.index.Close(); err != nil {
  227. logger.Errorf("Failed to close Bleve index: %v", err)
  228. return err
  229. }
  230. }
  231. logger.Info("Log indexer closed successfully")
  232. return nil
  233. }