grouped_shard_manager.go

package indexer

import (
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"sync"

	"github.com/0xJacky/Nginx-UI/model"
	"github.com/0xJacky/Nginx-UI/query"
	"github.com/blevesearch/bleve/v2"
	"github.com/uozi-tech/cosy/logger"
)

// GroupedShardManager manages shards grouped by MainLogPath. Each group lives
// in a unique UUID directory:
//
//	index_path/<uuid>/shard_{N}
//
// Key points:
// - Lazy group creation: on the first write for a MainLogPath we resolve/create its UUID and shards
// - GetAllShards returns all shards across groups with a stable global shard ID mapping
// - Destroy removes all index data
// - Concurrency-safe via RWMutex
//
// See the usage sketch after the type definitions below.
type GroupedShardManager struct {
	config *Config
	mu     sync.RWMutex
	// mainLogPath -> uuid
	pathToUUID map[string]string
	// uuid -> group
	groups map[string]*ShardGroup
	// Stable global shard mapping (contiguous IDs across groups):
	// globalID -> (uuid, localShardID)
	globalToLocal map[int]groupShardRef
	// key: "uuid#localID" -> globalID
	localToGlobal map[string]int
	// Next available global shard ID
	nextGlobalID int
}

type groupShardRef struct {
	uuid    string
	localID int
}

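// A minimal usage sketch (the log path and document key here are hypothetical,
// for illustration only):
//
//	gsm := NewGroupedShardManager(DefaultIndexerConfig())
//	if err := gsm.Initialize(); err != nil {
//		// handle error
//	}
//	defer gsm.Close()
//	// The first write for a MainLogPath lazily creates its UUID group and shards.
//	shard, localID, err := gsm.GetShardForDocument("/var/log/nginx/access.log", "doc-123")
//
// Global IDs stay contiguous across groups: with ShardCount = 4, the first
// group occupies global IDs 0..3 and a second group created later occupies
// 4..7, so globalToLocal resolves global ID 5 to (second group's uuid, local
// shard 1).
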
// ShardGroup represents the shard set belonging to a single log group (identified by UUID).
type ShardGroup struct {
	UUID        string
	MainLogPath string
	Shards      map[int]bleve.Index
	ShardPaths  map[int]string
	ShardCount  int
}

// NewGroupedShardManager creates a new grouped shard manager.
func NewGroupedShardManager(config *Config) *GroupedShardManager {
	if config == nil {
		config = DefaultIndexerConfig()
	}
	return &GroupedShardManager{
		config:        config,
		pathToUUID:    make(map[string]string),
		groups:        make(map[string]*ShardGroup),
		globalToLocal: make(map[int]groupShardRef),
		localToGlobal: make(map[string]int),
	}
}

// Initialize ensures the index root directory exists. Groups are created on demand.
func (gsm *GroupedShardManager) Initialize() error {
	if err := os.MkdirAll(gsm.config.IndexPath, 0755); err != nil {
		return fmt.Errorf("failed to create index root: %w", err)
	}
	// Load existing shard groups from the DB/disk so the searcher has shards after a restart.
	if err := gsm.loadExistingGroups(); err != nil {
		// Non-fatal: continue even if loading fails; indexing will lazily create groups.
		logger.Warnf("Failed to load existing shard groups: %v", err)
	}
	return nil
}

// Close closes all open shards across groups.
func (gsm *GroupedShardManager) Close() error {
	gsm.mu.Lock()
	defer gsm.mu.Unlock()
	var errs []error
	for _, group := range gsm.groups {
		for id, shard := range group.Shards {
			if shard == nil {
				continue
			}
			if err := shard.Close(); err != nil {
				errs = append(errs, fmt.Errorf("failed to close shard %d in group %s: %w", id, group.UUID, err))
			}
		}
	}
	if len(errs) > 0 {
		return fmt.Errorf("errors closing grouped shards: %v", errs)
	}
	return nil
}

// HealthCheck verifies that all opened shards are accessible.
func (gsm *GroupedShardManager) HealthCheck() error {
	gsm.mu.RLock()
	defer gsm.mu.RUnlock()
	for _, group := range gsm.groups {
		for id, shard := range group.Shards {
			if shard == nil {
				return fmt.Errorf("shard %d in group %s is nil", id, group.UUID)
			}
			if _, err := shard.DocCount(); err != nil {
				return fmt.Errorf("health check failed for shard %d in group %s: %w", id, group.UUID, err)
			}
		}
	}
	return nil
}

// GetShardForDocument routes a document to its shard within the group identified
// by mainLogPath, hashing key to pick the local shard.
func (gsm *GroupedShardManager) GetShardForDocument(mainLogPath string, key string) (bleve.Index, int, error) {
	if mainLogPath == "" {
		return nil, -1, fmt.Errorf("mainLogPath required for grouped shard routing")
	}
	group, err := gsm.getOrCreateGroup(mainLogPath)
	if err != nil {
		return nil, -1, err
	}
	shardID := defaultHashFunc(key, group.ShardCount)
	shard := group.Shards[shardID]
	if shard == nil {
		return nil, -1, fmt.Errorf("shard %d not initialized for group %s", shardID, group.UUID)
	}
	return shard, shardID, nil
}

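// Routing is deterministic: defaultHashFunc always maps the same key to the
// same local shard within a group, so updates and deletes for a document reach
// the shard that indexed it. A hypothetical illustration:
//
//	s1, id1, _ := gsm.GetShardForDocument("/var/log/nginx/access.log", "req-42")
//	s2, id2, _ := gsm.GetShardForDocument("/var/log/nginx/access.log", "req-42")
//	// id1 == id2 and s1 == s2 always hold
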
// GetShard is a compatibility interface: only available when there is exactly one group.
func (gsm *GroupedShardManager) GetShard(key string) (bleve.Index, int, error) {
	gsm.mu.RLock()
	defer gsm.mu.RUnlock()
	if len(gsm.groups) == 0 {
		return nil, -1, fmt.Errorf("no shard groups initialized")
	}
	if len(gsm.groups) > 1 {
		return nil, -1, fmt.Errorf("ambiguous GetShard: multiple shard groups present; use GetShardForDocument")
	}
	for _, group := range gsm.groups {
		shardID := defaultHashFunc(key, group.ShardCount)
		shard := group.Shards[shardID]
		if shard == nil {
			return nil, -1, fmt.Errorf("shard %d not initialized for group %s", shardID, group.UUID)
		}
		return shard, shardID, nil
	}
	return nil, -1, fmt.Errorf("unexpected: no groups iterated")
}

// GetShardByID resolves a global ID to a specific group and local shard.
func (gsm *GroupedShardManager) GetShardByID(id int) (bleve.Index, error) {
	gsm.mu.RLock()
	ref, ok := gsm.globalToLocal[id]
	if !ok {
		gsm.mu.RUnlock()
		return nil, fmt.Errorf("%s: %d", ErrShardNotFound, id)
	}
	group := gsm.groups[ref.uuid]
	shard := group.Shards[ref.localID]
	gsm.mu.RUnlock()
	if shard == nil {
		return nil, fmt.Errorf("%s: %d", ErrShardNotFound, id)
	}
	return shard, nil
}

// GetAllShards returns all shards across groups, sorted by global ID.
func (gsm *GroupedShardManager) GetAllShards() []bleve.Index {
	gsm.mu.RLock()
	defer gsm.mu.RUnlock()
	ids := make([]int, 0, len(gsm.globalToLocal))
	for id := range gsm.globalToLocal {
		ids = append(ids, id)
	}
	sort.Ints(ids)
	shards := make([]bleve.Index, 0, len(ids))
	for _, gid := range ids {
		ref := gsm.globalToLocal[gid]
		if grp, ok := gsm.groups[ref.uuid]; ok {
			shards = append(shards, grp.Shards[ref.localID])
		}
	}
	return shards
}

// GetShardStats summarizes shard statistics across groups, keyed by global ID.
func (gsm *GroupedShardManager) GetShardStats() []*ShardInfo {
	gsm.mu.RLock()
	defer gsm.mu.RUnlock()
	ids := make([]int, 0, len(gsm.globalToLocal))
	for id := range gsm.globalToLocal {
		ids = append(ids, id)
	}
	sort.Ints(ids)
	stats := make([]*ShardInfo, 0, len(ids))
	for _, gid := range ids {
		ref := gsm.globalToLocal[gid]
		group := gsm.groups[ref.uuid]
		shard := group.Shards[ref.localID]
		if shard == nil {
			continue
		}
		docCount, _ := shard.DocCount()
		path := group.ShardPaths[ref.localID]
		// Note: os.Stat on a directory reports the size of the directory entry
		// itself, not the total size of the index files inside it.
		var size int64
		if st, err := os.Stat(path); err == nil {
			size = st.Size()
		}
		stats = append(stats, &ShardInfo{
			ID:            gid,
			Path:          path,
			DocumentCount: docCount,
			Size:          size,
			LastUpdated:   0,
		})
	}
	return stats
}

// CreateShard by global ID is not directly supported in the grouped manager;
// shards are created per group in getOrCreateGroup.
func (gsm *GroupedShardManager) CreateShard(id int, path string) error {
	return fmt.Errorf("CreateShard by global id is not supported in grouped manager")
}

// CloseShard closes a shard by global ID and removes it from all mappings.
func (gsm *GroupedShardManager) CloseShard(id int) error {
	gsm.mu.Lock()
	defer gsm.mu.Unlock()
	ref, ok := gsm.globalToLocal[id]
	if !ok {
		return fmt.Errorf("%s: %d", ErrShardNotFound, id)
	}
	group := gsm.groups[ref.uuid]
	shard := group.Shards[ref.localID]
	if shard == nil {
		return fmt.Errorf("%s: %d", ErrShardNotFound, id)
	}
	if err := shard.Close(); err != nil {
		return fmt.Errorf("failed to close shard %d in group %s: %w", ref.localID, ref.uuid, err)
	}
	delete(group.Shards, ref.localID)
	delete(group.ShardPaths, ref.localID)
	delete(gsm.globalToLocal, id)
	delete(gsm.localToGlobal, gsm.makeLocalKey(ref.uuid, ref.localID))
	return nil
}

// OptimizeShard triggers internal optimization by global ID.
func (gsm *GroupedShardManager) OptimizeShard(id int) error {
	shard, err := gsm.GetShardByID(id)
	if err != nil {
		return err
	}
	return shard.SetInternal([]byte("_optimize"), []byte("trigger"))
}

// Destroy removes all index data for all groups.
func (gsm *GroupedShardManager) Destroy() error {
	gsm.mu.Lock()
	defer gsm.mu.Unlock()
	var errs []error
	for _, group := range gsm.groups {
		for id, shard := range group.Shards {
			if shard != nil {
				if err := shard.Close(); err != nil {
					errs = append(errs, fmt.Errorf("close shard %d in %s: %w", id, group.UUID, err))
				}
			}
		}
		// Delete the group directory.
		groupPath := filepath.Join(gsm.config.IndexPath, group.UUID)
		if err := os.RemoveAll(groupPath); err != nil {
			errs = append(errs, fmt.Errorf("remove group path %s: %w", groupPath, err))
		}
	}
	// Extra safety: remove any residual entries under the index root that are not tracked in memory.
	// This ensures a clean slate in case of leftovers from previous runs/crashes.
	if gsm.config != nil && gsm.config.IndexPath != "" {
		entries, err := os.ReadDir(gsm.config.IndexPath)
		if err != nil {
			errs = append(errs, fmt.Errorf("read index root %s: %w", gsm.config.IndexPath, err))
		} else {
			for _, entry := range entries {
				residualPath := filepath.Join(gsm.config.IndexPath, entry.Name())
				if err := os.RemoveAll(residualPath); err != nil {
					errs = append(errs, fmt.Errorf("remove residual path %s: %w", residualPath, err))
				}
			}
		}
	}
	// Reset in-memory state, including the path cache, so stale
	// path -> UUID mappings do not survive a destroy.
	gsm.pathToUUID = make(map[string]string)
	gsm.groups = make(map[string]*ShardGroup)
	gsm.globalToLocal = make(map[int]groupShardRef)
	gsm.localToGlobal = make(map[string]int)
	gsm.nextGlobalID = 0
	if len(errs) > 0 {
		return fmt.Errorf("destroy errors: %v", errs)
	}
	return nil
}

// getOrCreateGroup returns the group for mainLogPath, lazily creating its
// directory and shards on first use. It uses a double-checked pattern: a fast
// read-locked cache lookup, then a write-locked re-check before creating.
func (gsm *GroupedShardManager) getOrCreateGroup(mainLogPath string) (*ShardGroup, error) {
	// Fast path: check the cache under a read lock.
	gsm.mu.RLock()
	if uuid, ok := gsm.pathToUUID[mainLogPath]; ok {
		if group, exists := gsm.groups[uuid]; exists {
			gsm.mu.RUnlock()
			return group, nil
		}
	}
	gsm.mu.RUnlock()
	// Cache miss: resolve or create the UUID (may hit the DB, so do it unlocked).
	uuidStr, err := gsm.getOrCreateUUID(mainLogPath)
	if err != nil {
		return nil, err
	}
	gsm.mu.Lock()
	defer gsm.mu.Unlock()
	// Re-check: another goroutine may have created the group while we were unlocked.
	if group, exists := gsm.groups[uuidStr]; exists {
		return group, nil
	}
	// Initialize the group directory and shards.
	group := &ShardGroup{
		UUID:        uuidStr,
		MainLogPath: mainLogPath,
		Shards:      make(map[int]bleve.Index),
		ShardPaths:  make(map[int]string),
		ShardCount:  gsm.config.ShardCount,
	}
	groupBase := filepath.Join(gsm.config.IndexPath, uuidStr)
	if err := os.MkdirAll(groupBase, 0755); err != nil {
		return nil, fmt.Errorf("failed to create group base %s: %w", groupBase, err)
	}
	for i := 0; i < group.ShardCount; i++ {
		shard, shardPath, err := gsm.openOrCreateShard(groupBase, i)
		if err != nil {
			return nil, fmt.Errorf("failed to init shard %d for group %s: %w", i, uuidStr, err)
		}
		group.Shards[i] = shard
		group.ShardPaths[i] = shardPath
		// Assign a stable global ID.
		gID := gsm.nextGlobalID
		gsm.globalToLocal[gID] = groupShardRef{uuid: uuidStr, localID: i}
		gsm.localToGlobal[gsm.makeLocalKey(uuidStr, i)] = gID
		gsm.nextGlobalID++
	}
	gsm.groups[uuidStr] = group
	gsm.pathToUUID[mainLogPath] = uuidStr
	logger.Infof("Initialized shard group %s for mainLogPath %s with %d shards", uuidStr, mainLogPath, group.ShardCount)
	return group, nil
}

func (gsm *GroupedShardManager) openOrCreateShard(groupBase string, shardID int) (bleve.Index, string, error) {
	// Shard path: groupBase/shard_{id}
	shardName := fmt.Sprintf("shard_%d", shardID)
	shardPath := filepath.Join(groupBase, shardName)
	if err := os.MkdirAll(shardPath, 0755); err != nil {
		return nil, "", fmt.Errorf("failed to create shard dir: %w", err)
	}
	var idx bleve.Index
	var err error
	// index_meta.json is written when a bleve index is created, so its absence
	// means we should create a fresh index rather than open an existing one.
	if _, statErr := os.Stat(filepath.Join(shardPath, "index_meta.json")); os.IsNotExist(statErr) {
		// New index: reuse the original mapping and storage optimizations.
		mapping := CreateLogIndexMapping()
		kvConfig := map[string]interface{}{
			"scorchMergePlanOptions": map[string]interface{}{
				"FloorSegmentFileSize": 5000000,
			},
		}
		idx, err = bleve.NewUsing(shardPath, mapping, bleve.Config.DefaultIndexType, bleve.Config.DefaultMemKVStore, kvConfig)
		if err != nil {
			return nil, "", fmt.Errorf("create bleve index: %w", err)
		}
	} else {
		idx, err = bleve.Open(shardPath)
		if err != nil {
			return nil, "", fmt.Errorf("open bleve index: %w", err)
		}
	}
	return idx, shardPath, nil
}

// loadExistingGroups scans the database for existing main_log_path groups and opens their shards.
// This ensures that after a process restart, previously built indices are immediately available.
func (gsm *GroupedShardManager) loadExistingGroups() error {
	// Fetch all enabled index records ordered by creation time so the first record seen per
	// main_log_path becomes the canonical UUID for the group (consistent with getOrCreateUUID).
	q := query.NginxLogIndex
	records, err := q.Where(q.Enabled.Is(true)).Order(q.CreatedAt).Find()
	if err != nil {
		return fmt.Errorf("failed to query existing index records: %w", err)
	}
	// Build a unique main_log_path -> uuid mapping.
	type groupInfo struct {
		uuid string
		path string
	}
	groups := make(map[string]groupInfo)
	for _, rec := range records {
		if rec == nil || rec.MainLogPath == "" {
			continue
		}
		if _, exists := groups[rec.MainLogPath]; exists {
			continue
		}
		groups[rec.MainLogPath] = groupInfo{uuid: rec.ID.String(), path: rec.MainLogPath}
	}
	if len(groups) == 0 {
		logger.Debugf("loadExistingGroups: no existing groups found")
		return nil
	}
	gsm.mu.Lock()
	defer gsm.mu.Unlock()
	for mainPath, gi := range groups {
		// Skip if already present in memory (e.g., created by recent writes).
		if _, ok := gsm.pathToUUID[mainPath]; ok {
			continue
		}
		groupBase := filepath.Join(gsm.config.IndexPath, gi.uuid)
		// Ensure the group directory exists; if not, skip silently.
		if _, statErr := os.Stat(groupBase); os.IsNotExist(statErr) {
			// No on-disk index for this group yet; skip to avoid creating empty shards.
			logger.Debugf("loadExistingGroups: group directory not found, skipping: %s (%s)", mainPath, gi.uuid)
			continue
		}
		group := &ShardGroup{
			UUID:        gi.uuid,
			MainLogPath: mainPath,
			Shards:      make(map[int]bleve.Index),
			ShardPaths:  make(map[int]string),
			ShardCount:  gsm.config.ShardCount,
		}
		// Open shards that exist; attempt up to the configured ShardCount.
		for i := 0; i < group.ShardCount; i++ {
			// Mirror the shard naming logic in openOrCreateShard.
			shardName := fmt.Sprintf("shard_%d", i)
			shardPath := filepath.Join(groupBase, shardName)
			if _, err := os.Stat(shardPath); err != nil {
				if os.IsNotExist(err) {
					// Shard directory missing: skip without creating to avoid empty indices.
					continue
				}
				logger.Warnf("loadExistingGroups: stat shard dir failed: %s: %v", shardPath, err)
				continue
			}
			idx, openErr := bleve.Open(shardPath)
			if openErr != nil {
				logger.Warnf("loadExistingGroups: failed to open shard at %s: %v", shardPath, openErr)
				continue
			}
			group.Shards[i] = idx
			group.ShardPaths[i] = shardPath
			// Assign a stable global ID for this shard in the current process.
			gID := gsm.nextGlobalID
			gsm.globalToLocal[gID] = groupShardRef{uuid: gi.uuid, localID: i}
			gsm.localToGlobal[gsm.makeLocalKey(gi.uuid, i)] = gID
			gsm.nextGlobalID++
		}
		// Only register the group if at least one shard was opened.
		if len(group.Shards) > 0 {
			gsm.groups[gi.uuid] = group
			gsm.pathToUUID[mainPath] = gi.uuid
			logger.Infof("Loaded shard group %s for mainLogPath %s with %d shard(s)", gi.uuid, mainPath, len(group.Shards))
		}
	}
	return nil
}

// getOrCreateUUID finds the first record UUID for a MainLogPath in the DB,
// creating a placeholder record if none exists.
func (gsm *GroupedShardManager) getOrCreateUUID(mainLogPath string) (string, error) {
	// First check the cache.
	gsm.mu.RLock()
	if uuid, ok := gsm.pathToUUID[mainLogPath]; ok {
		gsm.mu.RUnlock()
		return uuid, nil
	}
	gsm.mu.RUnlock()
	q := query.NginxLogIndex
	// Get the first record in ascending order of creation time.
	record, err := q.Where(q.MainLogPath.Eq(mainLogPath)).Order(q.CreatedAt).First()
	if err == nil && record != nil {
		return record.ID.String(), nil
	}
	// Not found: create a placeholder record with Path and MainLogPath both set to mainLogPath.
	placeholder := &model.NginxLogIndex{
		BaseModelUUID: model.BaseModelUUID{},
		Path:          mainLogPath,
		MainLogPath:   mainLogPath,
		Enabled:       true,
	}
	if err := q.Create(placeholder); err != nil {
		return "", fmt.Errorf("failed to create placeholder NginxLogIndex for %s: %w", mainLogPath, err)
	}
	return placeholder.ID.String(), nil
}

// makeLocalKey constructs the unique "uuid#localID" key used by localToGlobal.
func (gsm *GroupedShardManager) makeLocalKey(uuid string, shardID int) string {
	return fmt.Sprintf("%s#%d", uuid, shardID)
}

// defaultHashFunc is a simple hash (reusing the old routing logic): a
// simplified 32-bit FNV-1a, inlined to avoid additional dependencies.
func defaultHashFunc(key string, shardCount int) int {
	var hash uint32 = 2166136261 // FNV-1a 32-bit offset basis
	for i := 0; i < len(key); i++ {
		hash ^= uint32(key[i])
		hash *= 16777619 // FNV-1a 32-bit prime
	}
	return int(hash % uint32(shardCount))
}
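
// Worked example: for the standard 32-bit FNV-1a test vector,
// hash("a") = (2166136261 ^ 'a') * 16777619 mod 2^32 = 0xE40C292C,
// so with shardCount = 8 the key "a" routes to shard 0xE40C292C % 8 = 4.
// Any reader that needs the same placement must reuse this exact function;
// a different hash would look up documents in the wrong shard.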