performance.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462
  1. package utils
  2. import (
  3. "runtime"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. "unsafe"
  8. )
  9. // StringPool provides efficient string reuse and interning to reduce allocations and memory usage
  10. type StringPool struct {
  11. pool sync.Pool
  12. intern map[string]string // for string interning
  13. mutex sync.RWMutex // for intern map
  14. }
  15. // NewStringPool creates a new string pool
  16. func NewStringPool() *StringPool {
  17. return &StringPool{
  18. pool: sync.Pool{
  19. New: func() interface{} {
  20. return make([]byte, 0, 1024) // Pre-allocate 1KB
  21. },
  22. },
  23. intern: make(map[string]string, 10000),
  24. }
  25. }
  26. // Get retrieves a byte buffer from the pool
  27. func (sp *StringPool) Get() []byte {
  28. return sp.pool.Get().([]byte)[:0]
  29. }
  30. // Put returns a byte buffer to the pool
  31. func (sp *StringPool) Put(b []byte) {
  32. if cap(b) < 32*1024 { // Don't keep very large buffers
  33. sp.pool.Put(b)
  34. }
  35. }
  36. // Intern interns a string to reduce memory duplication
  37. func (sp *StringPool) Intern(s string) string {
  38. if s == "" {
  39. return ""
  40. }
  41. sp.mutex.RLock()
  42. if interned, exists := sp.intern[s]; exists {
  43. sp.mutex.RUnlock()
  44. return interned
  45. }
  46. sp.mutex.RUnlock()
  47. sp.mutex.Lock()
  48. defer sp.mutex.Unlock()
  49. // Double-check after acquiring write lock
  50. if interned, exists := sp.intern[s]; exists {
  51. return interned
  52. }
  53. // Don't intern very long strings
  54. if len(s) > 1024 {
  55. return s
  56. }
  57. sp.intern[s] = s
  58. return s
  59. }
  60. // Size returns the number of interned strings
  61. func (sp *StringPool) Size() int {
  62. sp.mutex.RLock()
  63. defer sp.mutex.RUnlock()
  64. return len(sp.intern)
  65. }
  66. // Clear clears the string pool
  67. func (sp *StringPool) Clear() {
  68. sp.mutex.Lock()
  69. defer sp.mutex.Unlock()
  70. sp.intern = make(map[string]string, 10000)
  71. }
  72. // MemoryPool provides memory buffer pooling to reduce GC pressure
  73. type MemoryPool struct {
  74. pools []*sync.Pool
  75. sizes []int
  76. }
  77. // NewMemoryPool creates a memory pool with different buffer sizes
  78. func NewMemoryPool() *MemoryPool {
  79. sizes := []int{64, 256, 1024, 4096, 16384, 65536} // Different buffer sizes
  80. pools := make([]*sync.Pool, len(sizes))
  81. for i, size := range sizes {
  82. s := size // Capture for closure
  83. pools[i] = &sync.Pool{
  84. New: func() interface{} {
  85. return make([]byte, 0, s)
  86. },
  87. }
  88. }
  89. return &MemoryPool{
  90. pools: pools,
  91. sizes: sizes,
  92. }
  93. }
  94. // Get retrieves a buffer of appropriate size
  95. func (mp *MemoryPool) Get(minSize int) []byte {
  96. // Find the smallest pool that fits
  97. for i, size := range mp.sizes {
  98. if size >= minSize {
  99. buf := mp.pools[i].Get().([]byte)
  100. return buf[:0] // Reset length but keep capacity
  101. }
  102. }
  103. // If no pool fits, allocate directly
  104. return make([]byte, 0, minSize)
  105. }
  106. // Put returns a buffer to the appropriate pool
  107. func (mp *MemoryPool) Put(buf []byte) {
  108. capacity := cap(buf)
  109. // Find the appropriate pool
  110. for i, size := range mp.sizes {
  111. if capacity <= size {
  112. // Reset buffer before returning to pool
  113. buf = buf[:0]
  114. mp.pools[i].Put(buf)
  115. return
  116. }
  117. }
  118. // Buffer too large, let GC handle it
  119. }
  120. // WorkerPool provides optimized worker management
  121. type WorkerPool struct {
  122. workers []Worker
  123. workChan chan func()
  124. closeChan chan struct{}
  125. wg sync.WaitGroup
  126. }
  127. // Worker represents a worker goroutine
  128. type Worker struct {
  129. ID int
  130. workChan chan func()
  131. }
  132. // NewWorkerPool creates an optimized worker pool
  133. func NewWorkerPool(numWorkers int, queueSize int) *WorkerPool {
  134. pool := &WorkerPool{
  135. workers: make([]Worker, numWorkers),
  136. workChan: make(chan func(), queueSize),
  137. closeChan: make(chan struct{}),
  138. }
  139. // Start workers
  140. for i := 0; i < numWorkers; i++ {
  141. pool.workers[i] = Worker{
  142. ID: i,
  143. workChan: pool.workChan,
  144. }
  145. pool.wg.Add(1)
  146. go pool.runWorker(i)
  147. }
  148. return pool
  149. }
  150. // runWorker runs a single worker
  151. func (wp *WorkerPool) runWorker(id int) {
  152. defer wp.wg.Done()
  153. for {
  154. select {
  155. case work := <-wp.workChan:
  156. if work != nil {
  157. work()
  158. }
  159. case <-wp.closeChan:
  160. return
  161. }
  162. }
  163. }
  164. // Submit submits work to the pool
  165. func (wp *WorkerPool) Submit(work func()) bool {
  166. select {
  167. case wp.workChan <- work:
  168. return true
  169. default:
  170. return false // Pool is full
  171. }
  172. }
  173. // Close closes the worker pool
  174. func (wp *WorkerPool) Close() {
  175. close(wp.closeChan)
  176. wp.wg.Wait()
  177. }
  178. // BatchProcessor provides efficient batch processing
  179. type BatchProcessor struct {
  180. items []interface{}
  181. capacity int
  182. mutex sync.Mutex
  183. }
  184. // NewBatchProcessor creates a batch processor
  185. func NewBatchProcessor(capacity int) *BatchProcessor {
  186. return &BatchProcessor{
  187. items: make([]interface{}, 0, capacity),
  188. capacity: capacity,
  189. }
  190. }
  191. // Add adds an item to the batch
  192. func (bp *BatchProcessor) Add(item interface{}) bool {
  193. bp.mutex.Lock()
  194. defer bp.mutex.Unlock()
  195. if len(bp.items) >= bp.capacity {
  196. return false
  197. }
  198. bp.items = append(bp.items, item)
  199. return true
  200. }
  201. // GetBatch returns and clears the current batch
  202. func (bp *BatchProcessor) GetBatch() []interface{} {
  203. bp.mutex.Lock()
  204. defer bp.mutex.Unlock()
  205. if len(bp.items) == 0 {
  206. return nil
  207. }
  208. batch := make([]interface{}, len(bp.items))
  209. copy(batch, bp.items)
  210. bp.items = bp.items[:0] // Reset slice
  211. return batch
  212. }
  213. // Size returns current batch size
  214. func (bp *BatchProcessor) Size() int {
  215. bp.mutex.Lock()
  216. defer bp.mutex.Unlock()
  217. return len(bp.items)
  218. }
  219. // MemoryOptimizer provides memory usage optimization
  220. type MemoryOptimizer struct {
  221. gcThreshold int64 // Bytes
  222. lastGC time.Time
  223. memStats runtime.MemStats
  224. forceGCEnabled bool
  225. }
  226. // NewMemoryOptimizer creates a memory optimizer
  227. func NewMemoryOptimizer(gcThreshold int64) *MemoryOptimizer {
  228. if gcThreshold <= 0 {
  229. gcThreshold = 512 * 1024 * 1024 // Default 512MB
  230. }
  231. return &MemoryOptimizer{
  232. gcThreshold: gcThreshold,
  233. forceGCEnabled: true,
  234. }
  235. }
  236. // CheckMemoryUsage checks memory usage and triggers GC if needed
  237. func (mo *MemoryOptimizer) CheckMemoryUsage() {
  238. if !mo.forceGCEnabled {
  239. return
  240. }
  241. runtime.ReadMemStats(&mo.memStats)
  242. // Check if we should force GC
  243. if mo.memStats.Alloc > uint64(mo.gcThreshold) && time.Since(mo.lastGC) > 30*time.Second {
  244. runtime.GC()
  245. mo.lastGC = time.Now()
  246. }
  247. }
  248. // MemoryStats represents memory statistics
  249. type MemoryStats struct {
  250. AllocMB float64 `json:"alloc_mb"`
  251. SysMB float64 `json:"sys_mb"`
  252. HeapAllocMB float64 `json:"heap_alloc_mb"`
  253. HeapSysMB float64 `json:"heap_sys_mb"`
  254. GCCount uint32 `json:"gc_count"`
  255. LastGCNs uint64 `json:"last_gc_ns"`
  256. GCCPUPercent float64 `json:"gc_cpu_percent"`
  257. }
  258. // GetMemoryStats returns current memory statistics
  259. func (mo *MemoryOptimizer) GetMemoryStats() *MemoryStats {
  260. runtime.ReadMemStats(&mo.memStats)
  261. return &MemoryStats{
  262. AllocMB: float64(mo.memStats.Alloc) / 1024 / 1024,
  263. SysMB: float64(mo.memStats.Sys) / 1024 / 1024,
  264. HeapAllocMB: float64(mo.memStats.HeapAlloc) / 1024 / 1024,
  265. HeapSysMB: float64(mo.memStats.HeapSys) / 1024 / 1024,
  266. GCCount: mo.memStats.NumGC,
  267. LastGCNs: mo.memStats.LastGC,
  268. GCCPUPercent: mo.memStats.GCCPUFraction * 100,
  269. }
  270. }
  271. // PerformanceMetrics tracks general performance metrics
  272. type PerformanceMetrics struct {
  273. operationCount int64
  274. processedItems int64
  275. processTime int64 // nanoseconds
  276. allocationCount int64
  277. allocationSize int64
  278. cacheHits int64
  279. cacheMisses int64
  280. errorCount int64
  281. mutex sync.RWMutex
  282. }
  283. // NewPerformanceMetrics creates performance metrics tracker
  284. func NewPerformanceMetrics() *PerformanceMetrics {
  285. return &PerformanceMetrics{}
  286. }
  287. // RecordOperation records operation metrics
  288. func (pm *PerformanceMetrics) RecordOperation(itemCount int, duration time.Duration, success bool) {
  289. atomic.AddInt64(&pm.operationCount, 1)
  290. atomic.AddInt64(&pm.processedItems, int64(itemCount))
  291. atomic.AddInt64(&pm.processTime, int64(duration))
  292. if !success {
  293. atomic.AddInt64(&pm.errorCount, 1)
  294. }
  295. }
  296. // RecordCacheHit records cache hit
  297. func (pm *PerformanceMetrics) RecordCacheHit() {
  298. atomic.AddInt64(&pm.cacheHits, 1)
  299. }
  300. // RecordCacheMiss records cache miss
  301. func (pm *PerformanceMetrics) RecordCacheMiss() {
  302. atomic.AddInt64(&pm.cacheMisses, 1)
  303. }
  304. // RecordAllocation records memory allocation
  305. func (pm *PerformanceMetrics) RecordAllocation(size int64) {
  306. atomic.AddInt64(&pm.allocationCount, 1)
  307. atomic.AddInt64(&pm.allocationSize, size)
  308. }
  309. // GetMetrics returns current metrics snapshot
  310. func (pm *PerformanceMetrics) GetMetrics() map[string]interface{} {
  311. operations := atomic.LoadInt64(&pm.operationCount)
  312. items := atomic.LoadInt64(&pm.processedItems)
  313. timeNs := atomic.LoadInt64(&pm.processTime)
  314. hits := atomic.LoadInt64(&pm.cacheHits)
  315. misses := atomic.LoadInt64(&pm.cacheMisses)
  316. errors := atomic.LoadInt64(&pm.errorCount)
  317. metrics := make(map[string]interface{})
  318. metrics["operation_count"] = operations
  319. metrics["processed_items"] = items
  320. metrics["process_time_ns"] = timeNs
  321. metrics["cache_hits"] = hits
  322. metrics["cache_misses"] = misses
  323. metrics["error_count"] = errors
  324. metrics["allocation_count"] = atomic.LoadInt64(&pm.allocationCount)
  325. metrics["allocation_size"] = atomic.LoadInt64(&pm.allocationSize)
  326. if hits+misses > 0 {
  327. metrics["cache_hit_rate"] = float64(hits) / float64(hits+misses)
  328. }
  329. if timeNs > 0 {
  330. metrics["items_per_second"] = float64(items) / (float64(timeNs) / 1e9)
  331. if operations > 0 {
  332. metrics["average_operation_time_ms"] = float64(timeNs/operations) / 1e6
  333. }
  334. }
  335. if operations > 0 {
  336. metrics["error_rate"] = float64(errors) / float64(operations)
  337. }
  338. return metrics
  339. }
  340. // Reset resets all metrics
  341. func (pm *PerformanceMetrics) Reset() {
  342. atomic.StoreInt64(&pm.operationCount, 0)
  343. atomic.StoreInt64(&pm.processedItems, 0)
  344. atomic.StoreInt64(&pm.processTime, 0)
  345. atomic.StoreInt64(&pm.allocationCount, 0)
  346. atomic.StoreInt64(&pm.allocationSize, 0)
  347. atomic.StoreInt64(&pm.cacheHits, 0)
  348. atomic.StoreInt64(&pm.cacheMisses, 0)
  349. atomic.StoreInt64(&pm.errorCount, 0)
  350. }
  351. // Unsafe conversion utilities for zero-allocation string/byte conversions
  352. // BytesToStringUnsafe converts bytes to string without allocation
  353. func BytesToStringUnsafe(b []byte) string {
  354. if len(b) == 0 {
  355. return ""
  356. }
  357. return *(*string)(unsafe.Pointer(&b))
  358. }
  359. // StringToBytesUnsafe converts string to bytes without allocation
  360. func StringToBytesUnsafe(s string) []byte {
  361. if len(s) == 0 {
  362. return nil
  363. }
  364. return *(*[]byte)(unsafe.Pointer(&struct {
  365. string
  366. int
  367. }{s, len(s)}))
  368. }
  369. // AppendInt appends an integer to a byte slice efficiently
  370. func AppendInt(b []byte, i int) []byte {
  371. // Convert int to bytes efficiently
  372. if i == 0 {
  373. return append(b, '0')
  374. }
  375. // Handle negative numbers
  376. if i < 0 {
  377. b = append(b, '-')
  378. i = -i
  379. }
  380. // Convert digits
  381. start := len(b)
  382. for i > 0 {
  383. b = append(b, byte('0'+(i%10)))
  384. i /= 10
  385. }
  386. // Reverse the digits
  387. for i, j := start, len(b)-1; i < j; i, j = i+1, j-1 {
  388. b[i], b[j] = b[j], b[i]
  389. }
  390. return b
  391. }