package indexer

import (
	"bufio"
	"compress/gzip"
	"context"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/blevesearch/bleve/v2"
	"github.com/uozi-tech/cosy/logger"

	"github.com/0xJacky/Nginx-UI/internal/nginx_log/utils"
)
// ParallelIndexer provides high-performance parallel indexing with sharding.
type ParallelIndexer struct {
	config       *Config
	shardManager ShardManager
	metrics      MetricsCollector

	// Worker management
	workers     []*indexWorker
	jobQueue    chan *IndexJob
	resultQueue chan *IndexResult

	// State management
	ctx     context.Context
	cancel  context.CancelFunc
	wg      sync.WaitGroup
	running int32

	// Cleanup control
	stopOnce       sync.Once
	channelsClosed int32

	// Statistics
	stats      *IndexStats
	statsMutex sync.RWMutex

	// Optimization
	lastOptimized       int64
	optimizing          int32
	adaptiveOptimizer   *AdaptiveOptimizer
	zeroAllocProcessor  *ZeroAllocBatchProcessor
	optimizationEnabled bool

	// Dynamic shard awareness
	dynamicAwareness *DynamicShardAwareness
}

// indexWorker represents a single indexing worker.
type indexWorker struct {
	id         int
	indexer    *ParallelIndexer
	stats      *WorkerStats
	statsMutex sync.RWMutex
}
// NewParallelIndexer creates a new parallel indexer with dynamic shard awareness.
func NewParallelIndexer(config *Config, shardManager ShardManager) *ParallelIndexer {
	if config == nil {
		config = DefaultIndexerConfig()
	}
	ctx, cancel := context.WithCancel(context.Background())

	// Initialize dynamic shard awareness.
	dynamicAwareness := NewDynamicShardAwareness(config)

	// If no shard manager is provided, use dynamic awareness to detect the optimal type.
	var actualShardManager ShardManager
	if shardManager == nil {
		detected, err := dynamicAwareness.DetectAndSetupShardManager()
		if err != nil {
			logger.Warnf("Failed to setup dynamic shard manager, using default: %v", err)
			detected = NewDefaultShardManager(config)
			detected.(*DefaultShardManager).Initialize()
		}
		// Type assertion to the ShardManager interface.
		if sm, ok := detected.(ShardManager); ok {
			actualShardManager = sm
		} else {
			// Fall back to the default shard manager.
			actualShardManager = NewDefaultShardManager(config)
			actualShardManager.(*DefaultShardManager).Initialize()
		}
	} else {
		actualShardManager = shardManager
	}

	indexer := &ParallelIndexer{
		config:       config,
		shardManager: actualShardManager,
		metrics:      NewDefaultMetricsCollector(),
		jobQueue:     make(chan *IndexJob, config.MaxQueueSize),
		resultQueue:  make(chan *IndexResult, config.WorkerCount),
		ctx:          ctx,
		cancel:       cancel,
		stats: &IndexStats{
			WorkerStats: make([]*WorkerStats, config.WorkerCount),
		},
		adaptiveOptimizer:   NewAdaptiveOptimizer(config),
		zeroAllocProcessor:  NewZeroAllocBatchProcessor(config),
		optimizationEnabled: true, // Enable optimizations by default.
		dynamicAwareness:    dynamicAwareness,
	}

	// Initialize workers.
	indexer.workers = make([]*indexWorker, config.WorkerCount)
	for i := 0; i < config.WorkerCount; i++ {
		indexer.workers[i] = &indexWorker{
			id:      i,
			indexer: indexer,
			stats: &WorkerStats{
				ID:     i,
				Status: WorkerStatusIdle,
			},
		}
		indexer.stats.WorkerStats[i] = indexer.workers[i].stats
	}
	return indexer
}
// Start begins the indexer operation.
func (pi *ParallelIndexer) Start(ctx context.Context) error {
	if !atomic.CompareAndSwapInt32(&pi.running, 0, 1) {
		return fmt.Errorf("indexer already started")
	}

	// Initialize the shard manager.
	if err := pi.shardManager.Initialize(); err != nil {
		atomic.StoreInt32(&pi.running, 0)
		return fmt.Errorf("failed to initialize shard manager: %w", err)
	}

	// Start workers.
	for _, worker := range pi.workers {
		pi.wg.Add(1)
		go worker.run()
	}

	// Start the result processor.
	pi.wg.Add(1)
	go pi.processResults()

	// Start the optimization routine if enabled.
	if pi.config.OptimizeInterval > 0 {
		pi.wg.Add(1)
		go pi.optimizationRoutine()
	}

	// Start metrics collection if enabled.
	if pi.config.EnableMetrics {
		pi.wg.Add(1)
		go pi.metricsRoutine()
	}

	// Start the adaptive optimizer if enabled.
	if pi.optimizationEnabled && pi.adaptiveOptimizer != nil {
		if err := pi.adaptiveOptimizer.Start(); err != nil {
			logger.Warnf("Failed to start adaptive optimizer: %v", err)
		}
	}

	// Start dynamic shard awareness monitoring if enabled.
	if pi.dynamicAwareness != nil {
		pi.dynamicAwareness.StartMonitoring(ctx)
		if pi.dynamicAwareness.IsDynamic() {
			logger.Info("Dynamic shard management is active with automatic scaling")
		} else {
			logger.Info("Static shard management is active")
		}
	}
	return nil
}
// Stop gracefully stops the indexer.
func (pi *ParallelIndexer) Stop() error {
	var stopErr error
	pi.stopOnce.Do(func() {
		// Mark the indexer as stopped.
		if !atomic.CompareAndSwapInt32(&pi.running, 1, 0) {
			logger.Warnf("[ParallelIndexer] Stop called but indexer already stopped")
			stopErr = fmt.Errorf("indexer already stopped")
			return
		}

		// Cancel the context to stop all routines.
		pi.cancel()

		// Stop the adaptive optimizer.
		if pi.adaptiveOptimizer != nil {
			pi.adaptiveOptimizer.Stop()
		}

		// Close channels safely if they haven't been closed yet.
		if atomic.CompareAndSwapInt32(&pi.channelsClosed, 0, 1) {
			// Close the job queue to stop accepting new jobs.
			close(pi.jobQueue)
			// Wait for all workers to finish.
			pi.wg.Wait()
			// Close the result queue.
			close(pi.resultQueue)
		} else {
			// If the channels are already closed, just wait for the workers.
			pi.wg.Wait()
		}

		// Skip flushing during stop: shards may already be closed by the searcher.
		// FlushAll should be called before Stop() if needed.
		// The shard manager is not closed here either, because the shards might
		// still be in use by the searcher; they are closed when the searcher stops.
	})
	return stopErr
}
// IndexDocument indexes a single document.
func (pi *ParallelIndexer) IndexDocument(ctx context.Context, doc *Document) error {
	return pi.IndexDocuments(ctx, []*Document{doc})
}

// IndexDocuments indexes multiple documents.
func (pi *ParallelIndexer) IndexDocuments(ctx context.Context, docs []*Document) error {
	if !pi.IsHealthy() {
		return fmt.Errorf("indexer not healthy")
	}
	if len(docs) == 0 {
		return nil
	}

	// Create the job.
	job := &IndexJob{
		Documents: docs,
		Priority:  PriorityNormal,
	}

	// Submit the job and wait for completion.
	done := make(chan error, 1)
	job.Callback = func(err error) {
		done <- err
	}

	select {
	case pi.jobQueue <- job:
		select {
		case err := <-done:
			return err
		case <-ctx.Done():
			return ctx.Err()
		}
	case <-ctx.Done():
		return ctx.Err()
	case <-pi.ctx.Done():
		return fmt.Errorf("indexer stopped")
	}
}
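// Usage sketch (illustrative only): a minimal caller-side flow for the
// synchronous API, assuming Config, Document, and LogDocument values are
// provided by the rest of this package.
//
//	idx := NewParallelIndexer(DefaultIndexerConfig(), nil)
//	if err := idx.Start(context.Background()); err != nil {
//		logger.Warnf("start indexer: %v", err)
//		return
//	}
//	defer idx.Stop()
//
//	docs := []*Document{{ID: "access.log-0", Fields: &LogDocument{Raw: "..."}}}
//	if err := idx.IndexDocuments(context.Background(), docs); err != nil {
//		logger.Warnf("index documents: %v", err)
//	}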
// IndexDocumentAsync indexes a document asynchronously.
func (pi *ParallelIndexer) IndexDocumentAsync(doc *Document, callback func(error)) {
	pi.IndexDocumentsAsync([]*Document{doc}, callback)
}

// IndexDocumentsAsync indexes multiple documents asynchronously.
func (pi *ParallelIndexer) IndexDocumentsAsync(docs []*Document, callback func(error)) {
	if !pi.IsHealthy() {
		if callback != nil {
			callback(fmt.Errorf("indexer not healthy"))
		}
		return
	}
	if len(docs) == 0 {
		if callback != nil {
			callback(nil)
		}
		return
	}

	job := &IndexJob{
		Documents: docs,
		Priority:  PriorityNormal,
		Callback:  callback,
	}

	select {
	case pi.jobQueue <- job:
		// Job queued successfully.
	case <-pi.ctx.Done():
		if callback != nil {
			callback(fmt.Errorf("indexer stopped"))
		}
	default:
		// Queue is full.
		if callback != nil {
			callback(fmt.Errorf("queue is full"))
		}
	}
}
// StartBatch returns a new batch writer with an adaptive batch size.
func (pi *ParallelIndexer) StartBatch() BatchWriterInterface {
	batchSize := pi.config.BatchSize
	if pi.adaptiveOptimizer != nil {
		batchSize = pi.adaptiveOptimizer.GetOptimalBatchSize()
	}
	return NewBatchWriter(pi, batchSize)
}
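// Usage sketch (illustrative only): buffering documents through the adaptive
// batch writer of a started indexer idx. Add may auto-flush internally; Flush
// drains whatever remains.
//
//	batch := idx.StartBatch()
//	for _, doc := range docs {
//		if err := batch.Add(doc); err != nil {
//			logger.Warnf("batch add failed: %v", err)
//			break
//		}
//	}
//	if _, err := batch.Flush(); err != nil {
//		logger.Warnf("batch flush failed: %v", err)
//	}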
// GetOptimizationStats returns current optimization statistics.
func (pi *ParallelIndexer) GetOptimizationStats() AdaptiveOptimizationStats {
	if pi.adaptiveOptimizer != nil {
		return pi.adaptiveOptimizer.GetOptimizationStats()
	}
	return AdaptiveOptimizationStats{}
}

// GetPoolStats returns object pool statistics.
func (pi *ParallelIndexer) GetPoolStats() PoolStats {
	if pi.zeroAllocProcessor != nil {
		return pi.zeroAllocProcessor.GetPoolStats()
	}
	return PoolStats{}
}

// EnableOptimizations enables or disables adaptive optimizations.
func (pi *ParallelIndexer) EnableOptimizations(enabled bool) {
	pi.optimizationEnabled = enabled
	if !enabled && pi.adaptiveOptimizer != nil {
		pi.adaptiveOptimizer.Stop()
	} else if enabled && pi.adaptiveOptimizer != nil && atomic.LoadInt32(&pi.running) == 1 {
		if err := pi.adaptiveOptimizer.Start(); err != nil {
			logger.Warnf("Failed to start adaptive optimizer: %v", err)
		}
	}
}
// GetDynamicShardInfo returns information about dynamic shard management.
func (pi *ParallelIndexer) GetDynamicShardInfo() *DynamicShardInfo {
	if pi.dynamicAwareness == nil {
		return &DynamicShardInfo{
			IsEnabled:  false,
			IsActive:   false,
			ShardCount: pi.config.ShardCount,
			ShardType:  "static",
		}
	}

	isDynamic := pi.dynamicAwareness.IsDynamic()
	shardManager := pi.dynamicAwareness.GetCurrentShardManager()

	info := &DynamicShardInfo{
		IsEnabled:  true,
		IsActive:   isDynamic,
		ShardCount: pi.config.ShardCount,
		ShardType:  "static",
	}

	if isDynamic {
		info.ShardType = "dynamic"
		if enhancedManager, ok := shardManager.(*EnhancedDynamicShardManager); ok {
			info.TargetShardCount = enhancedManager.GetTargetShardCount()
			info.IsScaling = enhancedManager.IsScalingInProgress()
			info.AutoScaleEnabled = enhancedManager.IsAutoScaleEnabled()
			// Get the scaling recommendation.
			info.Recommendation = enhancedManager.GetScalingRecommendations()
			// Get shard health.
			info.ShardHealth = enhancedManager.GetShardHealth()
		}
	}

	// Get the performance analysis.
	analysis := pi.dynamicAwareness.GetPerformanceAnalysis()
	info.PerformanceAnalysis = &analysis

	return info
}

// DynamicShardInfo contains information about dynamic shard management status.
type DynamicShardInfo struct {
	IsEnabled           bool                       `json:"is_enabled"`
	IsActive            bool                       `json:"is_active"`
	ShardType           string                     `json:"shard_type"` // "static" or "dynamic"
	ShardCount          int                        `json:"shard_count"`
	TargetShardCount    int                        `json:"target_shard_count,omitempty"`
	IsScaling           bool                       `json:"is_scaling,omitempty"`
	AutoScaleEnabled    bool                       `json:"auto_scale_enabled,omitempty"`
	Recommendation      *ScalingRecommendation     `json:"recommendation,omitempty"`
	ShardHealth         map[int]*ShardHealthStatus `json:"shard_health,omitempty"`
	PerformanceAnalysis *PerformanceAnalysis       `json:"performance_analysis,omitempty"`
}
// FlushAll flushes all pending operations.
func (pi *ParallelIndexer) FlushAll() error {
	// Check if the indexer is still running.
	if atomic.LoadInt32(&pi.running) != 1 {
		return fmt.Errorf("indexer not running")
	}

	// Get all shards and flush them.
	shards := pi.shardManager.GetAllShards()
	var errs []error
	for i, shard := range shards {
		if shard == nil {
			continue
		}
		// Force a flush by indexing and immediately deleting a temporary document.
		batch := shard.NewBatch()
		// Use efficient string building instead of fmt.Sprintf.
		tempIDBuf := make([]byte, 0, 64)
		tempIDBuf = append(tempIDBuf, "_flush_temp_"...)
		tempIDBuf = utils.AppendInt(tempIDBuf, i)
		tempIDBuf = append(tempIDBuf, '_')
		tempIDBuf = utils.AppendInt(tempIDBuf, int(time.Now().UnixNano()))
		tempID := utils.BytesToStringUnsafe(tempIDBuf)

		batch.Index(tempID, map[string]interface{}{"_temp": true})
		if err := shard.Batch(batch); err != nil {
			errs = append(errs, fmt.Errorf("failed to flush shard %d: %w", i, err))
			continue
		}
		// Delete the temporary document.
		shard.Delete(tempID)
	}

	if len(errs) > 0 {
		return fmt.Errorf("flush errors: %v", errs)
	}
	return nil
}
// Optimize triggers optimization of all shards.
func (pi *ParallelIndexer) Optimize() error {
	if !atomic.CompareAndSwapInt32(&pi.optimizing, 0, 1) {
		return fmt.Errorf("optimization already in progress")
	}
	defer atomic.StoreInt32(&pi.optimizing, 0)

	startTime := time.Now()
	stats := pi.shardManager.GetShardStats()
	var errs []error
	for _, stat := range stats {
		if err := pi.shardManager.OptimizeShard(stat.ID); err != nil {
			errs = append(errs, fmt.Errorf("failed to optimize shard %d: %w", stat.ID, err))
		}
	}

	// Update optimization stats.
	pi.statsMutex.Lock()
	if pi.stats.OptimizationStats == nil {
		pi.stats.OptimizationStats = &OptimizationStats{}
	}
	pi.stats.OptimizationStats.LastRun = time.Now().Unix()
	pi.stats.OptimizationStats.Duration = time.Since(startTime)
	pi.stats.OptimizationStats.Success = len(errs) == 0
	pi.stats.LastOptimized = time.Now().Unix()
	pi.statsMutex.Unlock()

	atomic.StoreInt64(&pi.lastOptimized, time.Now().Unix())

	// Record optimization metrics before returning so failed runs are captured as well.
	pi.metrics.RecordOptimization(time.Since(startTime), len(errs) == 0)

	if len(errs) > 0 {
		return fmt.Errorf("optimization errors: %v", errs)
	}
	return nil
}
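// Maintenance sketch (illustrative only): a typical sequence for a started
// indexer idx is to flush pending writes and then trigger shard optimization;
// both calls require the indexer to be running.
//
//	if err := idx.FlushAll(); err != nil {
//		logger.Warnf("flush before optimize failed: %v", err)
//	}
//	if err := idx.Optimize(); err != nil {
//		logger.Warnf("optimize failed: %v", err)
//	}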
// IndexLogFile reads and indexes a single log file.
func (pi *ParallelIndexer) IndexLogFile(filePath string) error {
	if !pi.IsHealthy() {
		return fmt.Errorf("indexer not healthy")
	}

	file, err := os.Open(filePath)
	if err != nil {
		return fmt.Errorf("failed to open log file %s: %w", filePath, err)
	}
	defer file.Close()

	// Use a batch writer for efficient indexing.
	batch := pi.StartBatch()
	scanner := bufio.NewScanner(file)
	docCount := 0

	for scanner.Scan() {
		line := scanner.Text()
		if line == "" {
			continue
		}

		// Parse the log line into a structured document.
		logDoc, err := ParseLogLine(line)
		if err != nil {
			logger.Warnf("Skipping line due to parse error in file %s: %v", filePath, err)
			continue
		}
		logDoc.FilePath = filePath

		// Use efficient string building for the document ID.
		docIDBuf := make([]byte, 0, len(filePath)+16)
		docIDBuf = append(docIDBuf, filePath...)
		docIDBuf = append(docIDBuf, '-')
		docIDBuf = utils.AppendInt(docIDBuf, docCount)

		doc := &Document{
			ID:     utils.BytesToStringUnsafe(docIDBuf),
			Fields: logDoc,
		}
		if err := batch.Add(doc); err != nil {
			// This indicates an auto-flush occurred and failed.
			// Stop processing this file to avoid further issues.
			return fmt.Errorf("failed to add document to batch for %s (auto-flush might have failed): %w", filePath, err)
		}
		docCount++
	}

	if err := scanner.Err(); err != nil {
		return fmt.Errorf("error reading log file %s: %w", filePath, err)
	}
	if _, err := batch.Flush(); err != nil {
		return fmt.Errorf("failed to flush batch for %s: %w", filePath, err)
	}
	return nil
}
// GetStats returns current indexer statistics.
func (pi *ParallelIndexer) GetStats() *IndexStats {
	// A write lock is required here because the shared stats struct is updated below.
	pi.statsMutex.Lock()
	defer pi.statsMutex.Unlock()

	// Update shard stats.
	shardStats := pi.shardManager.GetShardStats()
	pi.stats.Shards = shardStats
	pi.stats.ShardCount = len(shardStats)

	var totalDocs uint64
	var totalSize int64
	for _, shard := range shardStats {
		totalDocs += shard.DocumentCount
		totalSize += shard.Size
	}
	pi.stats.TotalDocuments = totalDocs
	pi.stats.TotalSize = totalSize
	pi.stats.QueueSize = len(pi.jobQueue)

	// Calculate memory usage.
	var memStats runtime.MemStats
	runtime.ReadMemStats(&memStats)
	pi.stats.MemoryUsage = int64(memStats.Alloc)

	// Copy the stats to avoid race conditions on the returned value.
	statsCopy := *pi.stats
	return &statsCopy
}
// IsRunning returns whether the indexer is currently running.
func (pi *ParallelIndexer) IsRunning() bool {
	return atomic.LoadInt32(&pi.running) != 0
}

// GetShardInfo returns information about a specific shard.
func (pi *ParallelIndexer) GetShardInfo(shardID int) (*ShardInfo, error) {
	shardStats := pi.shardManager.GetShardStats()
	for _, stat := range shardStats {
		if stat.ID == shardID {
			return stat, nil
		}
	}
	return nil, fmt.Errorf("%s: %d", ErrShardNotFound, shardID)
}

// IsHealthy checks if the indexer is running and healthy.
func (pi *ParallelIndexer) IsHealthy() bool {
	if atomic.LoadInt32(&pi.running) != 1 {
		return false
	}
	// Check shard manager health.
	return pi.shardManager.HealthCheck() == nil
}

// GetConfig returns the current configuration.
func (pi *ParallelIndexer) GetConfig() *Config {
	return pi.config
}

// GetAllShards returns all managed shards.
func (pi *ParallelIndexer) GetAllShards() []bleve.Index {
	return pi.shardManager.GetAllShards()
}
// DeleteIndexByLogGroup deletes all index entries for a specific log group
// (the base path and its rotated files).
func (pi *ParallelIndexer) DeleteIndexByLogGroup(basePath string, logFileManager interface{}) error {
	if !pi.IsHealthy() {
		return fmt.Errorf("indexer not healthy")
	}

	// Get all file paths for this log group from the database.
	if logFileManager == nil {
		return fmt.Errorf("log file manager is required")
	}
	lfm, ok := logFileManager.(GroupFileProvider)
	if !ok {
		return fmt.Errorf("log file manager does not support GetFilePathsForGroup")
	}
	filesToDelete, err := lfm.GetFilePathsForGroup(basePath)
	if err != nil {
		return fmt.Errorf("failed to get file paths for log group %s: %w", basePath, err)
	}

	logger.Infof("Deleting index entries for log group %s, files: %v", basePath, filesToDelete)

	// Delete documents from all shards for these files.
	shards := pi.shardManager.GetAllShards()
	var deleteErrors []error
	for _, shard := range shards {
		// Search for documents with a matching file_path.
		for _, filePath := range filesToDelete {
			query := bleve.NewTermQuery(filePath)
			query.SetField("file_path")
			searchRequest := bleve.NewSearchRequest(query)
			searchRequest.Size = 1000 // Process in batches.
			searchRequest.Fields = []string{"file_path"}

			for {
				searchResult, err := shard.Search(searchRequest)
				if err != nil {
					deleteErrors = append(deleteErrors, fmt.Errorf("failed to search for documents in file %s: %w", filePath, err))
					break
				}
				if len(searchResult.Hits) == 0 {
					break // No more documents to delete.
				}

				// Delete documents in a batch.
				batch := shard.NewBatch()
				for _, hit := range searchResult.Hits {
					batch.Delete(hit.ID)
				}
				if err := shard.Batch(batch); err != nil {
					deleteErrors = append(deleteErrors, fmt.Errorf("failed to delete batch for file %s: %w", filePath, err))
				}

				// If we got fewer results than requested, we're done.
				if len(searchResult.Hits) < searchRequest.Size {
					break
				}
				// Continue from where we left off.
				searchRequest.From += searchRequest.Size
			}
		}
	}

	if len(deleteErrors) > 0 {
		return fmt.Errorf("encountered %d errors during deletion: %v", len(deleteErrors), deleteErrors[0])
	}
	logger.Infof("Successfully deleted index entries for log group: %s", basePath)
	return nil
}
// DestroyAllIndexes closes and deletes all index data from disk.
func (pi *ParallelIndexer) DestroyAllIndexes(parentCtx context.Context) error {
	// Stop all background routines before deleting files.
	pi.cancel()
	pi.wg.Wait()

	// Safely close channels if they haven't been closed yet.
	if atomic.CompareAndSwapInt32(&pi.channelsClosed, 0, 1) {
		close(pi.jobQueue)
		close(pi.resultQueue)
	}
	atomic.StoreInt32(&pi.running, 0) // Mark as not running.

	var destructionErr error
	if manager, ok := pi.shardManager.(*DefaultShardManager); ok {
		destructionErr = manager.Destroy()
	} else {
		destructionErr = fmt.Errorf("shard manager does not support destruction")
	}

	// Re-initialize the context and channels for a potential restart using the parent context.
	pi.ctx, pi.cancel = context.WithCancel(parentCtx)
	pi.jobQueue = make(chan *IndexJob, pi.config.MaxQueueSize)
	pi.resultQueue = make(chan *IndexResult, pi.config.WorkerCount)
	atomic.StoreInt32(&pi.channelsClosed, 0) // Reset the channels-closed flag.

	return destructionErr
}
// IndexLogGroup finds all files related to a base log path (e.g., rotated logs) and indexes them.
// It returns a map of filePath -> docCount, plus the min/max timestamps found.
func (pi *ParallelIndexer) IndexLogGroup(basePath string) (map[string]uint64, *time.Time, *time.Time, error) {
	if !pi.IsHealthy() {
		return nil, nil, nil, fmt.Errorf("indexer not healthy")
	}

	// Find all files belonging to this log group by globbing.
	globPath := basePath + "*"
	matches, err := filepath.Glob(globPath)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("failed to glob for log files with base %s: %w", basePath, err)
	}

	// filepath.Glob might not match the base file itself if it has no extension,
	// so we check for it explicitly and add it to the list.
	info, err := os.Stat(basePath)
	if err == nil && info.Mode().IsRegular() {
		matches = append(matches, basePath)
	}

	// Deduplicate the file list.
	seen := make(map[string]struct{})
	uniqueFiles := make([]string, 0)
	for _, match := range matches {
		if _, ok := seen[match]; !ok {
			// Further check that it's a regular file, not a directory; Glob can match dirs.
			info, err := os.Stat(match)
			if err == nil && info.Mode().IsRegular() {
				seen[match] = struct{}{}
				uniqueFiles = append(uniqueFiles, match)
			}
		}
	}

	if len(uniqueFiles) == 0 {
		logger.Warnf("No actual log file found for group: %s", basePath)
		return nil, nil, nil, nil
	}

	logger.Infof("Found %d file(s) for log group %s: %v", len(uniqueFiles), basePath, uniqueFiles)

	docsCountMap := make(map[string]uint64)
	var overallMinTime, overallMaxTime *time.Time

	for _, filePath := range uniqueFiles {
		docsIndexed, minTime, maxTime, err := pi.indexSingleFile(filePath)
		if err != nil {
			logger.Warnf("Failed to index file '%s' in group '%s', skipping: %v", filePath, basePath, err)
			continue // Continue with the next file.
		}
		docsCountMap[filePath] = docsIndexed

		if minTime != nil {
			if overallMinTime == nil || minTime.Before(*overallMinTime) {
				overallMinTime = minTime
			}
		}
		if maxTime != nil {
			if overallMaxTime == nil || maxTime.After(*overallMaxTime) {
				overallMaxTime = maxTime
			}
		}
	}
	return docsCountMap, overallMinTime, overallMaxTime, nil
}
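// Usage sketch (illustrative only): indexing an access-log group (the base
// file plus rotated/gzipped siblings) on a started indexer idx and reading
// back the per-file counts and observed time range. The path is a placeholder.
//
//	counts, minTime, maxTime, err := idx.IndexLogGroup("/var/log/nginx/access.log")
//	if err != nil {
//		logger.Warnf("index log group failed: %v", err)
//		return
//	}
//	for path, n := range counts {
//		logger.Infof("indexed %d documents from %s", n, path)
//	}
//	if minTime != nil && maxTime != nil {
//		logger.Infof("log group covers %s to %s", minTime, maxTime)
//	}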
// indexSingleFile contains the logic to process one physical log file.
// It returns the number of documents indexed from the file, and the min/max timestamps.
func (pi *ParallelIndexer) indexSingleFile(filePath string) (uint64, *time.Time, *time.Time, error) {
	file, err := os.Open(filePath)
	if err != nil {
		return 0, nil, nil, fmt.Errorf("failed to open log file %s: %w", filePath, err)
	}
	defer file.Close()

	var reader io.Reader = file
	// Handle gzipped files.
	if strings.HasSuffix(filePath, ".gz") {
		gz, err := gzip.NewReader(file)
		if err != nil {
			return 0, nil, nil, fmt.Errorf("failed to create gzip reader for %s: %w", filePath, err)
		}
		defer gz.Close()
		reader = gz
	}

	logger.Infof("Starting to process file: %s", filePath)

	batch := pi.StartBatch()
	scanner := bufio.NewScanner(reader)
	docCount := 0
	var minTime, maxTime *time.Time

	for scanner.Scan() {
		line := scanner.Text()
		if line == "" {
			continue
		}

		logDoc, err := ParseLogLine(line)
		if err != nil {
			logger.Warnf("Skipping line due to parse error in file %s: %v", filePath, err)
			continue
		}
		logDoc.FilePath = filePath

		// Track min/max timestamps.
		ts := time.Unix(logDoc.Timestamp, 0)
		if minTime == nil || ts.Before(*minTime) {
			minTime = &ts
		}
		if maxTime == nil || ts.After(*maxTime) {
			maxTime = &ts
		}

		// Use efficient string building for the document ID.
		docIDBuf := make([]byte, 0, len(filePath)+16)
		docIDBuf = append(docIDBuf, filePath...)
		docIDBuf = append(docIDBuf, '-')
		docIDBuf = utils.AppendInt(docIDBuf, docCount)

		doc := &Document{
			ID:     utils.BytesToStringUnsafe(docIDBuf),
			Fields: logDoc,
		}
		if err := batch.Add(doc); err != nil {
			// This indicates an auto-flush occurred and failed.
			// Stop processing this file to avoid further issues.
			return uint64(docCount), minTime, maxTime, fmt.Errorf("failed to add document to batch for %s (auto-flush might have failed): %w", filePath, err)
		}
		docCount++
	}

	if err := scanner.Err(); err != nil {
		return uint64(docCount), minTime, maxTime, fmt.Errorf("error reading log file %s: %w", filePath, err)
	}

	logger.Infof("Finished processing file: %s. Total lines processed: %d", filePath, docCount)

	if docCount > 0 {
		if _, err := batch.Flush(); err != nil {
			return uint64(docCount), minTime, maxTime, fmt.Errorf("failed to flush batch for %s: %w", filePath, err)
		}
	}
	return uint64(docCount), minTime, maxTime, nil
}
// UpdateConfig updates the indexer configuration.
// Only a subset of parameters can be changed while the indexer is running.
func (pi *ParallelIndexer) UpdateConfig(config *Config) error {
	pi.config.BatchSize = config.BatchSize
	pi.config.FlushInterval = config.FlushInterval
	pi.config.EnableMetrics = config.EnableMetrics
	return nil
}
// Worker implementation

func (w *indexWorker) run() {
	defer w.indexer.wg.Done()
	w.updateStatus(WorkerStatusIdle)

	for {
		select {
		case job, ok := <-w.indexer.jobQueue:
			if !ok {
				return // Channel closed; the worker should exit.
			}
			w.updateStatus(WorkerStatusBusy)
			result := w.processJob(job)

			// Send the result.
			select {
			case w.indexer.resultQueue <- result:
			case <-w.indexer.ctx.Done():
				return
			}

			// Execute the callback if provided.
			if job.Callback != nil {
				var err error
				if result.Failed > 0 {
					err = fmt.Errorf("indexing failed for %d documents", result.Failed)
				}
				job.Callback(err)
			}
			w.updateStatus(WorkerStatusIdle)
		case <-w.indexer.ctx.Done():
			return
		}
	}
}
func (w *indexWorker) processJob(job *IndexJob) *IndexResult {
	startTime := time.Now()
	result := &IndexResult{
		Processed: len(job.Documents),
	}

	// Group documents by shard.
	shardDocs := make(map[int][]*Document)
	for _, doc := range job.Documents {
		if doc.ID == "" {
			result.Failed++
			continue
		}
		_, shardID, err := w.indexer.shardManager.GetShard(doc.ID)
		if err != nil {
			result.Failed++
			continue
		}
		shardDocs[shardID] = append(shardDocs[shardID], doc)
	}

	// Index documents per shard.
	for shardID, docs := range shardDocs {
		if err := w.indexShardDocuments(shardID, docs); err != nil {
			result.Failed += len(docs)
		} else {
			result.Succeeded += len(docs)
		}
	}

	result.Duration = time.Since(startTime)
	if result.Processed > 0 {
		result.ErrorRate = float64(result.Failed) / float64(result.Processed)
		result.Throughput = float64(result.Processed) / result.Duration.Seconds()
	}

	// Update worker stats.
	w.statsMutex.Lock()
	w.stats.ProcessedJobs++
	w.stats.ProcessedDocs += int64(result.Processed)
	w.stats.ErrorCount += int64(result.Failed)
	w.stats.LastActive = time.Now().Unix()
	// Update average latency (simple moving average).
	if w.stats.AverageLatency == 0 {
		w.stats.AverageLatency = result.Duration
	} else {
		w.stats.AverageLatency = (w.stats.AverageLatency + result.Duration) / 2
	}
	w.statsMutex.Unlock()

	return result
}
func (w *indexWorker) indexShardDocuments(shardID int, docs []*Document) error {
	shard, err := w.indexer.shardManager.GetShardByID(shardID)
	if err != nil {
		return err
	}

	batch := shard.NewBatch()
	for _, doc := range docs {
		// Convert LogDocument to a map for Bleve indexing.
		docMap := w.logDocumentToMap(doc.Fields)
		batch.Index(doc.ID, docMap)
	}

	if err := shard.Batch(batch); err != nil {
		return fmt.Errorf("failed to index batch for shard %d: %w", shardID, err)
	}
	return nil
}

// logDocumentToMap converts a LogDocument to a map[string]interface{} for Bleve.
func (w *indexWorker) logDocumentToMap(doc *LogDocument) map[string]interface{} {
	docMap := map[string]interface{}{
		"timestamp":  doc.Timestamp,
		"ip":         doc.IP,
		"method":     doc.Method,
		"path":       doc.Path,
		"path_exact": doc.PathExact,
		"status":     doc.Status,
		"bytes_sent": doc.BytesSent,
		"file_path":  doc.FilePath,
		"raw":        doc.Raw,
	}

	// Add optional fields only if they have values.
	if doc.RegionCode != "" {
		docMap["region_code"] = doc.RegionCode
	}
	if doc.Province != "" {
		docMap["province"] = doc.Province
	}
	if doc.City != "" {
		docMap["city"] = doc.City
	}
	if doc.Protocol != "" {
		docMap["protocol"] = doc.Protocol
	}
	if doc.Referer != "" {
		docMap["referer"] = doc.Referer
	}
	if doc.UserAgent != "" {
		docMap["user_agent"] = doc.UserAgent
	}
	if doc.Browser != "" {
		docMap["browser"] = doc.Browser
	}
	if doc.BrowserVer != "" {
		docMap["browser_version"] = doc.BrowserVer
	}
	if doc.OS != "" {
		docMap["os"] = doc.OS
	}
	if doc.OSVersion != "" {
		docMap["os_version"] = doc.OSVersion
	}
	if doc.DeviceType != "" {
		docMap["device_type"] = doc.DeviceType
	}
	if doc.RequestTime > 0 {
		docMap["request_time"] = doc.RequestTime
	}
	if doc.UpstreamTime != nil {
		docMap["upstream_time"] = *doc.UpstreamTime
	}
	return docMap
}

func (w *indexWorker) updateStatus(status string) {
	w.statsMutex.Lock()
	w.stats.Status = status
	w.statsMutex.Unlock()
}
// Background routines

func (pi *ParallelIndexer) processResults() {
	defer pi.wg.Done()
	for {
		select {
		case result := <-pi.resultQueue:
			if result != nil {
				pi.metrics.RecordIndexOperation(
					result.Processed,
					result.Duration,
					result.Failed == 0,
				)
			}
		case <-pi.ctx.Done():
			return
		}
	}
}
func (pi *ParallelIndexer) optimizationRoutine() {
	defer pi.wg.Done()
	ticker := time.NewTicker(pi.config.OptimizeInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if atomic.LoadInt32(&pi.optimizing) == 0 {
				// Run in the background to avoid blocking the ticker.
				go func() {
					if err := pi.Optimize(); err != nil {
						logger.Warnf("Background optimization failed: %v", err)
					}
				}()
			}
		case <-pi.ctx.Done():
			return
		}
	}
}
func (pi *ParallelIndexer) metricsRoutine() {
	defer pi.wg.Done()
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			pi.updateMetrics()
		case <-pi.ctx.Done():
			return
		}
	}
}

func (pi *ParallelIndexer) updateMetrics() {
	pi.statsMutex.Lock()
	defer pi.statsMutex.Unlock()

	// Update the indexing rate based on recent activity.
	metrics := pi.metrics.GetMetrics()
	pi.stats.IndexingRate = metrics.IndexingRate
}
// IndexLogGroupWithProgress indexes a log group with progress tracking.
func (pi *ParallelIndexer) IndexLogGroupWithProgress(basePath string, progressConfig *ProgressConfig) (map[string]uint64, *time.Time, *time.Time, error) {
	if !pi.IsHealthy() {
		return nil, nil, nil, fmt.Errorf("indexer not healthy")
	}

	// Create a progress tracker if a config is provided.
	var progressTracker *ProgressTracker
	if progressConfig != nil {
		progressTracker = NewProgressTracker(basePath, progressConfig)
	}

	// Find all files belonging to this log group by globbing.
	globPath := basePath + "*"
	matches, err := filepath.Glob(globPath)
	if err != nil {
		if progressTracker != nil {
			progressTracker.Cancel(fmt.Sprintf("glob failed: %v", err))
		}
		return nil, nil, nil, fmt.Errorf("failed to glob for log files with base %s: %w", basePath, err)
	}

	// filepath.Glob might not match the base file itself if it has no extension,
	// so we check for it explicitly and add it to the list.
	info, err := os.Stat(basePath)
	if err == nil && info.Mode().IsRegular() {
		matches = append(matches, basePath)
	}

	// Deduplicate the file list.
	seen := make(map[string]struct{})
	uniqueFiles := make([]string, 0)
	for _, match := range matches {
		if _, ok := seen[match]; !ok {
			// Further check that it's a regular file, not a directory; Glob can match dirs.
			info, err := os.Stat(match)
			if err == nil && info.Mode().IsRegular() {
				seen[match] = struct{}{}
				uniqueFiles = append(uniqueFiles, match)
			}
		}
	}

	if len(uniqueFiles) == 0 {
		logger.Warnf("No actual log file found for group: %s", basePath)
		if progressTracker != nil {
			progressTracker.Cancel("no files found")
		}
		return nil, nil, nil, nil
	}

	logger.Infof("Found %d file(s) for log group %s: %v", len(uniqueFiles), basePath, uniqueFiles)

	// Set up progress tracking for all files.
	if progressTracker != nil {
		for _, filePath := range uniqueFiles {
			isCompressed := IsCompressedFile(filePath)
			progressTracker.AddFile(filePath, isCompressed)
			// Get the file size and estimate the line count.
			if stat, err := os.Stat(filePath); err == nil {
				progressTracker.SetFileSize(filePath, stat.Size())
				// Estimate lines for progress calculation.
				if estimatedLines, err := EstimateFileLines(context.Background(), filePath, stat.Size(), isCompressed); err == nil {
					progressTracker.SetFileEstimate(filePath, estimatedLines)
				}
			}
		}
	}

	docsCountMap := make(map[string]uint64)
	var overallMinTime, overallMaxTime *time.Time

	// Process each file with progress tracking.
	for _, filePath := range uniqueFiles {
		if progressTracker != nil {
			progressTracker.StartFile(filePath)
		}

		docsIndexed, minTime, maxTime, err := pi.indexSingleFileWithProgress(filePath, progressTracker)
		if err != nil {
			logger.Warnf("Failed to index file '%s' in group '%s', skipping: %v", filePath, basePath, err)
			if progressTracker != nil {
				progressTracker.FailFile(filePath, err.Error())
			}
			continue // Continue with the next file.
		}
		docsCountMap[filePath] = docsIndexed

		if progressTracker != nil {
			progressTracker.CompleteFile(filePath, int64(docsIndexed))
		}

		if minTime != nil {
			if overallMinTime == nil || minTime.Before(*overallMinTime) {
				overallMinTime = minTime
			}
		}
		if maxTime != nil {
			if overallMaxTime == nil || maxTime.After(*overallMaxTime) {
				overallMaxTime = maxTime
			}
		}
	}
	return docsCountMap, overallMinTime, overallMaxTime, nil
}
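// Usage sketch (illustrative only): same as IndexLogGroup but with progress
// tracking on a started indexer idx. ProgressConfig fields are defined
// elsewhere in this package and are left unset here; passing nil disables
// tracking entirely.
//
//	progressCfg := &ProgressConfig{} // populate as needed
//	counts, _, _, err := idx.IndexLogGroupWithProgress("/var/log/nginx/access.log", progressCfg)
//	if err != nil {
//		logger.Warnf("index with progress failed: %v", err)
//	}
//	logger.Infof("indexed %d file(s)", len(counts))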
// indexSingleFileWithProgress indexes a single file with progress updates.
func (pi *ParallelIndexer) indexSingleFileWithProgress(filePath string, progressTracker *ProgressTracker) (uint64, *time.Time, *time.Time, error) {
	// If there is no progress tracker, just call the original method.
	if progressTracker == nil {
		return pi.indexSingleFile(filePath)
	}

	// Call the original indexing method to do the actual work.
	docsIndexed, minTime, maxTime, err := pi.indexSingleFile(filePath)
	if err != nil {
		return 0, nil, nil, err
	}

	// Do one final progress update when done; no artificial delays.
	if docsIndexed > 0 {
		if strings.HasSuffix(filePath, ".gz") {
			progressTracker.UpdateFileProgress(filePath, int64(docsIndexed))
		} else {
			// Estimate the byte position based on an average line size of ~150 bytes.
			estimatedPos := int64(docsIndexed * 150)
			progressTracker.UpdateFileProgress(filePath, int64(docsIndexed), estimatedPos)
		}
	}

	// Return the actual timestamps from the original method.
	return docsIndexed, minTime, maxTime, nil
}