
Allow `_shrink` to N shards if source shards is a multiple of N (#18699)

Today we only allow shrinking to a single shard, but that might not be possible because the
source index holds too many documents, or because a single shard does not meet the requirements for the index.
The logic can be expanded to shrink to N shards if the number of source index shards is a multiple of N.
This guarantees that no hotspots are created by shrinking different numbers of source shards
into a single target shard.
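
To make the mapping concrete: the routing factor is the source shard count divided by the target shard count, and every target shard absorbs one contiguous, equally sized group of source shards, which is why no target shard ends up hotter than another. A minimal standalone sketch of that grouping (an assumed helper, not part of this patch; the real logic is IndexMetaData.selectShrinkShards further down):

    // Sketch only: which source shards end up in a given target shard after a shrink.
    // Assumes numSourceShards is an exact multiple of numTargetShards.
    static int[] sourceShardsFor(int targetShard, int numSourceShards, int numTargetShards) {
        int factor = numSourceShards / numTargetShards;   // e.g. 8 -> 2 gives factor 4
        int[] group = new int[factor];
        for (int i = 0; i < factor; i++) {
            group[i] = targetShard * factor + i;          // target 0 <- {0,1,2,3}, target 1 <- {4,5,6,7}
        }
        return group;
    }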
Simon Willnauer, 9 years ago
commit b2c4c323e1
16 changed files with 640 additions and 89 deletions
  1. core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java (+1, -1)
  2. core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java (+1, -1)
  3. core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java (+31, -25)
  4. core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java (+114, -5)
  5. core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java (+14, -10)
  6. core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java (+4, -2)
  7. core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java (+7, -2)
  8. core/src/main/java/org/elasticsearch/index/shard/IndexShard.java (+6, -1)
  9. core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestShrinkIndexAction.java (+1, -1)
  10. core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java (+70, -0)
  11. core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java (+23, -14)
  12. core/src/test/java/org/elasticsearch/cluster/metadata/IndexMetaDataTests.java (+126, -0)
  13. core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java (+28, -12)
  14. core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java (+173, -0)
  15. core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java (+18, -5)
  16. docs/reference/indices/shrink-index.asciidoc (+23, -10)

+ 1 - 1
core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java

@@ -94,7 +94,7 @@ public class ShrinkRequest extends AcknowledgedRequest<ShrinkRequest> implements
     /**
      * Returns the {@link CreateIndexRequest} for the shrink index
      */
-    public CreateIndexRequest getShrinkIndexReqeust() {
+    public CreateIndexRequest getShrinkIndexRequest() {
         return shrinkIndexRequest;
     }
 

+ 1 - 1
core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java

@@ -41,7 +41,7 @@ public class ShrinkRequestBuilder extends AcknowledgedRequestBuilder<ShrinkReque
     }
 
     public ShrinkRequestBuilder setSettings(Settings settings) {
-        this.request.getShrinkIndexReqeust().settings(settings);
+        this.request.getShrinkIndexRequest().settings(settings);
         return this;
     }
 }

+ 31 - 25
core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java

@@ -23,6 +23,7 @@ import org.apache.lucene.index.IndexWriter;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
@@ -34,27 +35,17 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
-import org.elasticsearch.cluster.routing.IndexRoutingTable;
-import org.elasticsearch.cluster.routing.ShardRouting;
-import org.elasticsearch.cluster.routing.ShardRoutingState;
-import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.shard.DocsStats;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndexAlreadyExistsException;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Predicate;
+import java.util.function.IntFunction;
 
 /**
  * Main class to initiate shrinking an index into a new index with a single shard
@@ -87,7 +78,7 @@ public class TransportShrinkAction extends TransportMasterNodeAction<ShrinkReque
 
     @Override
     protected ClusterBlockException checkBlock(ShrinkRequest request, ClusterState state) {
-        return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getShrinkIndexReqeust().index());
+        return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getShrinkIndexRequest().index());
     }
 
     @Override
@@ -98,7 +89,10 @@ public class TransportShrinkAction extends TransportMasterNodeAction<ShrinkReque
             @Override
             public void onResponse(IndicesStatsResponse indicesStatsResponse) {
                 CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(shrinkRequest, state,
-                    indicesStatsResponse.getTotal().getDocs(), indexNameExpressionResolver);
+                    (i) -> {
+                        IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i);
+                        return shard == null ? null : shard.getPrimary().getDocs();
+                    }, indexNameExpressionResolver);
                 createIndexService.createIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
                     @Override
                     public void onResponse(ClusterStateUpdateResponse response) {
@@ -127,24 +121,36 @@ public class TransportShrinkAction extends TransportMasterNodeAction<ShrinkReque
 
     // static for unittesting this method
     static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final ShrinkRequest shrinkReqeust, final ClusterState state
-        , final DocsStats docsStats, IndexNameExpressionResolver indexNameExpressionResolver) {
+        , final IntFunction<DocsStats> perShardDocStats, IndexNameExpressionResolver indexNameExpressionResolver) {
         final String sourceIndex = indexNameExpressionResolver.resolveDateMathExpression(shrinkReqeust.getSourceIndex());
-        final CreateIndexRequest targetIndex = shrinkReqeust.getShrinkIndexReqeust();
+        final CreateIndexRequest targetIndex = shrinkReqeust.getShrinkIndexRequest();
         final String targetIndexName = indexNameExpressionResolver.resolveDateMathExpression(targetIndex.index());
         final IndexMetaData metaData = state.metaData().index(sourceIndex);
         final Settings targetIndexSettings = Settings.builder().put(targetIndex.settings())
             .normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX).build();
-        long count = docsStats.getCount();
-        if (count >= IndexWriter.MAX_DOCS) {
-            throw new IllegalStateException("Can't merge index with more than [" + IndexWriter.MAX_DOCS
-                + "] docs -  too many documents");
+        int numShards = 1;
+        if (IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.exists(targetIndexSettings)) {
+            numShards = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(targetIndexSettings);
+        }
+        for (int i = 0; i < numShards; i++) {
+            Set<ShardId> shardIds = IndexMetaData.selectShrinkShards(i, metaData, numShards);
+            long count = 0;
+            for (ShardId id : shardIds) {
+                DocsStats docsStats = perShardDocStats.apply(id.id());
+                if (docsStats != null) {
+                    count += docsStats.getCount();
+                }
+                if (count > IndexWriter.MAX_DOCS) {
+                    throw new IllegalStateException("Can't merge index with more than [" + IndexWriter.MAX_DOCS
+                        + "] docs - too many documents in shards " + shardIds);
+                }
+            }
+
         }
         targetIndex.cause("shrink_index");
-        targetIndex.settings(Settings.builder()
-            .put(targetIndexSettings)
-            // we can only shrink to 1 index so far!
-            .put("index.number_of_shards", 1)
-        );
+        Settings.Builder settingsBuilder = Settings.builder().put(targetIndexSettings);
+        settingsBuilder.put("index.number_of_shards", numShards);
+        targetIndex.settings(settingsBuilder);
 
         return new CreateIndexClusterStateUpdateRequest(targetIndex,
             "shrink_index", targetIndexName, true)

+ 114 - 5
core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java

@@ -30,10 +30,8 @@ import org.elasticsearch.cluster.Diffable;
 import org.elasticsearch.cluster.DiffableUtils;
 import org.elasticsearch.cluster.block.ClusterBlock;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
-import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
 import org.elasticsearch.cluster.routing.RoutingTable;
-import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.ParseFieldMatcher;
@@ -56,6 +54,7 @@ import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.gateway.MetaDataStateFormat;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.rest.RestStatus;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
@@ -226,6 +225,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
 
     public static final String KEY_ACTIVE_ALLOCATIONS = "active_allocations";
     static final String KEY_VERSION = "version";
+    static final String KEY_ROUTING_NUM_SHARDS = "routing_num_shards";
     static final String KEY_SETTINGS = "settings";
     static final String KEY_STATE = "state";
     static final String KEY_MAPPINGS = "mappings";
@@ -233,6 +233,8 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
     public static final String KEY_PRIMARY_TERMS = "primary_terms";
 
     public static final String INDEX_STATE_FILE_PREFIX = "state-";
+    private final int routingNumShards;
+    private final int routingFactor;
 
     private final int numberOfShards;
     private final int numberOfReplicas;
@@ -268,7 +270,8 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
                           ImmutableOpenMap<String, MappingMetaData> mappings, ImmutableOpenMap<String, AliasMetaData> aliases,
                           ImmutableOpenMap<String, Custom> customs, ImmutableOpenIntMap<Set<String>> activeAllocationIds,
                           DiscoveryNodeFilters requireFilters, DiscoveryNodeFilters initialRecoveryFilters, DiscoveryNodeFilters includeFilters, DiscoveryNodeFilters excludeFilters,
-                          Version indexCreatedVersion, Version indexUpgradedVersion, org.apache.lucene.util.Version minimumCompatibleLuceneVersion) {
+                          Version indexCreatedVersion, Version indexUpgradedVersion, org.apache.lucene.util.Version minimumCompatibleLuceneVersion,
+                          int routingNumShards) {
 
         this.index = index;
         this.version = version;
@@ -290,6 +293,9 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
         this.indexCreatedVersion = indexCreatedVersion;
         this.indexUpgradedVersion = indexUpgradedVersion;
         this.minimumCompatibleLuceneVersion = minimumCompatibleLuceneVersion;
+        this.routingNumShards = routingNumShards;
+        this.routingFactor = routingNumShards / numberOfShards;
+        assert numberOfShards * routingFactor == routingNumShards :  routingNumShards + " must be a multiple of " + numberOfShards;
     }
 
     public Index getIndex() {
@@ -484,7 +490,12 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
         if (!customs.equals(that.customs)) {
             return false;
         }
-
+        if (routingNumShards != that.routingNumShards) {
+            return false;
+        }
+        if (routingFactor != that.routingFactor) {
+            return false;
+        }
         if (Arrays.equals(primaryTerms, that.primaryTerms) == false) {
             return false;
         }
@@ -503,6 +514,8 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
         result = 31 * result + settings.hashCode();
         result = 31 * result + mappings.hashCode();
         result = 31 * result + customs.hashCode();
+        result = 31 * result + Long.hashCode(routingFactor);
+        result = 31 * result + Long.hashCode(routingNumShards);
         result = 31 * result + Arrays.hashCode(primaryTerms);
         result = 31 * result + activeAllocationIds.hashCode();
         return result;
@@ -533,6 +546,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
     private static class IndexMetaDataDiff implements Diff<IndexMetaData> {
 
         private final String index;
+        private final int routingNumShards;
         private final long version;
         private final long[] primaryTerms;
         private final State state;
@@ -545,6 +559,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
         public IndexMetaDataDiff(IndexMetaData before, IndexMetaData after) {
             index = after.index.getName();
             version = after.version;
+            routingNumShards = after.routingNumShards;
             state = after.state;
             settings = after.settings;
             primaryTerms = after.primaryTerms;
@@ -557,6 +572,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
 
         public IndexMetaDataDiff(StreamInput in) throws IOException {
             index = in.readString();
+            routingNumShards = in.readInt();
             version = in.readLong();
             state = State.fromId(in.readByte());
             settings = Settings.readSettingsFromStream(in);
@@ -582,6 +598,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeString(index);
+            out.writeInt(routingNumShards);
             out.writeLong(version);
             out.writeByte(state.id);
             Settings.writeSettingsToStream(settings, out);
@@ -596,6 +613,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
         public IndexMetaData apply(IndexMetaData part) {
             Builder builder = builder(index);
             builder.version(version);
+            builder.setRoutingNumShards(routingNumShards);
             builder.state(state);
             builder.settings(settings);
             builder.primaryTerms(primaryTerms);
@@ -611,6 +629,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
     public IndexMetaData readFrom(StreamInput in) throws IOException {
         Builder builder = new Builder(in.readString());
         builder.version(in.readLong());
+        builder.setRoutingNumShards(in.readInt());
         builder.state(State.fromId(in.readByte()));
         builder.settings(readSettingsFromStream(in));
         builder.primaryTerms(in.readVLongArray());
@@ -643,6 +662,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
     public void writeTo(StreamOutput out) throws IOException {
         out.writeString(index.getName()); // uuid will come as part of settings
         out.writeLong(version);
+        out.writeInt(routingNumShards);
         out.writeByte(state.id());
         writeSettingsToStream(settings, out);
         out.writeVLongArray(primaryTerms);
@@ -685,6 +705,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
         private final ImmutableOpenMap.Builder<String, AliasMetaData> aliases;
         private final ImmutableOpenMap.Builder<String, Custom> customs;
         private final ImmutableOpenIntMap.Builder<Set<String>> activeAllocationIds;
+        private Integer routingNumShards;
 
         public Builder(String index) {
             this.index = index;
@@ -703,6 +724,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
             this.mappings = ImmutableOpenMap.builder(indexMetaData.mappings);
             this.aliases = ImmutableOpenMap.builder(indexMetaData.aliases);
             this.customs = ImmutableOpenMap.builder(indexMetaData.customs);
+            this.routingNumShards = indexMetaData.routingNumShards;
             this.activeAllocationIds = ImmutableOpenIntMap.builder(indexMetaData.activeAllocationIds);
         }
 
@@ -720,6 +742,26 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
             return this;
         }
 
+        /**
+         * Sets the number of shards that should be used for routing. This should only be used if the number of shards in
+         * an index has changed ie if the index is shrunk.
+         */
+        public Builder setRoutingNumShards(int routingNumShards) {
+            this.routingNumShards = routingNumShards;
+            return this;
+        }
+
+        /**
+         * Returns number of shards that should be used for routing. By default this method will return the number of shards
+         * for this index.
+         *
+         * @see #setRoutingNumShards(int)
+         * @see #numberOfShards()
+         */
+        public int getRoutingNumShards() {
+            return routingNumShards == null ? numberOfShards() : routingNumShards;
+        }
+
         public int numberOfShards() {
             return settings.getAsInt(SETTING_NUMBER_OF_SHARDS, -1);
         }
@@ -934,13 +976,14 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
             final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE);
             return new IndexMetaData(new Index(index, uuid), version, primaryTerms, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(),
                 tmpAliases.build(), customs.build(), filledActiveAllocationIds.build(), requireFilters, initialRecoveryFilters, includeFilters, excludeFilters,
-                indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion);
+                indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion, getRoutingNumShards());
         }
 
         public static void toXContent(IndexMetaData indexMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException {
             builder.startObject(indexMetaData.getIndex().getName());
 
             builder.field(KEY_VERSION, indexMetaData.getVersion());
+            builder.field(KEY_ROUTING_NUM_SHARDS, indexMetaData.getRoutingNumShards());
             builder.field(KEY_STATE, indexMetaData.getState().toString().toLowerCase(Locale.ENGLISH));
 
             boolean binary = params.paramAsBoolean("binary", false);
@@ -1101,6 +1144,8 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
                         builder.state(State.fromString(parser.text()));
                     } else if (KEY_VERSION.equals(currentFieldName)) {
                         builder.version(parser.longValue());
+                    } else if (KEY_ROUTING_NUM_SHARDS.equals(currentFieldName)) {
+                        builder.setRoutingNumShards(parser.intValue());
                     } else {
                         throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]");
                     }
@@ -1175,4 +1220,68 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
             return Builder.fromXContent(parser);
         }
     };
+
+    /**
+     * Returns the number of shards that should be used for routing. This basically defines the hash space we use in
+     * {@link org.elasticsearch.cluster.routing.OperationRouting#generateShardId(IndexMetaData, String, String)} to route documents
+     * to shards based on their ID or their specific routing value. The default value is {@link #getNumberOfShards()}. This value only
+     * changes if and index is shrunk.
+     */
+    public int getRoutingNumShards() {
+        return routingNumShards;
+    }
+
+    /**
+     * Returns the routing factor for this index. The default is <tt>1</tt>.
+     *
+     * @see #getRoutingFactor(IndexMetaData, int) for details
+     */
+    public int getRoutingFactor() {
+        return routingFactor;
+    }
+
+    /**
+     * Returns the source shard ids to shrink into the given shard id.
+     * @param shardId the id of the target shard to shrink to
+     * @param sourceIndexMetadata the source index metadata
+     * @param numTargetShards the total number of shards in the target index
+     * @return a set of shard IDs to shrink into the given shard ID.
+     */
+    public static Set<ShardId> selectShrinkShards(int shardId, IndexMetaData sourceIndexMetadata, int numTargetShards) {
+        if (shardId >= numTargetShards) {
+            throw new IllegalArgumentException("the number of target shards (" + numTargetShards + ") must be greater than the shard id: "
+                + shardId);
+        }
+        int routingFactor = getRoutingFactor(sourceIndexMetadata, numTargetShards);
+        Set<ShardId> shards = new HashSet<>(routingFactor);
+        for (int i = shardId * routingFactor; i < routingFactor*shardId + routingFactor; i++) {
+            shards.add(new ShardId(sourceIndexMetadata.getIndex(), i));
+        }
+        return shards;
+    }
+
+    /**
+     * Returns the routing factor for and shrunk index with the given number of target shards.
+     * This factor is used in the hash function in
+     * {@link org.elasticsearch.cluster.routing.OperationRouting#generateShardId(IndexMetaData, String, String)} to guarantee consistent
+     * hashing / routing of documents even if the number of shards changed (ie. a shrunk index).
+     *
+     * @param sourceIndexMetadata the metadata of the source index
+     * @param targetNumberOfShards the total number of shards in the target index
+     * @return the routing factor for and shrunk index with the given number of target shards.
+     * @throw IllegalArgumentException if the number of source shards is greater than the number of target shards or if the source shards
+     * are not divisible by the number of target shards.
+     */
+    public static int getRoutingFactor(IndexMetaData sourceIndexMetadata, int targetNumberOfShards) {
+        int sourceNumberOfShards = sourceIndexMetadata.getNumberOfShards();
+        if (sourceNumberOfShards < targetNumberOfShards) {
+            throw new IllegalArgumentException("the number of target shards must be less that the number of source shards");
+        }
+        int factor = sourceNumberOfShards / targetNumberOfShards;
+        if (factor * targetNumberOfShards != sourceNumberOfShards || factor <= 1) {
+            throw new IllegalArgumentException("the number of source shards [" + sourceNumberOfShards + "] must be a must be a multiple of ["
+                + targetNumberOfShards + "]");
+        }
+        return factor;
+    }
 }
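
The new routingNumShards field records the hash space of the original index and is inherited unchanged by shrunken indices, so the routing factor is always the original shard count divided by the current shard count. A purely illustrative walk-through (the shard counts below are assumptions, not taken from the patch):

    // Hedged illustration: shrinking 32 -> 8 -> 2 keeps routing_num_shards at 32
    // while the routing factor grows with every shrink.
    static void routingFactorExample() {
        final int routingNumShards = 32;                       // fixed at original index creation
        final int afterFirstShrink  = routingNumShards / 8;    // 32 -> 8 shards, factor 4
        final int afterSecondShrink = routingNumShards / 2;    // 8 -> 2 shards, factor 16
        assert afterFirstShrink == 4 && afterSecondShrink == 16;
    }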

+ 14 - 10
core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java

@@ -21,7 +21,6 @@ package org.elasticsearch.cluster.metadata;
 
 import com.carrotsearch.hppc.cursors.ObjectCursor;
 import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
-import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.util.CollectionUtil;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
@@ -65,7 +64,6 @@ import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.MapperParsingException;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.query.QueryShardContext;
-import org.elasticsearch.index.shard.DocsStats;
 import org.elasticsearch.indices.IndexAlreadyExistsException;
 import org.elasticsearch.indices.IndexCreationException;
 import org.elasticsearch.indices.IndicesService;
@@ -299,15 +297,19 @@ public class MetaDataCreateIndexService extends AbstractComponent {
 
                             indexSettingsBuilder.put(SETTING_INDEX_UUID, UUIDs.randomBase64UUID());
                             final Index shrinkFromIndex = request.shrinkFrom();
+                            int routingNumShards = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(indexSettingsBuilder.build());;
                             if (shrinkFromIndex != null) {
                                 prepareShrinkIndexSettings(currentState, mappings.keySet(), indexSettingsBuilder, shrinkFromIndex,
                                     request.index());
+                                IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(shrinkFromIndex);
+                                routingNumShards = sourceMetaData.getRoutingNumShards();
                             }
 
                             Settings actualIndexSettings = indexSettingsBuilder.build();
-
+                            IndexMetaData.Builder tmpImdBuilder = IndexMetaData.builder(request.index())
+                                .setRoutingNumShards(routingNumShards);
                             // Set up everything, now locally create the index to see that things are ok, and apply
-                            final IndexMetaData tmpImd = IndexMetaData.builder(request.index()).settings(actualIndexSettings).build();
+                            final IndexMetaData tmpImd = tmpImdBuilder.settings(actualIndexSettings).build();
                             // create the index here (on the master) to validate it can be created, as well as adding the mapping
                             final IndexService indexService = indicesService.createIndex(nodeServicesProvider, tmpImd, Collections.emptyList());
                             createdIndex = indexService.index();
@@ -339,7 +341,9 @@ public class MetaDataCreateIndexService extends AbstractComponent {
                                 mappingsMetaData.put(mapper.type(), mappingMd);
                             }
 
-                            final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(request.index()).settings(actualIndexSettings);
+                            final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(request.index())
+                                .settings(actualIndexSettings)
+                                .setRoutingNumShards(routingNumShards);
                             for (MappingMetaData mappingMd : mappingsMetaData.values()) {
                                 indexMetaDataBuilder.putMapping(mappingMd);
                             }
@@ -494,14 +498,16 @@ public class MetaDataCreateIndexService extends AbstractComponent {
             throw new IllegalArgumentException("can't shrink an index with only one shard");
         }
 
+
         if ((targetIndexMappingsTypes.size() > 1 ||
             (targetIndexMappingsTypes.isEmpty() || targetIndexMappingsTypes.contains(MapperService.DEFAULT_MAPPING)) == false)) {
             throw new IllegalArgumentException("mappings are not allowed when shrinking indices" +
                 ", all mappings are copied from the source index");
         }
-        if (IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.exists(targetIndexSettings)
-            && IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(targetIndexSettings) > 1) {
-            throw new IllegalArgumentException("can not shrink index into more than one shard");
+        if (IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.exists(targetIndexSettings)) {
+            // this method applies all necessary checks ie. if the target shards are less than the source shards
+            // of if the source shards are divisible by the number of target shards
+            IndexMetaData.getRoutingFactor(sourceMetaData, IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(targetIndexSettings));
         }
 
         // now check that index is all on one node
@@ -533,8 +539,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {
         final Predicate<String> analysisSimilarityPredicate = (s) -> s.startsWith("index.similarity.")
             || s.startsWith("index.analysis.");
         indexSettingsBuilder
-            // we can only shrink to 1 shard so far!
-            .put("index.number_of_shards", 1)
             // we use "i.r.a.initial_recovery" rather than "i.r.a.require|include" since we want the replica to allocate right away
             // once we are allocated.
             .put("index.routing.allocation.initial_recovery._id",

+ 4 - 2
core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java

@@ -218,14 +218,16 @@ public class OperationRouting extends AbstractComponent {
         return new ShardId(indexMetaData.getIndex(), generateShardId(indexMetaData, id, routing));
     }
 
-    private int generateShardId(IndexMetaData indexMetaData, String id, @Nullable String routing) {
+    static int generateShardId(IndexMetaData indexMetaData, String id, @Nullable String routing) {
         final int hash;
         if (routing == null) {
             hash = Murmur3HashFunction.hash(id);
         } else {
             hash = Murmur3HashFunction.hash(routing);
         }
-        return Math.floorMod(hash, indexMetaData.getNumberOfShards());
+        // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
+        // of original index to hash documents
+        return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
     }
 
     private void ensureNodeIdExists(DiscoveryNodes nodes, String nodeId) {
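
This is the heart of the routing change: documents are hashed into the original hash space (routingNumShards) and the result is divided by the routing factor, so a document that lived on source shard s before the shrink lands on exactly the target shard that absorbed s. A standalone sketch of the formula (assumed method, not the Elasticsearch class itself):

    // Sketch only: same arithmetic as the new generateShardId, stripped of IndexMetaData.
    static int shardFor(int hash, int routingNumShards, int numShards) {
        int routingFactor = routingNumShards / numShards;
        return Math.floorMod(hash, routingNumShards) / routingFactor;
    }

    // e.g. with an original index of 8 shards shrunk to 2:
    //   shardFor(h, 8, 8) yields some s in [0, 8) before the shrink,
    //   shardFor(h, 8, 2) yields s / 4 after the shrink,
    // which is the target shard that absorbed source shard s via selectShrinkShards.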

+ 7 - 2
core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java

@@ -46,6 +46,7 @@ import org.elasticsearch.common.unit.RatioValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.shard.ShardId;
 
 import java.util.Set;
 
@@ -653,10 +654,14 @@ public class DiskThresholdDecider extends AllocationDecider {
         if (metaData.getMergeSourceIndex() != null && shard.allocatedPostIndexCreate(metaData) == false) {
             // in the shrink index case we sum up the source index shards since we basically make a copy of the shard in
             // the worst case
-            Index mergeSourceIndex = metaData.getMergeSourceIndex();
             long targetShardSize = 0;
+            final Index mergeSourceIndex = metaData.getMergeSourceIndex();
+            final IndexMetaData sourceIndexMeta = allocation.metaData().getIndexSafe(metaData.getMergeSourceIndex());
+            final Set<ShardId> shardIds = IndexMetaData.selectShrinkShards(shard.id(), sourceIndexMeta, metaData.getNumberOfShards());
             for (IndexShardRoutingTable shardRoutingTable : allocation.routingTable().index(mergeSourceIndex.getName())) {
-                targetShardSize += info.getShardSize(shardRoutingTable.primaryShard(), 0);
+                if (shardIds.contains(shardRoutingTable.shardId())) {
+                    targetShardSize += info.getShardSize(shardRoutingTable.primaryShard(), 0);
+                }
             }
             return targetShardSize == 0 ? defaultValue : targetShardSize;
         } else {
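
The disk-threshold decider now sizes a shrink target as the sum of only the source primaries mapped to it, rather than all primaries of the source index. A hedged sketch of that sizing rule, using a hypothetical shardSizeInBytes lookup in place of ClusterInfo:

    // Sketch only: worst-case disk footprint of a shrink target shard is the sum of
    // the source primaries it will copy locally; fall back to a default when unknown.
    static long estimateTargetShardSize(java.util.Set<Integer> sourceShardIds,
                                        java.util.function.IntToLongFunction shardSizeInBytes,
                                        long defaultValue) {
        long sum = 0;
        for (int id : sourceShardIds) {
            sum += shardSizeInBytes.applyAsLong(id);
        }
        return sum == 0 ? defaultValue : sum;
    }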

+ 6 - 1
core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -126,6 +126,7 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -133,6 +134,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
 
 public class IndexShard extends AbstractIndexShardComponent {
 
@@ -1411,7 +1413,10 @@ public class IndexShard extends AbstractIndexShardComponent {
                     markAsRecovering("from local shards", recoveryState); // mark the shard as recovering on the cluster state thread
                     threadPool.generic().execute(() -> {
                         try {
-                            if (recoverFromLocalShards(mappingUpdateConsumer, startedShards)) {
+                            final Set<ShardId> shards = IndexMetaData.selectShrinkShards(shardId().id(), sourceIndexService.getMetaData(),
+                                indexMetaData.getNumberOfShards());
+                            if (recoverFromLocalShards(mappingUpdateConsumer, startedShards.stream()
+                                .filter((s) -> shards.contains(s.shardId())).collect(Collectors.toList()))) {
                                 recoveryListener.onRecoveryDone(recoveryState);
                             }
                         } catch (Throwable t) {

+ 1 - 1
core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestShrinkIndexAction.java

@@ -52,7 +52,7 @@ public class RestShrinkIndexAction extends BaseRestHandler {
         }
         ShrinkRequest shrinkIndexRequest = new ShrinkRequest(request.param("target"), request.param("index"));
         if (request.hasContent()) {
-            shrinkIndexRequest.getShrinkIndexReqeust().source(request.content());
+            shrinkIndexRequest.getShrinkIndexRequest().source(request.content());
         }
         shrinkIndexRequest.timeout(request.paramAsTime("timeout", shrinkIndexRequest.timeout()));
         shrinkIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", shrinkIndexRequest.masterNodeTimeout()));

+ 70 - 0
core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java

@@ -289,6 +289,76 @@ public class CreateIndexIT extends ESIntegTestCase {
         ensureGreen("test");
     }
 
+    public void testCreateShrinkIndexToN() {
+        int[][] possibleShardSplits = new int[][] {{8,4,2}, {9, 3, 1}, {4, 2, 1}, {15,5,1}};
+        int[] shardSplits = randomFrom(possibleShardSplits);
+        assertEquals(shardSplits[0], (shardSplits[0] / shardSplits[1]) * shardSplits[1]);
+        assertEquals(shardSplits[1], (shardSplits[1] / shardSplits[2]) * shardSplits[2]);
+        internalCluster().ensureAtLeastNumDataNodes(2);
+        prepareCreate("source").setSettings(Settings.builder().put(indexSettings()).put("number_of_shards", shardSplits[0])).get();
+        for (int i = 0; i < 20; i++) {
+            client().prepareIndex("source", "t1", Integer.toString(i)).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get();
+        }
+        ImmutableOpenMap<String, DiscoveryNode> dataNodes = client().admin().cluster().prepareState().get().getState().nodes()
+            .getDataNodes();
+        assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2);
+        DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class);
+        String mergeNode = discoveryNodes[0].getName();
+        // ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node
+        // if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due
+        // to the require._name below.
+        ensureGreen();
+        // relocate all shards to one node such that we can merge it.
+        client().admin().indices().prepareUpdateSettings("source")
+            .setSettings(Settings.builder()
+                .put("index.routing.allocation.require._name", mergeNode)
+                .put("index.blocks.write", true)).get();
+        ensureGreen();
+        // now merge source into a 4 shard index
+        assertAcked(client().admin().indices().prepareShrinkIndex("source", "first_shrink")
+            .setSettings(Settings.builder()
+                .put("index.number_of_replicas", 0)
+                .put("index.number_of_shards", shardSplits[1]).build()).get());
+        ensureGreen();
+        assertHitCount(client().prepareSearch("first_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
+
+        for (int i = 0; i < 20; i++) { // now update
+            client().prepareIndex("first_shrink", "t1", Integer.toString(i)).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get();
+        }
+        flushAndRefresh();
+        assertHitCount(client().prepareSearch("first_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
+        assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
+
+        // relocate all shards to one node such that we can merge it.
+        client().admin().indices().prepareUpdateSettings("first_shrink")
+            .setSettings(Settings.builder()
+                .put("index.routing.allocation.require._name", mergeNode)
+                .put("index.blocks.write", true)).get();
+        ensureGreen();
+        // now merge source into a 2 shard index
+        assertAcked(client().admin().indices().prepareShrinkIndex("first_shrink", "second_shrink")
+            .setSettings(Settings.builder()
+                .put("index.number_of_replicas", 0)
+                .put("index.number_of_shards", shardSplits[2]).build()).get());
+        ensureGreen();
+        assertHitCount(client().prepareSearch("second_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
+        // let it be allocated anywhere and bump replicas
+        client().admin().indices().prepareUpdateSettings("second_shrink")
+            .setSettings(Settings.builder()
+                .putNull("index.routing.allocation.include._id")
+                .put("index.number_of_replicas", 1)).get();
+        ensureGreen();
+        assertHitCount(client().prepareSearch("second_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
+
+        for (int i = 0; i < 20; i++) { // now update
+            client().prepareIndex("second_shrink", "t1", Integer.toString(i)).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get();
+        }
+        flushAndRefresh();
+        assertHitCount(client().prepareSearch("second_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
+        assertHitCount(client().prepareSearch("first_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
+        assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
+    }
+
     public void testCreateShrinkIndex() {
         internalCluster().ensureAtLeastNumDataNodes(2);
         prepareCreate("source").setSettings(Settings.builder().put(indexSettings()).put("number_of_shards", randomIntBetween(2, 7))).get();

+ 23 - 14
core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java

@@ -28,7 +28,6 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.MetaData;
-import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.RoutingTable;
@@ -39,9 +38,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
 import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.DummyTransportAddress;
-import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.shard.DocsStats;
-import org.elasticsearch.indices.IndexAlreadyExistsException;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.gateway.NoopGatewayAllocator;
 
@@ -70,15 +67,26 @@ public class TransportShrinkActionTests extends ESTestCase {
     }
 
     public void testErrorCondition() {
-        ClusterState state = createClusterState("source", randomIntBetween(2, 100), randomIntBetween(0, 10),
+        ClusterState state = createClusterState("source", randomIntBetween(2, 42), randomIntBetween(0, 10),
             Settings.builder().put("index.blocks.write", true).build());
-        DocsStats stats = new DocsStats(randomIntBetween(0, IndexWriter.MAX_DOCS-1), randomIntBetween(1, 1000));
-
-        assertEquals("Can't merge index with more than [2147483519] docs -  too many documents",
+        assertTrue(
             expectThrows(IllegalStateException.class, () ->
             TransportShrinkAction.prepareCreateIndexRequest(new ShrinkRequest("target", "source"), state,
-                new DocsStats(Integer.MAX_VALUE, randomIntBetween(1, 1000)), new IndexNameExpressionResolver(Settings.EMPTY))
-        ).getMessage());
+                (i) -> new DocsStats(Integer.MAX_VALUE, randomIntBetween(1, 1000)), new IndexNameExpressionResolver(Settings.EMPTY))
+        ).getMessage().startsWith("Can't merge index with more than [2147483519] docs - too many documents in shards "));
+
+
+        assertTrue(
+            expectThrows(IllegalStateException.class, () -> {
+                ShrinkRequest req = new ShrinkRequest("target", "source");
+                req.getShrinkIndexRequest().settings(Settings.builder().put("index.number_of_shards", 4));
+                ClusterState clusterState = createClusterState("source", 8, 1,
+                    Settings.builder().put("index.blocks.write", true).build());
+                    TransportShrinkAction.prepareCreateIndexRequest(req, clusterState,
+                        (i) -> i == 2 || i == 3 ? new DocsStats(Integer.MAX_VALUE/2, randomIntBetween(1, 1000)) : null,
+                        new IndexNameExpressionResolver(Settings.EMPTY));
+                }
+            ).getMessage().startsWith("Can't merge index with more than [2147483519] docs - too many documents in shards "));
 
 
         // create one that won't fail
@@ -96,8 +104,8 @@ public class TransportShrinkActionTests extends ESTestCase {
             routingTable.index("source").shardsWithState(ShardRoutingState.INITIALIZING)).routingTable();
         clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
 
-        TransportShrinkAction.prepareCreateIndexRequest(new ShrinkRequest("target", "source"), clusterState, stats,
-            new IndexNameExpressionResolver(Settings.EMPTY));
+        TransportShrinkAction.prepareCreateIndexRequest(new ShrinkRequest("target", "source"), clusterState,
+            (i) -> new DocsStats(randomIntBetween(1, 1000), randomIntBetween(1, 1000)), new IndexNameExpressionResolver(Settings.EMPTY));
     }
 
     public void testShrinkIndexSettings() {
@@ -118,11 +126,12 @@ public class TransportShrinkActionTests extends ESTestCase {
         routingTable = service.applyStartedShards(clusterState,
             routingTable.index(indexName).shardsWithState(ShardRoutingState.INITIALIZING)).routingTable();
         clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
-
-        DocsStats stats = new DocsStats(randomIntBetween(0, IndexWriter.MAX_DOCS-1), randomIntBetween(1, 1000));
+        int numSourceShards = clusterState.metaData().index(indexName).getNumberOfShards();
+        DocsStats stats = new DocsStats(randomIntBetween(0, (IndexWriter.MAX_DOCS) / numSourceShards), randomIntBetween(1, 1000));
         ShrinkRequest target = new ShrinkRequest("target", indexName);
         CreateIndexClusterStateUpdateRequest request = TransportShrinkAction.prepareCreateIndexRequest(
-            target, clusterState, stats, new IndexNameExpressionResolver(Settings.EMPTY));
+            target, clusterState, (i) -> stats,
+            new IndexNameExpressionResolver(Settings.EMPTY));
         assertNotNull(request.shrinkFrom());
         assertEquals(indexName, request.shrinkFrom().getName());
         assertEquals("1", request.settings().get("index.number_of_shards"));

+ 126 - 0
core/src/test/java/org/elasticsearch/cluster/metadata/IndexMetaDataTests.java

@@ -0,0 +1,126 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.metadata;
+
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+import java.util.Set;
+
+public class IndexMetaDataTests extends ESTestCase {
+
+    public void testIndexMetaDataSerialization() throws IOException {
+        Integer numShard = randomFrom(1, 2, 4, 8, 16);
+        int numberOfReplicas = randomIntBetween(0, 10);
+        IndexMetaData metaData = IndexMetaData.builder("foo")
+            .settings(Settings.builder()
+                .put("index.version.created", 1)
+                .put("index.number_of_shards", numShard)
+                .put("index.number_of_replicas", numberOfReplicas)
+                .build())
+            .creationDate(randomLong())
+            .primaryTerm(0, 2)
+            .setRoutingNumShards(32)
+            .build();
+
+        final XContentBuilder builder = JsonXContent.contentBuilder();
+        builder.startObject();
+        metaData.toXContent(builder, ToXContent.EMPTY_PARAMS);
+        builder.endObject();
+        XContentParser parser = XContentType.JSON.xContent().createParser(builder.bytes());
+        final IndexMetaData fromXContentMeta = IndexMetaData.PROTO.fromXContent(parser, null);
+        assertEquals(metaData, fromXContentMeta);
+        assertEquals(metaData.hashCode(), fromXContentMeta.hashCode());
+
+        assertEquals(metaData.getNumberOfReplicas(), fromXContentMeta.getNumberOfReplicas());
+        assertEquals(metaData.getNumberOfShards(), fromXContentMeta.getNumberOfShards());
+        assertEquals(metaData.getCreationVersion(), fromXContentMeta.getCreationVersion());
+        assertEquals(metaData.getRoutingNumShards(), fromXContentMeta.getRoutingNumShards());
+        assertEquals(metaData.getCreationDate(), fromXContentMeta.getCreationDate());
+        assertEquals(metaData.getRoutingFactor(), fromXContentMeta.getRoutingFactor());
+        assertEquals(metaData.primaryTerm(0), fromXContentMeta.primaryTerm(0));
+
+        final BytesStreamOutput out = new BytesStreamOutput();
+        metaData.writeTo(out);
+        IndexMetaData deserialized = IndexMetaData.PROTO.readFrom(StreamInput.wrap(out.bytes()));
+        assertEquals(metaData, deserialized);
+        assertEquals(metaData.hashCode(), deserialized.hashCode());
+
+        assertEquals(metaData.getNumberOfReplicas(), deserialized.getNumberOfReplicas());
+        assertEquals(metaData.getNumberOfShards(), deserialized.getNumberOfShards());
+        assertEquals(metaData.getCreationVersion(), deserialized.getCreationVersion());
+        assertEquals(metaData.getRoutingNumShards(), deserialized.getRoutingNumShards());
+        assertEquals(metaData.getCreationDate(), deserialized.getCreationDate());
+        assertEquals(metaData.getRoutingFactor(), deserialized.getRoutingFactor());
+        assertEquals(metaData.primaryTerm(0), deserialized.primaryTerm(0));
+    }
+
+    public void testGetRoutingFactor() {
+        int numberOfReplicas = randomIntBetween(0, 10);
+        IndexMetaData metaData = IndexMetaData.builder("foo")
+            .settings(Settings.builder()
+                .put("index.version.created", 1)
+                .put("index.number_of_shards", 32)
+                .put("index.number_of_replicas", numberOfReplicas)
+                .build())
+            .creationDate(randomLong())
+            .build();
+        Integer numShard = randomFrom(1, 2, 4, 8, 16);
+        int routingFactor = IndexMetaData.getRoutingFactor(metaData, numShard);
+        assertEquals(routingFactor * numShard, metaData.getNumberOfShards());
+
+        Integer brokenNumShards = randomFrom(3, 5, 9, 12, 29, 42, 64);
+        expectThrows(IllegalArgumentException.class, () -> IndexMetaData.getRoutingFactor(metaData, brokenNumShards));
+    }
+
+    public void testSelectShrinkShards() {
+        int numberOfReplicas = randomIntBetween(0, 10);
+        IndexMetaData metaData = IndexMetaData.builder("foo")
+            .settings(Settings.builder()
+                .put("index.version.created", 1)
+                .put("index.number_of_shards", 32)
+                .put("index.number_of_replicas", numberOfReplicas)
+                .build())
+            .creationDate(randomLong())
+            .build();
+        Set<ShardId> shardIds = IndexMetaData.selectShrinkShards(0, metaData, 8);
+        assertEquals(shardIds, Sets.newHashSet(new ShardId(metaData.getIndex(), 0), new ShardId(metaData.getIndex(), 1),
+            new ShardId(metaData.getIndex(), 2), new ShardId(metaData.getIndex(), 3)));
+        shardIds = IndexMetaData.selectShrinkShards(1, metaData, 8);
+        assertEquals(shardIds, Sets.newHashSet(new ShardId(metaData.getIndex(), 4), new ShardId(metaData.getIndex(), 5),
+            new ShardId(metaData.getIndex(), 6), new ShardId(metaData.getIndex(), 7)));
+        shardIds = IndexMetaData.selectShrinkShards(7, metaData, 8);
+        assertEquals(shardIds, Sets.newHashSet(new ShardId(metaData.getIndex(), 28), new ShardId(metaData.getIndex(), 29),
+            new ShardId(metaData.getIndex(), 30), new ShardId(metaData.getIndex(), 31)));
+
+        assertEquals("the number of target shards (8) must be greater than the shard id: 8",
+            expectThrows(IllegalArgumentException.class, () -> IndexMetaData.selectShrinkShards(8, metaData, 8)).getMessage());
+    }
+}

+ 28 - 12
core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java

@@ -66,8 +66,15 @@ public class MetaDataCreateIndexServiceTests extends ESTestCase {
         return clusterState;
     }
 
+    public static boolean isShrinkable(int source, int target) {
+        int x = source / target;
+        assert source > target : source  + " <= " + target;
+        return target * x == source;
+    }
+
     public void testValidateShrinkIndex() {
-        ClusterState state = createClusterState("source", randomIntBetween(2, 100), randomIntBetween(0, 10),
+        int numShards = randomIntBetween(2, 42);
+        ClusterState state = createClusterState("source", numShards, randomIntBetween(0, 10),
             Settings.builder().put("index.blocks.write", true).build());
 
         assertEquals("index [source] already exists",
@@ -81,12 +88,18 @@ public class MetaDataCreateIndexServiceTests extends ESTestCase {
             ).getMessage());
 
         assertEquals("can't shrink an index with only one shard",
-            expectThrows(IllegalArgumentException.class, () ->
-                    MetaDataCreateIndexService.validateShrinkIndex(createClusterState("source", 1, 0,
-                        Settings.builder().put("index.blocks.write", true).build()), "source", Collections.emptySet(),
+            expectThrows(IllegalArgumentException.class, () -> MetaDataCreateIndexService.validateShrinkIndex(createClusterState("source",
+                1, 0, Settings.builder().put("index.blocks.write", true).build()), "source", Collections.emptySet(),
                         "target", Settings.EMPTY)
             ).getMessage());
 
+        assertEquals("the number of target shards must be less that the number of source shards",
+            expectThrows(IllegalArgumentException.class, () -> MetaDataCreateIndexService.validateShrinkIndex(createClusterState("source",
+                5, 0, Settings.builder().put("index.blocks.write", true).build()), "source", Collections.emptySet(),
+                "target", Settings.builder().put("index.number_of_shards", 10).build())
+            ).getMessage());
+
+
         assertEquals("index source must be read-only to shrink index. use \"index.blocks.write=true\"",
             expectThrows(IllegalStateException.class, () ->
                     MetaDataCreateIndexService.validateShrinkIndex(
@@ -99,11 +112,11 @@ public class MetaDataCreateIndexServiceTests extends ESTestCase {
                 MetaDataCreateIndexService.validateShrinkIndex(state, "source", Collections.emptySet(), "target", Settings.EMPTY)
 
             ).getMessage());
-
-        assertEquals("can not shrink index into more than one shard",
+        assertEquals("the number of source shards [8] must be a must be a multiple of [3]",
             expectThrows(IllegalArgumentException.class, () ->
-                MetaDataCreateIndexService.validateShrinkIndex(state, "source", Collections.emptySet(), "target",
-                    Settings.builder().put("index.number_of_shards", 2).build())
+                    MetaDataCreateIndexService.validateShrinkIndex(createClusterState("source", 8, randomIntBetween(0, 10),
+                        Settings.builder().put("index.blocks.write", true).build()), "source", Collections.emptySet(), "target",
+                        Settings.builder().put("index.number_of_shards", 3).build())
             ).getMessage());
 
         assertEquals("mappings are not allowed when shrinking indices, all mappings are copied from the source index",
@@ -114,7 +127,7 @@ public class MetaDataCreateIndexServiceTests extends ESTestCase {
             ).getMessage());
 
         // create one that won't fail
-        ClusterState clusterState = ClusterState.builder(createClusterState("source", randomIntBetween(2, 10), 0,
+        ClusterState clusterState = ClusterState.builder(createClusterState("source", numShards, 0,
             Settings.builder().put("index.blocks.write", true).build())).nodes(DiscoveryNodes.builder().put(newNode("node1")))
             .build();
         AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY,
@@ -127,8 +140,12 @@ public class MetaDataCreateIndexServiceTests extends ESTestCase {
         routingTable = service.applyStartedShards(clusterState,
             routingTable.index("source").shardsWithState(ShardRoutingState.INITIALIZING)).routingTable();
         clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
-
-        MetaDataCreateIndexService.validateShrinkIndex(clusterState, "source", Collections.emptySet(), "target", Settings.EMPTY);
+        int targetShards;
+        do {
+            targetShards = randomIntBetween(1, numShards/2);
+        } while (isShrinkable(numShards, targetShards) == false);
+        MetaDataCreateIndexService.validateShrinkIndex(clusterState, "source", Collections.emptySet(), "target",
+            Settings.builder().put("index.number_of_shards", targetShards).build());
     }
 
     public void testShrinkIndexSettings() {
@@ -155,7 +172,6 @@ public class MetaDataCreateIndexServiceTests extends ESTestCase {
         Settings.Builder builder = Settings.builder();
         MetaDataCreateIndexService.prepareShrinkIndexSettings(
             clusterState, Collections.emptySet(), builder, clusterState.metaData().index(indexName).getIndex(), "target");
-        assertEquals("1", builder.build().get("index.number_of_shards"));
         assertEquals("similarity settings must be copied", "BM25", builder.build().get("index.similarity.default.type"));
         assertEquals("analysis settings must be copied",
             "keyword", builder.build().get("index.analysis.analyzer.my_analyzer.tokenizer"));

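The assertions above reduce the new validation to simple arithmetic: the target shard count must be smaller than the source shard count and must divide it evenly. As a rough, standalone sketch of what the `isShrinkable` helper used in the test is assumed to check (illustrative names only, not the production implementation):

public class ShrinkMathSketch {

    // A target shard count is valid only if it is smaller than the source count
    // and divides it evenly, so every target shard absorbs the same number of
    // source shards and no hotspot is created.
    static boolean isShrinkable(int sourceShards, int targetShards) {
        return targetShards < sourceShards && sourceShards % targetShards == 0;
    }

    public static void main(String[] args) {
        System.out.println(isShrinkable(8, 4));   // true  -> 8 is a multiple of 4
        System.out.println(isShrinkable(8, 3));   // false -> triggers the "multiple of" error above
        System.out.println(isShrinkable(5, 10));  // false -> target must have fewer shards than source
    }
}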
+ 173 - 0
core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java

@@ -0,0 +1,173 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster.routing;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+public class OperationRoutingTests extends ESTestCase {
+
+    public void testGenerateShardId() {
+        int[][] possibleValues = new int[][] {
+            {8, 4, 2}, {20, 10, 2}, {36, 12, 3}, {15, 5, 1}
+        };
+        for (int i = 0; i < 10; i++) {
+            int[] shardSplits = randomFrom(possibleValues);
+            assertEquals(shardSplits[0], (shardSplits[0] / shardSplits[1]) * shardSplits[1]);
+            assertEquals(shardSplits[1], (shardSplits[1] / shardSplits[2]) * shardSplits[2]);
+            IndexMetaData metaData = IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(shardSplits[0])
+                .numberOfReplicas(1).build();
+            String term = randomAsciiOfLength(10);
+            final int shard = OperationRouting.generateShardId(metaData, term, null);
+            IndexMetaData shrunk = IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(shardSplits[1])
+                .numberOfReplicas(1)
+                .setRoutingNumShards(shardSplits[0]).build();
+            int shrunkShard = OperationRouting.generateShardId(shrunk, term, null);
+            Set<ShardId> shardIds = IndexMetaData.selectShrinkShards(shrunkShard, metaData, shrunk.getNumberOfShards());
+            assertEquals(1, shardIds.stream().filter((sid) -> sid.id() == shard).count());
+
+            shrunk = IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(shardSplits[2]).numberOfReplicas(1)
+                .setRoutingNumShards(shardSplits[0]).build();
+            shrunkShard = OperationRouting.generateShardId(shrunk, term, null);
+            shardIds = IndexMetaData.selectShrinkShards(shrunkShard, metaData, shrunk.getNumberOfShards());
+            assertEquals(Arrays.toString(shardSplits), 1, shardIds.stream().filter((sid) -> sid.id() == shard).count());
+        }
+    }
+
+    /**
+     * Ensures that all changes to the hash-function / shard selection are BWC
+     */
+    public void testBWC() {
+        Map<String, Integer> termToShard = new TreeMap<>();
+        termToShard.put("sEERfFzPSI", 1);
+        termToShard.put("cNRiIrjzYd", 7);
+        termToShard.put("BgfLBXUyWT", 5);
+        termToShard.put("cnepjZhQnb", 3);
+        termToShard.put("OKCmuYkeCK", 6);
+        termToShard.put("OutXGRQUja", 5);
+        termToShard.put("yCdyocKWou", 1);
+        termToShard.put("KXuNWWNgVj", 2);
+        termToShard.put("DGJOYrpESx", 4);
+        termToShard.put("upLDybdTGs", 5);
+        termToShard.put("yhZhzCPQby", 1);
+        termToShard.put("EyCVeiCouA", 1);
+        termToShard.put("tFyVdQauWR", 6);
+        termToShard.put("nyeRYDnDQr", 6);
+        termToShard.put("hswhrppvDH", 0);
+        termToShard.put("BSiWvDOsNE", 5);
+        termToShard.put("YHicpFBSaY", 1);
+        termToShard.put("EquPtdKaBZ", 4);
+        termToShard.put("rSjLZHCDfT", 5);
+        termToShard.put("qoZALVcite", 7);
+        termToShard.put("yDCCPVBiCm", 7);
+        termToShard.put("ngizYtQgGK", 5);
+        termToShard.put("FYQRIBcNqz", 0);
+        termToShard.put("EBzEDAPODe", 2);
+        termToShard.put("YePigbXgKb", 1);
+        termToShard.put("PeGJjomyik", 3);
+        termToShard.put("cyQIvDmyYD", 7);
+        termToShard.put("yIEfZrYfRk", 5);
+        termToShard.put("kblouyFUbu", 7);
+        termToShard.put("xvIGbRiGJF", 3);
+        termToShard.put("KWimwsREPf", 4);
+        termToShard.put("wsNavvIcdk", 7);
+        termToShard.put("xkWaPcCmpT", 0);
+        termToShard.put("FKKTOnJMDy", 7);
+        termToShard.put("RuLzobYixn", 2);
+        termToShard.put("mFohLeFRvF", 4);
+        termToShard.put("aAMXnamRJg", 7);
+        termToShard.put("zKBMYJDmBI", 0);
+        termToShard.put("ElSVuJQQuw", 7);
+        termToShard.put("pezPtTQAAm", 7);
+        termToShard.put("zBjjNEjAex", 2);
+        termToShard.put("PGgHcLNPYX", 7);
+        termToShard.put("hOkpeQqTDF", 3);
+        termToShard.put("chZXraUPBH", 7);
+        termToShard.put("FAIcSmmNXq", 5);
+        termToShard.put("EZmDicyayC", 0);
+        termToShard.put("GRIueBeIyL", 7);
+        termToShard.put("qCChjGZYLp", 3);
+        termToShard.put("IsSZQwwnUT", 3);
+        termToShard.put("MGlxLFyyCK", 3);
+        termToShard.put("YmscwrKSpB", 0);
+        termToShard.put("czSljcjMop", 5);
+        termToShard.put("XhfGWwNlng", 1);
+        termToShard.put("cWpKJjlzgj", 7);
+        termToShard.put("eDzIfMKbvk", 1);
+        termToShard.put("WFFWYBfnTb", 0);
+        termToShard.put("oDdHJxGxja", 7);
+        termToShard.put("PDOQQqgIKE", 1);
+        termToShard.put("bGEIEBLATe", 6);
+        termToShard.put("xpRkJPWVpu", 2);
+        termToShard.put("kTwZnPEeIi", 2);
+        termToShard.put("DifcuqSsKk", 1);
+        termToShard.put("CEmLmljpXe", 5);
+        termToShard.put("cuNKtLtyJQ", 7);
+        termToShard.put("yNjiAnxAmt", 5);
+        termToShard.put("bVDJDCeaFm", 2);
+        termToShard.put("vdnUhGLFtl", 0);
+        termToShard.put("LnqSYezXbr", 5);
+        termToShard.put("EzHgydDCSR", 3);
+        termToShard.put("ZSKjhJlcpn", 1);
+        termToShard.put("WRjUoZwtUz", 3);
+        termToShard.put("RiBbcCdIgk", 4);
+        termToShard.put("yizTqyjuDn", 4);
+        termToShard.put("QnFjcpcZUT", 4);
+        termToShard.put("agYhXYUUpl", 7);
+        termToShard.put("UOjiTugjNC", 7);
+        termToShard.put("nICGuWTdfV", 0);
+        termToShard.put("NrnSmcnUVF", 2);
+        termToShard.put("ZSzFcbpDqP", 3);
+        termToShard.put("YOhahLSzzE", 5);
+        termToShard.put("iWswCilUaT", 1);
+        termToShard.put("zXAamKsRwj", 2);
+        termToShard.put("aqGsrUPHFq", 5);
+        termToShard.put("eDItImYWTS", 1);
+        termToShard.put("JAYDZMRcpW", 4);
+        termToShard.put("lmvAaEPflK", 7);
+        termToShard.put("IKuOwPjKCx", 5);
+        termToShard.put("schsINzlYB", 1);
+        termToShard.put("OqbFNxrKrF", 2);
+        termToShard.put("QrklDfvEJU", 6);
+        termToShard.put("VLxKRKdLbx", 4);
+        termToShard.put("imoydNTZhV", 1);
+        termToShard.put("uFZyTyOMRO", 4);
+        termToShard.put("nVAZVMPNNx", 3);
+        termToShard.put("rPIdESYaAO", 5);
+        termToShard.put("nbZWPWJsIM", 0);
+        termToShard.put("wRZXPSoEgd", 3);
+        termToShard.put("nGzpgwsSBc", 4);
+        termToShard.put("AITyyoyLLs", 4);
+        IndexMetaData metaData = IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(8)
+            .numberOfReplicas(1).build();
+        for (Map.Entry<String, Integer> entry : termToShard.entrySet()) {
+            String key = entry.getKey();
+            int shard = randomBoolean() ?
+                OperationRouting.generateShardId(metaData, key, null) : OperationRouting.generateShardId(metaData, "foobar", key);
+            assertEquals(shard, entry.getValue().intValue());
+        }
+    }
+}

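The routing test above asserts that a document which hashes to shard S of the source index always lands in the one shrunk shard whose group of source shards contains S. The following is a standalone sketch of that arithmetic, assuming the routing factor is `sourceShards / targetShards` and that target shard `t` absorbs the contiguous source shards `[t * factor, (t + 1) * factor)`; the class and method names are illustrative and not Elasticsearch APIs.

import java.util.stream.IntStream;

public class ShrinkRoutingSketch {

    // Map a routing hash to a shard of the shrunk index: hash against the source
    // shard count first, then scale down by the routing factor.
    static int targetShard(int hash, int sourceShards, int targetShards) {
        int routingFactor = sourceShards / targetShards;
        int sourceShard = Math.floorMod(hash, sourceShards);
        return sourceShard / routingFactor;
    }

    public static void main(String[] args) {
        int sourceShards = 8;
        int targetShards = 2;
        int routingFactor = sourceShards / targetShards;
        IntStream.range(0, 1000).forEach(hash -> {
            int sourceShard = Math.floorMod(hash, sourceShards);
            int target = targetShard(hash, sourceShards, targetShards);
            // The source shard must fall inside the contiguous block of source shards
            // that collapses into this target shard: [target * factor, (target + 1) * factor).
            if (sourceShard < target * routingFactor || sourceShard >= (target + 1) * routingFactor) {
                throw new AssertionError("source shard " + sourceShard + " does not map to target " + target);
            }
        });
        System.out.println("every source shard collapses into exactly one target shard");
    }
}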
+ 18 - 5
core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java

@@ -298,17 +298,22 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase {
         ImmutableOpenMap.Builder<String, Long> shardSizes = ImmutableOpenMap.builder();
         shardSizes.put("[test][0][p]", 10L);
         shardSizes.put("[test][1][p]", 100L);
-        shardSizes.put("[test][2][p]", 1000L);
+        shardSizes.put("[test][2][p]", 500L);
+        shardSizes.put("[test][3][p]", 500L);
+
         ClusterInfo info = new DevNullClusterInfo(ImmutableOpenMap.of(), ImmutableOpenMap.of(), shardSizes.build());
         MetaData.Builder metaBuilder = MetaData.builder();
         metaBuilder.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put("index.uuid", "1234"))
-            .numberOfShards(3).numberOfReplicas(0));
+            .numberOfShards(4).numberOfReplicas(0));
         metaBuilder.put(IndexMetaData.builder("target").settings(settings(Version.CURRENT).put("index.uuid", "5678")
             .put("index.shrink.source.name", "test").put("index.shrink.source.uuid", "1234")).numberOfShards(1).numberOfReplicas(0));
+        metaBuilder.put(IndexMetaData.builder("target2").settings(settings(Version.CURRENT).put("index.uuid", "9101112")
+            .put("index.shrink.source.name", "test").put("index.shrink.source.uuid", "1234")).numberOfShards(2).numberOfReplicas(0));
         MetaData metaData = metaBuilder.build();
         RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
         routingTableBuilder.addAsNew(metaData.index("test"));
         routingTableBuilder.addAsNew(metaData.index("target"));
+        routingTableBuilder.addAsNew(metaData.index("target2"));
         ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
             .metaData(metaData).routingTable(routingTableBuilder.build()).build();
 
@@ -339,18 +344,26 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase {
             new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
         test_2 = ShardRoutingHelper.initialize(test_2, "node1");
 
-        assertEquals(1000L, DiskThresholdDecider.getExpectedShardSize(test_2, allocation, 0));
+        ShardRouting test_3 = ShardRouting.newUnassigned(new ShardId(index, 3), null, true,
+            new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
+        test_3 = ShardRoutingHelper.initialize(test_3, "node1");
+        assertEquals(500L, DiskThresholdDecider.getExpectedShardSize(test_3, allocation, 0));
+        assertEquals(500L, DiskThresholdDecider.getExpectedShardSize(test_2, allocation, 0));
         assertEquals(100L, DiskThresholdDecider.getExpectedShardSize(test_1, allocation, 0));
         assertEquals(10L, DiskThresholdDecider.getExpectedShardSize(test_0, allocation, 0));
 
 
         ShardRouting target = ShardRouting.newUnassigned(new ShardId(new Index("target", "5678"), 0),
             null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
-
-
         assertEquals(1110L, DiskThresholdDecider.getExpectedShardSize(target, allocation, 0));
 
+        ShardRouting target2 = ShardRouting.newUnassigned(new ShardId(new Index("target2", "9101112"), 0),
+            null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
+        assertEquals(110L, DiskThresholdDecider.getExpectedShardSize(target2, allocation, 0));
 
+        target2 = ShardRouting.newUnassigned(new ShardId(new Index("target2", "9101112"), 1),
+            null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
+        assertEquals(1000L, DiskThresholdDecider.getExpectedShardSize(target2, allocation, 0));
     }
 
 }

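The expected sizes asserted above (1110, 110, and 1000) are just the sums of the source shards that collapse into each target shard: 10 + 100 + 500 + 500 for the single-shard `target`, and 10 + 100 versus 500 + 500 for the two shards of `target2`. A minimal sketch of that bookkeeping, assuming the same contiguous grouping as in the routing sketch above (names are illustrative):

import java.util.stream.IntStream;

public class ExpectedShrinkSizeSketch {

    // A shrink target shard is expected to be as large as the sum of the source
    // shards that collapse into it.
    static long expectedTargetShardSize(long[] sourceShardSizes, int targetShards, int targetShardId) {
        int factor = sourceShardSizes.length / targetShards;   // source shards per target shard
        return IntStream.range(targetShardId * factor, (targetShardId + 1) * factor)
            .mapToLong(i -> sourceShardSizes[i])
            .sum();
    }

    public static void main(String[] args) {
        long[] test = {10L, 100L, 500L, 500L};                    // shard sizes used in the test above
        System.out.println(expectedTargetShardSize(test, 1, 0));  // 1110 -> "target", single shard
        System.out.println(expectedTargetShardSize(test, 2, 0));  // 110  -> "target2", shard 0
        System.out.println(expectedTargetShardSize(test, 2, 1));  // 1000 -> "target2", shard 1
    }
}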
+ 23 - 10
docs/reference/indices/shrink-index.asciidoc

@@ -2,13 +2,18 @@
 == Shrink Index
 
 The shrink index API allows you to shrink an existing index into a new index
-with a single primary shard. Before shrinking, a (primary or replica) copy of
-every shard in the index must be present on the same node.
+with fewer primary shards. The number of primary shards in the target index
+must be a factor of the number of primary shards in the source index. For
+example, an index with `8` primary shards can be shrunk into `4`, `2`, or `1`
+primary shards, and an index with `15` primary shards can be shrunk into `5`,
+`3`, or `1`. If the number of shards in the source index is a prime number, it
+can only be shrunk into a single primary shard. Before shrinking, a (primary
+or replica) copy of every shard in the index must be present on the same node.
 
 Shrinking works as follows:
 
 * First, it creates a new target index with the same definition as the source
-  index, but with a single primary shard.
+  index, but with a smaller number of primary shards.
 
 * Then it hard-links segments from the source index into the target index. (If
   the file system doesn't support hard-linking, then all segments are copied
@@ -64,15 +69,19 @@ the cluster state -- it doesn't wait for the shrink operation to start.
 [IMPORTANT]
 =====================================
 
-Indices can only be shrunk into a single shard if they satisfy the following requirements:
+Indices can only be shrunk if they satisfy the following requirements:
 
- * the target index must not exist
+* The target index must not exist
 
-* The index must have more than one primary shard.
+* The index must have more primary shards than the target index.
+
+* The number of primary shards in the target index must be a factor of the
+  number of primary shards in the source index.
 
 * The index must not contain more than `2,147,483,519` documents in total
-  across all shards as this is the maximum number of docs that can fit into a
-  single shard.
+  across all shards that will be shrunk into a single shard on the target index
+  as this is the maximum number of docs that can fit into a single shard.
 
 * The node handling the shrink process must have sufficient free disk space to
   accommodate a second copy of the existing index.
@@ -88,7 +97,8 @@ POST my_source_index/_shrink/my_target_index
 {
   "settings": {
     "index.number_of_replicas": 1,
-    "index.codec": "best_compression" <1>
+    "index.number_of_shards": 1, <1>
+    "index.codec": "best_compression" <2>
   },
   "aliases": {
     "my_search_indices": {}
@@ -96,10 +106,13 @@ POST my_source_index/_shrink/my_target_index
 }
 --------------------------------------------------
 
-<1> Best compression will only take affect when new writes are made to the
+<1> The number of shards in the target index. This must be a factor of the
+    number of shards in the source index.
+<2> Best compression will only take effect when new writes are made to the
     index, such as when <<indices-forcemerge,force-merging>> the shard to a single
     segment.
 
+
 NOTE: Mappings may not be specified in the `_shrink` request, and all
 `index.analysis.*` and `index.similarity.*` settings will be overwritten with
 the settings from the source index.
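
The document-count requirement in the IMPORTANT block above applies per target shard rather than per index: the documents of all source shards that collapse into one target shard must fit under the single-shard limit. A minimal sketch of that check, assuming contiguous shard grouping as in the sketches above and using the limit quoted in the docs (class and method names are illustrative):

import java.util.stream.IntStream;

public class ShrinkDocLimitSketch {

    static final long MAX_DOCS_PER_SHARD = 2_147_483_519L;   // limit quoted in the docs above

    // True if, for the chosen target shard count, every group of source shards
    // that collapses into one target shard stays under the single-shard limit.
    static boolean fits(long[] docsPerSourceShard, int targetShards) {
        int factor = docsPerSourceShard.length / targetShards;
        return IntStream.range(0, targetShards).allMatch(target ->
            IntStream.range(target * factor, (target + 1) * factor)
                .mapToLong(i -> docsPerSourceShard[i])
                .sum() <= MAX_DOCS_PER_SHARD);
    }

    public static void main(String[] args) {
        long[] docs = {900_000_000L, 900_000_000L, 900_000_000L, 900_000_000L};
        System.out.println(fits(docs, 2));   // true  -> 1.8 billion docs per target shard
        System.out.println(fits(docs, 1));   // false -> 3.6 billion docs exceed the limit
    }
}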