indexer_search.go 12 KB


  1. package nginx_log
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "time"
  7. "github.com/blevesearch/bleve/v2"
  8. "github.com/blevesearch/bleve/v2/search"
  9. "github.com/blevesearch/bleve/v2/search/query"
  10. "github.com/spf13/cast"
  11. "github.com/uozi-tech/cosy/logger"
  12. )
  13. // SearchLogs searches for log entries matching the given criteria
  14. func (li *LogIndexer) SearchLogs(ctx context.Context, req *QueryRequest) (*QueryResult, error) {
  15. start := time.Now()
  16. // Create cache key
  17. cacheKey := li.createCacheKey(req)
  18. // Check cache first
  19. if cached, found := li.cache.Get(cacheKey); found {
  20. // Calculate summary statistics from cache (we still need to do this since cache doesn't store summary)
  21. summaryStats, err := li.calculateSummaryStatsFromQuery(ctx, li.buildSearchQuery(req))
  22. if err != nil {
  23. logger.Warnf("Failed to calculate summary statistics from cache: %v", err)
  24. summaryStats = &SummaryStats{}
  25. }
  26. return &QueryResult{
  27. Entries: cached.Entries,
  28. Total: cached.Total,
  29. Took: time.Since(start),
  30. Summary: summaryStats,
  31. }, nil
  32. }
  33. // Build search query
  34. query := li.buildSearchQuery(req)
  35. // Create search request
  36. searchReq := bleve.NewSearchRequest(query)
  37. // Handle unlimited search (Limit = 0)
  38. if req.Limit == 0 {
  39. searchReq.Size = 10000000 // Very large number for unlimited search
  40. } else {
  41. searchReq.Size = req.Limit
  42. }
  43. searchReq.From = req.Offset
  44. // Set sorting
  45. if req.SortBy != "" {
  46. sortField := li.mapSortField(req.SortBy)
  47. ascending := req.SortOrder == "asc"
  48. searchReq.SortBy([]string{sortField})
  49. if !ascending {
  50. // For descending sort, we need to use negative sorting
  51. // This is a workaround for Bleve v2
  52. searchReq.SortByCustom(search.SortOrder{
  53. &search.SortField{
  54. Field: sortField,
  55. Desc: true,
  56. },
  57. })
  58. } else {
  59. searchReq.SortByCustom(search.SortOrder{
  60. &search.SortField{
  61. Field: sortField,
  62. Desc: false,
  63. },
  64. })
  65. }
  66. logger.Infof("Applying sort: field=%s, order=%s (desc=%v)", sortField, req.SortOrder, !ascending)
  67. } else {
  68. // Default sort by timestamp descending
  69. searchReq.SortByCustom(search.SortOrder{
  70. &search.SortField{
  71. Field: "timestamp",
  72. Desc: true,
  73. },
  74. })
  75. }
  76. // Include all fields in results
  77. searchReq.Fields = []string{"*"}
  78. // Execute search
  79. searchResult, err := li.index.SearchInContext(ctx, searchReq)
  80. if err != nil {
  81. return nil, fmt.Errorf("search failed: %w", err)
  82. }
  83. // Convert search results to AccessLogEntry
  84. entries := make([]*AccessLogEntry, 0, len(searchResult.Hits))
  85. for _, hit := range searchResult.Hits {
  86. entry := li.convertHitToEntry(hit)
  87. if entry != nil {
  88. entries = append(entries, entry)
  89. }
  90. }
  91. // Calculate summary statistics from ALL matching results (not just current page)
  92. summaryStats, err := li.calculateSummaryStatsFromQuery(ctx, query)
  93. if err != nil {
  94. logger.Warnf("Failed to calculate summary statistics: %v", err)
  95. summaryStats = &SummaryStats{} // Return empty stats on error
  96. }
  97. // Cache results with total count
  98. cachedResult := &CachedSearchResult{
  99. Entries: entries,
  100. Total: int(searchResult.Total),
  101. }
  102. li.cache.Set(cacheKey, cachedResult, int64(len(entries)*500+100)) // Estimate 500 bytes per entry + overhead
  103. result := &QueryResult{
  104. Entries: entries,
  105. Total: int(searchResult.Total),
  106. Took: time.Since(start),
  107. Summary: summaryStats,
  108. }
  109. return result, nil
  110. }
  111. // buildSearchQuery builds a Bleve query based on the request parameters
  112. func (li *LogIndexer) buildSearchQuery(req *QueryRequest) query.Query {
  113. var queries []query.Query
  114. // Time range query - only add if we have meaningful time constraints
  115. if !req.StartTime.IsZero() && !req.EndTime.IsZero() {
  116. // Check if the time range is reasonable (not too wide)
  117. if req.EndTime.Sub(req.StartTime) < 400*24*time.Hour { // Less than ~400 days
  118. // Add 1 millisecond to endTime to ensure boundary values are included
  119. // This fixes the issue where records with exact endTime are excluded due to exclusive upper bound
  120. inclusiveEndTime := req.EndTime.Add(1 * time.Millisecond)
  121. logger.Infof("Using time range filter: %s to %s (inclusive)", req.StartTime.Format(time.RFC3339), inclusiveEndTime.Format(time.RFC3339))
  122. timeQuery := bleve.NewDateRangeQuery(req.StartTime, inclusiveEndTime)
  123. timeQuery.SetField("timestamp")
  124. queries = append(queries, timeQuery)
  125. } else {
  126. logger.Infof("Time range too wide (%v), ignoring time filter to search all data", req.EndTime.Sub(req.StartTime))
  127. }
  128. } else {
  129. logger.Infof("No meaningful time range specified, searching all data")
  130. }
  131. // Text search query
  132. if req.Query != "" {
  133. textQuery := bleve.NewMatchQuery(req.Query)
  134. textQuery.SetField("raw")
  135. queries = append(queries, textQuery)
  136. }
  137. // IP filter
  138. if req.IP != "" {
  139. ipQuery := bleve.NewMatchQuery(req.IP)
  140. ipQuery.SetField("ip")
  141. queries = append(queries, ipQuery)
  142. }
  143. // Method filter
  144. if req.Method != "" {
  145. logger.Infof("Adding method filter: %s", req.Method)
  146. methodQuery := bleve.NewMatchQuery(req.Method)
  147. methodQuery.SetField("method")
  148. queries = append(queries, methodQuery)
  149. }
  150. // Status filter
  151. if len(req.Status) > 0 {
  152. logger.Infof("Adding status filter: %v", req.Status)
  153. var statusQueries []query.Query
  154. for _, status := range req.Status {
  155. // Use NumericRangeQuery for exact numeric match
  156. statusFloat := float64(status)
  157. statusQuery := bleve.NewNumericRangeQuery(&statusFloat, &statusFloat)
  158. statusQuery.SetField("status")
  159. statusQueries = append(statusQueries, statusQuery)
  160. }
  161. if len(statusQueries) == 1 {
  162. queries = append(queries, statusQueries[0])
  163. } else {
  164. orQuery := bleve.NewDisjunctionQuery(statusQueries...)
  165. queries = append(queries, orQuery)
  166. }
  167. }
  168. // Path filter
  169. if req.Path != "" {
  170. logger.Infof("Adding path filter: %s", req.Path)
  171. pathQuery := bleve.NewMatchQuery(req.Path)
  172. pathQuery.SetField("path")
  173. queries = append(queries, pathQuery)
  174. }
  175. // User agent filter
  176. if req.UserAgent != "" {
  177. uaQuery := bleve.NewMatchQuery(req.UserAgent)
  178. uaQuery.SetField("user_agent")
  179. queries = append(queries, uaQuery)
  180. }
  181. // Referer filter
  182. if req.Referer != "" {
  183. logger.Infof("Adding referer filter: %s", req.Referer)
  184. refererQuery := bleve.NewTermQuery(req.Referer)
  185. refererQuery.SetField("referer")
  186. queries = append(queries, refererQuery)
  187. }
  188. // Browser filter
  189. if req.Browser != "" {
  190. logger.Infof("Adding browser filter: %s", req.Browser)
  191. browsers := strings.Split(req.Browser, ",")
  192. var browserQueries []query.Query
  193. for _, browser := range browsers {
  194. browser = strings.TrimSpace(browser)
  195. if browser != "" {
  196. browserQuery := bleve.NewMatchQuery(browser)
  197. browserQuery.SetField("browser")
  198. browserQueries = append(browserQueries, browserQuery)
  199. }
  200. }
  201. if len(browserQueries) == 1 {
  202. queries = append(queries, browserQueries[0])
  203. } else if len(browserQueries) > 1 {
  204. orQuery := bleve.NewDisjunctionQuery(browserQueries...)
  205. queries = append(queries, orQuery)
  206. }
  207. }
  208. // OS filter
  209. if req.OS != "" {
  210. logger.Infof("Adding OS filter: %s", req.OS)
  211. oses := strings.Split(req.OS, ",")
  212. var osQueries []query.Query
  213. for _, os := range oses {
  214. os = strings.TrimSpace(os)
  215. if os != "" {
  216. osQuery := bleve.NewMatchQuery(os)
  217. osQuery.SetField("os")
  218. osQueries = append(osQueries, osQuery)
  219. }
  220. }
  221. if len(osQueries) == 1 {
  222. queries = append(queries, osQueries[0])
  223. } else if len(osQueries) > 1 {
  224. orQuery := bleve.NewDisjunctionQuery(osQueries...)
  225. queries = append(queries, orQuery)
  226. }
  227. }
  228. // Device filter
  229. if req.Device != "" {
  230. logger.Infof("Adding device filter: %s", req.Device)
  231. devices := strings.Split(req.Device, ",")
  232. var deviceQueries []query.Query
  233. for _, device := range devices {
  234. device = strings.TrimSpace(device)
  235. if device != "" {
  236. deviceQuery := bleve.NewMatchQuery(device)
  237. deviceQuery.SetField("device_type")
  238. deviceQueries = append(deviceQueries, deviceQuery)
  239. }
  240. }
  241. if len(deviceQueries) == 1 {
  242. queries = append(queries, deviceQueries[0])
  243. } else if len(deviceQueries) > 1 {
  244. orQuery := bleve.NewDisjunctionQuery(deviceQueries...)
  245. queries = append(queries, orQuery)
  246. }
  247. }
  248. // Log path filter (file_path field)
  249. if req.LogPath != "" {
  250. logger.Infof("Adding log path filter: %s", req.LogPath)
  251. filePathQuery := bleve.NewMatchQuery(req.LogPath)
  252. filePathQuery.SetField("file_path")
  253. queries = append(queries, filePathQuery)
  254. }
  255. // Combine all queries
  256. if len(queries) == 0 {
  257. return bleve.NewMatchAllQuery()
  258. } else if len(queries) == 1 {
  259. return queries[0]
  260. } else {
  261. return bleve.NewConjunctionQuery(queries...)
  262. }
  263. }
  264. // getStringField safely gets a string field from search results
  265. func (li *LogIndexer) getStringField(fields map[string]interface{}, fieldName string) string {
  266. if value, ok := fields[fieldName]; ok {
  267. return cast.ToString(value)
  268. }
  269. return ""
  270. }
  271. // getFloatField safely gets a float field from search results
  272. func (li *LogIndexer) getFloatField(fields map[string]interface{}, fieldName string) float64 {
  273. if value, ok := fields[fieldName]; ok {
  274. return cast.ToFloat64(value)
  275. }
  276. return 0
  277. }
  278. // convertHitToEntry converts a Bleve search hit to an AccessLogEntry
  279. func (li *LogIndexer) convertHitToEntry(hit interface{}) *AccessLogEntry {
  280. // Try different type assertions for Bleve v2
  281. switch h := hit.(type) {
  282. case *search.DocumentMatch:
  283. entry := &AccessLogEntry{}
  284. // Extract fields from the hit
  285. if fields := h.Fields; fields != nil {
  286. entry.IP = li.getStringField(fields, "ip")
  287. entry.RegionCode = li.getStringField(fields, "region_code")
  288. entry.Province = li.getStringField(fields, "province")
  289. entry.City = li.getStringField(fields, "city")
  290. entry.Method = li.getStringField(fields, "method")
  291. entry.Path = li.getStringField(fields, "path")
  292. entry.Protocol = li.getStringField(fields, "protocol")
  293. entry.Referer = li.getStringField(fields, "referer")
  294. entry.UserAgent = li.getStringField(fields, "user_agent")
  295. entry.Browser = li.getStringField(fields, "browser")
  296. entry.BrowserVer = li.getStringField(fields, "browser_version")
  297. entry.OS = li.getStringField(fields, "os")
  298. entry.OSVersion = li.getStringField(fields, "os_version")
  299. entry.DeviceType = li.getStringField(fields, "device_type")
  300. entry.Raw = li.getStringField(fields, "raw")
  301. // Handle numeric fields
  302. if statusFloat := li.getFloatField(fields, "status"); statusFloat > 0 {
  303. entry.Status = int(statusFloat)
  304. }
  305. if bytesSent := li.getFloatField(fields, "bytes_sent"); bytesSent > 0 {
  306. entry.BytesSent = int64(bytesSent)
  307. }
  308. entry.RequestTime = li.getFloatField(fields, "request_time")
  309. // Handle timestamp
  310. if timestampStr := li.getStringField(fields, "timestamp"); timestampStr != "" {
  311. if ts, err := time.Parse(time.RFC3339, timestampStr); err == nil {
  312. entry.Timestamp = ts
  313. }
  314. }
  315. } else {
  316. logger.Warnf("Hit has no fields: %+v", h)
  317. }
  318. return entry
  319. default:
  320. logger.Errorf("Unknown hit type: %T, content: %+v", hit, hit)
  321. return nil
  322. }
  323. }
  324. // createCacheKey creates a cache key for the given query request
  325. func (li *LogIndexer) createCacheKey(req *QueryRequest) string {
  326. // Include all search parameters in cache key
  327. statusStr := ""
  328. if len(req.Status) > 0 {
  329. statusStr = fmt.Sprintf("%v", req.Status)
  330. }
  331. return fmt.Sprintf("search_%s_%s_%s_%s_%s_%s_%s_%s_%s_%s_%s_%s_%s_%d_%d_%s_%s",
  332. req.StartTime.Format("20060102150405"),
  333. req.EndTime.Format("20060102150405"),
  334. req.Query,
  335. req.IP,
  336. req.Method,
  337. req.Path,
  338. req.UserAgent,
  339. req.Referer,
  340. req.Browser,
  341. req.OS,
  342. req.Device,
  343. req.LogPath,
  344. statusStr,
  345. req.Limit,
  346. req.Offset,
  347. req.SortBy,
  348. req.SortOrder,
  349. )
  350. }
  351. // mapSortField maps frontend sort field names to Bleve index field names
  352. func (li *LogIndexer) mapSortField(sortBy string) string {
  353. // Map frontend field names to Bleve index field names
  354. switch sortBy {
  355. case "timestamp":
  356. return "timestamp"
  357. case "ip":
  358. return "ip"
  359. case "method":
  360. return "method"
  361. case "path":
  362. return "path"
  363. case "status":
  364. return "status"
  365. case "bytes_sent":
  366. return "bytes_sent"
  367. case "browser":
  368. return "browser"
  369. case "os":
  370. return "os"
  371. case "device_type":
  372. return "device_type"
  373. default:
  374. // Default to timestamp if unknown field
  375. return "timestamp"
  376. }
  377. }