otel.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438
  1. package otel
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "crypto/x509"
  6. "errors"
  7. "fmt"
  8. "net/http"
  9. "strings"
  10. "time"
  11. "github.com/felixge/httpsnoop"
  12. "github.com/sirupsen/logrus"
  13. "go.opentelemetry.io/contrib/detectors/aws/ec2"
  14. "go.opentelemetry.io/contrib/detectors/aws/ecs"
  15. "go.opentelemetry.io/contrib/detectors/aws/eks"
  16. "go.opentelemetry.io/contrib/propagators/autoprop"
  17. "go.opentelemetry.io/contrib/propagators/aws/xray"
  18. "go.opentelemetry.io/otel"
  19. "go.opentelemetry.io/otel/attribute"
  20. "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
  21. "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
  22. "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
  23. "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
  24. "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
  25. "go.opentelemetry.io/otel/metric"
  26. "go.opentelemetry.io/otel/metric/instrument"
  27. "go.opentelemetry.io/otel/metric/unit"
  28. "go.opentelemetry.io/otel/propagation"
  29. sdkmetric "go.opentelemetry.io/otel/sdk/metric"
  30. "go.opentelemetry.io/otel/sdk/resource"
  31. sdktrace "go.opentelemetry.io/otel/sdk/trace"
  32. semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
  33. "go.opentelemetry.io/otel/trace"
  34. "google.golang.org/grpc"
  35. "google.golang.org/grpc/credentials"
  36. "github.com/imgproxy/imgproxy/v3/config"
  37. "github.com/imgproxy/imgproxy/v3/ierrors"
  38. "github.com/imgproxy/imgproxy/v3/metrics/errformat"
  39. "github.com/imgproxy/imgproxy/v3/metrics/stats"
  40. "github.com/imgproxy/imgproxy/v3/version"
  41. )
  42. type hasSpanCtxKey struct{}
  43. type GaugeFunc func() float64
  44. var (
  45. enabled bool
  46. enabledMetrics bool
  47. tracerProvider *sdktrace.TracerProvider
  48. tracer trace.Tracer
  49. meterProvider *sdkmetric.MeterProvider
  50. meter metric.Meter
  51. propagator propagation.TextMapPropagator
  52. )
  53. func Init() error {
  54. if len(config.OpenTelemetryEndpoint) == 0 {
  55. return nil
  56. }
  57. otel.SetErrorHandler(&errorHandler{entry: logrus.WithField("from", "opentelemetry")})
  58. var (
  59. traceExporter *otlptrace.Exporter
  60. metricExporter sdkmetric.Exporter
  61. err error
  62. )
  63. switch config.OpenTelemetryProtocol {
  64. case "grpc":
  65. traceExporter, metricExporter, err = buildGRPCExporters()
  66. case "https":
  67. traceExporter, metricExporter, err = buildHTTPExporters(false)
  68. case "http":
  69. traceExporter, metricExporter, err = buildHTTPExporters(true)
  70. default:
  71. return fmt.Errorf("Unknown OpenTelemetry protocol: %s", config.OpenTelemetryProtocol)
  72. }
  73. if err != nil {
  74. return err
  75. }
  76. res, _ := resource.Merge(
  77. resource.Default(),
  78. resource.NewSchemaless(
  79. semconv.ServiceNameKey.String(config.OpenTelemetryServiceName),
  80. semconv.ServiceVersionKey.String(version.Version()),
  81. ),
  82. )
  83. awsRes, _ := resource.Detect(
  84. context.Background(),
  85. ec2.NewResourceDetector(),
  86. ecs.NewResourceDetector(),
  87. eks.NewResourceDetector(),
  88. )
  89. if merged, merr := resource.Merge(awsRes, res); merr == nil {
  90. res = merged
  91. } else {
  92. logrus.Warnf("Can't add AWS attributes to OpenTelemetry: %s", merr)
  93. }
  94. opts := []sdktrace.TracerProviderOption{
  95. sdktrace.WithResource(res),
  96. sdktrace.WithBatcher(traceExporter),
  97. }
  98. if opts, err = addTraceIDRatioSampler(opts); err != nil {
  99. return err
  100. }
  101. switch g := config.OpenTelemetryTraceIDGenerator; g {
  102. case "xray":
  103. idg := xray.NewIDGenerator()
  104. opts = append(opts, sdktrace.WithIDGenerator(idg))
  105. case "random":
  106. // Do nothing. OTel uses random generator by default
  107. default:
  108. return fmt.Errorf("Unknown Trace ID generator: %s", g)
  109. }
  110. tracerProvider = sdktrace.NewTracerProvider(opts...)
  111. tracer = tracerProvider.Tracer("imgproxy")
  112. if len(config.OpenTelemetryPropagators) > 0 {
  113. propagator, err = autoprop.TextMapPropagator(config.OpenTelemetryPropagators...)
  114. if err != nil {
  115. return err
  116. }
  117. }
  118. enabled = true
  119. if metricExporter == nil {
  120. return nil
  121. }
  122. metricReader := sdkmetric.NewPeriodicReader(
  123. metricExporter,
  124. sdkmetric.WithInterval(5*time.Second),
  125. )
  126. meterProvider = sdkmetric.NewMeterProvider(
  127. sdkmetric.WithResource(res),
  128. sdkmetric.WithReader(metricReader),
  129. )
  130. meter = meterProvider.Meter("imgproxy")
  131. enabledMetrics = true
  132. AddGaugeFunc(
  133. "requests_in_progress",
  134. "A gauge of the number of requests currently being in progress.",
  135. "1",
  136. stats.RequestsInProgress,
  137. )
  138. AddGaugeFunc(
  139. "images_in_progress",
  140. "A gauge of the number of images currently being in progress.",
  141. "1",
  142. stats.ImagesInProgress,
  143. )
  144. return nil
  145. }
  146. func buildGRPCExporters() (*otlptrace.Exporter, sdkmetric.Exporter, error) {
  147. tracerOpts := []otlptracegrpc.Option{
  148. otlptracegrpc.WithEndpoint(config.OpenTelemetryEndpoint),
  149. otlptracegrpc.WithDialOption(grpc.WithBlock()),
  150. }
  151. meterOpts := []otlpmetricgrpc.Option{
  152. otlpmetricgrpc.WithEndpoint(config.OpenTelemetryEndpoint),
  153. otlpmetricgrpc.WithDialOption(grpc.WithBlock()),
  154. }
  155. tlsConf, err := buildTLSConfig()
  156. if err != nil {
  157. return nil, nil, err
  158. }
  159. if tlsConf != nil {
  160. creds := credentials.NewTLS(tlsConf)
  161. tracerOpts = append(tracerOpts, otlptracegrpc.WithTLSCredentials(creds))
  162. meterOpts = append(meterOpts, otlpmetricgrpc.WithTLSCredentials(creds))
  163. } else if config.OpenTelemetryGRPCInsecure {
  164. tracerOpts = append(tracerOpts, otlptracegrpc.WithInsecure())
  165. meterOpts = append(meterOpts, otlpmetricgrpc.WithInsecure())
  166. }
  167. trctx, trcancel := context.WithTimeout(
  168. context.Background(),
  169. time.Duration(config.OpenTelemetryConnectionTimeout)*time.Second,
  170. )
  171. defer trcancel()
  172. traceExporter, err := otlptracegrpc.New(trctx, tracerOpts...)
  173. if err != nil {
  174. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  175. }
  176. if !config.OpenTelemetryEnableMetrics {
  177. return traceExporter, nil, err
  178. }
  179. mtctx, mtcancel := context.WithTimeout(
  180. context.Background(),
  181. time.Duration(config.OpenTelemetryConnectionTimeout)*time.Second,
  182. )
  183. defer mtcancel()
  184. metricExporter, err := otlpmetricgrpc.New(mtctx, meterOpts...)
  185. if err != nil {
  186. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  187. }
  188. return traceExporter, metricExporter, err
  189. }
  190. func buildHTTPExporters(insecure bool) (*otlptrace.Exporter, sdkmetric.Exporter, error) {
  191. tracerOpts := []otlptracehttp.Option{
  192. otlptracehttp.WithEndpoint(config.OpenTelemetryEndpoint),
  193. }
  194. meterOpts := []otlpmetrichttp.Option{
  195. otlpmetrichttp.WithEndpoint(config.OpenTelemetryEndpoint),
  196. }
  197. if insecure {
  198. tracerOpts = append(tracerOpts, otlptracehttp.WithInsecure())
  199. meterOpts = append(meterOpts, otlpmetrichttp.WithInsecure())
  200. } else {
  201. tlsConf, err := buildTLSConfig()
  202. if err != nil {
  203. return nil, nil, err
  204. }
  205. if tlsConf != nil {
  206. tracerOpts = append(tracerOpts, otlptracehttp.WithTLSClientConfig(tlsConf))
  207. meterOpts = append(meterOpts, otlpmetrichttp.WithTLSClientConfig(tlsConf))
  208. }
  209. }
  210. trctx, trcancel := context.WithTimeout(
  211. context.Background(),
  212. time.Duration(config.OpenTelemetryConnectionTimeout)*time.Second,
  213. )
  214. defer trcancel()
  215. traceExporter, err := otlptracehttp.New(trctx, tracerOpts...)
  216. if err != nil {
  217. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  218. }
  219. if !config.OpenTelemetryEnableMetrics {
  220. return traceExporter, nil, err
  221. }
  222. mtctx, mtcancel := context.WithTimeout(
  223. context.Background(),
  224. time.Duration(config.OpenTelemetryConnectionTimeout)*time.Second,
  225. )
  226. defer mtcancel()
  227. metricExporter, err := otlpmetrichttp.New(mtctx, meterOpts...)
  228. if err != nil {
  229. err = fmt.Errorf("Can't connect to OpenTelemetry collector: %s", err)
  230. }
  231. return traceExporter, metricExporter, err
  232. }
  233. func buildTLSConfig() (*tls.Config, error) {
  234. if len(config.OpenTelemetryServerCert) == 0 {
  235. return nil, nil
  236. }
  237. certPool := x509.NewCertPool()
  238. if !certPool.AppendCertsFromPEM(prepareKeyCert(config.OpenTelemetryServerCert)) {
  239. return nil, errors.New("Can't load OpenTelemetry server cert")
  240. }
  241. tlsConf := tls.Config{RootCAs: certPool}
  242. if len(config.OpenTelemetryClientCert) > 0 && len(config.OpenTelemetryClientKey) > 0 {
  243. cert, err := tls.X509KeyPair(
  244. prepareKeyCert(config.OpenTelemetryClientCert),
  245. prepareKeyCert(config.OpenTelemetryClientKey),
  246. )
  247. if err != nil {
  248. return nil, fmt.Errorf("Can't load OpenTelemetry client cert/key pair: %s", err)
  249. }
  250. tlsConf.Certificates = []tls.Certificate{cert}
  251. }
  252. return &tlsConf, nil
  253. }
  254. func prepareKeyCert(str string) []byte {
  255. return []byte(strings.ReplaceAll(str, `\n`, "\n"))
  256. }
  257. func Stop() {
  258. if enabled {
  259. trctx, trcancel := context.WithTimeout(context.Background(), 5*time.Second)
  260. defer trcancel()
  261. tracerProvider.Shutdown(trctx)
  262. if meterProvider != nil {
  263. mtctx, mtcancel := context.WithTimeout(context.Background(), 5*time.Second)
  264. defer mtcancel()
  265. meterProvider.Shutdown(mtctx)
  266. }
  267. }
  268. }
  269. func Enabled() bool {
  270. return enabled
  271. }
  272. func StartRootSpan(ctx context.Context, rw http.ResponseWriter, r *http.Request) (context.Context, context.CancelFunc, http.ResponseWriter) {
  273. if !enabled {
  274. return ctx, func() {}, rw
  275. }
  276. if propagator != nil {
  277. ctx = propagator.Extract(ctx, propagation.HeaderCarrier(r.Header))
  278. }
  279. ctx, span := tracer.Start(
  280. ctx, "/request",
  281. trace.WithSpanKind(trace.SpanKindServer),
  282. trace.WithAttributes(semconv.NetAttributesFromHTTPRequest("tcp", r)...),
  283. trace.WithAttributes(semconv.EndUserAttributesFromHTTPRequest(r)...),
  284. trace.WithAttributes(semconv.HTTPServerAttributesFromHTTPRequest("imgproxy", "/", r)...),
  285. )
  286. ctx = context.WithValue(ctx, hasSpanCtxKey{}, struct{}{})
  287. newRw := httpsnoop.Wrap(rw, httpsnoop.Hooks{
  288. WriteHeader: func(next httpsnoop.WriteHeaderFunc) httpsnoop.WriteHeaderFunc {
  289. return func(statusCode int) {
  290. attrs := semconv.HTTPAttributesFromHTTPStatusCode(statusCode)
  291. spanStatus, spanMessage := semconv.SpanStatusFromHTTPStatusCodeAndSpanKind(statusCode, trace.SpanKindServer)
  292. span.SetAttributes(attrs...)
  293. span.SetStatus(spanStatus, spanMessage)
  294. next(statusCode)
  295. }
  296. },
  297. })
  298. cancel := func() { span.End() }
  299. return ctx, cancel, newRw
  300. }
  301. func StartSpan(ctx context.Context, name string) context.CancelFunc {
  302. if !enabled {
  303. return func() {}
  304. }
  305. if ctx.Value(hasSpanCtxKey{}) != nil {
  306. _, span := tracer.Start(ctx, name, trace.WithSpanKind(trace.SpanKindInternal))
  307. return func() { span.End() }
  308. }
  309. return func() {}
  310. }
  311. func SendError(ctx context.Context, errType string, err error) {
  312. if !enabled {
  313. return
  314. }
  315. span := trace.SpanFromContext(ctx)
  316. attributes := []attribute.KeyValue{
  317. semconv.ExceptionTypeKey.String(errformat.FormatErrType(errType, err)),
  318. semconv.ExceptionMessageKey.String(err.Error()),
  319. }
  320. if ierr, ok := err.(*ierrors.Error); ok {
  321. if stack := ierr.FormatStack(); len(stack) != 0 {
  322. attributes = append(attributes, semconv.ExceptionStacktraceKey.String(stack))
  323. }
  324. }
  325. span.AddEvent(semconv.ExceptionEventName, trace.WithAttributes(attributes...))
  326. }
  327. func AddGaugeFunc(name, desc, u string, f GaugeFunc) {
  328. if !enabledMetrics {
  329. return
  330. }
  331. gauge, err := meter.Float64ObservableGauge(
  332. name,
  333. instrument.WithUnit(unit.Unit(u)),
  334. instrument.WithDescription(desc),
  335. )
  336. if err != nil {
  337. logrus.Warnf("Can't add %s gauge to OpenTelemetry: %s", name, err)
  338. return
  339. }
  340. if _, err = meter.RegisterCallback(
  341. func(ctx context.Context, o metric.Observer) error {
  342. o.ObserveFloat64(gauge, f())
  343. return nil
  344. },
  345. gauge,
  346. ); err != nil {
  347. logrus.Warnf("Can't add %s gauge to OpenTelemetry: %s", name, err)
  348. }
  349. }
  350. type errorHandler struct {
  351. entry *logrus.Entry
  352. }
  353. func (h *errorHandler) Handle(err error) {
  354. h.entry.Warn(err.Error())
  355. }