analytics_events.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package internal
  2. import (
  3. "bytes"
  4. "container/heap"
  5. "github.com/newrelic/go-agent/internal/jsonx"
  6. )
  7. type analyticsEvent struct {
  8. priority Priority
  9. jsonWriter
  10. }
  11. type analyticsEventHeap []analyticsEvent
  12. type analyticsEvents struct {
  13. numSeen int
  14. events analyticsEventHeap
  15. failedHarvests int
  16. }
  17. func (events *analyticsEvents) NumSeen() float64 { return float64(events.numSeen) }
  18. func (events *analyticsEvents) NumSaved() float64 { return float64(len(events.events)) }
  19. func (h analyticsEventHeap) Len() int { return len(h) }
  20. func (h analyticsEventHeap) Less(i, j int) bool { return h[i].priority.isLowerPriority(h[j].priority) }
  21. func (h analyticsEventHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
  22. // Push and Pop are unused: only heap.Init and heap.Fix are used.
  23. func (h analyticsEventHeap) Push(x interface{}) {}
  24. func (h analyticsEventHeap) Pop() interface{} { return nil }
  25. func newAnalyticsEvents(max int) *analyticsEvents {
  26. return &analyticsEvents{
  27. numSeen: 0,
  28. events: make(analyticsEventHeap, 0, max),
  29. failedHarvests: 0,
  30. }
  31. }
  32. func (events *analyticsEvents) addEvent(e analyticsEvent) {
  33. events.numSeen++
  34. if len(events.events) < cap(events.events) {
  35. events.events = append(events.events, e)
  36. if len(events.events) == cap(events.events) {
  37. // Delay heap initialization so that we can have
  38. // deterministic ordering for integration tests (the max
  39. // is not being reached).
  40. heap.Init(events.events)
  41. }
  42. return
  43. }
  44. if e.priority.isLowerPriority((events.events)[0].priority) {
  45. return
  46. }
  47. events.events[0] = e
  48. heap.Fix(events.events, 0)
  49. }
  50. func (events *analyticsEvents) mergeFailed(other *analyticsEvents) {
  51. fails := other.failedHarvests + 1
  52. if fails >= failedEventsAttemptsLimit {
  53. return
  54. }
  55. events.failedHarvests = fails
  56. events.Merge(other)
  57. }
  58. func (events *analyticsEvents) Merge(other *analyticsEvents) {
  59. allSeen := events.numSeen + other.numSeen
  60. for _, e := range other.events {
  61. events.addEvent(e)
  62. }
  63. events.numSeen = allSeen
  64. }
  65. func (events *analyticsEvents) CollectorJSON(agentRunID string) ([]byte, error) {
  66. if 0 == events.numSeen {
  67. return nil, nil
  68. }
  69. estimate := 256 * len(events.events)
  70. buf := bytes.NewBuffer(make([]byte, 0, estimate))
  71. buf.WriteByte('[')
  72. jsonx.AppendString(buf, agentRunID)
  73. buf.WriteByte(',')
  74. buf.WriteByte('{')
  75. buf.WriteString(`"reservoir_size":`)
  76. jsonx.AppendUint(buf, uint64(cap(events.events)))
  77. buf.WriteByte(',')
  78. buf.WriteString(`"events_seen":`)
  79. jsonx.AppendUint(buf, uint64(events.numSeen))
  80. buf.WriteByte('}')
  81. buf.WriteByte(',')
  82. buf.WriteByte('[')
  83. for i, e := range events.events {
  84. if i > 0 {
  85. buf.WriteByte(',')
  86. }
  87. e.WriteJSON(buf)
  88. }
  89. buf.WriteByte(']')
  90. buf.WriteByte(']')
  91. return buf.Bytes(), nil
  92. }
  93. // split splits the events into two. NOTE! The two event pools are not valid
  94. // priority queues, and should only be used to create JSON, not for adding any
  95. // events.
  96. func (events *analyticsEvents) split() (*analyticsEvents, *analyticsEvents) {
  97. // numSeen is conserved: e1.numSeen + e2.numSeen == events.numSeen.
  98. e1 := &analyticsEvents{
  99. numSeen: len(events.events) / 2,
  100. events: make([]analyticsEvent, len(events.events)/2),
  101. failedHarvests: events.failedHarvests,
  102. }
  103. e2 := &analyticsEvents{
  104. numSeen: events.numSeen - e1.numSeen,
  105. events: make([]analyticsEvent, len(events.events)-len(e1.events)),
  106. failedHarvests: events.failedHarvests,
  107. }
  108. // Note that slicing is not used to ensure that length == capacity for
  109. // e1.events and e2.events.
  110. copy(e1.events, events.events)
  111. copy(e2.events, events.events[len(events.events)/2:])
  112. return e1, e2
  113. }