monitor.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580
  1. package nginx_log
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/0xJacky/Nginx-UI/internal/nginx_log/parser"
  11. "github.com/0xJacky/Nginx-UI/internal/nginx_log/utils"
  12. )
  13. // Thresholds defines minimum acceptable performance metrics
  14. type Thresholds struct {
  15. ParseStreamOpsPerSec float64 `json:"parse_stream_ops_per_sec"`
  16. SIMDOpsPerSec float64 `json:"simd_ops_per_sec"`
  17. MemoryPoolOpsPerSec float64 `json:"memory_pool_ops_per_sec"`
  18. RegexCacheOpsPerSec float64 `json:"regex_cache_ops_per_sec"`
  19. MaxMemoryUsageMB float64 `json:"max_memory_usage_mb"`
  20. MaxResponseTimeMS float64 `json:"max_response_time_ms"`
  21. }
  22. // DefaultThresholds returns the expected minimum performance levels
  23. func DefaultThresholds() *Thresholds {
  24. return &Thresholds{
  25. ParseStreamOpsPerSec: 500.0, // 7-8x improvement target
  26. SIMDOpsPerSec: 10000.0, // 235x improvement target
  27. MemoryPoolOpsPerSec: 100000.0, // 48-81% improvement target
  28. RegexCacheOpsPerSec: 1000000.0, // High-performance caching target
  29. MaxMemoryUsageMB: 500.0, // Maximum memory usage
  30. MaxResponseTimeMS: 1000.0, // Maximum response time
  31. }
  32. }
  33. // Metrics represents current system performance
  34. type Metrics struct {
  35. Timestamp time.Time `json:"timestamp"`
  36. ParseStreamRate float64 `json:"parse_stream_rate"`
  37. SIMDRate float64 `json:"simd_rate"`
  38. MemoryPoolRate float64 `json:"memory_pool_rate"`
  39. RegexCacheRate float64 `json:"regex_cache_rate"`
  40. MemoryUsageMB float64 `json:"memory_usage_mb"`
  41. ResponseTimeMS float64 `json:"response_time_ms"`
  42. CacheHitRate float64 `json:"cache_hit_rate"`
  43. TotalOperations int64 `json:"total_operations"`
  44. ErrorRate float64 `json:"error_rate"`
  45. }
  46. // Alert represents a performance issue alert
  47. type Alert struct {
  48. Level string `json:"level"` // "warning", "critical"
  49. Component string `json:"component"` // "parser", "simd", "memory", "cache"
  50. Message string `json:"message"`
  51. CurrentValue float64 `json:"current_value"`
  52. ThresholdValue float64 `json:"threshold_value"`
  53. Timestamp time.Time `json:"timestamp"`
  54. Suggestions []string `json:"suggestions"`
  55. }
  56. // Monitor provides real-time performance monitoring and alerting
  57. type Monitor struct {
  58. thresholds *Thresholds
  59. metrics *Metrics
  60. alerts []Alert
  61. alertCallback func(Alert)
  62. mu sync.RWMutex
  63. running bool
  64. stopChan chan struct{}
  65. }
  66. // NewMonitor creates a new performance monitor
  67. func NewMonitor(thresholds *Thresholds) *Monitor {
  68. if thresholds == nil {
  69. thresholds = DefaultThresholds()
  70. }
  71. return &Monitor{
  72. thresholds: thresholds,
  73. metrics: &Metrics{},
  74. alerts: make([]Alert, 0),
  75. stopChan: make(chan struct{}),
  76. }
  77. }
  78. // SetAlertCallback sets a callback function for performance alerts
  79. func (pm *Monitor) SetAlertCallback(callback func(Alert)) {
  80. pm.mu.Lock()
  81. defer pm.mu.Unlock()
  82. pm.alertCallback = callback
  83. }
  84. // StartMonitoring begins continuous performance monitoring
  85. func (pm *Monitor) StartMonitoring(ctx context.Context, interval time.Duration) {
  86. pm.mu.Lock()
  87. if pm.running {
  88. pm.mu.Unlock()
  89. return
  90. }
  91. pm.running = true
  92. pm.mu.Unlock()
  93. ticker := time.NewTicker(interval)
  94. defer ticker.Stop()
  95. for {
  96. select {
  97. case <-ctx.Done():
  98. pm.stopMonitoring()
  99. return
  100. case <-pm.stopChan:
  101. return
  102. case <-ticker.C:
  103. pm.collectMetrics()
  104. pm.checkThresholds()
  105. }
  106. }
  107. }
  108. // StopMonitoring stops the performance monitoring
  109. func (pm *Monitor) StopMonitoring() {
  110. pm.stopMonitoring()
  111. }
  112. func (pm *Monitor) stopMonitoring() {
  113. pm.mu.Lock()
  114. defer pm.mu.Unlock()
  115. if pm.running {
  116. pm.running = false
  117. close(pm.stopChan)
  118. pm.stopChan = make(chan struct{})
  119. }
  120. }
  121. // collectMetrics gathers current performance metrics
  122. func (pm *Monitor) collectMetrics() {
  123. pm.mu.Lock()
  124. defer pm.mu.Unlock()
  125. startTime := time.Now()
  126. // Test ParseStream performance
  127. parseRate := pm.benchmarkParseStream()
  128. // Test SIMD performance
  129. simdRate := pm.benchmarkSIMD()
  130. // Test memory pool performance
  131. poolRate := pm.benchmarkMemoryPools()
  132. // Test regex cache performance
  133. cacheRate := pm.benchmarkRegexCache()
  134. // Get cache hit rate
  135. hitRate := pm.getCacheHitRate()
  136. // Calculate response time
  137. responseTime := float64(time.Since(startTime).Nanoseconds()) / 1e6
  138. pm.metrics = &Metrics{
  139. Timestamp: time.Now(),
  140. ParseStreamRate: parseRate,
  141. SIMDRate: simdRate,
  142. MemoryPoolRate: poolRate,
  143. RegexCacheRate: cacheRate,
  144. MemoryUsageMB: pm.getMemoryUsage(),
  145. ResponseTimeMS: responseTime,
  146. CacheHitRate: hitRate,
  147. TotalOperations: pm.getTotalOperations(),
  148. ErrorRate: 0.0, // Can be enhanced with actual error tracking
  149. }
  150. }
  151. // benchmarkParseStream tests ParseStream performance
  152. func (pm *Monitor) benchmarkParseStream() float64 {
  153. ctx := context.Background()
  154. testData := generateMonitoringTestData(100)
  155. config := parser.DefaultParserConfig()
  156. config.BatchSize = 100
  157. optimizedParser := parser.NewParser(
  158. config,
  159. parser.NewSimpleUserAgentParser(),
  160. &mockMonitorGeoIPService{},
  161. )
  162. start := time.Now()
  163. result, err := optimizedParser.ParseStream(ctx, testData)
  164. if err != nil {
  165. return 0.0
  166. }
  167. duration := time.Since(start)
  168. return float64(result.Processed) / duration.Seconds()
  169. }
  170. // benchmarkSIMD tests SIMD parser performance
  171. func (pm *Monitor) benchmarkSIMD() float64 {
  172. testLine := `192.168.1.100 - - [06/Sep/2025:10:00:00 +0000] "GET /monitor HTTP/1.1" 200 1024 "https://test.com" "Monitor/1.0"`
  173. simdParser := parser.NewLogLineParser()
  174. operations := 1000
  175. start := time.Now()
  176. for i := 0; i < operations; i++ {
  177. _ = simdParser.ParseLine([]byte(testLine))
  178. }
  179. duration := time.Since(start)
  180. return float64(operations) / duration.Seconds()
  181. }
  182. // benchmarkMemoryPools tests memory pool performance
  183. func (pm *Monitor) benchmarkMemoryPools() float64 {
  184. operations := 1000
  185. start := time.Now()
  186. for i := 0; i < operations; i++ {
  187. // String builder pool
  188. sb := utils.LogStringBuilderPool.Get()
  189. sb.WriteString("performance monitor test")
  190. utils.LogStringBuilderPool.Put(sb)
  191. // Byte slice pool
  192. slice := utils.GlobalByteSlicePool.Get(1024)
  193. utils.GlobalByteSlicePool.Put(slice)
  194. }
  195. duration := time.Since(start)
  196. return float64(operations*2) / duration.Seconds()
  197. }
  198. // benchmarkRegexCache tests regex cache performance
  199. func (pm *Monitor) benchmarkRegexCache() float64 {
  200. cache := parser.GetGlobalRegexCache()
  201. operations := 1000
  202. start := time.Now()
  203. for i := 0; i < operations; i++ {
  204. _, _ = cache.GetCommonRegex("ipv4")
  205. _, _ = cache.GetCommonRegex("timestamp")
  206. _, _ = cache.GetCommonRegex("status")
  207. }
  208. duration := time.Since(start)
  209. return float64(operations*3) / duration.Seconds()
  210. }
  211. // getCacheHitRate gets current cache hit rate
  212. func (pm *Monitor) getCacheHitRate() float64 {
  213. cache := parser.GetGlobalRegexCache()
  214. stats := cache.GetStats()
  215. return stats.HitRate
  216. }
  217. // getMemoryUsage returns current memory usage in MB (simplified)
  218. func (pm *Monitor) getMemoryUsage() float64 {
  219. // In a real implementation, this would use runtime.MemStats
  220. return 50.0 // Placeholder
  221. }
  222. // getTotalOperations returns total operations processed
  223. func (pm *Monitor) getTotalOperations() int64 {
  224. // In a real implementation, this would track actual operations
  225. return int64(time.Since(time.Now()).Seconds()) // Placeholder
  226. }
  227. // checkThresholds compares current metrics against thresholds and generates alerts
  228. func (pm *Monitor) checkThresholds() {
  229. pm.mu.Lock()
  230. defer pm.mu.Unlock()
  231. // Check ParseStream performance
  232. if pm.metrics.ParseStreamRate < pm.thresholds.ParseStreamOpsPerSec {
  233. alert := Alert{
  234. Level: "critical",
  235. Component: "parser",
  236. Message: "ParseStream performance below threshold",
  237. CurrentValue: pm.metrics.ParseStreamRate,
  238. ThresholdValue: pm.thresholds.ParseStreamOpsPerSec,
  239. Timestamp: time.Now(),
  240. Suggestions: []string{
  241. "Check if ParseStream is being used",
  242. "Verify batch size configuration (recommended: 500-1500)",
  243. "Monitor memory usage and GC pressure",
  244. "Check for context cancellation overhead",
  245. },
  246. }
  247. pm.addAlert(alert)
  248. }
  249. // Check SIMD performance
  250. if pm.metrics.SIMDRate < pm.thresholds.SIMDOpsPerSec {
  251. alert := Alert{
  252. Level: "critical",
  253. Component: "simd",
  254. Message: "SIMD parsing performance below threshold",
  255. CurrentValue: pm.metrics.SIMDRate,
  256. ThresholdValue: pm.thresholds.SIMDOpsPerSec,
  257. Timestamp: time.Now(),
  258. Suggestions: []string{
  259. "Ensure SIMD parser is properly initialized",
  260. "Check log format compatibility with SIMD optimizations",
  261. "Verify CPU supports required SIMD instructions",
  262. "Monitor for regex compilation issues",
  263. },
  264. }
  265. pm.addAlert(alert)
  266. }
  267. // Check memory pool performance
  268. if pm.metrics.MemoryPoolRate < pm.thresholds.MemoryPoolOpsPerSec {
  269. alert := Alert{
  270. Level: "warning",
  271. Component: "memory",
  272. Message: "Memory pool performance below threshold",
  273. CurrentValue: pm.metrics.MemoryPoolRate,
  274. ThresholdValue: pm.thresholds.MemoryPoolOpsPerSec,
  275. Timestamp: time.Now(),
  276. Suggestions: []string{
  277. "Check pool reuse rates (should be >80%)",
  278. "Consider increasing pool sizes",
  279. "Monitor for pool contention under high concurrency",
  280. "Pre-warm pools at application startup",
  281. },
  282. }
  283. pm.addAlert(alert)
  284. }
  285. // Check regex cache performance
  286. if pm.metrics.RegexCacheRate < pm.thresholds.RegexCacheOpsPerSec {
  287. alert := Alert{
  288. Level: "warning",
  289. Component: "cache",
  290. Message: "Regex cache performance below threshold",
  291. CurrentValue: pm.metrics.RegexCacheRate,
  292. ThresholdValue: pm.thresholds.RegexCacheOpsPerSec,
  293. Timestamp: time.Now(),
  294. Suggestions: []string{
  295. "Check cache hit rate (should be >90%)",
  296. "Consider increasing cache size",
  297. "Monitor for cache eviction patterns",
  298. "Verify common regex patterns are cached",
  299. },
  300. }
  301. pm.addAlert(alert)
  302. }
  303. // Check cache hit rate
  304. if pm.metrics.CacheHitRate < 0.9 {
  305. alert := Alert{
  306. Level: "warning",
  307. Component: "cache",
  308. Message: "Cache hit rate is low",
  309. CurrentValue: pm.metrics.CacheHitRate * 100,
  310. ThresholdValue: 90.0,
  311. Timestamp: time.Now(),
  312. Suggestions: []string{
  313. "Increase cache size for better hit rates",
  314. "Analyze cache usage patterns",
  315. "Pre-populate cache with common patterns",
  316. },
  317. }
  318. pm.addAlert(alert)
  319. }
  320. // Check memory usage
  321. if pm.metrics.MemoryUsageMB > pm.thresholds.MaxMemoryUsageMB {
  322. alert := Alert{
  323. Level: "critical",
  324. Component: "memory",
  325. Message: "Memory usage exceeds threshold",
  326. CurrentValue: pm.metrics.MemoryUsageMB,
  327. ThresholdValue: pm.thresholds.MaxMemoryUsageMB,
  328. Timestamp: time.Now(),
  329. Suggestions: []string{
  330. "Check for memory leaks",
  331. "Increase pool usage to reduce allocations",
  332. "Monitor GC frequency and pressure",
  333. "Consider reducing batch sizes",
  334. },
  335. }
  336. pm.addAlert(alert)
  337. }
  338. }
  339. // addAlert adds a new alert and triggers the callback
  340. func (pm *Monitor) addAlert(alert Alert) {
  341. pm.alerts = append(pm.alerts, alert)
  342. // Keep only the last 100 alerts
  343. if len(pm.alerts) > 100 {
  344. pm.alerts = pm.alerts[len(pm.alerts)-100:]
  345. }
  346. // Trigger callback if set
  347. if pm.alertCallback != nil {
  348. go pm.alertCallback(alert)
  349. }
  350. }
  351. // GetCurrentMetrics returns the current performance metrics
  352. func (pm *Monitor) GetCurrentMetrics() Metrics {
  353. pm.mu.RLock()
  354. defer pm.mu.RUnlock()
  355. return *pm.metrics
  356. }
  357. // GetRecentAlerts returns recent performance alerts
  358. func (pm *Monitor) GetRecentAlerts(since time.Duration) []Alert {
  359. pm.mu.RLock()
  360. defer pm.mu.RUnlock()
  361. cutoff := time.Now().Add(-since)
  362. recent := make([]Alert, 0)
  363. for _, alert := range pm.alerts {
  364. if alert.Timestamp.After(cutoff) {
  365. recent = append(recent, alert)
  366. }
  367. }
  368. return recent
  369. }
  370. // GetHealthStatus returns overall system health based on current metrics
  371. func (pm *Monitor) GetHealthStatus() string {
  372. pm.mu.RLock()
  373. defer pm.mu.RUnlock()
  374. criticalAlerts := 0
  375. warningAlerts := 0
  376. recentAlerts := pm.getRecentAlertsInternal(time.Hour)
  377. for _, alert := range recentAlerts {
  378. if alert.Level == "critical" {
  379. criticalAlerts++
  380. } else if alert.Level == "warning" {
  381. warningAlerts++
  382. }
  383. }
  384. if criticalAlerts > 0 {
  385. return "critical"
  386. } else if warningAlerts > 3 {
  387. return "warning"
  388. } else {
  389. return "healthy"
  390. }
  391. }
  392. func (pm *Monitor) getRecentAlertsInternal(since time.Duration) []Alert {
  393. cutoff := time.Now().Add(-since)
  394. recent := make([]Alert, 0)
  395. for _, alert := range pm.alerts {
  396. if alert.Timestamp.After(cutoff) {
  397. recent = append(recent, alert)
  398. }
  399. }
  400. return recent
  401. }
  402. // ExportMetrics exports current metrics as JSON
  403. func (pm *Monitor) ExportMetrics() ([]byte, error) {
  404. pm.mu.RLock()
  405. defer pm.mu.RUnlock()
  406. export := struct {
  407. Metrics Metrics `json:"metrics"`
  408. Thresholds Thresholds `json:"thresholds"`
  409. Alerts []Alert `json:"recent_alerts"`
  410. Health string `json:"health_status"`
  411. }{
  412. Metrics: *pm.metrics,
  413. Thresholds: *pm.thresholds,
  414. Alerts: pm.getRecentAlertsInternal(time.Hour),
  415. Health: pm.GetHealthStatus(),
  416. }
  417. return json.MarshalIndent(export, "", " ")
  418. }
  419. // DefaultAlertHandler provides a default implementation for handling alerts
  420. func DefaultAlertHandler(alert Alert) {
  421. log.Printf("PERFORMANCE ALERT [%s/%s]: %s (Current: %.2f, Threshold: %.2f)",
  422. alert.Level, alert.Component, alert.Message, alert.CurrentValue, alert.ThresholdValue)
  423. for _, suggestion := range alert.Suggestions {
  424. log.Printf(" Suggestion: %s", suggestion)
  425. }
  426. }
  427. // Example usage functions
  428. // StartMonitoring starts monitoring with default configuration
  429. func StartMonitoring(ctx context.Context) *Monitor {
  430. monitor := NewMonitor(DefaultThresholds())
  431. monitor.SetAlertCallback(DefaultAlertHandler)
  432. // Start monitoring every 30 seconds
  433. go monitor.StartMonitoring(ctx, 30*time.Second)
  434. return monitor
  435. }
  436. // GetReport generates a comprehensive performance report
  437. func GetReport(monitor *Monitor) string {
  438. metrics := monitor.GetCurrentMetrics()
  439. recentAlerts := monitor.GetRecentAlerts(time.Hour)
  440. health := monitor.GetHealthStatus()
  441. report := fmt.Sprintf(`
  442. === NGINX-UI LOG PROCESSING PERFORMANCE REPORT ===
  443. Health Status: %s
  444. Report Generated: %s
  445. PERFORMANCE METRICS:
  446. ├─ ParseStream Performance: %.2f ops/sec
  447. ├─ SIMD Parser Performance: %.2f ops/sec
  448. ├─ Memory Pool Performance: %.2f ops/sec
  449. ├─ Regex Cache Performance: %.2f ops/sec
  450. ├─ Cache Hit Rate: %.2f%%
  451. ├─ Memory Usage: %.2f MB
  452. ├─ Response Time: %.2f ms
  453. └─ Total Operations: %d
  454. RECENT ALERTS (%d):`,
  455. health, metrics.Timestamp.Format(time.RFC3339),
  456. metrics.ParseStreamRate, metrics.SIMDRate, metrics.MemoryPoolRate,
  457. metrics.RegexCacheRate, metrics.CacheHitRate*100, metrics.MemoryUsageMB,
  458. metrics.ResponseTimeMS, metrics.TotalOperations, len(recentAlerts))
  459. if len(recentAlerts) == 0 {
  460. report += "\n└─ No recent alerts - System performing well!"
  461. } else {
  462. for i, alert := range recentAlerts {
  463. prefix := "├─"
  464. if i == len(recentAlerts)-1 {
  465. prefix = "└─"
  466. }
  467. report += fmt.Sprintf("\n%s [%s] %s: %.2f (threshold: %.2f)",
  468. prefix, alert.Level, alert.Message, alert.CurrentValue, alert.ThresholdValue)
  469. }
  470. }
  471. return report
  472. }
  473. // Helper functions and mock implementations
  474. func generateMonitoringTestData(lines int) *strings.Reader {
  475. // Simple test data generation for monitoring
  476. var testData strings.Builder
  477. for i := 0; i < lines; i++ {
  478. testData.WriteString(`192.168.1.100 - - [06/Sep/2025:10:00:00 +0000] "GET /test HTTP/1.1" 200 1024 "https://test.com" "Monitor/1.0"`)
  479. if i < lines-1 {
  480. testData.WriteString("\n")
  481. }
  482. }
  483. return strings.NewReader(testData.String())
  484. }
  485. type mockMonitorGeoIPService struct{}
  486. func (m *mockMonitorGeoIPService) Search(ip string) (*parser.GeoLocation, error) {
  487. return &parser.GeoLocation{
  488. CountryCode: "US",
  489. RegionCode: "CA",
  490. Province: "California",
  491. City: "San Francisco",
  492. }, nil
  493. }