|  | @@ -40,10 +40,8 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
 | 
	
		
			
				|  |  |  import org.elasticsearch.cluster.block.ClusterBlockException;
 | 
	
		
			
				|  |  |  import org.elasticsearch.cluster.block.ClusterBlockLevel;
 | 
	
		
			
				|  |  |  import org.elasticsearch.cluster.metadata.IndexMetaData;
 | 
	
		
			
				|  |  | -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 | 
	
		
			
				|  |  |  import org.elasticsearch.cluster.node.DiscoveryNode;
 | 
	
		
			
				|  |  |  import org.elasticsearch.cluster.routing.AllocationId;
 | 
	
		
			
				|  |  | -import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 | 
	
		
			
				|  |  |  import org.elasticsearch.cluster.routing.ShardRouting;
 | 
	
		
			
				|  |  |  import org.elasticsearch.cluster.service.ClusterService;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.Nullable;
 | 
	
	
		
			
				|  | @@ -104,7 +102,6 @@ public abstract class TransportReplicationAction<
 | 
	
		
			
				|  |  |      protected final ClusterService clusterService;
 | 
	
		
			
				|  |  |      protected final ShardStateAction shardStateAction;
 | 
	
		
			
				|  |  |      protected final IndicesService indicesService;
 | 
	
		
			
				|  |  | -    protected final IndexNameExpressionResolver indexNameExpressionResolver;
 | 
	
		
			
				|  |  |      protected final TransportRequestOptions transportOptions;
 | 
	
		
			
				|  |  |      protected final String executor;
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -117,19 +114,17 @@ public abstract class TransportReplicationAction<
 | 
	
		
			
				|  |  |      protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
 | 
	
		
			
				|  |  |                                           ClusterService clusterService, IndicesService indicesService,
 | 
	
		
			
				|  |  |                                           ThreadPool threadPool, ShardStateAction shardStateAction,
 | 
	
		
			
				|  |  | -                                         ActionFilters actionFilters,
 | 
	
		
			
				|  |  | -                                         IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> requestReader,
 | 
	
		
			
				|  |  | +                                         ActionFilters actionFilters, Writeable.Reader<Request> requestReader,
 | 
	
		
			
				|  |  |                                           Writeable.Reader<ReplicaRequest> replicaRequestReader, String executor) {
 | 
	
		
			
				|  |  |          this(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
 | 
	
		
			
				|  |  | -                indexNameExpressionResolver, requestReader, replicaRequestReader, executor, false, false);
 | 
	
		
			
				|  |  | +                requestReader, replicaRequestReader, executor, false, false);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
 | 
	
		
			
				|  |  |                                           ClusterService clusterService, IndicesService indicesService,
 | 
	
		
			
				|  |  |                                           ThreadPool threadPool, ShardStateAction shardStateAction,
 | 
	
		
			
				|  |  | -                                         ActionFilters actionFilters,
 | 
	
		
			
				|  |  | -                                         IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> requestReader,
 | 
	
		
			
				|  |  | +                                         ActionFilters actionFilters, Writeable.Reader<Request> requestReader,
 | 
	
		
			
				|  |  |                                           Writeable.Reader<ReplicaRequest> replicaRequestReader, String executor,
 | 
	
		
			
				|  |  |                                           boolean syncGlobalCheckpointAfterOperation, boolean forceExecutionOnPrimary) {
 | 
	
		
			
				|  |  |          super(actionName, actionFilters, transportService.getTaskManager());
 | 
	
	
		
			
				|  | @@ -138,7 +133,6 @@ public abstract class TransportReplicationAction<
 | 
	
		
			
				|  |  |          this.clusterService = clusterService;
 | 
	
		
			
				|  |  |          this.indicesService = indicesService;
 | 
	
		
			
				|  |  |          this.shardStateAction = shardStateAction;
 | 
	
		
			
				|  |  | -        this.indexNameExpressionResolver = indexNameExpressionResolver;
 | 
	
		
			
				|  |  |          this.executor = executor;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          this.transportPrimaryAction = actionName + "[p]";
 | 
	
	
		
			
				|  | @@ -219,21 +213,10 @@ public abstract class TransportReplicationAction<
 | 
	
		
			
				|  |  |          return null;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    /**
 | 
	
		
			
				|  |  | -     * True if provided index should be resolved when resolving request
 | 
	
		
			
				|  |  | -     */
 | 
	
		
			
				|  |  | -    protected boolean resolveIndex() {
 | 
	
		
			
				|  |  | -        return true;
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |      protected TransportRequestOptions transportOptions(Settings settings) {
 | 
	
		
			
				|  |  |          return TransportRequestOptions.EMPTY;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    private String concreteIndex(final ClusterState state, final ReplicationRequest request) {
 | 
	
		
			
				|  |  | -        return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request).getName() : request.index();
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |      private ClusterBlockException blockExceptions(final ClusterState state, final String indexName) {
 | 
	
		
			
				|  |  |          ClusterBlockLevel globalBlockLevel = globalBlockLevel();
 | 
	
		
			
				|  |  |          if (globalBlockLevel != null) {
 | 
	
	
		
			
				|  | @@ -648,8 +631,7 @@ public abstract class TransportReplicationAction<
 | 
	
		
			
				|  |  |          protected void doRun() {
 | 
	
		
			
				|  |  |              setPhase(task, "routing");
 | 
	
		
			
				|  |  |              final ClusterState state = observer.setAndGetObservedState();
 | 
	
		
			
				|  |  | -            final String concreteIndex = concreteIndex(state, request);
 | 
	
		
			
				|  |  | -            final ClusterBlockException blockException = blockExceptions(state, concreteIndex);
 | 
	
		
			
				|  |  | +            final ClusterBlockException blockException = blockExceptions(state, request.shardId().getIndexName());
 | 
	
		
			
				|  |  |              if (blockException != null) {
 | 
	
		
			
				|  |  |                  if (blockException.retryable()) {
 | 
	
		
			
				|  |  |                      logger.trace("cluster is blocked, scheduling a retry", blockException);
 | 
	
	
		
			
				|  | @@ -658,23 +640,47 @@ public abstract class TransportReplicationAction<
 | 
	
		
			
				|  |  |                      finishAsFailed(blockException);
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  |              } else {
 | 
	
		
			
				|  |  | -                // request does not have a shardId yet, we need to pass the concrete index to resolve shardId
 | 
	
		
			
				|  |  | -                final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
 | 
	
		
			
				|  |  | +                final IndexMetaData indexMetaData = state.metaData().index(request.shardId().getIndex());
 | 
	
		
			
				|  |  |                  if (indexMetaData == null) {
 | 
	
		
			
				|  |  | -                    retry(new IndexNotFoundException(concreteIndex));
 | 
	
		
			
				|  |  | -                    return;
 | 
	
		
			
				|  |  | +                    // ensure that the cluster state on the node is at least as high as the node that decided that the index was there
 | 
	
		
			
				|  |  | +                    if (state.version() < request.routedBasedOnClusterVersion()) {
 | 
	
		
			
				|  |  | +                        logger.trace("failed to find index [{}] for request [{}] despite sender thinking it would be here. " +
 | 
	
		
			
				|  |  | +                                "Local cluster state version [{}]] is older than on sending node (version [{}]), scheduling a retry...",
 | 
	
		
			
				|  |  | +                            request.shardId().getIndex(), request, state.version(), request.routedBasedOnClusterVersion());
 | 
	
		
			
				|  |  | +                        retry(new IndexNotFoundException("failed to find index as current cluster state with version [" + state.version() +
 | 
	
		
			
				|  |  | +                            "] is stale (expected at least [" + request.routedBasedOnClusterVersion() + "]",
 | 
	
		
			
				|  |  | +                            request.shardId().getIndexName()));
 | 
	
		
			
				|  |  | +                        return;
 | 
	
		
			
				|  |  | +                    } else {
 | 
	
		
			
				|  |  | +                        finishAsFailed(new IndexNotFoundException(request.shardId().getIndex()));
 | 
	
		
			
				|  |  | +                        return;
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |                  if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
 | 
	
		
			
				|  |  | -                    throw new IndexClosedException(indexMetaData.getIndex());
 | 
	
		
			
				|  |  | +                    finishAsFailed(new IndexClosedException(indexMetaData.getIndex()));
 | 
	
		
			
				|  |  | +                    return;
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -                // resolve all derived request fields, so we can route and apply it
 | 
	
		
			
				|  |  | -                resolveRequest(indexMetaData, request);
 | 
	
		
			
				|  |  | +                if (request.waitForActiveShards() == ActiveShardCount.DEFAULT) {
 | 
	
		
			
				|  |  | +                    // if the wait for active shard count has not been set in the request,
 | 
	
		
			
				|  |  | +                    // resolve it from the index settings
 | 
	
		
			
				|  |  | +                    request.waitForActiveShards(indexMetaData.getWaitForActiveShards());
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  |                  assert request.waitForActiveShards() != ActiveShardCount.DEFAULT :
 | 
	
		
			
				|  |  |                      "request waitForActiveShards must be set in resolveRequest";
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -                final ShardRouting primary = primary(state);
 | 
	
		
			
				|  |  | -                if (retryIfUnavailable(state, primary)) {
 | 
	
		
			
				|  |  | +                final ShardRouting primary = state.getRoutingTable().shardRoutingTable(request.shardId()).primaryShard();
 | 
	
		
			
				|  |  | +                if (primary == null || primary.active() == false) {
 | 
	
		
			
				|  |  | +                    logger.trace("primary shard [{}] is not yet active, scheduling a retry: action [{}], request [{}], "
 | 
	
		
			
				|  |  | +                        + "cluster state version [{}]", request.shardId(), actionName, request, state.version());
 | 
	
		
			
				|  |  | +                    retryBecauseUnavailable(request.shardId(), "primary shard is not active");
 | 
	
		
			
				|  |  | +                    return;
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +                if (state.nodes().nodeExists(primary.currentNodeId()) == false) {
 | 
	
		
			
				|  |  | +                    logger.trace("primary shard [{}] is assigned to an unknown node [{}], scheduling a retry: action [{}], request [{}], "
 | 
	
		
			
				|  |  | +                        + "cluster state version [{}]", request.shardId(), primary.currentNodeId(), actionName, request, state.version());
 | 
	
		
			
				|  |  | +                    retryBecauseUnavailable(request.shardId(), "primary shard isn't assigned to a known node.");
 | 
	
		
			
				|  |  |                      return;
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  |                  final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
 | 
	
	
		
			
				|  | @@ -718,27 +724,6 @@ public abstract class TransportReplicationAction<
 | 
	
		
			
				|  |  |              performAction(node, actionName, false, request);
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        private boolean retryIfUnavailable(ClusterState state, ShardRouting primary) {
 | 
	
		
			
				|  |  | -            if (primary == null || primary.active() == false) {
 | 
	
		
			
				|  |  | -                logger.trace("primary shard [{}] is not yet active, scheduling a retry: action [{}], request [{}], "
 | 
	
		
			
				|  |  | -                    + "cluster state version [{}]", request.shardId(), actionName, request, state.version());
 | 
	
		
			
				|  |  | -                retryBecauseUnavailable(request.shardId(), "primary shard is not active");
 | 
	
		
			
				|  |  | -                return true;
 | 
	
		
			
				|  |  | -            }
 | 
	
		
			
				|  |  | -            if (state.nodes().nodeExists(primary.currentNodeId()) == false) {
 | 
	
		
			
				|  |  | -                logger.trace("primary shard [{}] is assigned to an unknown node [{}], scheduling a retry: action [{}], request [{}], "
 | 
	
		
			
				|  |  | -                    + "cluster state version [{}]", request.shardId(), primary.currentNodeId(), actionName, request, state.version());
 | 
	
		
			
				|  |  | -                retryBecauseUnavailable(request.shardId(), "primary shard isn't assigned to a known node.");
 | 
	
		
			
				|  |  | -                return true;
 | 
	
		
			
				|  |  | -            }
 | 
	
		
			
				|  |  | -            return false;
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -        private ShardRouting primary(ClusterState state) {
 | 
	
		
			
				|  |  | -            IndexShardRoutingTable indexShard = state.getRoutingTable().shardRoutingTable(request.shardId());
 | 
	
		
			
				|  |  | -            return indexShard.primaryShard();
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |          private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction,
 | 
	
		
			
				|  |  |                                     final TransportRequest requestToPerform) {
 | 
	
		
			
				|  |  |              transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() {
 |