1
0

service.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package analytics
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/0xJacky/Nginx-UI/internal/nginx_log/searcher"
  6. "github.com/0xJacky/Nginx-UI/internal/nginx_log/utils"
  7. )
  8. // Service defines the interface for analytics operations
  9. type Service interface {
  10. GetDashboardAnalytics(ctx context.Context, req *DashboardQueryRequest) (*DashboardAnalytics, error)
  11. GetLogEntriesStats(ctx context.Context, req *searcher.SearchRequest) (*EntriesStats, error)
  12. GetGeoDistribution(ctx context.Context, req *GeoQueryRequest) (*GeoDistribution, error)
  13. GetGeoDistributionByCountry(ctx context.Context, req *GeoQueryRequest, countryCode string) (*GeoDistribution, error)
  14. GetTopCountries(ctx context.Context, req *GeoQueryRequest) ([]CountryStats, error)
  15. GetTopCities(ctx context.Context, req *GeoQueryRequest) ([]CityStats, error)
  16. GetGeoStatsForIP(ctx context.Context, req *GeoQueryRequest, ip string) (*CityStats, error)
  17. GetTopPaths(ctx context.Context, req *TopListRequest) ([]KeyValue, error)
  18. GetTopIPs(ctx context.Context, req *TopListRequest) ([]KeyValue, error)
  19. GetTopUserAgents(ctx context.Context, req *TopListRequest) ([]KeyValue, error)
  20. ValidateLogPath(logPath string) error
  21. ValidateTimeRange(startTime, endTime int64) error
  22. Stop() error
  23. }
  24. // service implements the Service interface
  25. type service struct {
  26. searcher searcher.Searcher
  27. cardinalityCounter *searcher.CardinalityCounter
  28. }
  29. // NewService creates a new analytics service
  30. func NewService(s searcher.Searcher) Service {
  31. // Try to extract shards from distributed searcher for cardinality counting
  32. var cardinalityCounter *searcher.CardinalityCounter
  33. if ds, ok := s.(*searcher.DistributedSearcher); ok {
  34. shards := ds.GetShards()
  35. if len(shards) > 0 {
  36. cardinalityCounter = searcher.NewCardinalityCounter(shards)
  37. }
  38. }
  39. return &service{
  40. searcher: s,
  41. cardinalityCounter: cardinalityCounter,
  42. }
  43. }
  44. // Stop gracefully stops the analytics service and its components
  45. func (s *service) Stop() error {
  46. if s.cardinalityCounter != nil {
  47. return s.cardinalityCounter.Stop()
  48. }
  49. return nil
  50. }
  51. // getCardinalityCounter dynamically creates or returns a cardinality counter
  52. // This is necessary because shards may be updated after service initialization
  53. func (s *service) getCardinalityCounter() *searcher.CardinalityCounter {
  54. // If we already have a cardinality counter and it's still valid, use it
  55. if s.cardinalityCounter != nil {
  56. return s.cardinalityCounter
  57. }
  58. // Try to create a new cardinality counter from current shards
  59. if ds, ok := s.searcher.(*searcher.DistributedSearcher); ok {
  60. shards := ds.GetShards()
  61. if len(shards) > 0 {
  62. // Update our cached cardinality counter
  63. s.cardinalityCounter = searcher.NewCardinalityCounter(shards)
  64. return s.cardinalityCounter
  65. }
  66. }
  67. return nil
  68. }
  69. // ValidateLogPath validates the log path against whitelist
  70. func (s *service) ValidateLogPath(logPath string) error {
  71. if logPath == "" {
  72. return nil // Empty path is acceptable for global search
  73. }
  74. if !utils.IsValidLogPath(logPath) {
  75. return fmt.Errorf("log path is not under whitelist")
  76. }
  77. return nil
  78. }
  79. // ValidateTimeRange validates the time range parameters
  80. func (s *service) ValidateTimeRange(startTime, endTime int64) error {
  81. if startTime < 0 || endTime < 0 {
  82. return fmt.Errorf("time values cannot be negative")
  83. }
  84. if startTime > 0 && endTime > 0 && startTime >= endTime {
  85. return fmt.Errorf("start time must be before end time")
  86. }
  87. return nil
  88. }
  89. // buildBaseSearchRequest builds a base search request with common parameters
  90. func (s *service) buildBaseSearchRequest(startTime, endTime int64, logPath string) *searcher.SearchRequest {
  91. req := &searcher.SearchRequest{
  92. Limit: DefaultLimit,
  93. Offset: 0,
  94. UseCache: true,
  95. }
  96. if startTime > 0 {
  97. req.StartTime = &startTime
  98. }
  99. if endTime > 0 {
  100. req.EndTime = &endTime
  101. }
  102. if logPath != "" {
  103. req.LogPaths = []string{logPath}
  104. }
  105. return req
  106. }
  107. // validateAndNormalizeSearchRequest validates and normalizes a search request
  108. func (s *service) validateAndNormalizeSearchRequest(req *searcher.SearchRequest) error {
  109. if req == nil {
  110. return fmt.Errorf("request cannot be nil")
  111. }
  112. if req.Limit <= 0 {
  113. req.Limit = DefaultLimit
  114. }
  115. if req.Limit > MaxLimit {
  116. req.Limit = MaxLimit
  117. }
  118. if req.Offset < 0 {
  119. req.Offset = 0
  120. }
  121. return nil
  122. }