cloudwatch.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. package cloudwatch
  2. import (
  3. "context"
  4. "fmt"
  5. "log/slog"
  6. "time"
  7. "github.com/aws/aws-sdk-go-v2/aws"
  8. awsConfig "github.com/aws/aws-sdk-go-v2/config"
  9. "github.com/aws/aws-sdk-go-v2/service/cloudwatch"
  10. cloudwatchTypes "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
  11. "github.com/imgproxy/imgproxy/v3/monitoring/stats"
  12. "github.com/imgproxy/imgproxy/v3/vips"
  13. )
  14. const (
  15. // AWS CloudWatch PutMetrics timeout
  16. putMetricsTimeout = 30 * time.Second
  17. // default AWS region to set if neither aws env region nor config region are set
  18. defaultAwsRegion = "us-west-1"
  19. )
  20. // CloudWatch holds CloudWatch client and configuration
  21. type CloudWatch struct {
  22. config *Config
  23. stats *stats.Stats
  24. client *cloudwatch.Client
  25. collectorCtx context.Context
  26. collectorCtxCancel context.CancelFunc
  27. }
  28. // New creates a new CloudWatch instance
  29. func New(ctx context.Context, config *Config, stats *stats.Stats) (*CloudWatch, error) {
  30. cw := &CloudWatch{
  31. config: config,
  32. stats: stats,
  33. }
  34. if !config.Enabled() {
  35. return cw, nil
  36. }
  37. if err := config.Validate(); err != nil {
  38. return nil, err
  39. }
  40. conf, err := awsConfig.LoadDefaultConfig(ctx)
  41. if err != nil {
  42. return nil, fmt.Errorf("can't load CloudWatch config: %s", err)
  43. }
  44. if len(config.Region) > 0 {
  45. conf.Region = config.Region
  46. }
  47. if len(conf.Region) == 0 {
  48. conf.Region = defaultAwsRegion
  49. }
  50. cw.client = cloudwatch.NewFromConfig(conf)
  51. cw.collectorCtx, cw.collectorCtxCancel = context.WithCancel(ctx)
  52. go cw.runMetricsCollector()
  53. return cw, nil
  54. }
  55. // Enabled returns true if CloudWatch is enabled
  56. func (cw *CloudWatch) Enabled() bool {
  57. return cw.config.Enabled()
  58. }
  59. // Stop stops the CloudWatch metrics collection
  60. func (cw *CloudWatch) Stop() {
  61. if cw.collectorCtxCancel != nil {
  62. cw.collectorCtxCancel()
  63. }
  64. }
  65. // runMetricsCollector collects and sends metrics to CloudWatch
  66. func (cw *CloudWatch) runMetricsCollector() {
  67. tick := time.NewTicker(cw.config.MetricsInterval)
  68. defer tick.Stop()
  69. dimension := cloudwatchTypes.Dimension{
  70. Name: aws.String("ServiceName"),
  71. Value: aws.String(cw.config.ServiceName),
  72. }
  73. dimensions := []cloudwatchTypes.Dimension{dimension}
  74. namespace := aws.String(cw.config.Namespace)
  75. // metric names
  76. metricNameWorkers := aws.String("Workers")
  77. metricNameRequestsInProgress := aws.String("RequestsInProgress")
  78. metricNameImagesInProgress := aws.String("ImagesInProgress")
  79. metricNameConcurrencyUtilization := aws.String("ConcurrencyUtilization")
  80. metricNameWorkersUtilization := aws.String("WorkersUtilization")
  81. metricNameVipsMemory := aws.String("VipsMemory")
  82. metricNameVipsMaxMemory := aws.String("VipsMaxMemory")
  83. metricNameVipsAllocs := aws.String("VipsAllocs")
  84. for {
  85. select {
  86. case <-tick.C:
  87. // 8 is the number of metrics we send
  88. metrics := make([]cloudwatchTypes.MetricDatum, 0, 8)
  89. metrics = append(metrics, cloudwatchTypes.MetricDatum{
  90. Dimensions: dimensions,
  91. MetricName: metricNameWorkers,
  92. Unit: cloudwatchTypes.StandardUnitCount,
  93. Value: aws.Float64(float64(cw.stats.WorkersNumber)),
  94. })
  95. metrics = append(metrics, cloudwatchTypes.MetricDatum{
  96. Dimensions: dimensions,
  97. MetricName: metricNameRequestsInProgress,
  98. Unit: cloudwatchTypes.StandardUnitCount,
  99. Value: aws.Float64(cw.stats.RequestsInProgress()),
  100. })
  101. metrics = append(metrics, cloudwatchTypes.MetricDatum{
  102. Dimensions: dimensions,
  103. MetricName: metricNameImagesInProgress,
  104. Unit: cloudwatchTypes.StandardUnitCount,
  105. Value: aws.Float64(cw.stats.ImagesInProgress()),
  106. })
  107. metrics = append(metrics, cloudwatchTypes.MetricDatum{
  108. Dimensions: dimensions,
  109. MetricName: metricNameConcurrencyUtilization,
  110. Unit: cloudwatchTypes.StandardUnitPercent,
  111. Value: aws.Float64(
  112. cw.stats.WorkersUtilization(),
  113. ),
  114. })
  115. metrics = append(metrics, cloudwatchTypes.MetricDatum{
  116. Dimensions: dimensions,
  117. MetricName: metricNameWorkersUtilization,
  118. Unit: cloudwatchTypes.StandardUnitPercent,
  119. Value: aws.Float64(
  120. cw.stats.WorkersUtilization(),
  121. ),
  122. })
  123. metrics = append(metrics, cloudwatchTypes.MetricDatum{
  124. Dimensions: dimensions,
  125. MetricName: metricNameVipsMemory,
  126. Unit: cloudwatchTypes.StandardUnitBytes,
  127. Value: aws.Float64(vips.GetMem()),
  128. })
  129. metrics = append(metrics, cloudwatchTypes.MetricDatum{
  130. Dimensions: dimensions,
  131. MetricName: metricNameVipsMaxMemory,
  132. Unit: cloudwatchTypes.StandardUnitBytes,
  133. Value: aws.Float64(vips.GetMemHighwater()),
  134. })
  135. metrics = append(metrics, cloudwatchTypes.MetricDatum{
  136. Dimensions: dimensions,
  137. MetricName: metricNameVipsAllocs,
  138. Unit: cloudwatchTypes.StandardUnitCount,
  139. Value: aws.Float64(vips.GetAllocs()),
  140. })
  141. input := cloudwatch.PutMetricDataInput{
  142. Namespace: namespace,
  143. MetricData: metrics,
  144. }
  145. func() {
  146. ctx, cancel := context.WithTimeout(cw.collectorCtx, putMetricsTimeout)
  147. defer cancel()
  148. if _, err := cw.client.PutMetricData(ctx, &input); err != nil {
  149. slog.Warn(fmt.Sprintf("can't send CloudWatch metrics: %s", err))
  150. }
  151. }()
  152. case <-cw.collectorCtx.Done():
  153. return
  154. }
  155. }
  156. }