parallel_indexer.go

package indexer

import (
	"bufio"
	"compress/gzip"
	"context"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/0xJacky/Nginx-UI/internal/nginx_log/utils"
	"github.com/blevesearch/bleve/v2"
	"github.com/uozi-tech/cosy/logger"
)
// ParallelIndexer provides high-performance parallel indexing with sharding
type ParallelIndexer struct {
	config       *Config
	shardManager ShardManager
	metrics      MetricsCollector

	// Worker management
	workers     []*indexWorker
	jobQueue    chan *IndexJob
	resultQueue chan *IndexResult

	// State management
	ctx     context.Context
	cancel  context.CancelFunc
	wg      sync.WaitGroup
	running int32

	// Cleanup control
	stopOnce       sync.Once
	channelsClosed int32

	// Statistics
	stats      *IndexStats
	statsMutex sync.RWMutex

	// Optimization
	lastOptimized       int64
	optimizing          int32
	adaptiveOptimizer   *AdaptiveOptimizer
	zeroAllocProcessor  *ZeroAllocBatchProcessor
	optimizationEnabled bool

	// Dynamic shard awareness
	dynamicAwareness *DynamicShardAwareness
}

// indexWorker represents a single indexing worker
type indexWorker struct {
	id         int
	indexer    *ParallelIndexer
	stats      *WorkerStats
	statsMutex sync.RWMutex
}
// NewParallelIndexer creates a new parallel indexer with dynamic shard awareness
func NewParallelIndexer(config *Config, shardManager ShardManager) *ParallelIndexer {
	if config == nil {
		config = DefaultIndexerConfig()
	}

	ctx, cancel := context.WithCancel(context.Background())

	// Initialize dynamic shard awareness
	dynamicAwareness := NewDynamicShardAwareness(config)

	// If no shard manager is provided, use dynamic awareness to detect the optimal type
	var actualShardManager ShardManager
	if shardManager == nil {
		detected, err := dynamicAwareness.DetectAndSetupShardManager()
		if err != nil {
			logger.Warnf("Failed to setup dynamic shard manager, using default: %v", err)
			detected = NewDefaultShardManager(config)
			detected.(*DefaultShardManager).Initialize()
		}
		// Type assertion to the ShardManager interface
		if sm, ok := detected.(ShardManager); ok {
			actualShardManager = sm
		} else {
			// Fallback to default
			actualShardManager = NewDefaultShardManager(config)
			actualShardManager.(*DefaultShardManager).Initialize()
		}
	} else {
		actualShardManager = shardManager
	}

	ao := NewAdaptiveOptimizer(config)

	indexer := &ParallelIndexer{
		config:       config,
		shardManager: actualShardManager,
		metrics:      NewDefaultMetricsCollector(),
		jobQueue:     make(chan *IndexJob, config.MaxQueueSize),
		resultQueue:  make(chan *IndexResult, config.WorkerCount),
		ctx:          ctx,
		cancel:       cancel,
		stats: &IndexStats{
			WorkerStats: make([]*WorkerStats, config.WorkerCount),
		},
		adaptiveOptimizer:   ao,
		zeroAllocProcessor:  NewZeroAllocBatchProcessor(config),
		optimizationEnabled: true, // Enable optimizations by default
		dynamicAwareness:    dynamicAwareness,
	}

	// Set up the activity poller for the adaptive optimizer
	if indexer.adaptiveOptimizer != nil {
		indexer.adaptiveOptimizer.SetActivityPoller(indexer)
	}

	// Initialize workers
	indexer.workers = make([]*indexWorker, config.WorkerCount)
	for i := 0; i < config.WorkerCount; i++ {
		indexer.workers[i] = &indexWorker{
			id:      i,
			indexer: indexer,
			stats: &WorkerStats{
				ID:     i,
				Status: WorkerStatusIdle,
			},
		}
		indexer.stats.WorkerStats[i] = indexer.workers[i].stats
	}

	return indexer
}
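
// Typical construction and lifecycle (illustrative sketch only; error handling is
// abbreviated, and passing a nil shard manager exercises the dynamic detection
// path shown above):
//
//	idx := NewParallelIndexer(DefaultIndexerConfig(), nil)
//	if err := idx.Start(context.Background()); err != nil {
//		logger.Errorf("failed to start indexer: %v", err)
//		return
//	}
//	defer idx.Stop()
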
// Start begins the indexer operation
func (pi *ParallelIndexer) Start(ctx context.Context) error {
	if !atomic.CompareAndSwapInt32(&pi.running, 0, 1) {
		return fmt.Errorf("indexer already started")
	}

	// Initialize shard manager
	if err := pi.shardManager.Initialize(); err != nil {
		atomic.StoreInt32(&pi.running, 0)
		return fmt.Errorf("failed to initialize shard manager: %w", err)
	}

	// Start workers
	for _, worker := range pi.workers {
		pi.wg.Add(1)
		go worker.run()
	}

	// Start result processor
	pi.wg.Add(1)
	go pi.processResults()

	// Start optimization routine if enabled
	if pi.config.OptimizeInterval > 0 {
		pi.wg.Add(1)
		go pi.optimizationRoutine()
	}

	// Start metrics collection if enabled
	if pi.config.EnableMetrics {
		pi.wg.Add(1)
		go pi.metricsRoutine()
	}

	// Start adaptive optimizer if enabled
	if pi.optimizationEnabled && pi.adaptiveOptimizer != nil {
		// Set worker count change callback
		logger.Debugf("Setting up adaptive optimizer callback for worker count changes")
		pi.adaptiveOptimizer.SetWorkerCountChangeCallback(pi.handleWorkerCountChange)
		if err := pi.adaptiveOptimizer.Start(); err != nil {
			logger.Warnf("Failed to start adaptive optimizer: %v", err)
		} else {
			logger.Debugf("Adaptive optimizer started successfully")
		}
	}

	// Start dynamic shard awareness monitoring if enabled
	if pi.dynamicAwareness != nil {
		pi.dynamicAwareness.StartMonitoring(ctx)
		if pi.dynamicAwareness.IsDynamic() {
			logger.Info("Dynamic shard management is active with automatic scaling")
		} else {
			logger.Info("Static shard management is active")
		}
	}

	return nil
}
// handleWorkerCountChange handles dynamic worker count adjustments from the adaptive optimizer
func (pi *ParallelIndexer) handleWorkerCountChange(oldCount, newCount int) {
	logger.Infof("Handling worker count change from %d to %d", oldCount, newCount)

	// Check if indexer is running
	if atomic.LoadInt32(&pi.running) != 1 {
		logger.Warn("Cannot adjust worker count: indexer not running")
		return
	}

	// Prevent concurrent worker adjustments
	pi.statsMutex.Lock()
	defer pi.statsMutex.Unlock()

	currentWorkerCount := len(pi.workers)
	if currentWorkerCount == newCount {
		return // Already at desired count
	}

	if newCount > currentWorkerCount {
		// Add more workers
		pi.addWorkers(newCount - currentWorkerCount)
	} else {
		// Remove workers
		pi.removeWorkers(currentWorkerCount - newCount)
	}

	// Update config to reflect the change
	pi.config.WorkerCount = newCount
	logger.Infof("Successfully adjusted worker count to %d", newCount)
}
// addWorkers adds new workers to the pool
func (pi *ParallelIndexer) addWorkers(count int) {
	for i := 0; i < count; i++ {
		workerID := len(pi.workers)
		worker := &indexWorker{
			id:      workerID,
			indexer: pi,
			stats: &WorkerStats{
				ID:     workerID,
				Status: WorkerStatusIdle,
			},
		}
		pi.workers = append(pi.workers, worker)
		pi.stats.WorkerStats = append(pi.stats.WorkerStats, worker.stats)

		// Start the new worker
		pi.wg.Add(1)
		go worker.run()
		logger.Debugf("Added worker %d", workerID)
	}
}
// removeWorkers gracefully removes workers from the pool
func (pi *ParallelIndexer) removeWorkers(count int) {
	if count >= len(pi.workers) {
		logger.Warn("Cannot remove all workers, keeping at least one")
		count = len(pi.workers) - 1
	}

	// Remove workers from the end of the slice
	workersToRemove := pi.workers[len(pi.workers)-count:]
	pi.workers = pi.workers[:len(pi.workers)-count]
	pi.stats.WorkerStats = pi.stats.WorkerStats[:len(pi.stats.WorkerStats)-count]

	// Note: In a full implementation, you would need to:
	// 1. Signal workers to stop gracefully after finishing current jobs
	// 2. Wait for them to complete
	// 3. Clean up their resources
	// For now, we just remove them from tracking
	for _, worker := range workersToRemove {
		logger.Debugf("Removed worker %d", worker.id)
	}
}
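
// The graceful-shutdown step noted above could be implemented with a per-worker
// stop channel. This is only a sketch: the `stop` field is hypothetical and does
// not exist on indexWorker as defined in this file.
//
//	// type indexWorker struct { ...; stop chan struct{} }
//	for _, worker := range workersToRemove {
//		close(worker.stop) // worker.run() would also select on <-w.stop and return
//	}
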
// Stop gracefully stops the indexer
func (pi *ParallelIndexer) Stop() error {
	var stopErr error
	pi.stopOnce.Do(func() {
		// Set running to 0
		if !atomic.CompareAndSwapInt32(&pi.running, 1, 0) {
			logger.Warnf("[ParallelIndexer] Stop called but indexer already stopped")
			stopErr = fmt.Errorf("indexer already stopped")
			return
		}

		// Cancel context to stop all routines
		pi.cancel()

		// Stop adaptive optimizer
		if pi.adaptiveOptimizer != nil {
			pi.adaptiveOptimizer.Stop()
		}

		// Close channels safely if they haven't been closed yet
		if atomic.CompareAndSwapInt32(&pi.channelsClosed, 0, 1) {
			// Close job queue to stop accepting new jobs
			close(pi.jobQueue)
			// Wait for all workers to finish
			pi.wg.Wait()
			// Close result queue
			close(pi.resultQueue)
		} else {
			// If channels are already closed, just wait for workers
			pi.wg.Wait()
		}

		// Skip flush during stop - shards may already be closed by the searcher.
		// FlushAll should be called before Stop() if needed.

		// Close the shard manager - this will close all shards and stop Bleve worker goroutines.
		// This is critical to prevent goroutine leaks from Bleve's internal workers.
		if pi.shardManager != nil {
			if err := pi.shardManager.Close(); err != nil {
				logger.Errorf("Failed to close shard manager: %v", err)
				stopErr = err
			}
		}
	})
	return stopErr
}
// IndexDocument indexes a single document
func (pi *ParallelIndexer) IndexDocument(ctx context.Context, doc *Document) error {
	return pi.IndexDocuments(ctx, []*Document{doc})
}

// IndexDocuments indexes multiple documents
func (pi *ParallelIndexer) IndexDocuments(ctx context.Context, docs []*Document) error {
	if !pi.IsHealthy() {
		return fmt.Errorf("indexer not healthy")
	}
	if len(docs) == 0 {
		return nil
	}

	// Create job
	job := &IndexJob{
		Documents: docs,
		Priority:  PriorityNormal,
	}

	// Submit job and wait for completion
	done := make(chan error, 1)
	job.Callback = func(err error) {
		done <- err
	}

	select {
	case pi.jobQueue <- job:
		select {
		case err := <-done:
			return err
		case <-ctx.Done():
			return ctx.Err()
		}
	case <-ctx.Done():
		return ctx.Err()
	case <-pi.ctx.Done():
		return fmt.Errorf("indexer stopped")
	}
}
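
// Synchronous indexing with a deadline (illustrative sketch; `docs` and the
// timeout value are placeholders chosen for the example):
//
//	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
//	defer cancel()
//	if err := idx.IndexDocuments(ctx, docs); err != nil {
//		logger.Warnf("bulk indexing failed: %v", err)
//	}
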
// IndexDocumentAsync indexes a document asynchronously
func (pi *ParallelIndexer) IndexDocumentAsync(doc *Document, callback func(error)) {
	pi.IndexDocumentsAsync([]*Document{doc}, callback)
}

// IndexDocumentsAsync indexes multiple documents asynchronously
func (pi *ParallelIndexer) IndexDocumentsAsync(docs []*Document, callback func(error)) {
	if !pi.IsHealthy() {
		if callback != nil {
			callback(fmt.Errorf("indexer not healthy"))
		}
		return
	}
	if len(docs) == 0 {
		if callback != nil {
			callback(nil)
		}
		return
	}

	job := &IndexJob{
		Documents: docs,
		Priority:  PriorityNormal,
		Callback:  callback,
	}

	select {
	case pi.jobQueue <- job:
		// Job queued successfully
	case <-pi.ctx.Done():
		if callback != nil {
			callback(fmt.Errorf("indexer stopped"))
		}
	default:
		// Queue is full
		if callback != nil {
			callback(fmt.Errorf("queue is full"))
		}
	}
}
// StartBatch returns a new batch writer with adaptive batch size
func (pi *ParallelIndexer) StartBatch() BatchWriterInterface {
	batchSize := pi.config.BatchSize
	if pi.adaptiveOptimizer != nil {
		batchSize = pi.adaptiveOptimizer.GetOptimalBatchSize()
	}
	return NewBatchWriter(pi, batchSize)
}
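
// Batch writer usage (illustrative sketch; Add may auto-flush once the adaptive
// batch size is reached, so its error must be checked, as in IndexLogFile below):
//
//	batch := idx.StartBatch()
//	for _, doc := range docs {
//		if err := batch.Add(doc); err != nil {
//			return err
//		}
//	}
//	_, err := batch.Flush()
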
// GetOptimizationStats returns current optimization statistics
func (pi *ParallelIndexer) GetOptimizationStats() AdaptiveOptimizationStats {
	if pi.adaptiveOptimizer != nil {
		return pi.adaptiveOptimizer.GetOptimizationStats()
	}
	return AdaptiveOptimizationStats{}
}

// GetPoolStats returns object pool statistics
func (pi *ParallelIndexer) GetPoolStats() PoolStats {
	if pi.zeroAllocProcessor != nil {
		return pi.zeroAllocProcessor.GetPoolStats()
	}
	return PoolStats{}
}

// EnableOptimizations enables or disables adaptive optimizations
func (pi *ParallelIndexer) EnableOptimizations(enabled bool) {
	pi.optimizationEnabled = enabled
	if !enabled && pi.adaptiveOptimizer != nil {
		pi.adaptiveOptimizer.Stop()
	} else if enabled && pi.adaptiveOptimizer != nil && atomic.LoadInt32(&pi.running) == 1 {
		pi.adaptiveOptimizer.Start()
	}
}

// GetDynamicShardInfo returns information about dynamic shard management
func (pi *ParallelIndexer) GetDynamicShardInfo() *DynamicShardInfo {
	if pi.dynamicAwareness == nil {
		return &DynamicShardInfo{
			IsEnabled:  false,
			IsActive:   false,
			ShardCount: pi.config.ShardCount,
			ShardType:  "static",
		}
	}

	isDynamic := pi.dynamicAwareness.IsDynamic()
	shardManager := pi.dynamicAwareness.GetCurrentShardManager()

	info := &DynamicShardInfo{
		IsEnabled:  true,
		IsActive:   isDynamic,
		ShardCount: pi.config.ShardCount,
		ShardType:  "static",
	}

	if isDynamic {
		info.ShardType = "dynamic"
		if enhancedManager, ok := shardManager.(*EnhancedDynamicShardManager); ok {
			info.TargetShardCount = enhancedManager.GetTargetShardCount()
			info.IsScaling = enhancedManager.IsScalingInProgress()
			info.AutoScaleEnabled = enhancedManager.IsAutoScaleEnabled()

			// Get scaling recommendation
			recommendation := enhancedManager.GetScalingRecommendations()
			info.Recommendation = recommendation

			// Get shard health
			info.ShardHealth = enhancedManager.GetShardHealth()
		}
	}

	// Get performance analysis
	analysis := pi.dynamicAwareness.GetPerformanceAnalysis()
	info.PerformanceAnalysis = &analysis

	return info
}

// DynamicShardInfo contains information about dynamic shard management status
type DynamicShardInfo struct {
	IsEnabled           bool                       `json:"is_enabled"`
	IsActive            bool                       `json:"is_active"`
	ShardType           string                     `json:"shard_type"` // "static" or "dynamic"
	ShardCount          int                        `json:"shard_count"`
	TargetShardCount    int                        `json:"target_shard_count,omitempty"`
	IsScaling           bool                       `json:"is_scaling,omitempty"`
	AutoScaleEnabled    bool                       `json:"auto_scale_enabled,omitempty"`
	Recommendation      *ScalingRecommendation     `json:"recommendation,omitempty"`
	ShardHealth         map[int]*ShardHealthStatus `json:"shard_health,omitempty"`
	PerformanceAnalysis *PerformanceAnalysis       `json:"performance_analysis,omitempty"`
}
// FlushAll flushes all pending operations
func (pi *ParallelIndexer) FlushAll() error {
	// Check if indexer is still running
	if atomic.LoadInt32(&pi.running) != 1 {
		return fmt.Errorf("indexer not running")
	}

	// Get all shards and flush them
	shards := pi.shardManager.GetAllShards()
	var errs []error

	for i, shard := range shards {
		if shard == nil {
			continue
		}

		// Force a flush by creating and immediately deleting a temporary document
		batch := shard.NewBatch()

		// Use efficient string building instead of fmt.Sprintf
		tempIDBuf := make([]byte, 0, 64)
		tempIDBuf = append(tempIDBuf, "_flush_temp_"...)
		tempIDBuf = utils.AppendInt(tempIDBuf, i)
		tempIDBuf = append(tempIDBuf, '_')
		tempIDBuf = utils.AppendInt(tempIDBuf, int(time.Now().UnixNano()))
		tempID := utils.BytesToStringUnsafe(tempIDBuf)

		batch.Index(tempID, map[string]interface{}{"_temp": true})
		if err := shard.Batch(batch); err != nil {
			errs = append(errs, fmt.Errorf("failed to flush shard %d: %w", i, err))
			continue
		}

		// Delete the temporary document
		shard.Delete(tempID)
	}

	if len(errs) > 0 {
		return fmt.Errorf("flush errors: %v", errs)
	}
	return nil
}
// Optimize triggers optimization of all shards
func (pi *ParallelIndexer) Optimize() error {
	if !atomic.CompareAndSwapInt32(&pi.optimizing, 0, 1) {
		return fmt.Errorf("optimization already in progress")
	}
	defer atomic.StoreInt32(&pi.optimizing, 0)

	startTime := time.Now()
	stats := pi.shardManager.GetShardStats()

	var errs []error
	for _, stat := range stats {
		if err := pi.shardManager.OptimizeShard(stat.ID); err != nil {
			errs = append(errs, fmt.Errorf("failed to optimize shard %d: %w", stat.ID, err))
		}
	}

	// Update optimization stats
	pi.statsMutex.Lock()
	if pi.stats.OptimizationStats == nil {
		pi.stats.OptimizationStats = &OptimizationStats{}
	}
	pi.stats.OptimizationStats.LastRun = time.Now().Unix()
	pi.stats.OptimizationStats.Duration = time.Since(startTime)
	pi.stats.OptimizationStats.Success = len(errs) == 0
	pi.stats.LastOptimized = time.Now().Unix()
	pi.statsMutex.Unlock()

	atomic.StoreInt64(&pi.lastOptimized, time.Now().Unix())

	// Record optimization metrics for both successful and failed runs
	pi.metrics.RecordOptimization(time.Since(startTime), len(errs) == 0)

	if len(errs) > 0 {
		return fmt.Errorf("optimization errors: %v", errs)
	}
	return nil
}
// IndexLogFile reads and indexes a single log file
func (pi *ParallelIndexer) IndexLogFile(filePath string) error {
	if !pi.IsHealthy() {
		return fmt.Errorf("indexer not healthy")
	}

	file, err := os.Open(filePath)
	if err != nil {
		return fmt.Errorf("failed to open log file %s: %w", filePath, err)
	}
	defer file.Close()

	// Use a batch writer for efficient indexing
	batch := pi.StartBatch()
	scanner := bufio.NewScanner(file)
	docCount := 0

	for scanner.Scan() {
		line := scanner.Text()
		if line == "" {
			continue
		}

		// In a real implementation, parse the log line into a structured format.
		// For now, we create a simple document.
		logDoc, err := ParseLogLine(line) // Assuming a parser function exists
		if err != nil {
			logger.Warnf("Skipping line due to parse error in file %s: %v", filePath, err)
			continue
		}
		logDoc.FilePath = filePath

		// Use efficient string building for document ID
		docIDBuf := make([]byte, 0, len(filePath)+16)
		docIDBuf = append(docIDBuf, filePath...)
		docIDBuf = append(docIDBuf, '-')
		docIDBuf = utils.AppendInt(docIDBuf, docCount)

		doc := &Document{
			ID:     utils.BytesToStringUnsafe(docIDBuf),
			Fields: logDoc,
		}

		if err := batch.Add(doc); err != nil {
			// This indicates an auto-flush occurred and failed.
			// Log the error and stop processing this file to avoid further issues.
			return fmt.Errorf("failed to add document to batch for %s (auto-flush might have failed): %w", filePath, err)
		}
		docCount++
	}

	if err := scanner.Err(); err != nil {
		return fmt.Errorf("error reading log file %s: %w", filePath, err)
	}

	if _, err := batch.Flush(); err != nil {
		return fmt.Errorf("failed to flush batch for %s: %w", filePath, err)
	}
	return nil
}
// GetStats returns current indexer statistics
func (pi *ParallelIndexer) GetStats() *IndexStats {
	// Take the write lock because the shared stats struct is mutated below
	pi.statsMutex.Lock()
	defer pi.statsMutex.Unlock()

	// Update shard stats
	shardStats := pi.shardManager.GetShardStats()
	pi.stats.Shards = shardStats
	pi.stats.ShardCount = len(shardStats)

	var totalDocs uint64
	var totalSize int64
	for _, shard := range shardStats {
		totalDocs += shard.DocumentCount
		totalSize += shard.Size
	}
	pi.stats.TotalDocuments = totalDocs
	pi.stats.TotalSize = totalSize
	pi.stats.QueueSize = len(pi.jobQueue)

	// Calculate memory usage
	var memStats runtime.MemStats
	runtime.ReadMemStats(&memStats)
	pi.stats.MemoryUsage = int64(memStats.Alloc)

	// Copy stats to avoid race conditions
	statsCopy := *pi.stats
	return &statsCopy
}
// IsRunning returns whether the indexer is currently running
func (pi *ParallelIndexer) IsRunning() bool {
	return atomic.LoadInt32(&pi.running) != 0
}

// IsBusy checks if the indexer has pending jobs or any active workers.
func (pi *ParallelIndexer) IsBusy() bool {
	if len(pi.jobQueue) > 0 {
		return true
	}

	// This RLock protects the pi.workers slice from changing during iteration (e.g. scaling)
	pi.statsMutex.RLock()
	defer pi.statsMutex.RUnlock()

	for _, worker := range pi.workers {
		worker.statsMutex.RLock()
		isBusy := worker.stats.Status == WorkerStatusBusy
		worker.statsMutex.RUnlock()
		if isBusy {
			return true
		}
	}
	return false
}

// GetShardInfo returns information about a specific shard
func (pi *ParallelIndexer) GetShardInfo(shardID int) (*ShardInfo, error) {
	shardStats := pi.shardManager.GetShardStats()
	for _, stat := range shardStats {
		if stat.ID == shardID {
			return stat, nil
		}
	}
	return nil, fmt.Errorf("%s: %d", ErrShardNotFound, shardID)
}

// IsHealthy checks if the indexer is running and healthy
func (pi *ParallelIndexer) IsHealthy() bool {
	if atomic.LoadInt32(&pi.running) != 1 {
		return false
	}
	// Check shard manager health
	return pi.shardManager.HealthCheck() == nil
}

// GetConfig returns the current configuration
func (pi *ParallelIndexer) GetConfig() *Config {
	return pi.config
}

// GetAllShards returns all managed shards
func (pi *ParallelIndexer) GetAllShards() []bleve.Index {
	return pi.shardManager.GetAllShards()
}
// DeleteIndexByLogGroup deletes all index entries for a specific log group (base path and its rotated files)
func (pi *ParallelIndexer) DeleteIndexByLogGroup(basePath string, logFileManager interface{}) error {
	if !pi.IsHealthy() {
		return fmt.Errorf("indexer not healthy")
	}

	// Get all file paths for this log group from the database
	if logFileManager == nil {
		return fmt.Errorf("log file manager is required")
	}
	lfm, ok := logFileManager.(GroupFileProvider)
	if !ok {
		return fmt.Errorf("log file manager does not support GetFilePathsForGroup")
	}
	filesToDelete, err := lfm.GetFilePathsForGroup(basePath)
	if err != nil {
		return fmt.Errorf("failed to get file paths for log group %s: %w", basePath, err)
	}

	logger.Infof("Deleting index entries for log group %s, files: %v", basePath, filesToDelete)

	// Delete documents from all shards for these files
	shards := pi.shardManager.GetAllShards()
	var deleteErrors []error

	for _, shard := range shards {
		// Search for documents with matching file_path
		for _, filePath := range filesToDelete {
			query := bleve.NewTermQuery(filePath)
			query.SetField("file_path")
			searchRequest := bleve.NewSearchRequest(query)
			searchRequest.Size = 1000 // Process in batches
			searchRequest.Fields = []string{"file_path"}

			for {
				searchResult, err := shard.Search(searchRequest)
				if err != nil {
					deleteErrors = append(deleteErrors, fmt.Errorf("failed to search for documents in file %s: %w", filePath, err))
					break
				}
				if len(searchResult.Hits) == 0 {
					break // No more documents to delete
				}

				// Delete documents in batch
				batch := shard.NewBatch()
				for _, hit := range searchResult.Hits {
					batch.Delete(hit.ID)
				}
				if err := shard.Batch(batch); err != nil {
					deleteErrors = append(deleteErrors, fmt.Errorf("failed to delete batch for file %s: %w", filePath, err))
				}

				// If we got fewer results than requested, we're done
				if len(searchResult.Hits) < searchRequest.Size {
					break
				}
				// Continue from where we left off
				searchRequest.From += searchRequest.Size
			}
		}
	}

	if len(deleteErrors) > 0 {
		return fmt.Errorf("encountered %d errors during deletion: %v", len(deleteErrors), deleteErrors[0])
	}

	logger.Infof("Successfully deleted index entries for log group: %s", basePath)
	return nil
}
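
// Deletion is driven by the database-backed file list (illustrative sketch; the
// concrete GroupFileProvider implementation passed as logFileManager is assumed
// to live elsewhere in the package, and the path is a placeholder):
//
//	if err := idx.DeleteIndexByLogGroup("/var/log/nginx/access.log", logFileManager); err != nil {
//		logger.Errorf("failed to delete index entries: %v", err)
//	}
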
// DestroyAllIndexes closes and deletes all index data from disk.
func (pi *ParallelIndexer) DestroyAllIndexes(parentCtx context.Context) error {
	// Stop all background routines before deleting files
	pi.cancel()
	pi.wg.Wait()

	// Safely close channels if they haven't been closed yet
	if atomic.CompareAndSwapInt32(&pi.channelsClosed, 0, 1) {
		close(pi.jobQueue)
		close(pi.resultQueue)
	}
	atomic.StoreInt32(&pi.running, 0) // Mark as not running

	var destructionErr error
	if manager, ok := pi.shardManager.(*DefaultShardManager); ok {
		destructionErr = manager.Destroy()
	} else {
		destructionErr = fmt.Errorf("shard manager does not support destruction")
	}

	// Re-initialize context and channels for a potential restart using the parent context
	pi.ctx, pi.cancel = context.WithCancel(parentCtx)
	pi.jobQueue = make(chan *IndexJob, pi.config.MaxQueueSize)
	pi.resultQueue = make(chan *IndexResult, pi.config.WorkerCount)
	atomic.StoreInt32(&pi.channelsClosed, 0) // Reset the channel closed flag

	return destructionErr
}
// IndexLogGroup finds all files related to a base log path (e.g., rotated logs) and indexes them.
// It returns a map of [filePath -> docCount], and the min/max timestamps found.
func (pi *ParallelIndexer) IndexLogGroup(basePath string) (map[string]uint64, *time.Time, *time.Time, error) {
	if !pi.IsHealthy() {
		return nil, nil, nil, fmt.Errorf("indexer not healthy")
	}

	// Find all files belonging to this log group by globbing
	globPath := basePath + "*"
	matches, err := filepath.Glob(globPath)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("failed to glob for log files with base %s: %w", basePath, err)
	}

	// filepath.Glob might not match the base file itself if it has no extension,
	// so we check for it explicitly and add it to the list.
	info, err := os.Stat(basePath)
	if err == nil && info.Mode().IsRegular() {
		matches = append(matches, basePath)
	}

	// Deduplicate file list
	seen := make(map[string]struct{})
	uniqueFiles := make([]string, 0)
	for _, match := range matches {
		if _, ok := seen[match]; !ok {
			// Further check if it's a file, not a directory. Glob can match dirs.
			info, err := os.Stat(match)
			if err == nil && info.Mode().IsRegular() {
				seen[match] = struct{}{}
				uniqueFiles = append(uniqueFiles, match)
			}
		}
	}

	if len(uniqueFiles) == 0 {
		logger.Warnf("No actual log file found for group: %s", basePath)
		return nil, nil, nil, nil
	}

	logger.Infof("Found %d file(s) for log group %s: %v", len(uniqueFiles), basePath, uniqueFiles)

	docsCountMap := make(map[string]uint64)
	var overallMinTime, overallMaxTime *time.Time

	for _, filePath := range uniqueFiles {
		docsIndexed, minTime, maxTime, err := pi.indexSingleFile(filePath)
		if err != nil {
			logger.Warnf("Failed to index file '%s' in group '%s', skipping: %v", filePath, basePath, err)
			continue // Continue with the next file
		}
		docsCountMap[filePath] = docsIndexed

		if minTime != nil {
			if overallMinTime == nil || minTime.Before(*overallMinTime) {
				overallMinTime = minTime
			}
		}
		if maxTime != nil {
			if overallMaxTime == nil || maxTime.After(*overallMaxTime) {
				overallMaxTime = maxTime
			}
		}
	}

	return docsCountMap, overallMinTime, overallMaxTime, nil
}
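
// Consuming the per-file results (illustrative sketch; the base path is a placeholder):
//
//	counts, minTime, maxTime, err := idx.IndexLogGroup("/var/log/nginx/access.log")
//	if err != nil {
//		return err
//	}
//	for path, n := range counts {
//		logger.Infof("indexed %d documents from %s (time range %v - %v)", n, path, minTime, maxTime)
//	}
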
// IndexSingleFileIncrementally is a more efficient version for incremental updates.
// It indexes only the specified single file instead of the entire log group.
func (pi *ParallelIndexer) IndexSingleFileIncrementally(filePath string, progressConfig *ProgressConfig) (map[string]uint64, *time.Time, *time.Time, error) {
	if !pi.IsHealthy() {
		return nil, nil, nil, fmt.Errorf("indexer not healthy")
	}

	// Create progress tracker if config is provided
	var progressTracker *ProgressTracker
	if progressConfig != nil {
		progressTracker = NewProgressTracker(filePath, progressConfig)

		// Set up the file for tracking
		isCompressed := IsCompressedFile(filePath)
		progressTracker.AddFile(filePath, isCompressed)
		if stat, err := os.Stat(filePath); err == nil {
			progressTracker.SetFileSize(filePath, stat.Size())
			if estimatedLines, err := EstimateFileLines(context.Background(), filePath, stat.Size(), isCompressed); err == nil {
				progressTracker.SetFileEstimate(filePath, estimatedLines)
			}
		}
	}

	docsCountMap := make(map[string]uint64)

	if progressTracker != nil {
		progressTracker.StartFile(filePath)
	}

	docsIndexed, minTime, maxTime, err := pi.indexSingleFileWithProgress(filePath, progressTracker)
	if err != nil {
		logger.Warnf("Failed to incrementally index file '%s', skipping: %v", filePath, err)
		if progressTracker != nil {
			progressTracker.FailFile(filePath, err.Error())
		}
		// Return empty results and the error
		return docsCountMap, nil, nil, err
	}

	docsCountMap[filePath] = docsIndexed
	if progressTracker != nil {
		progressTracker.CompleteFile(filePath, int64(docsIndexed))
	}

	return docsCountMap, minTime, maxTime, nil
}
// indexSingleFile contains the logic to process one physical log file.
// It returns the number of documents indexed from the file, and the min/max timestamps.
func (pi *ParallelIndexer) indexSingleFile(filePath string) (uint64, *time.Time, *time.Time, error) {
	file, err := os.Open(filePath)
	if err != nil {
		return 0, nil, nil, fmt.Errorf("failed to open log file %s: %w", filePath, err)
	}
	defer file.Close()

	var reader io.Reader = file
	// Handle gzipped files
	if strings.HasSuffix(filePath, ".gz") {
		gz, err := gzip.NewReader(file)
		if err != nil {
			return 0, nil, nil, fmt.Errorf("failed to create gzip reader for %s: %w", filePath, err)
		}
		defer gz.Close()
		reader = gz
	}

	logger.Infof("Starting to process file: %s", filePath)

	batch := pi.StartBatch()
	scanner := bufio.NewScanner(reader)
	docCount := 0
	var minTime, maxTime *time.Time

	for scanner.Scan() {
		line := scanner.Text()
		if line == "" {
			continue
		}

		logDoc, err := ParseLogLine(line)
		if err != nil {
			logger.Warnf("Skipping line due to parse error in file %s: %v", filePath, err)
			continue
		}
		logDoc.FilePath = filePath

		// Track min/max timestamps
		ts := time.Unix(logDoc.Timestamp, 0)
		if minTime == nil || ts.Before(*minTime) {
			minTime = &ts
		}
		if maxTime == nil || ts.After(*maxTime) {
			maxTime = &ts
		}

		// Use efficient string building for document ID
		docIDBuf := make([]byte, 0, len(filePath)+16)
		docIDBuf = append(docIDBuf, filePath...)
		docIDBuf = append(docIDBuf, '-')
		docIDBuf = utils.AppendInt(docIDBuf, docCount)

		doc := &Document{
			ID:     utils.BytesToStringUnsafe(docIDBuf),
			Fields: logDoc,
		}

		if err := batch.Add(doc); err != nil {
			// This indicates an auto-flush occurred and failed.
			// Log the error and stop processing this file to avoid further issues.
			return uint64(docCount), minTime, maxTime, fmt.Errorf("failed to add document to batch for %s (auto-flush might have failed): %w", filePath, err)
		}
		docCount++
	}

	if err := scanner.Err(); err != nil {
		return uint64(docCount), minTime, maxTime, fmt.Errorf("error reading log file %s: %w", filePath, err)
	}

	logger.Infof("Finished processing file: %s. Total lines processed: %d", filePath, docCount)

	if docCount > 0 {
		if _, err := batch.Flush(); err != nil {
			return uint64(docCount), minTime, maxTime, fmt.Errorf("failed to flush batch for %s: %w", filePath, err)
		}
	}

	return uint64(docCount), minTime, maxTime, nil
}
// UpdateConfig updates the indexer configuration
func (pi *ParallelIndexer) UpdateConfig(config *Config) error {
	// Only allow updating certain configuration parameters while running
	pi.config.BatchSize = config.BatchSize
	pi.config.FlushInterval = config.FlushInterval
	pi.config.EnableMetrics = config.EnableMetrics
	return nil
}
// Worker implementation

func (w *indexWorker) run() {
	defer w.indexer.wg.Done()
	w.updateStatus(WorkerStatusIdle)

	for {
		select {
		case job, ok := <-w.indexer.jobQueue:
			if !ok {
				return // Channel closed, worker should exit
			}
			w.updateStatus(WorkerStatusBusy)
			result := w.processJob(job)

			// Send result
			select {
			case w.indexer.resultQueue <- result:
			case <-w.indexer.ctx.Done():
				return
			}

			// Execute callback if provided
			if job.Callback != nil {
				var err error
				if result.Failed > 0 {
					err = fmt.Errorf("indexing failed for %d documents", result.Failed)
				}
				job.Callback(err)
			}
			w.updateStatus(WorkerStatusIdle)
		case <-w.indexer.ctx.Done():
			return
		}
	}
}
func (w *indexWorker) processJob(job *IndexJob) *IndexResult {
	startTime := time.Now()
	result := &IndexResult{
		Processed: len(job.Documents),
	}

	// Group documents by shard
	shardDocs := make(map[int][]*Document)
	for _, doc := range job.Documents {
		if doc.ID == "" {
			result.Failed++
			continue
		}
		_, shardID, err := w.indexer.shardManager.GetShard(doc.ID)
		if err != nil {
			result.Failed++
			continue
		}
		shardDocs[shardID] = append(shardDocs[shardID], doc)
	}

	// Index documents per shard
	for shardID, docs := range shardDocs {
		if err := w.indexShardDocuments(shardID, docs); err != nil {
			result.Failed += len(docs)
		} else {
			result.Succeeded += len(docs)
		}
	}

	result.Duration = time.Since(startTime)
	if result.Processed > 0 {
		result.ErrorRate = float64(result.Failed) / float64(result.Processed)
		result.Throughput = float64(result.Processed) / result.Duration.Seconds()
	}

	// Update worker stats
	w.statsMutex.Lock()
	w.stats.ProcessedJobs++
	w.stats.ProcessedDocs += int64(result.Processed)
	w.stats.ErrorCount += int64(result.Failed)
	w.stats.LastActive = time.Now().Unix()
	// Update average latency (simple moving average)
	if w.stats.AverageLatency == 0 {
		w.stats.AverageLatency = result.Duration
	} else {
		w.stats.AverageLatency = (w.stats.AverageLatency + result.Duration) / 2
	}
	w.statsMutex.Unlock()

	return result
}
func (w *indexWorker) indexShardDocuments(shardID int, docs []*Document) error {
	shard, err := w.indexer.shardManager.GetShardByID(shardID)
	if err != nil {
		return err
	}

	batch := shard.NewBatch()
	for _, doc := range docs {
		// Convert LogDocument to map for Bleve indexing
		docMap := w.logDocumentToMap(doc.Fields)
		batch.Index(doc.ID, docMap)
	}

	if err := shard.Batch(batch); err != nil {
		return fmt.Errorf("failed to index batch for shard %d: %w", shardID, err)
	}
	return nil
}
// logDocumentToMap converts LogDocument to map[string]interface{} for Bleve
func (w *indexWorker) logDocumentToMap(doc *LogDocument) map[string]interface{} {
	docMap := map[string]interface{}{
		"timestamp":  doc.Timestamp,
		"ip":         doc.IP,
		"method":     doc.Method,
		"path":       doc.Path,
		"path_exact": doc.PathExact,
		"status":     doc.Status,
		"bytes_sent": doc.BytesSent,
		"file_path":  doc.FilePath,
		"raw":        doc.Raw,
	}

	// Add optional fields only if they have values
	if doc.RegionCode != "" {
		docMap["region_code"] = doc.RegionCode
	}
	if doc.Province != "" {
		docMap["province"] = doc.Province
	}
	if doc.City != "" {
		docMap["city"] = doc.City
	}
	if doc.Protocol != "" {
		docMap["protocol"] = doc.Protocol
	}
	if doc.Referer != "" {
		docMap["referer"] = doc.Referer
	}
	if doc.UserAgent != "" {
		docMap["user_agent"] = doc.UserAgent
	}
	if doc.Browser != "" {
		docMap["browser"] = doc.Browser
	}
	if doc.BrowserVer != "" {
		docMap["browser_version"] = doc.BrowserVer
	}
	if doc.OS != "" {
		docMap["os"] = doc.OS
	}
	if doc.OSVersion != "" {
		docMap["os_version"] = doc.OSVersion
	}
	if doc.DeviceType != "" {
		docMap["device_type"] = doc.DeviceType
	}
	if doc.RequestTime > 0 {
		docMap["request_time"] = doc.RequestTime
	}
	if doc.UpstreamTime != nil {
		docMap["upstream_time"] = *doc.UpstreamTime
	}

	return docMap
}
func (w *indexWorker) updateStatus(status string) {
	w.statsMutex.Lock()
	w.stats.Status = status
	w.statsMutex.Unlock()
}
// Background routines

func (pi *ParallelIndexer) processResults() {
	defer pi.wg.Done()
	for {
		select {
		case result := <-pi.resultQueue:
			if result != nil {
				pi.metrics.RecordIndexOperation(
					result.Processed,
					result.Duration,
					result.Failed == 0,
				)
			}
		case <-pi.ctx.Done():
			return
		}
	}
}
func (pi *ParallelIndexer) optimizationRoutine() {
	defer pi.wg.Done()
	ticker := time.NewTicker(pi.config.OptimizeInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if atomic.LoadInt32(&pi.optimizing) == 0 {
				go pi.Optimize() // Run in background to avoid blocking
			}
		case <-pi.ctx.Done():
			return
		}
	}
}
func (pi *ParallelIndexer) metricsRoutine() {
	defer pi.wg.Done()
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			pi.updateMetrics()
		case <-pi.ctx.Done():
			return
		}
	}
}
func (pi *ParallelIndexer) updateMetrics() {
	pi.statsMutex.Lock()
	defer pi.statsMutex.Unlock()

	// Update indexing rate based on recent activity
	metrics := pi.metrics.GetMetrics()
	pi.stats.IndexingRate = metrics.IndexingRate
}
// IndexLogGroupWithProgress indexes a log group with progress tracking
func (pi *ParallelIndexer) IndexLogGroupWithProgress(basePath string, progressConfig *ProgressConfig) (map[string]uint64, *time.Time, *time.Time, error) {
	if !pi.IsHealthy() {
		return nil, nil, nil, fmt.Errorf("indexer not healthy")
	}

	// Create progress tracker if config is provided
	var progressTracker *ProgressTracker
	if progressConfig != nil {
		progressTracker = NewProgressTracker(basePath, progressConfig)
	}

	// Find all files belonging to this log group by globbing
	globPath := basePath + "*"
	matches, err := filepath.Glob(globPath)
	if err != nil {
		if progressTracker != nil {
			progressTracker.Cancel(fmt.Sprintf("glob failed: %v", err))
		}
		return nil, nil, nil, fmt.Errorf("failed to glob for log files with base %s: %w", basePath, err)
	}

	// filepath.Glob might not match the base file itself if it has no extension,
	// so we check for it explicitly and add it to the list.
	info, err := os.Stat(basePath)
	if err == nil && info.Mode().IsRegular() {
		matches = append(matches, basePath)
	}

	// Deduplicate file list
	seen := make(map[string]struct{})
	uniqueFiles := make([]string, 0)
	for _, match := range matches {
		if _, ok := seen[match]; !ok {
			// Further check if it's a file, not a directory. Glob can match dirs.
			info, err := os.Stat(match)
			if err == nil && info.Mode().IsRegular() {
				seen[match] = struct{}{}
				uniqueFiles = append(uniqueFiles, match)
			}
		}
	}

	if len(uniqueFiles) == 0 {
		logger.Warnf("No actual log file found for group: %s", basePath)
		if progressTracker != nil {
			progressTracker.Cancel("no files found")
		}
		return nil, nil, nil, nil
	}

	logger.Infof("Found %d file(s) for log group %s: %v", len(uniqueFiles), basePath, uniqueFiles)

	// Set up progress tracking for all files
	if progressTracker != nil {
		for _, filePath := range uniqueFiles {
			isCompressed := IsCompressedFile(filePath)
			progressTracker.AddFile(filePath, isCompressed)

			// Get file size and estimate lines
			if stat, err := os.Stat(filePath); err == nil {
				progressTracker.SetFileSize(filePath, stat.Size())
				// Estimate lines for progress calculation
				if estimatedLines, err := EstimateFileLines(context.Background(), filePath, stat.Size(), isCompressed); err == nil {
					progressTracker.SetFileEstimate(filePath, estimatedLines)
				}
			}
		}
	}

	docsCountMap := make(map[string]uint64)
	var overallMinTime, overallMaxTime *time.Time

	// Process each file with progress tracking
	for _, filePath := range uniqueFiles {
		if progressTracker != nil {
			progressTracker.StartFile(filePath)
		}

		docsIndexed, minTime, maxTime, err := pi.indexSingleFileWithProgress(filePath, progressTracker)
		if err != nil {
			logger.Warnf("Failed to index file '%s' in group '%s', skipping: %v", filePath, basePath, err)
			if progressTracker != nil {
				progressTracker.FailFile(filePath, err.Error())
			}
			continue // Continue with the next file
		}

		docsCountMap[filePath] = docsIndexed
		if progressTracker != nil {
			progressTracker.CompleteFile(filePath, int64(docsIndexed))
		}

		if minTime != nil {
			if overallMinTime == nil || minTime.Before(*overallMinTime) {
				overallMinTime = minTime
			}
		}
		if maxTime != nil {
			if overallMaxTime == nil || maxTime.After(*overallMaxTime) {
				overallMaxTime = maxTime
			}
		}
	}

	return docsCountMap, overallMinTime, overallMaxTime, nil
}
// indexSingleFileWithProgress indexes a single file with progress updates
func (pi *ParallelIndexer) indexSingleFileWithProgress(filePath string, progressTracker *ProgressTracker) (uint64, *time.Time, *time.Time, error) {
	// If no progress tracker, just call the original method
	if progressTracker == nil {
		return pi.indexSingleFile(filePath)
	}

	// Call the original indexing method to do the actual indexing work
	docsIndexed, minTime, maxTime, err := pi.indexSingleFile(filePath)
	if err != nil {
		return 0, nil, nil, err
	}

	// Do one final progress update when done - no artificial delays
	if progressTracker != nil && docsIndexed > 0 {
		if strings.HasSuffix(filePath, ".gz") {
			progressTracker.UpdateFileProgress(filePath, int64(docsIndexed))
		} else {
			// Estimate position based on average line size
			estimatedPos := int64(docsIndexed * 150) // Assume ~150 bytes per line
			progressTracker.UpdateFileProgress(filePath, int64(docsIndexed), estimatedPos)
		}
	}

	// Return the actual timestamps from the original method
	return docsIndexed, minTime, maxTime, nil
}