浏览代码

[ML] Adding data frame analytics stats to _usage API (#45820)

* [ML] Adding data frame analytics stats to _usage API

* making the size of analytics stats 10k
Benjamin Trent 6 年之前
父节点
当前提交
0412504464

+ 22 - 8
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java

@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.core.ml;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -12,6 +13,7 @@ import org.elasticsearch.xpack.core.XPackFeatureSet;
 import org.elasticsearch.xpack.core.XPackField;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 
@@ -26,16 +28,23 @@ public class MachineLearningFeatureSetUsage extends XPackFeatureSet.Usage {
     public static final String MODEL_SIZE = "model_size";
     public static final String CREATED_BY = "created_by";
     public static final String NODE_COUNT = "node_count";
+    public static final String DATA_FRAME_ANALYTICS_JOBS_FIELD = "data_frame_analytics_jobs";
 
     private final Map<String, Object> jobsUsage;
     private final Map<String, Object> datafeedsUsage;
+    private final Map<String, Object> analyticsUsage;
     private final int nodeCount;
 
-    public MachineLearningFeatureSetUsage(boolean available, boolean enabled, Map<String, Object> jobsUsage,
-                                          Map<String, Object> datafeedsUsage, int nodeCount) {
+    public MachineLearningFeatureSetUsage(boolean available,
+                                          boolean enabled,
+                                          Map<String, Object> jobsUsage,
+                                          Map<String, Object> datafeedsUsage,
+                                          Map<String, Object> analyticsUsage,
+                                          int nodeCount) {
         super(XPackField.MACHINE_LEARNING, available, enabled);
         this.jobsUsage = Objects.requireNonNull(jobsUsage);
         this.datafeedsUsage = Objects.requireNonNull(datafeedsUsage);
+        this.analyticsUsage = Objects.requireNonNull(analyticsUsage);
         this.nodeCount = nodeCount;
     }
 
@@ -43,6 +52,11 @@ public class MachineLearningFeatureSetUsage extends XPackFeatureSet.Usage {
         super(in);
         this.jobsUsage = in.readMap();
         this.datafeedsUsage = in.readMap();
+        if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+            this.analyticsUsage = in.readMap();
+        } else {
+            this.analyticsUsage = Collections.emptyMap();
+        }
         this.nodeCount = in.readInt();
     }
 
@@ -51,18 +65,18 @@ public class MachineLearningFeatureSetUsage extends XPackFeatureSet.Usage {
         super.writeTo(out);
         out.writeMap(jobsUsage);
         out.writeMap(datafeedsUsage);
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            out.writeMap(analyticsUsage);
+        }
         out.writeInt(nodeCount);
     }
 
     @Override
     protected void innerXContent(XContentBuilder builder, Params params) throws IOException {
         super.innerXContent(builder, params);
-        if (jobsUsage != null) {
-            builder.field(JOBS_FIELD, jobsUsage);
-        }
-        if (datafeedsUsage != null) {
-            builder.field(DATAFEEDS_FIELD, datafeedsUsage);
-        }
+        builder.field(JOBS_FIELD, jobsUsage);
+        builder.field(DATAFEEDS_FIELD, datafeedsUsage);
+        builder.field(DATA_FRAME_ANALYTICS_JOBS_FIELD, analyticsUsage);
         if (nodeCount >= 0) {
             builder.field(NODE_COUNT, nodeCount);
         }

+ 37 - 7
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningUsageTransportAction.java

@@ -25,10 +25,13 @@ import org.elasticsearch.xpack.core.XPackSettings;
 import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
 import org.elasticsearch.xpack.core.action.XPackUsageFeatureResponse;
 import org.elasticsearch.xpack.core.action.XPackUsageFeatureTransportAction;
+import org.elasticsearch.xpack.core.action.util.PageParams;
 import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage;
+import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
 import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
 import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
+import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
@@ -69,22 +72,35 @@ public class MachineLearningUsageTransportAction extends XPackUsageFeatureTransp
                                    ActionListener<XPackUsageFeatureResponse> listener) {
         if (enabled == false) {
             MachineLearningFeatureSetUsage usage = new MachineLearningFeatureSetUsage(licenseState.isMachineLearningAllowed(), enabled,
-                Collections.emptyMap(), Collections.emptyMap(), 0);
+                Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), 0);
             listener.onResponse(new XPackUsageFeatureResponse(usage));
             return;
         }
 
         Map<String, Object> jobsUsage = new LinkedHashMap<>();
         Map<String, Object> datafeedsUsage = new LinkedHashMap<>();
+        Map<String, Object> analyticsUsage = new LinkedHashMap<>();
         int nodeCount = mlNodeCount(state);
 
+        // Step 3. Extract usage from data frame analytics stats and return usage response
+        ActionListener<GetDataFrameAnalyticsStatsAction.Response> dataframeAnalyticsListener = ActionListener.wrap(
+            response -> {
+                addDataFrameAnalyticsUsage(response, analyticsUsage);
+                MachineLearningFeatureSetUsage usage = new MachineLearningFeatureSetUsage(licenseState.isMachineLearningAllowed(),
+                    enabled, jobsUsage, datafeedsUsage, analyticsUsage, nodeCount);
+                listener.onResponse(new XPackUsageFeatureResponse(usage));
+            },
+            listener::onFailure
+        );
+
         // Step 2. Extract usage from datafeeds stats and return usage response
         ActionListener<GetDatafeedsStatsAction.Response> datafeedStatsListener =
             ActionListener.wrap(response -> {
                 addDatafeedsUsage(response, datafeedsUsage);
-                MachineLearningFeatureSetUsage usage = new MachineLearningFeatureSetUsage(licenseState.isMachineLearningAllowed(),
-                    enabled, jobsUsage, datafeedsUsage, nodeCount);
-                listener.onResponse(new XPackUsageFeatureResponse(usage));
+                GetDataFrameAnalyticsStatsAction.Request dataframeAnalyticsStatsRequest =
+                    new GetDataFrameAnalyticsStatsAction.Request(GetDatafeedsStatsAction.ALL);
+                dataframeAnalyticsStatsRequest.setPageParams(new PageParams(0, 10_000));
+                client.execute(GetDataFrameAnalyticsStatsAction.INSTANCE, dataframeAnalyticsStatsRequest, dataframeAnalyticsListener);
             },
             listener::onFailure);
 
@@ -184,19 +200,33 @@ public class MachineLearningUsageTransportAction extends XPackUsageFeatureTransp
                 ds -> Counter.newCounter()).addAndGet(1);
         }
 
-        datafeedsUsage.put(MachineLearningFeatureSetUsage.ALL, createDatafeedUsageEntry(response.getResponse().count()));
+        datafeedsUsage.put(MachineLearningFeatureSetUsage.ALL, createCountUsageEntry(response.getResponse().count()));
         for (DatafeedState datafeedState : datafeedCountByState.keySet()) {
             datafeedsUsage.put(datafeedState.name().toLowerCase(Locale.ROOT),
-                createDatafeedUsageEntry(datafeedCountByState.get(datafeedState).get()));
+                createCountUsageEntry(datafeedCountByState.get(datafeedState).get()));
         }
     }
 
-    private Map<String, Object> createDatafeedUsageEntry(long count) {
+    private Map<String, Object> createCountUsageEntry(long count) {
         Map<String, Object> usage = new HashMap<>();
         usage.put(MachineLearningFeatureSetUsage.COUNT, count);
         return usage;
     }
 
+    private void addDataFrameAnalyticsUsage(GetDataFrameAnalyticsStatsAction.Response response,
+                                            Map<String, Object> dataframeAnalyticsUsage) {
+        Map<DataFrameAnalyticsState, Counter> dataFrameAnalyticsStateCounterMap = new HashMap<>();
+
+        for(GetDataFrameAnalyticsStatsAction.Response.Stats stats : response.getResponse().results()) {
+            dataFrameAnalyticsStateCounterMap.computeIfAbsent(stats.getState(), ds -> Counter.newCounter()).addAndGet(1);
+        }
+        dataframeAnalyticsUsage.put(MachineLearningFeatureSetUsage.ALL, createCountUsageEntry(response.getResponse().count()));
+        for (DataFrameAnalyticsState state : dataFrameAnalyticsStateCounterMap.keySet()) {
+            dataframeAnalyticsUsage.put(state.name().toLowerCase(Locale.ROOT),
+                createCountUsageEntry(dataFrameAnalyticsStateCounterMap.get(state).get()));
+        }
+    }
+
     private static int mlNodeCount(final ClusterState clusterState) {
         int mlNodeCount = 0;
         for (DiscoveryNode node : clusterState.getNodes()) {

+ 32 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java

@@ -34,10 +34,13 @@ import org.elasticsearch.xpack.core.action.XPackUsageFeatureResponse;
 import org.elasticsearch.xpack.core.action.util.QueryPage;
 import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage;
 import org.elasticsearch.xpack.core.ml.MachineLearningField;
+import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction;
+import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
 import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
 import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
+import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.core.ml.job.config.Detector;
@@ -95,6 +98,7 @@ public class MachineLearningInfoTransportActionTests extends ESTestCase {
         when(clusterService.state()).thenReturn(clusterState);
         givenJobs(Collections.emptyList(), Collections.emptyList());
         givenDatafeeds(Collections.emptyList());
+        givenDataFrameAnalytics(Collections.emptyList());
     }
 
     private MachineLearningUsageTransportAction newUsageAction(Settings settings) {
@@ -165,6 +169,11 @@ public class MachineLearningInfoTransportActionTests extends ESTestCase {
                 buildDatafeedStats(DatafeedState.STARTED),
                 buildDatafeedStats(DatafeedState.STOPPED)
         ));
+        givenDataFrameAnalytics(Arrays.asList(
+            buildDataFrameAnalyticsStats(DataFrameAnalyticsState.STOPPED),
+            buildDataFrameAnalyticsStats(DataFrameAnalyticsState.STOPPED),
+            buildDataFrameAnalyticsStats(DataFrameAnalyticsState.STARTED)
+        ));
 
         var usageAction = newUsageAction(settings.build());
         PlainActionFuture<XPackUsageFeatureResponse> future = new PlainActionFuture<>();
@@ -230,6 +239,10 @@ public class MachineLearningInfoTransportActionTests extends ESTestCase {
             assertThat(source.getValue("datafeeds.started.count"), equalTo(2));
             assertThat(source.getValue("datafeeds.stopped.count"), equalTo(1));
 
+            assertThat(source.getValue("data_frame_analytics_jobs._all.count"), equalTo(3));
+            assertThat(source.getValue("data_frame_analytics_jobs.started.count"), equalTo(1));
+            assertThat(source.getValue("data_frame_analytics_jobs.stopped.count"), equalTo(2));
+
             assertThat(source.getValue("jobs._all.forecasts.total"), equalTo(11));
             assertThat(source.getValue("jobs._all.forecasts.forecasted_jobs"), equalTo(2));
 
@@ -391,6 +404,19 @@ public class MachineLearningInfoTransportActionTests extends ESTestCase {
         }).when(client).execute(same(GetDatafeedsStatsAction.INSTANCE), any(), any());
     }
 
+    private void givenDataFrameAnalytics(List<GetDataFrameAnalyticsStatsAction.Response.Stats> dataFrameAnalyticsStats) {
+        doAnswer(invocationOnMock -> {
+            @SuppressWarnings("unchecked")
+            ActionListener<GetDataFrameAnalyticsStatsAction.Response> listener =
+                (ActionListener<GetDataFrameAnalyticsStatsAction.Response>) invocationOnMock.getArguments()[2];
+            listener.onResponse(new GetDataFrameAnalyticsStatsAction.Response(
+                new QueryPage<>(dataFrameAnalyticsStats,
+                    dataFrameAnalyticsStats.size(),
+                    GetDataFrameAnalyticsAction.Response.RESULTS_FIELD)));
+            return Void.TYPE;
+        }).when(client).execute(same(GetDataFrameAnalyticsStatsAction.INSTANCE), any(), any());
+    }
+
     private static Detector buildMinDetector(String fieldName) {
         Detector.Builder detectorBuilder = new Detector.Builder();
         detectorBuilder.setFunction("min");
@@ -431,6 +457,12 @@ public class MachineLearningInfoTransportActionTests extends ESTestCase {
         return stats;
     }
 
+    private static GetDataFrameAnalyticsStatsAction.Response.Stats buildDataFrameAnalyticsStats(DataFrameAnalyticsState state) {
+        GetDataFrameAnalyticsStatsAction.Response.Stats stats = mock(GetDataFrameAnalyticsStatsAction.Response.Stats.class);
+        when(stats.getState()).thenReturn(state);
+        return stats;
+    }
+
     private static ForecastStats buildForecastStats(long numberOfForecasts) {
         return new ForecastStatsTests().createForecastStats(numberOfForecasts, numberOfForecasts);
     }