otel.go 13 KB


  1. package otel
import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"net/http"
	"os"
	"reflect"
	"runtime"
	"time"

	"github.com/felixge/httpsnoop"
	"github.com/shirou/gopsutil/process"
	ec2 "go.opentelemetry.io/contrib/detectors/aws/ec2/v2"
	"go.opentelemetry.io/contrib/detectors/aws/ecs"
	"go.opentelemetry.io/contrib/detectors/aws/eks"
	"go.opentelemetry.io/contrib/propagators/autoprop"
	"go.opentelemetry.io/contrib/propagators/aws/xray"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/propagation"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
	"go.opentelemetry.io/otel/semconv/v1.20.0/httpconv"
	"go.opentelemetry.io/otel/trace"

	"github.com/imgproxy/imgproxy/v3/ierrors"
	"github.com/imgproxy/imgproxy/v3/monitoring/errformat"
	"github.com/imgproxy/imgproxy/v3/monitoring/stats"
	"github.com/imgproxy/imgproxy/v3/version"
	"github.com/imgproxy/imgproxy/v3/vips"
)
  35. const (
  36. // stopTimeout is the maximum time to wait for the shutdown of the tracer and meter providers
  37. stopTimeout = 5 * time.Second
  38. // defaultOtelServiceName is the default service name for OpenTelemetry if none is set
  39. defaultOtelServiceName = "imgproxy"
  40. )
  41. // hasSpanCtxKey is a context key to mark that there is a span in the context
  42. type hasSpanCtxKey struct{}
  43. // errorHandler is an implementation of the OpenTelemetry error handler interface
  44. type errorHandler struct{}
  45. func (h errorHandler) Handle(err error) {
  46. slog.Warn(err.Error(), "source", "opentelemetry")
  47. }
  48. // Otel holds OpenTelemetry tracer and meter providers and configuration
  49. type Otel struct {
  50. config *Config
  51. stats *stats.Stats
  52. tracerProvider *sdktrace.TracerProvider
  53. tracer trace.Tracer
  54. meterProvider *sdkmetric.MeterProvider
  55. meter metric.Meter
  56. propagator propagation.TextMapPropagator
  57. }
  58. // New creates a new Otel instance
  59. func New(config *Config, stats *stats.Stats) (*Otel, error) {
  60. o := &Otel{
  61. config: config,
  62. stats: stats,
  63. }
  64. if !config.Enabled() {
  65. return o, nil
  66. }
  67. if err := config.Validate(); err != nil {
  68. return nil, err
  69. }
  70. otel.SetErrorHandler(errorHandler{})
  71. traceExporter, metricExporter, err := buildProtocolExporter(config)
  72. if err != nil {
  73. return nil, err
  74. }
  75. // If no service name is set, use "imgproxy" as default, and write it into the environment
  76. if n, _ := OTEL_SERVICE_NAME.Get(); len(n) == 0 {
  77. os.Setenv(OTEL_SERVICE_NAME.Name, defaultOtelServiceName)
  78. }
  79. res, _ := resource.Merge(
  80. resource.Default(),
  81. resource.NewSchemaless(
  82. semconv.ServiceVersion(version.Version),
  83. ),
  84. )
  85. awsRes, _ := resource.Detect(
  86. context.Background(),
  87. ec2.NewResourceDetector(),
  88. ecs.NewResourceDetector(),
  89. eks.NewResourceDetector(),
  90. )
  91. if merged, merr := resource.Merge(awsRes, res); merr == nil {
  92. res = merged
  93. } else {
  94. slog.Warn(fmt.Sprintf("can't add AWS attributes to OpenTelemetry: %s", merr))
  95. }
  96. opts := []sdktrace.TracerProviderOption{
  97. sdktrace.WithResource(res),
  98. sdktrace.WithBatcher(traceExporter),
  99. }
  100. switch g := config.TraceIDGenerator; g {
  101. case "xray":
  102. idg := xray.NewIDGenerator()
  103. opts = append(opts, sdktrace.WithIDGenerator(idg))
  104. case "random":
  105. // Do nothing. OTel uses random generator by default
  106. default:
  107. return nil, fmt.Errorf("unknown Trace ID generator: %s", g)
  108. }
  109. o.tracerProvider = sdktrace.NewTracerProvider(opts...)
  110. o.tracer = o.tracerProvider.Tracer("imgproxy")
  111. if len(config.Propagators) > 0 {
  112. o.propagator, err = autoprop.TextMapPropagator(config.Propagators...)
  113. if err != nil {
  114. return nil, err
  115. }
  116. }
  117. if metricExporter == nil {
  118. return o, nil
  119. }
  120. metricReader := sdkmetric.NewPeriodicReader(
  121. metricExporter,
  122. sdkmetric.WithInterval(config.MetricsInterval),
  123. )
  124. o.meterProvider = sdkmetric.NewMeterProvider(
  125. sdkmetric.WithResource(res),
  126. sdkmetric.WithReader(metricReader),
  127. )
  128. o.meter = o.meterProvider.Meter("imgproxy")
  129. if err = o.addDefaultMetrics(); err != nil {
  130. return nil, err
  131. }
  132. return o, nil
  133. }
  134. func (o *Otel) Enabled() bool {
  135. return o.config.Enabled()
  136. }
  137. func (o *Otel) Stop(ctx context.Context) {
  138. if o.tracerProvider != nil {
  139. trctx, trcancel := context.WithTimeout(ctx, stopTimeout)
  140. defer trcancel()
  141. o.tracerProvider.Shutdown(trctx)
  142. }
  143. if o.meterProvider != nil {
  144. mtctx, mtcancel := context.WithTimeout(ctx, stopTimeout)
  145. defer mtcancel()
  146. o.meterProvider.Shutdown(mtctx)
  147. }
  148. }
  149. func (o *Otel) StartRootSpan(ctx context.Context, rw http.ResponseWriter, r *http.Request) (context.Context, context.CancelFunc, http.ResponseWriter) {
  150. if !o.Enabled() {
  151. return ctx, func() {}, rw
  152. }
  153. if o.propagator != nil {
  154. ctx = o.propagator.Extract(ctx, propagation.HeaderCarrier(r.Header))
  155. }
  156. server := r.Host
  157. if len(server) == 0 {
  158. server = "imgproxy"
  159. }
  160. ctx, span := o.tracer.Start(
  161. ctx, "/request",
  162. trace.WithSpanKind(trace.SpanKindServer),
  163. trace.WithAttributes(httpconv.ServerRequest(server, r)...),
  164. trace.WithAttributes(semconv.HTTPURL(r.RequestURI)),
  165. )
  166. ctx = context.WithValue(ctx, hasSpanCtxKey{}, struct{}{})
  167. newRw := httpsnoop.Wrap(rw, httpsnoop.Hooks{
  168. WriteHeader: func(next httpsnoop.WriteHeaderFunc) httpsnoop.WriteHeaderFunc {
  169. return func(statusCode int) {
  170. span.SetStatus(httpconv.ServerStatus(statusCode))
  171. span.SetAttributes(semconv.HTTPStatusCode(statusCode))
  172. next(statusCode)
  173. }
  174. },
  175. })
  176. cancel := func() { span.End() }
  177. return ctx, cancel, newRw
  178. }
  179. func setMetadata(span trace.Span, key string, value interface{}) {
  180. if len(key) == 0 || value == nil {
  181. return
  182. }
  183. if stringer, ok := value.(fmt.Stringer); ok {
  184. span.SetAttributes(attribute.String(key, stringer.String()))
  185. return
  186. }
  187. rv := reflect.ValueOf(value)
  188. switch {
  189. case rv.Kind() == reflect.String:
  190. span.SetAttributes(attribute.String(key, value.(string)))
  191. case rv.Kind() == reflect.Bool:
  192. span.SetAttributes(attribute.Bool(key, value.(bool)))
  193. case rv.CanInt():
  194. span.SetAttributes(attribute.Int64(key, rv.Int()))
  195. case rv.CanUint():
  196. span.SetAttributes(attribute.Int64(key, int64(rv.Uint())))
  197. case rv.CanFloat():
  198. span.SetAttributes(attribute.Float64(key, rv.Float()))
  199. case rv.Kind() == reflect.Map && rv.Type().Key().Kind() == reflect.String:
  200. for _, k := range rv.MapKeys() {
  201. setMetadata(span, key+"."+k.String(), rv.MapIndex(k).Interface())
  202. }
  203. default:
  204. // Theoretically, we can also cover slices and arrays here,
  205. // but it's pretty complex and not really needed for now
  206. span.SetAttributes(attribute.String(key, fmt.Sprintf("%v", value)))
  207. }
  208. }
  209. func (o *Otel) SetMetadata(ctx context.Context, key string, value interface{}) {
  210. if !o.Enabled() {
  211. return
  212. }
  213. if ctx.Value(hasSpanCtxKey{}) != nil {
  214. if span := trace.SpanFromContext(ctx); span != nil {
  215. setMetadata(span, key, value)
  216. }
  217. }
  218. }
  219. func (o *Otel) StartSpan(ctx context.Context, name string, meta map[string]any) context.CancelFunc {
  220. if !o.Enabled() {
  221. return func() {}
  222. }
  223. if ctx.Value(hasSpanCtxKey{}) != nil {
  224. _, span := o.tracer.Start(ctx, name, trace.WithSpanKind(trace.SpanKindInternal))
  225. for k, v := range meta {
  226. setMetadata(span, k, v)
  227. }
  228. return func() { span.End() }
  229. }
  230. return func() {}
  231. }
  232. func (o *Otel) SendError(ctx context.Context, errType string, err error) {
  233. if !o.Enabled() {
  234. return
  235. }
  236. span := trace.SpanFromContext(ctx)
  237. attributes := []attribute.KeyValue{
  238. semconv.ExceptionTypeKey.String(errformat.FormatErrType(errType, err)),
  239. semconv.ExceptionMessageKey.String(err.Error()),
  240. }
  241. if ierr, ok := err.(*ierrors.Error); ok {
  242. if stack := ierr.FormatStack(); len(stack) != 0 {
  243. attributes = append(attributes, semconv.ExceptionStacktraceKey.String(stack))
  244. }
  245. }
  246. span.SetStatus(codes.Error, err.Error())
  247. span.AddEvent(semconv.ExceptionEventName, trace.WithAttributes(attributes...))
  248. }
  249. func (o *Otel) addDefaultMetrics() error {
  250. proc, err := process.NewProcess(int32(os.Getpid()))
  251. if err != nil {
  252. return fmt.Errorf("can't initialize process data for OpenTelemetry: %s", err)
  253. }
  254. processResidentMemory, err := o.meter.Int64ObservableGauge(
  255. "process_resident_memory_bytes",
  256. metric.WithUnit("By"),
  257. metric.WithDescription("Resident memory size in bytes."),
  258. )
  259. if err != nil {
  260. return fmt.Errorf("can't add process_resident_memory_bytes gauge to OpenTelemetry: %s", err)
  261. }
  262. processVirtualMemory, err := o.meter.Int64ObservableGauge(
  263. "process_virtual_memory_bytes",
  264. metric.WithUnit("By"),
  265. metric.WithDescription("Virtual memory size in bytes."),
  266. )
  267. if err != nil {
  268. return fmt.Errorf("can't add process_virtual_memory_bytes gauge to OpenTelemetry: %s", err)
  269. }
  270. goMemstatsSys, err := o.meter.Int64ObservableGauge(
  271. "go_memstats_sys_bytes",
  272. metric.WithUnit("By"),
  273. metric.WithDescription("Number of bytes obtained from system."),
  274. )
  275. if err != nil {
  276. return fmt.Errorf("can't add go_memstats_sys_bytes gauge to OpenTelemetry: %s", err)
  277. }
  278. goMemstatsHeapIdle, err := o.meter.Int64ObservableGauge(
  279. "go_memstats_heap_idle_bytes",
  280. metric.WithUnit("By"),
  281. metric.WithDescription("Number of heap bytes waiting to be used."),
  282. )
  283. if err != nil {
  284. return fmt.Errorf("can't add go_memstats_heap_idle_bytes gauge to OpenTelemetry: %s", err)
  285. }
  286. goMemstatsHeapInuse, err := o.meter.Int64ObservableGauge(
  287. "go_memstats_heap_inuse_bytes",
  288. metric.WithUnit("By"),
  289. metric.WithDescription("Number of heap bytes that are in use."),
  290. )
  291. if err != nil {
  292. return fmt.Errorf("can't add go_memstats_heap_inuse_bytes gauge to OpenTelemetry: %s", err)
  293. }
  294. goGoroutines, err := o.meter.Int64ObservableGauge(
  295. "go_goroutines",
  296. metric.WithUnit("1"),
  297. metric.WithDescription("Number of goroutines that currently exist."),
  298. )
  299. if err != nil {
  300. return fmt.Errorf("can't add go_goroutines gauge to OpenTelemetry: %s", err)
  301. }
  302. goThreads, err := o.meter.Int64ObservableGauge(
  303. "go_threads",
  304. metric.WithUnit("1"),
  305. metric.WithDescription("Number of OS threads created."),
  306. )
  307. if err != nil {
  308. return fmt.Errorf("can't add go_threads gauge to OpenTelemetry: %s", err)
  309. }
  310. workersGauge, err := o.meter.Int64ObservableGauge(
  311. "workers",
  312. metric.WithUnit("1"),
  313. metric.WithDescription("A gauge of the number of running workers."),
  314. )
  315. if err != nil {
  316. return fmt.Errorf("can't add workers gauge to OpenTelemetry: %s", err)
  317. }
  318. requestsInProgressGauge, err := o.meter.Float64ObservableGauge(
  319. "requests_in_progress",
  320. metric.WithUnit("1"),
  321. metric.WithDescription("A gauge of the number of requests currently being in progress."),
  322. )
  323. if err != nil {
  324. return fmt.Errorf("can't add requests_in_progress gauge to OpenTelemetry: %s", err)
  325. }
  326. imagesInProgressGauge, err := o.meter.Float64ObservableGauge(
  327. "images_in_progress",
  328. metric.WithUnit("1"),
  329. metric.WithDescription("A gauge of the number of images currently being in progress."),
  330. )
  331. if err != nil {
  332. return fmt.Errorf("can't add images_in_progress gauge to OpenTelemetry: %s", err)
  333. }
  334. workersUtilizationGauge, err := o.meter.Float64ObservableGauge(
  335. "workers_utilization",
  336. metric.WithUnit("%"),
  337. metric.WithDescription("A gauge of the workers utilization in percents."),
  338. )
  339. if err != nil {
  340. return fmt.Errorf("can't add workers_utilization gauge to OpenTelemetry: %s", err)
  341. }
  342. vipsMemory, err := o.meter.Float64ObservableGauge(
  343. "vips_memory_bytes",
  344. metric.WithUnit("By"),
  345. metric.WithDescription("A gauge of the vips tracked memory usage in bytes."),
  346. )
  347. if err != nil {
  348. return fmt.Errorf("can't add vips_memory_bytes gauge to OpenTelemetry: %s", err)
  349. }
  350. vipsMaxMemory, err := o.meter.Float64ObservableGauge(
  351. "vips_max_memory_bytes",
  352. metric.WithUnit("By"),
  353. metric.WithDescription("A gauge of the max vips tracked memory usage in bytes."),
  354. )
  355. if err != nil {
  356. return fmt.Errorf("can't add vips_max_memory_bytes gauge to OpenTelemetry: %s", err)
  357. }
  358. vipsAllocs, err := o.meter.Float64ObservableGauge(
  359. "vips_allocs",
  360. metric.WithUnit("1"),
  361. metric.WithDescription("A gauge of the number of active vips allocations."),
  362. )
  363. if err != nil {
  364. return fmt.Errorf("can't add vips_allocs gauge to OpenTelemetry: %s", err)
  365. }
  366. _, err = o.meter.RegisterCallback(
  367. func(ctx context.Context, ob metric.Observer) error {
  368. memStats, merr := proc.MemoryInfo()
  369. if merr != nil {
  370. return merr
  371. }
  372. ob.ObserveInt64(processResidentMemory, int64(memStats.RSS))
  373. ob.ObserveInt64(processVirtualMemory, int64(memStats.VMS))
  374. goMemStats := &runtime.MemStats{}
  375. runtime.ReadMemStats(goMemStats)
  376. ob.ObserveInt64(goMemstatsSys, int64(goMemStats.Sys))
  377. ob.ObserveInt64(goMemstatsHeapIdle, int64(goMemStats.HeapIdle))
  378. ob.ObserveInt64(goMemstatsHeapInuse, int64(goMemStats.HeapInuse))
  379. threadsNum, _ := runtime.ThreadCreateProfile(nil)
  380. ob.ObserveInt64(goGoroutines, int64(runtime.NumGoroutine()))
  381. ob.ObserveInt64(goThreads, int64(threadsNum))
  382. ob.ObserveInt64(workersGauge, int64(o.stats.WorkersNumber))
  383. ob.ObserveFloat64(requestsInProgressGauge, o.stats.RequestsInProgress())
  384. ob.ObserveFloat64(imagesInProgressGauge, o.stats.ImagesInProgress())
  385. ob.ObserveFloat64(workersUtilizationGauge, o.stats.WorkersUtilization())
  386. ob.ObserveFloat64(vipsMemory, vips.GetMem())
  387. ob.ObserveFloat64(vipsMaxMemory, vips.GetMemHighwater())
  388. ob.ObserveFloat64(vipsAllocs, vips.GetAllocs())
  389. return nil
  390. },
  391. processResidentMemory,
  392. processVirtualMemory,
  393. goMemstatsSys,
  394. goMemstatsHeapIdle,
  395. goMemstatsHeapInuse,
  396. goGoroutines,
  397. goThreads,
  398. workersGauge,
  399. requestsInProgressGauge,
  400. imagesInProgressGauge,
  401. workersUtilizationGauge,
  402. vipsMemory,
  403. vipsMaxMemory,
  404. vipsAllocs,
  405. )
  406. if err != nil {
  407. return fmt.Errorf("can't register OpenTelemetry callbacks: %s", err)
  408. }
  409. return nil
  410. }