internal_app.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668
  1. package newrelic
  2. import (
  3. "errors"
  4. "fmt"
  5. "math"
  6. "net/http"
  7. "os"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/newrelic/go-agent/internal"
  12. "github.com/newrelic/go-agent/internal/logger"
  13. )
  // envDebugLogging names the environment variable that switches on verbose
  // per-data logging. Any non-empty value enables it: Consume then passes
  // every harvestable to debug, which logs its payloads at info level.
  var envDebugLogging = "NEW_RELIC_DEBUG_LOGGING"

  // debugLogging holds the value of envDebugLogging, read once when the
  // package is initialized.
  var debugLogging = os.Getenv(envDebugLogging)
  21. type dataConsumer interface {
  22. Consume(internal.AgentRunID, internal.Harvestable)
  23. }
  24. type appData struct {
  25. id internal.AgentRunID
  26. data internal.Harvestable
  27. }
  28. type app struct {
  29. config Config
  30. rpmControls internal.RpmControls
  31. testHarvest *internal.Harvest
  32. // placeholderRun is used when the application is not connected.
  33. placeholderRun *appRun
  34. // initiateShutdown is used to tell the processor to shutdown.
  35. initiateShutdown chan struct{}
  36. // shutdownStarted and shutdownComplete are closed by the processor
  37. // goroutine to indicate the shutdown status. Two channels are used so
  38. // that the call of app.Shutdown() can block until shutdown has
  39. // completed but other goroutines can exit when shutdown has started.
  40. // This is not just an optimization: This prevents a deadlock if
  41. // harvesting data during the shutdown fails and an attempt is made to
  42. // merge the data into the next harvest.
  43. shutdownStarted chan struct{}
  44. shutdownComplete chan struct{}
  45. // Sends to these channels should not occur without a <-shutdownStarted
  46. // select option to prevent deadlock.
  47. dataChan chan appData
  48. collectorErrorChan chan error
  49. connectChan chan *appRun
  50. harvestTicker *time.Ticker
  51. // This mutex protects both `run` and `err`, both of which should only
  52. // be accessed using getState and setState.
  53. sync.RWMutex
  54. // run is non-nil when the app is successfully connected. It is
  55. // immutable.
  56. run *appRun
  57. // err is non-nil if the application will never be connected again
  58. // (disconnect, license exception, shutdown).
  59. err error
  60. }
  61. // appRun contains information regarding a single connection session with the
  62. // collector. It is immutable after creation at application connect.
  63. type appRun struct {
  64. *internal.ConnectReply
  65. // AttributeConfig is calculated on every connect since it depends on
  66. // the security policies.
  67. AttributeConfig *internal.AttributeConfig
  68. }
  69. func newAppRun(config Config, reply *internal.ConnectReply) *appRun {
  70. return &appRun{
  71. ConnectReply: reply,
  72. AttributeConfig: internal.CreateAttributeConfig(internal.AttributeConfigInput{
  73. Attributes: convertAttributeDestinationConfig(config.Attributes),
  74. ErrorCollector: convertAttributeDestinationConfig(config.ErrorCollector.Attributes),
  75. TransactionEvents: convertAttributeDestinationConfig(config.TransactionEvents.Attributes),
  76. TransactionTracer: convertAttributeDestinationConfig(config.TransactionTracer.Attributes),
  77. }, reply.SecurityPolicies.AttributesInclude.Enabled()),
  78. }
  79. }
  80. func isFatalHarvestError(e error) bool {
  81. return internal.IsDisconnect(e) ||
  82. internal.IsLicenseException(e) ||
  83. internal.IsRestartException(e)
  84. }
  85. func shouldSaveFailedHarvest(e error) bool {
  86. if e == internal.ErrPayloadTooLarge || e == internal.ErrUnsupportedMedia {
  87. return false
  88. }
  89. return true
  90. }
  91. func (app *app) doHarvest(h *internal.Harvest, harvestStart time.Time, run *appRun) {
  92. h.CreateFinalMetrics()
  93. h.Metrics = h.Metrics.ApplyRules(run.MetricRules)
  94. payloads := h.Payloads(app.config.DistributedTracer.Enabled)
  95. for _, p := range payloads {
  96. cmd := p.EndpointMethod()
  97. data, err := p.Data(run.RunID.String(), harvestStart)
  98. if nil == data && nil == err {
  99. continue
  100. }
  101. if nil == err {
  102. call := internal.RpmCmd{
  103. Collector: run.Collector,
  104. RunID: run.RunID.String(),
  105. Name: cmd,
  106. Data: data,
  107. }
  108. // The reply from harvest calls is always unused.
  109. _, err = internal.CollectorRequest(call, app.rpmControls)
  110. }
  111. if nil == err {
  112. continue
  113. }
  114. if isFatalHarvestError(err) {
  115. select {
  116. case app.collectorErrorChan <- err:
  117. case <-app.shutdownStarted:
  118. }
  119. return
  120. }
  121. app.config.Logger.Warn("harvest failure", map[string]interface{}{
  122. "cmd": cmd,
  123. "error": err.Error(),
  124. })
  125. if shouldSaveFailedHarvest(err) {
  126. app.Consume(run.RunID, p)
  127. }
  128. }
  129. }
  130. func connectAttempt(app *app) (*appRun, error) {
  131. reply, err := internal.ConnectAttempt(config{app.config}, app.config.SecurityPoliciesToken, app.rpmControls)
  132. if nil != err {
  133. return nil, err
  134. }
  135. return newAppRun(app.config, reply), nil
  136. }
  137. func (app *app) connectRoutine() {
  138. for {
  139. run, err := connectAttempt(app)
  140. if nil == err {
  141. select {
  142. case app.connectChan <- run:
  143. case <-app.shutdownStarted:
  144. }
  145. return
  146. }
  147. if internal.IsDisconnect(err) || internal.IsLicenseException(err) {
  148. select {
  149. case app.collectorErrorChan <- err:
  150. case <-app.shutdownStarted:
  151. }
  152. return
  153. }
  154. app.config.Logger.Warn("application connect failure", map[string]interface{}{
  155. "error": err.Error(),
  156. })
  157. time.Sleep(internal.ConnectBackoff)
  158. }
  159. }
  160. func debug(data internal.Harvestable, lg Logger) {
  161. now := time.Now()
  162. h := internal.NewHarvest(now)
  163. data.MergeIntoHarvest(h)
  164. ps := h.Payloads(false)
  165. for _, p := range ps {
  166. cmd := p.EndpointMethod()
  167. d, err := p.Data("agent run id", now)
  168. if nil == d && nil == err {
  169. continue
  170. }
  171. if nil != err {
  172. lg.Info("integration", map[string]interface{}{
  173. "cmd": cmd,
  174. "error": err.Error(),
  175. })
  176. continue
  177. }
  178. lg.Info("integration", map[string]interface{}{
  179. "cmd": cmd,
  180. "data": internal.JSONString(d),
  181. })
  182. }
  183. }
  184. func processConnectMessages(run *appRun, lg Logger) {
  185. for _, msg := range run.Messages {
  186. event := "collector message"
  187. cn := map[string]interface{}{"msg": msg.Message}
  188. switch strings.ToLower(msg.Level) {
  189. case "error":
  190. lg.Error(event, cn)
  191. case "warn":
  192. lg.Warn(event, cn)
  193. case "info":
  194. lg.Info(event, cn)
  195. case "debug", "verbose":
  196. lg.Debug(event, cn)
  197. }
  198. }
  199. }
  200. func (app *app) process() {
  201. // Both the harvest and the run are non-nil when the app is connected,
  202. // and nil otherwise.
  203. var h *internal.Harvest
  204. var run *appRun
  205. for {
  206. select {
  207. case <-app.harvestTicker.C:
  208. if nil != run {
  209. now := time.Now()
  210. go app.doHarvest(h, now, run)
  211. h = internal.NewHarvest(now)
  212. }
  213. case d := <-app.dataChan:
  214. if nil != run && run.RunID == d.id {
  215. d.data.MergeIntoHarvest(h)
  216. }
  217. case <-app.initiateShutdown:
  218. close(app.shutdownStarted)
  219. // Remove the run before merging any final data to
  220. // ensure a bounded number of receives from dataChan.
  221. app.setState(nil, errors.New("application shut down"))
  222. app.harvestTicker.Stop()
  223. if nil != run {
  224. for done := false; !done; {
  225. select {
  226. case d := <-app.dataChan:
  227. if run.RunID == d.id {
  228. d.data.MergeIntoHarvest(h)
  229. }
  230. default:
  231. done = true
  232. }
  233. }
  234. app.doHarvest(h, time.Now(), run)
  235. }
  236. close(app.shutdownComplete)
  237. return
  238. case err := <-app.collectorErrorChan:
  239. run = nil
  240. h = nil
  241. app.setState(nil, nil)
  242. switch {
  243. case internal.IsDisconnect(err):
  244. app.setState(nil, err)
  245. app.config.Logger.Error("application disconnected", map[string]interface{}{
  246. "app": app.config.AppName,
  247. "err": err.Error(),
  248. })
  249. case internal.IsLicenseException(err):
  250. app.setState(nil, err)
  251. app.config.Logger.Error("invalid license", map[string]interface{}{
  252. "app": app.config.AppName,
  253. "license": app.config.License,
  254. })
  255. case internal.IsRestartException(err):
  256. app.config.Logger.Info("application restarted", map[string]interface{}{
  257. "app": app.config.AppName,
  258. })
  259. go app.connectRoutine()
  260. }
  261. case run = <-app.connectChan:
  262. h = internal.NewHarvest(time.Now())
  263. app.setState(run, nil)
  264. app.config.Logger.Info("application connected", map[string]interface{}{
  265. "app": app.config.AppName,
  266. "run": run.RunID.String(),
  267. })
  268. processConnectMessages(run, app.config.Logger)
  269. }
  270. }
  271. }
  272. func (app *app) Shutdown(timeout time.Duration) {
  273. if !app.config.Enabled {
  274. return
  275. }
  276. select {
  277. case app.initiateShutdown <- struct{}{}:
  278. default:
  279. }
  280. // Block until shutdown is done or timeout occurs.
  281. t := time.NewTimer(timeout)
  282. select {
  283. case <-app.shutdownComplete:
  284. case <-t.C:
  285. }
  286. t.Stop()
  287. app.config.Logger.Info("application shutdown", map[string]interface{}{
  288. "app": app.config.AppName,
  289. })
  290. }
  291. func convertAttributeDestinationConfig(c AttributeDestinationConfig) internal.AttributeDestinationConfig {
  292. return internal.AttributeDestinationConfig{
  293. Enabled: c.Enabled,
  294. Include: c.Include,
  295. Exclude: c.Exclude,
  296. }
  297. }
  298. func runSampler(app *app, period time.Duration) {
  299. previous := internal.GetSample(time.Now(), app.config.Logger)
  300. t := time.NewTicker(period)
  301. for {
  302. select {
  303. case now := <-t.C:
  304. current := internal.GetSample(now, app.config.Logger)
  305. run, _ := app.getState()
  306. app.Consume(run.RunID, internal.GetStats(internal.Samples{
  307. Previous: previous,
  308. Current: current,
  309. }))
  310. previous = current
  311. case <-app.shutdownStarted:
  312. t.Stop()
  313. return
  314. }
  315. }
  316. }
  317. func (app *app) WaitForConnection(timeout time.Duration) error {
  318. if !app.config.Enabled {
  319. return nil
  320. }
  321. deadline := time.Now().Add(timeout)
  322. pollPeriod := 50 * time.Millisecond
  323. for {
  324. run, err := app.getState()
  325. if nil != err {
  326. return err
  327. }
  328. if run.RunID != "" {
  329. return nil
  330. }
  331. if time.Now().After(deadline) {
  332. return fmt.Errorf("timeout out after %s", timeout.String())
  333. }
  334. time.Sleep(pollPeriod)
  335. }
  336. }
  337. func newApp(c Config) (Application, error) {
  338. c = copyConfigReferenceFields(c)
  339. if err := c.Validate(); nil != err {
  340. return nil, err
  341. }
  342. if nil == c.Logger {
  343. c.Logger = logger.ShimLogger{}
  344. }
  345. app := &app{
  346. config: c,
  347. placeholderRun: newAppRun(c, internal.ConnectReplyDefaults()),
  348. // This channel must be buffered since Shutdown makes a
  349. // non-blocking send attempt.
  350. initiateShutdown: make(chan struct{}, 1),
  351. shutdownStarted: make(chan struct{}),
  352. shutdownComplete: make(chan struct{}),
  353. connectChan: make(chan *appRun, 1),
  354. collectorErrorChan: make(chan error, 1),
  355. dataChan: make(chan appData, internal.AppDataChanSize),
  356. rpmControls: internal.RpmControls{
  357. License: c.License,
  358. Client: &http.Client{
  359. Transport: c.Transport,
  360. Timeout: internal.CollectorTimeout,
  361. },
  362. Logger: c.Logger,
  363. AgentVersion: Version,
  364. },
  365. }
  366. app.config.Logger.Info("application created", map[string]interface{}{
  367. "app": app.config.AppName,
  368. "version": Version,
  369. "enabled": app.config.Enabled,
  370. })
  371. if !app.config.Enabled {
  372. return app, nil
  373. }
  374. app.harvestTicker = time.NewTicker(internal.HarvestPeriod)
  375. go app.process()
  376. go app.connectRoutine()
  377. if app.config.RuntimeSampler.Enabled {
  378. go runSampler(app, internal.RuntimeSamplerPeriod)
  379. }
  380. return app, nil
  381. }
  382. type expectApp interface {
  383. internal.Expect
  384. Application
  385. }
  386. func newTestApp(replyfn func(*internal.ConnectReply), cfg Config) (expectApp, error) {
  387. cfg.Enabled = false
  388. application, err := newApp(cfg)
  389. if nil != err {
  390. return nil, err
  391. }
  392. app := application.(*app)
  393. if nil != replyfn {
  394. replyfn(app.placeholderRun.ConnectReply)
  395. app.placeholderRun = newAppRun(cfg, app.placeholderRun.ConnectReply)
  396. }
  397. app.testHarvest = internal.NewHarvest(time.Now())
  398. return app, nil
  399. }
  400. func (app *app) getState() (*appRun, error) {
  401. app.RLock()
  402. defer app.RUnlock()
  403. run := app.run
  404. if nil == run {
  405. run = app.placeholderRun
  406. }
  407. return run, app.err
  408. }
  409. func (app *app) setState(run *appRun, err error) {
  410. app.Lock()
  411. defer app.Unlock()
  412. app.run = run
  413. app.err = err
  414. }
  415. func transportTypeFromRequest(r *http.Request) TransportType {
  416. if strings.HasPrefix(r.Proto, "HTTP") {
  417. if r.TLS != nil {
  418. return TransportHTTPS
  419. }
  420. return TransportHTTP
  421. }
  422. return TransportUnknown
  423. }
  424. // StartTransaction implements newrelic.Application's StartTransaction.
  425. func (app *app) StartTransaction(name string, w http.ResponseWriter, r *http.Request) Transaction {
  426. run, _ := app.getState()
  427. txn := upgradeTxn(newTxn(txnInput{
  428. Config: app.config,
  429. Reply: run.ConnectReply,
  430. W: w,
  431. Consumer: app,
  432. attrConfig: run.AttributeConfig,
  433. }, r, name))
  434. if nil != r {
  435. if p := r.Header.Get(DistributedTracePayloadHeader); p != "" {
  436. txn.AcceptDistributedTracePayload(transportTypeFromRequest(r), p)
  437. }
  438. }
  439. return txn
  440. }
  // errHighSecurityEnabled is returned by RecordCustomEvent in high security
  // mode.
  var errHighSecurityEnabled = errors.New("high security enabled")

  // errCustomEventsDisabled is returned by RecordCustomEvent when custom
  // events are turned off in the local configuration.
  var errCustomEventsDisabled = errors.New("custom events disabled")

  // errCustomEventsRemoteDisabled is returned by RecordCustomEvent when the
  // connect reply says custom events are not collected.
  var errCustomEventsRemoteDisabled = errors.New("custom events disabled by server")
  446. // RecordCustomEvent implements newrelic.Application's RecordCustomEvent.
  447. func (app *app) RecordCustomEvent(eventType string, params map[string]interface{}) error {
  448. if app.config.HighSecurity {
  449. return errHighSecurityEnabled
  450. }
  451. if !app.config.CustomInsightsEvents.Enabled {
  452. return errCustomEventsDisabled
  453. }
  454. event, e := internal.CreateCustomEvent(eventType, params, time.Now())
  455. if nil != e {
  456. return e
  457. }
  458. run, _ := app.getState()
  459. if !run.CollectCustomEvents {
  460. return errCustomEventsRemoteDisabled
  461. }
  462. if !run.SecurityPolicies.CustomEvents.Enabled() {
  463. return errSecurityPolicy
  464. }
  465. app.Consume(run.RunID, event)
  466. return nil
  467. }
  // Validation errors returned by RecordCustomMetric.
  var errMetricInf = errors.New("invalid metric value: inf") // value is +Inf or -Inf
  var errMetricNaN = errors.New("invalid metric value: NaN") // value is NaN
  var errMetricNameEmpty = errors.New("missing metric name") // name is ""
  473. // RecordCustomMetric implements newrelic.Application's RecordCustomMetric.
  474. func (app *app) RecordCustomMetric(name string, value float64) error {
  475. if math.IsNaN(value) {
  476. return errMetricNaN
  477. }
  478. if math.IsInf(value, 0) {
  479. return errMetricInf
  480. }
  481. if "" == name {
  482. return errMetricNameEmpty
  483. }
  484. run, _ := app.getState()
  485. app.Consume(run.RunID, internal.CustomMetric{
  486. RawInputName: name,
  487. Value: value,
  488. })
  489. return nil
  490. }
  491. func (app *app) Consume(id internal.AgentRunID, data internal.Harvestable) {
  492. if "" != debugLogging {
  493. debug(data, app.config.Logger)
  494. }
  495. if nil != app.testHarvest {
  496. data.MergeIntoHarvest(app.testHarvest)
  497. return
  498. }
  499. if "" == id {
  500. return
  501. }
  502. select {
  503. case app.dataChan <- appData{id, data}:
  504. case <-app.shutdownStarted:
  505. }
  506. }
  507. func (app *app) ExpectCustomEvents(t internal.Validator, want []internal.WantEvent) {
  508. internal.ExpectCustomEvents(internal.ExtendValidator(t, "custom events"), app.testHarvest.CustomEvents, want)
  509. }
  510. func (app *app) ExpectErrors(t internal.Validator, want []internal.WantError) {
  511. t = internal.ExtendValidator(t, "traced errors")
  512. internal.ExpectErrors(t, app.testHarvest.ErrorTraces, want)
  513. }
  514. func (app *app) ExpectErrorEvents(t internal.Validator, want []internal.WantEvent) {
  515. t = internal.ExtendValidator(t, "error events")
  516. internal.ExpectErrorEvents(t, app.testHarvest.ErrorEvents, want)
  517. }
  518. func (app *app) ExpectErrorEventsPresent(t internal.Validator, want []internal.WantEvent) {
  519. t = internal.ExtendValidator(t, "error events")
  520. internal.ExpectErrorEventsPresent(t, app.testHarvest.ErrorEvents, want)
  521. }
  522. func (app *app) ExpectErrorEventsAbsent(t internal.Validator, names []string) {
  523. t = internal.ExtendValidator(t, "error events")
  524. internal.ExpectErrorEventsAbsent(t, app.testHarvest.ErrorEvents, names)
  525. }
  526. func (app *app) ExpectSpanEvents(t internal.Validator, want []internal.WantEvent) {
  527. t = internal.ExtendValidator(t, "txn events")
  528. internal.ExpectSpanEvents(t, app.testHarvest.SpanEvents, want)
  529. }
  530. func (app *app) ExpectSpanEventsPresent(t internal.Validator, want []internal.WantEvent) {
  531. t = internal.ExtendValidator(t, "span events")
  532. internal.ExpectSpanEventsPresent(t, app.testHarvest.SpanEvents, want)
  533. }
  534. func (app *app) ExpectSpanEventsAbsent(t internal.Validator, names []string) {
  535. t = internal.ExtendValidator(t, "span events")
  536. internal.ExpectSpanEventsAbsent(t, app.testHarvest.SpanEvents, names)
  537. }
  538. func (app *app) ExpectSpanEventsCount(t internal.Validator, c int) {
  539. t = internal.ExtendValidator(t, "span events")
  540. internal.ExpectSpanEventsCount(t, app.testHarvest.SpanEvents, c)
  541. }
  542. func (app *app) ExpectTxnEvents(t internal.Validator, want []internal.WantEvent) {
  543. t = internal.ExtendValidator(t, "txn events")
  544. internal.ExpectTxnEvents(t, app.testHarvest.TxnEvents, want)
  545. }
  546. func (app *app) ExpectTxnEventsPresent(t internal.Validator, want []internal.WantEvent) {
  547. t = internal.ExtendValidator(t, "txn events")
  548. internal.ExpectTxnEventsPresent(t, app.testHarvest.TxnEvents, want)
  549. }
  550. func (app *app) ExpectTxnEventsAbsent(t internal.Validator, names []string) {
  551. t = internal.ExtendValidator(t, "txn events")
  552. internal.ExpectTxnEventsAbsent(t, app.testHarvest.TxnEvents, names)
  553. }
  554. func (app *app) ExpectMetrics(t internal.Validator, want []internal.WantMetric) {
  555. t = internal.ExtendValidator(t, "metrics")
  556. internal.ExpectMetrics(t, app.testHarvest.Metrics, want)
  557. }
  558. func (app *app) ExpectMetricsPresent(t internal.Validator, want []internal.WantMetric) {
  559. t = internal.ExtendValidator(t, "metrics")
  560. internal.ExpectMetricsPresent(t, app.testHarvest.Metrics, want)
  561. }
  562. func (app *app) ExpectTxnTraces(t internal.Validator, want []internal.WantTxnTrace) {
  563. t = internal.ExtendValidator(t, "txn traces")
  564. internal.ExpectTxnTraces(t, app.testHarvest.TxnTraces, want)
  565. }
  566. func (app *app) ExpectSlowQueries(t internal.Validator, want []internal.WantSlowQuery) {
  567. t = internal.ExtendValidator(t, "slow queries")
  568. internal.ExpectSlowQueries(t, app.testHarvest.SlowSQLs, want)
  569. }