persistence.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562
  1. package indexer
  2. import (
  3. "context"
  4. "fmt"
  5. "path/filepath"
  6. "regexp"
  7. "strings"
  8. "time"
  9. "github.com/0xJacky/Nginx-UI/model"
  10. "github.com/0xJacky/Nginx-UI/query"
  11. "github.com/uozi-tech/cosy"
  12. "github.com/uozi-tech/cosy/logger"
  13. "gorm.io/gen/field"
  14. )
  // PersistenceManager reads and writes per-file log index records
// (model.NginxLogIndex) so that log files can be indexed incrementally.
//
// It also keeps an in-memory cache of each path's Enabled flag.
// NOTE(review): enabledPaths is a plain map read and written without locking;
// confirm a PersistenceManager is never shared across goroutines, or guard it.
type PersistenceManager struct {
	// Settings taken from IncrementalIndexConfig by NewPersistenceManager.
	maxBatchSize  int
	flushInterval time.Duration

	// enabledPaths caches path -> Enabled, filled lazily and on saves.
	enabledPaths map[string]bool

	// lastFlushTime is initialised to the construction time.
	lastFlushTime time.Time
}
  // LogFileInfo is a flat snapshot of a file's incremental-indexing state.
// All timestamps are Unix seconds.
type LogFileInfo struct {
	Path string

	LastModified int64 // file modification time recorded at the last index (Unix seconds)
	LastSize     int64 // file size recorded at the last index
	LastIndexed  int64 // when the file was last indexed (Unix seconds)
	LastPosition int64 // byte offset at which the previous indexing pass stopped
}
  // IncrementalIndexConfig holds the tunables for incremental indexing.
// NewPersistenceManager consumes MaxBatchSize and FlushInterval; CheckInterval
// and MaxAge are carried for other consumers of the config.
type IncrementalIndexConfig struct {
	MaxBatchSize  int           `yaml:"max_batch_size" json:"max_batch_size"`
	FlushInterval time.Duration `yaml:"flush_interval" json:"flush_interval"`
	CheckInterval time.Duration `yaml:"check_interval" json:"check_interval"`
	MaxAge        time.Duration `yaml:"max_age" json:"max_age"`
}
  39. // DefaultIncrementalConfig returns the default configuration for incremental indexing
  40. func DefaultIncrementalConfig() *IncrementalIndexConfig {
  41. return &IncrementalIndexConfig{
  42. MaxBatchSize: 1000,
  43. FlushInterval: 30 * time.Second,
  44. CheckInterval: 5 * time.Minute,
  45. MaxAge: 30 * 24 * time.Hour, // 30 days
  46. }
  47. }
  48. // NewPersistenceManager creates a new persistence manager with incremental indexing support
  49. func NewPersistenceManager(config *IncrementalIndexConfig) *PersistenceManager {
  50. if config == nil {
  51. config = DefaultIncrementalConfig()
  52. }
  53. return &PersistenceManager{
  54. maxBatchSize: config.MaxBatchSize,
  55. flushInterval: config.FlushInterval,
  56. enabledPaths: make(map[string]bool),
  57. lastFlushTime: time.Now(),
  58. }
  59. }
  60. // GetLogIndex retrieves the index record for a log file path
  61. func (pm *PersistenceManager) GetLogIndex(path string) (*model.NginxLogIndex, error) {
  62. q := query.NginxLogIndex
  63. // Determine main log path for grouping
  64. mainLogPath := getMainLogPathFromFile(path)
  65. // Use FirstOrCreate to get existing record or create a new one
  66. logIndex, err := q.Where(q.Path.Eq(path)).
  67. Assign(field.Attrs(&model.NginxLogIndex{
  68. Path: path,
  69. MainLogPath: mainLogPath,
  70. Enabled: true,
  71. })).
  72. FirstOrCreate()
  73. if err != nil {
  74. return nil, fmt.Errorf("failed to get or create log index: %w", err)
  75. }
  76. return logIndex, nil
  77. }
  78. // SaveLogIndex saves or updates the index record with incremental indexing support
  79. func (pm *PersistenceManager) SaveLogIndex(logIndex *model.NginxLogIndex) error {
  80. logIndex.Enabled = true
  81. // Ensure MainLogPath is set
  82. if logIndex.MainLogPath == "" {
  83. logIndex.MainLogPath = getMainLogPathFromFile(logIndex.Path)
  84. }
  85. // Update last indexed time
  86. logIndex.LastIndexed = time.Now()
  87. q := query.NginxLogIndex
  88. savedRecord, err := q.Where(q.Path.Eq(logIndex.Path)).
  89. Assign(field.Attrs(logIndex)).
  90. FirstOrCreate()
  91. if err != nil {
  92. return fmt.Errorf("failed to save log index: %w", err)
  93. }
  94. // Update the passed object with the saved record data
  95. *logIndex = *savedRecord
  96. // Update cache
  97. pm.enabledPaths[logIndex.Path] = logIndex.Enabled
  98. return nil
  99. }
  100. // GetIncrementalInfo retrieves incremental indexing information for a log file
  101. func (pm *PersistenceManager) GetIncrementalInfo(path string) (*LogFileInfo, error) {
  102. logIndex, err := pm.GetLogIndex(path)
  103. if err != nil {
  104. return nil, err
  105. }
  106. return &LogFileInfo{
  107. Path: logIndex.Path,
  108. LastModified: logIndex.LastModified.Unix(),
  109. LastSize: logIndex.LastSize,
  110. LastIndexed: logIndex.LastIndexed.Unix(),
  111. LastPosition: logIndex.LastPosition,
  112. }, nil
  113. }
  114. // UpdateIncrementalInfo updates incremental indexing information
  115. func (pm *PersistenceManager) UpdateIncrementalInfo(path string, info *LogFileInfo) error {
  116. logIndex, err := pm.GetLogIndex(path)
  117. if err != nil {
  118. return err
  119. }
  120. logIndex.LastModified = time.Unix(info.LastModified, 0)
  121. logIndex.LastSize = info.LastSize
  122. logIndex.LastIndexed = time.Unix(info.LastIndexed, 0)
  123. logIndex.LastPosition = info.LastPosition
  124. return pm.SaveLogIndex(logIndex)
  125. }
  126. // IsPathEnabled checks if indexing is enabled for a path (with caching)
  127. func (pm *PersistenceManager) IsPathEnabled(path string) (bool, error) {
  128. // Check cache first
  129. if enabled, exists := pm.enabledPaths[path]; exists {
  130. return enabled, nil
  131. }
  132. // Query database
  133. logIndex, err := pm.GetLogIndex(path)
  134. if err != nil {
  135. return false, err
  136. }
  137. // Update cache
  138. pm.enabledPaths[path] = logIndex.Enabled
  139. return logIndex.Enabled, nil
  140. }
  141. // GetChangedFiles returns files that have been modified since last indexing
  142. func (pm *PersistenceManager) GetChangedFiles(mainLogPath string) ([]*model.NginxLogIndex, error) {
  143. q := query.NginxLogIndex
  144. indexes, err := q.Where(
  145. q.MainLogPath.Eq(mainLogPath),
  146. q.Enabled.Is(true),
  147. ).Find()
  148. if err != nil {
  149. return nil, fmt.Errorf("failed to get changed files: %w", err)
  150. }
  151. return indexes, nil
  152. }
  153. // GetFilesForFullReindex returns files that need full reindexing
  154. func (pm *PersistenceManager) GetFilesForFullReindex(mainLogPath string, maxAge time.Duration) ([]*model.NginxLogIndex, error) {
  155. cutoff := time.Now().Add(-maxAge)
  156. q := query.NginxLogIndex
  157. indexes, err := q.Where(
  158. q.MainLogPath.Eq(mainLogPath),
  159. q.Enabled.Is(true),
  160. q.LastIndexed.Lt(cutoff),
  161. ).Find()
  162. if err != nil {
  163. return nil, fmt.Errorf("failed to get files for full reindex: %w", err)
  164. }
  165. return indexes, nil
  166. }
  167. // MarkFileAsIndexed marks a file as successfully indexed with current timestamp and position
  168. func (pm *PersistenceManager) MarkFileAsIndexed(path string, documentCount uint64, lastPosition int64) error {
  169. logIndex, err := pm.GetLogIndex(path)
  170. if err != nil {
  171. return err
  172. }
  173. now := time.Now()
  174. logIndex.LastIndexed = now
  175. logIndex.LastPosition = lastPosition
  176. logIndex.DocumentCount = documentCount
  177. return pm.SaveLogIndex(logIndex)
  178. }
  179. // GetAllLogIndexes retrieves all log index records
  180. func (pm *PersistenceManager) GetAllLogIndexes() ([]*model.NginxLogIndex, error) {
  181. q := query.NginxLogIndex
  182. indexes, err := q.Where(q.Enabled.Is(true)).Order(q.Path).Find()
  183. if err != nil {
  184. return nil, fmt.Errorf("failed to get log indexes: %w", err)
  185. }
  186. return indexes, nil
  187. }
  188. // GetLogGroupIndexes retrieves all log index records for a specific log group
  189. func (pm *PersistenceManager) GetLogGroupIndexes(mainLogPath string) ([]*model.NginxLogIndex, error) {
  190. q := query.NginxLogIndex
  191. indexes, err := q.Where(
  192. q.MainLogPath.Eq(mainLogPath),
  193. q.Enabled.Is(true),
  194. ).Order(q.Path).Find()
  195. if err != nil {
  196. return nil, fmt.Errorf("failed to get log group indexes: %w", err)
  197. }
  198. return indexes, nil
  199. }
  200. // DeleteLogIndex deletes a log index record (hard delete)
  201. func (pm *PersistenceManager) DeleteLogIndex(path string) error {
  202. q := query.NginxLogIndex
  203. _, err := q.Unscoped().Where(q.Path.Eq(path)).Delete()
  204. if err != nil {
  205. return fmt.Errorf("failed to delete log index: %w", err)
  206. }
  207. // Remove from cache
  208. delete(pm.enabledPaths, path)
  209. logger.Infof("Hard deleted log index for path: %s", path)
  210. return nil
  211. }
  212. // DisableLogIndex disables indexing for a log file
  213. func (pm *PersistenceManager) DisableLogIndex(path string) error {
  214. q := query.NginxLogIndex
  215. _, err := q.Where(q.Path.Eq(path)).Update(q.Enabled, false)
  216. if err != nil {
  217. return fmt.Errorf("failed to disable log index: %w", err)
  218. }
  219. // Update cache
  220. pm.enabledPaths[path] = false
  221. logger.Infof("Disabled log index for path: %s", path)
  222. return nil
  223. }
  224. // EnableLogIndex enables indexing for a log file
  225. func (pm *PersistenceManager) EnableLogIndex(path string) error {
  226. q := query.NginxLogIndex
  227. _, err := q.Where(q.Path.Eq(path)).Update(q.Enabled, true)
  228. if err != nil {
  229. return fmt.Errorf("failed to enable log index: %w", err)
  230. }
  231. // Update cache
  232. pm.enabledPaths[path] = true
  233. logger.Infof("Enabled log index for path: %s", path)
  234. return nil
  235. }
  236. // CleanupOldIndexes removes index records for files that haven't been indexed in a long time
  237. func (pm *PersistenceManager) CleanupOldIndexes(maxAge time.Duration) error {
  238. cutoff := time.Now().Add(-maxAge)
  239. q := query.NginxLogIndex
  240. result, err := q.Unscoped().Where(q.LastIndexed.Lt(cutoff)).Delete()
  241. if err != nil {
  242. return fmt.Errorf("failed to cleanup old indexes: %w", err)
  243. }
  244. if result.RowsAffected > 0 {
  245. logger.Infof("Cleaned up %d old log index records", result.RowsAffected)
  246. // Clear cache for cleaned up entries
  247. pm.enabledPaths = make(map[string]bool)
  248. }
  249. return nil
  250. }
  // PersistenceStats aggregates counts over the stored log index records.
type PersistenceStats struct {
	TotalFiles     int64  `json:"total_files"`     // all records
	EnabledFiles   int64  `json:"enabled_files"`   // records with Enabled=true
	TotalDocuments uint64 `json:"total_documents"` // sum of DocumentCount
	ChangedFiles   int64  `json:"changed_files"`   // enabled records modified recently
}
  258. // GetPersistenceStats returns statistics about stored index records
  259. func (pm *PersistenceManager) GetPersistenceStats() (*PersistenceStats, error) {
  260. q := query.NginxLogIndex
  261. // Count total records
  262. totalCount, err := q.Count()
  263. if err != nil {
  264. return nil, fmt.Errorf("failed to count total indexes: %w", err)
  265. }
  266. // Count enabled records
  267. enabledCount, err := q.Where(q.Enabled.Is(true)).Count()
  268. if err != nil {
  269. return nil, fmt.Errorf("failed to count enabled indexes: %w", err)
  270. }
  271. // Sum document counts
  272. var result struct {
  273. Total uint64
  274. }
  275. if err := q.Select(q.DocumentCount.Sum().As("total")).Scan(&result); err != nil {
  276. return nil, fmt.Errorf("failed to sum document counts: %w", err)
  277. }
  278. // Count files needing incremental update
  279. cutoff := time.Now().Add(-time.Hour) // Files modified in last hour
  280. changedCount, err := q.Where(
  281. q.Enabled.Is(true),
  282. q.LastModified.Gt(cutoff),
  283. ).Count()
  284. if err != nil {
  285. return nil, fmt.Errorf("failed to count changed files: %w", err)
  286. }
  287. return &PersistenceStats{
  288. TotalFiles: totalCount,
  289. EnabledFiles: enabledCount,
  290. TotalDocuments: result.Total,
  291. ChangedFiles: changedCount,
  292. }, nil
  293. }
  294. // GetLogFileInfo retrieves the log file info for a given path.
  295. func (pm *PersistenceManager) GetLogFileInfo(path string) (*LogFileInfo, error) {
  296. return pm.GetIncrementalInfo(path)
  297. }
  298. // SaveLogFileInfo saves the log file info for a given path.
  299. func (pm *PersistenceManager) SaveLogFileInfo(path string, info *LogFileInfo) error {
  300. return pm.UpdateIncrementalInfo(path, info)
  301. }
  302. // Close flushes any pending operations and cleans up resources
  303. func (pm *PersistenceManager) Close() error {
  304. // Flush any pending operations
  305. pm.enabledPaths = nil
  306. return nil
  307. }
  308. // DeleteAllLogIndexes deletes all log index records
  309. func (pm *PersistenceManager) DeleteAllLogIndexes() error {
  310. // GORM's `Delete` requires a WHERE clause for safety. To delete all records,
  311. // we use a raw Exec call, which is the standard way to perform bulk operations.
  312. db := cosy.UseDB(context.Background())
  313. if err := db.Exec("DELETE FROM nginx_log_indices").Error; err != nil {
  314. return fmt.Errorf("failed to delete all log indexes: %w", err)
  315. }
  316. // Clear cache
  317. pm.enabledPaths = make(map[string]bool)
  318. logger.Infof("Hard deleted all log index records")
  319. return nil
  320. }
  321. // DeleteLogIndexesByGroup deletes all log index records for a specific log group.
  322. // GetLogIndexesByGroup retrieves all log index records for a given main log path
  323. func (pm *PersistenceManager) GetLogIndexesByGroup(mainLogPath string) ([]*model.NginxLogIndex, error) {
  324. q := query.NginxLogIndex
  325. logIndexes, err := q.Where(q.MainLogPath.Eq(mainLogPath)).Find()
  326. if err != nil {
  327. return nil, fmt.Errorf("failed to get log indexes for group %s: %w", mainLogPath, err)
  328. }
  329. return logIndexes, nil
  330. }
  331. func (pm *PersistenceManager) DeleteLogIndexesByGroup(mainLogPath string) error {
  332. q := query.NginxLogIndex
  333. result, err := q.Unscoped().Where(q.MainLogPath.Eq(mainLogPath)).Delete()
  334. if err != nil {
  335. return fmt.Errorf("failed to delete log indexes for group %s: %w", mainLogPath, err)
  336. }
  337. logger.Infof("Deleted %d log index records for group: %s", result.RowsAffected, mainLogPath)
  338. return nil
  339. }
  340. // RefreshCache refreshes the enabled paths cache
  341. func (pm *PersistenceManager) RefreshCache() error {
  342. q := query.NginxLogIndex
  343. indexes, err := q.Select(q.Path, q.Enabled).Find()
  344. if err != nil {
  345. return fmt.Errorf("failed to refresh cache: %w", err)
  346. }
  347. // Rebuild cache
  348. pm.enabledPaths = make(map[string]bool)
  349. for _, index := range indexes {
  350. pm.enabledPaths[index.Path] = index.Enabled
  351. }
  352. return nil
  353. }
  // IncrementalIndexStats summarizes the incremental-indexing state of one log
// group.
type IncrementalIndexStats struct {
	GroupFiles   int64 `json:"group_files"`   // enabled records in the group
	ChangedFiles int   `json:"changed_files"` // candidates for an incremental pass
	OldFiles     int   `json:"old_files"`     // records not indexed recently
	NeedsReindex int   `json:"needs_reindex"` // files needing some reindexing
}
  361. // GetIncrementalIndexStats returns statistics specific to incremental indexing
  362. func (pm *PersistenceManager) GetIncrementalIndexStats(mainLogPath string) (*IncrementalIndexStats, error) {
  363. q := query.NginxLogIndex
  364. // Files in this log group
  365. groupCount, err := q.Where(q.MainLogPath.Eq(mainLogPath), q.Enabled.Is(true)).Count()
  366. if err != nil {
  367. return nil, fmt.Errorf("failed to count group files: %w", err)
  368. }
  369. // Files needing incremental update
  370. changedFiles, err := pm.GetChangedFiles(mainLogPath)
  371. if err != nil {
  372. return nil, fmt.Errorf("failed to get changed files: %w", err)
  373. }
  374. // Files needing full reindex (older than 7 days)
  375. oldFiles, err := pm.GetFilesForFullReindex(mainLogPath, 7*24*time.Hour)
  376. if err != nil {
  377. return nil, fmt.Errorf("failed to get old files: %w", err)
  378. }
  379. return &IncrementalIndexStats{
  380. GroupFiles: groupCount,
  381. ChangedFiles: len(changedFiles),
  382. OldFiles: len(oldFiles),
  383. NeedsReindex: len(changedFiles) + len(oldFiles),
  384. }, nil
  385. }
  // Precompiled patterns for getMainLogPathFromFile, isDatePattern and
// isFullDatePattern. Compiling them once avoids re-parsing every expression on
// each call; getMainLogPathFromFile runs on every GetLogIndex call.
var (
	// YYYY.MM.DD spread over the last three dot-separated parts of a name.
	mainLogDottedDateRe = regexp.MustCompile(`^\d{4}\.\d{2}\.\d{2}$`)
	// Trailing 1-3 digit rotation number: access.log.1 -> access.log.
	mainLogNumberedRotationRe = regexp.MustCompile(`^(.+)\.(\d{1,3})$`)
	// 1-3 digit rotation number in front of ".log": access.1.log -> access.log.
	mainLogMiddleNumberedRe = regexp.MustCompile(`^(.+)\.(\d{1,3})\.log$`)
	// Whole-string YYYY.MM.DD with a year between 2000 and 2999.
	mainLogMultiPartDateRe = regexp.MustCompile(`^2\d{3}\.\d{2}\.\d{2}$`)
	// Single-token date forms accepted by isFullDatePattern.
	mainLogFullDateRes = []*regexp.Regexp{
		regexp.MustCompile(`^\d{8}$`),             // YYYYMMDD
		regexp.MustCompile(`^\d{4}-\d{2}-\d{2}$`), // YYYY-MM-DD
		regexp.MustCompile(`^\d{6}$`),             // YYMMDD
	}
	// Compression suffixes trimmed (in this order) before any matching.
	mainLogCompressionExts = []string{".gz", ".bz2", ".xz", ".lz4"}
)

// getMainLogPathFromFile maps a log file path, possibly rotated and/or
// compressed, to the path of the main log it belongs to. The rules, in order:
//
//  1. Compression suffixes (.gz, .bz2, .xz, .lz4) are trimmed from the name.
//  2. A name with at least three dot-separated parts that ends in a date loses
//     the date: access.log.2023.12.01, access.log.20231201,
//     access.log.2023-12-01 and access.log.231201 all map to access.log.
//  3. A trailing 1-3 digit rotation number is removed: access.log.1 -> access.log.
//  4. A 1-3 digit number before ".log" is removed: access.1.log -> access.log.
//  5. A name that is itself a bare date (e.g. 20231201 or 2023-12-01) maps to
//     access.log in the same directory, because the real base name cannot be
//     recovered.
//
// If no rule applies, filePath is returned unchanged, including any
// compression suffix.
func getMainLogPathFromFile(filePath string) string {
	dir := filepath.Dir(filePath)
	name := filepath.Base(filePath)

	for _, ext := range mainLogCompressionExts {
		name = strings.TrimSuffix(name, ext)
	}

	// Date rules run before the numbered-rotation rule so that date components
	// such as the "01" in 2023.12.01 are not mistaken for rotation numbers.
	if parts := strings.Split(name, "."); len(parts) >= 3 {
		if len(parts) >= 4 && mainLogDottedDateRe.MatchString(strings.Join(parts[len(parts)-3:], ".")) {
			return filepath.Join(dir, strings.Join(parts[:len(parts)-3], "."))
		}
		if isFullDatePattern(parts[len(parts)-1]) {
			return filepath.Join(dir, strings.Join(parts[:len(parts)-1], "."))
		}
	}

	if m := mainLogNumberedRotationRe.FindStringSubmatch(name); m != nil {
		return filepath.Join(dir, m[1])
	}
	if m := mainLogMiddleNumberedRe.FindStringSubmatch(name); m != nil {
		return filepath.Join(dir, m[1]+".log")
	}

	// A bare "YYYY.MM.DD" name never reaches this point: the numbered-rotation
	// rule above already matches it.
	if isDatePattern(name) {
		return filepath.Join(dir, "access.log") // Default assumption
	}

	return filePath
}

// isDatePattern reports whether s is a date token: any form accepted by
// isFullDatePattern, or YYYY.MM.DD with a year between 2000 and 2999.
func isDatePattern(s string) bool {
	return isFullDatePattern(s) || mainLogMultiPartDateRe.MatchString(s)
}

// isFullDatePattern reports whether s is exactly one complete date token:
// YYYYMMDD, YYYY-MM-DD or YYMMDD. Only the digit layout is checked; month and
// day ranges are not validated.
func isFullDatePattern(s string) bool {
	for _, re := range mainLogFullDateRes {
		if re.MatchString(s) {
			return true
		}
	}
	return false
}