Explorar o código

[ML-DataFrame] audit changes in the source index (#45282)

add audits when the set of source indexes changes and in a special case runs empty
Hendrik Muhs %!s(int64=6) %!d(string=hai) anos
pai
achega
384460fbc7

+ 4 - 1
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java

@@ -175,7 +175,9 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
         }
         dataFrameAuditor.set(new DataFrameAuditor(client, clusterService.getNodeName()));
         dataFrameTransformsConfigManager.set(new DataFrameTransformsConfigManager(client, xContentRegistry));
-        dataFrameTransformsCheckpointService.set(new DataFrameTransformsCheckpointService(client, dataFrameTransformsConfigManager.get()));
+        dataFrameTransformsCheckpointService.set(new DataFrameTransformsCheckpointService(client,
+                                                                                          dataFrameTransformsConfigManager.get(),
+                                                                                          dataFrameAuditor.get()));
 
         return Arrays.asList(dataFrameTransformsConfigManager.get(), dataFrameAuditor.get(), dataFrameTransformsCheckpointService.get());
     }
@@ -223,6 +225,7 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
                 settingsModule.getSettings()));
     }
 
+    @Override
     public List<Setting<?>> getSettings() {
         return Collections.singletonList(DataFrameTransformTask.NUM_FAILURE_RETRIES_SETTING);
     }

+ 6 - 3
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java

@@ -15,6 +15,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheck
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress;
 import org.elasticsearch.xpack.core.dataframe.transforms.TimeSyncConfig;
+import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
 
 /**
@@ -31,19 +32,21 @@ public class DataFrameTransformsCheckpointService {
 
     private final Client client;
     private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
+    private final DataFrameAuditor dataFrameAuditor;
 
     public DataFrameTransformsCheckpointService(final Client client,
-            final DataFrameTransformsConfigManager dataFrameTransformsConfigManager) {
+            final DataFrameTransformsConfigManager dataFrameTransformsConfigManager, DataFrameAuditor dataFrameAuditor) {
         this.client = client;
         this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
+        this.dataFrameAuditor = dataFrameAuditor;
     }
 
     public CheckpointProvider getCheckpointProvider(final DataFrameTransformConfig transformConfig) {
         if (transformConfig.getSyncConfig() instanceof TimeSyncConfig) {
-            return new TimeBasedCheckpointProvider(client, dataFrameTransformsConfigManager, transformConfig);
+            return new TimeBasedCheckpointProvider(client, dataFrameTransformsConfigManager, dataFrameAuditor, transformConfig);
         }
 
-        return new DefaultCheckpointProvider(client, dataFrameTransformsConfigManager, transformConfig);
+        return new DefaultCheckpointProvider(client, dataFrameTransformsConfigManager, dataFrameAuditor, transformConfig);
     }
 
     /**

+ 42 - 0
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DefaultCheckpointProvider.java

@@ -18,6 +18,7 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerPosition;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
@@ -25,9 +26,11 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheck
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress;
+import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -35,6 +38,9 @@ import java.util.TreeMap;
 
 public class DefaultCheckpointProvider implements CheckpointProvider {
 
+    // threshold when to audit concrete index names, above this threshold we only report the number of changes
+    private static final int AUDIT_CONCRETED_SOURCE_INDEX_CHANGES = 10;
+
     /**
      * Builder for collecting checkpointing information for the purpose of _stats
      */
@@ -101,13 +107,16 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
 
     protected final Client client;
     protected final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
+    protected final DataFrameAuditor dataFrameAuditor;
     protected final DataFrameTransformConfig transformConfig;
 
     public DefaultCheckpointProvider(final Client client,
                                      final DataFrameTransformsConfigManager dataFrameTransformsConfigManager,
+                                     final DataFrameAuditor dataFrameAuditor,
                                      final DataFrameTransformConfig transformConfig) {
         this.client = client;
         this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
+        this.dataFrameAuditor = dataFrameAuditor;
         this.transformConfig = transformConfig;
     }
 
@@ -123,6 +132,9 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
         final long checkpoint = lastCheckpoint != null ? lastCheckpoint.getCheckpoint() + 1 : 1;
 
         getIndexCheckpoints(ActionListener.wrap(checkpointsByIndex -> {
+            reportSourceIndexChanges(lastCheckpoint != null ? lastCheckpoint.getIndicesCheckpoints().keySet() : Collections.emptySet(),
+                                     checkpointsByIndex.keySet());
+
             listener.onResponse(new DataFrameTransformCheckpoint(transformConfig.getId(), timestamp, checkpoint, checkpointsByIndex, 0L));
         }, listener::onFailure));
     }
@@ -299,4 +311,34 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
             getIndexCheckpoints(checkpointsByIndexListener);
         }
     }
+
+    /**
+     * Inspect source changes and report differences
+     *
+     * @param lastSourceIndexes the set of indexes seen in the previous checkpoint
+     * @param newSourceIndexes the set of indexes seen in the new checkpoint
+     */
+    void reportSourceIndexChanges(final Set<String> lastSourceIndexes, final Set<String> newSourceIndexes) {
+        // spam protection: only warn the first time
+        if (newSourceIndexes.isEmpty() && lastSourceIndexes.isEmpty() == false) {
+            String message = "Source did not resolve to any open indexes";
+            logger.warn("{} for transform [{}]", message, transformConfig.getId());
+            dataFrameAuditor.warning(transformConfig.getId(), message);
+        } else {
+            Set<String> removedIndexes = Sets.difference(lastSourceIndexes, newSourceIndexes);
+            Set<String> addedIndexes = Sets.difference(newSourceIndexes, lastSourceIndexes);
+
+            if (removedIndexes.size() + addedIndexes.size() > AUDIT_CONCRETED_SOURCE_INDEX_CHANGES) {
+                String message = "Source index resolve found more than " + AUDIT_CONCRETED_SOURCE_INDEX_CHANGES + " changes, ["
+                        + removedIndexes.size() + "] removed indexes, [" + addedIndexes.size() + "] new indexes";
+                logger.debug("{} for transform [{}]", message, transformConfig.getId());
+                dataFrameAuditor.info(transformConfig.getId(), message);
+            } else if (removedIndexes.size() + addedIndexes.size() > 0) {
+                String message = "Source index resolve found changes, removedIndexes: " + removedIndexes + ", new indexes: " + addedIndexes;
+                logger.debug("{} for transform [{}]", message, transformConfig.getId());
+                dataFrameAuditor.info(transformConfig.getId(), message);
+            }
+        }
+    }
+
 }

+ 3 - 1
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/TimeBasedCheckpointProvider.java

@@ -21,6 +21,7 @@ import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
 import org.elasticsearch.xpack.core.dataframe.transforms.TimeSyncConfig;
+import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
 
 public class TimeBasedCheckpointProvider extends DefaultCheckpointProvider {
@@ -31,8 +32,9 @@ public class TimeBasedCheckpointProvider extends DefaultCheckpointProvider {
 
     TimeBasedCheckpointProvider(final Client client,
                                 final DataFrameTransformsConfigManager dataFrameTransformsConfigManager,
+                                final DataFrameAuditor dataFrameAuditor,
                                 final DataFrameTransformConfig transformConfig) {
-        super(client, dataFrameTransformsConfigManager, transformConfig);
+        super(client, dataFrameTransformsConfigManager, dataFrameAuditor, transformConfig);
         timeSyncConfig = (TimeSyncConfig) transformConfig.getSyncConfig();
     }
 

+ 5 - 1
x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformCheckpointServiceNodeTests.java

@@ -48,6 +48,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfi
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgressTests;
 import org.elasticsearch.xpack.dataframe.DataFrameSingleNodeTestCase;
+import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
 import org.junit.After;
 import org.junit.Before;
@@ -123,7 +124,10 @@ public class DataFrameTransformCheckpointServiceNodeTests extends DataFrameSingl
 
         // use a mock for the checkpoint service
         mockClientForCheckpointing = new MockClientForCheckpointing(getTestName());
-        transformsCheckpointService = new DataFrameTransformsCheckpointService(mockClientForCheckpointing, transformsConfigManager);
+        DataFrameAuditor mockAuditor = mock(DataFrameAuditor.class);
+        transformsCheckpointService = new DataFrameTransformsCheckpointService(mockClientForCheckpointing,
+                                                                               transformsConfigManager,
+                                                                               mockAuditor);
     }
 
     @After

+ 191 - 0
x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/checkpoint/DefaultCheckpointProviderTests.java

@@ -0,0 +1,191 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.dataframe.checkpoint;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.MockLogAppender;
+import org.elasticsearch.test.MockLogAppender.LoggingExpectation;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
+import org.elasticsearch.xpack.dataframe.notifications.MockDataFrameAuditor;
+import org.elasticsearch.xpack.dataframe.notifications.MockDataFrameAuditor.AuditExpectation;
+import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
+import org.junit.Before;
+
+import java.util.Collections;
+import java.util.HashSet;
+
+import static org.mockito.Mockito.mock;
+
+public class DefaultCheckpointProviderTests extends ESTestCase {
+
+    private Client client;
+
+    private MockDataFrameAuditor dataFrameAuditor;
+    private DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
+    private Logger checkpointProviderlogger = LogManager.getLogger(DefaultCheckpointProvider.class);
+
+    @Before
+    public void setUpMocks() throws IllegalAccessException {
+        client = mock(Client.class);
+        dataFrameTransformsConfigManager = mock(DataFrameTransformsConfigManager.class);
+        dataFrameAuditor = new MockDataFrameAuditor();
+    }
+
+    public void testReportSourceIndexChangesRunsEmpty() throws Exception {
+        String transformId = getTestName();
+        DataFrameTransformConfig transformConfig = DataFrameTransformConfigTests.randomDataFrameTransformConfig(transformId);
+
+        DefaultCheckpointProvider provider = new DefaultCheckpointProvider(
+            client,
+            dataFrameTransformsConfigManager,
+            dataFrameAuditor,
+            transformConfig);
+
+        assertExpectation(
+            new MockLogAppender.SeenEventExpectation("warn when source is empty",
+                checkpointProviderlogger.getName(),
+                Level.WARN,
+                "Source did not resolve to any open indexes for transform [" + transformId + "]"),
+            new MockDataFrameAuditor.SeenAuditExpectation("warn when source is empty",
+                org.elasticsearch.xpack.core.common.notifications.Level.WARNING,
+                transformId,
+                "Source did not resolve to any open indexes"),
+            () -> {
+                    provider.reportSourceIndexChanges(Collections.singleton("index"), Collections.emptySet());
+                });
+
+        assertExpectation(
+            new MockLogAppender.UnseenEventExpectation("do not warn if empty again",
+                checkpointProviderlogger.getName(),
+                Level.WARN,
+                "Source did not resolve to any concrete indexes"),
+            new MockDataFrameAuditor.UnseenAuditExpectation("do not warn if empty again",
+                org.elasticsearch.xpack.core.common.notifications.Level.WARNING,
+                transformId,
+                "Source did not resolve to any concrete indexes"),
+            () -> {
+                    provider.reportSourceIndexChanges(Collections.emptySet(), Collections.emptySet());
+                });
+    }
+
+    public void testReportSourceIndexChangesAddDelete() throws Exception {
+        String transformId = getTestName();
+        DataFrameTransformConfig transformConfig = DataFrameTransformConfigTests.randomDataFrameTransformConfig(transformId);
+
+        DefaultCheckpointProvider provider = new DefaultCheckpointProvider(
+            client,
+            dataFrameTransformsConfigManager,
+            dataFrameAuditor,
+            transformConfig);
+
+        assertExpectation(
+            new MockLogAppender.SeenEventExpectation("info about adds/removal",
+                checkpointProviderlogger.getName(),
+                Level.DEBUG,
+                "Source index resolve found changes, removedIndexes: [index], new indexes: [other_index] for transform [" +
+                    transformId + "]"),
+            new MockDataFrameAuditor.SeenAuditExpectation("info about adds/removal",
+                org.elasticsearch.xpack.core.common.notifications.Level.INFO,
+                transformId,
+                "Source index resolve found changes, removedIndexes: [index], new indexes: [other_index]"),
+            () -> {
+                    provider.reportSourceIndexChanges(Collections.singleton("index"), Collections.singleton("other_index"));
+                });
+
+        assertExpectation(
+            new MockLogAppender.SeenEventExpectation("info about adds/removal",
+                checkpointProviderlogger.getName(),
+                Level.DEBUG,
+                "Source index resolve found changes, removedIndexes: [index], new indexes: [] for transform [" +
+                    transformId + "]"),
+            new MockDataFrameAuditor.SeenAuditExpectation("info about adds/removal",
+                org.elasticsearch.xpack.core.common.notifications.Level.INFO,
+                transformId,
+                "Source index resolve found changes, removedIndexes: [index], new indexes: []"),
+            () -> {
+                    provider.reportSourceIndexChanges(Sets.newHashSet("index", "other_index"), Collections.singleton("other_index"));
+                });
+        assertExpectation(
+            new MockLogAppender.SeenEventExpectation("info about adds/removal",
+                checkpointProviderlogger.getName(),
+                Level.DEBUG,
+                "Source index resolve found changes, removedIndexes: [], new indexes: [other_index] for transform [" +
+                    transformId + "]"),
+            new MockDataFrameAuditor.SeenAuditExpectation("info about adds/removal",
+                org.elasticsearch.xpack.core.common.notifications.Level.INFO,
+                transformId,
+                "Source index resolve found changes, removedIndexes: [], new indexes: [other_index]"),
+            () -> {
+                    provider.reportSourceIndexChanges(Collections.singleton("index"), Sets.newHashSet("index", "other_index"));
+                });
+    }
+
+    public void testReportSourceIndexChangesAddDeleteMany() throws Exception {
+        String transformId = getTestName();
+        DataFrameTransformConfig transformConfig = DataFrameTransformConfigTests.randomDataFrameTransformConfig(transformId);
+
+        DefaultCheckpointProvider provider = new DefaultCheckpointProvider(
+            client,
+            dataFrameTransformsConfigManager,
+            dataFrameAuditor,
+            transformConfig);
+
+        HashSet<String> oldSet = new HashSet<>();
+        for (int i = 0; i < 100; ++i) {
+            oldSet.add(String.valueOf(i));
+        }
+        HashSet<String> newSet = new HashSet<>();
+        for (int i = 50; i < 150; ++i) {
+            newSet.add(String.valueOf(i));
+        }
+
+        assertExpectation(
+            new MockLogAppender.SeenEventExpectation("info about adds/removal",
+                checkpointProviderlogger.getName(),
+                Level.DEBUG,
+                "Source index resolve found more than 10 changes, [50] removed indexes, [50] new indexes for transform [" +
+                    transformId + "]"),
+            new MockDataFrameAuditor.SeenAuditExpectation("info about adds/removal",
+                org.elasticsearch.xpack.core.common.notifications.Level.INFO,
+                transformId,
+                "Source index resolve found more than 10 changes, [50] removed indexes, [50] new indexes"),
+            () -> {
+                    provider.reportSourceIndexChanges(oldSet, newSet);
+                });
+    }
+
+    private void assertExpectation(LoggingExpectation loggingExpectation,
+                                   AuditExpectation auditExpectation,
+                                   Runnable codeBlock) throws IllegalAccessException {
+        MockLogAppender mockLogAppender = new MockLogAppender();
+        mockLogAppender.start();
+
+        Loggers.setLevel(checkpointProviderlogger, Level.DEBUG);
+        mockLogAppender.addExpectation(loggingExpectation);
+
+        // always start fresh
+        dataFrameAuditor.reset();
+        dataFrameAuditor.addExpectation(auditExpectation);
+        try {
+            Loggers.addAppender(checkpointProviderlogger, mockLogAppender);
+            codeBlock.run();
+            mockLogAppender.assertAllExpectationsMatched();
+            dataFrameAuditor.assertAllExpectationsMatched();
+        } finally {
+            Loggers.removeAppender(checkpointProviderlogger, mockLogAppender);
+            mockLogAppender.stop();
+        }
+    }
+
+}

+ 138 - 0
x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/notifications/MockDataFrameAuditor.java

@@ -0,0 +1,138 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.dataframe.notifications;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.regex.Regex;
+import org.elasticsearch.xpack.core.common.notifications.Level;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+
+/*
+ * Test mock auditor to verify audit expectations.
+ *
+ * Shamelessly cop...inspired by {@link org.elasticsearch.test.MockLogAppender}
+ *
+ * TODO: ideally this would be a generalized  MockAuditor, but the current inheritance doesn't let us
+ */
+public class MockDataFrameAuditor extends DataFrameAuditor {
+
+    private List<AuditExpectation> expectations;
+
+    public MockDataFrameAuditor() {
+        super(mock(Client.class), "mock_node_name");
+        expectations = new CopyOnWriteArrayList<>();
+    }
+
+    public void addExpectation(AuditExpectation expectation) {
+        expectations.add(expectation);
+    }
+
+    // we can dynamically change the auditor, like attaching and removing the log appender
+    public void reset() {
+        expectations.clear();
+    }
+
+    @Override
+    public void info(String resourceId, String message) {
+        audit(Level.INFO, resourceId, message);
+    }
+
+    @Override
+    public void warning(String resourceId, String message) {
+        audit(Level.WARNING, resourceId, message);
+    }
+
+    @Override
+    public void error(String resourceId, String message) {
+        audit(Level.ERROR, resourceId, message);
+    }
+
+    public void assertAllExpectationsMatched() {
+        for (AuditExpectation expectation : expectations) {
+            expectation.assertMatched();
+        }
+    }
+
+    public interface AuditExpectation {
+        void match(Level level, String resourceId, String message);
+
+        void assertMatched();
+    }
+
+    public abstract static class AbstractAuditExpectation implements AuditExpectation {
+        protected final String expectedName;
+        protected final Level expectedLevel;
+        protected final String expectedResourceId;
+        protected final String expectedMessage;
+        volatile boolean saw;
+
+        public AbstractAuditExpectation(String expectedName, Level expectedLevel, String expectedResourceId, String expectedMessage) {
+            this.expectedName = expectedName;
+            this.expectedLevel = expectedLevel;
+            this.expectedResourceId = expectedResourceId;
+            this.expectedMessage = expectedMessage;
+            this.saw = false;
+        }
+
+        @Override
+        public void match(final Level level, final String resourceId, final String message) {
+            if (level.equals(expectedLevel) && resourceId.equals(expectedResourceId) && innerMatch(level, resourceId, message)) {
+                if (Regex.isSimpleMatchPattern(expectedMessage)) {
+                    if (Regex.simpleMatch(expectedMessage, message)) {
+                        saw = true;
+                    }
+                } else {
+                    if (message.contains(expectedMessage)) {
+                        saw = true;
+                    }
+                }
+            }
+        }
+
+        public boolean innerMatch(final Level level, final String resourceId, final String message) {
+            return true;
+        }
+    }
+
+    public static class SeenAuditExpectation extends AbstractAuditExpectation {
+
+        public SeenAuditExpectation(String expectedName, Level expectedLevel, String expectedResourceId, String expectedMessage) {
+            super(expectedName, expectedLevel, expectedResourceId, expectedMessage);
+        }
+
+        @Override
+        public void assertMatched() {
+            assertThat("expected to see " + expectedName + " but did not", saw, equalTo(true));
+        }
+    }
+
+    public static class UnseenAuditExpectation extends AbstractAuditExpectation {
+
+        public UnseenAuditExpectation(String expectedName, Level expectedLevel, String expectedResourceId, String expectedMessage) {
+            super(expectedName, expectedLevel, expectedResourceId, expectedMessage);
+        }
+
+        @Override
+        public void assertMatched() {
+            assertThat("expected not to see " + expectedName + " but did", saw, equalTo(false));
+        }
+    }
+
+
+    private void audit(Level level, String resourceId, String message) {
+        for (AuditExpectation expectation : expectations) {
+            expectation.match(level, resourceId, message);
+        }
+    }
+
+}

+ 2 - 1
x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java

@@ -98,9 +98,10 @@ public class DataFrameTransformPersistentTasksExecutorTests extends ESTestCase {
 
         ClusterState cs = csBuilder.build();
         Client client = mock(Client.class);
+        DataFrameAuditor mockAuditor = mock(DataFrameAuditor.class);
         DataFrameTransformsConfigManager transformsConfigManager = new DataFrameTransformsConfigManager(client, xContentRegistry());
         DataFrameTransformsCheckpointService dataFrameTransformsCheckpointService = new DataFrameTransformsCheckpointService(client,
-            transformsConfigManager);
+            transformsConfigManager, mockAuditor);
         ClusterSettings cSettings = new ClusterSettings(Settings.EMPTY,
             Collections.singleton(DataFrameTransformTask.NUM_FAILURE_RETRIES_SETTING));
         ClusterService clusterService = mock(ClusterService.class);