collector.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. package internal
import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"net/url"
	"os"
	"regexp"
	"time"

	"github.com/newrelic/go-agent/internal/logger"
)
  14. const (
  15. procotolVersion = "16"
  16. userAgentPrefix = "NewRelic-Go-Agent/"
  17. // Methods used in collector communication.
  18. cmdPreconnect = "preconnect"
  19. cmdConnect = "connect"
  20. cmdMetrics = "metric_data"
  21. cmdCustomEvents = "custom_event_data"
  22. cmdTxnEvents = "analytic_event_data"
  23. cmdErrorEvents = "error_event_data"
  24. cmdErrorData = "error_data"
  25. cmdTxnTraces = "transaction_sample_data"
  26. cmdSlowSQLs = "sql_trace_data"
  27. cmdSpanEvents = "span_event_data"
  28. )
  29. var (
  30. // ErrPayloadTooLarge is created in response to receiving a 413 response
  31. // code.
  32. ErrPayloadTooLarge = errors.New("payload too large")
  33. // ErrUnauthorized is created in response to receiving a 401 response code.
  34. ErrUnauthorized = errors.New("unauthorized")
  35. // ErrUnsupportedMedia is created in response to receiving a 415
  36. // response code.
  37. ErrUnsupportedMedia = errors.New("unsupported media")
  38. )
  39. // RpmCmd contains fields specific to an individual call made to RPM.
  40. type RpmCmd struct {
  41. Name string
  42. Collector string
  43. RunID string
  44. Data []byte
  45. }
  46. // RpmControls contains fields which will be the same for all calls made
  47. // by the same application.
  48. type RpmControls struct {
  49. License string
  50. Client *http.Client
  51. Logger logger.Logger
  52. AgentVersion string
  53. }
  54. func rpmURL(cmd RpmCmd, cs RpmControls) string {
  55. var u url.URL
  56. u.Host = cmd.Collector
  57. u.Path = "agent_listener/invoke_raw_method"
  58. u.Scheme = "https"
  59. query := url.Values{}
  60. query.Set("marshal_format", "json")
  61. query.Set("protocol_version", procotolVersion)
  62. query.Set("method", cmd.Name)
  63. query.Set("license_key", cs.License)
  64. if len(cmd.RunID) > 0 {
  65. query.Set("run_id", cmd.RunID)
  66. }
  67. u.RawQuery = query.Encode()
  68. return u.String()
  69. }
  70. type unexpectedStatusCodeErr struct {
  71. code int
  72. }
  73. func (e unexpectedStatusCodeErr) Error() string {
  74. return fmt.Sprintf("unexpected HTTP status code: %d", e.code)
  75. }
  76. func collectorRequestInternal(url string, data []byte, cs RpmControls) ([]byte, error) {
  77. deflated, err := compress(data)
  78. if nil != err {
  79. return nil, err
  80. }
  81. req, err := http.NewRequest("POST", url, deflated)
  82. if nil != err {
  83. return nil, err
  84. }
  85. req.Header.Add("Accept-Encoding", "identity, deflate")
  86. req.Header.Add("Content-Type", "application/octet-stream")
  87. req.Header.Add("User-Agent", userAgentPrefix+cs.AgentVersion)
  88. req.Header.Add("Content-Encoding", "deflate")
  89. resp, err := cs.Client.Do(req)
  90. if err != nil {
  91. return nil, err
  92. }
  93. defer resp.Body.Close()
  94. switch resp.StatusCode {
  95. case 200:
  96. // Nothing to do.
  97. case 401:
  98. return nil, ErrUnauthorized
  99. case 413:
  100. return nil, ErrPayloadTooLarge
  101. case 415:
  102. return nil, ErrUnsupportedMedia
  103. default:
  104. // If the response code is not 200, then the collector may not return
  105. // valid JSON.
  106. return nil, unexpectedStatusCodeErr{code: resp.StatusCode}
  107. }
  108. // Read the entire response, rather than using resp.Body as input to json.NewDecoder to
  109. // avoid the issue described here:
  110. // https://github.com/google/go-github/pull/317
  111. // https://ahmetalpbalkan.com/blog/golang-json-decoder-pitfalls/
  112. // Also, collector JSON responses are expected to be quite small.
  113. b, err := ioutil.ReadAll(resp.Body)
  114. if nil != err {
  115. return nil, err
  116. }
  117. return parseResponse(b)
  118. }
  119. // CollectorRequest makes a request to New Relic.
  120. func CollectorRequest(cmd RpmCmd, cs RpmControls) ([]byte, error) {
  121. url := rpmURL(cmd, cs)
  122. if cs.Logger.DebugEnabled() {
  123. cs.Logger.Debug("rpm request", map[string]interface{}{
  124. "command": cmd.Name,
  125. "url": url,
  126. "payload": JSONString(cmd.Data),
  127. })
  128. }
  129. resp, err := collectorRequestInternal(url, cmd.Data, cs)
  130. if err != nil {
  131. cs.Logger.Debug("rpm failure", map[string]interface{}{
  132. "command": cmd.Name,
  133. "url": url,
  134. "error": err.Error(),
  135. })
  136. }
  137. if cs.Logger.DebugEnabled() {
  138. cs.Logger.Debug("rpm response", map[string]interface{}{
  139. "command": cmd.Name,
  140. "url": url,
  141. "response": JSONString(resp),
  142. })
  143. }
  144. return resp, err
  145. }
  146. type rpmException struct {
  147. Message string `json:"message"`
  148. ErrorType string `json:"error_type"`
  149. }
  150. func (e *rpmException) Error() string {
  151. return fmt.Sprintf("%s: %s", e.ErrorType, e.Message)
  152. }
  153. func hasType(e error, expected string) bool {
  154. rpmErr, ok := e.(*rpmException)
  155. if !ok {
  156. return false
  157. }
  158. return rpmErr.ErrorType == expected
  159. }
  160. const (
  161. forceRestartType = "NewRelic::Agent::ForceRestartException"
  162. disconnectType = "NewRelic::Agent::ForceDisconnectException"
  163. licenseInvalidType = "NewRelic::Agent::LicenseException"
  164. runtimeType = "RuntimeError"
  165. )
  166. // IsRestartException indicates if the error was a restart exception.
  167. func IsRestartException(e error) bool { return hasType(e, forceRestartType) }
  168. // IsLicenseException indicates if the error was an invalid exception.
  169. func IsLicenseException(e error) bool { return hasType(e, licenseInvalidType) }
  170. // IsRuntime indicates if the error was a runtime exception.
  171. func IsRuntime(e error) bool { return hasType(e, runtimeType) }
  172. // IsDisconnect indicates if the error was a disconnect exception.
  173. func IsDisconnect(e error) bool {
  174. // Unrecognized or missing security policies should be treated as
  175. // disconnects.
  176. if _, ok := e.(errUnknownRequiredPolicy); ok {
  177. return true
  178. }
  179. if _, ok := e.(errUnsetPolicy); ok {
  180. return true
  181. }
  182. return hasType(e, disconnectType)
  183. }
  184. func parseResponse(b []byte) ([]byte, error) {
  185. var r struct {
  186. ReturnValue json.RawMessage `json:"return_value"`
  187. Exception *rpmException `json:"exception"`
  188. }
  189. err := json.Unmarshal(b, &r)
  190. if nil != err {
  191. return nil, err
  192. }
  193. if nil != r.Exception {
  194. return nil, r.Exception
  195. }
  196. return r.ReturnValue, nil
  197. }
  198. const (
  199. // NEW_RELIC_HOST can be used to override the New Relic endpoint. This
  200. // is useful for testing.
  201. envHost = "NEW_RELIC_HOST"
  202. )
  203. var (
  204. preconnectHostOverride = os.Getenv(envHost)
  205. preconnectHostDefault = "collector.newrelic.com"
  206. preconnectRegionLicenseRegex = regexp.MustCompile(`(^.+?)x`)
  207. )
  208. func calculatePreconnectHost(license, overrideHost string) string {
  209. if "" != overrideHost {
  210. return overrideHost
  211. }
  212. m := preconnectRegionLicenseRegex.FindStringSubmatch(license)
  213. if len(m) > 1 {
  214. return "collector." + m[1] + ".nr-data.net"
  215. }
  216. return preconnectHostDefault
  217. }
  218. // ConnectJSONCreator allows the creation of the connect payload JSON to be
  219. // deferred until the SecurityPolicies are acquired and vetted.
  220. type ConnectJSONCreator interface {
  221. CreateConnectJSON(*SecurityPolicies) ([]byte, error)
  222. }
  223. type preconnectRequest struct {
  224. SecurityPoliciesToken string `json:"security_policies_token,omitempty"`
  225. }
  226. // ConnectAttempt tries to connect an application.
  227. func ConnectAttempt(config ConnectJSONCreator, securityPoliciesToken string, cs RpmControls) (*ConnectReply, error) {
  228. preconnectData, err := json.Marshal([]preconnectRequest{
  229. preconnectRequest{SecurityPoliciesToken: securityPoliciesToken},
  230. })
  231. if nil != err {
  232. return nil, fmt.Errorf("unable to marshal preconnect data: %v", err)
  233. }
  234. call := RpmCmd{
  235. Name: cmdPreconnect,
  236. Collector: calculatePreconnectHost(cs.License, preconnectHostOverride),
  237. Data: preconnectData,
  238. }
  239. out, err := CollectorRequest(call, cs)
  240. if nil != err {
  241. // err is intentionally unmodified: We do not want to change
  242. // the type of these collector errors.
  243. return nil, err
  244. }
  245. var preconnect PreconnectReply
  246. err = json.Unmarshal(out, &preconnect)
  247. if nil != err {
  248. // Unknown policies detected during unmarshal should produce a
  249. // disconnect.
  250. if IsDisconnect(err) {
  251. return nil, err
  252. }
  253. return nil, fmt.Errorf("unable to parse preconnect reply: %v", err)
  254. }
  255. js, err := config.CreateConnectJSON(preconnect.SecurityPolicies.PointerIfPopulated())
  256. if nil != err {
  257. return nil, fmt.Errorf("unable to create connect data: %v", err)
  258. }
  259. call.Collector = preconnect.Collector
  260. call.Data = js
  261. call.Name = cmdConnect
  262. rawReply, err := CollectorRequest(call, cs)
  263. if nil != err {
  264. // err is intentionally unmodified: We do not want to change
  265. // the type of these collector errors.
  266. return nil, err
  267. }
  268. reply := ConnectReplyDefaults()
  269. err = json.Unmarshal(rawReply, reply)
  270. if nil != err {
  271. return nil, fmt.Errorf("unable to parse connect reply: %v", err)
  272. }
  273. // Note: This should never happen. It would mean the collector
  274. // response is malformed. This exists merely as extra defensiveness.
  275. if "" == reply.RunID {
  276. return nil, errors.New("connect reply missing agent run id")
  277. }
  278. reply.PreconnectReply = preconnect
  279. reply.AdaptiveSampler = newAdaptiveSampler(adaptiveSamplerInput{
  280. Period: time.Duration(reply.SamplingTargetPeriodInSeconds) * time.Second,
  281. Target: reply.SamplingTarget,
  282. }, time.Now())
  283. return reply, nil
  284. }