cloudwatch.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. package cloudwatch
  2. import (
  3. "context"
  4. "fmt"
  5. "slices"
  6. "sync"
  7. "time"
  8. "github.com/aws/aws-sdk-go-v2/aws"
  9. awsConfig "github.com/aws/aws-sdk-go-v2/config"
  10. "github.com/aws/aws-sdk-go-v2/service/cloudwatch"
  11. cloudwatchTypes "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
  12. "github.com/sirupsen/logrus"
  13. "github.com/imgproxy/imgproxy/v3/config"
  14. "github.com/imgproxy/imgproxy/v3/metrics/stats"
  15. )
  16. type GaugeFunc func() float64
  17. type gauge struct {
  18. unit cloudwatchTypes.StandardUnit
  19. f GaugeFunc
  20. }
  21. type bufferStats struct {
  22. count int
  23. sum, min, max int
  24. }
  25. var (
  26. enabled bool
  27. client *cloudwatch.Client
  28. gauges = make(map[string]gauge)
  29. gaugesMutex sync.RWMutex
  30. collectorCtx context.Context
  31. collectorCtxCancel context.CancelFunc
  32. bufferDefaultSizes = make(map[string]int)
  33. bufferMaxSizes = make(map[string]int)
  34. bufferSizeStats = make(map[string]*bufferStats)
  35. bufferStatsMutex sync.Mutex
  36. )
  37. func Init() error {
  38. if len(config.CloudWatchServiceName) == 0 {
  39. return nil
  40. }
  41. conf, err := awsConfig.LoadDefaultConfig(context.Background())
  42. if err != nil {
  43. return fmt.Errorf("can't load CloudWatch config: %s", err)
  44. }
  45. if len(config.CloudWatchRegion) != 0 {
  46. conf.Region = config.CloudWatchRegion
  47. }
  48. if len(conf.Region) == 0 {
  49. conf.Region = "us-west-1"
  50. }
  51. client = cloudwatch.NewFromConfig(conf)
  52. collectorCtx, collectorCtxCancel = context.WithCancel(context.Background())
  53. go runMetricsCollector()
  54. enabled = true
  55. return nil
  56. }
  57. func Stop() {
  58. if enabled {
  59. collectorCtxCancel()
  60. }
  61. }
  62. func Enabled() bool {
  63. return enabled
  64. }
  65. func AddGaugeFunc(name, unit string, f GaugeFunc) {
  66. gaugesMutex.Lock()
  67. defer gaugesMutex.Unlock()
  68. standardUnit := cloudwatchTypes.StandardUnit(unit)
  69. if !slices.Contains(cloudwatchTypes.StandardUnitNone.Values(), standardUnit) {
  70. panic(fmt.Errorf("Unknown CloudWatch unit: %s", unit))
  71. }
  72. gauges[name] = gauge{unit: standardUnit, f: f}
  73. }
  74. func ObserveBufferSize(t string, size int) {
  75. if enabled {
  76. bufferStatsMutex.Lock()
  77. defer bufferStatsMutex.Unlock()
  78. sizef := size
  79. stats, ok := bufferSizeStats[t]
  80. if !ok {
  81. stats = &bufferStats{count: 1, sum: sizef, min: sizef, max: sizef}
  82. bufferSizeStats[t] = stats
  83. return
  84. }
  85. stats.count += 1
  86. stats.sum += sizef
  87. stats.min = min(stats.min, sizef)
  88. stats.max = max(stats.max, sizef)
  89. }
  90. }
  91. func SetBufferDefaultSize(t string, size int) {
  92. if enabled {
  93. bufferStatsMutex.Lock()
  94. defer bufferStatsMutex.Unlock()
  95. bufferDefaultSizes[t] = size
  96. }
  97. }
  98. func SetBufferMaxSize(t string, size int) {
  99. if enabled {
  100. bufferStatsMutex.Lock()
  101. defer bufferStatsMutex.Unlock()
  102. bufferMaxSizes[t] = size
  103. }
  104. }
  105. func runMetricsCollector() {
  106. tick := time.NewTicker(10 * time.Second)
  107. defer tick.Stop()
  108. dimension := cloudwatchTypes.Dimension{
  109. Name: aws.String("ServiceName"),
  110. Value: aws.String(config.CloudWatchServiceName),
  111. }
  112. bufferDimensions := make(map[string]cloudwatchTypes.Dimension)
  113. bufferDimension := func(t string) cloudwatchTypes.Dimension {
  114. if d, ok := bufferDimensions[t]; ok {
  115. return d
  116. }
  117. d := cloudwatchTypes.Dimension{
  118. Name: aws.String("BufferType"),
  119. Value: aws.String(t),
  120. }
  121. bufferDimensions[t] = d
  122. return d
  123. }
  124. for {
  125. select {
  126. case <-tick.C:
  127. metricsCount := len(gauges) + len(bufferDefaultSizes) + len(bufferMaxSizes) + len(bufferSizeStats) + 3
  128. metrics := make([]cloudwatchTypes.MetricDatum, 0, metricsCount)
  129. func() {
  130. gaugesMutex.RLock()
  131. defer gaugesMutex.RUnlock()
  132. for name, g := range gauges {
  133. metrics = append(metrics, cloudwatchTypes.MetricDatum{
  134. Dimensions: []cloudwatchTypes.Dimension{dimension},
  135. MetricName: aws.String(name),
  136. Unit: g.unit,
  137. Value: aws.Float64(g.f()),
  138. })
  139. }
  140. }()
  141. func() {
  142. bufferStatsMutex.Lock()
  143. defer bufferStatsMutex.Unlock()
  144. for t, size := range bufferDefaultSizes {
  145. metrics = append(metrics, cloudwatchTypes.MetricDatum{
  146. Dimensions: []cloudwatchTypes.Dimension{dimension, bufferDimension(t)},
  147. MetricName: aws.String("BufferDefaultSize"),
  148. Unit: cloudwatchTypes.StandardUnitBytes,
  149. Value: aws.Float64(float64(size)),
  150. })
  151. }
  152. for t, size := range bufferMaxSizes {
  153. metrics = append(metrics, cloudwatchTypes.MetricDatum{
  154. Dimensions: []cloudwatchTypes.Dimension{dimension, bufferDimension(t)},
  155. MetricName: aws.String("BufferMaximumSize"),
  156. Unit: cloudwatchTypes.StandardUnitBytes,
  157. Value: aws.Float64(float64(size)),
  158. })
  159. }
  160. for t, stats := range bufferSizeStats {
  161. metrics = append(metrics, cloudwatchTypes.MetricDatum{
  162. Dimensions: []cloudwatchTypes.Dimension{dimension, bufferDimension(t)},
  163. MetricName: aws.String("BufferSize"),
  164. Unit: cloudwatchTypes.StandardUnitBytes,
  165. StatisticValues: &cloudwatchTypes.StatisticSet{
  166. SampleCount: aws.Float64(float64(stats.count)),
  167. Sum: aws.Float64(float64(stats.sum)),
  168. Minimum: aws.Float64(float64(stats.min)),
  169. Maximum: aws.Float64(float64(stats.max)),
  170. },
  171. })
  172. }
  173. }()
  174. metrics = append(metrics, cloudwatchTypes.MetricDatum{
  175. Dimensions: []cloudwatchTypes.Dimension{dimension},
  176. MetricName: aws.String("Workers"),
  177. Unit: cloudwatchTypes.StandardUnitCount,
  178. Value: aws.Float64(float64(config.Workers)),
  179. })
  180. metrics = append(metrics, cloudwatchTypes.MetricDatum{
  181. Dimensions: []cloudwatchTypes.Dimension{dimension},
  182. MetricName: aws.String("RequestsInProgress"),
  183. Unit: cloudwatchTypes.StandardUnitCount,
  184. Value: aws.Float64(stats.RequestsInProgress()),
  185. })
  186. metrics = append(metrics, cloudwatchTypes.MetricDatum{
  187. Dimensions: []cloudwatchTypes.Dimension{dimension},
  188. MetricName: aws.String("ImagesInProgress"),
  189. Unit: cloudwatchTypes.StandardUnitCount,
  190. Value: aws.Float64(stats.ImagesInProgress()),
  191. })
  192. metrics = append(metrics, cloudwatchTypes.MetricDatum{
  193. Dimensions: []cloudwatchTypes.Dimension{dimension},
  194. MetricName: aws.String("ConcurrencyUtilization"),
  195. Unit: cloudwatchTypes.StandardUnitPercent,
  196. Value: aws.Float64(
  197. stats.WorkersUtilization(),
  198. ),
  199. })
  200. metrics = append(metrics, cloudwatchTypes.MetricDatum{
  201. Dimensions: []cloudwatchTypes.Dimension{dimension},
  202. MetricName: aws.String("WorkersUtilization"),
  203. Unit: cloudwatchTypes.StandardUnitPercent,
  204. Value: aws.Float64(
  205. stats.WorkersUtilization(),
  206. ),
  207. })
  208. input := cloudwatch.PutMetricDataInput{
  209. Namespace: aws.String(config.CloudWatchNamespace),
  210. MetricData: metrics,
  211. }
  212. func() {
  213. ctx, cancel := context.WithTimeout(collectorCtx, 30*time.Second)
  214. defer cancel()
  215. if _, err := client.PutMetricData(ctx, &input); err != nil {
  216. logrus.Warnf("Can't send CloudWatch metrics: %s", err)
  217. }
  218. }()
  219. case <-collectorCtx.Done():
  220. return
  221. }
  222. }
  223. }