| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303 |
- package internal
- import (
- "encoding/json"
- "errors"
- "fmt"
- "io/ioutil"
- "net/http"
- "net/url"
- "os"
- "regexp"
- "time"
- "github.com/newrelic/go-agent/internal/logger"
- )
// Protocol metadata attached to every collector request.
const (
	// procotolVersion is sent as the protocol_version query parameter by
	// rpmURL. The identifier keeps its historical spelling because other
	// code refers to it by this name.
	procotolVersion = "17"
	// userAgentPrefix is joined with the agent version to form the
	// User-Agent request header.
	userAgentPrefix = "NewRelic-Go-Agent/"
)

// Collector method names. One of these is placed in RpmCmd.Name and sent as
// the "method" query parameter.
const (
	cmdPreconnect   = "preconnect"
	cmdConnect      = "connect"
	cmdMetrics      = "metric_data"
	cmdCustomEvents = "custom_event_data"
	cmdTxnEvents    = "analytic_event_data"
	cmdErrorEvents  = "error_event_data"
	cmdErrorData    = "error_data"
	cmdTxnTraces    = "transaction_sample_data"
	cmdSlowSQLs     = "sql_trace_data"
	cmdSpanEvents   = "span_event_data"
)
// RpmCmd describes one call to the collector: which method to invoke, which
// host to send it to, and what payload to send.
type RpmCmd struct {
	// Name is the collector method; rpmURL sends it as the "method" query
	// parameter.
	Name string
	// Collector is the host the request is sent to.
	Collector string
	// RunID, when non-empty, is sent as the "run_id" query parameter.
	RunID string
	// Data is the uncompressed payload; it is deflated before sending.
	Data []byte
	// RequestHeadersMap lists extra HTTP headers added to the request.
	RequestHeadersMap map[string]string
}
- // RpmControls contains fields which will be the same for all calls made
- // by the same application.
- type RpmControls struct {
- License string
- Client *http.Client
- Logger logger.Logger
- AgentVersion string
- }
// RPMResponse captures the outcome of a call to a New Relic endpoint.
//
// How the agent reacts to a response:
//
//	connect / preconnect:
//	  410               -> shut down
//	  200, 202          -> success, start the run
//	  anything else     -> retry after a backoff (same for transport errors)
//
//	harvest:
//	  410               -> shut down
//	  401, 409          -> restart the run
//	  408, 429, 500, 503 -> keep the data for the next harvest
//	  anything else     -> drop the data and carry on (same for transport errors)
type RPMResponse struct {
	// statusCode is the HTTP status code, or zero when no response arrived.
	statusCode int
	// body is the raw response body.
	body []byte
	// Err is non-nil when the call failed. Build values with newRPMResponse
	// so that Err always agrees with statusCode.
	Err error
	// disconnectSecurityPolicy marks a failure that must be handled as a
	// disconnect (see IsDisconnect).
	disconnectSecurityPolicy bool
}
- func newRPMResponse(statusCode int) RPMResponse {
- var err error
- if statusCode != 200 && statusCode != 202 {
- err = fmt.Errorf("response code: %d", statusCode)
- }
- return RPMResponse{statusCode: statusCode, Err: err}
- }
- // IsDisconnect indicates that the agent should disconnect.
- func (resp RPMResponse) IsDisconnect() bool {
- return resp.statusCode == 410 || resp.disconnectSecurityPolicy
- }
- // IsRestartException indicates that the agent should restart.
- func (resp RPMResponse) IsRestartException() bool {
- return resp.statusCode == 401 ||
- resp.statusCode == 409
- }
- // ShouldSaveHarvestData indicates that the agent should save the data and try
- // to send it in the next harvest.
- func (resp RPMResponse) ShouldSaveHarvestData() bool {
- switch resp.statusCode {
- case 408, 429, 500, 503:
- return true
- default:
- return false
- }
- }
- func rpmURL(cmd RpmCmd, cs RpmControls) string {
- var u url.URL
- u.Host = cmd.Collector
- u.Path = "agent_listener/invoke_raw_method"
- u.Scheme = "https"
- query := url.Values{}
- query.Set("marshal_format", "json")
- query.Set("protocol_version", procotolVersion)
- query.Set("method", cmd.Name)
- query.Set("license_key", cs.License)
- if len(cmd.RunID) > 0 {
- query.Set("run_id", cmd.RunID)
- }
- u.RawQuery = query.Encode()
- return u.String()
- }
- func collectorRequestInternal(url string, cmd RpmCmd, cs RpmControls) RPMResponse {
- deflated, err := compress(cmd.Data)
- if nil != err {
- return RPMResponse{Err: err}
- }
- req, err := http.NewRequest("POST", url, deflated)
- if nil != err {
- return RPMResponse{Err: err}
- }
- req.Header.Add("Accept-Encoding", "identity, deflate")
- req.Header.Add("Content-Type", "application/octet-stream")
- req.Header.Add("User-Agent", userAgentPrefix+cs.AgentVersion)
- req.Header.Add("Content-Encoding", "deflate")
- for k, v := range cmd.RequestHeadersMap {
- req.Header.Add(k, v)
- }
- resp, err := cs.Client.Do(req)
- if err != nil {
- return RPMResponse{Err: err}
- }
- defer resp.Body.Close()
- r := newRPMResponse(resp.StatusCode)
- // Read the entire response, rather than using resp.Body as input to json.NewDecoder to
- // avoid the issue described here:
- // https://github.com/google/go-github/pull/317
- // https://ahmetalpbalkan.com/blog/golang-json-decoder-pitfalls/
- // Also, collector JSON responses are expected to be quite small.
- body, err := ioutil.ReadAll(resp.Body)
- if nil == r.Err {
- r.Err = err
- }
- r.body = body
- return r
- }
- // CollectorRequest makes a request to New Relic.
- func CollectorRequest(cmd RpmCmd, cs RpmControls) RPMResponse {
- url := rpmURL(cmd, cs)
- if cs.Logger.DebugEnabled() {
- cs.Logger.Debug("rpm request", map[string]interface{}{
- "command": cmd.Name,
- "url": url,
- "payload": JSONString(cmd.Data),
- })
- }
- resp := collectorRequestInternal(url, cmd, cs)
- if cs.Logger.DebugEnabled() {
- if err := resp.Err; err != nil {
- cs.Logger.Debug("rpm failure", map[string]interface{}{
- "command": cmd.Name,
- "url": url,
- "response": string(resp.body), // Body might not be JSON on failure.
- "error": err.Error(),
- })
- } else {
- cs.Logger.Debug("rpm response", map[string]interface{}{
- "command": cmd.Name,
- "url": url,
- "response": JSONString(resp.body),
- })
- }
- }
- return resp
- }
// envHost is the name of the environment variable (NEW_RELIC_HOST) that
// overrides the New Relic endpoint. This is useful for testing.
const envHost = "NEW_RELIC_HOST"
- var (
- preconnectHostOverride = os.Getenv(envHost)
- preconnectHostDefault = "collector.newrelic.com"
- preconnectRegionLicenseRegex = regexp.MustCompile(`(^.+?)x`)
- )
- func calculatePreconnectHost(license, overrideHost string) string {
- if "" != overrideHost {
- return overrideHost
- }
- m := preconnectRegionLicenseRegex.FindStringSubmatch(license)
- if len(m) > 1 {
- return "collector." + m[1] + ".nr-data.net"
- }
- return preconnectHostDefault
- }
- // ConnectJSONCreator allows the creation of the connect payload JSON to be
- // deferred until the SecurityPolicies are acquired and vetted.
- type ConnectJSONCreator interface {
- CreateConnectJSON(*SecurityPolicies) ([]byte, error)
- }
- type preconnectRequest struct {
- SecurityPoliciesToken string `json:"security_policies_token,omitempty"`
- }
- // ConnectAttempt tries to connect an application.
- func ConnectAttempt(config ConnectJSONCreator, securityPoliciesToken string, cs RpmControls) (*ConnectReply, RPMResponse) {
- preconnectData, err := json.Marshal([]preconnectRequest{
- {SecurityPoliciesToken: securityPoliciesToken},
- })
- if nil != err {
- return nil, RPMResponse{Err: fmt.Errorf("unable to marshal preconnect data: %v", err)}
- }
- call := RpmCmd{
- Name: cmdPreconnect,
- Collector: calculatePreconnectHost(cs.License, preconnectHostOverride),
- Data: preconnectData,
- }
- resp := CollectorRequest(call, cs)
- if nil != resp.Err {
- return nil, resp
- }
- var preconnect struct {
- Preconnect PreconnectReply `json:"return_value"`
- }
- err = json.Unmarshal(resp.body, &preconnect)
- if nil != err {
- // Certain security policy errors must be treated as a disconnect.
- return nil, RPMResponse{
- Err: fmt.Errorf("unable to process preconnect reply: %v", err),
- disconnectSecurityPolicy: isDisconnectSecurityPolicyError(err),
- }
- }
- js, err := config.CreateConnectJSON(preconnect.Preconnect.SecurityPolicies.PointerIfPopulated())
- if nil != err {
- return nil, RPMResponse{Err: fmt.Errorf("unable to create connect data: %v", err)}
- }
- call.Collector = preconnect.Preconnect.Collector
- call.Data = js
- call.Name = cmdConnect
- resp = CollectorRequest(call, cs)
- if nil != resp.Err {
- return nil, resp
- }
- var reply struct {
- Reply *ConnectReply `json:"return_value"`
- }
- reply.Reply = ConnectReplyDefaults()
- err = json.Unmarshal(resp.body, &reply)
- if nil != err {
- return nil, RPMResponse{Err: fmt.Errorf("unable to parse connect reply: %v", err)}
- }
- // Note: This should never happen. It would mean the collector
- // response is malformed. This exists merely as extra defensiveness.
- if "" == reply.Reply.RunID {
- return nil, RPMResponse{Err: errors.New("connect reply missing agent run id")}
- }
- reply.Reply.PreconnectReply = preconnect.Preconnect
- reply.Reply.AdaptiveSampler = newAdaptiveSampler(adaptiveSamplerInput{
- Period: time.Duration(reply.Reply.SamplingTargetPeriodInSeconds) * time.Second,
- Target: reply.Reply.SamplingTarget,
- }, time.Now())
- return reply.Reply, resp
- }
|