Prechádzať zdrojové kódy

Merge pull request #10720 from jpountz/fix/simplify_mapperupdatedaction

Mappings: simplify dynamic mappings updates.

Close #10720
Adrien Grand 10 rokov pred
rodič
commit
803c3930f2

+ 62 - 25
src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

@@ -52,6 +52,7 @@ import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.VersionConflictEngineException;
+import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.shard.IndexShard;
@@ -352,23 +353,6 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
 
     }
 
-    private void applyMappingUpdate(IndexService indexService, String type, Mapping update) throws Throwable {
-        // HACK: Rivers seem to have something specific that triggers potential
-        // deadlocks when doing concurrent indexing. So for now they keep the
-        // old behaviour of updating mappings locally first and then
-        // asynchronously notifying the master
-        // this can go away when rivers are removed
-        final String indexName = indexService.index().name();
-        final String indexUUID = indexService.indexUUID();
-        if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
-            indexService.mapperService().merge(type, new CompressedString(update.toBytes()), true);
-            mappingUpdatedAction.updateMappingOnMaster(indexName, indexUUID, type, update, null);
-        } else {
-            mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexUUID, type, update);
-            indexService.mapperService().merge(type, new CompressedString(update.toBytes()), true);
-        }
-    }
-
     private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, ClusterState clusterState,
                                             IndexShard indexShard, IndexService indexService, boolean processed) throws Throwable {
 
@@ -392,20 +376,54 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
         Engine.IndexingOperation op;
         if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
             Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
-            if (index.parsedDoc().dynamicMappingsUpdate() != null) {
-                applyMappingUpdate(indexService, indexRequest.type(), index.parsedDoc().dynamicMappingsUpdate());
+            Mapping update = index.parsedDoc().dynamicMappingsUpdate();
+            if (update != null) {
+                final String indexName = indexService.index().name();
+                if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
+                    // With rivers, we have a chicken and egg problem if indexing
+                    // the _meta document triggers a mapping update. Because we would
+                    // like to validate the mapping update first, but on the other
+                    // hand putting the mapping would start the river, which expects
+                    // to find a _meta document
+                    // So we have no choice but to index first and send mappings afterwards
+                    MapperService mapperService = indexService.mapperService();
+                    mapperService.merge(indexRequest.type(), new CompressedString(update.toBytes()), true);
+                    indexShard.index(index);
+                    mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, indexRequest.type(), update);
+                } else {
+                    mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexRequest.type(), update);
+                    indexShard.index(index);
+                }
+            } else {
+                indexShard.index(index);
             }
-            indexShard.index(index);
             version = index.version();
             op = index;
             created = index.created();
         } else {
             Engine.Create create = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY,
                     request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId());
-            if (create.parsedDoc().dynamicMappingsUpdate() != null) {
-                applyMappingUpdate(indexService, indexRequest.type(), create.parsedDoc().dynamicMappingsUpdate());
+            Mapping update = create.parsedDoc().dynamicMappingsUpdate();
+            if (update != null) {
+                final String indexName = indexService.index().name();
+                if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
+                    // With rivers, we have a chicken and egg problem if indexing
+                    // the _meta document triggers a mapping update. Because we would
+                    // like to validate the mapping update first, but on the other
+                    // hand putting the mapping would start the river, which expects
+                    // to find a _meta document
+                    // So we have no choice but to index first and send mappings afterwards
+                    MapperService mapperService = indexService.mapperService();
+                    mapperService.merge(indexRequest.type(), new CompressedString(update.toBytes()), true);
+                    indexShard.create(create);
+                    mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, indexRequest.type(), update);
+                } else {
+                    mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexRequest.type(), update);
+                    indexShard.create(create);
+                }
+            } else {
+                indexShard.create(create);
             }
-            indexShard.create(create);
             version = create.version();
             op = create;
             created = true;
@@ -528,8 +546,9 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
 
 
     @Override
-    protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
-        IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
+    protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) throws Exception {
+        IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
+        IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
         final BulkShardRequest request = shardRequest.request;
         for (int i = 0; i < request.items().length; i++) {
             BulkItemRequest item = request.items()[i];
@@ -544,11 +563,29 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
 
                     if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
                         Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
+                        if (index.parsedDoc().dynamicMappingsUpdate() != null) {
+                            if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
+                                // mappings updates on the _river are not validated synchronously so we can't
+                                // assume they are here when indexing on a replica
+                                indexService.mapperService().merge(indexRequest.type(), new CompressedString(index.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
+                            } else {
+                                throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + index.parsedDoc().dynamicMappingsUpdate() + "]");
+                            }
+                        }
                         indexShard.index(index);
                     } else {
                         Engine.Create create = indexShard.prepareCreate(sourceToParse,
                                 indexRequest.version(), indexRequest.versionType(),
                                 Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId());
+                        if (create.parsedDoc().dynamicMappingsUpdate() != null) {
+                            if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
+                                // mappings updates on the _river are not validated synchronously so we can't
+                                // assume they are here when indexing on a replica
+                                indexService.mapperService().merge(indexRequest.type(), new CompressedString(create.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
+                            } else {
+                                throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + create.parsedDoc().dynamicMappingsUpdate() + "]");
+                            }
+                        }
                         indexShard.create(create);
                     }
                 } catch (Throwable e) {

+ 65 - 25
src/main/java/org/elasticsearch/action/index/TransportIndexAction.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.action.index;
 
+import org.elasticsearch.ElasticsearchIllegalStateException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.RoutingMissingException;
@@ -42,6 +43,7 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.shard.IndexShard;
@@ -51,6 +53,8 @@ import org.elasticsearch.river.RiverIndexName;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
+import java.io.IOException;
+
 /**
  * Performs the index operation.
  * <p/>
@@ -167,23 +171,6 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
                 .indexShards(clusterService.state(), request.concreteIndex(), request.request().type(), request.request().id(), request.request().routing());
     }
 
-    private void applyMappingUpdate(IndexService indexService, String type, Mapping update) throws Throwable {
-        // HACK: Rivers seem to have something specific that triggers potential
-        // deadlocks when doing concurrent indexing. So for now they keep the
-        // old behaviour of updating mappings locally first and then
-        // asynchronously notifying the master
-        // this can go away when rivers are removed
-        final String indexName = indexService.index().name();
-        final String indexUUID = indexService.indexUUID();
-        if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
-            indexService.mapperService().merge(type, new CompressedString(update.toBytes()), true);
-            mappingUpdatedAction.updateMappingOnMaster(indexName, indexUUID, type, update, null);
-        } else {
-            mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexUUID, type, update);
-            indexService.mapperService().merge(type, new CompressedString(update.toBytes()), true);
-        }
-    }
-
     @Override
     protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
         final IndexRequest request = shardRequest.request;
@@ -206,19 +193,53 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
 
         if (request.opType() == IndexRequest.OpType.INDEX) {
             Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
-            if (index.parsedDoc().dynamicMappingsUpdate() != null) {
-                applyMappingUpdate(indexService, request.type(), index.parsedDoc().dynamicMappingsUpdate());
+            Mapping update = index.parsedDoc().dynamicMappingsUpdate();
+            if (update != null) {
+                final String indexName = indexService.index().name();
+                if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
+                    // With rivers, we have a chicken and egg problem if indexing
+                    // the _meta document triggers a mapping update. Because we would
+                    // like to validate the mapping update first, but on the other
+                    // hand putting the mapping would start the river, which expects
+                    // to find a _meta document
+                    // So we have no choice but to index first and send mappings afterwards
+                    MapperService mapperService = indexService.mapperService();
+                    mapperService.merge(request.type(), new CompressedString(update.toBytes()), true);
+                    indexShard.index(index);
+                    mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update);
+                } else {
+                    mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
+                    indexShard.index(index);
+                }
+            } else {
+                indexShard.index(index);
             }
-            indexShard.index(index);
             version = index.version();
             created = index.created();
         } else {
             Engine.Create create = indexShard.prepareCreate(sourceToParse,
                     request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
-            if (create.parsedDoc().dynamicMappingsUpdate() != null) {
-                applyMappingUpdate(indexService, request.type(), create.parsedDoc().dynamicMappingsUpdate());
+            Mapping update = create.parsedDoc().dynamicMappingsUpdate();
+            if (update != null) {
+                final String indexName = indexService.index().name();
+                if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
+                    // With rivers, we have a chicken and egg problem if indexing
+                    // the _meta document triggers a mapping update. Because we would
+                    // like to validate the mapping update first, but on the other
+                    // hand putting the mapping would start the river, which expects
+                    // to find a _meta document
+                    // So we have no choice but to index first and send mappings afterwards
+                    MapperService mapperService = indexService.mapperService();
+                    mapperService.merge(request.type(), new CompressedString(update.toBytes()), true);
+                    indexShard.create(create);
+                    mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update);
+                } else {
+                    mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
+                    indexShard.create(create);
+                }
+            } else {
+                indexShard.create(create);
             }
-            indexShard.create(create);
             version = create.version();
             created = true;
         }
@@ -239,17 +260,36 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
     }
 
     @Override
-    protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
-        IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
+    protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) throws IOException {
+        IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
+        IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
         IndexRequest request = shardRequest.request;
         SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id())
                 .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
         if (request.opType() == IndexRequest.OpType.INDEX) {
             Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates());
+            if (index.parsedDoc().dynamicMappingsUpdate() != null) {
+                if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
+                    // mappings updates on the _river are not validated synchronously so we can't
+                    // assume they are here when indexing on a replica
+                    indexService.mapperService().merge(request.type(), new CompressedString(index.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
+                } else {
+                    throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + index.parsedDoc().dynamicMappingsUpdate() + "]");
+                }
+            }
             indexShard.index(index);
         } else {
             Engine.Create create = indexShard.prepareCreate(sourceToParse,
                     request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId());
+            if (create.parsedDoc().dynamicMappingsUpdate() != null) {
+                if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
+                    // mappings updates on the _river are not validated synchronously so we can't
+                    // assume they are here when indexing on a replica
+                    indexService.mapperService().merge(request.type(), new CompressedString(create.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
+                } else {
+                    throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + create.parsedDoc().dynamicMappingsUpdate() + "]");
+                }
+            }
             indexShard.create(create);
         }
         if (request.refresh()) {

+ 1 - 1
src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java

@@ -117,7 +117,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
      */
     protected abstract Tuple<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest)  throws Throwable;
 
-    protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest);
+    protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest) throws Exception;
 
     protected abstract ShardIterator shards(ClusterState clusterState, InternalRequest request) throws ElasticsearchException;
 

+ 46 - 318
src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java

@@ -19,61 +19,31 @@
 
 package org.elasticsearch.cluster.action.index;
 
-import com.google.common.collect.ImmutableMap;
-
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchIllegalArgumentException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ActionRequestValidationException;
-import org.elasticsearch.action.ActionResponse;
-import org.elasticsearch.action.IndicesRequest;
-import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.IndicesOptions;
-import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
-import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
-import org.elasticsearch.cluster.ClusterService;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
-import org.elasticsearch.cluster.block.ClusterBlockException;
-import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.cluster.metadata.MetaDataMappingService;
-import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.common.compress.CompressedString;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.IndicesAdminClient;
+import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
-import org.elasticsearch.common.xcontent.ToXContent;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.node.settings.NodeSettingsService;
-import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.TransportService;
 
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 /**
  * Called by shards in the cluster when their mapping was dynamically updated and it needs to be updated
  * in the cluster state meta data (and broadcast to all members).
  */
-public class MappingUpdatedAction extends TransportMasterNodeOperationAction<MappingUpdatedAction.MappingUpdatedRequest, MappingUpdatedAction.MappingUpdatedResponse> {
+public class MappingUpdatedAction extends AbstractComponent {
 
     public static final String INDICES_MAPPING_DYNAMIC_TIMEOUT = "indices.mapping.dynamic_timeout";
-    public static final String ACTION_NAME = "internal:cluster/mapping_updated";
-
-    private final MetaDataMappingService metaDataMappingService;
-
-    private volatile MasterMappingUpdater masterMappingUpdater;
 
+    private IndicesAdminClient client;
     private volatile TimeValue dynamicMappingUpdateTimeout;
 
     class ApplySettings implements NodeSettingsService.Listener {
@@ -89,44 +59,58 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
     }
 
     @Inject
-    public MappingUpdatedAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
-                                MetaDataMappingService metaDataMappingService, NodeSettingsService nodeSettingsService, ActionFilters actionFilters) {
-        super(settings, ACTION_NAME, transportService, clusterService, threadPool, actionFilters);
-        this.metaDataMappingService = metaDataMappingService;
+    public MappingUpdatedAction(Settings settings, NodeSettingsService nodeSettingsService) {
+        super(settings);
         this.dynamicMappingUpdateTimeout = settings.getAsTime(INDICES_MAPPING_DYNAMIC_TIMEOUT, TimeValue.timeValueSeconds(30));
         nodeSettingsService.addListener(new ApplySettings());
     }
 
-    public void start() {
-        this.masterMappingUpdater = new MasterMappingUpdater(EsExecutors.threadName(settings, "master_mapping_updater"));
-        this.masterMappingUpdater.start();
+    public void setClient(Client client) {
+        this.client = client.admin().indices();
     }
 
-    public void stop() {
-        this.masterMappingUpdater.close();
-        this.masterMappingUpdater = null;
-    }
-
-    public void updateMappingOnMaster(String index, String indexUUID, String type, Mapping mappingUpdate, MappingUpdateListener listener) {
+    private PutMappingRequestBuilder updateMappingRequest(String index, String type, Mapping mappingUpdate, final TimeValue timeout) {
         if (type.equals(MapperService.DEFAULT_MAPPING)) {
             throw new ElasticsearchIllegalArgumentException("_default_ mapping should not be updated");
         }
-        try {
-            XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
-            mappingUpdate.toXContent(builder, new ToXContent.MapParams(ImmutableMap.<String, String>of()));
-            final CompressedString mappingSource = new CompressedString(builder.endObject().bytes());
-            masterMappingUpdater.add(new MappingChange(index, indexUUID, type, mappingSource, listener));
-        } catch (IOException bogus) {
-            throw new AssertionError("Cannot happen", bogus);
+        return client.preparePutMapping(index).setType(type).setSource(mappingUpdate.toString())
+            .setMasterNodeTimeout(timeout).setTimeout(timeout);
+    }
+
+    public void updateMappingOnMaster(String index, String type, Mapping mappingUpdate, final TimeValue timeout, final MappingUpdateListener listener) {
+        final PutMappingRequestBuilder request = updateMappingRequest(index, type, mappingUpdate, timeout);
+        if (listener == null) {
+            request.execute();
+        } else {
+            final ActionListener<PutMappingResponse> actionListener = new ActionListener<PutMappingResponse>() {
+                @Override
+                public void onResponse(PutMappingResponse response) {
+                    if (response.isAcknowledged()) {
+                        listener.onMappingUpdate();
+                    } else {
+                        listener.onFailure(new TimeoutException("Failed to acknowledge the mapping response within [" + timeout + "]"));
+                    }
+                }
+
+                @Override
+                public void onFailure(Throwable e) {
+                    listener.onFailure(e);
+                }
+            };
+            request.execute(actionListener);
         }
     }
 
+    public void updateMappingOnMasterAsynchronously(String index, String type, Mapping mappingUpdate) throws Throwable {
+        updateMappingOnMaster(index, type, mappingUpdate, dynamicMappingUpdateTimeout, null);
+    }
+
     /**
      * Same as {@link #updateMappingOnMasterSynchronously(String, String, String, Mapping, TimeValue)}
      * using the default timeout.
      */
-    public void updateMappingOnMasterSynchronously(String index, String indexUUID, String type, Mapping mappingUpdate) throws Throwable {
-        updateMappingOnMasterSynchronously(index, indexUUID, type, mappingUpdate, dynamicMappingUpdateTimeout);
+    public void updateMappingOnMasterSynchronously(String index, String type, Mapping mappingUpdate) throws Throwable {
+        updateMappingOnMasterSynchronously(index, type, mappingUpdate, dynamicMappingUpdateTimeout);
     }
 
     /**
@@ -134,179 +118,9 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
      * {@code timeout}. When this method returns successfully mappings have
      * been applied to the master node and propagated to data nodes.
      */
-    public void updateMappingOnMasterSynchronously(String index, String indexUUID, String type, Mapping mappingUpdate, TimeValue timeout) throws Throwable {
-        final CountDownLatch latch = new CountDownLatch(1);
-        final Throwable[] cause = new Throwable[1];
-        final MappingUpdateListener listener = new MappingUpdateListener() {
-
-            @Override
-            public void onMappingUpdate() {
-                latch.countDown();
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                cause[0] = t;
-                latch.countDown();
-            }
-
-        };
-
-        updateMappingOnMaster(index, indexUUID, type, mappingUpdate, listener);
-        if (!latch.await(timeout.getMillis(), TimeUnit.MILLISECONDS)) {
-            throw new TimeoutException("Time out while waiting for the master node to validate a mapping update for type [" + type + "]");
-        }
-        if (cause[0] != null) {
-            throw cause[0];
-        }
-    }
-
-    @Override
-    protected ClusterBlockException checkBlock(MappingUpdatedRequest request, ClusterState state) {
-        // internal call by other nodes, no need to check for blocks
-        return null;
-    }
-
-    @Override
-    protected String executor() {
-        // we go async right away
-        return ThreadPool.Names.SAME;
-    }
-
-    @Override
-    protected MappingUpdatedRequest newRequest() {
-        return new MappingUpdatedRequest();
-    }
-
-    @Override
-    protected MappingUpdatedResponse newResponse() {
-        return new MappingUpdatedResponse();
-    }
-
-    @Override
-    protected void masterOperation(final MappingUpdatedRequest request, final ClusterState state, final ActionListener<MappingUpdatedResponse> listener) throws ElasticsearchException {
-        metaDataMappingService.updateMapping(request.index(), request.indexUUID(), request.type(), request.mappingSource(), request.nodeId, new ActionListener<ClusterStateUpdateResponse>() {
-            @Override
-            public void onResponse(ClusterStateUpdateResponse response) {
-                listener.onResponse(new MappingUpdatedResponse());
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                logger.warn("[{}] update-mapping [{}] failed to dynamically update the mapping in cluster_state from shard", t, request.index(), request.type());
-                listener.onFailure(t);
-            }
-        });
-    }
-
-    public static class MappingUpdatedResponse extends ActionResponse {
-        @Override
-        public void readFrom(StreamInput in) throws IOException {
-            super.readFrom(in);
-        }
-
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            super.writeTo(out);
-        }
-    }
-
-    public static class MappingUpdatedRequest extends MasterNodeOperationRequest<MappingUpdatedRequest> implements IndicesRequest {
-
-        private String index;
-        private String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;
-        private String type;
-        private CompressedString mappingSource;
-        private String nodeId = null; // null means not set
-
-        MappingUpdatedRequest() {
-        }
-
-        public MappingUpdatedRequest(String index, String indexUUID, String type, CompressedString mappingSource, String nodeId) {
-            this.index = index;
-            this.indexUUID = indexUUID;
-            this.type = type;
-            this.mappingSource = mappingSource;
-            this.nodeId = nodeId;
-        }
-
-        public String index() {
-            return index;
-        }
-
-        @Override
-        public IndicesOptions indicesOptions() {
-            return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
-        }
-
-        @Override
-        public String[] indices() {
-            return new String[]{index};
-        }
-
-        public String indexUUID() {
-            return indexUUID;
-        }
-
-        public String type() {
-            return type;
-        }
-
-        public CompressedString mappingSource() {
-            return mappingSource;
-        }
-
-        /**
-         * Returns null for not set.
-         */
-        public String nodeId() {
-            return this.nodeId;
-        }
-
-        @Override
-        public ActionRequestValidationException validate() {
-            return null;
-        }
-
-        @Override
-        public void readFrom(StreamInput in) throws IOException {
-            super.readFrom(in);
-            index = in.readString();
-            type = in.readString();
-            mappingSource = CompressedString.readCompressedString(in);
-            indexUUID = in.readString();
-            nodeId = in.readOptionalString();
-        }
-
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            super.writeTo(out);
-            out.writeString(index);
-            out.writeString(type);
-            mappingSource.writeTo(out);
-            out.writeString(indexUUID);
-            out.writeOptionalString(nodeId);
-        }
-
-        @Override
-        public String toString() {
-            return "index [" + index + "], indexUUID [" + indexUUID + "], type [" + type + "] and source [" + mappingSource + "]";
-        }
-    }
-
-    private static class MappingChange {
-        public final String index;
-        public final String indexUUID;
-        public final String type;
-        public final CompressedString mappingSource;
-        public final MappingUpdateListener listener;
-
-        MappingChange(String index, String indexUUID, String type, CompressedString mappingSource, MappingUpdateListener listener) {
-            this.index = index;
-            this.indexUUID = indexUUID;
-            this.type = type;
-            this.mappingSource = mappingSource;
-            this.listener = listener;
+    public void updateMappingOnMasterSynchronously(String index, String type, Mapping mappingUpdate, TimeValue timeout) throws Throwable {
+        if (updateMappingRequest(index, type, mappingUpdate, timeout).get().isAcknowledged() == false) {
+            throw new TimeoutException("Failed to acknowledge mapping update within [" + timeout + "]");
         }
     }
 
@@ -319,90 +133,4 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
 
         void onFailure(Throwable t);
     }
-
-    /**
-     * The master mapping updater removes the overhead of refreshing the mapping (refreshSource) on the
-     * indexing thread.
-     * <p/>
-     * It also allows to reduce multiple mapping updates on the same index(UUID) and type into one update
-     * (refreshSource + sending to master), which allows to offload the number of times mappings are updated
-     * and sent to master for heavy single index requests that each introduce a new mapping, and when
-     * multiple shards exists on the same nodes, allowing to work on the index level in this case.
-     */
-    private class MasterMappingUpdater extends Thread {
-
-        private volatile boolean running = true;
-        private final BlockingQueue<MappingChange> queue = ConcurrentCollections.newBlockingQueue();
-
-        public MasterMappingUpdater(String name) {
-            super(name);
-        }
-
-        public void add(MappingChange change) {
-            queue.add(change);
-        }
-
-        public void close() {
-            running = false;
-            this.interrupt();
-        }
-
-        @Override
-        public void run() {
-            while (running) {
-                MappingUpdateListener listener = null;
-                try {
-                    final MappingChange change = queue.poll(10, TimeUnit.MINUTES);
-                    if (change == null) {
-                        continue;
-                    }
-                    listener = change.listener;
-
-                    final MappingUpdatedAction.MappingUpdatedRequest mappingRequest;
-                    try {
-                        DiscoveryNode node = clusterService.localNode();
-                        mappingRequest = new MappingUpdatedAction.MappingUpdatedRequest(
-                                change.index, change.indexUUID, change.type, change.mappingSource, node != null ? node.id() : null
-                        );
-                    } catch (Throwable t) {
-                        logger.warn("Failed to update master on updated mapping for index [" + change.index + "], type [" + change.type + "]", t);
-                        if (change.listener != null) {
-                            change.listener.onFailure(t);
-                        }
-                        continue;
-                    }
-                    logger.trace("sending mapping updated to master: {}", mappingRequest);
-                    execute(mappingRequest, new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() {
-                        @Override
-                        public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) {
-                            logger.debug("successfully updated master with mapping update: {}", mappingRequest);
-                            if (change.listener != null) {
-                                change.listener.onMappingUpdate();
-                            }
-                        }
-
-                        @Override
-                        public void onFailure(Throwable e) {
-                            logger.warn("failed to update master on updated mapping for {}", e, mappingRequest);
-                            if (change.listener != null) {
-                                change.listener.onFailure(e);
-                            }
-                        }
-                    });
-                } catch (Throwable t) {
-                    if (listener != null) {
-                        // even if the failure is expected, eg. if we got interrupted,
-                        // we need to notify the listener as there might be a latch
-                        // waiting for it to be called
-                        listener.onFailure(t);
-                    }
-                    if (t instanceof InterruptedException && !running) {
-                        // all is well, we are shutting down
-                    } else {
-                        logger.warn("failed to process mapping update", t);
-                    }
-                }
-            }
-        }
-    }
 }

+ 0 - 41
src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java

@@ -331,47 +331,6 @@ public class MetaDataMappingService extends AbstractComponent {
         });
     }
 
-    public void updateMapping(final String index, final String indexUUID, final String type, final CompressedString mappingSource, final String nodeId, final ActionListener<ClusterStateUpdateResponse> listener) {
-        final long insertOrder;
-        synchronized (refreshOrUpdateMutex) {
-            insertOrder = ++refreshOrUpdateInsertOrder;
-            refreshOrUpdateQueue.add(new UpdateTask(index, indexUUID, type, mappingSource, nodeId, listener));
-        }
-        clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "] / node [" + nodeId + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
-            private volatile List<MappingTask> allTasks;
-
-            @Override
-            public void onFailure(String source, Throwable t) {
-                listener.onFailure(t);
-            }
-
-            @Override
-            public ClusterState execute(final ClusterState currentState) throws Exception {
-                Tuple<ClusterState, List<MappingTask>> tuple = executeRefreshOrUpdate(currentState, insertOrder);
-                this.allTasks = tuple.v2();
-                return tuple.v1();
-            }
-
-            @Override
-            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                if (allTasks == null) {
-                    return;
-                }
-                for (Object task : allTasks) {
-                    if (task instanceof UpdateTask) {
-                        UpdateTask uTask = (UpdateTask) task;
-                        ClusterStateUpdateResponse response = new ClusterStateUpdateResponse(true);
-                        try {
-                            uTask.listener.onResponse(response);
-                        } catch (Throwable t) {
-                            logger.debug("failed to ping back on response of mapping processing for task [{}]", t, uTask.listener);
-                        }
-                    }
-                }
-            }
-        });
-    }
-
     public void putMapping(final PutMappingClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
 
         clusterService.submitStateUpdateTask("put-mapping [" + request.type() + "]", Priority.HIGH, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {

+ 1 - 1
src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java

@@ -165,7 +165,7 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl
     private void validateMappingUpdate(final String type, Mapping update) {
         final CountDownLatch latch = new CountDownLatch(1);
         final AtomicReference<Throwable> error = new AtomicReference<>();
-        mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), indexService.indexUUID(), type, update, new MappingUpdatedAction.MappingUpdateListener() {
+        mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), type, update, waitForMappingUpdatePostRecovery, new MappingUpdatedAction.MappingUpdateListener() {
             @Override
             public void onMappingUpdate() {
                 latch.countDown();

+ 23 - 7
src/main/java/org/elasticsearch/index/termvectors/ShardTermVectorsService.java

@@ -20,7 +20,12 @@
 package org.elasticsearch.index.termvectors;
 
 import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.index.*;
+import org.apache.lucene.index.Fields;
+import org.apache.lucene.index.IndexOptions;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.index.MultiFields;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.Terms;
 import org.apache.lucene.index.memory.MemoryIndex;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.termvectors.TermVectorsFilter;
@@ -40,18 +45,30 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.get.GetField;
 import org.elasticsearch.index.get.GetResult;
-import org.elasticsearch.index.mapper.*;
+import org.elasticsearch.index.mapper.DocumentMapper;
+import org.elasticsearch.index.mapper.FieldMapper;
+import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.mapper.Mapping;
+import org.elasticsearch.index.mapper.ParseContext;
+import org.elasticsearch.index.mapper.ParsedDocument;
+import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.mapper.core.StringFieldMapper;
 import org.elasticsearch.index.mapper.internal.UidFieldMapper;
-import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.settings.IndexSettings;
 import org.elasticsearch.index.shard.AbstractIndexShardComponent;
-import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.search.dfs.AggregatedDfs;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
 
 import static org.elasticsearch.index.mapper.SourceToParse.source;
 
@@ -285,7 +302,6 @@ public class ShardTermVectorsService extends AbstractIndexShardComponent {
 
     private ParsedDocument parseDocument(String index, String type, BytesReference doc) throws Throwable {
         MapperService mapperService = indexShard.mapperService();
-        IndexService indexService = indexShard.indexService();
 
         // TODO: make parsing not dynamically create fields not in the original mapping
         Tuple<DocumentMapper, Mapping> docMapper = mapperService.documentMapperWithAutoCreate(type);
@@ -294,7 +310,7 @@ public class ShardTermVectorsService extends AbstractIndexShardComponent {
             parsedDocument.addDynamicMappingsUpdate(docMapper.v2());
         }
         if (parsedDocument.dynamicMappingsUpdate() != null) {
-            mappingUpdatedAction.updateMappingOnMasterSynchronously(index, indexService.indexUUID(), type, parsedDocument.dynamicMappingsUpdate());
+            mappingUpdatedAction.updateMappingOnMasterSynchronously(index, type, parsedDocument.dynamicMappingsUpdate());
         }
         return parsedDocument;
     }

+ 1 - 1
src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

@@ -567,7 +567,7 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler {
             }
         };
         for (DocumentMapper documentMapper : documentMappersToUpdate) {
-            mappingUpdatedAction.updateMappingOnMaster(indexService.index().getName(), indexService.indexUUID(), documentMapper.type(), documentMapper.mapping(), listener);
+            mappingUpdatedAction.updateMappingOnMaster(indexService.index().getName(), documentMapper.type(), documentMapper.mapping(), recoverySettings.internalActionTimeout(), listener);
         }
         cancellableThreads.execute(new Interruptable() {
             @Override

+ 1 - 2
src/main/java/org/elasticsearch/node/Node.java

@@ -242,7 +242,7 @@ public class Node implements Releasable {
             injector.getInstance(plugin).start();
         }
 
-        injector.getInstance(MappingUpdatedAction.class).start();
+        injector.getInstance(MappingUpdatedAction.class).setClient(client);
         injector.getInstance(IndicesService.class).start();
         injector.getInstance(IndexingMemoryController.class).start();
         injector.getInstance(IndicesClusterStateService.class).start();
@@ -285,7 +285,6 @@ public class Node implements Releasable {
             injector.getInstance(HttpServer.class).stop();
         }
 
-        injector.getInstance(MappingUpdatedAction.class).stop();
         injector.getInstance(RiversManager.class).stop();
 
         injector.getInstance(SnapshotsService.class).stop();

+ 1 - 1
src/main/java/org/elasticsearch/percolator/PercolatorService.java

@@ -287,7 +287,7 @@ public class PercolatorService extends AbstractComponent {
                             doc.addDynamicMappingsUpdate(docMapper.v2());
                         }
                         if (doc.dynamicMappingsUpdate() != null) {
-                            mappingUpdatedAction.updateMappingOnMasterSynchronously(request.shardId().getIndex(), documentIndexService.indexUUID(), request.documentType(), doc.dynamicMappingsUpdate());
+                            mappingUpdatedAction.updateMappingOnMasterSynchronously(request.shardId().getIndex(), request.documentType(), doc.dynamicMappingsUpdate());
                         }
                         // the document parsing exists the "doc" object, so we need to set the new current field.
                         currentFieldName = parser.currentName();

+ 0 - 1
src/test/java/org/elasticsearch/indices/mapping/ConcurrentDynamicTemplateTests.java

@@ -46,7 +46,6 @@ public class ConcurrentDynamicTemplateTests extends ElasticsearchIntegrationTest
     private final String mappingType = "test-mapping";
 
     @Test // see #3544
-    @AwaitsFix(bugUrl = "adrien is looking into this")
     public void testConcurrentDynamicMapping() throws Exception {
         final String fieldName = "field";
         final String mapping = "{ \"" + mappingType + "\": {" +