span_events.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package internal
  2. import (
  3. "bytes"
  4. "time"
  5. )
  6. // https://source.datanerd.us/agents/agent-specs/blob/master/Span-Events.md
  7. type spanCategory string
  8. const (
  9. spanCategoryHTTP spanCategory = "http"
  10. spanCategoryDatastore = "datastore"
  11. spanCategoryGeneric = "generic"
  12. )
  13. // SpanEvent represents a span event, necessary to support Distributed Tracing.
  14. type SpanEvent struct {
  15. TraceID string
  16. GUID string
  17. ParentID string
  18. TransactionID string
  19. Sampled bool
  20. Priority Priority
  21. Timestamp time.Time
  22. Duration time.Duration
  23. Name string
  24. Category spanCategory
  25. IsEntrypoint bool
  26. DatastoreExtras *spanDatastoreExtras
  27. ExternalExtras *spanExternalExtras
  28. }
  29. type spanDatastoreExtras struct {
  30. Component string
  31. Statement string
  32. Instance string
  33. Address string
  34. Hostname string
  35. }
  36. type spanExternalExtras struct {
  37. URL string
  38. Method string
  39. Component string
  40. }
  41. // WriteJSON prepares JSON in the format expected by the collector.
  42. func (e *SpanEvent) WriteJSON(buf *bytes.Buffer) {
  43. w := jsonFieldsWriter{buf: buf}
  44. buf.WriteByte('[')
  45. buf.WriteByte('{')
  46. w.stringField("type", "Span")
  47. w.stringField("traceId", e.TraceID)
  48. w.stringField("guid", e.GUID)
  49. if "" != e.ParentID {
  50. w.stringField("parentId", e.ParentID)
  51. }
  52. w.stringField("transactionId", e.TransactionID)
  53. w.boolField("sampled", e.Sampled)
  54. w.writerField("priority", e.Priority)
  55. w.intField("timestamp", e.Timestamp.UnixNano()/(1000*1000)) // in milliseconds
  56. w.floatField("duration", e.Duration.Seconds())
  57. w.stringField("name", e.Name)
  58. w.stringField("category", string(e.Category))
  59. if e.IsEntrypoint {
  60. w.boolField("nr.entryPoint", true)
  61. }
  62. if ex := e.DatastoreExtras; nil != ex {
  63. if "" != ex.Component {
  64. w.stringField("component", ex.Component)
  65. }
  66. if "" != ex.Statement {
  67. w.stringField("db.statement", ex.Statement)
  68. }
  69. if "" != ex.Instance {
  70. w.stringField("db.instance", ex.Instance)
  71. }
  72. if "" != ex.Address {
  73. w.stringField("peer.address", ex.Address)
  74. }
  75. if "" != ex.Hostname {
  76. w.stringField("peer.hostname", ex.Hostname)
  77. }
  78. w.stringField("span.kind", "client")
  79. }
  80. if ex := e.ExternalExtras; nil != ex {
  81. if "" != ex.URL {
  82. w.stringField("http.url", ex.URL)
  83. }
  84. if "" != ex.Method {
  85. w.stringField("http.method", ex.Method)
  86. }
  87. w.stringField("span.kind", "client")
  88. w.stringField("component", "http")
  89. }
  90. buf.WriteByte('}')
  91. buf.WriteByte(',')
  92. buf.WriteByte('{')
  93. buf.WriteByte('}')
  94. buf.WriteByte(',')
  95. buf.WriteByte('{')
  96. buf.WriteByte('}')
  97. buf.WriteByte(']')
  98. }
  99. // MarshalJSON is used for testing.
  100. func (e *SpanEvent) MarshalJSON() ([]byte, error) {
  101. buf := bytes.NewBuffer(make([]byte, 0, 256))
  102. e.WriteJSON(buf)
  103. return buf.Bytes(), nil
  104. }
  105. type spanEvents struct {
  106. events *analyticsEvents
  107. }
  108. func newSpanEvents(max int) *spanEvents {
  109. return &spanEvents{
  110. events: newAnalyticsEvents(max),
  111. }
  112. }
  113. func (events *spanEvents) addEvent(e *SpanEvent, cat *BetterCAT) {
  114. e.TraceID = cat.TraceID()
  115. e.TransactionID = cat.ID
  116. e.Sampled = cat.Sampled
  117. e.Priority = cat.Priority
  118. events.events.addEvent(analyticsEvent{priority: cat.Priority, jsonWriter: e})
  119. }
  120. // MergeFromTransaction merges the span events from a transaction into the
  121. // harvest's span events. This should only be called if the transaction was
  122. // sampled and span events are enabled.
  123. func (events *spanEvents) MergeFromTransaction(txndata *TxnData) {
  124. root := &SpanEvent{
  125. GUID: txndata.getRootSpanID(),
  126. Timestamp: txndata.Start,
  127. Duration: txndata.Duration,
  128. Name: txndata.FinalName,
  129. Category: spanCategoryGeneric,
  130. IsEntrypoint: true,
  131. }
  132. if nil != txndata.BetterCAT.Inbound {
  133. root.ParentID = txndata.BetterCAT.Inbound.ID
  134. }
  135. events.addEvent(root, &txndata.BetterCAT)
  136. for _, evt := range txndata.spanEvents {
  137. events.addEvent(evt, &txndata.BetterCAT)
  138. }
  139. }
  140. func (events *spanEvents) MergeIntoHarvest(h *Harvest) {
  141. h.SpanEvents.events.mergeFailed(events.events)
  142. }
  143. func (events *spanEvents) Data(agentRunID string, harvestStart time.Time) ([]byte, error) {
  144. return events.events.CollectorJSON(agentRunID)
  145. }
  146. func (events *spanEvents) numSeen() float64 { return events.events.NumSeen() }
  147. func (events *spanEvents) numSaved() float64 { return events.events.NumSaved() }
  148. func (events *spanEvents) EndpointMethod() string {
  149. return cmdSpanEvents
  150. }