Browse Source

[ML] ML usage inference ingest stats should be calculated per processor (#82377)

During the refactoring in #82259, an accidental change caused processor
ingest stats to be reported using each model's total ingest stats instead
of the ingest stats of that model's individual processors. This commit corrects that mistake.
Dimitris Athanasiou 3 years ago
parent
commit
74db5c1176

+ 8 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningUsageTransportAction.java

@@ -18,7 +18,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.Environment;
-import org.elasticsearch.ingest.IngestStats;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.protocol.xpack.XPackUsageRequest;
 import org.elasticsearch.protocol.xpack.XPackUsageRequest;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.Task;
@@ -50,6 +49,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
 import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
 import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
 import org.elasticsearch.xpack.core.ml.stats.StatsAccumulator;
 import org.elasticsearch.xpack.core.ml.stats.StatsAccumulator;
+import org.elasticsearch.xpack.ml.inference.ingest.InferenceProcessor;
 import org.elasticsearch.xpack.ml.job.JobManagerHolder;
 import org.elasticsearch.xpack.ml.job.JobManagerHolder;
 
 
 import java.util.Collections;
 import java.util.Collections;
@@ -439,10 +439,13 @@ public class MachineLearningUsageTransportAction extends XPackUsageFeatureTransp
 
 
         for (GetTrainedModelsStatsAction.Response.TrainedModelStats modelStats : statsResponse.getResources().results()) {
         for (GetTrainedModelsStatsAction.Response.TrainedModelStats modelStats : statsResponse.getResources().results()) {
             pipelineCount += modelStats.getPipelineCount();
             pipelineCount += modelStats.getPipelineCount();
-            IngestStats ingestStats = modelStats.getIngestStats();
-            docCountStats.add(ingestStats.getTotalStats().getIngestCount());
-            timeStats.add(ingestStats.getTotalStats().getIngestTimeInMillis());
-            failureStats.add(ingestStats.getTotalStats().getIngestFailedCount());
+            modelStats.getIngestStats().getProcessorStats().values().stream().flatMap(List::stream).forEach(processorStat -> {
+                if (processorStat.getName().equals(InferenceProcessor.TYPE)) {
+                    docCountStats.add(processorStat.getStats().getIngestCount());
+                    timeStats.add(processorStat.getStats().getIngestTimeInMillis());
+                    failureStats.add(processorStat.getStats().getIngestFailedCount());
+                }
+            });
         }
         }
 
 
         Map<String, Object> ingestUsage = new HashMap<>(6);
         Map<String, Object> ingestUsage = new HashMap<>(6);

+ 70 - 10
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java

@@ -69,6 +69,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeSta
 import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
 import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
 import org.elasticsearch.xpack.core.ml.stats.ForecastStatsTests;
 import org.elasticsearch.xpack.core.ml.stats.ForecastStatsTests;
 import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
 import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
+import org.elasticsearch.xpack.ml.inference.ingest.InferenceProcessor;
 import org.elasticsearch.xpack.ml.job.JobManager;
 import org.elasticsearch.xpack.ml.job.JobManager;
 import org.elasticsearch.xpack.ml.job.JobManagerHolder;
 import org.elasticsearch.xpack.ml.job.JobManagerHolder;
 import org.junit.After;
 import org.junit.After;
@@ -279,7 +280,27 @@ public class MachineLearningInfoTransportActionTests extends ESTestCase {
                         new GetTrainedModelsStatsAction.Response.TrainedModelStats(
                         new GetTrainedModelsStatsAction.Response.TrainedModelStats(
                             trainedModel1.getModelId(),
                             trainedModel1.getModelId(),
                             new TrainedModelSizeStats(trainedModel1.getModelSize(), 0L),
                             new TrainedModelSizeStats(trainedModel1.getModelSize(), 0L),
-                            new IngestStats(new IngestStats.Stats(10L, 1L, 1000L, 100L), List.of(), Map.of()),
+                            new IngestStats(
+                                new IngestStats.Stats(0, 0, 0, 0),
+                                List.of(),
+                                Map.of(
+                                    "pipeline_1",
+                                    List.of(
+                                        new IngestStats.ProcessorStat(
+                                            InferenceProcessor.TYPE,
+                                            InferenceProcessor.TYPE,
+                                            new IngestStats.Stats(10, 1, 1000, 100)
+                                        ),
+                                        new IngestStats.ProcessorStat(
+                                            InferenceProcessor.TYPE,
+                                            InferenceProcessor.TYPE,
+                                            new IngestStats.Stats(20, 2, 2000, 200)
+                                        ),
+                                        // Adding a non-inference processor that should be ignored
+                                        new IngestStats.ProcessorStat("grok", "grok", new IngestStats.Stats(100, 100, 100, 100))
+                                    )
+                                )
+                            ),
                             1,
                             1,
                             null,
                             null,
                             null
                             null
@@ -287,7 +308,20 @@ public class MachineLearningInfoTransportActionTests extends ESTestCase {
                         new GetTrainedModelsStatsAction.Response.TrainedModelStats(
                         new GetTrainedModelsStatsAction.Response.TrainedModelStats(
                             trainedModel2.getModelId(),
                             trainedModel2.getModelId(),
                             new TrainedModelSizeStats(trainedModel2.getModelSize(), 0L),
                             new TrainedModelSizeStats(trainedModel2.getModelSize(), 0L),
-                            new IngestStats(new IngestStats.Stats(20L, 2L, 2000L, 200L), List.of(), Map.of()),
+                            new IngestStats(
+                                new IngestStats.Stats(0, 0, 0, 0),
+                                List.of(),
+                                Map.of(
+                                    "pipeline_1",
+                                    List.of(
+                                        new IngestStats.ProcessorStat(
+                                            InferenceProcessor.TYPE,
+                                            InferenceProcessor.TYPE,
+                                            new IngestStats.Stats(30, 3, 3000, 300)
+                                        )
+                                    )
+                                )
+                            ),
                             2,
                             2,
                             null,
                             null,
                             null
                             null
@@ -295,7 +329,20 @@ public class MachineLearningInfoTransportActionTests extends ESTestCase {
                         new GetTrainedModelsStatsAction.Response.TrainedModelStats(
                         new GetTrainedModelsStatsAction.Response.TrainedModelStats(
                             trainedModel3.getModelId(),
                             trainedModel3.getModelId(),
                             new TrainedModelSizeStats(trainedModel3.getModelSize(), 0L),
                             new TrainedModelSizeStats(trainedModel3.getModelSize(), 0L),
-                            new IngestStats(new IngestStats.Stats(30L, 3L, 3000L, 300L), List.of(), Map.of()),
+                            new IngestStats(
+                                new IngestStats.Stats(0, 0, 0, 0),
+                                List.of(),
+                                Map.of(
+                                    "pipeline_2",
+                                    List.of(
+                                        new IngestStats.ProcessorStat(
+                                            InferenceProcessor.TYPE,
+                                            InferenceProcessor.TYPE,
+                                            new IngestStats.Stats(40, 4, 4000, 400)
+                                        )
+                                    )
+                                )
+                            ),
                             3,
                             3,
                             null,
                             null,
                             new AllocationStats("model_3", null, null, null, Instant.now(), List.of()).setState(AllocationState.STOPPING)
                             new AllocationStats("model_3", null, null, null, Instant.now(), List.of()).setState(AllocationState.STOPPING)
@@ -303,7 +350,20 @@ public class MachineLearningInfoTransportActionTests extends ESTestCase {
                         new GetTrainedModelsStatsAction.Response.TrainedModelStats(
                         new GetTrainedModelsStatsAction.Response.TrainedModelStats(
                             trainedModel4.getModelId(),
                             trainedModel4.getModelId(),
                             new TrainedModelSizeStats(trainedModel4.getModelSize(), 0L),
                             new TrainedModelSizeStats(trainedModel4.getModelSize(), 0L),
-                            new IngestStats(new IngestStats.Stats(40L, 4L, 4000L, 400L), List.of(), Map.of()),
+                            new IngestStats(
+                                new IngestStats.Stats(0, 0, 0, 0),
+                                List.of(),
+                                Map.of(
+                                    "pipeline_3",
+                                    List.of(
+                                        new IngestStats.ProcessorStat(
+                                            InferenceProcessor.TYPE,
+                                            InferenceProcessor.TYPE,
+                                            new IngestStats.Stats(50, 5, 5000, 500)
+                                        )
+                                    )
+                                )
+                            ),
                             4,
                             4,
                             null,
                             null,
                             new AllocationStats(
                             new AllocationStats(
@@ -442,15 +502,15 @@ public class MachineLearningInfoTransportActionTests extends ESTestCase {
             assertThat(source.getValue("inference.trained_models.count.other"), equalTo(1));
             assertThat(source.getValue("inference.trained_models.count.other"), equalTo(1));
 
 
             assertThat(source.getValue("inference.ingest_processors._all.pipelines.count"), equalTo(10));
             assertThat(source.getValue("inference.ingest_processors._all.pipelines.count"), equalTo(10));
-            assertThat(source.getValue("inference.ingest_processors._all.num_docs_processed.sum"), equalTo(100));
+            assertThat(source.getValue("inference.ingest_processors._all.num_docs_processed.sum"), equalTo(150));
             assertThat(source.getValue("inference.ingest_processors._all.num_docs_processed.min"), equalTo(10));
             assertThat(source.getValue("inference.ingest_processors._all.num_docs_processed.min"), equalTo(10));
-            assertThat(source.getValue("inference.ingest_processors._all.num_docs_processed.max"), equalTo(40));
-            assertThat(source.getValue("inference.ingest_processors._all.time_ms.sum"), equalTo(10));
+            assertThat(source.getValue("inference.ingest_processors._all.num_docs_processed.max"), equalTo(50));
+            assertThat(source.getValue("inference.ingest_processors._all.time_ms.sum"), equalTo(15));
             assertThat(source.getValue("inference.ingest_processors._all.time_ms.min"), equalTo(1));
             assertThat(source.getValue("inference.ingest_processors._all.time_ms.min"), equalTo(1));
-            assertThat(source.getValue("inference.ingest_processors._all.time_ms.max"), equalTo(4));
-            assertThat(source.getValue("inference.ingest_processors._all.num_failures.sum"), equalTo(1000));
+            assertThat(source.getValue("inference.ingest_processors._all.time_ms.max"), equalTo(5));
+            assertThat(source.getValue("inference.ingest_processors._all.num_failures.sum"), equalTo(1500));
             assertThat(source.getValue("inference.ingest_processors._all.num_failures.min"), equalTo(100));
             assertThat(source.getValue("inference.ingest_processors._all.num_failures.min"), equalTo(100));
-            assertThat(source.getValue("inference.ingest_processors._all.num_failures.max"), equalTo(400));
+            assertThat(source.getValue("inference.ingest_processors._all.num_failures.max"), equalTo(500));
             assertThat(source.getValue("inference.deployments.count"), equalTo(2));
             assertThat(source.getValue("inference.deployments.count"), equalTo(2));
             assertThat(source.getValue("inference.deployments.inference_counts.total"), equalTo(9.0));
             assertThat(source.getValue("inference.deployments.inference_counts.total"), equalTo(9.0));
             assertThat(source.getValue("inference.deployments.inference_counts.min"), equalTo(4.0));
             assertThat(source.getValue("inference.deployments.inference_counts.min"), equalTo(4.0));