otel.go 16 KB


  1. package otel
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "crypto/x509"
  6. "errors"
  7. "fmt"
  8. "net/http"
  9. "os"
  10. "runtime"
  11. "strings"
  12. "sync"
  13. "time"
  14. "github.com/felixge/httpsnoop"
  15. "github.com/shirou/gopsutil/process"
  16. "github.com/sirupsen/logrus"
  17. "go.opentelemetry.io/contrib/detectors/aws/ec2"
  18. "go.opentelemetry.io/contrib/detectors/aws/ecs"
  19. "go.opentelemetry.io/contrib/detectors/aws/eks"
  20. "go.opentelemetry.io/contrib/propagators/autoprop"
  21. "go.opentelemetry.io/contrib/propagators/aws/xray"
  22. "go.opentelemetry.io/otel"
  23. "go.opentelemetry.io/otel/attribute"
  24. "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
  25. "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
  26. "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
  27. "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
  28. "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
  29. "go.opentelemetry.io/otel/metric"
  30. "go.opentelemetry.io/otel/propagation"
  31. sdkmetric "go.opentelemetry.io/otel/sdk/metric"
  32. "go.opentelemetry.io/otel/sdk/resource"
  33. sdktrace "go.opentelemetry.io/otel/sdk/trace"
  34. semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
  35. "go.opentelemetry.io/otel/semconv/v1.17.0/httpconv"
  36. "go.opentelemetry.io/otel/trace"
  37. "google.golang.org/grpc"
  38. "google.golang.org/grpc/credentials"
  39. "github.com/imgproxy/imgproxy/v3/config"
  40. "github.com/imgproxy/imgproxy/v3/ierrors"
  41. "github.com/imgproxy/imgproxy/v3/metrics/errformat"
  42. "github.com/imgproxy/imgproxy/v3/metrics/stats"
  43. "github.com/imgproxy/imgproxy/v3/version"
  44. )
  type (
  	// hasSpanCtxKey is the context key that StartRootSpan stores in the
  	// request context and that StartSpan looks up before creating a span.
  	hasSpanCtxKey struct{}

  	// GaugeFunc is a callback returning the current value of a gauge.
  	GaugeFunc func() float64
  )
  47. var (
  48. enabled bool
  49. enabledMetrics bool
  50. tracerProvider *sdktrace.TracerProvider
  51. tracer trace.Tracer
  52. meterProvider *sdkmetric.MeterProvider
  53. meter metric.Meter
  54. propagator propagation.TextMapPropagator
  55. bufferSizeHist metric.Int64Histogram
  56. bufferDefaultSizes = make(map[string]int)
  57. bufferMaxSizes = make(map[string]int)
  58. bufferStatsMutex sync.Mutex
  59. )
  60. func Init() error {
  61. if len(config.OpenTelemetryEndpoint) == 0 {
  62. return nil
  63. }
  64. otel.SetErrorHandler(&errorHandler{entry: logrus.WithField("from", "opentelemetry")})
  65. var (
  66. traceExporter *otlptrace.Exporter
  67. metricExporter sdkmetric.Exporter
  68. err error
  69. )
  70. switch config.OpenTelemetryProtocol {
  71. case "grpc":
  72. traceExporter, metricExporter, err = buildGRPCExporters()
  73. case "https":
  74. traceExporter, metricExporter, err = buildHTTPExporters(false)
  75. case "http":
  76. traceExporter, metricExporter, err = buildHTTPExporters(true)
  77. default:
  78. return fmt.Errorf("Unknown OpenTelemetry protocol: %s", config.OpenTelemetryProtocol)
  79. }
  80. if err != nil {
  81. return err
  82. }
  83. res, _ := resource.Merge(
  84. resource.Default(),
  85. resource.NewSchemaless(
  86. semconv.ServiceNameKey.String(config.OpenTelemetryServiceName),
  87. semconv.ServiceVersionKey.String(version.Version()),
  88. ),
  89. )
  90. awsRes, _ := resource.Detect(
  91. context.Background(),
  92. ec2.NewResourceDetector(),
  93. ecs.NewResourceDetector(),
  94. eks.NewResourceDetector(),
  95. )
  96. if merged, merr := resource.Merge(awsRes, res); merr == nil {
  97. res = merged
  98. } else {
  99. logrus.Warnf("Can't add AWS attributes to OpenTelemetry: %s", merr)
  100. }
  101. opts := []sdktrace.TracerProviderOption{
  102. sdktrace.WithResource(res),
  103. sdktrace.WithBatcher(traceExporter),
  104. }
  105. if opts, err = addTraceIDRatioSampler(opts); err != nil {
  106. return err
  107. }
  108. switch g := config.OpenTelemetryTraceIDGenerator; g {
  109. case "xray":
  110. idg := xray.NewIDGenerator()
  111. opts = append(opts, sdktrace.WithIDGenerator(idg))
  112. case "random":
  113. // Do nothing. OTel uses random generator by default
  114. default:
  115. return fmt.Errorf("Unknown Trace ID generator: %s", g)
  116. }
  117. tracerProvider = sdktrace.NewTracerProvider(opts...)
  118. tracer = tracerProvider.Tracer("imgproxy")
  119. if len(config.OpenTelemetryPropagators) > 0 {
  120. propagator, err = autoprop.TextMapPropagator(config.OpenTelemetryPropagators...)
  121. if err != nil {
  122. return err
  123. }
  124. }
  125. enabled = true
  126. if metricExporter == nil {
  127. return nil
  128. }
  129. metricReader := sdkmetric.NewPeriodicReader(
  130. metricExporter,
  131. sdkmetric.WithInterval(5*time.Second),
  132. )
  133. meterProvider = sdkmetric.NewMeterProvider(
  134. sdkmetric.WithResource(res),
  135. sdkmetric.WithReader(metricReader),
  136. )
  137. meter = meterProvider.Meter("imgproxy")
  138. if err = addDefaultMetrics(); err != nil {
  139. return err
  140. }
  141. enabledMetrics = true
  142. return nil
  143. }
  144. func buildGRPCExporters() (*otlptrace.Exporter, sdkmetric.Exporter, error) {
  145. tracerOpts := []otlptracegrpc.Option{
  146. otlptracegrpc.WithEndpoint(config.OpenTelemetryEndpoint),
  147. otlptracegrpc.WithDialOption(grpc.WithBlock()),
  148. }
  149. meterOpts := []otlpmetricgrpc.Option{
  150. otlpmetricgrpc.WithEndpoint(config.OpenTelemetryEndpoint),
  151. otlpmetricgrpc.WithDialOption(grpc.WithBlock()),
  152. }
  153. tlsConf, err := buildTLSConfig()
  154. if err != nil {
  155. return nil, nil, err
  156. }
  157. if tlsConf != nil {
  158. creds := credentials.NewTLS(tlsConf)
  159. tracerOpts = append(tracerOpts, otlptracegrpc.WithTLSCredentials(creds))
  160. meterOpts = append(meterOpts, otlpmetricgrpc.WithTLSCredentials(creds))
  161. } else if config.OpenTelemetryGRPCInsecure {
  162. tracerOpts = append(tracerOpts, otlptracegrpc.WithInsecure())
  163. meterOpts = append(meterOpts, otlpmetricgrpc.WithInsecure())
  164. }
  165. trctx, trcancel := context.WithTimeout(
  166. context.Background(),
  167. time.Duration(config.OpenTelemetryConnectionTimeout)*time.Second,
  168. )
  169. defer trcancel()
  170. traceExporter, err := otlptracegrpc.New(trctx, tracerOpts...)
  171. if err != nil {
  172. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  173. }
  174. if !config.OpenTelemetryEnableMetrics {
  175. return traceExporter, nil, err
  176. }
  177. mtctx, mtcancel := context.WithTimeout(
  178. context.Background(),
  179. time.Duration(config.OpenTelemetryConnectionTimeout)*time.Second,
  180. )
  181. defer mtcancel()
  182. metricExporter, err := otlpmetricgrpc.New(mtctx, meterOpts...)
  183. if err != nil {
  184. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  185. }
  186. return traceExporter, metricExporter, err
  187. }
  188. func buildHTTPExporters(insecure bool) (*otlptrace.Exporter, sdkmetric.Exporter, error) {
  189. tracerOpts := []otlptracehttp.Option{
  190. otlptracehttp.WithEndpoint(config.OpenTelemetryEndpoint),
  191. }
  192. meterOpts := []otlpmetrichttp.Option{
  193. otlpmetrichttp.WithEndpoint(config.OpenTelemetryEndpoint),
  194. }
  195. if insecure {
  196. tracerOpts = append(tracerOpts, otlptracehttp.WithInsecure())
  197. meterOpts = append(meterOpts, otlpmetrichttp.WithInsecure())
  198. } else {
  199. tlsConf, err := buildTLSConfig()
  200. if err != nil {
  201. return nil, nil, err
  202. }
  203. if tlsConf != nil {
  204. tracerOpts = append(tracerOpts, otlptracehttp.WithTLSClientConfig(tlsConf))
  205. meterOpts = append(meterOpts, otlpmetrichttp.WithTLSClientConfig(tlsConf))
  206. }
  207. }
  208. trctx, trcancel := context.WithTimeout(
  209. context.Background(),
  210. time.Duration(config.OpenTelemetryConnectionTimeout)*time.Second,
  211. )
  212. defer trcancel()
  213. traceExporter, err := otlptracehttp.New(trctx, tracerOpts...)
  214. if err != nil {
  215. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  216. }
  217. if !config.OpenTelemetryEnableMetrics {
  218. return traceExporter, nil, err
  219. }
  220. mtctx, mtcancel := context.WithTimeout(
  221. context.Background(),
  222. time.Duration(config.OpenTelemetryConnectionTimeout)*time.Second,
  223. )
  224. defer mtcancel()
  225. metricExporter, err := otlpmetrichttp.New(mtctx, meterOpts...)
  226. if err != nil {
  227. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  228. }
  229. return traceExporter, metricExporter, err
  230. }
  231. func buildTLSConfig() (*tls.Config, error) {
  232. if len(config.OpenTelemetryServerCert) == 0 {
  233. return nil, nil
  234. }
  235. certPool := x509.NewCertPool()
  236. if !certPool.AppendCertsFromPEM(prepareKeyCert(config.OpenTelemetryServerCert)) {
  237. return nil, errors.New("Can't load OpenTelemetry server cert")
  238. }
  239. tlsConf := tls.Config{RootCAs: certPool}
  240. if len(config.OpenTelemetryClientCert) > 0 && len(config.OpenTelemetryClientKey) > 0 {
  241. cert, err := tls.X509KeyPair(
  242. prepareKeyCert(config.OpenTelemetryClientCert),
  243. prepareKeyCert(config.OpenTelemetryClientKey),
  244. )
  245. if err != nil {
  246. return nil, fmt.Errorf("Can't load OpenTelemetry client cert/key pair: %s", err)
  247. }
  248. tlsConf.Certificates = []tls.Certificate{cert}
  249. }
  250. return &tlsConf, nil
  251. }
  // prepareKeyCert converts PEM data given as a single-line string into bytes,
  // turning every literal two-character `\n` sequence into a real newline.
  func prepareKeyCert(str string) []byte {
  	unescaped := strings.Replace(str, `\n`, "\n", -1)
  	return []byte(unescaped)
  }
  255. func Stop() {
  256. if enabled {
  257. trctx, trcancel := context.WithTimeout(context.Background(), 5*time.Second)
  258. defer trcancel()
  259. tracerProvider.Shutdown(trctx)
  260. if meterProvider != nil {
  261. mtctx, mtcancel := context.WithTimeout(context.Background(), 5*time.Second)
  262. defer mtcancel()
  263. meterProvider.Shutdown(mtctx)
  264. }
  265. }
  266. }
  267. func Enabled() bool {
  268. return enabled
  269. }
  270. func StartRootSpan(ctx context.Context, rw http.ResponseWriter, r *http.Request) (context.Context, context.CancelFunc, http.ResponseWriter) {
  271. if !enabled {
  272. return ctx, func() {}, rw
  273. }
  274. if propagator != nil {
  275. ctx = propagator.Extract(ctx, propagation.HeaderCarrier(r.Header))
  276. }
  277. ctx, span := tracer.Start(
  278. ctx, "/request",
  279. trace.WithSpanKind(trace.SpanKindServer),
  280. trace.WithAttributes(httpconv.ServerRequest("imgproxy", r)...),
  281. )
  282. ctx = context.WithValue(ctx, hasSpanCtxKey{}, struct{}{})
  283. newRw := httpsnoop.Wrap(rw, httpsnoop.Hooks{
  284. WriteHeader: func(next httpsnoop.WriteHeaderFunc) httpsnoop.WriteHeaderFunc {
  285. return func(statusCode int) {
  286. span.SetStatus(httpconv.ServerStatus(statusCode))
  287. span.SetAttributes(semconv.HTTPStatusCode(statusCode))
  288. next(statusCode)
  289. }
  290. },
  291. })
  292. cancel := func() { span.End() }
  293. return ctx, cancel, newRw
  294. }
  295. func StartSpan(ctx context.Context, name string) context.CancelFunc {
  296. if !enabled {
  297. return func() {}
  298. }
  299. if ctx.Value(hasSpanCtxKey{}) != nil {
  300. _, span := tracer.Start(ctx, name, trace.WithSpanKind(trace.SpanKindInternal))
  301. return func() { span.End() }
  302. }
  303. return func() {}
  304. }
  305. func SendError(ctx context.Context, errType string, err error) {
  306. if !enabled {
  307. return
  308. }
  309. span := trace.SpanFromContext(ctx)
  310. attributes := []attribute.KeyValue{
  311. semconv.ExceptionTypeKey.String(errformat.FormatErrType(errType, err)),
  312. semconv.ExceptionMessageKey.String(err.Error()),
  313. }
  314. if ierr, ok := err.(*ierrors.Error); ok {
  315. if stack := ierr.FormatStack(); len(stack) != 0 {
  316. attributes = append(attributes, semconv.ExceptionStacktraceKey.String(stack))
  317. }
  318. }
  319. span.AddEvent(semconv.ExceptionEventName, trace.WithAttributes(attributes...))
  320. }
  321. func addDefaultMetrics() error {
  322. proc, err := process.NewProcess(int32(os.Getpid()))
  323. if err != nil {
  324. return fmt.Errorf("Can't initialize process data for OpenTelemetry: %s", err)
  325. }
  326. processResidentMemory, err := meter.Int64ObservableGauge(
  327. "process_resident_memory_bytes",
  328. metric.WithUnit("By"),
  329. metric.WithDescription("Resident memory size in bytes."),
  330. )
  331. if err != nil {
  332. return fmt.Errorf("Can't add process_resident_memory_bytes gauge to OpenTelemetry: %s", err)
  333. }
  334. processVirtualMemory, err := meter.Int64ObservableGauge(
  335. "process_virtual_memory_bytes",
  336. metric.WithUnit("By"),
  337. metric.WithDescription("Virtual memory size in bytes."),
  338. )
  339. if err != nil {
  340. return fmt.Errorf("Can't add process_virtual_memory_bytes gauge to OpenTelemetry: %s", err)
  341. }
  342. goMemstatsSys, err := meter.Int64ObservableGauge(
  343. "go_memstats_sys_bytes",
  344. metric.WithUnit("By"),
  345. metric.WithDescription("Number of bytes obtained from system."),
  346. )
  347. if err != nil {
  348. return fmt.Errorf("Can't add go_memstats_sys_bytes gauge to OpenTelemetry: %s", err)
  349. }
  350. goMemstatsHeapIdle, err := meter.Int64ObservableGauge(
  351. "go_memstats_heap_idle_bytes",
  352. metric.WithUnit("By"),
  353. metric.WithDescription("Number of heap bytes waiting to be used."),
  354. )
  355. if err != nil {
  356. return fmt.Errorf("Can't add go_memstats_heap_idle_bytes gauge to OpenTelemetry: %s", err)
  357. }
  358. goMemstatsHeapInuse, err := meter.Int64ObservableGauge(
  359. "go_memstats_heap_inuse_bytes",
  360. metric.WithUnit("By"),
  361. metric.WithDescription("Number of heap bytes that are in use."),
  362. )
  363. if err != nil {
  364. return fmt.Errorf("Can't add go_memstats_heap_inuse_bytes gauge to OpenTelemetry: %s", err)
  365. }
  366. goGoroutines, err := meter.Int64ObservableGauge(
  367. "go_goroutines",
  368. metric.WithUnit("1"),
  369. metric.WithDescription("Number of goroutines that currently exist."),
  370. )
  371. if err != nil {
  372. return fmt.Errorf("Can't add go_goroutines gauge to OpenTelemetry: %s", err)
  373. }
  374. goThreads, err := meter.Int64ObservableGauge(
  375. "go_threads",
  376. metric.WithUnit("1"),
  377. metric.WithDescription("Number of OS threads created."),
  378. )
  379. if err != nil {
  380. return fmt.Errorf("Can't add go_threads gauge to OpenTelemetry: %s", err)
  381. }
  382. requestsInProgressGauge, err := meter.Float64ObservableGauge(
  383. "requests_in_progress",
  384. metric.WithUnit("1"),
  385. metric.WithDescription("A gauge of the number of requests currently being in progress."),
  386. )
  387. if err != nil {
  388. return fmt.Errorf("Can't add requests_in_progress gauge to OpenTelemetry: %s", err)
  389. }
  390. imagesInProgressGauge, err := meter.Float64ObservableGauge(
  391. "images_in_progress",
  392. metric.WithUnit("1"),
  393. metric.WithDescription("A gauge of the number of images currently being in progress."),
  394. )
  395. if err != nil {
  396. return fmt.Errorf("Can't add images_in_progress gauge to OpenTelemetry: %s", err)
  397. }
  398. bufferDefaultSizeGauge, err := meter.Int64ObservableGauge(
  399. "buffer_default_size_bytes",
  400. metric.WithUnit("By"),
  401. metric.WithDescription("A gauge of the buffer default size in bytes."),
  402. )
  403. if err != nil {
  404. return fmt.Errorf("Can't add buffer_default_size_bytes gauge to OpenTelemetry: %s", err)
  405. }
  406. bufferMaxSizeGauge, err := meter.Int64ObservableGauge(
  407. "buffer_max_size_bytes",
  408. metric.WithUnit("By"),
  409. metric.WithDescription("A gauge of the buffer max size in bytes."),
  410. )
  411. if err != nil {
  412. return fmt.Errorf("Can't add buffer_max_size_bytes gauge to OpenTelemetry: %s", err)
  413. }
  414. _, err = meter.RegisterCallback(
  415. func(ctx context.Context, o metric.Observer) error {
  416. memStats, merr := proc.MemoryInfo()
  417. if merr != nil {
  418. return merr
  419. }
  420. o.ObserveInt64(processResidentMemory, int64(memStats.RSS))
  421. o.ObserveInt64(processVirtualMemory, int64(memStats.VMS))
  422. goMemStats := &runtime.MemStats{}
  423. runtime.ReadMemStats(goMemStats)
  424. o.ObserveInt64(goMemstatsSys, int64(goMemStats.Sys))
  425. o.ObserveInt64(goMemstatsHeapIdle, int64(goMemStats.HeapIdle))
  426. o.ObserveInt64(goMemstatsHeapInuse, int64(goMemStats.HeapInuse))
  427. threadsNum, _ := runtime.ThreadCreateProfile(nil)
  428. o.ObserveInt64(goGoroutines, int64(runtime.NumGoroutine()))
  429. o.ObserveInt64(goThreads, int64(threadsNum))
  430. o.ObserveFloat64(requestsInProgressGauge, stats.RequestsInProgress())
  431. o.ObserveFloat64(imagesInProgressGauge, stats.ImagesInProgress())
  432. bufferStatsMutex.Lock()
  433. defer bufferStatsMutex.Unlock()
  434. for t, v := range bufferDefaultSizes {
  435. o.ObserveInt64(bufferDefaultSizeGauge, int64(v), metric.WithAttributes(attribute.String("type", t)))
  436. }
  437. for t, v := range bufferMaxSizes {
  438. o.ObserveInt64(bufferMaxSizeGauge, int64(v), metric.WithAttributes(attribute.String("type", t)))
  439. }
  440. return nil
  441. },
  442. processResidentMemory,
  443. processVirtualMemory,
  444. goMemstatsSys,
  445. goMemstatsHeapIdle,
  446. goMemstatsHeapInuse,
  447. goGoroutines,
  448. goThreads,
  449. requestsInProgressGauge,
  450. imagesInProgressGauge,
  451. bufferDefaultSizeGauge,
  452. bufferMaxSizeGauge,
  453. )
  454. if err != nil {
  455. return fmt.Errorf("Can't register OpenTelemetry callbacks: %s", err)
  456. }
  457. bufferSizeHist, err = meter.Int64Histogram(
  458. "buffer_size_bytes",
  459. metric.WithUnit("By"),
  460. metric.WithDescription("A histogram of the buffer size in bytes."),
  461. )
  462. if err != nil {
  463. return fmt.Errorf("Can't add buffer_size_bytes histogram to OpenTelemetry: %s", err)
  464. }
  465. return nil
  466. }
  467. func AddGaugeFunc(name, desc, u string, f GaugeFunc) {
  468. if meter == nil {
  469. return
  470. }
  471. _, err := meter.Float64ObservableGauge(
  472. name,
  473. metric.WithUnit(u),
  474. metric.WithDescription(desc),
  475. metric.WithFloat64Callback(func(_ context.Context, obsrv metric.Float64Observer) error {
  476. obsrv.Observe(f())
  477. return nil
  478. }),
  479. )
  480. if err != nil {
  481. logrus.Warnf("Can't add %s gauge to OpenTelemetry: %s", name, err)
  482. }
  483. }
  484. func ObserveBufferSize(t string, size int) {
  485. if enabledMetrics {
  486. bufferSizeHist.Record(context.Background(), int64(size), metric.WithAttributes(attribute.String("type", t)))
  487. }
  488. }
  489. func SetBufferDefaultSize(t string, size int) {
  490. if enabledMetrics {
  491. bufferStatsMutex.Lock()
  492. defer bufferStatsMutex.Unlock()
  493. bufferDefaultSizes[t] = size
  494. }
  495. }
  496. func SetBufferMaxSize(t string, size int) {
  497. if enabledMetrics {
  498. bufferStatsMutex.Lock()
  499. defer bufferStatsMutex.Unlock()
  500. bufferMaxSizes[t] = size
  501. }
  502. }
  503. type errorHandler struct {
  504. entry *logrus.Entry
  505. }
  506. func (h *errorHandler) Handle(err error) {
  507. h.entry.Warn(err.Error())
  508. }