|
@@ -40,6 +40,7 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
|
|
import org.elasticsearch.cluster.block.ClusterBlockException;
|
|
|
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
|
|
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
import org.elasticsearch.cluster.routing.AllocationId;
|
|
|
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
|
@@ -103,6 +104,7 @@ public abstract class TransportReplicationAction<
|
|
|
protected final ClusterService clusterService;
|
|
|
protected final ShardStateAction shardStateAction;
|
|
|
protected final IndicesService indicesService;
|
|
|
+ protected final IndexNameExpressionResolver indexNameExpressionResolver;
|
|
|
protected final TransportRequestOptions transportOptions;
|
|
|
protected final String executor;
|
|
|
|
|
@@ -115,17 +117,19 @@ public abstract class TransportReplicationAction<
|
|
|
protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
|
|
|
ClusterService clusterService, IndicesService indicesService,
|
|
|
ThreadPool threadPool, ShardStateAction shardStateAction,
|
|
|
- ActionFilters actionFilters, Writeable.Reader<Request> requestReader,
|
|
|
+ ActionFilters actionFilters,
|
|
|
+ IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> requestReader,
|
|
|
Writeable.Reader<ReplicaRequest> replicaRequestReader, String executor) {
|
|
|
this(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
|
|
|
- requestReader, replicaRequestReader, executor, false, false);
|
|
|
+ indexNameExpressionResolver, requestReader, replicaRequestReader, executor, false, false);
|
|
|
}
|
|
|
|
|
|
|
|
|
protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
|
|
|
ClusterService clusterService, IndicesService indicesService,
|
|
|
ThreadPool threadPool, ShardStateAction shardStateAction,
|
|
|
- ActionFilters actionFilters, Writeable.Reader<Request> requestReader,
|
|
|
+ ActionFilters actionFilters,
|
|
|
+ IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> requestReader,
|
|
|
Writeable.Reader<ReplicaRequest> replicaRequestReader, String executor,
|
|
|
boolean syncGlobalCheckpointAfterOperation, boolean forceExecutionOnPrimary) {
|
|
|
super(actionName, actionFilters, transportService.getTaskManager());
|
|
@@ -134,6 +138,7 @@ public abstract class TransportReplicationAction<
|
|
|
this.clusterService = clusterService;
|
|
|
this.indicesService = indicesService;
|
|
|
this.shardStateAction = shardStateAction;
|
|
|
+ this.indexNameExpressionResolver = indexNameExpressionResolver;
|
|
|
this.executor = executor;
|
|
|
|
|
|
this.transportPrimaryAction = actionName + "[p]";
|
|
@@ -214,10 +219,21 @@ public abstract class TransportReplicationAction<
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * True if provided index should be resolved when resolving request
|
|
|
+ */
|
|
|
+ protected boolean resolveIndex() {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
protected TransportRequestOptions transportOptions(Settings settings) {
|
|
|
return TransportRequestOptions.EMPTY;
|
|
|
}
|
|
|
|
|
|
+ private String concreteIndex(final ClusterState state, final ReplicationRequest request) {
|
|
|
+ return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request).getName() : request.index();
|
|
|
+ }
|
|
|
+
|
|
|
private ClusterBlockException blockExceptions(final ClusterState state, final String indexName) {
|
|
|
ClusterBlockLevel globalBlockLevel = globalBlockLevel();
|
|
|
if (globalBlockLevel != null) {
|
|
@@ -632,7 +648,7 @@ public abstract class TransportReplicationAction<
|
|
|
protected void doRun() {
|
|
|
setPhase(task, "routing");
|
|
|
final ClusterState state = observer.setAndGetObservedState();
|
|
|
- final String concreteIndex = request.shardId().getIndexName();
|
|
|
+ final String concreteIndex = concreteIndex(state, request);
|
|
|
final ClusterBlockException blockException = blockExceptions(state, concreteIndex);
|
|
|
if (blockException != null) {
|
|
|
if (blockException.retryable()) {
|
|
@@ -642,6 +658,7 @@ public abstract class TransportReplicationAction<
|
|
|
finishAsFailed(blockException);
|
|
|
}
|
|
|
} else {
|
|
|
+ // request does not have a shardId yet, we need to pass the concrete index to resolve shardId
|
|
|
final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
|
|
|
if (indexMetaData == null) {
|
|
|
retry(new IndexNotFoundException(concreteIndex));
|