Przeglądaj źródła

Add ability to store results for long running tasks

The results of the tasks are stored in a special index .results
Igor Motov 9 lat temu
rodzic
commit
fb763c1e8e

+ 4 - 0
core/src/main/java/org/elasticsearch/action/ActionRequest.java

@@ -39,6 +39,10 @@ public abstract class ActionRequest<Request extends ActionRequest<Request>> exte
 
     public abstract ActionRequestValidationException validate();
 
+    public boolean getShouldPersistResult() {
+        return false;
+    }
+
     @Override
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);

+ 38 - 1
core/src/main/java/org/elasticsearch/action/support/TransportAction.java

@@ -133,6 +133,10 @@ public abstract class TransportAction<Request extends ActionRequest<Request>, Re
             return;
         }
 
+        if (task != null && request.getShouldPersistResult()) {
+            listener = new PersistentActionListener<>(taskManager, task, listener);
+        }
+
         if (filters.length == 0) {
             try {
                 doExecute(task, request, listener);
@@ -171,7 +175,7 @@ public abstract class TransportAction<Request extends ActionRequest<Request>, Re
                 if (i < this.action.filters.length) {
                     this.action.filters[i].apply(task, actionName, request, listener, this);
                 } else if (i == this.action.filters.length) {
-                    this.action.doExecute(task, request, new FilteredActionListener<Response>(actionName, listener,
+                    this.action.doExecute(task, request, new FilteredActionListener<>(actionName, listener,
                             new ResponseFilterChain<>(this.action.filters, logger)));
                 } else {
                     listener.onFailure(new IllegalStateException("proceed was called too many times"));
@@ -246,4 +250,37 @@ public abstract class TransportAction<Request extends ActionRequest<Request>, Re
             listener.onFailure(e);
         }
     }
+
+    /**
+     * Wrapper for an action listener that persists the result at the end of the execution
+     */
+    private static class PersistentActionListener<Response extends ActionResponse> implements ActionListener<Response> {
+        private final ActionListener<Response> delegate;
+        private final Task task;
+        private final TaskManager taskManager;
+
+        private  PersistentActionListener(TaskManager taskManager, Task task, ActionListener<Response> delegate) {
+            this.taskManager = taskManager;
+            this.task = task;
+            this.delegate = delegate;
+        }
+
+        @Override
+        public void onResponse(Response response) {
+            try {
+                taskManager.persistResult(task, response, delegate);
+            } catch (Throwable e) {
+                delegate.onFailure(e);
+            }
+        }
+
+        @Override
+        public void onFailure(Throwable e) {
+            try {
+                taskManager.persistResult(task, e, delegate);
+            } catch (Throwable e1) {
+                delegate.onFailure(e1);
+            }
+        }
+    }
 }

+ 2 - 0
core/src/main/java/org/elasticsearch/cluster/ClusterModule.java

@@ -64,6 +64,7 @@ import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.ExtensionPoint;
 import org.elasticsearch.gateway.GatewayAllocator;
+import org.elasticsearch.tasks.TaskResultsService;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -157,5 +158,6 @@ public class ClusterModule extends AbstractModule {
         bind(NodeIndexDeletedAction.class).asEagerSingleton();
         bind(NodeMappingRefreshAction.class).asEagerSingleton();
         bind(MappingUpdatedAction.class).asEagerSingleton();
+        bind(TaskResultsService.class).asEagerSingleton();
     }
 }

+ 2 - 0
core/src/main/java/org/elasticsearch/node/Node.java

@@ -97,6 +97,7 @@ import org.elasticsearch.search.SearchModule;
 import org.elasticsearch.search.SearchService;
 import org.elasticsearch.snapshots.SnapshotShardsService;
 import org.elasticsearch.snapshots.SnapshotsService;
+import org.elasticsearch.tasks.TaskResultsService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPoolModule;
 import org.elasticsearch.transport.TransportService;
@@ -332,6 +333,7 @@ public class Node implements Closeable {
 
         // Start the transport service now so the publish address will be added to the local disco node in ClusterService
         TransportService transportService = injector.getInstance(TransportService.class);
+        transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
         transportService.start();
 
         validateNodeBeforeAcceptingRequests(settings, transportService.boundAddress());

+ 15 - 0
core/src/main/java/org/elasticsearch/tasks/Task.java

@@ -20,11 +20,14 @@
 
 package org.elasticsearch.tasks;
 
+import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.io.stream.NamedWriteable;
 import org.elasticsearch.common.xcontent.ToXContent;
 
+import java.io.IOException;
+
 /**
  * Current task information
  */
@@ -132,4 +135,16 @@ public class Task {
     }
 
     public interface Status extends ToXContent, NamedWriteable {}
+
+    public TaskResult result(DiscoveryNode node, Throwable error) throws IOException {
+        return new TaskResult(taskInfo(node, true), error);
+    }
+
+    public TaskResult result(DiscoveryNode node, ActionResponse response) throws IOException {
+        if (response instanceof ToXContent) {
+            return new TaskResult(taskInfo(node, true), (ToXContent) response);
+        } else {
+            throw new IllegalStateException("response has to implement ToXContent for persistence");
+        }
+    }
 }

+ 80 - 1
core/src/main/java/org/elasticsearch/tasks/TaskManager.java

@@ -19,8 +19,12 @@
 
 package org.elasticsearch.tasks;
 
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterStateListener;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.settings.Settings;
@@ -28,6 +32,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
 import org.elasticsearch.transport.TransportRequest;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -52,11 +57,18 @@ public class TaskManager extends AbstractComponent implements ClusterStateListen
 
     private final Map<TaskId, String> banedParents = new ConcurrentHashMap<>();
 
+    private TaskResultsService taskResultsService;
+
+    private DiscoveryNodes lastDiscoveryNodes = DiscoveryNodes.EMPTY_NODES;
+
     public TaskManager(Settings settings) {
         super(settings);
     }
 
-    private DiscoveryNodes lastDiscoveryNodes = DiscoveryNodes.EMPTY_NODES;
+    public void setTaskResultsService(TaskResultsService taskResultsService) {
+        assert this.taskResultsService == null;
+        this.taskResultsService = taskResultsService;
+    }
 
     /**
      * Registers a task without parent task
@@ -130,6 +142,72 @@ public class TaskManager extends AbstractComponent implements ClusterStateListen
         }
     }
 
+    /**
+     * Stores the task failure
+     */
+    public <Response extends  ActionResponse> void persistResult(Task task, Throwable error, ActionListener<Response> listener) {
+        DiscoveryNode localNode = lastDiscoveryNodes.getLocalNode();
+        if (localNode == null) {
+            // too early to persist anything, shouldn't really be here - just pass the error along
+            listener.onFailure(error);
+            return;
+        }
+        final TaskResult taskResult;
+        try {
+            taskResult = task.result(localNode, error);
+        } catch (IOException ex) {
+            logger.warn("couldn't persist error {}", ex, ExceptionsHelper.detailedMessage(error));
+            listener.onFailure(ex);
+            return;
+        }
+        taskResultsService.persist(taskResult, new ActionListener<Void>() {
+            @Override
+            public void onResponse(Void aVoid) {
+                listener.onFailure(error);
+            }
+
+            @Override
+            public void onFailure(Throwable e) {
+                logger.warn("couldn't persist error {}", e, ExceptionsHelper.detailedMessage(error));
+                listener.onFailure(e);
+            }
+        });
+    }
+
+    /**
+     * Stores the task result
+     */
+    public <Response extends  ActionResponse> void persistResult(Task task, Response response, ActionListener<Response> listener) {
+        DiscoveryNode localNode = lastDiscoveryNodes.getLocalNode();
+        if (localNode == null) {
+            // too early to persist anything, shouldn't really be here - just pass the response along
+            logger.warn("couldn't persist response {}, the node didn't join the cluster yet", response);
+            listener.onResponse(response);
+            return;
+        }
+        final TaskResult taskResult;
+        try {
+            taskResult = task.result(localNode, response);
+        } catch (IOException ex) {
+            logger.warn("couldn't persist response {}", ex, response);
+            listener.onFailure(ex);
+            return;
+        }
+
+        taskResultsService.persist(taskResult, new ActionListener<Void>() {
+            @Override
+            public void onResponse(Void aVoid) {
+                listener.onResponse(response);
+            }
+
+            @Override
+            public void onFailure(Throwable e) {
+                logger.warn("couldn't persist response {}", e, response);
+                listener.onFailure(e);
+            }
+        });
+    }
+
     /**
      * Returns the list of currently running tasks on the node
      */
@@ -223,6 +301,7 @@ public class TaskManager extends AbstractComponent implements ClusterStateListen
 
     @Override
     public void clusterChanged(ClusterChangedEvent event) {
+        lastDiscoveryNodes = event.state().getNodes();
         if (event.nodesRemoved()) {
             synchronized (banedParents) {
                 lastDiscoveryNodes = event.state().getNodes();

+ 90 - 0
core/src/main/java/org/elasticsearch/tasks/TaskResult.java

@@ -0,0 +1,90 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.tasks;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import java.io.IOException;
+
+/**
+ * Represents the result or failure of a running task
+ */
+public class TaskResult {
+
+    private final BytesReference result;
+
+    private final TaskId taskId;
+
+    public TaskResult(TaskInfo taskInfo, Throwable e) throws IOException {
+        ToXContent.Params params = ToXContent.EMPTY_PARAMS;
+        XContentBuilder builder = XContentFactory.contentBuilder(Requests.INDEX_CONTENT_TYPE);
+        builder.startObject();
+        {
+            builder.startObject("task");
+            {
+                taskInfo.toXContent(builder, params);
+            }
+            builder.endObject();
+            builder.startObject("error");
+            {
+                ElasticsearchException.toXContent(builder, params, e);
+            }
+            builder.endObject();
+        }
+        builder.endObject();
+        result = builder.bytes();
+        taskId = taskInfo.getTaskId();
+    }
+
+    public TaskResult(TaskInfo taskInfo, ToXContent toXContent) throws IOException {
+        ToXContent.Params params = ToXContent.EMPTY_PARAMS;
+        XContentBuilder builder = XContentFactory.contentBuilder(Requests.INDEX_CONTENT_TYPE);
+        builder.startObject();
+        {
+            builder.startObject("task");
+            {
+                taskInfo.toXContent(builder, params);
+            }
+            builder.endObject();
+            builder.startObject("result");
+            {
+                toXContent.toXContent(builder, params);
+            }
+            builder.endObject();
+        }
+        builder.endObject();
+        result = builder.bytes();
+        taskId = taskInfo.getTaskId();
+    }
+
+    public TaskId getTaskId() {
+        return taskId;
+    }
+
+    public BytesReference getResult() {
+        return result;
+    }
+}

+ 158 - 0
core/src/main/java/org/elasticsearch/tasks/TaskResultsService.java

@@ -0,0 +1,158 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.tasks;
+
+import org.apache.lucene.util.IOUtils;
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.Streams;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.indices.IndexAlreadyExistsException;
+
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+
+/**
+ * Service that can persist task results
+ */
+public class TaskResultsService extends AbstractComponent {
+
+    public static final String TASK_RESULT_INDEX = ".results";
+
+    public static final String TASK_RESULT_TYPE = "result";
+
+    public static final String TASK_RESULT_INDEX_MAPPING_FILE = "task-results-index-mapping.json";
+
+    private final Client client;
+
+    private final ClusterService clusterService;
+
+    private final TransportCreateIndexAction createIndexAction;
+
+    @Inject
+    public TaskResultsService(Settings settings, Client client, ClusterService clusterService,
+                              TransportCreateIndexAction createIndexAction) {
+        super(settings);
+        this.client = client;
+        this.clusterService = clusterService;
+        this.createIndexAction = createIndexAction;
+    }
+
+    public void persist(TaskResult taskResult, ActionListener<Void> listener) {
+
+        ClusterState state = clusterService.state();
+
+        if (state.routingTable().hasIndex(TASK_RESULT_INDEX) == false) {
+            CreateIndexRequest createIndexRequest = new CreateIndexRequest();
+            createIndexRequest.settings(taskResultIndexSettings());
+            createIndexRequest.index(TASK_RESULT_INDEX);
+            createIndexRequest.mapping(TASK_RESULT_TYPE, taskResultIndexMapping());
+            createIndexRequest.cause("auto(task api)");
+
+            createIndexAction.execute(null, createIndexRequest, new ActionListener<CreateIndexResponse>() {
+                @Override
+                public void onResponse(CreateIndexResponse result) {
+                    doPersist(taskResult, listener);
+                }
+
+                @Override
+                public void onFailure(Throwable e) {
+                    if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
+                        // we have the index, do it
+                        try {
+                            doPersist(taskResult, listener);
+                        } catch (Throwable e1) {
+                            listener.onFailure(e1);
+                        }
+                    } else {
+                        listener.onFailure(e);
+                    }
+                }
+            });
+        } else {
+            IndexMetaData metaData = state.getMetaData().index(TASK_RESULT_INDEX);
+            if (metaData.getMappings().containsKey(TASK_RESULT_TYPE) == false) {
+                // The index already exists but doesn't have our mapping
+                client.admin().indices().preparePutMapping(TASK_RESULT_INDEX).setType(TASK_RESULT_TYPE).setSource(taskResultIndexMapping())
+                    .execute(new ActionListener<PutMappingResponse>() {
+                                 @Override
+                                 public void onResponse(PutMappingResponse putMappingResponse) {
+                                     doPersist(taskResult, listener);
+                                 }
+
+                                 @Override
+                                 public void onFailure(Throwable e) {
+                                     listener.onFailure(e);
+                                 }
+                             }
+                    );
+            } else {
+                doPersist(taskResult, listener);
+            }
+        }
+    }
+
+
+    private void doPersist(TaskResult taskResult, ActionListener<Void> listener) {
+        client.prepareIndex(TASK_RESULT_INDEX, TASK_RESULT_TYPE, taskResult.getTaskId().toString()).setSource(taskResult.getResult())
+            .execute(new ActionListener<IndexResponse>() {
+                @Override
+                public void onResponse(IndexResponse indexResponse) {
+                    listener.onResponse(null);
+                }
+
+                @Override
+                public void onFailure(Throwable e) {
+                    listener.onFailure(e);
+                }
+            });
+
+    }
+
+    private Settings taskResultIndexSettings() {
+        return Settings.builder()
+            .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+            .put(IndexMetaData.INDEX_AUTO_EXPAND_REPLICAS_SETTING.getKey(), "0-1")
+            .put(IndexMetaData.SETTING_PRIORITY, Integer.MAX_VALUE)
+            .build();
+    }
+
+    public String taskResultIndexMapping() {
+        try (InputStream is = getClass().getResourceAsStream(TASK_RESULT_INDEX_MAPPING_FILE)) {
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            Streams.copy(is, out);
+            return out.toString(IOUtils.UTF_8);
+        } catch (Exception e) {
+            logger.error("failed to create tasks results index template [{}]", e, TASK_RESULT_INDEX_MAPPING_FILE);
+            throw new IllegalStateException("failed to create tasks results index template [" + TASK_RESULT_INDEX_MAPPING_FILE + "]", e);
+        }
+
+    }
+}

+ 50 - 0
core/src/main/resources/org/elasticsearch/tasks/task-results-index-mapping.json

@@ -0,0 +1,50 @@
+{
+  "result" : {
+    "dynamic" : "strict",
+    "properties" : {
+      "task" : {
+        "properties": {
+          "action": {
+            "type": "keyword"
+          },
+          "cancellable": {
+            "type": "boolean"
+          },
+          "id": {
+            "type": "long"
+          },
+          "parent_id": {
+            "type": "keyword"
+          },
+          "node": {
+            "type": "keyword"
+          },
+          "running_time_in_nanos": {
+            "type": "long"
+          },
+          "start_time_in_millis": {
+            "type": "long"
+          },
+          "type": {
+            "type": "keyword"
+          },
+          "status": {
+            "type" : "object",
+            "enabled" : false
+          },
+          "description": {
+            "type": "string"
+          }
+        }
+      },
+      "result" : {
+        "type" : "object",
+        "enabled" : false
+      },
+      "error" : {
+        "type" : "object",
+        "enabled" : false
+      }
+    }
+  }
+}

+ 87 - 0
core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java

@@ -32,14 +32,19 @@ import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeAction;
 import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
 import org.elasticsearch.action.bulk.BulkAction;
 import org.elasticsearch.action.fieldstats.FieldStatsAction;
+import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexAction;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskResultsService;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.tasks.MockTaskManager;
 import org.elasticsearch.test.tasks.MockTaskManagerListener;
@@ -62,6 +67,9 @@ import java.util.function.Function;
 
 import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
 import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.either;
 import static org.hamcrest.Matchers.emptyCollectionOf;
@@ -476,6 +484,85 @@ public class TasksIT extends ESIntegTestCase {
         assertThat(response.getTasks().size(), greaterThanOrEqualTo(1));
     }
 
+    public void testTaskResultPersistence() throws Exception {
+        // Randomly create an empty index to make sure the type is created automatically
+        if (randomBoolean()) {
+            logger.info("creating an empty results index with custom settings");
+            assertAcked(client().admin().indices().prepareCreate(TaskResultsService.TASK_RESULT_INDEX));
+        }
+
+        registerTaskManageListeners(TestTaskPlugin.TestTaskAction.NAME);  // we need this to get task id of the process
+
+        // Start non-blocking test task
+        TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()).setShouldPersistResult(true).setShouldBlock(false).get();
+
+        List<TaskInfo> events = findEvents(TestTaskPlugin.TestTaskAction.NAME, Tuple::v1);
+
+        assertEquals(1, events.size());
+        TaskInfo taskInfo = events.get(0);
+        String taskId = taskInfo.getTaskId().toString();
+
+        GetResponse resultDoc = client().prepareGet(TaskResultsService.TASK_RESULT_INDEX, TaskResultsService.TASK_RESULT_TYPE, taskId)
+            .get();
+        assertTrue(resultDoc.isExists());
+
+        Map<String, Object> source = resultDoc.getSource();
+        Map<String, Object> task = (Map<String, Object>) source.get("task");
+        assertEquals(taskInfo.getNode().getId(), task.get("node"));
+        assertEquals(taskInfo.getAction(), task.get("action"));
+        assertEquals(Long.toString(taskInfo.getId()), task.get("id").toString());
+
+        Map<String, Object> result = (Map<String, Object>) source.get("result");
+        assertEquals("0", result.get("failure_count").toString());
+
+        assertNoFailures(client().admin().indices().prepareRefresh(TaskResultsService.TASK_RESULT_INDEX).get());
+
+        SearchResponse searchResponse = client().prepareSearch(TaskResultsService.TASK_RESULT_INDEX)
+            .setTypes(TaskResultsService.TASK_RESULT_TYPE)
+            .setSource(SearchSourceBuilder.searchSource().query(QueryBuilders.termQuery("task.action", taskInfo.getAction())))
+            .get();
+
+        assertEquals(1L, searchResponse.getHits().totalHits());
+
+        searchResponse = client().prepareSearch(TaskResultsService.TASK_RESULT_INDEX).setTypes(TaskResultsService.TASK_RESULT_TYPE)
+            .setSource(SearchSourceBuilder.searchSource().query(QueryBuilders.termQuery("task.node", taskInfo.getNode().getId()))).get();
+
+        assertEquals(1L, searchResponse.getHits().totalHits());
+    }
+
+    public void testTaskFailurePersistence() throws Exception {
+        registerTaskManageListeners(TestTaskPlugin.TestTaskAction.NAME);  // we need this to get task id of the process
+
+        // Start non-blocking test task that should fail
+        assertThrows(
+            TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client())
+                .setShouldFail(true)
+                .setShouldPersistResult(true)
+                .setShouldBlock(false),
+            IllegalStateException.class
+        );
+
+        List<TaskInfo> events = findEvents(TestTaskPlugin.TestTaskAction.NAME, Tuple::v1);
+        assertEquals(1, events.size());
+        TaskInfo failedTaskInfo = events.get(0);
+        String failedTaskId = failedTaskInfo.getTaskId().toString();
+
+        GetResponse failedResultDoc = client()
+            .prepareGet(TaskResultsService.TASK_RESULT_INDEX, TaskResultsService.TASK_RESULT_TYPE, failedTaskId)
+            .get();
+        assertTrue(failedResultDoc.isExists());
+
+        Map<String, Object> source = failedResultDoc.getSource();
+        Map<String, Object> task = (Map<String, Object>) source.get("task");
+        assertEquals(failedTaskInfo.getNode().getId(), task.get("node"));
+        assertEquals(failedTaskInfo.getAction(), task.get("action"));
+        assertEquals(Long.toString(failedTaskInfo.getId()), task.get("id").toString());
+
+        Map<String, Object> error = (Map<String, Object>) source.get("error");
+        assertEquals("Simulating operation failure", error.get("reason"));
+        assertEquals("illegal_state_exception", error.get("type"));
+    }
+
     @Override
     public void tearDown() throws Exception {
         for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) {

+ 82 - 14
core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java

@@ -29,6 +29,7 @@ import org.elasticsearch.action.support.nodes.BaseNodeRequest;
 import org.elasticsearch.action.support.nodes.BaseNodeResponse;
 import org.elasticsearch.action.support.nodes.BaseNodesRequest;
 import org.elasticsearch.action.support.nodes.BaseNodesResponse;
+import org.elasticsearch.action.support.nodes.NodesOperationRequestBuilder;
 import org.elasticsearch.action.support.nodes.TransportNodesAction;
 import org.elasticsearch.action.support.tasks.BaseTasksRequest;
 import org.elasticsearch.action.support.tasks.BaseTasksResponse;
@@ -44,6 +45,8 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.tasks.Task;
@@ -108,7 +111,7 @@ public class TestTaskPlugin extends Plugin {
         }
     }
 
-    public static class NodesResponse extends BaseNodesResponse<NodeResponse> {
+    public static class NodesResponse extends BaseNodesResponse<NodeResponse> implements ToXContent {
 
         NodesResponse() {
 
@@ -128,23 +131,31 @@ public class TestTaskPlugin extends Plugin {
             out.writeStreamableList(nodes);
         }
 
-        public int failureCount() {
+        public int getFailureCount() {
             return failures().size();
         }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.field("failure_count", getFailureCount());
+            return builder;
+        }
     }
 
     public static class NodeRequest extends BaseNodeRequest {
         protected String requestName;
         protected String nodeId;
+        protected boolean shouldBlock;
 
         public NodeRequest() {
             super();
         }
 
-        public NodeRequest(NodesRequest request, String nodeId) {
+        public NodeRequest(NodesRequest request, String nodeId, boolean shouldBlock) {
             super(nodeId);
             requestName = request.requestName;
             this.nodeId = nodeId;
+            this.shouldBlock = shouldBlock;
         }
 
         @Override
@@ -152,6 +163,7 @@ public class TestTaskPlugin extends Plugin {
             super.readFrom(in);
             requestName = in.readString();
             nodeId = in.readString();
+            shouldBlock = in.readBoolean();
         }
 
         @Override
@@ -159,6 +171,7 @@ public class TestTaskPlugin extends Plugin {
             super.writeTo(out);
             out.writeString(requestName);
             out.writeString(nodeId);
+            out.writeBoolean(shouldBlock);
         }
 
         @Override
@@ -174,6 +187,9 @@ public class TestTaskPlugin extends Plugin {
 
     public static class NodesRequest extends BaseNodesRequest<NodesRequest> {
         private String requestName;
+        private boolean shouldPersistResult = false;
+        private boolean shouldBlock = true;
+        private boolean shouldFail = false;
 
         NodesRequest() {
             super();
@@ -184,16 +200,47 @@ public class TestTaskPlugin extends Plugin {
             this.requestName = requestName;
         }
 
+        public void setShouldPersistResult(boolean shouldPersistResult) {
+            this.shouldPersistResult = shouldPersistResult;
+        }
+
+        @Override
+        public boolean getShouldPersistResult() {
+            return shouldPersistResult;
+        }
+
+        public void setShouldBlock(boolean shouldBlock) {
+            this.shouldBlock = shouldBlock;
+        }
+
+        public boolean getShouldBlock() {
+            return shouldBlock;
+        }
+
+        public void setShouldFail(boolean shouldFail) {
+            this.shouldFail = shouldFail;
+        }
+
+        public boolean getShouldFail() {
+            return shouldFail;
+        }
+
         @Override
         public void readFrom(StreamInput in) throws IOException {
             super.readFrom(in);
             requestName = in.readString();
+            shouldPersistResult = in.readBoolean();
+            shouldBlock = in.readBoolean();
+            shouldFail = in.readBoolean();
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
             out.writeString(requestName);
+            out.writeBoolean(shouldPersistResult);
+            out.writeBoolean(shouldBlock);
+            out.writeBoolean(shouldFail);
         }
 
         @Override
@@ -219,6 +266,9 @@ public class TestTaskPlugin extends Plugin {
 
         @Override
         protected NodesResponse newResponse(NodesRequest request, List<NodeResponse> responses, List<FailedNodeException> failures) {
+            if (request.getShouldFail()) {
+                throw new IllegalStateException("Simulating operation failure");
+            }
             return new NodesResponse(clusterName, responses, failures);
         }
 
@@ -235,7 +285,7 @@ public class TestTaskPlugin extends Plugin {
 
         @Override
         protected NodeRequest newNodeRequest(String nodeId, NodesRequest request) {
-            return new NodeRequest(request, nodeId);
+            return new NodeRequest(request, nodeId, request.getShouldBlock());
         }
 
         @Override
@@ -251,15 +301,17 @@ public class TestTaskPlugin extends Plugin {
         @Override
         protected NodeResponse nodeOperation(NodeRequest request, Task task) {
             logger.info("Test task started on the node {}", clusterService.localNode());
-            try {
-                awaitBusy(() -> {
-                    if (((CancellableTask) task).isCancelled()) {
-                        throw new RuntimeException("Cancelled!");
-                    }
-                    return ((TestTask) task).isBlocked() == false;
-                });
-            } catch (InterruptedException ex) {
-                Thread.currentThread().interrupt();
+            if (request.shouldBlock) {
+                try {
+                    awaitBusy(() -> {
+                        if (((CancellableTask) task).isCancelled()) {
+                            throw new RuntimeException("Cancelled!");
+                        }
+                        return ((TestTask) task).isBlocked() == false;
+                    });
+                } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                }
             }
             logger.info("Test task finished on the node {}", clusterService.localNode());
             return new NodeResponse(clusterService.localNode());
@@ -296,11 +348,27 @@ public class TestTaskPlugin extends Plugin {
         }
     }
 
-    public static class NodesRequestBuilder extends ActionRequestBuilder<NodesRequest, NodesResponse, NodesRequestBuilder> {
+    public static class NodesRequestBuilder extends NodesOperationRequestBuilder<NodesRequest, NodesResponse, NodesRequestBuilder> {
 
         protected NodesRequestBuilder(ElasticsearchClient client, Action<NodesRequest, NodesResponse, NodesRequestBuilder> action) {
             super(client, action, new NodesRequest("test"));
         }
+
+
+        public NodesRequestBuilder setShouldPersistResult(boolean shouldPersistResult) {
+            request().setShouldPersistResult(shouldPersistResult);
+            return this;
+        }
+
+        public NodesRequestBuilder setShouldBlock(boolean shouldBlock) {
+            request().setShouldBlock(shouldBlock);
+            return this;
+        }
+
+        public NodesRequestBuilder setShouldFail(boolean shouldFail) {
+            request().setShouldFail(shouldFail);
+            return this;
+        }
     }