txn_trace.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. package internal
  2. import (
  3. "bytes"
  4. "container/heap"
  5. "encoding/json"
  6. "sort"
  7. "time"
  8. "github.com/newrelic/go-agent/internal/jsonx"
  9. )
  10. // See https://source.datanerd.us/agents/agent-specs/blob/master/Transaction-Trace-LEGACY.md
  11. type traceNodeHeap []traceNode
  12. // traceNodeParams is used for trace node parameters. A struct is used in place
  13. // of a map[string]interface{} to facilitate testing and reduce JSON Marshal
  14. // overhead. If too many fields get added here, it probably makes sense to
  15. // start using a map. This struct is not embedded into traceNode to minimize
  16. // the size of traceNode: Not all nodes will have parameters.
  17. type traceNodeParams struct {
  18. StackTrace StackTrace
  19. CleanURL string
  20. Database string
  21. Host string
  22. PortPathOrID string
  23. Query string
  24. TransactionGUID string
  25. queryParameters queryParameters
  26. }
  27. func (p *traceNodeParams) WriteJSON(buf *bytes.Buffer) {
  28. w := jsonFieldsWriter{buf: buf}
  29. buf.WriteByte('{')
  30. if nil != p.StackTrace {
  31. w.writerField("backtrace", p.StackTrace)
  32. }
  33. if "" != p.CleanURL {
  34. w.stringField("uri", p.CleanURL)
  35. }
  36. if "" != p.Database {
  37. w.stringField("database_name", p.Database)
  38. }
  39. if "" != p.Host {
  40. w.stringField("host", p.Host)
  41. }
  42. if "" != p.PortPathOrID {
  43. w.stringField("port_path_or_id", p.PortPathOrID)
  44. }
  45. if "" != p.Query {
  46. w.stringField("query", p.Query)
  47. }
  48. if "" != p.TransactionGUID {
  49. w.stringField("transaction_guid", p.TransactionGUID)
  50. }
  51. if nil != p.queryParameters {
  52. w.writerField("query_parameters", p.queryParameters)
  53. }
  54. buf.WriteByte('}')
  55. }
  56. // MarshalJSON is used for testing.
  57. func (p *traceNodeParams) MarshalJSON() ([]byte, error) {
  58. buf := &bytes.Buffer{}
  59. p.WriteJSON(buf)
  60. return buf.Bytes(), nil
  61. }
  62. type traceNode struct {
  63. start segmentTime
  64. stop segmentTime
  65. duration time.Duration
  66. params *traceNodeParams
  67. name string
  68. }
  69. func (h traceNodeHeap) Len() int { return len(h) }
  70. func (h traceNodeHeap) Less(i, j int) bool { return h[i].duration < h[j].duration }
  71. func (h traceNodeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
  72. // Push and Pop are unused: only heap.Init and heap.Fix are used.
  73. func (h traceNodeHeap) Push(x interface{}) {}
  74. func (h traceNodeHeap) Pop() interface{} { return nil }
  75. // TxnTrace contains the work in progress transaction trace.
  76. type TxnTrace struct {
  77. Enabled bool
  78. SegmentThreshold time.Duration
  79. StackTraceThreshold time.Duration
  80. nodes traceNodeHeap
  81. maxNodes int
  82. }
  83. // getMaxNodes allows the maximum number of nodes to be overwritten for unit
  84. // tests.
  85. func (trace *TxnTrace) getMaxNodes() int {
  86. if 0 != trace.maxNodes {
  87. return trace.maxNodes
  88. }
  89. return maxTxnTraceNodes
  90. }
  91. // considerNode exists to prevent unnecessary calls to witnessNode: constructing
  92. // the metric name and params map requires allocations.
  93. func (trace *TxnTrace) considerNode(end segmentEnd) bool {
  94. return trace.Enabled && (end.duration >= trace.SegmentThreshold)
  95. }
  96. func (trace *TxnTrace) witnessNode(end segmentEnd, name string, params *traceNodeParams) {
  97. node := traceNode{
  98. start: end.start,
  99. stop: end.stop,
  100. duration: end.duration,
  101. name: name,
  102. params: params,
  103. }
  104. if !trace.considerNode(end) {
  105. return
  106. }
  107. if trace.nodes == nil {
  108. trace.nodes = make(traceNodeHeap, 0, startingTxnTraceNodes)
  109. }
  110. if end.exclusive >= trace.StackTraceThreshold {
  111. if node.params == nil {
  112. p := new(traceNodeParams)
  113. node.params = p
  114. }
  115. // skip the following stack frames:
  116. // this method
  117. // function in tracing.go (EndBasicSegment, EndExternalSegment, EndDatastoreSegment)
  118. // function in internal_txn.go (endSegment, endExternal, endDatastore)
  119. // segment end method
  120. skip := 4
  121. node.params.StackTrace = GetStackTrace(skip)
  122. }
  123. if max := trace.getMaxNodes(); len(trace.nodes) < max {
  124. trace.nodes = append(trace.nodes, node)
  125. if len(trace.nodes) == max {
  126. heap.Init(trace.nodes)
  127. }
  128. return
  129. }
  130. if node.duration <= trace.nodes[0].duration {
  131. return
  132. }
  133. trace.nodes[0] = node
  134. heap.Fix(trace.nodes, 0)
  135. }
  136. // HarvestTrace contains a finished transaction trace ready for serialization to
  137. // the collector.
  138. type HarvestTrace struct {
  139. TxnEvent
  140. Trace TxnTrace
  141. }
  142. type nodeDetails struct {
  143. name string
  144. relativeStart time.Duration
  145. relativeStop time.Duration
  146. params *traceNodeParams
  147. }
  148. func printNodeStart(buf *bytes.Buffer, n nodeDetails) {
  149. // time.Seconds() is intentionally not used here. Millisecond
  150. // precision is enough.
  151. relativeStartMillis := n.relativeStart.Nanoseconds() / (1000 * 1000)
  152. relativeStopMillis := n.relativeStop.Nanoseconds() / (1000 * 1000)
  153. buf.WriteByte('[')
  154. jsonx.AppendInt(buf, relativeStartMillis)
  155. buf.WriteByte(',')
  156. jsonx.AppendInt(buf, relativeStopMillis)
  157. buf.WriteByte(',')
  158. jsonx.AppendString(buf, n.name)
  159. buf.WriteByte(',')
  160. if nil == n.params {
  161. buf.WriteString("{}")
  162. } else {
  163. n.params.WriteJSON(buf)
  164. }
  165. buf.WriteByte(',')
  166. buf.WriteByte('[')
  167. }
  168. func printChildren(buf *bytes.Buffer, traceStart time.Time, nodes sortedTraceNodes, next int, stop segmentStamp) int {
  169. firstChild := true
  170. for next < len(nodes) && nodes[next].start.Stamp < stop {
  171. if firstChild {
  172. firstChild = false
  173. } else {
  174. buf.WriteByte(',')
  175. }
  176. printNodeStart(buf, nodeDetails{
  177. name: nodes[next].name,
  178. relativeStart: nodes[next].start.Time.Sub(traceStart),
  179. relativeStop: nodes[next].stop.Time.Sub(traceStart),
  180. params: nodes[next].params,
  181. })
  182. next = printChildren(buf, traceStart, nodes, next+1, nodes[next].stop.Stamp)
  183. buf.WriteString("]]")
  184. }
  185. return next
  186. }
  187. type sortedTraceNodes []*traceNode
  188. func (s sortedTraceNodes) Len() int { return len(s) }
  189. func (s sortedTraceNodes) Less(i, j int) bool { return s[i].start.Stamp < s[j].start.Stamp }
  190. func (s sortedTraceNodes) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
  191. // MarshalJSON prepares the trace in the JSON expected by the collector.
  192. func (trace *HarvestTrace) MarshalJSON() ([]byte, error) {
  193. estimate := 100 * len(trace.Trace.nodes)
  194. buf := bytes.NewBuffer(make([]byte, 0, estimate))
  195. nodes := make(sortedTraceNodes, len(trace.Trace.nodes))
  196. for i := 0; i < len(nodes); i++ {
  197. nodes[i] = &trace.Trace.nodes[i]
  198. }
  199. sort.Sort(nodes)
  200. buf.WriteByte('[') // begin trace
  201. jsonx.AppendInt(buf, trace.Start.UnixNano()/1000)
  202. buf.WriteByte(',')
  203. jsonx.AppendFloat(buf, trace.Duration.Seconds()*1000.0)
  204. buf.WriteByte(',')
  205. jsonx.AppendString(buf, trace.FinalName)
  206. buf.WriteByte(',')
  207. jsonx.AppendString(buf, trace.CleanURL)
  208. buf.WriteByte(',')
  209. buf.WriteByte('[') // begin trace data
  210. // If the trace string pool is used, insert another array here.
  211. jsonx.AppendFloat(buf, 0.0) // unused timestamp
  212. buf.WriteByte(',') //
  213. buf.WriteString("{}") // unused: formerly request parameters
  214. buf.WriteByte(',') //
  215. buf.WriteString("{}") // unused: formerly custom parameters
  216. buf.WriteByte(',') //
  217. printNodeStart(buf, nodeDetails{ // begin outer root
  218. name: "ROOT",
  219. relativeStart: 0,
  220. relativeStop: trace.Duration,
  221. })
  222. printNodeStart(buf, nodeDetails{ // begin inner root
  223. name: trace.FinalName,
  224. relativeStart: 0,
  225. relativeStop: trace.Duration,
  226. })
  227. if len(nodes) > 0 {
  228. lastStopStamp := nodes[len(nodes)-1].stop.Stamp + 1
  229. printChildren(buf, trace.Start, nodes, 0, lastStopStamp)
  230. }
  231. buf.WriteString("]]") // end outer root
  232. buf.WriteString("]]") // end inner root
  233. buf.WriteByte(',')
  234. buf.WriteByte('{')
  235. buf.WriteString(`"agentAttributes":`)
  236. agentAttributesJSON(trace.Attrs, buf, destTxnTrace)
  237. buf.WriteByte(',')
  238. buf.WriteString(`"userAttributes":`)
  239. userAttributesJSON(trace.Attrs, buf, destTxnTrace, nil)
  240. buf.WriteByte(',')
  241. buf.WriteString(`"intrinsics":`)
  242. intrinsicsJSON(&trace.TxnEvent, buf)
  243. buf.WriteByte('}')
  244. // If the trace string pool is used, end another array here.
  245. buf.WriteByte(']') // end trace data
  246. buf.WriteByte(',')
  247. if trace.CrossProcess.Used() && trace.CrossProcess.GUID != "" {
  248. jsonx.AppendString(buf, trace.CrossProcess.GUID)
  249. } else {
  250. buf.WriteString(`""`)
  251. }
  252. buf.WriteByte(',') //
  253. buf.WriteString(`null`) // reserved for future use
  254. buf.WriteByte(',') //
  255. buf.WriteString(`false`) // ForcePersist is not yet supported
  256. buf.WriteByte(',') //
  257. buf.WriteString(`null`) // X-Ray sessions not supported
  258. buf.WriteByte(',') //
  259. // Synthetics are supported:
  260. if trace.CrossProcess.IsSynthetics() {
  261. jsonx.AppendString(buf, trace.CrossProcess.Synthetics.ResourceID)
  262. } else {
  263. buf.WriteString(`""`)
  264. }
  265. buf.WriteByte(']') // end trace
  266. return buf.Bytes(), nil
  267. }
  268. type txnTraceHeap []*HarvestTrace
  269. func (h *txnTraceHeap) isEmpty() bool {
  270. return 0 == len(*h)
  271. }
  272. func newTxnTraceHeap(max int) *txnTraceHeap {
  273. h := make(txnTraceHeap, 0, max)
  274. heap.Init(&h)
  275. return &h
  276. }
  277. // Implement sort.Interface.
  278. func (h txnTraceHeap) Len() int { return len(h) }
  279. func (h txnTraceHeap) Less(i, j int) bool { return h[i].Duration < h[j].Duration }
  280. func (h txnTraceHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
  281. // Implement heap.Interface.
  282. func (h *txnTraceHeap) Push(x interface{}) { *h = append(*h, x.(*HarvestTrace)) }
  283. func (h *txnTraceHeap) Pop() interface{} {
  284. old := *h
  285. n := len(old)
  286. x := old[n-1]
  287. *h = old[0 : n-1]
  288. return x
  289. }
  290. func (h *txnTraceHeap) isKeeper(t *HarvestTrace) bool {
  291. if len(*h) < cap(*h) {
  292. return true
  293. }
  294. return t.Duration >= (*h)[0].Duration
  295. }
  296. func (h *txnTraceHeap) addTxnTrace(t *HarvestTrace) {
  297. if len(*h) < cap(*h) {
  298. heap.Push(h, t)
  299. return
  300. }
  301. if t.Duration <= (*h)[0].Duration {
  302. return
  303. }
  304. heap.Pop(h)
  305. heap.Push(h, t)
  306. }
  307. type harvestTraces struct {
  308. regular *txnTraceHeap
  309. synthetics *txnTraceHeap
  310. }
  311. func newHarvestTraces() *harvestTraces {
  312. return &harvestTraces{
  313. regular: newTxnTraceHeap(maxRegularTraces),
  314. synthetics: newTxnTraceHeap(maxSyntheticsTraces),
  315. }
  316. }
  317. func (traces *harvestTraces) Len() int {
  318. return traces.regular.Len() + traces.synthetics.Len()
  319. }
  320. func (traces *harvestTraces) Witness(trace HarvestTrace) {
  321. traceHeap := traces.regular
  322. if trace.CrossProcess.IsSynthetics() {
  323. traceHeap = traces.synthetics
  324. }
  325. if traceHeap.isKeeper(&trace) {
  326. cpy := new(HarvestTrace)
  327. *cpy = trace
  328. traceHeap.addTxnTrace(cpy)
  329. }
  330. }
  331. func (traces *harvestTraces) Data(agentRunID string, harvestStart time.Time) ([]byte, error) {
  332. if traces.Len() == 0 {
  333. return nil, nil
  334. }
  335. return json.Marshal([]interface{}{
  336. agentRunID,
  337. traces.slice(),
  338. })
  339. }
  340. func (traces *harvestTraces) slice() []*HarvestTrace {
  341. out := make([]*HarvestTrace, 0, traces.Len())
  342. out = append(out, (*traces.regular)...)
  343. out = append(out, (*traces.synthetics)...)
  344. return out
  345. }
  346. func (traces *harvestTraces) MergeIntoHarvest(h *Harvest) {}
  347. func (traces *harvestTraces) EndpointMethod() string {
  348. return cmdTxnTraces
  349. }