integration_test.go 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746
  1. package nginx_log
import (
	"bufio"
	"context"
	"fmt"
	"math/rand"
	"os"
	"path/filepath"
	"sync"
	"testing"
	"time"

	"github.com/0xJacky/Nginx-UI/internal/nginx_log/analytics"
	"github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
	"github.com/0xJacky/Nginx-UI/internal/nginx_log/searcher"
	"github.com/blevesearch/bleve/v2"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)
const (
	// Test configuration
	TestRecordsPerFile = 400000 // 400,000 records per generated log file
	TestFileCount      = 3      // number of test log files to generate
	TestBaseDir        = "./test_integration_logs"
	TestIndexDir       = "./test_integration_index"
)
// IntegrationTestSuite contains all integration test data and services
type IntegrationTestSuite struct {
	ctx             context.Context    // root context passed to all services
	cancel          context.CancelFunc // cancels ctx; invoked by cleanup
	tempDir         string             // temporary working directory, removed by cleanup
	indexDir        string             // subdirectory of tempDir holding the index
	logFiles        []string           // base names of generated log files
	logFilePaths    []string           // absolute paths of generated log files
	indexer         *indexer.ParallelIndexer
	searcher        *searcher.Searcher
	analytics       analytics.Service
	logFileManager  *TestLogFileManager
	expectedMetrics map[string]*ExpectedFileMetrics // ground-truth stats keyed by log file path
	cleanup         func()                          // stops services, cancels ctx, removes tempDir
}
// TestLogFileManager is a simplified log file manager for testing that doesn't require database
type TestLogFileManager struct {
	logCache       map[string]*indexer.NginxLogCache // registered log files keyed by path; guarded by cacheMutex
	cacheMutex     sync.RWMutex                      // guards logCache
	indexingStatus map[string]bool                   // per-path in-progress flags (not read in the visible code)
	indexMetadata  map[string]*TestIndexMetadata     // per-path index metadata; guarded by metadataMutex
	metadataMutex  sync.RWMutex                      // guards indexMetadata (and presumably indexingStatus — confirm)
}
// TestIndexMetadata holds index metadata for testing
type TestIndexMetadata struct {
	Path          string        // log group path this metadata belongs to
	DocumentCount uint64        // number of documents indexed for the group
	LastIndexed   time.Time     // when the group was last indexed
	Duration      time.Duration // how long the indexing run took
	MinTime       *time.Time    // earliest entry timestamp, nil if unknown
	MaxTime       *time.Time    // latest entry timestamp, nil if unknown
}
// ExpectedFileMetrics stores expected statistics for each log file
type ExpectedFileMetrics struct {
	TotalRecords uint64            // total log lines written to the file
	UniqueIPs    uint64            // distinct client IPs used
	UniquePaths  uint64            // distinct URI paths used
	UniqueAgents uint64            // distinct user agents used
	StatusCodes  map[int]uint64    // per-status-code line counts
	Methods      map[string]uint64 // per-HTTP-method line counts
	TimeRange    TestTimeRange     // time span covered by the file's entries
}
// TestTimeRange represents the time range of log entries for testing
type TestTimeRange struct {
	StartTime time.Time // timestamp of the first entry
	EndTime   time.Time // upper bound of the range (StartTime + TestRecordsPerFile seconds)
}
  72. // NewIntegrationTestSuite creates a new integration test suite
  73. func NewIntegrationTestSuite(t *testing.T) *IntegrationTestSuite {
  74. ctx, cancel := context.WithCancel(context.Background())
  75. // Create temporary directories
  76. tempDir, err := os.MkdirTemp("", "nginx_ui_integration_test_*")
  77. require.NoError(t, err)
  78. indexDir := filepath.Join(tempDir, "index")
  79. logsDir := filepath.Join(tempDir, "logs")
  80. err = os.MkdirAll(indexDir, 0755)
  81. require.NoError(t, err)
  82. err = os.MkdirAll(logsDir, 0755)
  83. require.NoError(t, err)
  84. suite := &IntegrationTestSuite{
  85. ctx: ctx,
  86. cancel: cancel,
  87. tempDir: tempDir,
  88. indexDir: indexDir,
  89. expectedMetrics: make(map[string]*ExpectedFileMetrics),
  90. }
  91. // Set cleanup function
  92. suite.cleanup = func() {
  93. // Stop services
  94. if suite.indexer != nil {
  95. suite.indexer.Stop()
  96. }
  97. if suite.searcher != nil {
  98. suite.searcher.Stop()
  99. }
  100. // Cancel context
  101. cancel()
  102. // Remove temporary directories
  103. os.RemoveAll(tempDir)
  104. }
  105. return suite
  106. }
  107. // GenerateTestData generates the test log files with expected statistics
  108. func (suite *IntegrationTestSuite) GenerateTestData(t *testing.T) {
  109. t.Logf("Generating %d test files with %d records each", TestFileCount, TestRecordsPerFile)
  110. baseTime := time.Now().Add(-24 * time.Hour)
  111. for i := 0; i < TestFileCount; i++ {
  112. filename := fmt.Sprintf("access_%d.log", i+1)
  113. filepath := filepath.Join(suite.tempDir, "logs", filename)
  114. metrics := suite.generateSingleLogFile(t, filepath, baseTime.Add(time.Duration(i)*time.Hour))
  115. suite.logFiles = append(suite.logFiles, filename)
  116. suite.logFilePaths = append(suite.logFilePaths, filepath)
  117. suite.expectedMetrics[filepath] = metrics
  118. t.Logf("Generated %s with %d records", filename, metrics.TotalRecords)
  119. }
  120. t.Logf("Test data generation completed. Total files: %d", len(suite.logFiles))
  121. }
  122. // generateSingleLogFile generates a single log file with known statistics
  123. func (suite *IntegrationTestSuite) generateSingleLogFile(t *testing.T, filepath string, baseTime time.Time) *ExpectedFileMetrics {
  124. file, err := os.Create(filepath)
  125. require.NoError(t, err)
  126. defer file.Close()
  127. metrics := &ExpectedFileMetrics{
  128. StatusCodes: make(map[int]uint64),
  129. Methods: make(map[string]uint64),
  130. TimeRange: TestTimeRange{
  131. StartTime: baseTime,
  132. EndTime: baseTime.Add(time.Duration(TestRecordsPerFile) * time.Second),
  133. },
  134. }
  135. // Predefined test data for consistent testing
  136. ips := []string{
  137. "192.168.1.1", "192.168.1.2", "192.168.1.3", "10.0.0.1", "10.0.0.2",
  138. "172.16.0.1", "172.16.0.2", "203.0.113.1", "203.0.113.2", "198.51.100.1",
  139. }
  140. paths := []string{
  141. "/", "/api/v1/status", "/api/v1/logs", "/admin", "/login",
  142. "/dashboard", "/api/v1/config", "/static/css/main.css", "/static/js/app.js", "/favicon.ico",
  143. }
  144. userAgents := []string{
  145. "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
  146. "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
  147. "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
  148. "PostmanRuntime/7.28.4",
  149. "Go-http-client/1.1",
  150. }
  151. statusCodes := []int{200, 301, 404, 500, 502}
  152. methods := []string{"GET", "POST", "PUT", "DELETE"}
  153. // Track unique values
  154. uniqueIPs := make(map[string]bool)
  155. uniquePaths := make(map[string]bool)
  156. uniqueAgents := make(map[string]bool)
  157. // use global rng defaults; no explicit rand.Seed needed
  158. for i := 0; i < TestRecordsPerFile; i++ {
  159. // Generate log entry timestamp
  160. timestamp := baseTime.Add(time.Duration(i) * time.Second)
  161. // Select random values
  162. ip := ips[rand.Intn(len(ips))]
  163. path := paths[rand.Intn(len(paths))]
  164. agent := userAgents[rand.Intn(len(userAgents))]
  165. status := statusCodes[rand.Intn(len(statusCodes))]
  166. method := methods[rand.Intn(len(methods))]
  167. size := rand.Intn(10000) + 100 // 100-10100 bytes
  168. // Track unique values
  169. uniqueIPs[ip] = true
  170. uniquePaths[path] = true
  171. uniqueAgents[agent] = true
  172. // Update metrics
  173. metrics.StatusCodes[status]++
  174. metrics.Methods[method]++
  175. // Generate nginx log line (Common Log Format)
  176. logLine := fmt.Sprintf(`%s - - [%s] "%s %s HTTP/1.1" %d %d "-" "%s"`+"\n",
  177. ip,
  178. timestamp.Format("02/Jan/2006:15:04:05 -0700"),
  179. method,
  180. path,
  181. status,
  182. size,
  183. agent,
  184. )
  185. _, err := file.WriteString(logLine)
  186. require.NoError(t, err)
  187. }
  188. // Finalize metrics
  189. metrics.TotalRecords = TestRecordsPerFile
  190. metrics.UniqueIPs = uint64(len(uniqueIPs))
  191. metrics.UniquePaths = uint64(len(uniquePaths))
  192. metrics.UniqueAgents = uint64(len(uniqueAgents))
  193. return metrics
  194. }
  195. // InitializeServices initializes all nginx_log services for testing
  196. func (suite *IntegrationTestSuite) InitializeServices(t *testing.T) {
  197. t.Log("Initializing test services...")
  198. // Initialize indexer
  199. indexerConfig := indexer.DefaultIndexerConfig()
  200. indexerConfig.IndexPath = suite.indexDir
  201. shardManager := indexer.NewGroupedShardManager(indexerConfig)
  202. suite.indexer = indexer.NewParallelIndexer(indexerConfig, shardManager)
  203. err := suite.indexer.Start(suite.ctx)
  204. require.NoError(t, err)
  205. // Initialize searcher (empty initially)
  206. searcherConfig := searcher.DefaultSearcherConfig()
  207. suite.searcher = searcher.NewSearcher(searcherConfig, []bleve.Index{})
  208. // Initialize analytics
  209. suite.analytics = analytics.NewService(suite.searcher)
  210. // Initialize log file manager with test-specific behavior
  211. suite.logFileManager = suite.createTestLogFileManager(t)
  212. // Register test log files
  213. for _, logPath := range suite.logFilePaths {
  214. suite.logFileManager.AddLogPath(logPath, "access", filepath.Base(logPath), "test_config")
  215. }
  216. t.Log("Services initialized successfully")
  217. }
// createTestLogFileManager creates a log file manager suitable for testing.
// All maps start empty; entries are added later via AddLogPath and
// SaveIndexMetadata. The *testing.T parameter is currently unused.
func (suite *IntegrationTestSuite) createTestLogFileManager(t *testing.T) *TestLogFileManager {
	return &TestLogFileManager{
		logCache:       make(map[string]*indexer.NginxLogCache),
		indexingStatus: make(map[string]bool),
		indexMetadata:  make(map[string]*TestIndexMetadata),
	}
}
  226. // AddLogPath adds a log path to the test log cache
  227. func (tlm *TestLogFileManager) AddLogPath(path, logType, name, configFile string) {
  228. tlm.cacheMutex.Lock()
  229. defer tlm.cacheMutex.Unlock()
  230. tlm.logCache[path] = &indexer.NginxLogCache{
  231. Path: path,
  232. Type: logType,
  233. Name: name,
  234. ConfigFile: configFile,
  235. }
  236. }
  237. // GetAllLogsWithIndexGrouped returns all cached log paths with their index status for testing
  238. func (tlm *TestLogFileManager) GetAllLogsWithIndexGrouped(filters ...func(*indexer.NginxLogWithIndex) bool) []*indexer.NginxLogWithIndex {
  239. tlm.cacheMutex.RLock()
  240. defer tlm.cacheMutex.RUnlock()
  241. tlm.metadataMutex.RLock()
  242. defer tlm.metadataMutex.RUnlock()
  243. var logs []*indexer.NginxLogWithIndex
  244. for _, logEntry := range tlm.logCache {
  245. logWithIndex := &indexer.NginxLogWithIndex{
  246. Path: logEntry.Path,
  247. Type: logEntry.Type,
  248. Name: logEntry.Name,
  249. ConfigFile: logEntry.ConfigFile,
  250. IndexStatus: "not_indexed",
  251. }
  252. // Check if we have index metadata for this path
  253. if metadata, exists := tlm.indexMetadata[logEntry.Path]; exists {
  254. logWithIndex.IndexStatus = "indexed"
  255. logWithIndex.DocumentCount = metadata.DocumentCount
  256. logWithIndex.LastIndexed = metadata.LastIndexed.Unix()
  257. logWithIndex.IndexDuration = int64(metadata.Duration.Milliseconds())
  258. if metadata.MinTime != nil {
  259. logWithIndex.HasTimeRange = true
  260. logWithIndex.TimeRangeStart = metadata.MinTime.Unix()
  261. }
  262. if metadata.MaxTime != nil {
  263. logWithIndex.HasTimeRange = true
  264. logWithIndex.TimeRangeEnd = metadata.MaxTime.Unix()
  265. }
  266. }
  267. // Apply filters
  268. include := true
  269. for _, filter := range filters {
  270. if !filter(logWithIndex) {
  271. include = false
  272. break
  273. }
  274. }
  275. if include {
  276. logs = append(logs, logWithIndex)
  277. }
  278. }
  279. return logs
  280. }
  281. // SaveIndexMetadata saves index metadata for testing
  282. func (tlm *TestLogFileManager) SaveIndexMetadata(path string, docCount uint64, indexTime time.Time, duration time.Duration, minTime, maxTime *time.Time) error {
  283. tlm.metadataMutex.Lock()
  284. defer tlm.metadataMutex.Unlock()
  285. tlm.indexMetadata[path] = &TestIndexMetadata{
  286. Path: path,
  287. DocumentCount: docCount,
  288. LastIndexed: indexTime,
  289. Duration: duration,
  290. MinTime: minTime,
  291. MaxTime: maxTime,
  292. }
  293. return nil
  294. }
  295. // DeleteIndexMetadataByGroup deletes index metadata for a log group (for testing)
  296. func (tlm *TestLogFileManager) DeleteIndexMetadataByGroup(logGroup string) error {
  297. tlm.metadataMutex.Lock()
  298. defer tlm.metadataMutex.Unlock()
  299. delete(tlm.indexMetadata, logGroup)
  300. return nil
  301. }
  302. // DeleteAllIndexMetadata deletes all index metadata (for testing)
  303. func (tlm *TestLogFileManager) DeleteAllIndexMetadata() error {
  304. tlm.metadataMutex.Lock()
  305. defer tlm.metadataMutex.Unlock()
  306. tlm.indexMetadata = make(map[string]*TestIndexMetadata)
  307. return nil
  308. }
  309. // PerformGlobalIndexRebuild performs a complete index rebuild of all files
  310. func (suite *IntegrationTestSuite) PerformGlobalIndexRebuild(t *testing.T) {
  311. t.Log("Starting global index rebuild...")
  312. startTime := time.Now()
  313. // Create progress tracking
  314. var completedFiles []string
  315. var mu sync.Mutex
  316. progressConfig := &indexer.ProgressConfig{
  317. NotifyInterval: 1 * time.Second,
  318. OnProgress: func(progress indexer.ProgressNotification) {
  319. t.Logf("Index progress: %s - %.1f%% (Files: %d/%d, Lines: %d/%d)",
  320. progress.LogGroupPath, progress.Percentage, progress.CompletedFiles,
  321. progress.TotalFiles, progress.ProcessedLines, progress.EstimatedLines)
  322. },
  323. OnCompletion: func(completion indexer.CompletionNotification) {
  324. mu.Lock()
  325. completedFiles = append(completedFiles, completion.LogGroupPath)
  326. mu.Unlock()
  327. t.Logf("Index completion: %s - Success: %t, Duration: %s, Lines: %d",
  328. completion.LogGroupPath, completion.Success, completion.Duration, completion.TotalLines)
  329. },
  330. }
  331. // Destroy existing indexes
  332. err := suite.indexer.DestroyAllIndexes(suite.ctx)
  333. require.NoError(t, err)
  334. // Re-initialize indexer
  335. err = suite.indexer.Start(suite.ctx)
  336. require.NoError(t, err)
  337. // Index all log files
  338. allLogs := suite.logFileManager.GetAllLogsWithIndexGrouped()
  339. for _, log := range allLogs {
  340. docsCountMap, minTime, maxTime, err := suite.indexer.IndexLogGroupWithProgress(log.Path, progressConfig)
  341. require.NoError(t, err, "Failed to index log group: %s", log.Path)
  342. // Save metadata
  343. duration := time.Since(startTime)
  344. var totalDocs uint64
  345. for _, docCount := range docsCountMap {
  346. totalDocs += docCount
  347. }
  348. err = suite.logFileManager.SaveIndexMetadata(log.Path, totalDocs, startTime, duration, minTime, maxTime)
  349. require.NoError(t, err)
  350. }
  351. // Flush and update searcher
  352. err = suite.indexer.FlushAll()
  353. require.NoError(t, err)
  354. suite.updateSearcher(t)
  355. totalDuration := time.Since(startTime)
  356. t.Logf("Global index rebuild completed in %s. Completed files: %v", totalDuration, completedFiles)
  357. }
  358. // PerformSingleFileIndexRebuild rebuilds index for a single file
  359. func (suite *IntegrationTestSuite) PerformSingleFileIndexRebuild(t *testing.T, targetFile string) {
  360. t.Logf("Starting single file index rebuild for: %s", targetFile)
  361. startTime := time.Now()
  362. progressConfig := &indexer.ProgressConfig{
  363. NotifyInterval: 1 * time.Second,
  364. OnProgress: func(progress indexer.ProgressNotification) {
  365. t.Logf("Single file index progress: %s - %.1f%%", progress.LogGroupPath, progress.Percentage)
  366. },
  367. OnCompletion: func(completion indexer.CompletionNotification) {
  368. t.Logf("Single file index completion: %s - Success: %t, Lines: %d",
  369. completion.LogGroupPath, completion.Success, completion.TotalLines)
  370. },
  371. }
  372. // Delete existing index for this log group
  373. err := suite.indexer.DeleteIndexByLogGroup(targetFile, suite.logFileManager)
  374. require.NoError(t, err)
  375. // Clean up database records for this log group
  376. err = suite.logFileManager.DeleteIndexMetadataByGroup(targetFile)
  377. require.NoError(t, err)
  378. // Index the specific file
  379. docsCountMap, minTime, maxTime, err := suite.indexer.IndexLogGroupWithProgress(targetFile, progressConfig)
  380. require.NoError(t, err, "Failed to index single file: %s", targetFile)
  381. // Save metadata
  382. duration := time.Since(startTime)
  383. var totalDocs uint64
  384. for _, docCount := range docsCountMap {
  385. totalDocs += docCount
  386. }
  387. err = suite.logFileManager.SaveIndexMetadata(targetFile, totalDocs, startTime, duration, minTime, maxTime)
  388. require.NoError(t, err)
  389. // Flush and update searcher
  390. err = suite.indexer.FlushAll()
  391. require.NoError(t, err)
  392. suite.updateSearcher(t)
  393. totalDuration := time.Since(startTime)
  394. t.Logf("Single file index rebuild completed in %s for: %s", totalDuration, targetFile)
  395. }
  396. // updateSearcher updates the searcher with current shards
  397. func (suite *IntegrationTestSuite) updateSearcher(t *testing.T) {
  398. if !suite.indexer.IsHealthy() {
  399. t.Fatal("Indexer is not healthy, cannot update searcher")
  400. }
  401. newShards := suite.indexer.GetAllShards()
  402. t.Logf("Updating searcher with %d shards", len(newShards))
  403. if suite.searcher != nil {
  404. err := suite.searcher.SwapShards(newShards)
  405. require.NoError(t, err)
  406. t.Log("Searcher shards updated successfully")
  407. } else {
  408. t.Fatal("Searcher is not a Searcher")
  409. }
  410. }
  411. // ValidateCardinalityCounter validates the accuracy of cardinality counting
  412. func (suite *IntegrationTestSuite) ValidateCardinalityCounter(t *testing.T, filePath string) {
  413. t.Logf("Validating Counter accuracy for: %s", filePath)
  414. expected := suite.expectedMetrics[filePath]
  415. require.NotNil(t, expected, "Expected metrics not found for file: %s", filePath)
  416. // Test IP cardinality
  417. suite.testFieldCardinality(t, filePath, "remote_addr", expected.UniqueIPs, "IP addresses")
  418. // Test path cardinality
  419. suite.testFieldCardinality(t, filePath, "uri_path", expected.UniquePaths, "URI paths")
  420. // Test user agent cardinality
  421. suite.testFieldCardinality(t, filePath, "http_user_agent", expected.UniqueAgents, "User agents")
  422. t.Logf("Counter validation completed for: %s", filePath)
  423. }
  424. // testFieldCardinality tests cardinality counting for a specific field
  425. func (suite *IntegrationTestSuite) testFieldCardinality(t *testing.T, filePath string, field string, expectedCount uint64, fieldName string) {
  426. if suite.searcher != nil {
  427. cardinalityCounter := searcher.NewCounter(suite.searcher.GetShards())
  428. req := &searcher.CardinalityRequest{
  429. Field: field,
  430. LogPaths: []string{filePath},
  431. }
  432. result, err := cardinalityCounter.Count(suite.ctx, req)
  433. require.NoError(t, err, "Failed to count cardinality for field: %s", field)
  434. // Allow for small discrepancies due to indexing behavior
  435. tolerance := uint64(expectedCount) / 100 // 1% tolerance
  436. if tolerance < 1 {
  437. tolerance = 1
  438. }
  439. assert.InDelta(t, expectedCount, result.Cardinality, float64(tolerance),
  440. "Cardinality mismatch for %s in %s: expected %d, got %d",
  441. fieldName, filePath, expectedCount, result.Cardinality)
  442. t.Logf("✓ %s cardinality: expected=%d, actual=%d, total_docs=%d",
  443. fieldName, expectedCount, result.Cardinality, result.TotalDocs)
  444. } else {
  445. t.Fatal("Searcher is not a Searcher")
  446. }
  447. }
  448. // ValidateAnalyticsData validates the accuracy of analytics statistics
  449. func (suite *IntegrationTestSuite) ValidateAnalyticsData(t *testing.T, filePath string) {
  450. t.Logf("Validating Analytics data accuracy for: %s", filePath)
  451. expected := suite.expectedMetrics[filePath]
  452. require.NotNil(t, expected, "Expected metrics not found for file: %s", filePath)
  453. // Test dashboard analytics
  454. dashboardReq := &analytics.DashboardQueryRequest{
  455. LogPaths: []string{filePath},
  456. StartTime: expected.TimeRange.StartTime.Unix(),
  457. EndTime: expected.TimeRange.EndTime.Unix(),
  458. }
  459. dashboard, err := suite.analytics.GetDashboardAnalytics(suite.ctx, dashboardReq)
  460. require.NoError(t, err, "Failed to get dashboard data for: %s", filePath)
  461. // Validate basic metrics
  462. tolerance := float64(expected.TotalRecords) * 0.01 // 1% tolerance
  463. assert.InDelta(t, expected.TotalRecords, dashboard.Summary.TotalPV, tolerance,
  464. "Total requests mismatch for %s", filePath)
  465. t.Logf("✓ Dashboard validation completed for: %s", filePath)
  466. t.Logf(" Total requests: expected=%d, actual=%d", expected.TotalRecords, dashboard.Summary.TotalPV)
  467. t.Logf(" Unique visitors: %d", dashboard.Summary.TotalUV)
  468. t.Logf(" Average daily PV: %f", dashboard.Summary.AvgDailyPV)
  469. }
  470. // ValidatePaginationFunctionality validates pagination works correctly using searcher
  471. func (suite *IntegrationTestSuite) ValidatePaginationFunctionality(t *testing.T, filePath string) {
  472. t.Logf("Validating pagination functionality for: %s", filePath)
  473. expected := suite.expectedMetrics[filePath]
  474. require.NotNil(t, expected, "Expected metrics not found for file: %s", filePath)
  475. startTime := expected.TimeRange.StartTime.Unix()
  476. endTime := expected.TimeRange.EndTime.Unix()
  477. // Test first page
  478. searchReq1 := &searcher.SearchRequest{
  479. Query: "*",
  480. LogPaths: []string{filePath},
  481. StartTime: &startTime,
  482. EndTime: &endTime,
  483. Limit: 100,
  484. Offset: 0,
  485. SortBy: "timestamp",
  486. SortOrder: "desc",
  487. }
  488. result1, err := suite.searcher.Search(suite.ctx, searchReq1)
  489. require.NoError(t, err, "Failed to get page 1 for: %s", filePath)
  490. assert.Equal(t, 100, len(result1.Hits), "First page should have 100 entries")
  491. assert.Equal(t, expected.TotalRecords, result1.TotalHits, "Total count mismatch")
  492. // Test second page
  493. searchReq2 := &searcher.SearchRequest{
  494. Query: "*",
  495. LogPaths: []string{filePath},
  496. StartTime: &startTime,
  497. EndTime: &endTime,
  498. Limit: 100,
  499. Offset: 100,
  500. SortBy: "timestamp",
  501. SortOrder: "desc",
  502. }
  503. result2, err := suite.searcher.Search(suite.ctx, searchReq2)
  504. require.NoError(t, err, "Failed to get page 2 for: %s", filePath)
  505. assert.Equal(t, 100, len(result2.Hits), "Second page should have 100 entries")
  506. assert.Equal(t, expected.TotalRecords, result2.TotalHits, "Total count should be consistent")
  507. // Ensure different pages return different entries
  508. if len(result1.Hits) > 0 && len(result2.Hits) > 0 {
  509. firstPageFirstEntry := result1.Hits[0].ID
  510. secondPageFirstEntry := result2.Hits[0].ID
  511. assert.NotEqual(t, firstPageFirstEntry, secondPageFirstEntry,
  512. "Different pages should return different entries")
  513. }
  514. t.Logf("✓ Pagination validation completed for: %s", filePath)
  515. t.Logf(" Page 1 entries: %d", len(result1.Hits))
  516. t.Logf(" Page 2 entries: %d", len(result2.Hits))
  517. t.Logf(" Total entries: %d", result1.TotalHits)
  518. }
  519. // TestNginxLogIntegration is the main integration test function
  520. func TestNginxLogIntegration(t *testing.T) {
  521. if testing.Short() {
  522. t.Skip("skipping integration test in short mode")
  523. }
  524. suite := NewIntegrationTestSuite(t)
  525. defer suite.cleanup()
  526. t.Log("=== Starting Nginx Log Integration Test ===")
  527. // Step 1: Generate test data
  528. suite.GenerateTestData(t)
  529. // Step 2: Initialize services
  530. suite.InitializeServices(t)
  531. // Step 3: Perform global index rebuild and validate during indexing
  532. t.Log("\n=== Testing Global Index Rebuild ===")
  533. suite.PerformGlobalIndexRebuild(t)
  534. // Step 4: Validate all files after global rebuild
  535. for _, filePath := range suite.logFilePaths {
  536. t.Logf("\n--- Validating file after global rebuild: %s ---", filepath.Base(filePath))
  537. suite.ValidateCardinalityCounter(t, filePath)
  538. suite.ValidateAnalyticsData(t, filePath)
  539. suite.ValidatePaginationFunctionality(t, filePath)
  540. }
  541. // Step 5: Test single file rebuild
  542. t.Log("\n=== Testing Single File Index Rebuild ===")
  543. targetFile := suite.logFilePaths[1] // Rebuild second file
  544. suite.PerformSingleFileIndexRebuild(t, targetFile)
  545. // Step 6: Validate all files after single file rebuild
  546. for _, filePath := range suite.logFilePaths {
  547. t.Logf("\n--- Validating file after single file rebuild: %s ---", filepath.Base(filePath))
  548. suite.ValidateCardinalityCounter(t, filePath)
  549. suite.ValidateAnalyticsData(t, filePath)
  550. suite.ValidatePaginationFunctionality(t, filePath)
  551. }
  552. t.Log("\n=== Integration Test Completed Successfully ===")
  553. }
// TestConcurrentIndexingAndQuerying tests querying while indexing is in progress
func TestConcurrentIndexingAndQuerying(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping concurrent integration test in short mode")
	}
	suite := NewIntegrationTestSuite(t)
	defer suite.cleanup()
	t.Log("=== Starting Concurrent Indexing and Querying Test ===")
	// Generate test data and initialize services
	suite.GenerateTestData(t)
	suite.InitializeServices(t)
	var wg sync.WaitGroup
	// Start indexing in background.
	// NOTE(review): PerformGlobalIndexRebuild uses require.NoError, which calls
	// t.FailNow; the testing package does not support FailNow from a goroutine
	// other than the one running the test — confirm and consider t.Error-based
	// reporting from this goroutine instead.
	wg.Add(1)
	go func() {
		defer wg.Done()
		suite.PerformGlobalIndexRebuild(t)
	}()
	// Wait a bit for indexing to start (time-based coordination; may be
	// sensitive to machine speed).
	time.Sleep(2 * time.Second)
	// Query while indexing is in progress
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Ten probes, one second apart, against the first generated file.
		for i := 0; i < 10; i++ {
			time.Sleep(1 * time.Second)
			// Test search functionality. Search errors are deliberately
			// ignored: mid-rebuild the index may not be queryable yet.
			if suite.searcher.IsHealthy() {
				searchReq := &searcher.SearchRequest{
					Query:    "GET",
					LogPaths: []string{suite.logFilePaths[0]},
					Limit:    10,
				}
				result, err := suite.searcher.Search(suite.ctx, searchReq)
				if err == nil {
					t.Logf("Concurrent query %d: found %d results", i+1, result.TotalHits)
				}
			}
		}
	}()
	wg.Wait()
	// Final validation once both goroutines are done.
	for _, filePath := range suite.logFilePaths {
		suite.ValidateCardinalityCounter(t, filePath)
		suite.ValidateAnalyticsData(t, filePath)
	}
	t.Log("=== Concurrent Test Completed Successfully ===")
}