Browse Source

[ML] Adds basic notifications for trained model deployments (#88214)

For specific models:
  - deployment started
  - deployment stopped

System notifications when rebalance occurs with reasons:
  - model deployment started
  - model deployment stopped
  - nodes changed
Dimitris Athanasiou 3 years ago
parent
commit
7e9a6fe37a
16 changed files with 295 additions and 58 deletions
  1. 8 4
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessage.java
  2. 3 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java
  3. 3 2
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/notifications/AnomalyDetectionAuditMessage.java
  4. 3 2
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/notifications/DataFrameAnalyticsAuditMessage.java
  5. 3 2
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/notifications/InferenceAuditMessage.java
  6. 41 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/notifications/SystemAuditMessage.java
  7. 3 2
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/notifications/TransformAuditMessage.java
  8. 6 5
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessageTests.java
  9. 37 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/notifications/SystemAuditMessageTests.java
  10. 64 0
      x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/PyTorchModelIT.java
  11. 8 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
  12. 8 3
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartTrainedModelDeploymentAction.java
  13. 21 15
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopTrainedModelDeploymentAction.java
  14. 29 21
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterService.java
  15. 53 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/SystemAuditor.java
  16. 5 1
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterServiceTests.java

+ 8 - 4
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessage.java

@@ -6,6 +6,7 @@
  */
 package org.elasticsearch.xpack.core.common.notifications;
 
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.xcontent.ConstructingObjectParser;
 import org.elasticsearch.xcontent.ObjectParser;
 import org.elasticsearch.xcontent.ParseField;
@@ -17,6 +18,7 @@ import org.elasticsearch.xpack.core.common.time.TimeUtils;
 import java.io.IOException;
 import java.util.Date;
 import java.util.Objects;
+import java.util.Optional;
 
 import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
 import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;
@@ -69,7 +71,7 @@ public abstract class AbstractAuditMessage implements ToXContentObject {
     private final Date timestamp;
     private final String nodeName;
 
-    protected AbstractAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) {
+    protected AbstractAuditMessage(@Nullable String resourceId, String message, Level level, Date timestamp, String nodeName) {
         this.resourceId = resourceId;
         this.message = Objects.requireNonNull(message);
         this.level = Objects.requireNonNull(level);
@@ -77,6 +79,7 @@ public abstract class AbstractAuditMessage implements ToXContentObject {
         this.nodeName = nodeName;
     }
 
+    @Nullable
     public final String getResourceId() {
         return resourceId;
     }
@@ -100,8 +103,9 @@ public abstract class AbstractAuditMessage implements ToXContentObject {
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
         builder.startObject();
-        if (resourceId != null) {
-            builder.field(getResourceField(), resourceId);
+        Optional<String> resourceField = getResourceField();
+        if (resourceField.isPresent() && resourceId != null) {
+            builder.field(resourceField.get(), resourceId);
         }
 
         if (message.length() > MAX_AUDIT_MESSAGE_CHARS) {
@@ -154,7 +158,7 @@ public abstract class AbstractAuditMessage implements ToXContentObject {
     /**
      * @return resource id field name used when storing a new message
      */
-    protected abstract String getResourceField();
+    protected abstract Optional<String> getResourceField();
 
     /**
      * Truncate the message and append {@value #TRUNCATED_SUFFIX} so

+ 3 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java

@@ -129,6 +129,9 @@ public final class Messages {
         + "hyphens or underscores, must start and end with alphanumeric, and must be less than {1} characters.";
     public static final String INFERENCE_TAGS_AND_MODEL_IDS_UNIQUE = "The provided tags {0} must not match existing model_ids.";
     public static final String INFERENCE_MODEL_ID_AND_TAGS_UNIQUE = "The provided model_id {0} must not match existing tags.";
+    public static final String INFERENCE_DEPLOYMENT_STARTED = "Started deployment";
+    public static final String INFERENCE_DEPLOYMENT_STOPPED = "Stopped deployment";
+    public static final String INFERENCE_DEPLOYMENT_REBALANCED = "Rebalanced trained model allocations because [{0}]";
 
     public static final String INVALID_MODEL_ALIAS = "Invalid model_alias; ''{0}'' can contain lowercase alphanumeric (a-z and 0-9), "
         + "hyphens or underscores; must start with alphanumeric and cannot end with numbers";

+ 3 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/notifications/AnomalyDetectionAuditMessage.java

@@ -13,6 +13,7 @@ import org.elasticsearch.xpack.core.common.notifications.Level;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 
 import java.util.Date;
+import java.util.Optional;
 
 public class AnomalyDetectionAuditMessage extends AbstractAuditMessage {
 
@@ -33,7 +34,7 @@ public class AnomalyDetectionAuditMessage extends AbstractAuditMessage {
     }
 
     @Override
-    protected String getResourceField() {
-        return JOB_ID.getPreferredName();
+    protected Optional<String> getResourceField() {
+        return Optional.of(JOB_ID.getPreferredName());
     }
 }

+ 3 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/notifications/DataFrameAnalyticsAuditMessage.java

@@ -13,6 +13,7 @@ import org.elasticsearch.xpack.core.common.notifications.Level;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 
 import java.util.Date;
+import java.util.Optional;
 
 public class DataFrameAnalyticsAuditMessage extends AbstractAuditMessage {
 
@@ -33,7 +34,7 @@ public class DataFrameAnalyticsAuditMessage extends AbstractAuditMessage {
     }
 
     @Override
-    protected String getResourceField() {
-        return JOB_ID.getPreferredName();
+    protected Optional<String> getResourceField() {
+        return Optional.of(JOB_ID.getPreferredName());
     }
 }

+ 3 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/notifications/InferenceAuditMessage.java

@@ -13,6 +13,7 @@ import org.elasticsearch.xpack.core.common.notifications.Level;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 
 import java.util.Date;
+import java.util.Optional;
 
 public class InferenceAuditMessage extends AbstractAuditMessage {
 
@@ -34,7 +35,7 @@ public class InferenceAuditMessage extends AbstractAuditMessage {
     }
 
     @Override
-    protected String getResourceField() {
-        return JOB_ID.getPreferredName();
+    protected Optional<String> getResourceField() {
+        return Optional.of(JOB_ID.getPreferredName());
     }
 }

+ 41 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/notifications/SystemAuditMessage.java

@@ -0,0 +1,41 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.ml.notifications;
+
+import org.elasticsearch.xcontent.ConstructingObjectParser;
+import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
+import org.elasticsearch.xpack.core.common.notifications.Level;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
+
+import java.util.Date;
+import java.util.Optional;
+
+public class SystemAuditMessage extends AbstractAuditMessage { // audit message that is not tied to any resource id
+
+    private static final String SYSTEM = "system"; // value returned by getJobType()
+
+    public static final ConstructingObjectParser<SystemAuditMessage, Void> PARSER = createParser(
+        "ml_system_audit_message",
+        (resourceId, message, level, timestamp, nodeName) -> new SystemAuditMessage(message, level, timestamp, nodeName),
+        Job.ID
+    ); // the resourceId handed to the factory is discarded
+
+    public SystemAuditMessage(String message, Level level, Date timestamp, String nodeName) {
+        super(null, message, level, timestamp, nodeName); // null resource id
+    }
+
+    @Override
+    public String getJobType() {
+        return SYSTEM;
+    }
+
+    @Override
+    protected Optional<String> getResourceField() {
+        return Optional.empty(); // no resource field name, so toXContent writes no resource id field
+    }
+}

+ 3 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/notifications/TransformAuditMessage.java

@@ -13,6 +13,7 @@ import org.elasticsearch.xpack.core.common.notifications.Level;
 import org.elasticsearch.xpack.core.transform.TransformField;
 
 import java.util.Date;
+import java.util.Optional;
 
 public class TransformAuditMessage extends AbstractAuditMessage {
 
@@ -33,7 +34,7 @@ public class TransformAuditMessage extends AbstractAuditMessage {
     }
 
     @Override
-    protected String getResourceField() {
-        return TRANSFORM_ID.getPreferredName();
+    protected Optional<String> getResourceField() {
+        return Optional.of(TRANSFORM_ID.getPreferredName());
     }
 }

+ 6 - 5
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessageTests.java

@@ -17,6 +17,7 @@ import org.elasticsearch.xcontent.XContentType;
 
 import java.io.IOException;
 import java.util.Date;
+import java.util.Optional;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
@@ -43,8 +44,8 @@ public class AbstractAuditMessageTests extends AbstractXContentTestCase<Abstract
         }
 
         @Override
-        protected String getResourceField() {
-            return TEST_ID.getPreferredName();
+        protected Optional<String> getResourceField() {
+            return Optional.of(TEST_ID.getPreferredName());
         }
     }
 
@@ -55,7 +56,7 @@ public class AbstractAuditMessageTests extends AbstractXContentTestCase<Abstract
 
     public void testGetResourceField() {
         TestAuditMessage message = new TestAuditMessage(RESOURCE_ID, MESSAGE, Level.INFO, TIMESTAMP, NODE_NAME);
-        assertThat(message.getResourceField(), equalTo(TestAuditMessage.TEST_ID.getPreferredName()));
+        assertThat(message.getResourceField().get(), equalTo(TestAuditMessage.TEST_ID.getPreferredName()));
     }
 
     public void testGetJobType() {
@@ -104,8 +105,8 @@ public class AbstractAuditMessageTests extends AbstractXContentTestCase<Abstract
             }
 
             @Override
-            protected String getResourceField() {
-                return "unused";
+            protected Optional<String> getResourceField() {
+                return Optional.of("unused");
             }
         };
 

+ 37 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/notifications/SystemAuditMessageTests.java

@@ -0,0 +1,37 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.ml.notifications;
+
+import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xpack.core.common.notifications.Level;
+
+import java.io.IOException;
+import java.util.Date;
+
+public class SystemAuditMessageTests extends AuditMessageTests<SystemAuditMessage> { // tests for SystemAuditMessage
+
+    @Override
+    protected SystemAuditMessage createTestInstance() {
+        return new SystemAuditMessage(
+            randomAlphaOfLengthBetween(1, 20),
+            randomFrom(Level.values()),
+            new Date(),
+            randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20) // node name is randomly left null
+        );
+    }
+
+    @Override
+    protected SystemAuditMessage doParseInstance(XContentParser parser) throws IOException {
+        return SystemAuditMessage.PARSER.apply(parser, null);
+    }
+
+    @Override
+    public String getJobType() {
+        return "system"; // same value SystemAuditMessage.getJobType() returns
+    }
+}

+ 64 - 0
x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/PyTorchModelIT.java

@@ -725,6 +725,27 @@ public class PyTorchModelIT extends ESRestTestCase {
         stopDeployment(modelId2);
     }
 
+    public void testNotifications() throws IOException {
+        String modelId1 = "test_notifications_1";
+        createTrainedModel(modelId1);
+        putModelDefinition(modelId1);
+        putVocabulary(List.of("these", "are", "my", "words"), modelId1);
+        startDeployment(modelId1);
+
+        String modelId2 = "test_notifications_2";
+        createTrainedModel(modelId2);
+        putModelDefinition(modelId2);
+        putVocabulary(List.of("these", "are", "my", "words"), modelId2);
+        startDeployment(modelId2);
+
+        stopDeployment(modelId1);
+        stopDeployment(modelId2);
+
+        assertNotificationsContain(modelId1, "Started deployment", "Stopped deployment");
+        assertNotificationsContain(modelId2, "Started deployment", "Stopped deployment");
+        assertSystemNotificationsContain("Rebalanced trained model allocations because [model deployment started]");
+    }
+
     @SuppressWarnings("unchecked")
     private void assertAllocationCount(String modelId, int expectedAllocationCount) throws IOException {
         Response response = getTrainedModelStats(modelId);
@@ -875,4 +896,47 @@ public class PyTorchModelIT extends ESRestTestCase {
         getTrainedModelAssignmentMetadataResponse = client().performRequest(getTrainedModelAssignmentMetadataRequest);
         assertThat(EntityUtils.toString(getTrainedModelAssignmentMetadataResponse.getEntity()), equalTo("{}"));
     }
+
+    private void assertNotificationsContain(String modelId, String... auditMessages) throws IOException {
+        client().performRequest(new Request("POST", ".ml-notifications-*/_refresh"));
+        Request search = new Request("POST", ".ml-notifications-*/_search");
+        search.setJsonEntity("""
+            {
+                "size": 100,
+                "query": {
+                  "bool": {
+                    "filter": [
+                      {"term": {"job_id": "%s"}},
+                      {"term": {"job_type": "inference"}}
+                    ]
+                  }
+                }
+            }
+            """.formatted(modelId));
+        String response = EntityUtils.toString(client().performRequest(search).getEntity());
+        for (String msg : auditMessages) {
+            assertThat(response, containsString(msg));
+        }
+    }
+
+    private void assertSystemNotificationsContain(String... auditMessages) throws IOException {
+        client().performRequest(new Request("POST", ".ml-notifications-*/_refresh"));
+        Request search = new Request("POST", ".ml-notifications-*/_search");
+        search.setJsonEntity("""
+            {
+                "size": 100,
+                "query": {
+                  "bool": {
+                    "filter": [
+                      {"term": {"job_type": "system"}}
+                    ]
+                  }
+                }
+            }
+            """);
+        String response = EntityUtils.toString(client().performRequest(search).getEntity());
+        for (String msg : auditMessages) {
+            assertThat(response, containsString(msg));
+        }
+    }
 }

+ 8 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -343,6 +343,7 @@ import org.elasticsearch.xpack.ml.job.task.OpenJobPersistentTasksExecutor;
 import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
 import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
 import org.elasticsearch.xpack.ml.notifications.InferenceAuditor;
+import org.elasticsearch.xpack.ml.notifications.SystemAuditor;
 import org.elasticsearch.xpack.ml.process.DummyController;
 import org.elasticsearch.xpack.ml.process.MlController;
 import org.elasticsearch.xpack.ml.process.MlControllerHolder;
@@ -1070,7 +1071,13 @@ public class MachineLearning extends Plugin
             threadPool
         );
         trainedModelAllocationClusterServiceSetOnce.set(
-            new TrainedModelAssignmentClusterService(settings, clusterService, threadPool, new NodeLoadDetector(memoryTracker))
+            new TrainedModelAssignmentClusterService(
+                settings,
+                clusterService,
+                threadPool,
+                new NodeLoadDetector(memoryTracker),
+                new SystemAuditor(client, clusterService)
+            )
         );
 
         mlAutoscalingDeciderService.set(

+ 8 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartTrainedModelDeploymentAction.java

@@ -63,6 +63,7 @@ import org.elasticsearch.xpack.ml.inference.assignment.TrainedModelAssignmentSer
 import org.elasticsearch.xpack.ml.inference.persistence.ChunkedTrainedModelRestorer;
 import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelDefinitionDoc;
 import org.elasticsearch.xpack.ml.job.NodeLoadDetector;
+import org.elasticsearch.xpack.ml.notifications.InferenceAuditor;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
 
 import java.util.Collections;
@@ -91,6 +92,7 @@ public class TransportStartTrainedModelDeploymentAction extends TransportMasterN
     private final TrainedModelAssignmentService trainedModelAssignmentService;
     private final NamedXContentRegistry xContentRegistry;
     private final MlMemoryTracker memoryTracker;
+    private final InferenceAuditor auditor;
     protected volatile int maxLazyMLNodes;
     protected volatile long maxMLNodeSize;
 
@@ -106,7 +108,8 @@ public class TransportStartTrainedModelDeploymentAction extends TransportMasterN
         Settings settings,
         TrainedModelAssignmentService trainedModelAssignmentService,
         NamedXContentRegistry xContentRegistry,
-        MlMemoryTracker memoryTracker
+        MlMemoryTracker memoryTracker,
+        InferenceAuditor auditor
     ) {
         super(
             StartTrainedModelDeploymentAction.NAME,
@@ -124,6 +127,7 @@ public class TransportStartTrainedModelDeploymentAction extends TransportMasterN
         this.xContentRegistry = Objects.requireNonNull(xContentRegistry);
         this.memoryTracker = Objects.requireNonNull(memoryTracker);
         this.trainedModelAssignmentService = Objects.requireNonNull(trainedModelAssignmentService);
+        this.auditor = Objects.requireNonNull(auditor);
         this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings);
         this.maxMLNodeSize = MachineLearning.MAX_ML_NODE_SIZE.get(settings).getBytes();
         clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes);
@@ -280,11 +284,12 @@ public class TransportStartTrainedModelDeploymentAction extends TransportMasterN
             timeout,
             new TrainedModelAssignmentService.WaitForAssignmentListener() {
                 @Override
-                public void onResponse(TrainedModelAssignment allocation) {
+                public void onResponse(TrainedModelAssignment assignment) {
                     if (predicate.exception != null) {
                         deleteFailedDeployment(modelId, predicate.exception, listener);
                     } else {
-                        listener.onResponse(new CreateTrainedModelAssignmentAction.Response(allocation));
+                        auditor.info(assignment.getModelId(), Messages.INFERENCE_DEPLOYMENT_STARTED);
+                        listener.onResponse(new CreateTrainedModelAssignmentAction.Response(assignment));
                     }
                 }
 

+ 21 - 15
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopTrainedModelDeploymentAction.java

@@ -35,13 +35,16 @@ import org.elasticsearch.xpack.core.ml.action.GetTrainedModelsAction;
 import org.elasticsearch.xpack.core.ml.action.StopTrainedModelDeploymentAction;
 import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig;
 import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignment;
+import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.inference.assignment.TrainedModelAssignmentClusterService;
 import org.elasticsearch.xpack.ml.inference.assignment.TrainedModelAssignmentMetadata;
 import org.elasticsearch.xpack.ml.inference.deployment.TrainedModelDeploymentTask;
+import org.elasticsearch.xpack.ml.notifications.InferenceAuditor;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 
@@ -66,6 +69,7 @@ public class TransportStopTrainedModelDeploymentAction extends TransportTasksAct
     private final Client client;
     private final IngestService ingestService;
     private final TrainedModelAssignmentClusterService trainedModelAssignmentClusterService;
+    private final InferenceAuditor auditor;
 
     @Inject
     public TransportStopTrainedModelDeploymentAction(
@@ -74,7 +78,8 @@ public class TransportStopTrainedModelDeploymentAction extends TransportTasksAct
         ActionFilters actionFilters,
         Client client,
         IngestService ingestService,
-        TrainedModelAssignmentClusterService trainedModelAssignmentClusterService
+        TrainedModelAssignmentClusterService trainedModelAssignmentClusterService,
+        InferenceAuditor auditor
     ) {
         super(
             StopTrainedModelDeploymentAction.NAME,
@@ -89,6 +94,7 @@ public class TransportStopTrainedModelDeploymentAction extends TransportTasksAct
         this.client = new OriginSettingClient(client, ML_ORIGIN);
         this.ingestService = ingestService;
         this.trainedModelAssignmentClusterService = trainedModelAssignmentClusterService;
+        this.auditor = Objects.requireNonNull(auditor);
     }
 
     @Override
@@ -192,21 +198,21 @@ public class TransportStopTrainedModelDeploymentAction extends TransportTasksAct
         request.setNodes(modelAssignment.getNodeRoutingTable().keySet().toArray(String[]::new));
         ActionListener<StopTrainedModelDeploymentAction.Response> finalListener = ActionListener.wrap(r -> {
             assert clusterService.localNode().isMasterNode();
-            trainedModelAssignmentClusterService.removeModelAssignment(
-                modelId,
-                ActionListener.wrap(deleted -> listener.onResponse(r), deletionFailed -> {
-                    logger.error(
-                        () -> format("[%s] failed to delete model assignment after nodes unallocated the deployment", modelId),
+            trainedModelAssignmentClusterService.removeModelAssignment(modelId, ActionListener.wrap(deleted -> {
+                auditor.info(modelId, Messages.INFERENCE_DEPLOYMENT_STOPPED);
+                listener.onResponse(r);
+            }, deletionFailed -> {
+                logger.error(
+                    () -> format("[%s] failed to delete model assignment after nodes unallocated the deployment", modelId),
+                    deletionFailed
+                );
+                listener.onFailure(
+                    ExceptionsHelper.serverError(
+                        "failed to delete model assignment after nodes unallocated the deployment. Attempt to stop again",
                         deletionFailed
-                    );
-                    listener.onFailure(
-                        ExceptionsHelper.serverError(
-                            "failed to delete model assignment after nodes unallocated the deployment. Attempt to stop again",
-                            deletionFailed
-                        )
-                    );
-                })
-            );
+                    )
+                );
+            }));
         }, e -> {
             if (ExceptionsHelper.unwrapCause(e) instanceof FailedNodeException) {
                 // A node has dropped out of the cluster since we started executing the requests.

+ 29 - 21
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterService.java

@@ -37,14 +37,17 @@ import org.elasticsearch.xpack.core.ml.inference.assignment.AssignmentState;
 import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingInfo;
 import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingState;
 import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignment;
+import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.job.NodeLoad;
 import org.elasticsearch.xpack.ml.job.NodeLoadDetector;
+import org.elasticsearch.xpack.ml.notifications.SystemAuditor;
 
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
@@ -62,6 +65,7 @@ public class TrainedModelAssignmentClusterService implements ClusterStateListene
     private final ClusterService clusterService;
     private final ThreadPool threadPool;
     private final NodeLoadDetector nodeLoadDetector;
+    private final SystemAuditor systemAuditor;
     private volatile int maxMemoryPercentage;
     private volatile boolean useAuto;
     private volatile int maxOpenJobs;
@@ -70,11 +74,13 @@ public class TrainedModelAssignmentClusterService implements ClusterStateListene
         Settings settings,
         ClusterService clusterService,
         ThreadPool threadPool,
-        NodeLoadDetector nodeLoadDetector
+        NodeLoadDetector nodeLoadDetector,
+        SystemAuditor systemAuditor
     ) {
-        this.clusterService = clusterService;
-        this.threadPool = threadPool;
-        this.nodeLoadDetector = nodeLoadDetector;
+        this.clusterService = Objects.requireNonNull(clusterService);
+        this.threadPool = Objects.requireNonNull(threadPool);
+        this.nodeLoadDetector = Objects.requireNonNull(nodeLoadDetector);
+        this.systemAuditor = Objects.requireNonNull(systemAuditor);
         this.maxMemoryPercentage = MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings);
         this.useAuto = MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT.get(settings);
         this.maxOpenJobs = MachineLearning.MAX_OPEN_JOBS_PER_NODE.get(settings);
@@ -272,21 +278,16 @@ public class TrainedModelAssignmentClusterService implements ClusterStateListene
             return;
         }
 
-        rebalanceAssignments(
-            clusterService.state(),
-            Optional.of(params),
-            "model [" + params.getModelId() + "] started",
-            ActionListener.wrap(newMetadata -> {
-                TrainedModelAssignment assignment = newMetadata.getModelAssignment(params.getModelId());
-                if (assignment == null) {
-                    // If we could not allocate the model anywhere then it is possible the assignment
-                    // here is null. We should notify the listener of an empty assignment as the
-                    // handling of this is done elsewhere with the wait-to-start predicate.
-                    assignment = TrainedModelAssignment.Builder.empty(params).build();
-                }
-                listener.onResponse(assignment);
-            }, listener::onFailure)
-        );
+        rebalanceAssignments(clusterService.state(), Optional.of(params), "model deployment started", ActionListener.wrap(newMetadata -> {
+            TrainedModelAssignment assignment = newMetadata.getModelAssignment(params.getModelId());
+            if (assignment == null) {
+                // If we could not allocate the model anywhere then it is possible the assignment
+                // here is null. We should notify the listener of an empty assignment as the
+                // handling of this is done elsewhere with the wait-to-start predicate.
+                assignment = TrainedModelAssignment.Builder.empty(params).build();
+            }
+            listener.onResponse(assignment);
+        }, listener::onFailure));
     }
 
     public void setModelAssignmentToStopping(String modelId, ActionListener<AcknowledgedResponse> listener) {
@@ -327,7 +328,7 @@ public class TrainedModelAssignmentClusterService implements ClusterStateListene
                 rebalanceAssignments(
                     newState,
                     Optional.empty(),
-                    "deployment for model [" + modelId + "] stopped",
+                    "model deployment stopped",
                     ActionListener.wrap(
                         metadataAfterRebalance -> logger.debug(
                             () -> format("Successfully rebalanced model deployments after deployment for model [%s] was stopped", modelId)
@@ -408,13 +409,16 @@ public class TrainedModelAssignmentClusterService implements ClusterStateListene
             submitUnbatchedTask(reason, new ClusterStateUpdateTask() {
 
                 private volatile boolean isUpdated;
+                private volatile boolean isChanged;
 
                 @Override
                 public ClusterState execute(ClusterState currentState) {
 
                     if (areClusterStatesCompatibleForRebalance(clusterState, currentState)) {
                         isUpdated = true;
-                        return update(currentState, rebalancedMetadata);
+                        ClusterState updatedState = update(currentState, rebalancedMetadata);
+                        isChanged = updatedState != currentState;
+                        return updatedState;
                     }
                     rebalanceAssignments(currentState, modelToAdd, reason, listener);
                     return currentState;
@@ -428,6 +432,10 @@ public class TrainedModelAssignmentClusterService implements ClusterStateListene
                 @Override
                 public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
                     if (isUpdated) {
+                        if (isChanged) {
+                            threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
+                                .execute(() -> systemAuditor.info(Messages.getMessage(Messages.INFERENCE_DEPLOYMENT_REBALANCED, reason)));
+                        }
                         listener.onResponse(TrainedModelAssignmentMetadata.fromState(newState));
                     }
                 }

+ 53 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/SystemAuditor.java

@@ -0,0 +1,53 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.ml.notifications;
+
+import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.xpack.core.ml.notifications.SystemAuditMessage;
+
+public class SystemAuditor extends AbstractMlAuditor<SystemAuditMessage> { // auditor for messages not tied to a resource id
+
+    public SystemAuditor(Client client, ClusterService clusterService) {
+        super(
+            client,
+            (resourceId, message, level, timestamp, nodeName) -> new SystemAuditMessage(message, level, timestamp, nodeName),
+            clusterService
+        );
+    }
+
+    public void info(String message) {
+        info(null, message);
+    }
+
+    public void warning(String message) {
+        warning(null, message);
+    }
+
+    public void error(String message) {
+        error(null, message);
+    }
+
+    @Override
+    public void info(String resourceId, String message) {
+        assert resourceId == null; // system messages carry no resource id
+        super.info(null, message);
+    }
+
+    @Override
+    public void warning(String resourceId, String message) {
+        assert resourceId == null; // system messages carry no resource id
+        super.warning(null, message);
+    }
+
+    @Override
+    public void error(String resourceId, String message) {
+        assert resourceId == null; // system messages carry no resource id
+        super.error(null, message);
+    }
+}

+ 5 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterServiceTests.java

@@ -42,6 +42,7 @@ import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingStateAndReaso
 import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignment;
 import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.job.NodeLoadDetector;
+import org.elasticsearch.xpack.ml.notifications.SystemAuditor;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
 import org.junit.Before;
 
@@ -71,6 +72,7 @@ public class TrainedModelAssignmentClusterServiceTests extends ESTestCase {
     private ClusterService clusterService;
     private ThreadPool threadPool;
     private NodeLoadDetector nodeLoadDetector;
+    private SystemAuditor systemAuditor;
 
     @Before
     public void setupObjects() {
@@ -90,6 +92,8 @@ public class TrainedModelAssignmentClusterServiceTests extends ESTestCase {
         MlMemoryTracker memoryTracker = mock(MlMemoryTracker.class);
         when(memoryTracker.isRecentlyRefreshed()).thenReturn(true);
         nodeLoadDetector = new NodeLoadDetector(memoryTracker);
+
+        systemAuditor = mock(SystemAuditor.class);
     }
 
     public void testUpdateModelRoutingTable() {
@@ -1127,7 +1131,7 @@ public class TrainedModelAssignmentClusterServiceTests extends ESTestCase {
     }
 
     private TrainedModelAssignmentClusterService createClusterService() {
-        return new TrainedModelAssignmentClusterService(Settings.EMPTY, clusterService, threadPool, nodeLoadDetector);
+        return new TrainedModelAssignmentClusterService(Settings.EMPTY, clusterService, threadPool, nodeLoadDetector, systemAuditor);
     }
 
     private static DiscoveryNode buildNode(String name, boolean isML, long nativeMemory, int allocatedProcessors) {