Browse Source

Add more rollup usage stats (#108245)

This change adds `number_of_rollup_jobs` and `number_of_rollup_indices`
to  `rollup` usage. The former indicates the number of active rollup
jobs running and the latter indicated the number of rollup indices
(which could be the result of previous rollup jobs).
Martijn van Groningen 1 year ago
parent
commit
e043bce1af

+ 3 - 1
docs/reference/rest-api/usage.asciidoc

@@ -308,7 +308,8 @@ GET /_xpack/usage
   },
   "rollup" : {
     "available" : true,
-    "enabled" : true
+    "enabled" : true,
+    ...
   },
   "ilm" : {
     "policy_count" : 3,
@@ -496,6 +497,7 @@ GET /_xpack/usage
 }
 ------------------------------------------------------------
 // TESTRESPONSE[s/"security" : \{[^\}]*\},/"security" : $body.$_path,/]
+// TESTRESPONSE[s/"rollup" : \{[^\}]*\},/"rollup" : $body.$_path,/]
 // TESTRESPONSE[s/"detectors" : \{[^\}]*\},/"detectors" : $body.$_path,/]
 // TESTRESPONSE[s/"model_size" : \{[^\}]*\},/"model_size" : $body.$_path,/]
 // TESTRESPONSE[s/"eql" : \{[^\}]*\},/"eql" : $body.$_path,/]

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -193,6 +193,7 @@ public class TransportVersions {
     public static final TransportVersion NO_GLOBAL_RETENTION_FOR_SYSTEM_DATA_STREAMS = def(8_650_00_0);
     public static final TransportVersion SHUTDOWN_REQUEST_TIMEOUTS_FIX = def(8_651_00_0);
     public static final TransportVersion INDEXING_PRESSURE_REQUEST_REJECTIONS_COUNT = def(8_652_00_0);
+    public static final TransportVersion ROLLUP_USAGE = def(8_653_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 27 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/RollupFeatureSetUsage.java

@@ -9,19 +9,45 @@ package org.elasticsearch.xpack.core.rollup;
 import org.elasticsearch.TransportVersion;
 import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.core.XPackFeatureSet;
 import org.elasticsearch.xpack.core.XPackField;
 
 import java.io.IOException;
 
+import static org.elasticsearch.TransportVersions.ROLLUP_USAGE;
+
 public class RollupFeatureSetUsage extends XPackFeatureSet.Usage {
 
+    private final int numberOfRollupJobs;
+
     public RollupFeatureSetUsage(StreamInput input) throws IOException {
         super(input);
+        this.numberOfRollupJobs = input.getTransportVersion().onOrAfter(ROLLUP_USAGE) ? input.readVInt() : 0;
     }
 
-    public RollupFeatureSetUsage() {
+    public RollupFeatureSetUsage(int numberOfRollupJobs) {
         super(XPackField.ROLLUP, true, true);
+        this.numberOfRollupJobs = numberOfRollupJobs;
+    }
+
+    public int getNumberOfRollupJobs() {
+        return numberOfRollupJobs;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        if (out.getTransportVersion().onOrAfter(ROLLUP_USAGE)) {
+            out.writeVInt(numberOfRollupJobs);
+        }
+    }
+
+    @Override
+    protected void innerXContent(XContentBuilder builder, Params params) throws IOException {
+        super.innerXContent(builder, params);
+        builder.field("number_of_rollup_jobs", numberOfRollupJobs);
     }
 
     @Override

+ 9 - 2
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupUsageTransportAction.java

@@ -12,6 +12,8 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.core.Predicates;
+import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.protocol.xpack.XPackUsageRequest;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -20,6 +22,7 @@ import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
 import org.elasticsearch.xpack.core.action.XPackUsageFeatureResponse;
 import org.elasticsearch.xpack.core.action.XPackUsageFeatureTransportAction;
 import org.elasticsearch.xpack.core.rollup.RollupFeatureSetUsage;
+import org.elasticsearch.xpack.core.rollup.job.RollupJob;
 
 public class RollupUsageTransportAction extends XPackUsageFeatureTransportAction {
 
@@ -48,8 +51,12 @@ public class RollupUsageTransportAction extends XPackUsageFeatureTransportAction
         ClusterState state,
         ActionListener<XPackUsageFeatureResponse> listener
     ) {
-        // TODO expose the currently running rollup tasks on this node? Unclear the best way to do that
-        RollupFeatureSetUsage usage = new RollupFeatureSetUsage();
+        int numberOfRollupJobs = 0;
+        PersistentTasksCustomMetadata persistentTasks = state.metadata().custom(PersistentTasksCustomMetadata.TYPE);
+        if (persistentTasks != null) {
+            numberOfRollupJobs = persistentTasks.findTasks(RollupJob.NAME, Predicates.always()).size();
+        }
+        RollupFeatureSetUsage usage = new RollupFeatureSetUsage(numberOfRollupJobs);
         listener.onResponse(new XPackUsageFeatureResponse(usage));
     }
 }

+ 7 - 4
x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupInfoTransportActionTests.java

@@ -8,18 +8,19 @@ package org.elasticsearch.xpack.rollup;
 
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.MockUtils;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
-import org.elasticsearch.xpack.core.XPackFeatureSet;
 import org.elasticsearch.xpack.core.action.XPackUsageFeatureResponse;
 import org.elasticsearch.xpack.core.rollup.RollupFeatureSetUsage;
 
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.core.Is.is;
 import static org.mockito.Mockito.mock;
 
@@ -42,13 +43,15 @@ public class RollupInfoTransportActionTests extends ESTestCase {
         TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(threadPool);
         var usageAction = new RollupUsageTransportAction(transportService, null, threadPool, mock(ActionFilters.class), null);
         PlainActionFuture<XPackUsageFeatureResponse> future = new PlainActionFuture<>();
-        usageAction.masterOperation(null, null, null, future);
-        XPackFeatureSet.Usage rollupUsage = future.get().getUsage();
+        usageAction.masterOperation(null, null, ClusterState.EMPTY_STATE, future);
+        RollupFeatureSetUsage rollupUsage = (RollupFeatureSetUsage) future.get().getUsage();
         BytesStreamOutput out = new BytesStreamOutput();
         rollupUsage.writeTo(out);
-        XPackFeatureSet.Usage serializedUsage = new RollupFeatureSetUsage(out.bytes().streamInput());
+        var serializedUsage = new RollupFeatureSetUsage(out.bytes().streamInput());
         assertThat(rollupUsage.name(), is(serializedUsage.name()));
         assertThat(rollupUsage.enabled(), is(serializedUsage.enabled()));
+        assertThat(rollupUsage.enabled(), is(serializedUsage.enabled()));
+        assertThat(rollupUsage.getNumberOfRollupJobs(), equalTo(serializedUsage.getNumberOfRollupJobs()));
     }
 
 }

+ 5 - 0
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/rollup/put_job.yml

@@ -94,6 +94,11 @@ setup:
           status:
             job_state: "stopped"
 
+  - do: {xpack.usage: {}}
+  - match: { rollup.available: true }
+  - match: { rollup.enabled: true }
+  - match: { rollup.number_of_rollup_jobs: 1 }
+
 ---
 "Test put_job with existing name":