
Add ability to snapshot relocating primary shards

This change adds a new shard snapshot state, WAITING, which defers the start of the snapshot process for a shard until its relocation or initialization has finished. Because this change adds a new snapshot state, pre-1.2.0 nodes will not be able to join a 1.2.0 cluster that is currently running a snapshot/restore operation.

Closes #5531
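
Below is a minimal, self-contained sketch of the per-shard decision this change introduces (the enum and classes are illustrative stand-ins for SnapshotMetaData.State, ShardSnapshotStatus and the shard-selection code in SnapshotsService; the real logic is in the SnapshotsService hunk at the bottom of this diff): a relocating or initializing primary is put into the new WAITING state instead of failing the snapshot, and the new state is only used when every node in the cluster is on 1.2.0 or later.

// Hedged sketch only: ShardState, ShardDecision and SnapshotShardPlanner are
// stand-ins, not the actual Elasticsearch classes.
enum ShardState { INIT, WAITING, MISSING }

final class ShardDecision {
    final ShardState state;
    final String reason;   // only set for MISSING shards

    ShardDecision(ShardState state, String reason) {
        this.state = state;
        this.reason = reason;
    }
}

final class SnapshotShardPlanner {
    static ShardDecision decide(boolean primaryAssigned,
                                boolean primaryStarted,
                                boolean relocatingOrInitializing,
                                boolean clusterSupportsWaitingState /* all nodes >= 1.2.0 */) {
        if (!primaryAssigned) {
            return new ShardDecision(ShardState.MISSING, "primary shard is not allocated");
        }
        if (clusterSupportsWaitingState && relocatingOrInitializing) {
            // New in 1.2.0: defer the shard snapshot until relocation/initialization finishes
            return new ShardDecision(ShardState.WAITING, null);
        }
        if (!primaryStarted) {
            return new ShardDecision(ShardState.MISSING, "primary shard hasn't been started yet");
        }
        // Primary is started and stable - snapshot it right away
        return new ShardDecision(ShardState.INIT, null);
    }
}

Once a shard is WAITING, the cluster-state listener added in SnapshotsService watches routing-table changes and flips the shard to INIT when it starts on its new node, or to FAILED if the primary becomes unassigned.
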
Igor Motov · 11 years ago
commit 91c7892305

+ 4 - 1
docs/reference/modules/snapshots.asciidoc

@@ -131,7 +131,10 @@ changed since the last snapshot. That allows multiple snapshots to be preserved
 Snapshotting process is executed in non-blocking fashion. All indexing and searching operation can continue to be
 executed against the index that is being snapshotted. However, a snapshot represents the point-in-time view of the index
 at the moment when snapshot was created, so no records that were added to the index after snapshot process had started
-will be present in the snapshot.
+will be present in the snapshot. The snapshot process starts immediately for the primary shards that have been started
+and are not relocating at the moment. Before version 1.2.0 the snapshot operation fails if the cluster has any relocating
+or initializing primaries of indices participating in the snapshot. Starting with version 1.2.0, Elasticsearch waits for
+relocation or initialization of shards to complete before snapshotting them.
 
 Besides creating a copy of each index the snapshot process can also store global cluster metadata, which includes persistent
 cluster settings and templates. The transient settings and registered snapshot repositories are not stored as part of
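
For illustration, a hedged usage sketch built only from the 1.x Java client calls that appear in the tests further down this diff ("test-repo", "test-snap" and "test-idx" are placeholder names): a snapshot started while a primary is relocating no longer fails, and the status API reports the internally WAITING shards with the STARTED stage.

import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.client.Client;

public class SnapshotWhileRelocatingExample {

    // Sketch under the assumption that a "test-repo" repository exists and that
    // "test-idx" currently has relocating or initializing primaries.
    static void snapshotAndPoll(Client client) {
        // Start the snapshot without waiting; on a >= 1.2.0 cluster the relocating
        // primaries are recorded as WAITING instead of failing the snapshot.
        client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
                .setWaitForCompletion(false)
                .setIndices("test-idx")
                .get();

        // Poll the status API; shards that are still WAITING internally are
        // reported with the STARTED stage (see TransportSnapshotsStatusAction below).
        SnapshotsStatusResponse response = client.admin().cluster()
                .prepareSnapshotStatus("test-repo").setSnapshots("test-snap").get();
        for (SnapshotStatus snapshotStatus : response.getSnapshots()) {
            for (SnapshotIndexShardStatus shardStatus : snapshotStatus.getIndices().get("test-idx")) {
                System.out.println(shardStatus.getNodeId() + " -> " + shardStatus.getStage());
            }
        }
    }
}
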

+ 1 - 0
src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStatus.java

@@ -49,6 +49,7 @@ public class SnapshotIndexShardStatus extends BroadcastShardOperationResponse im
     SnapshotIndexShardStatus(String index, int shardId, SnapshotIndexShardStage stage) {
         super(index, shardId);
         this.stage = stage;
+        this.stats = new SnapshotStats();
     }
 
     SnapshotIndexShardStatus(ShardId shardId, IndexShardSnapshotStatus indexShardStatus) {

+ 8 - 3
src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

@@ -111,9 +111,13 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeOperation
                     snapshotIds, request.masterNodeTimeout(), new ActionListener<TransportNodesSnapshotsStatus.NodesSnapshotStatus>() {
                 @Override
                 public void onResponse(TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses) {
-                    ImmutableList<SnapshotMetaData.Entry> currentSnapshots =
-                            snapshotsService.currentSnapshots(request.repository(), request.snapshots());
-                    listener.onResponse(buildResponse(request, currentSnapshots, nodeSnapshotStatuses));
+                    try {
+                        ImmutableList<SnapshotMetaData.Entry> currentSnapshots =
+                                snapshotsService.currentSnapshots(request.repository(), request.snapshots());
+                        listener.onResponse(buildResponse(request, currentSnapshots, nodeSnapshotStatuses));
+                    } catch (Throwable e) {
+                        listener.onFailure(e);
+                    }
                 }
 
                 @Override
@@ -169,6 +173,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeOperation
                             stage = SnapshotIndexShardStage.FAILURE;
                             break;
                         case INIT:
+                        case WAITING:
                         case STARTED:
                             stage = SnapshotIndexShardStage.STARTED;
                             break;

+ 52 - 41
src/main/java/org/elasticsearch/cluster/metadata/SnapshotMetaData.java

@@ -32,6 +32,8 @@ import org.elasticsearch.index.shard.ShardId;
 import java.io.IOException;
 import java.util.Map;
 
+import static com.google.common.collect.Maps.newHashMap;
+
 /**
  * Meta data about snapshots that are currently executing
  */
@@ -63,6 +65,7 @@ public class SnapshotMetaData implements MetaData.Custom {
         private final boolean includeGlobalState;
         private final ImmutableMap<ShardId, ShardSnapshotStatus> shards;
         private final ImmutableList<String> indices;
+        private final ImmutableMap<String, ImmutableList<ShardId>> waitingIndices;
 
         public Entry(SnapshotId snapshotId, boolean includeGlobalState, State state, ImmutableList<String> indices, ImmutableMap<ShardId, ShardSnapshotStatus> shards) {
             this.state = state;
@@ -71,8 +74,10 @@ public class SnapshotMetaData implements MetaData.Custom {
             this.indices = indices;
             if (shards == null) {
                 this.shards = ImmutableMap.of();
+                this.waitingIndices = ImmutableMap.of();
             } else {
                 this.shards = shards;
+                this.waitingIndices = findWaitingIndices(shards);
             }
         }
 
@@ -92,6 +97,10 @@ public class SnapshotMetaData implements MetaData.Custom {
             return indices;
         }
 
+        public ImmutableMap<String, ImmutableList<ShardId>> waitingIndices() {
+            return waitingIndices;
+        }
+
         public boolean includeGlobalState() {
             return includeGlobalState;
         }
@@ -121,6 +130,31 @@ public class SnapshotMetaData implements MetaData.Custom {
             result = 31 * result + indices.hashCode();
             return result;
         }
+
+        private ImmutableMap<String, ImmutableList<ShardId>> findWaitingIndices(ImmutableMap<ShardId, ShardSnapshotStatus> shards) {
+            Map<String, ImmutableList.Builder<ShardId>> waitingIndicesMap = newHashMap();
+            for (ImmutableMap.Entry<ShardId, ShardSnapshotStatus> entry : shards.entrySet()) {
+                if (entry.getValue().state() == State.WAITING) {
+                    ImmutableList.Builder<ShardId> waitingShards = waitingIndicesMap.get(entry.getKey().getIndex());
+                    if (waitingShards == null) {
+                        waitingShards = ImmutableList.builder();
+                        waitingIndicesMap.put(entry.getKey().getIndex(), waitingShards);
+                    }
+                    waitingShards.add(entry.getKey());
+                }
+            }
+            if (!waitingIndicesMap.isEmpty()) {
+                ImmutableMap.Builder<String, ImmutableList<ShardId>> waitingIndicesBuilder = ImmutableMap.builder();
+                for (Map.Entry<String, ImmutableList.Builder<ShardId>> entry : waitingIndicesMap.entrySet()) {
+                    waitingIndicesBuilder.put(entry.getKey(), entry.getValue().build());
+                }
+                return waitingIndicesBuilder.build();
+            } else {
+                return ImmutableMap.of();
+            }
+
+        }
+
     }
 
     public static class ShardSnapshotStatus {
@@ -199,17 +233,24 @@ public class SnapshotMetaData implements MetaData.Custom {
     }
 
     public static enum State {
-        INIT((byte) 0),
-        STARTED((byte) 1),
-        SUCCESS((byte) 2),
-        FAILED((byte) 3),
-        ABORTED((byte) 4),
-        MISSING((byte) 5);
+        INIT((byte) 0, false, false),
+        STARTED((byte) 1, false, false),
+        SUCCESS((byte) 2, true, false),
+        FAILED((byte) 3, true, true),
+        ABORTED((byte) 4, false, true),
+        MISSING((byte) 5, true, true),
+        WAITING((byte) 6, false, false);
 
         private byte value;
 
-        State(byte value) {
+        private boolean completed;
+
+        private boolean failed;
+
+        State(byte value, boolean completed, boolean failed) {
             this.value = value;
+            this.completed = completed;
+            this.failed = failed;
         }
 
         public byte value() {
@@ -217,43 +258,11 @@ public class SnapshotMetaData implements MetaData.Custom {
         }
 
         public boolean completed() {
-            switch (this) {
-                case INIT:
-                    return false;
-                case STARTED:
-                    return false;
-                case SUCCESS:
-                    return true;
-                case FAILED:
-                    return true;
-                case ABORTED:
-                    return false;
-                case MISSING:
-                    return true;
-                default:
-                    assert false;
-                    return true;
-            }
+            return completed;
         }
 
         public boolean failed() {
-            switch (this) {
-                case INIT:
-                    return false;
-                case STARTED:
-                    return false;
-                case SUCCESS:
-                    return false;
-                case FAILED:
-                    return true;
-                case ABORTED:
-                    return true;
-                case MISSING:
-                    return true;
-                default:
-                    assert false;
-                    return false;
-            }
+            return failed;
         }
 
         public static State fromValue(byte value) {
@@ -270,6 +279,8 @@ public class SnapshotMetaData implements MetaData.Custom {
                     return ABORTED;
                 case 5:
                     return MISSING;
+                case 6:
+                    return WAITING;
                 default:
                     throw new ElasticsearchIllegalArgumentException("No snapshot state for value [" + value + "]");
             }

+ 15 - 0
src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java

@@ -25,6 +25,7 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.UnmodifiableIterator;
 import org.elasticsearch.ElasticsearchIllegalArgumentException;
+import org.elasticsearch.Version;
 import org.elasticsearch.common.Booleans;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -282,6 +283,20 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
         return nodesIds == null || nodesIds.length == 0 || (nodesIds.length == 1 && nodesIds[0].equals("_all"));
     }
 
+
+    /**
+     * Returns the version of the node with the oldest version in the cluster
+     *
+     * @return the oldest version in the cluster
+     */
+    public Version smallestVersion() {
+        Version version = Version.CURRENT;
+        for (ObjectCursor<DiscoveryNode> cursor : nodes.values()) {
+            version = Version.smallest(version, cursor.value.version());
+        }
+        return version;
+    }
+
     /**
      * Resolve a node with a given id
      *

+ 5 - 0
src/main/java/org/elasticsearch/repositories/RepositoriesService.java

@@ -254,10 +254,13 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
                 return;
             }
 
+            logger.trace("processing new index repositories for state version [{}]", event.state().version());
+
             Map<String, RepositoryHolder> survivors = newHashMap();
             // First, remove repositories that are no longer there
             for (Map.Entry<String, RepositoryHolder> entry : repositories.entrySet()) {
                 if (newMetaData == null || newMetaData.repository(entry.getKey()) == null) {
+                    logger.debug("unregistering repository [{}]", entry.getKey());
                     closeRepository(entry.getKey(), entry.getValue());
                 } else {
                     survivors.put(entry.getKey(), entry.getValue());
@@ -273,6 +276,7 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
                         // Found previous version of this repository
                         if (!holder.type.equals(repositoryMetaData.type()) || !holder.settings.equals(repositoryMetaData.settings())) {
                             // Previous version is different from the version in settings
+                            logger.debug("updating repository [{}]", repositoryMetaData.name());
                             closeRepository(repositoryMetaData.name(), holder);
                             holder = createRepositoryHolder(repositoryMetaData);
                         }
@@ -280,6 +284,7 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
                         holder = createRepositoryHolder(repositoryMetaData);
                     }
                     if (holder != null) {
+                        logger.debug("registering repository [{}]", repositoryMetaData.name());
                         builder.put(repositoryMetaData.name(), holder);
                     }
                 }

+ 153 - 32
src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.lucene.util.CollectionUtil;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.cluster.*;
@@ -33,6 +34,8 @@ import org.elasticsearch.cluster.metadata.SnapshotMetaData.State;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
@@ -63,7 +66,6 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import static com.google.common.collect.Lists.newArrayList;
 import static com.google.common.collect.Maps.newHashMap;
-import static com.google.common.collect.Maps.newHashMapWithExpectedSize;
 import static com.google.common.collect.Sets.newHashSet;
 
 /**
@@ -503,6 +505,9 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
                 if (event.nodesRemoved()) {
                     processSnapshotsOnRemovedNodes(event);
                 }
+                if (event.routingTableChanged()) {
+                    processStartedShards(event);
+                }
             }
             SnapshotMetaData prev = event.previousState().metaData().custom(SnapshotMetaData.TYPE);
             SnapshotMetaData curr = event.state().metaData().custom(SnapshotMetaData.TYPE);
@@ -605,6 +610,112 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
         }
     }
 
+    private void processStartedShards(ClusterChangedEvent event) {
+        if (waitingShardsStartedOrUnassigned(event)) {
+            clusterService.submitStateUpdateTask("update snapshot state after shards started", new ClusterStateUpdateTask() {
+                @Override
+                public ClusterState execute(ClusterState currentState) throws Exception {
+                    MetaData metaData = currentState.metaData();
+                    RoutingTable routingTable = currentState.routingTable();
+                    MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
+                    SnapshotMetaData snapshots = metaData.custom(SnapshotMetaData.TYPE);
+                    if (snapshots != null) {
+                        boolean changed = false;
+                        ArrayList<SnapshotMetaData.Entry> entries = newArrayList();
+                        for (final SnapshotMetaData.Entry snapshot : snapshots.entries()) {
+                            SnapshotMetaData.Entry updatedSnapshot = snapshot;
+                            if (snapshot.state() == State.STARTED) {
+                                ImmutableMap<ShardId, ShardSnapshotStatus> shards = processWaitingShards(snapshot.shards(), routingTable);
+                                if (shards != null) {
+                                    changed = true;
+                                    if (!snapshot.state().completed() && completed(shards.values())) {
+                                        updatedSnapshot = new SnapshotMetaData.Entry(snapshot.snapshotId(), snapshot.includeGlobalState(), State.SUCCESS, snapshot.indices(), shards);
+                                        endSnapshot(updatedSnapshot);
+                                    } else {
+                                        updatedSnapshot = new SnapshotMetaData.Entry(snapshot.snapshotId(), snapshot.includeGlobalState(), snapshot.state(), snapshot.indices(), shards);
+                                    }
+                                }
+                                entries.add(updatedSnapshot);
+                            }
+                        }
+                        if (changed) {
+                            snapshots = new SnapshotMetaData(entries.toArray(new SnapshotMetaData.Entry[entries.size()]));
+                            mdBuilder.putCustom(SnapshotMetaData.TYPE, snapshots);
+                            return ClusterState.builder(currentState).metaData(mdBuilder).build();
+                        }
+                    }
+                    return currentState;
+                }
+
+                @Override
+                public void onFailure(String source, Throwable t) {
+                    logger.warn("failed to update snapshot state after shards started from [{}] ", t, source);
+                }
+            });
+        }
+    }
+
+    private ImmutableMap<ShardId, ShardSnapshotStatus> processWaitingShards(ImmutableMap<ShardId, ShardSnapshotStatus> snapshotShards, RoutingTable routingTable) {
+        boolean snapshotChanged = false;
+        ImmutableMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableMap.builder();
+        for (ImmutableMap.Entry<ShardId, ShardSnapshotStatus> shardEntry : snapshotShards.entrySet()) {
+            ShardSnapshotStatus shardStatus = shardEntry.getValue();
+            if (shardStatus.state() == State.WAITING) {
+                ShardId shardId = shardEntry.getKey();
+                IndexRoutingTable indexShardRoutingTable = routingTable.index(shardId.getIndex());
+                if (indexShardRoutingTable != null) {
+                    IndexShardRoutingTable shardRouting = indexShardRoutingTable.shard(shardId.id());
+                    if (shardRouting != null && shardRouting.primaryShard() != null) {
+                        if (shardRouting.primaryShard().started()) {
+                            // Shard that we were waiting for has started on a node, let's process it
+                            snapshotChanged = true;
+                            logger.trace("starting shard that we were waiting for [{}] on node [{}]", shardEntry.getKey(), shardStatus.nodeId());
+                            shards.put(shardEntry.getKey(), new ShardSnapshotStatus(shardRouting.primaryShard().currentNodeId()));
+                            continue;
+                        } else if (shardRouting.primaryShard().initializing() || shardRouting.primaryShard().relocating()) {
+                            // Shard that we were waiting for hasn't started yet or still relocating - will continue to wait
+                            shards.put(shardEntry);
+                            continue;
+                        }
+                    }
+                }
+                // Shard that we were waiting for went into unassigned state or disappeared - giving up
+                snapshotChanged = true;
+                logger.warn("failing snapshot of shard [{}] on unassigned shard [{}]", shardEntry.getKey(), shardStatus.nodeId());
+                shards.put(shardEntry.getKey(), new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "shard is unassigned"));
+            } else {
+                shards.put(shardEntry);
+            }
+        }
+        if (snapshotChanged) {
+            return shards.build();
+        } else {
+            return null;
+        }
+    }
+
+    private boolean waitingShardsStartedOrUnassigned(ClusterChangedEvent event) {
+        SnapshotMetaData curr = event.state().metaData().custom(SnapshotMetaData.TYPE);
+        if (curr != null) {
+            for (SnapshotMetaData.Entry entry : curr.entries()) {
+                if (entry.state() == State.STARTED && !entry.waitingIndices().isEmpty()) {
+                    for (String index : entry.waitingIndices().keySet()) {
+                        if (event.indexRoutingTableChanged(index)) {
+                            IndexRoutingTable indexShardRoutingTable = event.state().getRoutingTable().index(index);
+                            for (ShardId shardId : entry.waitingIndices().get(index)) {
+                                ShardRouting shardRouting = indexShardRoutingTable.shard(shardId.id()).primaryShard();
+                                if (shardRouting != null && (shardRouting.started() || shardRouting.unassigned())) {
+                                    return true;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        return false;
+    }
+
     private boolean removedNodesCleanupNeeded(ClusterChangedEvent event) {
         // Check if we just became the master
         boolean newMaster = !event.previousState().nodes().localNodeMaster();
@@ -646,44 +757,51 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
 
         // For now we will be mostly dealing with a single snapshot at a time but might have multiple simultaneously running
         // snapshots in the future
-        HashMap<SnapshotId, SnapshotShards> newSnapshots = null;
+        Map<SnapshotId, Map<ShardId, IndexShardSnapshotStatus>> newSnapshots = newHashMap();
         // Now go through all snapshots and update existing or create missing
         final String localNodeId = clusterService.localNode().id();
         for (SnapshotMetaData.Entry entry : snapshotMetaData.entries()) {
-            HashMap<ShardId, IndexShardSnapshotStatus> startedShards = null;
-            for (Map.Entry<ShardId, SnapshotMetaData.ShardSnapshotStatus> shard : entry.shards().entrySet()) {
-                // Check if we have new shards to start processing on
-                if (localNodeId.equals(shard.getValue().nodeId())) {
-                    if (entry.state() == State.STARTED) {
-                        if (startedShards == null) {
-                            startedShards = newHashMap();
-                        }
-                        startedShards.put(shard.getKey(), new IndexShardSnapshotStatus());
-                    } else if (entry.state() == State.ABORTED) {
-                        SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshotId());
-                        if (snapshotShards != null) {
-                            IndexShardSnapshotStatus snapshotStatus = snapshotShards.shards.get(shard.getKey());
-                            if (snapshotStatus != null) {
-                                snapshotStatus.abort();
-                            }
+            if (entry.state() == State.STARTED) {
+                Map<ShardId, IndexShardSnapshotStatus> startedShards = newHashMap();
+                SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshotId());
+                for (Map.Entry<ShardId, SnapshotMetaData.ShardSnapshotStatus> shard : entry.shards().entrySet()) {
+                    // Add all new shards to start processing on
+                    if (localNodeId.equals(shard.getValue().nodeId())) {
+                        if (shard.getValue().state() == State.INIT && (snapshotShards == null || !snapshotShards.shards.containsKey(shard.getKey()))) {
+                            logger.trace("[{}] - Adding shard to the queue", shard.getKey());
+                            startedShards.put(shard.getKey(), new IndexShardSnapshotStatus());
                         }
                     }
                 }
-            }
-            if (startedShards != null) {
-                if (!survivors.containsKey(entry.snapshotId())) {
-                    if (newSnapshots == null) {
-                        newSnapshots = newHashMapWithExpectedSize(2);
+                if (!startedShards.isEmpty()) {
+                    newSnapshots.put(entry.snapshotId(), startedShards);
+                    if (snapshotShards != null) {
+                        // We already saw this snapshot but we need to add more started shards
+                        ImmutableMap.Builder<ShardId, IndexShardSnapshotStatus> shards = ImmutableMap.builder();
+                        // Put all shards that were already running on this node
+                        shards.putAll(snapshotShards.shards);
+                        // Put all newly started shards
+                        shards.putAll(startedShards);
+                        survivors.put(entry.snapshotId(), new SnapshotShards(shards.build()));
+                    } else {
+                        // Brand new snapshot that we haven't seen before
+                        survivors.put(entry.snapshotId(), new SnapshotShards(ImmutableMap.copyOf(startedShards)));
+                    }
+                }
+            } else if (entry.state() == State.ABORTED) {
+                // Abort all running shards for this snapshot
+                SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshotId());
+                if (snapshotShards != null) {
+                    for (Map.Entry<ShardId, SnapshotMetaData.ShardSnapshotStatus> shard : entry.shards().entrySet()) {
+                        IndexShardSnapshotStatus snapshotStatus = snapshotShards.shards.get(shard.getKey());
+                        if (snapshotStatus != null) {
+                            snapshotStatus.abort();
+                        }
                     }
-                    newSnapshots.put(entry.snapshotId(), new SnapshotShards(ImmutableMap.copyOf(startedShards)));
                 }
             }
         }
 
-        if (newSnapshots != null) {
-            survivors.putAll(newSnapshots);
-        }
-
         // Update the list of snapshots that we saw and tried to started
         // If startup of these shards fails later, we don't want to try starting these shards again
         shutdownLock.lock();
@@ -697,10 +815,10 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
             shutdownLock.unlock();
         }
 
-        // We have new snapshots to process -
-        if (newSnapshots != null) {
-            for (final Map.Entry<SnapshotId, SnapshotShards> entry : newSnapshots.entrySet()) {
-                for (final Map.Entry<ShardId, IndexShardSnapshotStatus> shardEntry : entry.getValue().shards.entrySet()) {
+        // We have new shards to start
+        if (!newSnapshots.isEmpty()) {
+            for (final Map.Entry<SnapshotId, Map<ShardId, IndexShardSnapshotStatus>> entry : newSnapshots.entrySet()) {
+                for (final Map.Entry<ShardId, IndexShardSnapshotStatus> shardEntry : entry.getValue().entrySet()) {
                     try {
                         final IndexShardSnapshotAndRestoreService shardSnapshotService = indicesService.indexServiceSafe(shardEntry.getKey().getIndex()).shardInjectorSafe(shardEntry.getKey().id())
                                 .getInstance(IndexShardSnapshotAndRestoreService.class);
@@ -1089,6 +1207,9 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
                 ShardRouting primary = indexRoutingTable.shard(i).primaryShard();
                 if (primary == null || !primary.assignedToNode()) {
                     builder.put(shardId, new SnapshotMetaData.ShardSnapshotStatus(null, State.MISSING, "primary shard is not allocated"));
+                } else if (clusterState.getNodes().smallestVersion().onOrAfter(Version.V_1_2_0) && (primary.relocating() || primary.initializing())) {
+                    // The WAITING state was introduced in V1.2.0 - don't use it if there are nodes with older version in the cluster
+                    builder.put(shardId, new SnapshotMetaData.ShardSnapshotStatus(primary.currentNodeId(), State.WAITING));
                 } else if (!primary.started()) {
                     builder.put(shardId, new SnapshotMetaData.ShardSnapshotStatus(primary.currentNodeId(), State.MISSING, "primary shard hasn't been started yet"));
                 } else {

+ 0 - 2
src/test/java/org/elasticsearch/indices/IndicesOptionsIntegrationTests.java

@@ -171,7 +171,6 @@ public class IndicesOptionsIntegrationTests extends ElasticsearchIntegrationTest
     @Test
     public void testSpecifiedIndexUnavailable_snapshotRestore() throws Exception {
         createIndex("test1");
-        ensureYellow();
 
         PutRepositoryResponse putRepositoryResponse = client().admin().cluster().preparePutRepository("dummy-repo")
                 .setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", newTempDir())).get();
@@ -327,7 +326,6 @@ public class IndicesOptionsIntegrationTests extends ElasticsearchIntegrationTest
     @Test
     public void testWildcardBehaviour_snapshotRestore() throws Exception {
         createIndex("foobar");
-        ensureYellow();
 
         PutRepositoryResponse putRepositoryResponse = client().admin().cluster().preparePutRepository("dummy-repo")
                 .setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", newTempDir())).get();

+ 147 - 0
src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java

@@ -20,22 +20,30 @@
 package org.elasticsearch.snapshots;
 
 import com.carrotsearch.randomizedtesting.LifecycleScope;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.action.ListenableActionFuture;
 import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.store.support.AbstractIndexStore;
 import org.elasticsearch.snapshots.mockstore.MockRepositoryModule;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.store.MockDirectoryHelper;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import static com.google.common.collect.Lists.newArrayList;
 import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
@@ -261,6 +269,145 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
         ensureGreen("test-idx-2");
 
         assertThat(client().prepareCount("test-idx-2").get().getCount(), equalTo(100L));
+    }
+
+    @Test
+    @TestLogging("snapshots:TRACE,repositories:TRACE")
+    @Ignore
+    public void chaosSnapshotTest() throws Exception {
+        final List<String> indices = new CopyOnWriteArrayList<>();
+        Settings settings = settingsBuilder().put("action.write_consistency", "one").build();
+        int initialNodes = between(1, 3);
+        logger.info("--> start {} nodes", initialNodes);
+        for (int i = 0; i < initialNodes; i++) {
+            cluster().startNode(settings);
+        }
+
+        logger.info("-->  creating repository");
+        assertAcked(client().admin().cluster().preparePutRepository("test-repo")
+                .setType("fs").setSettings(ImmutableSettings.settingsBuilder()
+                        .put("location", newTempDir(LifecycleScope.SUITE))
+                        .put("compress", randomBoolean())
+                        .put("chunk_size", randomIntBetween(100, 1000))));
+
+        int initialIndices = between(1, 3);
+        logger.info("--> create {} indices", initialIndices);
+        for (int i = 0; i < initialIndices; i++) {
+            createTestIndex("test-" + i);
+            indices.add("test-" + i);
+        }
+
+        int asyncNodes = between(0, 5);
+        logger.info("--> start {} additional nodes asynchronously", asyncNodes);
+        ListenableFuture<List<String>> asyncNodesFuture = cluster().startNodesAsync(asyncNodes, settings);
+
+        int asyncIndices = between(0, 10);
+        logger.info("--> create {} additional indices asynchronously", asyncIndices);
+        Thread[] asyncIndexThreads = new Thread[asyncIndices];
+        for (int i = 0; i < asyncIndices; i++) {
+            final int cur = i;
+            asyncIndexThreads[i] = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    createTestIndex("test-async-" + cur);
+                    indices.add("test-async-" + cur);
+
+                }
+            });
+            asyncIndexThreads[i].start();
+        }
+
+        logger.info("--> snapshot");
+
+        ListenableActionFuture<CreateSnapshotResponse> snapshotResponseFuture = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-*").setPartial(true).execute();
+
+        long start = System.currentTimeMillis();
+        // Produce chaos for 30 sec or until snapshot is done whatever comes first
+        int randomIndices = 0;
+        while (System.currentTimeMillis() - start < 30000 && !snapshotIsDone("test-repo", "test-snap")) {
+            Thread.sleep(100);
+            int chaosType = randomInt(10);
+            if (chaosType < 4) {
+                // Randomly delete an index
+                if (indices.size() > 0) {
+                    String index = indices.remove(randomInt(indices.size() - 1));
+                    logger.info("--> deleting random index [{}]", index);
+                    cluster().wipeIndices(index);
+                }
+            } else if (chaosType < 6) {
+                // Randomly shutdown a node
+                if (cluster().size() > 1) {
+                    logger.info("--> shutting down random node");
+                    cluster().stopRandomDataNode();
+                }
+            } else if (chaosType < 8) {
+                // Randomly create an index
+                String index = "test-rand-" + randomIndices;
+                logger.info("--> creating random index [{}]", index);
+                createTestIndex(index);
+                randomIndices++;
+            } else {
+                // Take a break
+                logger.info("--> noop");
+            }
+        }
+
+        logger.info("--> waiting for async indices creation to finish");
+        for (int i = 0; i < asyncIndices; i++) {
+            asyncIndexThreads[i].join();
+        }
+
+        logger.info("--> update index settings to back to normal");
+        assertAcked(client().admin().indices().prepareUpdateSettings("test-*").setSettings(ImmutableSettings.builder()
+                .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "node")
+        ));
+
+        // Make sure that snapshot finished - doesn't matter if it failed or succeeded
+        try {
+            CreateSnapshotResponse snapshotResponse = snapshotResponseFuture.get();
+            SnapshotInfo snapshotInfo = snapshotResponse.getSnapshotInfo();
+            assertNotNull(snapshotInfo);
+            logger.info("--> snapshot is done with state [{}], total shards [{}], successful shards [{}]", snapshotInfo.state(), snapshotInfo.totalShards(), snapshotInfo.successfulShards());
+        } catch (Exception ex) {
+            logger.info("--> snapshot didn't start properly", ex);
+        }
+
+        asyncNodesFuture.get();
+        logger.info("--> done");
+    }
+
+    private boolean snapshotIsDone(String repository, String snapshot) {
+        try {
+            SnapshotsStatusResponse snapshotsStatusResponse = client().admin().cluster().prepareSnapshotStatus(repository).setSnapshots(snapshot).get();
+            if (snapshotsStatusResponse.getSnapshots().isEmpty()) {
+                return false;
+            }
+            for (SnapshotStatus snapshotStatus : snapshotsStatusResponse.getSnapshots()) {
+                if (snapshotStatus.getState().completed()) {
+                    return true;
+                }
+            }
+            return false;
+        } catch (SnapshotMissingException ex) {
+            return false;
+        }
+    }
+
+    private void createTestIndex(String name) {
+        assertAcked(prepareCreate(name, 0, settingsBuilder().put("number_of_shards", between(1, 6))
+                .put("number_of_replicas", between(1, 6))
+                .put(MockDirectoryHelper.RANDOM_NO_DELETE_OPEN_FILE, false)));
+
+        ensureYellow(name);
+
+        logger.info("--> indexing some data into {}", name);
+        for (int i = 0; i < between(10, 500); i++) {
+            index(name, "doc", Integer.toString(i), "foo", "bar" + i);
+        }
 
+        assertAcked(client().admin().indices().prepareUpdateSettings(name).setSettings(ImmutableSettings.builder()
+                .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "all")
+                .put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, between(100, 50000))
+        ));
     }
 }

+ 68 - 10
src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.snapshots;
 
 import com.carrotsearch.randomizedtesting.LifecycleScope;
+import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.elasticsearch.ExceptionsHelper;
@@ -43,6 +44,7 @@ import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.store.support.AbstractIndexStore;
 import org.elasticsearch.indices.InvalidIndexNameException;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.snapshots.mockstore.MockRepositoryModule;
@@ -51,6 +53,7 @@ import org.elasticsearch.test.store.MockDirectoryHelper;
 import org.junit.Test;
 
 import java.io.File;
+import java.util.concurrent.TimeUnit;
 
 import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
@@ -1045,7 +1048,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
         assertThat(snapshotStatus.getState(), equalTo(SnapshotMetaData.State.STARTED));
         // We blocked the node during data write operation, so at least one shard snapshot should be in STARTED stage
         assertThat(snapshotStatus.getShardsStats().getStartedShards(), greaterThan(0));
-        for( SnapshotIndexShardStatus shardStatus : snapshotStatus.getIndices().get("test-idx")) {
+        for (SnapshotIndexShardStatus shardStatus : snapshotStatus.getIndices().get("test-idx")) {
             if (shardStatus.getStage() == SnapshotIndexShardStage.STARTED) {
                 assertThat(shardStatus.getNodeId(), notNullValue());
             }
@@ -1058,7 +1061,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
         assertThat(snapshotStatus.getState(), equalTo(SnapshotMetaData.State.STARTED));
         // We blocked the node during data write operation, so at least one shard snapshot should be in STARTED stage
         assertThat(snapshotStatus.getShardsStats().getStartedShards(), greaterThan(0));
-        for( SnapshotIndexShardStatus shardStatus : snapshotStatus.getIndices().get("test-idx")) {
+        for (SnapshotIndexShardStatus shardStatus : snapshotStatus.getIndices().get("test-idx")) {
             if (shardStatus.getStage() == SnapshotIndexShardStage.STARTED) {
                 assertThat(shardStatus.getNodeId(), notNullValue());
             }
@@ -1093,17 +1096,72 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
         } catch (SnapshotMissingException ex) {
             // Expected
         }
+    }
+
 
+    @Test
+    public void snapshotRelocatingPrimary() throws Exception {
+        Client client = client();
+        logger.info("-->  creating repository");
+        assertAcked(client.admin().cluster().preparePutRepository("test-repo")
+                .setType("fs").setSettings(ImmutableSettings.settingsBuilder()
+                        .put("location", newTempDir(LifecycleScope.SUITE))
+                        .put("compress", randomBoolean())
+                        .put("chunk_size", randomIntBetween(100, 1000))));
+
+        // Create index on 1 node and make sure each node has a primary by setting no replicas
+        assertAcked(prepareCreate("test-idx", 1, ImmutableSettings.builder().put("number_of_replicas", 0)));
+
+        logger.info("--> indexing some data");
+        for (int i = 0; i < 100; i++) {
+            index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
+        }
+        refresh();
+        assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
+
+        // Update settings to make sure that relocation is slow so we can start snapshot before relocation is finished
+        assertAcked(client.admin().indices().prepareUpdateSettings("test-idx").setSettings(ImmutableSettings.builder()
+                .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "all")
+                .put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, 100)
+        ));
+
+        logger.info("--> start relocations");
+        allowNodes("test-idx", cluster().numDataNodes());
+
+        logger.info("--> wait for relocations to start");
+
+        waitForRelocationsToStart("test-idx", TimeValue.timeValueMillis(300));
+
+        logger.info("--> snapshot");
+        client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
+
+        // Update settings back to normal
+        assertAcked(client.admin().indices().prepareUpdateSettings("test-idx").setSettings(ImmutableSettings.builder()
+                .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "node")
+        ));
+
+        logger.info("--> wait for snapshot to complete");
+        SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600));
+        assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
+        assertThat(snapshotInfo.shardFailures().size(), equalTo(0));
+        logger.info("--> done");
     }
 
-    private boolean waitForIndex(String index, TimeValue timeout) throws InterruptedException {
-        long start = System.currentTimeMillis();
-        while (System.currentTimeMillis() - start < timeout.millis()) {
-            if (client().admin().indices().prepareExists(index).execute().actionGet().isExists()) {
-                return true;
+    private boolean waitForIndex(final String index, TimeValue timeout) throws InterruptedException {
+        return awaitBusy(new Predicate<Object>() {
+            @Override
+            public boolean apply(Object o) {
+                return client().admin().indices().prepareExists(index).execute().actionGet().isExists();
             }
-            Thread.sleep(100);
-        }
-        return false;
+        }, timeout.millis(), TimeUnit.MILLISECONDS);
+    }
+
+    private boolean waitForRelocationsToStart(final String index, TimeValue timeout) throws InterruptedException {
+        return awaitBusy(new Predicate<Object>() {
+            @Override
+            public boolean apply(Object o) {
+                return client().admin().cluster().prepareHealth(index).execute().actionGet().getRelocatingShards() > 0;
+            }
+        }, timeout.millis(), TimeUnit.MILLISECONDS);
     }
 }