concurrent_search_processor.go

package nginx_log

import (
	"context"
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
	"time"

	"github.com/blevesearch/bleve/v2"
	"github.com/dgraph-io/ristretto/v2"
	"github.com/uozi-tech/cosy/logger"
)
// ConcurrentSearchProcessor provides high-performance concurrent search processing.
type ConcurrentSearchProcessor struct {
	// Core components
	index          bleve.Index
	optimizedQuery *OptimizedSearchQuery
	batchOptimizer *BatchSearchOptimizer
	cache          *ristretto.Cache[string, *CachedSearchResult]

	// Concurrency configuration
	maxConcurrency int
	semaphore      chan struct{}
	workerPool     *sync.Pool

	// Request queuing and load balancing
	requestQueue  chan *ConcurrentSearchRequest
	priorityQueue chan *ConcurrentSearchRequest
	responseMap   *sync.Map

	// Circuit breaker and rate limiting
	circuitBreaker *CircuitBreaker
	rateLimiter    *RateLimiter

	// Performance monitoring (accessed atomically)
	activeRequests int64
	totalRequests  int64
	totalDuration  int64
	errorCount     int64
	timeoutCount   int64

	// Control channels
	stopChannel chan struct{}
	wg          sync.WaitGroup

	// Configuration
	config *ConcurrentSearchConfig
}
// ConcurrentSearchRequest represents a concurrent search request.
type ConcurrentSearchRequest struct {
	ID        string
	Request   *QueryRequest
	Context   context.Context
	Priority  RequestPriority
	StartTime time.Time
	Callback  func(*QueryResult, error)
	Response  chan *ConcurrentSearchResponse
}

// ConcurrentSearchResponse represents the response from a concurrent search.
type ConcurrentSearchResponse struct {
	ID        string
	Result    *QueryResult
	Error     error
	Duration  time.Duration
	FromCache bool
	WorkerID  int
}

// RequestPriority defines the priority of search requests.
type RequestPriority int

const (
	PriorityLow RequestPriority = iota
	PriorityNormal
	PriorityHigh
	PriorityCritical
)
// ConcurrentSearchConfig holds configuration for concurrent search processing.
type ConcurrentSearchConfig struct {
	Index                bleve.Index
	Cache                *ristretto.Cache[string, *CachedSearchResult]
	MaxConcurrency       int
	QueueSize            int
	RequestTimeout       time.Duration
	WorkerTimeout        time.Duration
	EnableCircuitBreaker bool
	EnableRateLimit      bool
	RateLimit            int // requests per second
	CircuitBreakerConfig *CircuitBreakerConfig
}
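
// Zero values are filled in by NewConcurrentSearchProcessor:
// MaxConcurrency defaults to runtime.NumCPU()*4, QueueSize to MaxConcurrency*10,
// RequestTimeout to 30s, WorkerTimeout to 10s, and RateLimit to 1000 requests/second.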
// CircuitBreakerConfig holds circuit breaker configuration.
type CircuitBreakerConfig struct {
	FailureThreshold int
	SuccessThreshold int
	Timeout          time.Duration
	MonitoringPeriod time.Duration
}

// CircuitBreaker implements the circuit breaker pattern.
type CircuitBreaker struct {
	config          *CircuitBreakerConfig
	state           CircuitBreakerState
	failures        int64
	successes       int64
	lastFailureTime time.Time
	lastStateChange time.Time
	mu              sync.RWMutex
}

// CircuitBreakerState represents the state of the circuit breaker.
type CircuitBreakerState int

const (
	CircuitClosed CircuitBreakerState = iota
	CircuitOpen
	CircuitHalfOpen
)
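
// State transitions, as implemented by Allow, RecordSuccess, and RecordFailure below:
//   - Closed   -> Open     after FailureThreshold failures
//   - Open     -> HalfOpen once Timeout has elapsed (checked in Allow)
//   - HalfOpen -> Closed   after SuccessThreshold successes
//   - HalfOpen -> Open     on any failure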
// RateLimiter implements token bucket rate limiting.
type RateLimiter struct {
	rate       int
	capacity   int
	tokens     int
	lastRefill time.Time
	mu         sync.Mutex
}
// NewConcurrentSearchProcessor creates a new concurrent search processor.
func NewConcurrentSearchProcessor(config *ConcurrentSearchConfig) (*ConcurrentSearchProcessor, error) {
	// Set defaults
	if config.MaxConcurrency == 0 {
		config.MaxConcurrency = runtime.NumCPU() * 4
	}
	if config.QueueSize == 0 {
		config.QueueSize = config.MaxConcurrency * 10
	}
	if config.RequestTimeout == 0 {
		config.RequestTimeout = 30 * time.Second
	}
	if config.WorkerTimeout == 0 {
		config.WorkerTimeout = 10 * time.Second
	}
	if config.RateLimit == 0 {
		config.RateLimit = 1000 // 1000 requests per second by default
	}

	// Create optimized query processor
	optimizedQuery := NewOptimizedSearchQuery(&OptimizedQueryConfig{
		Index:         config.Index,
		Cache:         config.Cache,
		MaxCacheSize:  256 * 1024 * 1024, // 256MB
		CacheTTL:      15 * time.Minute,
		MaxResultSize: 50000,
	})

	// Create batch optimizer
	batchOptimizer := NewBatchSearchOptimizer(&BatchSearchConfig{
		SearchQuery:    optimizedQuery,
		Index:          config.Index,
		BatchSize:      10,
		WorkerCount:    config.MaxConcurrency / 2,
		BatchInterval:  50 * time.Millisecond,
		RequestTimeout: config.RequestTimeout,
	})

	csp := &ConcurrentSearchProcessor{
		index:          config.Index,
		optimizedQuery: optimizedQuery,
		batchOptimizer: batchOptimizer,
		cache:          config.Cache,
		maxConcurrency: config.MaxConcurrency,
		semaphore:      make(chan struct{}, config.MaxConcurrency),
		requestQueue:   make(chan *ConcurrentSearchRequest, config.QueueSize),
		priorityQueue:  make(chan *ConcurrentSearchRequest, config.QueueSize/4),
		responseMap:    &sync.Map{},
		stopChannel:    make(chan struct{}),
		config:         config,
		workerPool: &sync.Pool{
			New: func() interface{} {
				return &SearchWorker{
					ID: fmt.Sprintf("worker_%d", time.Now().UnixNano()),
				}
			},
		},
	}

	// Initialize circuit breaker if enabled
	if config.EnableCircuitBreaker {
		cbConfig := config.CircuitBreakerConfig
		if cbConfig == nil {
			cbConfig = &CircuitBreakerConfig{
				FailureThreshold: 10,
				SuccessThreshold: 5,
				Timeout:          30 * time.Second,
				MonitoringPeriod: 60 * time.Second,
			}
		}
		csp.circuitBreaker = NewCircuitBreaker(cbConfig)
	}

	// Initialize rate limiter if enabled
	if config.EnableRateLimit {
		csp.rateLimiter = NewRateLimiter(config.RateLimit, config.RateLimit*2)
	}

	// Start workers
	csp.startWorkers()

	return csp, nil
}
// SearchWorker represents a search worker.
type SearchWorker struct {
	ID           string
	RequestCount int64
	TotalTime    time.Duration
}
// startWorkers starts the concurrent search workers.
func (csp *ConcurrentSearchProcessor) startWorkers() {
	// Start request dispatcher
	csp.wg.Add(1)
	go csp.requestDispatcher()

	// Start worker pool
	for i := 0; i < csp.maxConcurrency; i++ {
		csp.wg.Add(1)
		go csp.searchWorker(i)
	}

	// Start monitoring goroutine
	csp.wg.Add(1)
	go csp.performanceMonitor()

	logger.Infof("Started concurrent search processor with %d workers", csp.maxConcurrency)
}
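
// Request flow, as implemented below: SearchConcurrent enqueues a request,
// requestDispatcher pulls it from the queues, and processRequest acquires a
// semaphore slot before executing it in its own goroutine. The searchWorker
// goroutines are idle placeholders; effective concurrency is bounded by the
// semaphore, not by the worker loop.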
// SearchConcurrent performs a concurrent search.
func (csp *ConcurrentSearchProcessor) SearchConcurrent(ctx context.Context, req *QueryRequest, priority RequestPriority) (*QueryResult, error) {
	// Check rate limiter
	if csp.rateLimiter != nil && !csp.rateLimiter.Allow() {
		return nil, fmt.Errorf("rate limit exceeded")
	}

	// Check circuit breaker
	if csp.circuitBreaker != nil && !csp.circuitBreaker.Allow() {
		return nil, fmt.Errorf("circuit breaker is open")
	}

	// Create search request
	searchReq := &ConcurrentSearchRequest{
		ID:        fmt.Sprintf("req_%d", time.Now().UnixNano()),
		Request:   req,
		Context:   ctx,
		Priority:  priority,
		StartTime: time.Now(),
		Response:  make(chan *ConcurrentSearchResponse, 1),
	}

	// Submit to the queue that matches the request priority
	queue := csp.requestQueue
	if priority >= PriorityHigh {
		queue = csp.priorityQueue
	}
	select {
	case queue <- searchReq:
		// Request submitted
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-time.After(csp.config.RequestTimeout):
		atomic.AddInt64(&csp.timeoutCount, 1)
		return nil, fmt.Errorf("request submission timeout")
	}

	// Wait for response
	select {
	case response := <-searchReq.Response:
		// Update circuit breaker
		if csp.circuitBreaker != nil {
			if response.Error != nil {
				csp.circuitBreaker.RecordFailure()
			} else {
				csp.circuitBreaker.RecordSuccess()
			}
		}
		if response.Error != nil {
			atomic.AddInt64(&csp.errorCount, 1)
			return nil, response.Error
		}
		return response.Result, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-time.After(csp.config.RequestTimeout):
		atomic.AddInt64(&csp.timeoutCount, 1)
		return nil, fmt.Errorf("request processing timeout")
	}
}
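
// exampleConcurrentSearch is a minimal usage sketch, not part of the original
// API: it assumes the bleve index, ristretto cache, and QueryRequest are
// constructed elsewhere in this package.
func exampleConcurrentSearch(ctx context.Context, idx bleve.Index, cache *ristretto.Cache[string, *CachedSearchResult], req *QueryRequest) (*QueryResult, error) {
	processor, err := NewConcurrentSearchProcessor(&ConcurrentSearchConfig{
		Index:                idx,
		Cache:                cache,
		EnableCircuitBreaker: true, // remaining fields fall back to defaults
		EnableRateLimit:      true,
	})
	if err != nil {
		return nil, err
	}
	defer processor.Close()

	// High-priority requests are routed through the priority queue.
	return processor.SearchConcurrent(ctx, req, PriorityHigh)
}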
// requestDispatcher dispatches requests to workers based on priority.
func (csp *ConcurrentSearchProcessor) requestDispatcher() {
	defer csp.wg.Done()
	for {
		// Drain high-priority requests first
		select {
		case req := <-csp.priorityQueue:
			csp.processRequest(req)
			continue
		default:
		}

		select {
		case req := <-csp.priorityQueue:
			csp.processRequest(req)
		case req := <-csp.requestQueue:
			csp.processRequest(req)
		case <-csp.stopChannel:
			return
		}
	}
}
// processRequest processes a search request.
func (csp *ConcurrentSearchProcessor) processRequest(req *ConcurrentSearchRequest) {
	// Acquire a semaphore slot
	select {
	case csp.semaphore <- struct{}{}:
		// Slot acquired, process the request in its own goroutine
		go func() {
			defer func() { <-csp.semaphore }()
			csp.executeRequest(req)
		}()
	case <-time.After(csp.config.WorkerTimeout):
		// No workers available, return an error
		req.Response <- &ConcurrentSearchResponse{
			ID:    req.ID,
			Error: fmt.Errorf("no workers available"),
		}
	case <-csp.stopChannel:
		// Shutting down
		return
	}
}
// executeRequest executes a search request.
func (csp *ConcurrentSearchProcessor) executeRequest(req *ConcurrentSearchRequest) {
	start := time.Now()
	atomic.AddInt64(&csp.activeRequests, 1)
	atomic.AddInt64(&csp.totalRequests, 1)
	defer func() {
		atomic.AddInt64(&csp.activeRequests, -1)
		duration := time.Since(start)
		atomic.AddInt64(&csp.totalDuration, duration.Nanoseconds())
	}()

	// Get a worker from the pool
	worker := csp.workerPool.Get().(*SearchWorker)
	defer csp.workerPool.Put(worker)

	// Execute the search using the optimized query processor
	result, err := csp.optimizedQuery.SearchLogsOptimized(req.Context, req.Request)

	// Create the response
	response := &ConcurrentSearchResponse{
		ID:       req.ID,
		Result:   result,
		Error:    err,
		Duration: time.Since(start),
		WorkerID: 0, // numeric worker ID placeholder
	}
	if result != nil {
		response.FromCache = result.FromCache
	}

	// Send the response
	select {
	case req.Response <- response:
	case <-req.Context.Done():
		// Request context cancelled
	case <-time.After(1 * time.Second):
		// Response channel blocked, log a warning
		logger.Warnf("Response channel blocked for request %s", req.ID)
	}
}
// searchWorker is a dedicated search worker (dispatch currently happens in requestDispatcher).
func (csp *ConcurrentSearchProcessor) searchWorker(workerID int) {
	defer csp.wg.Done()
	// Requests are handled by the request dispatcher; this loop is kept for a
	// future direct-worker implementation if needed.
	for {
		select {
		case <-csp.stopChannel:
			return
		case <-time.After(100 * time.Millisecond):
			// Worker heartbeat
			continue
		}
	}
}
// performanceMonitor monitors performance metrics.
func (csp *ConcurrentSearchProcessor) performanceMonitor() {
	defer csp.wg.Done()
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			stats := csp.GetStatistics()
			logger.Infof("Concurrent search stats: %+v", stats)
		case <-csp.stopChannel:
			return
		}
	}
}
// GetStatistics returns performance statistics.
func (csp *ConcurrentSearchProcessor) GetStatistics() map[string]interface{} {
	active := atomic.LoadInt64(&csp.activeRequests)
	total := atomic.LoadInt64(&csp.totalRequests)
	totalDur := atomic.LoadInt64(&csp.totalDuration)
	errors := atomic.LoadInt64(&csp.errorCount)
	timeouts := atomic.LoadInt64(&csp.timeoutCount)

	avgDuration := float64(0)
	if total > 0 {
		avgDuration = float64(totalDur) / float64(total) / 1e6 // convert nanoseconds to milliseconds
	}

	errorRate := float64(0)
	if total > 0 {
		errorRate = float64(errors) / float64(total) * 100
	}

	stats := map[string]interface{}{
		"active_requests":     active,
		"total_requests":      total,
		"error_count":         errors,
		"timeout_count":       timeouts,
		"error_rate_percent":  fmt.Sprintf("%.2f", errorRate),
		"avg_duration_ms":     fmt.Sprintf("%.2f", avgDuration),
		"max_concurrency":     csp.maxConcurrency,
		"queue_size":          len(csp.requestQueue),
		"priority_queue_size": len(csp.priorityQueue),
	}

	// Add circuit breaker stats
	if csp.circuitBreaker != nil {
		stats["circuit_breaker"] = csp.circuitBreaker.GetStatistics()
	}

	// Add rate limiter stats
	if csp.rateLimiter != nil {
		stats["rate_limiter"] = csp.rateLimiter.GetStatistics()
	}

	// Add optimized query stats
	if csp.optimizedQuery != nil {
		stats["optimized_query"] = csp.optimizedQuery.GetStatistics()
	}

	return stats
}
// Close shuts down the concurrent search processor.
func (csp *ConcurrentSearchProcessor) Close() error {
	// Signal all workers to stop
	close(csp.stopChannel)

	// Wait for all workers to finish
	csp.wg.Wait()

	// Close the batch optimizer
	if csp.batchOptimizer != nil {
		csp.batchOptimizer.Close()
	}

	// The request queues are intentionally not closed: late callers of
	// SearchConcurrent may still send on them, and sending on a closed channel
	// panics. They are garbage-collected along with the processor.

	logger.Infof("Concurrent search processor closed. Final stats: %+v", csp.GetStatistics())
	return nil
}
// Circuit Breaker Implementation

// NewCircuitBreaker creates a new circuit breaker.
func NewCircuitBreaker(config *CircuitBreakerConfig) *CircuitBreaker {
	return &CircuitBreaker{
		config:          config,
		state:           CircuitClosed,
		lastStateChange: time.Now(),
	}
}

// Allow checks if a request should be allowed through the circuit breaker.
func (cb *CircuitBreaker) Allow() bool {
	cb.mu.Lock()
	defer cb.mu.Unlock()
	switch cb.state {
	case CircuitClosed:
		return true
	case CircuitOpen:
		// Move to half-open once the cool-down timeout has elapsed
		if time.Since(cb.lastStateChange) > cb.config.Timeout {
			cb.state = CircuitHalfOpen
			cb.lastStateChange = time.Now()
			return true
		}
		return false
	case CircuitHalfOpen:
		return true
	default:
		return false
	}
}
// RecordSuccess records a successful operation.
func (cb *CircuitBreaker) RecordSuccess() {
	cb.mu.Lock()
	defer cb.mu.Unlock()
	cb.successes++
	if cb.state == CircuitHalfOpen && cb.successes >= int64(cb.config.SuccessThreshold) {
		cb.state = CircuitClosed
		cb.failures = 0
		cb.successes = 0
		cb.lastStateChange = time.Now()
	}
}

// RecordFailure records a failed operation.
func (cb *CircuitBreaker) RecordFailure() {
	cb.mu.Lock()
	defer cb.mu.Unlock()
	cb.failures++
	cb.lastFailureTime = time.Now()
	switch cb.state {
	case CircuitClosed:
		if cb.failures >= int64(cb.config.FailureThreshold) {
			cb.state = CircuitOpen
			cb.lastStateChange = time.Now()
		}
	case CircuitHalfOpen:
		cb.state = CircuitOpen
		cb.lastStateChange = time.Now()
	}
}
// GetStatistics returns circuit breaker statistics.
func (cb *CircuitBreaker) GetStatistics() map[string]interface{} {
	cb.mu.RLock()
	defer cb.mu.RUnlock()
	stateStr := "closed"
	switch cb.state {
	case CircuitOpen:
		stateStr = "open"
	case CircuitHalfOpen:
		stateStr = "half-open"
	}
	return map[string]interface{}{
		"state":             stateStr,
		"failures":          cb.failures,
		"successes":         cb.successes,
		"last_state_change": cb.lastStateChange.Format(time.RFC3339),
		"failure_threshold": cb.config.FailureThreshold,
		"success_threshold": cb.config.SuccessThreshold,
	}
}
// Rate Limiter Implementation

// NewRateLimiter creates a new token bucket rate limiter.
func NewRateLimiter(rate, capacity int) *RateLimiter {
	return &RateLimiter{
		rate:       rate,
		capacity:   capacity,
		tokens:     capacity,
		lastRefill: time.Now(),
	}
}

// Allow checks if a request should be allowed.
func (rl *RateLimiter) Allow() bool {
	rl.mu.Lock()
	defer rl.mu.Unlock()
	now := time.Now()

	// Refill tokens based on the time elapsed since the last refill
	elapsed := now.Sub(rl.lastRefill)
	tokensToAdd := int(elapsed.Seconds() * float64(rl.rate))
	if tokensToAdd > 0 {
		rl.tokens += tokensToAdd
		if rl.tokens > rl.capacity {
			rl.tokens = rl.capacity
		}
		rl.lastRefill = now
	}

	// Consume a token if one is available
	if rl.tokens > 0 {
		rl.tokens--
		return true
	}
	return false
}
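
// exampleRateLimit is an illustrative sketch, not part of the original file:
// with rate=5 tokens/second and capacity=10, an immediate burst of 20 calls
// admits only the first 10; afterwards roughly 5 calls per second succeed.
func exampleRateLimit() {
	rl := NewRateLimiter(5, 10)
	allowed := 0
	for i := 0; i < 20; i++ {
		if rl.Allow() {
			allowed++
		}
	}
	fmt.Printf("allowed %d of 20 immediate requests\n", allowed) // expected to print 10
}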
// GetStatistics returns rate limiter statistics.
func (rl *RateLimiter) GetStatistics() map[string]interface{} {
	rl.mu.Lock()
	defer rl.mu.Unlock()
	return map[string]interface{}{
		"rate":        rl.rate,
		"capacity":    rl.capacity,
		"tokens":      rl.tokens,
		"last_refill": rl.lastRefill.Format(time.RFC3339),
	}
}