|
@@ -29,6 +29,7 @@ import org.elasticsearch.action.ActionResponse;
|
|
|
import org.elasticsearch.action.UnavailableShardsException;
|
|
|
import org.elasticsearch.action.support.ActionFilters;
|
|
|
import org.elasticsearch.action.support.ActiveShardCount;
|
|
|
+import org.elasticsearch.action.support.ChannelActionListener;
|
|
|
import org.elasticsearch.action.support.TransportAction;
|
|
|
import org.elasticsearch.action.support.TransportActions;
|
|
|
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
|
@@ -69,10 +70,8 @@ import org.elasticsearch.tasks.TaskId;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
import org.elasticsearch.transport.ConnectTransportException;
|
|
|
import org.elasticsearch.transport.TransportChannel;
|
|
|
-import org.elasticsearch.transport.TransportChannelResponseHandler;
|
|
|
import org.elasticsearch.transport.TransportException;
|
|
|
import org.elasticsearch.transport.TransportRequest;
|
|
|
-import org.elasticsearch.transport.TransportRequestHandler;
|
|
|
import org.elasticsearch.transport.TransportRequestOptions;
|
|
|
import org.elasticsearch.transport.TransportResponse;
|
|
|
import org.elasticsearch.transport.TransportResponse.Empty;
|
|
@@ -154,14 +153,12 @@ public abstract class TransportReplicationAction<
|
|
|
|
|
|
protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<Request> request,
|
|
|
Supplier<ReplicaRequest> replicaRequest, String executor) {
|
|
|
- transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
|
|
|
+ transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest);
|
|
|
transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor,
|
|
|
- new PrimaryOperationTransportHandler());
|
|
|
+ this::handlePrimaryRequest);
|
|
|
// we must never reject on because of thread pool capacity on replicas
|
|
|
- transportService.registerRequestHandler(transportReplicaAction,
|
|
|
- () -> new ConcreteReplicaRequest<>(replicaRequest),
|
|
|
- executor, true, true,
|
|
|
- new ReplicaOperationTransportHandler());
|
|
|
+ transportService.registerRequestHandler(
|
|
|
+ transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, this::handleReplicaRequest);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -271,71 +268,30 @@ public abstract class TransportReplicationAction<
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- protected class OperationTransportHandler implements TransportRequestHandler<Request> {
|
|
|
-
|
|
|
- public OperationTransportHandler() {
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
|
|
|
- execute(task, request, new ActionListener<Response>() {
|
|
|
- @Override
|
|
|
- public void onResponse(Response result) {
|
|
|
- try {
|
|
|
- channel.sendResponse(result);
|
|
|
- } catch (Exception e) {
|
|
|
- onFailure(e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onFailure(Exception e) {
|
|
|
- try {
|
|
|
- channel.sendResponse(e);
|
|
|
- } catch (Exception inner) {
|
|
|
- inner.addSuppressed(e);
|
|
|
- logger.warn(() -> new ParameterizedMessage("Failed to send response for {}", actionName), inner);
|
|
|
- }
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
+ protected void handleOperationRequest(final Request request, final TransportChannel channel, Task task) {
|
|
|
+ execute(task, request, new ChannelActionListener<>(channel, actionName, request));
|
|
|
}
|
|
|
|
|
|
- protected class PrimaryOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<Request>> {
|
|
|
-
|
|
|
- public PrimaryOperationTransportHandler() {
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void messageReceived(ConcreteShardRequest<Request> request, TransportChannel channel, Task task) {
|
|
|
- new AsyncPrimaryAction(request.request, request.targetAllocationID, request.primaryTerm, channel, (ReplicationTask) task).run();
|
|
|
- }
|
|
|
+ protected void handlePrimaryRequest(final ConcreteShardRequest<Request> request, final TransportChannel channel, final Task task) {
|
|
|
+ new AsyncPrimaryAction(
|
|
|
+ request, new ChannelActionListener<>(channel, transportPrimaryAction, request), (ReplicationTask) task).run();
|
|
|
}
|
|
|
|
|
|
class AsyncPrimaryAction extends AbstractRunnable {
|
|
|
-
|
|
|
- private final Request request;
|
|
|
- // targetAllocationID of the shard this request is meant for
|
|
|
- private final String targetAllocationID;
|
|
|
- // primary term of the shard this request is meant for
|
|
|
- private final long primaryTerm;
|
|
|
- private final TransportChannel channel;
|
|
|
+ private final ActionListener<Response> onCompletionListener;
|
|
|
private final ReplicationTask replicationTask;
|
|
|
+ private final ConcreteShardRequest<Request> primaryRequest;
|
|
|
|
|
|
- AsyncPrimaryAction(Request request, String targetAllocationID, long primaryTerm, TransportChannel channel,
|
|
|
+ AsyncPrimaryAction(ConcreteShardRequest<Request> primaryRequest, ActionListener<Response> onCompletionListener,
|
|
|
ReplicationTask replicationTask) {
|
|
|
- this.request = request;
|
|
|
- this.targetAllocationID = targetAllocationID;
|
|
|
- this.primaryTerm = primaryTerm;
|
|
|
- this.channel = channel;
|
|
|
+ this.primaryRequest = primaryRequest;
|
|
|
+ this.onCompletionListener = onCompletionListener;
|
|
|
this.replicationTask = replicationTask;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
protected void doRun() throws Exception {
|
|
|
- final ShardId shardId = request.shardId();
|
|
|
+ final ShardId shardId = primaryRequest.getRequest().shardId();
|
|
|
final IndexShard indexShard = getIndexShard(shardId);
|
|
|
final ShardRouting shardRouting = indexShard.routingEntry();
|
|
|
// we may end up here if the cluster state used to route the primary is so stale that the underlying
|
|
@@ -345,17 +301,17 @@ public abstract class TransportReplicationAction<
|
|
|
throw new ReplicationOperation.RetryOnPrimaryException(shardId, "actual shard is not a primary " + shardRouting);
|
|
|
}
|
|
|
final String actualAllocationId = shardRouting.allocationId().getId();
|
|
|
- if (actualAllocationId.equals(targetAllocationID) == false) {
|
|
|
- throw new ShardNotFoundException(shardId, "expected allocation id [{}] but found [{}]", targetAllocationID,
|
|
|
- actualAllocationId);
|
|
|
+ if (actualAllocationId.equals(primaryRequest.getTargetAllocationID()) == false) {
|
|
|
+ throw new ShardNotFoundException(shardId, "expected allocation id [{}] but found [{}]",
|
|
|
+ primaryRequest.getTargetAllocationID(), actualAllocationId);
|
|
|
}
|
|
|
final long actualTerm = indexShard.getPendingPrimaryTerm();
|
|
|
- if (actualTerm != primaryTerm) {
|
|
|
- throw new ShardNotFoundException(shardId, "expected allocation id [{}] with term [{}] but found [{}]", targetAllocationID,
|
|
|
- primaryTerm, actualTerm);
|
|
|
+ if (actualTerm != primaryRequest.getPrimaryTerm()) {
|
|
|
+ throw new ShardNotFoundException(shardId, "expected allocation id [{}] with term [{}] but found [{}]",
|
|
|
+ primaryRequest.getTargetAllocationID(), primaryRequest.getPrimaryTerm(), actualTerm);
|
|
|
}
|
|
|
|
|
|
- acquirePrimaryOperationPermit(indexShard, request, ActionListener.wrap(
|
|
|
+ acquirePrimaryOperationPermit(indexShard, primaryRequest.getRequest(), ActionListener.wrap(
|
|
|
releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)),
|
|
|
this::onFailure
|
|
|
));
|
|
@@ -387,11 +343,10 @@ public abstract class TransportReplicationAction<
|
|
|
};
|
|
|
DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId());
|
|
|
transportService.sendRequest(relocatingNode, transportPrimaryAction,
|
|
|
- new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm),
|
|
|
+ new ConcreteShardRequest<>(primaryRequest.getRequest(), primary.allocationId().getRelocationId(),
|
|
|
+ primaryRequest.getPrimaryTerm()),
|
|
|
transportOptions,
|
|
|
- new TransportChannelResponseHandler<Response>(logger, channel, "rerouting indexing to target primary " + primary,
|
|
|
- reader) {
|
|
|
-
|
|
|
+ new ActionListenerResponseHandler<Response>(onCompletionListener, reader) {
|
|
|
@Override
|
|
|
public void handleResponse(Response response) {
|
|
|
setPhase(replicationTask, "finished");
|
|
@@ -407,7 +362,7 @@ public abstract class TransportReplicationAction<
|
|
|
} else {
|
|
|
setPhase(replicationTask, "primary");
|
|
|
final ActionListener<Response> listener = createResponseListener(primaryShardReference);
|
|
|
- createReplicatedOperation(request,
|
|
|
+ createReplicatedOperation(primaryRequest.getRequest(),
|
|
|
ActionListener.wrap(result -> result.respond(listener), listener::onFailure),
|
|
|
primaryShardReference)
|
|
|
.execute();
|
|
@@ -421,12 +376,7 @@ public abstract class TransportReplicationAction<
|
|
|
@Override
|
|
|
public void onFailure(Exception e) {
|
|
|
setPhase(replicationTask, "finished");
|
|
|
- try {
|
|
|
- channel.sendResponse(e);
|
|
|
- } catch (IOException inner) {
|
|
|
- inner.addSuppressed(e);
|
|
|
- logger.warn("failed to send response", inner);
|
|
|
- }
|
|
|
+ onCompletionListener.onFailure(e);
|
|
|
}
|
|
|
|
|
|
private ActionListener<Response> createResponseListener(final PrimaryShardReference primaryShardReference) {
|
|
@@ -451,22 +401,14 @@ public abstract class TransportReplicationAction<
|
|
|
}
|
|
|
primaryShardReference.close(); // release shard operation lock before responding to caller
|
|
|
setPhase(replicationTask, "finished");
|
|
|
- try {
|
|
|
- channel.sendResponse(response);
|
|
|
- } catch (IOException e) {
|
|
|
- onFailure(e);
|
|
|
- }
|
|
|
+ onCompletionListener.onResponse(response);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void onFailure(Exception e) {
|
|
|
primaryShardReference.close(); // release shard operation lock before responding to caller
|
|
|
setPhase(replicationTask, "finished");
|
|
|
- try {
|
|
|
- channel.sendResponse(e);
|
|
|
- } catch (IOException e1) {
|
|
|
- logger.warn("failed to send response", e);
|
|
|
- }
|
|
|
+ onCompletionListener.onFailure(e);
|
|
|
}
|
|
|
};
|
|
|
}
|
|
@@ -475,7 +417,7 @@ public abstract class TransportReplicationAction<
|
|
|
Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener,
|
|
|
PrimaryShardReference primaryShardReference) {
|
|
|
return new ReplicationOperation<>(request, primaryShardReference, listener,
|
|
|
- newReplicasProxy(primaryTerm), logger, actionName);
|
|
|
+ newReplicasProxy(primaryRequest.getPrimaryTerm()), logger, actionName);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -544,24 +486,10 @@ public abstract class TransportReplicationAction<
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public class ReplicaOperationTransportHandler implements TransportRequestHandler<ConcreteReplicaRequest<ReplicaRequest>> {
|
|
|
-
|
|
|
- @Override
|
|
|
- public void messageReceived(
|
|
|
- final ConcreteReplicaRequest<ReplicaRequest> replicaRequest,
|
|
|
- final TransportChannel channel,
|
|
|
- final Task task)
|
|
|
- throws Exception {
|
|
|
- new AsyncReplicaAction(
|
|
|
- replicaRequest.getRequest(),
|
|
|
- replicaRequest.getTargetAllocationID(),
|
|
|
- replicaRequest.getPrimaryTerm(),
|
|
|
- replicaRequest.getGlobalCheckpoint(),
|
|
|
- replicaRequest.getMaxSeqNoOfUpdatesOrDeletes(),
|
|
|
- channel,
|
|
|
- (ReplicationTask) task).run();
|
|
|
- }
|
|
|
-
|
|
|
+ protected void handleReplicaRequest(final ConcreteReplicaRequest<ReplicaRequest> replicaRequest,
|
|
|
+ final TransportChannel channel, final Task task) {
|
|
|
+ new AsyncReplicaAction(
|
|
|
+ replicaRequest, new ChannelActionListener<>(channel, transportReplicaAction, replicaRequest), (ReplicationTask) task).run();
|
|
|
}
|
|
|
|
|
|
public static class RetryOnReplicaException extends ElasticsearchException {
|
|
@@ -577,13 +505,7 @@ public abstract class TransportReplicationAction<
|
|
|
}
|
|
|
|
|
|
private final class AsyncReplicaAction extends AbstractRunnable implements ActionListener<Releasable> {
|
|
|
- private final ReplicaRequest request;
|
|
|
- // allocation id of the replica this request is meant for
|
|
|
- private final String targetAllocationID;
|
|
|
- private final long primaryTerm;
|
|
|
- private final long globalCheckpoint;
|
|
|
- private final long maxSeqNoOfUpdatesOrDeletes;
|
|
|
- private final TransportChannel channel;
|
|
|
+ private final ActionListener<ReplicaResponse> onCompletionListener;
|
|
|
private final IndexShard replica;
|
|
|
/**
|
|
|
* The task on the node with the replica shard.
|
|
@@ -592,23 +514,14 @@ public abstract class TransportReplicationAction<
|
|
|
// important: we pass null as a timeout as failing a replica is
|
|
|
// something we want to avoid at all costs
|
|
|
private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
|
|
|
+ private final ConcreteReplicaRequest<ReplicaRequest> replicaRequest;
|
|
|
|
|
|
- AsyncReplicaAction(
|
|
|
- ReplicaRequest request,
|
|
|
- String targetAllocationID,
|
|
|
- long primaryTerm,
|
|
|
- long globalCheckpoint,
|
|
|
- long maxSeqNoOfUpdatesOrDeletes,
|
|
|
- TransportChannel channel,
|
|
|
- ReplicationTask task) {
|
|
|
- this.request = request;
|
|
|
- this.channel = channel;
|
|
|
+ AsyncReplicaAction(ConcreteReplicaRequest<ReplicaRequest> replicaRequest, ActionListener<ReplicaResponse> onCompletionListener,
|
|
|
+ ReplicationTask task) {
|
|
|
+ this.replicaRequest = replicaRequest;
|
|
|
+ this.onCompletionListener = onCompletionListener;
|
|
|
this.task = task;
|
|
|
- this.targetAllocationID = targetAllocationID;
|
|
|
- this.primaryTerm = primaryTerm;
|
|
|
- this.globalCheckpoint = globalCheckpoint;
|
|
|
- this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
|
|
|
- final ShardId shardId = request.shardId();
|
|
|
+ final ShardId shardId = replicaRequest.getRequest().shardId();
|
|
|
assert shardId != null : "request shardId must be set";
|
|
|
this.replica = getIndexShard(shardId);
|
|
|
}
|
|
@@ -616,7 +529,7 @@ public abstract class TransportReplicationAction<
|
|
|
@Override
|
|
|
public void onResponse(Releasable releasable) {
|
|
|
try {
|
|
|
- final ReplicaResult replicaResult = shardOperationOnReplica(request, replica);
|
|
|
+ final ReplicaResult replicaResult = shardOperationOnReplica(replicaRequest.getRequest(), replica);
|
|
|
releasable.close(); // release shard operation lock before responding to caller
|
|
|
final TransportReplicationAction.ReplicaResponse response =
|
|
|
new ReplicaResponse(replica.getLocalCheckpoint(), replica.getGlobalCheckpoint());
|
|
@@ -634,22 +547,17 @@ public abstract class TransportReplicationAction<
|
|
|
() -> new ParameterizedMessage(
|
|
|
"Retrying operation on replica, action [{}], request [{}]",
|
|
|
transportReplicaAction,
|
|
|
- request),
|
|
|
+ replicaRequest.getRequest()),
|
|
|
e);
|
|
|
- request.onRetry();
|
|
|
+ replicaRequest.getRequest().onRetry();
|
|
|
observer.waitForNextChange(new ClusterStateObserver.Listener() {
|
|
|
@Override
|
|
|
public void onNewClusterState(ClusterState state) {
|
|
|
// Forking a thread on local node via transport service so that custom transport service have an
|
|
|
// opportunity to execute custom logic before the replica operation begins
|
|
|
- String extraMessage = "action [" + transportReplicaAction + "], request[" + request + "]";
|
|
|
- TransportChannelResponseHandler<TransportResponse.Empty> handler =
|
|
|
- new TransportChannelResponseHandler<>(logger, channel, extraMessage,
|
|
|
- (in) -> TransportResponse.Empty.INSTANCE);
|
|
|
transportService.sendRequest(clusterService.localNode(), transportReplicaAction,
|
|
|
- new ConcreteReplicaRequest<>(request, targetAllocationID, primaryTerm,
|
|
|
- globalCheckpoint, maxSeqNoOfUpdatesOrDeletes),
|
|
|
- handler);
|
|
|
+ replicaRequest,
|
|
|
+ new ActionListenerResponseHandler<>(onCompletionListener, in -> new ReplicaResponse()));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -668,25 +576,20 @@ public abstract class TransportReplicationAction<
|
|
|
}
|
|
|
|
|
|
protected void responseWithFailure(Exception e) {
|
|
|
- try {
|
|
|
- setPhase(task, "finished");
|
|
|
- channel.sendResponse(e);
|
|
|
- } catch (IOException responseException) {
|
|
|
- responseException.addSuppressed(e);
|
|
|
- logger.warn(() -> new ParameterizedMessage(
|
|
|
- "failed to send error message back to client for action [{}]", transportReplicaAction), responseException);
|
|
|
- }
|
|
|
+ setPhase(task, "finished");
|
|
|
+ onCompletionListener.onFailure(e);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
protected void doRun() throws Exception {
|
|
|
setPhase(task, "replica");
|
|
|
final String actualAllocationId = this.replica.routingEntry().allocationId().getId();
|
|
|
- if (actualAllocationId.equals(targetAllocationID) == false) {
|
|
|
- throw new ShardNotFoundException(this.replica.shardId(), "expected allocation id [{}] but found [{}]", targetAllocationID,
|
|
|
- actualAllocationId);
|
|
|
+ if (actualAllocationId.equals(replicaRequest.getTargetAllocationID()) == false) {
|
|
|
+ throw new ShardNotFoundException(this.replica.shardId(), "expected allocation id [{}] but found [{}]",
|
|
|
+ replicaRequest.getTargetAllocationID(), actualAllocationId);
|
|
|
}
|
|
|
- acquireReplicaOperationPermit(replica, request, this, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
|
|
|
+ acquireReplicaOperationPermit(replica, replicaRequest.getRequest(), this, replicaRequest.getPrimaryTerm(),
|
|
|
+ replicaRequest.getGlobalCheckpoint(), replicaRequest.getMaxSeqNoOfUpdatesOrDeletes());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -702,15 +605,12 @@ public abstract class TransportReplicationAction<
|
|
|
@Override
|
|
|
public void onResponse(Empty response) {
|
|
|
if (logger.isTraceEnabled()) {
|
|
|
- logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, request.shardId(),
|
|
|
- request);
|
|
|
+ logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction,
|
|
|
+ replicaRequest.getRequest().shardId(),
|
|
|
+ replicaRequest.getRequest());
|
|
|
}
|
|
|
setPhase(task, "finished");
|
|
|
- try {
|
|
|
- channel.sendResponse(replicaResponse);
|
|
|
- } catch (Exception e) {
|
|
|
- onFailure(e);
|
|
|
- }
|
|
|
+ onCompletionListener.onResponse(replicaResponse);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -982,12 +882,13 @@ public abstract class TransportReplicationAction<
|
|
|
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onAcquired, executor, request);
|
|
|
}
|
|
|
|
|
|
- class ShardReference implements Releasable {
|
|
|
+ class PrimaryShardReference implements Releasable,
|
|
|
+ ReplicationOperation.Primary<Request, ReplicaRequest, PrimaryResult<ReplicaRequest, Response>> {
|
|
|
|
|
|
protected final IndexShard indexShard;
|
|
|
private final Releasable operationLock;
|
|
|
|
|
|
- ShardReference(IndexShard indexShard, Releasable operationLock) {
|
|
|
+ PrimaryShardReference(IndexShard indexShard, Releasable operationLock) {
|
|
|
this.indexShard = indexShard;
|
|
|
this.operationLock = operationLock;
|
|
|
}
|
|
@@ -1005,15 +906,6 @@ public abstract class TransportReplicationAction<
|
|
|
return indexShard.routingEntry();
|
|
|
}
|
|
|
|
|
|
- }
|
|
|
-
|
|
|
- class PrimaryShardReference extends ShardReference
|
|
|
- implements ReplicationOperation.Primary<Request, ReplicaRequest, PrimaryResult<ReplicaRequest, Response>> {
|
|
|
-
|
|
|
- PrimaryShardReference(IndexShard indexShard, Releasable operationLock) {
|
|
|
- super(indexShard, operationLock);
|
|
|
- }
|
|
|
-
|
|
|
public boolean isRelocated() {
|
|
|
return indexShard.isRelocatedPrimary();
|
|
|
}
|
|
@@ -1028,8 +920,8 @@ public abstract class TransportReplicationAction<
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public PrimaryResult perform(Request request) throws Exception {
|
|
|
- PrimaryResult result = shardOperationOnPrimary(request, indexShard);
|
|
|
+ public PrimaryResult<ReplicaRequest, Response> perform(Request request) throws Exception {
|
|
|
+ PrimaryResult<ReplicaRequest, Response> result = shardOperationOnPrimary(request, indexShard);
|
|
|
assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest()
|
|
|
+ "] with a primary failure [" + result.finalFailure + "]";
|
|
|
return result;
|