123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421 |
- package nginx_log
- import (
- "context"
- "fmt"
- "strings"
- "time"
- "github.com/blevesearch/bleve/v2"
- "github.com/blevesearch/bleve/v2/search"
- "github.com/blevesearch/bleve/v2/search/query"
- "github.com/spf13/cast"
- "github.com/uozi-tech/cosy/logger"
- )
- // SearchLogs searches for log entries matching the given criteria
- func (li *LogIndexer) SearchLogs(ctx context.Context, req *QueryRequest) (*QueryResult, error) {
- start := time.Now()
- // Create cache key
- cacheKey := li.createCacheKey(req)
- // Check cache first
- if cached, found := li.cache.Get(cacheKey); found {
- // Calculate summary statistics from cache (we still need to do this since cache doesn't store summary)
- summaryStats, err := li.calculateSummaryStatsFromQuery(ctx, li.buildSearchQuery(req))
- if err != nil {
- logger.Warnf("Failed to calculate summary statistics from cache: %v", err)
- summaryStats = &SummaryStats{}
- }
- return &QueryResult{
- Entries: cached.Entries,
- Total: cached.Total,
- Took: time.Since(start),
- Summary: summaryStats,
- }, nil
- }
- // Build search query
- query := li.buildSearchQuery(req)
- // Create search request
- searchReq := bleve.NewSearchRequest(query)
- // Handle unlimited search (Limit = 0)
- if req.Limit == 0 {
- searchReq.Size = 10000000 // Very large number for unlimited search
- } else {
- searchReq.Size = req.Limit
- }
- searchReq.From = req.Offset
- // Set sorting
- if req.SortBy != "" {
- sortField := li.mapSortField(req.SortBy)
- ascending := req.SortOrder == "asc"
- searchReq.SortBy([]string{sortField})
- if !ascending {
- // For descending sort, we need to use negative sorting
- // This is a workaround for Bleve v2
- searchReq.SortByCustom(search.SortOrder{
- &search.SortField{
- Field: sortField,
- Desc: true,
- },
- })
- } else {
- searchReq.SortByCustom(search.SortOrder{
- &search.SortField{
- Field: sortField,
- Desc: false,
- },
- })
- }
- logger.Infof("Applying sort: field=%s, order=%s (desc=%v)", sortField, req.SortOrder, !ascending)
- } else {
- // Default sort by timestamp descending
- searchReq.SortByCustom(search.SortOrder{
- &search.SortField{
- Field: "timestamp",
- Desc: true,
- },
- })
- }
- // Include all fields in results
- searchReq.Fields = []string{"*"}
- // Execute search
- searchResult, err := li.index.SearchInContext(ctx, searchReq)
- if err != nil {
- return nil, fmt.Errorf("search failed: %w", err)
- }
- // Convert search results to AccessLogEntry
- entries := make([]*AccessLogEntry, 0, len(searchResult.Hits))
- for _, hit := range searchResult.Hits {
- entry := li.convertHitToEntry(hit)
- if entry != nil {
- entries = append(entries, entry)
- }
- }
- // Calculate summary statistics from ALL matching results (not just current page)
- summaryStats, err := li.calculateSummaryStatsFromQuery(ctx, query)
- if err != nil {
- logger.Warnf("Failed to calculate summary statistics: %v", err)
- summaryStats = &SummaryStats{} // Return empty stats on error
- }
- // Cache results with total count
- cachedResult := &CachedSearchResult{
- Entries: entries,
- Total: int(searchResult.Total),
- }
- li.cache.Set(cacheKey, cachedResult, int64(len(entries)*500+100)) // Estimate 500 bytes per entry + overhead
- result := &QueryResult{
- Entries: entries,
- Total: int(searchResult.Total),
- Took: time.Since(start),
- Summary: summaryStats,
- }
- return result, nil
- }
- // buildSearchQuery builds a Bleve query based on the request parameters
- func (li *LogIndexer) buildSearchQuery(req *QueryRequest) query.Query {
- var queries []query.Query
- // Time range query - only add if we have meaningful time constraints
- if !req.StartTime.IsZero() && !req.EndTime.IsZero() {
- // Check if the time range is reasonable (not too wide)
- if req.EndTime.Sub(req.StartTime) < 400*24*time.Hour { // Less than ~400 days
- // Add 1 millisecond to endTime to ensure boundary values are included
- // This fixes the issue where records with exact endTime are excluded due to exclusive upper bound
- inclusiveEndTime := req.EndTime.Add(1 * time.Millisecond)
- logger.Infof("Using time range filter: %s to %s (inclusive)", req.StartTime.Format(time.RFC3339), inclusiveEndTime.Format(time.RFC3339))
- timeQuery := bleve.NewDateRangeQuery(req.StartTime, inclusiveEndTime)
- timeQuery.SetField("timestamp")
- queries = append(queries, timeQuery)
- } else {
- logger.Infof("Time range too wide (%v), ignoring time filter to search all data", req.EndTime.Sub(req.StartTime))
- }
- } else {
- logger.Infof("No meaningful time range specified, searching all data")
- }
- // Text search query
- if req.Query != "" {
- textQuery := bleve.NewMatchQuery(req.Query)
- textQuery.SetField("raw")
- queries = append(queries, textQuery)
- }
- // IP filter
- if req.IP != "" {
- ipQuery := bleve.NewMatchQuery(req.IP)
- ipQuery.SetField("ip")
- queries = append(queries, ipQuery)
- }
- // Method filter
- if req.Method != "" {
- logger.Infof("Adding method filter: %s", req.Method)
- methodQuery := bleve.NewMatchQuery(req.Method)
- methodQuery.SetField("method")
- queries = append(queries, methodQuery)
- }
- // Status filter
- if len(req.Status) > 0 {
- logger.Infof("Adding status filter: %v", req.Status)
- var statusQueries []query.Query
- for _, status := range req.Status {
- // Use NumericRangeQuery for exact numeric match
- statusFloat := float64(status)
- statusQuery := bleve.NewNumericRangeQuery(&statusFloat, &statusFloat)
- statusQuery.SetField("status")
- statusQueries = append(statusQueries, statusQuery)
- }
- if len(statusQueries) == 1 {
- queries = append(queries, statusQueries[0])
- } else {
- orQuery := bleve.NewDisjunctionQuery(statusQueries...)
- queries = append(queries, orQuery)
- }
- }
- // Path filter
- if req.Path != "" {
- logger.Infof("Adding path filter: %s", req.Path)
- pathQuery := bleve.NewMatchQuery(req.Path)
- pathQuery.SetField("path")
- queries = append(queries, pathQuery)
- }
- // User agent filter
- if req.UserAgent != "" {
- uaQuery := bleve.NewMatchQuery(req.UserAgent)
- uaQuery.SetField("user_agent")
- queries = append(queries, uaQuery)
- }
- // Referer filter
- if req.Referer != "" {
- logger.Infof("Adding referer filter: %s", req.Referer)
- refererQuery := bleve.NewTermQuery(req.Referer)
- refererQuery.SetField("referer")
- queries = append(queries, refererQuery)
- }
- // Browser filter
- if req.Browser != "" {
- logger.Infof("Adding browser filter: %s", req.Browser)
- browsers := strings.Split(req.Browser, ",")
- var browserQueries []query.Query
- for _, browser := range browsers {
- browser = strings.TrimSpace(browser)
- if browser != "" {
- browserQuery := bleve.NewMatchQuery(browser)
- browserQuery.SetField("browser")
- browserQueries = append(browserQueries, browserQuery)
- }
- }
- if len(browserQueries) == 1 {
- queries = append(queries, browserQueries[0])
- } else if len(browserQueries) > 1 {
- orQuery := bleve.NewDisjunctionQuery(browserQueries...)
- queries = append(queries, orQuery)
- }
- }
- // OS filter
- if req.OS != "" {
- logger.Infof("Adding OS filter: %s", req.OS)
- oses := strings.Split(req.OS, ",")
- var osQueries []query.Query
- for _, os := range oses {
- os = strings.TrimSpace(os)
- if os != "" {
- osQuery := bleve.NewMatchQuery(os)
- osQuery.SetField("os")
- osQueries = append(osQueries, osQuery)
- }
- }
- if len(osQueries) == 1 {
- queries = append(queries, osQueries[0])
- } else if len(osQueries) > 1 {
- orQuery := bleve.NewDisjunctionQuery(osQueries...)
- queries = append(queries, orQuery)
- }
- }
- // Device filter
- if req.Device != "" {
- logger.Infof("Adding device filter: %s", req.Device)
- devices := strings.Split(req.Device, ",")
- var deviceQueries []query.Query
- for _, device := range devices {
- device = strings.TrimSpace(device)
- if device != "" {
- deviceQuery := bleve.NewMatchQuery(device)
- deviceQuery.SetField("device_type")
- deviceQueries = append(deviceQueries, deviceQuery)
- }
- }
- if len(deviceQueries) == 1 {
- queries = append(queries, deviceQueries[0])
- } else if len(deviceQueries) > 1 {
- orQuery := bleve.NewDisjunctionQuery(deviceQueries...)
- queries = append(queries, orQuery)
- }
- }
- // Log path filter (file_path field)
- if req.LogPath != "" {
- logger.Infof("Adding log path filter: %s", req.LogPath)
- filePathQuery := bleve.NewMatchQuery(req.LogPath)
- filePathQuery.SetField("file_path")
- queries = append(queries, filePathQuery)
- }
- // Combine all queries
- if len(queries) == 0 {
- return bleve.NewMatchAllQuery()
- } else if len(queries) == 1 {
- return queries[0]
- } else {
- return bleve.NewConjunctionQuery(queries...)
- }
- }
- // getStringField safely gets a string field from search results
- func (li *LogIndexer) getStringField(fields map[string]interface{}, fieldName string) string {
- if value, ok := fields[fieldName]; ok {
- return cast.ToString(value)
- }
- return ""
- }
- // getFloatField safely gets a float field from search results
- func (li *LogIndexer) getFloatField(fields map[string]interface{}, fieldName string) float64 {
- if value, ok := fields[fieldName]; ok {
- return cast.ToFloat64(value)
- }
- return 0
- }
- // convertHitToEntry converts a Bleve search hit to an AccessLogEntry
- func (li *LogIndexer) convertHitToEntry(hit interface{}) *AccessLogEntry {
- // Try different type assertions for Bleve v2
- switch h := hit.(type) {
- case *search.DocumentMatch:
- entry := &AccessLogEntry{}
- // Extract fields from the hit
- if fields := h.Fields; fields != nil {
- entry.IP = li.getStringField(fields, "ip")
- entry.RegionCode = li.getStringField(fields, "region_code")
- entry.Province = li.getStringField(fields, "province")
- entry.City = li.getStringField(fields, "city")
- entry.Method = li.getStringField(fields, "method")
- entry.Path = li.getStringField(fields, "path")
- entry.Protocol = li.getStringField(fields, "protocol")
- entry.Referer = li.getStringField(fields, "referer")
- entry.UserAgent = li.getStringField(fields, "user_agent")
- entry.Browser = li.getStringField(fields, "browser")
- entry.BrowserVer = li.getStringField(fields, "browser_version")
- entry.OS = li.getStringField(fields, "os")
- entry.OSVersion = li.getStringField(fields, "os_version")
- entry.DeviceType = li.getStringField(fields, "device_type")
- entry.Raw = li.getStringField(fields, "raw")
- // Handle numeric fields
- if statusFloat := li.getFloatField(fields, "status"); statusFloat > 0 {
- entry.Status = int(statusFloat)
- }
- if bytesSent := li.getFloatField(fields, "bytes_sent"); bytesSent > 0 {
- entry.BytesSent = int64(bytesSent)
- }
- entry.RequestTime = li.getFloatField(fields, "request_time")
- // Handle timestamp
- if timestampStr := li.getStringField(fields, "timestamp"); timestampStr != "" {
- if ts, err := time.Parse(time.RFC3339, timestampStr); err == nil {
- entry.Timestamp = ts
- }
- }
- } else {
- logger.Warnf("Hit has no fields: %+v", h)
- }
- return entry
- default:
- logger.Errorf("Unknown hit type: %T, content: %+v", hit, hit)
- return nil
- }
- }
- // createCacheKey creates a cache key for the given query request
- func (li *LogIndexer) createCacheKey(req *QueryRequest) string {
- // Include all search parameters in cache key
- statusStr := ""
- if len(req.Status) > 0 {
- statusStr = fmt.Sprintf("%v", req.Status)
- }
- return fmt.Sprintf("search_%s_%s_%s_%s_%s_%s_%s_%s_%s_%s_%s_%s_%s_%d_%d_%s_%s",
- req.StartTime.Format("20060102150405"),
- req.EndTime.Format("20060102150405"),
- req.Query,
- req.IP,
- req.Method,
- req.Path,
- req.UserAgent,
- req.Referer,
- req.Browser,
- req.OS,
- req.Device,
- req.LogPath,
- statusStr,
- req.Limit,
- req.Offset,
- req.SortBy,
- req.SortOrder,
- )
- }
- // mapSortField maps frontend sort field names to Bleve index field names
- func (li *LogIndexer) mapSortField(sortBy string) string {
- // Map frontend field names to Bleve index field names
- switch sortBy {
- case "timestamp":
- return "timestamp"
- case "ip":
- return "ip"
- case "method":
- return "method"
- case "path":
- return "path"
- case "status":
- return "status"
- case "bytes_sent":
- return "bytes_sent"
- case "browser":
- return "browser"
- case "os":
- return "os"
- case "device_type":
- return "device_type"
- default:
- // Default to timestamp if unknown field
- return "timestamp"
- }
- }
|