txn_events.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. package internal
  2. import (
  3. "bytes"
  4. "sort"
  5. "strings"
  6. "time"
  7. )
  8. // DatastoreExternalTotals contains overview of external and datastore calls
  9. // made during a transaction.
  10. type DatastoreExternalTotals struct {
  11. externalCallCount uint64
  12. externalDuration time.Duration
  13. datastoreCallCount uint64
  14. datastoreDuration time.Duration
  15. }
  16. // WriteJSON prepares JSON in the format expected by the collector.
  17. func (e *TxnEvent) WriteJSON(buf *bytes.Buffer) {
  18. w := jsonFieldsWriter{buf: buf}
  19. buf.WriteByte('[')
  20. buf.WriteByte('{')
  21. w.stringField("type", "Transaction")
  22. w.stringField("name", e.FinalName)
  23. w.floatField("timestamp", timeToFloatSeconds(e.Start))
  24. if ApdexNone != e.Zone {
  25. w.stringField("nr.apdexPerfZone", e.Zone.label())
  26. }
  27. w.boolField("error", e.HasError)
  28. sharedTransactionIntrinsics(e, &w)
  29. // Write better CAT intrinsics if enabled
  30. sharedBetterCATIntrinsics(e, &w)
  31. if e.BetterCAT.Enabled {
  32. if p := e.BetterCAT.Inbound; nil != p {
  33. if "" != p.TransactionID {
  34. w.stringField("parentId", p.TransactionID)
  35. }
  36. if "" != p.ID {
  37. w.stringField("parentSpanId", p.ID)
  38. }
  39. }
  40. }
  41. // Write old CAT intrinsics if enabled
  42. oldCATIntrinsics(e, &w)
  43. buf.WriteByte('}')
  44. buf.WriteByte(',')
  45. userAttributesJSON(e.Attrs, buf, destTxnEvent, nil)
  46. buf.WriteByte(',')
  47. agentAttributesJSON(e.Attrs, buf, destTxnEvent)
  48. buf.WriteByte(']')
  49. }
  50. // oldCATIntrinsics reports old CAT intrinsics for Transaction
  51. // if CrossProcess.Used() is true
  52. func oldCATIntrinsics(e *TxnEvent, w *jsonFieldsWriter) {
  53. if !e.CrossProcess.Used() {
  54. return
  55. }
  56. if e.CrossProcess.ClientID != "" {
  57. w.stringField("client_cross_process_id", e.CrossProcess.ClientID)
  58. }
  59. if e.CrossProcess.TripID != "" {
  60. w.stringField("nr.tripId", e.CrossProcess.TripID)
  61. }
  62. if e.CrossProcess.PathHash != "" {
  63. w.stringField("nr.pathHash", e.CrossProcess.PathHash)
  64. }
  65. if e.CrossProcess.ReferringPathHash != "" {
  66. w.stringField("nr.referringPathHash", e.CrossProcess.ReferringPathHash)
  67. }
  68. if e.CrossProcess.GUID != "" {
  69. w.stringField("nr.guid", e.CrossProcess.GUID)
  70. }
  71. if e.CrossProcess.ReferringTxnGUID != "" {
  72. w.stringField("nr.referringTransactionGuid", e.CrossProcess.ReferringTxnGUID)
  73. }
  74. if len(e.CrossProcess.AlternatePathHashes) > 0 {
  75. hashes := make([]string, 0, len(e.CrossProcess.AlternatePathHashes))
  76. for hash := range e.CrossProcess.AlternatePathHashes {
  77. hashes = append(hashes, hash)
  78. }
  79. sort.Strings(hashes)
  80. w.stringField("nr.alternatePathHashes", strings.Join(hashes, ","))
  81. }
  82. }
  83. // sharedTransactionIntrinsics reports intrinsics that are shared
  84. // by Transaction and TransactionError
  85. func sharedTransactionIntrinsics(e *TxnEvent, w *jsonFieldsWriter) {
  86. w.floatField("duration", e.Duration.Seconds())
  87. if e.Queuing > 0 {
  88. w.floatField("queueDuration", e.Queuing.Seconds())
  89. }
  90. if e.externalCallCount > 0 {
  91. w.intField("externalCallCount", int64(e.externalCallCount))
  92. w.floatField("externalDuration", e.externalDuration.Seconds())
  93. }
  94. if e.datastoreCallCount > 0 {
  95. // Note that "database" is used for the keys here instead of
  96. // "datastore" for historical reasons.
  97. w.intField("databaseCallCount", int64(e.datastoreCallCount))
  98. w.floatField("databaseDuration", e.datastoreDuration.Seconds())
  99. }
  100. if e.CrossProcess.IsSynthetics() {
  101. w.stringField("nr.syntheticsResourceId", e.CrossProcess.Synthetics.ResourceID)
  102. w.stringField("nr.syntheticsJobId", e.CrossProcess.Synthetics.JobID)
  103. w.stringField("nr.syntheticsMonitorId", e.CrossProcess.Synthetics.MonitorID)
  104. }
  105. }
  106. // sharedBetterCATIntrinsics reports intrinsics that are shared
  107. // by Transaction, TransactionError, and Slow SQL
  108. func sharedBetterCATIntrinsics(e *TxnEvent, w *jsonFieldsWriter) {
  109. if e.BetterCAT.Enabled {
  110. if p := e.BetterCAT.Inbound; nil != p {
  111. w.stringField("parent.type", p.Type)
  112. w.stringField("parent.app", p.App)
  113. w.stringField("parent.account", p.Account)
  114. w.stringField("parent.transportType", p.TransportType)
  115. w.floatField("parent.transportDuration", p.TransportDuration.Seconds())
  116. }
  117. w.stringField("guid", e.BetterCAT.ID)
  118. w.stringField("traceId", e.BetterCAT.TraceID())
  119. w.writerField("priority", e.BetterCAT.Priority)
  120. w.boolField("sampled", e.BetterCAT.Sampled)
  121. }
  122. }
  123. // MarshalJSON is used for testing.
  124. func (e *TxnEvent) MarshalJSON() ([]byte, error) {
  125. buf := bytes.NewBuffer(make([]byte, 0, 256))
  126. e.WriteJSON(buf)
  127. return buf.Bytes(), nil
  128. }
  129. type txnEvents struct {
  130. events *analyticsEvents
  131. }
  132. func newTxnEvents(max int) *txnEvents {
  133. return &txnEvents{
  134. events: newAnalyticsEvents(max),
  135. }
  136. }
  137. func (events *txnEvents) AddTxnEvent(e *TxnEvent, priority Priority) {
  138. // Synthetics events always get priority: normal event priorities are in the
  139. // range [0.0,1.99999], so adding 2 means that a Synthetics event will always
  140. // win.
  141. if e.CrossProcess.IsSynthetics() {
  142. priority += 2.0
  143. }
  144. events.events.addEvent(analyticsEvent{priority: priority, jsonWriter: e})
  145. }
  146. func (events *txnEvents) MergeIntoHarvest(h *Harvest) {
  147. h.TxnEvents.events.mergeFailed(events.events)
  148. }
  149. func (events *txnEvents) Data(agentRunID string, harvestStart time.Time) ([]byte, error) {
  150. return events.events.CollectorJSON(agentRunID)
  151. }
  152. func (events *txnEvents) numSeen() float64 { return events.events.NumSeen() }
  153. func (events *txnEvents) numSaved() float64 { return events.events.NumSaved() }
  154. func (events *txnEvents) EndpointMethod() string {
  155. return cmdTxnEvents
  156. }
  157. func (events *txnEvents) payloads(limit int) []PayloadCreator {
  158. if events.numSaved() < float64(limit) {
  159. return []PayloadCreator{events}
  160. }
  161. e1, e2 := events.events.split()
  162. return []PayloadCreator{
  163. &txnEvents{events: e1},
  164. &txnEvents{events: e2},
  165. }
  166. }