Quellcode durchsuchen

Add num docs and size to logsdb telemetry (#116128) (#116270)

Follow-up on #115994 to add telemetry for the total number of documents
and size in bytes of logsdb indices.

Relates #115994
Nhat Nguyen vor 11 Monaten
Ursprung
Commit
9497410147

+ 5 - 0
docs/changelog/116128.yaml

@@ -0,0 +1,5 @@
+pr: 116128
+summary: Add num docs and size to logsdb telemetry
+area: Logs
+type: enhancement
+issues: []

+ 16 - 0
server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/IndicesMetricsIT.java

@@ -11,9 +11,11 @@ package org.elasticsearch.monitor.metrics;
 
 import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.mapper.OnScriptError;
 import org.elasticsearch.index.query.RangeQueryBuilder;
 import org.elasticsearch.indices.IndicesService;
@@ -329,6 +331,10 @@ public class IndicesMetricsIT extends ESIntegTestCase {
                 equalTo(0L)
             )
         );
+
+        verifyStatsPerIndexMode(
+            Map.of(IndexMode.STANDARD, numStandardDocs, IndexMode.LOGSDB, numLogsdbDocs, IndexMode.TIME_SERIES, numTimeSeriesDocs)
+        );
     }
 
     void collectThenAssertMetrics(TestTelemetryPlugin telemetry, int times, Map<String, Matcher<Long>> matchers) {
@@ -434,6 +440,16 @@ public class IndicesMetricsIT extends ESIntegTestCase {
         return totalDocs;
     }
 
+    private void verifyStatsPerIndexMode(Map<IndexMode, Long> expectedDocs) {
+        var nodes = clusterService().state().nodes().stream().toArray(DiscoveryNode[]::new);
+        var request = new IndexModeStatsActionType.StatsRequest(nodes);
+        var resp = client().execute(IndexModeStatsActionType.TYPE, request).actionGet();
+        var stats = resp.stats();
+        for (Map.Entry<IndexMode, Long> e : expectedDocs.entrySet()) {
+            assertThat(stats.get(e.getKey()).numDocs(), equalTo(e.getValue()));
+        }
+    }
+
     private Map<String, Object> parseMapping(String mapping) throws IOException {
         try (XContentParser parser = createParser(JsonXContent.jsonXContent, mapping)) {
             return parser.map();

+ 1 - 0
server/src/main/java/module-info.java

@@ -469,5 +469,6 @@ module org.elasticsearch.server {
             org.elasticsearch.serverless.apifiltering;
     exports org.elasticsearch.lucene.spatial;
     exports org.elasticsearch.inference.configuration;
+    exports org.elasticsearch.monitor.metrics;
 
 }

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -187,6 +187,7 @@ public class TransportVersions {
     public static final TransportVersion QUERY_RULES_RETRIEVER = def(8_782_00_0);
     public static final TransportVersion ESQL_CCS_EXEC_INFO_WITH_FAILURES = def(8_783_00_0);
     public static final TransportVersion LOGSDB_TELEMETRY = def(8_784_00_0);
+    public static final TransportVersion LOGSDB_TELEMETRY_STATS = def(8_785_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 2 - 0
server/src/main/java/org/elasticsearch/action/ActionModule.java

@@ -243,6 +243,7 @@ import org.elasticsearch.indices.store.TransportNodesListShardStoreMetadata;
 import org.elasticsearch.injection.guice.AbstractModule;
 import org.elasticsearch.injection.guice.TypeLiteral;
 import org.elasticsearch.injection.guice.multibindings.MapBinder;
+import org.elasticsearch.monitor.metrics.IndexModeStatsActionType;
 import org.elasticsearch.persistent.CompletionPersistentTaskAction;
 import org.elasticsearch.persistent.RemovePersistentTaskAction;
 import org.elasticsearch.persistent.StartPersistentTaskAction;
@@ -630,6 +631,7 @@ public class ActionModule extends AbstractModule {
         actions.register(TransportNodesFeaturesAction.TYPE, TransportNodesFeaturesAction.class);
         actions.register(RemoteClusterNodesAction.TYPE, RemoteClusterNodesAction.TransportAction.class);
         actions.register(TransportNodesStatsAction.TYPE, TransportNodesStatsAction.class);
+        actions.register(IndexModeStatsActionType.TYPE, IndexModeStatsActionType.TransportAction.class);
         actions.register(TransportNodesUsageAction.TYPE, TransportNodesUsageAction.class);
         actions.register(TransportNodesHotThreadsAction.TYPE, TransportNodesHotThreadsAction.class);
         actions.register(TransportListTasksAction.TYPE, TransportListTasksAction.class);

+ 162 - 0
server/src/main/java/org/elasticsearch/monitor/metrics/IndexModeStatsActionType.java

@@ -0,0 +1,162 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.monitor.metrics;
+
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.nodes.BaseNodeResponse;
+import org.elasticsearch.action.support.nodes.BaseNodesRequest;
+import org.elasticsearch.action.support.nodes.BaseNodesResponse;
+import org.elasticsearch.action.support.nodes.TransportNodesAction;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.index.IndexMode;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.injection.guice.Inject;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportRequest;
+import org.elasticsearch.transport.TransportService;
+
+import java.io.IOException;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+
+public final class IndexModeStatsActionType extends ActionType<IndexModeStatsActionType.StatsResponse> {
+    public static final IndexModeStatsActionType TYPE = new IndexModeStatsActionType();
+
+    private IndexModeStatsActionType() {
+        super("cluster:monitor/nodes/index_mode_stats");
+    }
+
+    public static final class StatsRequest extends BaseNodesRequest<StatsRequest> {
+        public StatsRequest(String[] nodesIds) {
+            super(nodesIds);
+        }
+
+        public StatsRequest(DiscoveryNode... concreteNodes) {
+            super(concreteNodes);
+        }
+    }
+
+    public static final class StatsResponse extends BaseNodesResponse<NodeResponse> {
+        StatsResponse(ClusterName clusterName, List<NodeResponse> nodes, List<FailedNodeException> failures) {
+            super(clusterName, nodes, failures);
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            assert false : "must be local";
+            throw new UnsupportedOperationException("must be local");
+        }
+
+        @Override
+        protected List<NodeResponse> readNodesFrom(StreamInput in) throws IOException {
+            assert false : "must be local";
+            throw new UnsupportedOperationException("must be local");
+        }
+
+        @Override
+        protected void writeNodesTo(StreamOutput out, List<NodeResponse> nodes) throws IOException {
+            assert false : "must be local";
+            throw new UnsupportedOperationException("must be local");
+        }
+
+        public Map<IndexMode, IndexStats> stats() {
+            final Map<IndexMode, IndexStats> stats = new EnumMap<>(IndexMode.class);
+            for (IndexMode mode : IndexMode.values()) {
+                stats.put(mode, new IndexStats());
+            }
+            for (NodeResponse node : getNodes()) {
+                for (Map.Entry<IndexMode, IndexStats> e : node.stats.entrySet()) {
+                    stats.get(e.getKey()).add(e.getValue());
+                }
+            }
+            return stats;
+        }
+    }
+
+    public static final class NodeRequest extends TransportRequest {
+        NodeRequest() {
+
+        }
+
+        NodeRequest(StreamInput in) throws IOException {
+            super(in);
+        }
+    }
+
+    public static class NodeResponse extends BaseNodeResponse {
+        private final Map<IndexMode, IndexStats> stats;
+
+        NodeResponse(DiscoveryNode node, Map<IndexMode, IndexStats> stats) {
+            super(node);
+            this.stats = stats;
+        }
+
+        NodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
+            super(in, node);
+            stats = in.readMap(IndexMode::readFrom, IndexStats::new);
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeMap(stats, (o, m) -> IndexMode.writeTo(m, o), (o, s) -> s.writeTo(o));
+        }
+    }
+
+    public static class TransportAction extends TransportNodesAction<StatsRequest, StatsResponse, NodeRequest, NodeResponse, Void> {
+        private final IndicesService indicesService;
+
+        @Inject
+        public TransportAction(
+            ClusterService clusterService,
+            TransportService transportService,
+            ActionFilters actionFilters,
+            IndicesService indicesService
+        ) {
+            super(
+                TYPE.name(),
+                clusterService,
+                transportService,
+                actionFilters,
+                NodeRequest::new,
+                transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT)
+            );
+            this.indicesService = indicesService;
+        }
+
+        @Override
+        protected StatsResponse newResponse(StatsRequest request, List<NodeResponse> nodeResponses, List<FailedNodeException> failures) {
+            return new StatsResponse(ClusterName.DEFAULT, nodeResponses, failures);
+        }
+
+        @Override
+        protected NodeRequest newNodeRequest(StatsRequest request) {
+            return new NodeRequest();
+        }
+
+        @Override
+        protected NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
+            return new NodeResponse(in, node);
+        }
+
+        @Override
+        protected NodeResponse nodeOperation(NodeRequest request, Task task) {
+            return new NodeResponse(clusterService.localNode(), IndicesMetrics.getStatsWithoutCache(indicesService));
+        }
+    }
+}

+ 67 - 0
server/src/main/java/org/elasticsearch/monitor/metrics/IndexStats.java

@@ -0,0 +1,67 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.monitor.metrics;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.index.search.stats.SearchStats;
+import org.elasticsearch.index.shard.IndexingStats;
+
+import java.io.IOException;
+
+public final class IndexStats implements Writeable {
+    int numIndices = 0;
+    long numDocs = 0;
+    long numBytes = 0;
+    SearchStats.Stats search = new SearchStats().getTotal();
+    IndexingStats.Stats indexing = new IndexingStats().getTotal();
+
+    IndexStats() {
+
+    }
+
+    IndexStats(StreamInput in) throws IOException {
+        this.numIndices = in.readVInt();
+        this.numDocs = in.readVLong();
+        this.numBytes = in.readVLong();
+        this.search = SearchStats.Stats.readStats(in);
+        this.indexing = new IndexingStats.Stats(in);
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeVInt(numIndices);
+        out.writeVLong(numDocs);
+        out.writeVLong(numBytes);
+        search.writeTo(out);
+        indexing.writeTo(out);
+    }
+
+    void add(IndexStats other) {
+        this.numIndices += other.numIndices;
+        this.numDocs += other.numDocs;
+        this.numBytes += other.numBytes;
+        this.search.add(other.search);
+        this.indexing.add(other.indexing);
+    }
+
+    public int numIndices() {
+        return numIndices;
+    }
+
+    public long numDocs() {
+        return numDocs;
+    }
+
+    public long numBytes() {
+        return numBytes;
+    }
+}

+ 31 - 41
server/src/main/java/org/elasticsearch/monitor/metrics/IndicesMetrics.java

@@ -18,11 +18,9 @@ import org.elasticsearch.common.util.SingleObjectCache;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.IndexService;
-import org.elasticsearch.index.search.stats.SearchStats;
 import org.elasticsearch.index.shard.DocsStats;
 import org.elasticsearch.index.shard.IllegalIndexShardStateException;
 import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.index.shard.IndexingStats;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.telemetry.metric.LongWithAttributes;
 import org.elasticsearch.telemetry.metric.MeterRegistry;
@@ -193,12 +191,36 @@ public class IndicesMetrics extends AbstractLifecycleComponent {
         });
     }
 
-    static class IndexStats {
-        int numIndices = 0;
-        long numDocs = 0;
-        long numBytes = 0;
-        SearchStats.Stats search = new SearchStats().getTotal();
-        IndexingStats.Stats indexing = new IndexingStats().getTotal();
+    static Map<IndexMode, IndexStats> getStatsWithoutCache(IndicesService indicesService) {
+        Map<IndexMode, IndexStats> stats = new EnumMap<>(IndexMode.class);
+        for (IndexMode mode : IndexMode.values()) {
+            stats.put(mode, new IndexStats());
+        }
+        for (IndexService indexService : indicesService) {
+            for (IndexShard indexShard : indexService) {
+                if (indexShard.isSystem()) {
+                    continue; // skip system indices
+                }
+                final ShardRouting shardRouting = indexShard.routingEntry();
+                final IndexMode indexMode = indexShard.indexSettings().getMode();
+                final IndexStats indexStats = stats.get(indexMode);
+                try {
+                    if (shardRouting.primary() && shardRouting.recoverySource() == null) {
+                        if (shardRouting.shardId().id() == 0) {
+                            indexStats.numIndices++;
+                        }
+                        final DocsStats docStats = indexShard.docStats();
+                        indexStats.numDocs += docStats.getCount();
+                        indexStats.numBytes += docStats.getTotalSizeInBytes();
+                        indexStats.indexing.add(indexShard.indexingStats().getTotal());
+                    }
+                    indexStats.search.add(indexShard.searchStats().getTotal());
+                } catch (IllegalIndexShardStateException | AlreadyClosedException ignored) {
+                    // ignored
+                }
+            }
+        }
+        return stats;
     }
 
     private static class IndicesStatsCache extends SingleObjectCache<Map<IndexMode, IndexStats>> {
@@ -219,41 +241,9 @@ public class IndicesMetrics extends AbstractLifecycleComponent {
             this.refresh = true;
         }
 
-        private Map<IndexMode, IndexStats> internalGetIndicesStats() {
-            Map<IndexMode, IndexStats> stats = new EnumMap<>(IndexMode.class);
-            for (IndexMode mode : IndexMode.values()) {
-                stats.put(mode, new IndexStats());
-            }
-            for (IndexService indexService : indicesService) {
-                for (IndexShard indexShard : indexService) {
-                    if (indexShard.isSystem()) {
-                        continue; // skip system indices
-                    }
-                    final ShardRouting shardRouting = indexShard.routingEntry();
-                    final IndexMode indexMode = indexShard.indexSettings().getMode();
-                    final IndexStats indexStats = stats.get(indexMode);
-                    try {
-                        if (shardRouting.primary() && shardRouting.recoverySource() == null) {
-                            if (shardRouting.shardId().id() == 0) {
-                                indexStats.numIndices++;
-                            }
-                            final DocsStats docStats = indexShard.docStats();
-                            indexStats.numDocs += docStats.getCount();
-                            indexStats.numBytes += docStats.getTotalSizeInBytes();
-                            indexStats.indexing.add(indexShard.indexingStats().getTotal());
-                        }
-                        indexStats.search.add(indexShard.searchStats().getTotal());
-                    } catch (IllegalIndexShardStateException | AlreadyClosedException ignored) {
-                        // ignored
-                    }
-                }
-            }
-            return stats;
-        }
-
         @Override
         protected Map<IndexMode, IndexStats> refresh() {
-            return refresh ? internalGetIndicesStats() : getNoRefresh();
+            return refresh ? getStatsWithoutCache(indicesService) : getNoRefresh();
         }
 
         @Override

+ 3 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackFeatures.java

@@ -21,13 +21,15 @@ import java.util.Set;
  */
 public class XPackFeatures implements FeatureSpecification {
     public static final NodeFeature LOGSDB_TELEMETRY = new NodeFeature("logsdb_telemetry");
+    public static final NodeFeature LOGSDB_TELMETRY_STATS = new NodeFeature("logsdb_telemetry_stats");
 
     @Override
     public Set<NodeFeature> getFeatures() {
         return Set.of(
             NodesDataTiersUsageTransportAction.LOCALLY_PRECALCULATED_STATS_FEATURE, // Added in 8.12
             License.INDEPENDENT_TRIAL_VERSION_FEATURE, // 8.14.0
-            LOGSDB_TELEMETRY
+            LOGSDB_TELEMETRY,
+            LOGSDB_TELMETRY_STATS
         );
     }
 

+ 29 - 3
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/application/LogsDBFeatureSetUsage.java

@@ -20,11 +20,20 @@ import java.util.Objects;
 public final class LogsDBFeatureSetUsage extends XPackFeatureUsage {
     private final int indicesCount;
     private final int indicesWithSyntheticSource;
+    private final long numDocs;
+    private final long sizeInBytes;
 
     public LogsDBFeatureSetUsage(StreamInput input) throws IOException {
         super(input);
         indicesCount = input.readVInt();
         indicesWithSyntheticSource = input.readVInt();
+        if (input.getTransportVersion().onOrAfter(TransportVersions.LOGSDB_TELEMETRY_STATS)) {
+            numDocs = input.readVLong();
+            sizeInBytes = input.readVLong();
+        } else {
+            numDocs = 0;
+            sizeInBytes = 0;
+        }
     }
 
     @Override
@@ -32,12 +41,25 @@ public final class LogsDBFeatureSetUsage extends XPackFeatureUsage {
         super.writeTo(out);
         out.writeVInt(indicesCount);
         out.writeVInt(indicesWithSyntheticSource);
+        if (out.getTransportVersion().onOrAfter(TransportVersions.LOGSDB_TELEMETRY_STATS)) {
+            out.writeVLong(numDocs);
+            out.writeVLong(sizeInBytes);
+        }
     }
 
-    public LogsDBFeatureSetUsage(boolean available, boolean enabled, int indicesCount, int indicesWithSyntheticSource) {
+    public LogsDBFeatureSetUsage(
+        boolean available,
+        boolean enabled,
+        int indicesCount,
+        int indicesWithSyntheticSource,
+        long numDocs,
+        long sizeInBytes
+    ) {
         super(XPackField.LOGSDB, available, enabled);
         this.indicesCount = indicesCount;
         this.indicesWithSyntheticSource = indicesWithSyntheticSource;
+        this.numDocs = numDocs;
+        this.sizeInBytes = sizeInBytes;
     }
 
     @Override
@@ -50,11 +72,13 @@ public final class LogsDBFeatureSetUsage extends XPackFeatureUsage {
         super.innerXContent(builder, params);
         builder.field("indices_count", indicesCount);
         builder.field("indices_with_synthetic_source", indicesWithSyntheticSource);
+        builder.field("num_docs", numDocs);
+        builder.field("size_in_bytes", sizeInBytes);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(available, enabled, indicesCount, indicesWithSyntheticSource);
+        return Objects.hash(available, enabled, indicesCount, indicesWithSyntheticSource, numDocs, sizeInBytes);
     }
 
     @Override
@@ -69,6 +93,8 @@ public final class LogsDBFeatureSetUsage extends XPackFeatureUsage {
         return Objects.equals(available, other.available)
             && Objects.equals(enabled, other.enabled)
             && Objects.equals(indicesCount, other.indicesCount)
-            && Objects.equals(indicesWithSyntheticSource, other.indicesWithSyntheticSource);
+            && Objects.equals(indicesWithSyntheticSource, other.indicesWithSyntheticSource)
+            && Objects.equals(numDocs, other.numDocs)
+            && Objects.equals(sizeInBytes, other.sizeInBytes);
     }
 }

+ 34 - 3
x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBUsageTransportAction.java

@@ -8,17 +8,22 @@ package org.elasticsearch.xpack.logsdb;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.features.FeatureService;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.mapper.SourceFieldMapper;
 import org.elasticsearch.injection.guice.Inject;
+import org.elasticsearch.monitor.metrics.IndexModeStatsActionType;
 import org.elasticsearch.protocol.xpack.XPackUsageRequest;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.XPackFeatures;
 import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
 import org.elasticsearch.xpack.core.action.XPackUsageFeatureResponse;
 import org.elasticsearch.xpack.core.action.XPackUsageFeatureTransportAction;
@@ -28,13 +33,17 @@ import static org.elasticsearch.index.mapper.SourceFieldMapper.INDEX_MAPPER_SOUR
 
 public class LogsDBUsageTransportAction extends XPackUsageFeatureTransportAction {
     private final ClusterService clusterService;
+    private final FeatureService featureService;
+    private final Client client;
 
     @Inject
     public LogsDBUsageTransportAction(
         TransportService transportService,
         ClusterService clusterService,
+        FeatureService featureService,
         ThreadPool threadPool,
         ActionFilters actionFilters,
+        Client client,
         IndexNameExpressionResolver indexNameExpressionResolver
     ) {
         super(
@@ -46,6 +55,8 @@ public class LogsDBUsageTransportAction extends XPackUsageFeatureTransportAction
             indexNameExpressionResolver
         );
         this.clusterService = clusterService;
+        this.featureService = featureService;
+        this.client = client;
     }
 
     @Override
@@ -66,8 +77,28 @@ public class LogsDBUsageTransportAction extends XPackUsageFeatureTransportAction
             }
         }
         final boolean enabled = LogsDBPlugin.CLUSTER_LOGSDB_ENABLED.get(clusterService.getSettings());
-        listener.onResponse(
-            new XPackUsageFeatureResponse(new LogsDBFeatureSetUsage(true, enabled, numIndices, numIndicesWithSyntheticSources))
-        );
+        if (featureService.clusterHasFeature(state, XPackFeatures.LOGSDB_TELMETRY_STATS)) {
+            final DiscoveryNode[] nodes = state.nodes().getDataNodes().values().toArray(DiscoveryNode[]::new);
+            final var statsRequest = new IndexModeStatsActionType.StatsRequest(nodes);
+            final int finalNumIndices = numIndices;
+            final int finalNumIndicesWithSyntheticSources = numIndicesWithSyntheticSources;
+            client.execute(IndexModeStatsActionType.TYPE, statsRequest, listener.map(statsResponse -> {
+                final var indexStats = statsResponse.stats().get(IndexMode.LOGSDB);
+                return new XPackUsageFeatureResponse(
+                    new LogsDBFeatureSetUsage(
+                        true,
+                        enabled,
+                        finalNumIndices,
+                        finalNumIndicesWithSyntheticSources,
+                        indexStats.numDocs(),
+                        indexStats.numBytes()
+                    )
+                );
+            }));
+        } else {
+            listener.onResponse(
+                new XPackUsageFeatureResponse(new LogsDBFeatureSetUsage(true, enabled, numIndices, numIndicesWithSyntheticSources, 0L, 0L))
+            );
+        }
     }
 }

+ 36 - 0
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/logsdb/10_usage.yml

@@ -1,5 +1,8 @@
 ---
 logsdb usage:
+  - requires:
+      cluster_features: ["logsdb_telemetry_stats"]
+      reason: "requires stats"
   - do:
       indices.create:
         index: test1
@@ -7,20 +10,43 @@ logsdb usage:
           settings:
             index:
               mode: logsdb
+  - do:
+      bulk:
+        index: test1
+        refresh: true
+        body:
+          - { "index": { } }
+          - { "@timestamp": "2024-02-12T10:30:00Z", "host.name": "foo" }
+          - { "index": { } }
+          - { "@timestamp": "2024-02-12T10:31:00Z", "host.name": "bar" }
 
   - do: {xpack.usage: {}}
   - match: { logsdb.available: true }
   - match: { logsdb.indices_count: 1 }
   - match: { logsdb.indices_with_synthetic_source: 1 }
+  - match: { logsdb.num_docs: 2 }
+  - gt:    { logsdb.size_in_bytes: 0}
 
   - do:
       indices.create:
         index: test2
 
+  - do:
+      bulk:
+        index: test2
+        refresh: true
+        body:
+          - { "index": { } }
+          - { "@timestamp": "2024-02-12T10:32:00Z", "host.name": "foo" }
+          - { "index": { } }
+          - { "@timestamp": "2024-02-12T10:33:00Z", "host.name": "baz" }
+
   - do: {xpack.usage: {}}
   - match: { logsdb.available: true }
   - match: { logsdb.indices_count: 1 }
   - match: { logsdb.indices_with_synthetic_source: 1 }
+  - match: { logsdb.num_docs: 2 }
+  - gt:    { logsdb.size_in_bytes: 0}
 
   - do:
       indices.create:
@@ -31,7 +57,17 @@ logsdb usage:
               mode: logsdb
               mapping.source.mode: stored
 
+  - do:
+      bulk:
+        index: test3
+        refresh: true
+        body:
+          - { "index": { } }
+          - { "@timestamp": "2024-02-12T10:32:00Z", "host.name": "foobar"}
+
   - do: {xpack.usage: {}}
   - match: { logsdb.available: true }
   - match: { logsdb.indices_count: 2 }
   - match: { logsdb.indices_with_synthetic_source: 1 }
+  - match: { logsdb.num_docs: 3 }
+  - gt:    { logsdb.size_in_bytes: 0}