slow_queries.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. package internal
  2. import (
  3. "bytes"
  4. "container/heap"
  5. "hash/fnv"
  6. "time"
  7. "github.com/newrelic/go-agent/internal/jsonx"
  8. )
  9. type queryParameters map[string]interface{}
  10. func vetQueryParameters(params map[string]interface{}) queryParameters {
  11. if nil == params {
  12. return nil
  13. }
  14. // Copying the parameters into a new map is safer than modifying the map
  15. // from the customer.
  16. vetted := make(map[string]interface{})
  17. for key, val := range params {
  18. val, err := ValidateUserAttribute(key, val)
  19. if nil != err {
  20. continue
  21. }
  22. vetted[key] = val
  23. }
  24. return queryParameters(vetted)
  25. }
  26. func (q queryParameters) WriteJSON(buf *bytes.Buffer) {
  27. buf.WriteByte('{')
  28. w := jsonFieldsWriter{buf: buf}
  29. for key, val := range q {
  30. writeAttributeValueJSON(&w, key, val)
  31. }
  32. buf.WriteByte('}')
  33. }
  34. // https://source.datanerd.us/agents/agent-specs/blob/master/Slow-SQLs-LEGACY.md
  35. // slowQueryInstance represents a single datastore call.
  36. type slowQueryInstance struct {
  37. // Fields populated right after the datastore segment finishes:
  38. Duration time.Duration
  39. DatastoreMetric string
  40. ParameterizedQuery string
  41. QueryParameters queryParameters
  42. Host string
  43. PortPathOrID string
  44. DatabaseName string
  45. StackTrace StackTrace
  46. TxnEvent
  47. }
  48. // Aggregation is performed to avoid reporting multiple slow queries with same
  49. // query string. Since some datastore segments may be below the slow query
  50. // threshold, the aggregation fields Count, Total, and Min should be taken with
  51. // a grain of salt.
  52. type slowQuery struct {
  53. Count int32 // number of times the query has been observed
  54. Total time.Duration // cummulative duration
  55. Min time.Duration // minimum observed duration
  56. // When Count > 1, slowQueryInstance contains values from the slowest
  57. // observation.
  58. slowQueryInstance
  59. }
  60. type slowQueries struct {
  61. priorityQueue []*slowQuery
  62. // lookup maps query strings to indices in the priorityQueue
  63. lookup map[string]int
  64. }
  65. func (slows *slowQueries) Len() int {
  66. return len(slows.priorityQueue)
  67. }
  68. func (slows *slowQueries) Less(i, j int) bool {
  69. pq := slows.priorityQueue
  70. return pq[i].Duration < pq[j].Duration
  71. }
  72. func (slows *slowQueries) Swap(i, j int) {
  73. pq := slows.priorityQueue
  74. si := pq[i]
  75. sj := pq[j]
  76. pq[i], pq[j] = pq[j], pq[i]
  77. slows.lookup[si.ParameterizedQuery] = j
  78. slows.lookup[sj.ParameterizedQuery] = i
  79. }
  80. // Push and Pop are unused: only heap.Init and heap.Fix are used.
  81. func (slows *slowQueries) Push(x interface{}) {}
  82. func (slows *slowQueries) Pop() interface{} { return nil }
  83. func newSlowQueries(max int) *slowQueries {
  84. return &slowQueries{
  85. lookup: make(map[string]int, max),
  86. priorityQueue: make([]*slowQuery, 0, max),
  87. }
  88. }
  89. // Merge is used to merge slow queries from the transaction into the harvest.
  90. func (slows *slowQueries) Merge(other *slowQueries, txnEvent TxnEvent) {
  91. for _, s := range other.priorityQueue {
  92. cp := *s
  93. cp.TxnEvent = txnEvent
  94. slows.observe(cp)
  95. }
  96. }
  97. // merge aggregates the observations from two slow queries with the same Query.
  98. func (slow *slowQuery) merge(other slowQuery) {
  99. slow.Count += other.Count
  100. slow.Total += other.Total
  101. if other.Min < slow.Min {
  102. slow.Min = other.Min
  103. }
  104. if other.Duration > slow.Duration {
  105. slow.slowQueryInstance = other.slowQueryInstance
  106. }
  107. }
  108. func (slows *slowQueries) observeInstance(slow slowQueryInstance) {
  109. slows.observe(slowQuery{
  110. Count: 1,
  111. Total: slow.Duration,
  112. Min: slow.Duration,
  113. slowQueryInstance: slow,
  114. })
  115. }
  116. func (slows *slowQueries) insertAtIndex(slow slowQuery, idx int) {
  117. cpy := new(slowQuery)
  118. *cpy = slow
  119. slows.priorityQueue[idx] = cpy
  120. slows.lookup[slow.ParameterizedQuery] = idx
  121. heap.Fix(slows, idx)
  122. }
  123. func (slows *slowQueries) observe(slow slowQuery) {
  124. // Has the query has previously been observed?
  125. if idx, ok := slows.lookup[slow.ParameterizedQuery]; ok {
  126. slows.priorityQueue[idx].merge(slow)
  127. heap.Fix(slows, idx)
  128. return
  129. }
  130. // Has the collection reached max capacity?
  131. if len(slows.priorityQueue) < cap(slows.priorityQueue) {
  132. idx := len(slows.priorityQueue)
  133. slows.priorityQueue = slows.priorityQueue[0 : idx+1]
  134. slows.insertAtIndex(slow, idx)
  135. return
  136. }
  137. // Is this query slower than the existing fastest?
  138. fastest := slows.priorityQueue[0]
  139. if slow.Duration > fastest.Duration {
  140. delete(slows.lookup, fastest.ParameterizedQuery)
  141. slows.insertAtIndex(slow, 0)
  142. return
  143. }
  144. }
  145. // The third element of the slow query JSON should be a hash of the query
  146. // string. This hash may be used by backend services to aggregate queries which
  147. // have the have the same query string. It is unknown if this actually used.
  148. func makeSlowQueryID(query string) uint32 {
  149. h := fnv.New32a()
  150. h.Write([]byte(query))
  151. return h.Sum32()
  152. }
  153. func (slow *slowQuery) WriteJSON(buf *bytes.Buffer) {
  154. buf.WriteByte('[')
  155. jsonx.AppendString(buf, slow.TxnEvent.FinalName)
  156. buf.WriteByte(',')
  157. jsonx.AppendString(buf, slow.TxnEvent.CleanURL)
  158. buf.WriteByte(',')
  159. jsonx.AppendInt(buf, int64(makeSlowQueryID(slow.ParameterizedQuery)))
  160. buf.WriteByte(',')
  161. jsonx.AppendString(buf, slow.ParameterizedQuery)
  162. buf.WriteByte(',')
  163. jsonx.AppendString(buf, slow.DatastoreMetric)
  164. buf.WriteByte(',')
  165. jsonx.AppendInt(buf, int64(slow.Count))
  166. buf.WriteByte(',')
  167. jsonx.AppendFloat(buf, slow.Total.Seconds()*1000.0)
  168. buf.WriteByte(',')
  169. jsonx.AppendFloat(buf, slow.Min.Seconds()*1000.0)
  170. buf.WriteByte(',')
  171. jsonx.AppendFloat(buf, slow.Duration.Seconds()*1000.0)
  172. buf.WriteByte(',')
  173. w := jsonFieldsWriter{buf: buf}
  174. buf.WriteByte('{')
  175. if "" != slow.Host {
  176. w.stringField("host", slow.Host)
  177. }
  178. if "" != slow.PortPathOrID {
  179. w.stringField("port_path_or_id", slow.PortPathOrID)
  180. }
  181. if "" != slow.DatabaseName {
  182. w.stringField("database_name", slow.DatabaseName)
  183. }
  184. if nil != slow.StackTrace {
  185. w.writerField("backtrace", slow.StackTrace)
  186. }
  187. if nil != slow.QueryParameters {
  188. w.writerField("query_parameters", slow.QueryParameters)
  189. }
  190. sharedBetterCATIntrinsics(&slow.TxnEvent, &w)
  191. buf.WriteByte('}')
  192. buf.WriteByte(']')
  193. }
  194. // WriteJSON marshals the collection of slow queries into JSON according to the
  195. // schema expected by the collector.
  196. //
  197. // Note: This JSON does not contain the agentRunID. This is for unknown
  198. // historical reasons. Since the agentRunID is included in the url,
  199. // its use in the other commands' JSON is redundant (although required).
  200. func (slows *slowQueries) WriteJSON(buf *bytes.Buffer) {
  201. buf.WriteByte('[')
  202. buf.WriteByte('[')
  203. for idx, s := range slows.priorityQueue {
  204. if idx > 0 {
  205. buf.WriteByte(',')
  206. }
  207. s.WriteJSON(buf)
  208. }
  209. buf.WriteByte(']')
  210. buf.WriteByte(']')
  211. }
  212. func (slows *slowQueries) Data(agentRunID string, harvestStart time.Time) ([]byte, error) {
  213. if 0 == len(slows.priorityQueue) {
  214. return nil, nil
  215. }
  216. estimate := 1024 * len(slows.priorityQueue)
  217. buf := bytes.NewBuffer(make([]byte, 0, estimate))
  218. slows.WriteJSON(buf)
  219. return buf.Bytes(), nil
  220. }
  221. func (slows *slowQueries) MergeIntoHarvest(newHarvest *Harvest) {
  222. }
  223. func (slows *slowQueries) EndpointMethod() string {
  224. return cmdSlowSQLs
  225. }