Browse Source

Improve support for partial snapshots

Fixes #4701. Changes behavior of the snapshot operation. The operation now fails if not all primary shards are available at the beginning of the snapshot operation. The restore operation no longer tries to restore indices with shards that failed or were missing during snapshot operation.
Igor Motov 11 years ago
parent
commit
b987615f5e

+ 2 - 1
docs/reference/modules/snapshots.asciidoc

@@ -101,7 +101,8 @@ supports <<search-multi-index-type,multi index syntax>>. The snapshot request al
 `ignore_unavailable` option. Setting it to `true` will cause indices that do not exist to be ignored during snapshot
 creation. By default, when `ignore_unavailable` option is not set and an index is missing the snapshot request will fail.
 By setting `include_global_state` to false it's possible to prevent the cluster global state to be stored as part of
-the snapshot.
+the snapshot. By default, the entire snapshot will fail if one or more indices participating in the snapshot don't have
+all primary shards available. This behaviour can be changed by setting `partial` to `true`.
 
 The index snapshot process is incremental. In the process of making the index snapshot Elasticsearch analyses
 the list of the index files that are already stored in the repository and copies only files that were created or

+ 34 - 9
src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotRequest.java

@@ -45,6 +45,7 @@ import static org.elasticsearch.common.Strings.hasLength;
 import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
 import static org.elasticsearch.common.settings.ImmutableSettings.readSettingsFromStream;
 import static org.elasticsearch.common.settings.ImmutableSettings.writeSettingsToStream;
+import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeBooleanValue;
 
 /**
  * Create snapshot request
@@ -70,6 +71,8 @@ public class CreateSnapshotRequest extends MasterNodeOperationRequest<CreateSnap
 
     private IndicesOptions indicesOptions = IndicesOptions.strict();
 
+    private boolean partial = false;
+
     private Settings settings = EMPTY_SETTINGS;
 
     private boolean includeGlobalState = true;
@@ -187,7 +190,7 @@ public class CreateSnapshotRequest extends MasterNodeOperationRequest<CreateSnap
     }
 
     /**
-     * Retuns a list of indices that should be included into the snapshot
+     * Returns a list of indices that should be included into the snapshot
      *
      * @return list of indices
      */
@@ -215,6 +218,27 @@ public class CreateSnapshotRequest extends MasterNodeOperationRequest<CreateSnap
         return this;
     }
 
+
+    /**
+     * Returns true if indices with unavailable shards should be partially snapshotted.
+     *
+     * @return true if indices with unavailable shards should be partially snapshotted
+     */
+    public boolean partial() {
+        return partial;
+    }
+
+    /**
+     * Set to true to allow indices with unavailable shards to be partially snapshotted.
+     *
+     * @param partial true if indices with unavailable shards should be partially snapshotted.
+     * @return this request
+     */
+    public CreateSnapshotRequest partial(boolean partial) {
+        this.partial = partial;
+        return this;
+    }
+
     /**
      * If set to true the request should wait for the snapshot completion before returning.
      *
@@ -315,6 +339,7 @@ public class CreateSnapshotRequest extends MasterNodeOperationRequest<CreateSnap
 
     /**
      * Returns true if global state should be stored as part of the snapshot
+     *
      * @return true if global state should be stored as part of the snapshot
      */
     public boolean includeGlobalState() {
@@ -353,17 +378,15 @@ public class CreateSnapshotRequest extends MasterNodeOperationRequest<CreateSnap
                     throw new ElasticsearchIllegalArgumentException("malformed indices section, should be an array of strings");
                 }
             } else if (name.equals("ignore_unavailable") || name.equals("ignoreUnavailable")) {
-                assert entry.getValue() instanceof String;
-                ignoreUnavailable = Boolean.valueOf(entry.getValue().toString());
+                ignoreUnavailable = nodeBooleanValue(entry.getValue());
             } else if (name.equals("allow_no_indices") || name.equals("allowNoIndices")) {
-                assert entry.getValue() instanceof String;
-                allowNoIndices = Boolean.valueOf(entry.getValue().toString());
+                allowNoIndices = nodeBooleanValue(entry.getValue());
             } else if (name.equals("expand_wildcards_open") || name.equals("expandWildcardsOpen")) {
-                assert entry.getValue() instanceof String;
-                expandWildcardsOpen = Boolean.valueOf(entry.getValue().toString());
+                expandWildcardsOpen = nodeBooleanValue(entry.getValue());
             } else if (name.equals("expand_wildcards_closed") || name.equals("expandWildcardsClosed")) {
-                assert entry.getValue() instanceof String;
-                expandWildcardsClosed = Boolean.valueOf(entry.getValue().toString());
+                expandWildcardsClosed = nodeBooleanValue(entry.getValue());
+            } else if (name.equals("partial")) {
+                partial(nodeBooleanValue(entry.getValue()));
             } else if (name.equals("settings")) {
                 if (!(entry.getValue() instanceof Map)) {
                     throw new ElasticsearchIllegalArgumentException("malformed settings section, should indices an inner object");
@@ -450,6 +473,7 @@ public class CreateSnapshotRequest extends MasterNodeOperationRequest<CreateSnap
         settings = readSettingsFromStream(in);
         includeGlobalState = in.readBoolean();
         waitForCompletion = in.readBoolean();
+        partial = in.readBoolean();
     }
 
     @Override
@@ -462,5 +486,6 @@ public class CreateSnapshotRequest extends MasterNodeOperationRequest<CreateSnap
         writeSettingsToStream(settings, out);
         out.writeBoolean(includeGlobalState);
         out.writeBoolean(waitForCompletion);
+        out.writeBoolean(partial);
     }
 }

+ 11 - 0
src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotRequestBuilder.java

@@ -112,6 +112,17 @@ public class CreateSnapshotRequestBuilder extends MasterNodeOperationRequestBuil
         return this;
     }
 
+    /**
+     * If set to true the request should snapshot indices with unavailable shards
+     *
+     * @param partial true if request should snapshot indices with unavailable shards
+     * @return this builder
+     */
+    public CreateSnapshotRequestBuilder setPartial(boolean partial) {
+        request.partial(partial);
+        return this;
+    }
+
     /**
      * Sets repository-specific snapshot settings.
      * <p/>

+ 1 - 0
src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java

@@ -78,6 +78,7 @@ public class TransportCreateSnapshotAction extends TransportMasterNodeOperationA
                 new SnapshotsService.SnapshotRequest("create_snapshot[" + request.snapshot() + "]", request.snapshot(), request.repository())
                         .indices(request.indices())
                         .indicesOptions(request.indicesOptions())
+                        .partial(request.partial())
                         .settings(request.settings())
                         .includeGlobalState(request.includeGlobalState())
                         .masterNodeTimeout(request.masterNodeTimeout());

+ 5 - 8
src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequest.java

@@ -44,6 +44,7 @@ import static org.elasticsearch.common.Strings.hasLength;
 import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
 import static org.elasticsearch.common.settings.ImmutableSettings.readSettingsFromStream;
 import static org.elasticsearch.common.settings.ImmutableSettings.writeSettingsToStream;
+import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeBooleanValue;
 
 /**
  * Restore snapshot request
@@ -397,17 +398,13 @@ public class RestoreSnapshotRequest extends MasterNodeOperationRequest<RestoreSn
                     throw new ElasticsearchIllegalArgumentException("malformed indices section, should be an array of strings");
                 }
             } else if (name.equals("ignore_unavailable") || name.equals("ignoreUnavailable")) {
-                assert entry.getValue() instanceof String;
-                ignoreUnavailable = Boolean.valueOf(entry.getValue().toString());
+                ignoreUnavailable = nodeBooleanValue(entry.getValue());
             } else if (name.equals("allow_no_indices") || name.equals("allowNoIndices")) {
-                assert entry.getValue() instanceof String;
-                allowNoIndices = Boolean.valueOf(entry.getValue().toString());
+                allowNoIndices = nodeBooleanValue(entry.getValue());
             } else if (name.equals("expand_wildcards_open") || name.equals("expandWildcardsOpen")) {
-                assert entry.getValue() instanceof String;
-                expandWildcardsOpen = Boolean.valueOf(entry.getValue().toString());
+                expandWildcardsOpen = nodeBooleanValue(entry.getValue());
             } else if (name.equals("expand_wildcards_closed") || name.equals("expandWildcardsClosed")) {
-                assert entry.getValue() instanceof String;
-                expandWildcardsClosed = Boolean.valueOf(entry.getValue().toString());
+                expandWildcardsClosed = nodeBooleanValue(entry.getValue());
             } else if (name.equals("settings")) {
                 if (!(entry.getValue() instanceof Map)) {
                     throw new ElasticsearchIllegalArgumentException("malformed settings section, should indices an inner object");

+ 41 - 2
src/main/java/org/elasticsearch/cluster/metadata/SnapshotMetaData.java

@@ -203,7 +203,8 @@ public class SnapshotMetaData implements MetaData.Custom {
         STARTED((byte) 1),
         SUCCESS((byte) 2),
         FAILED((byte) 3),
-        ABORTED((byte) 4);
+        ABORTED((byte) 4),
+        MISSING((byte) 5);
 
         private byte value;
 
@@ -216,7 +217,43 @@ public class SnapshotMetaData implements MetaData.Custom {
         }
 
         public boolean completed() {
-            return this == SUCCESS || this == FAILED;
+            switch (this) {
+                case INIT:
+                    return false;
+                case STARTED:
+                    return false;
+                case SUCCESS:
+                    return true;
+                case FAILED:
+                    return true;
+                case ABORTED:
+                    return false;
+                case MISSING:
+                    return true;
+                default:
+                    assert false;
+                    return true;
+            }
+        }
+
+        public boolean failed() {
+            switch (this) {
+                case INIT:
+                    return false;
+                case STARTED:
+                    return false;
+                case SUCCESS:
+                    return false;
+                case FAILED:
+                    return true;
+                case ABORTED:
+                    return true;
+                case MISSING:
+                    return true;
+                default:
+                    assert false;
+                    return false;
+            }
         }
 
         public static State fromValue(byte value) {
@@ -231,6 +268,8 @@ public class SnapshotMetaData implements MetaData.Custom {
                     return FAILED;
                 case 4:
                     return ABORTED;
+                case 5:
+                    return MISSING;
                 default:
                     throw new ElasticsearchIllegalArgumentException("No snapshot state for value [" + value + "]");
             }

+ 26 - 3
src/main/java/org/elasticsearch/snapshots/RestoreService.java

@@ -162,6 +162,10 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
                         ImmutableMap.Builder<ShardId, RestoreMetaData.ShardRestoreStatus> shards = ImmutableMap.builder();
                         for (Map.Entry<String, String> indexEntry : renamedIndices.entrySet()) {
                             String index = indexEntry.getValue();
+                            // Make sure that index was fully snapshotted - don't restore
+                            if (failed(snapshot, index)) {
+                                throw new SnapshotRestoreException(snapshotId, "index [" + index + "] wasn't fully snapshotted - cannot restore");
+                            }
                             RestoreSource restoreSource = new RestoreSource(snapshotId, index);
                             String renamedIndex = indexEntry.getKey();
                             IndexMetaData snapshotIndexMetaData = metaData.index(index);
@@ -391,6 +395,24 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
         }
     }
 
+    private boolean failed(Snapshot snapshot, String index) {
+        for (SnapshotShardFailure failure : snapshot.shardFailures()) {
+            if (index.equals(failure.index())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private boolean failed(Snapshot snapshot, String index, int shard) {
+        for (SnapshotShardFailure failure : snapshot.shardFailures()) {
+            if (index.equals(failure.index()) && shard == failure.shardId()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     /**
      * Adds restore completion listener
      * <p/>
@@ -427,16 +449,17 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
 
     /**
      * Checks if a repository is currently in use by one of the snapshots
+     *
      * @param clusterState cluster state
-     * @param repository repository id
+     * @param repository   repository id
      * @return true if repository is currently in use by one of the running snapshots
      */
     public static boolean isRepositoryInUse(ClusterState clusterState, String repository) {
         MetaData metaData = clusterState.metaData();
         RestoreMetaData snapshots = metaData.custom(RestoreMetaData.TYPE);
         if (snapshots != null) {
-            for(RestoreMetaData.Entry snapshot : snapshots.entries()) {
-                if(repository.equals(snapshot.snapshotId().getRepository())) {
+            for (RestoreMetaData.Entry snapshot : snapshots.entries()) {
+                if (repository.equals(snapshot.snapshotId().getRepository())) {
                     return true;
                 }
             }

+ 22 - 2
src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java

@@ -24,7 +24,6 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Streamable;
 import org.elasticsearch.common.joda.FormatDateTimeFormatter;
 import org.elasticsearch.common.joda.Joda;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentBuilderString;
@@ -43,6 +42,8 @@ public class SnapshotInfo implements ToXContent, Streamable {
 
     private SnapshotState state;
 
+    private String reason;
+
     private ImmutableList<String> indices;
 
     private long startTime;
@@ -67,6 +68,7 @@ public class SnapshotInfo implements ToXContent, Streamable {
     public SnapshotInfo(Snapshot snapshot) {
         name = snapshot.name();
         state = snapshot.state();
+        reason = snapshot.reason();
         indices = snapshot.indices();
         startTime = snapshot.startTime();
         endTime = snapshot.endTime();
@@ -93,6 +95,15 @@ public class SnapshotInfo implements ToXContent, Streamable {
         return state;
     }
 
+    /**
+     * Returns snapshot failure reason
+     *
+     * @return snapshot failure reason
+     */
+    public String reason() {
+        return reason;
+    }
+
     /**
      * Returns indices that were included into this snapshot
      *
@@ -137,7 +148,7 @@ public class SnapshotInfo implements ToXContent, Streamable {
      * @return number of failed shards
      */
     public int failedShards() {
-        return totalShards -  successfulShards;
+        return totalShards - successfulShards;
     }
 
     /**
@@ -162,6 +173,9 @@ public class SnapshotInfo implements ToXContent, Streamable {
      * Returns snapshot REST status
      */
     public RestStatus status() {
+        if (state == SnapshotState.FAILED) {
+            return RestStatus.INTERNAL_SERVER_ERROR;
+        }
         if (shardFailures.size() == 0) {
             return RestStatus.OK;
         }
@@ -179,6 +193,7 @@ public class SnapshotInfo implements ToXContent, Streamable {
     static final class Fields {
         static final XContentBuilderString INDICES = new XContentBuilderString("indices");
         static final XContentBuilderString STATE = new XContentBuilderString("state");
+        static final XContentBuilderString REASON = new XContentBuilderString("reason");
         static final XContentBuilderString START_TIME = new XContentBuilderString("start_time");
         static final XContentBuilderString START_TIME_IN_MILLIS = new XContentBuilderString("start_time_in_millis");
         static final XContentBuilderString END_TIME = new XContentBuilderString("end_time");
@@ -202,6 +217,9 @@ public class SnapshotInfo implements ToXContent, Streamable {
         }
         builder.endArray();
         builder.field(Fields.STATE, state);
+        if (reason != null) {
+            builder.field(Fields.REASON, reason);
+        }
         if (startTime != 0) {
             builder.field(Fields.START_TIME, DATE_TIME_FORMATTER.printer().print(startTime));
             builder.field(Fields.START_TIME_IN_MILLIS, startTime);
@@ -235,6 +253,7 @@ public class SnapshotInfo implements ToXContent, Streamable {
         }
         indices = indicesListBuilder.build();
         state = SnapshotState.fromValue(in.readByte());
+        reason = in.readOptionalString();
         startTime = in.readVLong();
         endTime = in.readVLong();
         totalShards = in.readVInt();
@@ -259,6 +278,7 @@ public class SnapshotInfo implements ToXContent, Streamable {
             out.writeString(index);
         }
         out.writeByte(state.value());
+        out.writeOptionalString(reason);
         out.writeVLong(startTime);
         out.writeVLong(endTime);
         out.writeVInt(totalShards);

+ 74 - 14
src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -59,6 +59,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import static com.google.common.collect.Lists.newArrayList;
 import static com.google.common.collect.Maps.newHashMap;
 import static com.google.common.collect.Maps.newHashMapWithExpectedSize;
+import static com.google.common.collect.Sets.newHashSet;
 
 /**
  * Service responsible for creating snapshots
@@ -67,7 +68,7 @@ import static com.google.common.collect.Maps.newHashMapWithExpectedSize;
  * <ul>
  * <li>On the master node the {@link #createSnapshot(SnapshotRequest, CreateSnapshotListener)} is called and makes sure that no snapshot is currently running
  * and registers the new snapshot in cluster state</li>
- * <li>When cluster state is updated the {@link #beginSnapshot(ClusterState, SnapshotMetaData.Entry, CreateSnapshotListener)} method
+ * <li>When cluster state is updated the {@link #beginSnapshot(ClusterState, SnapshotMetaData.Entry, boolean, CreateSnapshotListener)} method
  * kicks in and initializes the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state</li>
  * <li>Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes
  * start processing them through {@link #processIndexShardSnapshots(SnapshotMetaData)} method</li>
@@ -187,7 +188,7 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL
                     threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new Runnable() {
                         @Override
                         public void run() {
-                            beginSnapshot(newState, newSnapshot, listener);
+                            beginSnapshot(newState, newSnapshot, request.partial, listener);
                         }
                     });
                 }
@@ -243,9 +244,10 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL
      *
      * @param clusterState               cluster state
      * @param snapshot                   snapshot meta data
+     * @param partial                    allow partial snapshots
      * @param userCreateSnapshotListener listener
      */
-    private void beginSnapshot(ClusterState clusterState, final SnapshotMetaData.Entry snapshot, final CreateSnapshotListener userCreateSnapshotListener) {
+    private void beginSnapshot(ClusterState clusterState, final SnapshotMetaData.Entry snapshot, final boolean partial, final CreateSnapshotListener userCreateSnapshotListener) {
         boolean snapshotCreated = false;
         try {
             Repository repository = repositoriesService.repository(snapshot.snapshotId().getRepository());
@@ -271,6 +273,7 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL
             clusterService.submitStateUpdateTask("update_snapshot [" + snapshot + "]", new ProcessedClusterStateUpdateTask() {
                 boolean accepted = false;
                 SnapshotMetaData.Entry updatedSnapshot;
+                String failure = null;
 
                 @Override
                 public ClusterState execute(ClusterState currentState) {
@@ -282,6 +285,15 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL
                         if (entry.snapshotId().equals(snapshot.snapshotId())) {
                             // Replace the snapshot that was just created
                             ImmutableMap<ShardId, SnapshotMetaData.ShardSnapshotStatus> shards = shards(snapshot.snapshotId(), currentState, snapshot.indices());
+                            if (!partial) { // partial snapshots are not allowed: any MISSING shard fails the whole snapshot
+                                Set<String> indicesWithMissingShards = indicesWithMissingShards(shards); // null when no shard is MISSING
+                                if (indicesWithMissingShards != null) {
+                                    updatedSnapshot = new SnapshotMetaData.Entry(snapshot.snapshotId(), snapshot.includeGlobalState(), State.FAILED, snapshot.indices(), shards);
+                                    entries.add(updatedSnapshot);
+                                    failure = "Indices don't have primary shards " + indicesWithMissingShards; // Set.toString() already adds [..]
+                                    continue; // skip the STARTED registration below; the FAILED entry was added above
+                                }
+                            }
                             updatedSnapshot = new SnapshotMetaData.Entry(snapshot.snapshotId(), snapshot.includeGlobalState(), State.STARTED, snapshot.indices(), shards);
                             entries.add(updatedSnapshot);
                             if (!completed(shards.values())) {
@@ -310,8 +322,11 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL
                     userCreateSnapshotListener.onResponse();
 
                     // Now that snapshot completion listener is registered we can end the snapshot if needed
+                    // We should end snapshot only if 1) we didn't accept it for processing (which happens when there
+                    // is nothing to do) and 2) there was a snapshot in metadata that we should end. Otherwise we should
+                    // go ahead and continue working on this snapshot rather than ending it here.
                     if (!accepted && updatedSnapshot != null) {
-                        endSnapshot(updatedSnapshot);
+                        endSnapshot(updatedSnapshot, failure);
                     }
                 }
             });
@@ -602,6 +617,25 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL
         return true;
     }
 
+    /**
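+     * Returns the set of indices that have at least one shard in the MISSING state
+     *
+     * @param shards map of shard ids to shard snapshot statuses
+     * @return set of names of indices with missing shards, or null if no shard is missing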
+     */
+    private Set<String> indicesWithMissingShards(ImmutableMap<ShardId, SnapshotMetaData.ShardSnapshotStatus> shards) {
+        Set<String> indices = null;
+        for (ImmutableMap.Entry<ShardId, SnapshotMetaData.ShardSnapshotStatus> entry : shards.entrySet()) {
+            if (entry.getValue().state() == State.MISSING) {
+                if (indices == null) {
+                    indices = newHashSet();
+                }
+                indices.add(entry.getKey().getIndex());
+            }
+        }
+        return indices;
+    }
+
     /**
      * Updates the shard status on master node
      *
@@ -661,25 +695,38 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL
      *
      * @param entry snapshot
      */
-    private void endSnapshot(final SnapshotMetaData.Entry entry) {
+    private void endSnapshot(SnapshotMetaData.Entry entry) {
+        endSnapshot(entry, null);
+    }
+
+
+    /**
+     * Finalizes the snapshot in repository and then removes it from cluster state
+     * <p/>
+     * This is non-blocking method that runs on a thread from SNAPSHOT thread pool
+     *
+     * @param entry   snapshot
+     * @param failure failure reason or null if snapshot was successful
+     */
+    private void endSnapshot(final SnapshotMetaData.Entry entry, final String failure) {
         threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new Runnable() {
             @Override
             public void run() {
                 SnapshotId snapshotId = entry.snapshotId();
                 try {
                     final Repository repository = repositoriesService.repository(snapshotId.getRepository());
-                    logger.trace("[{}] finalizing snapshot in repository", snapshotId);
+                    logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshotId, entry.state(), failure);
                     ArrayList<ShardSearchFailure> failures = newArrayList();
                     ArrayList<SnapshotShardFailure> shardFailures = newArrayList();
                     for (Map.Entry<ShardId, ShardSnapshotStatus> shardStatus : entry.shards().entrySet()) {
                         ShardId shardId = shardStatus.getKey();
                         ShardSnapshotStatus status = shardStatus.getValue();
-                        if (status.state() == State.FAILED) {
+                        if (status.state().failed()) {
                             failures.add(new ShardSearchFailure(status.reason(), new SearchShardTarget(status.nodeId(), shardId.getIndex(), shardId.id())));
                             shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId.getIndex(), shardId.id(), status.reason()));
                         }
                     }
-                    Snapshot snapshot = repository.finalizeSnapshot(snapshotId, null, entry.shards().size(), ImmutableList.copyOf(shardFailures));
+                    Snapshot snapshot = repository.finalizeSnapshot(snapshotId, failure, entry.shards().size(), ImmutableList.copyOf(shardFailures));
                     removeSnapshotFromClusterState(snapshotId, new SnapshotInfo(snapshot), null);
                 } catch (Throwable t) {
                     logger.warn("[{}] failed to finalize snapshot", t, snapshotId);
@@ -841,16 +888,17 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL
 
     /**
      * Checks if a repository is currently in use by one of the snapshots
+     *
      * @param clusterState cluster state
-     * @param repository repository id
+     * @param repository   repository id
      * @return true if repository is currently in use by one of the running snapshots
      */
     public static boolean isRepositoryInUse(ClusterState clusterState, String repository) {
         MetaData metaData = clusterState.metaData();
         SnapshotMetaData snapshots = metaData.custom(SnapshotMetaData.TYPE);
         if (snapshots != null) {
-            for(SnapshotMetaData.Entry snapshot : snapshots.entries()) {
-                if(repository.equals(snapshot.snapshotId().getRepository())) {
+            for (SnapshotMetaData.Entry snapshot : snapshots.entries()) {
+                if (repository.equals(snapshot.snapshotId().getRepository())) {
                     return true;
                 }
             }
@@ -900,10 +948,9 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL
                 ShardId shardId = new ShardId(index, i);
                 ShardRouting primary = indexRoutingTable.shard(i).primaryShard();
                 if (primary == null || !primary.assignedToNode()) {
-                    //TODO: Should we bailout completely or just mark this shard as failed?
-                    builder.put(shardId, new SnapshotMetaData.ShardSnapshotStatus(null, State.FAILED, "primary shard is not allocated"));
+                    builder.put(shardId, new SnapshotMetaData.ShardSnapshotStatus(null, State.MISSING, "primary shard is not allocated"));
                 } else if (!primary.started()) {
-                    builder.put(shardId, new SnapshotMetaData.ShardSnapshotStatus(primary.currentNodeId(), State.FAILED, "primary shard hasn't been started yet"));
+                    builder.put(shardId, new SnapshotMetaData.ShardSnapshotStatus(primary.currentNodeId(), State.MISSING, "primary shard hasn't been started yet"));
                 } else {
                     builder.put(shardId, new SnapshotMetaData.ShardSnapshotStatus(primary.currentNodeId()));
                 }
@@ -985,6 +1032,8 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL
 
         private IndicesOptions indicesOptions = IndicesOptions.strict();
 
+        private boolean partial;
+
         private Settings settings;
 
         private boolean includeGlobalState;
@@ -1059,6 +1108,17 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL
             return this;
         }
 
+        /**
+         * Set to true if partial snapshot should be allowed
+         *
+         * @param partial true if partial snapshots should be allowed
+         * @return this request
+         */
+        public SnapshotRequest partial(boolean partial) {
+            this.partial = partial;
+            return this;
+        }
+
         /**
          * Returns cause for snapshot operation
          *

+ 0 - 30
src/test/java/org/elasticsearch/snapshots/AbstractSnapshotTests.java

@@ -26,8 +26,6 @@ import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.RepositoryMissingException;
 import org.elasticsearch.snapshots.mockstore.MockRepository;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Ignore;
 
 import java.io.File;
@@ -39,34 +37,6 @@ import static org.hamcrest.Matchers.equalTo;
 @Ignore
 public abstract class AbstractSnapshotTests extends ElasticsearchIntegrationTest {
 
-
-    @After
-    public final void wipeAfter() {
-        wipeRepositories();
-    }
-
-    @Before
-    public final void wipeBefore() {
-        wipeRepositories();
-    }
-
-    /**
-     * Deletes repositories, supports wildcard notation.
-     */
-    public static void wipeRepositories(String... repositories) {
-        // if nothing is provided, delete all
-        if (repositories.length == 0) {
-            repositories = new String[]{"*"};
-        }
-        for (String repository : repositories) {
-            try {
-                client().admin().cluster().prepareDeleteRepository(repository).execute().actionGet();
-            } catch (RepositoryMissingException ex) {
-                // ignore
-            }
-        }
-    }
-
     public static long getFailureCount(String repository) {
         long failureCount = 0;
         for (RepositoriesService repositoriesService : cluster().getInstances(RepositoriesService.class)) {

+ 75 - 1
src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java

@@ -22,11 +22,13 @@ package org.elasticsearch.snapshots;
 import com.carrotsearch.randomizedtesting.LifecycleScope;
 import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.snapshots.mockstore.MockRepositoryModule;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.store.MockDirectoryHelper;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.Test;
@@ -36,7 +38,8 @@ import java.util.ArrayList;
 import static com.google.common.collect.Lists.newArrayList;
 import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
-import static org.hamcrest.Matchers.equalTo;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
+import static org.hamcrest.Matchers.*;
 
 /**
  */
@@ -126,4 +129,75 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
         logger.info("Number of failed shards [{}]", snapshotInfo.shardFailures().size());
         logger.info("--> done");
     }
+
+    @Test
+    @TestLogging("snapshots:TRACE")
+    public void restoreIndexWithMissingShards() throws Exception { // scenario: default snapshot fails, partial snapshot completes, and only the completely snapshotted index can be restored
+        logger.info("--> start 2 nodes");
+        cluster().startNode(settingsBuilder().put("gateway.type", "local"));
+        cluster().startNode(settingsBuilder().put("gateway.type", "local"));
+        wipeIndices("_all");
+
+        assertAcked(prepareCreate("test-idx-1", 2, settingsBuilder().put("number_of_shards", 6)
+                .put("number_of_replicas", 0)
+                .put(MockDirectoryHelper.RANDOM_NO_DELETE_OPEN_FILE, false)));
+        ensureGreen();
+
+        logger.info("--> indexing some data into test-idx-1");
+        for (int i = 0; i < 100; i++) {
+            index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i);
+        }
+        refresh();
+        assertThat(client().prepareCount("test-idx-1").get().getCount(), equalTo(100L));
+
+        logger.info("--> shutdown one of the nodes");
+        cluster().stopRandomNode(); // test-idx-1 has no replicas, so presumably the shards held by the stopped node become unavailable - TODO confirm
+
+        assertAcked(prepareCreate("test-idx-2", 1, settingsBuilder().put("number_of_shards", 6)
+                .put("number_of_replicas", 0)
+                .put(MockDirectoryHelper.RANDOM_NO_DELETE_OPEN_FILE, false)));
+        ensureGreen("test-idx-2");
+
+        logger.info("--> indexing some data into test-idx-2");
+        for (int i = 0; i < 100; i++) {
+            index("test-idx-2", "doc", Integer.toString(i), "foo", "bar" + i);
+        }
+        refresh();
+        assertThat(client().prepareCount("test-idx-2").get().getCount(), equalTo(100L));
+
+        logger.info("--> create repository");
+        logger.info("--> creating repository"); // NOTE(review): repeats the previous log line with different wording
+        PutRepositoryResponse putRepositoryResponse = client().admin().cluster().preparePutRepository("test-repo")
+                .setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", newTempDir())).execute().actionGet();
+        assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
+
+        logger.info("--> start snapshot with default settings - should fail");
+        CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1").setWaitForCompletion(true).execute().actionGet();
+
+        assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.FAILED)); // "partial" was not set on this request
+
+        createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2").setWaitForCompletion(true).setPartial(true).execute().actionGet();
+        logger.info("State: [{}], Reason: [{}]", createSnapshotResponse.getSnapshotInfo().state(), createSnapshotResponse.getSnapshotInfo().reason());
+        assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(12)); // 6 shards for each of the two indices
+        assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), lessThan(12)); // at least one shard was not snapshotted
+        assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(6)); // more than one index holds (6), so shards of both indices succeeded
+        assertThat(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-2").execute().actionGet().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
+
+        assertAcked(client().admin().indices().prepareClose("test-idx-1", "test-idx-2").execute().actionGet());
+
+        logger.info("--> restore incomplete snapshot - should fail");
+        assertThrows(client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2").setRestoreGlobalState(false).setWaitForCompletion(true).execute(), SnapshotRestoreException.class); // no index filter here, so presumably the incompletely snapshotted test-idx-1 is included - TODO confirm
+
+        logger.info("--> restore snapshot for the index that was snapshotted completely");
+        RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2").setRestoreGlobalState(false).setIndices("test-idx-2").setWaitForCompletion(true).execute().actionGet();
+        assertThat(restoreSnapshotResponse.getRestoreInfo(), notNullValue());
+        assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), equalTo(6));
+        assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(), equalTo(6));
+        assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
+
+        ensureGreen("test-idx-2");
+
+        assertThat(client().prepareCount("test-idx-2").get().getCount(), equalTo(100L)); // same document count as was indexed before the snapshot
+
+    }
 }

+ 2 - 0
src/test/java/org/elasticsearch/snapshots/RepositoriesTests.java

@@ -30,6 +30,8 @@ import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
 import org.elasticsearch.cluster.metadata.RepositoryMetaData;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.repositories.RepositoryException;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import static org.hamcrest.Matchers.equalTo;

+ 5 - 4
src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java

@@ -42,6 +42,8 @@ import org.elasticsearch.indices.InvalidIndexNameException;
 import org.elasticsearch.snapshots.mockstore.MockRepositoryModule;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.store.MockDirectoryHelper;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
@@ -53,7 +55,6 @@ import static org.hamcrest.Matchers.*;
  */
 public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
 
-
     @Override
     public Settings indexSettings() {
         // During restore we frequently restore index to exactly the same state it was before, that might cause the same
@@ -496,10 +497,10 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
 
         logger.info("--> snapshot");
         CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx").get();
+        assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.FAILED));
         assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(0));
-        assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(3));
-        assertThat(createSnapshotResponse.getSnapshotInfo().shardFailures().size(), equalTo(3));
-        assertThat(createSnapshotResponse.getSnapshotInfo().shardFailures().get(0).reason(), startsWith("primary shard is not allocated"));
+        assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(0));
+        assertThat(createSnapshotResponse.getSnapshotInfo().reason(), startsWith("Indices don't have primary shards"));
     }
 
     @Test

+ 22 - 0
src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java

@@ -57,6 +57,7 @@ import org.elasticsearch.index.mapper.FieldMapper.Loading;
 import org.elasticsearch.index.merge.policy.*;
 import org.elasticsearch.indices.IndexMissingException;
 import org.elasticsearch.indices.IndexTemplateMissingException;
+import org.elasticsearch.repositories.RepositoryMissingException;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchService;
 import org.junit.After;
@@ -189,6 +190,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
             wipeIndices("_all");
             wipeTemplates();
             randomIndexTemplate();
+            wipeRepositories();
             logger.info("[{}#{}]: before test", getTestClass().getSimpleName(), getTestName());
         } catch (OutOfMemoryError e) {
             if (e.getMessage().contains("unable to create new native thread")) {
@@ -237,6 +239,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
             wipeIndices("_all"); // wipe after to make sure we fail in the test that
             // didn't ack the delete
             wipeTemplates();
+            wipeRepositories();
             ensureAllSearchersClosed();
             ensureAllFilesClosed();
             logger.info("[{}#{}]: cleaned up after test", getTestClass().getSimpleName(), getTestName());
@@ -368,6 +371,25 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
         }
     }
 
+    /**
+     * Deletes the given repositories, supports wildcard notation. With no arguments all repositories ("*") are deleted; does nothing when cluster().size() is 0; a RepositoryMissingException is ignored.
+     */
+    public static void wipeRepositories(String... repositories) {
+        if (cluster().size() > 0) {
+            // if nothing is provided, delete all
+            if (repositories.length == 0) {
+                repositories = new String[]{"*"};
+            }
+            for (String repository : repositories) {
+                try {
+                    client().admin().cluster().prepareDeleteRepository(repository).execute().actionGet();
+                } catch (RepositoryMissingException ex) {
+                    // ignore - a repository that is already missing needs no deletion
+                }
+            }
+        }
+    }
+
     /**
      * Creates one or more indices and asserts that the indices are acknowledged. If one of the indices
      * already exists this method will fail and wipe all the indices created so far.