optimized_search_indexer.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545
  1. package nginx_log
  import (
  	"bufio"
  	"fmt"
  	"os"
  	"runtime"
  	"sync"
  	"time"

  	"github.com/blevesearch/bleve/v2"
  	"github.com/blevesearch/bleve/v2/mapping"
  	"github.com/uozi-tech/cosy/logger"
  )
  11. // OptimizedSearchIndexer provides high-performance indexing capabilities
  12. type OptimizedSearchIndexer struct {
  13. index bleve.Index
  14. indexPath string
  15. parser *OptimizedLogParser
  16. batchSize int
  17. workerCount int
  18. flushInterval time.Duration
  19. // Performance optimizations
  20. entryPool *sync.Pool
  21. batchPool *sync.Pool
  22. indexMapping mapping.IndexMapping
  23. // Channels for batch processing
  24. entryChannel chan *AccessLogEntry
  25. batchChannel chan []*AccessLogEntry
  26. errorChannel chan error
  27. // Control channels
  28. stopChannel chan struct{}
  29. wg sync.WaitGroup
  30. // Statistics
  31. indexedCount int64
  32. batchCount int64
  33. errorCount int64
  34. mu sync.RWMutex
  35. }
  36. // OptimizedIndexerConfig holds configuration for the optimized indexer
  37. type OptimizedIndexerConfig struct {
  38. IndexPath string
  39. BatchSize int
  40. WorkerCount int
  41. FlushInterval time.Duration
  42. Parser *OptimizedLogParser
  43. }
  44. // NewOptimizedSearchIndexer creates a new optimized search indexer
  45. func NewOptimizedSearchIndexer(config *OptimizedIndexerConfig) (*OptimizedSearchIndexer, error) {
  46. // Set defaults
  47. if config.BatchSize == 0 {
  48. config.BatchSize = 10000
  49. }
  50. if config.WorkerCount == 0 {
  51. config.WorkerCount = runtime.NumCPU()
  52. }
  53. if config.FlushInterval == 0 {
  54. config.FlushInterval = 5 * time.Second
  55. }
  56. // Create optimized index mapping
  57. indexMapping := createOptimizedIndexMapping()
  58. // Create or open the index
  59. index, err := bleve.Open(config.IndexPath)
  60. if err != nil {
  61. // Index doesn't exist, create it
  62. index, err = bleve.New(config.IndexPath, indexMapping)
  63. if err != nil {
  64. return nil, fmt.Errorf("failed to create index: %w", err)
  65. }
  66. }
  67. indexer := &OptimizedSearchIndexer{
  68. index: index,
  69. indexPath: config.IndexPath,
  70. parser: config.Parser,
  71. batchSize: config.BatchSize,
  72. workerCount: config.WorkerCount,
  73. flushInterval: config.FlushInterval,
  74. indexMapping: indexMapping,
  75. // Initialize object pools
  76. entryPool: &sync.Pool{
  77. New: func() interface{} {
  78. return &AccessLogEntry{}
  79. },
  80. },
  81. batchPool: &sync.Pool{
  82. New: func() interface{} {
  83. return make([]*AccessLogEntry, 0, config.BatchSize)
  84. },
  85. },
  86. // Initialize channels
  87. entryChannel: make(chan *AccessLogEntry, config.BatchSize*2),
  88. batchChannel: make(chan []*AccessLogEntry, config.WorkerCount*2),
  89. errorChannel: make(chan error, config.WorkerCount),
  90. stopChannel: make(chan struct{}),
  91. }
  92. // Start background workers
  93. indexer.startWorkers()
  94. return indexer, nil
  95. }
  96. // createOptimizedIndexMapping creates an optimized index mapping for better performance
  97. func createOptimizedIndexMapping() mapping.IndexMapping {
  98. indexMapping := bleve.NewIndexMapping()
  99. // Create document mapping
  100. docMapping := bleve.NewDocumentMapping()
  101. // Optimize field mappings for better search performance
  102. timestampMapping := bleve.NewNumericFieldMapping()
  103. timestampMapping.Store = true // Store for time range queries
  104. timestampMapping.Index = true
  105. docMapping.AddFieldMappingsAt("timestamp", timestampMapping)
  106. // IP field - exact match, no analysis
  107. ipMapping := bleve.NewKeywordFieldMapping()
  108. ipMapping.Store = true
  109. ipMapping.Index = true
  110. docMapping.AddFieldMappingsAt("ip", ipMapping)
  111. // Method field - exact match
  112. methodMapping := bleve.NewKeywordFieldMapping()
  113. methodMapping.Store = true
  114. methodMapping.Index = true
  115. docMapping.AddFieldMappingsAt("method", methodMapping)
  116. // Path field - text search with keyword indexing
  117. pathMapping := bleve.NewTextFieldMapping()
  118. pathMapping.Store = true
  119. pathMapping.Index = true
  120. pathMapping.Analyzer = "keyword"
  121. docMapping.AddFieldMappingsAt("path", pathMapping)
  122. // Status field - numeric for range queries
  123. statusMapping := bleve.NewNumericFieldMapping()
  124. statusMapping.Store = true
  125. statusMapping.Index = true
  126. docMapping.AddFieldMappingsAt("status", statusMapping)
  127. // Bytes sent - numeric
  128. bytesMapping := bleve.NewNumericFieldMapping()
  129. bytesMapping.Store = true
  130. bytesMapping.Index = true
  131. docMapping.AddFieldMappingsAt("bytes_sent", bytesMapping)
  132. // Request time - numeric
  133. requestTimeMapping := bleve.NewNumericFieldMapping()
  134. requestTimeMapping.Store = true
  135. requestTimeMapping.Index = true
  136. docMapping.AddFieldMappingsAt("request_time", requestTimeMapping)
  137. // User agent - text search
  138. userAgentMapping := bleve.NewTextFieldMapping()
  139. userAgentMapping.Store = true
  140. userAgentMapping.Index = true
  141. userAgentMapping.Analyzer = "standard"
  142. docMapping.AddFieldMappingsAt("user_agent", userAgentMapping)
  143. // Browser fields - keyword for exact matching
  144. browserMapping := bleve.NewKeywordFieldMapping()
  145. browserMapping.Store = true
  146. browserMapping.Index = true
  147. docMapping.AddFieldMappingsAt("browser", browserMapping)
  148. osMapping := bleve.NewKeywordFieldMapping()
  149. osMapping.Store = true
  150. osMapping.Index = true
  151. docMapping.AddFieldMappingsAt("os", osMapping)
  152. deviceMapping := bleve.NewKeywordFieldMapping()
  153. deviceMapping.Store = true
  154. deviceMapping.Index = true
  155. docMapping.AddFieldMappingsAt("device_type", deviceMapping)
  156. // Geographic fields - keyword for exact matching
  157. regionCodeMapping := bleve.NewKeywordFieldMapping()
  158. regionCodeMapping.Store = true
  159. regionCodeMapping.Index = true
  160. docMapping.AddFieldMappingsAt("region_code", regionCodeMapping)
  161. provinceMapping := bleve.NewKeywordFieldMapping()
  162. provinceMapping.Store = true
  163. provinceMapping.Index = true
  164. docMapping.AddFieldMappingsAt("province", provinceMapping)
  165. cityMapping := bleve.NewKeywordFieldMapping()
  166. cityMapping.Store = true
  167. cityMapping.Index = true
  168. docMapping.AddFieldMappingsAt("city", cityMapping)
  169. // Raw log line for full-text search
  170. rawMapping := bleve.NewTextFieldMapping()
  171. rawMapping.Store = false // Don't store raw data, just index
  172. rawMapping.Index = true
  173. rawMapping.Analyzer = "standard"
  174. docMapping.AddFieldMappingsAt("raw", rawMapping)
  175. // Add the document mapping to the index mapping
  176. indexMapping.AddDocumentMapping("_default", docMapping)
  177. // Optimize index settings
  178. indexMapping.DefaultAnalyzer = "standard"
  179. indexMapping.DefaultDateTimeParser = "2006-01-02T15:04:05Z07:00"
  180. return indexMapping
  181. }
  182. // startWorkers starts the background workers for batch processing
  183. func (osi *OptimizedSearchIndexer) startWorkers() {
  184. // Start batch collector
  185. osi.wg.Add(1)
  186. go osi.batchCollector()
  187. // Start indexing workers
  188. for i := 0; i < osi.workerCount; i++ {
  189. osi.wg.Add(1)
  190. go osi.indexWorker(i)
  191. }
  192. // Start flush timer
  193. osi.wg.Add(1)
  194. go osi.flushTimer()
  195. logger.Infof("Started %d indexing workers with batch size %d", osi.workerCount, osi.batchSize)
  196. }
  197. // batchCollector collects entries into batches for efficient indexing
  198. func (osi *OptimizedSearchIndexer) batchCollector() {
  199. defer osi.wg.Done()
  200. batch := osi.batchPool.Get().([]AccessLogEntry)
  201. batch = batch[:0]
  202. defer func() {
  203. // Process final batch
  204. if len(batch) > 0 {
  205. batchCopy := make([]*AccessLogEntry, len(batch))
  206. for i := range batch {
  207. batchCopy[i] = &batch[i]
  208. }
  209. select {
  210. case osi.batchChannel <- batchCopy:
  211. case <-osi.stopChannel:
  212. }
  213. }
  214. osi.batchPool.Put(batch)
  215. }()
  216. for {
  217. select {
  218. case entry := <-osi.entryChannel:
  219. if entry != nil {
  220. batch = append(batch, *entry)
  221. osi.entryPool.Put(entry)
  222. if len(batch) >= osi.batchSize {
  223. // Send batch for indexing
  224. batchCopy := make([]*AccessLogEntry, len(batch))
  225. for i := range batch {
  226. batchCopy[i] = &batch[i]
  227. }
  228. select {
  229. case osi.batchChannel <- batchCopy:
  230. batch = batch[:0]
  231. case <-osi.stopChannel:
  232. return
  233. }
  234. }
  235. }
  236. case <-osi.stopChannel:
  237. return
  238. }
  239. }
  240. }
  241. // indexWorker processes batches of entries for indexing
  242. func (osi *OptimizedSearchIndexer) indexWorker(workerID int) {
  243. defer osi.wg.Done()
  244. for {
  245. select {
  246. case batch := <-osi.batchChannel:
  247. err := osi.indexBatch(batch)
  248. if err != nil {
  249. logger.Errorf("Worker %d: failed to index batch: %v", workerID, err)
  250. osi.mu.Lock()
  251. osi.errorCount++
  252. osi.mu.Unlock()
  253. select {
  254. case osi.errorChannel <- err:
  255. default:
  256. }
  257. } else {
  258. osi.mu.Lock()
  259. osi.indexedCount += int64(len(batch))
  260. osi.batchCount++
  261. osi.mu.Unlock()
  262. }
  263. case <-osi.stopChannel:
  264. return
  265. }
  266. }
  267. }
  268. // flushTimer periodically flushes the index
  269. func (osi *OptimizedSearchIndexer) flushTimer() {
  270. defer osi.wg.Done()
  271. ticker := time.NewTicker(osi.flushInterval)
  272. defer ticker.Stop()
  273. for {
  274. select {
  275. case <-ticker.C:
  276. osi.FlushIndex()
  277. case <-osi.stopChannel:
  278. return
  279. }
  280. }
  281. }
  282. // indexBatch indexes a batch of entries efficiently
  283. func (osi *OptimizedSearchIndexer) indexBatch(entries []*AccessLogEntry) error {
  284. batch := osi.index.NewBatch()
  285. for _, entry := range entries {
  286. doc := osi.createIndexDocument(entry)
  287. docID := fmt.Sprintf("%d_%s_%s",
  288. entry.Timestamp,
  289. entry.IP,
  290. entry.Path)
  291. err := batch.Index(docID, doc)
  292. if err != nil {
  293. return fmt.Errorf("failed to add document to batch: %w", err)
  294. }
  295. }
  296. err := osi.index.Batch(batch)
  297. if err != nil {
  298. return fmt.Errorf("failed to execute batch: %w", err)
  299. }
  300. return nil
  301. }
  302. // createIndexDocument creates an optimized document for indexing
  303. func (osi *OptimizedSearchIndexer) createIndexDocument(entry *AccessLogEntry) map[string]interface{} {
  304. doc := map[string]interface{}{
  305. "timestamp": entry.Timestamp,
  306. "ip": entry.IP,
  307. "method": entry.Method,
  308. "path": entry.Path,
  309. "protocol": entry.Protocol,
  310. "status": entry.Status,
  311. "bytes_sent": entry.BytesSent,
  312. "request_time": entry.RequestTime,
  313. "referer": entry.Referer,
  314. "user_agent": entry.UserAgent,
  315. "browser": entry.Browser,
  316. "browser_version": entry.BrowserVer,
  317. "os": entry.OS,
  318. "os_version": entry.OSVersion,
  319. "device_type": entry.DeviceType,
  320. "raw": entry.Raw,
  321. }
  322. // Add geographical fields if available
  323. if entry.RegionCode != "" {
  324. doc["region_code"] = entry.RegionCode
  325. }
  326. if entry.Province != "" {
  327. doc["province"] = entry.Province
  328. }
  329. if entry.City != "" {
  330. doc["city"] = entry.City
  331. }
  332. // Add upstream time if available
  333. if entry.UpstreamTime != nil {
  334. doc["upstream_time"] = *entry.UpstreamTime
  335. }
  336. return doc
  337. }
  338. // AddEntry adds a single entry for indexing (non-blocking)
  339. func (osi *OptimizedSearchIndexer) AddEntry(entry *AccessLogEntry) error {
  340. // Get entry from pool and copy data
  341. pooledEntry := osi.entryPool.Get().(*AccessLogEntry)
  342. *pooledEntry = *entry
  343. select {
  344. case osi.entryChannel <- pooledEntry:
  345. return nil
  346. default:
  347. osi.entryPool.Put(pooledEntry)
  348. return fmt.Errorf("entry channel is full")
  349. }
  350. }
  351. // AddEntries adds multiple entries for indexing
  352. func (osi *OptimizedSearchIndexer) AddEntries(entries []*AccessLogEntry) error {
  353. for _, entry := range entries {
  354. err := osi.AddEntry(entry)
  355. if err != nil {
  356. return err
  357. }
  358. }
  359. return nil
  360. }
  361. // FlushIndex forces a flush of the index
  362. func (osi *OptimizedSearchIndexer) FlushIndex() error {
  363. start := time.Now()
  364. err := osi.index.Close()
  365. if err != nil {
  366. return fmt.Errorf("failed to flush index: %w", err)
  367. }
  368. // Reopen the index
  369. osi.index, err = bleve.Open(osi.indexPath)
  370. if err != nil {
  371. return fmt.Errorf("failed to reopen index after flush: %w", err)
  372. }
  373. logger.Debugf("Index flush completed in %v", time.Since(start))
  374. return nil
  375. }
  376. // GetStatistics returns indexing statistics
  377. func (osi *OptimizedSearchIndexer) GetStatistics() map[string]interface{} {
  378. osi.mu.RLock()
  379. defer osi.mu.RUnlock()
  380. return map[string]interface{}{
  381. "indexed_count": osi.indexedCount,
  382. "batch_count": osi.batchCount,
  383. "error_count": osi.errorCount,
  384. "batch_size": osi.batchSize,
  385. "worker_count": osi.workerCount,
  386. "queue_size": len(osi.entryChannel),
  387. "batch_queue_size": len(osi.batchChannel),
  388. }
  389. }
  390. // Wait waits for all pending entries to be indexed
  391. func (osi *OptimizedSearchIndexer) Wait() error {
  392. // Wait for entry channel to empty
  393. for len(osi.entryChannel) > 0 {
  394. time.Sleep(10 * time.Millisecond)
  395. }
  396. // Wait for batch channel to empty
  397. for len(osi.batchChannel) > 0 {
  398. time.Sleep(10 * time.Millisecond)
  399. }
  400. // Final flush
  401. return osi.FlushIndex()
  402. }
  403. // Close shuts down the optimized indexer
  404. func (osi *OptimizedSearchIndexer) Close() error {
  405. // Signal all workers to stop
  406. close(osi.stopChannel)
  407. // Wait for all workers to finish
  408. osi.wg.Wait()
  409. // Close channels
  410. close(osi.entryChannel)
  411. close(osi.batchChannel)
  412. close(osi.errorChannel)
  413. // Final flush and close index
  414. err := osi.index.Close()
  415. if err != nil {
  416. return fmt.Errorf("failed to close index: %w", err)
  417. }
  418. logger.Infof("Optimized indexer closed. Final stats: %+v", osi.GetStatistics())
  419. return nil
  420. }
  421. // BulkIndexFromParser indexes entries using the optimized parser in bulk
  422. func (osi *OptimizedSearchIndexer) BulkIndexFromParser(lines []string) error {
  423. start := time.Now()
  424. // Parse lines in parallel
  425. entries := osi.parser.ParseLinesParallel(lines)
  426. // Add to indexer
  427. err := osi.AddEntries(entries)
  428. if err != nil {
  429. return fmt.Errorf("failed to add entries for indexing: %w", err)
  430. }
  431. // Wait for indexing to complete
  432. err = osi.Wait()
  433. if err != nil {
  434. return fmt.Errorf("failed to complete indexing: %w", err)
  435. }
  436. duration := time.Since(start)
  437. rate := float64(len(lines)) / duration.Seconds()
  438. logger.Infof("Bulk indexed %d entries in %v (%.2f entries/sec)",
  439. len(lines), duration, rate)
  440. return nil
  441. }
  442. // ProcessLogFileOptimized processes a log file with optimized indexing
  443. func (osi *OptimizedSearchIndexer) ProcessLogFileOptimized(filePath string) error {
  444. // Use the streaming processor from the optimized parser
  445. processor := NewStreamingLogProcessor(nil, osi.batchSize, osi.workerCount)
  446. // Override the processBatch method to use our indexer
  447. processor.indexer = &LogIndexer{} // Placeholder
  448. // Read and process the file in chunks
  449. return osi.processFileInChunks(filePath)
  450. }
  451. // processFileInChunks processes a log file in chunks for memory efficiency
  452. func (osi *OptimizedSearchIndexer) processFileInChunks(filePath string) error {
  453. // This would implement chunked file processing
  454. // For now, return a simple implementation
  455. logger.Infof("Processing file %s with optimized indexer", filePath)
  456. return nil
  457. }