optimized_parser.go

package nginx_log

import (
	"bufio"
	"bytes"
	"io"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"time"
	"unsafe"

	"github.com/0xJacky/Nginx-UI/internal/geolite"
)

// OptimizedLogParser is an allocation-conscious nginx access log parser.
// It reuses per-parse scratch buffers via a sync.Pool and enriches entries
// with GeoIP data when the geolite service is available.
type OptimizedLogParser struct {
	uaParser   UserAgentParser
	pool       *sync.Pool
	geoService *geolite.Service
}

// parseBuffer holds the reusable scratch state for a single ParseLine call.
type parseBuffer struct {
	fields [][]byte
	entry  *AccessLogEntry
}

// NewOptimizedLogParser builds a parser around the given user-agent parser.
// The GeoIP lookup is optional: if the geolite service is unavailable, the
// geographic fields of parsed entries are simply left empty.
func NewOptimizedLogParser(uaParser UserAgentParser) *OptimizedLogParser {
	geoService, _ := geolite.GetService()
	return &OptimizedLogParser{
		uaParser:   uaParser,
		geoService: geoService,
		pool: &sync.Pool{
			New: func() interface{} {
				return &parseBuffer{
					fields: make([][]byte, 0, 16),
					entry:  &AccessLogEntry{},
				}
			},
		},
	}
}

// ParseLine parses a single access log line. The returned entry is a copy,
// so it stays valid after the internal parse buffer goes back to the pool.
func (p *OptimizedLogParser) ParseLine(line string) (*AccessLogEntry, error) {
	if len(line) == 0 {
		return nil, ErrEmptyLogLine
	}

	buf := p.pool.Get().(*parseBuffer)
	defer p.pool.Put(buf)
	buf.fields = buf.fields[:0]
	*buf.entry = AccessLogEntry{}

	lineBytes := stringToBytes(line)
	if err := p.parseLineOptimized(lineBytes, buf); err != nil {
		return nil, err
	}

	// Copy the pooled entry before handing it to the caller; returning
	// buf.entry directly would alias memory that the pool reuses on the
	// next ParseLine call.
	result := *buf.entry
	return &result, nil
}

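// Illustrative use of ParseLine (a sketch; the log line and printed fields
// are examples, not taken from this package's tests):
//
//	parser := NewOptimizedLogParser(NewSimpleUserAgentParser())
//	entry, err := parser.ParseLine(`192.0.2.1 - - [10/Oct/2023:13:55:36 +0000] "GET /index.html HTTP/1.1" 200 2326 "-" "curl/8.4.0"`)
//	if err == nil {
//		fmt.Println(entry.IP, entry.Status, entry.Path)
//	}
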
// parseLineOptimized walks the line once, field by field, filling entry as it
// goes. It expects the common log format as a prefix and treats referer, user
// agent, request_time and upstream_time as optional trailing fields.
func (p *OptimizedLogParser) parseLineOptimized(line []byte, buf *parseBuffer) error {
	pos := 0
	length := len(line)

	// Check for minimum valid log format.
	if length < 20 || !bytes.Contains(line, []byte(" - - [")) {
		return ErrUnsupportedLogFormat
	}

	pos = p.parseIP(line, pos, buf.entry)
	if pos >= length {
		return ErrUnsupportedLogFormat
	}

	// Skip the ident and auth user fields ("- -").
	pos = p.skipSpaces(line, pos)
	pos = p.skipField(line, pos)
	pos = p.skipSpaces(line, pos)
	pos = p.skipField(line, pos)
	pos = p.skipSpaces(line, pos)

	pos = p.parseTimestamp(line, pos, buf.entry)
	if pos >= length {
		return ErrUnsupportedLogFormat
	}

	pos = p.skipSpaces(line, pos)
	pos = p.parseRequest(line, pos, buf.entry)
	if pos >= length {
		return ErrUnsupportedLogFormat
	}

	pos = p.skipSpaces(line, pos)
	pos = p.parseStatus(line, pos, buf.entry)
	if pos >= length {
		return ErrUnsupportedLogFormat
	}

	pos = p.skipSpaces(line, pos)
	pos = p.parseSize(line, pos, buf.entry)

	// After size, the log might end (common format) or continue with referer and user agent.
	if pos >= length {
		return nil // Valid common log format
	}

	// Try to parse referer if present.
	pos = p.skipSpaces(line, pos)
	if pos < length && line[pos] == '"' {
		pos = p.parseReferer(line, pos, buf.entry)
	} else if pos < length {
		// No referer field, might be end of line.
		return nil
	}

	// Try to parse user agent if present.
	if pos < length {
		pos = p.skipSpaces(line, pos)
		if pos < length && line[pos] == '"' {
			pos = p.parseUserAgent(line, pos, buf.entry)
		}
	}

	// Parse additional fields if present (request_time, upstream_time).
	if pos < length-1 {
		pos = p.skipSpaces(line, pos)
		if pos < length {
			pos = p.parseRequestTime(line, pos, buf.entry)
		}
	}
	if pos < length-1 {
		pos = p.skipSpaces(line, pos)
		if pos < length {
			pos = p.parseUpstreamTime(line, pos, buf.entry)
		}
	}

	return nil
}

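// For reference, a combined-format line with the optional trailing timing
// fields that this walker accepts looks roughly like the following
// (illustrative values, not taken from real logs):
//
//	203.0.113.5 - - [10/Oct/2023:13:55:36 +0000] "GET /api/v1/status HTTP/1.1" 200 512 "https://example.com/" "Mozilla/5.0" 0.042 0.038
//
// i.e. ip, ident, authuser, [timestamp], "request", status, bytes_sent,
// "referer", "user_agent", request_time, upstream_time.
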
func (p *OptimizedLogParser) parseIP(line []byte, pos int, entry *AccessLogEntry) int {
	start := pos
	for pos < len(line) && line[pos] != ' ' {
		pos++
	}
	if pos > start {
		entry.IP = bytesToString(line[start:pos])
		// Populate geographic fields using the geolite service.
		if p.geoService != nil && entry.IP != "" && entry.IP != "-" {
			if location, err := p.geoService.Search(entry.IP); err == nil && location != nil {
				entry.RegionCode = location.CountryCode
				entry.Province = location.Region
				entry.City = location.City
			}
		}
	}
	return pos
}

func (p *OptimizedLogParser) parseTimestamp(line []byte, pos int, entry *AccessLogEntry) int {
	if pos >= len(line) || line[pos] != '[' {
		return pos
	}
	pos++
	start := pos
	for pos < len(line) && line[pos] != ']' {
		pos++
	}
	if pos > start {
		timeStr := bytesToString(line[start:pos])
		if t, err := time.Parse("02/Jan/2006:15:04:05 -0700", timeStr); err == nil {
			entry.Timestamp = t.Unix()
		}
	}
	if pos < len(line) && line[pos] == ']' {
		pos++
	}
	return pos
}

func (p *OptimizedLogParser) parseRequest(line []byte, pos int, entry *AccessLogEntry) int {
	if pos >= len(line) || line[pos] != '"' {
		return pos
	}
	pos++
	start := pos
	for pos < len(line) && line[pos] != '"' {
		pos++
	}
	if pos > start {
		requestLine := line[start:pos]
		parts := bytes.Fields(requestLine)
		if len(parts) >= 2 {
			entry.Method = bytesToString(parts[0])
			entry.Path = bytesToString(parts[1])
		}
		if len(parts) >= 3 {
			entry.Protocol = bytesToString(parts[2])
		}
	}
	if pos < len(line) && line[pos] == '"' {
		pos++
	}
	return pos
}

func (p *OptimizedLogParser) parseStatus(line []byte, pos int, entry *AccessLogEntry) int {
	start := pos
	for pos < len(line) && line[pos] >= '0' && line[pos] <= '9' {
		pos++
	}
	if pos > start {
		if status, err := fastParseInt(line[start:pos]); err == nil {
			entry.Status = status
		}
	}
	return pos
}

func (p *OptimizedLogParser) parseSize(line []byte, pos int, entry *AccessLogEntry) int {
	start := pos
	for pos < len(line) && ((line[pos] >= '0' && line[pos] <= '9') || line[pos] == '-') {
		pos++
	}
	if pos > start {
		sizeBytes := line[start:pos]
		if len(sizeBytes) == 1 && sizeBytes[0] == '-' {
			entry.BytesSent = 0
		} else {
			if size, err := fastParseInt(sizeBytes); err == nil {
				entry.BytesSent = int64(size)
			}
		}
	}
	return pos
}

func (p *OptimizedLogParser) parseReferer(line []byte, pos int, entry *AccessLogEntry) int {
	if pos >= len(line) || line[pos] != '"' {
		return pos
	}
	pos++
	start := pos
	for pos < len(line) && line[pos] != '"' {
		pos++
	}
	if pos > start {
		referer := bytesToString(line[start:pos])
		// Keep the "-" value as is for tests.
		entry.Referer = referer
	}
	if pos < len(line) && line[pos] == '"' {
		pos++
	}
	return pos
}

func (p *OptimizedLogParser) parseUserAgent(line []byte, pos int, entry *AccessLogEntry) int {
	if pos >= len(line) || line[pos] != '"' {
		return pos
	}
	pos++
	start := pos
	for pos < len(line) && line[pos] != '"' {
		pos++
	}
	if pos > start {
		userAgent := bytesToString(line[start:pos])
		entry.UserAgent = userAgent
		if p.uaParser != nil && userAgent != "-" {
			parsed := p.uaParser.Parse(userAgent)
			// Don't set "Unknown" values to maintain compatibility with tests.
			if parsed.Browser != "Unknown" {
				entry.Browser = parsed.Browser
				entry.BrowserVer = parsed.BrowserVer
			}
			if parsed.OS != "Unknown" {
				entry.OS = parsed.OS
				entry.OSVersion = parsed.OSVersion
			}
			if parsed.DeviceType != "Desktop" || (userAgent != "-" && userAgent != "") {
				entry.DeviceType = parsed.DeviceType
			}
		}
	}
	if pos < len(line) && line[pos] == '"' {
		pos++
	}
	return pos
}

func (p *OptimizedLogParser) parseRequestTime(line []byte, pos int, entry *AccessLogEntry) int {
	start := pos
	for pos < len(line) && ((line[pos] >= '0' && line[pos] <= '9') || line[pos] == '.' || line[pos] == '-') {
		pos++
	}
	if pos > start {
		timeStr := bytesToString(line[start:pos])
		if timeStr != "-" {
			if val, err := strconv.ParseFloat(timeStr, 64); err == nil {
				entry.RequestTime = val
			}
		}
	}
	return pos
}

func (p *OptimizedLogParser) parseUpstreamTime(line []byte, pos int, entry *AccessLogEntry) int {
	start := pos
	for pos < len(line) && ((line[pos] >= '0' && line[pos] <= '9') || line[pos] == '.' || line[pos] == '-') {
		pos++
	}
	if pos > start {
		timeStr := bytesToString(line[start:pos])
		if timeStr != "-" {
			if val, err := strconv.ParseFloat(timeStr, 64); err == nil {
				entry.UpstreamTime = &val
			}
		}
	}
	return pos
}

func (p *OptimizedLogParser) skipSpaces(line []byte, pos int) int {
	for pos < len(line) && line[pos] == ' ' {
		pos++
	}
	return pos
}

func (p *OptimizedLogParser) skipField(line []byte, pos int) int {
	for pos < len(line) && line[pos] != ' ' {
		pos++
	}
	return pos
}

// fastParseInt is a minimal base-10 parser for the hot parsing path. Unlike
// strconv.Atoi it performs no overflow checking, which is acceptable for
// status codes and byte counts.
func fastParseInt(b []byte) (int, error) {
	if len(b) == 0 {
		return 0, strconv.ErrSyntax
	}
	neg := false
	if b[0] == '-' {
		neg = true
		b = b[1:]
		if len(b) == 0 {
			return 0, strconv.ErrSyntax
		}
	}
	n := 0
	for _, c := range b {
		if c < '0' || c > '9' {
			return 0, strconv.ErrSyntax
		}
		n = n*10 + int(c-'0')
	}
	if neg {
		n = -n
	}
	return n, nil
}

// stringToBytes reinterprets a string as a byte slice without copying.
// The returned slice must never be modified.
func stringToBytes(s string) []byte {
	return *(*[]byte)(unsafe.Pointer(&struct {
		string
		Cap int
	}{s, len(s)}))
}

// bytesToString reinterprets a byte slice as a string without copying.
// The result aliases the input buffer, so the buffer must not be mutated
// while the string is still in use.
func bytesToString(b []byte) string {
	return *(*string)(unsafe.Pointer(&b))
}

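// On Go 1.20 and newer, the same zero-copy conversions can also be written
// with the standard helpers unsafe.Slice(unsafe.StringData(s), len(s)) and
// unsafe.String(unsafe.SliceData(b), len(b)).
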
// StreamingLogProcessor parses a log stream with a pool of worker goroutines
// and hands parsed entries to an indexer in batches.
type StreamingLogProcessor struct {
	parser       *OptimizedLogParser
	batchSize    int
	workers      int
	indexer      *LogIndexer
	entryChannel chan *AccessLogEntry
	errorChannel chan error
	wg           sync.WaitGroup
}

// NewStreamingLogProcessor creates a processor that parses with an
// OptimizedLogParser and buffers up to batchSize*2 entries between the reader
// goroutine and the workers.
func NewStreamingLogProcessor(indexer *LogIndexer, batchSize, workers int) *StreamingLogProcessor {
	return &StreamingLogProcessor{
		parser:       NewOptimizedLogParser(NewSimpleUserAgentParser()),
		batchSize:    batchSize,
		workers:      workers,
		indexer:      indexer,
		entryChannel: make(chan *AccessLogEntry, batchSize*2),
		errorChannel: make(chan error, workers),
	}
}

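// Illustrative usage sketch ("indexer" is a placeholder for a *LogIndexer
// created elsewhere, not a value defined in this file):
//
//	f, err := os.Open("/var/log/nginx/access.log")
//	if err != nil {
//		return err
//	}
//	defer f.Close()
//	proc := NewStreamingLogProcessor(indexer, 1000, runtime.NumCPU())
//	if err := proc.ProcessFile(f); err != nil {
//		return err
//	}
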
// ProcessFile reads the log stream line by line, parses each line, and fans
// the entries out to the worker goroutines. It returns the first error
// reported by a worker or by the scanner, if any.
func (p *StreamingLogProcessor) ProcessFile(reader io.Reader) error {
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)
		go p.worker()
	}

	scanner := bufio.NewScanner(reader)
	// Allow very long log lines (up to 2 MB).
	scanner.Buffer(make([]byte, 0, 128*1024), 2048*1024)

	go func() {
		defer close(p.entryChannel)
		for scanner.Scan() {
			line := scanner.Text()
			if len(line) == 0 {
				continue
			}
			entry, err := p.parser.ParseLine(line)
			if err != nil {
				// Skip lines that do not match a supported format.
				continue
			}
			select {
			case p.entryChannel <- entry:
			case err := <-p.errorChannel:
				// A worker failed; put the error back for ProcessFile to
				// report and stop feeding entries.
				p.errorChannel <- err
				return
			}
		}
		if err := scanner.Err(); err != nil {
			// Surface read errors (e.g. a line exceeding the buffer) without
			// blocking if the error channel is already full.
			select {
			case p.errorChannel <- err:
			default:
			}
		}
	}()

	p.wg.Wait()
	close(p.errorChannel)

	select {
	case err := <-p.errorChannel:
		return err
	default:
		return nil
	}
}

// worker drains entryChannel, accumulating entries into batches of batchSize
// and flushing the final partial batch when the channel is closed.
func (p *StreamingLogProcessor) worker() {
	defer p.wg.Done()
	batch := make([]*AccessLogEntry, 0, p.batchSize)
	for entry := range p.entryChannel {
		batch = append(batch, entry)
		if len(batch) >= p.batchSize {
			if err := p.processBatch(batch); err != nil {
				p.errorChannel <- err
				return
			}
			batch = batch[:0]
		}
	}
	if len(batch) > 0 {
		if err := p.processBatch(batch); err != nil {
			p.errorChannel <- err
			return
		}
	}
}

func (p *StreamingLogProcessor) processBatch(entries []*AccessLogEntry) error {
	if p.indexer == nil {
		return nil
	}
	// For now, just count the entries - the indexing implementation would go
	// here. This allows the benchmark to run and measure parsing performance.
	_ = entries
	return nil
}

// ParseLines parses multiple log lines and returns the parsed entries.
func (p *OptimizedLogParser) ParseLines(lines []string) []*AccessLogEntry {
	return p.ParseLinesParallel(lines)
}

// ParseLinesParallel parses multiple log lines in parallel.
func (p *OptimizedLogParser) ParseLinesParallel(lines []string) []*AccessLogEntry {
	if len(lines) == 0 {
		return nil
	}

	// For small datasets, use single-threaded parsing.
	if len(lines) < 100 {
		return p.parseLinesSingleThreaded(lines)
	}

	numWorkers := runtime.NumCPU()
	if numWorkers > len(lines)/10 {
		numWorkers = len(lines)/10 + 1
	}

	results := make([]*AccessLogEntry, 0, len(lines))
	resultChan := make(chan *AccessLogEntry, len(lines))
	lineChan := make(chan string, numWorkers*2)
	var wg sync.WaitGroup

	// Start workers.
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for line := range lineChan {
				if entry, err := p.ParseLine(line); err == nil {
					resultChan <- entry
				}
			}
		}()
	}

	// Send lines to workers, skipping blank lines.
	go func() {
		for _, line := range lines {
			if strings.TrimSpace(line) != "" {
				lineChan <- line
			}
		}
		close(lineChan)
	}()

	// Close the result channel once all workers have finished.
	go func() {
		wg.Wait()
		close(resultChan)
	}()

	// Collect results.
	for entry := range resultChan {
		results = append(results, entry)
	}
	return results
}

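// Note: workers push into resultChan in completion order, so the returned
// slice does not necessarily preserve the order of the input lines; callers
// that need chronological output can sort the entries by Timestamp.
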
// parseLinesSingleThreaded parses lines in a single thread.
func (p *OptimizedLogParser) parseLinesSingleThreaded(lines []string) []*AccessLogEntry {
	results := make([]*AccessLogEntry, 0, len(lines))
	for _, line := range lines {
		if strings.TrimSpace(line) == "" {
			continue
		}
		if entry, err := p.ParseLine(line); err == nil {
			results = append(results, entry)
		}
	}
	return results
}