otel.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. package otel
  import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"net/http"
	"time"

	"github.com/felixge/httpsnoop"
	"github.com/sirupsen/logrus"
	"go.opentelemetry.io/contrib/detectors/aws/ec2"
	"go.opentelemetry.io/contrib/detectors/aws/ecs"
	"go.opentelemetry.io/contrib/detectors/aws/eks"
	"go.opentelemetry.io/contrib/propagators/autoprop"
	"go.opentelemetry.io/contrib/propagators/aws/xray"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
	"go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/metric/instrument"
	"go.opentelemetry.io/otel/metric/unit"
	"go.opentelemetry.io/otel/propagation"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
	"go.opentelemetry.io/otel/trace"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"

	"github.com/imgproxy/imgproxy/v3/config"
	"github.com/imgproxy/imgproxy/v3/ierrors"
	"github.com/imgproxy/imgproxy/v3/metrics/errformat"
	"github.com/imgproxy/imgproxy/v3/metrics/stats"
)
  39. type hasSpanCtxKey struct{}
  40. type GaugeFunc func() float64
  41. var (
  42. enabled bool
  43. enabledMetrics bool
  44. tracerProvider *sdktrace.TracerProvider
  45. tracer trace.Tracer
  46. meterProvider *sdkmetric.MeterProvider
  47. meter metric.Meter
  48. propagator propagation.TextMapPropagator
  49. )
  50. func Init() error {
  51. if len(config.OpenTelemetryEndpoint) == 0 {
  52. return nil
  53. }
  54. otel.SetErrorHandler(&errorHandler{entry: logrus.WithField("from", "opentelemetry")})
  55. var (
  56. traceExporter *otlptrace.Exporter
  57. metricExporter sdkmetric.Exporter
  58. err error
  59. )
  60. switch config.OpenTelemetryProtocol {
  61. case "grpc":
  62. traceExporter, metricExporter, err = buildGRPCExporters()
  63. case "https":
  64. traceExporter, metricExporter, err = buildHTTPExporters(false)
  65. case "http":
  66. traceExporter, metricExporter, err = buildHTTPExporters(true)
  67. default:
  68. return fmt.Errorf("Unknown OpenTelemetry protocol: %s", config.OpenTelemetryProtocol)
  69. }
  70. if err != nil {
  71. return err
  72. }
  73. res := resource.NewWithAttributes(
  74. semconv.SchemaURL,
  75. semconv.ServiceNameKey.String(config.OpenTelemetryServiceName),
  76. )
  77. awsRes, _ := resource.Detect(
  78. context.Background(),
  79. ec2.NewResourceDetector(),
  80. ecs.NewResourceDetector(),
  81. eks.NewResourceDetector(),
  82. )
  83. if awsRes != nil {
  84. res, _ = resource.Merge(res, awsRes)
  85. }
  86. idg := xray.NewIDGenerator()
  87. tracerProvider = sdktrace.NewTracerProvider(
  88. sdktrace.WithResource(res),
  89. sdktrace.WithSampler(sdktrace.AlwaysSample()),
  90. sdktrace.WithBatcher(traceExporter),
  91. sdktrace.WithIDGenerator(idg),
  92. )
  93. tracer = tracerProvider.Tracer("imgproxy")
  94. if len(config.OpenTelemetryPropagators) > 0 {
  95. propagator, err = autoprop.TextMapPropagator(config.OpenTelemetryPropagators...)
  96. if err != nil {
  97. return err
  98. }
  99. }
  100. enabled = true
  101. if metricExporter == nil {
  102. return nil
  103. }
  104. metricReader := sdkmetric.NewPeriodicReader(
  105. metricExporter,
  106. sdkmetric.WithInterval(5*time.Second),
  107. )
  108. meterProvider = sdkmetric.NewMeterProvider(
  109. sdkmetric.WithResource(res),
  110. sdkmetric.WithReader(metricReader),
  111. )
  112. meter = meterProvider.Meter("imgproxy")
  113. enabledMetrics = true
  114. AddGaugeFunc(
  115. "requests_in_progress",
  116. "A gauge of the number of requests currently being in progress.",
  117. "1",
  118. stats.RequestsInProgress,
  119. )
  120. AddGaugeFunc(
  121. "images_in_progress",
  122. "A gauge of the number of images currently being in progress.",
  123. "1",
  124. stats.ImagesInProgress,
  125. )
  126. return nil
  127. }
  128. func buildGRPCExporters() (*otlptrace.Exporter, sdkmetric.Exporter, error) {
  129. tracerOpts := []otlptracegrpc.Option{
  130. otlptracegrpc.WithEndpoint(config.OpenTelemetryEndpoint),
  131. otlptracegrpc.WithDialOption(grpc.WithBlock()),
  132. }
  133. meterOpts := []otlpmetricgrpc.Option{
  134. otlpmetricgrpc.WithEndpoint(config.OpenTelemetryEndpoint),
  135. otlpmetricgrpc.WithDialOption(grpc.WithBlock()),
  136. }
  137. tlsConf, err := buildTLSConfig()
  138. if err != nil {
  139. return nil, nil, err
  140. }
  141. if tlsConf != nil {
  142. creds := credentials.NewTLS(tlsConf)
  143. tracerOpts = append(tracerOpts, otlptracegrpc.WithTLSCredentials(creds))
  144. meterOpts = append(meterOpts, otlpmetricgrpc.WithTLSCredentials(creds))
  145. } else {
  146. tracerOpts = append(tracerOpts, otlptracegrpc.WithInsecure())
  147. meterOpts = append(meterOpts, otlpmetricgrpc.WithInsecure())
  148. }
  149. trctx, trcancel := context.WithTimeout(
  150. context.Background(),
  151. time.Duration(config.OpenTelemetryConnectionTimeout)*time.Second,
  152. )
  153. defer trcancel()
  154. traceExporter, err := otlptracegrpc.New(trctx, tracerOpts...)
  155. if err != nil {
  156. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  157. }
  158. if !config.OpenTelemetryEnableMetrics {
  159. return traceExporter, nil, err
  160. }
  161. mtctx, mtcancel := context.WithTimeout(
  162. context.Background(),
  163. time.Duration(config.OpenTelemetryConnectionTimeout)*time.Second,
  164. )
  165. defer mtcancel()
  166. metricExporter, err := otlpmetricgrpc.New(mtctx, meterOpts...)
  167. if err != nil {
  168. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  169. }
  170. return traceExporter, metricExporter, err
  171. }
  172. func buildHTTPExporters(insecure bool) (*otlptrace.Exporter, sdkmetric.Exporter, error) {
  173. tracerOpts := []otlptracehttp.Option{
  174. otlptracehttp.WithEndpoint(config.OpenTelemetryEndpoint),
  175. }
  176. meterOpts := []otlpmetrichttp.Option{
  177. otlpmetrichttp.WithEndpoint(config.OpenTelemetryEndpoint),
  178. }
  179. if insecure {
  180. tracerOpts = append(tracerOpts, otlptracehttp.WithInsecure())
  181. meterOpts = append(meterOpts, otlpmetrichttp.WithInsecure())
  182. } else {
  183. tlsConf, err := buildTLSConfig()
  184. if err != nil {
  185. return nil, nil, err
  186. }
  187. if tlsConf != nil {
  188. tracerOpts = append(tracerOpts, otlptracehttp.WithTLSClientConfig(tlsConf))
  189. meterOpts = append(meterOpts, otlpmetrichttp.WithTLSClientConfig(tlsConf))
  190. }
  191. }
  192. trctx, trcancel := context.WithTimeout(
  193. context.Background(),
  194. time.Duration(config.OpenTelemetryConnectionTimeout)*time.Second,
  195. )
  196. defer trcancel()
  197. traceExporter, err := otlptracehttp.New(trctx, tracerOpts...)
  198. if err != nil {
  199. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  200. }
  201. if !config.OpenTelemetryEnableMetrics {
  202. return traceExporter, nil, err
  203. }
  204. mtctx, mtcancel := context.WithTimeout(
  205. context.Background(),
  206. time.Duration(config.OpenTelemetryConnectionTimeout)*time.Second,
  207. )
  208. defer mtcancel()
  209. metricExporter, err := otlpmetrichttp.New(mtctx, meterOpts...)
  210. if err != nil {
  211. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  212. }
  213. return traceExporter, metricExporter, err
  214. }
  215. func buildTLSConfig() (*tls.Config, error) {
  216. if len(config.OpenTelemetryServerCert) == 0 {
  217. return nil, nil
  218. }
  219. certPool := x509.NewCertPool()
  220. if !certPool.AppendCertsFromPEM([]byte(config.OpenTelemetryServerCert)) {
  221. return nil, fmt.Errorf("Can't load OpenTelemetry server cert")
  222. }
  223. tlsConf := tls.Config{RootCAs: certPool}
  224. if len(config.OpenTelemetryClientCert) > 0 && len(config.OpenTelemetryClientKey) > 0 {
  225. cert, err := tls.X509KeyPair(
  226. []byte(config.OpenTelemetryClientCert),
  227. []byte(config.OpenTelemetryClientKey),
  228. )
  229. if err != nil {
  230. return nil, fmt.Errorf("Can't load OpenTelemetry client cert/key pair: %s", err)
  231. }
  232. tlsConf.Certificates = []tls.Certificate{cert}
  233. }
  234. return &tlsConf, nil
  235. }
  236. func Stop() {
  237. if enabled {
  238. trctx, trcancel := context.WithTimeout(context.Background(), 5*time.Second)
  239. defer trcancel()
  240. tracerProvider.Shutdown(trctx)
  241. if meterProvider != nil {
  242. mtctx, mtcancel := context.WithTimeout(context.Background(), 5*time.Second)
  243. defer mtcancel()
  244. meterProvider.Shutdown(mtctx)
  245. }
  246. }
  247. }
  248. func Enabled() bool {
  249. return enabled
  250. }
  251. func StartRootSpan(ctx context.Context, rw http.ResponseWriter, r *http.Request) (context.Context, context.CancelFunc, http.ResponseWriter) {
  252. if !enabled {
  253. return ctx, func() {}, rw
  254. }
  255. if propagator != nil {
  256. ctx = propagator.Extract(ctx, propagation.HeaderCarrier(r.Header))
  257. }
  258. ctx, span := tracer.Start(
  259. ctx, "/request",
  260. trace.WithSpanKind(trace.SpanKindServer),
  261. trace.WithAttributes(semconv.NetAttributesFromHTTPRequest("tcp", r)...),
  262. trace.WithAttributes(semconv.EndUserAttributesFromHTTPRequest(r)...),
  263. trace.WithAttributes(semconv.HTTPServerAttributesFromHTTPRequest("imgproxy", "/", r)...),
  264. )
  265. ctx = context.WithValue(ctx, hasSpanCtxKey{}, struct{}{})
  266. newRw := httpsnoop.Wrap(rw, httpsnoop.Hooks{
  267. WriteHeader: func(next httpsnoop.WriteHeaderFunc) httpsnoop.WriteHeaderFunc {
  268. return func(statusCode int) {
  269. attrs := semconv.HTTPAttributesFromHTTPStatusCode(statusCode)
  270. spanStatus, spanMessage := semconv.SpanStatusFromHTTPStatusCodeAndSpanKind(statusCode, trace.SpanKindServer)
  271. span.SetAttributes(attrs...)
  272. span.SetStatus(spanStatus, spanMessage)
  273. next(statusCode)
  274. }
  275. },
  276. })
  277. cancel := func() { span.End() }
  278. return ctx, cancel, newRw
  279. }
  280. func StartSpan(ctx context.Context, name string) context.CancelFunc {
  281. if !enabled {
  282. return func() {}
  283. }
  284. if ctx.Value(hasSpanCtxKey{}) != nil {
  285. _, span := tracer.Start(ctx, name, trace.WithSpanKind(trace.SpanKindInternal))
  286. return func() { span.End() }
  287. }
  288. return func() {}
  289. }
  290. func SendError(ctx context.Context, errType string, err error) {
  291. if !enabled {
  292. return
  293. }
  294. span := trace.SpanFromContext(ctx)
  295. attributes := []attribute.KeyValue{
  296. semconv.ExceptionTypeKey.String(errformat.FormatErrType(errType, err)),
  297. semconv.ExceptionMessageKey.String(err.Error()),
  298. }
  299. if ierr, ok := err.(*ierrors.Error); ok {
  300. if stack := ierr.FormatStack(); len(stack) != 0 {
  301. attributes = append(attributes, semconv.ExceptionStacktraceKey.String(stack))
  302. }
  303. }
  304. span.AddEvent(semconv.ExceptionEventName, trace.WithAttributes(attributes...))
  305. }
  306. func AddGaugeFunc(name, desc, u string, f GaugeFunc) {
  307. if !enabledMetrics {
  308. return
  309. }
  310. gauge, _ := meter.AsyncFloat64().Gauge(
  311. name,
  312. instrument.WithUnit(unit.Unit(u)),
  313. instrument.WithDescription(desc),
  314. )
  315. if err := meter.RegisterCallback(
  316. []instrument.Asynchronous{
  317. gauge,
  318. },
  319. func(ctx context.Context) {
  320. gauge.Observe(ctx, f())
  321. },
  322. ); err != nil {
  323. logrus.Warnf("Can't add %s gauge to OpenTelemetry: %s", name, err)
  324. }
  325. }
  326. type errorHandler struct {
  327. entry *logrus.Entry
  328. }
  329. func (h *errorHandler) Handle(err error) {
  330. h.entry.Warn(err.Error())
  331. }