Bläddra i källkod

clean shard bulk mapping update to only use type
today we track both the index name and type for mapping updates in the shard bulk action, but we only work against on index in this level, so no need to track the index name itself
closes #6695

Shay Banon 11 år sedan
förälder
incheckning
c1bc269de9
1 ändrade filer med 24 tillägg och 25 borttagningar
  1. 24 25
      src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

+ 24 - 25
src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

@@ -145,7 +145,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
         IndexService indexService = indicesService.indexServiceSafe(shardRequest.request.index());
         IndexShard indexShard = indexService.shardSafe(shardRequest.shardId);
         Engine.IndexingOperation[] ops = null;
-        final Set<Tuple<String, String>> mappingsToUpdate = Sets.newHashSet();
+        final Set<String> mappingTypesToUpdate = Sets.newHashSet();
 
         BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
         long[] preVersions = new long[request.items().length];
@@ -162,8 +162,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
                         // add the response
                         IndexResponse indexResponse = result.response();
                         responses[requestIndex] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), indexResponse);
-                        if (result.mappingToUpdate != null) {
-                            mappingsToUpdate.add(result.mappingToUpdate);
+                        if (result.mappingTypeToUpdate != null) {
+                            mappingTypesToUpdate.add(result.mappingTypeToUpdate);
                         }
                         if (result.op != null) {
                             if (ops == null) {
@@ -172,9 +172,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
                             ops[requestIndex] = result.op;
                         }
                     } catch (WriteFailure e) {
-                        Tuple<String, String> mappingsToUpdateOnFailure = e.mappingsToUpdate;
-                        if (mappingsToUpdateOnFailure != null) {
-                            mappingsToUpdate.add(mappingsToUpdateOnFailure);
+                        if (e.mappingTypeToUpdate != null) {
+                            mappingTypesToUpdate.add(e.mappingTypeToUpdate);
                         }
                         throw e.getCause();
                     }
@@ -185,10 +184,10 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
                         for (int j = 0; j < requestIndex; j++) {
                             applyVersion(request.items()[j], preVersions[j], preVersionTypes[j]);
                         }
-                        for (Tuple<String, String> mappingToUpdate : mappingsToUpdate) {
-                            DocumentMapper docMapper = indexService.mapperService().documentMapper(mappingToUpdate.v2());
+                        for (String mappingTypeToUpdate : mappingTypesToUpdate) {
+                            DocumentMapper docMapper = indexService.mapperService().documentMapper(mappingTypeToUpdate);
                             if (docMapper != null) {
-                                mappingUpdatedAction.updateMappingOnMaster(mappingToUpdate.v1(), docMapper, indexService.indexUUID());
+                                mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), docMapper, indexService.indexUUID());
                             }
                         }
                         throw (ElasticsearchException) e;
@@ -259,8 +258,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
                                     updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
                                 }
                                 responses[requestIndex] = new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResponse);
-                                if (result.mappingToUpdate != null) {
-                                    mappingsToUpdate.add(result.mappingToUpdate);
+                                if (result.mappingTypeToUpdate != null) {
+                                    mappingTypesToUpdate.add(result.mappingTypeToUpdate);
                                 }
                                 if (result.op != null) {
                                     if (ops == null) {
@@ -348,10 +347,10 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
 
         }
 
-        for (Tuple<String, String> mappingToUpdate : mappingsToUpdate) {
-            DocumentMapper docMapper = indexService.mapperService().documentMapper(mappingToUpdate.v2());
+        for (String mappingTypToUpdate : mappingTypesToUpdate) {
+            DocumentMapper docMapper = indexService.mapperService().documentMapper(mappingTypToUpdate);
             if (docMapper != null) {
-                mappingUpdatedAction.updateMappingOnMaster(mappingToUpdate.v1(), docMapper, indexService.indexUUID());
+                mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), docMapper, indexService.indexUUID());
             }
         }
 
@@ -369,12 +368,12 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
     static class WriteResult {
 
         final Object response;
-        final Tuple<String, String> mappingToUpdate;
+        final String mappingTypeToUpdate;
         final Engine.IndexingOperation op;
 
-        WriteResult(Object response, Tuple<String, String> mappingToUpdate, Engine.IndexingOperation op) {
+        WriteResult(Object response, String mappingTypeToUpdate, Engine.IndexingOperation op) {
             this.response = response;
-            this.mappingToUpdate = mappingToUpdate;
+            this.mappingTypeToUpdate = mappingTypeToUpdate;
             this.op = op;
         }
 
@@ -387,12 +386,12 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
 
     static class WriteFailure extends ElasticsearchException implements ElasticsearchWrapperException {
         @Nullable
-        final Tuple<String, String> mappingsToUpdate;
+        final String mappingTypeToUpdate;
 
-        WriteFailure(Throwable cause, Tuple<String, String> mappingsToUpdate) {
+        WriteFailure(Throwable cause, String mappingTypeToUpdate) {
             super(null, cause);
             assert cause != null;
-            this.mappingsToUpdate = mappingsToUpdate;
+            this.mappingTypeToUpdate = mappingTypeToUpdate;
         }
     }
 
@@ -415,7 +414,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
                 .routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl());
 
         // update mapping on master if needed, we won't update changes to the same type, since once its changed, it won't have mappers added
-        Tuple<String, String> mappingsToUpdate = null;
+        String mappingTypeToUpdate = null;
 
         long version;
         boolean created;
@@ -424,7 +423,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
             if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
                 Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
                 if (index.parsedDoc().mappingsModified()) {
-                    mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type());
+                    mappingTypeToUpdate = indexRequest.type();
                 }
                 indexShard.index(index);
                 version = index.version();
@@ -434,7 +433,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
                 Engine.Create create = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY,
                         request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId());
                 if (create.parsedDoc().mappingsModified()) {
-                    mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type());
+                    mappingTypeToUpdate = indexRequest.type();
                 }
                 indexShard.create(create);
                 version = create.version();
@@ -445,14 +444,14 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
             indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
             indexRequest.version(version);
         } catch (Throwable t) {
-            throw new WriteFailure(t, mappingsToUpdate);
+            throw new WriteFailure(t, mappingTypeToUpdate);
         }
 
         assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());
 
 
         IndexResponse indexResponse = new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id(), version, created);
-        return new WriteResult(indexResponse, mappingsToUpdate, op);
+        return new WriteResult(indexResponse, mappingTypeToUpdate, op);
     }
 
     private WriteResult shardDeleteOperation(DeleteRequest deleteRequest, IndexShard indexShard) {