batch_search_optimizer.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569
  1. package nginx_log
  2. import (
  3. "context"
  4. "fmt"
  5. "runtime"
  6. "sync"
  7. "time"
  8. "github.com/blevesearch/bleve/v2"
  9. "github.com/blevesearch/bleve/v2/search/query"
  10. "github.com/uozi-tech/cosy/logger"
  11. )
  12. // BatchSearchOptimizer handles multiple search requests efficiently
  13. type BatchSearchOptimizer struct {
  14. searchQuery *OptimizedSearchQuery
  15. index bleve.Index
  16. batchSize int
  17. workerCount int
  18. requestTimeout time.Duration
  19. // Batch processing channels
  20. requestChannel chan *BatchSearchRequest
  21. responseChannel chan *BatchSearchResponse
  22. errorChannel chan error
  23. // Control channels
  24. stopChannel chan struct{}
  25. wg sync.WaitGroup
  26. // Request batching
  27. requestBatch []*BatchSearchRequest
  28. batchMutex sync.Mutex
  29. batchTimer *time.Timer
  30. batchInterval time.Duration
  31. // Statistics
  32. totalBatches int64
  33. totalRequests int64
  34. avgBatchSize float64
  35. mu sync.RWMutex
  36. }
  37. // BatchSearchRequest represents a single search request in a batch
  38. type BatchSearchRequest struct {
  39. ID string
  40. Request *QueryRequest
  41. Context context.Context
  42. ResponseChannel chan *BatchSearchResponse
  43. StartTime time.Time
  44. }
  45. // BatchSearchResponse represents the response for a batch search request
  46. type BatchSearchResponse struct {
  47. ID string
  48. Result *QueryResult
  49. Error error
  50. Duration time.Duration
  51. }
  52. // BatchOptimizationResult contains results from batch optimization
  53. type BatchOptimizationResult struct {
  54. Requests []*BatchSearchRequest
  55. OptimizedQuery query.Query
  56. SharedFilters map[string]interface{}
  57. CanBatch bool
  58. }
  59. // BatchSearchConfig holds configuration for batch search optimization
  60. type BatchSearchConfig struct {
  61. SearchQuery *OptimizedSearchQuery
  62. Index bleve.Index
  63. BatchSize int
  64. WorkerCount int
  65. BatchInterval time.Duration
  66. RequestTimeout time.Duration
  67. }
  68. // NewBatchSearchOptimizer creates a new batch search optimizer
  69. func NewBatchSearchOptimizer(config *BatchSearchConfig) *BatchSearchOptimizer {
  70. // Set defaults
  71. if config.BatchSize == 0 {
  72. config.BatchSize = 10
  73. }
  74. if config.WorkerCount == 0 {
  75. config.WorkerCount = runtime.NumCPU()
  76. }
  77. if config.BatchInterval == 0 {
  78. config.BatchInterval = 50 * time.Millisecond
  79. }
  80. if config.RequestTimeout == 0 {
  81. config.RequestTimeout = 30 * time.Second
  82. }
  83. bso := &BatchSearchOptimizer{
  84. searchQuery: config.SearchQuery,
  85. index: config.Index,
  86. batchSize: config.BatchSize,
  87. workerCount: config.WorkerCount,
  88. requestTimeout: config.RequestTimeout,
  89. batchInterval: config.BatchInterval,
  90. requestChannel: make(chan *BatchSearchRequest, config.BatchSize*2),
  91. responseChannel: make(chan *BatchSearchResponse, config.BatchSize*2),
  92. errorChannel: make(chan error, config.WorkerCount),
  93. stopChannel: make(chan struct{}),
  94. requestBatch: make([]*BatchSearchRequest, 0, config.BatchSize),
  95. }
  96. // Start batch processing workers
  97. bso.startWorkers()
  98. return bso
  99. }
  100. // startWorkers starts the batch processing workers
  101. func (bso *BatchSearchOptimizer) startWorkers() {
  102. // Start batch collector
  103. bso.wg.Add(1)
  104. go bso.batchCollector()
  105. // Start batch processors
  106. for i := 0; i < bso.workerCount; i++ {
  107. bso.wg.Add(1)
  108. go bso.batchProcessor(i)
  109. }
  110. logger.Infof("Started batch search optimizer with %d workers, batch size %d",
  111. bso.workerCount, bso.batchSize)
  112. }
  113. // SearchAsync submits a search request for batch processing
  114. func (bso *BatchSearchOptimizer) SearchAsync(ctx context.Context, req *QueryRequest) (*QueryResult, error) {
  115. // Create batch request
  116. batchReq := &BatchSearchRequest{
  117. ID: fmt.Sprintf("req_%d_%d", time.Now().UnixNano(), len(req.Query)),
  118. Request: req,
  119. Context: ctx,
  120. ResponseChannel: make(chan *BatchSearchResponse, 1),
  121. StartTime: time.Now(),
  122. }
  123. // Submit request
  124. select {
  125. case bso.requestChannel <- batchReq:
  126. case <-ctx.Done():
  127. return nil, ctx.Err()
  128. case <-time.After(bso.requestTimeout):
  129. return nil, fmt.Errorf("request submission timeout")
  130. }
  131. // Wait for response
  132. select {
  133. case response := <-batchReq.ResponseChannel:
  134. if response.Error != nil {
  135. return nil, response.Error
  136. }
  137. return response.Result, nil
  138. case <-ctx.Done():
  139. return nil, ctx.Err()
  140. case <-time.After(bso.requestTimeout):
  141. return nil, fmt.Errorf("request processing timeout")
  142. }
  143. }
  144. // batchCollector collects individual requests into batches
  145. func (bso *BatchSearchOptimizer) batchCollector() {
  146. defer bso.wg.Done()
  147. bso.batchTimer = time.NewTimer(bso.batchInterval)
  148. defer bso.batchTimer.Stop()
  149. for {
  150. select {
  151. case req := <-bso.requestChannel:
  152. bso.batchMutex.Lock()
  153. bso.requestBatch = append(bso.requestBatch, req)
  154. shouldProcess := len(bso.requestBatch) >= bso.batchSize
  155. bso.batchMutex.Unlock()
  156. if shouldProcess {
  157. bso.processBatch()
  158. } else if len(bso.requestBatch) == 1 {
  159. // First request in batch, reset timer
  160. bso.batchTimer.Reset(bso.batchInterval)
  161. }
  162. case <-bso.batchTimer.C:
  163. bso.processBatch()
  164. case <-bso.stopChannel:
  165. // Process final batch
  166. bso.processBatch()
  167. return
  168. }
  169. }
  170. }
  171. // processBatch processes the current batch of requests
  172. func (bso *BatchSearchOptimizer) processBatch() {
  173. bso.batchMutex.Lock()
  174. if len(bso.requestBatch) == 0 {
  175. bso.batchMutex.Unlock()
  176. return
  177. }
  178. // Copy batch and reset
  179. batch := make([]*BatchSearchRequest, len(bso.requestBatch))
  180. copy(batch, bso.requestBatch)
  181. bso.requestBatch = bso.requestBatch[:0]
  182. bso.batchMutex.Unlock()
  183. // Send batch for processing
  184. select {
  185. case bso.responseChannel <- &BatchSearchResponse{ID: "batch", Error: fmt.Errorf("batch_marker")}:
  186. // Send individual requests
  187. for _, req := range batch {
  188. select {
  189. case bso.responseChannel <- &BatchSearchResponse{ID: req.ID, Error: fmt.Errorf("process_individual")}:
  190. case <-bso.stopChannel:
  191. return
  192. }
  193. }
  194. case <-bso.stopChannel:
  195. return
  196. }
  197. // Update statistics
  198. bso.mu.Lock()
  199. bso.totalBatches++
  200. bso.totalRequests += int64(len(batch))
  201. bso.avgBatchSize = float64(bso.totalRequests) / float64(bso.totalBatches)
  202. bso.mu.Unlock()
  203. }
  204. // batchProcessor processes batches of search requests
  205. func (bso *BatchSearchOptimizer) batchProcessor(workerID int) {
  206. defer bso.wg.Done()
  207. for {
  208. select {
  209. case response := <-bso.responseChannel:
  210. if response.Error != nil && response.Error.Error() == "batch_marker" {
  211. // Process individual requests in this batch
  212. bso.processIndividualRequests(workerID)
  213. }
  214. case <-bso.stopChannel:
  215. return
  216. }
  217. }
  218. }
  219. // processIndividualRequests processes individual requests (fallback when batching not beneficial)
  220. func (bso *BatchSearchOptimizer) processIndividualRequests(workerID int) {
  221. for {
  222. select {
  223. case response := <-bso.responseChannel:
  224. if response.Error != nil && response.Error.Error() == "process_individual" {
  225. // This would process individual requests
  226. // For now, we'll just acknowledge
  227. continue
  228. }
  229. case <-time.After(10 * time.Millisecond):
  230. // No more individual requests in this batch
  231. return
  232. case <-bso.stopChannel:
  233. return
  234. }
  235. }
  236. }
  237. // optimizeBatch analyzes a batch of requests and determines optimization strategies
  238. func (bso *BatchSearchOptimizer) optimizeBatch(requests []*BatchSearchRequest) *BatchOptimizationResult {
  239. result := &BatchOptimizationResult{
  240. Requests: requests,
  241. SharedFilters: make(map[string]interface{}),
  242. CanBatch: false,
  243. }
  244. if len(requests) <= 1 {
  245. return result
  246. }
  247. // Analyze requests for common patterns
  248. commonTimeRange := bso.findCommonTimeRange(requests)
  249. commonFilters := bso.findCommonFilters(requests)
  250. // Determine if batching is beneficial
  251. if len(commonFilters) > 0 || commonTimeRange != nil {
  252. result.CanBatch = true
  253. result.SharedFilters = commonFilters
  254. if commonTimeRange != nil {
  255. result.SharedFilters["time_range"] = commonTimeRange
  256. }
  257. // Build optimized batch query
  258. result.OptimizedQuery = bso.buildBatchQuery(requests, commonFilters, commonTimeRange)
  259. }
  260. return result
  261. }
  262. // findCommonTimeRange finds a common time range across requests
  263. func (bso *BatchSearchOptimizer) findCommonTimeRange(requests []*BatchSearchRequest) *BatchTimeRange {
  264. if len(requests) == 0 {
  265. return nil
  266. }
  267. var minStart, maxEnd time.Time
  268. hasTimeRange := false
  269. for _, req := range requests {
  270. if req.Request.StartTime != 0 && req.Request.EndTime != 0 {
  271. if !hasTimeRange {
  272. minStart = time.Unix(req.Request.StartTime, 0)
  273. maxEnd = time.Unix(req.Request.EndTime, 0)
  274. hasTimeRange = true
  275. } else {
  276. reqStartTime := time.Unix(req.Request.StartTime, 0)
  277. if reqStartTime.Before(minStart) {
  278. minStart = reqStartTime
  279. }
  280. reqEndTime := time.Unix(req.Request.EndTime, 0)
  281. if reqEndTime.After(maxEnd) {
  282. maxEnd = reqEndTime
  283. }
  284. }
  285. }
  286. }
  287. if !hasTimeRange {
  288. return nil
  289. }
  290. // Check if the combined time range is reasonable
  291. if maxEnd.Sub(minStart) > 24*time.Hour {
  292. return nil // Too wide to be beneficial
  293. }
  294. return &BatchTimeRange{
  295. Start: minStart,
  296. End: maxEnd,
  297. }
  298. }
  299. // findCommonFilters finds filters that appear in multiple requests
  300. func (bso *BatchSearchOptimizer) findCommonFilters(requests []*BatchSearchRequest) map[string]interface{} {
  301. commonFilters := make(map[string]interface{})
  302. filterCounts := make(map[string]int)
  303. // Count filter occurrences
  304. for _, req := range requests {
  305. if req.Request.Method != "" {
  306. filterCounts["method"]++
  307. }
  308. if req.Request.IP != "" {
  309. filterCounts["ip"]++
  310. }
  311. if len(req.Request.Status) > 0 {
  312. filterCounts["status"]++
  313. }
  314. if req.Request.Browser != "" {
  315. filterCounts["browser"]++
  316. }
  317. if req.Request.OS != "" {
  318. filterCounts["os"]++
  319. }
  320. }
  321. // Identify common filters (appear in > 50% of requests)
  322. threshold := len(requests) / 2
  323. for filter, count := range filterCounts {
  324. if count > threshold {
  325. // Find the most common value for this filter
  326. commonValue := bso.findMostCommonValue(requests, filter)
  327. if commonValue != nil {
  328. commonFilters[filter] = commonValue
  329. }
  330. }
  331. }
  332. return commonFilters
  333. }
  334. // findMostCommonValue finds the most common value for a given filter
  335. func (bso *BatchSearchOptimizer) findMostCommonValue(requests []*BatchSearchRequest, filter string) interface{} {
  336. valueCounts := make(map[string]int)
  337. for _, req := range requests {
  338. var value string
  339. switch filter {
  340. case "method":
  341. value = req.Request.Method
  342. case "ip":
  343. value = req.Request.IP
  344. case "browser":
  345. value = req.Request.Browser
  346. case "os":
  347. value = req.Request.OS
  348. case "status":
  349. if len(req.Request.Status) > 0 {
  350. value = fmt.Sprintf("%d", req.Request.Status[0])
  351. }
  352. }
  353. if value != "" {
  354. valueCounts[value]++
  355. }
  356. }
  357. // Find most common value
  358. maxCount := 0
  359. var mostCommon string
  360. for value, count := range valueCounts {
  361. if count > maxCount {
  362. maxCount = count
  363. mostCommon = value
  364. }
  365. }
  366. if mostCommon != "" {
  367. return mostCommon
  368. }
  369. return nil
  370. }
  371. // buildBatchQuery builds an optimized query for a batch of requests
  372. func (bso *BatchSearchOptimizer) buildBatchQuery(requests []*BatchSearchRequest, commonFilters map[string]interface{}, timeRange *BatchTimeRange) query.Query {
  373. var queries []query.Query
  374. // Add common time range filter
  375. if timeRange != nil {
  376. timeQuery := bleve.NewDateRangeQuery(timeRange.Start, timeRange.End)
  377. timeQuery.SetField("timestamp")
  378. queries = append(queries, timeQuery)
  379. }
  380. // Add common filters
  381. for filter, value := range commonFilters {
  382. switch filter {
  383. case "method":
  384. methodQuery := bleve.NewTermQuery(value.(string))
  385. methodQuery.SetField("method")
  386. queries = append(queries, methodQuery)
  387. case "ip":
  388. ipQuery := bleve.NewTermQuery(value.(string))
  389. ipQuery.SetField("ip")
  390. queries = append(queries, ipQuery)
  391. case "browser":
  392. browserQuery := bleve.NewTermQuery(value.(string))
  393. browserQuery.SetField("browser")
  394. queries = append(queries, browserQuery)
  395. case "os":
  396. osQuery := bleve.NewTermQuery(value.(string))
  397. osQuery.SetField("os")
  398. queries = append(queries, osQuery)
  399. }
  400. }
  401. // Create individual request queries and combine with OR
  402. individualQueries := make([]query.Query, 0, len(requests))
  403. for _, req := range requests {
  404. // Build query for individual request with remaining filters
  405. reqQuery := bso.buildIndividualRequestQuery(req.Request, commonFilters)
  406. if reqQuery != nil {
  407. individualQueries = append(individualQueries, reqQuery)
  408. }
  409. }
  410. // Combine all queries
  411. if len(queries) == 0 && len(individualQueries) == 0 {
  412. return bleve.NewMatchAllQuery()
  413. }
  414. if len(individualQueries) > 0 {
  415. orQuery := bleve.NewDisjunctionQuery(individualQueries...)
  416. queries = append(queries, orQuery)
  417. }
  418. if len(queries) == 1 {
  419. return queries[0]
  420. }
  421. return bleve.NewConjunctionQuery(queries...)
  422. }
  423. // buildIndividualRequestQuery builds a query for an individual request excluding common filters
  424. func (bso *BatchSearchOptimizer) buildIndividualRequestQuery(req *QueryRequest, commonFilters map[string]interface{}) query.Query {
  425. var queries []query.Query
  426. // Add filters that are not common
  427. if req.Query != "" {
  428. textQuery := bleve.NewMatchQuery(req.Query)
  429. textQuery.SetField("raw")
  430. queries = append(queries, textQuery)
  431. }
  432. if req.Path != "" {
  433. pathQuery := bleve.NewTermQuery(req.Path)
  434. pathQuery.SetField("path")
  435. queries = append(queries, pathQuery)
  436. }
  437. // Add non-common filters
  438. if req.Method != "" && commonFilters["method"] == nil {
  439. methodQuery := bleve.NewTermQuery(req.Method)
  440. methodQuery.SetField("method")
  441. queries = append(queries, methodQuery)
  442. }
  443. if req.IP != "" && commonFilters["ip"] == nil {
  444. ipQuery := bleve.NewTermQuery(req.IP)
  445. ipQuery.SetField("ip")
  446. queries = append(queries, ipQuery)
  447. }
  448. if len(queries) == 0 {
  449. return bleve.NewMatchAllQuery()
  450. }
  451. if len(queries) == 1 {
  452. return queries[0]
  453. }
  454. return bleve.NewConjunctionQuery(queries...)
  455. }
  // BatchTimeRange is a time window shared by a batch of requests. It is
// produced by findCommonTimeRange and applied to the "timestamp" field by
// buildBatchQuery.
type BatchTimeRange struct {
	Start, End time.Time // window bounds
}
  461. // GetStatistics returns batch processing statistics
  462. func (bso *BatchSearchOptimizer) GetStatistics() map[string]interface{} {
  463. bso.mu.RLock()
  464. defer bso.mu.RUnlock()
  465. return map[string]interface{}{
  466. "total_batches": bso.totalBatches,
  467. "total_requests": bso.totalRequests,
  468. "avg_batch_size": fmt.Sprintf("%.2f", bso.avgBatchSize),
  469. "batch_size": bso.batchSize,
  470. "worker_count": bso.workerCount,
  471. "batch_interval": bso.batchInterval.String(),
  472. "request_timeout": bso.requestTimeout.String(),
  473. }
  474. }
  475. // Close shuts down the batch search optimizer
  476. func (bso *BatchSearchOptimizer) Close() error {
  477. // Signal all workers to stop
  478. close(bso.stopChannel)
  479. // Wait for all workers to finish
  480. bso.wg.Wait()
  481. // Close channels
  482. close(bso.requestChannel)
  483. close(bso.responseChannel)
  484. close(bso.errorChannel)
  485. logger.Infof("Batch search optimizer closed. Final stats: %+v", bso.GetStatistics())
  486. return nil
  487. }