|
@@ -2,18 +2,12 @@ package otel
|
|
|
|
|
|
import (
|
|
|
"context"
|
|
|
- "crypto/tls"
|
|
|
- "crypto/x509"
|
|
|
- "errors"
|
|
|
"fmt"
|
|
|
"log/slog"
|
|
|
"net/http"
|
|
|
"os"
|
|
|
"reflect"
|
|
|
"runtime"
|
|
|
- "strconv"
|
|
|
- "strings"
|
|
|
- "sync"
|
|
|
"time"
|
|
|
|
|
|
"github.com/felixge/httpsnoop"
|
|
@@ -26,11 +20,6 @@ import (
|
|
|
"go.opentelemetry.io/otel"
|
|
|
"go.opentelemetry.io/otel/attribute"
|
|
|
"go.opentelemetry.io/otel/codes"
|
|
|
- "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
|
|
|
- "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
|
|
|
- "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
|
|
|
- "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
|
|
|
- "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
|
|
|
"go.opentelemetry.io/otel/metric"
|
|
|
"go.opentelemetry.io/otel/propagation"
|
|
|
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
|
|
@@ -39,10 +28,7 @@ import (
|
|
|
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
|
|
|
"go.opentelemetry.io/otel/semconv/v1.20.0/httpconv"
|
|
|
"go.opentelemetry.io/otel/trace"
|
|
|
- "google.golang.org/grpc/credentials"
|
|
|
|
|
|
- "github.com/imgproxy/imgproxy/v3/config"
|
|
|
- "github.com/imgproxy/imgproxy/v3/config/configurators"
|
|
|
"github.com/imgproxy/imgproxy/v3/ierrors"
|
|
|
"github.com/imgproxy/imgproxy/v3/monitoring/errformat"
|
|
|
"github.com/imgproxy/imgproxy/v3/monitoring/stats"
|
|
@@ -50,11 +36,28 @@ import (
|
|
|
"github.com/imgproxy/imgproxy/v3/vips"
|
|
|
)
|
|
|
|
|
|
+const (
|
|
|
+ // stopTimeout is the maximum time to wait for the shutdown of the tracer and meter providers
|
|
|
+ stopTimeout = 5 * time.Second
|
|
|
+
|
|
|
+ // defaultOtelServiceName is the default service name for OpenTelemetry if none is set
|
|
|
+ defaultOtelServiceName = "imgproxy"
|
|
|
+)
|
|
|
+
|
|
|
+// hasSpanCtxKey is a context key to mark that there is a span in the context
|
|
|
type hasSpanCtxKey struct{}
|
|
|
|
|
|
-var (
|
|
|
- enabled bool
|
|
|
- enabledMetrics bool
|
|
|
+// errorHandler is an implementation of the OpenTelemetry error handler interface
|
|
|
+type errorHandler struct{}
|
|
|
+
|
|
|
+func (h errorHandler) Handle(err error) {
|
|
|
+ slog.Warn(err.Error(), "source", "opentelemetry")
|
|
|
+}
|
|
|
+
|
|
|
+// Otel holds OpenTelemetry tracer and meter providers and configuration
|
|
|
+type Otel struct {
|
|
|
+ config *Config
|
|
|
+ stats *stats.Stats
|
|
|
|
|
|
tracerProvider *sdktrace.TracerProvider
|
|
|
tracer trace.Tracer
|
|
@@ -63,52 +66,39 @@ var (
|
|
|
meter metric.Meter
|
|
|
|
|
|
propagator propagation.TextMapPropagator
|
|
|
+}
|
|
|
|
|
|
- bufferSizeHist metric.Int64Histogram
|
|
|
- bufferDefaultSizes = make(map[string]int)
|
|
|
- bufferMaxSizes = make(map[string]int)
|
|
|
- bufferStatsMutex sync.Mutex
|
|
|
-)
|
|
|
+// New creates a new Otel instance
|
|
|
+func New(config *Config, stats *stats.Stats) (*Otel, error) {
|
|
|
+ o := &Otel{
|
|
|
+ config: config,
|
|
|
+ stats: stats,
|
|
|
+ }
|
|
|
|
|
|
-func Init() error {
|
|
|
- mapDeprecatedConfig()
|
|
|
+ if !config.Enabled() {
|
|
|
+ return o, nil
|
|
|
+ }
|
|
|
|
|
|
- if !config.OpenTelemetryEnable {
|
|
|
- return nil
|
|
|
+ if err := config.Validate(); err != nil {
|
|
|
+ return nil, err
|
|
|
}
|
|
|
|
|
|
otel.SetErrorHandler(errorHandler{})
|
|
|
|
|
|
- var (
|
|
|
- traceExporter *otlptrace.Exporter
|
|
|
- metricExporter sdkmetric.Exporter
|
|
|
- err error
|
|
|
- )
|
|
|
-
|
|
|
- protocol := "grpc"
|
|
|
- configurators.String(&protocol, "OTEL_EXPORTER_OTLP_PROTOCOL")
|
|
|
-
|
|
|
- switch protocol {
|
|
|
- case "grpc":
|
|
|
- traceExporter, metricExporter, err = buildGRPCExporters()
|
|
|
- case "http/protobuf", "http", "https":
|
|
|
- traceExporter, metricExporter, err = buildHTTPExporters()
|
|
|
- default:
|
|
|
- return fmt.Errorf("Unsupported OpenTelemetry protocol: %s", protocol)
|
|
|
- }
|
|
|
-
|
|
|
+ traceExporter, metricExporter, err := buildProtocolExporter(config)
|
|
|
if err != nil {
|
|
|
- return err
|
|
|
+ return nil, err
|
|
|
}
|
|
|
|
|
|
- if len(os.Getenv("OTEL_SERVICE_NAME")) == 0 {
|
|
|
- os.Setenv("OTEL_SERVICE_NAME", "imgproxy")
|
|
|
+ // If no service name is set, use "imgproxy" as default, and write it into the environment
|
|
|
+ if n, _ := OTEL_SERVICE_NAME.Get(); len(n) == 0 {
|
|
|
+ os.Setenv(OTEL_SERVICE_NAME.Name, defaultOtelServiceName)
|
|
|
}
|
|
|
|
|
|
res, _ := resource.Merge(
|
|
|
resource.Default(),
|
|
|
resource.NewSchemaless(
|
|
|
- semconv.ServiceVersionKey.String(version.Version),
|
|
|
+ semconv.ServiceVersion(version.Version),
|
|
|
),
|
|
|
)
|
|
|
|
|
@@ -122,7 +112,7 @@ func Init() error {
|
|
|
if merged, merr := resource.Merge(awsRes, res); merr == nil {
|
|
|
res = merged
|
|
|
} else {
|
|
|
- slog.Warn(fmt.Sprintf("Can't add AWS attributes to OpenTelemetry: %s", merr))
|
|
|
+ slog.Warn(fmt.Sprintf("can't add AWS attributes to OpenTelemetry: %s", merr))
|
|
|
}
|
|
|
|
|
|
opts := []sdktrace.TracerProviderOption{
|
|
@@ -130,271 +120,76 @@ func Init() error {
|
|
|
sdktrace.WithBatcher(traceExporter),
|
|
|
}
|
|
|
|
|
|
- switch g := config.OpenTelemetryTraceIDGenerator; g {
|
|
|
+ switch g := config.TraceIDGenerator; g {
|
|
|
case "xray":
|
|
|
idg := xray.NewIDGenerator()
|
|
|
opts = append(opts, sdktrace.WithIDGenerator(idg))
|
|
|
case "random":
|
|
|
// Do nothing. OTel uses random generator by default
|
|
|
default:
|
|
|
- return fmt.Errorf("Unknown Trace ID generator: %s", g)
|
|
|
+ return nil, fmt.Errorf("unknown Trace ID generator: %s", g)
|
|
|
}
|
|
|
|
|
|
- tracerProvider = sdktrace.NewTracerProvider(opts...)
|
|
|
-
|
|
|
- tracer = tracerProvider.Tracer("imgproxy")
|
|
|
-
|
|
|
- var propagatorNames []string
|
|
|
- configurators.StringSlice(&propagatorNames, "OTEL_PROPAGATORS")
|
|
|
+ o.tracerProvider = sdktrace.NewTracerProvider(opts...)
|
|
|
+ o.tracer = o.tracerProvider.Tracer("imgproxy")
|
|
|
|
|
|
- if len(propagatorNames) > 0 {
|
|
|
- propagator, err = autoprop.TextMapPropagator(propagatorNames...)
|
|
|
+ if len(config.Propagators) > 0 {
|
|
|
+ o.propagator, err = autoprop.TextMapPropagator(config.Propagators...)
|
|
|
if err != nil {
|
|
|
- return err
|
|
|
+ return nil, err
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- enabled = true
|
|
|
-
|
|
|
if metricExporter == nil {
|
|
|
- return nil
|
|
|
+ return o, nil
|
|
|
}
|
|
|
|
|
|
metricReader := sdkmetric.NewPeriodicReader(
|
|
|
metricExporter,
|
|
|
- sdkmetric.WithInterval(5*time.Second),
|
|
|
+ sdkmetric.WithInterval(config.MetricsInterval),
|
|
|
)
|
|
|
|
|
|
- meterProvider = sdkmetric.NewMeterProvider(
|
|
|
+ o.meterProvider = sdkmetric.NewMeterProvider(
|
|
|
sdkmetric.WithResource(res),
|
|
|
sdkmetric.WithReader(metricReader),
|
|
|
)
|
|
|
|
|
|
- meter = meterProvider.Meter("imgproxy")
|
|
|
-
|
|
|
- if err = addDefaultMetrics(); err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
-
|
|
|
- enabledMetrics = true
|
|
|
-
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
-func mapDeprecatedConfig() {
|
|
|
- endpoint := os.Getenv("IMGPROXY_OPEN_TELEMETRY_ENDPOINT")
|
|
|
- if len(endpoint) > 0 {
|
|
|
- slog.Warn("The IMGPROXY_OPEN_TELEMETRY_ENDPOINT config is deprecated. Use IMGPROXY_OPEN_TELEMETRY_ENABLE and OTEL_EXPORTER_OTLP_ENDPOINT instead. See https://docs.imgproxy.net/latest/monitoring/open_telemetry#deprecated-environment-variables")
|
|
|
- config.OpenTelemetryEnable = true
|
|
|
- }
|
|
|
-
|
|
|
- if !config.OpenTelemetryEnable {
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- protocol := "grpc"
|
|
|
-
|
|
|
- if prot := os.Getenv("IMGPROXY_OPEN_TELEMETRY_PROTOCOL"); len(prot) > 0 {
|
|
|
- slog.Warn("The IMGPROXY_OPEN_TELEMETRY_PROTOCOL config is deprecated. Use OTEL_EXPORTER_OTLP_PROTOCOL instead. See https://docs.imgproxy.net/latest/monitoring/open_telemetry#deprecated-environment-variables")
|
|
|
- protocol = prot
|
|
|
- os.Setenv("OTEL_EXPORTER_OTLP_PROTOCOL", protocol)
|
|
|
- }
|
|
|
-
|
|
|
- if len(endpoint) > 0 {
|
|
|
- schema := "https"
|
|
|
-
|
|
|
- switch protocol {
|
|
|
- case "grpc":
|
|
|
- if insecure, _ := strconv.ParseBool(os.Getenv("IMGPROXY_OPEN_TELEMETRY_GRPC_INSECURE")); insecure {
|
|
|
- slog.Warn("The IMGPROXY_OPEN_TELEMETRY_GRPC_INSECURE config is deprecated. Use OTEL_EXPORTER_OTLP_ENDPOINT with the `http://` schema instead. See https://docs.imgproxy.net/latest/monitoring/open_telemetry#deprecated-environment-variables")
|
|
|
- schema = "http"
|
|
|
- }
|
|
|
- case "http":
|
|
|
- schema = "http"
|
|
|
- }
|
|
|
-
|
|
|
- os.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", fmt.Sprintf("%s://%s", schema, endpoint))
|
|
|
- }
|
|
|
-
|
|
|
- if serviceName := os.Getenv("IMGPROXY_OPEN_TELEMETRY_SERVICE_NAME"); len(serviceName) > 0 {
|
|
|
- slog.Warn("The IMGPROXY_OPEN_TELEMETRY_SERVICE_NAME config is deprecated. Use OTEL_SERVICE_NAME instead. See https://docs.imgproxy.net/latest/monitoring/open_telemetry#deprecated-environment-variables")
|
|
|
- os.Setenv("OTEL_SERVICE_NAME", serviceName)
|
|
|
- }
|
|
|
-
|
|
|
- if propagators := os.Getenv("IMGPROXY_OPEN_TELEMETRY_PROPAGATORS"); len(propagators) > 0 {
|
|
|
- slog.Warn("The IMGPROXY_OPEN_TELEMETRY_PROPAGATORS config is deprecated. Use OTEL_PROPAGATORS instead. See https://docs.imgproxy.net/latest/monitoring/open_telemetry#deprecated-environment-variables")
|
|
|
- os.Setenv("OTEL_PROPAGATORS", propagators)
|
|
|
- }
|
|
|
-
|
|
|
- if timeout := os.Getenv("IMGPROXY_OPEN_TELEMETRY_CONNECTION_TIMEOUT"); len(timeout) > 0 {
|
|
|
- slog.Warn("The IMGPROXY_OPEN_TELEMETRY_CONNECTION_TIMEOUT config is deprecated. Use OTEL_EXPORTER_OTLP_TIMEOUT instead. See https://docs.imgproxy.net/latest/monitoring/open_telemetry#deprecated-environment-variables")
|
|
|
-
|
|
|
- if to, _ := strconv.Atoi(timeout); to > 0 {
|
|
|
- os.Setenv("OTEL_EXPORTER_OTLP_TIMEOUT", strconv.Itoa(to*1000))
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-func buildGRPCExporters() (*otlptrace.Exporter, sdkmetric.Exporter, error) {
|
|
|
- tracerOpts := []otlptracegrpc.Option{}
|
|
|
- meterOpts := []otlpmetricgrpc.Option{}
|
|
|
-
|
|
|
- if tlsConf, err := buildTLSConfig(); tlsConf != nil && err == nil {
|
|
|
- creds := credentials.NewTLS(tlsConf)
|
|
|
- tracerOpts = append(tracerOpts, otlptracegrpc.WithTLSCredentials(creds))
|
|
|
- meterOpts = append(meterOpts, otlpmetricgrpc.WithTLSCredentials(creds))
|
|
|
- } else if err != nil {
|
|
|
- return nil, nil, err
|
|
|
- }
|
|
|
-
|
|
|
- tracesConnTimeout, metricsConnTimeout, err := getConnectionTimeouts()
|
|
|
- if err != nil {
|
|
|
- return nil, nil, err
|
|
|
- }
|
|
|
-
|
|
|
- trctx, trcancel := context.WithTimeout(context.Background(), tracesConnTimeout)
|
|
|
- defer trcancel()
|
|
|
-
|
|
|
- traceExporter, err := otlptracegrpc.New(trctx, tracerOpts...)
|
|
|
- if err != nil {
|
|
|
- err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
|
|
|
- }
|
|
|
-
|
|
|
- if !config.OpenTelemetryEnableMetrics {
|
|
|
- return traceExporter, nil, err
|
|
|
- }
|
|
|
-
|
|
|
- mtctx, mtcancel := context.WithTimeout(context.Background(), metricsConnTimeout)
|
|
|
- defer mtcancel()
|
|
|
-
|
|
|
- metricExporter, err := otlpmetricgrpc.New(mtctx, meterOpts...)
|
|
|
- if err != nil {
|
|
|
- err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
|
|
|
- }
|
|
|
-
|
|
|
- return traceExporter, metricExporter, err
|
|
|
-}
|
|
|
-
|
|
|
-func buildHTTPExporters() (*otlptrace.Exporter, sdkmetric.Exporter, error) {
|
|
|
- tracerOpts := []otlptracehttp.Option{}
|
|
|
- meterOpts := []otlpmetrichttp.Option{}
|
|
|
-
|
|
|
- if tlsConf, err := buildTLSConfig(); tlsConf != nil && err == nil {
|
|
|
- tracerOpts = append(tracerOpts, otlptracehttp.WithTLSClientConfig(tlsConf))
|
|
|
- meterOpts = append(meterOpts, otlpmetrichttp.WithTLSClientConfig(tlsConf))
|
|
|
- } else if err != nil {
|
|
|
- return nil, nil, err
|
|
|
- }
|
|
|
-
|
|
|
- tracesConnTimeout, metricsConnTimeout, err := getConnectionTimeouts()
|
|
|
- if err != nil {
|
|
|
- return nil, nil, err
|
|
|
- }
|
|
|
-
|
|
|
- trctx, trcancel := context.WithTimeout(context.Background(), tracesConnTimeout)
|
|
|
- defer trcancel()
|
|
|
-
|
|
|
- traceExporter, err := otlptracehttp.New(trctx, tracerOpts...)
|
|
|
- if err != nil {
|
|
|
- err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
|
|
|
- }
|
|
|
-
|
|
|
- if !config.OpenTelemetryEnableMetrics {
|
|
|
- return traceExporter, nil, err
|
|
|
- }
|
|
|
-
|
|
|
- mtctx, mtcancel := context.WithTimeout(context.Background(), metricsConnTimeout)
|
|
|
- defer mtcancel()
|
|
|
-
|
|
|
- metricExporter, err := otlpmetrichttp.New(mtctx, meterOpts...)
|
|
|
- if err != nil {
|
|
|
- err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
|
|
|
- }
|
|
|
-
|
|
|
- return traceExporter, metricExporter, err
|
|
|
-}
|
|
|
-
|
|
|
-func getConnectionTimeouts() (time.Duration, time.Duration, error) {
|
|
|
- connTimeout := 10000
|
|
|
- configurators.Int(&connTimeout, "OTEL_EXPORTER_OTLP_TIMEOUT")
|
|
|
-
|
|
|
- tracesConnTimeout := connTimeout
|
|
|
- configurators.Int(&tracesConnTimeout, "OTEL_EXPORTER_OTLP_TRACES_TIMEOUT")
|
|
|
-
|
|
|
- metricsConnTimeout := connTimeout
|
|
|
- configurators.Int(&metricsConnTimeout, "OTEL_EXPORTER_OTLP_METRICS_TIMEOUT")
|
|
|
+ o.meter = o.meterProvider.Meter("imgproxy")
|
|
|
|
|
|
- if tracesConnTimeout <= 0 {
|
|
|
- return 0, 0, errors.New("Opentelemetry traces timeout should be greater than 0")
|
|
|
+ if err = o.addDefaultMetrics(); err != nil {
|
|
|
+ return nil, err
|
|
|
}
|
|
|
|
|
|
- if metricsConnTimeout <= 0 {
|
|
|
- return 0, 0, errors.New("Opentelemetry metrics timeout should be greater than 0")
|
|
|
- }
|
|
|
-
|
|
|
- return time.Duration(tracesConnTimeout) * time.Millisecond,
|
|
|
- time.Duration(metricsConnTimeout) * time.Millisecond,
|
|
|
- nil
|
|
|
-}
|
|
|
-
|
|
|
-func buildTLSConfig() (*tls.Config, error) {
|
|
|
- if len(config.OpenTelemetryServerCert) == 0 {
|
|
|
- return nil, nil
|
|
|
- }
|
|
|
-
|
|
|
- certPool := x509.NewCertPool()
|
|
|
- if !certPool.AppendCertsFromPEM(prepareKeyCert(config.OpenTelemetryServerCert)) {
|
|
|
- return nil, errors.New("Can't load OpenTelemetry server cert")
|
|
|
- }
|
|
|
-
|
|
|
- tlsConf := tls.Config{RootCAs: certPool}
|
|
|
-
|
|
|
- if len(config.OpenTelemetryClientCert) > 0 && len(config.OpenTelemetryClientKey) > 0 {
|
|
|
- cert, err := tls.X509KeyPair(
|
|
|
- prepareKeyCert(config.OpenTelemetryClientCert),
|
|
|
- prepareKeyCert(config.OpenTelemetryClientKey),
|
|
|
- )
|
|
|
- if err != nil {
|
|
|
- return nil, fmt.Errorf("Can't load OpenTelemetry client cert/key pair: %s", err)
|
|
|
- }
|
|
|
-
|
|
|
- tlsConf.Certificates = []tls.Certificate{cert}
|
|
|
- }
|
|
|
-
|
|
|
- return &tlsConf, nil
|
|
|
+ return o, nil
|
|
|
}
|
|
|
|
|
|
-func prepareKeyCert(str string) []byte {
|
|
|
- return []byte(strings.ReplaceAll(str, `\n`, "\n"))
|
|
|
+func (o *Otel) Enabled() bool {
|
|
|
+ return o.config.Enabled()
|
|
|
}
|
|
|
|
|
|
-func Stop() {
|
|
|
- if enabled {
|
|
|
- trctx, trcancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
|
+func (o *Otel) Stop(ctx context.Context) {
|
|
|
+ if o.tracerProvider != nil {
|
|
|
+ trctx, trcancel := context.WithTimeout(ctx, stopTimeout)
|
|
|
defer trcancel()
|
|
|
|
|
|
- tracerProvider.Shutdown(trctx)
|
|
|
+ o.tracerProvider.Shutdown(trctx)
|
|
|
+ }
|
|
|
|
|
|
- if meterProvider != nil {
|
|
|
- mtctx, mtcancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
|
- defer mtcancel()
|
|
|
+ if o.meterProvider != nil {
|
|
|
+ mtctx, mtcancel := context.WithTimeout(ctx, stopTimeout)
|
|
|
+ defer mtcancel()
|
|
|
|
|
|
- meterProvider.Shutdown(mtctx)
|
|
|
- }
|
|
|
+ o.meterProvider.Shutdown(mtctx)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func Enabled() bool {
|
|
|
- return enabled
|
|
|
-}
|
|
|
-
|
|
|
-func StartRootSpan(ctx context.Context, rw http.ResponseWriter, r *http.Request) (context.Context, context.CancelFunc, http.ResponseWriter) {
|
|
|
- if !enabled {
|
|
|
+func (o *Otel) StartRootSpan(ctx context.Context, rw http.ResponseWriter, r *http.Request) (context.Context, context.CancelFunc, http.ResponseWriter) {
|
|
|
+ if !o.Enabled() {
|
|
|
return ctx, func() {}, rw
|
|
|
}
|
|
|
|
|
|
- if propagator != nil {
|
|
|
- ctx = propagator.Extract(ctx, propagation.HeaderCarrier(r.Header))
|
|
|
+ if o.propagator != nil {
|
|
|
+ ctx = o.propagator.Extract(ctx, propagation.HeaderCarrier(r.Header))
|
|
|
}
|
|
|
|
|
|
server := r.Host
|
|
@@ -402,7 +197,7 @@ func StartRootSpan(ctx context.Context, rw http.ResponseWriter, r *http.Request)
|
|
|
server = "imgproxy"
|
|
|
}
|
|
|
|
|
|
- ctx, span := tracer.Start(
|
|
|
+ ctx, span := o.tracer.Start(
|
|
|
ctx, "/request",
|
|
|
trace.WithSpanKind(trace.SpanKindServer),
|
|
|
trace.WithAttributes(httpconv.ServerRequest(server, r)...),
|
|
@@ -459,8 +254,8 @@ func setMetadata(span trace.Span, key string, value interface{}) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func SetMetadata(ctx context.Context, key string, value interface{}) {
|
|
|
- if !enabled {
|
|
|
+func (o *Otel) SetMetadata(ctx context.Context, key string, value interface{}) {
|
|
|
+ if !o.Enabled() {
|
|
|
return
|
|
|
}
|
|
|
|
|
@@ -471,13 +266,13 @@ func SetMetadata(ctx context.Context, key string, value interface{}) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func StartSpan(ctx context.Context, name string, meta map[string]any) context.CancelFunc {
|
|
|
- if !enabled {
|
|
|
+func (o *Otel) StartSpan(ctx context.Context, name string, meta map[string]any) context.CancelFunc {
|
|
|
+ if !o.Enabled() {
|
|
|
return func() {}
|
|
|
}
|
|
|
|
|
|
if ctx.Value(hasSpanCtxKey{}) != nil {
|
|
|
- _, span := tracer.Start(ctx, name, trace.WithSpanKind(trace.SpanKindInternal))
|
|
|
+ _, span := o.tracer.Start(ctx, name, trace.WithSpanKind(trace.SpanKindInternal))
|
|
|
|
|
|
for k, v := range meta {
|
|
|
setMetadata(span, k, v)
|
|
@@ -489,8 +284,8 @@ func StartSpan(ctx context.Context, name string, meta map[string]any) context.Ca
|
|
|
return func() {}
|
|
|
}
|
|
|
|
|
|
-func SendError(ctx context.Context, errType string, err error) {
|
|
|
- if !enabled {
|
|
|
+func (o *Otel) SendError(ctx context.Context, errType string, err error) {
|
|
|
+ if !o.Enabled() {
|
|
|
return
|
|
|
}
|
|
|
|
|
@@ -510,195 +305,167 @@ func SendError(ctx context.Context, errType string, err error) {
|
|
|
span.AddEvent(semconv.ExceptionEventName, trace.WithAttributes(attributes...))
|
|
|
}
|
|
|
|
|
|
-func addDefaultMetrics() error {
|
|
|
+func (o *Otel) addDefaultMetrics() error {
|
|
|
proc, err := process.NewProcess(int32(os.Getpid()))
|
|
|
if err != nil {
|
|
|
- return fmt.Errorf("Can't initialize process data for OpenTelemetry: %s", err)
|
|
|
+ return fmt.Errorf("can't initialize process data for OpenTelemetry: %s", err)
|
|
|
}
|
|
|
|
|
|
- processResidentMemory, err := meter.Int64ObservableGauge(
|
|
|
+ processResidentMemory, err := o.meter.Int64ObservableGauge(
|
|
|
"process_resident_memory_bytes",
|
|
|
metric.WithUnit("By"),
|
|
|
metric.WithDescription("Resident memory size in bytes."),
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return fmt.Errorf("Can't add process_resident_memory_bytes gauge to OpenTelemetry: %s", err)
|
|
|
+ return fmt.Errorf("can't add process_resident_memory_bytes gauge to OpenTelemetry: %s", err)
|
|
|
}
|
|
|
|
|
|
- processVirtualMemory, err := meter.Int64ObservableGauge(
|
|
|
+ processVirtualMemory, err := o.meter.Int64ObservableGauge(
|
|
|
"process_virtual_memory_bytes",
|
|
|
metric.WithUnit("By"),
|
|
|
metric.WithDescription("Virtual memory size in bytes."),
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return fmt.Errorf("Can't add process_virtual_memory_bytes gauge to OpenTelemetry: %s", err)
|
|
|
+ return fmt.Errorf("can't add process_virtual_memory_bytes gauge to OpenTelemetry: %s", err)
|
|
|
}
|
|
|
|
|
|
- goMemstatsSys, err := meter.Int64ObservableGauge(
|
|
|
+ goMemstatsSys, err := o.meter.Int64ObservableGauge(
|
|
|
"go_memstats_sys_bytes",
|
|
|
metric.WithUnit("By"),
|
|
|
metric.WithDescription("Number of bytes obtained from system."),
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return fmt.Errorf("Can't add go_memstats_sys_bytes gauge to OpenTelemetry: %s", err)
|
|
|
+ return fmt.Errorf("can't add go_memstats_sys_bytes gauge to OpenTelemetry: %s", err)
|
|
|
}
|
|
|
|
|
|
- goMemstatsHeapIdle, err := meter.Int64ObservableGauge(
|
|
|
+ goMemstatsHeapIdle, err := o.meter.Int64ObservableGauge(
|
|
|
"go_memstats_heap_idle_bytes",
|
|
|
metric.WithUnit("By"),
|
|
|
metric.WithDescription("Number of heap bytes waiting to be used."),
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return fmt.Errorf("Can't add go_memstats_heap_idle_bytes gauge to OpenTelemetry: %s", err)
|
|
|
+ return fmt.Errorf("can't add go_memstats_heap_idle_bytes gauge to OpenTelemetry: %s", err)
|
|
|
}
|
|
|
|
|
|
- goMemstatsHeapInuse, err := meter.Int64ObservableGauge(
|
|
|
+ goMemstatsHeapInuse, err := o.meter.Int64ObservableGauge(
|
|
|
"go_memstats_heap_inuse_bytes",
|
|
|
metric.WithUnit("By"),
|
|
|
metric.WithDescription("Number of heap bytes that are in use."),
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return fmt.Errorf("Can't add go_memstats_heap_inuse_bytes gauge to OpenTelemetry: %s", err)
|
|
|
+ return fmt.Errorf("can't add go_memstats_heap_inuse_bytes gauge to OpenTelemetry: %s", err)
|
|
|
}
|
|
|
|
|
|
- goGoroutines, err := meter.Int64ObservableGauge(
|
|
|
+ goGoroutines, err := o.meter.Int64ObservableGauge(
|
|
|
"go_goroutines",
|
|
|
metric.WithUnit("1"),
|
|
|
metric.WithDescription("Number of goroutines that currently exist."),
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return fmt.Errorf("Can't add go_goroutines gauge to OpenTelemetry: %s", err)
|
|
|
+ return fmt.Errorf("can't add go_goroutines gauge to OpenTelemetry: %s", err)
|
|
|
}
|
|
|
|
|
|
- goThreads, err := meter.Int64ObservableGauge(
|
|
|
+ goThreads, err := o.meter.Int64ObservableGauge(
|
|
|
"go_threads",
|
|
|
metric.WithUnit("1"),
|
|
|
metric.WithDescription("Number of OS threads created."),
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return fmt.Errorf("Can't add go_threads gauge to OpenTelemetry: %s", err)
|
|
|
+ return fmt.Errorf("can't add go_threads gauge to OpenTelemetry: %s", err)
|
|
|
}
|
|
|
|
|
|
- workersGauge, err := meter.Int64ObservableGauge(
|
|
|
+ workersGauge, err := o.meter.Int64ObservableGauge(
|
|
|
"workers",
|
|
|
metric.WithUnit("1"),
|
|
|
metric.WithDescription("A gauge of the number of running workers."),
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return fmt.Errorf("Can't add workets gauge to OpenTelemetry: %s", err)
|
|
|
+ return fmt.Errorf("can't add workers gauge to OpenTelemetry: %s", err)
|
|
|
}
|
|
|
|
|
|
- requestsInProgressGauge, err := meter.Float64ObservableGauge(
|
|
|
+ requestsInProgressGauge, err := o.meter.Float64ObservableGauge(
|
|
|
"requests_in_progress",
|
|
|
metric.WithUnit("1"),
|
|
|
metric.WithDescription("A gauge of the number of requests currently being in progress."),
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return fmt.Errorf("Can't add requests_in_progress gauge to OpenTelemetry: %s", err)
|
|
|
+ return fmt.Errorf("can't add requests_in_progress gauge to OpenTelemetry: %s", err)
|
|
|
}
|
|
|
|
|
|
- imagesInProgressGauge, err := meter.Float64ObservableGauge(
|
|
|
+ imagesInProgressGauge, err := o.meter.Float64ObservableGauge(
|
|
|
"images_in_progress",
|
|
|
metric.WithUnit("1"),
|
|
|
metric.WithDescription("A gauge of the number of images currently being in progress."),
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return fmt.Errorf("Can't add images_in_progress gauge to OpenTelemetry: %s", err)
|
|
|
+ return fmt.Errorf("can't add images_in_progress gauge to OpenTelemetry: %s", err)
|
|
|
}
|
|
|
|
|
|
- workersUtilizationGauge, err := meter.Float64ObservableGauge(
|
|
|
+ workersUtilizationGauge, err := o.meter.Float64ObservableGauge(
|
|
|
"workers_utilization",
|
|
|
metric.WithUnit("%"),
|
|
|
metric.WithDescription("A gauge of the workers utilization in percents."),
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return fmt.Errorf("Can't add workers_utilization gauge to OpenTelemetry: %s", err)
|
|
|
+ return fmt.Errorf("can't add workers_utilization gauge to OpenTelemetry: %s", err)
|
|
|
}
|
|
|
|
|
|
- bufferDefaultSizeGauge, err := meter.Int64ObservableGauge(
|
|
|
- "buffer_default_size_bytes",
|
|
|
- metric.WithUnit("By"),
|
|
|
- metric.WithDescription("A gauge of the buffer default size in bytes."),
|
|
|
- )
|
|
|
- if err != nil {
|
|
|
- return fmt.Errorf("Can't add buffer_default_size_bytes gauge to OpenTelemetry: %s", err)
|
|
|
- }
|
|
|
-
|
|
|
- bufferMaxSizeGauge, err := meter.Int64ObservableGauge(
|
|
|
- "buffer_max_size_bytes",
|
|
|
- metric.WithUnit("By"),
|
|
|
- metric.WithDescription("A gauge of the buffer max size in bytes."),
|
|
|
- )
|
|
|
- if err != nil {
|
|
|
- return fmt.Errorf("Can't add buffer_max_size_bytes gauge to OpenTelemetry: %s", err)
|
|
|
- }
|
|
|
-
|
|
|
- vipsMemory, err := meter.Float64ObservableGauge(
|
|
|
+ vipsMemory, err := o.meter.Float64ObservableGauge(
|
|
|
"vips_memory_bytes",
|
|
|
metric.WithUnit("By"),
|
|
|
metric.WithDescription("A gauge of the vips tracked memory usage in bytes."),
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return fmt.Errorf("Can't add vips_memory_bytes gauge to OpenTelemetry: %s", err)
|
|
|
+ return fmt.Errorf("can't add vips_memory_bytes gauge to OpenTelemetry: %s", err)
|
|
|
}
|
|
|
|
|
|
- vipsMaxMemory, err := meter.Float64ObservableGauge(
|
|
|
+ vipsMaxMemory, err := o.meter.Float64ObservableGauge(
|
|
|
"vips_max_memory_bytes",
|
|
|
metric.WithUnit("By"),
|
|
|
metric.WithDescription("A gauge of the max vips tracked memory usage in bytes."),
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return fmt.Errorf("Can't add vips_max_memory_bytes gauge to OpenTelemetry: %s", err)
|
|
|
+ return fmt.Errorf("can't add vips_max_memory_bytes gauge to OpenTelemetry: %s", err)
|
|
|
}
|
|
|
|
|
|
- vipsAllocs, err := meter.Float64ObservableGauge(
|
|
|
+ vipsAllocs, err := o.meter.Float64ObservableGauge(
|
|
|
"vips_allocs",
|
|
|
metric.WithUnit("1"),
|
|
|
metric.WithDescription("A gauge of the number of active vips allocations."),
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return fmt.Errorf("Can't add vips_allocs gauge to OpenTelemetry: %s", err)
|
|
|
+ return fmt.Errorf("can't add vips_allocs gauge to OpenTelemetry: %s", err)
|
|
|
}
|
|
|
|
|
|
- _, err = meter.RegisterCallback(
|
|
|
- func(ctx context.Context, o metric.Observer) error {
|
|
|
+ _, err = o.meter.RegisterCallback(
|
|
|
+ func(ctx context.Context, ob metric.Observer) error {
|
|
|
memStats, merr := proc.MemoryInfo()
|
|
|
if merr != nil {
|
|
|
return merr
|
|
|
}
|
|
|
|
|
|
- o.ObserveInt64(processResidentMemory, int64(memStats.RSS))
|
|
|
- o.ObserveInt64(processVirtualMemory, int64(memStats.VMS))
|
|
|
+ ob.ObserveInt64(processResidentMemory, int64(memStats.RSS))
|
|
|
+ ob.ObserveInt64(processVirtualMemory, int64(memStats.VMS))
|
|
|
|
|
|
goMemStats := &runtime.MemStats{}
|
|
|
runtime.ReadMemStats(goMemStats)
|
|
|
|
|
|
- o.ObserveInt64(goMemstatsSys, int64(goMemStats.Sys))
|
|
|
- o.ObserveInt64(goMemstatsHeapIdle, int64(goMemStats.HeapIdle))
|
|
|
- o.ObserveInt64(goMemstatsHeapInuse, int64(goMemStats.HeapInuse))
|
|
|
+ ob.ObserveInt64(goMemstatsSys, int64(goMemStats.Sys))
|
|
|
+ ob.ObserveInt64(goMemstatsHeapIdle, int64(goMemStats.HeapIdle))
|
|
|
+ ob.ObserveInt64(goMemstatsHeapInuse, int64(goMemStats.HeapInuse))
|
|
|
|
|
|
threadsNum, _ := runtime.ThreadCreateProfile(nil)
|
|
|
- o.ObserveInt64(goGoroutines, int64(runtime.NumGoroutine()))
|
|
|
- o.ObserveInt64(goThreads, int64(threadsNum))
|
|
|
-
|
|
|
- o.ObserveInt64(workersGauge, int64(config.Workers))
|
|
|
- o.ObserveFloat64(requestsInProgressGauge, stats.RequestsInProgress())
|
|
|
- o.ObserveFloat64(imagesInProgressGauge, stats.ImagesInProgress())
|
|
|
- o.ObserveFloat64(workersUtilizationGauge, stats.WorkersUtilization())
|
|
|
-
|
|
|
- bufferStatsMutex.Lock()
|
|
|
- defer bufferStatsMutex.Unlock()
|
|
|
+ ob.ObserveInt64(goGoroutines, int64(runtime.NumGoroutine()))
|
|
|
+ ob.ObserveInt64(goThreads, int64(threadsNum))
|
|
|
|
|
|
- for t, v := range bufferDefaultSizes {
|
|
|
- o.ObserveInt64(bufferDefaultSizeGauge, int64(v), metric.WithAttributes(attribute.String("type", t)))
|
|
|
- }
|
|
|
- for t, v := range bufferMaxSizes {
|
|
|
- o.ObserveInt64(bufferMaxSizeGauge, int64(v), metric.WithAttributes(attribute.String("type", t)))
|
|
|
- }
|
|
|
+ ob.ObserveInt64(workersGauge, int64(o.stats.WorkersNumber))
|
|
|
+ ob.ObserveFloat64(requestsInProgressGauge, o.stats.RequestsInProgress())
|
|
|
+ ob.ObserveFloat64(imagesInProgressGauge, o.stats.ImagesInProgress())
|
|
|
+ ob.ObserveFloat64(workersUtilizationGauge, o.stats.WorkersUtilization())
|
|
|
|
|
|
- o.ObserveFloat64(vipsMemory, vips.GetMem())
|
|
|
- o.ObserveFloat64(vipsMaxMemory, vips.GetMemHighwater())
|
|
|
- o.ObserveFloat64(vipsAllocs, vips.GetAllocs())
|
|
|
+ ob.ObserveFloat64(vipsMemory, vips.GetMem())
|
|
|
+ ob.ObserveFloat64(vipsMaxMemory, vips.GetMemHighwater())
|
|
|
+ ob.ObserveFloat64(vipsAllocs, vips.GetAllocs())
|
|
|
|
|
|
return nil
|
|
|
},
|
|
@@ -713,54 +480,13 @@ func addDefaultMetrics() error {
|
|
|
requestsInProgressGauge,
|
|
|
imagesInProgressGauge,
|
|
|
workersUtilizationGauge,
|
|
|
- bufferDefaultSizeGauge,
|
|
|
- bufferMaxSizeGauge,
|
|
|
vipsMemory,
|
|
|
vipsMaxMemory,
|
|
|
vipsAllocs,
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return fmt.Errorf("Can't register OpenTelemetry callbacks: %s", err)
|
|
|
- }
|
|
|
-
|
|
|
- bufferSizeHist, err = meter.Int64Histogram(
|
|
|
- "buffer_size_bytes",
|
|
|
- metric.WithUnit("By"),
|
|
|
- metric.WithDescription("A histogram of the buffer size in bytes."),
|
|
|
- )
|
|
|
- if err != nil {
|
|
|
- return fmt.Errorf("Can't add buffer_size_bytes histogram to OpenTelemetry: %s", err)
|
|
|
+ return fmt.Errorf("can't register OpenTelemetry callbacks: %s", err)
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
}
|
|
|
-
|
|
|
-func ObserveBufferSize(t string, size int) {
|
|
|
- if enabledMetrics {
|
|
|
- bufferSizeHist.Record(context.Background(), int64(size), metric.WithAttributes(attribute.String("type", t)))
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-func SetBufferDefaultSize(t string, size int) {
|
|
|
- if enabledMetrics {
|
|
|
- bufferStatsMutex.Lock()
|
|
|
- defer bufferStatsMutex.Unlock()
|
|
|
-
|
|
|
- bufferDefaultSizes[t] = size
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-func SetBufferMaxSize(t string, size int) {
|
|
|
- if enabledMetrics {
|
|
|
- bufferStatsMutex.Lock()
|
|
|
- defer bufferStatsMutex.Unlock()
|
|
|
-
|
|
|
- bufferMaxSizes[t] = size
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-type errorHandler struct{}
|
|
|
-
|
|
|
-func (h errorHandler) Handle(err error) {
|
|
|
- slog.Warn(err.Error(), "source", "opentelemetry")
|
|
|
-}
|