package analytics

import (
	"context"
	"fmt"
	"sort"
	"time"

	"github.com/0xJacky/Nginx-UI/internal/nginx_log/searcher"
	"github.com/uozi-tech/cosy/logger"
)

// GetDashboardAnalytics generates comprehensive dashboard analytics.
func (s *service) GetDashboardAnalytics(ctx context.Context, req *DashboardQueryRequest) (*DashboardAnalytics, error) {
	if req == nil {
		return nil, fmt.Errorf("request cannot be nil")
	}

	if err := s.ValidateTimeRange(req.StartTime, req.EndTime); err != nil {
		return nil, fmt.Errorf("invalid time range: %w", err)
	}

	searchReq := &searcher.SearchRequest{
		StartTime: &req.StartTime,
		EndTime: &req.EndTime,
		LogPaths: req.LogPaths,
		UseMainLogPath: true, // Use the main_log_path field for efficient log group queries
		IncludeFacets: true,
		FacetFields: []string{"browser", "os", "device_type"}, // 'ip' omitted to reduce facet computation
		FacetSize: 50, // Significantly reduced for faster facet computation
		UseCache: true,
		SortBy: "timestamp",
		SortOrder: "desc",
		Limit: 0, // Don't fetch documents; use aggregations instead
	}

	// Execute search
	result, err := s.searcher.Search(ctx, searchReq)
	if err != nil {
		return nil, fmt.Errorf("failed to search logs for dashboard: %w", err)
	}

	// DEBUG: check whether documents carry the main_log_path field
	if result.TotalHits == 0 {
		logger.Warnf("⚠️ No results found with main_log_path query!")
		debugReq := &searcher.SearchRequest{
			Limit: 3,
			UseCache: false,
			Fields: []string{"main_log_path", "file_path", "timestamp"},
		}
		if debugResult, debugErr := s.searcher.Search(ctx, debugReq); debugErr == nil {
			logger.Warnf("📊 Index contains %d total documents", debugResult.TotalHits)
			for i, hit := range debugResult.Hits {
				logger.Warnf("📄 Document %d fields: %+v", i, hit.Fields)
				if i >= 2 {
					break
				}
			}
		}
	}

	// --- DIAGNOSTIC LOGGING ---
	logger.Debugf("Dashboard search completed. Total Hits: %d, Returned Hits: %d, Facets: %d",
		result.TotalHits, len(result.Hits), len(result.Facets))
	if result.TotalHits > uint64(len(result.Hits)) {
		logger.Warnf("Dashboard sampling: using %d/%d documents for time calculations (%.1f%% coverage)",
			len(result.Hits), result.TotalHits, float64(len(result.Hits))/float64(result.TotalHits)*100)
	}
	// --- END DIAGNOSTIC LOGGING ---

	// Initialize analytics with empty slices
	analytics := &DashboardAnalytics{}

	// Calculate analytics if we have results
	if result.TotalHits > 0 {
		// For now, use batch queries to get complete data
		analytics.HourlyStats = s.calculateHourlyStatsWithBatch(ctx, req)
		analytics.DailyStats = s.calculateDailyStatsWithBatch(ctx, req)
		// Use the cardinality counter for efficient unique-URL counting
		analytics.TopURLs = s.calculateTopURLsWithCardinality(ctx, req)
		analytics.Browsers = s.calculateBrowserStats(result)
		analytics.OperatingSystems = s.calculateOSStats(result)
		analytics.Devices = s.calculateDeviceStats(result)
	} else {
		// Ensure slices are initialized even if there are no hits
		analytics.HourlyStats = make([]HourlyAccessStats, 0)
		analytics.DailyStats = make([]DailyAccessStats, 0)
		analytics.TopURLs = make([]URLAccessStats, 0)
		analytics.Browsers = make([]BrowserAccessStats, 0)
		analytics.OperatingSystems = make([]OSAccessStats, 0)
		analytics.Devices = make([]DeviceAccessStats, 0)
	}

	// Calculate the summary with cardinality counting for accurate unique pages
	analytics.Summary = s.calculateDashboardSummaryWithCardinality(ctx, analytics, result, req)

	return analytics, nil
}
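
// Illustrative usage (a hypothetical caller, not part of this file; the request
// field values below are assumptions inferred from the signatures above):
//
//	req := &DashboardQueryRequest{
//		StartTime: time.Now().Add(-7 * 24 * time.Hour).Unix(),
//		EndTime:   time.Now().Unix(),
//		LogPaths:  []string{"/var/log/nginx/access.log"},
//	}
//	analytics, err := svc.GetDashboardAnalytics(ctx, req)
//	if err != nil {
//		// handle the error
//	}
//	fmt.Printf("PV=%d UV=%d\n", analytics.Summary.TotalPV, analytics.Summary.TotalUV)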

// calculateHourlyStats calculates hourly access statistics.
// It returns 48 hours of data centered around the end date to support all timezones.
func (s *service) calculateHourlyStats(result *searcher.SearchResult, startTime, endTime int64) []HourlyAccessStats {
	// Use a map keyed by timestamp for easier processing
	hourlyMap := make(map[int64]*HourlyAccessStats)
	uniqueIPsPerHour := make(map[int64]map[string]bool)

	// Calculate the 48-hour range: from the UTC end date minus 12 hours to plus 36 hours.
	// This covers the UTC-12 to UTC+14 timezones.
	endDate := time.Unix(endTime, 0).UTC()
	endDateStart := time.Date(endDate.Year(), endDate.Month(), endDate.Day(), 0, 0, 0, 0, time.UTC)

	// Create hourly buckets for 48 hours (12 hours before to 36 hours after the UTC date boundary)
	rangeStart := endDateStart.Add(-12 * time.Hour)
	rangeEnd := endDateStart.Add(36 * time.Hour)
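
	// Worked example (illustrative): if endTime falls on 2024-05-10 18:30 UTC,
	// endDateStart is 2024-05-10 00:00 UTC, so rangeStart is 2024-05-09 12:00 UTC
	// and rangeEnd is 2024-05-11 12:00 UTC, i.e. 48 one-hour buckets in total.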

	// Initialize hourly buckets
	for t := rangeStart; t.Before(rangeEnd); t = t.Add(time.Hour) {
		timestamp := t.Unix()
		hourlyMap[timestamp] = &HourlyAccessStats{
			Hour: t.Hour(),
			UV: 0,
			PV: 0,
			Timestamp: timestamp,
		}
		uniqueIPsPerHour[timestamp] = make(map[string]bool)
	}

	// Process search results: count hits that fall within the 48-hour window
	for _, hit := range result.Hits {
		if timestampField, ok := hit.Fields["timestamp"]; ok {
			if timestampFloat, ok := timestampField.(float64); ok {
				timestamp := int64(timestampFloat)
				// Check whether this hit falls within our 48-hour window
				if timestamp >= rangeStart.Unix() && timestamp < rangeEnd.Unix() {
					// Round down to the hour
					t := time.Unix(timestamp, 0).UTC()
					hourTimestamp := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, time.UTC).Unix()
					if stats, exists := hourlyMap[hourTimestamp]; exists {
						stats.PV++
						if ipField, ok := hit.Fields["ip"]; ok {
							if ip, ok := ipField.(string); ok && ip != "" {
								if !uniqueIPsPerHour[hourTimestamp][ip] {
									uniqueIPsPerHour[hourTimestamp][ip] = true
									stats.UV++
								}
							}
						}
					}
				}
			}
		}
	}

	// Convert to a slice and sort by timestamp
	var stats []HourlyAccessStats
	for _, stat := range hourlyMap {
		stats = append(stats, *stat)
	}
	sort.Slice(stats, func(i, j int) bool {
		return stats[i].Timestamp < stats[j].Timestamp
	})

	return stats
}
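
// Note: calculateHourlyStats above and calculateDailyStats below are in-memory
// variants that only see the hits returned by a single search. GetDashboardAnalytics
// uses the *WithBatch variants further down so that every matching document is counted.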

// calculateDailyStats calculates daily access statistics.
func (s *service) calculateDailyStats(result *searcher.SearchResult, startTime, endTime int64) []DailyAccessStats {
	dailyMap := make(map[string]*DailyAccessStats)
	uniqueIPsPerDay := make(map[string]map[string]bool)

	// Initialize daily buckets for the entire time range
	start := time.Unix(startTime, 0)
	end := time.Unix(endTime, 0)
	for t := start; t.Before(end) || t.Equal(end); t = t.AddDate(0, 0, 1) {
		dateStr := t.Format("2006-01-02")
		if _, exists := dailyMap[dateStr]; !exists {
			dailyMap[dateStr] = &DailyAccessStats{
				Date: dateStr,
				UV: 0,
				PV: 0,
				Timestamp: t.Unix(),
			}
			uniqueIPsPerDay[dateStr] = make(map[string]bool)
		}
	}

	// Process search results
	for _, hit := range result.Hits {
		if timestampField, ok := hit.Fields["timestamp"]; ok {
			if timestampFloat, ok := timestampField.(float64); ok {
				timestamp := int64(timestampFloat)
				t := time.Unix(timestamp, 0)
				dateStr := t.Format("2006-01-02")
				if stats, exists := dailyMap[dateStr]; exists {
					stats.PV++
					if ipField, ok := hit.Fields["ip"]; ok {
						if ip, ok := ipField.(string); ok && ip != "" {
							if !uniqueIPsPerDay[dateStr][ip] {
								uniqueIPsPerDay[dateStr][ip] = true
								stats.UV++
							}
						}
					}
				}
			}
		}
	}

	// Convert to a slice and sort
	var stats []DailyAccessStats
	for _, stat := range dailyMap {
		stats = append(stats, *stat)
	}
	sort.Slice(stats, func(i, j int) bool {
		return stats[i].Timestamp < stats[j].Timestamp
	})

	return stats
}

// calculateTopURLs calculates top URL statistics from facets (legacy method).
func (s *service) calculateTopURLs(result *searcher.SearchResult) []URLAccessStats {
	facet, ok := result.Facets["path_exact"]
	if !ok {
		logger.Errorf("❌ path_exact facet not found in search results")
		return []URLAccessStats{}
	}

	logger.Infof("📊 Facet-based URL calculation: facet.Total=%d, TotalHits=%d",
		facet.Total, result.TotalHits)
	urlStats := calculateTopFieldStats(facet, int(result.TotalHits), func(term string, count int, percent float64) URLAccessStats {
		return URLAccessStats{URL: term, Visits: count, Percent: percent}
	})
	logger.Infof("📈 Calculated %d URL stats from facet", len(urlStats))
	return urlStats
}

// calculateTopURLsWithCardinality calculates top URL statistics using a facet-based approach.
// It always returns the actual top URLs with their visit counts instead of just a summary.
func (s *service) calculateTopURLsWithCardinality(ctx context.Context, req *DashboardQueryRequest) []URLAccessStats {
	// Always use the facet-based calculation to get the actual top URLs with visit counts
	searchReq := &searcher.SearchRequest{
		StartTime: &req.StartTime,
		EndTime: &req.EndTime,
		LogPaths: req.LogPaths,
		UseMainLogPath: true, // Use main_log_path for efficient log group queries
		IncludeFacets: true,
		FacetFields: []string{"path_exact"},
		FacetSize: 100, // Reasonable facet size to get the top URLs
		UseCache: true,
	}

	result, err := s.searcher.Search(ctx, searchReq)
	if err != nil {
		logger.Errorf("Failed to search for URL facets: %v", err)
		return []URLAccessStats{}
	}

	// Get the actual top URLs with visit counts
	return s.calculateTopURLs(result)
}

// calculateBrowserStats calculates browser statistics from facets.
func (s *service) calculateBrowserStats(result *searcher.SearchResult) []BrowserAccessStats {
	return calculateTopFieldStats(result.Facets["browser"], int(result.TotalHits), func(term string, count int, percent float64) BrowserAccessStats {
		return BrowserAccessStats{Browser: term, Count: count, Percent: percent}
	})
}

// calculateOSStats calculates operating system statistics from facets.
func (s *service) calculateOSStats(result *searcher.SearchResult) []OSAccessStats {
	return calculateTopFieldStats(result.Facets["os"], int(result.TotalHits), func(term string, count int, percent float64) OSAccessStats {
		return OSAccessStats{OS: term, Count: count, Percent: percent}
	})
}

// calculateDeviceStats calculates device statistics from facets.
func (s *service) calculateDeviceStats(result *searcher.SearchResult) []DeviceAccessStats {
	return calculateTopFieldStats(result.Facets["device_type"], int(result.TotalHits), func(term string, count int, percent float64) DeviceAccessStats {
		return DeviceAccessStats{Device: term, Count: count, Percent: percent}
	})
}

// calculateTopFieldStats is a generic helper that turns a facet result into a slice
// of typed stats, computing each term's share of the total hits.
func calculateTopFieldStats[T any](
	facet *searcher.Facet,
	totalHits int,
	creator func(term string, count int, percent float64) T,
) []T {
	if facet == nil || totalHits == 0 {
		return []T{}
	}

	var items []T
	for _, term := range facet.Terms {
		percent := float64(term.Count) / float64(totalHits) * 100
		items = append(items, creator(term.Term, term.Count, percent))
	}
	return items
}
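
// Worked example for calculateTopFieldStats (illustrative numbers): a facet with
// terms {"Chrome": 60, "Firefox": 40} and totalHits = 200 yields
//
//	[]BrowserAccessStats{
//		{Browser: "Chrome", Count: 60, Percent: 30},
//		{Browser: "Firefox", Count: 40, Percent: 20},
//	}
//
// when used with the creator function from calculateBrowserStats above.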

// calculateDashboardSummary calculates summary statistics.
func (s *service) calculateDashboardSummary(analytics *DashboardAnalytics, result *searcher.SearchResult) DashboardSummary {
	// Calculate total UV from the IP facet, which is now reliable.
	totalUV := 0
	if result.Facets != nil {
		if ipFacet, ok := result.Facets["ip"]; ok {
			// The total number of unique terms in the facet is the UV count.
			totalUV = ipFacet.Total
		}
	}
	totalPV := int(result.TotalHits)

	// Calculate average daily UV and PV
	var avgDailyUV, avgDailyPV float64
	if len(analytics.DailyStats) > 0 {
		var sumPV int
		for _, daily := range analytics.DailyStats {
			sumPV += daily.PV
		}
		// Divide total unique visitors by the number of days for an accurate daily UV average.
		// totalUV counts unique visitors across the entire period, not the sum of daily UVs.
		avgDailyUV = float64(totalUV) / float64(len(analytics.DailyStats))
		avgDailyPV = float64(sumPV) / float64(len(analytics.DailyStats))
	}
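
	// Worked example (illustrative): 300 unique IPs over a 3-day range gives
	// avgDailyUV = 300 / 3 = 100, even if the per-day UV values sum to more
	// than 300, since the same visitor can appear on several days.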

	// Find the peak hour
	var peakHour, peakHourTraffic int
	for _, hourly := range analytics.HourlyStats {
		if hourly.PV > peakHourTraffic {
			peakHour = hourly.Hour
			peakHourTraffic = hourly.PV
		}
	}

	return DashboardSummary{
		TotalUV: totalUV,
		TotalPV: totalPV,
		AvgDailyUV: avgDailyUV,
		AvgDailyPV: avgDailyPV,
		PeakHour: peakHour,
		PeakHourTraffic: peakHourTraffic,
	}
}

// calculateDashboardSummaryWithCardinality calculates enhanced summary statistics using cardinality counters.
func (s *service) calculateDashboardSummaryWithCardinality(ctx context.Context, analytics *DashboardAnalytics, result *searcher.SearchResult, req *DashboardQueryRequest) DashboardSummary {
	// Start with the basic summary; the UV calculation is overridden below.
	summary := s.calculateDashboardSummary(analytics, result)

	// Use the cardinality counter for accurate unique visitor (UV) counting if available
	cardinalityCounter := s.getCardinalityCounter()
	if cardinalityCounter != nil {
		// Count unique IPs (visitors) with the cardinality counter instead of the limited facet
		uvCardReq := &searcher.CardinalityRequest{
			Field: "ip",
			StartTime: &req.StartTime,
			EndTime: &req.EndTime,
			LogPaths: req.LogPaths,
			UseMainLogPath: true, // Use main_log_path for efficient log group queries
		}

		if uvResult, err := cardinalityCounter.CountCardinality(ctx, uvCardReq); err == nil {
			// Override the facet-limited UV count with the accurate cardinality count
			summary.TotalUV = int(uvResult.Cardinality)

			// Recalculate the average daily UV with the accurate count
			if len(analytics.DailyStats) > 0 {
				summary.AvgDailyUV = float64(summary.TotalUV) / float64(len(analytics.DailyStats))
			}

			// Log the improvement; handle the case where the IP facet might not exist
			facetUV := "N/A"
			if result.Facets != nil && result.Facets["ip"] != nil {
				facetUV = fmt.Sprintf("%d", result.Facets["ip"].Total)
			}
			logger.Infof("✓ Accurate UV count using CardinalityCounter: %d (was limited to %s by facet)",
				uvResult.Cardinality, facetUV)
		} else {
			logger.Errorf("Failed to count unique visitors with cardinality counter: %v", err)
		}

		// Also count unique pages for additional insight
		pageCardReq := &searcher.CardinalityRequest{
			Field: "path_exact",
			StartTime: &req.StartTime,
			EndTime: &req.EndTime,
			LogPaths: req.LogPaths,
			UseMainLogPath: true, // Use main_log_path for efficient log group queries
		}

		if pageResult, err := cardinalityCounter.CountCardinality(ctx, pageCardReq); err == nil {
			logger.Debugf("Accurate unique pages count: %d (vs Total PV: %d)", pageResult.Cardinality, summary.TotalPV)
			if pageResult.Cardinality <= uint64(summary.TotalPV) {
				logger.Infof("✓ Unique pages (%d) ≤ Total PV (%d) - data consistency verified", pageResult.Cardinality, summary.TotalPV)
			} else {
				logger.Warnf("⚠ Unique pages (%d) > Total PV (%d) - possible data inconsistency", pageResult.Cardinality, summary.TotalPV)
			}
		} else {
			logger.Errorf("Failed to count unique pages: %v", err)
		}
	} else {
		logger.Warnf("CardinalityCounter not available, UV count limited by facet size to %d", summary.TotalUV)
	}

	return summary
}
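
// Note on the facet vs. cardinality trade-off (illustrative numbers): a facet only
// reports terms up to its configured FacetSize, so with FacetSize = 50 and 10,000
// distinct client IPs the facet-derived UV is capped far below the truth, while the
// cardinality counter reports a figure close to the actual 10,000.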

// calculateDailyStatsWithBatch calculates daily statistics by fetching data in batches.
func (s *service) calculateDailyStatsWithBatch(ctx context.Context, req *DashboardQueryRequest) []DailyAccessStats {
	dailyMap := make(map[string]*DailyAccessStats)
	uniqueIPsPerDay := make(map[string]map[string]bool)

	// Initialize daily buckets for the entire time range
	start := time.Unix(req.StartTime, 0)
	end := time.Unix(req.EndTime, 0)
	for t := start; t.Before(end) || t.Equal(end); t = t.AddDate(0, 0, 1) {
		dateStr := t.Format("2006-01-02")
		if _, exists := dailyMap[dateStr]; !exists {
			dailyMap[dateStr] = &DailyAccessStats{
				Date: dateStr,
				UV: 0,
				PV: 0,
				Timestamp: t.Unix(),
			}
			uniqueIPsPerDay[dateStr] = make(map[string]bool)
		}
	}

	// Process data in batches to avoid memory issues; the batch size is
	// deliberately large for better throughput
	batchSize := 150000
	offset := 0

	logger.Debugf("📅 Daily stats batch query: start=%d (%s), end=%d (%s), expected days=%d",
		req.StartTime, time.Unix(req.StartTime, 0).Format("2006-01-02 15:04:05"),
		req.EndTime, time.Unix(req.EndTime, 0).Format("2006-01-02 15:04:05"),
		len(dailyMap))

	totalProcessedDaily := 0
	for {
		searchReq := &searcher.SearchRequest{
			StartTime: &req.StartTime,
			EndTime: &req.EndTime,
			LogPaths: req.LogPaths,
			UseMainLogPath: true, // Use main_log_path for efficient log group queries
			Limit: batchSize,
			Offset: offset,
			Fields: []string{"timestamp", "ip"},
			UseCache: false, // Don't cache intermediate results
		}

		result, err := s.searcher.Search(ctx, searchReq)
		if err != nil {
			logger.Errorf("Failed to fetch batch at offset %d: %v", offset, err)
			break
		}

		logger.Debugf("🔍 Daily batch %d: returned %d hits, totalHits=%d",
			offset/batchSize, len(result.Hits), result.TotalHits)

		// Process this batch of results
		processedInBatch := 0
		for _, hit := range result.Hits {
			if timestampField, ok := hit.Fields["timestamp"]; ok {
				if timestampFloat, ok := timestampField.(float64); ok {
					timestamp := int64(timestampFloat)
					t := time.Unix(timestamp, 0)
					dateStr := t.Format("2006-01-02")
					if stats, exists := dailyMap[dateStr]; exists {
						stats.PV++
						processedInBatch++
						if ipField, ok := hit.Fields["ip"]; ok {
							if ip, ok := ipField.(string); ok && ip != "" {
								if !uniqueIPsPerDay[dateStr][ip] {
									uniqueIPsPerDay[dateStr][ip] = true
									stats.UV++
								}
							}
						}
					} else if offset == 0 { // Only log mismatches from the first batch to avoid spam
						logger.Debugf("⚠️ Daily: timestamp %d (%s) -> date %s not found in dailyMap",
							timestamp, t.Format("2006-01-02 15:04:05"), dateStr)
					}
				} else if offset == 0 {
					logger.Debugf("⚠️ Daily: timestamp field is not float64: %T = %v", timestampField, timestampField)
				}
			} else if offset == 0 {
				logger.Debugf("⚠️ Daily: no timestamp field in hit: %+v", hit.Fields)
			}
		}

		logger.Debugf("📝 Daily batch %d: processed %d/%d records", offset/batchSize, processedInBatch, len(result.Hits))

		// Count this batch before checking for termination so the final
		// (short) batch is included in the total.
		totalProcessedDaily += processedInBatch

		// Stop once a batch comes back smaller than the batch size
		if len(result.Hits) < batchSize {
			break
		}
		offset += batchSize

		// Log progress
		logger.Debugf("Processed %d/%d records for daily stats", offset, result.TotalHits)
	}

	logger.Infof("📊 Daily stats processing completed: %d total records processed, %d day buckets", totalProcessedDaily, len(dailyMap))

	// Convert to a slice and sort
	var stats []DailyAccessStats
	for _, stat := range dailyMap {
		stats = append(stats, *stat)
	}
	sort.Slice(stats, func(i, j int) bool {
		return stats[i].Timestamp < stats[j].Timestamp
	})

	return stats
}

// calculateHourlyStatsWithBatch calculates hourly statistics by fetching data in batches.
func (s *service) calculateHourlyStatsWithBatch(ctx context.Context, req *DashboardQueryRequest) []HourlyAccessStats {
	// Use a map keyed by timestamp for easier processing
	hourlyMap := make(map[int64]*HourlyAccessStats)
	uniqueIPsPerHour := make(map[int64]map[string]bool)

	// For user date-range queries, cover the full requested range plus a timezone buffer.
	// This ensures we capture data in all timezones for the requested dates.
	startDate := time.Unix(req.StartTime, 0).UTC()
	endDate := time.Unix(req.EndTime, 0).UTC()

	// Add the timezone buffer: 12 hours before the start, 12 hours after the end.
	// This covers the UTC-12 to UTC+12 timezones adequately.
	rangeStart := startDate.Add(-12 * time.Hour)
	rangeEnd := endDate.Add(12 * time.Hour)
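
	// Worked example (illustrative): a request for 2024-05-01 00:00 UTC through
	// 2024-05-02 00:00 UTC is widened to 2024-04-30 12:00 UTC through
	// 2024-05-02 12:00 UTC, so a midnight-to-midnight local day anywhere from
	// UTC-12 to UTC+12 falls entirely inside the bucketed window.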

	// Initialize hourly buckets
	for t := rangeStart; t.Before(rangeEnd); t = t.Add(time.Hour) {
		timestamp := t.Unix()
		hourlyMap[timestamp] = &HourlyAccessStats{
			Hour: t.Hour(),
			UV: 0,
			PV: 0,
			Timestamp: timestamp,
		}
		uniqueIPsPerHour[timestamp] = make(map[string]bool)
	}

	// Process data in batches; the batch size is deliberately large for better throughput
	batchSize := 150000
	offset := 0

	// Adjust the time range for the hourly query
	hourlyStartTime := rangeStart.Unix()
	hourlyEndTime := rangeEnd.Unix()

	logger.Debugf("🕐 Hourly stats batch query: start=%d (%s), end=%d (%s), expected buckets=%d",
		hourlyStartTime, time.Unix(hourlyStartTime, 0).Format("2006-01-02 15:04:05"),
		hourlyEndTime, time.Unix(hourlyEndTime, 0).Format("2006-01-02 15:04:05"),
		len(hourlyMap))

	totalProcessed := 0
	for {
		searchReq := &searcher.SearchRequest{
			StartTime: &hourlyStartTime,
			EndTime: &hourlyEndTime,
			LogPaths: req.LogPaths,
			UseMainLogPath: true, // Use main_log_path for efficient log group queries
			Limit: batchSize,
			Offset: offset,
			Fields: []string{"timestamp", "ip"},
			UseCache: false,
		}

		result, err := s.searcher.Search(ctx, searchReq)
		if err != nil {
			logger.Errorf("Failed to fetch batch at offset %d: %v", offset, err)
			break
		}

		logger.Debugf("🔍 Hourly batch %d: returned %d hits, totalHits=%d",
			offset/batchSize, len(result.Hits), result.TotalHits)

		// Process this batch of results
		processedInBatch := 0
		for _, hit := range result.Hits {
			if timestampField, ok := hit.Fields["timestamp"]; ok {
				if timestampFloat, ok := timestampField.(float64); ok {
					timestamp := int64(timestampFloat)
					// Round down to the hour
					t := time.Unix(timestamp, 0).UTC()
					hourTimestamp := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, time.UTC).Unix()
					if stats, exists := hourlyMap[hourTimestamp]; exists {
						stats.PV++
						processedInBatch++
						if ipField, ok := hit.Fields["ip"]; ok {
							if ip, ok := ipField.(string); ok && ip != "" {
								if !uniqueIPsPerHour[hourTimestamp][ip] {
									uniqueIPsPerHour[hourTimestamp][ip] = true
									stats.UV++
								}
							}
						}
					} else if offset == 0 { // Only log mismatches from the first batch to avoid spam
						hourStr := time.Unix(hourTimestamp, 0).Format("2006-01-02 15:04:05")
						logger.Debugf("⚠️ Hourly: timestamp %d (%s) -> hour %d (%s) not found in hourlyMap",
							timestamp, t.Format("2006-01-02 15:04:05"), hourTimestamp, hourStr)
					}
				} else if offset == 0 {
					logger.Debugf("⚠️ Hourly: timestamp field is not float64: %T = %v", timestampField, timestampField)
				}
			} else if offset == 0 {
				logger.Debugf("⚠️ Hourly: no timestamp field in hit: %+v", hit.Fields)
			}
		}

		logger.Debugf("📝 Hourly batch %d: processed %d/%d records", offset/batchSize, processedInBatch, len(result.Hits))

		// Count this batch before checking for termination so the final
		// (short) batch is included in the total.
		totalProcessed += processedInBatch

		// Stop once a batch comes back smaller than the batch size
		if len(result.Hits) < batchSize {
			break
		}
		offset += batchSize

		// Log progress
		logger.Debugf("Processed %d/%d records for hourly stats", offset, result.TotalHits)
	}

	logger.Infof("📊 Hourly stats processing completed: %d total records processed, %d hour buckets", totalProcessed, len(hourlyMap))

	// Convert to a slice and sort by timestamp
	var stats []HourlyAccessStats
	for _, stat := range hourlyMap {
		stats = append(stats, *stat)
	}
	sort.Slice(stats, func(i, j int) bool {
		return stats[i].Timestamp < stats[j].Timestamp
	})

	return stats
}