internal_app.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649
  1. package newrelic
  2. import (
  3. "errors"
  4. "fmt"
  5. "math"
  6. "net/http"
  7. "os"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/newrelic/go-agent/internal"
  12. "github.com/newrelic/go-agent/internal/logger"
  13. )
  // envDebugLogging names the environment variable that enables extra debug
  // logging. When it is set to any non-empty value, every piece of data handed
  // to Consume is also written to the logger at info level.
  var envDebugLogging = "NEW_RELIC_DEBUG_LOGGING"

  // debugLogging is the value of envDebugLogging read once at package
  // initialization; a non-empty value turns debug logging on.
  var debugLogging = os.Getenv(envDebugLogging)
  21. type dataConsumer interface {
  22. Consume(internal.AgentRunID, internal.Harvestable)
  23. }
  24. type appData struct {
  25. id internal.AgentRunID
  26. data internal.Harvestable
  27. }
  28. type app struct {
  29. config Config
  30. rpmControls internal.RpmControls
  31. testHarvest *internal.Harvest
  32. // placeholderRun is used when the application is not connected.
  33. placeholderRun *appRun
  34. // initiateShutdown is used to tell the processor to shutdown.
  35. initiateShutdown chan struct{}
  36. // shutdownStarted and shutdownComplete are closed by the processor
  37. // goroutine to indicate the shutdown status. Two channels are used so
  38. // that the call of app.Shutdown() can block until shutdown has
  39. // completed but other goroutines can exit when shutdown has started.
  40. // This is not just an optimization: This prevents a deadlock if
  41. // harvesting data during the shutdown fails and an attempt is made to
  42. // merge the data into the next harvest.
  43. shutdownStarted chan struct{}
  44. shutdownComplete chan struct{}
  45. // Sends to these channels should not occur without a <-shutdownStarted
  46. // select option to prevent deadlock.
  47. dataChan chan appData
  48. collectorErrorChan chan internal.RPMResponse
  49. connectChan chan *appRun
  50. harvestTicker *time.Ticker
  51. // This mutex protects both `run` and `err`, both of which should only
  52. // be accessed using getState and setState.
  53. sync.RWMutex
  54. // run is non-nil when the app is successfully connected. It is
  55. // immutable.
  56. run *appRun
  57. // err is non-nil if the application will never be connected again
  58. // (disconnect, license exception, shutdown).
  59. err error
  60. }
  61. // appRun contains information regarding a single connection session with the
  62. // collector. It is immutable after creation at application connect.
  63. type appRun struct {
  64. *internal.ConnectReply
  65. // AttributeConfig is calculated on every connect since it depends on
  66. // the security policies.
  67. AttributeConfig *internal.AttributeConfig
  68. }
  69. func newAppRun(config Config, reply *internal.ConnectReply) *appRun {
  70. return &appRun{
  71. ConnectReply: reply,
  72. AttributeConfig: internal.CreateAttributeConfig(internal.AttributeConfigInput{
  73. Attributes: convertAttributeDestinationConfig(config.Attributes),
  74. ErrorCollector: convertAttributeDestinationConfig(config.ErrorCollector.Attributes),
  75. TransactionEvents: convertAttributeDestinationConfig(config.TransactionEvents.Attributes),
  76. TransactionTracer: convertAttributeDestinationConfig(config.TransactionTracer.Attributes),
  77. }, reply.SecurityPolicies.AttributesInclude.Enabled()),
  78. }
  79. }
  80. func (app *app) doHarvest(h *internal.Harvest, harvestStart time.Time, run *appRun) {
  81. h.CreateFinalMetrics()
  82. h.Metrics = h.Metrics.ApplyRules(run.MetricRules)
  83. payloads := h.Payloads(app.config.DistributedTracer.Enabled)
  84. for _, p := range payloads {
  85. cmd := p.EndpointMethod()
  86. data, err := p.Data(run.RunID.String(), harvestStart)
  87. if nil != err {
  88. app.config.Logger.Warn("unable to create harvest data", map[string]interface{}{
  89. "cmd": cmd,
  90. "error": err.Error(),
  91. })
  92. continue
  93. }
  94. if nil == data {
  95. continue
  96. }
  97. call := internal.RpmCmd{
  98. Collector: run.Collector,
  99. RunID: run.RunID.String(),
  100. Name: cmd,
  101. Data: data,
  102. RequestHeadersMap: run.RequestHeadersMap,
  103. }
  104. resp := internal.CollectorRequest(call, app.rpmControls)
  105. if resp.IsDisconnect() || resp.IsRestartException() {
  106. select {
  107. case app.collectorErrorChan <- resp:
  108. case <-app.shutdownStarted:
  109. }
  110. return
  111. }
  112. if nil != resp.Err {
  113. app.config.Logger.Warn("harvest failure", map[string]interface{}{
  114. "cmd": cmd,
  115. "error": resp.Err.Error(),
  116. "retain_data": resp.ShouldSaveHarvestData(),
  117. })
  118. }
  119. if resp.ShouldSaveHarvestData() {
  120. app.Consume(run.RunID, p)
  121. }
  122. }
  123. }
  124. func (app *app) connectRoutine() {
  125. backoff := internal.ConnectBackoffStart
  126. for {
  127. reply, resp := internal.ConnectAttempt(config{app.config},
  128. app.config.SecurityPoliciesToken, app.rpmControls)
  129. if reply != nil {
  130. select {
  131. case app.connectChan <- newAppRun(app.config, reply):
  132. case <-app.shutdownStarted:
  133. }
  134. return
  135. }
  136. if resp.IsDisconnect() {
  137. select {
  138. case app.collectorErrorChan <- resp:
  139. case <-app.shutdownStarted:
  140. }
  141. return
  142. }
  143. if nil != resp.Err {
  144. app.config.Logger.Warn("application connect failure", map[string]interface{}{
  145. "error": resp.Err.Error(),
  146. })
  147. }
  148. time.Sleep(backoff)
  149. if backoff < internal.ConnectBackoffLimit {
  150. backoff *= 2
  151. }
  152. }
  153. }
  154. func debug(data internal.Harvestable, lg Logger) {
  155. now := time.Now()
  156. h := internal.NewHarvest(now)
  157. data.MergeIntoHarvest(h)
  158. ps := h.Payloads(false)
  159. for _, p := range ps {
  160. cmd := p.EndpointMethod()
  161. d, err := p.Data("agent run id", now)
  162. if nil == d && nil == err {
  163. continue
  164. }
  165. if nil != err {
  166. lg.Info("integration", map[string]interface{}{
  167. "cmd": cmd,
  168. "error": err.Error(),
  169. })
  170. continue
  171. }
  172. lg.Info("integration", map[string]interface{}{
  173. "cmd": cmd,
  174. "data": internal.JSONString(d),
  175. })
  176. }
  177. }
  178. func processConnectMessages(run *appRun, lg Logger) {
  179. for _, msg := range run.Messages {
  180. event := "collector message"
  181. cn := map[string]interface{}{"msg": msg.Message}
  182. switch strings.ToLower(msg.Level) {
  183. case "error":
  184. lg.Error(event, cn)
  185. case "warn":
  186. lg.Warn(event, cn)
  187. case "info":
  188. lg.Info(event, cn)
  189. case "debug", "verbose":
  190. lg.Debug(event, cn)
  191. }
  192. }
  193. }
  194. func (app *app) process() {
  195. // Both the harvest and the run are non-nil when the app is connected,
  196. // and nil otherwise.
  197. var h *internal.Harvest
  198. var run *appRun
  199. for {
  200. select {
  201. case <-app.harvestTicker.C:
  202. if nil != run {
  203. now := time.Now()
  204. go app.doHarvest(h, now, run)
  205. h = internal.NewHarvest(now)
  206. }
  207. case d := <-app.dataChan:
  208. if nil != run && run.RunID == d.id {
  209. d.data.MergeIntoHarvest(h)
  210. }
  211. case <-app.initiateShutdown:
  212. close(app.shutdownStarted)
  213. // Remove the run before merging any final data to
  214. // ensure a bounded number of receives from dataChan.
  215. app.setState(nil, errors.New("application shut down"))
  216. app.harvestTicker.Stop()
  217. if nil != run {
  218. for done := false; !done; {
  219. select {
  220. case d := <-app.dataChan:
  221. if run.RunID == d.id {
  222. d.data.MergeIntoHarvest(h)
  223. }
  224. default:
  225. done = true
  226. }
  227. }
  228. app.doHarvest(h, time.Now(), run)
  229. }
  230. close(app.shutdownComplete)
  231. return
  232. case resp := <-app.collectorErrorChan:
  233. run = nil
  234. h = nil
  235. app.setState(nil, nil)
  236. if resp.IsDisconnect() {
  237. app.setState(nil, resp.Err)
  238. app.config.Logger.Error("application disconnected", map[string]interface{}{
  239. "app": app.config.AppName,
  240. })
  241. } else if resp.IsRestartException() {
  242. app.config.Logger.Info("application restarted", map[string]interface{}{
  243. "app": app.config.AppName,
  244. })
  245. go app.connectRoutine()
  246. }
  247. case run = <-app.connectChan:
  248. h = internal.NewHarvest(time.Now())
  249. app.setState(run, nil)
  250. app.config.Logger.Info("application connected", map[string]interface{}{
  251. "app": app.config.AppName,
  252. "run": run.RunID.String(),
  253. })
  254. processConnectMessages(run, app.config.Logger)
  255. }
  256. }
  257. }
  258. func (app *app) Shutdown(timeout time.Duration) {
  259. if !app.config.Enabled {
  260. return
  261. }
  262. select {
  263. case app.initiateShutdown <- struct{}{}:
  264. default:
  265. }
  266. // Block until shutdown is done or timeout occurs.
  267. t := time.NewTimer(timeout)
  268. select {
  269. case <-app.shutdownComplete:
  270. case <-t.C:
  271. }
  272. t.Stop()
  273. app.config.Logger.Info("application shutdown", map[string]interface{}{
  274. "app": app.config.AppName,
  275. })
  276. }
  277. func convertAttributeDestinationConfig(c AttributeDestinationConfig) internal.AttributeDestinationConfig {
  278. return internal.AttributeDestinationConfig{
  279. Enabled: c.Enabled,
  280. Include: c.Include,
  281. Exclude: c.Exclude,
  282. }
  283. }
  284. func runSampler(app *app, period time.Duration) {
  285. previous := internal.GetSample(time.Now(), app.config.Logger)
  286. t := time.NewTicker(period)
  287. for {
  288. select {
  289. case now := <-t.C:
  290. current := internal.GetSample(now, app.config.Logger)
  291. run, _ := app.getState()
  292. app.Consume(run.RunID, internal.GetStats(internal.Samples{
  293. Previous: previous,
  294. Current: current,
  295. }))
  296. previous = current
  297. case <-app.shutdownStarted:
  298. t.Stop()
  299. return
  300. }
  301. }
  302. }
  303. func (app *app) WaitForConnection(timeout time.Duration) error {
  304. if !app.config.Enabled {
  305. return nil
  306. }
  307. deadline := time.Now().Add(timeout)
  308. pollPeriod := 50 * time.Millisecond
  309. for {
  310. run, err := app.getState()
  311. if nil != err {
  312. return err
  313. }
  314. if run.RunID != "" {
  315. return nil
  316. }
  317. if time.Now().After(deadline) {
  318. return fmt.Errorf("timeout out after %s", timeout.String())
  319. }
  320. time.Sleep(pollPeriod)
  321. }
  322. }
  323. func newApp(c Config) (Application, error) {
  324. c = copyConfigReferenceFields(c)
  325. if err := c.Validate(); nil != err {
  326. return nil, err
  327. }
  328. if nil == c.Logger {
  329. c.Logger = logger.ShimLogger{}
  330. }
  331. app := &app{
  332. config: c,
  333. placeholderRun: newAppRun(c, internal.ConnectReplyDefaults()),
  334. // This channel must be buffered since Shutdown makes a
  335. // non-blocking send attempt.
  336. initiateShutdown: make(chan struct{}, 1),
  337. shutdownStarted: make(chan struct{}),
  338. shutdownComplete: make(chan struct{}),
  339. connectChan: make(chan *appRun, 1),
  340. collectorErrorChan: make(chan internal.RPMResponse, 1),
  341. dataChan: make(chan appData, internal.AppDataChanSize),
  342. rpmControls: internal.RpmControls{
  343. License: c.License,
  344. Client: &http.Client{
  345. Transport: c.Transport,
  346. Timeout: internal.CollectorTimeout,
  347. },
  348. Logger: c.Logger,
  349. AgentVersion: Version,
  350. },
  351. }
  352. app.config.Logger.Info("application created", map[string]interface{}{
  353. "app": app.config.AppName,
  354. "version": Version,
  355. "enabled": app.config.Enabled,
  356. })
  357. if !app.config.Enabled {
  358. return app, nil
  359. }
  360. app.harvestTicker = time.NewTicker(internal.HarvestPeriod)
  361. go app.process()
  362. go app.connectRoutine()
  363. if app.config.RuntimeSampler.Enabled {
  364. go runSampler(app, internal.RuntimeSamplerPeriod)
  365. }
  366. return app, nil
  367. }
  368. type expectApp interface {
  369. internal.Expect
  370. Application
  371. }
  372. func newTestApp(replyfn func(*internal.ConnectReply), cfg Config) (expectApp, error) {
  373. cfg.Enabled = false
  374. application, err := newApp(cfg)
  375. if nil != err {
  376. return nil, err
  377. }
  378. app := application.(*app)
  379. app.HarvestTesting(replyfn)
  380. return app, nil
  381. }
  382. var (
  383. _ internal.HarvestTestinger = &app{}
  384. _ internal.Expect = &app{}
  385. )
  386. func (app *app) HarvestTesting(replyfn func(*internal.ConnectReply)) {
  387. if nil != replyfn {
  388. reply := internal.ConnectReplyDefaults()
  389. replyfn(reply)
  390. app.placeholderRun = newAppRun(app.config, reply)
  391. }
  392. app.testHarvest = internal.NewHarvest(time.Now())
  393. }
  394. func (app *app) getState() (*appRun, error) {
  395. app.RLock()
  396. defer app.RUnlock()
  397. run := app.run
  398. if nil == run {
  399. run = app.placeholderRun
  400. }
  401. return run, app.err
  402. }
  403. func (app *app) setState(run *appRun, err error) {
  404. app.Lock()
  405. defer app.Unlock()
  406. app.run = run
  407. app.err = err
  408. }
  409. // StartTransaction implements newrelic.Application's StartTransaction.
  410. func (app *app) StartTransaction(name string, w http.ResponseWriter, r *http.Request) Transaction {
  411. run, _ := app.getState()
  412. txn := upgradeTxn(newTxn(txnInput{
  413. Config: app.config,
  414. Reply: run.ConnectReply,
  415. W: w,
  416. Consumer: app,
  417. attrConfig: run.AttributeConfig,
  418. }, name))
  419. if nil != r {
  420. txn.SetWebRequest(NewWebRequest(r))
  421. }
  422. return txn
  423. }
  // Sentinel errors returned when a custom event is refused.
  var errHighSecurityEnabled = errors.New("high security enabled")
  var errCustomEventsDisabled = errors.New("custom events disabled")
  var errCustomEventsRemoteDisabled = errors.New("custom events disabled by server")
  429. // RecordCustomEvent implements newrelic.Application's RecordCustomEvent.
  430. func (app *app) RecordCustomEvent(eventType string, params map[string]interface{}) error {
  431. if app.config.HighSecurity {
  432. return errHighSecurityEnabled
  433. }
  434. if !app.config.CustomInsightsEvents.Enabled {
  435. return errCustomEventsDisabled
  436. }
  437. event, e := internal.CreateCustomEvent(eventType, params, time.Now())
  438. if nil != e {
  439. return e
  440. }
  441. run, _ := app.getState()
  442. if !run.CollectCustomEvents {
  443. return errCustomEventsRemoteDisabled
  444. }
  445. if !run.SecurityPolicies.CustomEvents.Enabled() {
  446. return errSecurityPolicy
  447. }
  448. app.Consume(run.RunID, event)
  449. return nil
  450. }
  // Sentinel errors returned by RecordCustomMetric for invalid input.
  var errMetricInf = errors.New("invalid metric value: inf")
  var errMetricNaN = errors.New("invalid metric value: NaN")
  var errMetricNameEmpty = errors.New("missing metric name")
  456. // RecordCustomMetric implements newrelic.Application's RecordCustomMetric.
  457. func (app *app) RecordCustomMetric(name string, value float64) error {
  458. if math.IsNaN(value) {
  459. return errMetricNaN
  460. }
  461. if math.IsInf(value, 0) {
  462. return errMetricInf
  463. }
  464. if "" == name {
  465. return errMetricNameEmpty
  466. }
  467. run, _ := app.getState()
  468. app.Consume(run.RunID, internal.CustomMetric{
  469. RawInputName: name,
  470. Value: value,
  471. })
  472. return nil
  473. }
  474. func (app *app) Consume(id internal.AgentRunID, data internal.Harvestable) {
  475. if "" != debugLogging {
  476. debug(data, app.config.Logger)
  477. }
  478. if nil != app.testHarvest {
  479. data.MergeIntoHarvest(app.testHarvest)
  480. return
  481. }
  482. if "" == id {
  483. return
  484. }
  485. select {
  486. case app.dataChan <- appData{id, data}:
  487. case <-app.shutdownStarted:
  488. }
  489. }
  490. func (app *app) ExpectCustomEvents(t internal.Validator, want []internal.WantEvent) {
  491. internal.ExpectCustomEvents(internal.ExtendValidator(t, "custom events"), app.testHarvest.CustomEvents, want)
  492. }
  493. func (app *app) ExpectErrors(t internal.Validator, want []internal.WantError) {
  494. t = internal.ExtendValidator(t, "traced errors")
  495. internal.ExpectErrors(t, app.testHarvest.ErrorTraces, want)
  496. }
  497. func (app *app) ExpectErrorEvents(t internal.Validator, want []internal.WantEvent) {
  498. t = internal.ExtendValidator(t, "error events")
  499. internal.ExpectErrorEvents(t, app.testHarvest.ErrorEvents, want)
  500. }
  501. func (app *app) ExpectErrorEventsPresent(t internal.Validator, want []internal.WantEvent) {
  502. t = internal.ExtendValidator(t, "error events")
  503. internal.ExpectErrorEventsPresent(t, app.testHarvest.ErrorEvents, want)
  504. }
  505. func (app *app) ExpectErrorEventsAbsent(t internal.Validator, names []string) {
  506. t = internal.ExtendValidator(t, "error events")
  507. internal.ExpectErrorEventsAbsent(t, app.testHarvest.ErrorEvents, names)
  508. }
  509. func (app *app) ExpectSpanEvents(t internal.Validator, want []internal.WantEvent) {
  510. t = internal.ExtendValidator(t, "txn events")
  511. internal.ExpectSpanEvents(t, app.testHarvest.SpanEvents, want)
  512. }
  513. func (app *app) ExpectSpanEventsPresent(t internal.Validator, want []internal.WantEvent) {
  514. t = internal.ExtendValidator(t, "span events")
  515. internal.ExpectSpanEventsPresent(t, app.testHarvest.SpanEvents, want)
  516. }
  517. func (app *app) ExpectSpanEventsAbsent(t internal.Validator, names []string) {
  518. t = internal.ExtendValidator(t, "span events")
  519. internal.ExpectSpanEventsAbsent(t, app.testHarvest.SpanEvents, names)
  520. }
  521. func (app *app) ExpectSpanEventsCount(t internal.Validator, c int) {
  522. t = internal.ExtendValidator(t, "span events")
  523. internal.ExpectSpanEventsCount(t, app.testHarvest.SpanEvents, c)
  524. }
  525. func (app *app) ExpectTxnEvents(t internal.Validator, want []internal.WantEvent) {
  526. t = internal.ExtendValidator(t, "txn events")
  527. internal.ExpectTxnEvents(t, app.testHarvest.TxnEvents, want)
  528. }
  529. func (app *app) ExpectTxnEventsPresent(t internal.Validator, want []internal.WantEvent) {
  530. t = internal.ExtendValidator(t, "txn events")
  531. internal.ExpectTxnEventsPresent(t, app.testHarvest.TxnEvents, want)
  532. }
  533. func (app *app) ExpectTxnEventsAbsent(t internal.Validator, names []string) {
  534. t = internal.ExtendValidator(t, "txn events")
  535. internal.ExpectTxnEventsAbsent(t, app.testHarvest.TxnEvents, names)
  536. }
  537. func (app *app) ExpectMetrics(t internal.Validator, want []internal.WantMetric) {
  538. t = internal.ExtendValidator(t, "metrics")
  539. internal.ExpectMetrics(t, app.testHarvest.Metrics, want)
  540. }
  541. func (app *app) ExpectMetricsPresent(t internal.Validator, want []internal.WantMetric) {
  542. t = internal.ExtendValidator(t, "metrics")
  543. internal.ExpectMetricsPresent(t, app.testHarvest.Metrics, want)
  544. }
  545. func (app *app) ExpectTxnTraces(t internal.Validator, want []internal.WantTxnTrace) {
  546. t = internal.ExtendValidator(t, "txn traces")
  547. internal.ExpectTxnTraces(t, app.testHarvest.TxnTraces, want)
  548. }
  549. func (app *app) ExpectSlowQueries(t internal.Validator, want []internal.WantSlowQuery) {
  550. t = internal.ExtendValidator(t, "slow queries")
  551. internal.ExpectSlowQueries(t, app.testHarvest.SlowSQLs, want)
  552. }