1
0

batch_search_optimizer.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567
  1. package nginx_log
  2. import (
  3. "context"
  4. "fmt"
  5. "runtime"
  6. "sync"
  7. "time"
  8. "github.com/blevesearch/bleve/v2"
  9. "github.com/blevesearch/bleve/v2/search/query"
  10. "github.com/uozi-tech/cosy/logger"
  11. )
  12. // BatchSearchOptimizer handles multiple search requests efficiently
  13. type BatchSearchOptimizer struct {
  14. searchQuery *OptimizedSearchQuery
  15. index bleve.Index
  16. batchSize int
  17. workerCount int
  18. requestTimeout time.Duration
  19. // Batch processing channels
  20. requestChannel chan *BatchSearchRequest
  21. responseChannel chan *BatchSearchResponse
  22. errorChannel chan error
  23. // Control channels
  24. stopChannel chan struct{}
  25. wg sync.WaitGroup
  26. // Request batching
  27. requestBatch []*BatchSearchRequest
  28. batchMutex sync.Mutex
  29. batchTimer *time.Timer
  30. batchInterval time.Duration
  31. // Statistics
  32. totalBatches int64
  33. totalRequests int64
  34. avgBatchSize float64
  35. mu sync.RWMutex
  36. }
  // BatchSearchRequest represents a single search request travelling through the
  // batching pipeline. SearchAsync creates one per call.
  type BatchSearchRequest struct {
  	// ID identifies the request; SearchAsync builds it from the current
  	// Unix-nanosecond timestamp and the length of the query text.
  	ID string
  	// Request is the caller's query as passed to SearchAsync.
  	Request *QueryRequest
  	// Context is the caller's context as passed to SearchAsync.
  	Context context.Context
  	// ResponseChannel is where SearchAsync waits for the outcome; it is
  	// created with a buffer of one.
  	ResponseChannel chan *BatchSearchResponse
  	// StartTime is the wall-clock time at which SearchAsync built the request.
  	StartTime time.Time
  }
  // BatchSearchResponse represents the response for a batch search request.
  // processBatch also sends values of this type over the internal response
  // channel as control markers, with Error set to "batch_marker" or
  // "process_individual".
  type BatchSearchResponse struct {
  	// ID is the ID of the request this response belongs to ("batch" for the
  	// batch-level marker).
  	ID string
  	// Result is returned to the SearchAsync caller when Error is nil.
  	Result *QueryResult
  	// Error, when non-nil, is returned to the SearchAsync caller instead of
  	// Result.
  	Error error
  	// Duration is not populated anywhere in this file.
  	// NOTE(review): presumably the elapsed processing time — confirm.
  	Duration time.Duration
  }
  // BatchOptimizationResult contains results from batch optimization, as
  // produced by optimizeBatch.
  type BatchOptimizationResult struct {
  	// Requests is the batch that was analyzed.
  	Requests []*BatchSearchRequest
  	// OptimizedQuery is the combined query for the whole batch; it is only
  	// set when CanBatch is true.
  	OptimizedQuery query.Query
  	// SharedFilters maps a filter name to the value shared across the batch.
  	// The key "time_range" holds a *BatchTimeRange when a common time window
  	// was found.
  	SharedFilters map[string]interface{}
  	// CanBatch reports whether any shared filter or time range was found.
  	CanBatch bool
  }
  59. // BatchSearchConfig holds configuration for batch search optimization
  60. type BatchSearchConfig struct {
  61. SearchQuery *OptimizedSearchQuery
  62. Index bleve.Index
  63. BatchSize int
  64. WorkerCount int
  65. BatchInterval time.Duration
  66. RequestTimeout time.Duration
  67. }
  68. // NewBatchSearchOptimizer creates a new batch search optimizer
  69. func NewBatchSearchOptimizer(config *BatchSearchConfig) *BatchSearchOptimizer {
  70. // Set defaults
  71. if config.BatchSize == 0 {
  72. config.BatchSize = 10
  73. }
  74. if config.WorkerCount == 0 {
  75. config.WorkerCount = runtime.NumCPU()
  76. }
  77. if config.BatchInterval == 0 {
  78. config.BatchInterval = 50 * time.Millisecond
  79. }
  80. if config.RequestTimeout == 0 {
  81. config.RequestTimeout = 30 * time.Second
  82. }
  83. bso := &BatchSearchOptimizer{
  84. searchQuery: config.SearchQuery,
  85. index: config.Index,
  86. batchSize: config.BatchSize,
  87. workerCount: config.WorkerCount,
  88. requestTimeout: config.RequestTimeout,
  89. batchInterval: config.BatchInterval,
  90. requestChannel: make(chan *BatchSearchRequest, config.BatchSize*2),
  91. responseChannel: make(chan *BatchSearchResponse, config.BatchSize*2),
  92. errorChannel: make(chan error, config.WorkerCount),
  93. stopChannel: make(chan struct{}),
  94. requestBatch: make([]*BatchSearchRequest, 0, config.BatchSize),
  95. }
  96. // Start batch processing workers
  97. bso.startWorkers()
  98. return bso
  99. }
  100. // startWorkers starts the batch processing workers
  101. func (bso *BatchSearchOptimizer) startWorkers() {
  102. // Start batch collector
  103. bso.wg.Add(1)
  104. go bso.batchCollector()
  105. // Start batch processors
  106. for i := 0; i < bso.workerCount; i++ {
  107. bso.wg.Add(1)
  108. go bso.batchProcessor(i)
  109. }
  110. logger.Infof("Started batch search optimizer with %d workers, batch size %d",
  111. bso.workerCount, bso.batchSize)
  112. }
  113. // SearchAsync submits a search request for batch processing
  114. func (bso *BatchSearchOptimizer) SearchAsync(ctx context.Context, req *QueryRequest) (*QueryResult, error) {
  115. // Create batch request
  116. batchReq := &BatchSearchRequest{
  117. ID: fmt.Sprintf("req_%d_%d", time.Now().UnixNano(), len(req.Query)),
  118. Request: req,
  119. Context: ctx,
  120. ResponseChannel: make(chan *BatchSearchResponse, 1),
  121. StartTime: time.Now(),
  122. }
  123. // Submit request
  124. select {
  125. case bso.requestChannel <- batchReq:
  126. case <-ctx.Done():
  127. return nil, ctx.Err()
  128. case <-time.After(bso.requestTimeout):
  129. return nil, fmt.Errorf("request submission timeout")
  130. }
  131. // Wait for response
  132. select {
  133. case response := <-batchReq.ResponseChannel:
  134. if response.Error != nil {
  135. return nil, response.Error
  136. }
  137. return response.Result, nil
  138. case <-ctx.Done():
  139. return nil, ctx.Err()
  140. case <-time.After(bso.requestTimeout):
  141. return nil, fmt.Errorf("request processing timeout")
  142. }
  143. }
  144. // batchCollector collects individual requests into batches
  145. func (bso *BatchSearchOptimizer) batchCollector() {
  146. defer bso.wg.Done()
  147. bso.batchTimer = time.NewTimer(bso.batchInterval)
  148. defer bso.batchTimer.Stop()
  149. for {
  150. select {
  151. case req := <-bso.requestChannel:
  152. bso.batchMutex.Lock()
  153. bso.requestBatch = append(bso.requestBatch, req)
  154. shouldProcess := len(bso.requestBatch) >= bso.batchSize
  155. bso.batchMutex.Unlock()
  156. if shouldProcess {
  157. bso.processBatch()
  158. } else if len(bso.requestBatch) == 1 {
  159. // First request in batch, reset timer
  160. bso.batchTimer.Reset(bso.batchInterval)
  161. }
  162. case <-bso.batchTimer.C:
  163. bso.processBatch()
  164. case <-bso.stopChannel:
  165. // Process final batch
  166. bso.processBatch()
  167. return
  168. }
  169. }
  170. }
  171. // processBatch processes the current batch of requests
  172. func (bso *BatchSearchOptimizer) processBatch() {
  173. bso.batchMutex.Lock()
  174. if len(bso.requestBatch) == 0 {
  175. bso.batchMutex.Unlock()
  176. return
  177. }
  178. // Copy batch and reset
  179. batch := make([]*BatchSearchRequest, len(bso.requestBatch))
  180. copy(batch, bso.requestBatch)
  181. bso.requestBatch = bso.requestBatch[:0]
  182. bso.batchMutex.Unlock()
  183. // Send batch for processing
  184. select {
  185. case bso.responseChannel <- &BatchSearchResponse{ID: "batch", Error: fmt.Errorf("batch_marker")}:
  186. // Send individual requests
  187. for _, req := range batch {
  188. select {
  189. case bso.responseChannel <- &BatchSearchResponse{ID: req.ID, Error: fmt.Errorf("process_individual")}:
  190. case <-bso.stopChannel:
  191. return
  192. }
  193. }
  194. case <-bso.stopChannel:
  195. return
  196. }
  197. // Update statistics
  198. bso.mu.Lock()
  199. bso.totalBatches++
  200. bso.totalRequests += int64(len(batch))
  201. bso.avgBatchSize = float64(bso.totalRequests) / float64(bso.totalBatches)
  202. bso.mu.Unlock()
  203. }
  204. // batchProcessor processes batches of search requests
  205. func (bso *BatchSearchOptimizer) batchProcessor(workerID int) {
  206. defer bso.wg.Done()
  207. for {
  208. select {
  209. case response := <-bso.responseChannel:
  210. if response.Error != nil && response.Error.Error() == "batch_marker" {
  211. // Process individual requests in this batch
  212. bso.processIndividualRequests(workerID)
  213. }
  214. case <-bso.stopChannel:
  215. return
  216. }
  217. }
  218. }
  219. // processIndividualRequests processes individual requests (fallback when batching not beneficial)
  220. func (bso *BatchSearchOptimizer) processIndividualRequests(workerID int) {
  221. for {
  222. select {
  223. case response := <-bso.responseChannel:
  224. if response.Error != nil && response.Error.Error() == "process_individual" {
  225. // This would process individual requests
  226. // For now, we'll just acknowledge
  227. continue
  228. }
  229. case <-time.After(10 * time.Millisecond):
  230. // No more individual requests in this batch
  231. return
  232. case <-bso.stopChannel:
  233. return
  234. }
  235. }
  236. }
  237. // optimizeBatch analyzes a batch of requests and determines optimization strategies
  238. func (bso *BatchSearchOptimizer) optimizeBatch(requests []*BatchSearchRequest) *BatchOptimizationResult {
  239. result := &BatchOptimizationResult{
  240. Requests: requests,
  241. SharedFilters: make(map[string]interface{}),
  242. CanBatch: false,
  243. }
  244. if len(requests) <= 1 {
  245. return result
  246. }
  247. // Analyze requests for common patterns
  248. commonTimeRange := bso.findCommonTimeRange(requests)
  249. commonFilters := bso.findCommonFilters(requests)
  250. // Determine if batching is beneficial
  251. if len(commonFilters) > 0 || commonTimeRange != nil {
  252. result.CanBatch = true
  253. result.SharedFilters = commonFilters
  254. if commonTimeRange != nil {
  255. result.SharedFilters["time_range"] = commonTimeRange
  256. }
  257. // Build optimized batch query
  258. result.OptimizedQuery = bso.buildBatchQuery(requests, commonFilters, commonTimeRange)
  259. }
  260. return result
  261. }
  262. // findCommonTimeRange finds a common time range across requests
  263. func (bso *BatchSearchOptimizer) findCommonTimeRange(requests []*BatchSearchRequest) *BatchTimeRange {
  264. if len(requests) == 0 {
  265. return nil
  266. }
  267. var minStart, maxEnd time.Time
  268. hasTimeRange := false
  269. for _, req := range requests {
  270. if !req.Request.StartTime.IsZero() && !req.Request.EndTime.IsZero() {
  271. if !hasTimeRange {
  272. minStart = req.Request.StartTime
  273. maxEnd = req.Request.EndTime
  274. hasTimeRange = true
  275. } else {
  276. if req.Request.StartTime.Before(minStart) {
  277. minStart = req.Request.StartTime
  278. }
  279. if req.Request.EndTime.After(maxEnd) {
  280. maxEnd = req.Request.EndTime
  281. }
  282. }
  283. }
  284. }
  285. if !hasTimeRange {
  286. return nil
  287. }
  288. // Check if the combined time range is reasonable
  289. if maxEnd.Sub(minStart) > 24*time.Hour {
  290. return nil // Too wide to be beneficial
  291. }
  292. return &BatchTimeRange{
  293. Start: minStart,
  294. End: maxEnd,
  295. }
  296. }
  297. // findCommonFilters finds filters that appear in multiple requests
  298. func (bso *BatchSearchOptimizer) findCommonFilters(requests []*BatchSearchRequest) map[string]interface{} {
  299. commonFilters := make(map[string]interface{})
  300. filterCounts := make(map[string]int)
  301. // Count filter occurrences
  302. for _, req := range requests {
  303. if req.Request.Method != "" {
  304. filterCounts["method"]++
  305. }
  306. if req.Request.IP != "" {
  307. filterCounts["ip"]++
  308. }
  309. if len(req.Request.Status) > 0 {
  310. filterCounts["status"]++
  311. }
  312. if req.Request.Browser != "" {
  313. filterCounts["browser"]++
  314. }
  315. if req.Request.OS != "" {
  316. filterCounts["os"]++
  317. }
  318. }
  319. // Identify common filters (appear in > 50% of requests)
  320. threshold := len(requests) / 2
  321. for filter, count := range filterCounts {
  322. if count > threshold {
  323. // Find the most common value for this filter
  324. commonValue := bso.findMostCommonValue(requests, filter)
  325. if commonValue != nil {
  326. commonFilters[filter] = commonValue
  327. }
  328. }
  329. }
  330. return commonFilters
  331. }
  332. // findMostCommonValue finds the most common value for a given filter
  333. func (bso *BatchSearchOptimizer) findMostCommonValue(requests []*BatchSearchRequest, filter string) interface{} {
  334. valueCounts := make(map[string]int)
  335. for _, req := range requests {
  336. var value string
  337. switch filter {
  338. case "method":
  339. value = req.Request.Method
  340. case "ip":
  341. value = req.Request.IP
  342. case "browser":
  343. value = req.Request.Browser
  344. case "os":
  345. value = req.Request.OS
  346. case "status":
  347. if len(req.Request.Status) > 0 {
  348. value = fmt.Sprintf("%d", req.Request.Status[0])
  349. }
  350. }
  351. if value != "" {
  352. valueCounts[value]++
  353. }
  354. }
  355. // Find most common value
  356. maxCount := 0
  357. var mostCommon string
  358. for value, count := range valueCounts {
  359. if count > maxCount {
  360. maxCount = count
  361. mostCommon = value
  362. }
  363. }
  364. if mostCommon != "" {
  365. return mostCommon
  366. }
  367. return nil
  368. }
  369. // buildBatchQuery builds an optimized query for a batch of requests
  370. func (bso *BatchSearchOptimizer) buildBatchQuery(requests []*BatchSearchRequest, commonFilters map[string]interface{}, timeRange *BatchTimeRange) query.Query {
  371. var queries []query.Query
  372. // Add common time range filter
  373. if timeRange != nil {
  374. timeQuery := bleve.NewDateRangeQuery(timeRange.Start, timeRange.End)
  375. timeQuery.SetField("timestamp")
  376. queries = append(queries, timeQuery)
  377. }
  378. // Add common filters
  379. for filter, value := range commonFilters {
  380. switch filter {
  381. case "method":
  382. methodQuery := bleve.NewTermQuery(value.(string))
  383. methodQuery.SetField("method")
  384. queries = append(queries, methodQuery)
  385. case "ip":
  386. ipQuery := bleve.NewTermQuery(value.(string))
  387. ipQuery.SetField("ip")
  388. queries = append(queries, ipQuery)
  389. case "browser":
  390. browserQuery := bleve.NewTermQuery(value.(string))
  391. browserQuery.SetField("browser")
  392. queries = append(queries, browserQuery)
  393. case "os":
  394. osQuery := bleve.NewTermQuery(value.(string))
  395. osQuery.SetField("os")
  396. queries = append(queries, osQuery)
  397. }
  398. }
  399. // Create individual request queries and combine with OR
  400. individualQueries := make([]query.Query, 0, len(requests))
  401. for _, req := range requests {
  402. // Build query for individual request with remaining filters
  403. reqQuery := bso.buildIndividualRequestQuery(req.Request, commonFilters)
  404. if reqQuery != nil {
  405. individualQueries = append(individualQueries, reqQuery)
  406. }
  407. }
  408. // Combine all queries
  409. if len(queries) == 0 && len(individualQueries) == 0 {
  410. return bleve.NewMatchAllQuery()
  411. }
  412. if len(individualQueries) > 0 {
  413. orQuery := bleve.NewDisjunctionQuery(individualQueries...)
  414. queries = append(queries, orQuery)
  415. }
  416. if len(queries) == 1 {
  417. return queries[0]
  418. }
  419. return bleve.NewConjunctionQuery(queries...)
  420. }
  421. // buildIndividualRequestQuery builds a query for an individual request excluding common filters
  422. func (bso *BatchSearchOptimizer) buildIndividualRequestQuery(req *QueryRequest, commonFilters map[string]interface{}) query.Query {
  423. var queries []query.Query
  424. // Add filters that are not common
  425. if req.Query != "" {
  426. textQuery := bleve.NewMatchQuery(req.Query)
  427. textQuery.SetField("raw")
  428. queries = append(queries, textQuery)
  429. }
  430. if req.Path != "" {
  431. pathQuery := bleve.NewTermQuery(req.Path)
  432. pathQuery.SetField("path")
  433. queries = append(queries, pathQuery)
  434. }
  435. // Add non-common filters
  436. if req.Method != "" && commonFilters["method"] == nil {
  437. methodQuery := bleve.NewTermQuery(req.Method)
  438. methodQuery.SetField("method")
  439. queries = append(queries, methodQuery)
  440. }
  441. if req.IP != "" && commonFilters["ip"] == nil {
  442. ipQuery := bleve.NewTermQuery(req.IP)
  443. ipQuery.SetField("ip")
  444. queries = append(queries, ipQuery)
  445. }
  446. if len(queries) == 0 {
  447. return bleve.NewMatchAllQuery()
  448. }
  449. if len(queries) == 1 {
  450. return queries[0]
  451. }
  452. return bleve.NewConjunctionQuery(queries...)
  453. }
  // BatchTimeRange is the time window shared by a batch of requests, as computed
  // by findCommonTimeRange.
  type BatchTimeRange struct {
  	// Start is the earliest request start time; End is the latest request
  	// end time.
  	Start, End time.Time
  }
  459. // GetStatistics returns batch processing statistics
  460. func (bso *BatchSearchOptimizer) GetStatistics() map[string]interface{} {
  461. bso.mu.RLock()
  462. defer bso.mu.RUnlock()
  463. return map[string]interface{}{
  464. "total_batches": bso.totalBatches,
  465. "total_requests": bso.totalRequests,
  466. "avg_batch_size": fmt.Sprintf("%.2f", bso.avgBatchSize),
  467. "batch_size": bso.batchSize,
  468. "worker_count": bso.workerCount,
  469. "batch_interval": bso.batchInterval.String(),
  470. "request_timeout": bso.requestTimeout.String(),
  471. }
  472. }
  473. // Close shuts down the batch search optimizer
  474. func (bso *BatchSearchOptimizer) Close() error {
  475. // Signal all workers to stop
  476. close(bso.stopChannel)
  477. // Wait for all workers to finish
  478. bso.wg.Wait()
  479. // Close channels
  480. close(bso.requestChannel)
  481. close(bso.responseChannel)
  482. close(bso.errorChannel)
  483. logger.Infof("Batch search optimizer closed. Final stats: %+v", bso.GetStatistics())
  484. return nil
  485. }