123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- package internal
- import (
- "math"
- "sync"
- "time"
- )
- // adaptiveSamplerInput holds input fields for the NewAdaptiveSampler function
- type adaptiveSamplerInput struct {
- Period time.Duration
- Target uint64
- }
- // AdaptiveSampler calculates which transactions should be sampled. An interface
- // is used in the connect reply to facilitate testing.
- type AdaptiveSampler interface {
- ComputeSampled(priority float32, now time.Time) bool
- }
- // SampleEverything is used for testing.
- type SampleEverything struct{}
- // SampleNothing is used when the application is not yet connected.
- type SampleNothing struct{}
- // ComputeSampled implements AdaptiveSampler.
- func (s SampleEverything) ComputeSampled(priority float32, now time.Time) bool { return true }
- // ComputeSampled implements AdaptiveSampler.
- func (s SampleNothing) ComputeSampled(priority float32, now time.Time) bool { return false }
- type adaptiveSampler struct {
- sync.Mutex
- adaptiveSamplerInput
- // Transactions with priority higher than this are sampled.
- // This is 1 - sampleRatio.
- priorityMin float32
- currentPeriod struct {
- numSampled uint64
- numSeen uint64
- end time.Time
- }
- }
- func newAdaptiveSampler(input adaptiveSamplerInput, now time.Time) *adaptiveSampler {
- as := &adaptiveSampler{}
- as.adaptiveSamplerInput = input
- as.currentPeriod.end = now.Add(input.Period)
- // Sample the first transactions in the first period.
- as.priorityMin = 0.0
- return as
- }
- // ComputeSampled calculates if the transaction should be sampled.
- func (as *adaptiveSampler) ComputeSampled(priority float32, now time.Time) bool {
- as.Lock()
- defer as.Unlock()
- // If the current time is after the end of the "currentPeriod". This is in
- // a `for`/`while` loop in case there's a harvest where no sampling happened.
- // i.e. for situations where a single call to
- // as.currentPeriod.end = as.currentPeriod.end.Add(as.period)
- // might not catch us up to the current period
- for now.After(as.currentPeriod.end) {
- as.priorityMin = 0.0
- if as.currentPeriod.numSeen > 0 {
- sampledRatio := float32(as.Target) / float32(as.currentPeriod.numSeen)
- as.priorityMin = 1.0 - sampledRatio
- }
- as.currentPeriod.numSampled = 0
- as.currentPeriod.numSeen = 0
- as.currentPeriod.end = as.currentPeriod.end.Add(as.Period)
- }
- as.currentPeriod.numSeen++
- // exponential backoff -- if the number of sampled items is greater than our
- // target, we need to apply the exponential backoff
- if as.currentPeriod.numSampled > as.Target {
- if as.computeSampledBackoff(as.Target, as.currentPeriod.numSeen, as.currentPeriod.numSampled) {
- as.currentPeriod.numSampled++
- return true
- }
- return false
- } else if as.currentPeriod.numSampled > as.Target {
- return false
- }
- if priority >= as.priorityMin {
- as.currentPeriod.numSampled++
- return true
- }
- return false
- }
- func (as *adaptiveSampler) computeSampledBackoff(target uint64, decidedCount uint64, sampledTrueCount uint64) bool {
- return float64(RandUint64N(decidedCount)) <
- math.Pow(float64(target), (float64(target)/float64(sampledTrueCount)))-math.Pow(float64(target), 0.5)
- }
|