| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- package internal
- import (
- "bytes"
- "container/heap"
- "github.com/newrelic/go-agent/internal/jsonx"
- )
- type analyticsEvent struct {
- priority Priority
- jsonWriter
- }
- type analyticsEventHeap []analyticsEvent
- type analyticsEvents struct {
- numSeen int
- events analyticsEventHeap
- failedHarvests int
- }
- func (events *analyticsEvents) NumSeen() float64 { return float64(events.numSeen) }
- func (events *analyticsEvents) NumSaved() float64 { return float64(len(events.events)) }
- func (h analyticsEventHeap) Len() int { return len(h) }
- func (h analyticsEventHeap) Less(i, j int) bool { return h[i].priority.isLowerPriority(h[j].priority) }
- func (h analyticsEventHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
- // Push and Pop are unused: only heap.Init and heap.Fix are used.
- func (h analyticsEventHeap) Push(x interface{}) {}
- func (h analyticsEventHeap) Pop() interface{} { return nil }
- func newAnalyticsEvents(max int) *analyticsEvents {
- return &analyticsEvents{
- numSeen: 0,
- events: make(analyticsEventHeap, 0, max),
- failedHarvests: 0,
- }
- }
- func (events *analyticsEvents) addEvent(e analyticsEvent) {
- events.numSeen++
- if len(events.events) < cap(events.events) {
- events.events = append(events.events, e)
- if len(events.events) == cap(events.events) {
- // Delay heap initialization so that we can have
- // deterministic ordering for integration tests (the max
- // is not being reached).
- heap.Init(events.events)
- }
- return
- }
- if e.priority.isLowerPriority((events.events)[0].priority) {
- return
- }
- events.events[0] = e
- heap.Fix(events.events, 0)
- }
- func (events *analyticsEvents) mergeFailed(other *analyticsEvents) {
- fails := other.failedHarvests + 1
- if fails >= failedEventsAttemptsLimit {
- return
- }
- events.failedHarvests = fails
- events.Merge(other)
- }
- func (events *analyticsEvents) Merge(other *analyticsEvents) {
- allSeen := events.numSeen + other.numSeen
- for _, e := range other.events {
- events.addEvent(e)
- }
- events.numSeen = allSeen
- }
- func (events *analyticsEvents) CollectorJSON(agentRunID string) ([]byte, error) {
- if 0 == events.numSeen {
- return nil, nil
- }
- estimate := 256 * len(events.events)
- buf := bytes.NewBuffer(make([]byte, 0, estimate))
- buf.WriteByte('[')
- jsonx.AppendString(buf, agentRunID)
- buf.WriteByte(',')
- buf.WriteByte('{')
- buf.WriteString(`"reservoir_size":`)
- jsonx.AppendUint(buf, uint64(cap(events.events)))
- buf.WriteByte(',')
- buf.WriteString(`"events_seen":`)
- jsonx.AppendUint(buf, uint64(events.numSeen))
- buf.WriteByte('}')
- buf.WriteByte(',')
- buf.WriteByte('[')
- for i, e := range events.events {
- if i > 0 {
- buf.WriteByte(',')
- }
- e.WriteJSON(buf)
- }
- buf.WriteByte(']')
- buf.WriteByte(']')
- return buf.Bytes(), nil
- }
- // split splits the events into two. NOTE! The two event pools are not valid
- // priority queues, and should only be used to create JSON, not for adding any
- // events.
- func (events *analyticsEvents) split() (*analyticsEvents, *analyticsEvents) {
- // numSeen is conserved: e1.numSeen + e2.numSeen == events.numSeen.
- e1 := &analyticsEvents{
- numSeen: len(events.events) / 2,
- events: make([]analyticsEvent, len(events.events)/2),
- failedHarvests: events.failedHarvests,
- }
- e2 := &analyticsEvents{
- numSeen: events.numSeen - e1.numSeen,
- events: make([]analyticsEvent, len(events.events)-len(e1.events)),
- failedHarvests: events.failedHarvests,
- }
- // Note that slicing is not used to ensure that length == capacity for
- // e1.events and e2.events.
- copy(e1.events, events.events)
- copy(e2.events, events.events[len(events.events)/2:])
- return e1, e2
- }
|