cloudwatch.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. package cloudwatch
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/aws/aws-sdk-go/aws"
  8. "github.com/aws/aws-sdk-go/aws/session"
  9. "github.com/aws/aws-sdk-go/service/cloudwatch"
  10. "github.com/imgproxy/imgproxy/v3/config"
  11. "github.com/imgproxy/imgproxy/v3/imath"
  12. "github.com/imgproxy/imgproxy/v3/metrics/stats"
  13. "github.com/sirupsen/logrus"
  14. )
  15. type GaugeFunc func() float64
  16. type gauge struct {
  17. unit string
  18. f GaugeFunc
  19. }
  20. type bufferStats struct {
  21. count int
  22. sum, min, max int
  23. }
  24. var (
  25. enabled bool
  26. client *cloudwatch.CloudWatch
  27. gauges = make(map[string]gauge)
  28. gaugesMutex sync.RWMutex
  29. collectorCtx context.Context
  30. collectorCtxCancel context.CancelFunc
  31. bufferDefaultSizes = make(map[string]int)
  32. bufferMaxSizes = make(map[string]int)
  33. bufferSizeStats = make(map[string]*bufferStats)
  34. bufferStatsMutex sync.Mutex
  35. )
  36. func Init() error {
  37. if len(config.CloudWatchServiceName) == 0 {
  38. return nil
  39. }
  40. conf := aws.NewConfig()
  41. if len(config.CloudWatchRegion) > 0 {
  42. conf = conf.WithRegion(config.CloudWatchRegion)
  43. }
  44. sess, err := session.NewSession()
  45. if err != nil {
  46. return fmt.Errorf("Can't create CloudWatch session: %s", err)
  47. }
  48. if sess.Config.Region == nil || len(*sess.Config.Region) == 0 {
  49. sess.Config.Region = aws.String("us-west-1")
  50. }
  51. client = cloudwatch.New(sess, conf)
  52. collectorCtx, collectorCtxCancel = context.WithCancel(context.Background())
  53. go runMetricsCollector()
  54. enabled = true
  55. return nil
  56. }
  57. func Stop() {
  58. if enabled {
  59. collectorCtxCancel()
  60. }
  61. }
  62. func Enabled() bool {
  63. return enabled
  64. }
  65. func AddGaugeFunc(name, unit string, f GaugeFunc) {
  66. gaugesMutex.Lock()
  67. defer gaugesMutex.Unlock()
  68. gauges[name] = gauge{unit: unit, f: f}
  69. }
  70. func ObserveBufferSize(t string, size int) {
  71. if enabled {
  72. bufferStatsMutex.Lock()
  73. defer bufferStatsMutex.Unlock()
  74. sizef := size
  75. stats, ok := bufferSizeStats[t]
  76. if !ok {
  77. stats = &bufferStats{count: 1, sum: sizef, min: sizef, max: sizef}
  78. bufferSizeStats[t] = stats
  79. return
  80. }
  81. stats.count += 1
  82. stats.sum += sizef
  83. stats.min = imath.Min(stats.min, sizef)
  84. stats.max = imath.Max(stats.max, sizef)
  85. }
  86. }
  87. func SetBufferDefaultSize(t string, size int) {
  88. if enabled {
  89. bufferStatsMutex.Lock()
  90. defer bufferStatsMutex.Unlock()
  91. bufferDefaultSizes[t] = size
  92. }
  93. }
  94. func SetBufferMaxSize(t string, size int) {
  95. if enabled {
  96. bufferStatsMutex.Lock()
  97. defer bufferStatsMutex.Unlock()
  98. bufferMaxSizes[t] = size
  99. }
  100. }
  101. func runMetricsCollector() {
  102. tick := time.NewTicker(10 * time.Second)
  103. defer tick.Stop()
  104. dimension := &cloudwatch.Dimension{
  105. Name: aws.String("ServiceName"),
  106. Value: aws.String(config.CloudWatchServiceName),
  107. }
  108. bufferDimensions := make(map[string]*cloudwatch.Dimension)
  109. bufferDimension := func(t string) *cloudwatch.Dimension {
  110. if d, ok := bufferDimensions[t]; ok {
  111. return d
  112. }
  113. d := &cloudwatch.Dimension{
  114. Name: aws.String("BufferType"),
  115. Value: aws.String(t),
  116. }
  117. bufferDimensions[t] = d
  118. return d
  119. }
  120. for {
  121. select {
  122. case <-tick.C:
  123. metricsCount := len(gauges) + len(bufferDefaultSizes) + len(bufferMaxSizes) + len(bufferSizeStats) + 3
  124. metrics := make([]*cloudwatch.MetricDatum, 0, metricsCount)
  125. func() {
  126. gaugesMutex.RLock()
  127. defer gaugesMutex.RUnlock()
  128. for name, g := range gauges {
  129. metrics = append(metrics, &cloudwatch.MetricDatum{
  130. Dimensions: []*cloudwatch.Dimension{dimension},
  131. MetricName: aws.String(name),
  132. Unit: aws.String(g.unit),
  133. Value: aws.Float64(g.f()),
  134. })
  135. }
  136. }()
  137. func() {
  138. bufferStatsMutex.Lock()
  139. defer bufferStatsMutex.Unlock()
  140. for t, size := range bufferDefaultSizes {
  141. metrics = append(metrics, &cloudwatch.MetricDatum{
  142. Dimensions: []*cloudwatch.Dimension{dimension, bufferDimension(t)},
  143. MetricName: aws.String("BufferDefaultSize"),
  144. Unit: aws.String("Bytes"),
  145. Value: aws.Float64(float64(size)),
  146. })
  147. }
  148. for t, size := range bufferMaxSizes {
  149. metrics = append(metrics, &cloudwatch.MetricDatum{
  150. Dimensions: []*cloudwatch.Dimension{dimension, bufferDimension(t)},
  151. MetricName: aws.String("BufferMaximumSize"),
  152. Unit: aws.String("Bytes"),
  153. Value: aws.Float64(float64(size)),
  154. })
  155. }
  156. for t, stats := range bufferSizeStats {
  157. metrics = append(metrics, &cloudwatch.MetricDatum{
  158. Dimensions: []*cloudwatch.Dimension{dimension, bufferDimension(t)},
  159. MetricName: aws.String("BufferSize"),
  160. Unit: aws.String("Bytes"),
  161. StatisticValues: &cloudwatch.StatisticSet{
  162. SampleCount: aws.Float64(float64(stats.count)),
  163. Sum: aws.Float64(float64(stats.sum)),
  164. Minimum: aws.Float64(float64(stats.min)),
  165. Maximum: aws.Float64(float64(stats.max)),
  166. },
  167. })
  168. }
  169. }()
  170. metrics = append(metrics, &cloudwatch.MetricDatum{
  171. Dimensions: []*cloudwatch.Dimension{dimension},
  172. MetricName: aws.String("RequestsInProgress"),
  173. Unit: aws.String("Count"),
  174. Value: aws.Float64(stats.RequestsInProgress()),
  175. })
  176. metrics = append(metrics, &cloudwatch.MetricDatum{
  177. Dimensions: []*cloudwatch.Dimension{dimension},
  178. MetricName: aws.String("ImagesInProgress"),
  179. Unit: aws.String("Count"),
  180. Value: aws.Float64(stats.ImagesInProgress()),
  181. })
  182. metrics = append(metrics, &cloudwatch.MetricDatum{
  183. Dimensions: []*cloudwatch.Dimension{dimension},
  184. MetricName: aws.String("ConcurrencyUtilization"),
  185. Unit: aws.String("Percent"),
  186. Value: aws.Float64(
  187. stats.RequestsInProgress() / float64(config.Workers) * 100.0,
  188. ),
  189. })
  190. metrics = append(metrics, &cloudwatch.MetricDatum{
  191. Dimensions: []*cloudwatch.Dimension{dimension},
  192. MetricName: aws.String("WorkersUtilization"),
  193. Unit: aws.String("Percent"),
  194. Value: aws.Float64(
  195. stats.RequestsInProgress() / float64(config.Workers) * 100.0,
  196. ),
  197. })
  198. _, err := client.PutMetricData(&cloudwatch.PutMetricDataInput{
  199. Namespace: aws.String(config.CloudWatchNamespace),
  200. MetricData: metrics,
  201. })
  202. if err != nil {
  203. logrus.Warnf("Can't send CloudWatch metrics: %s", err)
  204. }
  205. case <-collectorCtx.Done():
  206. return
  207. }
  208. }
  209. }