integration_small_test.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470
  1. package nginx_log
  2. import (
  3. "context"
  4. "fmt"
  5. "math/rand"
  6. "os"
  7. "path/filepath"
  8. "sync"
  9. "testing"
  10. "time"
  11. "github.com/0xJacky/Nginx-UI/internal/nginx_log/analytics"
  12. "github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
  13. "github.com/0xJacky/Nginx-UI/internal/nginx_log/searcher"
  14. "github.com/blevesearch/bleve/v2"
  15. "github.com/stretchr/testify/assert"
  16. "github.com/stretchr/testify/require"
  17. )
  18. const (
  19. // Small test configuration for faster execution
  20. SmallTestRecordsPerFile = 1000 // 1000条记录每个文件
  21. SmallTestFileCount = 3 // 3个测试文件
  22. )
  23. // SmallIntegrationTestSuite contains all integration test data and services (small version)
  24. type SmallIntegrationTestSuite struct {
  25. ctx context.Context
  26. cancel context.CancelFunc
  27. tempDir string
  28. indexDir string
  29. logFiles []string
  30. logFilePaths []string
  31. indexer *indexer.ParallelIndexer
  32. searcher searcher.Searcher
  33. analytics analytics.Service
  34. logFileManager *TestLogFileManager
  35. expectedMetrics map[string]*SmallExpectedFileMetrics
  36. mu sync.RWMutex
  37. cleanup func()
  38. }
  39. // SmallExpectedFileMetrics stores expected statistics for each log file (small version)
  40. type SmallExpectedFileMetrics struct {
  41. TotalRecords uint64
  42. UniqueIPs uint64
  43. UniquePaths uint64
  44. UniqueAgents uint64
  45. StatusCodes map[int]uint64
  46. Methods map[string]uint64
  47. TimeRange SmallTestTimeRange
  48. }
  49. // SmallTestTimeRange represents the time range of log entries for small testing
  50. type SmallTestTimeRange struct {
  51. StartTime time.Time
  52. EndTime time.Time
  53. }
  54. // NewSmallIntegrationTestSuite creates a new small integration test suite
  55. func NewSmallIntegrationTestSuite(t *testing.T) *SmallIntegrationTestSuite {
  56. ctx, cancel := context.WithCancel(context.Background())
  57. // Create temporary directories
  58. tempDir, err := os.MkdirTemp("", "nginx_ui_small_integration_test_*")
  59. require.NoError(t, err)
  60. indexDir := filepath.Join(tempDir, "index")
  61. logsDir := filepath.Join(tempDir, "logs")
  62. err = os.MkdirAll(indexDir, 0755)
  63. require.NoError(t, err)
  64. err = os.MkdirAll(logsDir, 0755)
  65. require.NoError(t, err)
  66. suite := &SmallIntegrationTestSuite{
  67. ctx: ctx,
  68. cancel: cancel,
  69. tempDir: tempDir,
  70. indexDir: indexDir,
  71. expectedMetrics: make(map[string]*SmallExpectedFileMetrics),
  72. }
  73. // Set cleanup function
  74. suite.cleanup = func() {
  75. // Stop services
  76. if suite.indexer != nil {
  77. suite.indexer.Stop()
  78. }
  79. if suite.searcher != nil {
  80. suite.searcher.Stop()
  81. }
  82. // Cancel context
  83. cancel()
  84. // Remove temporary directories
  85. os.RemoveAll(tempDir)
  86. }
  87. return suite
  88. }
  89. // GenerateSmallTestData generates the small test log files with expected statistics
  90. func (suite *SmallIntegrationTestSuite) GenerateSmallTestData(t *testing.T) {
  91. t.Logf("Generating %d test files with %d records each", SmallTestFileCount, SmallTestRecordsPerFile)
  92. baseTime := time.Now().Add(-24 * time.Hour)
  93. for i := 0; i < SmallTestFileCount; i++ {
  94. filename := fmt.Sprintf("small_access_%d.log", i+1)
  95. filepath := filepath.Join(suite.tempDir, "logs", filename)
  96. metrics := suite.generateSmallSingleLogFile(t, filepath, baseTime.Add(time.Duration(i)*time.Hour))
  97. suite.logFiles = append(suite.logFiles, filename)
  98. suite.logFilePaths = append(suite.logFilePaths, filepath)
  99. suite.expectedMetrics[filepath] = metrics
  100. t.Logf("Generated %s with %d records", filename, metrics.TotalRecords)
  101. }
  102. t.Logf("Small test data generation completed. Total files: %d", len(suite.logFiles))
  103. }
  104. // generateSmallSingleLogFile generates a single small log file with known statistics
  105. func (suite *SmallIntegrationTestSuite) generateSmallSingleLogFile(t *testing.T, filepath string, baseTime time.Time) *SmallExpectedFileMetrics {
  106. file, err := os.Create(filepath)
  107. require.NoError(t, err)
  108. defer file.Close()
  109. metrics := &SmallExpectedFileMetrics{
  110. StatusCodes: make(map[int]uint64),
  111. Methods: make(map[string]uint64),
  112. TimeRange: SmallTestTimeRange{
  113. StartTime: baseTime,
  114. EndTime: baseTime.Add(time.Duration(SmallTestRecordsPerFile) * time.Second),
  115. },
  116. }
  117. // Predefined test data for consistent testing
  118. ips := []string{
  119. "192.168.1.1", "192.168.1.2", "192.168.1.3", "10.0.0.1", "10.0.0.2",
  120. }
  121. paths := []string{
  122. "/", "/api/v1/status", "/api/v1/logs", "/admin", "/login",
  123. }
  124. userAgents := []string{
  125. "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
  126. "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
  127. "PostmanRuntime/7.28.4",
  128. }
  129. statusCodes := []int{200, 301, 404, 500}
  130. methods := []string{"GET", "POST", "PUT"}
  131. // Track unique values
  132. uniqueIPs := make(map[string]bool)
  133. uniquePaths := make(map[string]bool)
  134. uniqueAgents := make(map[string]bool)
  135. rand.Seed(time.Now().UnixNano() + int64(len(filepath))) // Different seed per file
  136. for i := 0; i < SmallTestRecordsPerFile; i++ {
  137. // Generate log entry timestamp
  138. timestamp := baseTime.Add(time.Duration(i) * time.Second)
  139. // Select random values
  140. ip := ips[rand.Intn(len(ips))]
  141. path := paths[rand.Intn(len(paths))]
  142. agent := userAgents[rand.Intn(len(userAgents))]
  143. status := statusCodes[rand.Intn(len(statusCodes))]
  144. method := methods[rand.Intn(len(methods))]
  145. size := rand.Intn(1000) + 100 // 100-1100 bytes
  146. // Track unique values
  147. uniqueIPs[ip] = true
  148. uniquePaths[path] = true
  149. uniqueAgents[agent] = true
  150. // Update metrics
  151. metrics.StatusCodes[status]++
  152. metrics.Methods[method]++
  153. // Generate nginx log line (Common Log Format)
  154. logLine := fmt.Sprintf(`%s - - [%s] "%s %s HTTP/1.1" %d %d "-" "%s"`+"\n",
  155. ip,
  156. timestamp.Format("02/Jan/2006:15:04:05 -0700"),
  157. method,
  158. path,
  159. status,
  160. size,
  161. agent,
  162. )
  163. _, err := file.WriteString(logLine)
  164. require.NoError(t, err)
  165. }
  166. // Finalize metrics
  167. metrics.TotalRecords = SmallTestRecordsPerFile
  168. metrics.UniqueIPs = uint64(len(uniqueIPs))
  169. metrics.UniquePaths = uint64(len(uniquePaths))
  170. metrics.UniqueAgents = uint64(len(uniqueAgents))
  171. return metrics
  172. }
  173. // InitializeSmallServices initializes all nginx_log services for small testing
  174. func (suite *SmallIntegrationTestSuite) InitializeSmallServices(t *testing.T) {
  175. t.Log("Initializing small test services...")
  176. // Initialize indexer
  177. indexerConfig := indexer.DefaultIndexerConfig()
  178. indexerConfig.IndexPath = suite.indexDir
  179. shardManager := indexer.NewDefaultShardManager(indexerConfig)
  180. suite.indexer = indexer.NewParallelIndexer(indexerConfig, shardManager)
  181. err := suite.indexer.Start(suite.ctx)
  182. require.NoError(t, err)
  183. // Initialize searcher (empty initially)
  184. searcherConfig := searcher.DefaultSearcherConfig()
  185. suite.searcher = searcher.NewDistributedSearcher(searcherConfig, []bleve.Index{})
  186. // Initialize analytics
  187. suite.analytics = analytics.NewService(suite.searcher)
  188. // Initialize log file manager with test-specific behavior
  189. suite.logFileManager = &TestLogFileManager{
  190. logCache: make(map[string]*indexer.NginxLogCache),
  191. indexingStatus: make(map[string]bool),
  192. indexMetadata: make(map[string]*TestIndexMetadata),
  193. }
  194. // Register test log files
  195. for _, logPath := range suite.logFilePaths {
  196. suite.logFileManager.AddLogPath(logPath, "access", filepath.Base(logPath), "test_config")
  197. }
  198. t.Log("Small services initialized successfully")
  199. }
  200. // PerformSmallGlobalIndexRebuild performs a complete index rebuild of all small files
  201. func (suite *SmallIntegrationTestSuite) PerformSmallGlobalIndexRebuild(t *testing.T) {
  202. t.Log("Starting small global index rebuild...")
  203. startTime := time.Now()
  204. // Create progress tracking
  205. var completedFiles []string
  206. var mu sync.Mutex
  207. progressConfig := &indexer.ProgressConfig{
  208. NotifyInterval: 500 * time.Millisecond,
  209. OnProgress: func(progress indexer.ProgressNotification) {
  210. t.Logf("Index progress: %s - %.1f%% (Files: %d/%d, Lines: %d/%d)",
  211. progress.LogGroupPath, progress.Percentage, progress.CompletedFiles,
  212. progress.TotalFiles, progress.ProcessedLines, progress.EstimatedLines)
  213. },
  214. OnCompletion: func(completion indexer.CompletionNotification) {
  215. mu.Lock()
  216. completedFiles = append(completedFiles, completion.LogGroupPath)
  217. mu.Unlock()
  218. t.Logf("Index completion: %s - Success: %t, Duration: %s, Lines: %d",
  219. completion.LogGroupPath, completion.Success, completion.Duration, completion.TotalLines)
  220. },
  221. }
  222. // Destroy existing indexes
  223. err := suite.indexer.DestroyAllIndexes(suite.ctx)
  224. require.NoError(t, err)
  225. // Re-initialize indexer
  226. err = suite.indexer.Start(suite.ctx)
  227. require.NoError(t, err)
  228. // Index all log files
  229. allLogs := suite.logFileManager.GetAllLogsWithIndexGrouped()
  230. for _, log := range allLogs {
  231. docsCountMap, minTime, maxTime, err := suite.indexer.IndexLogGroupWithProgress(log.Path, progressConfig)
  232. require.NoError(t, err, "Failed to index log group: %s", log.Path)
  233. // Save metadata
  234. duration := time.Since(startTime)
  235. var totalDocs uint64
  236. for _, docCount := range docsCountMap {
  237. totalDocs += docCount
  238. }
  239. err = suite.logFileManager.SaveIndexMetadata(log.Path, totalDocs, startTime, duration, minTime, maxTime)
  240. require.NoError(t, err)
  241. }
  242. // Flush and update searcher
  243. err = suite.indexer.FlushAll()
  244. require.NoError(t, err)
  245. suite.updateSmallSearcher(t)
  246. totalDuration := time.Since(startTime)
  247. t.Logf("Small global index rebuild completed in %s. Completed files: %v", totalDuration, completedFiles)
  248. }
  249. // updateSmallSearcher updates the searcher with current shards
  250. func (suite *SmallIntegrationTestSuite) updateSmallSearcher(t *testing.T) {
  251. if !suite.indexer.IsHealthy() {
  252. t.Fatal("Indexer is not healthy, cannot update searcher")
  253. }
  254. newShards := suite.indexer.GetAllShards()
  255. t.Logf("Updating searcher with %d shards", len(newShards))
  256. if ds, ok := suite.searcher.(*searcher.DistributedSearcher); ok {
  257. err := ds.SwapShards(newShards)
  258. require.NoError(t, err)
  259. t.Log("Searcher shards updated successfully")
  260. } else {
  261. t.Fatal("Searcher is not a DistributedSearcher")
  262. }
  263. }
  264. // ValidateSmallCardinalityCounter validates the accuracy of cardinality counting
  265. func (suite *SmallIntegrationTestSuite) ValidateSmallCardinalityCounter(t *testing.T, filePath string) {
  266. t.Logf("Validating CardinalityCounter accuracy for: %s", filePath)
  267. expected := suite.expectedMetrics[filePath]
  268. require.NotNil(t, expected, "Expected metrics not found for file: %s", filePath)
  269. if ds, ok := suite.searcher.(*searcher.DistributedSearcher); ok {
  270. cardinalityCounter := searcher.NewCardinalityCounter(ds.GetShards())
  271. // Test IP cardinality (for all files combined since we can't filter by file path yet)
  272. req := &searcher.CardinalityRequest{
  273. Field: "remote_addr",
  274. }
  275. result, err := cardinalityCounter.CountCardinality(suite.ctx, req)
  276. require.NoError(t, err, "Failed to count IP cardinality")
  277. // For combined files, we expect at least the unique IPs from this file
  278. // but possibly more since we're counting across all files
  279. assert.GreaterOrEqual(t, result.Cardinality, expected.UniqueIPs,
  280. "IP cardinality should be at least %d, got %d", expected.UniqueIPs, result.Cardinality)
  281. t.Logf("✓ IP cardinality (all files): actual=%d (expected at least %d), total_docs=%d",
  282. result.Cardinality, expected.UniqueIPs, result.TotalDocs)
  283. } else {
  284. t.Fatal("Searcher is not a DistributedSearcher")
  285. }
  286. }
  287. // ValidateSmallAnalyticsData validates the accuracy of analytics statistics
  288. func (suite *SmallIntegrationTestSuite) ValidateSmallAnalyticsData(t *testing.T, filePath string) {
  289. t.Logf("Validating Analytics data accuracy for: %s", filePath)
  290. expected := suite.expectedMetrics[filePath]
  291. require.NotNil(t, expected, "Expected metrics not found for file: %s", filePath)
  292. // Test dashboard analytics
  293. dashboardReq := &analytics.DashboardQueryRequest{
  294. LogPaths: []string{filePath},
  295. StartTime: expected.TimeRange.StartTime.Unix(),
  296. EndTime: expected.TimeRange.EndTime.Unix(),
  297. }
  298. dashboard, err := suite.analytics.GetDashboardAnalytics(suite.ctx, dashboardReq)
  299. require.NoError(t, err, "Failed to get dashboard data for: %s", filePath)
  300. // Validate basic metrics
  301. tolerance := float64(10) // Small tolerance for small datasets
  302. assert.InDelta(t, expected.TotalRecords, dashboard.Summary.TotalPV, tolerance,
  303. "Total requests mismatch for %s", filePath)
  304. t.Logf("✓ Dashboard validation completed for: %s", filePath)
  305. t.Logf(" Total requests: expected=%d, actual=%d", expected.TotalRecords, dashboard.Summary.TotalPV)
  306. t.Logf(" Unique visitors: %d", dashboard.Summary.TotalUV)
  307. t.Logf(" Average daily PV: %f", dashboard.Summary.AvgDailyPV)
  308. }
  309. // ValidateSmallPaginationFunctionality validates pagination works correctly using searcher
  310. func (suite *SmallIntegrationTestSuite) ValidateSmallPaginationFunctionality(t *testing.T, filePath string) {
  311. t.Logf("Validating pagination functionality for: %s", filePath)
  312. expected := suite.expectedMetrics[filePath]
  313. require.NotNil(t, expected, "Expected metrics not found for file: %s", filePath)
  314. // Test first page - search all records without any filters
  315. searchReq1 := &searcher.SearchRequest{
  316. Query: "", // Empty query should use match_all
  317. Limit: 50,
  318. Offset: 0,
  319. SortBy: "timestamp",
  320. SortOrder: "desc",
  321. }
  322. result1, err := suite.searcher.Search(suite.ctx, searchReq1)
  323. require.NoError(t, err, "Failed to get page 1 for: %s", filePath)
  324. // For small integration test, we expect at least some results from all files combined
  325. totalExpectedRecords := uint64(SmallTestFileCount * SmallTestRecordsPerFile)
  326. assert.Greater(t, len(result1.Hits), 0, "First page should have some entries")
  327. assert.Equal(t, totalExpectedRecords, result1.TotalHits, "Total count should match all files")
  328. // Test second page
  329. searchReq2 := &searcher.SearchRequest{
  330. Query: "", // Empty query should use match_all
  331. Limit: 50,
  332. Offset: 50,
  333. SortBy: "timestamp",
  334. SortOrder: "desc",
  335. }
  336. result2, err := suite.searcher.Search(suite.ctx, searchReq2)
  337. require.NoError(t, err, "Failed to get page 2 for: %s", filePath)
  338. // Check that pagination works by ensuring we get different results
  339. assert.Greater(t, len(result2.Hits), 0, "Second page should have some entries")
  340. assert.Equal(t, totalExpectedRecords, result2.TotalHits, "Total count should be consistent")
  341. // Ensure different pages return different entries
  342. if len(result1.Hits) > 0 && len(result2.Hits) > 0 {
  343. firstPageFirstEntry := result1.Hits[0].ID
  344. secondPageFirstEntry := result2.Hits[0].ID
  345. assert.NotEqual(t, firstPageFirstEntry, secondPageFirstEntry,
  346. "Different pages should return different entries")
  347. }
  348. t.Logf("✓ Pagination validation completed for: %s", filePath)
  349. t.Logf(" Page 1 entries: %d", len(result1.Hits))
  350. t.Logf(" Page 2 entries: %d", len(result2.Hits))
  351. t.Logf(" Total entries: %d", result1.TotalHits)
  352. }
  353. // TestSmallNginxLogIntegration is the main small integration test function
  354. func TestSmallNginxLogIntegration(t *testing.T) {
  355. if testing.Short() {
  356. t.Skip("Skipping integration test in short mode")
  357. }
  358. suite := NewSmallIntegrationTestSuite(t)
  359. defer suite.cleanup()
  360. t.Log("=== Starting Small Nginx Log Integration Test ===")
  361. // Step 1: Generate test data
  362. suite.GenerateSmallTestData(t)
  363. // Step 2: Initialize services
  364. suite.InitializeSmallServices(t)
  365. // Step 3: Perform global index rebuild and validate
  366. t.Log("\n=== Testing Small Global Index Rebuild ===")
  367. suite.PerformSmallGlobalIndexRebuild(t)
  368. // Step 4: Validate all files after global rebuild
  369. for _, filePath := range suite.logFilePaths {
  370. t.Logf("\n--- Validating file after global rebuild: %s ---", filepath.Base(filePath))
  371. suite.ValidateSmallCardinalityCounter(t, filePath)
  372. suite.ValidateSmallAnalyticsData(t, filePath)
  373. suite.ValidateSmallPaginationFunctionality(t, filePath)
  374. }
  375. t.Log("\n=== Small Integration Test Completed Successfully ===")
  376. }