otel.go 20 KB


  1. package otel
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "crypto/x509"
  6. "errors"
  7. "fmt"
  8. "net/http"
  9. "os"
  10. "reflect"
  11. "runtime"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. "github.com/felixge/httpsnoop"
  17. "github.com/shirou/gopsutil/process"
  18. "github.com/sirupsen/logrus"
  19. "go.opentelemetry.io/contrib/detectors/aws/ec2"
  20. "go.opentelemetry.io/contrib/detectors/aws/ecs"
  21. "go.opentelemetry.io/contrib/detectors/aws/eks"
  22. "go.opentelemetry.io/contrib/propagators/autoprop"
  23. "go.opentelemetry.io/contrib/propagators/aws/xray"
  24. "go.opentelemetry.io/otel"
  25. "go.opentelemetry.io/otel/attribute"
  26. "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
  27. "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
  28. "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
  29. "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
  30. "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
  31. "go.opentelemetry.io/otel/metric"
  32. "go.opentelemetry.io/otel/propagation"
  33. sdkmetric "go.opentelemetry.io/otel/sdk/metric"
  34. "go.opentelemetry.io/otel/sdk/resource"
  35. sdktrace "go.opentelemetry.io/otel/sdk/trace"
  36. semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
  37. "go.opentelemetry.io/otel/semconv/v1.17.0/httpconv"
  38. "go.opentelemetry.io/otel/trace"
  39. "google.golang.org/grpc/credentials"
  40. "github.com/imgproxy/imgproxy/v3/config"
  41. "github.com/imgproxy/imgproxy/v3/config/configurators"
  42. "github.com/imgproxy/imgproxy/v3/ierrors"
  43. "github.com/imgproxy/imgproxy/v3/metrics/errformat"
  44. "github.com/imgproxy/imgproxy/v3/metrics/stats"
  45. "github.com/imgproxy/imgproxy/v3/version"
  46. )
  47. type hasSpanCtxKey struct{}
  48. type GaugeFunc func() float64
  49. var (
  50. enabled bool
  51. enabledMetrics bool
  52. tracerProvider *sdktrace.TracerProvider
  53. tracer trace.Tracer
  54. meterProvider *sdkmetric.MeterProvider
  55. meter metric.Meter
  56. propagator propagation.TextMapPropagator
  57. bufferSizeHist metric.Int64Histogram
  58. bufferDefaultSizes = make(map[string]int)
  59. bufferMaxSizes = make(map[string]int)
  60. bufferStatsMutex sync.Mutex
  61. )
  62. func Init() error {
  63. mapDeprecatedConfig()
  64. if !config.OpenTelemetryEnable {
  65. return nil
  66. }
  67. otel.SetErrorHandler(&errorHandler{entry: logrus.WithField("from", "opentelemetry")})
  68. var (
  69. traceExporter *otlptrace.Exporter
  70. metricExporter sdkmetric.Exporter
  71. err error
  72. )
  73. protocol := "grpc"
  74. configurators.String(&protocol, "OTEL_EXPORTER_OTLP_PROTOCOL")
  75. switch protocol {
  76. case "grpc":
  77. traceExporter, metricExporter, err = buildGRPCExporters()
  78. case "http/protobuf", "http", "https":
  79. traceExporter, metricExporter, err = buildHTTPExporters()
  80. default:
  81. return fmt.Errorf("Unsupported OpenTelemetry protocol: %s", protocol)
  82. }
  83. if err != nil {
  84. return err
  85. }
  86. if len(os.Getenv("OTEL_SERVICE_NAME")) == 0 {
  87. os.Setenv("OTEL_SERVICE_NAME", "imgproxy")
  88. }
  89. res, _ := resource.Merge(
  90. resource.Default(),
  91. resource.NewSchemaless(
  92. semconv.ServiceVersionKey.String(version.Version),
  93. ),
  94. )
  95. awsRes, _ := resource.Detect(
  96. context.Background(),
  97. ec2.NewResourceDetector(),
  98. ecs.NewResourceDetector(),
  99. eks.NewResourceDetector(),
  100. )
  101. if merged, merr := resource.Merge(awsRes, res); merr == nil {
  102. res = merged
  103. } else {
  104. logrus.Warnf("Can't add AWS attributes to OpenTelemetry: %s", merr)
  105. }
  106. opts := []sdktrace.TracerProviderOption{
  107. sdktrace.WithResource(res),
  108. sdktrace.WithBatcher(traceExporter),
  109. }
  110. switch g := config.OpenTelemetryTraceIDGenerator; g {
  111. case "xray":
  112. idg := xray.NewIDGenerator()
  113. opts = append(opts, sdktrace.WithIDGenerator(idg))
  114. case "random":
  115. // Do nothing. OTel uses random generator by default
  116. default:
  117. return fmt.Errorf("Unknown Trace ID generator: %s", g)
  118. }
  119. tracerProvider = sdktrace.NewTracerProvider(opts...)
  120. tracer = tracerProvider.Tracer("imgproxy")
  121. var propagatorNames []string
  122. configurators.StringSlice(&propagatorNames, "OTEL_PROPAGATORS")
  123. if len(propagatorNames) > 0 {
  124. propagator, err = autoprop.TextMapPropagator(propagatorNames...)
  125. if err != nil {
  126. return err
  127. }
  128. }
  129. enabled = true
  130. if metricExporter == nil {
  131. return nil
  132. }
  133. metricReader := sdkmetric.NewPeriodicReader(
  134. metricExporter,
  135. sdkmetric.WithInterval(5*time.Second),
  136. )
  137. meterProvider = sdkmetric.NewMeterProvider(
  138. sdkmetric.WithResource(res),
  139. sdkmetric.WithReader(metricReader),
  140. )
  141. meter = meterProvider.Meter("imgproxy")
  142. if err = addDefaultMetrics(); err != nil {
  143. return err
  144. }
  145. enabledMetrics = true
  146. return nil
  147. }
  148. func mapDeprecatedConfig() {
  149. endpoint := os.Getenv("IMGPROXY_OPEN_TELEMETRY_ENDPOINT")
  150. if len(endpoint) > 0 {
  151. logrus.Warn("The IMGPROXY_OPEN_TELEMETRY_ENDPOINT config is deprecated. Use IMGPROXY_OPEN_TELEMETRY_ENABLE and OTEL_EXPORTER_OTLP_ENDPOINT instead. See https://docs.imgproxy.net/latest/monitoring/open_telemetry#deprecated-environment-variables")
  152. config.OpenTelemetryEnable = true
  153. }
  154. if !config.OpenTelemetryEnable {
  155. return
  156. }
  157. protocol := "grpc"
  158. if prot := os.Getenv("IMGPROXY_OPEN_TELEMETRY_PROTOCOL"); len(prot) > 0 {
  159. logrus.Warn("The IMGPROXY_OPEN_TELEMETRY_PROTOCOL config is deprecated. Use OTEL_EXPORTER_OTLP_PROTOCOL instead. See https://docs.imgproxy.net/latest/monitoring/open_telemetry#deprecated-environment-variables")
  160. protocol = prot
  161. os.Setenv("OTEL_EXPORTER_OTLP_PROTOCOL", protocol)
  162. }
  163. if len(endpoint) > 0 {
  164. schema := "https"
  165. switch protocol {
  166. case "grpc":
  167. if insecure, _ := strconv.ParseBool(os.Getenv("IMGPROXY_OPEN_TELEMETRY_GRPC_INSECURE")); insecure {
  168. logrus.Warn("The IMGPROXY_OPEN_TELEMETRY_GRPC_INSECURE config is deprecated. Use OTEL_EXPORTER_OTLP_ENDPOINT with the `http://` schema instead. See https://docs.imgproxy.net/latest/monitoring/open_telemetry#deprecated-environment-variables")
  169. schema = "http"
  170. }
  171. case "http":
  172. schema = "http"
  173. }
  174. os.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", fmt.Sprintf("%s://%s", schema, endpoint))
  175. }
  176. if serviceName := os.Getenv("IMGPROXY_OPEN_TELEMETRY_SERVICE_NAME"); len(serviceName) > 0 {
  177. logrus.Warn("The IMGPROXY_OPEN_TELEMETRY_SERVICE_NAME config is deprecated. Use OTEL_SERVICE_NAME instead. See https://docs.imgproxy.net/latest/monitoring/open_telemetry#deprecated-environment-variables")
  178. os.Setenv("OTEL_SERVICE_NAME", serviceName)
  179. }
  180. if propagators := os.Getenv("IMGPROXY_OPEN_TELEMETRY_PROPAGATORS"); len(propagators) > 0 {
  181. logrus.Warn("The IMGPROXY_OPEN_TELEMETRY_PROPAGATORS config is deprecated. Use OTEL_PROPAGATORS instead. See https://docs.imgproxy.net/latest/monitoring/open_telemetry#deprecated-environment-variables")
  182. os.Setenv("OTEL_PROPAGATORS", propagators)
  183. }
  184. if timeout := os.Getenv("IMGPROXY_OPEN_TELEMETRY_CONNECTION_TIMEOUT"); len(timeout) > 0 {
  185. logrus.Warn("The IMGPROXY_OPEN_TELEMETRY_CONNECTION_TIMEOUT config is deprecated. Use OTEL_EXPORTER_OTLP_TIMEOUT instead. See https://docs.imgproxy.net/latest/monitoring/open_telemetry#deprecated-environment-variables")
  186. if to, _ := strconv.Atoi(timeout); to > 0 {
  187. os.Setenv("OTEL_EXPORTER_OTLP_TIMEOUT", strconv.Itoa(to*1000))
  188. }
  189. }
  190. }
  191. func buildGRPCExporters() (*otlptrace.Exporter, sdkmetric.Exporter, error) {
  192. tracerOpts := []otlptracegrpc.Option{}
  193. meterOpts := []otlpmetricgrpc.Option{}
  194. if tlsConf, err := buildTLSConfig(); tlsConf != nil && err == nil {
  195. creds := credentials.NewTLS(tlsConf)
  196. tracerOpts = append(tracerOpts, otlptracegrpc.WithTLSCredentials(creds))
  197. meterOpts = append(meterOpts, otlpmetricgrpc.WithTLSCredentials(creds))
  198. } else if err != nil {
  199. return nil, nil, err
  200. }
  201. tracesConnTimeout, metricsConnTimeout, err := getConnectionTimeouts()
  202. if err != nil {
  203. return nil, nil, err
  204. }
  205. trctx, trcancel := context.WithTimeout(context.Background(), tracesConnTimeout)
  206. defer trcancel()
  207. traceExporter, err := otlptracegrpc.New(trctx, tracerOpts...)
  208. if err != nil {
  209. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  210. }
  211. if !config.OpenTelemetryEnableMetrics {
  212. return traceExporter, nil, err
  213. }
  214. mtctx, mtcancel := context.WithTimeout(context.Background(), metricsConnTimeout)
  215. defer mtcancel()
  216. metricExporter, err := otlpmetricgrpc.New(mtctx, meterOpts...)
  217. if err != nil {
  218. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  219. }
  220. return traceExporter, metricExporter, err
  221. }
  222. func buildHTTPExporters() (*otlptrace.Exporter, sdkmetric.Exporter, error) {
  223. tracerOpts := []otlptracehttp.Option{}
  224. meterOpts := []otlpmetrichttp.Option{}
  225. if tlsConf, err := buildTLSConfig(); tlsConf != nil && err == nil {
  226. tracerOpts = append(tracerOpts, otlptracehttp.WithTLSClientConfig(tlsConf))
  227. meterOpts = append(meterOpts, otlpmetrichttp.WithTLSClientConfig(tlsConf))
  228. } else if err != nil {
  229. return nil, nil, err
  230. }
  231. tracesConnTimeout, metricsConnTimeout, err := getConnectionTimeouts()
  232. if err != nil {
  233. return nil, nil, err
  234. }
  235. trctx, trcancel := context.WithTimeout(context.Background(), tracesConnTimeout)
  236. defer trcancel()
  237. traceExporter, err := otlptracehttp.New(trctx, tracerOpts...)
  238. if err != nil {
  239. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  240. }
  241. if !config.OpenTelemetryEnableMetrics {
  242. return traceExporter, nil, err
  243. }
  244. mtctx, mtcancel := context.WithTimeout(context.Background(), metricsConnTimeout)
  245. defer mtcancel()
  246. metricExporter, err := otlpmetrichttp.New(mtctx, meterOpts...)
  247. if err != nil {
  248. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  249. }
  250. return traceExporter, metricExporter, err
  251. }
  252. func getConnectionTimeouts() (time.Duration, time.Duration, error) {
  253. connTimeout := 10000
  254. configurators.Int(&connTimeout, "OTEL_EXPORTER_OTLP_TIMEOUT")
  255. tracesConnTimeout := connTimeout
  256. configurators.Int(&tracesConnTimeout, "OTEL_EXPORTER_OTLP_TRACES_TIMEOUT")
  257. metricsConnTimeout := connTimeout
  258. configurators.Int(&metricsConnTimeout, "OTEL_EXPORTER_OTLP_METRICS_TIMEOUT")
  259. if tracesConnTimeout <= 0 {
  260. return 0, 0, errors.New("Opentelemetry traces timeout should be greater than 0")
  261. }
  262. if metricsConnTimeout <= 0 {
  263. return 0, 0, errors.New("Opentelemetry metrics timeout should be greater than 0")
  264. }
  265. return time.Duration(tracesConnTimeout) * time.Millisecond,
  266. time.Duration(metricsConnTimeout) * time.Millisecond,
  267. nil
  268. }
  269. func buildTLSConfig() (*tls.Config, error) {
  270. if len(config.OpenTelemetryServerCert) == 0 {
  271. return nil, nil
  272. }
  273. certPool := x509.NewCertPool()
  274. if !certPool.AppendCertsFromPEM(prepareKeyCert(config.OpenTelemetryServerCert)) {
  275. return nil, errors.New("Can't load OpenTelemetry server cert")
  276. }
  277. tlsConf := tls.Config{RootCAs: certPool}
  278. if len(config.OpenTelemetryClientCert) > 0 && len(config.OpenTelemetryClientKey) > 0 {
  279. cert, err := tls.X509KeyPair(
  280. prepareKeyCert(config.OpenTelemetryClientCert),
  281. prepareKeyCert(config.OpenTelemetryClientKey),
  282. )
  283. if err != nil {
  284. return nil, fmt.Errorf("Can't load OpenTelemetry client cert/key pair: %s", err)
  285. }
  286. tlsConf.Certificates = []tls.Certificate{cert}
  287. }
  288. return &tlsConf, nil
  289. }
  290. func prepareKeyCert(str string) []byte {
  291. return []byte(strings.ReplaceAll(str, `\n`, "\n"))
  292. }
  293. func Stop() {
  294. if enabled {
  295. trctx, trcancel := context.WithTimeout(context.Background(), 5*time.Second)
  296. defer trcancel()
  297. tracerProvider.Shutdown(trctx)
  298. if meterProvider != nil {
  299. mtctx, mtcancel := context.WithTimeout(context.Background(), 5*time.Second)
  300. defer mtcancel()
  301. meterProvider.Shutdown(mtctx)
  302. }
  303. }
  304. }
  305. func Enabled() bool {
  306. return enabled
  307. }
  308. func StartRootSpan(ctx context.Context, rw http.ResponseWriter, r *http.Request) (context.Context, context.CancelFunc, http.ResponseWriter) {
  309. if !enabled {
  310. return ctx, func() {}, rw
  311. }
  312. if propagator != nil {
  313. ctx = propagator.Extract(ctx, propagation.HeaderCarrier(r.Header))
  314. }
  315. ctx, span := tracer.Start(
  316. ctx, "/request",
  317. trace.WithSpanKind(trace.SpanKindServer),
  318. trace.WithAttributes(httpconv.ServerRequest("imgproxy", r)...),
  319. )
  320. ctx = context.WithValue(ctx, hasSpanCtxKey{}, struct{}{})
  321. newRw := httpsnoop.Wrap(rw, httpsnoop.Hooks{
  322. WriteHeader: func(next httpsnoop.WriteHeaderFunc) httpsnoop.WriteHeaderFunc {
  323. return func(statusCode int) {
  324. span.SetStatus(httpconv.ServerStatus(statusCode))
  325. span.SetAttributes(semconv.HTTPStatusCode(statusCode))
  326. next(statusCode)
  327. }
  328. },
  329. })
  330. cancel := func() { span.End() }
  331. return ctx, cancel, newRw
  332. }
  333. func SetMetadata(ctx context.Context, key string, value interface{}) {
  334. if !enabled {
  335. return
  336. }
  337. span := trace.SpanFromContext(ctx)
  338. rv := reflect.ValueOf(value)
  339. switch {
  340. case rv.Kind() == reflect.String:
  341. span.SetAttributes(attribute.String(key, value.(string)))
  342. case rv.Kind() == reflect.Bool:
  343. span.SetAttributes(attribute.Bool(key, value.(bool)))
  344. case rv.CanInt():
  345. span.SetAttributes(attribute.Int64(key, rv.Int()))
  346. case rv.CanUint():
  347. span.SetAttributes(attribute.Int64(key, int64(rv.Uint())))
  348. case rv.CanFloat():
  349. span.SetAttributes(attribute.Float64(key, rv.Float()))
  350. default:
  351. // Theoretically, we can also cover slices and arrays here,
  352. // but it's pretty complex and not really needed for now
  353. span.SetAttributes(attribute.String(key, fmt.Sprintf("%v", value)))
  354. }
  355. }
  356. func StartSpan(ctx context.Context, name string) context.CancelFunc {
  357. if !enabled {
  358. return func() {}
  359. }
  360. if ctx.Value(hasSpanCtxKey{}) != nil {
  361. _, span := tracer.Start(ctx, name, trace.WithSpanKind(trace.SpanKindInternal))
  362. return func() { span.End() }
  363. }
  364. return func() {}
  365. }
  366. func SendError(ctx context.Context, errType string, err error) {
  367. if !enabled {
  368. return
  369. }
  370. span := trace.SpanFromContext(ctx)
  371. attributes := []attribute.KeyValue{
  372. semconv.ExceptionTypeKey.String(errformat.FormatErrType(errType, err)),
  373. semconv.ExceptionMessageKey.String(err.Error()),
  374. }
  375. if ierr, ok := err.(*ierrors.Error); ok {
  376. if stack := ierr.FormatStack(); len(stack) != 0 {
  377. attributes = append(attributes, semconv.ExceptionStacktraceKey.String(stack))
  378. }
  379. }
  380. span.AddEvent(semconv.ExceptionEventName, trace.WithAttributes(attributes...))
  381. }
  382. func addDefaultMetrics() error {
  383. proc, err := process.NewProcess(int32(os.Getpid()))
  384. if err != nil {
  385. return fmt.Errorf("Can't initialize process data for OpenTelemetry: %s", err)
  386. }
  387. processResidentMemory, err := meter.Int64ObservableGauge(
  388. "process_resident_memory_bytes",
  389. metric.WithUnit("By"),
  390. metric.WithDescription("Resident memory size in bytes."),
  391. )
  392. if err != nil {
  393. return fmt.Errorf("Can't add process_resident_memory_bytes gauge to OpenTelemetry: %s", err)
  394. }
  395. processVirtualMemory, err := meter.Int64ObservableGauge(
  396. "process_virtual_memory_bytes",
  397. metric.WithUnit("By"),
  398. metric.WithDescription("Virtual memory size in bytes."),
  399. )
  400. if err != nil {
  401. return fmt.Errorf("Can't add process_virtual_memory_bytes gauge to OpenTelemetry: %s", err)
  402. }
  403. goMemstatsSys, err := meter.Int64ObservableGauge(
  404. "go_memstats_sys_bytes",
  405. metric.WithUnit("By"),
  406. metric.WithDescription("Number of bytes obtained from system."),
  407. )
  408. if err != nil {
  409. return fmt.Errorf("Can't add go_memstats_sys_bytes gauge to OpenTelemetry: %s", err)
  410. }
  411. goMemstatsHeapIdle, err := meter.Int64ObservableGauge(
  412. "go_memstats_heap_idle_bytes",
  413. metric.WithUnit("By"),
  414. metric.WithDescription("Number of heap bytes waiting to be used."),
  415. )
  416. if err != nil {
  417. return fmt.Errorf("Can't add go_memstats_heap_idle_bytes gauge to OpenTelemetry: %s", err)
  418. }
  419. goMemstatsHeapInuse, err := meter.Int64ObservableGauge(
  420. "go_memstats_heap_inuse_bytes",
  421. metric.WithUnit("By"),
  422. metric.WithDescription("Number of heap bytes that are in use."),
  423. )
  424. if err != nil {
  425. return fmt.Errorf("Can't add go_memstats_heap_inuse_bytes gauge to OpenTelemetry: %s", err)
  426. }
  427. goGoroutines, err := meter.Int64ObservableGauge(
  428. "go_goroutines",
  429. metric.WithUnit("1"),
  430. metric.WithDescription("Number of goroutines that currently exist."),
  431. )
  432. if err != nil {
  433. return fmt.Errorf("Can't add go_goroutines gauge to OpenTelemetry: %s", err)
  434. }
  435. goThreads, err := meter.Int64ObservableGauge(
  436. "go_threads",
  437. metric.WithUnit("1"),
  438. metric.WithDescription("Number of OS threads created."),
  439. )
  440. if err != nil {
  441. return fmt.Errorf("Can't add go_threads gauge to OpenTelemetry: %s", err)
  442. }
  443. requestsInProgressGauge, err := meter.Float64ObservableGauge(
  444. "requests_in_progress",
  445. metric.WithUnit("1"),
  446. metric.WithDescription("A gauge of the number of requests currently being in progress."),
  447. )
  448. if err != nil {
  449. return fmt.Errorf("Can't add requests_in_progress gauge to OpenTelemetry: %s", err)
  450. }
  451. imagesInProgressGauge, err := meter.Float64ObservableGauge(
  452. "images_in_progress",
  453. metric.WithUnit("1"),
  454. metric.WithDescription("A gauge of the number of images currently being in progress."),
  455. )
  456. if err != nil {
  457. return fmt.Errorf("Can't add images_in_progress gauge to OpenTelemetry: %s", err)
  458. }
  459. bufferDefaultSizeGauge, err := meter.Int64ObservableGauge(
  460. "buffer_default_size_bytes",
  461. metric.WithUnit("By"),
  462. metric.WithDescription("A gauge of the buffer default size in bytes."),
  463. )
  464. if err != nil {
  465. return fmt.Errorf("Can't add buffer_default_size_bytes gauge to OpenTelemetry: %s", err)
  466. }
  467. bufferMaxSizeGauge, err := meter.Int64ObservableGauge(
  468. "buffer_max_size_bytes",
  469. metric.WithUnit("By"),
  470. metric.WithDescription("A gauge of the buffer max size in bytes."),
  471. )
  472. if err != nil {
  473. return fmt.Errorf("Can't add buffer_max_size_bytes gauge to OpenTelemetry: %s", err)
  474. }
  475. _, err = meter.RegisterCallback(
  476. func(ctx context.Context, o metric.Observer) error {
  477. memStats, merr := proc.MemoryInfo()
  478. if merr != nil {
  479. return merr
  480. }
  481. o.ObserveInt64(processResidentMemory, int64(memStats.RSS))
  482. o.ObserveInt64(processVirtualMemory, int64(memStats.VMS))
  483. goMemStats := &runtime.MemStats{}
  484. runtime.ReadMemStats(goMemStats)
  485. o.ObserveInt64(goMemstatsSys, int64(goMemStats.Sys))
  486. o.ObserveInt64(goMemstatsHeapIdle, int64(goMemStats.HeapIdle))
  487. o.ObserveInt64(goMemstatsHeapInuse, int64(goMemStats.HeapInuse))
  488. threadsNum, _ := runtime.ThreadCreateProfile(nil)
  489. o.ObserveInt64(goGoroutines, int64(runtime.NumGoroutine()))
  490. o.ObserveInt64(goThreads, int64(threadsNum))
  491. o.ObserveFloat64(requestsInProgressGauge, stats.RequestsInProgress())
  492. o.ObserveFloat64(imagesInProgressGauge, stats.ImagesInProgress())
  493. bufferStatsMutex.Lock()
  494. defer bufferStatsMutex.Unlock()
  495. for t, v := range bufferDefaultSizes {
  496. o.ObserveInt64(bufferDefaultSizeGauge, int64(v), metric.WithAttributes(attribute.String("type", t)))
  497. }
  498. for t, v := range bufferMaxSizes {
  499. o.ObserveInt64(bufferMaxSizeGauge, int64(v), metric.WithAttributes(attribute.String("type", t)))
  500. }
  501. return nil
  502. },
  503. processResidentMemory,
  504. processVirtualMemory,
  505. goMemstatsSys,
  506. goMemstatsHeapIdle,
  507. goMemstatsHeapInuse,
  508. goGoroutines,
  509. goThreads,
  510. requestsInProgressGauge,
  511. imagesInProgressGauge,
  512. bufferDefaultSizeGauge,
  513. bufferMaxSizeGauge,
  514. )
  515. if err != nil {
  516. return fmt.Errorf("Can't register OpenTelemetry callbacks: %s", err)
  517. }
  518. bufferSizeHist, err = meter.Int64Histogram(
  519. "buffer_size_bytes",
  520. metric.WithUnit("By"),
  521. metric.WithDescription("A histogram of the buffer size in bytes."),
  522. )
  523. if err != nil {
  524. return fmt.Errorf("Can't add buffer_size_bytes histogram to OpenTelemetry: %s", err)
  525. }
  526. return nil
  527. }
  528. func AddGaugeFunc(name, desc, u string, f GaugeFunc) {
  529. if meter == nil {
  530. return
  531. }
  532. _, err := meter.Float64ObservableGauge(
  533. name,
  534. metric.WithUnit(u),
  535. metric.WithDescription(desc),
  536. metric.WithFloat64Callback(func(_ context.Context, obsrv metric.Float64Observer) error {
  537. obsrv.Observe(f())
  538. return nil
  539. }),
  540. )
  541. if err != nil {
  542. logrus.Warnf("Can't add %s gauge to OpenTelemetry: %s", name, err)
  543. }
  544. }
  545. func ObserveBufferSize(t string, size int) {
  546. if enabledMetrics {
  547. bufferSizeHist.Record(context.Background(), int64(size), metric.WithAttributes(attribute.String("type", t)))
  548. }
  549. }
  550. func SetBufferDefaultSize(t string, size int) {
  551. if enabledMetrics {
  552. bufferStatsMutex.Lock()
  553. defer bufferStatsMutex.Unlock()
  554. bufferDefaultSizes[t] = size
  555. }
  556. }
  557. func SetBufferMaxSize(t string, size int) {
  558. if enabledMetrics {
  559. bufferStatsMutex.Lock()
  560. defer bufferStatsMutex.Unlock()
  561. bufferMaxSizes[t] = size
  562. }
  563. }
  564. type errorHandler struct {
  565. entry *logrus.Entry
  566. }
  567. func (h *errorHandler) Handle(err error) {
  568. h.entry.Warn(err.Error())
  569. }