123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- package cloudwatch
- import (
- "context"
- "fmt"
- "log/slog"
- "time"
- "github.com/aws/aws-sdk-go-v2/aws"
- awsConfig "github.com/aws/aws-sdk-go-v2/config"
- "github.com/aws/aws-sdk-go-v2/service/cloudwatch"
- cloudwatchTypes "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
- "github.com/imgproxy/imgproxy/v3/monitoring/stats"
- "github.com/imgproxy/imgproxy/v3/vips"
- )
const (
	// defaultAwsRegion is the region used when neither the AWS environment
	// region nor the configured region is set.
	defaultAwsRegion = "us-west-1"

	// putMetricsTimeout bounds a single CloudWatch PutMetricData call.
	putMetricsTimeout time.Duration = 30 * time.Second
)
- // CloudWatch holds CloudWatch client and configuration
- type CloudWatch struct {
- config *Config
- stats *stats.Stats
- client *cloudwatch.Client
- collectorCtx context.Context
- collectorCtxCancel context.CancelFunc
- }
- // New creates a new CloudWatch instance
- func New(ctx context.Context, config *Config, stats *stats.Stats) (*CloudWatch, error) {
- cw := &CloudWatch{
- config: config,
- stats: stats,
- }
- if !config.Enabled() {
- return cw, nil
- }
- if err := config.Validate(); err != nil {
- return nil, err
- }
- conf, err := awsConfig.LoadDefaultConfig(ctx)
- if err != nil {
- return nil, fmt.Errorf("can't load CloudWatch config: %s", err)
- }
- if len(config.Region) > 0 {
- conf.Region = config.Region
- }
- if len(conf.Region) == 0 {
- conf.Region = defaultAwsRegion
- }
- cw.client = cloudwatch.NewFromConfig(conf)
- cw.collectorCtx, cw.collectorCtxCancel = context.WithCancel(ctx)
- go cw.runMetricsCollector()
- return cw, nil
- }
- // Enabled returns true if CloudWatch is enabled
- func (cw *CloudWatch) Enabled() bool {
- return cw.config.Enabled()
- }
- // Stop stops the CloudWatch metrics collection
- func (cw *CloudWatch) Stop() {
- if cw.collectorCtxCancel != nil {
- cw.collectorCtxCancel()
- }
- }
- // runMetricsCollector collects and sends metrics to CloudWatch
- func (cw *CloudWatch) runMetricsCollector() {
- tick := time.NewTicker(cw.config.MetricsInterval)
- defer tick.Stop()
- dimension := cloudwatchTypes.Dimension{
- Name: aws.String("ServiceName"),
- Value: aws.String(cw.config.ServiceName),
- }
- dimensions := []cloudwatchTypes.Dimension{dimension}
- namespace := aws.String(cw.config.Namespace)
- // metric names
- metricNameWorkers := aws.String("Workers")
- metricNameRequestsInProgress := aws.String("RequestsInProgress")
- metricNameImagesInProgress := aws.String("ImagesInProgress")
- metricNameConcurrencyUtilization := aws.String("ConcurrencyUtilization")
- metricNameWorkersUtilization := aws.String("WorkersUtilization")
- metricNameVipsMemory := aws.String("VipsMemory")
- metricNameVipsMaxMemory := aws.String("VipsMaxMemory")
- metricNameVipsAllocs := aws.String("VipsAllocs")
- for {
- select {
- case <-tick.C:
- // 8 is the number of metrics we send
- metrics := make([]cloudwatchTypes.MetricDatum, 0, 8)
- metrics = append(metrics, cloudwatchTypes.MetricDatum{
- Dimensions: dimensions,
- MetricName: metricNameWorkers,
- Unit: cloudwatchTypes.StandardUnitCount,
- Value: aws.Float64(float64(cw.stats.WorkersNumber)),
- })
- metrics = append(metrics, cloudwatchTypes.MetricDatum{
- Dimensions: dimensions,
- MetricName: metricNameRequestsInProgress,
- Unit: cloudwatchTypes.StandardUnitCount,
- Value: aws.Float64(cw.stats.RequestsInProgress()),
- })
- metrics = append(metrics, cloudwatchTypes.MetricDatum{
- Dimensions: dimensions,
- MetricName: metricNameImagesInProgress,
- Unit: cloudwatchTypes.StandardUnitCount,
- Value: aws.Float64(cw.stats.ImagesInProgress()),
- })
- metrics = append(metrics, cloudwatchTypes.MetricDatum{
- Dimensions: dimensions,
- MetricName: metricNameConcurrencyUtilization,
- Unit: cloudwatchTypes.StandardUnitPercent,
- Value: aws.Float64(
- cw.stats.WorkersUtilization(),
- ),
- })
- metrics = append(metrics, cloudwatchTypes.MetricDatum{
- Dimensions: dimensions,
- MetricName: metricNameWorkersUtilization,
- Unit: cloudwatchTypes.StandardUnitPercent,
- Value: aws.Float64(
- cw.stats.WorkersUtilization(),
- ),
- })
- metrics = append(metrics, cloudwatchTypes.MetricDatum{
- Dimensions: dimensions,
- MetricName: metricNameVipsMemory,
- Unit: cloudwatchTypes.StandardUnitBytes,
- Value: aws.Float64(vips.GetMem()),
- })
- metrics = append(metrics, cloudwatchTypes.MetricDatum{
- Dimensions: dimensions,
- MetricName: metricNameVipsMaxMemory,
- Unit: cloudwatchTypes.StandardUnitBytes,
- Value: aws.Float64(vips.GetMemHighwater()),
- })
- metrics = append(metrics, cloudwatchTypes.MetricDatum{
- Dimensions: dimensions,
- MetricName: metricNameVipsAllocs,
- Unit: cloudwatchTypes.StandardUnitCount,
- Value: aws.Float64(vips.GetAllocs()),
- })
- input := cloudwatch.PutMetricDataInput{
- Namespace: namespace,
- MetricData: metrics,
- }
- func() {
- ctx, cancel := context.WithTimeout(cw.collectorCtx, putMetricsTimeout)
- defer cancel()
- if _, err := cw.client.PutMetricData(ctx, &input); err != nil {
- slog.Warn(fmt.Sprintf("can't send CloudWatch metrics: %s", err))
- }
- }()
- case <-cw.collectorCtx.Done():
- return
- }
- }
- }
|