Sfoglia il codice sorgente

[ML] add auditor to data frame plugin (#40012)

* [Data Frame] add auditor

* Adjusting Level, Auditor, and message to address pr comments

* Addressing PR comments
Benjamin Trent 6 anni fa
parent
commit
3b6d42d479
16 ha cambiato i file con 833 aggiunte e 17 eliminazioni
  1. 122 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessage.java
  2. 85 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/Auditor.java
  3. 28 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/Level.java
  4. 1 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java
  5. 72 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/notifications/DataFrameAuditMessage.java
  6. 10 2
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStore.java
  7. 126 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessageTests.java
  8. 96 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AuditorTests.java
  9. 34 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/LevelTests.java
  10. 73 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/notifications/DataFrameAuditMessageTests.java
  11. 2 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java
  12. 76 0
      x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameAuditorIT.java
  13. 18 3
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java
  14. 57 0
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java
  15. 12 5
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java
  16. 21 7
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

+ 122 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessage.java

@@ -0,0 +1,122 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.common.notifications;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Objects;
+
+public abstract class AbstractAuditMessage implements ToXContentObject {
+    public static final ParseField TYPE = new ParseField("audit_message");
+
+    public static final ParseField MESSAGE = new ParseField("message");
+    public static final ParseField LEVEL = new ParseField("level");
+    public static final ParseField TIMESTAMP = new ParseField("timestamp");
+    public static final ParseField NODE_NAME = new ParseField("node_name");
+
+    private final String resourceId;
+    private final String message;
+    private final Level level;
+    private final Date timestamp;
+    private final String nodeName;
+
+    public AbstractAuditMessage(String resourceId, String message, Level level, String nodeName) {
+        this.resourceId = resourceId;
+        this.message = Objects.requireNonNull(message);
+        this.level = Objects.requireNonNull(level);
+        this.timestamp = new Date();
+        this.nodeName = nodeName;
+    }
+
+    protected AbstractAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) {
+        this.resourceId = resourceId;
+        this.message = Objects.requireNonNull(message);
+        this.level = Objects.requireNonNull(level);
+        this.timestamp = Objects.requireNonNull(timestamp);
+        this.nodeName = nodeName;
+    }
+
+    public final String getResourceId() {
+        return resourceId;
+    }
+
+    public final String getMessage() {
+        return message;
+    }
+
+    public final Level getLevel() {
+        return level;
+    }
+
+    public final Date getTimestamp() {
+        return timestamp;
+    }
+
+    public final String getNodeName() {
+        return nodeName;
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
+        builder.startObject();
+        if (resourceId != null) {
+            builder.field(getResourceField(), resourceId);
+        }
+        builder.field(MESSAGE.getPreferredName(), message);
+        builder.field(LEVEL.getPreferredName(), level);
+        builder.field(TIMESTAMP.getPreferredName(), timestamp.getTime());
+        if (nodeName != null) {
+            builder.field(NODE_NAME.getPreferredName(), nodeName);
+        }
+        builder.endObject();
+        return builder;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(resourceId, message, level, timestamp);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (obj instanceof AbstractAuditMessage == false) {
+            return false;
+        }
+
+        AbstractAuditMessage other = (AbstractAuditMessage) obj;
+        return Objects.equals(resourceId, other.resourceId) &&
+            Objects.equals(message, other.message) &&
+            Objects.equals(level, other.level) &&
+            Objects.equals(timestamp, other.timestamp);
+    }
+
+    protected abstract String getResourceField();
+
+    public abstract static class AbstractBuilder<T extends AbstractAuditMessage> {
+
+        public T info(String resourceId, String message, String nodeName) {
+            return newMessage(Level.INFO, resourceId, message, nodeName);
+        }
+
+        public T warning(String resourceId, String message, String nodeName) {
+            return newMessage(Level.WARNING, resourceId, message, nodeName);
+        }
+
+        public T error(String resourceId, String message, String nodeName) {
+            return newMessage(Level.ERROR, resourceId, message, nodeName);
+        }
+
+        protected abstract T newMessage(Level level, String resourceId, String message, String nodeName);
+    }
+}

+ 85 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/Auditor.java

@@ -0,0 +1,85 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.common.notifications;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
+
+public class Auditor<T extends AbstractAuditMessage> {
+
+    private static final Logger logger = LogManager.getLogger(Auditor.class);
+    private final Client client;
+    private final String nodeName;
+    private final String auditIndex;
+    private final String executionOrigin;
+    private final AbstractAuditMessage.AbstractBuilder<T> messageBuilder;
+
+    public Auditor(Client client,
+                   String nodeName,
+                   String auditIndex,
+                   String executionOrigin,
+                   AbstractAuditMessage.AbstractBuilder<T> messageBuilder) {
+        this.client = Objects.requireNonNull(client);
+        this.nodeName = Objects.requireNonNull(nodeName);
+        this.auditIndex = auditIndex;
+        this.executionOrigin = executionOrigin;
+        this.messageBuilder = Objects.requireNonNull(messageBuilder);
+    }
+
+    public final void info(String resourceId, String message) {
+        indexDoc(messageBuilder.info(resourceId, message, nodeName));
+    }
+
+    public final void warning(String resourceId, String message) {
+        indexDoc(messageBuilder.warning(resourceId, message, nodeName));
+    }
+
+    public final void error(String resourceId, String message) {
+        indexDoc(messageBuilder.error(resourceId, message, nodeName));
+    }
+
+    protected void onIndexResponse(IndexResponse response) {
+        logger.trace("Successfully wrote audit message");
+    }
+
+    protected void onIndexFailure(Exception exception) {
+        logger.debug("Failed to write audit message", exception);
+    }
+
+    private void indexDoc(ToXContent toXContent) {
+        IndexRequest indexRequest = new IndexRequest(auditIndex);
+        indexRequest.source(toXContentBuilder(toXContent));
+        indexRequest.timeout(TimeValue.timeValueSeconds(5));
+        executeAsyncWithOrigin(client.threadPool().getThreadContext(),
+            executionOrigin,
+            indexRequest,
+            ActionListener.wrap(
+                this::onIndexResponse,
+                this::onIndexFailure
+            ), client::index);
+    }
+
+    private XContentBuilder toXContentBuilder(ToXContent toXContent) {
+        try (XContentBuilder jsonBuilder = jsonBuilder()) {
+            return toXContent.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

+ 28 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/Level.java

@@ -0,0 +1,28 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.common.notifications;
+
+import java.util.Locale;
+
+public enum Level {
+    INFO, WARNING, ERROR;
+
+    /**
+     * Case-insensitive from string method.
+     *
+     * @param value
+     *            String representation
+     * @return The condition type
+     */
+    public static Level fromString(String value) {
+        return Level.valueOf(value.toUpperCase(Locale.ROOT));
+    }
+
+    @Override
+    public String toString() {
+        return name().toLowerCase(Locale.ROOT);
+    }
+}

+ 1 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java

@@ -32,6 +32,7 @@ public final class DataFrameField {
     public static final String REST_BASE_PATH = "/_data_frame/";
     public static final String REST_BASE_PATH_TRANSFORMS = REST_BASE_PATH + "transforms/";
     public static final String REST_BASE_PATH_TRANSFORMS_BY_ID = REST_BASE_PATH_TRANSFORMS + "{id}/";
+    public static final String DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD = "transform_id";
 
     // note: this is used to match tasks
     public static final String PERSISTENT_TASK_DESCRIPTION_PREFIX = "data_frame_";

+ 72 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/notifications/DataFrameAuditMessage.java

@@ -0,0 +1,72 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.dataframe.notifications;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
+import org.elasticsearch.xpack.core.common.notifications.Level;
+import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils;
+
+import java.util.Date;
+
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
+import static org.elasticsearch.xpack.core.dataframe.DataFrameField.DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD;
+
+public class DataFrameAuditMessage extends AbstractAuditMessage {
+
+    private static final ParseField TRANSFORM_ID = new ParseField(DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD);
+    public static final ConstructingObjectParser<DataFrameAuditMessage, Void> PARSER = new ConstructingObjectParser<>(
+        "data_frame_audit_message",
+        true,
+        a -> new DataFrameAuditMessage((String)a[0], (String)a[1], (Level)a[2], (Date)a[3], (String)a[4]));
+
+    static {
+        PARSER.declareString(optionalConstructorArg(), TRANSFORM_ID);
+        PARSER.declareString(constructorArg(), MESSAGE);
+        PARSER.declareField(constructorArg(), p -> {
+            if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
+                return Level.fromString(p.text());
+            }
+            throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
+        }, LEVEL, ObjectParser.ValueType.STRING);
+        PARSER.declareField(constructorArg(), parser -> {
+            if (parser.currentToken() == XContentParser.Token.VALUE_NUMBER) {
+                return new Date(parser.longValue());
+            } else if (parser.currentToken() == XContentParser.Token.VALUE_STRING) {
+                return new Date(TimeUtils.dateStringToEpoch(parser.text()));
+            }
+            throw new IllegalArgumentException(
+                "unexpected token [" + parser.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]");
+        }, TIMESTAMP, ObjectParser.ValueType.VALUE);
+        PARSER.declareString(optionalConstructorArg(), NODE_NAME);
+    }
+
+    public DataFrameAuditMessage(String resourceId, String message, Level level, String nodeName) {
+        super(resourceId, message, level, nodeName);
+    }
+
+    protected DataFrameAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) {
+        super(resourceId, message, level, timestamp, nodeName);
+    }
+
+    @Override
+    protected String getResourceField() {
+        return TRANSFORM_ID.getPreferredName();
+    }
+
+    public static AbstractAuditMessage.AbstractBuilder<DataFrameAuditMessage> builder() {
+        return new AbstractBuilder<DataFrameAuditMessage>() {
+            @Override
+            protected DataFrameAuditMessage newMessage(Level level, String resourceId, String message, String nodeName) {
+                return new DataFrameAuditMessage(resourceId, message, level, nodeName);
+            }
+        };
+    }
+}

+ 10 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStore.java

@@ -158,10 +158,18 @@ public class ReservedRolesStore implements BiConsumer<Set<String>, ActionListene
                         null, MetadataUtils.DEFAULT_RESERVED_METADATA))
                 .put("data_frame_transforms_admin", new RoleDescriptor("data_frame_transforms_admin",
                         new String[] { "manage_data_frame_transforms" },
-                        null, null, null, null, MetadataUtils.DEFAULT_RESERVED_METADATA, null))
+                        new RoleDescriptor.IndicesPrivileges[]{
+                            RoleDescriptor.IndicesPrivileges.builder()
+                                .indices(".data-frame-notifications*")
+                                .privileges("view_index_metadata", "read").build()
+                        }, null, null, null, MetadataUtils.DEFAULT_RESERVED_METADATA, null))
                 .put("data_frame_transforms_user", new RoleDescriptor("data_frame_transforms_user",
                         new String[] { "monitor_data_frame_transforms" },
-                        null, null, null, null, MetadataUtils.DEFAULT_RESERVED_METADATA, null))
+                        new RoleDescriptor.IndicesPrivileges[]{
+                            RoleDescriptor.IndicesPrivileges.builder()
+                                .indices(".data-frame-notifications*")
+                                .privileges("view_index_metadata", "read").build()
+                        }, null, null, null, MetadataUtils.DEFAULT_RESERVED_METADATA, null))
                 .put("watcher_admin", new RoleDescriptor("watcher_admin", new String[] { "manage_watcher" },
                         new RoleDescriptor.IndicesPrivileges[] {
                                 RoleDescriptor.IndicesPrivileges.builder().indices(Watch.INDEX, TriggeredWatchStoreField.INDEX_NAME,

+ 126 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessageTests.java

@@ -0,0 +1,126 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.common.notifications;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractXContentTestCase;
+import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils;
+import org.junit.Before;
+
+import java.util.Date;
+
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
+
+public class AbstractAuditMessageTests extends AbstractXContentTestCase<AbstractAuditMessageTests.TestAuditMessage> {
+    private long startMillis;
+
+    static class TestAuditMessage extends AbstractAuditMessage {
+        private static final ParseField ID = new ParseField("test_id");
+        public static final ConstructingObjectParser<TestAuditMessage, Void> PARSER = new ConstructingObjectParser<>(
+            AbstractAuditMessage.TYPE.getPreferredName(),
+            true,
+            a -> new TestAuditMessage((String)a[0], (String)a[1], (Level)a[2], (Date)a[3], (String)a[4]));
+
+        static {
+            PARSER.declareString(optionalConstructorArg(), ID);
+            PARSER.declareString(constructorArg(), MESSAGE);
+            PARSER.declareField(constructorArg(), p -> {
+                if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
+                    return Level.fromString(p.text());
+                }
+                throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
+            }, LEVEL, ObjectParser.ValueType.STRING);
+            PARSER.declareField(constructorArg(), parser -> {
+                if (parser.currentToken() == XContentParser.Token.VALUE_NUMBER) {
+                    return new Date(parser.longValue());
+                } else if (parser.currentToken() == XContentParser.Token.VALUE_STRING) {
+                    return new Date(TimeUtils.dateStringToEpoch(parser.text()));
+                }
+                throw new IllegalArgumentException(
+                    "unexpected token [" + parser.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]");
+            }, TIMESTAMP, ObjectParser.ValueType.VALUE);
+            PARSER.declareString(optionalConstructorArg(), NODE_NAME);
+        }
+
+        TestAuditMessage(String resourceId, String message, Level level, String nodeName) {
+            super(resourceId, message, level, nodeName);
+        }
+
+        TestAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) {
+            super(resourceId, message, level, timestamp, nodeName);
+        }
+
+        @Override
+        protected String getResourceField() {
+            return "test_id";
+        }
+
+        static AbstractAuditMessage.AbstractBuilder<TestAuditMessage> newBuilder() {
+            return new AbstractBuilder<TestAuditMessage>() {
+                @Override
+                protected TestAuditMessage newMessage(Level level, String resourceId, String message, String nodeName) {
+                    return new TestAuditMessage(resourceId, message, level, nodeName);
+                }
+            };
+        }
+    }
+
+    @Before
+    public void setStartTime() {
+        startMillis = System.currentTimeMillis();
+    }
+
+    public void testNewInfo() {
+        TestAuditMessage info = TestAuditMessage.newBuilder().info("foo", "some info", "some_node");
+        assertEquals("foo", info.getResourceId());
+        assertEquals("some info", info.getMessage());
+        assertEquals(Level.INFO, info.getLevel());
+        assertDateBetweenStartAndNow(info.getTimestamp());
+    }
+
+    public void testNewWarning() {
+        TestAuditMessage warning = TestAuditMessage.newBuilder().warning("bar", "some warning", "some_node");
+        assertEquals("bar", warning.getResourceId());
+        assertEquals("some warning", warning.getMessage());
+        assertEquals(Level.WARNING, warning.getLevel());
+        assertDateBetweenStartAndNow(warning.getTimestamp());
+    }
+
+
+    public void testNewError() {
+        TestAuditMessage error = TestAuditMessage.newBuilder().error("foo", "some error", "some_node");
+        assertEquals("foo", error.getResourceId());
+        assertEquals("some error", error.getMessage());
+        assertEquals(Level.ERROR, error.getLevel());
+        assertDateBetweenStartAndNow(error.getTimestamp());
+    }
+
+    private void assertDateBetweenStartAndNow(Date timestamp) {
+        long timestampMillis = timestamp.getTime();
+        assertTrue(timestampMillis >= startMillis);
+        assertTrue(timestampMillis <= System.currentTimeMillis());
+    }
+
+    @Override
+    protected TestAuditMessage doParseInstance(XContentParser parser) {
+        return TestAuditMessage.PARSER.apply(parser, null);
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return true;
+    }
+
+    @Override
+    protected TestAuditMessage createTestInstance() {
+        return new TestAuditMessage(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 200),
+                randomFrom(Level.values()), randomAlphaOfLengthBetween(1, 20));
+    }
+}

+ 96 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AuditorTests.java

@@ -0,0 +1,96 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.common.notifications;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.common.xcontent.DeprecationHandler;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.Before;
+import org.mockito.ArgumentCaptor;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class AuditorTests extends ESTestCase {
+    private Client client;
+    private ArgumentCaptor<IndexRequest> indexRequestCaptor;
+    private static final String TEST_ORIGIN = "test_origin";
+    private static final String TEST_INDEX = "test_index";
+    private static final AbstractAuditMessage.AbstractBuilder<AbstractAuditMessageTests.TestAuditMessage> builder = 
+        AbstractAuditMessageTests.TestAuditMessage.newBuilder();
+    
+    @Before
+    public void setUpMocks() {
+        client = mock(Client.class);
+        ThreadPool threadPool = mock(ThreadPool.class);
+        when(client.threadPool()).thenReturn(threadPool);
+        when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
+
+        indexRequestCaptor = ArgumentCaptor.forClass(IndexRequest.class);
+    }
+
+    public void testInfo() throws IOException {
+        Auditor<AbstractAuditMessageTests.TestAuditMessage> auditor = new Auditor<>(client, "node_1", TEST_INDEX, TEST_ORIGIN, builder);
+        auditor.info("foo", "Here is my info");
+
+        verify(client).index(indexRequestCaptor.capture(), any());
+        IndexRequest indexRequest = indexRequestCaptor.getValue();
+        assertArrayEquals(new String[] {TEST_INDEX}, indexRequest.indices());
+        assertEquals(TimeValue.timeValueSeconds(5), indexRequest.timeout());
+        AbstractAuditMessageTests.TestAuditMessage auditMessage = parseAuditMessage(indexRequest.source());
+        assertEquals("foo", auditMessage.getResourceId());
+        assertEquals("Here is my info", auditMessage.getMessage());
+        assertEquals(Level.INFO, auditMessage.getLevel());
+    }
+
+    public void testWarning() throws IOException {
+        Auditor<AbstractAuditMessageTests.TestAuditMessage> auditor = new Auditor<>(client, "node_1", TEST_INDEX, TEST_ORIGIN, builder);
+        auditor.warning("bar", "Here is my warning");
+
+        verify(client).index(indexRequestCaptor.capture(), any());
+        IndexRequest indexRequest = indexRequestCaptor.getValue();
+        assertArrayEquals(new String[] {TEST_INDEX}, indexRequest.indices());
+        assertEquals(TimeValue.timeValueSeconds(5), indexRequest.timeout());
+        AbstractAuditMessageTests.TestAuditMessage auditMessage = parseAuditMessage(indexRequest.source());
+        assertEquals("bar", auditMessage.getResourceId());
+        assertEquals("Here is my warning", auditMessage.getMessage());
+        assertEquals(Level.WARNING, auditMessage.getLevel());
+    }
+
+    public void testError() throws IOException {
+        Auditor<AbstractAuditMessageTests.TestAuditMessage> auditor = new Auditor<>(client, "node_1", TEST_INDEX, TEST_ORIGIN, builder);
+        auditor.error("foobar", "Here is my error");
+
+        verify(client).index(indexRequestCaptor.capture(), any());
+        IndexRequest indexRequest = indexRequestCaptor.getValue();
+        assertArrayEquals(new String[] {TEST_INDEX}, indexRequest.indices());
+        assertEquals(TimeValue.timeValueSeconds(5), indexRequest.timeout());
+        AbstractAuditMessageTests.TestAuditMessage auditMessage = parseAuditMessage(indexRequest.source());
+        assertEquals("foobar", auditMessage.getResourceId());
+        assertEquals("Here is my error", auditMessage.getMessage());
+        assertEquals(Level.ERROR, auditMessage.getLevel());
+    }
+
+    private AbstractAuditMessageTests.TestAuditMessage parseAuditMessage(BytesReference msg) throws IOException {
+        XContentParser parser = XContentFactory.xContent(XContentHelper.xContentType(msg))
+            .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, msg.streamInput());
+        return AbstractAuditMessageTests.TestAuditMessage.PARSER.apply(parser, null);
+    }
+}

+ 34 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/LevelTests.java

@@ -0,0 +1,34 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.common.notifications;
+
+import org.elasticsearch.test.ESTestCase;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class LevelTests extends ESTestCase {
+
+    public void testFromString() {
+        assertEquals(Level.INFO, Level.fromString("info"));
+        assertEquals(Level.INFO, Level.fromString("INFO"));
+        assertEquals(Level.WARNING, Level.fromString("warning"));
+        assertEquals(Level.WARNING, Level.fromString("WARNING"));
+        assertEquals(Level.ERROR, Level.fromString("error"));
+        assertEquals(Level.ERROR, Level.fromString("ERROR"));
+    }
+
+    public void testToString() {
+        assertEquals("info", Level.INFO.toString());
+        assertEquals("warning", Level.WARNING.toString());
+        assertEquals("error", Level.ERROR.toString());
+    }
+
+    public void testValidOrdinals() {
+        assertThat(Level.INFO.ordinal(), equalTo(0));
+        assertThat(Level.WARNING.ordinal(), equalTo(1));
+        assertThat(Level.ERROR.ordinal(), equalTo(2));
+    }
+}

+ 73 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/notifications/DataFrameAuditMessageTests.java

@@ -0,0 +1,73 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.dataframe.notifications;
+
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractXContentTestCase;
+import org.elasticsearch.xpack.core.common.notifications.Level;
+import org.junit.Before;
+
+import java.util.Date;
+
+public class DataFrameAuditMessageTests extends AbstractXContentTestCase<DataFrameAuditMessage> {
+    private long startMillis;
+
+    @Before
+    public void setStartTime() {
+        startMillis = System.currentTimeMillis();
+    }
+
+    public void testNewInfo() {
+        DataFrameAuditMessage info = DataFrameAuditMessage.builder().info("foo", "some info", "some_node");
+        assertEquals("foo", info.getResourceId());
+        assertEquals("some info", info.getMessage());
+        assertEquals(Level.INFO, info.getLevel());
+        assertDateBetweenStartAndNow(info.getTimestamp());
+    }
+
+    public void testNewWarning() {
+        DataFrameAuditMessage warning = DataFrameAuditMessage.builder().warning("bar", "some warning", "some_node");
+        assertEquals("bar", warning.getResourceId());
+        assertEquals("some warning", warning.getMessage());
+        assertEquals(Level.WARNING, warning.getLevel());
+        assertDateBetweenStartAndNow(warning.getTimestamp());
+    }
+
+
+    public void testNewError() {
+        DataFrameAuditMessage error = DataFrameAuditMessage.builder().error("foo", "some error", "some_node");
+        assertEquals("foo", error.getResourceId());
+        assertEquals("some error", error.getMessage());
+        assertEquals(Level.ERROR, error.getLevel());
+        assertDateBetweenStartAndNow(error.getTimestamp());
+    }
+
+    private void assertDateBetweenStartAndNow(Date timestamp) {
+        long timestampMillis = timestamp.getTime();
+        assertTrue(timestampMillis >= startMillis);
+        assertTrue(timestampMillis <= System.currentTimeMillis());
+    }
+
+    @Override
+    protected DataFrameAuditMessage doParseInstance(XContentParser parser) {
+        return DataFrameAuditMessage.PARSER.apply(parser, null);
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return true;
+    }
+
+    @Override
+    protected DataFrameAuditMessage createTestInstance() {
+        return new DataFrameAuditMessage(
+            randomBoolean() ? null : randomAlphaOfLength(10),
+            randomAlphaOfLengthBetween(1, 20),
+            randomFrom(Level.values()),
+            randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20)
+        );
+    }
+}

+ 2 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java

@@ -1047,6 +1047,7 @@ public class ReservedRolesStoreTests extends ESTestCase {
         assertThat(role.cluster().check(StopDataFrameTransformAction.NAME, request), is(true));
         assertThat(role.runAs().check(randomAlphaOfLengthBetween(1, 30)), is(false));
 
+        assertOnlyReadAllowed(role, ".data-frame-notifications-1");
         assertNoAccessAllowed(role, "foo");
         assertNoAccessAllowed(role, ".data-frame-internal-1"); // internal use only
 
@@ -1070,6 +1071,7 @@ public class ReservedRolesStoreTests extends ESTestCase {
         assertThat(role.cluster().check(StopDataFrameTransformAction.NAME, request), is(false));
         assertThat(role.runAs().check(randomAlphaOfLengthBetween(1, 30)), is(false));
 
+        assertOnlyReadAllowed(role, ".data-frame-notifications-1");
         assertNoAccessAllowed(role, "foo");
         assertNoAccessAllowed(role, ".data-frame-internal-1");
 

+ 76 - 0
x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameAuditorIT.java

@@ -0,0 +1,76 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.dataframe.integration;
+
+import org.elasticsearch.client.Request;
+import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.Matchers.is;
+
+public class DataFrameAuditorIT extends DataFrameRestTestCase {
+
+    private static final String TEST_USER_NAME = "df_admin_plus_data";
+    private static final String DATA_ACCESS_ROLE = "test_data_access";
+    private static final String BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS =
+        basicAuthHeaderValue(TEST_USER_NAME, TEST_PASSWORD_SECURE_STRING);
+
+    private static boolean indicesCreated = false;
+
+    // preserve indices in order to reuse source indices in several test cases
+    @Override
+    protected boolean preserveIndicesUponCompletion() {
+        return true;
+    }
+
+    @Before
+    public void createIndexes() throws IOException {
+
+        // it's not possible to run it as @BeforeClass as clients aren't initialized then, so we need this little hack
+        if (indicesCreated) {
+            return;
+        }
+
+        createReviewsIndex();
+        indicesCreated = true;
+        setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME);
+        setupUser(TEST_USER_NAME, Arrays.asList("data_frame_transforms_admin", DATA_ACCESS_ROLE));
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testAuditorWritesAudits() throws Exception {
+        String transformId = "simplePivotForAudit";
+        String dataFrameIndex = "pivot_reviews_user_id_above_20";
+        setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, dataFrameIndex);
+        String query = "\"match\": {\"user_id\": \"user_26\"}";
+
+        createPivotReviewsTransform(transformId, dataFrameIndex, query, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
+
+        startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
+
+        // Make sure we wrote to the audit
+        assertTrue(indexExists(DataFrameInternalIndex.AUDIT_INDEX));
+        Request request = new Request("GET", DataFrameInternalIndex.AUDIT_INDEX + "/_search");
+        request.setJsonEntity("{\"query\":{\"term\":{\"transform_id\":\"simplePivotForAudit\"}}}");
+        Map<String, Object> response = entityAsMap(client().performRequest(request));
+        Map<?, ?> hitRsp = (Map<?, ?>) ((List<?>) ((Map<?, ?>)response.get("hits")).get("hits")).get(0);
+        Map<String, Object> source = (Map<String, Object>)hitRsp.get("_source");
+        assertThat(source.get("transform_id"), equalTo(transformId));
+        assertThat(source.get("level"), equalTo("info"));
+        assertThat(source.get("message"), is(notNullValue()));
+        assertThat(source.get("node_name"), is(notNullValue()));
+        assertThat(source.get("timestamp"), is(notNullValue()));
+    }
+}

+ 18 - 3
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java

@@ -40,6 +40,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.watcher.ResourceWatcherService;
 import org.elasticsearch.xpack.core.XPackPlugin;
 import org.elasticsearch.xpack.core.XPackSettings;
+import org.elasticsearch.xpack.core.common.notifications.Auditor;
 import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction;
 import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction;
 import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
@@ -48,6 +49,7 @@ import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction
 import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction;
 import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
 import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
+import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
 import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
 import org.elasticsearch.xpack.dataframe.action.TransportDeleteDataFrameTransformAction;
 import org.elasticsearch.xpack.dataframe.action.TransportGetDataFrameTransformsAction;
@@ -83,6 +85,7 @@ import java.util.function.Supplier;
 import java.util.function.UnaryOperator;
 
 import static java.util.Collections.emptyList;
+import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN;
 
 public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlugin {
 
@@ -99,6 +102,7 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
     private final Settings settings;
     private final boolean transportClientMode;
     private final SetOnce<DataFrameTransformsConfigManager> dataFrameTransformsConfigManager = new SetOnce<>();
+    private final SetOnce<Auditor<DataFrameAuditMessage>> dataFrameAuditor = new SetOnce<>();
     private final SetOnce<DataFrameTransformsCheckpointService> dataFrameTransformsCheckpointService = new SetOnce<>();
     private final SetOnce<SchedulerEngine> schedulerEngine = new SetOnce<>();
 
@@ -180,11 +184,15 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
         if (enabled == false || transportClientMode) {
             return emptyList();
         }
-
+        dataFrameAuditor.set(new Auditor<>(client,
+            clusterService.getNodeName(),
+            DataFrameInternalIndex.AUDIT_INDEX,
+            DATA_FRAME_ORIGIN,
+            DataFrameAuditMessage.builder()));
         dataFrameTransformsConfigManager.set(new DataFrameTransformsConfigManager(client, xContentRegistry));
         dataFrameTransformsCheckpointService.set(new DataFrameTransformsCheckpointService(client));
 
-        return Arrays.asList(dataFrameTransformsConfigManager.get(), dataFrameTransformsCheckpointService.get());
+        return Arrays.asList(dataFrameTransformsConfigManager.get(), dataFrameAuditor.get(), dataFrameTransformsCheckpointService.get());
     }
 
     @Override
@@ -195,6 +203,11 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
             } catch (IOException e) {
                 logger.error("Error creating data frame index template", e);
             }
+            try {
+                templates.put(DataFrameInternalIndex.AUDIT_INDEX, DataFrameInternalIndex.getAuditIndexTemplateMetaData());
+            } catch (IOException e) {
+                logger.warn("Error creating data frame audit index", e);
+            }
             return templates;
         };
     }
@@ -210,10 +223,12 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
 
         // the transforms config manager should have been created
         assert dataFrameTransformsConfigManager.get() != null;
+        // the auditor should have been created
+        assert dataFrameAuditor.get() != null;
         assert dataFrameTransformsCheckpointService.get() != null;
 
         return Collections.singletonList(new DataFrameTransformPersistentTasksExecutor(client, dataFrameTransformsConfigManager.get(),
-                dataFrameTransformsCheckpointService.get(), schedulerEngine.get(), threadPool));
+                dataFrameTransformsCheckpointService.get(), schedulerEngine.get(), dataFrameAuditor.get(), threadPool));
     }
 
     @Override

+ 57 - 0
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java

@@ -13,12 +13,15 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
 
 import java.io.IOException;
 import java.util.Collections;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
+import static org.elasticsearch.xpack.core.dataframe.DataFrameField.DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD;
 
 public final class DataFrameInternalIndex {
 
@@ -28,10 +31,18 @@ public final class DataFrameInternalIndex {
     public static final String INDEX_TEMPLATE_NAME = INDEX_TEMPLATE_PATTERN + INDEX_TEMPLATE_VERSION;
     public static final String INDEX_NAME = INDEX_TEMPLATE_NAME;
 
+    public static final String AUDIT_TEMPLATE_VERSION = "1";
+    public static final String AUDIT_INDEX_PREFIX = ".data-frame-notifications-";
+    public static final String AUDIT_INDEX = AUDIT_INDEX_PREFIX + AUDIT_TEMPLATE_VERSION;
+
     // constants for mappings
     public static final String DYNAMIC = "dynamic";
     public static final String PROPERTIES = "properties";
     public static final String TYPE = "type";
+    public static final String DATE = "date";
+    public static final String TEXT = "text";
+    public static final String FIELDS = "fields";
+    public static final String RAW = "raw";
 
     // data types
     public static final String DOUBLE = "double";
@@ -50,6 +61,52 @@ public final class DataFrameInternalIndex {
         return dataFrameTemplate;
     }
 
+    public static IndexTemplateMetaData getAuditIndexTemplateMetaData() throws IOException {
+        IndexTemplateMetaData dataFrameTemplate = IndexTemplateMetaData.builder(AUDIT_INDEX)
+            .patterns(Collections.singletonList(AUDIT_INDEX_PREFIX + "*"))
+            .version(Version.CURRENT.id)
+            .settings(Settings.builder()
+                // the audits are expected to be small
+                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1"))
+            .putMapping(MapperService.SINGLE_MAPPING_NAME, Strings.toString(auditMappings()))
+            .build();
+        return dataFrameTemplate;
+    }
+
+    private static XContentBuilder auditMappings() throws IOException {
+        XContentBuilder builder = jsonBuilder().startObject();
+        builder.startObject(SINGLE_MAPPING_NAME);
+        addMetaInformation(builder);
+        builder.field(DYNAMIC, "false");
+        builder.startObject(PROPERTIES)
+            .startObject(DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD)
+            .field(TYPE, KEYWORD)
+            .endObject()
+            .startObject(AbstractAuditMessage.LEVEL.getPreferredName())
+            .field(TYPE, KEYWORD)
+            .endObject()
+            .startObject(AbstractAuditMessage.MESSAGE.getPreferredName())
+            .field(TYPE, TEXT)
+            .startObject(FIELDS)
+            .startObject(RAW)
+            .field(TYPE, KEYWORD)
+            .endObject()
+            .endObject()
+            .endObject()
+            .startObject(AbstractAuditMessage.TIMESTAMP.getPreferredName())
+            .field(TYPE, DATE)
+            .endObject()
+            .startObject(AbstractAuditMessage.NODE_NAME.getPreferredName())
+            .field(TYPE, KEYWORD)
+            .endObject()
+            .endObject()
+            .endObject()
+            .endObject();
+
+        return builder;
+    }
+
     private static XContentBuilder mappings() throws IOException {
         XContentBuilder builder = jsonBuilder();
         builder.startObject();

+ 12 - 5
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java

@@ -16,7 +16,9 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.persistent.PersistentTasksExecutor;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.common.notifications.Auditor;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
+import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
 import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
@@ -35,15 +37,20 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
     private final DataFrameTransformsCheckpointService dataFrameTransformsCheckpointService;
     private final SchedulerEngine schedulerEngine;
     private final ThreadPool threadPool;
+    private final Auditor<DataFrameAuditMessage> auditor;
 
-    public DataFrameTransformPersistentTasksExecutor(Client client, DataFrameTransformsConfigManager transformsConfigManager,
-            DataFrameTransformsCheckpointService dataFrameTransformsCheckpointService, SchedulerEngine schedulerEngine,
-            ThreadPool threadPool) {
+    public DataFrameTransformPersistentTasksExecutor(Client client,
+                                                     DataFrameTransformsConfigManager transformsConfigManager,
+                                                     DataFrameTransformsCheckpointService dataFrameTransformsCheckpointService,
+                                                     SchedulerEngine schedulerEngine,
+                                                     Auditor<DataFrameAuditMessage> auditor,
+                                                     ThreadPool threadPool) {
         super(DataFrameField.TASK_NAME, DataFrame.TASK_THREAD_POOL_NAME);
         this.client = client;
         this.transformsConfigManager = transformsConfigManager;
         this.dataFrameTransformsCheckpointService = dataFrameTransformsCheckpointService;
         this.schedulerEngine = schedulerEngine;
+        this.auditor = auditor;
         this.threadPool = threadPool;
     }
 
@@ -71,7 +78,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
     protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
             PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform> persistentTask, Map<String, String> headers) {
         return new DataFrameTransformTask(id, type, action, parentTaskId, persistentTask.getParams(),
-                (DataFrameTransformState) persistentTask.getState(), client, transformsConfigManager, dataFrameTransformsCheckpointService,
-                schedulerEngine, threadPool, headers);
+            (DataFrameTransformState) persistentTask.getState(), client, transformsConfigManager,
+            dataFrameTransformsCheckpointService, schedulerEngine, auditor, threadPool, headers);
     }
 }

+ 21 - 7
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

@@ -22,13 +22,15 @@ import org.elasticsearch.persistent.AllocatedPersistentTask;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ClientHelper;
+import org.elasticsearch.xpack.core.common.notifications.Auditor;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
 import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
+import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
 import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
 import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response;
 import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
-import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
 import org.elasticsearch.xpack.core.indexing.IndexerState;
@@ -52,6 +54,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
     private final SchedulerEngine schedulerEngine;
     private final ThreadPool threadPool;
     private final DataFrameIndexer indexer;
+    private final Auditor<DataFrameAuditMessage> auditor;
 
     // the generation of this data frame, for v1 there will be only
     // 0: data frame not created or still indexing
@@ -59,13 +62,15 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
     private final AtomicReference<Long> generation;
 
     public DataFrameTransformTask(long id, String type, String action, TaskId parentTask, DataFrameTransform transform,
-            DataFrameTransformState state, Client client, DataFrameTransformsConfigManager transformsConfigManager,
-            DataFrameTransformsCheckpointService transformsCheckpointService, SchedulerEngine schedulerEngine, ThreadPool threadPool,
-            Map<String, String> headers) {
+                                  DataFrameTransformState state, Client client, DataFrameTransformsConfigManager transformsConfigManager,
+                                  DataFrameTransformsCheckpointService transformsCheckpointService,
+                                  SchedulerEngine schedulerEngine, Auditor<DataFrameAuditMessage> auditor,
+                                  ThreadPool threadPool, Map<String, String> headers) {
         super(id, type, action, DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + transform.getId(), parentTask, headers);
         this.transform = transform;
         this.schedulerEngine = schedulerEngine;
         this.threadPool = threadPool;
+        this.auditor = auditor;
         IndexerState initialState = IndexerState.STOPPED;
         long initialGeneration = 0;
         Map<String, Object> initialPosition = null;
@@ -87,7 +92,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         }
 
         this.indexer = new ClientDataFrameIndexer(transform.getId(), transformsConfigManager, transformsCheckpointService,
-                new AtomicReference<>(initialState), initialPosition, client);
+            new AtomicReference<>(initialState), initialPosition, client, auditor);
         this.generation = new AtomicReference<Long>(initialGeneration);
     }
 
@@ -142,6 +147,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         updatePersistentTaskState(state,
                 ActionListener.wrap(
                         (task) -> {
+                            auditor.info(transform.getId(), "Updated state to [" + state.getIndexerState() + "]");
                             logger.debug("Successfully updated state for data frame transform [" + transform.getId() + "] to ["
                                     + state.getIndexerState() + "][" + state.getPosition() + "]");
                             listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
@@ -169,6 +175,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
             // overwrite some docs and eventually checkpoint.
             DataFrameTransformState state = new DataFrameTransformState(IndexerState.STOPPED, indexer.getPosition(), generation.get());
             updatePersistentTaskState(state, ActionListener.wrap((task) -> {
+                auditor.info(transform.getId(), "Updated state to [" + state.getIndexerState() + "]");
                 logger.debug("Successfully updated state for data frame transform [{}] to [{}]", transform.getId(),
                         state.getIndexerState());
                 listener.onResponse(new StopDataFrameTransformAction.Response(true));
@@ -231,18 +238,21 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         private final DataFrameTransformsConfigManager transformsConfigManager;
         private final DataFrameTransformsCheckpointService transformsCheckpointService;
         private final String transformId;
+        private final Auditor<DataFrameAuditMessage> auditor;
         private Map<String, String> fieldMappings = null;
 
         private DataFrameTransformConfig transformConfig = null;
 
         public ClientDataFrameIndexer(String transformId, DataFrameTransformsConfigManager transformsConfigManager,
-                DataFrameTransformsCheckpointService transformsCheckpointService, AtomicReference<IndexerState> initialState,
-                Map<String, Object> initialPosition, Client client) {
+                                      DataFrameTransformsCheckpointService transformsCheckpointService,
+                                      AtomicReference<IndexerState> initialState, Map<String, Object> initialPosition, Client client,
+                                      Auditor<DataFrameAuditMessage> auditor) {
             super(threadPool.executor(ThreadPool.Names.GENERIC), initialState, initialPosition);
             this.transformId = transformId;
             this.transformsConfigManager = transformsConfigManager;
             this.transformsCheckpointService = transformsCheckpointService;
             this.client = client;
+            this.auditor = auditor;
         }
 
         @Override
@@ -282,6 +292,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
 
             // todo: set job into failed state
             if (transformConfig.isValid() == false) {
+                auditor.error(transformId, "Cannot execute data frame transform as configuration is invalid");
                 throw new RuntimeException(
                         DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID, transformId));
             }
@@ -346,16 +357,19 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
 
         @Override
         protected void onFailure(Exception exc) {
+            auditor.error(transform.getId(), "Data frame transform failed with an exception: " + exc.getMessage());
             logger.warn("Data frame transform [" + transform.getId() + "] failed with an exception: ", exc);
         }
 
         @Override
         protected void onFinish() {
+            auditor.info(transform.getId(), "Finished indexing for data frame transform");
             logger.info("Finished indexing for data frame transform [" + transform.getId() + "]");
         }
 
         @Override
         protected void onAbort() {
+            auditor.info(transform.getId(), "Received abort request, stopping indexer");
             logger.info("Data frame transform [" + transform.getId() + "] received abort request, stopping indexer");
             shutdown();
         }