| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410 |
- package internal
- import (
- "bytes"
- "container/heap"
- "encoding/json"
- "sort"
- "time"
- "github.com/newrelic/go-agent/internal/jsonx"
- )
- // See https://source.datanerd.us/agents/agent-specs/blob/master/Transaction-Trace-LEGACY.md
- type traceNodeHeap []traceNode
- // traceNodeParams is used for trace node parameters. A struct is used in place
- // of a map[string]interface{} to facilitate testing and reduce JSON Marshal
- // overhead. If too many fields get added here, it probably makes sense to
- // start using a map. This struct is not embedded into traceNode to minimize
- // the size of traceNode: Not all nodes will have parameters.
- type traceNodeParams struct {
- StackTrace StackTrace
- CleanURL string
- Database string
- Host string
- PortPathOrID string
- Query string
- TransactionGUID string
- queryParameters queryParameters
- }
- func (p *traceNodeParams) WriteJSON(buf *bytes.Buffer) {
- w := jsonFieldsWriter{buf: buf}
- buf.WriteByte('{')
- if nil != p.StackTrace {
- w.writerField("backtrace", p.StackTrace)
- }
- if "" != p.CleanURL {
- w.stringField("uri", p.CleanURL)
- }
- if "" != p.Database {
- w.stringField("database_name", p.Database)
- }
- if "" != p.Host {
- w.stringField("host", p.Host)
- }
- if "" != p.PortPathOrID {
- w.stringField("port_path_or_id", p.PortPathOrID)
- }
- if "" != p.Query {
- w.stringField("query", p.Query)
- }
- if "" != p.TransactionGUID {
- w.stringField("transaction_guid", p.TransactionGUID)
- }
- if nil != p.queryParameters {
- w.writerField("query_parameters", p.queryParameters)
- }
- buf.WriteByte('}')
- }
- // MarshalJSON is used for testing.
- func (p *traceNodeParams) MarshalJSON() ([]byte, error) {
- buf := &bytes.Buffer{}
- p.WriteJSON(buf)
- return buf.Bytes(), nil
- }
- type traceNode struct {
- start segmentTime
- stop segmentTime
- duration time.Duration
- params *traceNodeParams
- name string
- }
- func (h traceNodeHeap) Len() int { return len(h) }
- func (h traceNodeHeap) Less(i, j int) bool { return h[i].duration < h[j].duration }
- func (h traceNodeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
- // Push and Pop are unused: only heap.Init and heap.Fix are used.
- func (h traceNodeHeap) Push(x interface{}) {}
- func (h traceNodeHeap) Pop() interface{} { return nil }
- // TxnTrace contains the work in progress transaction trace.
- type TxnTrace struct {
- Enabled bool
- SegmentThreshold time.Duration
- StackTraceThreshold time.Duration
- nodes traceNodeHeap
- maxNodes int
- }
- // getMaxNodes allows the maximum number of nodes to be overwritten for unit
- // tests.
- func (trace *TxnTrace) getMaxNodes() int {
- if 0 != trace.maxNodes {
- return trace.maxNodes
- }
- return maxTxnTraceNodes
- }
- // considerNode exists to prevent unnecessary calls to witnessNode: constructing
- // the metric name and params map requires allocations.
- func (trace *TxnTrace) considerNode(end segmentEnd) bool {
- return trace.Enabled && (end.duration >= trace.SegmentThreshold)
- }
- func (trace *TxnTrace) witnessNode(end segmentEnd, name string, params *traceNodeParams) {
- node := traceNode{
- start: end.start,
- stop: end.stop,
- duration: end.duration,
- name: name,
- params: params,
- }
- if !trace.considerNode(end) {
- return
- }
- if trace.nodes == nil {
- trace.nodes = make(traceNodeHeap, 0, startingTxnTraceNodes)
- }
- if end.exclusive >= trace.StackTraceThreshold {
- if node.params == nil {
- p := new(traceNodeParams)
- node.params = p
- }
- // skip the following stack frames:
- // this method
- // function in tracing.go (EndBasicSegment, EndExternalSegment, EndDatastoreSegment)
- // function in internal_txn.go (endSegment, endExternal, endDatastore)
- // segment end method
- skip := 4
- node.params.StackTrace = GetStackTrace(skip)
- }
- if max := trace.getMaxNodes(); len(trace.nodes) < max {
- trace.nodes = append(trace.nodes, node)
- if len(trace.nodes) == max {
- heap.Init(trace.nodes)
- }
- return
- }
- if node.duration <= trace.nodes[0].duration {
- return
- }
- trace.nodes[0] = node
- heap.Fix(trace.nodes, 0)
- }
- // HarvestTrace contains a finished transaction trace ready for serialization to
- // the collector.
- type HarvestTrace struct {
- TxnEvent
- Trace TxnTrace
- }
- type nodeDetails struct {
- name string
- relativeStart time.Duration
- relativeStop time.Duration
- params *traceNodeParams
- }
- func printNodeStart(buf *bytes.Buffer, n nodeDetails) {
- // time.Seconds() is intentionally not used here. Millisecond
- // precision is enough.
- relativeStartMillis := n.relativeStart.Nanoseconds() / (1000 * 1000)
- relativeStopMillis := n.relativeStop.Nanoseconds() / (1000 * 1000)
- buf.WriteByte('[')
- jsonx.AppendInt(buf, relativeStartMillis)
- buf.WriteByte(',')
- jsonx.AppendInt(buf, relativeStopMillis)
- buf.WriteByte(',')
- jsonx.AppendString(buf, n.name)
- buf.WriteByte(',')
- if nil == n.params {
- buf.WriteString("{}")
- } else {
- n.params.WriteJSON(buf)
- }
- buf.WriteByte(',')
- buf.WriteByte('[')
- }
- func printChildren(buf *bytes.Buffer, traceStart time.Time, nodes sortedTraceNodes, next int, stop segmentStamp) int {
- firstChild := true
- for next < len(nodes) && nodes[next].start.Stamp < stop {
- if firstChild {
- firstChild = false
- } else {
- buf.WriteByte(',')
- }
- printNodeStart(buf, nodeDetails{
- name: nodes[next].name,
- relativeStart: nodes[next].start.Time.Sub(traceStart),
- relativeStop: nodes[next].stop.Time.Sub(traceStart),
- params: nodes[next].params,
- })
- next = printChildren(buf, traceStart, nodes, next+1, nodes[next].stop.Stamp)
- buf.WriteString("]]")
- }
- return next
- }
- type sortedTraceNodes []*traceNode
- func (s sortedTraceNodes) Len() int { return len(s) }
- func (s sortedTraceNodes) Less(i, j int) bool { return s[i].start.Stamp < s[j].start.Stamp }
- func (s sortedTraceNodes) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
- // MarshalJSON prepares the trace in the JSON expected by the collector.
- func (trace *HarvestTrace) MarshalJSON() ([]byte, error) {
- estimate := 100 * len(trace.Trace.nodes)
- buf := bytes.NewBuffer(make([]byte, 0, estimate))
- nodes := make(sortedTraceNodes, len(trace.Trace.nodes))
- for i := 0; i < len(nodes); i++ {
- nodes[i] = &trace.Trace.nodes[i]
- }
- sort.Sort(nodes)
- buf.WriteByte('[') // begin trace
- jsonx.AppendInt(buf, trace.Start.UnixNano()/1000)
- buf.WriteByte(',')
- jsonx.AppendFloat(buf, trace.Duration.Seconds()*1000.0)
- buf.WriteByte(',')
- jsonx.AppendString(buf, trace.FinalName)
- buf.WriteByte(',')
- jsonx.AppendString(buf, trace.CleanURL)
- buf.WriteByte(',')
- buf.WriteByte('[') // begin trace data
- // If the trace string pool is used, insert another array here.
- jsonx.AppendFloat(buf, 0.0) // unused timestamp
- buf.WriteByte(',') //
- buf.WriteString("{}") // unused: formerly request parameters
- buf.WriteByte(',') //
- buf.WriteString("{}") // unused: formerly custom parameters
- buf.WriteByte(',') //
- printNodeStart(buf, nodeDetails{ // begin outer root
- name: "ROOT",
- relativeStart: 0,
- relativeStop: trace.Duration,
- })
- printNodeStart(buf, nodeDetails{ // begin inner root
- name: trace.FinalName,
- relativeStart: 0,
- relativeStop: trace.Duration,
- })
- if len(nodes) > 0 {
- lastStopStamp := nodes[len(nodes)-1].stop.Stamp + 1
- printChildren(buf, trace.Start, nodes, 0, lastStopStamp)
- }
- buf.WriteString("]]") // end outer root
- buf.WriteString("]]") // end inner root
- buf.WriteByte(',')
- buf.WriteByte('{')
- buf.WriteString(`"agentAttributes":`)
- agentAttributesJSON(trace.Attrs, buf, destTxnTrace)
- buf.WriteByte(',')
- buf.WriteString(`"userAttributes":`)
- userAttributesJSON(trace.Attrs, buf, destTxnTrace, nil)
- buf.WriteByte(',')
- buf.WriteString(`"intrinsics":`)
- intrinsicsJSON(&trace.TxnEvent, buf)
- buf.WriteByte('}')
- // If the trace string pool is used, end another array here.
- buf.WriteByte(']') // end trace data
- buf.WriteByte(',')
- if trace.CrossProcess.Used() && trace.CrossProcess.GUID != "" {
- jsonx.AppendString(buf, trace.CrossProcess.GUID)
- } else {
- buf.WriteString(`""`)
- }
- buf.WriteByte(',') //
- buf.WriteString(`null`) // reserved for future use
- buf.WriteByte(',') //
- buf.WriteString(`false`) // ForcePersist is not yet supported
- buf.WriteByte(',') //
- buf.WriteString(`null`) // X-Ray sessions not supported
- buf.WriteByte(',') //
- // Synthetics are supported:
- if trace.CrossProcess.IsSynthetics() {
- jsonx.AppendString(buf, trace.CrossProcess.Synthetics.ResourceID)
- } else {
- buf.WriteString(`""`)
- }
- buf.WriteByte(']') // end trace
- return buf.Bytes(), nil
- }
- type txnTraceHeap []*HarvestTrace
- func (h *txnTraceHeap) isEmpty() bool {
- return 0 == len(*h)
- }
- func newTxnTraceHeap(max int) *txnTraceHeap {
- h := make(txnTraceHeap, 0, max)
- heap.Init(&h)
- return &h
- }
- // Implement sort.Interface.
- func (h txnTraceHeap) Len() int { return len(h) }
- func (h txnTraceHeap) Less(i, j int) bool { return h[i].Duration < h[j].Duration }
- func (h txnTraceHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
- // Implement heap.Interface.
- func (h *txnTraceHeap) Push(x interface{}) { *h = append(*h, x.(*HarvestTrace)) }
- func (h *txnTraceHeap) Pop() interface{} {
- old := *h
- n := len(old)
- x := old[n-1]
- *h = old[0 : n-1]
- return x
- }
- func (h *txnTraceHeap) isKeeper(t *HarvestTrace) bool {
- if len(*h) < cap(*h) {
- return true
- }
- return t.Duration >= (*h)[0].Duration
- }
- func (h *txnTraceHeap) addTxnTrace(t *HarvestTrace) {
- if len(*h) < cap(*h) {
- heap.Push(h, t)
- return
- }
- if t.Duration <= (*h)[0].Duration {
- return
- }
- heap.Pop(h)
- heap.Push(h, t)
- }
- type harvestTraces struct {
- regular *txnTraceHeap
- synthetics *txnTraceHeap
- }
- func newHarvestTraces() *harvestTraces {
- return &harvestTraces{
- regular: newTxnTraceHeap(maxRegularTraces),
- synthetics: newTxnTraceHeap(maxSyntheticsTraces),
- }
- }
- func (traces *harvestTraces) Len() int {
- return traces.regular.Len() + traces.synthetics.Len()
- }
- func (traces *harvestTraces) Witness(trace HarvestTrace) {
- traceHeap := traces.regular
- if trace.CrossProcess.IsSynthetics() {
- traceHeap = traces.synthetics
- }
- if traceHeap.isKeeper(&trace) {
- cpy := new(HarvestTrace)
- *cpy = trace
- traceHeap.addTxnTrace(cpy)
- }
- }
- func (traces *harvestTraces) Data(agentRunID string, harvestStart time.Time) ([]byte, error) {
- if traces.Len() == 0 {
- return nil, nil
- }
- return json.Marshal([]interface{}{
- agentRunID,
- traces.slice(),
- })
- }
- func (traces *harvestTraces) slice() []*HarvestTrace {
- out := make([]*HarvestTrace, 0, traces.Len())
- out = append(out, (*traces.regular)...)
- out = append(out, (*traces.synthetics)...)
- return out
- }
- func (traces *harvestTraces) MergeIntoHarvest(h *Harvest) {}
- func (traces *harvestTraces) EndpointMethod() string {
- return cmdTxnTraces
- }
|