1
0

integration_test.go 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742
  1. package nginx_log
import (
	"bufio"
	"context"
	"fmt"
	"math/rand"
	"os"
	"path/filepath"
	"sync"
	"testing"
	"time"

	"github.com/0xJacky/Nginx-UI/internal/nginx_log/analytics"
	"github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
	"github.com/0xJacky/Nginx-UI/internal/nginx_log/searcher"
	"github.com/blevesearch/bleve/v2"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)
const (
	// Test configuration
	TestRecordsPerFile = 400000 // 400k records per file
	TestFileCount      = 3      // 3 test files
	// NOTE(review): TestBaseDir/TestIndexDir are not referenced in this file —
	// the suite builds its own os.MkdirTemp tree. Confirm external use before removal.
	TestBaseDir  = "./test_integration_logs"
	TestIndexDir = "./test_integration_index"
)
// IntegrationTestSuite contains all integration test data and services
type IntegrationTestSuite struct {
	ctx             context.Context                 // lifecycle context shared by all services
	cancel          context.CancelFunc              // cancels ctx (invoked by cleanup)
	tempDir         string                          // per-run temp root containing logs/ and index/
	indexDir        string                          // index directory under tempDir
	logFiles        []string                        // base names of the generated log files
	logFilePaths    []string                        // absolute paths of the generated log files
	indexer         *indexer.ParallelIndexer        // indexing service under test
	searcher        searcher.Searcher               // search service under test
	analytics       analytics.Service               // analytics service layered on searcher
	logFileManager  *TestLogFileManager             // database-free stand-in for the log file manager
	expectedMetrics map[string]*ExpectedFileMetrics // ground-truth statistics keyed by log path
	mu              sync.RWMutex                    // NOTE(review): not referenced in this file — confirm intended use
	cleanup         func()                          // stops services, cancels ctx, removes tempDir
}
// TestLogFileManager is a simplified log file manager for testing that doesn't require database
type TestLogFileManager struct {
	logCache       map[string]*indexer.NginxLogCache // registered log paths, keyed by path
	cacheMutex     sync.RWMutex                      // guards logCache
	indexingStatus map[string]bool                   // per-path indexing flags; NOTE(review): written nowhere in this file
	indexingMutex  sync.RWMutex                      // guards indexingStatus
	indexMetadata  map[string]*TestIndexMetadata     // saved index statistics, keyed by log group path
	metadataMutex  sync.RWMutex                      // guards indexMetadata
}
// TestIndexMetadata holds index metadata for testing
type TestIndexMetadata struct {
	Path          string        // log group path this metadata belongs to
	DocumentCount uint64        // number of documents indexed for the group
	LastIndexed   time.Time     // when indexing was performed
	Duration      time.Duration // how long indexing took
	MinTime       *time.Time    // earliest log timestamp seen (nil if unknown)
	MaxTime       *time.Time    // latest log timestamp seen (nil if unknown)
}
// ExpectedFileMetrics stores expected statistics for each log file
type ExpectedFileMetrics struct {
	TotalRecords uint64            // total log lines written to the file
	UniqueIPs    uint64            // distinct client IPs actually emitted
	UniquePaths  uint64            // distinct URI paths actually emitted
	UniqueAgents uint64            // distinct user agents actually emitted
	StatusCodes  map[int]uint64    // line count per HTTP status code
	Methods      map[string]uint64 // line count per HTTP method
	TimeRange    TestTimeRange     // time span covered by the generated entries
}
// TestTimeRange represents the time range of log entries for testing
type TestTimeRange struct {
	StartTime time.Time // timestamp of the first generated entry
	EndTime   time.Time // one second past the last generated entry (entries are 1/sec)
}
  74. // NewIntegrationTestSuite creates a new integration test suite
  75. func NewIntegrationTestSuite(t *testing.T) *IntegrationTestSuite {
  76. ctx, cancel := context.WithCancel(context.Background())
  77. // Create temporary directories
  78. tempDir, err := os.MkdirTemp("", "nginx_ui_integration_test_*")
  79. require.NoError(t, err)
  80. indexDir := filepath.Join(tempDir, "index")
  81. logsDir := filepath.Join(tempDir, "logs")
  82. err = os.MkdirAll(indexDir, 0755)
  83. require.NoError(t, err)
  84. err = os.MkdirAll(logsDir, 0755)
  85. require.NoError(t, err)
  86. suite := &IntegrationTestSuite{
  87. ctx: ctx,
  88. cancel: cancel,
  89. tempDir: tempDir,
  90. indexDir: indexDir,
  91. expectedMetrics: make(map[string]*ExpectedFileMetrics),
  92. }
  93. // Set cleanup function
  94. suite.cleanup = func() {
  95. // Stop services
  96. if suite.indexer != nil {
  97. suite.indexer.Stop()
  98. }
  99. if suite.searcher != nil {
  100. suite.searcher.Stop()
  101. }
  102. // Cancel context
  103. cancel()
  104. // Remove temporary directories
  105. os.RemoveAll(tempDir)
  106. }
  107. return suite
  108. }
  109. // GenerateTestData generates the test log files with expected statistics
  110. func (suite *IntegrationTestSuite) GenerateTestData(t *testing.T) {
  111. t.Logf("Generating %d test files with %d records each", TestFileCount, TestRecordsPerFile)
  112. baseTime := time.Now().Add(-24 * time.Hour)
  113. for i := 0; i < TestFileCount; i++ {
  114. filename := fmt.Sprintf("access_%d.log", i+1)
  115. filepath := filepath.Join(suite.tempDir, "logs", filename)
  116. metrics := suite.generateSingleLogFile(t, filepath, baseTime.Add(time.Duration(i)*time.Hour))
  117. suite.logFiles = append(suite.logFiles, filename)
  118. suite.logFilePaths = append(suite.logFilePaths, filepath)
  119. suite.expectedMetrics[filepath] = metrics
  120. t.Logf("Generated %s with %d records", filename, metrics.TotalRecords)
  121. }
  122. t.Logf("Test data generation completed. Total files: %d", len(suite.logFiles))
  123. }
  124. // generateSingleLogFile generates a single log file with known statistics
  125. func (suite *IntegrationTestSuite) generateSingleLogFile(t *testing.T, filepath string, baseTime time.Time) *ExpectedFileMetrics {
  126. file, err := os.Create(filepath)
  127. require.NoError(t, err)
  128. defer file.Close()
  129. metrics := &ExpectedFileMetrics{
  130. StatusCodes: make(map[int]uint64),
  131. Methods: make(map[string]uint64),
  132. TimeRange: TestTimeRange{
  133. StartTime: baseTime,
  134. EndTime: baseTime.Add(time.Duration(TestRecordsPerFile) * time.Second),
  135. },
  136. }
  137. // Predefined test data for consistent testing
  138. ips := []string{
  139. "192.168.1.1", "192.168.1.2", "192.168.1.3", "10.0.0.1", "10.0.0.2",
  140. "172.16.0.1", "172.16.0.2", "203.0.113.1", "203.0.113.2", "198.51.100.1",
  141. }
  142. paths := []string{
  143. "/", "/api/v1/status", "/api/v1/logs", "/admin", "/login",
  144. "/dashboard", "/api/v1/config", "/static/css/main.css", "/static/js/app.js", "/favicon.ico",
  145. }
  146. userAgents := []string{
  147. "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
  148. "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
  149. "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
  150. "PostmanRuntime/7.28.4",
  151. "Go-http-client/1.1",
  152. }
  153. statusCodes := []int{200, 301, 404, 500, 502}
  154. methods := []string{"GET", "POST", "PUT", "DELETE"}
  155. // Track unique values
  156. uniqueIPs := make(map[string]bool)
  157. uniquePaths := make(map[string]bool)
  158. uniqueAgents := make(map[string]bool)
  159. rand.Seed(time.Now().UnixNano() + int64(len(filepath))) // Different seed per file
  160. for i := 0; i < TestRecordsPerFile; i++ {
  161. // Generate log entry timestamp
  162. timestamp := baseTime.Add(time.Duration(i) * time.Second)
  163. // Select random values
  164. ip := ips[rand.Intn(len(ips))]
  165. path := paths[rand.Intn(len(paths))]
  166. agent := userAgents[rand.Intn(len(userAgents))]
  167. status := statusCodes[rand.Intn(len(statusCodes))]
  168. method := methods[rand.Intn(len(methods))]
  169. size := rand.Intn(10000) + 100 // 100-10100 bytes
  170. // Track unique values
  171. uniqueIPs[ip] = true
  172. uniquePaths[path] = true
  173. uniqueAgents[agent] = true
  174. // Update metrics
  175. metrics.StatusCodes[status]++
  176. metrics.Methods[method]++
  177. // Generate nginx log line (Common Log Format)
  178. logLine := fmt.Sprintf(`%s - - [%s] "%s %s HTTP/1.1" %d %d "-" "%s"`+"\n",
  179. ip,
  180. timestamp.Format("02/Jan/2006:15:04:05 -0700"),
  181. method,
  182. path,
  183. status,
  184. size,
  185. agent,
  186. )
  187. _, err := file.WriteString(logLine)
  188. require.NoError(t, err)
  189. }
  190. // Finalize metrics
  191. metrics.TotalRecords = TestRecordsPerFile
  192. metrics.UniqueIPs = uint64(len(uniqueIPs))
  193. metrics.UniquePaths = uint64(len(uniquePaths))
  194. metrics.UniqueAgents = uint64(len(uniqueAgents))
  195. return metrics
  196. }
  197. // InitializeServices initializes all nginx_log services for testing
  198. func (suite *IntegrationTestSuite) InitializeServices(t *testing.T) {
  199. t.Log("Initializing test services...")
  200. // Initialize indexer
  201. indexerConfig := indexer.DefaultIndexerConfig()
  202. indexerConfig.IndexPath = suite.indexDir
  203. shardManager := indexer.NewDefaultShardManager(indexerConfig)
  204. suite.indexer = indexer.NewParallelIndexer(indexerConfig, shardManager)
  205. err := suite.indexer.Start(suite.ctx)
  206. require.NoError(t, err)
  207. // Initialize searcher (empty initially)
  208. searcherConfig := searcher.DefaultSearcherConfig()
  209. suite.searcher = searcher.NewDistributedSearcher(searcherConfig, []bleve.Index{})
  210. // Initialize analytics
  211. suite.analytics = analytics.NewService(suite.searcher)
  212. // Initialize log file manager with test-specific behavior
  213. suite.logFileManager = suite.createTestLogFileManager(t)
  214. // Register test log files
  215. for _, logPath := range suite.logFilePaths {
  216. suite.logFileManager.AddLogPath(logPath, "access", filepath.Base(logPath), "test_config")
  217. }
  218. t.Log("Services initialized successfully")
  219. }
  220. // createTestLogFileManager creates a log file manager suitable for testing
  221. func (suite *IntegrationTestSuite) createTestLogFileManager(t *testing.T) *TestLogFileManager {
  222. return &TestLogFileManager{
  223. logCache: make(map[string]*indexer.NginxLogCache),
  224. indexingStatus: make(map[string]bool),
  225. indexMetadata: make(map[string]*TestIndexMetadata),
  226. }
  227. }
  228. // AddLogPath adds a log path to the test log cache
  229. func (tlm *TestLogFileManager) AddLogPath(path, logType, name, configFile string) {
  230. tlm.cacheMutex.Lock()
  231. defer tlm.cacheMutex.Unlock()
  232. tlm.logCache[path] = &indexer.NginxLogCache{
  233. Path: path,
  234. Type: logType,
  235. Name: name,
  236. ConfigFile: configFile,
  237. }
  238. }
  239. // GetAllLogsWithIndexGrouped returns all cached log paths with their index status for testing
  240. func (tlm *TestLogFileManager) GetAllLogsWithIndexGrouped(filters ...func(*indexer.NginxLogWithIndex) bool) []*indexer.NginxLogWithIndex {
  241. tlm.cacheMutex.RLock()
  242. defer tlm.cacheMutex.RUnlock()
  243. tlm.metadataMutex.RLock()
  244. defer tlm.metadataMutex.RUnlock()
  245. var logs []*indexer.NginxLogWithIndex
  246. for _, logEntry := range tlm.logCache {
  247. logWithIndex := &indexer.NginxLogWithIndex{
  248. Path: logEntry.Path,
  249. Type: logEntry.Type,
  250. Name: logEntry.Name,
  251. ConfigFile: logEntry.ConfigFile,
  252. IndexStatus: "not_indexed",
  253. }
  254. // Check if we have index metadata for this path
  255. if metadata, exists := tlm.indexMetadata[logEntry.Path]; exists {
  256. logWithIndex.IndexStatus = "indexed"
  257. logWithIndex.DocumentCount = metadata.DocumentCount
  258. logWithIndex.LastIndexed = metadata.LastIndexed.Unix()
  259. logWithIndex.IndexDuration = int64(metadata.Duration.Milliseconds())
  260. if metadata.MinTime != nil {
  261. logWithIndex.HasTimeRange = true
  262. logWithIndex.TimeRangeStart = metadata.MinTime.Unix()
  263. }
  264. if metadata.MaxTime != nil {
  265. logWithIndex.HasTimeRange = true
  266. logWithIndex.TimeRangeEnd = metadata.MaxTime.Unix()
  267. }
  268. }
  269. // Apply filters
  270. include := true
  271. for _, filter := range filters {
  272. if !filter(logWithIndex) {
  273. include = false
  274. break
  275. }
  276. }
  277. if include {
  278. logs = append(logs, logWithIndex)
  279. }
  280. }
  281. return logs
  282. }
  283. // SaveIndexMetadata saves index metadata for testing
  284. func (tlm *TestLogFileManager) SaveIndexMetadata(path string, docCount uint64, indexTime time.Time, duration time.Duration, minTime, maxTime *time.Time) error {
  285. tlm.metadataMutex.Lock()
  286. defer tlm.metadataMutex.Unlock()
  287. tlm.indexMetadata[path] = &TestIndexMetadata{
  288. Path: path,
  289. DocumentCount: docCount,
  290. LastIndexed: indexTime,
  291. Duration: duration,
  292. MinTime: minTime,
  293. MaxTime: maxTime,
  294. }
  295. return nil
  296. }
  297. // DeleteIndexMetadataByGroup deletes index metadata for a log group (for testing)
  298. func (tlm *TestLogFileManager) DeleteIndexMetadataByGroup(logGroup string) error {
  299. tlm.metadataMutex.Lock()
  300. defer tlm.metadataMutex.Unlock()
  301. delete(tlm.indexMetadata, logGroup)
  302. return nil
  303. }
  304. // DeleteAllIndexMetadata deletes all index metadata (for testing)
  305. func (tlm *TestLogFileManager) DeleteAllIndexMetadata() error {
  306. tlm.metadataMutex.Lock()
  307. defer tlm.metadataMutex.Unlock()
  308. tlm.indexMetadata = make(map[string]*TestIndexMetadata)
  309. return nil
  310. }
  311. // PerformGlobalIndexRebuild performs a complete index rebuild of all files
  312. func (suite *IntegrationTestSuite) PerformGlobalIndexRebuild(t *testing.T) {
  313. t.Log("Starting global index rebuild...")
  314. startTime := time.Now()
  315. // Create progress tracking
  316. var completedFiles []string
  317. var mu sync.Mutex
  318. progressConfig := &indexer.ProgressConfig{
  319. NotifyInterval: 1 * time.Second,
  320. OnProgress: func(progress indexer.ProgressNotification) {
  321. t.Logf("Index progress: %s - %.1f%% (Files: %d/%d, Lines: %d/%d)",
  322. progress.LogGroupPath, progress.Percentage, progress.CompletedFiles,
  323. progress.TotalFiles, progress.ProcessedLines, progress.EstimatedLines)
  324. },
  325. OnCompletion: func(completion indexer.CompletionNotification) {
  326. mu.Lock()
  327. completedFiles = append(completedFiles, completion.LogGroupPath)
  328. mu.Unlock()
  329. t.Logf("Index completion: %s - Success: %t, Duration: %s, Lines: %d",
  330. completion.LogGroupPath, completion.Success, completion.Duration, completion.TotalLines)
  331. },
  332. }
  333. // Destroy existing indexes
  334. err := suite.indexer.DestroyAllIndexes(suite.ctx)
  335. require.NoError(t, err)
  336. // Re-initialize indexer
  337. err = suite.indexer.Start(suite.ctx)
  338. require.NoError(t, err)
  339. // Index all log files
  340. allLogs := suite.logFileManager.GetAllLogsWithIndexGrouped()
  341. for _, log := range allLogs {
  342. docsCountMap, minTime, maxTime, err := suite.indexer.IndexLogGroupWithProgress(log.Path, progressConfig)
  343. require.NoError(t, err, "Failed to index log group: %s", log.Path)
  344. // Save metadata
  345. duration := time.Since(startTime)
  346. var totalDocs uint64
  347. for _, docCount := range docsCountMap {
  348. totalDocs += docCount
  349. }
  350. err = suite.logFileManager.SaveIndexMetadata(log.Path, totalDocs, startTime, duration, minTime, maxTime)
  351. require.NoError(t, err)
  352. }
  353. // Flush and update searcher
  354. err = suite.indexer.FlushAll()
  355. require.NoError(t, err)
  356. suite.updateSearcher(t)
  357. totalDuration := time.Since(startTime)
  358. t.Logf("Global index rebuild completed in %s. Completed files: %v", totalDuration, completedFiles)
  359. }
  360. // PerformSingleFileIndexRebuild rebuilds index for a single file
  361. func (suite *IntegrationTestSuite) PerformSingleFileIndexRebuild(t *testing.T, targetFile string) {
  362. t.Logf("Starting single file index rebuild for: %s", targetFile)
  363. startTime := time.Now()
  364. progressConfig := &indexer.ProgressConfig{
  365. NotifyInterval: 1 * time.Second,
  366. OnProgress: func(progress indexer.ProgressNotification) {
  367. t.Logf("Single file index progress: %s - %.1f%%", progress.LogGroupPath, progress.Percentage)
  368. },
  369. OnCompletion: func(completion indexer.CompletionNotification) {
  370. t.Logf("Single file index completion: %s - Success: %t, Lines: %d",
  371. completion.LogGroupPath, completion.Success, completion.TotalLines)
  372. },
  373. }
  374. // Delete existing index for this log group
  375. err := suite.indexer.DeleteIndexByLogGroup(targetFile, suite.logFileManager)
  376. require.NoError(t, err)
  377. // Clean up database records for this log group
  378. err = suite.logFileManager.DeleteIndexMetadataByGroup(targetFile)
  379. require.NoError(t, err)
  380. // Index the specific file
  381. docsCountMap, minTime, maxTime, err := suite.indexer.IndexLogGroupWithProgress(targetFile, progressConfig)
  382. require.NoError(t, err, "Failed to index single file: %s", targetFile)
  383. // Save metadata
  384. duration := time.Since(startTime)
  385. var totalDocs uint64
  386. for _, docCount := range docsCountMap {
  387. totalDocs += docCount
  388. }
  389. err = suite.logFileManager.SaveIndexMetadata(targetFile, totalDocs, startTime, duration, minTime, maxTime)
  390. require.NoError(t, err)
  391. // Flush and update searcher
  392. err = suite.indexer.FlushAll()
  393. require.NoError(t, err)
  394. suite.updateSearcher(t)
  395. totalDuration := time.Since(startTime)
  396. t.Logf("Single file index rebuild completed in %s for: %s", totalDuration, targetFile)
  397. }
  398. // updateSearcher updates the searcher with current shards
  399. func (suite *IntegrationTestSuite) updateSearcher(t *testing.T) {
  400. if !suite.indexer.IsHealthy() {
  401. t.Fatal("Indexer is not healthy, cannot update searcher")
  402. }
  403. newShards := suite.indexer.GetAllShards()
  404. t.Logf("Updating searcher with %d shards", len(newShards))
  405. if ds, ok := suite.searcher.(*searcher.DistributedSearcher); ok {
  406. err := ds.SwapShards(newShards)
  407. require.NoError(t, err)
  408. t.Log("Searcher shards updated successfully")
  409. } else {
  410. t.Fatal("Searcher is not a DistributedSearcher")
  411. }
  412. }
  413. // ValidateCardinalityCounter validates the accuracy of cardinality counting
  414. func (suite *IntegrationTestSuite) ValidateCardinalityCounter(t *testing.T, filePath string) {
  415. t.Logf("Validating CardinalityCounter accuracy for: %s", filePath)
  416. expected := suite.expectedMetrics[filePath]
  417. require.NotNil(t, expected, "Expected metrics not found for file: %s", filePath)
  418. // Test IP cardinality
  419. suite.testFieldCardinality(t, filePath, "remote_addr", expected.UniqueIPs, "IP addresses")
  420. // Test path cardinality
  421. suite.testFieldCardinality(t, filePath, "uri_path", expected.UniquePaths, "URI paths")
  422. // Test user agent cardinality
  423. suite.testFieldCardinality(t, filePath, "http_user_agent", expected.UniqueAgents, "User agents")
  424. t.Logf("CardinalityCounter validation completed for: %s", filePath)
  425. }
  426. // testFieldCardinality tests cardinality counting for a specific field
  427. func (suite *IntegrationTestSuite) testFieldCardinality(t *testing.T, filePath string, field string, expectedCount uint64, fieldName string) {
  428. if ds, ok := suite.searcher.(*searcher.DistributedSearcher); ok {
  429. cardinalityCounter := searcher.NewCardinalityCounter(ds.GetShards())
  430. req := &searcher.CardinalityRequest{
  431. Field: field,
  432. LogPaths: []string{filePath},
  433. }
  434. result, err := cardinalityCounter.CountCardinality(suite.ctx, req)
  435. require.NoError(t, err, "Failed to count cardinality for field: %s", field)
  436. // Allow for small discrepancies due to indexing behavior
  437. tolerance := uint64(expectedCount) / 100 // 1% tolerance
  438. if tolerance < 1 {
  439. tolerance = 1
  440. }
  441. assert.InDelta(t, expectedCount, result.Cardinality, float64(tolerance),
  442. "Cardinality mismatch for %s in %s: expected %d, got %d",
  443. fieldName, filePath, expectedCount, result.Cardinality)
  444. t.Logf("✓ %s cardinality: expected=%d, actual=%d, total_docs=%d",
  445. fieldName, expectedCount, result.Cardinality, result.TotalDocs)
  446. } else {
  447. t.Fatal("Searcher is not a DistributedSearcher")
  448. }
  449. }
  450. // ValidateAnalyticsData validates the accuracy of analytics statistics
  451. func (suite *IntegrationTestSuite) ValidateAnalyticsData(t *testing.T, filePath string) {
  452. t.Logf("Validating Analytics data accuracy for: %s", filePath)
  453. expected := suite.expectedMetrics[filePath]
  454. require.NotNil(t, expected, "Expected metrics not found for file: %s", filePath)
  455. // Test dashboard analytics
  456. dashboardReq := &analytics.DashboardQueryRequest{
  457. LogPaths: []string{filePath},
  458. StartTime: expected.TimeRange.StartTime.Unix(),
  459. EndTime: expected.TimeRange.EndTime.Unix(),
  460. }
  461. dashboard, err := suite.analytics.GetDashboardAnalytics(suite.ctx, dashboardReq)
  462. require.NoError(t, err, "Failed to get dashboard data for: %s", filePath)
  463. // Validate basic metrics
  464. tolerance := float64(expected.TotalRecords) * 0.01 // 1% tolerance
  465. assert.InDelta(t, expected.TotalRecords, dashboard.Summary.TotalPV, tolerance,
  466. "Total requests mismatch for %s", filePath)
  467. t.Logf("✓ Dashboard validation completed for: %s", filePath)
  468. t.Logf(" Total requests: expected=%d, actual=%d", expected.TotalRecords, dashboard.Summary.TotalPV)
  469. t.Logf(" Unique visitors: %d", dashboard.Summary.TotalUV)
  470. t.Logf(" Average daily PV: %f", dashboard.Summary.AvgDailyPV)
  471. }
  472. // ValidatePaginationFunctionality validates pagination works correctly using searcher
  473. func (suite *IntegrationTestSuite) ValidatePaginationFunctionality(t *testing.T, filePath string) {
  474. t.Logf("Validating pagination functionality for: %s", filePath)
  475. expected := suite.expectedMetrics[filePath]
  476. require.NotNil(t, expected, "Expected metrics not found for file: %s", filePath)
  477. startTime := expected.TimeRange.StartTime.Unix()
  478. endTime := expected.TimeRange.EndTime.Unix()
  479. // Test first page
  480. searchReq1 := &searcher.SearchRequest{
  481. Query: "*",
  482. LogPaths: []string{filePath},
  483. StartTime: &startTime,
  484. EndTime: &endTime,
  485. Limit: 100,
  486. Offset: 0,
  487. SortBy: "timestamp",
  488. SortOrder: "desc",
  489. }
  490. result1, err := suite.searcher.Search(suite.ctx, searchReq1)
  491. require.NoError(t, err, "Failed to get page 1 for: %s", filePath)
  492. assert.Equal(t, 100, len(result1.Hits), "First page should have 100 entries")
  493. assert.Equal(t, expected.TotalRecords, result1.TotalHits, "Total count mismatch")
  494. // Test second page
  495. searchReq2 := &searcher.SearchRequest{
  496. Query: "*",
  497. LogPaths: []string{filePath},
  498. StartTime: &startTime,
  499. EndTime: &endTime,
  500. Limit: 100,
  501. Offset: 100,
  502. SortBy: "timestamp",
  503. SortOrder: "desc",
  504. }
  505. result2, err := suite.searcher.Search(suite.ctx, searchReq2)
  506. require.NoError(t, err, "Failed to get page 2 for: %s", filePath)
  507. assert.Equal(t, 100, len(result2.Hits), "Second page should have 100 entries")
  508. assert.Equal(t, expected.TotalRecords, result2.TotalHits, "Total count should be consistent")
  509. // Ensure different pages return different entries
  510. if len(result1.Hits) > 0 && len(result2.Hits) > 0 {
  511. firstPageFirstEntry := result1.Hits[0].ID
  512. secondPageFirstEntry := result2.Hits[0].ID
  513. assert.NotEqual(t, firstPageFirstEntry, secondPageFirstEntry,
  514. "Different pages should return different entries")
  515. }
  516. t.Logf("✓ Pagination validation completed for: %s", filePath)
  517. t.Logf(" Page 1 entries: %d", len(result1.Hits))
  518. t.Logf(" Page 2 entries: %d", len(result2.Hits))
  519. t.Logf(" Total entries: %d", result1.TotalHits)
  520. }
  521. // TestNginxLogIntegration is the main integration test function
  522. func TestNginxLogIntegration(t *testing.T) {
  523. suite := NewIntegrationTestSuite(t)
  524. defer suite.cleanup()
  525. t.Log("=== Starting Nginx Log Integration Test ===")
  526. // Step 1: Generate test data
  527. suite.GenerateTestData(t)
  528. // Step 2: Initialize services
  529. suite.InitializeServices(t)
  530. // Step 3: Perform global index rebuild and validate during indexing
  531. t.Log("\n=== Testing Global Index Rebuild ===")
  532. suite.PerformGlobalIndexRebuild(t)
  533. // Step 4: Validate all files after global rebuild
  534. for _, filePath := range suite.logFilePaths {
  535. t.Logf("\n--- Validating file after global rebuild: %s ---", filepath.Base(filePath))
  536. suite.ValidateCardinalityCounter(t, filePath)
  537. suite.ValidateAnalyticsData(t, filePath)
  538. suite.ValidatePaginationFunctionality(t, filePath)
  539. }
  540. // Step 5: Test single file rebuild
  541. t.Log("\n=== Testing Single File Index Rebuild ===")
  542. targetFile := suite.logFilePaths[1] // Rebuild second file
  543. suite.PerformSingleFileIndexRebuild(t, targetFile)
  544. // Step 6: Validate all files after single file rebuild
  545. for _, filePath := range suite.logFilePaths {
  546. t.Logf("\n--- Validating file after single file rebuild: %s ---", filepath.Base(filePath))
  547. suite.ValidateCardinalityCounter(t, filePath)
  548. suite.ValidateAnalyticsData(t, filePath)
  549. suite.ValidatePaginationFunctionality(t, filePath)
  550. }
  551. t.Log("\n=== Integration Test Completed Successfully ===")
  552. }
  553. // TestConcurrentIndexingAndQuerying tests querying while indexing is in progress
  554. func TestConcurrentIndexingAndQuerying(t *testing.T) {
  555. suite := NewIntegrationTestSuite(t)
  556. defer suite.cleanup()
  557. t.Log("=== Starting Concurrent Indexing and Querying Test ===")
  558. // Generate test data and initialize services
  559. suite.GenerateTestData(t)
  560. suite.InitializeServices(t)
  561. var wg sync.WaitGroup
  562. // Start indexing in background
  563. wg.Add(1)
  564. go func() {
  565. defer wg.Done()
  566. suite.PerformGlobalIndexRebuild(t)
  567. }()
  568. // Wait a bit for indexing to start
  569. time.Sleep(2 * time.Second)
  570. // Query while indexing is in progress
  571. wg.Add(1)
  572. go func() {
  573. defer wg.Done()
  574. for i := 0; i < 10; i++ {
  575. time.Sleep(1 * time.Second)
  576. // Test search functionality
  577. if suite.searcher.IsHealthy() {
  578. searchReq := &searcher.SearchRequest{
  579. Query: "GET",
  580. LogPaths: []string{suite.logFilePaths[0]},
  581. Limit: 10,
  582. }
  583. result, err := suite.searcher.Search(suite.ctx, searchReq)
  584. if err == nil {
  585. t.Logf("Concurrent query %d: found %d results", i+1, result.TotalHits)
  586. }
  587. }
  588. }
  589. }()
  590. wg.Wait()
  591. // Final validation
  592. for _, filePath := range suite.logFilePaths {
  593. suite.ValidateCardinalityCounter(t, filePath)
  594. suite.ValidateAnalyticsData(t, filePath)
  595. }
  596. t.Log("=== Concurrent Test Completed Successfully ===")
  597. }