adaptive_sampler.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package internal
  2. import (
  3. "math"
  4. "sync"
  5. "time"
  6. )
// adaptiveSamplerInput holds the input fields for the newAdaptiveSampler
// function. It is embedded in adaptiveSampler, so both fields are read
// directly from the sampler.
type adaptiveSamplerInput struct {
	// Period is the length of one sampling period. It is added to the
	// current period's end time each time the sampler rolls over to a new
	// period.
	Period time.Duration
	// Target is compared against the number of transactions sampled in the
	// current period, and is the numerator of the sampled ratio computed when
	// a period ends.
	Target uint64
}
// AdaptiveSampler calculates which transactions should be sampled. An interface
// is used in the connect reply to facilitate testing.
type AdaptiveSampler interface {
	// ComputeSampled reports whether a transaction with the given priority,
	// observed at time now, should be sampled.
	ComputeSampled(priority float32, now time.Time) bool
}
  17. // SampleEverything is used for testing.
  18. type SampleEverything struct{}
  19. // SampleNothing is used when the application is not yet connected.
  20. type SampleNothing struct{}
  21. // ComputeSampled implements AdaptiveSampler.
  22. func (s SampleEverything) ComputeSampled(priority float32, now time.Time) bool { return true }
  23. // ComputeSampled implements AdaptiveSampler.
  24. func (s SampleNothing) ComputeSampled(priority float32, now time.Time) bool { return false }
// adaptiveSampler decides per transaction whether to sample it, tracking
// counts for the current period and deriving a priority threshold from the
// period that just ended. Its ComputeSampled method has the signature required
// by AdaptiveSampler.
type adaptiveSampler struct {
	// Mutex is held for the full duration of ComputeSampled.
	sync.Mutex
	// adaptiveSamplerInput supplies Period and Target.
	adaptiveSamplerInput

	// Transactions with priority greater than or equal to this are sampled
	// while the number sampled in the current period has not exceeded Target.
	// This is 1 - sampleRatio.
	priorityMin float32

	// currentPeriod holds the state of the period currently being counted.
	currentPeriod struct {
		// numSampled counts the calls to ComputeSampled in this period that
		// returned true.
		numSampled uint64
		// numSeen counts all calls to ComputeSampled in this period.
		numSeen uint64
		// end is the time at which this period ends.
		end time.Time
	}
}
  37. func newAdaptiveSampler(input adaptiveSamplerInput, now time.Time) *adaptiveSampler {
  38. as := &adaptiveSampler{}
  39. as.adaptiveSamplerInput = input
  40. as.currentPeriod.end = now.Add(input.Period)
  41. // Sample the first transactions in the first period.
  42. as.priorityMin = 0.0
  43. return as
  44. }
  45. // ComputeSampled calculates if the transaction should be sampled.
  46. func (as *adaptiveSampler) ComputeSampled(priority float32, now time.Time) bool {
  47. as.Lock()
  48. defer as.Unlock()
  49. // If the current time is after the end of the "currentPeriod". This is in
  50. // a `for`/`while` loop in case there's a harvest where no sampling happened.
  51. // i.e. for situations where a single call to
  52. // as.currentPeriod.end = as.currentPeriod.end.Add(as.period)
  53. // might not catch us up to the current period
  54. for now.After(as.currentPeriod.end) {
  55. as.priorityMin = 0.0
  56. if as.currentPeriod.numSeen > 0 {
  57. sampledRatio := float32(as.Target) / float32(as.currentPeriod.numSeen)
  58. as.priorityMin = 1.0 - sampledRatio
  59. }
  60. as.currentPeriod.numSampled = 0
  61. as.currentPeriod.numSeen = 0
  62. as.currentPeriod.end = as.currentPeriod.end.Add(as.Period)
  63. }
  64. as.currentPeriod.numSeen++
  65. // exponential backoff -- if the number of sampled items is greater than our
  66. // target, we need to apply the exponential backoff
  67. if as.currentPeriod.numSampled > as.Target {
  68. if as.computeSampledBackoff(as.Target, as.currentPeriod.numSeen, as.currentPeriod.numSampled) {
  69. as.currentPeriod.numSampled++
  70. return true
  71. }
  72. return false
  73. } else if as.currentPeriod.numSampled > as.Target {
  74. return false
  75. }
  76. if priority >= as.priorityMin {
  77. as.currentPeriod.numSampled++
  78. return true
  79. }
  80. return false
  81. }
  82. func (as *adaptiveSampler) computeSampledBackoff(target uint64, decidedCount uint64, sampledTrueCount uint64) bool {
  83. return float64(RandUint64N(decidedCount)) <
  84. math.Pow(float64(target), (float64(target)/float64(sampledTrueCount)))-math.Pow(float64(target), 0.5)
  85. }