Browse Source

[ML] Audit updates on data frame analytics jobs (#60126)

Closes #59652
Dimitris Athanasiou 5 years ago
parent
commit
55c0568e09

+ 19 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdate.java

@@ -17,6 +17,8 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 
 import java.io.IOException;
 import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
 
 import static org.elasticsearch.common.xcontent.ObjectParser.ValueType.VALUE;
 
@@ -152,6 +154,23 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje
             || (getMaxNumThreads() != null && getMaxNumThreads().equals(source.getMaxNumThreads()) == false);
     }
 
+    public Set<String> getUpdatedFields() {
+        Set<String> updatedFields = new TreeSet<>();
+        if (description != null) {
+            updatedFields.add(DataFrameAnalyticsConfig.DESCRIPTION.getPreferredName());
+        }
+        if (modelMemoryLimit != null) {
+            updatedFields.add(DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT.getPreferredName());
+        }
+        if (allowLazyStart != null) {
+            updatedFields.add(DataFrameAnalyticsConfig.ALLOW_LAZY_START.getPreferredName());
+        }
+        if (maxNumThreads != null) {
+            updatedFields.add(DataFrameAnalyticsConfig.MAX_NUM_THREADS.getPreferredName());
+        }
+        return updatedFields;
+    }
+
     @Override
     public boolean equals(Object other) {
         if (this == other) {

+ 1 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java

@@ -58,6 +58,7 @@ public final class Messages {
     public static final String DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER = "No field [{0}] could be detected";
 
     public static final String DATA_FRAME_ANALYTICS_AUDIT_CREATED = "Created analytics with analysis type [{0}]";
+    public static final String DATA_FRAME_ANALYTICS_AUDIT_UPDATED = "Updated analytics settings: {0}";
     public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED = "Started analytics";
     public static final String DATA_FRAME_ANALYTICS_AUDIT_STOPPED = "Stopped analytics";
     public static final String DATA_FRAME_ANALYTICS_AUDIT_DELETED = "Deleted analytics";

+ 44 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdateTests.java

@@ -16,6 +16,7 @@ import java.io.IOException;
 import java.util.Objects;
 
 import static org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests.randomValidId;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -197,6 +198,49 @@ public class DataFrameAnalyticsConfigUpdateTests extends AbstractSerializingTest
         assertThat(e.getMessage(), equalTo("[max_num_threads] must be a positive integer"));
     }
 
+    public void testGetUpdatedFields_GivenAll() {
+        DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder("test_job")
+            .setDescription("new description")
+            .setModelMemoryLimit(new ByteSizeValue(1024))
+            .setAllowLazyStart(true)
+            .setMaxNumThreads(8)
+            .build();
+
+        assertThat(update.getUpdatedFields(), contains("allow_lazy_start", "description", "max_num_threads", "model_memory_limit"));
+    }
+
+    public void testGetUpdatedFields_GivenAllowLazyStart() {
+        DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder("test_job")
+            .setAllowLazyStart(false)
+            .build();
+
+        assertThat(update.getUpdatedFields(), contains("allow_lazy_start"));
+    }
+
+    public void testGetUpdatedFields_GivenDescription() {
+        DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder("test_job")
+            .setDescription("new description")
+            .build();
+
+        assertThat(update.getUpdatedFields(), contains("description"));
+    }
+
+    public void testGetUpdatedFields_GivenMaxNumThreads() {
+        DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder("test_job")
+            .setMaxNumThreads(3)
+            .build();
+
+        assertThat(update.getUpdatedFields(), contains("max_num_threads"));
+    }
+
+    public void testGetUpdatedFields_GivenModelMemoryLimit() {
+        DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder("test_job")
+            .setModelMemoryLimit(new ByteSizeValue(1024))
+            .build();
+
+        assertThat(update.getUpdatedFields(), contains("model_memory_limit"));
+    }
+
     private boolean isNoop(DataFrameAnalyticsConfig config, DataFrameAnalyticsConfigUpdate update) {
         return (update.getDescription() == null || Objects.equals(config.getDescription(), update.getDescription()))
             && (update.getModelMemoryLimit() == null || Objects.equals(config.getModelMemoryLimit(), update.getModelMemoryLimit()))

+ 3 - 1
x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalyticsConfigProviderIT.java

@@ -29,6 +29,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
 import org.elasticsearch.xpack.core.ml.dataframe.analyses.MlDataFrameAnalysisNamedXContentProvider;
 import org.elasticsearch.xpack.ml.MlSingleNodeTestCase;
 import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
+import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
 import org.junit.Before;
 
 import java.util.ArrayList;
@@ -51,7 +52,8 @@ public class DataFrameAnalyticsConfigProviderIT extends MlSingleNodeTestCase {
 
     @Before
     public void createComponents() throws Exception {
-        configProvider = new DataFrameAnalyticsConfigProvider(client(), xContentRegistry());
+        configProvider = new DataFrameAnalyticsConfigProvider(client(), xContentRegistry(),
+            new DataFrameAnalyticsAuditor(client(), node().getNodeEnvironment().nodeId()));
         waitForMlTemplates();
     }
 

+ 2 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -718,7 +718,8 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
         MemoryUsageEstimationProcessManager memoryEstimationProcessManager =
             new MemoryUsageEstimationProcessManager(
                 threadPool.generic(), threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME), memoryEstimationProcessFactory);
-        DataFrameAnalyticsConfigProvider dataFrameAnalyticsConfigProvider = new DataFrameAnalyticsConfigProvider(client, xContentRegistry);
+        DataFrameAnalyticsConfigProvider dataFrameAnalyticsConfigProvider = new DataFrameAnalyticsConfigProvider(client, xContentRegistry,
+            dataFrameAnalyticsAuditor);
         assert client instanceof NodeClient;
         DataFrameAnalyticsManager dataFrameAnalyticsManager = new DataFrameAnalyticsManager((NodeClient) client,
             dataFrameAnalyticsConfigProvider, analyticsProcessManager, dataFrameAnalyticsAuditor, indexNameExpressionResolver);

+ 12 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/persistence/DataFrameAnalyticsConfigProvider.java

@@ -45,6 +45,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
+import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -71,10 +72,12 @@ public class DataFrameAnalyticsConfigProvider {
 
     private final Client client;
     private final NamedXContentRegistry xContentRegistry;
+    private final DataFrameAnalyticsAuditor auditor;
 
-    public DataFrameAnalyticsConfigProvider(Client client, NamedXContentRegistry xContentRegistry) {
+    public DataFrameAnalyticsConfigProvider(Client client, NamedXContentRegistry xContentRegistry, DataFrameAnalyticsAuditor auditor) {
         this.client = Objects.requireNonNull(client);
         this.xContentRegistry = xContentRegistry;
+        this.auditor = Objects.requireNonNull(auditor);
     }
 
     /**
@@ -98,6 +101,7 @@ public class DataFrameAnalyticsConfigProvider {
                        ClusterState clusterState,
                        ActionListener<DataFrameAnalyticsConfig> listener) {
         String id = update.getId();
+
         GetRequest getRequest = new GetRequest(MlConfigIndex.indexName(), DataFrameAnalyticsConfig.documentId(id));
         executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(
             getResponse -> {
@@ -133,7 +137,13 @@ public class DataFrameAnalyticsConfigProvider {
                 DataFrameAnalyticsConfig updatedConfig = updatedConfigBuilder.build();
 
                 // Index the update config
-                index(updatedConfig, getResponse, listener);
+                index(updatedConfig, getResponse, ActionListener.wrap(
+                    indexedConfig -> {
+                        auditor.info(id, Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_UPDATED, update.getUpdatedFields()));
+                        listener.onResponse(indexedConfig);
+                    },
+                    listener::onFailure
+                ));
             },
             listener::onFailure
         ));