浏览代码

Added an option to add arbitrary headers to the client requests

The headers are key/value pairs defined in the settings under the `request.headers` namespace.
uboness 11 年之前
父节点
当前提交
6f73c93692

+ 7 - 1
src/main/java/org/elasticsearch/client/node/NodeClient.java

@@ -25,6 +25,7 @@ import org.elasticsearch.action.support.TransportAction;
 import org.elasticsearch.client.AdminClient;
 import org.elasticsearch.client.AdminClient;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.support.AbstractClient;
 import org.elasticsearch.client.support.AbstractClient;
+import org.elasticsearch.client.support.Headers;
 import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.Settings;
@@ -44,11 +45,14 @@ public class NodeClient extends AbstractClient {
 
 
     private final ImmutableMap<ClientAction, TransportAction> actions;
     private final ImmutableMap<ClientAction, TransportAction> actions;
 
 
+    private final Headers headers;
+
     @Inject
     @Inject
-    public NodeClient(Settings settings, ThreadPool threadPool, NodeAdminClient admin, Map<GenericAction, TransportAction> actions) {
+    public NodeClient(Settings settings, ThreadPool threadPool, NodeAdminClient admin, Map<GenericAction, TransportAction> actions, Headers headers) {
         this.settings = settings;
         this.settings = settings;
         this.threadPool = threadPool;
         this.threadPool = threadPool;
         this.admin = admin;
         this.admin = admin;
+        this.headers = headers;
         MapBuilder<ClientAction, TransportAction> actionsBuilder = new MapBuilder<>();
         MapBuilder<ClientAction, TransportAction> actionsBuilder = new MapBuilder<>();
         for (Map.Entry<GenericAction, TransportAction> entry : actions.entrySet()) {
         for (Map.Entry<GenericAction, TransportAction> entry : actions.entrySet()) {
             if (entry.getKey() instanceof ClientAction) {
             if (entry.getKey() instanceof ClientAction) {
@@ -81,6 +85,7 @@ public class NodeClient extends AbstractClient {
     @SuppressWarnings("unchecked")
     @SuppressWarnings("unchecked")
     @Override
     @Override
     public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, Client> action, Request request) {
     public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, Client> action, Request request) {
+        headers.applyTo(request);
         TransportAction<Request, Response> transportAction = actions.get((ClientAction)action);
         TransportAction<Request, Response> transportAction = actions.get((ClientAction)action);
         return transportAction.execute(request);
         return transportAction.execute(request);
     }
     }
@@ -88,6 +93,7 @@ public class NodeClient extends AbstractClient {
     @SuppressWarnings("unchecked")
     @SuppressWarnings("unchecked")
     @Override
     @Override
     public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {
     public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {
+        headers.applyTo(request);
         TransportAction<Request, Response> transportAction = actions.get((ClientAction)action);
         TransportAction<Request, Response> transportAction = actions.get((ClientAction)action);
         transportAction.execute(request, listener);
         transportAction.execute(request, listener);
     }
     }

+ 2 - 0
src/main/java/org/elasticsearch/client/node/NodeClientModule.java

@@ -23,6 +23,7 @@ import org.elasticsearch.client.AdminClient;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.ClusterAdminClient;
 import org.elasticsearch.client.ClusterAdminClient;
 import org.elasticsearch.client.IndicesAdminClient;
 import org.elasticsearch.client.IndicesAdminClient;
+import org.elasticsearch.client.support.Headers;
 import org.elasticsearch.common.inject.AbstractModule;
 import org.elasticsearch.common.inject.AbstractModule;
 
 
 /**
 /**
@@ -32,6 +33,7 @@ public class NodeClientModule extends AbstractModule {
 
 
     @Override
     @Override
     protected void configure() {
     protected void configure() {
+        bind(Headers.class).asEagerSingleton();
         bind(ClusterAdminClient.class).to(NodeClusterAdminClient.class).asEagerSingleton();
         bind(ClusterAdminClient.class).to(NodeClusterAdminClient.class).asEagerSingleton();
         bind(IndicesAdminClient.class).to(NodeIndicesAdminClient.class).asEagerSingleton();
         bind(IndicesAdminClient.class).to(NodeIndicesAdminClient.class).asEagerSingleton();
         bind(AdminClient.class).to(NodeAdminClient.class).asEagerSingleton();
         bind(AdminClient.class).to(NodeAdminClient.class).asEagerSingleton();

+ 7 - 1
src/main/java/org/elasticsearch/client/node/NodeClusterAdminClient.java

@@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.ClusterAction;
 import org.elasticsearch.action.support.TransportAction;
 import org.elasticsearch.action.support.TransportAction;
 import org.elasticsearch.client.ClusterAdminClient;
 import org.elasticsearch.client.ClusterAdminClient;
 import org.elasticsearch.client.support.AbstractClusterAdminClient;
 import org.elasticsearch.client.support.AbstractClusterAdminClient;
+import org.elasticsearch.client.support.Headers;
 import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -40,9 +41,12 @@ public class NodeClusterAdminClient extends AbstractClusterAdminClient implement
 
 
     private final ImmutableMap<ClusterAction, TransportAction> actions;
     private final ImmutableMap<ClusterAction, TransportAction> actions;
 
 
+    private final Headers headers;
+
     @Inject
     @Inject
-    public NodeClusterAdminClient(ThreadPool threadPool, Map<GenericAction, TransportAction> actions) {
+    public NodeClusterAdminClient(ThreadPool threadPool, Map<GenericAction, TransportAction> actions, Headers headers) {
         this.threadPool = threadPool;
         this.threadPool = threadPool;
+        this.headers = headers;
         MapBuilder<ClusterAction, TransportAction> actionsBuilder = new MapBuilder<>();
         MapBuilder<ClusterAction, TransportAction> actionsBuilder = new MapBuilder<>();
         for (Map.Entry<GenericAction, TransportAction> entry : actions.entrySet()) {
         for (Map.Entry<GenericAction, TransportAction> entry : actions.entrySet()) {
             if (entry.getKey() instanceof ClusterAction) {
             if (entry.getKey() instanceof ClusterAction) {
@@ -60,6 +64,7 @@ public class NodeClusterAdminClient extends AbstractClusterAdminClient implement
     @SuppressWarnings("unchecked")
     @SuppressWarnings("unchecked")
     @Override
     @Override
     public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, ClusterAdminClient> action, Request request) {
     public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, ClusterAdminClient> action, Request request) {
+        headers.applyTo(request);
         TransportAction<Request, Response> transportAction = actions.get((ClusterAction)action);
         TransportAction<Request, Response> transportAction = actions.get((ClusterAction)action);
         return transportAction.execute(request);
         return transportAction.execute(request);
     }
     }
@@ -67,6 +72,7 @@ public class NodeClusterAdminClient extends AbstractClusterAdminClient implement
     @SuppressWarnings("unchecked")
     @SuppressWarnings("unchecked")
     @Override
     @Override
     public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> void execute(Action<Request, Response, RequestBuilder, ClusterAdminClient> action, Request request, ActionListener<Response> listener) {
     public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> void execute(Action<Request, Response, RequestBuilder, ClusterAdminClient> action, Request request, ActionListener<Response> listener) {
+        headers.applyTo(request);
         TransportAction<Request, Response> transportAction = actions.get((ClusterAction)action);
         TransportAction<Request, Response> transportAction = actions.get((ClusterAction)action);
         transportAction.execute(request, listener);
         transportAction.execute(request, listener);
     }
     }

+ 7 - 1
src/main/java/org/elasticsearch/client/node/NodeIndicesAdminClient.java

@@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.indices.IndicesAction;
 import org.elasticsearch.action.support.TransportAction;
 import org.elasticsearch.action.support.TransportAction;
 import org.elasticsearch.client.IndicesAdminClient;
 import org.elasticsearch.client.IndicesAdminClient;
 import org.elasticsearch.client.support.AbstractIndicesAdminClient;
 import org.elasticsearch.client.support.AbstractIndicesAdminClient;
+import org.elasticsearch.client.support.Headers;
 import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -40,9 +41,12 @@ public class NodeIndicesAdminClient extends AbstractIndicesAdminClient implement
 
 
     private final ImmutableMap<IndicesAction, TransportAction> actions;
     private final ImmutableMap<IndicesAction, TransportAction> actions;
 
 
+    private final Headers headers;
+
     @Inject
     @Inject
-    public NodeIndicesAdminClient(ThreadPool threadPool, Map<GenericAction, TransportAction> actions) {
+    public NodeIndicesAdminClient(ThreadPool threadPool, Map<GenericAction, TransportAction> actions, Headers headers) {
         this.threadPool = threadPool;
         this.threadPool = threadPool;
+        this.headers = headers;
         MapBuilder<IndicesAction, TransportAction> actionsBuilder = new MapBuilder<>();
         MapBuilder<IndicesAction, TransportAction> actionsBuilder = new MapBuilder<>();
         for (Map.Entry<GenericAction, TransportAction> entry : actions.entrySet()) {
         for (Map.Entry<GenericAction, TransportAction> entry : actions.entrySet()) {
             if (entry.getKey() instanceof IndicesAction) {
             if (entry.getKey() instanceof IndicesAction) {
@@ -60,6 +64,7 @@ public class NodeIndicesAdminClient extends AbstractIndicesAdminClient implement
     @SuppressWarnings("unchecked")
     @SuppressWarnings("unchecked")
     @Override
     @Override
     public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, IndicesAdminClient> action, Request request) {
     public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, IndicesAdminClient> action, Request request) {
+        headers.applyTo(request);
         TransportAction<Request, Response> transportAction = actions.get((IndicesAction)action);
         TransportAction<Request, Response> transportAction = actions.get((IndicesAction)action);
         return transportAction.execute(request);
         return transportAction.execute(request);
     }
     }
@@ -67,6 +72,7 @@ public class NodeIndicesAdminClient extends AbstractIndicesAdminClient implement
     @SuppressWarnings("unchecked")
     @SuppressWarnings("unchecked")
     @Override
     @Override
     public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> void execute(Action<Request, Response, RequestBuilder, IndicesAdminClient> action, Request request, ActionListener<Response> listener) {
     public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> void execute(Action<Request, Response, RequestBuilder, IndicesAdminClient> action, Request request, ActionListener<Response> listener) {
+        headers.applyTo(request);
         TransportAction<Request, Response> transportAction = actions.get((IndicesAction)action);
         TransportAction<Request, Response> transportAction = actions.get((IndicesAction)action);
         transportAction.execute(request, listener);
         transportAction.execute(request, listener);
     }
     }

+ 59 - 0
src/main/java/org/elasticsearch/client/support/Headers.java

@@ -0,0 +1,59 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.support;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+
+/**
+ * Client request headers picked up from the client settings. Applied to every
+ * request sent by the client (both transport and node clients)
+ */
+public class Headers {
+
+    public static final String PREFIX = "request.headers";
+
+    public static final Headers EMPTY = new Headers(ImmutableSettings.EMPTY) {
+        @Override
+        public void applyTo(ActionRequest request) {
+        }
+    };
+
+    private final Settings headers;
+
+    @Inject
+    public Headers(Settings settings) {
+        headers = resolveHeaders(settings);
+    }
+
+    public void applyTo(ActionRequest request) {
+        for (String key : headers.names()) {
+            request.putHeader(key, headers.get(key));
+        }
+    }
+
+    static Settings resolveHeaders(Settings settings) {
+        Settings headers = settings.getAsSettings(PREFIX);
+        return headers != null ? headers : ImmutableSettings.EMPTY;
+    }
+
+}

+ 2 - 0
src/main/java/org/elasticsearch/client/transport/ClientTransportModule.java

@@ -19,6 +19,7 @@
 
 
 package org.elasticsearch.client.transport;
 package org.elasticsearch.client.transport;
 
 
+import org.elasticsearch.client.support.Headers;
 import org.elasticsearch.client.transport.support.InternalTransportAdminClient;
 import org.elasticsearch.client.transport.support.InternalTransportAdminClient;
 import org.elasticsearch.client.transport.support.InternalTransportClient;
 import org.elasticsearch.client.transport.support.InternalTransportClient;
 import org.elasticsearch.client.transport.support.InternalTransportClusterAdminClient;
 import org.elasticsearch.client.transport.support.InternalTransportClusterAdminClient;
@@ -32,6 +33,7 @@ public class ClientTransportModule extends AbstractModule {
 
 
     @Override
     @Override
     protected void configure() {
     protected void configure() {
+        bind(Headers.class).asEagerSingleton();
         bind(InternalTransportClient.class).asEagerSingleton();
         bind(InternalTransportClient.class).asEagerSingleton();
         bind(InternalTransportAdminClient.class).asEagerSingleton();
         bind(InternalTransportAdminClient.class).asEagerSingleton();
         bind(InternalTransportIndicesAdminClient.class).asEagerSingleton();
         bind(InternalTransportIndicesAdminClient.class).asEagerSingleton();

+ 6 - 1
src/main/java/org/elasticsearch/client/transport/support/InternalTransportClient.java

@@ -25,6 +25,7 @@ import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.client.AdminClient;
 import org.elasticsearch.client.AdminClient;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.support.AbstractClient;
 import org.elasticsearch.client.support.AbstractClient;
+import org.elasticsearch.client.support.Headers;
 import org.elasticsearch.client.transport.TransportClientNodesService;
 import org.elasticsearch.client.transport.TransportClientNodesService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.collect.MapBuilder;
@@ -49,14 +50,17 @@ public class InternalTransportClient extends AbstractClient {
 
 
     private final ImmutableMap<Action, TransportActionNodeProxy> actions;
     private final ImmutableMap<Action, TransportActionNodeProxy> actions;
 
 
+    private final Headers headers;
+
     @Inject
     @Inject
     public InternalTransportClient(Settings settings, ThreadPool threadPool, TransportService transportService,
     public InternalTransportClient(Settings settings, ThreadPool threadPool, TransportService transportService,
                                    TransportClientNodesService nodesService, InternalTransportAdminClient adminClient,
                                    TransportClientNodesService nodesService, InternalTransportAdminClient adminClient,
-                                   Map<String, GenericAction> actions) {
+                                   Map<String, GenericAction> actions, Headers headers) {
         this.settings = settings;
         this.settings = settings;
         this.threadPool = threadPool;
         this.threadPool = threadPool;
         this.nodesService = nodesService;
         this.nodesService = nodesService;
         this.adminClient = adminClient;
         this.adminClient = adminClient;
+        this.headers = headers;
         MapBuilder<Action, TransportActionNodeProxy> actionsBuilder = new MapBuilder<>();
         MapBuilder<Action, TransportActionNodeProxy> actionsBuilder = new MapBuilder<>();
         for (GenericAction action : actions.values()) {
         for (GenericAction action : actions.values()) {
             if (action instanceof Action) {
             if (action instanceof Action) {
@@ -97,6 +101,7 @@ public class InternalTransportClient extends AbstractClient {
     @SuppressWarnings("unchecked")
     @SuppressWarnings("unchecked")
     @Override
     @Override
     public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(final Action<Request, Response, RequestBuilder, Client> action, final Request request, ActionListener<Response> listener) {
     public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(final Action<Request, Response, RequestBuilder, Client> action, final Request request, ActionListener<Response> listener) {
+        headers.applyTo(request);
         final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
         final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
         nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
         nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
             @Override
             @Override

+ 6 - 1
src/main/java/org/elasticsearch/client/transport/support/InternalTransportClusterAdminClient.java

@@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.ClusterAction;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.client.ClusterAdminClient;
 import org.elasticsearch.client.ClusterAdminClient;
 import org.elasticsearch.client.support.AbstractClusterAdminClient;
 import org.elasticsearch.client.support.AbstractClusterAdminClient;
+import org.elasticsearch.client.support.Headers;
 import org.elasticsearch.client.transport.TransportClientNodesService;
 import org.elasticsearch.client.transport.TransportClientNodesService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.collect.MapBuilder;
@@ -47,11 +48,14 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli
 
 
     private final ImmutableMap<ClusterAction, TransportActionNodeProxy> actions;
     private final ImmutableMap<ClusterAction, TransportActionNodeProxy> actions;
 
 
+    private final Headers headers;
+
     @Inject
     @Inject
     public InternalTransportClusterAdminClient(Settings settings, TransportClientNodesService nodesService, ThreadPool threadPool, TransportService transportService,
     public InternalTransportClusterAdminClient(Settings settings, TransportClientNodesService nodesService, ThreadPool threadPool, TransportService transportService,
-                                               Map<String, GenericAction> actions) {
+                                               Map<String, GenericAction> actions, Headers headers) {
         this.nodesService = nodesService;
         this.nodesService = nodesService;
         this.threadPool = threadPool;
         this.threadPool = threadPool;
+        this.headers = headers;
         MapBuilder<ClusterAction, TransportActionNodeProxy> actionsBuilder = new MapBuilder<>();
         MapBuilder<ClusterAction, TransportActionNodeProxy> actionsBuilder = new MapBuilder<>();
         for (GenericAction action : actions.values()) {
         for (GenericAction action : actions.values()) {
             if (action instanceof ClusterAction) {
             if (action instanceof ClusterAction) {
@@ -77,6 +81,7 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli
     @SuppressWarnings("unchecked")
     @SuppressWarnings("unchecked")
     @Override
     @Override
     public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> void execute(final Action<Request, Response, RequestBuilder, ClusterAdminClient> action, final Request request, final ActionListener<Response> listener) {
     public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> void execute(final Action<Request, Response, RequestBuilder, ClusterAdminClient> action, final Request request, final ActionListener<Response> listener) {
+        headers.applyTo(request);
         final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
         final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
         nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
         nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
             @Override
             @Override

+ 6 - 1
src/main/java/org/elasticsearch/client/transport/support/InternalTransportIndicesAdminClient.java

@@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.indices.IndicesAction;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.client.IndicesAdminClient;
 import org.elasticsearch.client.IndicesAdminClient;
 import org.elasticsearch.client.support.AbstractIndicesAdminClient;
 import org.elasticsearch.client.support.AbstractIndicesAdminClient;
+import org.elasticsearch.client.support.Headers;
 import org.elasticsearch.client.transport.TransportClientNodesService;
 import org.elasticsearch.client.transport.TransportClientNodesService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.collect.MapBuilder;
@@ -47,11 +48,14 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
 
 
     private final ImmutableMap<Action, TransportActionNodeProxy> actions;
     private final ImmutableMap<Action, TransportActionNodeProxy> actions;
 
 
+    private final Headers headers;
+
     @Inject
     @Inject
     public InternalTransportIndicesAdminClient(Settings settings, TransportClientNodesService nodesService, TransportService transportService, ThreadPool threadPool,
     public InternalTransportIndicesAdminClient(Settings settings, TransportClientNodesService nodesService, TransportService transportService, ThreadPool threadPool,
-                                               Map<String, GenericAction> actions) {
+                                               Map<String, GenericAction> actions, Headers headers) {
         this.nodesService = nodesService;
         this.nodesService = nodesService;
         this.threadPool = threadPool;
         this.threadPool = threadPool;
+        this.headers = headers;
         MapBuilder<Action, TransportActionNodeProxy> actionsBuilder = new MapBuilder<>();
         MapBuilder<Action, TransportActionNodeProxy> actionsBuilder = new MapBuilder<>();
         for (GenericAction action : actions.values()) {
         for (GenericAction action : actions.values()) {
             if (action instanceof IndicesAction) {
             if (action instanceof IndicesAction) {
@@ -77,6 +81,7 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
     @SuppressWarnings("unchecked")
     @SuppressWarnings("unchecked")
     @Override
     @Override
     public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> void execute(final Action<Request, Response, RequestBuilder, IndicesAdminClient> action, final Request request, ActionListener<Response> listener) {
     public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> void execute(final Action<Request, Response, RequestBuilder, IndicesAdminClient> action, final Request request, ActionListener<Response> listener) {
+        headers.applyTo(request);
         final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
         final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
         nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
         nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
             @Override
             @Override

+ 193 - 0
src/test/java/org/elasticsearch/client/AbstractClientHeadersTests.java

@@ -0,0 +1,193 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client;
+
+import com.google.common.base.Throwables;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.GenericAction;
+import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownAction;
+import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownResponse;
+import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteAction;
+import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotAction;
+import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
+import org.elasticsearch.action.admin.cluster.stats.ClusterStatsAction;
+import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse;
+import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheAction;
+import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheResponse;
+import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.admin.indices.flush.FlushAction;
+import org.elasticsearch.action.admin.indices.flush.FlushResponse;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
+import org.elasticsearch.action.bench.*;
+import org.elasticsearch.action.delete.DeleteAction;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.get.GetAction;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexAction;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.indexedscripts.delete.DeleteIndexedScriptAction;
+import org.elasticsearch.action.indexedscripts.delete.DeleteIndexedScriptResponse;
+import org.elasticsearch.action.search.SearchAction;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.support.Headers;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ElasticsearchTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.hamcrest.Matchers.*;
+
+/**
+ *
+ */
+public abstract class AbstractClientHeadersTests extends ElasticsearchTestCase {
+
+    protected static final Settings HEADER_SETTINGS = ImmutableSettings.builder()
+            .put(Headers.PREFIX + ".key1", "val1")
+            .put(Headers.PREFIX + ".key2", "val 2")
+            .build();
+
+    @SuppressWarnings("unchecked")
+    private static final GenericAction[] ACTIONS = new GenericAction[] {
+                // client actions
+                GetAction.INSTANCE, SearchAction.INSTANCE, DeleteAction.INSTANCE, DeleteIndexedScriptAction.INSTANCE,
+                IndexAction.INSTANCE, AbortBenchmarkAction.INSTANCE, BenchmarkAction.INSTANCE, BenchmarkStatusAction.INSTANCE,
+
+                // cluster admin actions
+                ClusterStatsAction.INSTANCE, CreateSnapshotAction.INSTANCE, NodesShutdownAction.INSTANCE, ClusterRerouteAction.INSTANCE,
+
+                // indices admin actions
+                CreateIndexAction.INSTANCE, IndicesStatsAction.INSTANCE, ClearIndicesCacheAction.INSTANCE, FlushAction.INSTANCE
+    };
+
+    private Client client;
+
+    @Before
+    public void initClient() {
+        client = buildClient(HEADER_SETTINGS, ACTIONS);
+    }
+
+    @After
+    public void cleanupClient() {
+        client.close();
+    }
+
+    protected abstract Client buildClient(Settings headersSettings, GenericAction[] testedActions);
+
+
+    @Test
+    public void testActions() {
+
+        // TODO this is a really shitty way to test it, we need to figure out a way to test all the client methods
+        //      without specifying each one (reflection doesn't as each action needs its own special settings, without
+        //      them, request validation will fail before the test is executed. (one option is to enable disabling the
+        //      validation in the settings??? - ugly and conceptually wrong)
+
+        // choosing arbitrary top level actions to test
+        client.prepareGet("idx", "type", "id").execute().addListener(new AssertingActionListener<GetResponse>(GetAction.NAME));
+        client.prepareSearch().execute().addListener(new AssertingActionListener<SearchResponse>(SearchAction.NAME));
+        client.prepareDelete("idx", "type", "id").execute().addListener(new AssertingActionListener<DeleteResponse>(DeleteAction.NAME));
+        client.prepareDeleteIndexedScript("lang", "id").execute().addListener(new AssertingActionListener<DeleteIndexedScriptResponse>(DeleteIndexedScriptAction.NAME));
+        client.prepareIndex("idx", "type", "id").setSource("source").execute().addListener(new AssertingActionListener<IndexResponse>(IndexAction.NAME));
+        client.prepareAbortBench("bname").execute().addListener(new AssertingActionListener<AbortBenchmarkResponse>(AbortBenchmarkAction.NAME));
+        client.prepareBench("idx").setBenchmarkId("id").addCompetitor(new BenchmarkCompetitorBuilder().setName("name")).execute().addListener(new AssertingActionListener<BenchmarkResponse>(BenchmarkAction.NAME));
+        client.prepareBenchStatus().execute().addListener(new AssertingActionListener<BenchmarkStatusResponse>(BenchmarkStatusAction.NAME));
+
+        // choosing arbitrary cluster admin actions to test
+        client.admin().cluster().prepareClusterStats().execute().addListener(new AssertingActionListener<ClusterStatsResponse>(ClusterStatsAction.NAME));
+        client.admin().cluster().prepareCreateSnapshot("repo", "bck").execute().addListener(new AssertingActionListener<CreateSnapshotResponse>(CreateSnapshotAction.NAME));
+        client.admin().cluster().prepareNodesShutdown("n1", "n2").execute().addListener(new AssertingActionListener<NodesShutdownResponse>(NodesShutdownAction.NAME));
+        client.admin().cluster().prepareReroute().execute().addListener(new AssertingActionListener<ClusterRerouteResponse>(ClusterRerouteAction.NAME));
+
+        // choosing arbitrary indices admin actions to test
+        client.admin().indices().prepareCreate("idx").execute().addListener(new AssertingActionListener<CreateIndexResponse>(CreateIndexAction.NAME));
+        client.admin().indices().prepareStats().execute().addListener(new AssertingActionListener<IndicesStatsResponse>(IndicesStatsAction.NAME));
+        client.admin().indices().prepareClearCache("idx1", "idx2").execute().addListener(new AssertingActionListener<ClearIndicesCacheResponse>(ClearIndicesCacheAction.NAME));
+        client.admin().indices().prepareFlush().execute().addListener(new AssertingActionListener<FlushResponse>(FlushAction.NAME));
+    }
+
+    protected static class InternalException extends Exception {
+
+        private final String action;
+        private final Map<String, Object> headers;
+
+        public InternalException(String action, Map<String, Object> headers) {
+            this.action = action;
+            this.headers = headers;
+        }
+    }
+
+    protected static class AssertingActionListener<T> implements ActionListener<T> {
+
+        private final String action;
+
+        public AssertingActionListener(String action) {
+            this.action = action;
+        }
+
+        @Override
+        public void onResponse(T t) {
+            fail("an internal exception was expected for action [" + action + "]");
+        }
+
+        @Override
+        public void onFailure(Throwable t) {
+            Throwable e = unwrap(t, InternalException.class);
+            assertThat("expected action [" + action + "] to throw an internal exception", e, notNullValue());
+            assertThat(action, equalTo(((InternalException) e).action));
+            Map<String, Object> headers = ((InternalException) e).headers;
+            assertThat(headers, notNullValue());
+            assertThat(headers.size(), is(2));
+            assertThat(headers.get("key1"), notNullValue());
+            assertThat(headers.get("key1").toString(), equalTo("val1"));
+            assertThat(headers.get("key2"), notNullValue());
+            assertThat(headers.get("key2").toString(), equalTo("val 2"));
+        }
+
+        public Throwable unwrap(Throwable t, Class<? extends Throwable> exceptionType) {
+            int counter = 0;
+            Throwable result = t;
+            while (!exceptionType.isInstance(result)) {
+                if (result.getCause() == null) {
+                    return null;
+                }
+                if (result.getCause() == result) {
+                    return null;
+                }
+                if (counter++ > 10) {
+                    // dear god, if we got more than 10 levels down, WTF? just bail
+                    fail("Exception cause unwrapping ran for 10 levels: " + Throwables.getStackTraceAsString(t));
+                    return null;
+                }
+                result = result.getCause();
+            }
+            return result;
+        }
+
+    }
+
+}

+ 92 - 0
src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java

@@ -0,0 +1,92 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.node;
+
+import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.ImmutableSet;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.GenericAction;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.TransportAction;
+import org.elasticsearch.client.AbstractClientHeadersTests;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.support.Headers;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.HashMap;
+
+/**
+ *
+ */
+public class NodeClientHeadersTests extends AbstractClientHeadersTests {
+
+    private static final ActionFilters EMPTY_FILTERS = new ActionFilters(ImmutableSet.of());
+
+    private ThreadPool threadPool;
+
+    @Before
+    public void init() {
+        threadPool = new ThreadPool("test");
+    }
+
+    @After
+    public void cleanup() {
+        threadPool.shutdownNow();
+    }
+
+    @Override
+    protected Client buildClient(Settings headersSettings, GenericAction[] testedActions) {
+        Settings settings = HEADER_SETTINGS;
+
+        Headers headers = new Headers(settings);
+        Actions actions = new Actions(settings, threadPool, testedActions);
+
+        NodeClusterAdminClient clusterClient = new NodeClusterAdminClient(threadPool, actions, headers);
+        NodeIndicesAdminClient indicesClient = new NodeIndicesAdminClient(threadPool, actions, headers);
+        NodeAdminClient adminClient = new NodeAdminClient(settings, clusterClient, indicesClient);
+        return new NodeClient(settings, threadPool, adminClient, actions, headers);
+    }
+
+    private static class Actions extends HashMap<GenericAction, TransportAction> {
+
+        private Actions(Settings settings, ThreadPool threadPool, GenericAction[] actions) {
+            for (GenericAction action : actions) {
+                put(action, new InternalTransportAction(settings, action.name(), threadPool));
+            }
+        }
+    }
+
+    private static class InternalTransportAction extends TransportAction {
+
+        private InternalTransportAction(Settings settings, String actionName, ThreadPool threadPool) {
+            super(settings, actionName, threadPool, EMPTY_FILTERS);
+        }
+
+        @Override
+        protected void doExecute(ActionRequest request, ActionListener listener) {
+            listener.onFailure(new InternalException(actionName, request.getHeaders()));
+        }
+    }
+
+
+}

+ 4 - 3
src/test/java/org/elasticsearch/client/transport/InternalTransportClientTests.java

@@ -27,6 +27,7 @@ import org.elasticsearch.action.admin.indices.IndicesAction;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.ClusterAdminClient;
 import org.elasticsearch.client.ClusterAdminClient;
 import org.elasticsearch.client.IndicesAdminClient;
 import org.elasticsearch.client.IndicesAdminClient;
+import org.elasticsearch.client.support.Headers;
 import org.elasticsearch.client.transport.support.InternalTransportAdminClient;
 import org.elasticsearch.client.transport.support.InternalTransportAdminClient;
 import org.elasticsearch.client.transport.support.InternalTransportClient;
 import org.elasticsearch.client.transport.support.InternalTransportClient;
 import org.elasticsearch.client.transport.support.InternalTransportClusterAdminClient;
 import org.elasticsearch.client.transport.support.InternalTransportClusterAdminClient;
@@ -78,10 +79,10 @@ public class InternalTransportClientTests extends ElasticsearchTestCase {
             actions.put(IndicesAdminTestAction.NAME, IndicesAdminTestAction.INSTANCE);
             actions.put(IndicesAdminTestAction.NAME, IndicesAdminTestAction.INSTANCE);
             actions.put(ClusterAdminTestAction.NAME, ClusterAdminTestAction.INSTANCE);
             actions.put(ClusterAdminTestAction.NAME, ClusterAdminTestAction.INSTANCE);
 
 
-            InternalTransportIndicesAdminClient indicesAdminClient = new InternalTransportIndicesAdminClient(ImmutableSettings.EMPTY, transportClientNodesService, transportService, threadPool, actions);
-            InternalTransportClusterAdminClient clusterAdminClient = new InternalTransportClusterAdminClient(ImmutableSettings.EMPTY, transportClientNodesService, threadPool, transportService, actions);
+            InternalTransportIndicesAdminClient indicesAdminClient = new InternalTransportIndicesAdminClient(ImmutableSettings.EMPTY, transportClientNodesService, transportService, threadPool, actions, Headers.EMPTY);
+            InternalTransportClusterAdminClient clusterAdminClient = new InternalTransportClusterAdminClient(ImmutableSettings.EMPTY, transportClientNodesService, threadPool, transportService, actions, Headers.EMPTY);
             InternalTransportAdminClient adminClient = new InternalTransportAdminClient(ImmutableSettings.EMPTY, indicesAdminClient, clusterAdminClient);
             InternalTransportAdminClient adminClient = new InternalTransportAdminClient(ImmutableSettings.EMPTY, indicesAdminClient, clusterAdminClient);
-            internalTransportClient = new InternalTransportClient(ImmutableSettings.EMPTY, threadPool, transportService, transportClientNodesService, adminClient, actions);
+            internalTransportClient = new InternalTransportClient(ImmutableSettings.EMPTY, threadPool, transportService, transportClientNodesService, adminClient, actions, Headers.EMPTY);
 
 
             nodesCount = randomIntBetween(1, 10);
             nodesCount = randomIntBetween(1, 10);
             for (int i = 0; i < nodesCount; i++) {
             for (int i = 0; i < nodesCount; i++) {

+ 86 - 0
src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java

@@ -0,0 +1,86 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.transport;
+
+import org.elasticsearch.action.GenericAction;
+import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
+import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
+import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
+import org.elasticsearch.client.AbstractClientHeadersTests;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.LocalTransportAddress;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.*;
+
+import static org.hamcrest.Matchers.equalTo;
+
+/**
+ *
+ */
+public class TransportClientHeadersTests extends AbstractClientHeadersTests {
+
+    private static final LocalTransportAddress address = new LocalTransportAddress("test");
+
+    @Override
+    protected Client buildClient(Settings headersSettings, GenericAction[] testedActions) {
+        TransportClient client = new TransportClient(ImmutableSettings.builder()
+                .put("client.transport.sniff", false)
+                .put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, InternalTransportService.class.getName())
+                .put(HEADER_SETTINGS)
+                .build());
+
+        client.addTransportAddress(address);
+        return client;
+    }
+
+    public static class InternalTransportService extends TransportService {
+
+        @Inject
+        public InternalTransportService(Settings settings, Transport transport, ThreadPool threadPool) {
+            super(settings, transport, threadPool);
+        }
+
+        @Override @SuppressWarnings("unchecked")
+        public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request, TransportRequestOptions options, TransportResponseHandler<T> handler) {
+            if (NodesInfoAction.NAME.equals(action)) {
+                ((TransportResponseHandler<NodesInfoResponse>) handler).handleResponse(new NodesInfoResponse(ClusterName.DEFAULT, new NodeInfo[0]));
+                return;
+            }
+            handler.handleException(new TransportException("", new InternalException(action, request.getHeaders())));
+        }
+
+        @Override
+        public boolean nodeConnected(DiscoveryNode node) {
+            assertThat((LocalTransportAddress) node.getAddress(), equalTo(address));
+            return true;
+        }
+
+        @Override
+        public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
+            assertThat((LocalTransportAddress) node.getAddress(), equalTo(address));
+        }
+    }
+
+}