فهرست منبع

Metrics: Gauge Callbacks (#101286)

Replace synchronous gauges with gauges that accept an observer which is called once per reporting interval.

Calling code may register one observer at a time. If the observer is null, there is no observation in that reporting interval.
Stuart Tettemer 2 سال پیش
والد
کامیت
aa7e92e29b
15فایلهای تغییر یافته به همراه264 افزوده شده و 144 حذف شده
  1. 23 4
      modules/apm/src/main/java/org/elasticsearch/telemetry/apm/APMMeterRegistry.java
  2. 25 18
      modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleGaugeAdapter.java
  3. 25 18
      modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongGaugeAdapter.java
  4. 8 0
      modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/OtelHelper.java
  5. 2 2
      modules/apm/src/test/java/org/elasticsearch/telemetry/apm/RecordingOtelMeter.java
  6. 53 16
      modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/GaugeAdapterTests.java
  7. 3 20
      server/src/main/java/org/elasticsearch/telemetry/metric/DoubleGauge.java
  8. 15 0
      server/src/main/java/org/elasticsearch/telemetry/metric/DoubleWithAttributes.java
  9. 3 22
      server/src/main/java/org/elasticsearch/telemetry/metric/LongGauge.java
  10. 15 0
      server/src/main/java/org/elasticsearch/telemetry/metric/LongWithAttributes.java
  11. 10 4
      server/src/main/java/org/elasticsearch/telemetry/metric/MeterRegistry.java
  12. 3 5
      test/framework/src/main/java/org/elasticsearch/telemetry/InstrumentType.java
  13. 15 2
      test/framework/src/main/java/org/elasticsearch/telemetry/MetricRecorder.java
  14. 52 25
      test/framework/src/main/java/org/elasticsearch/telemetry/RecordingInstruments.java
  15. 12 8
      test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java

+ 23 - 4
modules/apm/src/main/java/org/elasticsearch/telemetry/apm/APMMeterRegistry.java

@@ -24,15 +24,18 @@ import org.elasticsearch.telemetry.metric.DoubleCounter;
 import org.elasticsearch.telemetry.metric.DoubleGauge;
 import org.elasticsearch.telemetry.metric.DoubleHistogram;
 import org.elasticsearch.telemetry.metric.DoubleUpDownCounter;
+import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
 import org.elasticsearch.telemetry.metric.LongCounter;
 import org.elasticsearch.telemetry.metric.LongGauge;
 import org.elasticsearch.telemetry.metric.LongHistogram;
 import org.elasticsearch.telemetry.metric.LongUpDownCounter;
+import org.elasticsearch.telemetry.metric.LongWithAttributes;
 import org.elasticsearch.telemetry.metric.MeterRegistry;
 
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
 
 /**
  * Container for registering and fetching meterRegistrar by type and name.
@@ -69,82 +72,98 @@ public class APMMeterRegistry implements MeterRegistry {
     // Access to registration has to be restricted when the provider is updated in ::setProvider
     protected final ReleasableLock registerLock = new ReleasableLock(new ReentrantLock());
 
+    @Override
     public DoubleCounter registerDoubleCounter(String name, String description, String unit) {
         try (ReleasableLock lock = registerLock.acquire()) {
             return doubleCounters.register(new DoubleCounterAdapter(meter, name, description, unit));
         }
     }
 
+    @Override
     public DoubleCounter getDoubleCounter(String name) {
         return doubleCounters.get(name);
     }
 
+    @Override
     public DoubleUpDownCounter registerDoubleUpDownCounter(String name, String description, String unit) {
         try (ReleasableLock lock = registerLock.acquire()) {
             return doubleUpDownCounters.register(new DoubleUpDownCounterAdapter(meter, name, description, unit));
         }
     }
 
+    @Override
     public DoubleUpDownCounter getDoubleUpDownCounter(String name) {
         return doubleUpDownCounters.get(name);
     }
 
-    public DoubleGauge registerDoubleGauge(String name, String description, String unit) {
+    @Override
+    public DoubleGauge registerDoubleGauge(String name, String description, String unit, Supplier<DoubleWithAttributes> observer) {
         try (ReleasableLock lock = registerLock.acquire()) {
-            return doubleGauges.register(new DoubleGaugeAdapter(meter, name, description, unit));
+            return doubleGauges.register(new DoubleGaugeAdapter(meter, name, description, unit, observer));
         }
     }
 
+    @Override
     public DoubleGauge getDoubleGauge(String name) {
         return doubleGauges.get(name);
     }
 
+    @Override
     public DoubleHistogram registerDoubleHistogram(String name, String description, String unit) {
         try (ReleasableLock lock = registerLock.acquire()) {
             return doubleHistograms.register(new DoubleHistogramAdapter(meter, name, description, unit));
         }
     }
 
+    @Override
     public DoubleHistogram getDoubleHistogram(String name) {
         return doubleHistograms.get(name);
     }
 
+    @Override
     public LongCounter registerLongCounter(String name, String description, String unit) {
         try (ReleasableLock lock = registerLock.acquire()) {
             return longCounters.register(new LongCounterAdapter(meter, name, description, unit));
         }
     }
 
+    @Override
     public LongCounter getLongCounter(String name) {
         return longCounters.get(name);
     }
 
+    @Override
     public LongUpDownCounter registerLongUpDownCounter(String name, String description, String unit) {
         try (ReleasableLock lock = registerLock.acquire()) {
             return longUpDownCounters.register(new LongUpDownCounterAdapter(meter, name, description, unit));
         }
     }
 
+    @Override
     public LongUpDownCounter getLongUpDownCounter(String name) {
         return longUpDownCounters.get(name);
     }
 
-    public LongGauge registerLongGauge(String name, String description, String unit) {
+    @Override
+    public LongGauge registerLongGauge(String name, String description, String unit, Supplier<LongWithAttributes> observer) {
         try (ReleasableLock lock = registerLock.acquire()) {
-            return longGauges.register(new LongGaugeAdapter(meter, name, description, unit));
+            return longGauges.register(new LongGaugeAdapter(meter, name, description, unit, observer));
         }
     }
 
+    @Override
     public LongGauge getLongGauge(String name) {
         return longGauges.get(name);
     }
 
+    @Override
     public LongHistogram registerLongHistogram(String name, String description, String unit) {
         try (ReleasableLock lock = registerLock.acquire()) {
             return longHistograms.register(new LongHistogramAdapter(meter, name, description, unit));
         }
     }
 
+    @Override
     public LongHistogram getLongHistogram(String name) {
         return longHistograms.get(name);
     }

+ 25 - 18
modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleGaugeAdapter.java

@@ -11,25 +11,28 @@ package org.elasticsearch.telemetry.apm.internal.metrics;
 import io.opentelemetry.api.metrics.Meter;
 import io.opentelemetry.api.metrics.ObservableDoubleGauge;
 
+import org.elasticsearch.common.util.concurrent.ReleasableLock;
 import org.elasticsearch.telemetry.apm.AbstractInstrument;
+import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
 
-import java.util.Collections;
-import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
 
 /**
- * DoubleGaugeAdapter wraps an otel ObservableDoubleMeasurement
+ * DoubleGaugeAdapter wraps an otel ObservableLongGauge
  */
 public class DoubleGaugeAdapter extends AbstractInstrument<ObservableDoubleGauge>
     implements
         org.elasticsearch.telemetry.metric.DoubleGauge {
 
-    private final AtomicReference<ValueWithAttributes> valueWithAttributes;
+    private final Supplier<DoubleWithAttributes> observer;
+    private final ReleasableLock closedLock = new ReleasableLock(new ReentrantLock());
+    private boolean closed = false;
 
-    public DoubleGaugeAdapter(Meter meter, String name, String description, String unit) {
+    public DoubleGaugeAdapter(Meter meter, String name, String description, String unit, Supplier<DoubleWithAttributes> observer) {
         super(meter, name, description, unit);
-        this.valueWithAttributes = new AtomicReference<>(new ValueWithAttributes(0.0, Collections.emptyMap()));
+        this.observer = observer;
     }
 
     @Override
@@ -39,20 +42,24 @@ public class DoubleGaugeAdapter extends AbstractInstrument<ObservableDoubleGauge
             .setDescription(getDescription())
             .setUnit(getUnit())
             .buildWithCallback(measurement -> {
-                var localValueWithAttributed = valueWithAttributes.get();
-                measurement.record(localValueWithAttributed.value(), OtelHelper.fromMap(localValueWithAttributed.attributes()));
+                DoubleWithAttributes observation;
+                try {
+                    observation = observer.get();
+                } catch (RuntimeException err) {
+                    assert false : "observer must not throw [" + err.getMessage() + "]";
+                    return;
+                }
+                measurement.record(observation.value(), OtelHelper.fromMap(observation.attributes()));
             });
     }
 
     @Override
-    public void record(double value) {
-        record(value, Collections.emptyMap());
+    public void close() throws Exception {
+        try (ReleasableLock lock = closedLock.acquire()) {
+            if (closed == false) {
+                getInstrument().close();
+            }
+            closed = true;
+        }
     }
-
-    @Override
-    public void record(double value, Map<String, Object> attributes) {
-        this.valueWithAttributes.set(new ValueWithAttributes(value, attributes));
-    }
-
-    private record ValueWithAttributes(double value, Map<String, Object> attributes) {}
 }

+ 25 - 18
modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongGaugeAdapter.java

@@ -11,22 +11,25 @@ package org.elasticsearch.telemetry.apm.internal.metrics;
 import io.opentelemetry.api.metrics.Meter;
 import io.opentelemetry.api.metrics.ObservableLongGauge;
 
+import org.elasticsearch.common.util.concurrent.ReleasableLock;
 import org.elasticsearch.telemetry.apm.AbstractInstrument;
+import org.elasticsearch.telemetry.metric.LongWithAttributes;
 
-import java.util.Collections;
-import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
 
 /**
- * LongGaugeAdapter wraps an otel ObservableLongMeasurement
+ * LongGaugeAdapter wraps an otel ObservableLongGauge
  */
 public class LongGaugeAdapter extends AbstractInstrument<ObservableLongGauge> implements org.elasticsearch.telemetry.metric.LongGauge {
-    private final AtomicReference<ValueWithAttributes> valueWithAttributes;
+    private final Supplier<LongWithAttributes> observer;
+    private final ReleasableLock closedLock = new ReleasableLock(new ReentrantLock());
+    private boolean closed = false;
 
-    public LongGaugeAdapter(Meter meter, String name, String description, String unit) {
+    public LongGaugeAdapter(Meter meter, String name, String description, String unit, Supplier<LongWithAttributes> observer) {
         super(meter, name, description, unit);
-        this.valueWithAttributes = new AtomicReference<>(new ValueWithAttributes(0L, Collections.emptyMap()));
+        this.observer = observer;
     }
 
     @Override
@@ -37,20 +40,24 @@ public class LongGaugeAdapter extends AbstractInstrument<ObservableLongGauge> im
             .setDescription(getDescription())
             .setUnit(getUnit())
             .buildWithCallback(measurement -> {
-                var localValueWithAttributed = valueWithAttributes.get();
-                measurement.record(localValueWithAttributed.value(), OtelHelper.fromMap(localValueWithAttributed.attributes()));
+                LongWithAttributes observation;
+                try {
+                    observation = observer.get();
+                } catch (RuntimeException err) {
+                    assert false : "observer must not throw [" + err.getMessage() + "]";
+                    return;
+                }
+                measurement.record(observation.value(), OtelHelper.fromMap(observation.attributes()));
             });
     }
 
     @Override
-    public void record(long value) {
-        record(value, Collections.emptyMap());
+    public void close() throws Exception {
+        try (ReleasableLock lock = closedLock.acquire()) {
+            if (closed == false) {
+                getInstrument().close();
+            }
+            closed = true;
+        }
     }
-
-    @Override
-    public void record(long value, Map<String, Object> attributes) {
-        this.valueWithAttributes.set(new ValueWithAttributes(value, attributes));
-    }
-
-    private record ValueWithAttributes(long value, Map<String, Object> attributes) {}
 }

+ 8 - 0
modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/OtelHelper.java

@@ -23,8 +23,16 @@ class OtelHelper {
                 builder.put(k, value);
             } else if (v instanceof Long value) {
                 builder.put(k, value);
+            } else if (v instanceof Integer value) {
+                builder.put(k, value);
+            } else if (v instanceof Byte value) {
+                builder.put(k, value);
+            } else if (v instanceof Short value) {
+                builder.put(k, value);
             } else if (v instanceof Double value) {
                 builder.put(k, value);
+            } else if (v instanceof Float value) {
+                builder.put(k, value);
             } else if (v instanceof Boolean value) {
                 builder.put(k, value);
             } else {

+ 2 - 2
modules/apm/src/test/java/org/elasticsearch/telemetry/apm/RecordingOtelMeter.java

@@ -446,7 +446,7 @@ public class RecordingOtelMeter implements Meter {
         final Consumer<ObservableDoubleMeasurement> callback;
 
         DoubleGaugeRecorder(String name, Consumer<ObservableDoubleMeasurement> callback) {
-            super(name, InstrumentType.DOUBLE_GAUGE_OBSERVER);
+            super(name, InstrumentType.DOUBLE_GAUGE);
             this.callback = callback;
         }
 
@@ -517,7 +517,7 @@ public class RecordingOtelMeter implements Meter {
         final Consumer<ObservableLongMeasurement> callback;
 
         LongGaugeRecorder(String name, Consumer<ObservableLongMeasurement> callback) {
-            super(name, InstrumentType.LONG_GAUGE_OBSERVER);
+            super(name, InstrumentType.LONG_GAUGE);
             this.callback = callback;
         }
 

+ 53 - 16
modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/GaugeAdapterTests.java

@@ -12,12 +12,15 @@ import org.elasticsearch.telemetry.Measurement;
 import org.elasticsearch.telemetry.apm.APMMeterRegistry;
 import org.elasticsearch.telemetry.apm.RecordingOtelMeter;
 import org.elasticsearch.telemetry.metric.DoubleGauge;
+import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
 import org.elasticsearch.telemetry.metric.LongGauge;
+import org.elasticsearch.telemetry.metric.LongWithAttributes;
 import org.elasticsearch.test.ESTestCase;
 import org.junit.Before;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
@@ -33,34 +36,68 @@ public class GaugeAdapterTests extends ESTestCase {
     }
 
     // testing that a value reported is then used in a callback
-    @SuppressWarnings("unchecked")
-    public void testLongGaugeRecord() {
-        LongGauge longGauge = registry.registerLongGauge("name", "desc", "unit");
+    public void testLongGaugeRecord() throws Exception {
+        AtomicReference<LongWithAttributes> attrs = new AtomicReference<>();
+        LongGauge gauge = registry.registerLongGauge("name", "desc", "unit", attrs::get);
 
-        // recording a value
-        Map<String, Object> attributes = Map.of("k", 1L);
-        longGauge.record(1L, attributes);
+        attrs.set(new LongWithAttributes(1L, Map.of("k", 1L)));
 
         otelMeter.collectMetrics();
 
-        List<Measurement> metrics = otelMeter.getRecorder().getMeasurements(longGauge);
+        List<Measurement> metrics = otelMeter.getRecorder().getMeasurements(gauge);
         assertThat(metrics, hasSize(1));
-        assertThat(metrics.get(0).attributes(), equalTo(attributes));
+        assertThat(metrics.get(0).attributes(), equalTo(Map.of("k", 1L)));
         assertThat(metrics.get(0).getLong(), equalTo(1L));
+
+        attrs.set(new LongWithAttributes(2L, Map.of("k", 5L)));
+
+        otelMeter.getRecorder().resetCalls();
+        otelMeter.collectMetrics();
+
+        metrics = otelMeter.getRecorder().getMeasurements(gauge);
+        assertThat(metrics, hasSize(1));
+        assertThat(metrics.get(0).attributes(), equalTo(Map.of("k", 5L)));
+        assertThat(metrics.get(0).getLong(), equalTo(2L));
+
+        gauge.close();
+
+        otelMeter.getRecorder().resetCalls();
+        otelMeter.collectMetrics();
+
+        metrics = otelMeter.getRecorder().getMeasurements(gauge);
+        assertThat(metrics, hasSize(0));
     }
 
     // testing that a value reported is then used in a callback
-    @SuppressWarnings("unchecked")
-    public void testDoubleGaugeRecord() {
-        DoubleGauge doubleGauge = registry.registerDoubleGauge("name", "desc", "unit");
-        Map<String, Object> attributes = Map.of("k", 1L);
-        doubleGauge.record(1.0, attributes);
+    public void testDoubleGaugeRecord() throws Exception {
+        AtomicReference<DoubleWithAttributes> attrs = new AtomicReference<>();
+        DoubleGauge gauge = registry.registerDoubleGauge("name", "desc", "unit", attrs::get);
+
+        attrs.set(new DoubleWithAttributes(1.0d, Map.of("k", 1L)));
 
         otelMeter.collectMetrics();
 
-        List<Measurement> metrics = otelMeter.getRecorder().getMeasurements(doubleGauge);
+        List<Measurement> metrics = otelMeter.getRecorder().getMeasurements(gauge);
         assertThat(metrics, hasSize(1));
-        assertThat(metrics.get(0).attributes(), equalTo(attributes));
-        assertThat(metrics.get(0).getDouble(), equalTo(1.0));
+        assertThat(metrics.get(0).attributes(), equalTo(Map.of("k", 1L)));
+        assertThat(metrics.get(0).getDouble(), equalTo(1.0d));
+
+        attrs.set(new DoubleWithAttributes(2.0d, Map.of("k", 5L)));
+
+        otelMeter.getRecorder().resetCalls();
+        otelMeter.collectMetrics();
+
+        metrics = otelMeter.getRecorder().getMeasurements(gauge);
+        assertThat(metrics, hasSize(1));
+        assertThat(metrics.get(0).attributes(), equalTo(Map.of("k", 5L)));
+        assertThat(metrics.get(0).getDouble(), equalTo(2.0d));
+
+        gauge.close();
+
+        otelMeter.getRecorder().resetCalls();
+        otelMeter.collectMetrics();
+
+        metrics = otelMeter.getRecorder().getMeasurements(gauge);
+        assertThat(metrics, hasSize(0));
     }
 }

+ 3 - 20
server/src/main/java/org/elasticsearch/telemetry/metric/DoubleGauge.java

@@ -8,22 +8,10 @@
 
 package org.elasticsearch.telemetry.metric;
 
-import java.util.Map;
-
 /**
- * Record non-additive double values. eg number of running threads, current load
+ * Record non-additive double values based on a callback. eg number of running threads, current load
  */
-public interface DoubleGauge extends Instrument {
-    /**
-     * Record the current value for measured item
-     */
-    void record(double value);
-
-    /**
-     * Record the current value
-     * @param attributes key-value pairs to associate with the current measurement
-     */
-    void record(double value, Map<String, Object> attributes);
+public interface DoubleGauge extends Instrument, AutoCloseable {
 
     /**
      * Noop gauge for tests
@@ -35,12 +23,7 @@ public interface DoubleGauge extends Instrument {
         }
 
         @Override
-        public void record(double value) {
-
-        }
-
-        @Override
-        public void record(double value, Map<String, Object> attributes) {
+        public void close() {
 
         }
     };

+ 15 - 0
server/src/main/java/org/elasticsearch/telemetry/metric/DoubleWithAttributes.java

@@ -0,0 +1,15 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.telemetry.metric;
+
+import java.util.Map;
+
+public record DoubleWithAttributes(double value, Map<String, Object> attributes) {
+
+}

+ 3 - 22
server/src/main/java/org/elasticsearch/telemetry/metric/LongGauge.java

@@ -8,24 +8,10 @@
 
 package org.elasticsearch.telemetry.metric;
 
-import java.util.Map;
-
 /**
- * Record non-additive long values.
+ * Record non-additive long values based on a callback
  */
-public interface LongGauge extends Instrument {
-
-    /**
-     * Record the current value of the measured item.
-     * @param value
-     */
-    void record(long value);
-
-    /**
-     * Record the current value
-     * @param attributes key-value pairs to associate with the current measurement
-     */
-    void record(long value, Map<String, Object> attributes);
+public interface LongGauge extends Instrument, AutoCloseable {
 
     /**
      * Noop gauge for tests
@@ -37,12 +23,7 @@ public interface LongGauge extends Instrument {
         }
 
         @Override
-        public void record(long value) {
-
-        }
-
-        @Override
-        public void record(long value, Map<String, Object> attributes) {
+        public void close() throws Exception {
 
         }
     };

+ 15 - 0
server/src/main/java/org/elasticsearch/telemetry/metric/LongWithAttributes.java

@@ -0,0 +1,15 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.telemetry.metric;
+
+import java.util.Map;
+
+public record LongWithAttributes(long value, Map<String, Object> attributes) {
+
+}

+ 10 - 4
server/src/main/java/org/elasticsearch/telemetry/metric/MeterRegistry.java

@@ -8,6 +8,8 @@
 
 package org.elasticsearch.telemetry.metric;
 
+import java.util.function.Supplier;
+
 /**
  * Container for metering instruments.  Meters with the same name and type (DoubleCounter, etc) can
  * only be registered once.
@@ -51,9 +53,11 @@ public interface MeterRegistry {
      * @param name name of the gauge
      * @param description description of purpose
      * @param unit the unit (bytes, sec, hour)
+     * @param observer callback to use. This is called once during reporting period.
+     *                 Must not throw an exception and must be safe to call from different threads.
      * @return the registered meter.
      */
-    DoubleGauge registerDoubleGauge(String name, String description, String unit);
+    DoubleGauge registerDoubleGauge(String name, String description, String unit, Supplier<DoubleWithAttributes> observer);
 
     /**
      * Retrieved a previously registered {@link DoubleGauge}.
@@ -115,9 +119,11 @@ public interface MeterRegistry {
      * @param name name of the gauge
      * @param description description of purpose
      * @param unit the unit (bytes, sec, hour)
+     * @param observer callback to use. This is called once during reporting period.
+     *                 Must not throw an exception and must be safe to call from different threads.
      * @return the registered meter.
      */
-    LongGauge registerLongGauge(String name, String description, String unit);
+    LongGauge registerLongGauge(String name, String description, String unit, Supplier<LongWithAttributes> observer);
 
     /**
      * Retrieved a previously registered {@link LongGauge}.
@@ -166,7 +172,7 @@ public interface MeterRegistry {
         }
 
         @Override
-        public DoubleGauge registerDoubleGauge(String name, String description, String unit) {
+        public DoubleGauge registerDoubleGauge(String name, String description, String unit, Supplier<DoubleWithAttributes> observer) {
             return DoubleGauge.NOOP;
         }
 
@@ -206,7 +212,7 @@ public interface MeterRegistry {
         }
 
         @Override
-        public LongGauge registerLongGauge(String name, String description, String unit) {
+        public LongGauge registerLongGauge(String name, String description, String unit, Supplier<LongWithAttributes> observer) {
             return LongGauge.NOOP;
         }
 

+ 3 - 5
test/framework/src/main/java/org/elasticsearch/telemetry/InstrumentType.java

@@ -32,9 +32,7 @@ public enum InstrumentType {
     DOUBLE_HISTOGRAM(true),
     LONG_HISTOGRAM(false),
     DOUBLE_GAUGE(true),
-    LONG_GAUGE(false),
-    DOUBLE_GAUGE_OBSERVER(true),
-    LONG_GAUGE_OBSERVER(false);
+    LONG_GAUGE(false);
 
     public final boolean isDouble;
     public final boolean isLong;
@@ -59,9 +57,9 @@ public enum InstrumentType {
         } else if (instrument instanceof LongHistogram) {
             return InstrumentType.LONG_HISTOGRAM;
         } else if (instrument instanceof DoubleGauge) {
-            return InstrumentType.DOUBLE_GAUGE_OBSERVER;
+            return InstrumentType.DOUBLE_GAUGE;
         } else if (instrument instanceof LongGauge) {
-            return InstrumentType.LONG_GAUGE_OBSERVER;
+            return InstrumentType.LONG_GAUGE;
         } else {
             throw new IllegalArgumentException("unknown instrument [" + instrument.getClass().getName() + "]");
         }

+ 15 - 2
test/framework/src/main/java/org/elasticsearch/telemetry/MetricRecorder.java

@@ -34,19 +34,24 @@ public class MetricRecorder<I> {
     private record RegisteredMetric<I>(
         Map<String, Registration> registered,
         Map<String, List<Measurement>> called,
-        Map<String, I> instruments
+        Map<String, I> instruments,
+        List<Runnable> callbacks
     ) {
         void register(String name, String description, String unit, I instrument) {
             assert registered.containsKey(name) == false
                 : Strings.format("unexpected [{}]: [{}][{}], already registered[{}]", name, description, unit, registered.get(name));
             registered.put(name, new Registration(name, description, unit));
             instruments.put(name, instrument);
+            if (instrument instanceof Runnable callback) {
+                callbacks.add(callback);
+            }
         }
 
         void call(String name, Measurement call) {
             assert registered.containsKey(name) : Strings.format("call for unregistered metric [{}]: [{}]", name, call);
             called.computeIfAbsent(Objects.requireNonNull(name), k -> new ArrayList<>()).add(call);
         }
+
     }
 
     /**
@@ -57,7 +62,7 @@ public class MetricRecorder<I> {
     public MetricRecorder() {
         metrics = new HashMap<>(InstrumentType.values().length);
         for (var instrument : InstrumentType.values()) {
-            metrics.put(instrument, new RegisteredMetric<>(new HashMap<>(), new HashMap<>(), new HashMap<>()));
+            metrics.put(instrument, new RegisteredMetric<>(new HashMap<>(), new HashMap<>(), new HashMap<>(), new ArrayList<>()));
         }
     }
 
@@ -106,4 +111,12 @@ public class MetricRecorder<I> {
     public I getInstrument(InstrumentType instrumentType, String name) {
         return metrics.get(instrumentType).instruments.get(name);
     }
+
+    public void resetCalls() {
+        metrics.forEach((it, rm) -> rm.called().clear());
+    }
+
+    public void collect() {
+        metrics.forEach((it, rm) -> rm.callbacks().forEach(Runnable::run));
+    }
 }

+ 52 - 25
test/framework/src/main/java/org/elasticsearch/telemetry/RecordingInstruments.java

@@ -8,19 +8,25 @@
 
 package org.elasticsearch.telemetry;
 
+import org.elasticsearch.common.util.concurrent.ReleasableLock;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.telemetry.metric.DoubleCounter;
 import org.elasticsearch.telemetry.metric.DoubleGauge;
 import org.elasticsearch.telemetry.metric.DoubleHistogram;
 import org.elasticsearch.telemetry.metric.DoubleUpDownCounter;
+import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
 import org.elasticsearch.telemetry.metric.Instrument;
 import org.elasticsearch.telemetry.metric.LongCounter;
 import org.elasticsearch.telemetry.metric.LongGauge;
 import org.elasticsearch.telemetry.metric.LongHistogram;
 import org.elasticsearch.telemetry.metric.LongUpDownCounter;
+import org.elasticsearch.telemetry.metric.LongWithAttributes;
 
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
 
 /**
  * Recording versions of Elasticsearch {@link Instrument}s.  All invocations are recorded via {@link MetricRecorder}.
@@ -45,6 +51,40 @@ public class RecordingInstruments {
         }
     }
 
+    protected interface NumberWithAttributesObserver extends Supplier<Tuple<Number, Map<String, Object>>> {
+
+    }
+
+    protected abstract static class CallbackRecordingInstrument extends RecordingInstrument implements AutoCloseable, Runnable {
+        private final NumberWithAttributesObserver observer;
+        private boolean closed = false;
+        private final ReleasableLock closedLock = new ReleasableLock(new ReentrantLock());
+
+        public CallbackRecordingInstrument(String name, NumberWithAttributesObserver observer, MetricRecorder<Instrument> recorder) {
+            super(name, recorder);
+            this.observer = observer;
+        }
+
+        @Override
+        public void run() {
+            try (ReleasableLock lock = closedLock.acquire()) {
+                if (closed) {
+                    return;
+                }
+                var observation = observer.get();
+                call(observation.v1(), observation.v2());
+            }
+        }
+
+        @Override
+        public void close() throws Exception {
+            try (ReleasableLock lock = closedLock.acquire()) {
+                assert closed == false : "double close";
+                closed = true;
+            }
+        }
+    }
+
     public static class RecordingDoubleCounter extends RecordingInstrument implements DoubleCounter {
         public RecordingDoubleCounter(String name, MetricRecorder<Instrument> recorder) {
             super(name, recorder);
@@ -66,19 +106,12 @@ public class RecordingInstruments {
         }
     }
 
-    public static class RecordingDoubleGauge extends RecordingInstrument implements DoubleGauge {
-        public RecordingDoubleGauge(String name, MetricRecorder<Instrument> recorder) {
-            super(name, recorder);
-        }
-
-        @Override
-        public void record(double value) {
-            record(value, Collections.emptyMap());
-        }
-
-        @Override
-        public void record(double value, Map<String, Object> attributes) {
-            call(value, attributes);
+    public static class RecordingDoubleGauge extends CallbackRecordingInstrument implements DoubleGauge {
+        public RecordingDoubleGauge(String name, Supplier<DoubleWithAttributes> observer, MetricRecorder<Instrument> recorder) {
+            super(name, () -> {
+                var observation = observer.get();
+                return new Tuple<>(observation.value(), observation.attributes());
+            }, recorder);
         }
     }
 
@@ -135,19 +168,13 @@ public class RecordingInstruments {
         }
     }
 
-    public static class RecordingLongGauge extends RecordingInstrument implements LongGauge {
-        public RecordingLongGauge(String name, MetricRecorder<Instrument> recorder) {
-            super(name, recorder);
-        }
+    public static class RecordingLongGauge extends CallbackRecordingInstrument implements LongGauge {
 
-        @Override
-        public void record(long value) {
-            record(value, Collections.emptyMap());
-        }
-
-        @Override
-        public void record(long value, Map<String, Object> attributes) {
-            call(value, attributes);
+        public RecordingLongGauge(String name, Supplier<LongWithAttributes> observer, MetricRecorder<Instrument> recorder) {
+            super(name, () -> {
+                var observation = observer.get();
+                return new Tuple<>(observation.value(), observation.attributes());
+            }, recorder);
         }
     }
 

+ 12 - 8
test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java

@@ -12,13 +12,17 @@ import org.elasticsearch.telemetry.metric.DoubleCounter;
 import org.elasticsearch.telemetry.metric.DoubleGauge;
 import org.elasticsearch.telemetry.metric.DoubleHistogram;
 import org.elasticsearch.telemetry.metric.DoubleUpDownCounter;
+import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
 import org.elasticsearch.telemetry.metric.Instrument;
 import org.elasticsearch.telemetry.metric.LongCounter;
 import org.elasticsearch.telemetry.metric.LongGauge;
 import org.elasticsearch.telemetry.metric.LongHistogram;
 import org.elasticsearch.telemetry.metric.LongUpDownCounter;
+import org.elasticsearch.telemetry.metric.LongWithAttributes;
 import org.elasticsearch.telemetry.metric.MeterRegistry;
 
+import java.util.function.Supplier;
+
 /**
  * A {@link MeterRegistry} that records all instrument invocations.
  * Tests can subclass this class and extend the build[Instrument] methods to do their
@@ -64,8 +68,8 @@ public class RecordingMeterRegistry implements MeterRegistry {
     }
 
     @Override
-    public DoubleGauge registerDoubleGauge(String name, String description, String unit) {
-        DoubleGauge instrument = buildDoubleGauge(name, description, unit);
+    public DoubleGauge registerDoubleGauge(String name, String description, String unit, Supplier<DoubleWithAttributes> observer) {
+        DoubleGauge instrument = buildDoubleGauge(name, description, unit, observer);
         recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit);
         return instrument;
     }
@@ -75,8 +79,8 @@ public class RecordingMeterRegistry implements MeterRegistry {
         return (DoubleGauge) recorder.getInstrument(InstrumentType.DOUBLE_GAUGE, name);
     }
 
-    protected DoubleGauge buildDoubleGauge(String name, String description, String unit) {
-        return new RecordingInstruments.RecordingDoubleGauge(name, recorder);
+    protected DoubleGauge buildDoubleGauge(String name, String description, String unit, Supplier<DoubleWithAttributes> observer) {
+        return new RecordingInstruments.RecordingDoubleGauge(name, observer, recorder);
     }
 
     @Override
@@ -128,8 +132,8 @@ public class RecordingMeterRegistry implements MeterRegistry {
     }
 
     @Override
-    public LongGauge registerLongGauge(String name, String description, String unit) {
-        LongGauge instrument = buildLongGauge(name, description, unit);
+    public LongGauge registerLongGauge(String name, String description, String unit, Supplier<LongWithAttributes> observer) {
+        LongGauge instrument = buildLongGauge(name, description, unit, observer);
         recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit);
         return instrument;
     }
@@ -139,8 +143,8 @@ public class RecordingMeterRegistry implements MeterRegistry {
         return (LongGauge) recorder.getInstrument(InstrumentType.LONG_GAUGE, name);
     }
 
-    protected LongGauge buildLongGauge(String name, String description, String unit) {
-        return new RecordingInstruments.RecordingLongGauge(name, recorder);
+    protected LongGauge buildLongGauge(String name, String description, String unit, Supplier<LongWithAttributes> observer) {
+        return new RecordingInstruments.RecordingLongGauge(name, observer, recorder);
     }
 
     @Override