collector.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. package internal
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "net/http"
  8. "net/url"
  9. "os"
  10. "regexp"
  11. "time"
  12. "github.com/newrelic/go-agent/internal/logger"
  13. )
  14. const (
  15. procotolVersion = "17"
  16. userAgentPrefix = "NewRelic-Go-Agent/"
  17. // Methods used in collector communication.
  18. cmdPreconnect = "preconnect"
  19. cmdConnect = "connect"
  20. cmdMetrics = "metric_data"
  21. cmdCustomEvents = "custom_event_data"
  22. cmdTxnEvents = "analytic_event_data"
  23. cmdErrorEvents = "error_event_data"
  24. cmdErrorData = "error_data"
  25. cmdTxnTraces = "transaction_sample_data"
  26. cmdSlowSQLs = "sql_trace_data"
  27. cmdSpanEvents = "span_event_data"
  28. )
  29. // RpmCmd contains fields specific to an individual call made to RPM.
  30. type RpmCmd struct {
  31. Name string
  32. Collector string
  33. RunID string
  34. Data []byte
  35. RequestHeadersMap map[string]string
  36. }
  37. // RpmControls contains fields which will be the same for all calls made
  38. // by the same application.
  39. type RpmControls struct {
  40. License string
  41. Client *http.Client
  42. Logger logger.Logger
  43. AgentVersion string
  44. }
  45. // RPMResponse contains a NR endpoint response.
  46. //
  47. // Agent Behavior Summary:
  48. //
  49. // on connect/preconnect:
  50. // 410 means shutdown
  51. // 200, 202 mean success (start run)
  52. // all other response codes and errors mean try after backoff
  53. //
  54. // on harvest:
  55. // 410 means shutdown
  56. // 401, 409 mean restart run
  57. // 408, 429, 500, 503 mean save data for next harvest
  58. // all other response codes and errors discard the data and continue the current harvest
  59. type RPMResponse struct {
  60. statusCode int
  61. body []byte
  62. // Err indicates whether or not the call was successful: newRPMResponse
  63. // should be used to avoid mismatch between statusCode and Err.
  64. Err error
  65. disconnectSecurityPolicy bool
  66. }
  67. func newRPMResponse(statusCode int) RPMResponse {
  68. var err error
  69. if statusCode != 200 && statusCode != 202 {
  70. err = fmt.Errorf("response code: %d", statusCode)
  71. }
  72. return RPMResponse{statusCode: statusCode, Err: err}
  73. }
  74. // IsDisconnect indicates that the agent should disconnect.
  75. func (resp RPMResponse) IsDisconnect() bool {
  76. return resp.statusCode == 410 || resp.disconnectSecurityPolicy
  77. }
  78. // IsRestartException indicates that the agent should restart.
  79. func (resp RPMResponse) IsRestartException() bool {
  80. return resp.statusCode == 401 ||
  81. resp.statusCode == 409
  82. }
  83. // ShouldSaveHarvestData indicates that the agent should save the data and try
  84. // to send it in the next harvest.
  85. func (resp RPMResponse) ShouldSaveHarvestData() bool {
  86. switch resp.statusCode {
  87. case 408, 429, 500, 503:
  88. return true
  89. default:
  90. return false
  91. }
  92. }
  93. func rpmURL(cmd RpmCmd, cs RpmControls) string {
  94. var u url.URL
  95. u.Host = cmd.Collector
  96. u.Path = "agent_listener/invoke_raw_method"
  97. u.Scheme = "https"
  98. query := url.Values{}
  99. query.Set("marshal_format", "json")
  100. query.Set("protocol_version", procotolVersion)
  101. query.Set("method", cmd.Name)
  102. query.Set("license_key", cs.License)
  103. if len(cmd.RunID) > 0 {
  104. query.Set("run_id", cmd.RunID)
  105. }
  106. u.RawQuery = query.Encode()
  107. return u.String()
  108. }
  109. func collectorRequestInternal(url string, cmd RpmCmd, cs RpmControls) RPMResponse {
  110. deflated, err := compress(cmd.Data)
  111. if nil != err {
  112. return RPMResponse{Err: err}
  113. }
  114. req, err := http.NewRequest("POST", url, deflated)
  115. if nil != err {
  116. return RPMResponse{Err: err}
  117. }
  118. req.Header.Add("Accept-Encoding", "identity, deflate")
  119. req.Header.Add("Content-Type", "application/octet-stream")
  120. req.Header.Add("User-Agent", userAgentPrefix+cs.AgentVersion)
  121. req.Header.Add("Content-Encoding", "deflate")
  122. for k, v := range cmd.RequestHeadersMap {
  123. req.Header.Add(k, v)
  124. }
  125. resp, err := cs.Client.Do(req)
  126. if err != nil {
  127. return RPMResponse{Err: err}
  128. }
  129. defer resp.Body.Close()
  130. r := newRPMResponse(resp.StatusCode)
  131. // Read the entire response, rather than using resp.Body as input to json.NewDecoder to
  132. // avoid the issue described here:
  133. // https://github.com/google/go-github/pull/317
  134. // https://ahmetalpbalkan.com/blog/golang-json-decoder-pitfalls/
  135. // Also, collector JSON responses are expected to be quite small.
  136. body, err := ioutil.ReadAll(resp.Body)
  137. if nil == r.Err {
  138. r.Err = err
  139. }
  140. r.body = body
  141. return r
  142. }
  143. // CollectorRequest makes a request to New Relic.
  144. func CollectorRequest(cmd RpmCmd, cs RpmControls) RPMResponse {
  145. url := rpmURL(cmd, cs)
  146. if cs.Logger.DebugEnabled() {
  147. cs.Logger.Debug("rpm request", map[string]interface{}{
  148. "command": cmd.Name,
  149. "url": url,
  150. "payload": JSONString(cmd.Data),
  151. })
  152. }
  153. resp := collectorRequestInternal(url, cmd, cs)
  154. if cs.Logger.DebugEnabled() {
  155. if err := resp.Err; err != nil {
  156. cs.Logger.Debug("rpm failure", map[string]interface{}{
  157. "command": cmd.Name,
  158. "url": url,
  159. "response": string(resp.body), // Body might not be JSON on failure.
  160. "error": err.Error(),
  161. })
  162. } else {
  163. cs.Logger.Debug("rpm response", map[string]interface{}{
  164. "command": cmd.Name,
  165. "url": url,
  166. "response": JSONString(resp.body),
  167. })
  168. }
  169. }
  170. return resp
  171. }
  172. const (
  173. // NEW_RELIC_HOST can be used to override the New Relic endpoint. This
  174. // is useful for testing.
  175. envHost = "NEW_RELIC_HOST"
  176. )
  177. var (
  178. preconnectHostOverride = os.Getenv(envHost)
  179. preconnectHostDefault = "collector.newrelic.com"
  180. preconnectRegionLicenseRegex = regexp.MustCompile(`(^.+?)x`)
  181. )
  182. func calculatePreconnectHost(license, overrideHost string) string {
  183. if "" != overrideHost {
  184. return overrideHost
  185. }
  186. m := preconnectRegionLicenseRegex.FindStringSubmatch(license)
  187. if len(m) > 1 {
  188. return "collector." + m[1] + ".nr-data.net"
  189. }
  190. return preconnectHostDefault
  191. }
  192. // ConnectJSONCreator allows the creation of the connect payload JSON to be
  193. // deferred until the SecurityPolicies are acquired and vetted.
  194. type ConnectJSONCreator interface {
  195. CreateConnectJSON(*SecurityPolicies) ([]byte, error)
  196. }
  197. type preconnectRequest struct {
  198. SecurityPoliciesToken string `json:"security_policies_token,omitempty"`
  199. }
  200. // ConnectAttempt tries to connect an application.
  201. func ConnectAttempt(config ConnectJSONCreator, securityPoliciesToken string, cs RpmControls) (*ConnectReply, RPMResponse) {
  202. preconnectData, err := json.Marshal([]preconnectRequest{
  203. {SecurityPoliciesToken: securityPoliciesToken},
  204. })
  205. if nil != err {
  206. return nil, RPMResponse{Err: fmt.Errorf("unable to marshal preconnect data: %v", err)}
  207. }
  208. call := RpmCmd{
  209. Name: cmdPreconnect,
  210. Collector: calculatePreconnectHost(cs.License, preconnectHostOverride),
  211. Data: preconnectData,
  212. }
  213. resp := CollectorRequest(call, cs)
  214. if nil != resp.Err {
  215. return nil, resp
  216. }
  217. var preconnect struct {
  218. Preconnect PreconnectReply `json:"return_value"`
  219. }
  220. err = json.Unmarshal(resp.body, &preconnect)
  221. if nil != err {
  222. // Certain security policy errors must be treated as a disconnect.
  223. return nil, RPMResponse{
  224. Err: fmt.Errorf("unable to process preconnect reply: %v", err),
  225. disconnectSecurityPolicy: isDisconnectSecurityPolicyError(err),
  226. }
  227. }
  228. js, err := config.CreateConnectJSON(preconnect.Preconnect.SecurityPolicies.PointerIfPopulated())
  229. if nil != err {
  230. return nil, RPMResponse{Err: fmt.Errorf("unable to create connect data: %v", err)}
  231. }
  232. call.Collector = preconnect.Preconnect.Collector
  233. call.Data = js
  234. call.Name = cmdConnect
  235. resp = CollectorRequest(call, cs)
  236. if nil != resp.Err {
  237. return nil, resp
  238. }
  239. var reply struct {
  240. Reply *ConnectReply `json:"return_value"`
  241. }
  242. reply.Reply = ConnectReplyDefaults()
  243. err = json.Unmarshal(resp.body, &reply)
  244. if nil != err {
  245. return nil, RPMResponse{Err: fmt.Errorf("unable to parse connect reply: %v", err)}
  246. }
  247. // Note: This should never happen. It would mean the collector
  248. // response is malformed. This exists merely as extra defensiveness.
  249. if "" == reply.Reply.RunID {
  250. return nil, RPMResponse{Err: errors.New("connect reply missing agent run id")}
  251. }
  252. reply.Reply.PreconnectReply = preconnect.Preconnect
  253. reply.Reply.AdaptiveSampler = newAdaptiveSampler(adaptiveSamplerInput{
  254. Period: time.Duration(reply.Reply.SamplingTargetPeriodInSeconds) * time.Second,
  255. Target: reply.Reply.SamplingTarget,
  256. }, time.Now())
  257. return reply.Reply, resp
  258. }