|
|
@@ -53,6 +53,7 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
|
|
|
protected final TransportService transportService;
|
|
|
|
|
|
final String executor;
|
|
|
+ final String shardActionName;
|
|
|
|
|
|
protected TransportInstanceSingleOperationAction(Settings settings, String actionName, ThreadPool threadPool,
|
|
|
ClusterService clusterService, TransportService transportService,
|
|
|
@@ -61,6 +62,8 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
|
|
|
this.clusterService = clusterService;
|
|
|
this.transportService = transportService;
|
|
|
this.executor = executor();
|
|
|
+ this.shardActionName = actionName + "[s]";
|
|
|
+ transportService.registerRequestHandler(shardActionName, request, executor, new ShardTransportHandler());
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
@@ -70,7 +73,7 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
|
|
|
|
|
|
protected abstract String executor();
|
|
|
|
|
|
- protected abstract void shardOperation(InternalRequest request, ActionListener<Response> listener);
|
|
|
+ protected abstract void shardOperation(Request request, ActionListener<Response> listener);
|
|
|
|
|
|
protected abstract Response newResponse();
|
|
|
|
|
|
@@ -78,14 +81,14 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
|
|
|
return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
|
|
|
}
|
|
|
|
|
|
- protected ClusterBlockException checkRequestBlock(ClusterState state, InternalRequest request) {
|
|
|
+ protected ClusterBlockException checkRequestBlock(ClusterState state, Request request) {
|
|
|
return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.concreteIndex());
|
|
|
}
|
|
|
/**
|
|
|
* Resolves the request. If the resolve means a different execution, then return false
|
|
|
* here to indicate not to continue and execute this request.
|
|
|
*/
|
|
|
- protected abstract boolean resolveRequest(ClusterState state, InternalRequest request, ActionListener<Response> listener);
|
|
|
+ protected abstract boolean resolveRequest(ClusterState state, Request request, ActionListener<Response> listener);
|
|
|
|
|
|
protected boolean retryOnFailure(Throwable e) {
|
|
|
return false;
|
|
|
@@ -98,24 +101,24 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
|
|
|
/**
|
|
|
* Should return an iterator with a single shard!
|
|
|
*/
|
|
|
- protected abstract ShardIterator shards(ClusterState clusterState, InternalRequest request);
|
|
|
+ protected abstract ShardIterator shards(ClusterState clusterState, Request request);
|
|
|
|
|
|
class AsyncSingleAction {
|
|
|
|
|
|
private final ActionListener<Response> listener;
|
|
|
- private final InternalRequest internalRequest;
|
|
|
+ private final Request request;
|
|
|
private volatile ClusterStateObserver observer;
|
|
|
private ShardIterator shardIt;
|
|
|
private DiscoveryNodes nodes;
|
|
|
private final AtomicBoolean operationStarted = new AtomicBoolean();
|
|
|
|
|
|
private AsyncSingleAction(Request request, ActionListener<Response> listener) {
|
|
|
- this.internalRequest = new InternalRequest(request);
|
|
|
+ this.request = request;
|
|
|
this.listener = listener;
|
|
|
}
|
|
|
|
|
|
public void start() {
|
|
|
- this.observer = new ClusterStateObserver(clusterService, internalRequest.request().timeout(), logger);
|
|
|
+ this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger);
|
|
|
doStart();
|
|
|
}
|
|
|
|
|
|
@@ -131,12 +134,12 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
|
|
|
throw blockException;
|
|
|
}
|
|
|
}
|
|
|
- internalRequest.concreteIndex(indexNameExpressionResolver.concreteSingleIndex(observer.observedState(), internalRequest.request()));
|
|
|
+ request.concreteIndex(indexNameExpressionResolver.concreteSingleIndex(observer.observedState(), request));
|
|
|
// check if we need to execute, and if not, return
|
|
|
- if (!resolveRequest(observer.observedState(), internalRequest, listener)) {
|
|
|
+ if (!resolveRequest(observer.observedState(), request, listener)) {
|
|
|
return true;
|
|
|
}
|
|
|
- blockException = checkRequestBlock(observer.observedState(), internalRequest);
|
|
|
+ blockException = checkRequestBlock(observer.observedState(), request);
|
|
|
if (blockException != null) {
|
|
|
if (blockException.retryable()) {
|
|
|
retry(blockException);
|
|
|
@@ -145,7 +148,7 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
|
|
|
throw blockException;
|
|
|
}
|
|
|
}
|
|
|
- shardIt = shards(observer.observedState(), internalRequest);
|
|
|
+ shardIt = shards(observer.observedState(), request);
|
|
|
} catch (Throwable e) {
|
|
|
listener.onFailure(e);
|
|
|
return true;
|
|
|
@@ -172,68 +175,39 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
- internalRequest.request().shardId = shardIt.shardId().id();
|
|
|
- if (shard.currentNodeId().equals(nodes.localNodeId())) {
|
|
|
- internalRequest.request().beforeLocalFork();
|
|
|
- try {
|
|
|
- threadPool.executor(executor).execute(new Runnable() {
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- try {
|
|
|
- shardOperation(internalRequest, listener);
|
|
|
- } catch (Throwable e) {
|
|
|
- if (retryOnFailure(e)) {
|
|
|
- operationStarted.set(false);
|
|
|
- // we already marked it as started when we executed it (removed the listener) so pass false
|
|
|
- // to re-add to the cluster listener
|
|
|
- retry(null);
|
|
|
- } else {
|
|
|
- listener.onFailure(e);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- });
|
|
|
- } catch (Throwable e) {
|
|
|
- if (retryOnFailure(e)) {
|
|
|
- retry(null);
|
|
|
- } else {
|
|
|
- listener.onFailure(e);
|
|
|
- }
|
|
|
- }
|
|
|
- } else {
|
|
|
- DiscoveryNode node = nodes.get(shard.currentNodeId());
|
|
|
- transportService.sendRequest(node, actionName, internalRequest.request(), transportOptions(), new BaseTransportResponseHandler<Response>() {
|
|
|
+ request.shardId = shardIt.shardId().id();
|
|
|
+ DiscoveryNode node = nodes.get(shard.currentNodeId());
|
|
|
+ transportService.sendRequest(node, shardActionName, request, transportOptions(), new BaseTransportResponseHandler<Response>() {
|
|
|
|
|
|
- @Override
|
|
|
- public Response newInstance() {
|
|
|
- return newResponse();
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ public Response newInstance() {
|
|
|
+ return newResponse();
|
|
|
+ }
|
|
|
|
|
|
- @Override
|
|
|
- public String executor() {
|
|
|
- return ThreadPool.Names.SAME;
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ public String executor() {
|
|
|
+ return ThreadPool.Names.SAME;
|
|
|
+ }
|
|
|
|
|
|
- @Override
|
|
|
- public void handleResponse(Response response) {
|
|
|
- listener.onResponse(response);
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ public void handleResponse(Response response) {
|
|
|
+ listener.onResponse(response);
|
|
|
+ }
|
|
|
|
|
|
- @Override
|
|
|
- public void handleException(TransportException exp) {
|
|
|
- // if we got disconnected from the node, or the node / shard is not in the right state (being closed)
|
|
|
- if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException ||
|
|
|
- retryOnFailure(exp)) {
|
|
|
- operationStarted.set(false);
|
|
|
- // we already marked it as started when we executed it (removed the listener) so pass false
|
|
|
- // to re-add to the cluster listener
|
|
|
- retry(null);
|
|
|
- } else {
|
|
|
- listener.onFailure(exp);
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ public void handleException(TransportException exp) {
|
|
|
+ // if we got disconnected from the node, or the node / shard is not in the right state (being closed)
|
|
|
+ if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException ||
|
|
|
+ retryOnFailure(exp)) {
|
|
|
+ operationStarted.set(false);
|
|
|
+ // we already marked it as started when we executed it (removed the listener) so pass false
|
|
|
+ // to re-add to the cluster listener
|
|
|
+ retry(null);
|
|
|
+ } else {
|
|
|
+ listener.onFailure(exp);
|
|
|
}
|
|
|
- });
|
|
|
- }
|
|
|
+ }
|
|
|
+ });
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
@@ -243,8 +217,6 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- // make it threaded operation so we fork on the discovery listener thread
|
|
|
- internalRequest.request().beforeLocalFork();
|
|
|
observer.waitForNextChange(new ClusterStateObserver.Listener() {
|
|
|
@Override
|
|
|
public void onNewClusterState(ClusterState state) {
|
|
|
@@ -263,39 +235,42 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
|
|
|
Throwable listenFailure = failure;
|
|
|
if (listenFailure == null) {
|
|
|
if (shardIt == null) {
|
|
|
- listenFailure = new UnavailableShardsException(new ShardId(internalRequest.concreteIndex(), -1), "Timeout waiting for [" + timeout + "], request: " + internalRequest.request().toString());
|
|
|
+ listenFailure = new UnavailableShardsException(new ShardId(request.concreteIndex(), -1), "Timeout waiting for [" + timeout + "], request: " + request.toString());
|
|
|
} else {
|
|
|
- listenFailure = new UnavailableShardsException(shardIt.shardId(), "[" + shardIt.size() + "] shardIt, [" + shardIt.sizeActive() + "] active : Timeout waiting for [" + timeout + "], request: " + internalRequest.request().toString());
|
|
|
+ listenFailure = new UnavailableShardsException(shardIt.shardId(), "[" + shardIt.size() + "] shardIt, [" + shardIt.sizeActive() + "] active : Timeout waiting for [" + timeout + "], request: " + request.toString());
|
|
|
}
|
|
|
}
|
|
|
listener.onFailure(listenFailure);
|
|
|
}
|
|
|
}
|
|
|
- }, internalRequest.request().timeout());
|
|
|
+ }, request.timeout());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Internal request class that gets built on each node. Holds the original request plus additional info.
|
|
|
- */
|
|
|
- protected class InternalRequest {
|
|
|
- final Request request;
|
|
|
- String concreteIndex;
|
|
|
-
|
|
|
- InternalRequest(Request request) {
|
|
|
- this.request = request;
|
|
|
- }
|
|
|
+ private class ShardTransportHandler implements TransportRequestHandler<Request> {
|
|
|
|
|
|
- public Request request() {
|
|
|
- return request;
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
|
|
|
+ shardOperation(request, new ActionListener<Response>() {
|
|
|
+ @Override
|
|
|
+ public void onResponse(Response response) {
|
|
|
+ try {
|
|
|
+ channel.sendResponse(response);
|
|
|
+ } catch (Throwable e) {
|
|
|
+ onFailure(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- void concreteIndex(String concreteIndex) {
|
|
|
- this.concreteIndex = concreteIndex;
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ public void onFailure(Throwable e) {
|
|
|
+ try {
|
|
|
+ channel.sendResponse(e);
|
|
|
+ } catch (Exception e1) {
|
|
|
+ logger.warn("failed to send response for get", e1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
|
|
|
- public String concreteIndex() {
|
|
|
- return concreteIndex;
|
|
|
}
|
|
|
}
|
|
|
}
|