otel.go 16 KB


  1. package otel
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "crypto/x509"
  6. "errors"
  7. "fmt"
  8. "net/http"
  9. "os"
  10. "runtime"
  11. "strings"
  12. "sync"
  13. "time"
  14. "github.com/felixge/httpsnoop"
  15. "github.com/shirou/gopsutil/process"
  16. "github.com/sirupsen/logrus"
  17. "go.opentelemetry.io/contrib/detectors/aws/ec2"
  18. "go.opentelemetry.io/contrib/detectors/aws/ecs"
  19. "go.opentelemetry.io/contrib/detectors/aws/eks"
  20. "go.opentelemetry.io/contrib/propagators/autoprop"
  21. "go.opentelemetry.io/contrib/propagators/aws/xray"
  22. "go.opentelemetry.io/otel"
  23. "go.opentelemetry.io/otel/attribute"
  24. "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
  25. "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
  26. "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
  27. "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
  28. "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
  29. "go.opentelemetry.io/otel/metric"
  30. "go.opentelemetry.io/otel/metric/instrument"
  31. "go.opentelemetry.io/otel/propagation"
  32. sdkmetric "go.opentelemetry.io/otel/sdk/metric"
  33. "go.opentelemetry.io/otel/sdk/resource"
  34. sdktrace "go.opentelemetry.io/otel/sdk/trace"
  35. semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
  36. "go.opentelemetry.io/otel/semconv/v1.17.0/httpconv"
  37. "go.opentelemetry.io/otel/trace"
  38. "google.golang.org/grpc"
  39. "google.golang.org/grpc/credentials"
  40. "github.com/imgproxy/imgproxy/v3/config"
  41. "github.com/imgproxy/imgproxy/v3/ierrors"
  42. "github.com/imgproxy/imgproxy/v3/metrics/errformat"
  43. "github.com/imgproxy/imgproxy/v3/metrics/stats"
  44. "github.com/imgproxy/imgproxy/v3/version"
  45. )
  46. type hasSpanCtxKey struct{}
  47. type GaugeFunc func() float64
  48. var (
  49. enabled bool
  50. enabledMetrics bool
  51. tracerProvider *sdktrace.TracerProvider
  52. tracer trace.Tracer
  53. meterProvider *sdkmetric.MeterProvider
  54. meter metric.Meter
  55. propagator propagation.TextMapPropagator
  56. bufferSizeHist instrument.Int64Histogram
  57. bufferDefaultSizes = make(map[string]int)
  58. bufferMaxSizes = make(map[string]int)
  59. bufferStatsMutex sync.Mutex
  60. )
  61. func Init() error {
  62. if len(config.OpenTelemetryEndpoint) == 0 {
  63. return nil
  64. }
  65. otel.SetErrorHandler(&errorHandler{entry: logrus.WithField("from", "opentelemetry")})
  66. var (
  67. traceExporter *otlptrace.Exporter
  68. metricExporter sdkmetric.Exporter
  69. err error
  70. )
  71. switch config.OpenTelemetryProtocol {
  72. case "grpc":
  73. traceExporter, metricExporter, err = buildGRPCExporters()
  74. case "https":
  75. traceExporter, metricExporter, err = buildHTTPExporters(false)
  76. case "http":
  77. traceExporter, metricExporter, err = buildHTTPExporters(true)
  78. default:
  79. return fmt.Errorf("Unknown OpenTelemetry protocol: %s", config.OpenTelemetryProtocol)
  80. }
  81. if err != nil {
  82. return err
  83. }
  84. res, _ := resource.Merge(
  85. resource.Default(),
  86. resource.NewSchemaless(
  87. semconv.ServiceNameKey.String(config.OpenTelemetryServiceName),
  88. semconv.ServiceVersionKey.String(version.Version()),
  89. ),
  90. )
  91. awsRes, _ := resource.Detect(
  92. context.Background(),
  93. ec2.NewResourceDetector(),
  94. ecs.NewResourceDetector(),
  95. eks.NewResourceDetector(),
  96. )
  97. if merged, merr := resource.Merge(awsRes, res); merr == nil {
  98. res = merged
  99. } else {
  100. logrus.Warnf("Can't add AWS attributes to OpenTelemetry: %s", merr)
  101. }
  102. opts := []sdktrace.TracerProviderOption{
  103. sdktrace.WithResource(res),
  104. sdktrace.WithBatcher(traceExporter),
  105. }
  106. if opts, err = addTraceIDRatioSampler(opts); err != nil {
  107. return err
  108. }
  109. switch g := config.OpenTelemetryTraceIDGenerator; g {
  110. case "xray":
  111. idg := xray.NewIDGenerator()
  112. opts = append(opts, sdktrace.WithIDGenerator(idg))
  113. case "random":
  114. // Do nothing. OTel uses random generator by default
  115. default:
  116. return fmt.Errorf("Unknown Trace ID generator: %s", g)
  117. }
  118. tracerProvider = sdktrace.NewTracerProvider(opts...)
  119. tracer = tracerProvider.Tracer("imgproxy")
  120. if len(config.OpenTelemetryPropagators) > 0 {
  121. propagator, err = autoprop.TextMapPropagator(config.OpenTelemetryPropagators...)
  122. if err != nil {
  123. return err
  124. }
  125. }
  126. enabled = true
  127. if metricExporter == nil {
  128. return nil
  129. }
  130. metricReader := sdkmetric.NewPeriodicReader(
  131. metricExporter,
  132. sdkmetric.WithInterval(5*time.Second),
  133. )
  134. meterProvider = sdkmetric.NewMeterProvider(
  135. sdkmetric.WithResource(res),
  136. sdkmetric.WithReader(metricReader),
  137. )
  138. meter = meterProvider.Meter("imgproxy")
  139. if err = addDefaultMetrics(); err != nil {
  140. return err
  141. }
  142. enabledMetrics = true
  143. return nil
  144. }
  145. func buildGRPCExporters() (*otlptrace.Exporter, sdkmetric.Exporter, error) {
  146. tracerOpts := []otlptracegrpc.Option{
  147. otlptracegrpc.WithEndpoint(config.OpenTelemetryEndpoint),
  148. otlptracegrpc.WithDialOption(grpc.WithBlock()),
  149. }
  150. meterOpts := []otlpmetricgrpc.Option{
  151. otlpmetricgrpc.WithEndpoint(config.OpenTelemetryEndpoint),
  152. otlpmetricgrpc.WithDialOption(grpc.WithBlock()),
  153. }
  154. tlsConf, err := buildTLSConfig()
  155. if err != nil {
  156. return nil, nil, err
  157. }
  158. if tlsConf != nil {
  159. creds := credentials.NewTLS(tlsConf)
  160. tracerOpts = append(tracerOpts, otlptracegrpc.WithTLSCredentials(creds))
  161. meterOpts = append(meterOpts, otlpmetricgrpc.WithTLSCredentials(creds))
  162. } else if config.OpenTelemetryGRPCInsecure {
  163. tracerOpts = append(tracerOpts, otlptracegrpc.WithInsecure())
  164. meterOpts = append(meterOpts, otlpmetricgrpc.WithInsecure())
  165. }
  166. trctx, trcancel := context.WithTimeout(
  167. context.Background(),
  168. time.Duration(config.OpenTelemetryConnectionTimeout)*time.Second,
  169. )
  170. defer trcancel()
  171. traceExporter, err := otlptracegrpc.New(trctx, tracerOpts...)
  172. if err != nil {
  173. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  174. }
  175. if !config.OpenTelemetryEnableMetrics {
  176. return traceExporter, nil, err
  177. }
  178. mtctx, mtcancel := context.WithTimeout(
  179. context.Background(),
  180. time.Duration(config.OpenTelemetryConnectionTimeout)*time.Second,
  181. )
  182. defer mtcancel()
  183. metricExporter, err := otlpmetricgrpc.New(mtctx, meterOpts...)
  184. if err != nil {
  185. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  186. }
  187. return traceExporter, metricExporter, err
  188. }
  189. func buildHTTPExporters(insecure bool) (*otlptrace.Exporter, sdkmetric.Exporter, error) {
  190. tracerOpts := []otlptracehttp.Option{
  191. otlptracehttp.WithEndpoint(config.OpenTelemetryEndpoint),
  192. }
  193. meterOpts := []otlpmetrichttp.Option{
  194. otlpmetrichttp.WithEndpoint(config.OpenTelemetryEndpoint),
  195. }
  196. if insecure {
  197. tracerOpts = append(tracerOpts, otlptracehttp.WithInsecure())
  198. meterOpts = append(meterOpts, otlpmetrichttp.WithInsecure())
  199. } else {
  200. tlsConf, err := buildTLSConfig()
  201. if err != nil {
  202. return nil, nil, err
  203. }
  204. if tlsConf != nil {
  205. tracerOpts = append(tracerOpts, otlptracehttp.WithTLSClientConfig(tlsConf))
  206. meterOpts = append(meterOpts, otlpmetrichttp.WithTLSClientConfig(tlsConf))
  207. }
  208. }
  209. trctx, trcancel := context.WithTimeout(
  210. context.Background(),
  211. time.Duration(config.OpenTelemetryConnectionTimeout)*time.Second,
  212. )
  213. defer trcancel()
  214. traceExporter, err := otlptracehttp.New(trctx, tracerOpts...)
  215. if err != nil {
  216. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  217. }
  218. if !config.OpenTelemetryEnableMetrics {
  219. return traceExporter, nil, err
  220. }
  221. mtctx, mtcancel := context.WithTimeout(
  222. context.Background(),
  223. time.Duration(config.OpenTelemetryConnectionTimeout)*time.Second,
  224. )
  225. defer mtcancel()
  226. metricExporter, err := otlpmetrichttp.New(mtctx, meterOpts...)
  227. if err != nil {
  228. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  229. }
  230. return traceExporter, metricExporter, err
  231. }
  232. func buildTLSConfig() (*tls.Config, error) {
  233. if len(config.OpenTelemetryServerCert) == 0 {
  234. return nil, nil
  235. }
  236. certPool := x509.NewCertPool()
  237. if !certPool.AppendCertsFromPEM(prepareKeyCert(config.OpenTelemetryServerCert)) {
  238. return nil, errors.New("Can't load OpenTelemetry server cert")
  239. }
  240. tlsConf := tls.Config{RootCAs: certPool}
  241. if len(config.OpenTelemetryClientCert) > 0 && len(config.OpenTelemetryClientKey) > 0 {
  242. cert, err := tls.X509KeyPair(
  243. prepareKeyCert(config.OpenTelemetryClientCert),
  244. prepareKeyCert(config.OpenTelemetryClientKey),
  245. )
  246. if err != nil {
  247. return nil, fmt.Errorf("Can't load OpenTelemetry client cert/key pair: %s", err)
  248. }
  249. tlsConf.Certificates = []tls.Certificate{cert}
  250. }
  251. return &tlsConf, nil
  252. }
  253. func prepareKeyCert(str string) []byte {
  254. return []byte(strings.ReplaceAll(str, `\n`, "\n"))
  255. }
  256. func Stop() {
  257. if enabled {
  258. trctx, trcancel := context.WithTimeout(context.Background(), 5*time.Second)
  259. defer trcancel()
  260. tracerProvider.Shutdown(trctx)
  261. if meterProvider != nil {
  262. mtctx, mtcancel := context.WithTimeout(context.Background(), 5*time.Second)
  263. defer mtcancel()
  264. meterProvider.Shutdown(mtctx)
  265. }
  266. }
  267. }
  268. func Enabled() bool {
  269. return enabled
  270. }
  271. func StartRootSpan(ctx context.Context, rw http.ResponseWriter, r *http.Request) (context.Context, context.CancelFunc, http.ResponseWriter) {
  272. if !enabled {
  273. return ctx, func() {}, rw
  274. }
  275. if propagator != nil {
  276. ctx = propagator.Extract(ctx, propagation.HeaderCarrier(r.Header))
  277. }
  278. ctx, span := tracer.Start(
  279. ctx, "/request",
  280. trace.WithSpanKind(trace.SpanKindServer),
  281. trace.WithAttributes(httpconv.ServerRequest("imgproxy", r)...),
  282. )
  283. ctx = context.WithValue(ctx, hasSpanCtxKey{}, struct{}{})
  284. newRw := httpsnoop.Wrap(rw, httpsnoop.Hooks{
  285. WriteHeader: func(next httpsnoop.WriteHeaderFunc) httpsnoop.WriteHeaderFunc {
  286. return func(statusCode int) {
  287. span.SetStatus(httpconv.ServerStatus(statusCode))
  288. span.SetAttributes(semconv.HTTPStatusCode(statusCode))
  289. next(statusCode)
  290. }
  291. },
  292. })
  293. cancel := func() { span.End() }
  294. return ctx, cancel, newRw
  295. }
  296. func StartSpan(ctx context.Context, name string) context.CancelFunc {
  297. if !enabled {
  298. return func() {}
  299. }
  300. if ctx.Value(hasSpanCtxKey{}) != nil {
  301. _, span := tracer.Start(ctx, name, trace.WithSpanKind(trace.SpanKindInternal))
  302. return func() { span.End() }
  303. }
  304. return func() {}
  305. }
  306. func SendError(ctx context.Context, errType string, err error) {
  307. if !enabled {
  308. return
  309. }
  310. span := trace.SpanFromContext(ctx)
  311. attributes := []attribute.KeyValue{
  312. semconv.ExceptionTypeKey.String(errformat.FormatErrType(errType, err)),
  313. semconv.ExceptionMessageKey.String(err.Error()),
  314. }
  315. if ierr, ok := err.(*ierrors.Error); ok {
  316. if stack := ierr.FormatStack(); len(stack) != 0 {
  317. attributes = append(attributes, semconv.ExceptionStacktraceKey.String(stack))
  318. }
  319. }
  320. span.AddEvent(semconv.ExceptionEventName, trace.WithAttributes(attributes...))
  321. }
  322. func addDefaultMetrics() error {
  323. proc, err := process.NewProcess(int32(os.Getpid()))
  324. if err != nil {
  325. return fmt.Errorf("Can't initialize process data for OpenTelemetry: %s", err)
  326. }
  327. processResidentMemory, err := meter.Int64ObservableGauge(
  328. "process_resident_memory_bytes",
  329. instrument.WithUnit("By"),
  330. instrument.WithDescription("Resident memory size in bytes."),
  331. )
  332. if err != nil {
  333. return fmt.Errorf("Can't add process_resident_memory_bytes gauge to OpenTelemetry: %s", err)
  334. }
  335. processVirtualMemory, err := meter.Int64ObservableGauge(
  336. "process_virtual_memory_bytes",
  337. instrument.WithUnit("By"),
  338. instrument.WithDescription("Virtual memory size in bytes."),
  339. )
  340. if err != nil {
  341. return fmt.Errorf("Can't add process_virtual_memory_bytes gauge to OpenTelemetry: %s", err)
  342. }
  343. goMemstatsSys, err := meter.Int64ObservableGauge(
  344. "go_memstats_sys_bytes",
  345. instrument.WithUnit("By"),
  346. instrument.WithDescription("Number of bytes obtained from system."),
  347. )
  348. if err != nil {
  349. return fmt.Errorf("Can't add go_memstats_sys_bytes gauge to OpenTelemetry: %s", err)
  350. }
  351. goMemstatsHeapIdle, err := meter.Int64ObservableGauge(
  352. "go_memstats_heap_idle_bytes",
  353. instrument.WithUnit("By"),
  354. instrument.WithDescription("Number of heap bytes waiting to be used."),
  355. )
  356. if err != nil {
  357. return fmt.Errorf("Can't add go_memstats_heap_idle_bytes gauge to OpenTelemetry: %s", err)
  358. }
  359. goMemstatsHeapInuse, err := meter.Int64ObservableGauge(
  360. "go_memstats_heap_inuse_bytes",
  361. instrument.WithUnit("By"),
  362. instrument.WithDescription("Number of heap bytes that are in use."),
  363. )
  364. if err != nil {
  365. return fmt.Errorf("Can't add go_memstats_heap_inuse_bytes gauge to OpenTelemetry: %s", err)
  366. }
  367. goGoroutines, err := meter.Int64ObservableGauge(
  368. "go_goroutines",
  369. instrument.WithUnit("1"),
  370. instrument.WithDescription("Number of goroutines that currently exist."),
  371. )
  372. if err != nil {
  373. return fmt.Errorf("Can't add go_goroutines gauge to OpenTelemetry: %s", err)
  374. }
  375. goThreads, err := meter.Int64ObservableGauge(
  376. "go_threads",
  377. instrument.WithUnit("1"),
  378. instrument.WithDescription("Number of OS threads created."),
  379. )
  380. if err != nil {
  381. return fmt.Errorf("Can't add go_threads gauge to OpenTelemetry: %s", err)
  382. }
  383. requestsInProgressGauge, err := meter.Float64ObservableGauge(
  384. "requests_in_progress",
  385. instrument.WithUnit("1"),
  386. instrument.WithDescription("A gauge of the number of requests currently being in progress."),
  387. )
  388. if err != nil {
  389. return fmt.Errorf("Can't add requests_in_progress gauge to OpenTelemetry: %s", err)
  390. }
  391. imagesInProgressGauge, err := meter.Float64ObservableGauge(
  392. "images_in_progress",
  393. instrument.WithUnit("1"),
  394. instrument.WithDescription("A gauge of the number of images currently being in progress."),
  395. )
  396. if err != nil {
  397. return fmt.Errorf("Can't add images_in_progress gauge to OpenTelemetry: %s", err)
  398. }
  399. bufferDefaultSizeGauge, err := meter.Int64ObservableGauge(
  400. "buffer_default_size_bytes",
  401. instrument.WithUnit("By"),
  402. instrument.WithDescription("A gauge of the buffer default size in bytes."),
  403. )
  404. if err != nil {
  405. return fmt.Errorf("Can't add buffer_default_size_bytes gauge to OpenTelemetry: %s", err)
  406. }
  407. bufferMaxSizeGauge, err := meter.Int64ObservableGauge(
  408. "buffer_max_size_bytes",
  409. instrument.WithUnit("By"),
  410. instrument.WithDescription("A gauge of the buffer max size in bytes."),
  411. )
  412. if err != nil {
  413. return fmt.Errorf("Can't add buffer_max_size_bytes gauge to OpenTelemetry: %s", err)
  414. }
  415. _, err = meter.RegisterCallback(
  416. func(ctx context.Context, o metric.Observer) error {
  417. memStats, merr := proc.MemoryInfo()
  418. if merr != nil {
  419. return merr
  420. }
  421. o.ObserveInt64(processResidentMemory, int64(memStats.RSS))
  422. o.ObserveInt64(processVirtualMemory, int64(memStats.VMS))
  423. goMemStats := &runtime.MemStats{}
  424. runtime.ReadMemStats(goMemStats)
  425. o.ObserveInt64(goMemstatsSys, int64(goMemStats.Sys))
  426. o.ObserveInt64(goMemstatsHeapIdle, int64(goMemStats.HeapIdle))
  427. o.ObserveInt64(goMemstatsHeapInuse, int64(goMemStats.HeapInuse))
  428. threadsNum, _ := runtime.ThreadCreateProfile(nil)
  429. o.ObserveInt64(goGoroutines, int64(runtime.NumGoroutine()))
  430. o.ObserveInt64(goThreads, int64(threadsNum))
  431. o.ObserveFloat64(requestsInProgressGauge, stats.RequestsInProgress())
  432. o.ObserveFloat64(imagesInProgressGauge, stats.ImagesInProgress())
  433. bufferStatsMutex.Lock()
  434. defer bufferStatsMutex.Unlock()
  435. for t, v := range bufferDefaultSizes {
  436. o.ObserveInt64(bufferDefaultSizeGauge, int64(v), attribute.String("type", t))
  437. }
  438. for t, v := range bufferMaxSizes {
  439. o.ObserveInt64(bufferMaxSizeGauge, int64(v), attribute.String("type", t))
  440. }
  441. return nil
  442. },
  443. processResidentMemory,
  444. processVirtualMemory,
  445. goMemstatsSys,
  446. goMemstatsHeapIdle,
  447. goMemstatsHeapInuse,
  448. goGoroutines,
  449. goThreads,
  450. requestsInProgressGauge,
  451. imagesInProgressGauge,
  452. bufferDefaultSizeGauge,
  453. bufferMaxSizeGauge,
  454. )
  455. if err != nil {
  456. return fmt.Errorf("Can't register OpenTelemetry callbacks: %s", err)
  457. }
  458. bufferSizeHist, err = meter.Int64Histogram(
  459. "buffer_size_bytes",
  460. instrument.WithUnit("By"),
  461. instrument.WithDescription("A histogram of the buffer size in bytes."),
  462. )
  463. if err != nil {
  464. return fmt.Errorf("Can't add buffer_size_bytes histogram to OpenTelemetry: %s", err)
  465. }
  466. return nil
  467. }
  468. func AddGaugeFunc(name, desc, u string, f GaugeFunc) {
  469. if meter == nil {
  470. return
  471. }
  472. _, err := meter.Float64ObservableGauge(
  473. name,
  474. instrument.WithUnit(u),
  475. instrument.WithDescription(desc),
  476. instrument.WithFloat64Callback(func(_ context.Context, obsrv instrument.Float64Observer) error {
  477. obsrv.Observe(f())
  478. return nil
  479. }),
  480. )
  481. if err != nil {
  482. logrus.Warnf("Can't add %s gauge to OpenTelemetry: %s", name, err)
  483. }
  484. }
  485. func ObserveBufferSize(t string, size int) {
  486. if enabledMetrics {
  487. bufferSizeHist.Record(context.Background(), int64(size), attribute.String("type", t))
  488. }
  489. }
  490. func SetBufferDefaultSize(t string, size int) {
  491. if enabledMetrics {
  492. bufferStatsMutex.Lock()
  493. defer bufferStatsMutex.Unlock()
  494. bufferDefaultSizes[t] = size
  495. }
  496. }
  497. func SetBufferMaxSize(t string, size int) {
  498. if enabledMetrics {
  499. bufferStatsMutex.Lock()
  500. defer bufferStatsMutex.Unlock()
  501. bufferMaxSizes[t] = size
  502. }
  503. }
  504. type errorHandler struct {
  505. entry *logrus.Entry
  506. }
  507. func (h *errorHandler) Handle(err error) {
  508. h.entry.Warn(err.Error())
  509. }