stream_optimizations.go

package parser

import (
	"bufio"
	"bytes"
	"context"
	"io"
	"time"
	"unsafe"
)

// OptimizedParseStream provides a high-performance streaming parser with
// low-allocation optimizations.
func (p *OptimizedParser) OptimizedParseStream(ctx context.Context, reader io.Reader) (*ParseResult, error) {
	startTime := time.Now()
	// Pre-allocate the result with estimated capacity to reduce reallocations.
	result := &ParseResult{
		Entries: make([]*AccessLogEntry, 0, 10000),
	}
	// Use a larger buffer for better I/O throughput.
	const bufferSize = 64 * 1024 // 64KB buffer
	scanner := bufio.NewScanner(reader)
	scanner.Buffer(make([]byte, bufferSize), p.config.MaxLineLength)
	// Pre-allocate the batch slice with capacity.
	batch := make([]string, 0, p.config.BatchSize)
	contextCheckCounter := 0
	const contextCheckFreq = 100 // check the context every 100 lines instead of every line
	// Stream processing with optimized batching.
	for scanner.Scan() {
		// Reduce context-check frequency for better performance.
		contextCheckCounter++
		if contextCheckCounter >= contextCheckFreq {
			select {
			case <-ctx.Done():
				return result, ctx.Err()
			default:
			}
			contextCheckCounter = 0
		}
		lineBytes := scanner.Bytes()
		if len(lineBytes) == 0 {
			continue
		}
		// The scanner reuses its internal buffer, so copy the bytes into a
		// string before the next Scan call invalidates them.
		line := string(lineBytes)
		batch = append(batch, line)
		result.Processed++
		// Process full batches.
		if len(batch) >= p.config.BatchSize {
			if err := p.processBatchOptimized(ctx, batch, result); err != nil {
				return result, err
			}
			// Reset the batch slice but keep its capacity.
			batch = batch[:0]
		}
	}
	// Process remaining lines.
	if len(batch) > 0 {
		if err := p.processBatchOptimized(ctx, batch, result); err != nil {
			return result, err
		}
	}
	// Check for scanner errors.
	if err := scanner.Err(); err != nil {
		return result, err
	}
	// Finalize the result.
	result.Duration = time.Since(startTime)
	if result.Processed > 0 {
		result.ErrorRate = float64(result.Failed) / float64(result.Processed)
	}
	return result, nil
}
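
// Hypothetical usage sketch for OptimizedParseStream with a deadline;
// it assumes an *OptimizedParser value p constructed elsewhere, and the
// file name is illustrative:
//
//	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
//	defer cancel()
//	f, err := os.Open("access.log")
//	if err != nil {
//		return err
//	}
//	defer f.Close()
//	res, err := p.OptimizedParseStream(ctx, f)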

// processBatchOptimized parses a batch of lines and appends the results with
// memory-efficient slice growth.
func (p *OptimizedParser) processBatchOptimized(ctx context.Context, batch []string, result *ParseResult) error {
	batchResult := p.ParseLinesWithContext(ctx, batch)
	// Pre-grow result.Entries to avoid multiple reallocations.
	currentLen := len(result.Entries)
	newLen := currentLen + len(batchResult.Entries)
	if cap(result.Entries) < newLen {
		// Double the capacity to amortize future growth.
		newEntries := make([]*AccessLogEntry, newLen, newLen*2)
		copy(newEntries, result.Entries)
		result.Entries = newEntries
	} else {
		result.Entries = result.Entries[:newLen]
	}
	// Copy the batch results into the reserved tail.
	copy(result.Entries[currentLen:], batchResult.Entries)
	result.Succeeded += batchResult.Succeeded
	result.Failed += batchResult.Failed
	return nil
}

// StreamBuffer provides a reusable buffer for streaming operations.
type StreamBuffer struct {
	data   []byte
	offset int
}

// NewStreamBuffer creates a new stream buffer with the specified capacity.
func NewStreamBuffer(capacity int) *StreamBuffer {
	return &StreamBuffer{
		data: make([]byte, 0, capacity),
	}
}

// ReadLine reads a single line (without the trailing '\n') from the reader,
// reusing the buffer's memory. The returned slice is only valid until the
// next ReadLine call. Reading one byte at a time is slow against unbuffered
// readers, so wrap them in a bufio.Reader first.
func (sb *StreamBuffer) ReadLine(reader io.Reader) ([]byte, error) {
	// Reset the buffer for reuse.
	sb.data = sb.data[:0]
	sb.offset = 0
	buf := make([]byte, 1)
	for {
		n, err := reader.Read(buf)
		if err != nil {
			// Return a final unterminated line at EOF.
			if err == io.EOF && len(sb.data) > 0 {
				return sb.data, nil
			}
			return nil, err
		}
		if n == 0 {
			continue
		}
		if buf[0] == '\n' {
			return sb.data, nil
		}
		sb.data = append(sb.data, buf[0])
	}
}
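
// Hypothetical usage sketch for StreamBuffer (the process function is an
// assumption). The returned slice must be copied if it is kept past the
// next ReadLine call:
//
//	sb := NewStreamBuffer(4096)
//	for {
//		line, err := sb.ReadLine(reader)
//		if err == io.EOF {
//			break
//		}
//		if err != nil {
//			return err
//		}
//		process(string(line)) // string(...) copies the bytes
//	}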

// ChunkedParseStream processes the stream in fixed-size chunks for better
// memory usage.
func (p *OptimizedParser) ChunkedParseStream(ctx context.Context, reader io.Reader, chunkSize int) (*ParseResult, error) {
	startTime := time.Now()
	result := &ParseResult{
		Entries: make([]*AccessLogEntry, 0, chunkSize),
	}
	buffer := make([]byte, chunkSize)
	remainder := make([]byte, 0, 1024)
	// Reuse one batch slice across chunks instead of reallocating per read.
	batch := make([]string, 0, p.config.BatchSize)
	for {
		// Check the context once per chunk rather than per line.
		select {
		case <-ctx.Done():
			return result, ctx.Err()
		default:
		}
		n, err := reader.Read(buffer)
		if n == 0 && err == io.EOF {
			break
		}
		if err != nil && err != io.EOF {
			return result, err
		}
		// Combine the remainder with the new data in a fresh slice. Appending
		// to remainder directly would let the line slices below alias
		// remainder's backing array, which is overwritten a few lines down.
		data := make([]byte, 0, len(remainder)+n)
		data = append(data, remainder...)
		data = append(data, buffer[:n]...)
		lines := bytes.Split(data, []byte("\n"))
		// Keep the last (possibly incomplete) line as the remainder until EOF.
		if err != io.EOF {
			remainder = append(remainder[:0], lines[len(lines)-1]...)
			lines = lines[:len(lines)-1]
		} else {
			remainder = remainder[:0]
		}
		// Process complete lines in batches.
		for _, lineBytes := range lines {
			if len(lineBytes) == 0 {
				continue
			}
			batch = append(batch, string(lineBytes))
			result.Processed++
			if len(batch) >= p.config.BatchSize {
				if err := p.processBatchOptimized(ctx, batch, result); err != nil {
					return result, err
				}
				batch = batch[:0]
			}
		}
		// Flush the partial batch for this chunk.
		if len(batch) > 0 {
			if err := p.processBatchOptimized(ctx, batch, result); err != nil {
				return result, err
			}
			batch = batch[:0]
		}
		if err == io.EOF {
			break
		}
	}
	// Process a trailing line that had no terminating newline.
	if len(remainder) > 0 {
		result.Processed++
		if err := p.processBatchOptimized(ctx, []string{string(remainder)}, result); err != nil {
			return result, err
		}
	}
	result.Duration = time.Since(startTime)
	if result.Processed > 0 {
		result.ErrorRate = float64(result.Failed) / float64(result.Processed)
	}
	return result, nil
}
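
// Hypothetical usage sketch for ChunkedParseStream; the 256KB chunk size is
// illustrative, and res aggregates the same counters as the other streaming
// entry points:
//
//	res, err := p.ChunkedParseStream(ctx, reader, 256*1024)
//	if err != nil {
//		return nil, err
//	}
//	log.Printf("processed=%d failed=%d errRate=%.2f%%",
//		res.Processed, res.Failed, res.ErrorRate*100)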

// unsafeBytesToString converts a byte slice to a string without copying or
// allocating. The caller must guarantee the bytes are never mutated while
// the string is in use; otherwise the string is silently corrupted.
// Requires Go 1.20+ for unsafe.String and unsafe.SliceData.
func unsafeBytesToString(b []byte) string {
	if len(b) == 0 {
		return ""
	}
	return unsafe.String(unsafe.SliceData(b), len(b))
}
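
// Sketch of the aliasing hazard that makes the unsafe conversion dangerous
// with mutable buffers:
//
//	buf := []byte("hello")
//	s := unsafeBytesToString(buf)
//	buf[0] = 'H' // s now reads "Hello": the string was mutated in place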

// LineBuffer provides a reusable line buffer for parsing operations.
type LineBuffer struct {
	data []byte
	cap  int // initial capacity, kept for reference
}

// NewLineBuffer creates a new line buffer with the specified capacity.
func NewLineBuffer(capacity int) *LineBuffer {
	return &LineBuffer{
		data: make([]byte, 0, capacity),
		cap:  capacity,
	}
}

// Reset truncates the buffer for reuse while keeping its capacity.
func (lb *LineBuffer) Reset() {
	lb.data = lb.data[:0]
}

// Grow ensures the buffer has capacity for n more bytes.
func (lb *LineBuffer) Grow(n int) {
	if cap(lb.data) < len(lb.data)+n {
		// Double the required size to amortize future growth.
		newData := make([]byte, len(lb.data), (len(lb.data)+n)*2)
		copy(newData, lb.data)
		lb.data = newData
	}
}

// Append appends data to the buffer.
func (lb *LineBuffer) Append(data []byte) {
	lb.Grow(len(data))
	lb.data = append(lb.data, data...)
}

// String returns the buffer content as a string (copying).
func (lb *LineBuffer) String() string {
	return string(lb.data)
}

// UnsafeString returns the buffer content as a string without copying.
// The buffer must not be modified (Append, Reset and reuse) while the
// returned string is in use.
func (lb *LineBuffer) UnsafeString() string {
	return unsafeBytesToString(lb.data)
}

// Bytes returns the buffer content; the slice aliases the buffer's
// internal storage.
func (lb *LineBuffer) Bytes() []byte {
	return lb.data
}
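
// Hypothetical usage sketch for LineBuffer: reuse one buffer across lines
// and only pay for a string copy when a line is kept (keep is an assumed
// predicate):
//
//	lb := NewLineBuffer(2048)
//	for scanner.Scan() {
//		lb.Reset()
//		lb.Append(scanner.Bytes())
//		if keep(lb.Bytes()) {
//			lines = append(lines, lb.String()) // copies
//		}
//	}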

// MemoryEfficientParseStream uses minimal memory allocations for streaming.
func (p *OptimizedParser) MemoryEfficientParseStream(ctx context.Context, reader io.Reader) (*ParseResult, error) {
	startTime := time.Now()
	result := &ParseResult{
		Entries: make([]*AccessLogEntry, 0, 1000),
	}
	// Reuse one line buffer across all lines.
	lineBuffer := NewLineBuffer(2048)
	// The scanner's default buffer (64KB max token) is sufficient here.
	scanner := bufio.NewScanner(reader)
	batch := make([]string, 0, p.config.BatchSize)
	lineCount := 0
	for scanner.Scan() {
		// Reduce context-check frequency.
		lineCount++
		if lineCount%50 == 0 {
			select {
			case <-ctx.Done():
				return result, ctx.Err()
			default:
			}
		}
		// Skip empty lines before copying anything.
		lineBytes := scanner.Bytes()
		if len(lineBytes) == 0 {
			continue
		}
		lineBuffer.Reset()
		lineBuffer.Append(lineBytes)
		// String() copies, which is required: the scanner reuses its buffer.
		line := lineBuffer.String()
		batch = append(batch, line)
		result.Processed++
		if len(batch) >= p.config.BatchSize {
			if err := p.processBatchOptimized(ctx, batch, result); err != nil {
				return result, err
			}
			batch = batch[:0]
		}
	}
	// Process remaining lines.
	if len(batch) > 0 {
		if err := p.processBatchOptimized(ctx, batch, result); err != nil {
			return result, err
		}
	}
	if err := scanner.Err(); err != nil {
		return result, err
	}
	result.Duration = time.Since(startTime)
	if result.Processed > 0 {
		result.ErrorRate = float64(result.Failed) / float64(result.Processed)
	}
	return result, nil
}
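
// Minimal benchmark sketch (it would live in a _test.go file) comparing the
// streaming entry points; the sample log line and the NewOptimizedParser
// constructor are assumptions:
//
//	func BenchmarkOptimizedParseStream(b *testing.B) {
//		data := bytes.Repeat(
//			[]byte("127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] \"GET / HTTP/1.0\" 200 2326\n"),
//			10000)
//		p := NewOptimizedParser()
//		b.ResetTimer()
//		for i := 0; i < b.N; i++ {
//			_, _ = p.OptimizedParseStream(context.Background(), bytes.NewReader(data))
//		}
//	}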