|
@@ -0,0 +1,409 @@
|
|
|
+package otel
|
|
|
+
|
|
|
+import (
|
|
|
+ "context"
|
|
|
+ "crypto/tls"
|
|
|
+ "crypto/x509"
|
|
|
+ "fmt"
|
|
|
+ "net/http"
|
|
|
+ "time"
|
|
|
+
|
|
|
+ "github.com/felixge/httpsnoop"
|
|
|
+ "github.com/sirupsen/logrus"
|
|
|
+ "go.opentelemetry.io/contrib/detectors/aws/ec2"
|
|
|
+ "go.opentelemetry.io/contrib/detectors/aws/ecs"
|
|
|
+ "go.opentelemetry.io/contrib/detectors/aws/eks"
|
|
|
+ "go.opentelemetry.io/contrib/propagators/autoprop"
|
|
|
+ "go.opentelemetry.io/contrib/propagators/aws/xray"
|
|
|
+ "go.opentelemetry.io/otel"
|
|
|
+ "go.opentelemetry.io/otel/attribute"
|
|
|
+ "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
|
|
|
+ "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
|
|
|
+ "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
|
|
|
+ "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
|
|
|
+ "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
|
|
|
+ "go.opentelemetry.io/otel/metric"
|
|
|
+ "go.opentelemetry.io/otel/metric/instrument"
|
|
|
+ "go.opentelemetry.io/otel/metric/unit"
|
|
|
+ "go.opentelemetry.io/otel/propagation"
|
|
|
+ sdkmetric "go.opentelemetry.io/otel/sdk/metric"
|
|
|
+ "go.opentelemetry.io/otel/sdk/resource"
|
|
|
+ sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
|
|
+ semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
|
|
|
+ "go.opentelemetry.io/otel/trace"
|
|
|
+ "google.golang.org/grpc"
|
|
|
+ "google.golang.org/grpc/credentials"
|
|
|
+
|
|
|
+ "github.com/imgproxy/imgproxy/v3/config"
|
|
|
+ "github.com/imgproxy/imgproxy/v3/ierrors"
|
|
|
+ "github.com/imgproxy/imgproxy/v3/metrics/errformat"
|
|
|
+ "github.com/imgproxy/imgproxy/v3/metrics/stats"
|
|
|
+)
|
|
|
+
|
|
|
+type hasSpanCtxKey struct{}
|
|
|
+
|
|
|
+type GaugeFunc func() float64
|
|
|
+
|
|
|
+var (
|
|
|
+ enabled bool
|
|
|
+ enabledMetrics bool
|
|
|
+
|
|
|
+ tracerProvider *sdktrace.TracerProvider
|
|
|
+ tracer trace.Tracer
|
|
|
+
|
|
|
+ meterProvider *sdkmetric.MeterProvider
|
|
|
+ meter metric.Meter
|
|
|
+
|
|
|
+ propagator propagation.TextMapPropagator
|
|
|
+)
|
|
|
+
|
|
|
+func Init() error {
|
|
|
+ if len(config.OpenTelemetryEndpoint) == 0 {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ otel.SetErrorHandler(&errorHandler{entry: logrus.WithField("from", "opentelemetry")})
|
|
|
+
|
|
|
+ var (
|
|
|
+ traceExporter *otlptrace.Exporter
|
|
|
+ metricExporter sdkmetric.Exporter
|
|
|
+ err error
|
|
|
+ )
|
|
|
+
|
|
|
+ switch config.OpenTelemetryProtocol {
|
|
|
+ case "grpc":
|
|
|
+ traceExporter, metricExporter, err = buildGRPCExporters()
|
|
|
+ case "https":
|
|
|
+ traceExporter, metricExporter, err = buildHTTPExporters(false)
|
|
|
+ case "http":
|
|
|
+ traceExporter, metricExporter, err = buildHTTPExporters(true)
|
|
|
+ default:
|
|
|
+ return fmt.Errorf("Unknown OpenTelemetry protocol: %s", config.OpenTelemetryProtocol)
|
|
|
+ }
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ res := resource.NewWithAttributes(
|
|
|
+ semconv.SchemaURL,
|
|
|
+ semconv.ServiceNameKey.String(config.OpenTelemetryServiceName),
|
|
|
+ )
|
|
|
+
|
|
|
+ awsRes, _ := resource.Detect(
|
|
|
+ context.Background(),
|
|
|
+ ec2.NewResourceDetector(),
|
|
|
+ ecs.NewResourceDetector(),
|
|
|
+ eks.NewResourceDetector(),
|
|
|
+ )
|
|
|
+
|
|
|
+ if awsRes != nil {
|
|
|
+ res, _ = resource.Merge(res, awsRes)
|
|
|
+ }
|
|
|
+
|
|
|
+ idg := xray.NewIDGenerator()
|
|
|
+
|
|
|
+ tracerProvider = sdktrace.NewTracerProvider(
|
|
|
+ sdktrace.WithResource(res),
|
|
|
+ sdktrace.WithSampler(sdktrace.AlwaysSample()),
|
|
|
+ sdktrace.WithBatcher(traceExporter),
|
|
|
+ sdktrace.WithIDGenerator(idg),
|
|
|
+ )
|
|
|
+
|
|
|
+ tracer = tracerProvider.Tracer("imgproxy")
|
|
|
+
|
|
|
+ if len(config.OpenTelemetryPropagators) > 0 {
|
|
|
+ propagator, err = autoprop.TextMapPropagator(config.OpenTelemetryPropagators...)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ enabled = true
|
|
|
+
|
|
|
+ if metricExporter == nil {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ metricReader := sdkmetric.NewPeriodicReader(
|
|
|
+ metricExporter,
|
|
|
+ sdkmetric.WithInterval(5*time.Second),
|
|
|
+ )
|
|
|
+
|
|
|
+ meterProvider = sdkmetric.NewMeterProvider(
|
|
|
+ sdkmetric.WithResource(res),
|
|
|
+ sdkmetric.WithReader(metricReader),
|
|
|
+ )
|
|
|
+
|
|
|
+ meter = meterProvider.Meter("imgproxy")
|
|
|
+
|
|
|
+ enabledMetrics = true
|
|
|
+
|
|
|
+ AddGaugeFunc(
|
|
|
+ "requests_in_progress",
|
|
|
+ "A gauge of the number of requests currently being in progress.",
|
|
|
+ "1",
|
|
|
+ stats.RequestsInProgress,
|
|
|
+ )
|
|
|
+ AddGaugeFunc(
|
|
|
+ "images_in_progress",
|
|
|
+ "A gauge of the number of images currently being in progress.",
|
|
|
+ "1",
|
|
|
+ stats.ImagesInProgress,
|
|
|
+ )
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func buildGRPCExporters() (*otlptrace.Exporter, sdkmetric.Exporter, error) {
|
|
|
+ tracerOpts := []otlptracegrpc.Option{
|
|
|
+ otlptracegrpc.WithEndpoint(config.OpenTelemetryEndpoint),
|
|
|
+ otlptracegrpc.WithDialOption(grpc.WithBlock()),
|
|
|
+ }
|
|
|
+
|
|
|
+ meterOpts := []otlpmetricgrpc.Option{
|
|
|
+ otlpmetricgrpc.WithEndpoint(config.OpenTelemetryEndpoint),
|
|
|
+ otlpmetricgrpc.WithDialOption(grpc.WithBlock()),
|
|
|
+ }
|
|
|
+
|
|
|
+ tlsConf, err := buildTLSConfig()
|
|
|
+ if err != nil {
|
|
|
+ return nil, nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ if tlsConf != nil {
|
|
|
+ creds := credentials.NewTLS(tlsConf)
|
|
|
+ tracerOpts = append(tracerOpts, otlptracegrpc.WithTLSCredentials(creds))
|
|
|
+ meterOpts = append(meterOpts, otlpmetricgrpc.WithTLSCredentials(creds))
|
|
|
+ } else {
|
|
|
+ tracerOpts = append(tracerOpts, otlptracegrpc.WithInsecure())
|
|
|
+ meterOpts = append(meterOpts, otlpmetricgrpc.WithInsecure())
|
|
|
+ }
|
|
|
+
|
|
|
+ trctx, trcancel := context.WithTimeout(
|
|
|
+ context.Background(),
|
|
|
+ time.Duration(config.OpenTelemetryConnectionTimeout)*time.Second,
|
|
|
+ )
|
|
|
+ defer trcancel()
|
|
|
+
|
|
|
+ traceExporter, err := otlptracegrpc.New(trctx, tracerOpts...)
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if !config.OpenTelemetryEnableMetrics {
|
|
|
+ return traceExporter, nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ mtctx, mtcancel := context.WithTimeout(
|
|
|
+ context.Background(),
|
|
|
+ time.Duration(config.OpenTelemetryConnectionTimeout)*time.Second,
|
|
|
+ )
|
|
|
+ defer mtcancel()
|
|
|
+
|
|
|
+ metricExporter, err := otlpmetricgrpc.New(mtctx, meterOpts...)
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ return traceExporter, metricExporter, err
|
|
|
+}
|
|
|
+
|
|
|
+func buildHTTPExporters(insecure bool) (*otlptrace.Exporter, sdkmetric.Exporter, error) {
|
|
|
+ tracerOpts := []otlptracehttp.Option{
|
|
|
+ otlptracehttp.WithEndpoint(config.OpenTelemetryEndpoint),
|
|
|
+ }
|
|
|
+
|
|
|
+ meterOpts := []otlpmetrichttp.Option{
|
|
|
+ otlpmetrichttp.WithEndpoint(config.OpenTelemetryEndpoint),
|
|
|
+ }
|
|
|
+
|
|
|
+ if insecure {
|
|
|
+ tracerOpts = append(tracerOpts, otlptracehttp.WithInsecure())
|
|
|
+ meterOpts = append(meterOpts, otlpmetrichttp.WithInsecure())
|
|
|
+ } else {
|
|
|
+ tlsConf, err := buildTLSConfig()
|
|
|
+ if err != nil {
|
|
|
+ return nil, nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ if tlsConf != nil {
|
|
|
+ tracerOpts = append(tracerOpts, otlptracehttp.WithTLSClientConfig(tlsConf))
|
|
|
+ meterOpts = append(meterOpts, otlpmetrichttp.WithTLSClientConfig(tlsConf))
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ trctx, trcancel := context.WithTimeout(
|
|
|
+ context.Background(),
|
|
|
+ time.Duration(config.OpenTelemetryConnectionTimeout)*time.Second,
|
|
|
+ )
|
|
|
+ defer trcancel()
|
|
|
+
|
|
|
+ traceExporter, err := otlptracehttp.New(trctx, tracerOpts...)
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if !config.OpenTelemetryEnableMetrics {
|
|
|
+ return traceExporter, nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ mtctx, mtcancel := context.WithTimeout(
|
|
|
+ context.Background(),
|
|
|
+ time.Duration(config.OpenTelemetryConnectionTimeout)*time.Second,
|
|
|
+ )
|
|
|
+ defer mtcancel()
|
|
|
+
|
|
|
+ metricExporter, err := otlpmetrichttp.New(mtctx, meterOpts...)
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ return traceExporter, metricExporter, err
|
|
|
+}
|
|
|
+
|
|
|
+func buildTLSConfig() (*tls.Config, error) {
|
|
|
+ if len(config.OpenTelemetryServerCert) == 0 {
|
|
|
+ return nil, nil
|
|
|
+ }
|
|
|
+
|
|
|
+ certPool := x509.NewCertPool()
|
|
|
+ if !certPool.AppendCertsFromPEM([]byte(config.OpenTelemetryServerCert)) {
|
|
|
+ return nil, fmt.Errorf("Can't load OpenTelemetry server cert")
|
|
|
+ }
|
|
|
+
|
|
|
+ tlsConf := tls.Config{RootCAs: certPool}
|
|
|
+
|
|
|
+ if len(config.OpenTelemetryClientCert) > 0 && len(config.OpenTelemetryClientKey) > 0 {
|
|
|
+ cert, err := tls.X509KeyPair(
|
|
|
+ []byte(config.OpenTelemetryClientCert),
|
|
|
+ []byte(config.OpenTelemetryClientKey),
|
|
|
+ )
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("Can't load OpenTelemetry client cert/key pair: %s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ tlsConf.Certificates = []tls.Certificate{cert}
|
|
|
+ }
|
|
|
+
|
|
|
+ return &tlsConf, nil
|
|
|
+}
|
|
|
+
|
|
|
+func Stop() {
|
|
|
+ if enabled {
|
|
|
+ trctx, trcancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
|
+ defer trcancel()
|
|
|
+
|
|
|
+ tracerProvider.Shutdown(trctx)
|
|
|
+
|
|
|
+ if meterProvider != nil {
|
|
|
+ mtctx, mtcancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
|
+ defer mtcancel()
|
|
|
+
|
|
|
+ meterProvider.Shutdown(mtctx)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func Enabled() bool {
|
|
|
+ return enabled
|
|
|
+}
|
|
|
+
|
|
|
+func StartRootSpan(ctx context.Context, rw http.ResponseWriter, r *http.Request) (context.Context, context.CancelFunc, http.ResponseWriter) {
|
|
|
+ if !enabled {
|
|
|
+ return ctx, func() {}, rw
|
|
|
+ }
|
|
|
+
|
|
|
+ if propagator != nil {
|
|
|
+ ctx = propagator.Extract(ctx, propagation.HeaderCarrier(r.Header))
|
|
|
+ }
|
|
|
+
|
|
|
+ ctx, span := tracer.Start(
|
|
|
+ ctx, "/request",
|
|
|
+ trace.WithSpanKind(trace.SpanKindServer),
|
|
|
+ trace.WithAttributes(semconv.NetAttributesFromHTTPRequest("tcp", r)...),
|
|
|
+ trace.WithAttributes(semconv.EndUserAttributesFromHTTPRequest(r)...),
|
|
|
+ trace.WithAttributes(semconv.HTTPServerAttributesFromHTTPRequest("imgproxy", "/", r)...),
|
|
|
+ )
|
|
|
+ ctx = context.WithValue(ctx, hasSpanCtxKey{}, struct{}{})
|
|
|
+
|
|
|
+ newRw := httpsnoop.Wrap(rw, httpsnoop.Hooks{
|
|
|
+ WriteHeader: func(next httpsnoop.WriteHeaderFunc) httpsnoop.WriteHeaderFunc {
|
|
|
+ return func(statusCode int) {
|
|
|
+ attrs := semconv.HTTPAttributesFromHTTPStatusCode(statusCode)
|
|
|
+ spanStatus, spanMessage := semconv.SpanStatusFromHTTPStatusCodeAndSpanKind(statusCode, trace.SpanKindServer)
|
|
|
+ span.SetAttributes(attrs...)
|
|
|
+ span.SetStatus(spanStatus, spanMessage)
|
|
|
+ }
|
|
|
+ },
|
|
|
+ })
|
|
|
+
|
|
|
+ cancel := func() { span.End() }
|
|
|
+ return ctx, cancel, newRw
|
|
|
+}
|
|
|
+
|
|
|
+func StartSpan(ctx context.Context, name string) context.CancelFunc {
|
|
|
+ if !enabled {
|
|
|
+ return func() {}
|
|
|
+ }
|
|
|
+
|
|
|
+ if ctx.Value(hasSpanCtxKey{}) != nil {
|
|
|
+ _, span := tracer.Start(ctx, name, trace.WithSpanKind(trace.SpanKindInternal))
|
|
|
+
|
|
|
+ return func() { span.End() }
|
|
|
+ }
|
|
|
+
|
|
|
+ return func() {}
|
|
|
+}
|
|
|
+
|
|
|
+func SendError(ctx context.Context, errType string, err error) {
|
|
|
+ if !enabled {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ span := trace.SpanFromContext(ctx)
|
|
|
+
|
|
|
+ attributes := []attribute.KeyValue{
|
|
|
+ semconv.ExceptionTypeKey.String(errformat.FormatErrType(errType, err)),
|
|
|
+ semconv.ExceptionMessageKey.String(err.Error()),
|
|
|
+ }
|
|
|
+
|
|
|
+ if ierr, ok := err.(*ierrors.Error); ok {
|
|
|
+ if stack := ierr.FormatStack(); len(stack) != 0 {
|
|
|
+ attributes = append(attributes, semconv.ExceptionStacktraceKey.String(stack))
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ span.AddEvent(semconv.ExceptionEventName, trace.WithAttributes(attributes...))
|
|
|
+}
|
|
|
+
|
|
|
+func AddGaugeFunc(name, desc, u string, f GaugeFunc) {
|
|
|
+ if !enabledMetrics {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ gauge, _ := meter.AsyncFloat64().Gauge(
|
|
|
+ name,
|
|
|
+ instrument.WithUnit(unit.Unit(u)),
|
|
|
+ instrument.WithDescription(desc),
|
|
|
+ )
|
|
|
+
|
|
|
+ if err := meter.RegisterCallback(
|
|
|
+ []instrument.Asynchronous{
|
|
|
+ gauge,
|
|
|
+ },
|
|
|
+ func(ctx context.Context) {
|
|
|
+ gauge.Observe(ctx, f())
|
|
|
+ },
|
|
|
+ ); err != nil {
|
|
|
+ logrus.Warnf("Can't add %s gauge to OpenTelemetry: %s", name, err)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+type errorHandler struct {
|
|
|
+ entry *logrus.Entry
|
|
|
+}
|
|
|
+
|
|
|
+func (h *errorHandler) Handle(err error) {
|
|
|
+ h.entry.Warn(err.Error())
|
|
|
+}
|