otel.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748
  1. package otel
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "crypto/x509"
  6. "errors"
  7. "fmt"
  8. "net/http"
  9. "os"
  10. "reflect"
  11. "runtime"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. "github.com/felixge/httpsnoop"
  17. "github.com/shirou/gopsutil/process"
  18. "github.com/sirupsen/logrus"
  19. ec2 "go.opentelemetry.io/contrib/detectors/aws/ec2/v2"
  20. "go.opentelemetry.io/contrib/detectors/aws/ecs"
  21. "go.opentelemetry.io/contrib/detectors/aws/eks"
  22. "go.opentelemetry.io/contrib/propagators/autoprop"
  23. "go.opentelemetry.io/contrib/propagators/aws/xray"
  24. "go.opentelemetry.io/otel"
  25. "go.opentelemetry.io/otel/attribute"
  26. "go.opentelemetry.io/otel/codes"
  27. "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
  28. "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
  29. "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
  30. "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
  31. "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
  32. "go.opentelemetry.io/otel/metric"
  33. "go.opentelemetry.io/otel/propagation"
  34. sdkmetric "go.opentelemetry.io/otel/sdk/metric"
  35. "go.opentelemetry.io/otel/sdk/resource"
  36. sdktrace "go.opentelemetry.io/otel/sdk/trace"
  37. semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
  38. "go.opentelemetry.io/otel/semconv/v1.20.0/httpconv"
  39. "go.opentelemetry.io/otel/trace"
  40. "google.golang.org/grpc/credentials"
  41. "github.com/imgproxy/imgproxy/v3/config"
  42. "github.com/imgproxy/imgproxy/v3/config/configurators"
  43. "github.com/imgproxy/imgproxy/v3/ierrors"
  44. "github.com/imgproxy/imgproxy/v3/monitoring/errformat"
  45. "github.com/imgproxy/imgproxy/v3/monitoring/stats"
  46. "github.com/imgproxy/imgproxy/v3/version"
  47. )
  48. type hasSpanCtxKey struct{}
  49. type GaugeFunc func() float64
  50. var (
  51. enabled bool
  52. enabledMetrics bool
  53. tracerProvider *sdktrace.TracerProvider
  54. tracer trace.Tracer
  55. meterProvider *sdkmetric.MeterProvider
  56. meter metric.Meter
  57. propagator propagation.TextMapPropagator
  58. bufferSizeHist metric.Int64Histogram
  59. bufferDefaultSizes = make(map[string]int)
  60. bufferMaxSizes = make(map[string]int)
  61. bufferStatsMutex sync.Mutex
  62. )
  63. func Init() error {
  64. mapDeprecatedConfig()
  65. if !config.OpenTelemetryEnable {
  66. return nil
  67. }
  68. otel.SetErrorHandler(&errorHandler{entry: logrus.WithField("from", "opentelemetry")})
  69. var (
  70. traceExporter *otlptrace.Exporter
  71. metricExporter sdkmetric.Exporter
  72. err error
  73. )
  74. protocol := "grpc"
  75. configurators.String(&protocol, "OTEL_EXPORTER_OTLP_PROTOCOL")
  76. switch protocol {
  77. case "grpc":
  78. traceExporter, metricExporter, err = buildGRPCExporters()
  79. case "http/protobuf", "http", "https":
  80. traceExporter, metricExporter, err = buildHTTPExporters()
  81. default:
  82. return fmt.Errorf("Unsupported OpenTelemetry protocol: %s", protocol)
  83. }
  84. if err != nil {
  85. return err
  86. }
  87. if len(os.Getenv("OTEL_SERVICE_NAME")) == 0 {
  88. os.Setenv("OTEL_SERVICE_NAME", "imgproxy")
  89. }
  90. res, _ := resource.Merge(
  91. resource.Default(),
  92. resource.NewSchemaless(
  93. semconv.ServiceVersionKey.String(version.Version),
  94. ),
  95. )
  96. awsRes, _ := resource.Detect(
  97. context.Background(),
  98. ec2.NewResourceDetector(),
  99. ecs.NewResourceDetector(),
  100. eks.NewResourceDetector(),
  101. )
  102. if merged, merr := resource.Merge(awsRes, res); merr == nil {
  103. res = merged
  104. } else {
  105. logrus.Warnf("Can't add AWS attributes to OpenTelemetry: %s", merr)
  106. }
  107. opts := []sdktrace.TracerProviderOption{
  108. sdktrace.WithResource(res),
  109. sdktrace.WithBatcher(traceExporter),
  110. }
  111. switch g := config.OpenTelemetryTraceIDGenerator; g {
  112. case "xray":
  113. idg := xray.NewIDGenerator()
  114. opts = append(opts, sdktrace.WithIDGenerator(idg))
  115. case "random":
  116. // Do nothing. OTel uses random generator by default
  117. default:
  118. return fmt.Errorf("Unknown Trace ID generator: %s", g)
  119. }
  120. tracerProvider = sdktrace.NewTracerProvider(opts...)
  121. tracer = tracerProvider.Tracer("imgproxy")
  122. var propagatorNames []string
  123. configurators.StringSlice(&propagatorNames, "OTEL_PROPAGATORS")
  124. if len(propagatorNames) > 0 {
  125. propagator, err = autoprop.TextMapPropagator(propagatorNames...)
  126. if err != nil {
  127. return err
  128. }
  129. }
  130. enabled = true
  131. if metricExporter == nil {
  132. return nil
  133. }
  134. metricReader := sdkmetric.NewPeriodicReader(
  135. metricExporter,
  136. sdkmetric.WithInterval(5*time.Second),
  137. )
  138. meterProvider = sdkmetric.NewMeterProvider(
  139. sdkmetric.WithResource(res),
  140. sdkmetric.WithReader(metricReader),
  141. )
  142. meter = meterProvider.Meter("imgproxy")
  143. if err = addDefaultMetrics(); err != nil {
  144. return err
  145. }
  146. enabledMetrics = true
  147. return nil
  148. }
  149. func mapDeprecatedConfig() {
  150. endpoint := os.Getenv("IMGPROXY_OPEN_TELEMETRY_ENDPOINT")
  151. if len(endpoint) > 0 {
  152. logrus.Warn("The IMGPROXY_OPEN_TELEMETRY_ENDPOINT config is deprecated. Use IMGPROXY_OPEN_TELEMETRY_ENABLE and OTEL_EXPORTER_OTLP_ENDPOINT instead. See https://docs.imgproxy.net/latest/monitoring/open_telemetry#deprecated-environment-variables")
  153. config.OpenTelemetryEnable = true
  154. }
  155. if !config.OpenTelemetryEnable {
  156. return
  157. }
  158. protocol := "grpc"
  159. if prot := os.Getenv("IMGPROXY_OPEN_TELEMETRY_PROTOCOL"); len(prot) > 0 {
  160. logrus.Warn("The IMGPROXY_OPEN_TELEMETRY_PROTOCOL config is deprecated. Use OTEL_EXPORTER_OTLP_PROTOCOL instead. See https://docs.imgproxy.net/latest/monitoring/open_telemetry#deprecated-environment-variables")
  161. protocol = prot
  162. os.Setenv("OTEL_EXPORTER_OTLP_PROTOCOL", protocol)
  163. }
  164. if len(endpoint) > 0 {
  165. schema := "https"
  166. switch protocol {
  167. case "grpc":
  168. if insecure, _ := strconv.ParseBool(os.Getenv("IMGPROXY_OPEN_TELEMETRY_GRPC_INSECURE")); insecure {
  169. logrus.Warn("The IMGPROXY_OPEN_TELEMETRY_GRPC_INSECURE config is deprecated. Use OTEL_EXPORTER_OTLP_ENDPOINT with the `http://` schema instead. See https://docs.imgproxy.net/latest/monitoring/open_telemetry#deprecated-environment-variables")
  170. schema = "http"
  171. }
  172. case "http":
  173. schema = "http"
  174. }
  175. os.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", fmt.Sprintf("%s://%s", schema, endpoint))
  176. }
  177. if serviceName := os.Getenv("IMGPROXY_OPEN_TELEMETRY_SERVICE_NAME"); len(serviceName) > 0 {
  178. logrus.Warn("The IMGPROXY_OPEN_TELEMETRY_SERVICE_NAME config is deprecated. Use OTEL_SERVICE_NAME instead. See https://docs.imgproxy.net/latest/monitoring/open_telemetry#deprecated-environment-variables")
  179. os.Setenv("OTEL_SERVICE_NAME", serviceName)
  180. }
  181. if propagators := os.Getenv("IMGPROXY_OPEN_TELEMETRY_PROPAGATORS"); len(propagators) > 0 {
  182. logrus.Warn("The IMGPROXY_OPEN_TELEMETRY_PROPAGATORS config is deprecated. Use OTEL_PROPAGATORS instead. See https://docs.imgproxy.net/latest/monitoring/open_telemetry#deprecated-environment-variables")
  183. os.Setenv("OTEL_PROPAGATORS", propagators)
  184. }
  185. if timeout := os.Getenv("IMGPROXY_OPEN_TELEMETRY_CONNECTION_TIMEOUT"); len(timeout) > 0 {
  186. logrus.Warn("The IMGPROXY_OPEN_TELEMETRY_CONNECTION_TIMEOUT config is deprecated. Use OTEL_EXPORTER_OTLP_TIMEOUT instead. See https://docs.imgproxy.net/latest/monitoring/open_telemetry#deprecated-environment-variables")
  187. if to, _ := strconv.Atoi(timeout); to > 0 {
  188. os.Setenv("OTEL_EXPORTER_OTLP_TIMEOUT", strconv.Itoa(to*1000))
  189. }
  190. }
  191. }
  192. func buildGRPCExporters() (*otlptrace.Exporter, sdkmetric.Exporter, error) {
  193. tracerOpts := []otlptracegrpc.Option{}
  194. meterOpts := []otlpmetricgrpc.Option{}
  195. if tlsConf, err := buildTLSConfig(); tlsConf != nil && err == nil {
  196. creds := credentials.NewTLS(tlsConf)
  197. tracerOpts = append(tracerOpts, otlptracegrpc.WithTLSCredentials(creds))
  198. meterOpts = append(meterOpts, otlpmetricgrpc.WithTLSCredentials(creds))
  199. } else if err != nil {
  200. return nil, nil, err
  201. }
  202. tracesConnTimeout, metricsConnTimeout, err := getConnectionTimeouts()
  203. if err != nil {
  204. return nil, nil, err
  205. }
  206. trctx, trcancel := context.WithTimeout(context.Background(), tracesConnTimeout)
  207. defer trcancel()
  208. traceExporter, err := otlptracegrpc.New(trctx, tracerOpts...)
  209. if err != nil {
  210. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  211. }
  212. if !config.OpenTelemetryEnableMetrics {
  213. return traceExporter, nil, err
  214. }
  215. mtctx, mtcancel := context.WithTimeout(context.Background(), metricsConnTimeout)
  216. defer mtcancel()
  217. metricExporter, err := otlpmetricgrpc.New(mtctx, meterOpts...)
  218. if err != nil {
  219. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  220. }
  221. return traceExporter, metricExporter, err
  222. }
  223. func buildHTTPExporters() (*otlptrace.Exporter, sdkmetric.Exporter, error) {
  224. tracerOpts := []otlptracehttp.Option{}
  225. meterOpts := []otlpmetrichttp.Option{}
  226. if tlsConf, err := buildTLSConfig(); tlsConf != nil && err == nil {
  227. tracerOpts = append(tracerOpts, otlptracehttp.WithTLSClientConfig(tlsConf))
  228. meterOpts = append(meterOpts, otlpmetrichttp.WithTLSClientConfig(tlsConf))
  229. } else if err != nil {
  230. return nil, nil, err
  231. }
  232. tracesConnTimeout, metricsConnTimeout, err := getConnectionTimeouts()
  233. if err != nil {
  234. return nil, nil, err
  235. }
  236. trctx, trcancel := context.WithTimeout(context.Background(), tracesConnTimeout)
  237. defer trcancel()
  238. traceExporter, err := otlptracehttp.New(trctx, tracerOpts...)
  239. if err != nil {
  240. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  241. }
  242. if !config.OpenTelemetryEnableMetrics {
  243. return traceExporter, nil, err
  244. }
  245. mtctx, mtcancel := context.WithTimeout(context.Background(), metricsConnTimeout)
  246. defer mtcancel()
  247. metricExporter, err := otlpmetrichttp.New(mtctx, meterOpts...)
  248. if err != nil {
  249. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  250. }
  251. return traceExporter, metricExporter, err
  252. }
  253. func getConnectionTimeouts() (time.Duration, time.Duration, error) {
  254. connTimeout := 10000
  255. configurators.Int(&connTimeout, "OTEL_EXPORTER_OTLP_TIMEOUT")
  256. tracesConnTimeout := connTimeout
  257. configurators.Int(&tracesConnTimeout, "OTEL_EXPORTER_OTLP_TRACES_TIMEOUT")
  258. metricsConnTimeout := connTimeout
  259. configurators.Int(&metricsConnTimeout, "OTEL_EXPORTER_OTLP_METRICS_TIMEOUT")
  260. if tracesConnTimeout <= 0 {
  261. return 0, 0, errors.New("Opentelemetry traces timeout should be greater than 0")
  262. }
  263. if metricsConnTimeout <= 0 {
  264. return 0, 0, errors.New("Opentelemetry metrics timeout should be greater than 0")
  265. }
  266. return time.Duration(tracesConnTimeout) * time.Millisecond,
  267. time.Duration(metricsConnTimeout) * time.Millisecond,
  268. nil
  269. }
  270. func buildTLSConfig() (*tls.Config, error) {
  271. if len(config.OpenTelemetryServerCert) == 0 {
  272. return nil, nil
  273. }
  274. certPool := x509.NewCertPool()
  275. if !certPool.AppendCertsFromPEM(prepareKeyCert(config.OpenTelemetryServerCert)) {
  276. return nil, errors.New("Can't load OpenTelemetry server cert")
  277. }
  278. tlsConf := tls.Config{RootCAs: certPool}
  279. if len(config.OpenTelemetryClientCert) > 0 && len(config.OpenTelemetryClientKey) > 0 {
  280. cert, err := tls.X509KeyPair(
  281. prepareKeyCert(config.OpenTelemetryClientCert),
  282. prepareKeyCert(config.OpenTelemetryClientKey),
  283. )
  284. if err != nil {
  285. return nil, fmt.Errorf("Can't load OpenTelemetry client cert/key pair: %s", err)
  286. }
  287. tlsConf.Certificates = []tls.Certificate{cert}
  288. }
  289. return &tlsConf, nil
  290. }
  291. func prepareKeyCert(str string) []byte {
  292. return []byte(strings.ReplaceAll(str, `\n`, "\n"))
  293. }
  294. func Stop() {
  295. if enabled {
  296. trctx, trcancel := context.WithTimeout(context.Background(), 5*time.Second)
  297. defer trcancel()
  298. tracerProvider.Shutdown(trctx)
  299. if meterProvider != nil {
  300. mtctx, mtcancel := context.WithTimeout(context.Background(), 5*time.Second)
  301. defer mtcancel()
  302. meterProvider.Shutdown(mtctx)
  303. }
  304. }
  305. }
  306. func Enabled() bool {
  307. return enabled
  308. }
  309. func StartRootSpan(ctx context.Context, rw http.ResponseWriter, r *http.Request) (context.Context, context.CancelFunc, http.ResponseWriter) {
  310. if !enabled {
  311. return ctx, func() {}, rw
  312. }
  313. if propagator != nil {
  314. ctx = propagator.Extract(ctx, propagation.HeaderCarrier(r.Header))
  315. }
  316. server := r.Host
  317. if len(server) == 0 {
  318. server = "imgproxy"
  319. }
  320. ctx, span := tracer.Start(
  321. ctx, "/request",
  322. trace.WithSpanKind(trace.SpanKindServer),
  323. trace.WithAttributes(httpconv.ServerRequest(server, r)...),
  324. trace.WithAttributes(semconv.HTTPURL(r.RequestURI)),
  325. )
  326. ctx = context.WithValue(ctx, hasSpanCtxKey{}, struct{}{})
  327. newRw := httpsnoop.Wrap(rw, httpsnoop.Hooks{
  328. WriteHeader: func(next httpsnoop.WriteHeaderFunc) httpsnoop.WriteHeaderFunc {
  329. return func(statusCode int) {
  330. span.SetStatus(httpconv.ServerStatus(statusCode))
  331. span.SetAttributes(semconv.HTTPStatusCode(statusCode))
  332. next(statusCode)
  333. }
  334. },
  335. })
  336. cancel := func() { span.End() }
  337. return ctx, cancel, newRw
  338. }
  339. func setMetadata(span trace.Span, key string, value interface{}) {
  340. if len(key) == 0 || value == nil {
  341. return
  342. }
  343. rv := reflect.ValueOf(value)
  344. switch {
  345. case rv.Kind() == reflect.String:
  346. span.SetAttributes(attribute.String(key, value.(string)))
  347. case rv.Kind() == reflect.Bool:
  348. span.SetAttributes(attribute.Bool(key, value.(bool)))
  349. case rv.CanInt():
  350. span.SetAttributes(attribute.Int64(key, rv.Int()))
  351. case rv.CanUint():
  352. span.SetAttributes(attribute.Int64(key, int64(rv.Uint())))
  353. case rv.CanFloat():
  354. span.SetAttributes(attribute.Float64(key, rv.Float()))
  355. case rv.Kind() == reflect.Map && rv.Type().Key().Kind() == reflect.String:
  356. for _, k := range rv.MapKeys() {
  357. setMetadata(span, key+"."+k.String(), rv.MapIndex(k).Interface())
  358. }
  359. default:
  360. // Theoretically, we can also cover slices and arrays here,
  361. // but it's pretty complex and not really needed for now
  362. span.SetAttributes(attribute.String(key, fmt.Sprintf("%v", value)))
  363. }
  364. }
  365. func SetMetadata(ctx context.Context, key string, value interface{}) {
  366. if !enabled {
  367. return
  368. }
  369. if ctx.Value(hasSpanCtxKey{}) != nil {
  370. if span := trace.SpanFromContext(ctx); span != nil {
  371. setMetadata(span, key, value)
  372. }
  373. }
  374. }
  375. func StartSpan(ctx context.Context, name string, meta map[string]any) context.CancelFunc {
  376. if !enabled {
  377. return func() {}
  378. }
  379. if ctx.Value(hasSpanCtxKey{}) != nil {
  380. _, span := tracer.Start(ctx, name, trace.WithSpanKind(trace.SpanKindInternal))
  381. for k, v := range meta {
  382. setMetadata(span, k, v)
  383. }
  384. return func() { span.End() }
  385. }
  386. return func() {}
  387. }
  388. func SendError(ctx context.Context, errType string, err error) {
  389. if !enabled {
  390. return
  391. }
  392. span := trace.SpanFromContext(ctx)
  393. attributes := []attribute.KeyValue{
  394. semconv.ExceptionTypeKey.String(errformat.FormatErrType(errType, err)),
  395. semconv.ExceptionMessageKey.String(err.Error()),
  396. }
  397. if ierr, ok := err.(*ierrors.Error); ok {
  398. if stack := ierr.FormatStack(); len(stack) != 0 {
  399. attributes = append(attributes, semconv.ExceptionStacktraceKey.String(stack))
  400. }
  401. }
  402. span.SetStatus(codes.Error, err.Error())
  403. span.AddEvent(semconv.ExceptionEventName, trace.WithAttributes(attributes...))
  404. }
  405. func addDefaultMetrics() error {
  406. proc, err := process.NewProcess(int32(os.Getpid()))
  407. if err != nil {
  408. return fmt.Errorf("Can't initialize process data for OpenTelemetry: %s", err)
  409. }
  410. processResidentMemory, err := meter.Int64ObservableGauge(
  411. "process_resident_memory_bytes",
  412. metric.WithUnit("By"),
  413. metric.WithDescription("Resident memory size in bytes."),
  414. )
  415. if err != nil {
  416. return fmt.Errorf("Can't add process_resident_memory_bytes gauge to OpenTelemetry: %s", err)
  417. }
  418. processVirtualMemory, err := meter.Int64ObservableGauge(
  419. "process_virtual_memory_bytes",
  420. metric.WithUnit("By"),
  421. metric.WithDescription("Virtual memory size in bytes."),
  422. )
  423. if err != nil {
  424. return fmt.Errorf("Can't add process_virtual_memory_bytes gauge to OpenTelemetry: %s", err)
  425. }
  426. goMemstatsSys, err := meter.Int64ObservableGauge(
  427. "go_memstats_sys_bytes",
  428. metric.WithUnit("By"),
  429. metric.WithDescription("Number of bytes obtained from system."),
  430. )
  431. if err != nil {
  432. return fmt.Errorf("Can't add go_memstats_sys_bytes gauge to OpenTelemetry: %s", err)
  433. }
  434. goMemstatsHeapIdle, err := meter.Int64ObservableGauge(
  435. "go_memstats_heap_idle_bytes",
  436. metric.WithUnit("By"),
  437. metric.WithDescription("Number of heap bytes waiting to be used."),
  438. )
  439. if err != nil {
  440. return fmt.Errorf("Can't add go_memstats_heap_idle_bytes gauge to OpenTelemetry: %s", err)
  441. }
  442. goMemstatsHeapInuse, err := meter.Int64ObservableGauge(
  443. "go_memstats_heap_inuse_bytes",
  444. metric.WithUnit("By"),
  445. metric.WithDescription("Number of heap bytes that are in use."),
  446. )
  447. if err != nil {
  448. return fmt.Errorf("Can't add go_memstats_heap_inuse_bytes gauge to OpenTelemetry: %s", err)
  449. }
  450. goGoroutines, err := meter.Int64ObservableGauge(
  451. "go_goroutines",
  452. metric.WithUnit("1"),
  453. metric.WithDescription("Number of goroutines that currently exist."),
  454. )
  455. if err != nil {
  456. return fmt.Errorf("Can't add go_goroutines gauge to OpenTelemetry: %s", err)
  457. }
  458. goThreads, err := meter.Int64ObservableGauge(
  459. "go_threads",
  460. metric.WithUnit("1"),
  461. metric.WithDescription("Number of OS threads created."),
  462. )
  463. if err != nil {
  464. return fmt.Errorf("Can't add go_threads gauge to OpenTelemetry: %s", err)
  465. }
  466. workersGauge, err := meter.Int64ObservableGauge(
  467. "workers",
  468. metric.WithUnit("1"),
  469. metric.WithDescription("A gauge of the number of running workers."),
  470. )
  471. if err != nil {
  472. return fmt.Errorf("Can't add workets gauge to OpenTelemetry: %s", err)
  473. }
  474. requestsInProgressGauge, err := meter.Float64ObservableGauge(
  475. "requests_in_progress",
  476. metric.WithUnit("1"),
  477. metric.WithDescription("A gauge of the number of requests currently being in progress."),
  478. )
  479. if err != nil {
  480. return fmt.Errorf("Can't add requests_in_progress gauge to OpenTelemetry: %s", err)
  481. }
  482. imagesInProgressGauge, err := meter.Float64ObservableGauge(
  483. "images_in_progress",
  484. metric.WithUnit("1"),
  485. metric.WithDescription("A gauge of the number of images currently being in progress."),
  486. )
  487. if err != nil {
  488. return fmt.Errorf("Can't add images_in_progress gauge to OpenTelemetry: %s", err)
  489. }
  490. workersUtilizationGauge, err := meter.Float64ObservableGauge(
  491. "workers_utilization",
  492. metric.WithUnit("%"),
  493. metric.WithDescription("A gauge of the workers utilization in percents."),
  494. )
  495. if err != nil {
  496. return fmt.Errorf("Can't add workers_utilization gauge to OpenTelemetry: %s", err)
  497. }
  498. bufferDefaultSizeGauge, err := meter.Int64ObservableGauge(
  499. "buffer_default_size_bytes",
  500. metric.WithUnit("By"),
  501. metric.WithDescription("A gauge of the buffer default size in bytes."),
  502. )
  503. if err != nil {
  504. return fmt.Errorf("Can't add buffer_default_size_bytes gauge to OpenTelemetry: %s", err)
  505. }
  506. bufferMaxSizeGauge, err := meter.Int64ObservableGauge(
  507. "buffer_max_size_bytes",
  508. metric.WithUnit("By"),
  509. metric.WithDescription("A gauge of the buffer max size in bytes."),
  510. )
  511. if err != nil {
  512. return fmt.Errorf("Can't add buffer_max_size_bytes gauge to OpenTelemetry: %s", err)
  513. }
  514. _, err = meter.RegisterCallback(
  515. func(ctx context.Context, o metric.Observer) error {
  516. memStats, merr := proc.MemoryInfo()
  517. if merr != nil {
  518. return merr
  519. }
  520. o.ObserveInt64(processResidentMemory, int64(memStats.RSS))
  521. o.ObserveInt64(processVirtualMemory, int64(memStats.VMS))
  522. goMemStats := &runtime.MemStats{}
  523. runtime.ReadMemStats(goMemStats)
  524. o.ObserveInt64(goMemstatsSys, int64(goMemStats.Sys))
  525. o.ObserveInt64(goMemstatsHeapIdle, int64(goMemStats.HeapIdle))
  526. o.ObserveInt64(goMemstatsHeapInuse, int64(goMemStats.HeapInuse))
  527. threadsNum, _ := runtime.ThreadCreateProfile(nil)
  528. o.ObserveInt64(goGoroutines, int64(runtime.NumGoroutine()))
  529. o.ObserveInt64(goThreads, int64(threadsNum))
  530. o.ObserveInt64(workersGauge, int64(config.Workers))
  531. o.ObserveFloat64(requestsInProgressGauge, stats.RequestsInProgress())
  532. o.ObserveFloat64(imagesInProgressGauge, stats.ImagesInProgress())
  533. o.ObserveFloat64(workersUtilizationGauge, stats.WorkersUtilization())
  534. bufferStatsMutex.Lock()
  535. defer bufferStatsMutex.Unlock()
  536. for t, v := range bufferDefaultSizes {
  537. o.ObserveInt64(bufferDefaultSizeGauge, int64(v), metric.WithAttributes(attribute.String("type", t)))
  538. }
  539. for t, v := range bufferMaxSizes {
  540. o.ObserveInt64(bufferMaxSizeGauge, int64(v), metric.WithAttributes(attribute.String("type", t)))
  541. }
  542. return nil
  543. },
  544. processResidentMemory,
  545. processVirtualMemory,
  546. goMemstatsSys,
  547. goMemstatsHeapIdle,
  548. goMemstatsHeapInuse,
  549. goGoroutines,
  550. goThreads,
  551. workersGauge,
  552. requestsInProgressGauge,
  553. imagesInProgressGauge,
  554. workersUtilizationGauge,
  555. bufferDefaultSizeGauge,
  556. bufferMaxSizeGauge,
  557. )
  558. if err != nil {
  559. return fmt.Errorf("Can't register OpenTelemetry callbacks: %s", err)
  560. }
  561. bufferSizeHist, err = meter.Int64Histogram(
  562. "buffer_size_bytes",
  563. metric.WithUnit("By"),
  564. metric.WithDescription("A histogram of the buffer size in bytes."),
  565. )
  566. if err != nil {
  567. return fmt.Errorf("Can't add buffer_size_bytes histogram to OpenTelemetry: %s", err)
  568. }
  569. return nil
  570. }
  571. func AddGaugeFunc(name, desc, u string, f GaugeFunc) {
  572. if meter == nil {
  573. return
  574. }
  575. _, err := meter.Float64ObservableGauge(
  576. name,
  577. metric.WithUnit(u),
  578. metric.WithDescription(desc),
  579. metric.WithFloat64Callback(func(_ context.Context, obsrv metric.Float64Observer) error {
  580. obsrv.Observe(f())
  581. return nil
  582. }),
  583. )
  584. if err != nil {
  585. logrus.Warnf("Can't add %s gauge to OpenTelemetry: %s", name, err)
  586. }
  587. }
  588. func ObserveBufferSize(t string, size int) {
  589. if enabledMetrics {
  590. bufferSizeHist.Record(context.Background(), int64(size), metric.WithAttributes(attribute.String("type", t)))
  591. }
  592. }
  593. func SetBufferDefaultSize(t string, size int) {
  594. if enabledMetrics {
  595. bufferStatsMutex.Lock()
  596. defer bufferStatsMutex.Unlock()
  597. bufferDefaultSizes[t] = size
  598. }
  599. }
  600. func SetBufferMaxSize(t string, size int) {
  601. if enabledMetrics {
  602. bufferStatsMutex.Lock()
  603. defer bufferStatsMutex.Unlock()
  604. bufferMaxSizes[t] = size
  605. }
  606. }
  607. type errorHandler struct {
  608. entry *logrus.Entry
  609. }
  610. func (h *errorHandler) Handle(err error) {
  611. h.entry.Warn(err.Error())
  612. }