harvest.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. package internal
  2. import (
  3. "strings"
  4. "sync"
  5. "time"
  6. )
  7. // Harvestable is something that can be merged into a Harvest.
  8. type Harvestable interface {
  9. MergeIntoHarvest(h *Harvest)
  10. }
  11. // Harvest contains collected data.
  12. type Harvest struct {
  13. Metrics *metricTable
  14. CustomEvents *customEvents
  15. TxnEvents *txnEvents
  16. ErrorEvents *errorEvents
  17. ErrorTraces harvestErrors
  18. TxnTraces *harvestTraces
  19. SlowSQLs *slowQueries
  20. SpanEvents *spanEvents
  21. }
  22. const (
  23. // txnEventPayloadlimit is the maximum number of events that should be
  24. // sent up in one post.
  25. txnEventPayloadlimit = 5000
  26. )
  27. // Payloads returns a map from expected collector method name to data type.
  28. func (h *Harvest) Payloads(splitLargeTxnEvents bool) []PayloadCreator {
  29. ps := []PayloadCreator{
  30. h.Metrics,
  31. h.CustomEvents,
  32. h.ErrorEvents,
  33. h.ErrorTraces,
  34. h.TxnTraces,
  35. h.SlowSQLs,
  36. h.SpanEvents,
  37. }
  38. if splitLargeTxnEvents {
  39. ps = append(ps, h.TxnEvents.payloads(txnEventPayloadlimit)...)
  40. } else {
  41. ps = append(ps, h.TxnEvents)
  42. }
  43. return ps
  44. }
  45. // NewHarvest returns a new Harvest.
  46. func NewHarvest(now time.Time) *Harvest {
  47. return &Harvest{
  48. Metrics: newMetricTable(maxMetrics, now),
  49. CustomEvents: newCustomEvents(maxCustomEvents),
  50. TxnEvents: newTxnEvents(maxTxnEvents),
  51. ErrorEvents: newErrorEvents(maxErrorEvents),
  52. ErrorTraces: newHarvestErrors(maxHarvestErrors),
  53. TxnTraces: newHarvestTraces(),
  54. SlowSQLs: newSlowQueries(maxHarvestSlowSQLs),
  55. SpanEvents: newSpanEvents(maxSpanEvents),
  56. }
  57. }
  58. var (
  59. trackMutex sync.Mutex
  60. trackMetrics []string
  61. )
  62. // TrackUsage helps track which integration packages are used.
  63. func TrackUsage(s ...string) {
  64. trackMutex.Lock()
  65. defer trackMutex.Unlock()
  66. m := "Supportability/" + strings.Join(s, "/")
  67. trackMetrics = append(trackMetrics, m)
  68. }
  69. func createTrackUsageMetrics(metrics *metricTable) {
  70. trackMutex.Lock()
  71. defer trackMutex.Unlock()
  72. for _, m := range trackMetrics {
  73. metrics.addSingleCount(m, forced)
  74. }
  75. }
  76. // CreateFinalMetrics creates extra metrics at harvest time.
  77. func (h *Harvest) CreateFinalMetrics() {
  78. h.Metrics.addSingleCount(instanceReporting, forced)
  79. h.Metrics.addCount(customEventsSeen, h.CustomEvents.numSeen(), forced)
  80. h.Metrics.addCount(customEventsSent, h.CustomEvents.numSaved(), forced)
  81. h.Metrics.addCount(txnEventsSeen, h.TxnEvents.numSeen(), forced)
  82. h.Metrics.addCount(txnEventsSent, h.TxnEvents.numSaved(), forced)
  83. h.Metrics.addCount(errorEventsSeen, h.ErrorEvents.numSeen(), forced)
  84. h.Metrics.addCount(errorEventsSent, h.ErrorEvents.numSaved(), forced)
  85. h.Metrics.addCount(spanEventsSeen, h.SpanEvents.numSeen(), forced)
  86. h.Metrics.addCount(spanEventsSent, h.SpanEvents.numSaved(), forced)
  87. if h.Metrics.numDropped > 0 {
  88. h.Metrics.addCount(supportabilityDropped, float64(h.Metrics.numDropped), forced)
  89. }
  90. createTrackUsageMetrics(h.Metrics)
  91. }
  92. // PayloadCreator is a data type in the harvest.
  93. type PayloadCreator interface {
  94. // In the event of a rpm request failure (hopefully simply an
  95. // intermittent collector issue) the payload may be merged into the next
  96. // time period's harvest.
  97. Harvestable
  98. // Data prepares JSON in the format expected by the collector endpoint.
  99. // This method should return (nil, nil) if the payload is empty and no
  100. // rpm request is necessary.
  101. Data(agentRunID string, harvestStart time.Time) ([]byte, error)
  102. // EndpointMethod is used for the "method" query parameter when posting
  103. // the data.
  104. EndpointMethod() string
  105. }
  106. func supportMetric(metrics *metricTable, b bool, metricName string) {
  107. if b {
  108. metrics.addSingleCount(metricName, forced)
  109. }
  110. }
  111. // CreateTxnMetrics creates metrics for a transaction.
  112. func CreateTxnMetrics(args *TxnData, metrics *metricTable) {
  113. // Duration Metrics
  114. rollup := backgroundRollup
  115. if args.IsWeb {
  116. rollup = webRollup
  117. metrics.addDuration(dispatcherMetric, "", args.Duration, 0, forced)
  118. }
  119. metrics.addDuration(args.FinalName, "", args.Duration, args.Exclusive, forced)
  120. metrics.addDuration(rollup, "", args.Duration, args.Exclusive, forced)
  121. // Better CAT Metrics
  122. if cat := args.BetterCAT; cat.Enabled {
  123. caller := callerUnknown
  124. if nil != cat.Inbound {
  125. caller = cat.Inbound.payloadCaller
  126. }
  127. m := durationByCallerMetric(caller)
  128. metrics.addDuration(m.all, "", args.Duration, args.Duration, unforced)
  129. metrics.addDuration(m.webOrOther(args.IsWeb), "", args.Duration, args.Duration, unforced)
  130. // Transport Duration Metric
  131. if nil != cat.Inbound {
  132. d := cat.Inbound.TransportDuration
  133. m = transportDurationMetric(caller)
  134. metrics.addDuration(m.all, "", d, d, unforced)
  135. metrics.addDuration(m.webOrOther(args.IsWeb), "", d, d, unforced)
  136. }
  137. // CAT Error Metrics
  138. if args.HasErrors() {
  139. m = errorsByCallerMetric(caller)
  140. metrics.addSingleCount(m.all, unforced)
  141. metrics.addSingleCount(m.webOrOther(args.IsWeb), unforced)
  142. }
  143. supportMetric(metrics, args.AcceptPayloadSuccess, supportTracingAcceptSuccess)
  144. supportMetric(metrics, args.AcceptPayloadException, supportTracingAcceptException)
  145. supportMetric(metrics, args.AcceptPayloadParseException, supportTracingAcceptParseException)
  146. supportMetric(metrics, args.AcceptPayloadCreateBeforeAccept, supportTracingCreateBeforeAccept)
  147. supportMetric(metrics, args.AcceptPayloadIgnoredMultiple, supportTracingIgnoredMultiple)
  148. supportMetric(metrics, args.AcceptPayloadIgnoredVersion, supportTracingIgnoredVersion)
  149. supportMetric(metrics, args.AcceptPayloadUntrustedAccount, supportTracingAcceptUntrustedAccount)
  150. supportMetric(metrics, args.AcceptPayloadNullPayload, supportTracingAcceptNull)
  151. supportMetric(metrics, args.CreatePayloadSuccess, supportTracingCreatePayloadSuccess)
  152. supportMetric(metrics, args.CreatePayloadException, supportTracingCreatePayloadException)
  153. }
  154. // Apdex Metrics
  155. if args.Zone != ApdexNone {
  156. metrics.addApdex(apdexRollup, "", args.ApdexThreshold, args.Zone, forced)
  157. mname := apdexPrefix + removeFirstSegment(args.FinalName)
  158. metrics.addApdex(mname, "", args.ApdexThreshold, args.Zone, unforced)
  159. }
  160. // Error Metrics
  161. if args.HasErrors() {
  162. metrics.addSingleCount(errorsRollupMetric.all, forced)
  163. metrics.addSingleCount(errorsRollupMetric.webOrOther(args.IsWeb), forced)
  164. metrics.addSingleCount(errorsPrefix+args.FinalName, forced)
  165. }
  166. // Queueing Metrics
  167. if args.Queuing > 0 {
  168. metrics.addDuration(queueMetric, "", args.Queuing, args.Queuing, forced)
  169. }
  170. }