1
0

cloudwatch.go 6.6 KB


  1. package cloudwatch
  2. import (
  3. "context"
  4. "fmt"
  5. "slices"
  6. "sync"
  7. "time"
  8. "github.com/aws/aws-sdk-go-v2/aws"
  9. awsConfig "github.com/aws/aws-sdk-go-v2/config"
  10. "github.com/aws/aws-sdk-go-v2/service/cloudwatch"
  11. cloudwatchTypes "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
  12. "github.com/sirupsen/logrus"
  13. "github.com/imgproxy/imgproxy/v3/config"
  14. "github.com/imgproxy/imgproxy/v3/imath"
  15. "github.com/imgproxy/imgproxy/v3/metrics/stats"
  16. )
  17. type GaugeFunc func() float64
  18. type gauge struct {
  19. unit cloudwatchTypes.StandardUnit
  20. f GaugeFunc
  21. }
  22. type bufferStats struct {
  23. count int
  24. sum, min, max int
  25. }
  26. var (
  27. enabled bool
  28. client *cloudwatch.Client
  29. gauges = make(map[string]gauge)
  30. gaugesMutex sync.RWMutex
  31. collectorCtx context.Context
  32. collectorCtxCancel context.CancelFunc
  33. bufferDefaultSizes = make(map[string]int)
  34. bufferMaxSizes = make(map[string]int)
  35. bufferSizeStats = make(map[string]*bufferStats)
  36. bufferStatsMutex sync.Mutex
  37. )
  38. func Init() error {
  39. if len(config.CloudWatchServiceName) == 0 {
  40. return nil
  41. }
  42. conf, err := awsConfig.LoadDefaultConfig(context.Background())
  43. if err != nil {
  44. return fmt.Errorf("can't load CloudWatch config: %s", err)
  45. }
  46. if len(config.CloudWatchRegion) != 0 {
  47. conf.Region = config.CloudWatchRegion
  48. }
  49. if len(conf.Region) == 0 {
  50. conf.Region = "us-west-1"
  51. }
  52. client = cloudwatch.NewFromConfig(conf)
  53. collectorCtx, collectorCtxCancel = context.WithCancel(context.Background())
  54. go runMetricsCollector()
  55. enabled = true
  56. return nil
  57. }
  58. func Stop() {
  59. if enabled {
  60. collectorCtxCancel()
  61. }
  62. }
  63. func Enabled() bool {
  64. return enabled
  65. }
  66. func AddGaugeFunc(name, unit string, f GaugeFunc) {
  67. gaugesMutex.Lock()
  68. defer gaugesMutex.Unlock()
  69. standardUnit := cloudwatchTypes.StandardUnit(unit)
  70. if !slices.Contains(cloudwatchTypes.StandardUnitNone.Values(), standardUnit) {
  71. panic(fmt.Errorf("Unknown CloudWatch unit: %s", unit))
  72. }
  73. gauges[name] = gauge{unit: standardUnit, f: f}
  74. }
  75. func ObserveBufferSize(t string, size int) {
  76. if enabled {
  77. bufferStatsMutex.Lock()
  78. defer bufferStatsMutex.Unlock()
  79. sizef := size
  80. stats, ok := bufferSizeStats[t]
  81. if !ok {
  82. stats = &bufferStats{count: 1, sum: sizef, min: sizef, max: sizef}
  83. bufferSizeStats[t] = stats
  84. return
  85. }
  86. stats.count += 1
  87. stats.sum += sizef
  88. stats.min = imath.Min(stats.min, sizef)
  89. stats.max = imath.Max(stats.max, sizef)
  90. }
  91. }
  92. func SetBufferDefaultSize(t string, size int) {
  93. if enabled {
  94. bufferStatsMutex.Lock()
  95. defer bufferStatsMutex.Unlock()
  96. bufferDefaultSizes[t] = size
  97. }
  98. }
  99. func SetBufferMaxSize(t string, size int) {
  100. if enabled {
  101. bufferStatsMutex.Lock()
  102. defer bufferStatsMutex.Unlock()
  103. bufferMaxSizes[t] = size
  104. }
  105. }
  106. func runMetricsCollector() {
  107. tick := time.NewTicker(10 * time.Second)
  108. defer tick.Stop()
  109. dimension := cloudwatchTypes.Dimension{
  110. Name: aws.String("ServiceName"),
  111. Value: aws.String(config.CloudWatchServiceName),
  112. }
  113. bufferDimensions := make(map[string]cloudwatchTypes.Dimension)
  114. bufferDimension := func(t string) cloudwatchTypes.Dimension {
  115. if d, ok := bufferDimensions[t]; ok {
  116. return d
  117. }
  118. d := cloudwatchTypes.Dimension{
  119. Name: aws.String("BufferType"),
  120. Value: aws.String(t),
  121. }
  122. bufferDimensions[t] = d
  123. return d
  124. }
  125. for {
  126. select {
  127. case <-tick.C:
  128. metricsCount := len(gauges) + len(bufferDefaultSizes) + len(bufferMaxSizes) + len(bufferSizeStats) + 3
  129. metrics := make([]cloudwatchTypes.MetricDatum, 0, metricsCount)
  130. func() {
  131. gaugesMutex.RLock()
  132. defer gaugesMutex.RUnlock()
  133. for name, g := range gauges {
  134. metrics = append(metrics, cloudwatchTypes.MetricDatum{
  135. Dimensions: []cloudwatchTypes.Dimension{dimension},
  136. MetricName: aws.String(name),
  137. Unit: g.unit,
  138. Value: aws.Float64(g.f()),
  139. })
  140. }
  141. }()
  142. func() {
  143. bufferStatsMutex.Lock()
  144. defer bufferStatsMutex.Unlock()
  145. for t, size := range bufferDefaultSizes {
  146. metrics = append(metrics, cloudwatchTypes.MetricDatum{
  147. Dimensions: []cloudwatchTypes.Dimension{dimension, bufferDimension(t)},
  148. MetricName: aws.String("BufferDefaultSize"),
  149. Unit: cloudwatchTypes.StandardUnitBytes,
  150. Value: aws.Float64(float64(size)),
  151. })
  152. }
  153. for t, size := range bufferMaxSizes {
  154. metrics = append(metrics, cloudwatchTypes.MetricDatum{
  155. Dimensions: []cloudwatchTypes.Dimension{dimension, bufferDimension(t)},
  156. MetricName: aws.String("BufferMaximumSize"),
  157. Unit: cloudwatchTypes.StandardUnitBytes,
  158. Value: aws.Float64(float64(size)),
  159. })
  160. }
  161. for t, stats := range bufferSizeStats {
  162. metrics = append(metrics, cloudwatchTypes.MetricDatum{
  163. Dimensions: []cloudwatchTypes.Dimension{dimension, bufferDimension(t)},
  164. MetricName: aws.String("BufferSize"),
  165. Unit: cloudwatchTypes.StandardUnitBytes,
  166. StatisticValues: &cloudwatchTypes.StatisticSet{
  167. SampleCount: aws.Float64(float64(stats.count)),
  168. Sum: aws.Float64(float64(stats.sum)),
  169. Minimum: aws.Float64(float64(stats.min)),
  170. Maximum: aws.Float64(float64(stats.max)),
  171. },
  172. })
  173. }
  174. }()
  175. metrics = append(metrics, cloudwatchTypes.MetricDatum{
  176. Dimensions: []cloudwatchTypes.Dimension{dimension},
  177. MetricName: aws.String("RequestsInProgress"),
  178. Unit: cloudwatchTypes.StandardUnitCount,
  179. Value: aws.Float64(stats.RequestsInProgress()),
  180. })
  181. metrics = append(metrics, cloudwatchTypes.MetricDatum{
  182. Dimensions: []cloudwatchTypes.Dimension{dimension},
  183. MetricName: aws.String("ImagesInProgress"),
  184. Unit: cloudwatchTypes.StandardUnitCount,
  185. Value: aws.Float64(stats.ImagesInProgress()),
  186. })
  187. metrics = append(metrics, cloudwatchTypes.MetricDatum{
  188. Dimensions: []cloudwatchTypes.Dimension{dimension},
  189. MetricName: aws.String("ConcurrencyUtilization"),
  190. Unit: cloudwatchTypes.StandardUnitPercent,
  191. Value: aws.Float64(
  192. stats.RequestsInProgress() / float64(config.Workers) * 100.0,
  193. ),
  194. })
  195. metrics = append(metrics, cloudwatchTypes.MetricDatum{
  196. Dimensions: []cloudwatchTypes.Dimension{dimension},
  197. MetricName: aws.String("WorkersUtilization"),
  198. Unit: cloudwatchTypes.StandardUnitPercent,
  199. Value: aws.Float64(
  200. stats.RequestsInProgress() / float64(config.Workers) * 100.0,
  201. ),
  202. })
  203. input := cloudwatch.PutMetricDataInput{
  204. Namespace: aws.String(config.CloudWatchNamespace),
  205. MetricData: metrics,
  206. }
  207. func() {
  208. ctx, cancel := context.WithTimeout(collectorCtx, 30*time.Second)
  209. defer cancel()
  210. if _, err := client.PutMetricData(ctx, &input); err != nil {
  211. logrus.Warnf("Can't send CloudWatch metrics: %s", err)
  212. }
  213. }()
  214. case <-collectorCtx.Done():
  215. return
  216. }
  217. }
  218. }