performance.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465
  1. package utils
  2. import (
  3. "runtime"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. "unsafe"
  8. )
  9. // StringPool provides efficient string reuse and interning to reduce allocations and memory usage
  10. type StringPool struct {
  11. pool sync.Pool
  12. intern map[string]string // for string interning
  13. mutex sync.RWMutex // for intern map
  14. }
  15. // NewStringPool creates a new string pool
  16. func NewStringPool() *StringPool {
  17. return &StringPool{
  18. pool: sync.Pool{
  19. New: func() interface{} {
  20. b := make([]byte, 0, 1024) // Pre-allocate 1KB
  21. return &b
  22. },
  23. },
  24. intern: make(map[string]string, 10000),
  25. }
  26. }
  27. // Get retrieves a byte buffer from the pool
  28. func (sp *StringPool) Get() []byte {
  29. b := sp.pool.Get().(*[]byte)
  30. return (*b)[:0]
  31. }
  32. // Put returns a byte buffer to the pool
  33. func (sp *StringPool) Put(b []byte) {
  34. if cap(b) < 32*1024 { // Don't keep very large buffers
  35. b = b[:0]
  36. sp.pool.Put(&b)
  37. }
  38. }
  39. // Intern interns a string to reduce memory duplication
  40. func (sp *StringPool) Intern(s string) string {
  41. if s == "" {
  42. return ""
  43. }
  44. sp.mutex.RLock()
  45. if interned, exists := sp.intern[s]; exists {
  46. sp.mutex.RUnlock()
  47. return interned
  48. }
  49. sp.mutex.RUnlock()
  50. sp.mutex.Lock()
  51. defer sp.mutex.Unlock()
  52. // Double-check after acquiring write lock
  53. if interned, exists := sp.intern[s]; exists {
  54. return interned
  55. }
  56. // Don't intern very long strings
  57. if len(s) > 1024 {
  58. return s
  59. }
  60. sp.intern[s] = s
  61. return s
  62. }
  63. // Size returns the number of interned strings
  64. func (sp *StringPool) Size() int {
  65. sp.mutex.RLock()
  66. defer sp.mutex.RUnlock()
  67. return len(sp.intern)
  68. }
  69. // Clear clears the string pool
  70. func (sp *StringPool) Clear() {
  71. sp.mutex.Lock()
  72. defer sp.mutex.Unlock()
  73. sp.intern = make(map[string]string, 10000)
  74. }
  75. // MemoryPool provides memory buffer pooling to reduce GC pressure
  76. type MemoryPool struct {
  77. pools []*sync.Pool
  78. sizes []int
  79. }
  80. // NewMemoryPool creates a memory pool with different buffer sizes
  81. func NewMemoryPool() *MemoryPool {
  82. sizes := []int{64, 256, 1024, 4096, 16384, 65536} // Different buffer sizes
  83. pools := make([]*sync.Pool, len(sizes))
  84. for i, size := range sizes {
  85. s := size // Capture for closure
  86. pools[i] = &sync.Pool{
  87. New: func() interface{} {
  88. b := make([]byte, 0, s)
  89. return &b
  90. },
  91. }
  92. }
  93. return &MemoryPool{
  94. pools: pools,
  95. sizes: sizes,
  96. }
  97. }
  98. // Get retrieves a buffer of appropriate size
  99. func (mp *MemoryPool) Get(minSize int) []byte {
  100. // Find the smallest pool that fits
  101. for i, size := range mp.sizes {
  102. if size >= minSize {
  103. buf := mp.pools[i].Get().(*[]byte)
  104. return (*buf)[:0] // Reset length but keep capacity
  105. }
  106. }
  107. // If no pool fits, allocate directly
  108. return make([]byte, 0, minSize)
  109. }
  110. // Put returns a buffer to the appropriate pool
  111. func (mp *MemoryPool) Put(buf []byte) {
  112. capacity := cap(buf)
  113. // Find the appropriate pool
  114. for i, size := range mp.sizes {
  115. if capacity <= size {
  116. // Reset buffer before returning to pool
  117. buf = buf[:0]
  118. mp.pools[i].Put(&buf)
  119. return
  120. }
  121. }
  122. // Buffer too large, let GC handle it
  123. }
  124. // WorkerPool provides optimized worker management
  125. type WorkerPool struct {
  126. workers []Worker
  127. workChan chan func()
  128. closeChan chan struct{}
  129. wg sync.WaitGroup
  130. }
  131. // Worker represents a worker goroutine
  132. type Worker struct {
  133. ID int
  134. workChan chan func()
  135. }
  136. // NewWorkerPool creates an optimized worker pool
  137. func NewWorkerPool(numWorkers int, queueSize int) *WorkerPool {
  138. pool := &WorkerPool{
  139. workers: make([]Worker, numWorkers),
  140. workChan: make(chan func(), queueSize),
  141. closeChan: make(chan struct{}),
  142. }
  143. // Start workers
  144. for i := 0; i < numWorkers; i++ {
  145. pool.workers[i] = Worker{
  146. ID: i,
  147. workChan: pool.workChan,
  148. }
  149. pool.wg.Add(1)
  150. go pool.runWorker(i)
  151. }
  152. return pool
  153. }
  154. // runWorker runs a single worker
  155. func (wp *WorkerPool) runWorker(id int) {
  156. defer wp.wg.Done()
  157. for {
  158. select {
  159. case work := <-wp.workChan:
  160. if work != nil {
  161. work()
  162. }
  163. case <-wp.closeChan:
  164. return
  165. }
  166. }
  167. }
  168. // Submit submits work to the pool
  169. func (wp *WorkerPool) Submit(work func()) bool {
  170. select {
  171. case wp.workChan <- work:
  172. return true
  173. default:
  174. return false // Pool is full
  175. }
  176. }
  177. // Close closes the worker pool
  178. func (wp *WorkerPool) Close() {
  179. close(wp.closeChan)
  180. wp.wg.Wait()
  181. }
  182. // BatchProcessor provides efficient batch processing
  183. type BatchProcessor struct {
  184. items []interface{}
  185. capacity int
  186. mutex sync.Mutex
  187. }
  188. // NewBatchProcessor creates a batch processor
  189. func NewBatchProcessor(capacity int) *BatchProcessor {
  190. return &BatchProcessor{
  191. items: make([]interface{}, 0, capacity),
  192. capacity: capacity,
  193. }
  194. }
  195. // Add adds an item to the batch
  196. func (bp *BatchProcessor) Add(item interface{}) bool {
  197. bp.mutex.Lock()
  198. defer bp.mutex.Unlock()
  199. if len(bp.items) >= bp.capacity {
  200. return false
  201. }
  202. bp.items = append(bp.items, item)
  203. return true
  204. }
  205. // GetBatch returns and clears the current batch
  206. func (bp *BatchProcessor) GetBatch() []interface{} {
  207. bp.mutex.Lock()
  208. defer bp.mutex.Unlock()
  209. if len(bp.items) == 0 {
  210. return nil
  211. }
  212. batch := make([]interface{}, len(bp.items))
  213. copy(batch, bp.items)
  214. bp.items = bp.items[:0] // Reset slice
  215. return batch
  216. }
  217. // Size returns current batch size
  218. func (bp *BatchProcessor) Size() int {
  219. bp.mutex.Lock()
  220. defer bp.mutex.Unlock()
  221. return len(bp.items)
  222. }
  223. // MemoryOptimizer provides memory usage optimization
  224. type MemoryOptimizer struct {
  225. gcThreshold int64 // Bytes
  226. lastGC time.Time
  227. memStats runtime.MemStats
  228. forceGCEnabled bool
  229. }
  230. // NewMemoryOptimizer creates a memory optimizer
  231. func NewMemoryOptimizer(gcThreshold int64) *MemoryOptimizer {
  232. if gcThreshold <= 0 {
  233. gcThreshold = 512 * 1024 * 1024 // Default 512MB
  234. }
  235. return &MemoryOptimizer{
  236. gcThreshold: gcThreshold,
  237. forceGCEnabled: true,
  238. }
  239. }
  240. // CheckMemoryUsage checks memory usage and triggers GC if needed
  241. func (mo *MemoryOptimizer) CheckMemoryUsage() {
  242. if !mo.forceGCEnabled {
  243. return
  244. }
  245. runtime.ReadMemStats(&mo.memStats)
  246. // Check if we should force GC
  247. if mo.memStats.Alloc > uint64(mo.gcThreshold) && time.Since(mo.lastGC) > 30*time.Second {
  248. runtime.GC()
  249. mo.lastGC = time.Now()
  250. }
  251. }
  252. // MemoryStats represents memory statistics
  253. type MemoryStats struct {
  254. AllocMB float64 `json:"alloc_mb"`
  255. SysMB float64 `json:"sys_mb"`
  256. HeapAllocMB float64 `json:"heap_alloc_mb"`
  257. HeapSysMB float64 `json:"heap_sys_mb"`
  258. GCCount uint32 `json:"gc_count"`
  259. LastGCNs uint64 `json:"last_gc_ns"`
  260. GCCPUPercent float64 `json:"gc_cpu_percent"`
  261. }
  262. // GetMemoryStats returns current memory statistics
  263. func (mo *MemoryOptimizer) GetMemoryStats() *MemoryStats {
  264. runtime.ReadMemStats(&mo.memStats)
  265. return &MemoryStats{
  266. AllocMB: float64(mo.memStats.Alloc) / 1024 / 1024,
  267. SysMB: float64(mo.memStats.Sys) / 1024 / 1024,
  268. HeapAllocMB: float64(mo.memStats.HeapAlloc) / 1024 / 1024,
  269. HeapSysMB: float64(mo.memStats.HeapSys) / 1024 / 1024,
  270. GCCount: mo.memStats.NumGC,
  271. LastGCNs: mo.memStats.LastGC,
  272. GCCPUPercent: mo.memStats.GCCPUFraction * 100,
  273. }
  274. }
  275. // PerformanceMetrics tracks general performance metrics
  276. type PerformanceMetrics struct {
  277. operationCount int64
  278. processedItems int64
  279. processTime int64 // nanoseconds
  280. allocationCount int64
  281. allocationSize int64
  282. cacheHits int64
  283. cacheMisses int64
  284. errorCount int64
  285. }
  286. // NewPerformanceMetrics creates performance metrics tracker
  287. func NewPerformanceMetrics() *PerformanceMetrics {
  288. return &PerformanceMetrics{}
  289. }
  290. // RecordOperation records operation metrics
  291. func (pm *PerformanceMetrics) RecordOperation(itemCount int, duration time.Duration, success bool) {
  292. atomic.AddInt64(&pm.operationCount, 1)
  293. atomic.AddInt64(&pm.processedItems, int64(itemCount))
  294. atomic.AddInt64(&pm.processTime, int64(duration))
  295. if !success {
  296. atomic.AddInt64(&pm.errorCount, 1)
  297. }
  298. }
  299. // RecordCacheHit records cache hit
  300. func (pm *PerformanceMetrics) RecordCacheHit() {
  301. atomic.AddInt64(&pm.cacheHits, 1)
  302. }
  303. // RecordCacheMiss records cache miss
  304. func (pm *PerformanceMetrics) RecordCacheMiss() {
  305. atomic.AddInt64(&pm.cacheMisses, 1)
  306. }
  307. // RecordAllocation records memory allocation
  308. func (pm *PerformanceMetrics) RecordAllocation(size int64) {
  309. atomic.AddInt64(&pm.allocationCount, 1)
  310. atomic.AddInt64(&pm.allocationSize, size)
  311. }
  312. // GetMetrics returns current metrics snapshot
  313. func (pm *PerformanceMetrics) GetMetrics() map[string]interface{} {
  314. operations := atomic.LoadInt64(&pm.operationCount)
  315. items := atomic.LoadInt64(&pm.processedItems)
  316. timeNs := atomic.LoadInt64(&pm.processTime)
  317. hits := atomic.LoadInt64(&pm.cacheHits)
  318. misses := atomic.LoadInt64(&pm.cacheMisses)
  319. errors := atomic.LoadInt64(&pm.errorCount)
  320. metrics := make(map[string]interface{})
  321. metrics["operation_count"] = operations
  322. metrics["processed_items"] = items
  323. metrics["process_time_ns"] = timeNs
  324. metrics["cache_hits"] = hits
  325. metrics["cache_misses"] = misses
  326. metrics["error_count"] = errors
  327. metrics["allocation_count"] = atomic.LoadInt64(&pm.allocationCount)
  328. metrics["allocation_size"] = atomic.LoadInt64(&pm.allocationSize)
  329. if hits+misses > 0 {
  330. metrics["cache_hit_rate"] = float64(hits) / float64(hits+misses)
  331. }
  332. if timeNs > 0 {
  333. metrics["items_per_second"] = float64(items) / (float64(timeNs) / 1e9)
  334. if operations > 0 {
  335. metrics["average_operation_time_ms"] = float64(timeNs/operations) / 1e6
  336. }
  337. }
  338. if operations > 0 {
  339. metrics["error_rate"] = float64(errors) / float64(operations)
  340. }
  341. return metrics
  342. }
  343. // Reset resets all metrics
  344. func (pm *PerformanceMetrics) Reset() {
  345. atomic.StoreInt64(&pm.operationCount, 0)
  346. atomic.StoreInt64(&pm.processedItems, 0)
  347. atomic.StoreInt64(&pm.processTime, 0)
  348. atomic.StoreInt64(&pm.allocationCount, 0)
  349. atomic.StoreInt64(&pm.allocationSize, 0)
  350. atomic.StoreInt64(&pm.cacheHits, 0)
  351. atomic.StoreInt64(&pm.cacheMisses, 0)
  352. atomic.StoreInt64(&pm.errorCount, 0)
  353. }
  354. // Unsafe conversion utilities for zero-allocation string/byte conversions
  355. // BytesToStringUnsafe converts bytes to string without allocation
  356. func BytesToStringUnsafe(b []byte) string {
  357. if len(b) == 0 {
  358. return ""
  359. }
  360. return *(*string)(unsafe.Pointer(&b))
  361. }
  362. // StringToBytesUnsafe converts string to bytes without allocation
  363. func StringToBytesUnsafe(s string) []byte {
  364. if len(s) == 0 {
  365. return nil
  366. }
  367. return *(*[]byte)(unsafe.Pointer(&struct {
  368. string
  369. int
  370. }{s, len(s)}))
  371. }
  372. // AppendInt appends an integer to a byte slice efficiently
  373. func AppendInt(b []byte, i int) []byte {
  374. // Convert int to bytes efficiently
  375. if i == 0 {
  376. return append(b, '0')
  377. }
  378. // Handle negative numbers
  379. if i < 0 {
  380. b = append(b, '-')
  381. i = -i
  382. }
  383. // Convert digits
  384. start := len(b)
  385. for i > 0 {
  386. b = append(b, byte('0'+(i%10)))
  387. i /= 10
  388. }
  389. // Reverse the digits
  390. for i, j := start, len(b)-1; i < j; i, j = i+1, j-1 {
  391. b[i], b[j] = b[j], b[i]
  392. }
  393. return b
  394. }