otel.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673
  1. package otel
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "crypto/x509"
  6. "errors"
  7. "fmt"
  8. "net/http"
  9. "os"
  10. "runtime"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "time"
  15. "github.com/felixge/httpsnoop"
  16. "github.com/shirou/gopsutil/process"
  17. "github.com/sirupsen/logrus"
  18. "go.opentelemetry.io/contrib/detectors/aws/ec2"
  19. "go.opentelemetry.io/contrib/detectors/aws/ecs"
  20. "go.opentelemetry.io/contrib/detectors/aws/eks"
  21. "go.opentelemetry.io/contrib/propagators/autoprop"
  22. "go.opentelemetry.io/contrib/propagators/aws/xray"
  23. "go.opentelemetry.io/otel"
  24. "go.opentelemetry.io/otel/attribute"
  25. "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
  26. "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
  27. "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
  28. "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
  29. "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
  30. "go.opentelemetry.io/otel/metric"
  31. "go.opentelemetry.io/otel/propagation"
  32. sdkmetric "go.opentelemetry.io/otel/sdk/metric"
  33. "go.opentelemetry.io/otel/sdk/resource"
  34. sdktrace "go.opentelemetry.io/otel/sdk/trace"
  35. semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
  36. "go.opentelemetry.io/otel/semconv/v1.17.0/httpconv"
  37. "go.opentelemetry.io/otel/trace"
  38. "google.golang.org/grpc/credentials"
  39. "github.com/imgproxy/imgproxy/v3/config"
  40. "github.com/imgproxy/imgproxy/v3/config/configurators"
  41. "github.com/imgproxy/imgproxy/v3/ierrors"
  42. "github.com/imgproxy/imgproxy/v3/metrics/errformat"
  43. "github.com/imgproxy/imgproxy/v3/metrics/stats"
  44. "github.com/imgproxy/imgproxy/v3/version"
  45. )
  46. type hasSpanCtxKey struct{}
  47. type GaugeFunc func() float64
  48. var (
  49. enabled bool
  50. enabledMetrics bool
  51. tracerProvider *sdktrace.TracerProvider
  52. tracer trace.Tracer
  53. meterProvider *sdkmetric.MeterProvider
  54. meter metric.Meter
  55. propagator propagation.TextMapPropagator
  56. bufferSizeHist metric.Int64Histogram
  57. bufferDefaultSizes = make(map[string]int)
  58. bufferMaxSizes = make(map[string]int)
  59. bufferStatsMutex sync.Mutex
  60. )
  61. func Init() error {
  62. mapDeprecatedConfig()
  63. if !config.OpenTelemetryEnable {
  64. return nil
  65. }
  66. otel.SetErrorHandler(&errorHandler{entry: logrus.WithField("from", "opentelemetry")})
  67. var (
  68. traceExporter *otlptrace.Exporter
  69. metricExporter sdkmetric.Exporter
  70. err error
  71. )
  72. protocol := "grpc"
  73. configurators.String(&protocol, "OTEL_EXPORTER_OTLP_PROTOCOL")
  74. switch protocol {
  75. case "grpc":
  76. traceExporter, metricExporter, err = buildGRPCExporters()
  77. case "http/protobuf", "http", "https":
  78. traceExporter, metricExporter, err = buildHTTPExporters()
  79. default:
  80. return fmt.Errorf("Unsupported OpenTelemetry protocol: %s", protocol)
  81. }
  82. if err != nil {
  83. return err
  84. }
  85. if len(os.Getenv("OTEL_SERVICE_NAME")) == 0 {
  86. os.Setenv("OTEL_SERVICE_NAME", "imgproxy")
  87. }
  88. res, _ := resource.Merge(
  89. resource.Default(),
  90. resource.NewSchemaless(
  91. semconv.ServiceVersionKey.String(version.Version),
  92. ),
  93. )
  94. awsRes, _ := resource.Detect(
  95. context.Background(),
  96. ec2.NewResourceDetector(),
  97. ecs.NewResourceDetector(),
  98. eks.NewResourceDetector(),
  99. )
  100. if merged, merr := resource.Merge(awsRes, res); merr == nil {
  101. res = merged
  102. } else {
  103. logrus.Warnf("Can't add AWS attributes to OpenTelemetry: %s", merr)
  104. }
  105. opts := []sdktrace.TracerProviderOption{
  106. sdktrace.WithResource(res),
  107. sdktrace.WithBatcher(traceExporter),
  108. }
  109. switch g := config.OpenTelemetryTraceIDGenerator; g {
  110. case "xray":
  111. idg := xray.NewIDGenerator()
  112. opts = append(opts, sdktrace.WithIDGenerator(idg))
  113. case "random":
  114. // Do nothing. OTel uses random generator by default
  115. default:
  116. return fmt.Errorf("Unknown Trace ID generator: %s", g)
  117. }
  118. tracerProvider = sdktrace.NewTracerProvider(opts...)
  119. tracer = tracerProvider.Tracer("imgproxy")
  120. var propagatorNames []string
  121. configurators.StringSlice(&propagatorNames, "OTEL_PROPAGATORS")
  122. if len(propagatorNames) > 0 {
  123. propagator, err = autoprop.TextMapPropagator(propagatorNames...)
  124. if err != nil {
  125. return err
  126. }
  127. }
  128. enabled = true
  129. if metricExporter == nil {
  130. return nil
  131. }
  132. metricReader := sdkmetric.NewPeriodicReader(
  133. metricExporter,
  134. sdkmetric.WithInterval(5*time.Second),
  135. )
  136. meterProvider = sdkmetric.NewMeterProvider(
  137. sdkmetric.WithResource(res),
  138. sdkmetric.WithReader(metricReader),
  139. )
  140. meter = meterProvider.Meter("imgproxy")
  141. if err = addDefaultMetrics(); err != nil {
  142. return err
  143. }
  144. enabledMetrics = true
  145. return nil
  146. }
  147. func mapDeprecatedConfig() {
  148. endpoint := os.Getenv("IMGPROXY_OPEN_TELEMETRY_ENDPOINT")
  149. if len(endpoint) > 0 {
  150. logrus.Warn("The IMGPROXY_OPEN_TELEMETRY_ENDPOINT config is deprecated. Use IMGPROXY_OPEN_TELEMETRY_ENABLE and OTEL_EXPORTER_OTLP_ENDPOINT instead. See https://docs.imgproxy.net/latest/monitoring/open_telemetry#deprecated-environment-variables")
  151. config.OpenTelemetryEnable = true
  152. }
  153. if !config.OpenTelemetryEnable {
  154. return
  155. }
  156. protocol := "grpc"
  157. if prot := os.Getenv("IMGPROXY_OPEN_TELEMETRY_PROTOCOL"); len(prot) > 0 {
  158. logrus.Warn("The IMGPROXY_OPEN_TELEMETRY_PROTOCOL config is deprecated. Use OTEL_EXPORTER_OTLP_PROTOCOL instead. See https://docs.imgproxy.net/latest/monitoring/open_telemetry#deprecated-environment-variables")
  159. protocol = prot
  160. os.Setenv("OTEL_EXPORTER_OTLP_PROTOCOL", protocol)
  161. }
  162. if len(endpoint) > 0 {
  163. schema := "https"
  164. switch protocol {
  165. case "grpc":
  166. if insecure, _ := strconv.ParseBool(os.Getenv("IMGPROXY_OPEN_TELEMETRY_GRPC_INSECURE")); insecure {
  167. logrus.Warn("The IMGPROXY_OPEN_TELEMETRY_GRPC_INSECURE config is deprecated. Use OTEL_EXPORTER_OTLP_ENDPOINT with the `http://` schema instead. See https://docs.imgproxy.net/latest/monitoring/open_telemetry#deprecated-environment-variables")
  168. schema = "http"
  169. }
  170. case "http":
  171. schema = "http"
  172. }
  173. os.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", fmt.Sprintf("%s://%s", schema, endpoint))
  174. }
  175. if serviceName := os.Getenv("IMGPROXY_OPEN_TELEMETRY_SERVICE_NAME"); len(serviceName) > 0 {
  176. logrus.Warn("The IMGPROXY_OPEN_TELEMETRY_SERVICE_NAME config is deprecated. Use OTEL_SERVICE_NAME instead. See https://docs.imgproxy.net/latest/monitoring/open_telemetry#deprecated-environment-variables")
  177. os.Setenv("OTEL_SERVICE_NAME", serviceName)
  178. }
  179. if propagators := os.Getenv("IMGPROXY_OPEN_TELEMETRY_PROPAGATORS"); len(propagators) > 0 {
  180. logrus.Warn("The IMGPROXY_OPEN_TELEMETRY_PROPAGATORS config is deprecated. Use OTEL_PROPAGATORS instead. See https://docs.imgproxy.net/latest/monitoring/open_telemetry#deprecated-environment-variables")
  181. os.Setenv("OTEL_PROPAGATORS", propagators)
  182. }
  183. if timeout := os.Getenv("IMGPROXY_OPEN_TELEMETRY_CONNECTION_TIMEOUT"); len(timeout) > 0 {
  184. logrus.Warn("The IMGPROXY_OPEN_TELEMETRY_CONNECTION_TIMEOUT config is deprecated. Use OTEL_EXPORTER_OTLP_TIMEOUT instead. See https://docs.imgproxy.net/latest/monitoring/open_telemetry#deprecated-environment-variables")
  185. if to, _ := strconv.Atoi(timeout); to > 0 {
  186. os.Setenv("OTEL_EXPORTER_OTLP_TIMEOUT", strconv.Itoa(to*1000))
  187. }
  188. }
  189. }
  190. func buildGRPCExporters() (*otlptrace.Exporter, sdkmetric.Exporter, error) {
  191. tracerOpts := []otlptracegrpc.Option{}
  192. meterOpts := []otlpmetricgrpc.Option{}
  193. if tlsConf, err := buildTLSConfig(); tlsConf != nil && err == nil {
  194. creds := credentials.NewTLS(tlsConf)
  195. tracerOpts = append(tracerOpts, otlptracegrpc.WithTLSCredentials(creds))
  196. meterOpts = append(meterOpts, otlpmetricgrpc.WithTLSCredentials(creds))
  197. } else if err != nil {
  198. return nil, nil, err
  199. }
  200. tracesConnTimeout, metricsConnTimeout, err := getConnectionTimeouts()
  201. if err != nil {
  202. return nil, nil, err
  203. }
  204. trctx, trcancel := context.WithTimeout(context.Background(), tracesConnTimeout)
  205. defer trcancel()
  206. traceExporter, err := otlptracegrpc.New(trctx, tracerOpts...)
  207. if err != nil {
  208. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  209. }
  210. if !config.OpenTelemetryEnableMetrics {
  211. return traceExporter, nil, err
  212. }
  213. mtctx, mtcancel := context.WithTimeout(context.Background(), metricsConnTimeout)
  214. defer mtcancel()
  215. metricExporter, err := otlpmetricgrpc.New(mtctx, meterOpts...)
  216. if err != nil {
  217. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  218. }
  219. return traceExporter, metricExporter, err
  220. }
  221. func buildHTTPExporters() (*otlptrace.Exporter, sdkmetric.Exporter, error) {
  222. tracerOpts := []otlptracehttp.Option{}
  223. meterOpts := []otlpmetrichttp.Option{}
  224. if tlsConf, err := buildTLSConfig(); tlsConf != nil && err == nil {
  225. tracerOpts = append(tracerOpts, otlptracehttp.WithTLSClientConfig(tlsConf))
  226. meterOpts = append(meterOpts, otlpmetrichttp.WithTLSClientConfig(tlsConf))
  227. } else if err != nil {
  228. return nil, nil, err
  229. }
  230. tracesConnTimeout, metricsConnTimeout, err := getConnectionTimeouts()
  231. if err != nil {
  232. return nil, nil, err
  233. }
  234. trctx, trcancel := context.WithTimeout(context.Background(), tracesConnTimeout)
  235. defer trcancel()
  236. traceExporter, err := otlptracehttp.New(trctx, tracerOpts...)
  237. if err != nil {
  238. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  239. }
  240. if !config.OpenTelemetryEnableMetrics {
  241. return traceExporter, nil, err
  242. }
  243. mtctx, mtcancel := context.WithTimeout(context.Background(), metricsConnTimeout)
  244. defer mtcancel()
  245. metricExporter, err := otlpmetrichttp.New(mtctx, meterOpts...)
  246. if err != nil {
  247. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  248. }
  249. return traceExporter, metricExporter, err
  250. }
  251. func getConnectionTimeouts() (time.Duration, time.Duration, error) {
  252. connTimeout := 10000
  253. configurators.Int(&connTimeout, "OTEL_EXPORTER_OTLP_TIMEOUT")
  254. tracesConnTimeout := connTimeout
  255. configurators.Int(&tracesConnTimeout, "OTEL_EXPORTER_OTLP_TRACES_TIMEOUT")
  256. metricsConnTimeout := connTimeout
  257. configurators.Int(&metricsConnTimeout, "OTEL_EXPORTER_OTLP_METRICS_TIMEOUT")
  258. if tracesConnTimeout <= 0 {
  259. return 0, 0, errors.New("Opentelemetry traces timeout should be greater than 0")
  260. }
  261. if metricsConnTimeout <= 0 {
  262. return 0, 0, errors.New("Opentelemetry metrics timeout should be greater than 0")
  263. }
  264. return time.Duration(tracesConnTimeout) * time.Millisecond,
  265. time.Duration(metricsConnTimeout) * time.Millisecond,
  266. nil
  267. }
  268. func buildTLSConfig() (*tls.Config, error) {
  269. if len(config.OpenTelemetryServerCert) == 0 {
  270. return nil, nil
  271. }
  272. certPool := x509.NewCertPool()
  273. if !certPool.AppendCertsFromPEM(prepareKeyCert(config.OpenTelemetryServerCert)) {
  274. return nil, errors.New("Can't load OpenTelemetry server cert")
  275. }
  276. tlsConf := tls.Config{RootCAs: certPool}
  277. if len(config.OpenTelemetryClientCert) > 0 && len(config.OpenTelemetryClientKey) > 0 {
  278. cert, err := tls.X509KeyPair(
  279. prepareKeyCert(config.OpenTelemetryClientCert),
  280. prepareKeyCert(config.OpenTelemetryClientKey),
  281. )
  282. if err != nil {
  283. return nil, fmt.Errorf("Can't load OpenTelemetry client cert/key pair: %s", err)
  284. }
  285. tlsConf.Certificates = []tls.Certificate{cert}
  286. }
  287. return &tlsConf, nil
  288. }
  289. func prepareKeyCert(str string) []byte {
  290. return []byte(strings.ReplaceAll(str, `\n`, "\n"))
  291. }
  292. func Stop() {
  293. if enabled {
  294. trctx, trcancel := context.WithTimeout(context.Background(), 5*time.Second)
  295. defer trcancel()
  296. tracerProvider.Shutdown(trctx)
  297. if meterProvider != nil {
  298. mtctx, mtcancel := context.WithTimeout(context.Background(), 5*time.Second)
  299. defer mtcancel()
  300. meterProvider.Shutdown(mtctx)
  301. }
  302. }
  303. }
  304. func Enabled() bool {
  305. return enabled
  306. }
  307. func StartRootSpan(ctx context.Context, rw http.ResponseWriter, r *http.Request) (context.Context, context.CancelFunc, http.ResponseWriter) {
  308. if !enabled {
  309. return ctx, func() {}, rw
  310. }
  311. if propagator != nil {
  312. ctx = propagator.Extract(ctx, propagation.HeaderCarrier(r.Header))
  313. }
  314. ctx, span := tracer.Start(
  315. ctx, "/request",
  316. trace.WithSpanKind(trace.SpanKindServer),
  317. trace.WithAttributes(httpconv.ServerRequest("imgproxy", r)...),
  318. )
  319. ctx = context.WithValue(ctx, hasSpanCtxKey{}, struct{}{})
  320. newRw := httpsnoop.Wrap(rw, httpsnoop.Hooks{
  321. WriteHeader: func(next httpsnoop.WriteHeaderFunc) httpsnoop.WriteHeaderFunc {
  322. return func(statusCode int) {
  323. span.SetStatus(httpconv.ServerStatus(statusCode))
  324. span.SetAttributes(semconv.HTTPStatusCode(statusCode))
  325. next(statusCode)
  326. }
  327. },
  328. })
  329. cancel := func() { span.End() }
  330. return ctx, cancel, newRw
  331. }
  332. func StartSpan(ctx context.Context, name string) context.CancelFunc {
  333. if !enabled {
  334. return func() {}
  335. }
  336. if ctx.Value(hasSpanCtxKey{}) != nil {
  337. _, span := tracer.Start(ctx, name, trace.WithSpanKind(trace.SpanKindInternal))
  338. return func() { span.End() }
  339. }
  340. return func() {}
  341. }
  342. func SendError(ctx context.Context, errType string, err error) {
  343. if !enabled {
  344. return
  345. }
  346. span := trace.SpanFromContext(ctx)
  347. attributes := []attribute.KeyValue{
  348. semconv.ExceptionTypeKey.String(errformat.FormatErrType(errType, err)),
  349. semconv.ExceptionMessageKey.String(err.Error()),
  350. }
  351. if ierr, ok := err.(*ierrors.Error); ok {
  352. if stack := ierr.FormatStack(); len(stack) != 0 {
  353. attributes = append(attributes, semconv.ExceptionStacktraceKey.String(stack))
  354. }
  355. }
  356. span.AddEvent(semconv.ExceptionEventName, trace.WithAttributes(attributes...))
  357. }
  358. func addDefaultMetrics() error {
  359. proc, err := process.NewProcess(int32(os.Getpid()))
  360. if err != nil {
  361. return fmt.Errorf("Can't initialize process data for OpenTelemetry: %s", err)
  362. }
  363. processResidentMemory, err := meter.Int64ObservableGauge(
  364. "process_resident_memory_bytes",
  365. metric.WithUnit("By"),
  366. metric.WithDescription("Resident memory size in bytes."),
  367. )
  368. if err != nil {
  369. return fmt.Errorf("Can't add process_resident_memory_bytes gauge to OpenTelemetry: %s", err)
  370. }
  371. processVirtualMemory, err := meter.Int64ObservableGauge(
  372. "process_virtual_memory_bytes",
  373. metric.WithUnit("By"),
  374. metric.WithDescription("Virtual memory size in bytes."),
  375. )
  376. if err != nil {
  377. return fmt.Errorf("Can't add process_virtual_memory_bytes gauge to OpenTelemetry: %s", err)
  378. }
  379. goMemstatsSys, err := meter.Int64ObservableGauge(
  380. "go_memstats_sys_bytes",
  381. metric.WithUnit("By"),
  382. metric.WithDescription("Number of bytes obtained from system."),
  383. )
  384. if err != nil {
  385. return fmt.Errorf("Can't add go_memstats_sys_bytes gauge to OpenTelemetry: %s", err)
  386. }
  387. goMemstatsHeapIdle, err := meter.Int64ObservableGauge(
  388. "go_memstats_heap_idle_bytes",
  389. metric.WithUnit("By"),
  390. metric.WithDescription("Number of heap bytes waiting to be used."),
  391. )
  392. if err != nil {
  393. return fmt.Errorf("Can't add go_memstats_heap_idle_bytes gauge to OpenTelemetry: %s", err)
  394. }
  395. goMemstatsHeapInuse, err := meter.Int64ObservableGauge(
  396. "go_memstats_heap_inuse_bytes",
  397. metric.WithUnit("By"),
  398. metric.WithDescription("Number of heap bytes that are in use."),
  399. )
  400. if err != nil {
  401. return fmt.Errorf("Can't add go_memstats_heap_inuse_bytes gauge to OpenTelemetry: %s", err)
  402. }
  403. goGoroutines, err := meter.Int64ObservableGauge(
  404. "go_goroutines",
  405. metric.WithUnit("1"),
  406. metric.WithDescription("Number of goroutines that currently exist."),
  407. )
  408. if err != nil {
  409. return fmt.Errorf("Can't add go_goroutines gauge to OpenTelemetry: %s", err)
  410. }
  411. goThreads, err := meter.Int64ObservableGauge(
  412. "go_threads",
  413. metric.WithUnit("1"),
  414. metric.WithDescription("Number of OS threads created."),
  415. )
  416. if err != nil {
  417. return fmt.Errorf("Can't add go_threads gauge to OpenTelemetry: %s", err)
  418. }
  419. requestsInProgressGauge, err := meter.Float64ObservableGauge(
  420. "requests_in_progress",
  421. metric.WithUnit("1"),
  422. metric.WithDescription("A gauge of the number of requests currently being in progress."),
  423. )
  424. if err != nil {
  425. return fmt.Errorf("Can't add requests_in_progress gauge to OpenTelemetry: %s", err)
  426. }
  427. imagesInProgressGauge, err := meter.Float64ObservableGauge(
  428. "images_in_progress",
  429. metric.WithUnit("1"),
  430. metric.WithDescription("A gauge of the number of images currently being in progress."),
  431. )
  432. if err != nil {
  433. return fmt.Errorf("Can't add images_in_progress gauge to OpenTelemetry: %s", err)
  434. }
  435. bufferDefaultSizeGauge, err := meter.Int64ObservableGauge(
  436. "buffer_default_size_bytes",
  437. metric.WithUnit("By"),
  438. metric.WithDescription("A gauge of the buffer default size in bytes."),
  439. )
  440. if err != nil {
  441. return fmt.Errorf("Can't add buffer_default_size_bytes gauge to OpenTelemetry: %s", err)
  442. }
  443. bufferMaxSizeGauge, err := meter.Int64ObservableGauge(
  444. "buffer_max_size_bytes",
  445. metric.WithUnit("By"),
  446. metric.WithDescription("A gauge of the buffer max size in bytes."),
  447. )
  448. if err != nil {
  449. return fmt.Errorf("Can't add buffer_max_size_bytes gauge to OpenTelemetry: %s", err)
  450. }
  451. _, err = meter.RegisterCallback(
  452. func(ctx context.Context, o metric.Observer) error {
  453. memStats, merr := proc.MemoryInfo()
  454. if merr != nil {
  455. return merr
  456. }
  457. o.ObserveInt64(processResidentMemory, int64(memStats.RSS))
  458. o.ObserveInt64(processVirtualMemory, int64(memStats.VMS))
  459. goMemStats := &runtime.MemStats{}
  460. runtime.ReadMemStats(goMemStats)
  461. o.ObserveInt64(goMemstatsSys, int64(goMemStats.Sys))
  462. o.ObserveInt64(goMemstatsHeapIdle, int64(goMemStats.HeapIdle))
  463. o.ObserveInt64(goMemstatsHeapInuse, int64(goMemStats.HeapInuse))
  464. threadsNum, _ := runtime.ThreadCreateProfile(nil)
  465. o.ObserveInt64(goGoroutines, int64(runtime.NumGoroutine()))
  466. o.ObserveInt64(goThreads, int64(threadsNum))
  467. o.ObserveFloat64(requestsInProgressGauge, stats.RequestsInProgress())
  468. o.ObserveFloat64(imagesInProgressGauge, stats.ImagesInProgress())
  469. bufferStatsMutex.Lock()
  470. defer bufferStatsMutex.Unlock()
  471. for t, v := range bufferDefaultSizes {
  472. o.ObserveInt64(bufferDefaultSizeGauge, int64(v), metric.WithAttributes(attribute.String("type", t)))
  473. }
  474. for t, v := range bufferMaxSizes {
  475. o.ObserveInt64(bufferMaxSizeGauge, int64(v), metric.WithAttributes(attribute.String("type", t)))
  476. }
  477. return nil
  478. },
  479. processResidentMemory,
  480. processVirtualMemory,
  481. goMemstatsSys,
  482. goMemstatsHeapIdle,
  483. goMemstatsHeapInuse,
  484. goGoroutines,
  485. goThreads,
  486. requestsInProgressGauge,
  487. imagesInProgressGauge,
  488. bufferDefaultSizeGauge,
  489. bufferMaxSizeGauge,
  490. )
  491. if err != nil {
  492. return fmt.Errorf("Can't register OpenTelemetry callbacks: %s", err)
  493. }
  494. bufferSizeHist, err = meter.Int64Histogram(
  495. "buffer_size_bytes",
  496. metric.WithUnit("By"),
  497. metric.WithDescription("A histogram of the buffer size in bytes."),
  498. )
  499. if err != nil {
  500. return fmt.Errorf("Can't add buffer_size_bytes histogram to OpenTelemetry: %s", err)
  501. }
  502. return nil
  503. }
  504. func AddGaugeFunc(name, desc, u string, f GaugeFunc) {
  505. if meter == nil {
  506. return
  507. }
  508. _, err := meter.Float64ObservableGauge(
  509. name,
  510. metric.WithUnit(u),
  511. metric.WithDescription(desc),
  512. metric.WithFloat64Callback(func(_ context.Context, obsrv metric.Float64Observer) error {
  513. obsrv.Observe(f())
  514. return nil
  515. }),
  516. )
  517. if err != nil {
  518. logrus.Warnf("Can't add %s gauge to OpenTelemetry: %s", name, err)
  519. }
  520. }
  521. func ObserveBufferSize(t string, size int) {
  522. if enabledMetrics {
  523. bufferSizeHist.Record(context.Background(), int64(size), metric.WithAttributes(attribute.String("type", t)))
  524. }
  525. }
  526. func SetBufferDefaultSize(t string, size int) {
  527. if enabledMetrics {
  528. bufferStatsMutex.Lock()
  529. defer bufferStatsMutex.Unlock()
  530. bufferDefaultSizes[t] = size
  531. }
  532. }
  533. func SetBufferMaxSize(t string, size int) {
  534. if enabledMetrics {
  535. bufferStatsMutex.Lock()
  536. defer bufferStatsMutex.Unlock()
  537. bufferMaxSizes[t] = size
  538. }
  539. }
  540. type errorHandler struct {
  541. entry *logrus.Entry
  542. }
  543. func (h *errorHandler) Handle(err error) {
  544. h.entry.Warn(err.Error())
  545. }