Browse Source

Add ability to associate an ID with tasks (#27764)

Adds support for capturing the X-Opaque-Id header from a REST request and storing it's value in the tasks that this request started. It works for all user-initiated tasks (not only search).

Closes #23250

Usage:
```
$ curl -H "X-Opaque-Id: imotov" -H "foo:bar" "localhost:9200/_tasks?pretty&group_by=parents"
{
  "tasks" : {
    "7qrTVbiDQKiZfubUP7DPkg:6998" : {
      "node" : "7qrTVbiDQKiZfubUP7DPkg",
      "id" : 6998,
      "type" : "transport",
      "action" : "cluster:monitor/tasks/lists",
      "start_time_in_millis" : 1513029940042,
      "running_time_in_nanos" : 266794,
      "cancellable" : false,
      "headers" : {
        "X-Opaque-Id" : "imotov"
      },
      "children" : [
        {
          "node" : "V-PuCjPhRp2ryuEsNw6V1g",
          "id" : 6088,
          "type" : "netty",
          "action" : "cluster:monitor/tasks/lists[n]",
          "start_time_in_millis" : 1513029940043,
          "running_time_in_nanos" : 67785,
          "cancellable" : false,
          "parent_task_id" : "7qrTVbiDQKiZfubUP7DPkg:6998",
          "headers" : {
            "X-Opaque-Id" : "imotov"
          }
        },
        {
          "node" : "7qrTVbiDQKiZfubUP7DPkg",
          "id" : 6999,
          "type" : "direct",
          "action" : "cluster:monitor/tasks/lists[n]",
          "start_time_in_millis" : 1513029940043,
          "running_time_in_nanos" : 98754,
          "cancellable" : false,
          "parent_task_id" : "7qrTVbiDQKiZfubUP7DPkg:6998",
          "headers" : {
            "X-Opaque-Id" : "imotov"
          }
        }
      ]
    }
  }
}
```
Igor Motov 7 years ago
parent
commit
c75ac319a6
84 changed files with 627 additions and 180 deletions
  1. 68 0
      docs/reference/cluster/tasks.asciidoc
  2. 6 2
      modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java
  3. 6 3
      modules/reindex/src/test/java/org/elasticsearch/index/reindex/TransportRethrottleActionTests.java
  4. 1 1
      modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java
  5. 1 1
      plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java
  6. 41 0
      qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/10_basic.yml
  7. 39 0
      qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml
  8. 1 1
      rest-api-spec/src/main/resources/rest-api-spec/api/tasks.list.json
  9. 16 0
      rest-api-spec/src/main/resources/rest-api-spec/test/tasks.list/10_basic.yml
  10. 6 1
      server/src/main/java/org/elasticsearch/action/ActionModule.java
  11. 1 1
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java
  12. 15 0
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java
  13. 1 1
      server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
  14. 3 2
      server/src/main/java/org/elasticsearch/action/search/SearchRequest.java
  15. 3 2
      server/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java
  16. 4 2
      server/src/main/java/org/elasticsearch/action/search/SearchTask.java
  17. 3 2
      server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java
  18. 3 2
      server/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java
  19. 3 2
      server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
  20. 1 1
      server/src/main/java/org/elasticsearch/client/transport/TransportClient.java
  21. 3 2
      server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java
  22. 3 3
      server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java
  23. 5 4
      server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java
  24. 8 3
      server/src/main/java/org/elasticsearch/node/Node.java
  25. 7 0
      server/src/main/java/org/elasticsearch/plugins/ActionPlugin.java
  26. 12 1
      server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestListTasksAction.java
  27. 3 2
      server/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java
  28. 3 2
      server/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java
  29. 3 2
      server/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java
  30. 3 2
      server/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java
  31. 3 2
      server/src/main/java/org/elasticsearch/tasks/CancellableTask.java
  32. 17 4
      server/src/main/java/org/elasticsearch/tasks/Task.java
  33. 4 2
      server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java
  34. 38 5
      server/src/main/java/org/elasticsearch/tasks/TaskInfo.java
  35. 32 2
      server/src/main/java/org/elasticsearch/tasks/TaskManager.java
  36. 18 1
      server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java
  37. 6 4
      server/src/main/java/org/elasticsearch/transport/TransportService.java
  38. 7 0
      server/src/main/resources/org/elasticsearch/tasks/task-index-mapping.json
  39. 5 4
      server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java
  40. 9 6
      server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java
  41. 4 1
      server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskTests.java
  42. 44 17
      server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java
  43. 13 6
      server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java
  44. 4 4
      server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java
  45. 1 1
      server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java
  46. 2 1
      server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java
  47. 2 1
      server/src/test/java/org/elasticsearch/action/main/MainActionTests.java
  48. 1 1
      server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java
  49. 2 1
      server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java
  50. 25 4
      server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java
  51. 16 2
      server/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java
  52. 2 1
      server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java
  53. 1 1
      server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java
  54. 1 1
      server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java
  55. 1 1
      server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java
  56. 3 3
      server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
  57. 4 2
      server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
  58. 3 1
      server/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java
  59. 1 1
      server/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java
  60. 1 1
      server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java
  61. 2 1
      server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java
  62. 2 1
      server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java
  63. 2 1
      server/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java
  64. 1 1
      server/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java
  65. 12 6
      server/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java
  66. 2 1
      server/src/test/java/org/elasticsearch/index/reindex/LeaderBulkByScrollTaskStateTests.java
  67. 2 1
      server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java
  68. 2 1
      server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
  69. 1 1
      server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java
  70. 7 4
      server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java
  71. 2 1
      server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java
  72. 2 1
      server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java
  73. 2 2
      server/src/test/java/org/elasticsearch/search/SearchServiceTests.java
  74. 9 8
      server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java
  75. 5 2
      server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java
  76. 3 1
      server/src/test/java/org/elasticsearch/tasks/TaskResultTests.java
  77. 1 1
      server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java
  78. 3 1
      test/framework/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionTestCase.java
  79. 4 3
      test/framework/src/main/java/org/elasticsearch/node/MockNode.java
  80. 4 2
      test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java
  81. 10 9
      test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java
  82. 5 3
      test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java
  83. 1 1
      test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java
  84. 1 1
      test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java

+ 68 - 0
docs/reference/cluster/tasks.asciidoc

@@ -195,3 +195,71 @@ The following command will change the grouping to parent tasks:
 GET _tasks?group_by=parents
 --------------------------------------------------
 // CONSOLE
+
+The grouping can be disabled by specifying `none` as a `group_by` parameter:
+
+[source,js]
+--------------------------------------------------
+GET _tasks?group_by=none
+--------------------------------------------------
+// CONSOLE
+
+[float]
+=== Identifying running tasks
+
+The `X-Opaque-Id` header, when provided on the HTTP request header, is going to be returned as a header in the response as well as
+in the `headers` field for in the task information. This allows to track certain calls, or associate certain tasks with
+a the client that started them:
+
+[source,sh]
+--------------------------------------------------
+curl -i -H "X-Opaque-Id: 123456" "http://localhost:9200/_tasks?group_by=parents"
+--------------------------------------------------
+// NOTCONSOLE
+
+The result will look similar to the following:
+
+[source,js]
+--------------------------------------------------
+HTTP/1.1 200 OK
+X-Opaque-Id: 123456 <1>
+content-type: application/json; charset=UTF-8
+content-length: 831
+
+{
+  "tasks" : {
+    "u5lcZHqcQhu-rUoFaqDphA:45" : {
+      "node" : "u5lcZHqcQhu-rUoFaqDphA",
+      "id" : 45,
+      "type" : "transport",
+      "action" : "cluster:monitor/tasks/lists",
+      "start_time_in_millis" : 1513823752749,
+      "running_time_in_nanos" : 293139,
+      "cancellable" : false,
+      "headers" : {
+        "X-Opaque-Id" : "123456" <2>
+      },
+      "children" : [
+        {
+          "node" : "u5lcZHqcQhu-rUoFaqDphA",
+          "id" : 46,
+          "type" : "direct",
+          "action" : "cluster:monitor/tasks/lists[n]",
+          "start_time_in_millis" : 1513823752750,
+          "running_time_in_nanos" : 92133,
+          "cancellable" : false,
+          "parent_task_id" : "u5lcZHqcQhu-rUoFaqDphA:45",
+          "headers" : {
+            "X-Opaque-Id" : "123456" <3>
+          }
+        }
+      ]
+    }
+  }
+}
+--------------------------------------------------
+// NOTCONSOLE
+
+<1> id as a part of the response header
+<2> id for the tasks that was initiated by the REST request
+<3> the child task of the task initiated by the REST request

+ 6 - 2
modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java

@@ -81,6 +81,7 @@ import org.junit.After;
 import org.junit.Before;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
@@ -123,6 +124,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
     private SearchRequest firstSearchRequest;
     private PlainActionFuture<BulkByScrollResponse> listener;
     private String scrollId;
+    private ThreadPool threadPool;
     private TaskManager taskManager;
     private BulkByScrollTask testTask;
     private WorkerBulkByScrollTaskState worker;
@@ -141,7 +143,8 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
         testRequest = new DummyAbstractBulkByScrollRequest(firstSearchRequest);
         listener = new PlainActionFuture<>();
         scrollId = null;
-        taskManager = new TaskManager(Settings.EMPTY);
+        threadPool = new TestThreadPool(getClass().getName());
+        taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
         testTask = (BulkByScrollTask) taskManager.register("don'tcare", "hereeither", testRequest);
         testTask.setWorker(testRequest.getRequestsPerSecond(), null);
         worker = testTask.getWorkerState();
@@ -159,8 +162,9 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
     }
 
     @After
-    public void tearDownAndVerifyCommonStuff() {
+    public void tearDownAndVerifyCommonStuff() throws Exception {
         client.close();
+        terminate(threadPool);
     }
 
     /**

+ 6 - 3
modules/reindex/src/test/java/org/elasticsearch/index/reindex/TransportRethrottleActionTests.java

@@ -32,6 +32,7 @@ import org.junit.Before;
 import org.mockito.ArgumentCaptor;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.function.Consumer;
 
@@ -53,7 +54,7 @@ public class TransportRethrottleActionTests extends ESTestCase {
     @Before
     public void createTask() {
         slices = between(2, 50);
-        task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID);
+        task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID, Collections.emptyMap());
         task.setWorkerCount(slices);
     }
 
@@ -101,7 +102,8 @@ public class TransportRethrottleActionTests extends ESTestCase {
         List<BulkByScrollTask.StatusOrException> sliceStatuses = new ArrayList<>(slices);
         for (int i = 0; i < slices; i++) {
             BulkByScrollTask.Status status = believeableInProgressStatus(i);
-            tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId())));
+            tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId()),
+                Collections.emptyMap()));
             sliceStatuses.add(new BulkByScrollTask.StatusOrException(status));
         }
         rethrottleTestCase(slices,
@@ -121,7 +123,8 @@ public class TransportRethrottleActionTests extends ESTestCase {
         List<TaskInfo> tasks = new ArrayList<>();
         for (int i = succeeded; i < slices; i++) {
             BulkByScrollTask.Status status = believeableInProgressStatus(i);
-            tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId())));
+            tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId()),
+                Collections.emptyMap()));
             sliceStatuses.add(new BulkByScrollTask.StatusOrException(status));
         }
         rethrottleTestCase(slices - succeeded,

+ 1 - 1
modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java

@@ -73,7 +73,7 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase
             }
         };
         MockTransportService mockTransportService =
-            MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings);
+            MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet());
         mockTransportService.start();
         return mockTransportService;
     }

+ 1 - 1
plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java

@@ -77,7 +77,7 @@ public class SimpleNioTransportTests extends AbstractSimpleTransportTestCase {
             }
         };
         MockTransportService mockTransportService =
-            MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings);
+            MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet());
         mockTransportService.start();
         return mockTransportService;
     }

+ 41 - 0
qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/10_basic.yml

@@ -212,3 +212,44 @@
                 field3: value
   - match: { hits.total: 1 }
   - match: { hits.hits.0._id: q3 }
+
+---
+"Create a task result record in the old cluster":
+  - do:
+      indices.create:
+        index: reindexed_index
+        body:
+          settings:
+            index:
+              number_of_replicas: 0
+  - do:
+      bulk:
+        refresh: true
+        body:
+          - '{"index": {"_index": "reindexed_index", "_type": "doc"}}'
+          - '{"f1": "1"}'
+          - '{"index": {"_index": "reindexed_index", "_type": "doc"}}'
+          - '{"f1": "2"}'
+          - '{"index": {"_index": "reindexed_index", "_type": "doc"}}'
+          - '{"f1": "3"}'
+          - '{"index": {"_index": "reindexed_index", "_type": "doc"}}'
+          - '{"f1": "4"}'
+          - '{"index": {"_index": "reindexed_index", "_type": "doc"}}'
+          - '{"f1": "5"}'
+
+  - do:
+      reindex:
+        wait_for_completion: false
+        body:
+          source:
+            index: reindexed_index
+            size: 1
+          dest:
+            index: reindexed_index_copy
+  - match: {task: '/.+:\d+/'}
+  - set: {task: task}
+
+  - do:
+      tasks.get:
+        wait_for_completion: true
+        task_id: $task

+ 39 - 0
qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml

@@ -126,3 +126,42 @@
                 field3: value
   - match: { hits.total: 1 }
   - match: { hits.hits.0._id: q3 }
+
+---
+"Find a task result record from the old cluster":
+  - do:
+      search:
+        index: .tasks
+        body:
+          query:
+            match_all: {}
+  - match: { hits.total: 1 }
+  - match: { hits.hits.0._id: '/.+:\d+/' }
+  - set: {hits.hits.0._id: task_id}
+
+  - do:
+      tasks.get:
+        wait_for_completion: true
+        task_id: $task_id
+
+  - is_false: node_failures
+  - is_true: task
+
+  - do:
+      headers: { "X-Opaque-Id": "Reindexing Again" }
+      reindex:
+        wait_for_completion: false
+        body:
+          source:
+            index: reindexed_index_copy
+            size: 1
+          dest:
+            index: reindexed_index_another_copy
+  - match: { task: '/.+:\d+/' }
+  - set: { task: task_id }
+
+  - do:
+      tasks.get:
+        wait_for_completion: true
+        task_id: $task_id
+  - match: { task.headers.X-Opaque-Id: "Reindexing Again" }

+ 1 - 1
rest-api-spec/src/main/resources/rest-api-spec/api/tasks.list.json

@@ -34,7 +34,7 @@
         "group_by": {
           "type" : "enum",
           "description": "Group tasks by nodes or parent/child relationships",
-          "options" : ["nodes", "parents"],
+          "options" : ["nodes", "parents", "none"],
           "default" : "nodes"
         }
 

+ 16 - 0
rest-api-spec/src/main/resources/rest-api-spec/test/tasks.list/10_basic.yml

@@ -17,3 +17,19 @@
         group_by: parents
 
   - is_true: tasks
+
+---
+"tasks_list headers":
+  - skip:
+      version: " - 6.99.99"
+      reason:  task headers has been added in 7.0.0
+
+  - do:
+      headers: { "X-Opaque-Id": "That is me" }
+      tasks.list:
+        actions: "cluster:monitor/tasks/lists"
+        group_by: none
+
+  - is_true: tasks
+  - match: { tasks.0.headers.X-Opaque-Id: "That is me" }
+

+ 6 - 1
server/src/main/java/org/elasticsearch/action/ActionModule.java

@@ -312,6 +312,7 @@ import org.elasticsearch.rest.action.search.RestExplainAction;
 import org.elasticsearch.rest.action.search.RestMultiSearchAction;
 import org.elasticsearch.rest.action.search.RestSearchAction;
 import org.elasticsearch.rest.action.search.RestSearchScrollAction;
+import org.elasticsearch.tasks.TaskManager;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.usage.UsageService;
 
@@ -324,6 +325,7 @@ import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static java.util.Collections.unmodifiableMap;
 
@@ -362,7 +364,10 @@ public class ActionModule extends AbstractModule {
         actionFilters = setupActionFilters(actionPlugins);
         autoCreateIndex = transportClient ? null : new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver);
         destructiveOperations = new DestructiveOperations(settings, clusterSettings);
-        Set<String> headers = actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()).collect(Collectors.toSet());
+        Set<String> headers = Stream.concat(
+            actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
+            Stream.of("X-Opaque-Id")
+        ).collect(Collectors.toSet());
         UnaryOperator<RestHandler> restWrapper = null;
         for (ActionPlugin plugin : actionPlugins) {
             UnaryOperator<RestHandler> newRestWrapper = plugin.getRestHandlerWrapper(threadPool.getThreadContext());

+ 1 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java

@@ -56,7 +56,7 @@ import java.util.function.Consumer;
  * Transport action that can be used to cancel currently running cancellable tasks.
  * <p>
  * For a task to be cancellable it has to return an instance of
- * {@link CancellableTask} from {@link TransportRequest#createTask(long, String, String, TaskId)}
+ * {@link CancellableTask} from {@link TransportRequest#createTask}
  */
 public class TransportCancelTasksAction extends TransportTasksAction<CancellableTask, CancelTasksRequest, CancelTasksResponse, TaskInfo> {
 

+ 15 - 0
server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java

@@ -186,6 +186,21 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContentOb
         return builder;
     }
 
+    /**
+     * Presents a flat list of tasks
+     */
+    public XContentBuilder toXContentGroupedByNone(XContentBuilder builder, Params params) throws IOException {
+        toXContentCommon(builder, params);
+        builder.startArray("tasks");
+        for (TaskInfo taskInfo : getTasks()) {
+            builder.startObject();
+            taskInfo.toXContent(builder, params);
+            builder.endObject();
+        }
+        builder.endArray();
+        return builder;
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();

+ 1 - 1
server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

@@ -77,7 +77,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
     /**
      * Max length of the source document to include into toString()
      *
-     * @see ReplicationRequest#createTask(long, java.lang.String, java.lang.String, org.elasticsearch.tasks.TaskId)
+     * @see ReplicationRequest#createTask
      */
     static final int MAX_SOURCE_LENGTH_IN_TOSTRING = 2048;
 

+ 3 - 2
server/src/main/java/org/elasticsearch/action/search/SearchRequest.java

@@ -38,6 +38,7 @@ import org.elasticsearch.tasks.TaskId;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Objects;
 
 import static org.elasticsearch.action.ValidateActions.addValidationError;
@@ -428,9 +429,9 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
     }
 
     @Override
-    public Task createTask(long id, String type, String action, TaskId parentTaskId) {
+    public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
         // generating description in a lazy way since source can be quite big
-        return new SearchTask(id, type, action, null, parentTaskId) {
+        return new SearchTask(id, type, action, null, parentTaskId, headers) {
             @Override
             public String getDescription() {
                 StringBuilder sb = new StringBuilder();

+ 3 - 2
server/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java

@@ -32,6 +32,7 @@ import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskId;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.Objects;
 
 import static org.elasticsearch.action.ValidateActions.addValidationError;
@@ -117,8 +118,8 @@ public class SearchScrollRequest extends ActionRequest implements ToXContentObje
     }
 
     @Override
-    public Task createTask(long id, String type, String action, TaskId parentTaskId) {
-        return new SearchTask(id, type, action, getDescription(), parentTaskId);
+    public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+        return new SearchTask(id, type, action, getDescription(), parentTaskId, headers);
     }
 
     @Override

+ 4 - 2
server/src/main/java/org/elasticsearch/action/search/SearchTask.java

@@ -22,13 +22,15 @@ package org.elasticsearch.action.search;
 import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.tasks.TaskId;
 
+import java.util.Map;
+
 /**
  * Task storing information about a currently running search request.
  */
 public class SearchTask extends CancellableTask {
 
-    public SearchTask(long id, String type, String action, String description, TaskId parentTaskId) {
-        super(id, type, action, description, parentTaskId);
+    public SearchTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
+        super(id, type, action, description, parentTaskId, headers);
     }
 
     @Override

+ 3 - 2
server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java

@@ -35,6 +35,7 @@ import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskId;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import static org.elasticsearch.action.ValidateActions.addValidationError;
@@ -207,8 +208,8 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
     }
 
     @Override
-    public Task createTask(long id, String type, String action, TaskId parentTaskId) {
-        return new ReplicationTask(id, type, action, getDescription(), parentTaskId);
+    public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+        return new ReplicationTask(id, type, action, getDescription(), parentTaskId, headers);
     }
 
     /**

+ 3 - 2
server/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java

@@ -27,6 +27,7 @@ import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskId;
 
 import java.io.IOException;
+import java.util.Map;
 
 import static java.util.Objects.requireNonNull;
 
@@ -36,8 +37,8 @@ import static java.util.Objects.requireNonNull;
 public class ReplicationTask extends Task {
     private volatile String phase = "starting";
 
-    public ReplicationTask(long id, String type, String action, String description, TaskId parentTaskId) {
-        super(id, type, action, description, parentTaskId);
+    public ReplicationTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
+        super(id, type, action, description, parentTaskId, headers);
     }
 
     /**

+ 3 - 2
server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

@@ -81,6 +81,7 @@ import org.elasticsearch.transport.TransportResponseHandler;
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
@@ -1228,8 +1229,8 @@ public abstract class TransportReplicationAction<
             return request.getParentTask();
         }
         @Override
-        public Task createTask(long id, String type, String action, TaskId parentTaskId) {
-            return request.createTask(id, type, action, parentTaskId);
+        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+            return request.createTask(id, type, action, parentTaskId, headers);
         }
 
         @Override

+ 1 - 1
server/src/main/java/org/elasticsearch/client/transport/TransportClient.java

@@ -180,7 +180,7 @@ public abstract class TransportClient extends AbstractClient {
             final TransportService transportService = new TransportService(settings, transport, threadPool,
                 networkModule.getTransportInterceptor(),
                 boundTransportAddress -> DiscoveryNode.createLocal(settings, new TransportAddress(TransportAddress.META_ADDRESS, 0),
-                    UUIDs.randomBase64UUID()), null);
+                    UUIDs.randomBase64UUID()), null, Collections.emptySet());
             modules.add((b -> {
                 b.bind(BigArrays.class).toInstance(bigArrays);
                 b.bind(PluginsService.class).toInstance(pluginsService);

+ 3 - 2
server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java

@@ -35,6 +35,7 @@ import org.elasticsearch.tasks.TaskId;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Map;
 
 import static org.elasticsearch.action.ValidateActions.addValidationError;
 import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
@@ -408,8 +409,8 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
     }
 
     @Override
-    public Task createTask(long id, String type, String action, TaskId parentTaskId) {
-        return new BulkByScrollTask(id, type, action, getDescription(), parentTaskId);
+    public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+        return new BulkByScrollTask(id, type, action, getDescription(), parentTaskId, headers);
     }
 
     @Override

+ 3 - 3
server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java

@@ -26,7 +26,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.xcontent.ToXContent.Params;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.tasks.CancellableTask;
@@ -38,6 +37,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 import static java.lang.Math.min;
@@ -62,8 +62,8 @@ public class BulkByScrollTask extends CancellableTask {
     private volatile LeaderBulkByScrollTaskState leaderState;
     private volatile WorkerBulkByScrollTaskState workerState;
 
-    public BulkByScrollTask(long id, String type, String action, String description, TaskId parentTaskId) {
-        super(id, type, action, description, parentTaskId);
+    public BulkByScrollTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
+        super(id, type, action, description, parentTaskId, headers);
     }
 
     @Override

+ 5 - 4
server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java

@@ -45,6 +45,7 @@ import org.elasticsearch.transport.TransportService;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -271,8 +272,8 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
         }
 
         @Override
-        public Task createTask(long id, String type, String action, TaskId parentTaskId) {
-            return new ResyncTask(id, type, action, getDescription(), parentTaskId);
+        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+            return new ResyncTask(id, type, action, getDescription(), parentTaskId, headers);
         }
 
         @Override
@@ -297,8 +298,8 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
         private volatile int resyncedOperations;
         private volatile int skippedOperations;
 
-        public ResyncTask(long id, String type, String action, String description, TaskId parentTaskId) {
-            super(id, type, action, description, parentTaskId);
+        public ResyncTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
+            super(id, type, action, description, parentTaskId, headers);
         }
 
         /**

+ 8 - 3
server/src/main/java/org/elasticsearch/node/Node.java

@@ -156,6 +156,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
@@ -424,8 +425,12 @@ public class Node implements Closeable {
                 metaDataIndexUpgradeService, metaDataUpgrader);
             new TemplateUpgradeService(settings, client, clusterService, threadPool, indexTemplateMetaDataUpgraders);
             final Transport transport = networkModule.getTransportSupplier().get();
+            Set<String> taskHeaders = Stream.concat(
+                pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
+                Stream.of("X-Opaque-Id")
+            ).collect(Collectors.toSet());
             final TransportService transportService = newTransportService(settings, transport, threadPool,
-                networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings());
+                networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
             final ResponseCollectorService responseCollectorService = new ResponseCollectorService(this.settings, clusterService);
             final SearchTransportService searchTransportService =  new SearchTransportService(settings, transportService,
                 SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
@@ -543,8 +548,8 @@ public class Node implements Closeable {
     protected TransportService newTransportService(Settings settings, Transport transport, ThreadPool threadPool,
                                                    TransportInterceptor interceptor,
                                                    Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
-                                                   ClusterSettings clusterSettings) {
-        return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings);
+                                                   ClusterSettings clusterSettings, Set<String> taskHeaders) {
+        return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders);
     }
 
     protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) {

+ 7 - 0
server/src/main/java/org/elasticsearch/plugins/ActionPlugin.java

@@ -84,6 +84,13 @@ public interface ActionPlugin {
         return Collections.emptyList();
     }
 
+    /**
+     * Returns headers which should be copied from internal requests into tasks.
+     */
+    default Collection<String> getTaskHeaders() {
+        return Collections.emptyList();
+    }
+
     /**
      * Returns a function used to wrap each rest request before handling the request.
      *

+ 12 - 1
server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestListTasksAction.java

@@ -103,10 +103,21 @@ public class RestListTasksAction extends BaseRestHandler {
                     return new BytesRestResponse(RestStatus.OK, builder);
                 }
             };
+        } else if ("none".equals(groupBy)) {
+            return new RestBuilderListener<T>(channel) {
+                @Override
+                public RestResponse buildResponse(T response, XContentBuilder builder) throws Exception {
+                    builder.startObject();
+                    response.toXContentGroupedByNone(builder, channel.request());
+                    builder.endObject();
+                    return new BytesRestResponse(RestStatus.OK, builder);
+                }
+            };
+
         } else if ("parents".equals(groupBy)) {
             return new RestToXContentListener<>(channel);
         } else {
-            throw new IllegalArgumentException("[group_by] must be one of [nodes] or [parents] but was [" + groupBy + "]");
+            throw new IllegalArgumentException("[group_by] must be one of [nodes], [parents] or [none] but was [" + groupBy + "]");
         }
     }
 

+ 3 - 2
server/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java

@@ -31,6 +31,7 @@ import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.transport.TransportRequest;
 
 import java.io.IOException;
+import java.util.Map;
 
 /**
  * Shard level fetch base request. Holds all the info needed to execute a fetch.
@@ -115,8 +116,8 @@ public class ShardFetchRequest extends TransportRequest {
     }
 
     @Override
-    public Task createTask(long id, String type, String action, TaskId parentTaskId) {
-        return new SearchTask(id, type, action, getDescription(), parentTaskId);
+    public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+        return new SearchTask(id, type, action, getDescription(), parentTaskId, headers);
     }
 
     @Override

+ 3 - 2
server/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java

@@ -29,6 +29,7 @@ import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.transport.TransportRequest;
 
 import java.io.IOException;
+import java.util.Map;
 
 public class InternalScrollSearchRequest extends TransportRequest {
 
@@ -76,8 +77,8 @@ public class InternalScrollSearchRequest extends TransportRequest {
     }
 
     @Override
-    public Task createTask(long id, String type, String action, TaskId parentTaskId) {
-        return new SearchTask(id, type, action, getDescription(), parentTaskId);
+    public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+        return new SearchTask(id, type, action, getDescription(), parentTaskId, headers);
     }
 
     @Override

+ 3 - 2
server/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java

@@ -40,6 +40,7 @@ import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.transport.TransportRequest;
 
 import java.io.IOException;
+import java.util.Map;
 
 /**
  * Shard level search request that represents an actual search sent from the coordinating node to the nodes holding
@@ -177,8 +178,8 @@ public class ShardSearchTransportRequest extends TransportRequest implements Sha
     }
 
     @Override
-    public Task createTask(long id, String type, String action, TaskId parentTaskId) {
-        return new SearchTask(id, type, action, getDescription(), parentTaskId);
+    public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+        return new SearchTask(id, type, action, getDescription(), parentTaskId, headers);
     }
 
     @Override

+ 3 - 2
server/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java

@@ -32,6 +32,7 @@ import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.transport.TransportRequest;
 
 import java.io.IOException;
+import java.util.Map;
 
 import static org.elasticsearch.search.dfs.AggregatedDfs.readAggregatedDfs;
 
@@ -91,8 +92,8 @@ public class QuerySearchRequest extends TransportRequest implements IndicesReque
     }
 
     @Override
-    public Task createTask(long id, String type, String action, TaskId parentTaskId) {
-        return new SearchTask(id, type, action, getDescription(), parentTaskId);
+    public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+        return new SearchTask(id, type, action, getDescription(), parentTaskId, headers);
     }
 
     public String getDescription() {

+ 3 - 2
server/src/main/java/org/elasticsearch/tasks/CancellableTask.java

@@ -21,6 +21,7 @@ package org.elasticsearch.tasks;
 
 import org.elasticsearch.common.Nullable;
 
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -30,8 +31,8 @@ public abstract class CancellableTask extends Task {
 
     private final AtomicReference<String> reason = new AtomicReference<>();
 
-    public CancellableTask(long id, String type, String action, String description, TaskId parentTaskId) {
-        super(id, type, action, description, parentTaskId);
+    public CancellableTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
+        super(id, type, action, description, parentTaskId, headers);
     }
 
     /**

+ 17 - 4
server/src/main/java/org/elasticsearch/tasks/Task.java

@@ -27,6 +27,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 
 import java.io.IOException;
+import java.util.Map;
 
 /**
  * Current task information
@@ -43,6 +44,8 @@ public class Task {
 
     private final TaskId parentTask;
 
+    private final Map<String, String> headers;
+
     /**
      * The task's start time as a wall clock time since epoch ({@link System#currentTimeMillis()} style).
      */
@@ -53,11 +56,12 @@ public class Task {
      */
     private final long startTimeNanos;
 
-    public Task(long id, String type, String action, String description, TaskId parentTask) {
-        this(id, type, action, description, parentTask, System.currentTimeMillis(), System.nanoTime());
+    public Task(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers) {
+        this(id, type, action, description, parentTask, System.currentTimeMillis(), System.nanoTime(), headers);
     }
 
-    public Task(long id, String type, String action, String description, TaskId parentTask, long startTime, long startTimeNanos) {
+    public Task(long id, String type, String action, String description, TaskId parentTask, long startTime, long startTimeNanos,
+                Map<String, String> headers) {
         this.id = id;
         this.type = type;
         this.action = action;
@@ -65,6 +69,7 @@ public class Task {
         this.parentTask = parentTask;
         this.startTime = startTime;
         this.startTimeNanos = startTimeNanos;
+        this.headers = headers;
     }
 
     /**
@@ -92,7 +97,7 @@ public class Task {
      */
     protected final TaskInfo taskInfo(String localNodeId, String description, Status status) {
         return new TaskInfo(new TaskId(localNodeId, getId()), getType(), getAction(), description, status, startTime,
-                System.nanoTime() - startTimeNanos, this instanceof CancellableTask, parentTask);
+                System.nanoTime() - startTimeNanos, this instanceof CancellableTask, parentTask, headers);
     }
 
     /**
@@ -149,6 +154,14 @@ public class Task {
 
     public interface Status extends ToXContentObject, NamedWriteable {}
 
+
+    /**
+     * Returns stored task header associated with the task
+     */
+    public String getHeader(String header) {
+        return headers.get(header);
+    }
+
     public TaskResult result(DiscoveryNode node, Exception error) throws IOException {
         return new TaskResult(taskInfo(node.getId(), true), error);
     }

+ 4 - 2
server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java

@@ -19,6 +19,8 @@
 
 package org.elasticsearch.tasks;
 
+import java.util.Map;
+
 /**
  * An interface for a request that can be used to register a task manager task
  */
@@ -47,8 +49,8 @@ public interface TaskAwareRequest {
      * A request can override this method and return null to avoid being tracked by the task
      * manager.
      */
-    default Task createTask(long id, String type, String action, TaskId parentTaskId) {
-        return new Task(id, type, action, getDescription(), parentTaskId);
+    default Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+        return new Task(id, type, action, getDescription(), parentTaskId, headers);
     }
 
     /**

+ 38 - 5
server/src/main/java/org/elasticsearch/tasks/TaskInfo.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.tasks;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -31,6 +32,8 @@ import org.elasticsearch.common.xcontent.ToXContentFragment;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
@@ -65,8 +68,10 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
 
     private final TaskId parentTaskId;
 
+    private final Map<String, String> headers;
+
     public TaskInfo(TaskId taskId, String type, String action, String description, Task.Status status, long startTime,
-                    long runningTimeNanos, boolean cancellable, TaskId parentTaskId) {
+                    long runningTimeNanos, boolean cancellable, TaskId parentTaskId, Map<String, String> headers) {
         this.taskId = taskId;
         this.type = type;
         this.action = action;
@@ -76,6 +81,7 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
         this.runningTimeNanos = runningTimeNanos;
         this.cancellable = cancellable;
         this.parentTaskId = parentTaskId;
+        this.headers = headers;
     }
 
     /**
@@ -91,6 +97,11 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
         runningTimeNanos = in.readLong();
         cancellable = in.readBoolean();
         parentTaskId = TaskId.readFromStream(in);
+        if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
+            headers = in.readMap(StreamInput::readString, StreamInput::readString);
+        } else {
+            headers = Collections.emptyMap();
+        }
     }
 
     @Override
@@ -104,6 +115,9 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
         out.writeLong(runningTimeNanos);
         out.writeBoolean(cancellable);
         parentTaskId.writeTo(out);
+        if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
+            out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
+        }
     }
 
     public TaskId getTaskId() {
@@ -162,6 +176,13 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
         return parentTaskId;
     }
 
+    /**
+     * Returns the task headers
+     */
+    public Map<String, String> getHeaders() {
+        return headers;
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.field("node", taskId.getNodeId());
@@ -180,6 +201,11 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
         if (parentTaskId.isSet()) {
             builder.field("parent_task_id", parentTaskId.toString());
         }
+        builder.startObject("headers");
+        for(Map.Entry<String, String> attribute : headers.entrySet()) {
+            builder.field(attribute.getKey(), attribute.getValue());
+        }
+        builder.endObject();
         return builder;
     }
 
@@ -195,10 +221,15 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
                 long runningTimeNanos = (Long) a[i++];
                 boolean cancellable = (Boolean) a[i++];
                 String parentTaskIdString = (String) a[i++];
-
+                @SuppressWarnings("unchecked") Map<String, String> headers = (Map<String, String>) a[i++];
+                if (headers == null) {
+                    // This might happen if we are reading an old version of task info
+                    headers = Collections.emptyMap();
+                }
                 RawTaskStatus status = statusBytes == null ? null : new RawTaskStatus(statusBytes);
                 TaskId parentTaskId = parentTaskIdString == null ? TaskId.EMPTY_TASK_ID : new TaskId(parentTaskIdString);
-                return new TaskInfo(id, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId);
+                return new TaskInfo(id, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId,
+                    headers);
             });
     static {
         // Note for the future: this has to be backwards and forwards compatible with all changes to the task storage format
@@ -212,6 +243,7 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
         PARSER.declareLong(constructorArg(), new ParseField("running_time_in_nanos"));
         PARSER.declareBoolean(constructorArg(), new ParseField("cancellable"));
         PARSER.declareString(optionalConstructorArg(), new ParseField("parent_task_id"));
+        PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), new ParseField("headers"));
     }
 
     @Override
@@ -234,11 +266,12 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
                 && Objects.equals(runningTimeNanos, other.runningTimeNanos)
                 && Objects.equals(parentTaskId, other.parentTaskId)
                 && Objects.equals(cancellable, other.cancellable)
-                && Objects.equals(status, other.status);
+                && Objects.equals(status, other.status)
+                && Objects.equals(headers, other.headers);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(taskId, type, action, description, startTime, runningTimeNanos, parentTaskId, cancellable, status);
+        return Objects.hash(taskId, type, action, description, startTime, runningTimeNanos, parentTaskId, cancellable, status, headers);
     }
 }

+ 32 - 2
server/src/main/java/org/elasticsearch/tasks/TaskManager.java

@@ -32,19 +32,26 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
+import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE;
 
 /**
  * Task Manager service for keeping track of currently running tasks on the nodes
@@ -52,6 +59,10 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
 public class TaskManager extends AbstractComponent implements ClusterStateApplier {
     private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100);
 
+    /** Rest headers that are copied to the task */
+    private final List<String> taskHeaders;
+    private final ThreadPool threadPool;
+
     private final ConcurrentMapLong<Task> tasks = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency();
 
     private final ConcurrentMapLong<CancellableTaskHolder> cancellableTasks = ConcurrentCollections
@@ -65,8 +76,13 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie
 
     private DiscoveryNodes lastDiscoveryNodes = DiscoveryNodes.EMPTY_NODES;
 
-    public TaskManager(Settings settings) {
+    private final ByteSizeValue maxHeaderSize;
+
+    public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders) {
         super(settings);
+        this.threadPool = threadPool;
+        this.taskHeaders = new ArrayList<>(taskHeaders);
+        this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
     }
 
     public void setTaskResultsService(TaskResultsService taskResultsService) {
@@ -80,7 +96,21 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie
      * Returns the task manager tracked task or null if the task doesn't support the task manager
      */
     public Task register(String type, String action, TaskAwareRequest request) {
-        Task task = request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask());
+        Map<String, String> headers = new HashMap<>();
+        long headerSize = 0;
+        long maxSize = maxHeaderSize.getBytes();
+        ThreadContext threadContext = threadPool.getThreadContext();
+        for (String key : taskHeaders) {
+            String httpHeader = threadContext.getHeader(key);
+            if (httpHeader != null) {
+                headerSize += key.length() * 2 + httpHeader.length() * 2;
+                if (headerSize > maxSize) {
+                    throw new IllegalArgumentException("Request exceeded the maximum size of task headers " + maxHeaderSize);
+                }
+                headers.put(key, httpHeader);
+            }
+        }
+        Task task = request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask(), headers);
         if (task == null) {
             return null;
         }

+ 18 - 1
server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java

@@ -34,6 +34,7 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
@@ -48,6 +49,7 @@ import org.elasticsearch.common.xcontent.XContentType;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Map;
 
 /**
  * Service that can store task results.
@@ -60,6 +62,10 @@ public class TaskResultsService extends AbstractComponent {
 
     public static final String TASK_RESULT_INDEX_MAPPING_FILE = "task-index-mapping.json";
 
+    public static final String TASK_RESULT_MAPPING_VERSION_META_FIELD = "version";
+
+    public static final int TASK_RESULT_MAPPING_VERSION = 2;
+
     private final Client client;
 
     private final ClusterService clusterService;
@@ -109,7 +115,7 @@ public class TaskResultsService extends AbstractComponent {
             });
         } else {
             IndexMetaData metaData = state.getMetaData().index(TASK_INDEX);
-            if (metaData.getMappings().containsKey(TASK_TYPE) == false) {
+            if (getTaskResultMappingVersion(metaData) < TASK_RESULT_MAPPING_VERSION) {
                 // The index already exists but doesn't have our mapping
                 client.admin().indices().preparePutMapping(TASK_INDEX).setType(TASK_TYPE)
                     .setSource(taskResultIndexMapping(), XContentType.JSON)
@@ -131,6 +137,17 @@ public class TaskResultsService extends AbstractComponent {
         }
     }
 
+    private int getTaskResultMappingVersion(IndexMetaData metaData) {
+        MappingMetaData mappingMetaData = metaData.getMappings().get(TASK_TYPE);
+        if (mappingMetaData == null) {
+            return 0;
+        }
+        @SuppressWarnings("unchecked") Map<String, Object> meta = (Map<String, Object>) mappingMetaData.sourceAsMap().get("_meta");
+        if (meta == null || meta.containsKey(TASK_RESULT_MAPPING_VERSION_META_FIELD) == false) {
+            return 1; // The mapping was created before meta field was introduced
+        }
+        return (int) meta.get(TASK_RESULT_MAPPING_VERSION_META_FIELD);
+    }
 
     private void doStoreResult(TaskResult taskResult, ActionListener<Void> listener) {
         IndexRequestBuilder index = client.prepareIndex(TASK_INDEX, TASK_TYPE, taskResult.getTask().getTaskId().toString());

+ 6 - 4
server/src/main/java/org/elasticsearch/transport/TransportService.java

@@ -61,6 +61,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -149,7 +150,8 @@ public class TransportService extends AbstractLifecycleComponent {
  *        updates for {@link #TRACE_LOG_EXCLUDE_SETTING} and {@link #TRACE_LOG_INCLUDE_SETTING}.
      */
     public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
-                            Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings) {
+                            Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
+                            Set<String> taskHeaders) {
         super(settings);
         this.transport = transport;
         this.threadPool = threadPool;
@@ -158,7 +160,7 @@ public class TransportService extends AbstractLifecycleComponent {
         setTracerLogInclude(TRACE_LOG_INCLUDE_SETTING.get(settings));
         setTracerLogExclude(TRACE_LOG_EXCLUDE_SETTING.get(settings));
         tracerLog = Loggers.getLogger(logger, ".tracer");
-        taskManager = createTaskManager();
+        taskManager = createTaskManager(settings, threadPool, taskHeaders);
         this.interceptor = transportInterceptor;
         this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
         this.connectToRemoteCluster = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
@@ -184,8 +186,8 @@ public class TransportService extends AbstractLifecycleComponent {
         return taskManager;
     }
 
-    protected TaskManager createTaskManager() {
-        return new TaskManager(settings);
+    protected TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders) {
+        return new TaskManager(settings, threadPool, taskHeaders);
     }
 
     /**

+ 7 - 0
server/src/main/resources/org/elasticsearch/tasks/task-index-mapping.json

@@ -1,5 +1,8 @@
 {
   "task" : {
+    "_meta": {
+      "version": 2
+    },
     "dynamic" : "strict",
     "properties" : {
       "completed": {
@@ -37,6 +40,10 @@
           },
           "description": {
             "type": "text"
+          },
+          "headers": {
+            "type" : "object",
+            "enabled" : false
           }
         }
       },

+ 5 - 4
server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java

@@ -45,6 +45,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -91,8 +92,8 @@ public class CancellableTasksTests extends TaskManagerTestCase {
         }
 
         @Override
-        public Task createTask(long id, String type, String action, TaskId parentTaskId) {
-            return new CancellableTask(id, type, action, getDescription(), parentTaskId) {
+        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+            return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers) {
                 @Override
                 public boolean shouldCancelChildrenOnCancellation() {
                     return false;
@@ -131,8 +132,8 @@ public class CancellableTasksTests extends TaskManagerTestCase {
         }
 
         @Override
-        public Task createTask(long id, String type, String action, TaskId parentTaskId) {
-            return new CancellableTask(id, type, action, getDescription(), parentTaskId) {
+        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+            return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers) {
                 @Override
                 public boolean shouldCancelChildrenOnCancellation() {
                     return true;

+ 9 - 6
server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java

@@ -56,9 +56,11 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -175,15 +177,16 @@ public abstract class TaskManagerTestCase extends ESTestCase {
                 };
             transportService = new TransportService(settings,
                 new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(),
-                        new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),
-                        new NetworkService(Collections.emptyList())),
-                threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddressDiscoveryNodeFunction, null) {
+                    new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),
+                    new NetworkService(Collections.emptyList())),
+                threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddressDiscoveryNodeFunction, null,
+                Collections.emptySet()) {
                 @Override
-                protected TaskManager createTaskManager() {
+                protected TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders) {
                     if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) {
-                        return new MockTaskManager(settings);
+                        return new MockTaskManager(settings, threadPool, taskHeaders);
                     } else {
-                        return super.createTaskManager();
+                        return super.createTaskManager(settings, threadPool, taskHeaders);
                     }
                 }
             };

+ 4 - 1
server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskTests.java

@@ -25,6 +25,7 @@ import org.elasticsearch.tasks.TaskInfo;
 import org.elasticsearch.test.ESTestCase;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.Map;
 
 public class TaskTests extends ESTestCase {
@@ -36,7 +37,8 @@ public class TaskTests extends ESTestCase {
         long runningTime = randomNonNegativeLong();
         boolean cancellable = randomBoolean();
         TaskInfo taskInfo = new TaskInfo(new TaskId(nodeId, taskId), "test_type",
-            "test_action", "test_description", null, startTime, runningTime, cancellable, TaskId.EMPTY_TASK_ID);
+            "test_action", "test_description", null, startTime, runningTime, cancellable, TaskId.EMPTY_TASK_ID,
+            Collections.singletonMap("foo", "bar"));
         String taskInfoString = taskInfo.toString();
         Map<String, Object> map = XContentHelper.convertToMap(new BytesArray(taskInfoString.getBytes(StandardCharsets.UTF_8)), true).v2();
         assertEquals(((Number)map.get("id")).longValue(), taskId);
@@ -46,6 +48,7 @@ public class TaskTests extends ESTestCase {
         assertEquals(((Number)map.get("start_time_in_millis")).longValue(), startTime);
         assertEquals(((Number)map.get("running_time_in_nanos")).longValue(), runningTime);
         assertEquals(map.get("cancellable"), cancellable);
+        assertEquals(map.get("headers"), Collections.singletonMap("foo", "bar"));
     }
 
 }

+ 44 - 17
server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java

@@ -84,6 +84,7 @@ import static java.util.Collections.emptyList;
 import static java.util.Collections.singleton;
 import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
 import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
+import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
@@ -355,19 +356,26 @@ public class TasksIT extends ESIntegTestCase {
         client().prepareIndex("test", "doc", "test_id").setSource("{\"foo\": \"bar\"}", XContentType.JSON)
             .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
 
-        assertSearchResponse(client().prepareSearch("test").setTypes("doc").setQuery(QueryBuilders.matchAllQuery()).get());
+        Map<String, String> headers = new HashMap<>();
+        headers.put("X-Opaque-Id", "my_id");
+        headers.put("Foo-Header", "bar");
+        headers.put("Custom-Task-Header", "my_value");
+        assertSearchResponse(
+            client().filterWithHeader(headers).prepareSearch("test").setTypes("doc").setQuery(QueryBuilders.matchAllQuery()).get());
 
         // the search operation should produce one main task
         List<TaskInfo> mainTask = findEvents(SearchAction.NAME, Tuple::v1);
         assertEquals(1, mainTask.size());
         assertThat(mainTask.get(0).getDescription(), startsWith("indices[test], types[doc], search_type["));
         assertThat(mainTask.get(0).getDescription(), containsString("\"query\":{\"match_all\""));
+        assertTaskHeaders(mainTask.get(0));
 
         // check that if we have any shard-level requests they all have non-zero length description
         List<TaskInfo> shardTasks = findEvents(SearchAction.NAME + "[*]", Tuple::v1);
         for (TaskInfo taskInfo : shardTasks) {
             assertThat(taskInfo.getParentTaskId(), notNullValue());
             assertEquals(mainTask.get(0).getTaskId(), taskInfo.getParentTaskId());
+            assertTaskHeaders(taskInfo);
             switch (taskInfo.getAction()) {
                 case SearchTransportService.QUERY_ACTION_NAME:
                 case SearchTransportService.DFS_ACTION_NAME:
@@ -392,6 +400,25 @@ public class TasksIT extends ESIntegTestCase {
 
     }
 
+    public void testSearchTaskHeaderLimit() {
+        int maxSize = Math.toIntExact(SETTING_HTTP_MAX_HEADER_SIZE.getDefault(Settings.EMPTY).getBytes() / 2 + 1);
+
+        Map<String, String> headers = new HashMap<>();
+        headers.put("X-Opaque-Id", "my_id");
+        headers.put("Custom-Task-Header", randomAlphaOfLengthBetween(maxSize, maxSize + 100));
+        IllegalArgumentException ex = expectThrows(
+            IllegalArgumentException.class,
+            () -> client().filterWithHeader(headers).admin().cluster().prepareListTasks().get()
+        );
+        assertThat(ex.getMessage(), startsWith("Request exceeded the maximum size of task headers "));
+    }
+
+    private void assertTaskHeaders(TaskInfo taskInfo) {
+        assertThat(taskInfo.getHeaders().keySet(), hasSize(2));
+        assertEquals("my_id", taskInfo.getHeaders().get("X-Opaque-Id"));
+        assertEquals("my_value", taskInfo.getHeaders().get("Custom-Task-Header"));
+    }
+
     /**
      * Very basic "is it plugged in" style test that indexes a document and makes sure that you can fetch the status of the process. The
      * goal here is to verify that the large moving parts that make fetching task status work fit together rather than to verify any
@@ -802,24 +829,24 @@ public class TasksIT extends ESIntegTestCase {
         // Save a fake task that looks like it is from a node that isn't part of the cluster
         CyclicBarrier b = new CyclicBarrier(2);
         TaskResultsService resultsService = internalCluster().getInstance(TaskResultsService.class);
-        resultsService.storeResult(
-                new TaskResult(new TaskInfo(new TaskId("fake", 1), "test", "test", "", null, 0, 0, false, TaskId.EMPTY_TASK_ID),
-                        new RuntimeException("test")),
-                new ActionListener<Void>() {
-                    @Override
-                    public void onResponse(Void response) {
-                        try {
-                            b.await();
-                        } catch (InterruptedException | BrokenBarrierException e) {
-                            onFailure(e);
-                        }
+        resultsService.storeResult(new TaskResult(
+                new TaskInfo(new TaskId("fake", 1), "test", "test", "", null, 0, 0, false, TaskId.EMPTY_TASK_ID, Collections.emptyMap()),
+                new RuntimeException("test")),
+            new ActionListener<Void>() {
+                @Override
+                public void onResponse(Void response) {
+                    try {
+                        b.await();
+                    } catch (InterruptedException | BrokenBarrierException e) {
+                        onFailure(e);
                     }
+                }
 
-                    @Override
-                    public void onFailure(Exception e) {
-                        throw new RuntimeException(e);
-                    }
-                });
+                @Override
+                public void onFailure(Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
         b.await();
 
         // Now we can find it!

+ 13 - 6
server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java

@@ -59,9 +59,11 @@ import org.elasticsearch.transport.TransportService;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 
 import static org.elasticsearch.test.ESTestCase.awaitBusy;
 
@@ -76,12 +78,17 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin {
                 new ActionHandler<>(UnblockTestTasksAction.INSTANCE, TransportUnblockTestTasksAction.class));
     }
 
+    @Override
+    public Collection<String> getTaskHeaders() {
+        return Collections.singleton("Custom-Task-Header");
+    }
+
     static class TestTask extends CancellableTask {
 
         private volatile boolean blocked = true;
 
-        TestTask(long id, String type, String action, String description, TaskId parentTaskId) {
-            super(id, type, action, description, parentTaskId);
+        TestTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
+            super(id, type, action, description, parentTaskId, headers);
         }
 
         @Override
@@ -178,8 +185,8 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin {
         }
 
         @Override
-        public Task createTask(long id, String type, String action, TaskId parentTaskId) {
-            return new TestTask(id, type, action, this.getDescription(), parentTaskId);
+        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+            return new TestTask(id, type, action, this.getDescription(), parentTaskId, headers);
         }
     }
 
@@ -247,8 +254,8 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin {
         }
 
         @Override
-        public Task createTask(long id, String type, String action, TaskId parentTaskId) {
-            return new CancellableTask(id, type, action, getDescription(), parentTaskId) {
+        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+            return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers) {
                 @Override
                 public boolean shouldCancelChildrenOnCancellation() {
                     return true;

+ 4 - 4
server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java

@@ -109,9 +109,9 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
         }
 
         @Override
-        public Task createTask(long id, String type, String action, TaskId parentTaskId) {
+        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
             if (enableTaskManager) {
-                return super.createTask(id, type, action, parentTaskId);
+                return super.createTask(id, type, action, parentTaskId, headers);
             } else {
                 return null;
             }
@@ -156,9 +156,9 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
         }
 
         @Override
-        public Task createTask(long id, String type, String action, TaskId parentTaskId) {
+        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
             if (enableTaskManager) {
-                return super.createTask(id, type, action, parentTaskId);
+                return super.createTask(id, type, action, parentTaskId, headers);
             } else {
                 return null;
             }

+ 1 - 1
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java

@@ -82,7 +82,7 @@ public class TransportBulkActionTests extends ESTestCase {
         CapturingTransport capturingTransport = new CapturingTransport();
         transportService = new TransportService(clusterService.getSettings(), capturingTransport, threadPool,
                 TransportService.NOOP_TRANSPORT_INTERCEPTOR,
-            boundAddress -> clusterService.localNode(), null);
+            boundAddress -> clusterService.localNode(), null, Collections.emptySet());
         transportService.start();
         transportService.acceptIncomingRequests();
         bulkAction = new TestTransportBulkAction();

+ 2 - 1
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java

@@ -49,6 +49,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -92,7 +93,7 @@ public class TransportBulkActionTookTests extends ESTestCase {
         CapturingTransport capturingTransport = new CapturingTransport();
         TransportService transportService = new TransportService(clusterService.getSettings(), capturingTransport, threadPool,
                 TransportService.NOOP_TRANSPORT_INTERCEPTOR,
-            boundAddress -> clusterService.localNode(), null);
+            boundAddress -> clusterService.localNode(), null, Collections.emptySet());
         transportService.start();
         transportService.acceptIncomingRequests();
         IndexNameExpressionResolver resolver = new Resolver(Settings.EMPTY);

+ 2 - 1
server/src/test/java/org/elasticsearch/action/main/MainActionTests.java

@@ -34,6 +34,7 @@ import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
+import java.util.Collections;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.mockito.Mockito.mock;
@@ -68,7 +69,7 @@ public class MainActionTests extends ESTestCase {
         when(clusterService.state()).thenReturn(state);
 
         TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
-            x -> null, null);
+            x -> null, null, Collections.emptySet());
         TransportMainAction action = new TransportMainAction(settings, mock(ThreadPool.class), transportService, mock(ActionFilters.class),
                 mock(IndexNameExpressionResolver.class), clusterService);
         AtomicReference<MainResponse> responseRef = new AtomicReference<>();

+ 1 - 1
server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java

@@ -73,7 +73,7 @@ public final class MockSearchPhaseContext implements SearchPhaseContext {
 
     @Override
     public SearchTask getTask() {
-        return new SearchTask(0, "n/a", "n/a", "test", null);
+        return new SearchTask(0, "n/a", "n/a", "test", null, Collections.emptyMap());
     }
 
     @Override

+ 2 - 1
server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java

@@ -130,7 +130,8 @@ public class MultiSearchActionTookTests extends ESTestCase {
         Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build();
         TaskManager taskManager = mock(TaskManager.class);
         TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
-                boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null) {
+            boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null,
+            Collections.emptySet()) {
             @Override
             public TaskManager getTaskManager() {
                 return taskManager;

+ 25 - 4
server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java

@@ -35,8 +35,11 @@ import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.tasks.TaskManager;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
+import org.junit.After;
+import org.junit.Before;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -54,6 +57,22 @@ import static org.mockito.Mockito.when;
 
 public class TransportMultiSearchActionTests extends ESTestCase {
 
+    protected ThreadPool threadPool;
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        threadPool = new TestThreadPool(getTestName());
+    }
+
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        threadPool.shutdown();
+        super.tearDown();
+    }
+
     public void testBatchExecute() throws Exception {
         // Initialize dependencies of TransportMultiSearchAction
         Settings settings = Settings.builder()
@@ -63,8 +82,10 @@ public class TransportMultiSearchActionTests extends ESTestCase {
         when(actionFilters.filters()).thenReturn(new ActionFilter[0]);
         ThreadPool threadPool = new ThreadPool(settings);
         TaskManager taskManager = mock(TaskManager.class);
-        TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
-            boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null) {
+        TransportService transportService = new TransportService(Settings.EMPTY, null, threadPool,
+            TransportService.NOOP_TRANSPORT_INTERCEPTOR,
+            boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null,
+            Collections.emptySet()) {
             @Override
             public TaskManager getTaskManager() {
                 return taskManager;
@@ -102,8 +123,8 @@ public class TransportMultiSearchActionTests extends ESTestCase {
                 });
             }
         };
-        
-        TransportMultiSearchAction action = 
+
+        TransportMultiSearchAction action =
                 new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver, 10,
                 System::nanoTime);
 

+ 16 - 2
server/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java

@@ -25,12 +25,16 @@ import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.Node;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskManager;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.After;
 import org.junit.Before;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
@@ -48,10 +52,17 @@ import static org.hamcrest.CoreMatchers.notNullValue;
 public class TransportActionFilterChainTests extends ESTestCase {
 
     private AtomicInteger counter;
+    private ThreadPool threadPool;
 
     @Before
     public void init() throws Exception {
          counter = new AtomicInteger();
+        threadPool = new ThreadPool(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "TransportActionFilterChainTests").build());
+    }
+
+    @After
+    public void shutdown() throws Exception {
+        terminate(threadPool);
     }
 
     public void testActionFiltersRequest() throws ExecutionException, InterruptedException {
@@ -68,7 +79,9 @@ public class TransportActionFilterChainTests extends ESTestCase {
 
         String actionName = randomAlphaOfLength(randomInt(30));
         ActionFilters actionFilters = new ActionFilters(filters);
-        TransportAction<TestRequest, TestResponse> transportAction = new TransportAction<TestRequest, TestResponse>(Settings.EMPTY, actionName, null, actionFilters, null, new TaskManager(Settings.EMPTY)) {
+        TransportAction<TestRequest, TestResponse> transportAction =
+            new TransportAction<TestRequest, TestResponse>(Settings.EMPTY, actionName, null, actionFilters, null,
+                new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet())) {
             @Override
             protected void doExecute(TestRequest request, ActionListener<TestResponse> listener) {
                 listener.onResponse(new TestResponse());
@@ -144,7 +157,8 @@ public class TransportActionFilterChainTests extends ESTestCase {
 
         String actionName = randomAlphaOfLength(randomInt(30));
         ActionFilters actionFilters = new ActionFilters(filters);
-        TransportAction<TestRequest, TestResponse> transportAction = new TransportAction<TestRequest, TestResponse>(Settings.EMPTY, actionName, null, actionFilters, null, new TaskManager(Settings.EMPTY)) {
+        TransportAction<TestRequest, TestResponse> transportAction = new TransportAction<TestRequest, TestResponse>(Settings.EMPTY,
+            actionName, null, actionFilters, null, new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet())) {
             @Override
             protected void doExecute(TestRequest request, ActionListener<TestResponse> listener) {
                 listener.onResponse(new TestResponse());

+ 2 - 1
server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java

@@ -67,6 +67,7 @@ import org.junit.BeforeClass;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -191,7 +192,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
         transport = new CapturingTransport();
         clusterService = createClusterService(THREAD_POOL);
         final TransportService transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL,
-            TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null);
+            TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
         transportService.start();
         transportService.acceptIncomingRequests();
         setClusterState(clusterService, TEST_INDEX);

+ 1 - 1
server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java

@@ -88,7 +88,7 @@ public class TransportMasterNodeActionTests extends ESTestCase {
         transport = new CapturingTransport();
         clusterService = createClusterService(threadPool);
         transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
-                TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null);
+                TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
         transportService.start();
         transportService.acceptIncomingRequests();
         localNode = new DiscoveryNode("local_node", buildNewFakeTransportAddress(), Collections.emptyMap(),

+ 1 - 1
server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java

@@ -181,7 +181,7 @@ public class TransportNodesActionTests extends ESTestCase {
         transport = new CapturingTransport();
         clusterService = createClusterService(THREAD_POOL);
         transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL,
-            TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null);
+            TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
         transportService.start();
         transportService.acceptIncomingRequests();
         int numNodes = randomIntBetween(3, 10);

+ 1 - 1
server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java

@@ -96,7 +96,7 @@ public class BroadcastReplicationTests extends ESTestCase {
             new NetworkService(Collections.emptyList()));
         clusterService = createClusterService(threadPool);
         transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
-                TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null);
+                TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
         transportService.start();
         transportService.acceptIncomingRequests();
         broadcastReplicationAction = new TestBroadcastReplicationAction(Settings.EMPTY, threadPool, clusterService, transportService,

+ 3 - 3
server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

@@ -163,7 +163,7 @@ public class TransportReplicationActionTests extends ESTestCase {
         transport = new CapturingTransport();
         clusterService = createClusterService(threadPool);
         transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
-            TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null);
+            TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
         transportService.start();
         transportService.acceptIncomingRequests();
         shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool);
@@ -977,7 +977,7 @@ public class TransportReplicationActionTests extends ESTestCase {
                 new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()),
                 Version.CURRENT);
         transportService = new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
-                x -> clusterService.localNode(),null);
+                x -> clusterService.localNode(), null, Collections.emptySet());
         transportService.start();
         transportService.acceptIncomingRequests();
 
@@ -1040,7 +1040,7 @@ public class TransportReplicationActionTests extends ESTestCase {
      * half the time.
      */
     private ReplicationTask maybeTask() {
-        return random().nextBoolean() ? new ReplicationTask(0, null, null, null, null) : null;
+        return random().nextBoolean() ? new ReplicationTask(0, null, null, null, null, null) : null;
     }
 
     /**

+ 4 - 2
server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java

@@ -62,6 +62,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.mockito.ArgumentCaptor;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Locale;
 import java.util.concurrent.ExecutionException;
@@ -254,7 +255,7 @@ public class TransportWriteActionTests extends ESTestCase {
     public void testReplicaProxy() throws InterruptedException, ExecutionException {
         CapturingTransport transport = new CapturingTransport();
         TransportService transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
-                TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null);
+                TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
         transportService.start();
         transportService.acceptIncomingRequests();
         ShardStateAction shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool);
@@ -355,7 +356,8 @@ public class TransportWriteActionTests extends ESTestCase {
 
         protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentFailureOnReplica) {
             super(Settings.EMPTY, "test",
-                    new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null), null,
+                    new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null,
+                        Collections.emptySet()), null,
                     null, null, null, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), TestRequest::new,
                     TestRequest::new, ThreadPool.Names.SAME);
             this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary;

+ 3 - 1
server/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java

@@ -53,6 +53,7 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -144,7 +145,8 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase {
         transport = new CapturingTransport();
         clusterService = createClusterService(THREAD_POOL);
         transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL,
-                TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null);
+                TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet()
+        );
         transportService.start();
         transportService.acceptIncomingRequests();
         action = new TestTransportInstanceSingleOperationAction(

+ 1 - 1
server/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java

@@ -59,7 +59,7 @@ public class NodeClientHeadersTests extends AbstractClientHeadersTestCase {
     private static class InternalTransportAction extends TransportAction {
 
         private InternalTransportAction(Settings settings, String actionName, ThreadPool threadPool) {
-            super(settings, actionName, threadPool, EMPTY_FILTERS, null, new TaskManager(settings));
+            super(settings, actionName, threadPool, EMPTY_FILTERS, null, new TaskManager(settings, threadPool, Collections.emptySet()));
         }
 
         @Override

+ 1 - 1
server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java

@@ -163,7 +163,7 @@ public class TransportClientNodesServiceTests extends ESTestCase {
             }, (addr) -> {
                 assert addr == null : "boundAddress: " + addr;
                 return DiscoveryNode.createLocal(settings, buildNewFakeTransportAddress(), UUIDs.randomBase64UUID());
-            }, null);
+            }, null, Collections.emptySet());
             transportService.start();
             transportService.acceptIncomingRequests();
             transportClientNodesService =

+ 2 - 1
server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java

@@ -155,7 +155,8 @@ public class NodeConnectionsServiceTests extends ESTestCase {
         this.threadPool = new TestThreadPool(getClass().getName());
         this.transport = new MockTransport();
         transportService = new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
-            boundAddress -> DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), UUIDs.randomBase64UUID()), null);
+            boundAddress -> DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), UUIDs.randomBase64UUID()), null,
+            Collections.emptySet());
         transportService.start();
         transportService.acceptIncomingRequests();
     }

+ 2 - 1
server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java

@@ -48,6 +48,7 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 
+import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -108,7 +109,7 @@ public class ShardStateActionTests extends ESTestCase {
         this.transport = new CapturingTransport();
         clusterService = createClusterService(THREAD_POOL);
         transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL,
-                TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null);
+                TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
         transportService.start();
         transportService.acceptIncomingRequests();
         shardStateAction = new TestShardStateAction(Settings.EMPTY, clusterService, transportService, null, null);

+ 2 - 1
server/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java

@@ -60,6 +60,7 @@ import org.junit.BeforeClass;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -94,7 +95,7 @@ public class ClusterStateHealthTests extends ESTestCase {
         super.setUp();
         clusterService = createClusterService(threadPool);
         transportService = new TransportService(clusterService.getSettings(), new CapturingTransport(), threadPool,
-            TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null);
+            TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
         transportService.start();
         transportService.acceptIncomingRequests();
     }

+ 1 - 1
server/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java

@@ -145,7 +145,7 @@ public class ZenFaultDetectionTests extends ESTestCase {
                 (boundAddress) ->
                     new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), boundAddress.publishAddress(),
                         Node.NODE_ATTRIBUTES.getAsMap(settings), DiscoveryNode.getRolesFromSettings(settings), version),
-                null);
+                null, Collections.emptySet());
         transportService.start();
         transportService.acceptIncomingRequests();
         return transportService;

+ 12 - 6
server/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java

@@ -402,7 +402,8 @@ public class UnicastZenPingTests extends ESTestCase {
         };
         closeables.push(transport);
         final TransportService transportService =
-            new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null);
+            new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null,
+                Collections.emptySet());
         closeables.push(transportService);
         final int limitPortCounts = randomIntBetween(1, 10);
         final List<DiscoveryNode> discoveryNodes = TestUnicastZenPing.resolveHostsLists(
@@ -447,7 +448,8 @@ public class UnicastZenPingTests extends ESTestCase {
         };
         closeables.push(transport);
         final TransportService transportService =
-            new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null);
+            new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null,
+                Collections.emptySet());
         closeables.push(transportService);
         final List<DiscoveryNode> discoveryNodes = TestUnicastZenPing.resolveHostsLists(
             executorService,
@@ -497,7 +499,8 @@ public class UnicastZenPingTests extends ESTestCase {
         closeables.push(transport);
 
         final TransportService transportService =
-            new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null);
+            new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null,
+                Collections.emptySet());
         closeables.push(transportService);
 
         final List<DiscoveryNode> discoveryNodes = TestUnicastZenPing.resolveHostsLists(
@@ -555,7 +558,8 @@ public class UnicastZenPingTests extends ESTestCase {
         closeables.push(transport);
 
         final TransportService transportService =
-            new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null);
+            new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null,
+                Collections.emptySet());
         closeables.push(transportService);
         final TimeValue resolveTimeout = TimeValue.timeValueSeconds(randomIntBetween(1, 3));
         try {
@@ -723,7 +727,8 @@ public class UnicastZenPingTests extends ESTestCase {
         closeables.push(transport);
 
         final TransportService transportService =
-            new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null);
+            new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null,
+                Collections.emptySet());
         closeables.push(transportService);
         final List<DiscoveryNode> discoveryNodes = TestUnicastZenPing.resolveHostsLists(
             executorService,
@@ -772,7 +777,8 @@ public class UnicastZenPingTests extends ESTestCase {
         final Transport transport = supplier.apply(nodeSettings, version);
         final MockTransportService transportService =
             new MockTransportService(nodeSettings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,  boundAddress ->
-                new DiscoveryNode(nodeId, nodeId, boundAddress.publishAddress(), emptyMap(), nodeRoles, version), null);
+                new DiscoveryNode(nodeId, nodeId, boundAddress.publishAddress(), emptyMap(), nodeRoles, version), null,
+                Collections.emptySet());
         transportService.start();
         transportService.acceptIncomingRequests();
         final ConcurrentMap<TransportAddress, AtomicInteger> counters = ConcurrentCollections.newConcurrentMap();

+ 2 - 1
server/src/test/java/org/elasticsearch/index/reindex/LeaderBulkByScrollTaskStateTests.java

@@ -26,6 +26,7 @@ import org.junit.Before;
 import org.mockito.ArgumentCaptor;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import static java.util.Collections.emptyList;
@@ -42,7 +43,7 @@ public class LeaderBulkByScrollTaskStateTests extends ESTestCase {
     @Before
     public void createTask() {
         slices = between(2, 50);
-        task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID);
+        task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID, Collections.emptyMap());
         task.setWorkerCount(slices);
         taskState = task.getLeaderState();
     }

+ 2 - 1
server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java

@@ -28,6 +28,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.Before;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CyclicBarrier;
@@ -52,7 +53,7 @@ public class WorkerBulkByScrollTaskStateTests extends ESTestCase {
 
     @Before
     public void createTask() {
-        task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID);
+        task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID, Collections.emptyMap());
         task.setWorker(Float.POSITIVE_INFINITY, null);
         workerState = task.getWorkerState();
     }

+ 2 - 1
server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

@@ -130,7 +130,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
         private final AtomicInteger replicaId = new AtomicInteger();
         private final AtomicInteger docId = new AtomicInteger();
         boolean closed = false;
-        private final PrimaryReplicaSyncer primaryReplicaSyncer = new PrimaryReplicaSyncer(Settings.EMPTY, new TaskManager(Settings.EMPTY),
+        private final PrimaryReplicaSyncer primaryReplicaSyncer = new PrimaryReplicaSyncer(Settings.EMPTY,
+            new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()),
             (request, parentTask, primaryAllocationId, primaryTerm, listener) -> {
                 try {
                     new ResyncAction(request, listener, ReplicationGroup.this).execute();

+ 1 - 1
server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java

@@ -58,7 +58,7 @@ public class GlobalCheckpointSyncActionTests extends ESTestCase {
         transport = new CapturingTransport();
         clusterService = createClusterService(threadPool);
         transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
-            TransportService.NOOP_TRANSPORT_INTERCEPTOR,  boundAddress -> clusterService.localNode(), null);
+            TransportService.NOOP_TRANSPORT_INTERCEPTOR,  boundAddress -> clusterService.localNode(), null, Collections.emptySet());
         transportService.start();
         transportService.acceptIncomingRequests();
         shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool);

+ 7 - 4
server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java

@@ -50,7 +50,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
 
     public void testSyncerSendsOffCorrectDocuments() throws Exception {
         IndexShard shard = newStartedShard(true);
-        TaskManager taskManager = new TaskManager(Settings.EMPTY);
+        TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
         AtomicBoolean syncActionCalled = new AtomicBoolean();
         PrimaryReplicaSyncer.SyncAction syncAction =
             (request, parentTask, allocationId, primaryTerm, listener) -> {
@@ -112,7 +112,8 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
                 syncCalledLatch.countDown();
                 threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse()));
             };
-        PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(Settings.EMPTY, new TaskManager(Settings.EMPTY), syncAction);
+        PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(Settings.EMPTY,
+            new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), syncAction);
         syncer.setChunkSize(new ByteSizeValue(1)); // every document is sent off separately
 
         int numDocs = 10;
@@ -158,7 +159,8 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
     }
 
     public void testStatusEquals() throws IOException {
-        PrimaryReplicaSyncer.ResyncTask task = new PrimaryReplicaSyncer.ResyncTask(0, "type", "action", "desc", null);
+        PrimaryReplicaSyncer.ResyncTask task =
+            new PrimaryReplicaSyncer.ResyncTask(0, "type", "action", "desc", null, Collections.emptyMap());
         task.setPhase(randomAlphaOfLength(10));
         task.setResyncedOperations(randomIntBetween(0, 1000));
         task.setTotalOperations(randomIntBetween(0, 1000));
@@ -181,7 +183,8 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
     }
 
     public void testStatusReportsCorrectNumbers() throws IOException {
-        PrimaryReplicaSyncer.ResyncTask task = new PrimaryReplicaSyncer.ResyncTask(0, "type", "action", "desc", null);
+        PrimaryReplicaSyncer.ResyncTask task =
+            new PrimaryReplicaSyncer.ResyncTask(0, "type", "action", "desc", null, Collections.emptyMap());
         task.setPhase(randomAlphaOfLength(10));
         task.setResyncedOperations(randomIntBetween(0, 1000));
         task.setTotalOperations(randomIntBetween(0, 1000));

+ 2 - 1
server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java

@@ -159,7 +159,8 @@ public class ClusterStateChanges extends AbstractComponent {
         // services
         TransportService transportService = new TransportService(settings, transport, threadPool,
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
-            boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), clusterSettings);
+            boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), clusterSettings,
+            Collections.emptySet());
         MetaDataIndexUpgradeService metaDataIndexUpgradeService = new MetaDataIndexUpgradeService(settings, xContentRegistry, null, null,
             null) {
             // metaData upgrader should do nothing

+ 2 - 1
server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java

@@ -401,7 +401,8 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
         final Settings settings = Settings.builder().put("node.name", discoveryNode.getName()).build();
         final TransportService transportService = new TransportService(settings, null, threadPool,
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
-            boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null);
+            boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null,
+            Collections.emptySet());
         final ClusterService clusterService = mock(ClusterService.class);
         final RepositoriesService repositoriesService = new RepositoriesService(settings, clusterService,
             transportService, null);

+ 2 - 2
server/src/test/java/org/elasticsearch/search/SearchServiceTests.java

@@ -213,11 +213,11 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
                     SearchPhaseResult searchPhaseResult = service.executeQueryPhase(
                             new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT,
                                     new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f),
-                        new SearchTask(123L, "", "", "", null));
+                        new SearchTask(123L, "", "", "", null, Collections.emptyMap()));
                     IntArrayList intCursors = new IntArrayList(1);
                     intCursors.add(0);
                     ShardFetchRequest req = new ShardFetchRequest(searchPhaseResult.getRequestId(), intCursors, null /* not a scroll */);
-                    service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null));
+                    service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null, Collections.emptyMap()));
                 } catch (AlreadyClosedException ex) {
                     throw ex;
                 } catch (IllegalStateException ex) {

+ 9 - 8
server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java

@@ -62,6 +62,7 @@ import org.elasticsearch.test.TestSearchContext;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import static org.hamcrest.Matchers.anyOf;
@@ -94,7 +95,7 @@ public class QueryPhaseTests extends IndexShardTestCase {
         TestSearchContext context = new TestSearchContext(null, indexShard);
         context.parsedQuery(new ParsedQuery(query));
         context.setSize(0);
-        context.setTask(new SearchTask(123L, "", "", "", null));
+        context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap()));
 
         final IndexSearcher searcher = shouldCollect ? new IndexSearcher(reader) :
             getAssertingEarlyTerminationSearcher(reader, 0);
@@ -166,7 +167,7 @@ public class QueryPhaseTests extends IndexShardTestCase {
         IndexReader reader = DirectoryReader.open(dir);
         IndexSearcher contextSearcher = getAssertingEarlyTerminationSearcher(reader, 0);
         TestSearchContext context = new TestSearchContext(null, indexShard);
-        context.setTask(new SearchTask(123L, "", "", "", null));
+        context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap()));
         context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
 
         QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
@@ -195,7 +196,7 @@ public class QueryPhaseTests extends IndexShardTestCase {
         TestSearchContext context = new TestSearchContext(null, indexShard);
         context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
         context.setSize(0);
-        context.setTask(new SearchTask(123L, "", "", "", null));
+        context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap()));
         QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
         assertEquals(1, context.queryResult().topDocs().totalHits);
 
@@ -209,7 +210,7 @@ public class QueryPhaseTests extends IndexShardTestCase {
 
     public void testQueryCapturesThreadPoolStats() throws Exception {
         TestSearchContext context = new TestSearchContext(null, indexShard);
-        context.setTask(new SearchTask(123L, "", "", "", null));
+        context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap()));
         context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
 
         Directory dir = newDirectory();
@@ -251,7 +252,7 @@ public class QueryPhaseTests extends IndexShardTestCase {
         scrollContext.maxScore = Float.NaN;
         scrollContext.totalHits = -1;
         context.scrollContext(scrollContext);
-        context.setTask(new SearchTask(123L, "", "", "", null));
+        context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap()));
         int size = randomIntBetween(2, 5);
         context.setSize(size);
 
@@ -290,7 +291,7 @@ public class QueryPhaseTests extends IndexShardTestCase {
         }
         w.close();
         TestSearchContext context = new TestSearchContext(null, indexShard);
-        context.setTask(new SearchTask(123L, "", "", "", null));
+        context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap()));
         context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
         context.terminateAfter(1);
 
@@ -384,7 +385,7 @@ public class QueryPhaseTests extends IndexShardTestCase {
         TestSearchContext context = new TestSearchContext(null, indexShard);
         context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
         context.setSize(1);
-        context.setTask(new SearchTask(123L, "", "", "", null));
+        context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap()));
         context.sort(new SortAndFormats(sort, new DocValueFormat[] {DocValueFormat.RAW}));
 
         final IndexReader reader = DirectoryReader.open(dir);
@@ -471,7 +472,7 @@ public class QueryPhaseTests extends IndexShardTestCase {
             scrollContext.maxScore = Float.NaN;
             scrollContext.totalHits = -1;
             context.scrollContext(scrollContext);
-            context.setTask(new SearchTask(123L, "", "", "", null));
+            context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap()));
             context.setSize(10);
             context.sort(searchSortAndFormat);
 

+ 5 - 2
server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java

@@ -22,6 +22,8 @@ package org.elasticsearch.tasks;
 import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
 import org.elasticsearch.test.ESTestCase;
 
+import java.util.Collections;
+
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
 
@@ -33,10 +35,11 @@ public class ListTasksResponseTests extends ESTestCase {
 
     public void testNonEmptyToString() {
         TaskInfo info = new TaskInfo(
-            new TaskId("node1", 1), "dummy-type", "dummy-action", "dummy-description", null, 0, 1, true, new TaskId("node1", 0));
+            new TaskId("node1", 1), "dummy-type", "dummy-action", "dummy-description", null, 0, 1, true, new TaskId("node1", 0),
+            Collections.singletonMap("foo", "bar"));
         ListTasksResponse tasksResponse = new ListTasksResponse(singletonList(info), emptyList(), emptyList());
         assertEquals("{\"tasks\":{\"node1:1\":{\"node\":\"node1\",\"id\":1,\"type\":\"dummy-type\",\"action\":\"dummy-action\","
                 + "\"description\":\"dummy-description\",\"start_time_in_millis\":0,\"running_time_in_nanos\":1,\"cancellable\":true,"
-                + "\"parent_task_id\":\"node1:0\"}}}", tasksResponse.toString());
+                + "\"parent_task_id\":\"node1:0\",\"headers\":{\"foo\":\"bar\"}}}}", tasksResponse.toString());
     }
 }

+ 3 - 1
server/src/test/java/org/elasticsearch/tasks/TaskResultTests.java

@@ -134,7 +134,9 @@ public class TaskResultTests extends ESTestCase {
         long runningTimeNanos = randomLong();
         boolean cancellable = randomBoolean();
         TaskId parentTaskId = randomBoolean() ? TaskId.EMPTY_TASK_ID : randomTaskId();
-        return new TaskInfo(taskId, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId);
+        Map<String, String> headers =
+            randomBoolean() ? Collections.emptyMap() : Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5));
+        return new TaskInfo(taskId, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId, headers);
     }
 
     private static TaskId randomTaskId() {

+ 1 - 1
server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java

@@ -73,7 +73,7 @@ public class TransportServiceHandshakeTests extends ESTestCase {
             boundAddress.publishAddress(),
             emptyMap(),
             emptySet(),
-            version), null);
+            version), null, Collections.emptySet());
         transportService.start();
         transportService.acceptIncomingRequests();
         transportServices.add(transportService);

+ 3 - 1
test/framework/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionTestCase.java

@@ -27,6 +27,8 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.After;
 import org.junit.Before;
 
+import java.util.Collections;
+
 public abstract class AbstractAsyncBulkByScrollActionTestCase<
                 Request extends AbstractBulkByScrollRequest<Request>,
                 Response extends BulkByScrollResponse>
@@ -37,7 +39,7 @@ public abstract class AbstractAsyncBulkByScrollActionTestCase<
     @Before
     public void setupForTest() {
         threadPool = new TestThreadPool(getTestName());
-        task = new BulkByScrollTask(1, "test", "test", "test", TaskId.EMPTY_TASK_ID);
+        task = new BulkByScrollTask(1, "test", "test", "test", TaskId.EMPTY_TASK_ID, Collections.emptyMap());
         task.setWorker(Float.POSITIVE_INFINITY, null);
 
     }

+ 4 - 3
test/framework/src/main/java/org/elasticsearch/node/MockNode.java

@@ -50,6 +50,7 @@ import org.elasticsearch.transport.TransportService;
 import java.nio.file.Path;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Set;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -115,15 +116,15 @@ public class MockNode extends Node {
     protected TransportService newTransportService(Settings settings, Transport transport, ThreadPool threadPool,
                                                    TransportInterceptor interceptor,
                                                    Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
-                                                   ClusterSettings clusterSettings) {
+                                                   ClusterSettings clusterSettings, Set<String> taskHeaders) {
         // we use the MockTransportService.TestPlugin class as a marker to create a network
         // module with this MockNetworkService. NetworkService is such an integral part of the systme
         // we don't allow to plug it in from plugins or anything. this is a test-only override and
         // can't be done in a production env.
         if (getPluginsService().filterPlugins(MockTransportService.TestPlugin.class).isEmpty()) {
-            return super.newTransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings);
+            return super.newTransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders);
         } else {
-            return new MockTransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings);
+            return new MockTransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders);
         }
     }
 

+ 4 - 2
test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java

@@ -27,8 +27,10 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskAwareRequest;
 import org.elasticsearch.tasks.TaskManager;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.Collection;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
@@ -41,8 +43,8 @@ public class MockTaskManager extends TaskManager {
 
     private final Collection<MockTaskManagerListener> listeners = new CopyOnWriteArrayList<>();
 
-    public MockTaskManager(Settings settings) {
-        super(settings);
+    public MockTaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders) {
+        super(settings, threadPool, taskHeaders);
     }
 
     @Override

+ 10 - 9
test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

@@ -111,16 +111,16 @@ public final class MockTransportService extends TransportService {
         NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(ClusterModule.getNamedWriteables());
         final Transport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
             new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version);
-        return createNewService(settings, transport, version, threadPool, clusterSettings);
+        return createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet());
     }
 
     public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool,
-                                                        @Nullable ClusterSettings clusterSettings) {
+                                                        @Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) {
         return new MockTransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             boundAddress ->
                 new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), UUIDs.randomBase64UUID(), boundAddress.publishAddress(),
                     Node.NODE_ATTRIBUTES.getAsMap(settings), DiscoveryNode.getRolesFromSettings(settings), version),
-            clusterSettings);
+            clusterSettings, taskHeaders);
     }
 
     private final Transport original;
@@ -135,7 +135,7 @@ public final class MockTransportService extends TransportService {
                                 @Nullable ClusterSettings clusterSettings) {
         this(settings, transport, threadPool, interceptor, (boundAddress) ->
             DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), settings.get(Node.NODE_NAME_SETTING.getKey(),
-                UUIDs.randomBase64UUID())), clusterSettings);
+                UUIDs.randomBase64UUID())), clusterSettings, Collections.emptySet());
     }
 
     /**
@@ -146,8 +146,9 @@ public final class MockTransportService extends TransportService {
      */
     public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor interceptor,
                                 Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
-                                @Nullable ClusterSettings clusterSettings) {
-        super(settings, new LookupTestTransport(transport), threadPool, interceptor, localNodeFactory, clusterSettings);
+                                @Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) {
+        super(settings, new LookupTestTransport(transport), threadPool, interceptor, localNodeFactory, clusterSettings,
+            taskHeaders);
         this.original = transport;
     }
 
@@ -160,11 +161,11 @@ public final class MockTransportService extends TransportService {
     }
 
     @Override
-    protected TaskManager createTaskManager() {
+    protected TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders) {
         if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) {
-            return new MockTaskManager(settings);
+            return new MockTaskManager(settings, threadPool, taskHeaders);
         } else {
-            return super.createTaskManager();
+            return super.createTaskManager(settings, threadPool, taskHeaders);
         }
     }
 

+ 5 - 3
test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

@@ -1931,7 +1931,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         Version version = Version.fromString("2.0.0");
         try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
             new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version);
-             MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, null)) {
+             MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, null,
+                 Collections.emptySet())) {
             service.start();
             service.acceptIncomingRequests();
             DiscoveryNode node =
@@ -1953,7 +1954,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         Version version = VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT);
         try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
             new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version);
-             MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, null)) {
+             MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, null,
+                 Collections.emptySet())) {
             service.start();
             service.acceptIncomingRequests();
             DiscoveryNode node =
@@ -1989,7 +1991,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         };
 
         try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, transport, Version.CURRENT, threadPool,
-            null)) {
+            null, Collections.emptySet())) {
             service.start();
             service.acceptIncomingRequests();
             // this acts like a node that doesn't have support for handshakes

+ 1 - 1
test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java

@@ -50,7 +50,7 @@ public class MockTcpTransportTests extends AbstractSimpleTransportTestCase {
             }
         };
         MockTransportService mockTransportService =
-            MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings);
+            MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet());
         mockTransportService.start();
         return mockTransportService;
     }

+ 1 - 1
test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java

@@ -78,7 +78,7 @@ public class SimpleMockNioTransportTests extends AbstractSimpleTransportTestCase
 
         };
         MockTransportService mockTransportService =
-            MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings);
+            MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet());
         mockTransportService.start();
         return mockTransportService;
     }