Browse Source

Refactors TransportReplicationAction to decouple request routing and shard operation logic

Areek Zillur 10 years ago
parent
commit
025e9818e7
20 changed files with 691 additions and 644 deletions
  1. 7 3
      core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java
  2. 1 1
      core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java
  3. 10 18
      core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java
  4. 1 1
      core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java
  5. 10 16
      core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java
  6. 7 4
      core/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java
  7. 2 2
      core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
  8. 24 35
      core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
  9. 17 26
      core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java
  10. 14 26
      core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java
  11. 35 9
      core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java
  12. 322 380
      core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
  13. 2 3
      core/src/main/java/org/elasticsearch/action/termvectors/TransportMultiTermVectorsAction.java
  14. 15 32
      core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java
  15. 20 0
      core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java
  16. 190 75
      core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
  17. 0 2
      core/src/test/java/org/elasticsearch/cluster/routing/RoutingBackwardCompatibilityTests.java
  18. 2 2
      core/src/test/java/org/elasticsearch/consistencylevel/WriteConsistencyLevelIT.java
  19. 7 4
      core/src/test/java/org/elasticsearch/index/mapper/MapperServiceTests.java
  20. 5 5
      modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java

+ 7 - 3
core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java

@@ -22,6 +22,7 @@ package org.elasticsearch.action.admin.indices.flush;
 import org.elasticsearch.action.support.replication.ReplicationRequest;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.index.shard.ShardId;
 
 import java.io.IOException;
 
@@ -29,8 +30,8 @@ public class ShardFlushRequest extends ReplicationRequest<ShardFlushRequest> {
 
     private FlushRequest request = new FlushRequest();
 
-    public ShardFlushRequest(FlushRequest request) {
-        super(request);
+    public ShardFlushRequest(FlushRequest request, ShardId shardId) {
+        super(request, shardId);
         this.request = request;
     }
 
@@ -53,5 +54,8 @@ public class ShardFlushRequest extends ReplicationRequest<ShardFlushRequest> {
         request.writeTo(out);
     }
 
-
+    @Override
+    public String toString() {
+        return "flush {" + super.toString() + "}";
+    }
 }

+ 1 - 1
core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java

@@ -53,7 +53,7 @@ public class TransportFlushAction extends TransportBroadcastReplicationAction<Fl
 
     @Override
     protected ShardFlushRequest newShardRequest(FlushRequest request, ShardId shardId) {
-        return new ShardFlushRequest(request).setShardId(shardId);
+        return new ShardFlushRequest(request, shardId);
     }
 
     @Override

+ 10 - 18
core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java

@@ -23,18 +23,15 @@ import org.elasticsearch.action.ActionWriteResponse;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.replication.TransportReplicationAction;
 import org.elasticsearch.cluster.ClusterService;
-import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
-import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
-import org.elasticsearch.cluster.routing.ShardIterator;
+import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -61,15 +58,15 @@ public class TransportShardFlushAction extends TransportReplicationAction<ShardF
     }
 
     @Override
-    protected Tuple<ActionWriteResponse, ShardFlushRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
-        IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).getShard(shardRequest.shardId.id());
-        indexShard.flush(shardRequest.request.getRequest());
+    protected Tuple<ActionWriteResponse, ShardFlushRequest> shardOperationOnPrimary(MetaData metaData, ShardFlushRequest shardRequest) throws Throwable {
+        IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id());
+        indexShard.flush(shardRequest.getRequest());
         logger.trace("{} flush request executed on primary", indexShard.shardId());
-        return new Tuple<>(new ActionWriteResponse(), shardRequest.request);
+        return new Tuple<>(new ActionWriteResponse(), shardRequest);
     }
 
     @Override
-    protected void shardOperationOnReplica(ShardId shardId, ShardFlushRequest request) {
+    protected void shardOperationOnReplica(ShardFlushRequest request) {
         IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
         indexShard.flush(request.getRequest());
         logger.trace("{} flush request executed on replica", indexShard.shardId());
@@ -81,18 +78,13 @@ public class TransportShardFlushAction extends TransportReplicationAction<ShardF
     }
 
     @Override
-    protected ShardIterator shards(ClusterState clusterState, InternalRequest request) {
-        return clusterState.getRoutingTable().indicesRouting().get(request.concreteIndex()).getShards().get(request.request().shardId().getId()).shardsIt();
+    protected ClusterBlockLevel globalBlockLevel() {
+        return ClusterBlockLevel.METADATA_WRITE;
     }
 
     @Override
-    protected ClusterBlockException checkGlobalBlock(ClusterState state) {
-        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
-    }
-
-    @Override
-    protected ClusterBlockException checkRequestBlock(ClusterState state, InternalRequest request) {
-        return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, new String[]{request.concreteIndex()});
+    protected ClusterBlockLevel indexBlockLevel() {
+        return ClusterBlockLevel.METADATA_WRITE;
     }
 
     @Override

+ 1 - 1
core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java

@@ -54,7 +54,7 @@ public class TransportRefreshAction extends TransportBroadcastReplicationAction<
 
     @Override
     protected ReplicationRequest newShardRequest(RefreshRequest request, ShardId shardId) {
-        return new ReplicationRequest(request).setShardId(shardId);
+        return new ReplicationRequest(request, shardId);
     }
 
     @Override

+ 10 - 16
core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java

@@ -24,13 +24,11 @@ import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.replication.ReplicationRequest;
 import org.elasticsearch.action.support.replication.TransportReplicationAction;
 import org.elasticsearch.cluster.ClusterService;
-import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
-import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
-import org.elasticsearch.cluster.routing.ShardIterator;
+import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
@@ -62,15 +60,16 @@ public class TransportShardRefreshAction extends TransportReplicationAction<Repl
     }
 
     @Override
-    protected Tuple<ActionWriteResponse, ReplicationRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
-        IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).getShard(shardRequest.shardId.id());
+    protected Tuple<ActionWriteResponse, ReplicationRequest> shardOperationOnPrimary(MetaData metaData, ReplicationRequest shardRequest) throws Throwable {
+        IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id());
         indexShard.refresh("api");
         logger.trace("{} refresh request executed on primary", indexShard.shardId());
-        return new Tuple<>(new ActionWriteResponse(), shardRequest.request);
+        return new Tuple<>(new ActionWriteResponse(), shardRequest);
     }
 
     @Override
-    protected void shardOperationOnReplica(ShardId shardId, ReplicationRequest request) {
+    protected void shardOperationOnReplica(ReplicationRequest request) {
+        final ShardId shardId = request.shardId();
         IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());
         indexShard.refresh("api");
         logger.trace("{} refresh request executed on replica", indexShard.shardId());
@@ -82,18 +81,13 @@ public class TransportShardRefreshAction extends TransportReplicationAction<Repl
     }
 
     @Override
-    protected ShardIterator shards(ClusterState clusterState, InternalRequest request) {
-        return clusterState.getRoutingTable().indicesRouting().get(request.concreteIndex()).getShards().get(request.request().shardId().getId()).shardsIt();
+    protected ClusterBlockLevel globalBlockLevel() {
+        return ClusterBlockLevel.METADATA_WRITE;
     }
 
     @Override
-    protected ClusterBlockException checkGlobalBlock(ClusterState state) {
-        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
-    }
-
-    @Override
-    protected ClusterBlockException checkRequestBlock(ClusterState state, InternalRequest request) {
-        return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, new String[]{request.concreteIndex()});
+    protected ClusterBlockLevel indexBlockLevel() {
+        return ClusterBlockLevel.METADATA_WRITE;
     }
 
     @Override

+ 7 - 4
core/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java

@@ -40,10 +40,8 @@ public class BulkShardRequest extends ReplicationRequest<BulkShardRequest> {
     public BulkShardRequest() {
     }
 
-    BulkShardRequest(BulkRequest bulkRequest, String index, int shardId, boolean refresh, BulkItemRequest[] items) {
-        super(bulkRequest);
-        this.index = index;
-        this.setShardId(new ShardId(index, shardId));
+    BulkShardRequest(BulkRequest bulkRequest, ShardId shardId, boolean refresh, BulkItemRequest[] items) {
+        super(bulkRequest, shardId);
         this.items = items;
         this.refresh = refresh;
     }
@@ -93,4 +91,9 @@ public class BulkShardRequest extends ReplicationRequest<BulkShardRequest> {
         }
         refresh = in.readBoolean();
     }
+
+    @Override
+    public String toString() {
+        return "shard bulk {" + super.toString() + "}";
+    }
 }

+ 2 - 2
core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

@@ -275,7 +275,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
                         list.add(new BulkItemRequest(i, new DeleteRequest(deleteRequest)));
                     }
                 } else {
-                    ShardId shardId = clusterService.operationRouting().deleteShards(clusterState, concreteIndex, deleteRequest.type(), deleteRequest.id(), deleteRequest.routing()).shardId();
+                    ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, deleteRequest.type(), deleteRequest.id(), deleteRequest.routing()).shardId();
                     List<BulkItemRequest> list = requestsByShard.get(shardId);
                     if (list == null) {
                         list = new ArrayList<>();
@@ -312,7 +312,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
         for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
             final ShardId shardId = entry.getKey();
             final List<BulkItemRequest> requests = entry.getValue();
-            BulkShardRequest bulkShardRequest = new BulkShardRequest(bulkRequest, shardId.index().name(), shardId.id(), bulkRequest.refresh(), requests.toArray(new BulkItemRequest[requests.size()]));
+            BulkShardRequest bulkShardRequest = new BulkShardRequest(bulkRequest, shardId, bulkRequest.refresh(), requests.toArray(new BulkItemRequest[requests.size()]));
             bulkShardRequest.consistencyLevel(bulkRequest.consistencyLevel());
             bulkShardRequest.timeout(bulkRequest.timeout());
             shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {

+ 24 - 35
core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

@@ -35,12 +35,11 @@ import org.elasticsearch.action.update.UpdateHelper;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.action.update.UpdateResponse;
 import org.elasticsearch.cluster.ClusterService;
-import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
-import org.elasticsearch.cluster.routing.ShardIterator;
+import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.inject.Inject;
@@ -87,11 +86,6 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
         this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true);
     }
 
-    @Override
-    protected boolean checkWriteConsistency() {
-        return true;
-    }
-
     @Override
     protected TransportRequestOptions transportOptions() {
         return BulkAction.INSTANCE.transportOptions(settings);
@@ -108,15 +102,9 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
     }
 
     @Override
-    protected ShardIterator shards(ClusterState clusterState, InternalRequest request) {
-        return clusterState.routingTable().index(request.concreteIndex()).shard(request.request().shardId().id()).shardsIt();
-    }
-
-    @Override
-    protected Tuple<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
-        final BulkShardRequest request = shardRequest.request;
+    protected Tuple<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(MetaData metaData, BulkShardRequest request) {
         final IndexService indexService = indicesService.indexServiceSafe(request.index());
-        final IndexShard indexShard = indexService.getShard(shardRequest.shardId.id());
+        final IndexShard indexShard = indexService.getShard(request.shardId().id());
 
         long[] preVersions = new long[request.items().length];
         VersionType[] preVersionTypes = new VersionType[request.items().length];
@@ -128,7 +116,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
                 preVersions[requestIndex] = indexRequest.version();
                 preVersionTypes[requestIndex] = indexRequest.versionType();
                 try {
-                    WriteResult<IndexResponse> result = shardIndexOperation(request, indexRequest, clusterState, indexShard, true);
+                    WriteResult<IndexResponse> result = shardIndexOperation(request, indexRequest, metaData, indexShard, true);
                     location = locationToSync(location, result.location);
                     // add the response
                     IndexResponse indexResponse = result.response();
@@ -143,9 +131,9 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
                         throw (ElasticsearchException) e;
                     }
                     if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) {
-                        logger.trace("{} failed to execute bulk item (index) {}", e, shardRequest.shardId, indexRequest);
+                        logger.trace("{} failed to execute bulk item (index) {}", e, request.shardId(), indexRequest);
                     } else {
-                        logger.debug("{} failed to execute bulk item (index) {}", e, shardRequest.shardId, indexRequest);
+                        logger.debug("{} failed to execute bulk item (index) {}", e, request.shardId(), indexRequest);
                     }
                     // if its a conflict failure, and we already executed the request on a primary (and we execute it
                     // again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
@@ -178,9 +166,9 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
                         throw (ElasticsearchException) e;
                     }
                     if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) {
-                        logger.trace("{} failed to execute bulk item (delete) {}", e, shardRequest.shardId, deleteRequest);
+                        logger.trace("{} failed to execute bulk item (delete) {}", e, request.shardId(), deleteRequest);
                     } else {
-                        logger.debug("{} failed to execute bulk item (delete) {}", e, shardRequest.shardId, deleteRequest);
+                        logger.debug("{} failed to execute bulk item (delete) {}", e, request.shardId(), deleteRequest);
                     }
                     // if its a conflict failure, and we already executed the request on a primary (and we execute it
                     // again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
@@ -200,7 +188,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
                 for (int updateAttemptsCount = 0; updateAttemptsCount <= updateRequest.retryOnConflict(); updateAttemptsCount++) {
                     UpdateResult updateResult;
                     try {
-                        updateResult = shardUpdateOperation(clusterState, request, updateRequest, indexShard);
+                        updateResult = shardUpdateOperation(metaData, request, updateRequest, indexShard);
                     } catch (Throwable t) {
                         updateResult = new UpdateResult(null, null, false, t, null);
                     }
@@ -219,7 +207,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
                                 UpdateResponse updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getIndex(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.isCreated());
                                 if (updateRequest.fields() != null && updateRequest.fields().length > 0) {
                                     Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true);
-                                    updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, shardRequest.request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
+                                    updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
                                 }
                                 item = request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), indexRequest);
                                 setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResponse));
@@ -229,7 +217,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
                                 DeleteResponse response = writeResult.response();
                                 DeleteRequest deleteRequest = updateResult.request();
                                 updateResponse = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), false);
-                                updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, shardRequest.request.index(), response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null));
+                                updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null));
                                 // Replace the update request to the translated delete request to execute on the replica.
                                 item = request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest);
                                 setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResponse));
@@ -264,16 +252,16 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
                             if (item.getPrimaryResponse() != null && isConflictException(t)) {
                                 setResponse(item, item.getPrimaryResponse());
                             } else if (updateResult.result == null) {
-                                setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, new BulkItemResponse.Failure(shardRequest.request.index(), updateRequest.type(), updateRequest.id(), t)));
+                                setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), t)));
                             } else {
                                 switch (updateResult.result.operation()) {
                                     case UPSERT:
                                     case INDEX:
                                         IndexRequest indexRequest = updateResult.request();
                                         if (ExceptionsHelper.status(t) == RestStatus.CONFLICT) {
-                                            logger.trace("{} failed to execute bulk item (index) {}", t, shardRequest.shardId, indexRequest);
+                                            logger.trace("{} failed to execute bulk item (index) {}", t, request.shardId(), indexRequest);
                                         } else {
-                                            logger.debug("{} failed to execute bulk item (index) {}", t, shardRequest.shardId, indexRequest);
+                                            logger.debug("{} failed to execute bulk item (index) {}", t, request.shardId(), indexRequest);
                                         }
                                         setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE,
                                                 new BulkItemResponse.Failure(request.index(), indexRequest.type(), indexRequest.id(), t)));
@@ -281,9 +269,9 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
                                     case DELETE:
                                         DeleteRequest deleteRequest = updateResult.request();
                                         if (ExceptionsHelper.status(t) == RestStatus.CONFLICT) {
-                                            logger.trace("{} failed to execute bulk item (delete) {}", t, shardRequest.shardId, deleteRequest);
+                                            logger.trace("{} failed to execute bulk item (delete) {}", t, request.shardId(), deleteRequest);
                                         } else {
-                                            logger.debug("{} failed to execute bulk item (delete) {}", t, shardRequest.shardId, deleteRequest);
+                                            logger.debug("{} failed to execute bulk item (delete) {}", t, request.shardId(), deleteRequest);
                                         }
                                         setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE,
                                                 new BulkItemResponse.Failure(request.index(), deleteRequest.type(), deleteRequest.id(), t)));
@@ -310,7 +298,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
         for (int i = 0; i < items.length; i++) {
             responses[i] = items[i].getPrimaryResponse();
         }
-        return new Tuple<>(new BulkShardResponse(shardRequest.shardId, responses), shardRequest.request);
+        return new Tuple<>(new BulkShardResponse(request.shardId(), responses), request);
     }
 
     private void setResponse(BulkItemRequest request, BulkItemResponse response) {
@@ -320,11 +308,11 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
         }
     }
 
-    private WriteResult<IndexResponse> shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, ClusterState clusterState,
+    private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, MetaData metaData,
                                             IndexShard indexShard, boolean processed) throws Throwable {
 
         // validate, if routing is required, that we got routing
-        MappingMetaData mappingMd = clusterState.metaData().index(request.index()).mappingOrDefault(indexRequest.type());
+        MappingMetaData mappingMd = metaData.index(request.index()).mappingOrDefault(indexRequest.type());
         if (mappingMd != null && mappingMd.routing().required()) {
             if (indexRequest.routing() == null) {
                 throw new RoutingMissingException(request.index(), indexRequest.type(), indexRequest.id());
@@ -332,7 +320,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
         }
 
         if (!processed) {
-            indexRequest.process(clusterState.metaData(), mappingMd, allowIdGeneration, request.index());
+            indexRequest.process(metaData, mappingMd, allowIdGeneration, request.index());
         }
         return TransportIndexAction.executeIndexRequestOnPrimary(indexRequest, indexShard, mappingUpdatedAction);
     }
@@ -390,14 +378,14 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
 
     }
 
-    private UpdateResult shardUpdateOperation(ClusterState clusterState, BulkShardRequest bulkShardRequest, UpdateRequest updateRequest, IndexShard indexShard) {
+    private UpdateResult shardUpdateOperation(MetaData metaData, BulkShardRequest bulkShardRequest, UpdateRequest updateRequest, IndexShard indexShard) {
         UpdateHelper.Result translate = updateHelper.prepare(updateRequest, indexShard);
         switch (translate.operation()) {
             case UPSERT:
             case INDEX:
                 IndexRequest indexRequest = translate.action();
                 try {
-                    WriteResult result = shardIndexOperation(bulkShardRequest, indexRequest, clusterState, indexShard, false);
+                    WriteResult result = shardIndexOperation(bulkShardRequest, indexRequest, metaData, indexShard, false);
                     return new UpdateResult(translate, indexRequest, result);
                 } catch (Throwable t) {
                     t = ExceptionsHelper.unwrapCause(t);
@@ -431,7 +419,8 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
 
 
     @Override
-    protected void shardOperationOnReplica(ShardId shardId, BulkShardRequest request) {
+    protected void shardOperationOnReplica(BulkShardRequest request) {
+        final ShardId shardId = request.shardId();
         IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
         IndexShard indexShard = indexService.getShard(shardId.id());
         Translog.Location location = null;

+ 17 - 26
core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java

@@ -34,7 +34,7 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
-import org.elasticsearch.cluster.routing.ShardIterator;
+import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
@@ -94,45 +94,41 @@ public class TransportDeleteAction extends TransportReplicationAction<DeleteRequ
     }
 
     @Override
-    protected void resolveRequest(final ClusterState state, final InternalRequest request, final ActionListener<DeleteResponse> listener) {
-        request.request().routing(state.metaData().resolveIndexRouting(request.request().routing(), request.request().index()));
-        if (state.metaData().hasIndex(request.concreteIndex())) {
+    protected void resolveRequest(final MetaData metaData, String concreteIndex, DeleteRequest request) {
+        request.routing(metaData.resolveIndexRouting(request.routing(), request.index()));
+        if (metaData.hasIndex(concreteIndex)) {
             // check if routing is required, if so, do a broadcast delete
-            MappingMetaData mappingMd = state.metaData().index(request.concreteIndex()).mappingOrDefault(request.request().type());
+            MappingMetaData mappingMd = metaData.index(concreteIndex).mappingOrDefault(request.type());
             if (mappingMd != null && mappingMd.routing().required()) {
-                if (request.request().routing() == null) {
-                    if (request.request().versionType() != VersionType.INTERNAL) {
+                if (request.routing() == null) {
+                    if (request.versionType() != VersionType.INTERNAL) {
                         // TODO: implement this feature
-                        throw new IllegalArgumentException("routing value is required for deleting documents of type [" + request.request().type()
-                                + "] while using version_type [" + request.request().versionType() + "]");
+                        throw new IllegalArgumentException("routing value is required for deleting documents of type [" + request.type()
+                                + "] while using version_type [" + request.versionType() + "]");
                     }
-                    throw new RoutingMissingException(request.concreteIndex(), request.request().type(), request.request().id());
+                    throw new RoutingMissingException(concreteIndex, request.type(), request.id());
                 }
             }
         }
+        ShardId shardId = clusterService.operationRouting().shardId(clusterService.state(), concreteIndex, request.id(), request.routing());
+        request.setShardId(shardId);
     }
 
     private void innerExecute(final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
         super.doExecute(request, listener);
     }
 
-    @Override
-    protected boolean checkWriteConsistency() {
-        return true;
-    }
-
     @Override
     protected DeleteResponse newResponseInstance() {
         return new DeleteResponse();
     }
 
     @Override
-    protected Tuple<DeleteResponse, DeleteRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
-        DeleteRequest request = shardRequest.request;
-        IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).getShard(shardRequest.shardId.id());
+    protected Tuple<DeleteResponse, DeleteRequest> shardOperationOnPrimary(MetaData metaData, DeleteRequest request) {
+        IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
         final WriteResult<DeleteResponse> result = executeDeleteRequestOnPrimary(request, indexShard);
         processAfterWrite(request.refresh(), indexShard, result.location);
-        return new Tuple<>(result.response, shardRequest.request);
+        return new Tuple<>(result.response, request);
     }
 
     public static WriteResult<DeleteResponse> executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard indexShard) {
@@ -154,17 +150,12 @@ public class TransportDeleteAction extends TransportReplicationAction<DeleteRequ
         return delete;
     }
 
-
     @Override
-    protected void shardOperationOnReplica(ShardId shardId, DeleteRequest request) {
+    protected void shardOperationOnReplica(DeleteRequest request) {
+        final ShardId shardId = request.shardId();
         IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());
         Engine.Delete delete = executeDeleteRequestOnReplica(request, indexShard);
         processAfterWrite(request.refresh(), indexShard, delete.getTranslogLocation());
     }
 
-    @Override
-    protected ShardIterator shards(ClusterState clusterState, InternalRequest request) {
-        return clusterService.operationRouting()
-                .deleteShards(clusterService.state(), request.concreteIndex(), request.request().type(), request.request().id(), request.request().routing());
-    }
 }

+ 14 - 26
core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java

@@ -36,7 +36,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
-import org.elasticsearch.cluster.routing.ShardIterator;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
@@ -120,62 +119,51 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques
     }
 
     @Override
-    protected void resolveRequest(ClusterState state, InternalRequest request, ActionListener<IndexResponse> indexResponseActionListener) {
-        MetaData metaData = clusterService.state().metaData();
-
+    protected void resolveRequest(MetaData metaData, String concreteIndex, IndexRequest request) {
         MappingMetaData mappingMd = null;
-        if (metaData.hasIndex(request.concreteIndex())) {
-            mappingMd = metaData.index(request.concreteIndex()).mappingOrDefault(request.request().type());
+        if (metaData.hasIndex(concreteIndex)) {
+            mappingMd = metaData.index(concreteIndex).mappingOrDefault(request.type());
         }
-        request.request().process(metaData, mappingMd, allowIdGeneration, request.concreteIndex());
+        request.process(metaData, mappingMd, allowIdGeneration, concreteIndex);
+        ShardId shardId = clusterService.operationRouting().shardId(clusterService.state(), concreteIndex, request.id(), request.routing());
+        request.setShardId(shardId);
     }
 
     private void innerExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
         super.doExecute(request, listener);
     }
 
-    @Override
-    protected boolean checkWriteConsistency() {
-        return true;
-    }
-
     @Override
     protected IndexResponse newResponseInstance() {
         return new IndexResponse();
     }
 
     @Override
-    protected ShardIterator shards(ClusterState clusterState, InternalRequest request) {
-        return clusterService.operationRouting()
-                .indexShards(clusterService.state(), request.concreteIndex(), request.request().type(), request.request().id(), request.request().routing());
-    }
-
-    @Override
-    protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
-        final IndexRequest request = shardRequest.request;
+    protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(MetaData metaData, IndexRequest request) throws Throwable {
 
         // validate, if routing is required, that we got routing
-        IndexMetaData indexMetaData = clusterState.metaData().index(shardRequest.shardId.getIndex());
+        IndexMetaData indexMetaData = metaData.index(request.shardId().getIndex());
         MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type());
         if (mappingMd != null && mappingMd.routing().required()) {
             if (request.routing() == null) {
-                throw new RoutingMissingException(shardRequest.shardId.getIndex(), request.type(), request.id());
+                throw new RoutingMissingException(request.shardId().getIndex(), request.type(), request.id());
             }
         }
 
-        IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
-        IndexShard indexShard = indexService.getShard(shardRequest.shardId.id());
+        IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
+        IndexShard indexShard = indexService.getShard(request.shardId().id());
 
         final WriteResult<IndexResponse> result = executeIndexRequestOnPrimary(request, indexShard, mappingUpdatedAction);
 
         final IndexResponse response = result.response;
         final Translog.Location location = result.location;
         processAfterWrite(request.refresh(), indexShard, location);
-        return new Tuple<>(response, shardRequest.request);
+        return new Tuple<>(response, request);
     }
 
     @Override
-    protected void shardOperationOnReplica(ShardId shardId, IndexRequest request) {
+    protected void shardOperationOnReplica(IndexRequest request) {
+        final ShardId shardId = request.shardId();
         IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
         IndexShard indexShard = indexService.getShard(shardId.id());
         final Engine.Index operation = executeIndexRequestOnReplica(request, indexShard);

+ 35 - 9
core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java

@@ -42,7 +42,12 @@ public class ReplicationRequest<T extends ReplicationRequest> extends ActionRequ
 
     public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES);
 
-    ShardId internalShardId;
+    /**
+     * Target shard the request should execute on. In case of index and delete requests,
+     * shard id gets resolved by the transport action before performing request operation
+     * and at request creation time for shard-level bulk, refresh and flush requests.
+     */
+    protected ShardId shardId;
 
     protected TimeValue timeout = DEFAULT_TIMEOUT;
     protected String index;
@@ -60,6 +65,15 @@ public class ReplicationRequest<T extends ReplicationRequest> extends ActionRequ
         super(request);
     }
 
+    /**
+     * Creates a new request with resolved shard id
+     */
+    public ReplicationRequest(ActionRequest request, ShardId shardId) {
+        super(request);
+        this.index = shardId.getIndex();
+        this.shardId = shardId;
+    }
+
     /**
      * Copy constructor that creates a new request that is a copy of the one provided as an argument.
      */
@@ -124,12 +138,12 @@ public class ReplicationRequest<T extends ReplicationRequest> extends ActionRequ
 
     /**
      * @return the shardId of the shard where this operation should be executed on.
-     * can be null in case the shardId is determined by a single document (index, type, id) for example for index or delete request.
+     * can be null if the shardID has not yet been resolved
      */
     public
     @Nullable
     ShardId shardId() {
-        return internalShardId;
+        return shardId;
     }
 
     /**
@@ -154,9 +168,9 @@ public class ReplicationRequest<T extends ReplicationRequest> extends ActionRequ
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);
         if (in.readBoolean()) {
-            internalShardId = ShardId.readShardId(in);
+            shardId = ShardId.readShardId(in);
         } else {
-            internalShardId = null;
+            shardId = null;
         }
         consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
         timeout = TimeValue.readTimeValue(in);
@@ -166,9 +180,9 @@ public class ReplicationRequest<T extends ReplicationRequest> extends ActionRequ
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
-        if (internalShardId != null) {
+        if (shardId != null) {
             out.writeBoolean(true);
-            internalShardId.writeTo(out);
+            shardId.writeTo(out);
         } else {
             out.writeBoolean(false);
         }
@@ -177,9 +191,21 @@ public class ReplicationRequest<T extends ReplicationRequest> extends ActionRequ
         out.writeString(index);
     }
 
+    /**
+     * Sets the target shard id for the request. The shard id is set when a
+     * index/delete request is resolved by the transport action
+     */
     public T setShardId(ShardId shardId) {
-        this.internalShardId = shardId;
-        this.index = shardId.getIndex();
+        this.shardId = shardId;
         return (T) this;
     }
+
+    @Override
+    public String toString() {
+        if (shardId != null) {
+            return shardId.toString();
+        } else {
+            return index;
+        }
+    }
 }

File diff suppressed because it is too large
+ 322 - 380
core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java


+ 2 - 3
core/src/main/java/org/elasticsearch/action/termvectors/TransportMultiTermVectorsAction.java

@@ -19,7 +19,6 @@
 
 package org.elasticsearch.action.termvectors;
 
-import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocumentRequest;
 import org.elasticsearch.action.support.ActionFilters;
@@ -79,8 +78,8 @@ public class TransportMultiTermVectorsAction extends HandledTransportAction<Mult
                         new IllegalArgumentException("routing is required for [" + concreteSingleIndex + "]/[" + termVectorsRequest.type() + "]/[" + termVectorsRequest.id() + "]"))));
                 continue;
             }
-            ShardId shardId = clusterService.operationRouting().getShards(clusterState, concreteSingleIndex,
-                    termVectorsRequest.type(), termVectorsRequest.id(), termVectorsRequest.routing(), null).shardId();
+            ShardId shardId = clusterService.operationRouting().shardId(clusterState, concreteSingleIndex,
+                    termVectorsRequest.id(), termVectorsRequest.routing());
             MultiTermVectorsShardRequest shardRequest = shardRequests.get(shardId);
             if (shardRequest == null) {
                 shardRequest = new MultiTermVectorsShardRequest(request, shardId.index().name(), shardId.id());

+ 15 - 32
core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java

@@ -25,7 +25,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.math.MathUtils;
@@ -55,19 +54,16 @@ public class OperationRouting extends AbstractComponent {
     }
 
     public ShardIterator indexShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing) {
-        return shards(clusterState, index, type, id, routing).shardsIt();
-    }
-
-    public ShardIterator deleteShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing) {
-        return shards(clusterState, index, type, id, routing).shardsIt();
+        return shards(clusterState, index, id, routing).shardsIt();
     }
 
     public ShardIterator getShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing, @Nullable String preference) {
-        return preferenceActiveShardIterator(shards(clusterState, index, type, id, routing), clusterState.nodes().localNodeId(), clusterState.nodes(), preference);
+        return preferenceActiveShardIterator(shards(clusterState, index, id, routing), clusterState.nodes().localNodeId(), clusterState.nodes(), preference);
     }
 
     public ShardIterator getShards(ClusterState clusterState, String index, int shardId, @Nullable String preference) {
-        return preferenceActiveShardIterator(shards(clusterState, index, shardId), clusterState.nodes().localNodeId(), clusterState.nodes(), preference);
+        final IndexShardRoutingTable indexShard = clusterState.getRoutingTable().shardRoutingTable(index, shardId);
+        return preferenceActiveShardIterator(indexShard, clusterState.nodes().localNodeId(), clusterState.nodes(), preference);
     }
 
     public GroupShardsIterator broadcastDeleteShards(ClusterState clusterState, String index) {
@@ -102,7 +98,7 @@ public class OperationRouting extends AbstractComponent {
             final Set<String> effectiveRouting = routing.get(index);
             if (effectiveRouting != null) {
                 for (String r : effectiveRouting) {
-                    int shardId = shardId(clusterState, index, null, null, r);
+                    int shardId = generateShardId(clusterState, index, null, r);
                     IndexShardRoutingTable indexShard = indexRouting.shard(shardId);
                     if (indexShard == null) {
                         throw new ShardNotFoundException(new ShardId(index, shardId));
@@ -200,14 +196,6 @@ public class OperationRouting extends AbstractComponent {
         }
     }
 
-    public IndexMetaData indexMetaData(ClusterState clusterState, String index) {
-        IndexMetaData indexMetaData = clusterState.metaData().index(index);
-        if (indexMetaData == null) {
-            throw new IndexNotFoundException(index);
-        }
-        return indexMetaData;
-    }
-
     protected IndexRoutingTable indexRoutingTable(ClusterState clusterState, String index) {
         IndexRoutingTable indexRouting = clusterState.routingTable().index(index);
         if (indexRouting == null) {
@@ -216,25 +204,20 @@ public class OperationRouting extends AbstractComponent {
         return indexRouting;
     }
 
-
-    // either routing is set, or type/id are set
-
-    protected IndexShardRoutingTable shards(ClusterState clusterState, String index, String type, String id, String routing) {
-        int shardId = shardId(clusterState, index, type, id, routing);
-        return shards(clusterState, index, shardId);
+    protected IndexShardRoutingTable shards(ClusterState clusterState, String index, String id, String routing) {
+        int shardId = generateShardId(clusterState, index, id, routing);
+        return clusterState.getRoutingTable().shardRoutingTable(index, shardId);
     }
 
-    protected IndexShardRoutingTable shards(ClusterState clusterState, String index, int shardId) {
-        IndexShardRoutingTable indexShard = indexRoutingTable(clusterState, index).shard(shardId);
-        if (indexShard == null) {
-            throw new ShardNotFoundException(new ShardId(index, shardId));
-        }
-        return indexShard;
+    public ShardId shardId(ClusterState clusterState, String index, String id, @Nullable String routing) {
+        return new ShardId(index, generateShardId(clusterState, index, id, routing));
     }
 
-    @SuppressForbidden(reason = "Math#abs is trappy")
-    private int shardId(ClusterState clusterState, String index, String type, String id, @Nullable String routing) {
-        final IndexMetaData indexMetaData = indexMetaData(clusterState, index);
+    private int generateShardId(ClusterState clusterState, String index, String id, @Nullable String routing) {
+        IndexMetaData indexMetaData = clusterState.metaData().index(index);
+        if (indexMetaData == null) {
+            throw new IndexNotFoundException(index);
+        }
         final int hash;
         if (routing == null) {
             hash = Murmur3HashFunction.hash(id);

+ 20 - 0
core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java

@@ -33,6 +33,8 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.iterable.Iterables;
 import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardNotFoundException;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -95,6 +97,24 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
         return indicesRouting();
     }
 
+    /**
+     * All shards for the provided index and shard id
+     * @return All the shard routing entries for the given index and shard id
+     * @throws IndexNotFoundException if provided index does not exist
+     * @throws ShardNotFoundException if provided shard id is unknown
+     */
+    public IndexShardRoutingTable shardRoutingTable(String index, int shardId) {
+        IndexRoutingTable indexRouting = index(index);
+        if (indexRouting == null) {
+            throw new IndexNotFoundException(index);
+        }
+        IndexShardRoutingTable shard = indexRouting.shard(shardId);
+        if (shard == null) {
+            throw new ShardNotFoundException(new ShardId(index, shardId));
+        }
+        return shard;
+    }
+
     public RoutingTable validateRaiseException(MetaData metaData) throws RoutingValidationException {
         RoutingTableValidation validation = validate(metaData);
         if (!validation.valid()) {

+ 190 - 75
core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

@@ -28,7 +28,6 @@ import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateObserver;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
 import org.elasticsearch.cluster.block.ClusterBlock;
 import org.elasticsearch.cluster.block.ClusterBlockException;
@@ -46,18 +45,17 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.shard.IndexShardNotStartedException;
 import org.elasticsearch.index.shard.IndexShardState;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardNotFoundException;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.cluster.TestClusterService;
 import org.elasticsearch.test.transport.CapturingTransport;
 import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.TransportChannel;
-import org.elasticsearch.transport.TransportResponse;
-import org.elasticsearch.transport.TransportResponseOptions;
-import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.transport.*;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -132,22 +130,22 @@ public class TransportReplicationActionTests extends ESTestCase {
         ClusterBlocks.Builder block = ClusterBlocks.builder()
                 .addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
         clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block));
-        TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
-        assertFalse("primary phase should stop execution", primaryPhase.checkBlocks());
+        TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(request, listener);
+        reroutePhase.run();
         assertListenerThrows("primary phase should fail operation", listener, ClusterBlockException.class);
 
         block = ClusterBlocks.builder()
                 .addGlobalBlock(new ClusterBlock(1, "retryable", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
         clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block));
         listener = new PlainActionFuture<>();
-        primaryPhase = action.new PrimaryPhase(new Request().timeout("5ms"), listener);
-        assertFalse("primary phase should stop execution on retryable block", primaryPhase.checkBlocks());
+        reroutePhase = action.new ReroutePhase(new Request().timeout("5ms"), listener);
+        reroutePhase.run();
         assertListenerThrows("failed to timeout on retryable block", listener, ClusterBlockException.class);
 
 
         listener = new PlainActionFuture<>();
-        primaryPhase = action.new PrimaryPhase(new Request(), listener);
-        assertFalse("primary phase should stop execution on retryable block", primaryPhase.checkBlocks());
+        reroutePhase = action.new ReroutePhase(new Request(), listener);
+        reroutePhase.run();
         assertFalse("primary phase should wait on retryable block", listener.isDone());
 
         block = ClusterBlocks.builder()
@@ -172,25 +170,47 @@ public class TransportReplicationActionTests extends ESTestCase {
 
         Request request = new Request(shardId).timeout("1ms");
         PlainActionFuture<Response> listener = new PlainActionFuture<>();
-        TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
-        primaryPhase.run();
+        TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(request, listener);
+        reroutePhase.run();
         assertListenerThrows("unassigned primary didn't cause a timeout", listener, UnavailableShardsException.class);
 
         request = new Request(shardId);
         listener = new PlainActionFuture<>();
-        primaryPhase = action.new PrimaryPhase(request, listener);
-        primaryPhase.run();
+        reroutePhase = action.new ReroutePhase(request, listener);
+        reroutePhase.run();
         assertFalse("unassigned primary didn't cause a retry", listener.isDone());
 
         clusterService.setState(state(index, true, ShardRoutingState.STARTED));
         logger.debug("--> primary assigned state:\n{}", clusterService.state().prettyPrint());
 
-        listener.get();
-        assertTrue("request wasn't processed on primary, despite of it being assigned", request.processedOnPrimary.get());
+        final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
+        final String primaryNodeId = shardRoutingTable.primaryShard().currentNodeId();
+        final List<CapturingTransport.CapturedRequest> capturedRequests = transport.capturedRequestsByTargetNode().get(primaryNodeId);
+        assertThat(capturedRequests, notNullValue());
+        assertThat(capturedRequests.size(), equalTo(1));
+        assertThat(capturedRequests.get(0).action, equalTo("testAction[p]"));
         assertIndexShardCounter(1);
     }
 
-    public void testRoutingToPrimary() {
+    public void testUnknownIndexOrShardOnReroute() throws InterruptedException {
+        final String index = "test";
+        // no replicas in oder to skip the replication part
+        clusterService.setState(state(index, true,
+                randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED));
+        logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
+        Request request = new Request(new ShardId("unknown_index", 0)).timeout("1ms");
+        PlainActionFuture<Response> listener = new PlainActionFuture<>();
+        TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(request, listener);
+        reroutePhase.run();
+        assertListenerThrows("must throw index not found exception", listener, IndexNotFoundException.class);
+        request = new Request(new ShardId(index, 10)).timeout("1ms");
+        listener = new PlainActionFuture<>();
+        reroutePhase = action.new ReroutePhase(request, listener);
+        reroutePhase.run();
+        assertListenerThrows("must throw shard not found exception", listener, ShardNotFoundException.class);
+    }
+
+    public void testRoutePhaseExecutesRequest() {
         final String index = "test";
         final ShardId shardId = new ShardId(index, 0);
 
@@ -203,25 +223,126 @@ public class TransportReplicationActionTests extends ESTestCase {
         Request request = new Request(shardId);
         PlainActionFuture<Response> listener = new PlainActionFuture<>();
 
-        TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
-        assertTrue(primaryPhase.checkBlocks());
-        primaryPhase.routeRequestOrPerformLocally(shardRoutingTable.primaryShard(), shardRoutingTable.shardsIt());
-        if (primaryNodeId.equals(clusterService.localNode().id())) {
-            logger.info("--> primary is assigned locally, testing for execution");
-            assertTrue("request failed to be processed on a local primary", request.processedOnPrimary.get());
-            if (transport.capturedRequests().length > 0) {
-                assertIndexShardCounter(2);
-            } else {
-                assertIndexShardCounter(1);
-            }
+        TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(request, listener);
+        reroutePhase.run();
+        assertThat(request.shardId(), equalTo(shardId));
+        logger.info("--> primary is assigned to [{}], checking request forwarded", primaryNodeId);
+        final List<CapturingTransport.CapturedRequest> capturedRequests = transport.capturedRequestsByTargetNode().get(primaryNodeId);
+        assertThat(capturedRequests, notNullValue());
+        assertThat(capturedRequests.size(), equalTo(1));
+        if (clusterService.state().nodes().localNodeId().equals(primaryNodeId)) {
+            assertThat(capturedRequests.get(0).action, equalTo("testAction[p]"));
         } else {
-            logger.info("--> primary is assigned to [{}], checking request forwarded", primaryNodeId);
-            final List<CapturingTransport.CapturedRequest> capturedRequests = transport.capturedRequestsByTargetNode().get(primaryNodeId);
-            assertThat(capturedRequests, notNullValue());
-            assertThat(capturedRequests.size(), equalTo(1));
             assertThat(capturedRequests.get(0).action, equalTo("testAction"));
-            assertIndexShardUninitialized();
         }
+        assertIndexShardUninitialized();
+    }
+
+    public void testPrimaryPhaseExecutesRequest() throws InterruptedException, ExecutionException {
+        final String index = "test";
+        final ShardId shardId = new ShardId(index, 0);
+        clusterService.setState(state(index, true, ShardRoutingState.STARTED, ShardRoutingState.STARTED));
+        Request request = new Request(shardId).timeout("1ms");
+        PlainActionFuture<Response> listener = new PlainActionFuture<>();
+        TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, createTransportChannel(listener));
+        primaryPhase.run();
+        assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
+        final String replicaNodeId = clusterService.state().getRoutingTable().shardRoutingTable(index, shardId.id()).replicaShards().get(0).currentNodeId();
+        final List<CapturingTransport.CapturedRequest> requests = transport.capturedRequestsByTargetNode().get(replicaNodeId);
+        assertThat(requests, notNullValue());
+        assertThat(requests.size(), equalTo(1));
+        assertThat("replica request was not sent", requests.get(0).action, equalTo("testAction[r]"));
+    }
+
+    public void testAddedReplicaAfterPrimaryOperation() {
+        final String index = "test";
+        final ShardId shardId = new ShardId(index, 0);
+        // start with no replicas
+        clusterService.setState(stateWithStartedPrimary(index, true, 0));
+        logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
+        final ClusterState stateWithAddedReplicas = state(index, true, ShardRoutingState.STARTED, randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.STARTED);
+
+        final Action actionWithAddedReplicaAfterPrimaryOp = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
+            @Override
+            protected Tuple<Response, Request> shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable {
+                final Tuple<Response, Request> operationOnPrimary = super.shardOperationOnPrimary(metaData, shardRequest);
+                // add replicas after primary operation
+                ((TestClusterService) clusterService).setState(stateWithAddedReplicas);
+                logger.debug("--> state after primary operation:\n{}", clusterService.state().prettyPrint());
+                return operationOnPrimary;
+            }
+        };
+
+        Request request = new Request(shardId);
+        PlainActionFuture<Response> listener = new PlainActionFuture<>();
+        TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = actionWithAddedReplicaAfterPrimaryOp.new PrimaryPhase(request, createTransportChannel(listener));
+        primaryPhase.run();
+        assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
+        for (ShardRouting replica : stateWithAddedReplicas.getRoutingTable().shardRoutingTable(index, shardId.id()).replicaShards()) {
+            List<CapturingTransport.CapturedRequest> requests = transport.capturedRequestsByTargetNode().get(replica.currentNodeId());
+            assertThat(requests, notNullValue());
+            assertThat(requests.size(), equalTo(1));
+            assertThat("replica request was not sent", requests.get(0).action, equalTo("testAction[r]"));
+        }
+    }
+
+    public void testRelocatingReplicaAfterPrimaryOperation() {
+        final String index = "test";
+        final ShardId shardId = new ShardId(index, 0);
+        // start with a replica
+        clusterService.setState(state(index, true, ShardRoutingState.STARTED,  randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.STARTED));
+        logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
+        final ClusterState stateWithRelocatingReplica = state(index, true, ShardRoutingState.STARTED, ShardRoutingState.RELOCATING);
+
+        final Action actionWithRelocatingReplicasAfterPrimaryOp = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
+            @Override
+            protected Tuple<Response, Request> shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable {
+                final Tuple<Response, Request> operationOnPrimary = super.shardOperationOnPrimary(metaData, shardRequest);
+                // set replica to relocating
+                ((TestClusterService) clusterService).setState(stateWithRelocatingReplica);
+                logger.debug("--> state after primary operation:\n{}", clusterService.state().prettyPrint());
+                return operationOnPrimary;
+            }
+        };
+
+        Request request = new Request(shardId);
+        PlainActionFuture<Response> listener = new PlainActionFuture<>();
+        TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = actionWithRelocatingReplicasAfterPrimaryOp.new PrimaryPhase(request, createTransportChannel(listener));
+        primaryPhase.run();
+        assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
+        ShardRouting relocatingReplicaShard = stateWithRelocatingReplica.getRoutingTable().shardRoutingTable(index, shardId.id()).replicaShards().get(0);
+        for (String node : new String[] {relocatingReplicaShard.currentNodeId(), relocatingReplicaShard.relocatingNodeId()}) {
+            List<CapturingTransport.CapturedRequest> requests = transport.capturedRequestsByTargetNode().get(node);
+            assertThat(requests, notNullValue());
+            assertThat(requests.size(), equalTo(1));
+            assertThat("replica request was not sent to replica", requests.get(0).action, equalTo("testAction[r]"));
+        }
+    }
+
+    public void testIndexDeletedAfterPrimaryOperation() {
+        final String index = "test";
+        final ShardId shardId = new ShardId(index, 0);
+        clusterService.setState(state(index, true, ShardRoutingState.STARTED, ShardRoutingState.STARTED));
+        logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
+        final ClusterState stateWithDeletedIndex = state(index + "_new", true, ShardRoutingState.STARTED, ShardRoutingState.RELOCATING);
+
+        final Action actionWithDeletedIndexAfterPrimaryOp = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
+            @Override
+            protected Tuple<Response, Request> shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable {
+                final Tuple<Response, Request> operationOnPrimary = super.shardOperationOnPrimary(metaData, shardRequest);
+                // delete index after primary op
+                ((TestClusterService) clusterService).setState(stateWithDeletedIndex);
+                logger.debug("--> state after primary operation:\n{}", clusterService.state().prettyPrint());
+                return operationOnPrimary;
+            }
+        };
+
+        Request request = new Request(shardId);
+        PlainActionFuture<Response> listener = new PlainActionFuture<>();
+        TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = actionWithDeletedIndexAfterPrimaryOp.new PrimaryPhase(request, createTransportChannel(listener));
+        primaryPhase.run();
+        assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
+        assertThat("replication phase should be skipped if index gets deleted after primary operation", transport.capturedRequestsByTargetNode().size(), equalTo(0));
     }
 
     public void testWriteConsistency() throws ExecutionException, InterruptedException {
@@ -266,10 +387,9 @@ public class TransportReplicationActionTests extends ESTestCase {
 
         final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
         PlainActionFuture<Response> listener = new PlainActionFuture<>();
-
-        TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
+        TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, createTransportChannel(listener));
         if (passesWriteConsistency) {
-            assertThat(primaryPhase.checkWriteConsistency(shardRoutingTable.primaryShard()), nullValue());
+            assertThat(primaryPhase.checkWriteConsistency(shardRoutingTable.primaryShard().shardId()), nullValue());
             primaryPhase.run();
             assertTrue("operations should have been perform, consistency level is met", request.processedOnPrimary.get());
             if (assignedReplicas > 0) {
@@ -278,14 +398,18 @@ public class TransportReplicationActionTests extends ESTestCase {
                 assertIndexShardCounter(1);
             }
         } else {
-            assertThat(primaryPhase.checkWriteConsistency(shardRoutingTable.primaryShard()), notNullValue());
+            assertThat(primaryPhase.checkWriteConsistency(shardRoutingTable.primaryShard().shardId()), notNullValue());
             primaryPhase.run();
             assertFalse("operations should not have been perform, consistency level is *NOT* met", request.processedOnPrimary.get());
+            assertListenerThrows("should throw exception to trigger retry", listener, UnavailableShardsException.class);
             assertIndexShardUninitialized();
             for (int i = 0; i < replicaStates.length; i++) {
                 replicaStates[i] = ShardRoutingState.STARTED;
             }
             clusterService.setState(state(index, true, ShardRoutingState.STARTED, replicaStates));
+            listener = new PlainActionFuture<>();
+            primaryPhase = action.new PrimaryPhase(request, createTransportChannel(listener));
+            primaryPhase.run();
             assertTrue("once the consistency level met, operation should continue", request.processedOnPrimary.get());
             assertIndexShardCounter(2);
         }
@@ -340,23 +464,19 @@ public class TransportReplicationActionTests extends ESTestCase {
 
 
     protected void runReplicateTest(IndexShardRoutingTable shardRoutingTable, int assignedReplicas, int totalShards) throws InterruptedException, ExecutionException {
-        final ShardRouting primaryShard = shardRoutingTable.primaryShard();
         final ShardIterator shardIt = shardRoutingTable.shardsIt();
         final ShardId shardId = shardIt.shardId();
-        final Request request = new Request();
-        PlainActionFuture<Response> listener = new PlainActionFuture<>();
-
+        final Request request = new Request(shardId);
+        final PlainActionFuture<Response> listener = new PlainActionFuture<>();
         logger.debug("expecting [{}] assigned replicas, [{}] total shards. using state: \n{}", assignedReplicas, totalShards, clusterService.state().prettyPrint());
 
-        final TransportReplicationAction<Request, Request, Response>.InternalRequest internalRequest = action.new InternalRequest(request);
-        internalRequest.concreteIndex(shardId.index().name());
         Releasable reference = getOrCreateIndexShardOperationsCounter();
         assertIndexShardCounter(2);
         // TODO: set a default timeout
         TransportReplicationAction<Request, Request, Response>.ReplicationPhase replicationPhase =
-                action.new ReplicationPhase(shardIt, request,
-                        new Response(), new ClusterStateObserver(clusterService, logger),
-                        primaryShard, internalRequest, listener, reference, null);
+                action.new ReplicationPhase(request,
+                        new Response(),
+                        request.shardId(), createTransportChannel(listener), reference, null);
 
         assertThat(replicationPhase.totalShards(), equalTo(totalShards));
         assertThat(replicationPhase.pending(), equalTo(assignedReplicas));
@@ -433,7 +553,7 @@ public class TransportReplicationActionTests extends ESTestCase {
          * However, this failure would only become apparent once listener.get is called. Seems a little implicit.
          * */
         action = new ActionWithDelay(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool);
-        final TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
+        final TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, createTransportChannel(listener));
         Thread t = new Thread() {
             @Override
             public void run() {
@@ -464,7 +584,7 @@ public class TransportReplicationActionTests extends ESTestCase {
         logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
         Request request = new Request(shardId).timeout("100ms");
         PlainActionFuture<Response> listener = new PlainActionFuture<>();
-        TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
+        TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, createTransportChannel(listener));
         primaryPhase.run();
         assertIndexShardCounter(2);
         assertThat(transport.capturedRequests().length, equalTo(1));
@@ -473,7 +593,7 @@ public class TransportReplicationActionTests extends ESTestCase {
         assertIndexShardCounter(1);
         transport.clear();
         request = new Request(shardId).timeout("100ms");
-        primaryPhase = action.new PrimaryPhase(request, listener);
+        primaryPhase = action.new PrimaryPhase(request, createTransportChannel(listener));
         primaryPhase.run();
         assertIndexShardCounter(2);
         CapturingTransport.CapturedRequest[] replicationRequests = transport.capturedRequests();
@@ -498,7 +618,7 @@ public class TransportReplicationActionTests extends ESTestCase {
             @Override
             public void run() {
                 try {
-                    replicaOperationTransportHandler.messageReceived(new Request(), createTransportChannel());
+                    replicaOperationTransportHandler.messageReceived(new Request(), createTransportChannel(new PlainActionFuture<>()));
                 } catch (Exception e) {
                 }
             }
@@ -515,7 +635,7 @@ public class TransportReplicationActionTests extends ESTestCase {
         action = new ActionWithExceptions(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool);
         final Action.ReplicaOperationTransportHandler replicaOperationTransportHandlerForException = action.new ReplicaOperationTransportHandler();
         try {
-            replicaOperationTransportHandlerForException.messageReceived(new Request(shardId), createTransportChannel());
+            replicaOperationTransportHandlerForException.messageReceived(new Request(shardId), createTransportChannel(new PlainActionFuture<>()));
             fail();
         } catch (Throwable t2) {
         }
@@ -531,7 +651,7 @@ public class TransportReplicationActionTests extends ESTestCase {
         logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
         Request request = new Request(shardId).timeout("100ms");
         PlainActionFuture<Response> listener = new PlainActionFuture<>();
-        TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
+        TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, createTransportChannel(listener));
         primaryPhase.run();
         // no replica request should have been sent yet
         assertThat(transport.capturedRequests().length, equalTo(0));
@@ -559,7 +679,6 @@ public class TransportReplicationActionTests extends ESTestCase {
     }
 
     public static class Request extends ReplicationRequest<Request> {
-        int shardId;
         public AtomicBoolean processedOnPrimary = new AtomicBoolean();
         public AtomicInteger processedOnReplicas = new AtomicInteger();
 
@@ -568,21 +687,19 @@ public class TransportReplicationActionTests extends ESTestCase {
 
         Request(ShardId shardId) {
             this();
-            this.shardId = shardId.id();
-            this.index(shardId.index().name());
+            this.shardId = shardId;
+            this.index = shardId.getIndex();
             // keep things simple
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
-            out.writeVInt(shardId);
         }
 
         @Override
         public void readFrom(StreamInput in) throws IOException {
             super.readFrom(in);
-            shardId = in.readVInt();
         }
     }
 
@@ -605,22 +722,17 @@ public class TransportReplicationActionTests extends ESTestCase {
         }
 
         @Override
-        protected Tuple<Response, Request> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
-            boolean executedBefore = shardRequest.request.processedOnPrimary.getAndSet(true);
+        protected Tuple<Response, Request> shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable {
+            boolean executedBefore = shardRequest.processedOnPrimary.getAndSet(true);
             assert executedBefore == false : "request has already been executed on the primary";
-            return new Tuple<>(new Response(), shardRequest.request);
+            return new Tuple<>(new Response(), shardRequest);
         }
 
         @Override
-        protected void shardOperationOnReplica(ShardId shardId, Request request) {
+        protected void shardOperationOnReplica(Request request) {
             request.processedOnReplicas.incrementAndGet();
         }
 
-        @Override
-        protected ShardIterator shards(ClusterState clusterState, InternalRequest request) {
-            return clusterState.getRoutingTable().index(request.concreteIndex()).shard(request.request().shardId).shardsIt();
-        }
-
         @Override
         protected boolean checkWriteConsistency() {
             return false;
@@ -659,8 +771,8 @@ public class TransportReplicationActionTests extends ESTestCase {
         }
 
         @Override
-        protected Tuple<Response, Request> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
-            return throwException(shardRequest.shardId);
+        protected Tuple<Response, Request> shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable {
+            return throwException(shardRequest.shardId());
         }
 
         private Tuple<Response, Request> throwException(ShardId shardId) {
@@ -681,8 +793,8 @@ public class TransportReplicationActionTests extends ESTestCase {
         }
 
         @Override
-        protected void shardOperationOnReplica(ShardId shardId, Request shardRequest) {
-            throwException(shardRequest.internalShardId);
+        protected void shardOperationOnReplica(Request shardRequest) {
+            throwException(shardRequest.shardId());
         }
     }
 
@@ -697,9 +809,9 @@ public class TransportReplicationActionTests extends ESTestCase {
         }
 
         @Override
-        protected Tuple<Response, Request> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
+        protected Tuple<Response, Request> shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable {
             awaitLatch();
-            return new Tuple<>(new Response(), shardRequest.request);
+            return new Tuple<>(new Response(), shardRequest);
         }
 
         private void awaitLatch() throws InterruptedException {
@@ -708,7 +820,7 @@ public class TransportReplicationActionTests extends ESTestCase {
         }
 
         @Override
-        protected void shardOperationOnReplica(ShardId shardId, Request shardRequest) {
+        protected void shardOperationOnReplica(Request shardRequest) {
             try {
                 awaitLatch();
             } catch (InterruptedException e) {
@@ -720,7 +832,7 @@ public class TransportReplicationActionTests extends ESTestCase {
     /*
     * Transport channel that is needed for replica operation testing.
     * */
-    public TransportChannel createTransportChannel() {
+    public TransportChannel createTransportChannel(final PlainActionFuture<Response> listener) {
         return new TransportChannel() {
 
             @Override
@@ -735,14 +847,17 @@ public class TransportReplicationActionTests extends ESTestCase {
 
             @Override
             public void sendResponse(TransportResponse response) throws IOException {
+                listener.onResponse(((Response) response));
             }
 
             @Override
             public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
+                listener.onResponse(((Response) response));
             }
 
             @Override
             public void sendResponse(Throwable error) throws IOException {
+                listener.onFailure(error);
             }
         };
     }

+ 0 - 2
core/src/test/java/org/elasticsearch/cluster/routing/RoutingBackwardCompatibilityTests.java

@@ -26,13 +26,11 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.node.Node;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.VersionUtils;
 
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
-import java.nio.file.Path;
 import java.util.Arrays;
 
 public class RoutingBackwardCompatibilityTests extends ESTestCase {

+ 2 - 2
core/src/test/java/org/elasticsearch/consistencylevel/WriteConsistencyLevelIT.java

@@ -53,7 +53,7 @@ public class WriteConsistencyLevelIT extends ESIntegTestCase {
             fail("can't index, does not match consistency");
         } catch (UnavailableShardsException e) {
             assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
-            assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet write consistency of [QUORUM] (have 1, needed 2). Timeout: [100ms], request: index {[test][type1][1], source[{ type1 : { \"id\" : \"1\", \"name\" : \"test\" } }]}"));
+            assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet write consistency of [QUORUM] (have 1, needed 2). Timeout: [100ms], request: [index {[test][type1][1], source[{ type1 : { \"id\" : \"1\", \"name\" : \"test\" } }]}]"));
             // but really, all is well
         }
 
@@ -76,7 +76,7 @@ public class WriteConsistencyLevelIT extends ESIntegTestCase {
             fail("can't index, does not match consistency");
         } catch (UnavailableShardsException e) {
             assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
-            assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet write consistency of [ALL] (have 2, needed 3). Timeout: [100ms], request: index {[test][type1][1], source[{ type1 : { \"id\" : \"1\", \"name\" : \"test\" } }]}"));
+            assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet write consistency of [ALL] (have 2, needed 3). Timeout: [100ms], request: [index {[test][type1][1], source[{ type1 : { \"id\" : \"1\", \"name\" : \"test\" } }]}]"));
             // but really, all is well
         }
 

+ 7 - 4
core/src/test/java/org/elasticsearch/index/mapper/MapperServiceTests.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.index.mapper;
 
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.common.compress.CompressedXContent;
@@ -117,8 +118,9 @@ public class MapperServiceTests extends ESSingleNodeTestCase {
             if (t instanceof ExecutionException) {
                 t = ((ExecutionException) t).getCause();
             }
-            if (t instanceof IllegalArgumentException) {
-                assertEquals("It is forbidden to index into the default mapping [_default_]", t.getMessage());
+            final Throwable throwable = ExceptionsHelper.unwrapCause(t);
+            if (throwable instanceof IllegalArgumentException) {
+                assertEquals("It is forbidden to index into the default mapping [_default_]", throwable.getMessage());
             } else {
                 throw t;
             }
@@ -133,8 +135,9 @@ public class MapperServiceTests extends ESSingleNodeTestCase {
             if (t instanceof ExecutionException) {
                 t = ((ExecutionException) t).getCause();
             }
-            if (t instanceof IllegalArgumentException) {
-                assertEquals("It is forbidden to index into the default mapping [_default_]", t.getMessage());
+            final Throwable throwable = ExceptionsHelper.unwrapCause(t);
+            if (throwable instanceof IllegalArgumentException) {
+                assertEquals("It is forbidden to index into the default mapping [_default_]", throwable.getMessage());
             } else {
                 throw t;
             }

+ 5 - 5
modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java

@@ -178,7 +178,7 @@ public class IndicesRequestTests extends ESIntegTestCase {
     }
 
     public void testIndex() {
-        String[] indexShardActions = new String[]{IndexAction.NAME, IndexAction.NAME + "[r]"};
+        String[] indexShardActions = new String[]{IndexAction.NAME, IndexAction.NAME + "[p]", IndexAction.NAME + "[r]"};
         interceptTransportActions(indexShardActions);
 
         IndexRequest indexRequest = new IndexRequest(randomIndexOrAlias(), "type", "id").source("field", "value");
@@ -189,7 +189,7 @@ public class IndicesRequestTests extends ESIntegTestCase {
     }
 
     public void testDelete() {
-        String[] deleteShardActions = new String[]{DeleteAction.NAME, DeleteAction.NAME + "[r]"};
+        String[] deleteShardActions = new String[]{DeleteAction.NAME, DeleteAction.NAME + "[p]", DeleteAction.NAME + "[r]"};
         interceptTransportActions(deleteShardActions);
 
         DeleteRequest deleteRequest = new DeleteRequest(randomIndexOrAlias(), "type", "id");
@@ -244,7 +244,7 @@ public class IndicesRequestTests extends ESIntegTestCase {
     }
 
     public void testBulk() {
-        String[] bulkShardActions = new String[]{BulkAction.NAME + "[s]", BulkAction.NAME + "[s][r]"};
+        String[] bulkShardActions = new String[]{BulkAction.NAME + "[s][p]", BulkAction.NAME + "[s][r]"};
         interceptTransportActions(bulkShardActions);
 
         List<String> indices = new ArrayList<>();
@@ -344,7 +344,7 @@ public class IndicesRequestTests extends ESIntegTestCase {
     }
 
     public void testFlush() {
-        String[] indexShardActions = new String[]{TransportShardFlushAction.NAME + "[r]", TransportShardFlushAction.NAME};
+        String[] indexShardActions = new String[]{TransportShardFlushAction.NAME, TransportShardFlushAction.NAME + "[r]", TransportShardFlushAction.NAME + "[p]"};
         interceptTransportActions(indexShardActions);
 
         FlushRequest flushRequest = new FlushRequest(randomIndicesOrAliases());
@@ -367,7 +367,7 @@ public class IndicesRequestTests extends ESIntegTestCase {
     }
 
     public void testRefresh() {
-        String[] indexShardActions = new String[]{TransportShardRefreshAction.NAME + "[r]", TransportShardRefreshAction.NAME};
+        String[] indexShardActions = new String[]{TransportShardRefreshAction.NAME, TransportShardRefreshAction.NAME + "[r]", TransportShardRefreshAction.NAME + "[p]"};
         interceptTransportActions(indexShardActions);
 
         RefreshRequest refreshRequest = new RefreshRequest(randomIndicesOrAliases());

Some files were not shown because too many files changed in this diff